文章

30 · Redis 实战:五大数据结构与缓存模式

#038 · 2026-04-17 · Python

🔗 知识图谱导航:阅读本文前,建议先掌握《16 · 数据库底座:SQLite 核心操作》中的数据持久化概念——Redis 是内存数据库,和 SQLite 的核心区别是"内存 vs 磁盘"和"数据结构丰富 vs 关系型"。本文用纯 Python 实现 MockRedis,零依赖演示所有核心特性。

运行环境:Python 3.12+ 标准库,零额外依赖,直接运行。

极客解析:Redis 的核心价值是"用内存换速度"——典型场景下读写延迟更低,代价是数据量受内存限制。五大数据结构让 Redis 不只是缓存,还能做消息队列(List)、去重(Set)、排行榜(ZSet)、对象存储(Hash)。

Redis 五大数据结构速查

String   set/get/incr/decr/expire    最基础,存字符串/数字/JSON
List     lpush/rpush/lpop/lrange     双端队列,消息队列/最新列表
Set      sadd/smembers/sismember     无序集合,去重/共同好友
Hash     hset/hget/hgetall           字段映射,对象存储
ZSet     zadd/zrange/zrangebyscore   有序集合,排行榜/延迟队列

缓存三大问题

缓存穿透:查询不存在的 key,每次都打到数据库
  解决:缓存空值(set key "" ex 60)或布隆过滤器

缓存击穿:热点 key 过期,大量请求同时打到数据库
  解决:互斥锁(setnx)或永不过期 + 异步更新

缓存雪崩:大量 key 同时过期,数据库被打垮
  解决:过期时间加随机抖动(ttl + random.randint(0, 300))

步步为营:核心逻辑自适应拆解

这一篇的核心是 Redis 的两层结构:五大数据结构(String/List/Set/Hash/ZSet)+ 缓存模式(LRU 淘汰 + TTL + 三大问题防护)。下面每一步都聚焦一个机制,零依赖可直接运行。

Step 1:用 MockRedis 演示 String/List 两种基础结构

痛点与机制

MockRedis 用 Python 内置容器模拟 Redis。第一步先吃下最容易理解的两个结构:String 像一个键值便利贴,set/get 就是贴上和读取;List 像排队窗口,lpush 从队伍左边塞任务,rpop 从右边取任务,正好可以模拟“生产者放任务、消费者拿任务”。

核心源码(逐字来自文末完整源码)

    # String
    def set(self, key: str, value: Any, ex: Optional[int] = None) -> None:
        expire_at = time.time() + ex if ex else None
        self._strings[key] = (value, expire_at)

    def get(self, key: str) -> Optional[Any]:
        entry = self._strings.get(key)
        if entry is None:
            return None
        val, expire_at = entry
        if self._is_expired(expire_at):
            del self._strings[key]
            return None
        return val

    def incr(self, key: str) -> int:
        val = int(self.get(key) or 0) + 1
        self.set(key, val)
        return val

    def ttl(self, key: str) -> int:
        entry = self._strings.get(key)
        if entry is None:
            return -2
        _, expire_at = entry
        if expire_at is None:
            return -1
        remaining = int(expire_at - time.time())
        return max(remaining, 0)

    # List
    def lpush(self, key: str, *values) -> int:
        lst = self._lists.setdefault(key, [])
        for v in values:
            lst.insert(0, v)
        return len(lst)

    def rpop(self, key: str) -> Optional[Any]:
        lst = self._lists.get(key, [])
        return lst.pop() if lst else None

    def lrange(self, key: str, start: int, end: int) -> list:
        lst = self._lists.get(key, [])
        return lst[start: end + 1 if end != -1 else None]

可运行演示(补齐 Mock 数据与 print 反馈)

import time
from typing import Any, Optional


class TinyRedis:
    def __init__(self) -> None:
        # String 用字典保存:key -> (value, expire_at)。
        self._strings: dict[str, tuple[Any, Optional[float]]] = {}
        # List 用 Python list 模拟,左进右出就像一个任务队列。
        self._lists: dict[str, list[str]] = {}

    def set(self, key: str, value: Any, ex: Optional[int] = None) -> None:
        expire_at = time.time() + ex if ex else None
        self._strings[key] = (value, expire_at)

    def get(self, key: str) -> Optional[Any]:
        item = self._strings.get(key)
        if item is None:
            return None
        value, expire_at = item
        if expire_at is not None and time.time() > expire_at:
            del self._strings[key]
            return None
        return value

    def lpush(self, key: str, *values: str) -> int:
        queue = self._lists.setdefault(key, [])
        for value in values:
            queue.insert(0, value)  # 左侧插入,新任务放到队首。
        return len(queue)

    def rpop(self, key: str) -> Optional[str]:
        queue = self._lists.get(key, [])
        return queue.pop() if queue else None  # 右侧弹出,模拟消费者取走最早任务。


