
54 · Performance Probes: cProfile, Memory Profiling, and Optimization in Practice

#056 · 2026-04-17 · Python

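🔗 Knowledge-graph navigation: before reading this article, review the text-processing ideas in "05 · Text Cleaning: Advanced String Operations and Regex" and the timer concept in "06 · Closures and Decorators: The Ultimate Form of Functions". This article upgrades "it feels slow" into "use tools to locate what is slow".

NexDo Time · 2026-04-17 · Estimated reading time: 28 minutes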

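Pain Points and Architecture

Many beginners optimize by guessing: is it the loop? The list? A memory leak? That is risky, because bottlenecks often hide where you are not looking. The more reliable engineering practice is to measure first, then optimize, then measure again.

This article uses only tools from the Python standard library: cProfile to find function-level hot spots, tracemalloc to inspect memory allocation, and timeit to benchmark small snippets. You will see how slow and fast versions of the same logic differ, and learn to switch between probes from the command line.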
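The measure, change, re-measure loop can be sketched as a tiny harness. This is an illustrative sketch and not part of profiler_demo.py: compare, squares_loop, and squares_comprehension are hypothetical names chosen for the example, and the timings it prints depend on the machine.

```python
import timeit
from typing import Callable


def squares_loop(n: int) -> list[int]:
    """Baseline: build the list with an explicit loop and append()."""
    out = []
    for i in range(n):
        out.append(i * i)
    return out


def squares_comprehension(n: int) -> list[int]:
    """Candidate: the same result via a list comprehension."""
    return [i * i for i in range(n)]


def compare(before: Callable[[], object], after: Callable[[], object],
            number: int = 200) -> tuple[float, float]:
    """Check that both versions agree, then time each one."""
    if before() != after():
        raise ValueError("optimized version changed the result")
    t_before = timeit.timeit(before, number=number)
    t_after = timeit.timeit(after, number=number)
    return t_before, t_after


t_before, t_after = compare(lambda: squares_loop(1000),
                            lambda: squares_comprehension(1000))
print(f"before: {t_before * 1000:.2f} ms  after: {t_after * 1000:.2f} ms")
```

Checking that the results match before timing keeps a "faster" version from quietly becoming a wrong one.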

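Feels slow
  -> timeit: first quantify how slow an operation really is
  -> cProfile: find the functions with the highest cumulative time
  -> tracemalloc: confirm where memory is being allocated
  -> Optimize the code
  -> Measure again to confirm it really got faster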

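Step by Step: An Adaptive Breakdown of the Core Logic

The worst enemy of performance work is going by feel. The walkthrough below has 8 steps; each one runs and prints results, so you see the tool output first and then the reasoning behind the optimization.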

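Step 1: Use slow_word_count and fast_word_count to see what the choice of lookup costs

Pain points and mechanism

The first lesson of performance optimization is not clever tricks but picking the right data structure and using it the way it was designed. The slow version scans every existing key for each word, so one lookup costs time proportional to the number of distinct words seen so far; the fast version calls the dict's get(), a hash lookup that takes constant time on average. It is the difference between paging through an entire address book and looking a name up in an index.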
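For comparison, the standard library already ships a counting dict. The sketch below is an alternative and not the code used in this article: count_words is a hypothetical name, and the corpus is the same three-line mock used in the demo further down.

```python
from collections import Counter

corpus = [
    "task alpha alpha beta",
    "task beta gamma",
    "alpha deploy task",
]


def count_words(docs: list[str]) -> Counter:
    """Count words with collections.Counter, one hash lookup per word."""
    counts: Counter = Counter()
    for doc in docs:
        counts.update(doc.split())  # update() adds to the existing counts
    return counts


counts = count_words(corpus)
print(dict(counts))
```

Counter is a dict subclass, so the result compares equal to the plain dict built with get().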

Core source (verbatim from the full source at the end)

def slow_word_count(corpus: list[str]) -> dict[str, int]:
    """Slow version: linearly rescans the existing keys for every word."""
    result: dict[str, int] = {}
    for doc in corpus:
        words = doc.split()
        for word in words:
            # 反模式:每次都重新遍历 result
            found = False
            for k in result:
                if k == word:
                    result[k] += 1
                    found = True
                    break
            if not found:
                result[word] = 1
    return result


def fast_word_count(corpus: list[str]) -> dict[str, int]:
    """快版本:直接用 dict.get()。"""
    result: dict[str, int] = {}
    for doc in corpus:
        for word in doc.split():
            result[word] = result.get(word, 0) + 1
    return result

Runnable demo (mock data and print feedback filled in)

CORPUS: list[str] = [
    "task alpha alpha beta",
    "task beta gamma",
    "alpha deploy task",
]

def slow_word_count(corpus: list[str]) -> dict[str, int]:
    """Slow version: linearly rescans the existing keys for every word."""
    result: dict[str, int] = {}
    for doc in corpus:
        words = doc.split()
        for word in words:
            # 反模式:每次都重新遍历 result
            found = False
            for k in result:
                if k == word:
                    result[k] += 1
                    found = True
                    break
            if not found:
                result[word] = 1
    return result

def fast_word_count(corpus: list[str]) -> dict[str, int]:
    """快版本:直接用 dict.get()。"""
    result: dict[str, int] = {}
    for doc in corpus:
        for word in doc.split():
            result[word] = result.get(word, 0) + 1
    return result

slow = slow_word_count(CORPUS)
fast = fast_word_count(CORPUS)
print("慢版本结果:", slow)
print("快版本结果:", fast)
print("两者一致:", slow == fast)
print("直觉:慢版本像每次找词都翻整本通讯录,快版本直接查字典。")

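Step 2: Use memory_leak_demo to compare the memory cost of loop concatenation and join

Pain points and mechanism

Repeating result = result + chunk in a loop can create a new string on every pass and copy the old content along with it; as the data grows, it is like re-moving the whole house each time you add one box. CPython can sometimes extend the string in place, so the penalty varies and is worth measuring rather than assuming. Collecting the pieces in a list and calling join() once is the dependable approach: put the parts in a basket, then assemble them in a single pass.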
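When the pieces arrive one at a time and a list feels awkward, io.StringIO is another standard-library way to accumulate text. This is a sketch of an alternative, not the article's method; build_with_join and build_with_stringio are hypothetical names.

```python
import io


def build_with_join(n: int) -> str:
    """Collect the chunks lazily and assemble them in one join() call."""
    return "".join(f"chunk_{i}_" for i in range(n))


def build_with_stringio(n: int) -> str:
    """Write each chunk into an in-memory text buffer, then read it back."""
    buf = io.StringIO()
    for i in range(n):
        buf.write(f"chunk_{i}_")
    return buf.getvalue()


print(build_with_join(3))
print(build_with_join(8) == build_with_stringio(8))
```

Both helpers produce the same string; which one is faster on a given workload is something to measure with timeit rather than assume.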

Core source (verbatim from the full source at the end)

def memory_leak_demo(n: int = 500) -> list[str]:
    """Simulated memory problem: + concatenation in a loop (up to O(n²) bytes copied in total)."""
    result = ""
    for i in range(n):
        result = result + f"chunk_{i}_"   # 每次创建新字符串对象
    return result.split("_")


def memory_efficient(n: int = 500) -> list[str]:
    """内存友好版本:用列表收集再 join。"""
    parts = [f"chunk_{i}_" for i in range(n)]
    return "".join(parts).split("_")

Runnable demo (mock data and print feedback filled in)

def memory_leak_demo(n: int = 500) -> list[str]:
    """Simulated memory problem: + concatenation in a loop (up to O(n²) bytes copied in total)."""
    result = ""
    for i in range(n):
        result = result + f"chunk_{i}_"   # 每次创建新字符串对象
    return result.split("_")

def memory_efficient(n: int = 500) -> list[str]:
    """内存友好版本:用列表收集再 join。"""
    parts = [f"chunk_{i}_" for i in range(n)]
    return "".join(parts).split("_")

slow_parts = memory_leak_demo(8)
fast_parts = memory_efficient(8)
print("慢版本切片前 6 个:", slow_parts[:6])
print("快版本切片前 6 个:", fast_parts[:6])
print("结果长度一致:", len(slow_parts) == len(fast_parts))
print("直觉:循环里 + 拼接会反复复制旧字符串,join 像先攒零件再一次组装。")

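Step 3: Use lru_cache to turn repeated recursive computation into a memo lookup

Pain points and mechanism

Naive recursive Fibonacci recomputes the same subproblems over and over. lru_cache gives the function a memo: the first result for each argument is stored, and later calls with the same argument return it directly. The optimization suits pure functions, configuration reads, recursive search, and similar cases.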
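To make "recomputes the same subproblems" concrete, the sketch below counts calls instead of timing them. fib_counted, fib_memo, and the calls counter are hypothetical names added for illustration; cache_info() is the real lru_cache API.

```python
from functools import lru_cache

calls = 0  # how many times the uncached function body runs


def fib_counted(n: int) -> int:
    """Uncached recursion that counts every invocation."""
    global calls
    calls += 1
    if n <= 1:
        return n
    return fib_counted(n - 1) + fib_counted(n - 2)


@lru_cache(maxsize=None)
def fib_memo(n: int) -> int:
    """Cached recursion: each distinct n is computed once."""
    if n <= 1:
        return n
    return fib_memo(n - 1) + fib_memo(n - 2)


fib_counted(20)
fib_memo(20)
info = fib_memo.cache_info()
print("uncached calls:", calls)
print("cached misses:", info.misses, "hits:", info.hits)
```

The uncached version runs its body 21,891 times for n = 20, while the cached version computes only the 21 distinct values from 0 to 20 and answers the remaining 18 lookups from the cache.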

Core source (verbatim from the full source at the end)

@lru_cache(maxsize=256)
def fib_cached(n: int) -> int:
    """带缓存的斐波那契——演示 lru_cache 的性能效果。"""
    if n <= 1:
        return n
    return fib_cached(n - 1) + fib_cached(n - 2)


def fib_plain(n: int) -> int:
    """无缓存版本。"""
    if n <= 1:
        return n
    return fib_plain(n - 1) + fib_plain(n - 2)

Runnable demo (mock data and print feedback filled in)

from functools import lru_cache
import timeit

@lru_cache(maxsize=256)
def fib_cached(n: int) -> int:
    """带缓存的斐波那契——演示 lru_cache 的性能效果。"""
    if n <= 1:
        return n
    return fib_cached(n - 1) + fib_cached(n - 2)

def fib_plain(n: int) -> int:
    """无缓存版本。"""
    if n <= 1:
        return n
    return fib_plain(n - 1) + fib_plain(n - 2)


fib_cached.cache_clear()
plain_time = timeit.timeit(lambda: fib_plain(20), number=1)
cached_time = timeit.timeit(lambda: fib_cached(20), number=1)
print("fib_plain(20):", fib_plain(20), f"耗时 {plain_time*1000:.3f}ms")
print("fib_cached(20):", fib_cached(20), f"耗时 {cached_time*1000:.3f}ms")
print("缓存命中信息:", fib_cached.cache_info())
print("直觉:lru_cache 像备忘录,算过的题下次直接翻答案。")

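Step 4: Use demo_cprofile to find function-level hot spots

Pain points and mechanism

cProfile is like a pedometer for your program: it records how many times each function was called and how long it took. When reading the report, look at cumtime first. It is the time spent in a function including everything it calls, and it usually points most directly at the bottleneck; tottime, by contrast, excludes time spent in sub-calls.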
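A minimal sketch of reading cumtime, assuming two hypothetical functions (outer and inner) that are not part of this article's script. The report is captured into a string so it can be inspected or printed.

```python
import cProfile
import io
import pstats


def inner(n: int) -> int:
    """Does the actual work, so its tottime is high."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def outer() -> int:
    """Does little itself, but its cumtime includes every inner() call."""
    total = 0
    for _ in range(50):
        total += inner(2000)
    return total


profiler = cProfile.Profile()
profiler.enable()
outer()
profiler.disable()

# Route the report into a StringIO instead of stdout
stream = io.StringIO()
stats = pstats.Stats(profiler, stream=stream)
stats.strip_dirs().sort_stats(pstats.SortKey.CUMULATIVE).print_stats(5)
report = stream.getvalue()
print(report)
```

In the printed table, outer should show a large cumtime but a small tottime, while inner shows 50 calls and carries most of the tottime.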

Core source (verbatim from the full source at the end)

def demo_cprofile() -> None:
    print("\n  ── cProfile 函数调用分析 ─────────────────")

    pr = cProfile.Profile()
    pr.enable()
    slow_word_count(CORPUS[:200])   # 只用200条,避免太慢
    pr.disable()

    # 用 pstats 格式化输出
    stream = io.StringIO()
    ps = pstats.Stats(pr, stream=stream)
    ps.sort_stats("cumulative")
    ps.print_stats(8)   # 只显示前8行

    output = stream.getvalue()
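    # Print the first non-empty lines of the report. pstats indents its
    # header and data rows, so indented lines must not be filtered out.
    lines = [l for l in output.split("\n") if l.strip()]
    print("\n".join(f"  {l}" for l in lines[:15]))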

    print("\n  💡 关注 cumtime(累计耗时)最高的函数——那就是瓶颈")

Runnable demo (mock data and print feedback filled in)

import cProfile
import io
import pstats

CORPUS: list[str] = [
    f"task_node_{i}: processing document with keyword_{'abc' * (i % 5 + 1)}"
    for i in range(2000)
]

def slow_word_count(corpus: list[str]) -> dict[str, int]:
    """Slow version: linearly rescans the existing keys for every word."""
    result: dict[str, int] = {}
    for doc in corpus:
        words = doc.split()
        for word in words:
            # 反模式:每次都重新遍历 result
            found = False
            for k in result:
                if k == word:
                    result[k] += 1
                    found = True
                    break
            if not found:
                result[word] = 1
    return result

def fast_word_count(corpus: list[str]) -> dict[str, int]:
    """快版本:直接用 dict.get()。"""
    result: dict[str, int] = {}
    for doc in corpus:
        for word in doc.split():
            result[word] = result.get(word, 0) + 1
    return result

def demo_cprofile() -> None:
    print("\n  ── cProfile 函数调用分析 ─────────────────")

    pr = cProfile.Profile()
    pr.enable()
    slow_word_count(CORPUS[:200])   # 只用200条,避免太慢
    pr.disable()

    # 用 pstats 格式化输出
    stream = io.StringIO()
    ps = pstats.Stats(pr, stream=stream)
    ps.sort_stats("cumulative")
    ps.print_stats(8)   # 只显示前8行

    output = stream.getvalue()
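    # Print the first non-empty lines of the report. pstats indents its
    # header and data rows, so indented lines must not be filtered out.
    lines = [l for l in output.split("\n") if l.strip()]
    print("\n".join(f"  {l}" for l in lines[:15]))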

    print("\n  💡 关注 cumtime(累计耗时)最高的函数——那就是瓶颈")

demo_cprofile()

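Step 5: Use demo_memory to trace which line allocated more memory

Pain points and mechanism

tracemalloc is a memory ledger: it records how much memory each line of code allocated. Performance problems are not only about speed; memory that keeps growing is one too. Putting snapshots of the slow and fast versions side by side lets a beginner see how a difference in style becomes a difference in allocation. Note that a snapshot only reports blocks that are still allocated at the moment it is taken.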
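Because snapshots only see live blocks, peak usage is often the more telling number for short-lived intermediates. The sketch below uses tracemalloc.get_traced_memory() for that; peak_bytes, sum_via_list, and sum_via_generator are hypothetical helpers, not functions from this article's script.

```python
import tracemalloc
from typing import Callable


def peak_bytes(fn: Callable[[], object]) -> int:
    """Run fn under tracemalloc and return the peak traced size in bytes."""
    tracemalloc.start()
    try:
        fn()
        _, peak = tracemalloc.get_traced_memory()  # (current, peak)
    finally:
        tracemalloc.stop()
    return peak


def sum_via_list(n: int) -> int:
    """Materializes all n squares in a list before summing."""
    return sum([i * i for i in range(n)])


def sum_via_generator(n: int) -> int:
    """Feeds squares to sum() one at a time."""
    return sum(i * i for i in range(n))


peak_list = peak_bytes(lambda: sum_via_list(100_000))
peak_gen = peak_bytes(lambda: sum_via_generator(100_000))
print(f"list peak: {peak_list / 1024:.1f} KB  generator peak: {peak_gen / 1024:.1f} KB")
```

Both functions return the same sum; only the list version has to hold every intermediate value at once, which is what the peak figure exposes.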

Core source (verbatim from the full source at the end)

def demo_memory() -> None:
    print("\n  ── tracemalloc 内存分配追踪 ──────────────")

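    # Profile the slow version. Keep the result referenced: a snapshot
    # only lists blocks that are still allocated when it is taken.
    tracemalloc.start()
    slow_result = memory_leak_demo(300)
    snapshot_slow = tracemalloc.take_snapshot()
    tracemalloc.stop()

    # Profile the fast version the same way
    tracemalloc.start()
    fast_result = memory_efficient(300)
    snapshot_fast = tracemalloc.take_snapshot()
    tracemalloc.stop()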

    # 对比 Top 3 内存分配
    print("\n  慢版本(字符串拼接)Top 3 内存分配:")
    for stat in snapshot_slow.statistics("lineno")[:3]:
        print(f"    {stat.size / 1024:.1f} KB  {stat.traceback.format()[-1]}")

    print("\n  快版本(列表join)Top 3 内存分配:")
    for stat in snapshot_fast.statistics("lineno")[:3]:
        print(f"    {stat.size / 1024:.1f} KB  {stat.traceback.format()[-1]}")

Runnable demo (mock data and print feedback filled in)

import tracemalloc

def memory_leak_demo(n: int = 500) -> list[str]:
    """Simulated memory problem: + concatenation in a loop (up to O(n²) bytes copied in total)."""
    result = ""
    for i in range(n):
        result = result + f"chunk_{i}_"   # 每次创建新字符串对象
    return result.split("_")

def memory_efficient(n: int = 500) -> list[str]:
    """内存友好版本:用列表收集再 join。"""
    parts = [f"chunk_{i}_" for i in range(n)]
    return "".join(parts).split("_")

def demo_memory() -> None:
    print("\n  ── tracemalloc 内存分配追踪 ──────────────")

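    # Profile the slow version. Keep the result referenced: a snapshot
    # only lists blocks that are still allocated when it is taken.
    tracemalloc.start()
    slow_result = memory_leak_demo(300)
    snapshot_slow = tracemalloc.take_snapshot()
    tracemalloc.stop()

    # Profile the fast version the same way
    tracemalloc.start()
    fast_result = memory_efficient(300)
    snapshot_fast = tracemalloc.take_snapshot()
    tracemalloc.stop()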

    # 对比 Top 3 内存分配
    print("\n  慢版本(字符串拼接)Top 3 内存分配:")
    for stat in snapshot_slow.statistics("lineno")[:3]:
        print(f"    {stat.size / 1024:.1f} KB  {stat.traceback.format()[-1]}")

    print("\n  快版本(列表join)Top 3 内存分配:")
    for stat in snapshot_fast.statistics("lineno")[:3]:
        print(f"    {stat.size / 1024:.1f} KB  {stat.traceback.format()[-1]}")

demo_memory()

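Step 6: Use demo_compare for a steadier timeit benchmark comparison

Pain points and mechanism

timeit is built for timing small snippets and is steadier than hand-rolled time.time() calls. It runs the callable number times and returns the total; demo_compare divides that total by the run count to get a per-call average, which damps one-off fluctuations. In the table, a longer relative-speed bar means the entry is that many times faster than the slowest one (the bar is capped at 20 blocks).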
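Averaging is one way to steady a measurement; another common one is to repeat the whole measurement and keep the minimum, since slower runs usually reflect interference from other processes. A minimal sketch, assuming a hypothetical workload function total_builtin:

```python
import timeit

data = list(range(1000))


def total_builtin() -> int:
    """The snippet under test: sum a 1000-element list."""
    return sum(data)


NUMBER = 1000  # calls per measurement
runs = timeit.repeat(total_builtin, repeat=5, number=NUMBER)

# Each entry in runs is the total time for NUMBER calls
best_per_call = min(runs) / NUMBER
print(f"best of {len(runs)} runs: {best_per_call * 1e6:.2f} µs per call")
```

timeit.repeat returns one total per repetition, so dividing the smallest total by NUMBER gives the best observed per-call time.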

Core source (verbatim from the full source at the end)

def demo_compare() -> None:
    print("\n  ── timeit 性能对比 ───────────────────────")

    benchmarks: list[tuple[str, Callable, int]] = [
        ("slow_word_count (200条)",  lambda: slow_word_count(CORPUS[:200]),  3),
        ("fast_word_count (200条)",  lambda: fast_word_count(CORPUS[:200]),  3),
        ("fast_word_count (2000条)", lambda: fast_word_count(CORPUS),        3),
        ("fib_plain(25)",            lambda: fib_plain(25),                  5),
        ("fib_cached(25)",           lambda: fib_cached(25),                 5),
        ("str concat (300次)",       lambda: memory_leak_demo(300),          3),
        ("list join (300次)",        lambda: memory_efficient(300),          3),
    ]

    results: list[tuple[str, float]] = []
    for name, fn, repeat in benchmarks:
        elapsed = timeit.timeit(fn, number=repeat) / repeat
        results.append((name, elapsed))

    # 找最慢的作为基准
    max_time = max(t for _, t in results)

    print(f"\n  {'函数':<30} {'耗时(ms)':>10} {'相对速度':>10}")
    print(f"  {'─'*30} {'─'*10} {'─'*10}")
    for name, elapsed in results:
        ms = elapsed * 1000
        ratio = max_time / elapsed if elapsed > 0 else float("inf")
        bar = "█" * min(int(ratio), 20)
        print(f"  {name:<30} {ms:>9.3f}  {bar}")

Runnable demo (mock data and print feedback filled in)

import timeit
from functools import lru_cache
from typing import Callable

CORPUS: list[str] = [
    f"task_node_{i}: processing document with keyword_{'abc' * (i % 5 + 1)}"
    for i in range(2000)
]

def slow_word_count(corpus: list[str]) -> dict[str, int]:
    """Slow version: linearly rescans the existing keys for every word."""
    result: dict[str, int] = {}
    for doc in corpus:
        words = doc.split()
        for word in words:
            # 反模式:每次都重新遍历 result
            found = False
            for k in result:
                if k == word:
                    result[k] += 1
                    found = True
                    break
            if not found:
                result[word] = 1
    return result

def fast_word_count(corpus: list[str]) -> dict[str, int]:
    """快版本:直接用 dict.get()。"""
    result: dict[str, int] = {}
    for doc in corpus:
        for word in doc.split():
            result[word] = result.get(word, 0) + 1
    return result

def memory_leak_demo(n: int = 500) -> list[str]:
    """Simulated memory problem: + concatenation in a loop (up to O(n²) bytes copied in total)."""
    result = ""
    for i in range(n):
        result = result + f"chunk_{i}_"   # 每次创建新字符串对象
    return result.split("_")

def memory_efficient(n: int = 500) -> list[str]:
    """内存友好版本:用列表收集再 join。"""
    parts = [f"chunk_{i}_" for i in range(n)]
    return "".join(parts).split("_")

@lru_cache(maxsize=256)
def fib_cached(n: int) -> int:
    """带缓存的斐波那契——演示 lru_cache 的性能效果。"""
    if n <= 1:
        return n
    return fib_cached(n - 1) + fib_cached(n - 2)

def fib_plain(n: int) -> int:
    """无缓存版本。"""
    if n <= 1:
        return n
    return fib_plain(n - 1) + fib_plain(n - 2)

def demo_compare() -> None:
    print("\n  ── timeit 性能对比 ───────────────────────")

    benchmarks: list[tuple[str, Callable, int]] = [
        ("slow_word_count (200条)",  lambda: slow_word_count(CORPUS[:200]),  3),
        ("fast_word_count (200条)",  lambda: fast_word_count(CORPUS[:200]),  3),
        ("fast_word_count (2000条)", lambda: fast_word_count(CORPUS),        3),
        ("fib_plain(25)",            lambda: fib_plain(25),                  5),
        ("fib_cached(25)",           lambda: fib_cached(25),                 5),
        ("str concat (300次)",       lambda: memory_leak_demo(300),          3),
        ("list join (300次)",        lambda: memory_efficient(300),          3),
    ]

    results: list[tuple[str, float]] = []
    for name, fn, repeat in benchmarks:
        elapsed = timeit.timeit(fn, number=repeat) / repeat
        results.append((name, elapsed))

    # 找最慢的作为基准
    max_time = max(t for _, t in results)

    print(f"\n  {'函数':<30} {'耗时(ms)':>10} {'相对速度':>10}")
    print(f"  {'─'*30} {'─'*10} {'─'*10}")
    for name, elapsed in results:
        ms = elapsed * 1000
        ratio = max_time / elapsed if elapsed > 0 else float("inf")
        bar = "█" * min(int(ratio), 20)
        print(f"  {name:<30} {ms:>9.3f}  {bar}")

demo_compare()

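Step 7: Use demo_traps to build a checklist of common performance traps

Pain points and mechanism

Many performance problems are not research-grade algorithm puzzles but common small pitfalls: concatenating strings in a loop, inserting at the head of a list, membership tests on a large list, repeated computation. A trap checklist works like a health-check form you can scan quickly before and after writing code.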
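The head-insertion trap is easy to try in isolation. The sketch below is illustrative only (prepend_list and prepend_deque are hypothetical names); it builds the same sequence both ways and prints timings that will vary by machine.

```python
import timeit
from collections import deque


def prepend_list(n: int) -> list[int]:
    """Insert at index 0: every insert shifts all existing elements."""
    items: list[int] = []
    for i in range(n):
        items.insert(0, i)
    return items


def prepend_deque(n: int) -> list[int]:
    """appendleft() on a deque takes constant time per call."""
    items: deque[int] = deque()
    for i in range(n):
        items.appendleft(i)
    return list(items)


t_list = timeit.timeit(lambda: prepend_list(5000), number=5)
t_deque = timeit.timeit(lambda: prepend_deque(5000), number=5)
print(f"list.insert(0, x): {t_list * 1000:.2f} ms  deque.appendleft: {t_deque * 1000:.2f} ms")
```

The two helpers return identical lists, so swapping one for the other changes only the cost, not the result.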

Core source (verbatim from the full source at the end)

def demo_traps() -> None:
    print("\n  ── 常见性能陷阱速查 ──────────────────────")

    traps = [
        ("字符串拼接",   "s += x(循环中)",      '"".join(parts)',          "O(n²) → O(n)"),
        ("列表头部插入", "lst.insert(0, x)",       "collections.deque",       "O(n) → O(1)"),
        ("重复成员检查", "x in list(大列表)",    "x in set",                "O(n) → O(1)"),
        ("全局变量查找", "global_var(热循环中)", "local = global_var",      "减少 LOAD_GLOBAL"),
        ("重复计算",     "fn(x) 每次调用",         "@lru_cache",              "O(n) → O(1)"),
        ("小数据用 np",  "np.array([1,2,3])+1",   "直接用 list",             "避免 numpy 开销"),
    ]

    print(f"\n  {'陷阱':<12} {'反模式':<22} {'正确做法':<22} {'效果'}")
    print(f"  {'─'*12} {'─'*22} {'─'*22} {'─'*18}")
    for trap, bad, good, effect in traps:
        print(f"  {trap:<12} {bad:<22} {good:<22} {effect}")

    # 实测:list vs set 成员检查
    big_list = list(range(10000))
    big_set  = set(range(10000))

    t_list = timeit.timeit(lambda: 9999 in big_list, number=10000)
    t_set  = timeit.timeit(lambda: 9999 in big_set,  number=10000)
    speedup = t_list / t_set

    print(f"\n  实测 `9999 in list(10000)` vs `9999 in set(10000)`:")
    print(f"    list: {t_list*1000:.2f}ms  set: {t_set*1000:.2f}ms  加速 {speedup:.0f}x")

Runnable demo (mock data and print feedback filled in)

import timeit

def demo_traps() -> None:
    print("\n  ── 常见性能陷阱速查 ──────────────────────")

    traps = [
        ("字符串拼接",   "s += x(循环中)",      '"".join(parts)',          "O(n²) → O(n)"),
        ("列表头部插入", "lst.insert(0, x)",       "collections.deque",       "O(n) → O(1)"),
        ("重复成员检查", "x in list(大列表)",    "x in set",                "O(n) → O(1)"),
        ("全局变量查找", "global_var(热循环中)", "local = global_var",      "减少 LOAD_GLOBAL"),
        ("重复计算",     "fn(x) 每次调用",         "@lru_cache",              "O(n) → O(1)"),
        ("小数据用 np",  "np.array([1,2,3])+1",   "直接用 list",             "避免 numpy 开销"),
    ]

    print(f"\n  {'陷阱':<12} {'反模式':<22} {'正确做法':<22} {'效果'}")
    print(f"  {'─'*12} {'─'*22} {'─'*22} {'─'*18}")
    for trap, bad, good, effect in traps:
        print(f"  {trap:<12} {bad:<22} {good:<22} {effect}")

    # 实测:list vs set 成员检查
    big_list = list(range(10000))
    big_set  = set(range(10000))

    t_list = timeit.timeit(lambda: 9999 in big_list, number=10000)
    t_set  = timeit.timeit(lambda: 9999 in big_set,  number=10000)
    speedup = t_list / t_set

    print(f"\n  实测 `9999 in list(10000)` vs `9999 in set(10000)`:")
    print(f"    list: {t_list*1000:.2f}ms  set: {t_set*1000:.2f}ms  加速 {speedup:.0f}x")

demo_traps()

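Step 8: Use main to turn cprofile/memory/compare/traps into command-line probes

Pain points and mechanism

A profiling script should run on demand. --mode cprofile shows hot spots, --mode memory shows memory, --mode compare shows benchmarks, and --mode traps prints the cheat sheet; omitting --mode runs all four. Readers switch arguments instead of editing the source.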
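The mode switch can be exercised without a shell by handing parse_args an explicit argument list. This is a sketch under assumptions: build_parser and selected_modes are hypothetical helpers, not the structure the full script uses.

```python
import argparse

MODES = ["cprofile", "memory", "compare", "traps"]


def build_parser() -> argparse.ArgumentParser:
    """Create a parser with the same --mode choices as the article's script."""
    parser = argparse.ArgumentParser(description="probe selector sketch")
    parser.add_argument("--mode", choices=MODES + ["all"], default="all")
    return parser


def selected_modes(argv: list[str]) -> list[str]:
    """Return the probes a given command line would run."""
    args = build_parser().parse_args(argv)
    return list(MODES) if args.mode == "all" else [args.mode]


print(selected_modes(["--mode", "memory"]))
print(selected_modes([]))
```

Passing the list directly avoids mutating sys.argv, which makes the dispatch logic straightforward to unit-test.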

Core source (verbatim from the full source at the end)

def main() -> None:
    parser = argparse.ArgumentParser(description="性能探针工具链演示")
    parser.add_argument(
        "--mode",
        choices=["cprofile", "memory", "compare", "traps", "all"],
        default="all",
    )
    args = parser.parse_args()

    if args.mode in ("cprofile", "all"):
        demo_cprofile()
    if args.mode in ("memory", "all"):
        demo_memory()
    if args.mode in ("compare", "all"):
        demo_compare()
    if args.mode in ("traps", "all"):
        demo_traps()

Runnable demo (mock data and print feedback filled in)

import argparse

def main() -> None:
    parser = argparse.ArgumentParser(description="性能探针工具链演示")
    parser.add_argument(
        "--mode",
        choices=["cprofile", "memory", "compare", "traps", "all"],
        default="all",
    )
    args = parser.parse_args()

    if args.mode in ("cprofile", "all"):
        demo_cprofile()
    if args.mode in ("memory", "all"):
        demo_memory()
    if args.mode in ("compare", "all"):
        demo_compare()
    if args.mode in ("traps", "all"):
        demo_traps()

def demo_cprofile() -> None:
    print("运行 cProfile 热点分析")


def demo_memory() -> None:
    print("运行 tracemalloc 内存分析")


def demo_compare() -> None:
    print("运行 timeit 基准对比")


def demo_traps() -> None:
    print("运行性能陷阱速查")

import sys
for mode in ["cprofile", "memory", "compare", "traps"]:
    print(f"\n$ python profiler_demo.py --mode {mode}")
    sys.argv = ["profiler_demo.py", "--mode", mode]
    main()

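Hands-On: Full Source and Running It

Now snap the building blocks together: save the complete code below as profiler_demo.py. It needs no third-party packages and uses a built-in corpus to demonstrate cProfile, tracemalloc, timeit, and common performance traps.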

# profiler_demo.py
"""
性能探针工具链演示 —— cProfile/tracemalloc/timeit 全内置,零安装。
用法:
    python3 profiler_demo.py
    python3 profiler_demo.py --mode cprofile
    python3 profiler_demo.py --mode memory
    python3 profiler_demo.py --mode compare
    python3 profiler_demo.py --mode traps
"""

import argparse
import cProfile
import io
import pstats
import timeit
import tracemalloc
from functools import lru_cache
from typing import Callable


# ══════════════════════════════════════════════════════════════
# 模拟一个"文本处理管道"作为被分析对象
# ══════════════════════════════════════════════════════════════

# 生成 Mock 语料(零外部依赖)
CORPUS: list[str] = [
    f"task_node_{i}: processing document with keyword_{'abc' * (i % 5 + 1)}"
    for i in range(2000)
]


def slow_word_count(corpus: list[str]) -> dict[str, int]:
    """Slow version: linearly rescans the existing keys for every word."""
    result: dict[str, int] = {}
    for doc in corpus:
        words = doc.split()
        for word in words:
            # 反模式:每次都重新遍历 result
            found = False
            for k in result:
                if k == word:
                    result[k] += 1
                    found = True
                    break
            if not found:
                result[word] = 1
    return result


def fast_word_count(corpus: list[str]) -> dict[str, int]:
    """快版本:直接用 dict.get()。"""
    result: dict[str, int] = {}
    for doc in corpus:
        for word in doc.split():
            result[word] = result.get(word, 0) + 1
    return result


def memory_leak_demo(n: int = 500) -> list[str]:
    """Simulated memory problem: + concatenation in a loop (up to O(n²) bytes copied in total)."""
    result = ""
    for i in range(n):
        result = result + f"chunk_{i}_"   # 每次创建新字符串对象
    return result.split("_")


def memory_efficient(n: int = 500) -> list[str]:
    """内存友好版本:用列表收集再 join。"""
    parts = [f"chunk_{i}_" for i in range(n)]
    return "".join(parts).split("_")


@lru_cache(maxsize=256)
def fib_cached(n: int) -> int:
    """带缓存的斐波那契——演示 lru_cache 的性能效果。"""
    if n <= 1:
        return n
    return fib_cached(n - 1) + fib_cached(n - 2)


def fib_plain(n: int) -> int:
    """无缓存版本。"""
    if n <= 1:
        return n
    return fib_plain(n - 1) + fib_plain(n - 2)


# ── cProfile 演示 ─────────────────────────────────────────────
def demo_cprofile() -> None:
    print("\n  ── cProfile 函数调用分析 ─────────────────")

    pr = cProfile.Profile()
    pr.enable()
    slow_word_count(CORPUS[:200])   # 只用200条,避免太慢
    pr.disable()

    # 用 pstats 格式化输出
    stream = io.StringIO()
    ps = pstats.Stats(pr, stream=stream)
    ps.sort_stats("cumulative")
    ps.print_stats(8)   # 只显示前8行

    output = stream.getvalue()
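    # Print the first non-empty lines of the report. pstats indents its
    # header and data rows, so indented lines must not be filtered out.
    lines = [l for l in output.split("\n") if l.strip()]
    print("\n".join(f"  {l}" for l in lines[:15]))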

    print("\n  💡 关注 cumtime(累计耗时)最高的函数——那就是瓶颈")


# ── tracemalloc 内存分析 ──────────────────────────────────────
def demo_memory() -> None:
    print("\n  ── tracemalloc 内存分配追踪 ──────────────")

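    # Profile the slow version. Keep the result referenced: a snapshot
    # only lists blocks that are still allocated when it is taken.
    tracemalloc.start()
    slow_result = memory_leak_demo(300)
    snapshot_slow = tracemalloc.take_snapshot()
    tracemalloc.stop()

    # Profile the fast version the same way
    tracemalloc.start()
    fast_result = memory_efficient(300)
    snapshot_fast = tracemalloc.take_snapshot()
    tracemalloc.stop()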

    # 对比 Top 3 内存分配
    print("\n  慢版本(字符串拼接)Top 3 内存分配:")
    for stat in snapshot_slow.statistics("lineno")[:3]:
        print(f"    {stat.size / 1024:.1f} KB  {stat.traceback.format()[-1]}")

    print("\n  快版本(列表join)Top 3 内存分配:")
    for stat in snapshot_fast.statistics("lineno")[:3]:
        print(f"    {stat.size / 1024:.1f} KB  {stat.traceback.format()[-1]}")


# ── timeit 精确对比 ───────────────────────────────────────────
def demo_compare() -> None:
    print("\n  ── timeit 性能对比 ───────────────────────")

    benchmarks: list[tuple[str, Callable, int]] = [
        ("slow_word_count (200条)",  lambda: slow_word_count(CORPUS[:200]),  3),
        ("fast_word_count (200条)",  lambda: fast_word_count(CORPUS[:200]),  3),
        ("fast_word_count (2000条)", lambda: fast_word_count(CORPUS),        3),
        ("fib_plain(25)",            lambda: fib_plain(25),                  5),
        ("fib_cached(25)",           lambda: fib_cached(25),                 5),
        ("str concat (300次)",       lambda: memory_leak_demo(300),          3),
        ("list join (300次)",        lambda: memory_efficient(300),          3),
    ]

    results: list[tuple[str, float]] = []
    for name, fn, repeat in benchmarks:
        elapsed = timeit.timeit(fn, number=repeat) / repeat
        results.append((name, elapsed))

    # 找最慢的作为基准
    max_time = max(t for _, t in results)

    print(f"\n  {'函数':<30} {'耗时(ms)':>10} {'相对速度':>10}")
    print(f"  {'─'*30} {'─'*10} {'─'*10}")
    for name, elapsed in results:
        ms = elapsed * 1000
        ratio = max_time / elapsed if elapsed > 0 else float("inf")
        bar = "█" * min(int(ratio), 20)
        print(f"  {name:<30} {ms:>9.3f}  {bar}")


# ── 常见性能陷阱 ──────────────────────────────────────────────
def demo_traps() -> None:
    print("\n  ── 常见性能陷阱速查 ──────────────────────")

    traps = [
        ("字符串拼接",   "s += x(循环中)",      '"".join(parts)',          "O(n²) → O(n)"),
        ("列表头部插入", "lst.insert(0, x)",       "collections.deque",       "O(n) → O(1)"),
        ("重复成员检查", "x in list(大列表)",    "x in set",                "O(n) → O(1)"),
        ("全局变量查找", "global_var(热循环中)", "local = global_var",      "减少 LOAD_GLOBAL"),
        ("重复计算",     "fn(x) 每次调用",         "@lru_cache",              "O(n) → O(1)"),
        ("小数据用 np",  "np.array([1,2,3])+1",   "直接用 list",             "避免 numpy 开销"),
    ]

    print(f"\n  {'陷阱':<12} {'反模式':<22} {'正确做法':<22} {'效果'}")
    print(f"  {'─'*12} {'─'*22} {'─'*22} {'─'*18}")
    for trap, bad, good, effect in traps:
        print(f"  {trap:<12} {bad:<22} {good:<22} {effect}")

    # 实测:list vs set 成员检查
    big_list = list(range(10000))
    big_set  = set(range(10000))

    t_list = timeit.timeit(lambda: 9999 in big_list, number=10000)
    t_set  = timeit.timeit(lambda: 9999 in big_set,  number=10000)
    speedup = t_list / t_set

    print(f"\n  实测 `9999 in list(10000)` vs `9999 in set(10000)`:")
    print(f"    list: {t_list*1000:.2f}ms  set: {t_set*1000:.2f}ms  加速 {speedup:.0f}x")


def main() -> None:
    parser = argparse.ArgumentParser(description="性能探针工具链演示")
    parser.add_argument(
        "--mode",
        choices=["cprofile", "memory", "compare", "traps", "all"],
        default="all",
    )
    args = parser.parse_args()

    if args.mode in ("cprofile", "all"):
        demo_cprofile()
    if args.mode in ("memory", "all"):
        demo_memory()
    if args.mode in ("compare", "all"):
        demo_compare()
    if args.mode in ("traps", "all"):
        demo_traps()


if __name__ == "__main__":
    main()

Sample output, one mode per run:

$ python profiler_demo.py --mode cprofile
── cProfile 函数调用分析 ─────────────────


  💡 关注 cumtime(累计耗时)最高的函数——那就是瓶颈

$ python profiler_demo.py --mode memory
── tracemalloc 内存分配追踪 ──────────────

  慢版本(字符串拼接)Top 3 内存分配:

  快版本(列表join)Top 3 内存分配:

$ python profiler_demo.py --mode compare
── timeit 性能对比 ───────────────────────

  函数                                 耗时(ms)       相对速度
  ────────────────────────────── ────────── ──────────
  slow_word_count (200条)             0.257  █████████████████
  fast_word_count (200条)             0.064  ████████████████████
  fast_word_count (2000条)            0.678  ██████
  fib_plain(25)                      4.466  █
  fib_cached(25)                     0.002  ████████████████████
  str concat (300次)                  0.042  ████████████████████
  list join (300次)                   0.031  ████████████████████

$ python profiler_demo.py --mode traps
── 常见性能陷阱速查 ──────────────────────

  陷阱           反模式                    正确做法                   效果
  ──────────── ────────────────────── ────────────────────── ──────────────────
  字符串拼接        s += x(循环中)            "".join(parts)         O(n²) → O(n)
  列表头部插入       lst.insert(0, x)       collections.deque      O(n) → O(1)
  重复成员检查       x in list(大列表)         x in set               O(n) → O(1)
  全局变量查找       global_var(热循环中)       local = global_var     减少 LOAD_GLOBAL
  重复计算         fn(x) 每次调用             @lru_cache             O(n) → O(1)
  小数据用 np      np.array([1,2,3])+1    直接用 list               避免 numpy 开销

  实测 `9999 in list(10000)` vs `9999 in set(10000)`:
    list: 464.03ms  set: 0.24ms  加速 1958x

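Summary and NexDo Time ⚡

In this article you learned the right order for performance work: quantify the gap with timeit, find function hot spots with cProfile, check memory allocation with tracemalloc, and only then optimize against concrete evidence. Do not change code on a hunch; the core of performance engineering is measure, change, re-measure.

5-minute micro-challenge: in demo_compare(), change fib_plain(25) to fib_plain(28), then run --mode compare. Observe how much slower the uncached recursion becomes, and explain why the cached version barely changes.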

Don’t wait for next time, do it in the next moment.