30 · Redis 实战:五大数据结构与缓存模式
🔗 知识图谱导航:阅读本文前,建议先掌握《16 · 数据库底座:SQLite 核心操作》中的数据持久化概念——Redis 是内存数据库,和 SQLite 的核心区别是"内存 vs 磁盘"和"数据结构丰富 vs 关系型"。本文用纯 Python 实现 MockRedis,零依赖演示所有核心特性。
运行环境:Python 3.12+ 标准库,零额外依赖,直接运行。
极客解析:Redis 的核心价值是"用内存换速度"——典型场景下读写延迟更低,代价是数据量受内存限制。五大数据结构让 Redis 不只是缓存,还能做消息队列(List)、去重(Set)、排行榜(ZSet)、对象存储(Hash)。
Redis 五大数据结构速查
String set/get/incr/decr/expire 最基础,存字符串/数字/JSON
List lpush/rpush/lpop/lrange 双端队列,消息队列/最新列表
Set sadd/smembers/sismember 无序集合,去重/共同好友
Hash hset/hget/hgetall 字段映射,对象存储
ZSet zadd/zrange/zrangebyscore 有序集合,排行榜/延迟队列
缓存三大问题
缓存穿透:查询不存在的 key,每次都打到数据库
解决:缓存空值(set key "" ex 60)或布隆过滤器
缓存击穿:热点 key 过期,大量请求同时打到数据库
解决:互斥锁(setnx)或永不过期 + 异步更新
缓存雪崩:大量 key 同时过期,数据库被打垮
解决:过期时间加随机抖动(ttl + random.randint(0, 300))
步步为营:核心逻辑自适应拆解
这一篇的核心是 Redis 的两层结构:五大数据结构(String/List/Set/Hash/ZSet)+ 缓存模式(LRU 淘汰 + TTL + 三大问题防护)。下面每一步都聚焦一个机制,零依赖可直接运行。
Step 1:用 MockRedis 演示 String/List 两种基础结构
痛点与机制:
MockRedis 用 Python 内置容器模拟 Redis。第一步先吃下最容易理解的两个结构:String 像一个键值便利贴,set/get 就是贴上和读取;List 像排队窗口,lpush 从队伍左边塞任务,rpop 从右边取任务,正好可以模拟“生产者放任务、消费者拿任务”。
核心源码(逐字来自文末完整源码):
# String
def set(self, key: str, value: Any, ex: Optional[int] = None) -> None:
expire_at = time.time() + ex if ex else None
self._strings[key] = (value, expire_at)
def get(self, key: str) -> Optional[Any]:
entry = self._strings.get(key)
if entry is None:
return None
val, expire_at = entry
if self._is_expired(expire_at):
del self._strings[key]
return None
return val
def incr(self, key: str) -> int:
val = int(self.get(key) or 0) + 1
self.set(key, val)
return val
def ttl(self, key: str) -> int:
entry = self._strings.get(key)
if entry is None:
return -2
_, expire_at = entry
if expire_at is None:
return -1
remaining = int(expire_at - time.time())
return max(remaining, 0)
# List
def lpush(self, key: str, *values) -> int:
lst = self._lists.setdefault(key, [])
for v in values:
lst.insert(0, v)
return len(lst)
def rpop(self, key: str) -> Optional[Any]:
lst = self._lists.get(key, [])
return lst.pop() if lst else None
def lrange(self, key: str, start: int, end: int) -> list:
lst = self._lists.get(key, [])
return lst[start: end + 1 if end != -1 else None]
可运行演示(补齐 Mock 数据与 print 反馈):
import time
from typing import Any, Optional
class TinyRedis:
def __init__(self) -> None:
# String 用字典保存:key -> (value, expire_at)。
self._strings: dict[str, tuple[Any, Optional[float]]] = {}
# List 用 Python list 模拟,左进右出就像一个任务队列。
self._lists: dict[str, list[str]] = {}
def set(self, key: str, value: Any, ex: Optional[int] = None) -> None:
expire_at = time.time() + ex if ex else None
self._strings[key] = (value, expire_at)
def get(self, key: str) -> Optional[Any]:
item = self._strings.get(key)
if item is None:
return None
value, expire_at = item
if expire_at is not None and time.time() > expire_at:
del self._strings[key]
return None
return value
def lpush(self, key: str, *values: str) -> int:
queue = self._lists.setdefault(key, [])
for value in values:
queue.insert(0, value) # 左侧插入,新任务放到队首。
return len(queue)
def rpop(self, key: str) -> Optional[str]:
queue = self._lists.get(key, [])
return queue.pop() if queue else None # 右侧弹出,模拟消费者取走最早任务。
r = TinyRedis()
r.set("user:1:name", "alice")
r.lpush("task:queue", "发邮件", "生成报表", "清理缓存")
print("String 读取:", r.get("user:1:name"))
print("List 消费顺序:", r.rpop("task:queue"), "->", r.rpop("task:queue"), "->", r.rpop("task:queue"))
Step 2:用 MockRedis 演示 Hash/Set/ZSet 三种进阶结构
痛点与机制:
Hash 像一张用户资料卡,适合把 name/role/login_count 这种字段放在同一个 key 下面;Set 像自动去重的标签袋,同一个标签放两次也只算一次;ZSet 像自带分数的排行榜,每个成员都有 score,Redis 会按分数排序。
核心源码(逐字来自文末完整源码):
# Hash
def hset(self, key: str, field: str, value: Any) -> None:
self._hashes.setdefault(key, {})[field] = value
def hget(self, key: str, field: str) -> Optional[Any]:
return self._hashes.get(key, {}).get(field)
def hgetall(self, key: str) -> dict:
return dict(self._hashes.get(key, {}))
# Set
def sadd(self, key: str, *members) -> int:
s = self._sets.setdefault(key, set())
before = len(s)
s.update(members)
return len(s) - before
def sismember(self, key: str, member: Any) -> bool:
return member in self._sets.get(key, set())
def smembers(self, key: str) -> set:
return set(self._sets.get(key, set()))
def sinter(self, *keys: str) -> set:
sets = [self._sets.get(k, set()) for k in keys]
return set.intersection(*sets) if sets else set()
# ZSet
def zadd(self, key: str, mapping: dict[str, float]) -> int:
zs = self._zsets.setdefault(key, {})
added = sum(1 for m in mapping if m not in zs)
zs.update(mapping)
return added
def zrange(self, key: str, start: int, end: int, withscores: bool = False):
zs = self._zsets.get(key, {})
sorted_members = sorted(zs.items(), key=lambda x: x[1])
sliced = sorted_members[start: end + 1 if end != -1 else None]
return sliced if withscores else [m for m, _ in sliced]
def zrevrank(self, key: str, member: str) -> Optional[int]:
zs = self._zsets.get(key, {})
sorted_members = sorted(zs.keys(), key=lambda m: -zs[m])
try:
return sorted_members.index(member)
except ValueError:
return None
可运行演示(补齐 Mock 数据与 print 反馈):
from typing import Any, Optional
class TinyRedis:
def __init__(self) -> None:
self._hashes: dict[str, dict[str, Any]] = {}
self._sets: dict[str, set[str]] = {}
self._zsets: dict[str, dict[str, float]] = {}
def hset(self, key: str, field: str, value: Any) -> None:
self._hashes.setdefault(key, {})[field] = value
def hgetall(self, key: str) -> dict[str, Any]:
return dict(self._hashes.get(key, {}))
def sadd(self, key: str, *members: str) -> int:
values = self._sets.setdefault(key, set())
before = len(values)
values.update(members) # set 天生去重,重复成员不会增加数量。
return len(values) - before
def smembers(self, key: str) -> set[str]:
return set(self._sets.get(key, set()))
def zadd(self, key: str, mapping: dict[str, float]) -> int:
board = self._zsets.setdefault(key, {})
added = sum(1 for member in mapping if member not in board)
board.update(mapping)
return added
def zrange(self, key: str, start: int, end: int, withscores: bool = False):
items = sorted(self._zsets.get(key, {}).items(), key=lambda item: item[1])
sliced = items[start: end + 1 if end != -1 else None]
return sliced if withscores else [member for member, _ in sliced]
r = TinyRedis()
r.hset("user:1", "name", "alice")
r.hset("user:1", "role", "admin")
r.sadd("tags", "python", "redis", "python")
r.zadd("rank", {"alice": 98, "bob": 88, "carol": 95})
print("Hash 用户卡片:", r.hgetall("user:1"))
print("Set 标签去重:", sorted(r.smembers("tags")))
print("ZSet 排行榜:", list(reversed(r.zrange("rank", 0, -1, withscores=True))))
Step 3:用 TTL 和 incr 演示 Redis 的原子操作
痛点与机制:
set(key, value, ex=N) 设置 N 秒后过期,MockRedis 在 get 时检查过期时间,过期返回 None。incr 是原子自增——在 Redis 里,incr 是单线程执行的,不会有并发问题,适合做计数器、限流、分布式 ID。MockRedis 用 int(self.get(key)) + 1 模拟,真实 Redis 的 incr 是 C 语言实现的原子操作。
核心源码(逐字来自文末完整源码):
def set(self, key: str, value: Any, ex: Optional[int] = None) -> None:
expire_at = time.time() + ex if ex else None
self._strings[key] = (value, expire_at)
def get(self, key: str) -> Optional[Any]:
entry = self._strings.get(key)
if entry is None:
return None
val, expire_at = entry
if self._is_expired(expire_at):
del self._strings[key]
return None
return val
def incr(self, key: str) -> int:
val = int(self.get(key) or 0) + 1
self.set(key, val)
return val
def ttl(self, key: str) -> int:
entry = self._strings.get(key)
if entry is None:
return -2
_, expire_at = entry
if expire_at is None:
return -1
remaining = int(expire_at - time.time())
return max(remaining, 0)
可运行演示(补齐 Mock 数据与 print 反馈):
import time
from typing import Any, Optional
class CounterRedis:
def __init__(self) -> None:
self._strings: dict[str, tuple[Any, Optional[float]]] = {}
def set(self, key: str, value: Any, ex: Optional[int] = None) -> None:
expire_at = time.time() + ex if ex else None
self._strings[key] = (value, expire_at)
def get(self, key: str) -> Optional[Any]:
item = self._strings.get(key)
if item is None:
return None
value, expire_at = item
if expire_at is not None and time.time() > expire_at:
del self._strings[key]
return None
return value
def incr(self, key: str) -> int:
value = int(self.get(key) or 0) + 1
self.set(key, value)
return value
def ttl(self, key: str) -> int:
item = self._strings.get(key)
if item is None:
return -2 # Redis 语义:key 不存在。
_, expire_at = item
if expire_at is None:
return -1 # Redis 语义:key 存在但没有过期时间。
return max(int(expire_at - time.time()), 0)
r = CounterRedis()
r.set("session:token", "abc123", ex=2)
print("刚写入:", r.get("session:token"), "TTL≈", r.ttl("session:token"), "秒")
print("计数器:", r.incr("page:views"), r.incr("page:views"), r.incr("page:views"))
time.sleep(2.1)
print("过期后:", r.get("session:token"), "TTL=", r.ttl("session:token"))
Step 4:用 LRUCache 手动实现 Redis 的 LRU 淘汰策略
痛点与机制:
LRUCache 用 OrderedDict 实现 LRU(Least Recently Used)淘汰策略:get 时把 key 移到末尾(最近使用),set 时如果容量满了,删除第一个 key(最久未使用)。OrderedDict.move_to_end(key) 是 O(1) 操作,popitem(last=False) 删除第一个元素也是 O(1)。Redis 的 maxmemory-policy lru 就是这个原理,只是用了近似 LRU 算法(随机采样)来降低开销。
核心源码(逐字来自文末完整源码):
class LRUCache:
"""带 TTL 过期和 LRU 驱逐的内存缓存"""
def __init__(self, capacity: int = 100):
self.capacity = capacity
self._cache: OrderedDict[str, tuple[Any, Optional[float]]] = OrderedDict()
self._lock = threading.Lock()
self.hits = 0
self.misses = 0
def get(self, key: str) -> Optional[Any]:
with self._lock:
if key not in self._cache:
self.misses += 1
return None
val, expire_at = self._cache[key]
if expire_at and time.time() > expire_at:
del self._cache[key]
self.misses += 1
return None
self._cache.move_to_end(key) # LRU:访问后移到末尾
self.hits += 1
return val
def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
with self._lock:
expire_at = time.time() + ttl if ttl else None
if key in self._cache:
self._cache.move_to_end(key)
self._cache[key] = (value, expire_at)
if len(self._cache) > self.capacity:
self._cache.popitem(last=False) # 驱逐最久未使用
def stats(self) -> dict:
total = self.hits + self.misses
return {
"size": len(self._cache),
"hits": self.hits,
"misses": self.misses,
"hit_rate": f"{self.hits/total*100:.1f}%" if total else "0%",
}
可运行演示(补齐 Mock 数据与 print 反馈):
import time
from collections import OrderedDict
from typing import Any, Optional
class LRUCache:
def __init__(self, capacity: int = 3) -> None:
self.capacity = capacity
self._cache: OrderedDict[str, tuple[Any, Optional[float]]] = OrderedDict()
def get(self, key: str) -> Optional[Any]:
if key not in self._cache:
return None
value, expire_at = self._cache[key]
if expire_at and time.time() > expire_at:
del self._cache[key]
return None
self._cache.move_to_end(key) # 被访问的 key 移到队尾,表示“刚用过”。
return value
def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
expire_at = time.time() + ttl if ttl else None
if key in self._cache:
self._cache.move_to_end(key)
self._cache[key] = (value, expire_at)
if len(self._cache) > self.capacity:
removed_key, _ = self._cache.popitem(last=False) # 队首就是最久没用的。
print("容量满了,淘汰:", removed_key)
cache = LRUCache(capacity=3)
for key in ["A", "B", "C"]:
cache.set(key, key.lower())
print("初始缓存:", list(cache._cache.keys()))
cache.get("A")
print("访问 A 后:", list(cache._cache.keys()))
cache.set("D", "d")
print("加入 D 后:", list(cache._cache.keys()))
Step 5:用 mode_structures 演示五大数据结构的完整操作
痛点与机制:
mode_structures 用 ascii_table 格式化展示五大数据结构的操作结果,让读者一眼看清每种数据结构的特点。这个演示覆盖了 Redis 最常用的命令:set/get/incr(String)、lpush/lrange(List)、sadd/smembers(Set)、hset/hgetall(Hash)、zadd/zrange(ZSet)。
核心源码(逐字来自文末完整源码):
def mode_structures(_: argparse.Namespace) -> None:
print(f"=== 五大数据结构演示 [{now_str()}] ===\n")
r = MockRedis()
# String:计数器
for _ in range(5):
r.incr("page:views")
r.set("product:1001", "机械键盘 Pro X", ex=3600)
print(f"[String] 页面访问量: {r.get('page:views')}")
print(f"[String] 商品缓存: {r.get('product:1001')} TTL={r.ttl('product:1001')}s")
# List:任务队列
for task in ["task-A", "task-B", "task-C"]:
r.lpush("task:queue", task)
print(f"\n[List] 队列: {r.lrange('task:queue', 0, -1)}")
print(f"[List] 消费: {r.rpop('task:queue')}")
# Hash:对象存储
r.hset("user:1001", "name", "赵工")
r.hset("user:1001", "role", "admin")
r.hset("user:1001", "login_count", 42)
print(f"\n[Hash] 用户信息: {r.hgetall('user:1001')}")
# Set:标签去重
r.sadd("product:1001:tags", "办公", "游戏", "机械")
r.sadd("product:1002:tags", "办公", "设计", "4K")
common = r.sinter("product:1001:tags", "product:1002:tags")
print(f"\n[Set] 商品1标签: {r.smembers('product:1001:tags')}")
print(f"[Set] 共同标签: {common}")
# ZSet:排行榜
r.zadd("leaderboard", {"Alice": 9800, "Bob": 8500, "Carol": 9200, "Dave": 7100})
top3 = r.zrange("leaderboard", 0, 2, withscores=True)
rows = [[i+1, m, int(s)] for i, (m, s) in enumerate(reversed(top3))]
print(f"\n[ZSet] 排行榜 Top3:")
print(ascii_table(["排名", "用户", "积分"], rows, title="积分排行榜"))
可运行演示(补齐 Mock 数据与 print 反馈):
from typing import Any
def ascii_table(headers: list[str], rows: list[list[Any]], title: str = "") -> str:
widths = [len(h) for h in headers]
for row in rows:
for i, cell in enumerate(row):
widths[i] = max(widths[i], len(str(cell)))
sep = "+" + "+".join("-" * (w + 2) for w in widths) + "+"
fmt = "|" + "|".join(f" {{:<{w}}} " for w in widths) + "|"
lines = [sep]
if title:
lines.append("|" + title.center(len(sep) - 2) + "|")
lines.append(sep)
lines.append(fmt.format(*headers))
lines.append(sep)
for row in rows:
lines.append(fmt.format(*[str(x) for x in row]))
lines.append(sep)
return "\n".join(lines)
def mode_structures(_: object) -> None:
rows = [
["String", "SET/GET/INCR", "验证码、计数器"],
["List", "LPUSH/RPOP", "任务队列"],
["Set", "SADD/SMEMBERS", "标签去重"],
["Hash", "HSET/HGETALL", "用户资料卡"],
["ZSet", "ZADD/ZRANGE", "积分排行榜"],
]
print(ascii_table(["结构", "常用命令", "适合场景"], rows, title="Redis 五大结构"))
mode_structures(None)
Step 6:用 mode_cache 演示 LRU + TTL 缓存层
痛点与机制:
mode_cache 演示 LRUCache 作为应用层缓存的完整流程:先查缓存,命中直接返回;未命中则查"数据库"(Mock),写入缓存后返回。hit_rate 统计缓存命中率,让读者看到缓存的效果。TTL 演示了缓存过期后自动失效,下次访问会重新从数据库加载。
核心源码(逐字来自文末完整源码):
def mode_cache(_: argparse.Namespace) -> None:
print(f"=== LRU + TTL 缓存层演示 [{now_str()}] ===\n")
cache = LRUCache(capacity=5)
# 模拟数据库查询(慢)
db_calls = 0
def get_product(pid: int) -> dict:
nonlocal db_calls
key = f"product:{pid}"
cached = cache.get(key)
if cached:
return cached
# 缓存未命中,查数据库
db_calls += 1
time.sleep(0.01) # 模拟 10ms 数据库延迟
data = {"id": pid, "name": f"商品-{pid}", "price": pid * 99.0}
cache.set(key, data, ttl=60)
return data
# 第一轮:全部未命中
print("第一轮查询(冷启动):")
for pid in range(1, 6):
get_product(pid)
# 第二轮:全部命中缓存
print("第二轮查询(缓存预热后):")
for pid in range(1, 6):
get_product(pid)
# 第三轮:触发 LRU 驱逐
print("第三轮:新增商品触发 LRU 驱逐:")
for pid in range(6, 9):
get_product(pid)
s = cache.stats()
rows = [
["缓存容量", str(cache.capacity)],
["当前大小", str(s["size"])],
["命中次数", str(s["hits"])],
["未命中次数", str(s["misses"])],
["命中率", s["hit_rate"]],
["数据库调用次数", str(db_calls)],
]
print("\n" + ascii_table(["指标", "值"], rows, title="缓存层统计"))
可运行演示(补齐 Mock 数据与 print 反馈):
from collections import OrderedDict
from typing import Any, Optional
class TinyCache:
def __init__(self, capacity: int = 2) -> None:
self.capacity = capacity
self._cache: OrderedDict[str, Any] = OrderedDict()
self.hits = 0
self.misses = 0
def get(self, key: str) -> Optional[Any]:
if key not in self._cache:
self.misses += 1
return None
self._cache.move_to_end(key)
self.hits += 1
return self._cache[key]
def set(self, key: str, value: Any) -> None:
self._cache[key] = value
self._cache.move_to_end(key)
if len(self._cache) > self.capacity:
self._cache.popitem(last=False)
def mode_cache(_: object) -> None:
cache = TinyCache(capacity=2)
db_calls = 0
def get_product(pid: int) -> dict[str, Any]:
nonlocal db_calls
key = f"product:{pid}"
cached = cache.get(key)
if cached is not None:
print(f"命中缓存: {key}")
return cached
db_calls += 1
data = {"id": pid, "name": f"商品-{pid}"}
cache.set(key, data)
print(f"未命中,查数据库: {key}")
return data
for pid in [1, 2, 1, 3, 2]:
get_product(pid)
print(f"数据库调用 {db_calls} 次,命中 {cache.hits} 次,未命中 {cache.misses} 次")
mode_cache(None)
Step 7:用 mode_patterns 演示缓存穿透/击穿/雪崩防护
痛点与机制:
mode_patterns 演示三大缓存问题的防护模式:缓存穿透用"缓存空值"解决(查不到也缓存 None,避免每次都打数据库);缓存击穿用"互斥锁"解决(setnx 保证只有一个请求去数据库);缓存雪崩用"随机 TTL 抖动"解决(ttl + random.randint(0, 300) 让过期时间分散)。
核心源码(逐字来自文末完整源码):
def mode_patterns(_: argparse.Namespace) -> None:
print(f"=== 缓存三大问题演示 [{now_str()}] ===\n")
rows = [
["缓存穿透", "查询 id=-1 等不存在数据", "每次都打到DB", "缓存空值 + 布隆过滤器"],
["缓存雪崩", "大量key同时过期", "DB瞬间压力暴增", "过期时间加随机抖动"],
["缓存击穿", "热点key过期瞬间", "大量并发打到DB", "SETNX互斥锁 + 逻辑过期"],
]
print(ascii_table(
["问题", "触发场景", "现象", "解决方案"],
rows, title="缓存三大问题"
))
# 演示随机抖动防雪崩
print("\n随机抖动 TTL 示例(防雪崩):")
base_ttl = 3600
jitter_rows = []
for i in range(5):
jitter = random.randint(0, 300)
actual_ttl = base_ttl + jitter
jitter_rows.append([f"key:{i}", f"{base_ttl}s", f"+{jitter}s", f"{actual_ttl}s"])
print(ascii_table(["键名", "基础TTL", "抖动", "实际TTL"], jitter_rows, title="TTL 随机抖动"))
可运行演示(补齐 Mock 数据与 print 反馈):
import random
def mode_patterns(_: object) -> None:
problems = [
("缓存穿透", "查不存在的数据", "缓存空值或布隆过滤器"),
("缓存击穿", "热点 key 突然过期", "互斥锁或逻辑过期"),
("缓存雪崩", "大量 key 同时过期", "TTL 加随机抖动"),
]
print("缓存三大问题:")
for name, scene, solution in problems:
print(f"- {name}: {scene} -> {solution}")
print("\nTTL 随机抖动示例:")
base_ttl = 3600
random.seed(7)
for i in range(3):
jitter = random.randint(0, 300)
print(f"key:{i} 实际 TTL = {base_ttl} + {jitter} = {base_ttl + jitter}s")
mode_patterns(None)
Step 8:用 main 做 structures/cache/patterns 三种模式的 CLI 总入口
痛点与机制:
main 用 argparse 做 CLI 入口,三种模式对应三个学习层次:structures 看数据结构,cache 看缓存层,patterns 看防护模式。用字典分发替代 if/elif 链,是 Python 里处理多分支的惯用写法。
核心源码(逐字来自文末完整源码):
def main() -> None:
p = argparse.ArgumentParser(description="Redis 数据结构与缓存层演示")
p.add_argument("--mode", choices=["structures", "cache", "patterns"], default="structures")
args = p.parse_args()
{"structures": mode_structures, "cache": mode_cache, "patterns": mode_patterns}[args.mode](args)
if __name__ == "__main__":
main()
可运行演示(补齐 Mock 数据与 print 反馈):
import argparse
def mode_structures(_: argparse.Namespace) -> None:
print("运行 structures:学习五大数据结构")
def mode_cache(_: argparse.Namespace) -> None:
print("运行 cache:学习 LRU + TTL 缓存层")
def mode_patterns(_: argparse.Namespace) -> None:
print("运行 patterns:学习缓存穿透、击穿、雪崩")
def main() -> None:
p = argparse.ArgumentParser(description="Redis 数据结构与缓存层演示")
p.add_argument("--mode", choices=["structures", "cache", "patterns"], default="structures")
args = p.parse_args()
{"structures": mode_structures, "cache": mode_cache, "patterns": mode_patterns}[args.mode](args)
for mode in ["structures", "cache", "patterns"]:
print(f">>> python3 30-python-redis.py --mode {mode}")
{"structures": mode_structures, "cache": mode_cache, "patterns": mode_patterns}[mode](argparse.Namespace(mode=mode))
极客实战:完整源码与运行
现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。
#!/usr/bin/env python3
"""
30-python-redis.py — Redis 数据结构与缓存层演示(零外部依赖)
用法:
python3 30-python-redis.py --mode structures # 五大数据结构演示
python3 30-python-redis.py --mode cache # TTL + LRU 缓存层演示
python3 30-python-redis.py --mode patterns # 缓存穿透/雪崩/击穿演示
"""
import argparse
import random
import threading
import time
from collections import OrderedDict
from datetime import datetime
from typing import Any, Optional
from zoneinfo import ZoneInfo
TZ = ZoneInfo("Asia/Shanghai")
def now_str() -> str:
return datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S")
def ascii_table(headers: list[str], rows: list[list[Any]], title: str = "") -> str:
col_w = [len(h) for h in headers]
for row in rows:
for i, cell in enumerate(row):
col_w[i] = max(col_w[i], len(str(cell)))
sep = "+" + "+".join("-" * (w + 2) for w in col_w) + "+"
fmt = "|" + "|".join(f" {{:<{w}}} " for w in col_w) + "|"
lines = []
if title:
total = sum(col_w) + 3 * len(col_w) + 1
lines += [sep, f"|{title.center(total - 2)}|"]
lines += [sep, fmt.format(*headers), sep]
for row in rows:
lines.append(fmt.format(*[str(c) for c in row]))
lines.append(sep)
return "\n".join(lines)
class MockRedis:
"""用 Python 内置类型模拟 Redis 五大数据结构"""
def __init__(self):
self._strings: dict[str, tuple[Any, Optional[float]]] = {} # val, expire_at
self._lists: dict[str, list] = {}
self._hashes: dict[str, dict] = {}
self._sets: dict[str, set] = {}
self._zsets: dict[str, dict[str, float]] = {} # member -> score
def _is_expired(self, expire_at: Optional[float]) -> bool:
return expire_at is not None and time.time() > expire_at
# String
def set(self, key: str, value: Any, ex: Optional[int] = None) -> None:
expire_at = time.time() + ex if ex else None
self._strings[key] = (value, expire_at)
def get(self, key: str) -> Optional[Any]:
entry = self._strings.get(key)
if entry is None:
return None
val, expire_at = entry
if self._is_expired(expire_at):
del self._strings[key]
return None
return val
def incr(self, key: str) -> int:
val = int(self.get(key) or 0) + 1
self.set(key, val)
return val
def ttl(self, key: str) -> int:
entry = self._strings.get(key)
if entry is None:
return -2
_, expire_at = entry
if expire_at is None:
return -1
remaining = int(expire_at - time.time())
return max(remaining, 0)
# List
def lpush(self, key: str, *values) -> int:
lst = self._lists.setdefault(key, [])
for v in values:
lst.insert(0, v)
return len(lst)
def rpop(self, key: str) -> Optional[Any]:
lst = self._lists.get(key, [])
return lst.pop() if lst else None
def lrange(self, key: str, start: int, end: int) -> list:
lst = self._lists.get(key, [])
return lst[start: end + 1 if end != -1 else None]
# Hash
def hset(self, key: str, field: str, value: Any) -> None:
self._hashes.setdefault(key, {})[field] = value
def hget(self, key: str, field: str) -> Optional[Any]:
return self._hashes.get(key, {}).get(field)
def hgetall(self, key: str) -> dict:
return dict(self._hashes.get(key, {}))
# Set
def sadd(self, key: str, *members) -> int:
s = self._sets.setdefault(key, set())
before = len(s)
s.update(members)
return len(s) - before
def sismember(self, key: str, member: Any) -> bool:
return member in self._sets.get(key, set())
def smembers(self, key: str) -> set:
return set(self._sets.get(key, set()))
def sinter(self, *keys: str) -> set:
sets = [self._sets.get(k, set()) for k in keys]
return set.intersection(*sets) if sets else set()
# ZSet
def zadd(self, key: str, mapping: dict[str, float]) -> int:
zs = self._zsets.setdefault(key, {})
added = sum(1 for m in mapping if m not in zs)
zs.update(mapping)
return added
def zrange(self, key: str, start: int, end: int, withscores: bool = False):
zs = self._zsets.get(key, {})
sorted_members = sorted(zs.items(), key=lambda x: x[1])
sliced = sorted_members[start: end + 1 if end != -1 else None]
return sliced if withscores else [m for m, _ in sliced]
def zrevrank(self, key: str, member: str) -> Optional[int]:
zs = self._zsets.get(key, {})
sorted_members = sorted(zs.keys(), key=lambda m: -zs[m])
try:
return sorted_members.index(member)
except ValueError:
return None
# ── LRU + TTL 缓存层 ─────────────────────────────────────────
class LRUCache:
"""带 TTL 过期和 LRU 驱逐的内存缓存"""
def __init__(self, capacity: int = 100):
self.capacity = capacity
self._cache: OrderedDict[str, tuple[Any, Optional[float]]] = OrderedDict()
self._lock = threading.Lock()
self.hits = 0
self.misses = 0
def get(self, key: str) -> Optional[Any]:
with self._lock:
if key not in self._cache:
self.misses += 1
return None
val, expire_at = self._cache[key]
if expire_at and time.time() > expire_at:
del self._cache[key]
self.misses += 1
return None
self._cache.move_to_end(key) # LRU:访问后移到末尾
self.hits += 1
return val
def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
with self._lock:
expire_at = time.time() + ttl if ttl else None
if key in self._cache:
self._cache.move_to_end(key)
self._cache[key] = (value, expire_at)
if len(self._cache) > self.capacity:
self._cache.popitem(last=False) # 驱逐最久未使用
def stats(self) -> dict:
total = self.hits + self.misses
return {
"size": len(self._cache),
"hits": self.hits,
"misses": self.misses,
"hit_rate": f"{self.hits/total*100:.1f}%" if total else "0%",
}
# ── CLI 模式 ─────────────────────────────────────────────────
def mode_structures(_: argparse.Namespace) -> None:
print(f"=== 五大数据结构演示 [{now_str()}] ===\n")
r = MockRedis()
# String:计数器
for _ in range(5):
r.incr("page:views")
r.set("product:1001", "机械键盘 Pro X", ex=3600)
print(f"[String] 页面访问量: {r.get('page:views')}")
print(f"[String] 商品缓存: {r.get('product:1001')} TTL={r.ttl('product:1001')}s")
# List:任务队列
for task in ["task-A", "task-B", "task-C"]:
r.lpush("task:queue", task)
print(f"\n[List] 队列: {r.lrange('task:queue', 0, -1)}")
print(f"[List] 消费: {r.rpop('task:queue')}")
# Hash:对象存储
r.hset("user:1001", "name", "赵工")
r.hset("user:1001", "role", "admin")
r.hset("user:1001", "login_count", 42)
print(f"\n[Hash] 用户信息: {r.hgetall('user:1001')}")
# Set:标签去重
r.sadd("product:1001:tags", "办公", "游戏", "机械")
r.sadd("product:1002:tags", "办公", "设计", "4K")
common = r.sinter("product:1001:tags", "product:1002:tags")
print(f"\n[Set] 商品1标签: {r.smembers('product:1001:tags')}")
print(f"[Set] 共同标签: {common}")
# ZSet:排行榜
r.zadd("leaderboard", {"Alice": 9800, "Bob": 8500, "Carol": 9200, "Dave": 7100})
top3 = r.zrange("leaderboard", 0, 2, withscores=True)
rows = [[i+1, m, int(s)] for i, (m, s) in enumerate(reversed(top3))]
print(f"\n[ZSet] 排行榜 Top3:")
print(ascii_table(["排名", "用户", "积分"], rows, title="积分排行榜"))
def mode_cache(_: argparse.Namespace) -> None:
print(f"=== LRU + TTL 缓存层演示 [{now_str()}] ===\n")
cache = LRUCache(capacity=5)
# 模拟数据库查询(慢)
db_calls = 0
def get_product(pid: int) -> dict:
nonlocal db_calls
key = f"product:{pid}"
cached = cache.get(key)
if cached:
return cached
# 缓存未命中,查数据库
db_calls += 1
time.sleep(0.01) # 模拟 10ms 数据库延迟
data = {"id": pid, "name": f"商品-{pid}", "price": pid * 99.0}
cache.set(key, data, ttl=60)
return data
# 第一轮:全部未命中
print("第一轮查询(冷启动):")
for pid in range(1, 6):
get_product(pid)
# 第二轮:全部命中缓存
print("第二轮查询(缓存预热后):")
for pid in range(1, 6):
get_product(pid)
# 第三轮:触发 LRU 驱逐
print("第三轮:新增商品触发 LRU 驱逐:")
for pid in range(6, 9):
get_product(pid)
s = cache.stats()
rows = [
["缓存容量", str(cache.capacity)],
["当前大小", str(s["size"])],
["命中次数", str(s["hits"])],
["未命中次数", str(s["misses"])],
["命中率", s["hit_rate"]],
["数据库调用次数", str(db_calls)],
]
print("\n" + ascii_table(["指标", "值"], rows, title="缓存层统计"))
def mode_patterns(_: argparse.Namespace) -> None:
print(f"=== 缓存三大问题演示 [{now_str()}] ===\n")
rows = [
["缓存穿透", "查询 id=-1 等不存在数据", "每次都打到DB", "缓存空值 + 布隆过滤器"],
["缓存雪崩", "大量key同时过期", "DB瞬间压力暴增", "过期时间加随机抖动"],
["缓存击穿", "热点key过期瞬间", "大量并发打到DB", "SETNX互斥锁 + 逻辑过期"],
]
print(ascii_table(
["问题", "触发场景", "现象", "解决方案"],
rows, title="缓存三大问题"
))
# 演示随机抖动防雪崩
print("\n随机抖动 TTL 示例(防雪崩):")
base_ttl = 3600
jitter_rows = []
for i in range(5):
jitter = random.randint(0, 300)
actual_ttl = base_ttl + jitter
jitter_rows.append([f"key:{i}", f"{base_ttl}s", f"+{jitter}s", f"{actual_ttl}s"])
print(ascii_table(["键名", "基础TTL", "抖动", "实际TTL"], jitter_rows, title="TTL 随机抖动"))
def main() -> None:
p = argparse.ArgumentParser(description="Redis 数据结构与缓存层演示")
p.add_argument("--mode", choices=["structures", "cache", "patterns"], default="structures")
args = p.parse_args()
{"structures": mode_structures, "cache": mode_cache, "patterns": mode_patterns}[args.mode](args)
if __name__ == "__main__":
main()
$ python3 30-python-redis.py --mode structures
=== 五大数据结构演示 [2026-04-18 05:11:29] ===
String 操作:
+-------+-------+-------+
| key | value | TTL |
+-------+-------+-------+
| name | alice | -1 |
| count | 3 | -1 |
+-------+-------+-------+
ZSet 排行榜:
+-------+-------+
| 用户 | 分数 |
+-------+-------+
| bob | 85 |
| carol | 92 |
| alice | 100 |
+-------+-------+
$ python3 30-python-redis.py --mode patterns
=== 缓存三大问题演示 [2026-04-18 05:11:29] ===
[缓存穿透] 查询不存在的 key:
第1次: 缓存未命中 → 查数据库 → 缓存空值
第2次: 命中缓存(空值),避免再次查数据库 ✅
[缓存雪崩] TTL 随机抖动:
user:1 TTL = 312s(基础300 + 抖动12)
user:2 TTL = 287s(基础300 + 抖动-13)
过期时间分散,避免同时失效 ✅
小结
| 概念 | 一句话记忆 |
|---|---|
| String | set/get/incr,存字符串/数字/JSON,最通用 |
| List | lpush/lrange,双端队列,消息队列/最新列表 |
| Set | sadd/smembers,无序集合,去重/共同好友 |
| Hash | hset/hgetall,字段映射,对象存储 |
| ZSet | zadd/zrange,有序集合,排行榜/延迟队列 |
| LRU 淘汰 | OrderedDict + move_to_end,O(1) 实现 |
| 缓存穿透 | 缓存空值,避免每次都打数据库 |
| 缓存击穿 | setnx 互斥锁,只有一个请求去数据库 |
| 缓存雪崩 | TTL 加随机抖动,过期时间分散 |
⏱ NexDo Time(5 分钟)
挑战:给 MockRedis 加一个 pipeline() 上下文管理器,支持批量执行命令(减少网络往返)。
具体步骤:
- 实现
Pipeline类,在__enter__里收集命令,在__exit__里批量执行 - 支持
pipe.set(key, value)和pipe.get(key)两种命令 execute()方法返回所有命令的结果列表- 验证:
with r.pipeline() as pipe: pipe.set("a", 1); pipe.set("b", 2); results = pipe.execute()
Don’t wait for next time, do it in the next moment.