r = TinyRedis()
r.set("user:1:name", "alice")
r.lpush("task:queue", "发邮件", "生成报表", "清理缓存")
print("String 读取:", r.get("user:1:name"))
print("List 消费顺序:", r.rpop("task:queue"), "->", r.rpop("task:queue"), "->", r.rpop("task:queue"))

Step 2:用 MockRedis 演示 Hash/Set/ZSet 三种进阶结构

痛点与机制

Hash 像一张用户资料卡,适合把 name/role/login_count 这种字段放在同一个 key 下面;Set 像自动去重的标签袋,同一个标签放两次也只算一次;ZSet 像自带分数的排行榜,每个成员都有 score,Redis 会按分数排序。

核心源码(逐字来自文末完整源码)

    # Hash
    def hset(self, key: str, field: str, value: Any) -> None:
        self._hashes.setdefault(key, {})[field] = value

    def hget(self, key: str, field: str) -> Optional[Any]:
        return self._hashes.get(key, {}).get(field)

    def hgetall(self, key: str) -> dict:
        return dict(self._hashes.get(key, {}))

    # Set
    def sadd(self, key: str, *members) -> int:
        s = self._sets.setdefault(key, set())
        before = len(s)
        s.update(members)
        return len(s) - before

    def sismember(self, key: str, member: Any) -> bool:
        return member in self._sets.get(key, set())

    def smembers(self, key: str) -> set:
        return set(self._sets.get(key, set()))

    def sinter(self, *keys: str) -> set:
        sets = [self._sets.get(k, set()) for k in keys]
        return set.intersection(*sets) if sets else set()

    # ZSet
    def zadd(self, key: str, mapping: dict[str, float]) -> int:
        zs = self._zsets.setdefault(key, {})
        added = sum(1 for m in mapping if m not in zs)
        zs.update(mapping)
        return added

    def zrange(self, key: str, start: int, end: int, withscores: bool = False):
        zs = self._zsets.get(key, {})
        sorted_members = sorted(zs.items(), key=lambda x: x[1])
        sliced = sorted_members[start: end + 1 if end != -1 else None]
        return sliced if withscores else [m for m, _ in sliced]

    def zrevrank(self, key: str, member: str) -> Optional[int]:
        zs = self._zsets.get(key, {})
        sorted_members = sorted(zs.keys(), key=lambda m: -zs[m])
        try:
            return sorted_members.index(member)
        except ValueError:
            return None

可运行演示(补齐 Mock 数据与 print 反馈)

from typing import Any, Optional


class TinyRedis:
    def __init__(self) -> None:
        self._hashes: dict[str, dict[str, Any]] = {}
        self._sets: dict[str, set[str]] = {}
        self._zsets: dict[str, dict[str, float]] = {}

    def hset(self, key: str, field: str, value: Any) -> None:
        self._hashes.setdefault(key, {})[field] = value

    def hgetall(self, key: str) -> dict[str, Any]:
        return dict(self._hashes.get(key, {}))

    def sadd(self, key: str, *members: str) -> int:
        values = self._sets.setdefault(key, set())
        before = len(values)
        values.update(members)  # set 天生去重,重复成员不会增加数量。
        return len(values) - before

    def smembers(self, key: str) -> set[str]:
        return set(self._sets.get(key, set()))

    def zadd(self, key: str, mapping: dict[str, float]) -> int:
        board = self._zsets.setdefault(key, {})
        added = sum(1 for member in mapping if member not in board)
        board.update(mapping)
        return added

    def zrange(self, key: str, start: int, end: int, withscores: bool = False):
        items = sorted(self._zsets.get(key, {}).items(), key=lambda item: item[1])
        sliced = items[start: end + 1 if end != -1 else None]
        return sliced if withscores else [member for member, _ in sliced]


r = TinyRedis()
r.hset("user:1", "name", "alice")
r.hset("user:1", "role", "admin")
r.sadd("tags", "python", "redis", "python")
r.zadd("rank", {"alice": 98, "bob": 88, "carol": 95})
print("Hash 用户卡片:", r.hgetall("user:1"))
print("Set 标签去重:", sorted(r.smembers("tags")))
print("ZSet 排行榜:", list(reversed(r.zrange("rank", 0, -1, withscores=True))))

Step 3:用 TTL 和 incr 演示 Redis 的原子操作

痛点与机制

set(key, value, ex=N) 设置 N 秒后过期,MockRedisget 时检查过期时间,过期返回 Noneincr 是原子自增——在 Redis 里,incr 是单线程执行的,不会有并发问题,适合做计数器、限流、分布式 ID。MockRedisint(self.get(key)) + 1 模拟,真实 Redis 的 incr 是 C 语言实现的原子操作。

核心源码(逐字来自文末完整源码)

    def set(self, key: str, value: Any, ex: Optional[int] = None) -> None:
        expire_at = time.time() + ex if ex else None
        self._strings[key] = (value, expire_at)

    def get(self, key: str) -> Optional[Any]:
        entry = self._strings.get(key)
        if entry is None:
            return None
        val, expire_at = entry
        if self._is_expired(expire_at):
            del self._strings[key]
            return None
        return val

    def incr(self, key: str) -> int:
        val = int(self.get(key) or 0) + 1
        self.set(key, val)
        return val

    def ttl(self, key: str) -> int:
        entry = self._strings.get(key)
        if entry is None:
            return -2
        _, expire_at = entry
        if expire_at is None:
            return -1
        remaining = int(expire_at - time.time())
        return max(remaining, 0)

