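54 · Performance Probes: cProfile, Memory Profiling, and Optimization in Practice
🔗 Knowledge graph navigation: Before reading this article, it helps to review the text-processing ideas in "05 · Text Cleaning: Advanced String Operations and Regular Expressions" and the timer concept in "06 · Closures and Decorators: The Ultimate Form of Functions". This article turns "it feels slow" into "use tools to find out where it is slow".
NexDo Time · 2026-04-17 · Estimated reading time: 28 minutes
Pain Points and Architecture
Many beginners optimize by guessing: Is the loop slow? Is the list slow? Is there a memory leak? That is risky, because bottlenecks often hide in places you were not looking at. A more reliable engineering habit is to measure first, then optimize, then measure again.
This article uses only tools from the Python standard library: cProfile to find function-level hot spots, tracemalloc to inspect memory allocations, and timeit to benchmark small snippets. You will see how a slow and a fast version of the same logic differ, and learn to switch between probes from the command line.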
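It feels slow
  -> timeit: first quantify how slow one operation really is
  -> cProfile: find the functions with the highest cumulative time
  -> tracemalloc: confirm where memory is allocated
  -> rewrite the slow code
  -> measure again to confirm it really got faster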
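The loop above can be sketched with nothing but `timeit`. This is a minimal illustration and not part of the article's `profiler_demo.py`: the helper `best_time` and the two `total_*` functions are names made up for this sketch. It checks that both versions return the same answer before comparing their timings, because a faster function that returns something different is not an optimization.

```python
import timeit
from typing import Callable


def best_time(fn: Callable[[], object], number: int = 200, repeat: int = 5) -> float:
    """Return the best mean time per call (in seconds) over several timing rounds."""
    rounds = timeit.repeat(fn, number=number, repeat=repeat)
    return min(rounds) / number


def total_slow(values: list[int]) -> int:
    """Baseline (hypothetical): index-based loop."""
    total = 0
    for i in range(len(values)):
        total += values[i]
    return total


def total_fast(values: list[int]) -> int:
    """Candidate (hypothetical): the built-in sum()."""
    return sum(values)


data = list(range(1_000))

# 1. Confirm the candidate still produces the same answer.
same_result = total_slow(data) == total_fast(data)

# 2. Measure before and after the change.
before = best_time(lambda: total_slow(data))
after = best_time(lambda: total_fast(data))

print("same result:", same_result)
print(f"before: {before * 1e6:.1f} µs  after: {after * 1e6:.1f} µs")
```

Taking the minimum of several rounds is a common convention: the slower rounds usually reflect interference from other processes, not the code under test.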
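Step by Step: Breaking Down the Core Logic
The worst habit in performance work is going by feel. The walkthrough below is split into 8 steps. Each step runs on its own and prints its results, so you see the tool output first and then the reasoning behind the optimization.
Step 1: Use slow_word_count and fast_word_count to see what the data structure costs
Pain point and mechanism:
The first lesson of optimization is not clever tricks but choosing the right data structure. For every word, the slow version walks through the keys collected so far until it finds a match, which is a linear scan. The fast version calls the dictionary's get() and reaches the entry directly through its hash. It is the difference between flipping through an entire address book to find someone and looking them up by name in an index.
Core source (verbatim from the complete source at the end of the article):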
def slow_word_count(corpus: list[str]) -> dict[str, int]:
    """Slow version: rescans the existing keys for every word."""
    result: dict[str, int] = {}
    for doc in corpus:
        words = doc.split()
        for word in words:
            # Anti-pattern: a linear scan over result for every word
            found = False
            for k in result:
                if k == word:
                    result[k] += 1
                    found = True
                    break
            if not found:
                result[word] = 1
    return result


def fast_word_count(corpus: list[str]) -> dict[str, int]:
    """Fast version: look the word up directly with dict.get()."""
    result: dict[str, int] = {}
    for doc in corpus:
        for word in doc.split():
            result[word] = result.get(word, 0) + 1
    return result
Runnable demo (with mock data and print feedback added):
CORPUS: list[str] = [
    "task alpha alpha beta",
    "task beta gamma",
    "alpha deploy task",
]


def slow_word_count(corpus: list[str]) -> dict[str, int]:
    """Slow version: rescans the existing keys for every word."""
    result: dict[str, int] = {}
    for doc in corpus:
        words = doc.split()
        for word in words:
            # Anti-pattern: a linear scan over result for every word
            found = False
            for k in result:
                if k == word:
                    result[k] += 1
                    found = True
                    break
            if not found:
                result[word] = 1
    return result


def fast_word_count(corpus: list[str]) -> dict[str, int]:
    """Fast version: look the word up directly with dict.get()."""
    result: dict[str, int] = {}
    for doc in corpus:
        for word in doc.split():
            result[word] = result.get(word, 0) + 1
    return result


slow = slow_word_count(CORPUS)
fast = fast_word_count(CORPUS)
print("Slow version result:", slow)
print("Fast version result:", fast)
print("Results match:", slow == fast)
print("Intuition: the slow version flips through the whole address book for every word; the fast version looks it up in a dictionary.")
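For counting specifically, the standard library already ships `collections.Counter`, which relies on the same hash lookup as `fast_word_count`. The sketch below is an addition to the article's code (the name `counter_word_count` is made up here) and assumes the same whitespace tokenization as the demo above.

```python
from collections import Counter

CORPUS = [
    "task alpha alpha beta",
    "task beta gamma",
    "alpha deploy task",
]


def counter_word_count(corpus: list[str]) -> dict[str, int]:
    """Count words with collections.Counter (hypothetical helper for this sketch)."""
    counts: Counter[str] = Counter()
    for doc in corpus:
        counts.update(doc.split())  # one hash lookup per word
    return dict(counts)


counts = counter_word_count(CORPUS)
print(counts)
print(Counter(counts).most_common(2))  # the two most frequent words
```

Whether `Counter` beats a plain `dict.get()` loop on your data is something to measure, not assume; the point is that both avoid the linear scan.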
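Step 2: Use memory_leak_demo to compare the memory cost of loop concatenation and join
Pain point and mechanism:
Repeating result = result + chunk in a loop can create a new string on every iteration and copy the old content along with it, so in the worst case the total amount copied grows as O(n²). With large data it is like re-moving the whole house every time you add one box. CPython can sometimes extend the string in place, but that is an implementation detail and not something to rely on. Despite the function's name, this is allocation churn, not a true leak: nothing stays referenced afterwards. Collecting the pieces in a list and calling join() once is like putting the materials in a basket first and assembling them in a single pass.
Core source (verbatim from the complete source at the end of the article):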
def memory_leak_demo(n: int = 500) -> list[str]:
    """Simulated memory problem: + concatenation in a loop (up to O(n²) bytes copied)."""
    result = ""
    for i in range(n):
        result = result + f"chunk_{i}_"  # may build a new string object each time
    return result.split("_")


def memory_efficient(n: int = 500) -> list[str]:
    """Memory-friendly version: collect the parts in a list, then join once."""
    parts = [f"chunk_{i}_" for i in range(n)]
    return "".join(parts).split("_")
Runnable demo (with mock data and print feedback added):
def memory_leak_demo(n: int = 500) -> list[str]:
    """Simulated memory problem: + concatenation in a loop (up to O(n²) bytes copied)."""
    result = ""
    for i in range(n):
        result = result + f"chunk_{i}_"  # may build a new string object each time
    return result.split("_")


def memory_efficient(n: int = 500) -> list[str]:
    """Memory-friendly version: collect the parts in a list, then join once."""
    parts = [f"chunk_{i}_" for i in range(n)]
    return "".join(parts).split("_")


slow_parts = memory_leak_demo(8)
fast_parts = memory_efficient(8)
print("Slow version, first 6 items:", slow_parts[:6])
print("Fast version, first 6 items:", fast_parts[:6])
print("Same length:", len(slow_parts) == len(fast_parts))
print("Intuition: + in a loop can copy the old string again and again; join collects the parts first and assembles them once.")
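To put numbers on the two approaches, `tracemalloc.get_traced_memory()` reports the current and the peak traced size. The helper `peak_kib` and the two `build_by_*` functions below are names made up for this sketch; they mirror the article's versions without the final split. The figures depend on the interpreter, and CPython may resize a string in place, so treat the output as a measurement of your own setup, not a fixed result.

```python
import tracemalloc
from typing import Callable


def build_by_concat(n: int) -> str:
    """Build the string with + inside a loop."""
    result = ""
    for i in range(n):
        result = result + f"chunk_{i}_"
    return result


def build_by_join(n: int) -> str:
    """Build the same string with a single join()."""
    return "".join(f"chunk_{i}_" for i in range(n))


def peak_kib(fn: Callable[[], object]) -> float:
    """Run fn under tracemalloc and return the peak traced memory in KiB."""
    tracemalloc.start()
    try:
        fn()
        _current, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak / 1024


concat_peak = peak_kib(lambda: build_by_concat(2_000))
join_peak = peak_kib(lambda: build_by_join(2_000))
print(f"concat peak: {concat_peak:.1f} KiB")
print(f"join peak:   {join_peak:.1f} KiB")
```

Peak memory and copying work are different costs. If the two peaks come out close on your machine, the argument for join() rests on the reduced copying, which is a question for timeit.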
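Step 3: Use lru_cache to turn repeated recursive work into a memo lookup
Pain point and mechanism:
Naive recursive Fibonacci recomputes the same subproblems over and over. lru_cache gives the function a memo pad: the first result for a given input is written down, and the next call with the same input reads the answer back. The arguments must be hashable, and the function should return the same result for the same input. Typical uses are pure functions, configuration reads, and recursive search.
Core source (verbatim from the complete source at the end of the article):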
@lru_cache(maxsize=256)
def fib_cached(n: int) -> int:
    """Cached Fibonacci: demonstrates the effect of lru_cache."""
    if n <= 1:
        return n
    return fib_cached(n - 1) + fib_cached(n - 2)


def fib_plain(n: int) -> int:
    """Uncached version."""
    if n <= 1:
        return n
    return fib_plain(n - 1) + fib_plain(n - 2)
Runnable demo (with mock data and print feedback added):
from functools import lru_cache
import timeit


@lru_cache(maxsize=256)
def fib_cached(n: int) -> int:
    """Cached Fibonacci: demonstrates the effect of lru_cache."""
    if n <= 1:
        return n
    return fib_cached(n - 1) + fib_cached(n - 2)


def fib_plain(n: int) -> int:
    """Uncached version."""
    if n <= 1:
        return n
    return fib_plain(n - 1) + fib_plain(n - 2)


fib_cached.cache_clear()  # start from an empty cache so the first call is a fair comparison
plain_time = timeit.timeit(lambda: fib_plain(20), number=1)
cached_time = timeit.timeit(lambda: fib_cached(20), number=1)
print("fib_plain(20):", fib_plain(20), f"took {plain_time*1000:.3f}ms")
print("fib_cached(20):", fib_cached(20), f"took {cached_time*1000:.3f}ms")
print("Cache info:", fib_cached.cache_info())
print("Intuition: lru_cache is a memo pad; a problem solved once is read back next time.")
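Counting calls makes the saving concrete. The sketch below adds a global counter to the plain version (an instrumentation added only for this illustration) and reads `cache_info()` for the cached one. It uses `maxsize=None` to keep the counts easy to reason about; the article's code uses `maxsize=256`.

```python
from functools import lru_cache

plain_calls = 0


def fib_plain(n: int) -> int:
    global plain_calls
    plain_calls += 1  # count every invocation
    if n <= 1:
        return n
    return fib_plain(n - 1) + fib_plain(n - 2)


@lru_cache(maxsize=None)
def fib_cached(n: int) -> int:
    if n <= 1:
        return n
    return fib_cached(n - 1) + fib_cached(n - 2)


fib_plain(20)
fib_cached(20)
info = fib_cached.cache_info()

print("plain calls:", plain_calls)                     # 21891
print("cached misses (bodies executed):", info.misses)  # 21
print("cached hits (answers reused):", info.hits)       # 18
```

The plain version runs its body 21,891 times for n = 20, while the cached version runs it once per distinct input from 0 to 20, that is, 21 times.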
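Step 4: Use demo_cprofile to find function-level hot spots
Pain point and mechanism:
cProfile works like a step counter attached to your program: it records how many times each function was called and how much time it took. When reading the report, look at cumtime first. It is the time spent in a function together with everything it calls, so it shows which call path is expensive. Then check tottime, the time spent in the function's own body, to see which function on that path does the actual work.
Core source (verbatim from the complete source at the end of the article):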
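def demo_cprofile() -> None:
    print("\n ── cProfile function-call analysis ─────────────────")
    pr = cProfile.Profile()
    pr.enable()
    slow_word_count(CORPUS[:200])  # only 200 documents, to keep the run short
    pr.disable()
    # Format the report with pstats
    stream = io.StringIO()
    ps = pstats.Stats(pr, stream=stream)
    ps.sort_stats("cumulative")
    ps.print_stats(8)  # limit the report to the top 8 entries
    output = stream.getvalue()
    # Keep the non-blank lines. pstats indents its rows, so filtering out
    # lines that start with a space would drop the statistics themselves.
    lines = [line for line in output.split("\n") if line.strip()]
    print("\n".join(f" {line}" for line in lines[:15]))
    print("\n 💡 Start with the functions that have the highest cumtime (cumulative time)")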
Runnable demo (with mock data and print feedback added):
import cProfile
import io
import pstats

CORPUS: list[str] = [
    f"task_node_{i}: processing document with keyword_{'abc' * (i % 5 + 1)}"
    for i in range(2000)
]


def slow_word_count(corpus: list[str]) -> dict[str, int]:
    """Slow version: rescans the existing keys for every word."""
    result: dict[str, int] = {}
    for doc in corpus:
        words = doc.split()
        for word in words:
            # Anti-pattern: a linear scan over result for every word
            found = False
            for k in result:
                if k == word:
                    result[k] += 1
                    found = True
                    break
            if not found:
                result[word] = 1
    return result


def fast_word_count(corpus: list[str]) -> dict[str, int]:
    """Fast version: look the word up directly with dict.get()."""
    result: dict[str, int] = {}
    for doc in corpus:
        for word in doc.split():
            result[word] = result.get(word, 0) + 1
    return result


def demo_cprofile() -> None:
    print("\n ── cProfile function-call analysis ─────────────────")
    pr = cProfile.Profile()
    pr.enable()
    slow_word_count(CORPUS[:200])  # only 200 documents, to keep the run short
    pr.disable()
    # Format the report with pstats
    stream = io.StringIO()
    ps = pstats.Stats(pr, stream=stream)
    ps.sort_stats("cumulative")
    ps.print_stats(8)  # limit the report to the top 8 entries
    output = stream.getvalue()
    # Keep the non-blank lines. pstats indents its rows, so filtering out
    # lines that start with a space would drop the statistics themselves.
    lines = [line for line in output.split("\n") if line.strip()]
    print("\n".join(f" {line}" for line in lines[:15]))
    print("\n 💡 Start with the functions that have the highest cumtime (cumulative time)")


demo_cprofile()
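The difference between cumtime and tottime is easiest to see with a wrapper that does nothing but call a worker. The two functions below, `outer_wrapper` and `inner_hot`, are hypothetical names for this sketch. Sorting by `"tottime"` ranks functions by the time spent in their own bodies, so the worker should rise to the top even though the wrapper has the larger cumulative time.

```python
import cProfile
import io
import pstats


def inner_hot(n: int) -> int:
    """Does the actual work."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def outer_wrapper() -> int:
    """Does almost nothing itself; it only calls inner_hot."""
    total = 0
    for _ in range(20):
        total += inner_hot(20_000)
    return total


profiler = cProfile.Profile()
profiler.enable()
outer_wrapper()
profiler.disable()

stream = io.StringIO()
stats = pstats.Stats(profiler, stream=stream)
stats.strip_dirs()           # shorten file paths in the report
stats.sort_stats("tottime")  # rank by time spent inside each function's own body
stats.print_stats(5)
report = stream.getvalue()
print(report)
```

In the printed report, compare the tottime and cumtime columns for the two rows: `outer_wrapper` carries the cumulative time of its calls, while `inner_hot` is where the time is actually spent.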
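Step 5: Use demo_memory to trace which line allocates more memory
Pain point and mechanism:
tracemalloc is a ledger for memory: it records which source lines allocated how much. Performance problems are not only about being slow; a program can also keep using more and more memory. Putting the snapshots of the slow and fast versions side by side lets a beginner see how a different way of writing the code changes the allocations. A snapshot lists only the blocks that are still allocated when it is taken, so keep the result referenced until after take_snapshot().
Core source (verbatim from the complete source at the end of the article):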
def demo_memory() -> None:
    print("\n ── tracemalloc allocation tracking ──────────────")
    # Profile the slow version
    tracemalloc.start()
    kept_slow = memory_leak_demo(300)  # hold a reference so the blocks are still live at snapshot time
    snapshot_slow = tracemalloc.take_snapshot()
    tracemalloc.stop()
    # Profile the fast version
    tracemalloc.start()
    kept_fast = memory_efficient(300)  # same reason: keep the result alive
    snapshot_fast = tracemalloc.take_snapshot()
    tracemalloc.stop()
    # Compare the top 3 allocation sites
    print("\n Slow version (string concatenation), top 3 allocations:")
    for stat in snapshot_slow.statistics("lineno")[:3]:
        print(f" {stat.size / 1024:.1f} KB {stat.traceback.format()[-1]}")
    print("\n Fast version (list join), top 3 allocations:")
    for stat in snapshot_fast.statistics("lineno")[:3]:
        print(f" {stat.size / 1024:.1f} KB {stat.traceback.format()[-1]}")
Runnable demo (with mock data and print feedback added):
import tracemalloc


def memory_leak_demo(n: int = 500) -> list[str]:
    """Simulated memory problem: + concatenation in a loop (up to O(n²) bytes copied)."""
    result = ""
    for i in range(n):
        result = result + f"chunk_{i}_"  # may build a new string object each time
    return result.split("_")


def memory_efficient(n: int = 500) -> list[str]:
    """Memory-friendly version: collect the parts in a list, then join once."""
    parts = [f"chunk_{i}_" for i in range(n)]
    return "".join(parts).split("_")


def demo_memory() -> None:
    print("\n ── tracemalloc allocation tracking ──────────────")
    # Profile the slow version
    tracemalloc.start()
    kept_slow = memory_leak_demo(300)  # hold a reference so the blocks are still live at snapshot time
    snapshot_slow = tracemalloc.take_snapshot()
    tracemalloc.stop()
    # Profile the fast version
    tracemalloc.start()
    kept_fast = memory_efficient(300)  # same reason: keep the result alive
    snapshot_fast = tracemalloc.take_snapshot()
    tracemalloc.stop()
    # Compare the top 3 allocation sites
    print("\n Slow version (string concatenation), top 3 allocations:")
    for stat in snapshot_slow.statistics("lineno")[:3]:
        print(f" {stat.size / 1024:.1f} KB {stat.traceback.format()[-1]}")
    print("\n Fast version (list join), top 3 allocations:")
    for stat in snapshot_fast.statistics("lineno")[:3]:
        print(f" {stat.size / 1024:.1f} KB {stat.traceback.format()[-1]}")


demo_memory()
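When the question is "what grew between two points in time", two snapshots can be diffed with `Snapshot.compare_to()`. The sketch below is an illustration separate from `profiler_demo.py`; the list named `kept` exists only to give the diff something to report.

```python
import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()

# Allocate something and keep it alive; a snapshot only sees live blocks.
kept = [f"chunk_{i}_" for i in range(10_000)]

after = tracemalloc.take_snapshot()
tracemalloc.stop()

# Rank source lines by how much their live allocations changed.
diffs = after.compare_to(before, "lineno")
top = diffs[0]
print(f"+{top.size_diff / 1024:.1f} KiB  {top.traceback.format()[0].strip()}")
```

The first entry should point at the line that builds `kept`. If you delete the variable before taking the second snapshot, that growth disappears from the diff.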
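Step 6: Use demo_compare for a steadier timeit benchmark
Pain point and mechanism:
timeit is built for timing small pieces of code and is steadier than hand-written time.time() calls: it uses a high-resolution clock and switches off garbage collection while timing. timeit.timeit() runs the callable number times and returns the total, so the demo divides by the run count to get a mean and smooth out random fluctuation. In the table, a longer bar means the entry is faster relative to the slowest one. The bar is capped at 20 blocks, so anything at least 20 times faster looks the same.
Core source (verbatim from the complete source at the end of the article):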
def demo_compare() -> None:
    print("\n ── timeit comparison ───────────────────────")
    benchmarks: list[tuple[str, Callable[[], object], int]] = [
        ("slow_word_count (200 docs)", lambda: slow_word_count(CORPUS[:200]), 3),
        ("fast_word_count (200 docs)", lambda: fast_word_count(CORPUS[:200]), 3),
        ("fast_word_count (2000 docs)", lambda: fast_word_count(CORPUS), 3),
        ("fib_plain(25)", lambda: fib_plain(25), 5),
        ("fib_cached(25)", lambda: fib_cached(25), 5),
        ("str concat (300 chunks)", lambda: memory_leak_demo(300), 3),
        ("list join (300 chunks)", lambda: memory_efficient(300), 3),
    ]
    results: list[tuple[str, float]] = []
    for name, fn, repeat in benchmarks:
        elapsed = timeit.timeit(fn, number=repeat) / repeat  # mean seconds per call
        results.append((name, elapsed))
    # Use the slowest entry as the baseline
    max_time = max(t for _, t in results)
    print(f"\n {'Function':<30} {'Time (ms)':>10} {'Rel. speed':>10}")
    print(f" {'─'*30} {'─'*10} {'─'*10}")
    for name, elapsed in results:
        ms = elapsed * 1000
        ratio = max_time / elapsed if elapsed > 0 else float("inf")
        bar = "█" * int(min(ratio, 20))  # cap first: int(float("inf")) would raise OverflowError
        print(f" {name:<30} {ms:>9.3f} {bar}")
Runnable demo (with mock data and print feedback added):
import timeit
from functools import lru_cache
from typing import Callable

CORPUS: list[str] = [
    f"task_node_{i}: processing document with keyword_{'abc' * (i % 5 + 1)}"
    for i in range(2000)
]


def slow_word_count(corpus: list[str]) -> dict[str, int]:
    """Slow version: rescans the existing keys for every word."""
    result: dict[str, int] = {}
    for doc in corpus:
        words = doc.split()
        for word in words:
            # Anti-pattern: a linear scan over result for every word
            found = False
            for k in result:
                if k == word:
                    result[k] += 1
                    found = True
                    break
            if not found:
                result[word] = 1
    return result


def fast_word_count(corpus: list[str]) -> dict[str, int]:
    """Fast version: look the word up directly with dict.get()."""
    result: dict[str, int] = {}
    for doc in corpus:
        for word in doc.split():
            result[word] = result.get(word, 0) + 1
    return result


def memory_leak_demo(n: int = 500) -> list[str]:
    """Simulated memory problem: + concatenation in a loop (up to O(n²) bytes copied)."""
    result = ""
    for i in range(n):
        result = result + f"chunk_{i}_"  # may build a new string object each time
    return result.split("_")


def memory_efficient(n: int = 500) -> list[str]:
    """Memory-friendly version: collect the parts in a list, then join once."""
    parts = [f"chunk_{i}_" for i in range(n)]
    return "".join(parts).split("_")


@lru_cache(maxsize=256)
def fib_cached(n: int) -> int:
    """Cached Fibonacci: demonstrates the effect of lru_cache."""
    if n <= 1:
        return n
    return fib_cached(n - 1) + fib_cached(n - 2)


def fib_plain(n: int) -> int:
    """Uncached version."""
    if n <= 1:
        return n
    return fib_plain(n - 1) + fib_plain(n - 2)


def demo_compare() -> None:
    print("\n ── timeit comparison ───────────────────────")
    benchmarks: list[tuple[str, Callable[[], object], int]] = [
        ("slow_word_count (200 docs)", lambda: slow_word_count(CORPUS[:200]), 3),
        ("fast_word_count (200 docs)", lambda: fast_word_count(CORPUS[:200]), 3),
        ("fast_word_count (2000 docs)", lambda: fast_word_count(CORPUS), 3),
        ("fib_plain(25)", lambda: fib_plain(25), 5),
        ("fib_cached(25)", lambda: fib_cached(25), 5),
        ("str concat (300 chunks)", lambda: memory_leak_demo(300), 3),
        ("list join (300 chunks)", lambda: memory_efficient(300), 3),
    ]
    results: list[tuple[str, float]] = []
    for name, fn, repeat in benchmarks:
        elapsed = timeit.timeit(fn, number=repeat) / repeat  # mean seconds per call
        results.append((name, elapsed))
    # Use the slowest entry as the baseline
    max_time = max(t for _, t in results)
    print(f"\n {'Function':<30} {'Time (ms)':>10} {'Rel. speed':>10}")
    print(f" {'─'*30} {'─'*10} {'─'*10}")
    for name, elapsed in results:
        ms = elapsed * 1000
        ratio = max_time / elapsed if elapsed > 0 else float("inf")
        bar = "█" * int(min(ratio, 20))  # cap first: int(float("inf")) would raise OverflowError
        print(f" {name:<30} {ms:>9.3f} {bar}")


demo_compare()
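The bar arithmetic is worth working through once. The sketch below takes the mean times from the sample run shown later in this article and recomputes each ratio and bar length by hand. The dictionary `sample_ms` and the constant `BAR_CAP` are names introduced for this illustration; the numbers are the article's, so your own run will differ.

```python
# Mean times in milliseconds, copied from the article's sample run.
sample_ms = {
    "slow_word_count (200)": 0.257,
    "fast_word_count (200)": 0.064,
    "fast_word_count (2000)": 0.678,
    "fib_plain(25)": 4.466,
    "fib_cached(25)": 0.002,
    "str concat (300)": 0.042,
    "list join (300)": 0.031,
}

BAR_CAP = 20  # same cap as in demo_compare()

slowest = max(sample_ms.values())
bar_lengths: dict[str, int] = {}
for name, ms in sample_ms.items():
    ratio = slowest / ms  # how many times faster than the slowest entry
    bar_lengths[name] = int(min(ratio, BAR_CAP))
    print(f"{name:<24} ratio {ratio:>8.1f}  bar {'█' * bar_lengths[name]}")
```

In that run fib_cached(25) is roughly 2,233 times faster than fib_plain(25), and fast_word_count on 200 documents is about 70 times faster, yet both get the same 20-block bar. Read the millisecond column when the bars are saturated.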
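Step 7: Use demo_traps to build a checklist of common performance traps
Pain point and mechanism:
Many performance problems are not research-grade algorithm puzzles but ordinary pitfalls: concatenating strings in a loop, inserting at the head of a list, membership checks on a large list, and repeated computation. The checklist works like a health-check form that you can scan quickly before and after writing code.
Core source (verbatim from the complete source at the end of the article):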
def demo_traps() -> None:
    print("\n ── Common performance traps ──────────────────────")
    traps = [
        ("String concatenation", "s += x (in a loop)", '"".join(parts)', "O(n²) → O(n)"),
        ("List head insertion", "lst.insert(0, x)", "collections.deque", "O(n) → O(1)"),
        ("Membership checks", "x in list (large)", "x in set", "O(n) → O(1)"),
        ("Global lookups", "global_var (hot loop)", "local = global_var", "fewer LOAD_GLOBAL"),
        ("Repeated computation", "fn(x) on every call", "@lru_cache", "repeat calls → O(1) lookup"),
        ("NumPy on tiny data", "np.array([1,2,3])+1", "plain list", "avoids NumPy overhead"),
    ]
    # Columns are 22 wide so the English labels line up
    print(f"\n {'Trap':<22} {'Anti-pattern':<22} {'Better approach':<22} {'Effect'}")
    print(f" {'─'*22} {'─'*22} {'─'*22} {'─'*18}")
    for trap, bad, good, effect in traps:
        print(f" {trap:<22} {bad:<22} {good:<22} {effect}")
    # Measured: membership check in a list vs. a set
    big_list = list(range(10000))
    big_set = set(range(10000))
    t_list = timeit.timeit(lambda: 9999 in big_list, number=10000)
    t_set = timeit.timeit(lambda: 9999 in big_set, number=10000)
    speedup = t_list / t_set
    print("\n Measured `9999 in list(10000)` vs `9999 in set(10000)`:")
    print(f" list: {t_list*1000:.2f}ms set: {t_set*1000:.2f}ms speedup {speedup:.0f}x")
Runnable demo (with mock data and print feedback added):
import timeit


def demo_traps() -> None:
    print("\n ── Common performance traps ──────────────────────")
    traps = [
        ("String concatenation", "s += x (in a loop)", '"".join(parts)', "O(n²) → O(n)"),
        ("List head insertion", "lst.insert(0, x)", "collections.deque", "O(n) → O(1)"),
        ("Membership checks", "x in list (large)", "x in set", "O(n) → O(1)"),
        ("Global lookups", "global_var (hot loop)", "local = global_var", "fewer LOAD_GLOBAL"),
        ("Repeated computation", "fn(x) on every call", "@lru_cache", "repeat calls → O(1) lookup"),
        ("NumPy on tiny data", "np.array([1,2,3])+1", "plain list", "avoids NumPy overhead"),
    ]
    # Columns are 22 wide so the English labels line up
    print(f"\n {'Trap':<22} {'Anti-pattern':<22} {'Better approach':<22} {'Effect'}")
    print(f" {'─'*22} {'─'*22} {'─'*22} {'─'*18}")
    for trap, bad, good, effect in traps:
        print(f" {trap:<22} {bad:<22} {good:<22} {effect}")
    # Measured: membership check in a list vs. a set
    big_list = list(range(10000))
    big_set = set(range(10000))
    t_list = timeit.timeit(lambda: 9999 in big_list, number=10000)
    t_set = timeit.timeit(lambda: 9999 in big_set, number=10000)
    speedup = t_list / t_set
    print("\n Measured `9999 in list(10000)` vs `9999 in set(10000)`:")
    print(f" list: {t_list*1000:.2f}ms set: {t_set*1000:.2f}ms speedup {speedup:.0f}x")


demo_traps()
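The "list head insertion" row can be tried in a few lines. This sketch is an addition to the article's code and only shows that the two containers end up with the same order; the variable names are made up here. Timing the difference on a large input is left to timeit.

```python
from collections import deque

items = range(5)

# Anti-pattern: every insert(0, x) shifts all existing elements one slot to the right.
as_list: list[int] = []
for x in items:
    as_list.insert(0, x)

# Alternative: deque.appendleft() adds to the left end in O(1).
as_deque: deque[int] = deque()
for x in items:
    as_deque.appendleft(x)

print(as_list)         # [4, 3, 2, 1, 0]
print(list(as_deque))  # [4, 3, 2, 1, 0]
```

A deque is not a drop-in replacement everywhere: indexing into the middle of a deque is slower than indexing a list, so pick it when the workload is dominated by operations at the ends.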
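Step 8: Use main to turn cprofile/memory/compare/traps into command-line probes
Pain point and mechanism:
A profiling script should run on demand. --mode cprofile shows hot spots, --mode memory shows allocations, --mode compare runs the benchmarks, and --mode traps prints the checklist. The reader does not need to edit the source, only to change an argument.
Core source (verbatim from the complete source at the end of the article):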
def main() -> None:
    parser = argparse.ArgumentParser(description="Performance probe toolchain demo")
    parser.add_argument(
        "--mode",
        choices=["cprofile", "memory", "compare", "traps", "all"],
        default="all",
    )
    args = parser.parse_args()
    if args.mode in ("cprofile", "all"):
        demo_cprofile()
    if args.mode in ("memory", "all"):
        demo_memory()
    if args.mode in ("compare", "all"):
        demo_compare()
    if args.mode in ("traps", "all"):
        demo_traps()
Runnable demo (with mock data and print feedback added):
import argparse
import sys


def main() -> None:
    parser = argparse.ArgumentParser(description="Performance probe toolchain demo")
    parser.add_argument(
        "--mode",
        choices=["cprofile", "memory", "compare", "traps", "all"],
        default="all",
    )
    args = parser.parse_args()
    if args.mode in ("cprofile", "all"):
        demo_cprofile()
    if args.mode in ("memory", "all"):
        demo_memory()
    if args.mode in ("compare", "all"):
        demo_compare()
    if args.mode in ("traps", "all"):
        demo_traps()


# Stub probes so that this demo runs on its own
def demo_cprofile() -> None:
    print("Running cProfile hot-spot analysis")


def demo_memory() -> None:
    print("Running tracemalloc memory analysis")


def demo_compare() -> None:
    print("Running timeit benchmark comparison")


def demo_traps() -> None:
    print("Running performance trap checklist")


# Simulate four command-line invocations by rewriting sys.argv
for mode in ["cprofile", "memory", "compare", "traps"]:
    print(f"\n$ python profiler_demo.py --mode {mode}")
    sys.argv = ["profiler_demo.py", "--mode", mode]
    main()
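Rewriting `sys.argv` works for a demo, but `parse_args()` also accepts an explicit argument list, which is easier to test. The variant below is a sketch and not the article's implementation: `main` takes an optional `argv` parameter, and the `PROBES` table with its recording lambdas is a hypothetical stand-in for the real `demo_*` functions.

```python
import argparse
from typing import Callable, Optional

calls: list[str] = []  # records which probes ran, so the sketch is easy to check

# Hypothetical stand-ins for the real demo_* functions.
PROBES: dict[str, Callable[[], None]] = {
    "cprofile": lambda: calls.append("cprofile"),
    "memory": lambda: calls.append("memory"),
    "compare": lambda: calls.append("compare"),
    "traps": lambda: calls.append("traps"),
}


def main(argv: Optional[list[str]] = None) -> None:
    parser = argparse.ArgumentParser(description="Performance probe demo")
    parser.add_argument("--mode", choices=[*PROBES, "all"], default="all")
    args = parser.parse_args(argv)  # argv=None falls back to sys.argv[1:]

    selected = list(PROBES) if args.mode == "all" else [args.mode]
    for name in selected:
        PROBES[name]()


main(["--mode", "traps"])  # runs one probe
main([])                   # no arguments: the default "all" runs every probe in order
print(calls)
```

A dispatch table keeps the mode names and the functions in one place, so adding a probe means adding one entry instead of another if block.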
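Hands-On: Complete Source and Run
Now put the building blocks together and save the complete code below as profiler_demo.py. It needs no third-party packages and uses a built-in corpus to demonstrate cProfile, tracemalloc, timeit, and the common performance traps.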
# profiler_demo.py
"""
Performance probe toolchain demo: cProfile, tracemalloc, and timeit are all built in, nothing to install.

Usage:
    python3 profiler_demo.py
    python3 profiler_demo.py --mode cprofile
    python3 profiler_demo.py --mode memory
    python3 profiler_demo.py --mode compare
    python3 profiler_demo.py --mode traps
"""
import argparse
import cProfile
import io
import pstats
import timeit
import tracemalloc
from functools import lru_cache
from typing import Callable

# ══════════════════════════════════════════════════════════════
# A simulated "text-processing pipeline" that serves as the profiling target
# ══════════════════════════════════════════════════════════════
# Generate a mock corpus (no external dependencies)
CORPUS: list[str] = [
    f"task_node_{i}: processing document with keyword_{'abc' * (i % 5 + 1)}"
    for i in range(2000)
]


def slow_word_count(corpus: list[str]) -> dict[str, int]:
    """Slow version: rescans the existing keys for every word."""
    result: dict[str, int] = {}
    for doc in corpus:
        words = doc.split()
        for word in words:
            # Anti-pattern: a linear scan over result for every word
            found = False
            for k in result:
                if k == word:
                    result[k] += 1
                    found = True
                    break
            if not found:
                result[word] = 1
    return result


def fast_word_count(corpus: list[str]) -> dict[str, int]:
    """Fast version: look the word up directly with dict.get()."""
    result: dict[str, int] = {}
    for doc in corpus:
        for word in doc.split():
            result[word] = result.get(word, 0) + 1
    return result


def memory_leak_demo(n: int = 500) -> list[str]:
    """Simulated memory problem: + concatenation in a loop (up to O(n²) bytes copied)."""
    result = ""
    for i in range(n):
        result = result + f"chunk_{i}_"  # may build a new string object each time
    return result.split("_")


def memory_efficient(n: int = 500) -> list[str]:
    """Memory-friendly version: collect the parts in a list, then join once."""
    parts = [f"chunk_{i}_" for i in range(n)]
    return "".join(parts).split("_")


@lru_cache(maxsize=256)
def fib_cached(n: int) -> int:
    """Cached Fibonacci: demonstrates the effect of lru_cache."""
    if n <= 1:
        return n
    return fib_cached(n - 1) + fib_cached(n - 2)


def fib_plain(n: int) -> int:
    """Uncached version."""
    if n <= 1:
        return n
    return fib_plain(n - 1) + fib_plain(n - 2)


# ── cProfile demo ─────────────────────────────────────────────
def demo_cprofile() -> None:
    print("\n ── cProfile function-call analysis ─────────────────")
    pr = cProfile.Profile()
    pr.enable()
    slow_word_count(CORPUS[:200])  # only 200 documents, to keep the run short
    pr.disable()
    # Format the report with pstats
    stream = io.StringIO()
    ps = pstats.Stats(pr, stream=stream)
    ps.sort_stats("cumulative")
    ps.print_stats(8)  # limit the report to the top 8 entries
    output = stream.getvalue()
    # Keep the non-blank lines. pstats indents its rows, so filtering out
    # lines that start with a space would drop the statistics themselves.
    lines = [line for line in output.split("\n") if line.strip()]
    print("\n".join(f" {line}" for line in lines[:15]))
    print("\n 💡 Start with the functions that have the highest cumtime (cumulative time)")


# ── tracemalloc memory analysis ───────────────────────────────
def demo_memory() -> None:
    print("\n ── tracemalloc allocation tracking ──────────────")
    # Profile the slow version
    tracemalloc.start()
    kept_slow = memory_leak_demo(300)  # hold a reference so the blocks are still live at snapshot time
    snapshot_slow = tracemalloc.take_snapshot()
    tracemalloc.stop()
    # Profile the fast version
    tracemalloc.start()
    kept_fast = memory_efficient(300)  # same reason: keep the result alive
    snapshot_fast = tracemalloc.take_snapshot()
    tracemalloc.stop()
    # Compare the top 3 allocation sites
    print("\n Slow version (string concatenation), top 3 allocations:")
    for stat in snapshot_slow.statistics("lineno")[:3]:
        print(f" {stat.size / 1024:.1f} KB {stat.traceback.format()[-1]}")
    print("\n Fast version (list join), top 3 allocations:")
    for stat in snapshot_fast.statistics("lineno")[:3]:
        print(f" {stat.size / 1024:.1f} KB {stat.traceback.format()[-1]}")


# ── timeit comparison ─────────────────────────────────────────
def demo_compare() -> None:
    print("\n ── timeit comparison ───────────────────────")
    benchmarks: list[tuple[str, Callable[[], object], int]] = [
        ("slow_word_count (200 docs)", lambda: slow_word_count(CORPUS[:200]), 3),
        ("fast_word_count (200 docs)", lambda: fast_word_count(CORPUS[:200]), 3),
        ("fast_word_count (2000 docs)", lambda: fast_word_count(CORPUS), 3),
        ("fib_plain(25)", lambda: fib_plain(25), 5),
        ("fib_cached(25)", lambda: fib_cached(25), 5),
        ("str concat (300 chunks)", lambda: memory_leak_demo(300), 3),
        ("list join (300 chunks)", lambda: memory_efficient(300), 3),
    ]
    results: list[tuple[str, float]] = []
    for name, fn, repeat in benchmarks:
        elapsed = timeit.timeit(fn, number=repeat) / repeat  # mean seconds per call
        results.append((name, elapsed))
    # Use the slowest entry as the baseline
    max_time = max(t for _, t in results)
    print(f"\n {'Function':<30} {'Time (ms)':>10} {'Rel. speed':>10}")
    print(f" {'─'*30} {'─'*10} {'─'*10}")
    for name, elapsed in results:
        ms = elapsed * 1000
        ratio = max_time / elapsed if elapsed > 0 else float("inf")
        bar = "█" * int(min(ratio, 20))  # cap first: int(float("inf")) would raise OverflowError
        print(f" {name:<30} {ms:>9.3f} {bar}")


# ── Common performance traps ──────────────────────────────────
def demo_traps() -> None:
    print("\n ── Common performance traps ──────────────────────")
    traps = [
        ("String concatenation", "s += x (in a loop)", '"".join(parts)', "O(n²) → O(n)"),
        ("List head insertion", "lst.insert(0, x)", "collections.deque", "O(n) → O(1)"),
        ("Membership checks", "x in list (large)", "x in set", "O(n) → O(1)"),
        ("Global lookups", "global_var (hot loop)", "local = global_var", "fewer LOAD_GLOBAL"),
        ("Repeated computation", "fn(x) on every call", "@lru_cache", "repeat calls → O(1) lookup"),
        ("NumPy on tiny data", "np.array([1,2,3])+1", "plain list", "avoids NumPy overhead"),
    ]
    # Columns are 22 wide so the English labels line up
    print(f"\n {'Trap':<22} {'Anti-pattern':<22} {'Better approach':<22} {'Effect'}")
    print(f" {'─'*22} {'─'*22} {'─'*22} {'─'*18}")
    for trap, bad, good, effect in traps:
        print(f" {trap:<22} {bad:<22} {good:<22} {effect}")
    # Measured: membership check in a list vs. a set
    big_list = list(range(10000))
    big_set = set(range(10000))
    t_list = timeit.timeit(lambda: 9999 in big_list, number=10000)
    t_set = timeit.timeit(lambda: 9999 in big_set, number=10000)
    speedup = t_list / t_set
    print("\n Measured `9999 in list(10000)` vs `9999 in set(10000)`:")
    print(f" list: {t_list*1000:.2f}ms set: {t_set*1000:.2f}ms speedup {speedup:.0f}x")


def main() -> None:
    parser = argparse.ArgumentParser(description="Performance probe toolchain demo")
    parser.add_argument(
        "--mode",
        choices=["cprofile", "memory", "compare", "traps", "all"],
        default="all",
    )
    args = parser.parse_args()
    if args.mode in ("cprofile", "all"):
        demo_cprofile()
    if args.mode in ("memory", "all"):
        demo_memory()
    if args.mode in ("compare", "all"):
        demo_compare()
    if args.mode in ("traps", "all"):
        demo_traps()


if __name__ == "__main__":
    main()
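$ python profiler_demo.py --mode cprofile
 ── cProfile function-call analysis ─────────────────
 💡 Start with the functions that have the highest cumtime (cumulative time)
$ python profiler_demo.py --mode memory
 ── tracemalloc allocation tracking ──────────────
 Slow version (string concatenation), top 3 allocations:
 Fast version (list join), top 3 allocations: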
$ python profiler_demo.py --mode compare
 ── timeit comparison ───────────────────────
 Function                        Time (ms) Rel. speed
 ────────────────────────────── ────────── ──────────
 slow_word_count (200 docs)         0.257 █████████████████
 fast_word_count (200 docs)         0.064 ████████████████████
 fast_word_count (2000 docs)        0.678 ██████
 fib_plain(25)                      4.466 █
 fib_cached(25)                     0.002 ████████████████████
 str concat (300 chunks)            0.042 ████████████████████
 list join (300 chunks)             0.031 ████████████████████
$ python profiler_demo.py --mode traps
 ── Common performance traps ──────────────────────
 Trap                   Anti-pattern           Better approach        Effect
 ────────────────────── ────────────────────── ────────────────────── ──────────────────
 String concatenation   s += x (in a loop)     "".join(parts)         O(n²) → O(n)
 List head insertion    lst.insert(0, x)       collections.deque      O(n) → O(1)
 Membership checks      x in list (large)      x in set               O(n) → O(1)
 Global lookups         global_var (hot loop)  local = global_var     fewer LOAD_GLOBAL
 Repeated computation   fn(x) on every call    @lru_cache             repeat calls → O(1) lookup
 NumPy on tiny data     np.array([1,2,3])+1    plain list             avoids NumPy overhead
 Measured `9999 in list(10000)` vs `9999 in set(10000)`:
 list: 464.03ms set: 0.24ms speedup 1958x
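Summary and NexDo Time ⚡
In this article you learned the right order for performance work: quantify the gap with timeit, find function-level hot spots with cProfile, check allocations with tracemalloc, and only then optimize against that evidence. Do not change code based on a feeling; the core of performance engineering is "measure, change, measure again".
5-minute micro-challenge: In demo_compare(), change fib_plain(25) to fib_plain(28), then run --mode compare. Observe how much slower the uncached recursion becomes, and explain why the cached version barely changes.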
Don’t wait for next time, do it in the next moment.