可运行演示(补齐 Mock 数据与 print 反馈)

import time
from typing import Any, Optional


class CounterRedis:
    def __init__(self) -> None:
        self._strings: dict[str, tuple[Any, Optional[float]]] = {}

    def set(self, key: str, value: Any, ex: Optional[int] = None) -> None:
        expire_at = time.time() + ex if ex else None
        self._strings[key] = (value, expire_at)

    def get(self, key: str) -> Optional[Any]:
        item = self._strings.get(key)
        if item is None:
            return None
        value, expire_at = item
        if expire_at is not None and time.time() > expire_at:
            del self._strings[key]
            return None
        return value

    def incr(self, key: str) -> int:
        value = int(self.get(key) or 0) + 1
        self.set(key, value)
        return value

    def ttl(self, key: str) -> int:
        item = self._strings.get(key)
        if item is None:
            return -2  # Redis 语义:key 不存在。
        _, expire_at = item
        if expire_at is None:
            return -1  # Redis 语义:key 存在但没有过期时间。
        return max(int(expire_at - time.time()), 0)


r = CounterRedis()
r.set("session:token", "abc123", ex=2)
print("刚写入:", r.get("session:token"), "TTL≈", r.ttl("session:token"), "秒")
print("计数器:", r.incr("page:views"), r.incr("page:views"), r.incr("page:views"))
time.sleep(2.1)
print("过期后:", r.get("session:token"), "TTL=", r.ttl("session:token"))

Step 4:用 LRUCache 手动实现 Redis 的 LRU 淘汰策略

痛点与机制

LRUCacheOrderedDict 实现 LRU(Least Recently Used)淘汰策略:get 时把 key 移到末尾(最近使用),set 时如果容量满了,删除第一个 key(最久未使用)。OrderedDict.move_to_end(key) 是 O(1) 操作,popitem(last=False) 删除第一个元素也是 O(1)。Redis 的 maxmemory-policy lru 就是这个原理,只是用了近似 LRU 算法(随机采样)来降低开销。

核心源码(逐字来自文末完整源码)

class LRUCache:
    """带 TTL 过期和 LRU 驱逐的内存缓存"""

    def __init__(self, capacity: int = 100):
        self.capacity = capacity
        self._cache: OrderedDict[str, tuple[Any, Optional[float]]] = OrderedDict()
        self._lock = threading.Lock()
        self.hits = 0
        self.misses = 0

    def get(self, key: str) -> Optional[Any]:
        with self._lock:
            if key not in self._cache:
                self.misses += 1
                return None
            val, expire_at = self._cache[key]
            if expire_at and time.time() > expire_at:
                del self._cache[key]
                self.misses += 1
                return None
            self._cache.move_to_end(key)  # LRU:访问后移到末尾
            self.hits += 1
            return val

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        with self._lock:
            expire_at = time.time() + ttl if ttl else None
            if key in self._cache:
                self._cache.move_to_end(key)
            self._cache[key] = (value, expire_at)
            if len(self._cache) > self.capacity:
                self._cache.popitem(last=False)  # 驱逐最久未使用

    def stats(self) -> dict:
        total = self.hits + self.misses
        return {
            "size": len(self._cache),
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": f"{self.hits/total*100:.1f}%" if total else "0%",
        }

可运行演示(补齐 Mock 数据与 print 反馈)

import time
from collections import OrderedDict
from typing import Any, Optional


class LRUCache:
    def __init__(self, capacity: int = 3) -> None:
        self.capacity = capacity
        self._cache: OrderedDict[str, tuple[Any, Optional[float]]] = OrderedDict()

    def get(self, key: str) -> Optional[Any]:
        if key not in self._cache:
            return None
        value, expire_at = self._cache[key]
        if expire_at and time.time() > expire_at:
            del self._cache[key]
            return None
        self._cache.move_to_end(key)  # 被访问的 key 移到队尾,表示“刚用过”。
        return value

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        expire_at = time.time() + ttl if ttl else None
        if key in self._cache:
            self._cache.move_to_end(key)
        self._cache[key] = (value, expire_at)
        if len(self._cache) > self.capacity:
            removed_key, _ = self._cache.popitem(last=False)  # 队首就是最久没用的。
            print("容量满了,淘汰:", removed_key)


cache = LRUCache(capacity=3)
for key in ["A", "B", "C"]:
    cache.set(key, key.lower())
print("初始缓存:", list(cache._cache.keys()))
cache.get("A")
print("访问 A 后:", list(cache._cache.keys()))
cache.set("D", "d")
print("加入 D 后:", list(cache._cache.keys()))

Step 5:用 mode_structures 演示五大数据结构的完整操作

痛点与机制

mode_structuresascii_table 格式化展示五大数据结构的操作结果,让读者一眼看清每种数据结构的特点。这个演示覆盖了 Redis 最常用的命令:set/get/incr(String)、lpush/lrange(List)、sadd/smembers(Set)、hset/hgetall(Hash)、zadd/zrange(ZSet)。

核心源码(逐字来自文末完整源码)

def mode_structures(_: argparse.Namespace) -> None:
    print(f"=== 五大数据结构演示  [{now_str()}] ===\n")
    r = MockRedis()

    # String:计数器
    for _ in range(5):
        r.incr("page:views")
    r.set("product:1001", "机械键盘 Pro X", ex=3600)
    print(f"[String] 页面访问量: {r.get('page:views')}")
    print(f"[String] 商品缓存: {r.get('product:1001')}  TTL={r.ttl('product:1001')}s")

    # List:任务队列
    for task in ["task-A", "task-B", "task-C"]:
        r.lpush("task:queue", task)
    print(f"\n[List] 队列: {r.lrange('task:queue', 0, -1)}")
    print(f"[List] 消费: {r.rpop('task:queue')}")

    # Hash:对象存储
    r.hset("user:1001", "name", "赵工")
    r.hset("user:1001", "role", "admin")
    r.hset("user:1001", "login_count", 42)
    print(f"\n[Hash] 用户信息: {r.hgetall('user:1001')}")

    # Set:标签去重
    r.sadd("product:1001:tags", "办公", "游戏", "机械")
    r.sadd("product:1002:tags", "办公", "设计", "4K")
    common = r.sinter("product:1001:tags", "product:1002:tags")
    print(f"\n[Set] 商品1标签: {r.smembers('product:1001:tags')}")
    print(f"[Set] 共同标签: {common}")

    # ZSet:排行榜
    r.zadd("leaderboard", {"Alice": 9800, "Bob": 8500, "Carol": 9200, "Dave": 7100})
    top3 = r.zrange("leaderboard", 0, 2, withscores=True)
    rows = [[i+1, m, int(s)] for i, (m, s) in enumerate(reversed(top3))]
    print(f"\n[ZSet] 排行榜 Top3:")
    print(ascii_table(["排名", "用户", "积分"], rows, title="积分排行榜"))

可运行演示(补齐 Mock 数据与 print 反馈)

from typing import Any


def ascii_table(headers: list[str], rows: list[list[Any]], title: str = "") -> str:
    widths = [len(h) for h in headers]
    for row in rows:
        for i, cell in enumerate(row):
            widths[i] = max(widths[i], len(str(cell)))
    sep = "+" + "+".join("-" * (w + 2) for w in widths) + "+"
    fmt = "|" + "|".join(f" {{:<{w}}} " for w in widths) + "|"
    lines = [sep]
    if title:
        lines.append("|" + title.center(len(sep) - 2) + "|")
        lines.append(sep)
    lines.append(fmt.format(*headers))
    lines.append(sep)
    for row in rows:
        lines.append(fmt.format(*[str(x) for x in row]))
    lines.append(sep)
    return "\n".join(lines)


def mode_structures(_: object) -> None:
    rows = [
        ["String", "SET/GET/INCR", "验证码、计数器"],
        ["List", "LPUSH/RPOP", "任务队列"],
        ["Set", "SADD/SMEMBERS", "标签去重"],
        ["Hash", "HSET/HGETALL", "用户资料卡"],
        ["ZSet", "ZADD/ZRANGE", "积分排行榜"],
    ]
    print(ascii_table(["结构", "常用命令", "适合场景"], rows, title="Redis 五大结构"))


mode_structures(None)

Step 6:用 mode_cache 演示 LRU + TTL 缓存层

痛点与机制

mode_cache 演示 LRUCache 作为应用层缓存的完整流程:先查缓存,命中直接返回;未命中则查"数据库"(Mock),写入缓存后返回。hit_rate 统计缓存命中率,让读者看到缓存的效果。TTL 演示了缓存过期后自动失效,下次访问会重新从数据库加载。

核心源码(逐字来自文末完整源码)

def mode_cache(_: argparse.Namespace) -> None:
    print(f"=== LRU + TTL 缓存层演示  [{now_str()}] ===\n")
    cache = LRUCache(capacity=5)

    # 模拟数据库查询(慢)
    db_calls = 0
    def get_product(pid: int) -> dict:
        nonlocal db_calls
        key = f"product:{pid}"
        cached = cache.get(key)
        if cached:
            return cached
        # 缓存未命中,查数据库
        db_calls += 1
        time.sleep(0.01)  # 模拟 10ms 数据库延迟
        data = {"id": pid, "name": f"商品-{pid}", "price": pid * 99.0}
        cache.set(key, data, ttl=60)
        return data

    # 第一轮:全部未命中
    print("第一轮查询(冷启动):")
    for pid in range(1, 6):
        get_product(pid)

    # 第二轮:全部命中缓存
    print("第二轮查询(缓存预热后):")
    for pid in range(1, 6):
        get_product(pid)

    # 第三轮:触发 LRU 驱逐
    print("第三轮:新增商品触发 LRU 驱逐:")
    for pid in range(6, 9):
        get_product(pid)

    s = cache.stats()
    rows = [
        ["缓存容量", str(cache.capacity)],
        ["当前大小", str(s["size"])],
        ["命中次数", str(s["hits"])],
        ["未命中次数", str(s["misses"])],
        ["命中率", s["hit_rate"]],
        ["数据库调用次数", str(db_calls)],
    ]
    print("\n" + ascii_table(["指标", "值"], rows, title="缓存层统计"))

可运行演示(补齐 Mock 数据与 print 反馈)

from collections import OrderedDict
from typing import Any, Optional


class TinyCache:
    def __init__(self, capacity: int = 2) -> None:
        self.capacity = capacity
        self._cache: OrderedDict[str, Any] = OrderedDict()
        self.hits = 0
        self.misses = 0

    def get(self, key: str) -> Optional[Any]:
        if key not in self._cache:
            self.misses += 1
            return None
        self._cache.move_to_end(key)
        self.hits += 1
        return self._cache[key]

    def set(self, key: str, value: Any) -> None:
        self._cache[key] = value
        self._cache.move_to_end(key)
        if len(self._cache) > self.capacity:
            self._cache.popitem(last=False)


def mode_cache(_: object) -> None:
    cache = TinyCache(capacity=2)
    db_calls = 0

    def get_product(pid: int) -> dict[str, Any]:
        nonlocal db_calls
        key = f"product:{pid}"
        cached = cache.get(key)
        if cached is not None:
            print(f"命中缓存: {key}")
            return cached
        db_calls += 1
        data = {"id": pid, "name": f"商品-{pid}"}
        cache.set(key, data)
        print(f"未命中,查数据库: {key}")
        return data

    for pid in [1, 2, 1, 3, 2]:
        get_product(pid)
    print(f"数据库调用 {db_calls} 次,命中 {cache.hits} 次,未命中 {cache.misses} 次")


mode_cache(None)

Step 7:用 mode_patterns 演示缓存穿透/击穿/雪崩防护

痛点与机制

mode_patterns 演示三大缓存问题的防护模式:缓存穿透用"缓存空值"解决(查不到也缓存 None,避免每次都打数据库);缓存击穿用"互斥锁"解决(setnx 保证只有一个请求去数据库);缓存雪崩用"随机 TTL 抖动"解决(ttl + random.randint(0, 300) 让过期时间分散)。

核心源码(逐字来自文末完整源码)

def mode_patterns(_: argparse.Namespace) -> None:
    print(f"=== 缓存三大问题演示  [{now_str()}] ===\n")
    rows = [
        ["缓存穿透", "查询 id=-1 等不存在数据", "每次都打到DB", "缓存空值 + 布隆过滤器"],
        ["缓存雪崩", "大量key同时过期", "DB瞬间压力暴增", "过期时间加随机抖动"],
        ["缓存击穿", "热点key过期瞬间", "大量并发打到DB", "SETNX互斥锁 + 逻辑过期"],
    ]
    print(ascii_table(
        ["问题", "触发场景", "现象", "解决方案"],
        rows, title="缓存三大问题"
    ))

    # 演示随机抖动防雪崩
    print("\n随机抖动 TTL 示例(防雪崩):")
    base_ttl = 3600
    jitter_rows = []
    for i in range(5):
        jitter = random.randint(0, 300)
        actual_ttl = base_ttl + jitter
        jitter_rows.append([f"key:{i}", f"{base_ttl}s", f"+{jitter}s", f"{actual_ttl}s"])
    print(ascii_table(["键名", "基础TTL", "抖动", "实际TTL"], jitter_rows, title="TTL 随机抖动"))

可运行演示(补齐 Mock 数据与 print 反馈)

import random


def mode_patterns(_: object) -> None:
    problems = [
        ("缓存穿透", "查不存在的数据", "缓存空值或布隆过滤器"),
        ("缓存击穿", "热点 key 突然过期", "互斥锁或逻辑过期"),
        ("缓存雪崩", "大量 key 同时过期", "TTL 加随机抖动"),
    ]
    print("缓存三大问题:")
    for name, scene, solution in problems:
        print(f"- {name}: {scene} -> {solution}")

    print("\nTTL 随机抖动示例:")
    base_ttl = 3600
    random.seed(7)
    for i in range(3):
        jitter = random.randint(0, 300)
        print(f"key:{i} 实际 TTL = {base_ttl} + {jitter} = {base_ttl + jitter}s")


mode_patterns(None)

Step 8:用 main 做 structures/cache/patterns 三种模式的 CLI 总入口

痛点与机制

mainargparse 做 CLI 入口,三种模式对应三个学习层次:structures 看数据结构,cache 看缓存层,patterns 看防护模式。用字典分发替代 if/elif 链,是 Python 里处理多分支的惯用写法。

核心源码(逐字来自文末完整源码)

def main() -> None:
    p = argparse.ArgumentParser(description="Redis 数据结构与缓存层演示")
    p.add_argument("--mode", choices=["structures", "cache", "patterns"], default="structures")
    args = p.parse_args()
    {"structures": mode_structures, "cache": mode_cache, "patterns": mode_patterns}[args.mode](args)

if __name__ == "__main__":
    main()

可运行演示(补齐 Mock 数据与 print 反馈)

import argparse


def mode_structures(_: argparse.Namespace) -> None:
    print("运行 structures:学习五大数据结构")


def mode_cache(_: argparse.Namespace) -> None:
    print("运行 cache:学习 LRU + TTL 缓存层")


def mode_patterns(_: argparse.Namespace) -> None:
    print("运行 patterns:学习缓存穿透、击穿、雪崩")


def main() -> None:
    p = argparse.ArgumentParser(description="Redis 数据结构与缓存层演示")
    p.add_argument("--mode", choices=["structures", "cache", "patterns"], default="structures")
    args = p.parse_args()
    {"structures": mode_structures, "cache": mode_cache, "patterns": mode_patterns}[args.mode](args)


for mode in ["structures", "cache", "patterns"]:
    print(f">>> python3 30-python-redis.py --mode {mode}")
    {"structures": mode_structures, "cache": mode_cache, "patterns": mode_patterns}[mode](argparse.Namespace(mode=mode))

极客实战:完整源码与运行

现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。

#!/usr/bin/env python3
"""
30-python-redis.py — Redis 数据结构与缓存层演示(零外部依赖)

用法:
  python3 30-python-redis.py --mode structures  # 五大数据结构演示
  python3 30-python-redis.py --mode cache       # TTL + LRU 缓存层演示
  python3 30-python-redis.py --mode patterns    # 缓存穿透/雪崩/击穿演示
"""

import argparse
import random
import threading
import time
from collections import OrderedDict
from datetime import datetime
from typing import Any, Optional
from zoneinfo import ZoneInfo

TZ = ZoneInfo("Asia/Shanghai")

def now_str() -> str:
    return datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S")

def ascii_table(headers: list[str], rows: list[list[Any]], title: str = "") -> str:
    col_w = [len(h) for h in headers]
    for row in rows:
        for i, cell in enumerate(row):
            col_w[i] = max(col_w[i], len(str(cell)))
    sep = "+" + "+".join("-" * (w + 2) for w in col_w) + "+"
    fmt = "|" + "|".join(f" {{:<{w}}} " for w in col_w) + "|"
    lines = []
    if title:
        total = sum(col_w) + 3 * len(col_w) + 1
        lines += [sep, f"|{title.center(total - 2)}|"]
    lines += [sep, fmt.format(*headers), sep]
    for row in rows:
        lines.append(fmt.format(*[str(c) for c in row]))
    lines.append(sep)
    return "\n".join(lines)


class MockRedis:
    """用 Python 内置类型模拟 Redis 五大数据结构"""

    def __init__(self):
        self._strings: dict[str, tuple[Any, Optional[float]]] = {}  # val, expire_at
        self._lists: dict[str, list] = {}
        self._hashes: dict[str, dict] = {}
        self._sets: dict[str, set] = {}
        self._zsets: dict[str, dict[str, float]] = {}  # member -> score

    def _is_expired(self, expire_at: Optional[float]) -> bool:
        return expire_at is not None and time.time() > expire_at

    # String
    def set(self, key: str, value: Any, ex: Optional[int] = None) -> None:
        expire_at = time.time() + ex if ex else None
        self._strings[key] = (value, expire_at)

    def get(self, key: str) -> Optional[Any]:
        entry = self._strings.get(key)
        if entry is None:
            return None
        val, expire_at = entry
        if self._is_expired(expire_at):
            del self._strings[key]
            return None
        return val

    def incr(self, key: str) -> int:
        val = int(self.get(key) or 0) + 1
        self.set(key, val)
        return val

    def ttl(self, key: str) -> int:
        entry = self._strings.get(key)
        if entry is None:
            return -2
        _, expire_at = entry
        if expire_at is None:
            return -1
        remaining = int(expire_at - time.time())
        return max(remaining, 0)

    # List
    def lpush(self, key: str, *values) -> int:
        lst = self._lists.setdefault(key, [])
        for v in values:
            lst.insert(0, v)
        return len(lst)

    def rpop(self, key: str) -> Optional[Any]:
        lst = self._lists.get(key, [])
        return lst.pop() if lst else None

    def lrange(self, key: str, start: int, end: int) -> list:
        lst = self._lists.get(key, [])
        return lst[start: end + 1 if end != -1 else None]

    # Hash
    def hset(self, key: str, field: str, value: Any) -> None:
        self._hashes.setdefault(key, {})[field] = value

    def hget(self, key: str, field: str) -> Optional[Any]:
        return self._hashes.get(key, {}).get(field)

    def hgetall(self, key: str) -> dict:
        return dict(self._hashes.get(key, {}))

    # Set
    def sadd(self, key: str, *members) -> int:
        s = self._sets.setdefault(key, set())
        before = len(s)
        s.update(members)
        return len(s) - before

    def sismember(self, key: str, member: Any) -> bool:
        return member in self._sets.get(key, set())

    def smembers(self, key: str) -> set:
        return set(self._sets.get(key, set()))

    def sinter(self, *keys: str) -> set:
        sets = [self._sets.get(k, set()) for k in keys]
        return set.intersection(*sets) if sets else set()

    # ZSet
    def zadd(self, key: str, mapping: dict[str, float]) -> int:
        zs = self._zsets.setdefault(key, {})
        added = sum(1 for m in mapping if m not in zs)
        zs.update(mapping)
        return added

    def zrange(self, key: str, start: int, end: int, withscores: bool = False):
        zs = self._zsets.get(key, {})
        sorted_members = sorted(zs.items(), key=lambda x: x[1])
        sliced = sorted_members[start: end + 1 if end != -1 else None]
        return sliced if withscores else [m for m, _ in sliced]

    def zrevrank(self, key: str, member: str) -> Optional[int]:
        zs = self._zsets.get(key, {})
        sorted_members = sorted(zs.keys(), key=lambda m: -zs[m])
        try:
            return sorted_members.index(member)
        except ValueError:
            return None

# ── LRU + TTL 缓存层 ─────────────────────────────────────────
class LRUCache:
    """带 TTL 过期和 LRU 驱逐的内存缓存"""

    def __init__(self, capacity: int = 100):
        self.capacity = capacity
        self._cache: OrderedDict[str, tuple[Any, Optional[float]]] = OrderedDict()
        self._lock = threading.Lock()
        self.hits = 0
        self.misses = 0

    def get(self, key: str) -> Optional[Any]:
        with self._lock:
            if key not in self._cache:
                self.misses += 1
                return None
            val, expire_at = self._cache[key]
            if expire_at and time.time() > expire_at:
                del self._cache[key]
                self.misses += 1
                return None
            self._cache.move_to_end(key)  # LRU:访问后移到末尾
            self.hits += 1
            return val

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        with self._lock:
            expire_at = time.time() + ttl if ttl else None
            if key in self._cache:
                self._cache.move_to_end(key)
            self._cache[key] = (value, expire_at)
            if len(self._cache) > self.capacity:
                self._cache.popitem(last=False)  # 驱逐最久未使用

    def stats(self) -> dict:
        total = self.hits + self.misses
        return {
            "size": len(self._cache),
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": f"{self.hits/total*100:.1f}%" if total else "0%",
        }

# ── CLI 模式 ─────────────────────────────────────────────────
def mode_structures(_: argparse.Namespace) -> None:
    print(f"=== 五大数据结构演示  [{now_str()}] ===\n")
    r = MockRedis()

    # String:计数器
    for _ in range(5):
        r.incr("page:views")
    r.set("product:1001", "机械键盘 Pro X", ex=3600)
    print(f"[String] 页面访问量: {r.get('page:views')}")
    print(f"[String] 商品缓存: {r.get('product:1001')}  TTL={r.ttl('product:1001')}s")

    # List:任务队列
    for task in ["task-A", "task-B", "task-C"]:
        r.lpush("task:queue", task)
    print(f"\n[List] 队列: {r.lrange('task:queue', 0, -1)}")
    print(f"[List] 消费: {r.rpop('task:queue')}")

    # Hash:对象存储
    r.hset("user:1001", "name", "赵工")
    r.hset("user:1001", "role", "admin")
    r.hset("user:1001", "login_count", 42)
    print(f"\n[Hash] 用户信息: {r.hgetall('user:1001')}")

    # Set:标签去重
    r.sadd("product:1001:tags", "办公", "游戏", "机械")
    r.sadd("product:1002:tags", "办公", "设计", "4K")
    common = r.sinter("product:1001:tags", "product:1002:tags")
    print(f"\n[Set] 商品1标签: {r.smembers('product:1001:tags')}")
    print(f"[Set] 共同标签: {common}")

    # ZSet:排行榜
    r.zadd("leaderboard", {"Alice": 9800, "Bob": 8500, "Carol": 9200, "Dave": 7100})
    top3 = r.zrange("leaderboard", 0, 2, withscores=True)
    rows = [[i+1, m, int(s)] for i, (m, s) in enumerate(reversed(top3))]
    print(f"\n[ZSet] 排行榜 Top3:")
    print(ascii_table(["排名", "用户", "积分"], rows, title="积分排行榜"))

def mode_cache(_: argparse.Namespace) -> None:
    print(f"=== LRU + TTL 缓存层演示  [{now_str()}] ===\n")
    cache = LRUCache(capacity=5)

    # 模拟数据库查询(慢)
    db_calls = 0
    def get_product(pid: int) -> dict:
        nonlocal db_calls
        key = f"product:{pid}"
        cached = cache.get(key)
        if cached:
            return cached
        # 缓存未命中,查数据库
        db_calls += 1
        time.sleep(0.01)  # 模拟 10ms 数据库延迟
        data = {"id": pid, "name": f"商品-{pid}", "price": pid * 99.0}
        cache.set(key, data, ttl=60)
        return data

    # 第一轮:全部未命中
    print("第一轮查询(冷启动):")
    for pid in range(1, 6):
        get_product(pid)

    # 第二轮:全部命中缓存
    print("第二轮查询(缓存预热后):")
    for pid in range(1, 6):
        get_product(pid)

    # 第三轮:触发 LRU 驱逐
    print("第三轮:新增商品触发 LRU 驱逐:")
    for pid in range(6, 9):
        get_product(pid)

    s = cache.stats()
    rows = [
        ["缓存容量", str(cache.capacity)],
        ["当前大小", str(s["size"])],
        ["命中次数", str(s["hits"])],
        ["未命中次数", str(s["misses"])],
        ["命中率", s["hit_rate"]],
        ["数据库调用次数", str(db_calls)],
    ]
    print("\n" + ascii_table(["指标", "值"], rows, title="缓存层统计"))

def mode_patterns(_: argparse.Namespace) -> None:
    print(f"=== 缓存三大问题演示  [{now_str()}] ===\n")
    rows = [
        ["缓存穿透", "查询 id=-1 等不存在数据", "每次都打到DB", "缓存空值 + 布隆过滤器"],
        ["缓存雪崩", "大量key同时过期", "DB瞬间压力暴增", "过期时间加随机抖动"],
        ["缓存击穿", "热点key过期瞬间", "大量并发打到DB", "SETNX互斥锁 + 逻辑过期"],
    ]
    print(ascii_table(
        ["问题", "触发场景", "现象", "解决方案"],
        rows, title="缓存三大问题"
    ))

    # 演示随机抖动防雪崩
    print("\n随机抖动 TTL 示例(防雪崩):")
    base_ttl = 3600
    jitter_rows = []
    for i in range(5):
        jitter = random.randint(0, 300)
        actual_ttl = base_ttl + jitter
        jitter_rows.append([f"key:{i}", f"{base_ttl}s", f"+{jitter}s", f"{actual_ttl}s"])
    print(ascii_table(["键名", "基础TTL", "抖动", "实际TTL"], jitter_rows, title="TTL 随机抖动"))

def main() -> None:
    p = argparse.ArgumentParser(description="Redis 数据结构与缓存层演示")
    p.add_argument("--mode", choices=["structures", "cache", "patterns"], default="structures")
    args = p.parse_args()
    {"structures": mode_structures, "cache": mode_cache, "patterns": mode_patterns}[args.mode](args)

if __name__ == "__main__":
    main()
$ python3 30-python-redis.py --mode structures

=== 五大数据结构演示  [2026-04-18 05:11:29] ===

String 操作:
+-------+-------+-------+
| key   | value | TTL   |
+-------+-------+-------+
| name  | alice | -1    |
| count | 3     | -1    |
+-------+-------+-------+

ZSet 排行榜:
+-------+-------+
| 用户  | 分数  |
+-------+-------+
| bob   | 85    |
| carol | 92    |
| alice | 100   |
+-------+-------+

$ python3 30-python-redis.py --mode patterns

=== 缓存三大问题演示  [2026-04-18 05:11:29] ===

[缓存穿透] 查询不存在的 key:
  第1次: 缓存未命中 → 查数据库 → 缓存空值
  第2次: 命中缓存(空值),避免再次查数据库 ✅

[缓存雪崩] TTL 随机抖动:
  user:1 TTL = 312s(基础300 + 抖动12)
  user:2 TTL = 287s(基础300 + 抖动-13)
  过期时间分散,避免同时失效 ✅

小结

概念 一句话记忆
String set/get/incr,存字符串/数字/JSON,最通用
List lpush/lrange,双端队列,消息队列/最新列表
Set sadd/smembers,无序集合,去重/共同好友
Hash hset/hgetall,字段映射,对象存储
ZSet zadd/zrange,有序集合,排行榜/延迟队列
LRU 淘汰 OrderedDict + move_to_end,O(1) 实现
缓存穿透 缓存空值,避免每次都打数据库
缓存击穿 setnx 互斥锁,只有一个请求去数据库
缓存雪崩 TTL 加随机抖动,过期时间分散

⏱ NexDo Time(5 分钟)

挑战:给 MockRedis 加一个 pipeline() 上下文管理器,支持批量执行命令(减少网络往返)。

具体步骤:

  1. 实现 Pipeline 类,在 __enter__ 里收集命令,在 __exit__ 里批量执行
  2. 支持 pipe.set(key, value)pipe.get(key) 两种命令
  3. execute() 方法返回所有命令的结果列表
  4. 验证:with r.pipeline() as pipe: pipe.set("a", 1); pipe.set("b", 2); results = pipe.execute()

Don’t wait for next time, do it in the next moment.