
12 · Multiprocessing and Multithreading: Squeezing the Most Out of Your CPU

#012 · 2026-04-16 · Python

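🔗 Knowledge-graph navigation: before reading this article, it helps to review the core concepts from "11 · Socket Programming: Low-Level TCP/UDP Communication", because this article builds on them. In the previous article we used sockets to put data on the network. This one comes back to the local machine and shows how to run several tasks at once. Understanding the GIL is what lets you pick the right concurrency model for each scenario.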

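Geek analysis: concurrency is not about making code more complicated. It is about being explicit about waiting, scheduling, and resource isolation. This article verifies each mechanism with mock scenarios that run on your local machine.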

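Pain points and architecture: APIs or concepts memorized in isolation are quickly forgotten. This article therefore starts from a real pain point and breaks it into a three-part loop: "input data → core mechanism → runnable output". Along the way it covers:

- how the GIL works;
- threading locks, events, and semaphores;
- multiprocessing process pools and shared memory;
- the unified concurrent.futures interface.

Finally, ASCII bar charts compare the measured run times of single-threaded, multithreaded, and multiprocess execution.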

1. How the GIL Works (ASCII Diagram)

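Inside the Python interpreter (CPython):

  Thread A ──▶ acquire GIL ──▶ run bytecode ──▶ release GIL ──▶ wait
  Thread B ──▶ wait for GIL ──────────────────────────────────▶ acquire GIL ──▶ run

  Conclusion: in a standard CPython build, only one thread executes Python bytecode at any moment
  (Python 3.13+ also offers an experimental free-threaded build without the GIL)
        ↓
  CPU-bound: multithreading ≈ single thread (the GIL is the bottleneck)
  IO-bound:  multithreading helps (blocking IO calls release the GIL while they wait)
        ↓
  CPU-bound → multiprocessing (each process has its own GIL)
  IO-bound  → threading or asyncio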
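The IO-bound half of the diagram can be observed directly. This is a minimal sketch, and the helper names `sleepy` and `run_sleepers` are illustrative. Four threads each call time.sleep(0.1), which releases the GIL while blocking, so the total wall time should be close to 0.1 s rather than 0.4 s. The sketch also prints sys.getswitchinterval(), which reports how often CPython asks a running thread to hand over the GIL.

```python
import sys
import threading
import time

def sleepy(seconds: float) -> None:
    # time.sleep blocks in C code and releases the GIL while it waits
    time.sleep(seconds)

def run_sleepers(n_threads: int, seconds: float) -> float:
    """Start n_threads sleeping threads and return the total wall time."""
    threads = [threading.Thread(target=sleepy, args=(seconds,)) for _ in range(n_threads)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

if __name__ == "__main__":
    print(f"GIL switch interval: {sys.getswitchinterval()}s")
    elapsed = run_sleepers(4, 0.1)
    # the four 0.1 s waits overlap, so this is close to 0.1 s, not 0.4 s
    print(f"4 threads x 0.1 s sleep: {elapsed:.3f}s")
```

For a CPU-bound function the same experiment would show little or no overlap, because the threads would take turns holding the GIL.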

2. threading: Locks / Events / Semaphores

import threading
import time


counter = 0
lock = threading.Lock()

def increment() -> None:
    global counter
    with lock:          # acquires and releases automatically
        counter += 1

# ── Event ────────────────────────────────────────────────────
ready = threading.Event()

def producer() -> None:
    # notify the consumer once the data is ready
    ready.set()

def consumer() -> None:
    ready.wait()        # block until set() is called
    print("Data is ready, start consuming")

# ── Semaphore ────────────────────────────────────────────────
# cap concurrent calls to the AI inference service at 3
sem = threading.Semaphore(3)

def call_inference(task_id: int) -> None:
    with sem:
        print(f"Task {task_id} running inference...")
        time.sleep(0.01)  # simulate a very short inference call

threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("Lock counter:", counter)

threading.Thread(target=producer).start()
consumer()

workers = [threading.Thread(target=call_inference, args=(i,)) for i in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()
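You can check that Semaphore(3) really caps concurrency by recording the peak number of threads inside the `with sem:` block. This is a small sketch: the counters `active` and `peak` are illustrative additions that are not part of the code above, and they are protected by their own Lock.

```python
import threading
import time

sem = threading.Semaphore(3)        # at most 3 threads inside the guarded block
state_lock = threading.Lock()       # protects the two counters below
active = 0
peak = 0

def call_inference(task_id: int) -> None:
    global active, peak
    with sem:
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.02)            # simulated inference call
        with state_lock:
            active -= 1

workers = [threading.Thread(target=call_inference, args=(i,)) for i in range(10)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print("Peak concurrent inference calls:", peak)
```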

3. multiprocessing: Process Pools and Shared Memory

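from multiprocessing import Lock, Pool, Process, Value
import ctypes

# ── Process pool ──────────────────────────────────────────────
def cpu_task(n: int) -> int:
    return sum(i * i for i in range(n))

# ── Shared memory (a cross-process counter) ──────────────────
def worker(shared, lk) -> None:
    # shared.value += 1 is read-modify-write, so it must run under the lock
    with lk:
        shared.value += 1

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        results = pool.map(cpu_task, [10_000] * 4)

    # create the shared objects inside the main guard so spawn-started children
    # do not re-create them on import; pass them to each child as arguments
    shared_count = Value(ctypes.c_int, 0)
    shared_lock = Lock()
    procs = [Process(target=worker, args=(shared_count, shared_lock)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

    print("Process pool result count:", len(results))
    print("Shared counter:", shared_count.value)   # 4 once every child has run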
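A Value created with the default lock=True already carries its own lock, available through get_lock(), so a separate Lock is optional. Below is a minimal sketch under that assumption. It uses Process children, and the function name `add_one` is illustrative.

```python
from multiprocessing import Process, Value

def add_one(counter, times: int) -> None:
    for _ in range(times):
        # "+=" on shared memory is read-modify-write, so hold the Value's own lock
        with counter.get_lock():
            counter.value += 1

if __name__ == "__main__":
    counter = Value("i", 0)   # "i" = C int; lock=True is the default
    procs = [Process(target=add_one, args=(counter, 1000)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print("Shared counter:", counter.value)   # expected 4000 when all children succeed
```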

4. concurrent.futures: A Unified High-Level Interface

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

def io_task(t: int) -> int:
    time.sleep(0.01)   # simulate a short IO wait
    return t * 2

def cpu_task(t: int) -> int:
    return sum(range(t * 1000))

tasks = list(range(8))

if __name__ == "__main__":
    # thread pool (IO-bound)
    thread_results = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(io_task, t): t for t in tasks}
        for fut in as_completed(futures):   # yields futures in completion order
            thread_results.append(fut.result())

    # process pool (CPU-bound); the task must be a picklable module-level function
    try:
        with ProcessPoolExecutor(max_workers=2) as executor:
            process_results = list(executor.map(cpu_task, tasks))
    except (OSError, PermissionError) as exc:
        print("Process pool unavailable, falling back to a single thread:", type(exc).__name__)
        process_results = [cpu_task(t) for t in tasks]

    print("Thread pool result count:", len(thread_results))
    print("Process pool result count:", len(process_results))

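concurrent.futures hides the low-level differences between threads and processes behind one Executor interface (submit / map / as_completed). That makes it a sensible default for most production code.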
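The difference between executor.map and as_completed is mainly about ordering:

- map yields results in input order;
- as_completed yields each future as soon as it finishes.

Here is a minimal sketch with made-up delays. `fetch` is a stand-in for a real IO call.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(delay: float) -> float:
    time.sleep(delay)      # simulated IO wait of `delay` seconds
    return delay

delays = [0.3, 0.1, 0.2]

with ThreadPoolExecutor(max_workers=3) as ex:
    # map: results come back in input order, whatever order the tasks finish in
    in_order = list(ex.map(fetch, delays))
    # submit + as_completed: futures are yielded as soon as each one finishes
    futures = [ex.submit(fetch, d) for d in delays]
    by_completion = [f.result() for f in as_completed(futures)]

print("map:         ", in_order)        # [0.3, 0.1, 0.2]
print("as_completed:", by_completion)   # usually [0.1, 0.2, 0.3]
```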


Hands-on: Benchmarking Batch Text Cleaning for a Knowledge Base

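We simulate text cleaning on 24 knowledge-base documents, compare the run times of three concurrency models, and show the results as ASCII bar charts.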

Step by step: breaking down the core logic

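Concurrency is not "the more threads the better". The first job is to classify the task:

- IO-bound work mostly waits, so a thread pool can overlap the waits.
- CPU-bound work mostly computes, so only a process pool gets around the GIL.

The small experiments below build that intuition one step at a time.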

Step 1: Tell IO-Bound and CPU-Bound Tasks Apart

Pain point and mechanism

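The first step in choosing a concurrency model is not reaching for a thread pool but classifying the task. io_clean_task() uses sleep to simulate a network wait, and while one thread is waiting, other threads can run. cpu_clean_task() is pure computation. The GIL serializes that kind of work across threads, so it is better suited to multiple processes.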
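A practical way to classify a task is to compare two numbers:

- wall-clock time, measured with time.perf_counter;
- the CPU time the process actually consumed, measured with time.process_time.

A ratio near 0 means the task mostly waits (IO-bound). A ratio near 1 means it mostly computes (CPU-bound). Treat this as a rule of thumb, not a formal test. The helper `cpu_ratio` and both sample tasks are illustrative.

```python
import time

def cpu_ratio(fn) -> float:
    """Return CPU time / wall time for one call of fn."""
    wall_start = time.perf_counter()
    cpu_start = time.process_time()       # CPU time of this process; excludes sleeping
    fn()
    cpu = time.process_time() - cpu_start
    wall = time.perf_counter() - wall_start
    return cpu / wall if wall > 0 else 0.0

def waiting_task() -> None:
    time.sleep(0.1)                       # stands in for network or disk IO

def computing_task() -> None:
    sum(i * i for i in range(1_000_000))  # pure Python arithmetic

io_r = cpu_ratio(waiting_task)
cpu_r = cpu_ratio(computing_task)
print(f"IO-like task:  CPU/wall = {io_r:.2f}")   # close to 0
print(f"CPU-like task: CPU/wall = {cpu_r:.2f}")  # close to 1
```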

Core source (verbatim from the full source at the end)

def io_clean_task(doc_id: int) -> dict:
    """Simulated IO-bound task: download and clean one document"""
    time.sleep(0.05)   # simulate 50 ms of network IO
    word_count = (doc_id * 137 + 42) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

# CPU-bound: simulate hashing/tokenization work on the text
def cpu_clean_task(doc_id: int) -> dict:
    """Simulated CPU-bound task: compute-heavy processing of a document"""
    # pure computation, no sleep
    total = sum(i * i for i in range(50_000))
    word_count = (total + doc_id) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

Runnable demo (with mock data and print feedback added)

import time

# Step 1: an IO task is like waiting for a food delivery; a CPU task is like cooking a whole meal yourself. They are optimized in completely different ways.
def io_clean_task(doc_id: int) -> dict:
    """Simulated IO-bound task: download and clean one document"""
    time.sleep(0.05)   # simulate 50 ms of network IO
    word_count = (doc_id * 137 + 42) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

# CPU-bound: simulate hashing/tokenization work on the text
def cpu_clean_task(doc_id: int) -> dict:
    """Simulated CPU-bound task: compute-heavy processing of a document"""
    # pure computation, no sleep
    total = sum(i * i for i in range(50_000))
    word_count = (total + doc_id) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

print("IO task result:", io_clean_task(1))
print("CPU task result:", cpu_clean_task(1))

Step 2: Establish a Performance Baseline with Sequential Execution

Pain point and mechanism

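The baseline matters. Without the sequential run time, you cannot tell whether the thread pool or the process pool is actually faster. Here each of the 24 IO tasks waits 50 ms, so sequential execution takes roughly 24 × 50 ms ≈ 1.2 s, the sum of all the waits.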
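The same back-of-the-envelope arithmetic gives the pools' best case. Two assumptions simplify it: every task waits exactly the same latency, and pool start-up overhead is ignored. Under those assumptions, n tasks on w workers need ceil(n / w) rounds of waiting. The function `ideal_io_time` below is an illustrative helper.

```python
import math

def ideal_io_time(n_tasks: int, workers: int, latency: float) -> float:
    """Best-case wall time: ceil(n / w) rounds of one latency each, no overhead."""
    rounds = math.ceil(n_tasks / workers)
    return rounds * latency

for label, w in [("Sequential", 1), ("Thread pool(8)", 8), ("Process pool(4)", 4)]:
    print(f"{label:<16} ≈ {ideal_io_time(24, w, 0.05):.2f}s")
# Sequential       ≈ 1.20s
# Thread pool(8)   ≈ 0.15s
# Process pool(4)  ≈ 0.30s
```

Measured times should come out somewhat higher than these numbers, because creating threads and especially processes is not free.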

Core source (verbatim from the full source at the end)

DOC_IDS = list(range(24))

def run_sequential(task_fn) -> tuple[list[dict], float]:
    start = time.perf_counter()
    results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start

Runnable demo (with mock data and print feedback added)

import time

def io_clean_task(doc_id: int) -> dict:
    """Simulated IO-bound task: download and clean one document"""
    time.sleep(0.05)   # simulate 50 ms of network IO
    word_count = (doc_id * 137 + 42) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

# CPU-bound: simulate hashing/tokenization work on the text
def cpu_clean_task(doc_id: int) -> dict:
    """Simulated CPU-bound task: compute-heavy processing of a document"""
    # pure computation, no sleep
    total = sum(i * i for i in range(50_000))
    word_count = (total + doc_id) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

DOC_IDS = list(range(24))

# Step 2: sequential execution is one person working through a queue; it is the baseline for every later speedup.
def run_sequential(task_fn) -> tuple[list[dict], float]:
    start = time.perf_counter()
    results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start

results, elapsed = run_sequential(io_clean_task)
print("Processed:", len(results))
print("First result:", results[0])
print(f"Sequential time: {elapsed:.3f}s")

Step 3: Overlap IO Waits with ThreadPoolExecutor

Pain point and mechanism

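Thread pools suit IO-bound tasks. A thread that is waiting on the network or the disk releases the GIL, so other threads can keep running. Think of several people each waiting for a different parcel at the same time, instead of one person waiting for parcel A and only then for parcel B.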
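It is worth knowing one detail before relying on ex.map. If a task raises, map re-raises that exception when iteration reaches its result, and the remaining results can no longer be read from that iterator. One way to keep partial results is to submit each task and check each future separately, as in this sketch. `flaky_task` and its failure on document 3 are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def flaky_task(doc_id: int) -> dict:
    time.sleep(0.01)                      # simulated IO
    if doc_id == 3:                       # hypothetical failure for one document
        raise ConnectionError(f"doc {doc_id}: connection reset")
    return {"doc_id": doc_id, "status": "ok"}

def run_collecting_errors(doc_ids: list[int], workers: int = 8) -> tuple[list[dict], list[int]]:
    ok, failed = [], []
    with ThreadPoolExecutor(max_workers=workers) as ex:
        futures = {ex.submit(flaky_task, d): d for d in doc_ids}
        for fut, doc_id in futures.items():   # dict keeps submission order
            try:
                ok.append(fut.result())       # re-raises the task's exception here
            except ConnectionError as exc:
                print("failed:", exc)
                failed.append(doc_id)
    return ok, failed

ok, failed = run_collecting_errors(list(range(24)))
print(len(ok), failed)   # 23 [3]
```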

Core source (verbatim from the full source at the end)

def run_thread_pool(task_fn, workers: int = 8) -> tuple[list[dict], float]:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as ex:
        results = list(ex.map(task_fn, DOC_IDS))
    return results, time.perf_counter() - start

Runnable demo (with mock data and print feedback added)

import time
from concurrent.futures import ThreadPoolExecutor

def io_clean_task(doc_id: int) -> dict:
    """Simulated IO-bound task: download and clean one document"""
    time.sleep(0.05)   # simulate 50 ms of network IO
    word_count = (doc_id * 137 + 42) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

# CPU-bound: simulate hashing/tokenization work on the text
def cpu_clean_task(doc_id: int) -> dict:
    """Simulated CPU-bound task: compute-heavy processing of a document"""
    # pure computation, no sleep
    total = sum(i * i for i in range(50_000))
    word_count = (total + doc_id) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

DOC_IDS = list(range(24))

# Step 3: a thread pool is like 8 colleagues waiting for deliveries at once; their waits overlap.
def run_thread_pool(task_fn, workers: int = 8) -> tuple[list[dict], float]:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as ex:
        results = list(ex.map(task_fn, DOC_IDS))
    return results, time.perf_counter() - start

results, elapsed = run_thread_pool(io_clean_task, workers=8)
print("Thread pool processed:", len(results))
print(f"Thread pool time: {elapsed:.3f}s")
print("Tip: IO-bound work usually suits a thread pool")

Step 4: Understand What ProcessPoolExecutor Does, and Keep a Fallback for Restricted Environments

Pain point and mechanism

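Multiple processes sidestep the GIL because each process has its own Python interpreter and its own GIL. However, in-browser Python runtimes and sandboxes may forbid creating child processes. For that reason, the teaching snippet only explains the function's role, and the full script runs it through a single entry point with a fallback.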
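ProcessPoolExecutor has a related restriction. It sends the task function and its arguments to the worker processes with pickle, and functions are pickled by reference to their module-level name. As a result, lambdas and nested functions cannot be submitted, which is why the task functions in this article are defined at the top level of the module.

The spawn start method is the default on Windows and macOS. Under spawn, each worker also re-imports the main module, which is why the `if __name__ == "__main__":` guard is needed. Here is a quick check of the pickling rule; `top_level_task` and `can_pickle` are illustrative names.

```python
import pickle

def top_level_task(x: int) -> int:
    return x * x

def can_pickle(obj) -> bool:
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # lambdas and local functions cannot be found by their qualified name
        return False

print("top-level function:", can_pickle(top_level_task))   # True
print("lambda:            ", can_pickle(lambda x: x * x))   # False
```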

Core source (verbatim from the full source at the end)

def run_process_pool(task_fn, workers: int = 4) -> tuple[list[dict], float]:
    start = time.perf_counter()
    try:
        with ProcessPoolExecutor(max_workers=workers) as ex:
            results = list(ex.map(task_fn, DOC_IDS))
    except (OSError, PermissionError) as exc:
        print(f"  [fallback] This environment restricts multiprocessing: {exc}; running single-threaded instead.")
        results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start

Runnable demo (with mock data and print feedback added)

import time
from concurrent.futures import ProcessPoolExecutor

def io_clean_task(doc_id: int) -> dict:
    """Simulated IO-bound task: download and clean one document"""
    time.sleep(0.05)   # simulate 50 ms of network IO
    word_count = (doc_id * 137 + 42) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

# CPU-bound: simulate hashing/tokenization work on the text
def cpu_clean_task(doc_id: int) -> dict:
    """Simulated CPU-bound task: compute-heavy processing of a document"""
    # pure computation, no sleep
    total = sum(i * i for i in range(50_000))
    word_count = (total + doc_id) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

DOC_IDS = list(range(24))

# Step 4: a process pool is like opening several kitchens, each with its own GIL; it suits CPU-bound work.
def run_process_pool(task_fn, workers: int = 4) -> tuple[list[dict], float]:
    start = time.perf_counter()
    try:
        with ProcessPoolExecutor(max_workers=workers) as ex:
            results = list(ex.map(task_fn, DOC_IDS))
    except (OSError, PermissionError) as exc:
        print(f"  [fallback] This environment restricts multiprocessing: {exc}; running single-threaded instead.")
        results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start

print("run_process_pool is defined, with an OSError/PermissionError fallback inside.")
print("Example usage: results, elapsed = run_process_pool(cpu_clean_task, workers=4)")
print("This page demo does not start child processes, so a browser sandbox cannot hang it.")

Step 5: Visualize the Timing Gap with an ASCII Bar Chart

Pain point and mechanism

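Raw decimals make performance data hard to read at a glance. ascii_bar_chart() turns run times into a terminal bar chart: the longest entry fills all 30 cells and the others shrink in proportion. That makes it easy to see that the thread pool wins in the IO scenario.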
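The runnable demo in this step uses the example values 1.20 s, 0.18 s, and 0.55 s. For those values, the bar lengths follow from filled = int(val / max_val * 30). The sketch below computes only that arithmetic; `bar_lengths` is an illustrative helper. Note that int() truncates, so 0.18 s gets 4 cells rather than 4.5 rounded up.

```python
def bar_lengths(data: dict[str, float], bar_width: int = 30) -> dict[str, int]:
    # same formula as ascii_bar_chart: the largest value fills every cell
    max_val = max(data.values()) or 1
    return {label: int(val / max_val * bar_width) for label, val in data.items()}

print(bar_lengths({"Sequential": 1.20, "Thread pool(8)": 0.18, "Process pool(4)": 0.55}))
# {'Sequential': 30, 'Thread pool(8)': 4, 'Process pool(4)': 13}
```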

Core source (verbatim from the full source at the end)

def ascii_bar_chart(data: dict[str, float], title: str, unit: str = "s") -> None:
    max_val = max(data.values()) or 1
    bar_width = 30
    print(f"\n{'─'*50}")
    print(f"  {title}")
    print(f"{'─'*50}")
    for label, val in data.items():
        filled = int(val / max_val * bar_width)
        bar = "█" * filled + "░" * (bar_width - filled)
        print(f"  {label:<18} │{bar}│ {val:.3f}{unit}")
    print(f"{'─'*50}")
    fastest = min(data, key=data.get)  # type: ignore[arg-type]
    print(f"  Fastest: {fastest}  ({data[fastest]:.3f}{unit})")

Runnable demo (with mock data and print feedback added)

# Step 5: a bar chart turns numbers into visual length, which makes it easier for beginners to see what is fast and what is slow.
def ascii_bar_chart(data: dict[str, float], title: str, unit: str = "s") -> None:
    max_val = max(data.values()) or 1
    bar_width = 30
    print(f"\n{'─'*50}")
    print(f"  {title}")
    print(f"{'─'*50}")
    for label, val in data.items():
        filled = int(val / max_val * bar_width)
        bar = "█" * filled + "░" * (bar_width - filled)
        print(f"  {label:<18} │{bar}│ {val:.3f}{unit}")
    print(f"{'─'*50}")
    fastest = min(data, key=data.get)  # type: ignore[arg-type]
    print(f"  Fastest: {fastest}  ({data[fastest]:.3f}{unit})")

ascii_bar_chart({"Sequential": 1.20, "Thread pool(8)": 0.18, "Process pool(4)": 0.55}, "IO-bound run time (example values)")

Step 6: Understand the Thread-Synchronization Trio: Lock / Event / Semaphore

Pain point and mechanism

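Multithreading is not finished once you "start a few more threads". Each primitive has a distinct job:

- A Lock stops several threads from modifying the same variable at once.
- An Event signals that "the data is ready".
- A Semaphore caps how many threads may use a resource at the same time, for example at most 2 inference requests running together.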

Core source (verbatim from the full source at the end)

def demo_threading_primitives() -> None:
    print("\n=== Thread synchronization primitives demo ===\n")

    # 1. Lock: a thread-safe counter
    counter = 0
    lock = threading.Lock()
    def safe_increment(n: int) -> None:
        nonlocal counter
        for _ in range(n):
            with lock:
                counter += 1

    threads = [threading.Thread(target=safe_increment, args=(1000,)) for _ in range(5)]
    for t in threads: t.start()
    for t in threads: t.join()
    print(f"Lock counter (expected 5000): {counter}")

    # 2. Event: producer-consumer
    data_ready = threading.Event()
    pipeline_result: list[str] = []

    def producer() -> None:
        time.sleep(0.1)
        pipeline_result.append("embedding vectors generated")
        data_ready.set()

    def consumer() -> None:
        data_ready.wait(timeout=2.0)
        if pipeline_result:
            print(f"Event consumer received: {pipeline_result[0]}")

    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start(); t2.start()
    t1.join(); t2.join()

    # 3. Semaphore: rate limiting
    sem = threading.Semaphore(2)   # at most 2 concurrent inference calls
    log: list[str] = []
    log_lock = threading.Lock()

    def inference_call(task_id: int) -> None:
        with sem:
            with log_lock:
                log.append(f"inference task {task_id} started")
            time.sleep(0.05)
            with log_lock:
                log.append(f"inference task {task_id} finished")

    workers = [threading.Thread(target=inference_call, args=(i,)) for i in range(4)]
    for w in workers: w.start()
    for w in workers: w.join()
    print(f"Semaphore inference log: {log}")

Runnable demo (with mock data and print feedback added)

import threading
import time

# Step 6: synchronization primitives are like traffic rules; they keep threads from colliding, getting out of order, or overloading a resource.
def demo_threading_primitives() -> None:
    print("\n=== Thread synchronization primitives demo ===\n")

    # 1. Lock: a thread-safe counter
    counter = 0
    lock = threading.Lock()
    def safe_increment(n: int) -> None:
        nonlocal counter
        for _ in range(n):
            with lock:
                counter += 1

    threads = [threading.Thread(target=safe_increment, args=(1000,)) for _ in range(5)]
    for t in threads: t.start()
    for t in threads: t.join()
    print(f"Lock counter (expected 5000): {counter}")

    # 2. Event: producer-consumer
    data_ready = threading.Event()
    pipeline_result: list[str] = []

    def producer() -> None:
        time.sleep(0.1)
        pipeline_result.append("embedding vectors generated")
        data_ready.set()

    def consumer() -> None:
        data_ready.wait(timeout=2.0)
        if pipeline_result:
            print(f"Event consumer received: {pipeline_result[0]}")

    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start(); t2.start()
    t1.join(); t2.join()

    # 3. Semaphore: rate limiting
    sem = threading.Semaphore(2)   # at most 2 concurrent inference calls
    log: list[str] = []
    log_lock = threading.Lock()

    def inference_call(task_id: int) -> None:
        with sem:
            with log_lock:
                log.append(f"inference task {task_id} started")
            time.sleep(0.05)
            with log_lock:
                log.append(f"inference task {task_id} finished")

    workers = [threading.Thread(target=inference_call, args=(i,)) for i in range(4)]
    for w in workers: w.start()
    for w in workers: w.join()
    print(f"Semaphore inference log: {log}")

demo_threading_primitives()

Step 7: Compare IO-Bound and CPU-Bound Tasks with demo_compare

Pain point and mechanism

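demo_compare() is the test bench for the whole article. It runs the same batch of tasks sequentially, in a thread pool, and in a process pool, then charts the results side by side. "Thread pools for IO, process pools for CPU" is a conclusion you should confirm with your own measurements.

Keep in mind that starting worker processes has a fixed cost. Each CPU task here takes only a few milliseconds, so the process pool's speedup may be modest, and on some machines it may even fall below 1x.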
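A single run is noisy because of process start-up, other programs, and CPU frequency scaling. A common practice is to repeat each measurement and keep the minimum, which the timeit documentation also recommends. The `best_of` wrapper below is a hypothetical sketch that could wrap run_sequential, run_thread_pool, or run_process_pool.

```python
import time
from typing import Callable

def best_of(run: Callable[[], object], repeat: int = 3) -> float:
    """Run `run` several times and return the fastest wall-clock time in seconds."""
    timings = []
    for _ in range(repeat):
        start = time.perf_counter()
        run()
        timings.append(time.perf_counter() - start)
    return min(timings)   # the minimum is the least disturbed by background noise

# usage sketch with a stand-in workload; in the article you could pass e.g.
# lambda: run_thread_pool(io_clean_task, workers=8)
t = best_of(lambda: sum(i * i for i in range(100_000)), repeat=5)
print(f"best of 5: {t:.4f}s")
```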

Core source (verbatim from the full source at the end)

def demo_compare() -> None:
    print("\n=== IO-bound task comparison (simulated network requests) ===")
    print(f"Tasks: {len(DOC_IDS)} documents, each simulating 50 ms of IO\n")

    _, t_seq  = run_sequential(io_clean_task)
    _, t_thr  = run_thread_pool(io_clean_task, workers=8)
    _, t_proc = run_process_pool(io_clean_task, workers=4)

    ascii_bar_chart(
        {"Sequential": t_seq, "Thread pool(8)": t_thr, "Process pool(4)": t_proc},
        "IO-bound run time comparison",
    )

    print("\n=== CPU-bound task comparison (pure computation) ===")
    print(f"Tasks: {len(DOC_IDS)} documents, each summing 50,000 squares\n")

    _, t_seq2  = run_sequential(cpu_clean_task)
    _, t_thr2  = run_thread_pool(cpu_clean_task, workers=8)
    _, t_proc2 = run_process_pool(cpu_clean_task, workers=4)

    ascii_bar_chart(
        {"Sequential": t_seq2, "Thread pool(8)": t_thr2, "Process pool(4)": t_proc2},
        "CPU-bound run time comparison",
    )

    print("\nConclusion:")
    print(f"  IO-bound:  thread pool speedup  = {t_seq/t_thr:.1f}x")
    print(f"  CPU-bound: process pool speedup = {t_seq2/t_proc2:.1f}x")
    print(f"  CPU-bound: thread pool speedup  = {t_seq2/t_thr2:.1f}x  ← limited by the GIL, close to 1x")

Runnable demo (with mock data and print feedback added)

import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def io_clean_task(doc_id: int) -> dict:
    """Simulated IO-bound task: download and clean one document"""
    time.sleep(0.05)   # simulate 50 ms of network IO
    word_count = (doc_id * 137 + 42) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

# CPU-bound: simulate hashing/tokenization work on the text
def cpu_clean_task(doc_id: int) -> dict:
    """Simulated CPU-bound task: compute-heavy processing of a document"""
    # pure computation, no sleep
    total = sum(i * i for i in range(50_000))
    word_count = (total + doc_id) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

DOC_IDS = list(range(24))

def run_sequential(task_fn) -> tuple[list[dict], float]:
    start = time.perf_counter()
    results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start

def run_thread_pool(task_fn, workers: int = 8) -> tuple[list[dict], float]:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as ex:
        results = list(ex.map(task_fn, DOC_IDS))
    return results, time.perf_counter() - start

def run_process_pool(task_fn, workers: int = 4) -> tuple[list[dict], float]:
    start = time.perf_counter()
    try:
        with ProcessPoolExecutor(max_workers=workers) as ex:
            results = list(ex.map(task_fn, DOC_IDS))
    except (OSError, PermissionError) as exc:
        print(f"  [fallback] This environment restricts multiprocessing: {exc}; running single-threaded instead.")
        results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start

def ascii_bar_chart(data: dict[str, float], title: str, unit: str = "s") -> None:
    max_val = max(data.values()) or 1
    bar_width = 30
    print(f"\n{'─'*50}")
    print(f"  {title}")
    print(f"{'─'*50}")
    for label, val in data.items():
        filled = int(val / max_val * bar_width)
        bar = "█" * filled + "░" * (bar_width - filled)
        print(f"  {label:<18} │{bar}│ {val:.3f}{unit}")
    print(f"{'─'*50}")
    fastest = min(data, key=data.get)  # type: ignore[arg-type]
    print(f"  Fastest: {fastest}  ({data[fastest]:.3f}{unit})")

# Step 7: the full comparison runs both the IO and the CPU scenario and reports the speedups.
def demo_compare() -> None:
    print("\n=== IO-bound task comparison (simulated network requests) ===")
    print(f"Tasks: {len(DOC_IDS)} documents, each simulating 50 ms of IO\n")

    _, t_seq  = run_sequential(io_clean_task)
    _, t_thr  = run_thread_pool(io_clean_task, workers=8)
    _, t_proc = run_process_pool(io_clean_task, workers=4)

    ascii_bar_chart(
        {"Sequential": t_seq, "Thread pool(8)": t_thr, "Process pool(4)": t_proc},
        "IO-bound run time comparison",
    )

    print("\n=== CPU-bound task comparison (pure computation) ===")
    print(f"Tasks: {len(DOC_IDS)} documents, each summing 50,000 squares\n")

    _, t_seq2  = run_sequential(cpu_clean_task)
    _, t_thr2  = run_thread_pool(cpu_clean_task, workers=8)
    _, t_proc2 = run_process_pool(cpu_clean_task, workers=4)

    ascii_bar_chart(
        {"Sequential": t_seq2, "Thread pool(8)": t_thr2, "Process pool(4)": t_proc2},
        "CPU-bound run time comparison",
    )

    print("\nConclusion:")
    print(f"  IO-bound:  thread pool speedup  = {t_seq/t_thr:.1f}x")
    print(f"  CPU-bound: process pool speedup = {t_seq2/t_proc2:.1f}x")
    print(f"  CPU-bound: thread pool speedup  = {t_seq2/t_thr2:.1f}x  ← limited by the GIL, close to 1x")

print("demo_compare is defined: running the full script prints two run-time bar charts.")
print("No process pool is started here, so a restricted environment cannot get in the way.")

Step 8: Use main as a Command-Line Remote Control for the Demos

Pain point and mechanism

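A command-line entry point gives the script a remote control:

- --mode threading shows only thread synchronization;
- --mode pool shows only the process pool;
- --mode compare runs the full comparison.

Beginners can switch experiments by changing an argument instead of editing code.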
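argparse can also parse an explicit list instead of sys.argv, as in parser.parse_args(["--mode", "threading"]). The script in this step does not do this, but a common pattern is to let main accept an optional argv list. That makes it possible to call main from a notebook or a test without overwriting sys.argv. The sketch below only builds and parses the arguments; `build_parser` and the argv-taking `main` are illustrative.

```python
import argparse
from typing import Optional

def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Concurrency model performance comparison")
    parser.add_argument("--mode", choices=["compare", "threading", "pool"],
                        default="compare", help="demo mode")
    return parser

def main(argv: Optional[list[str]] = None) -> str:
    # argv=None means "read sys.argv[1:]", exactly like a bare parse_args()
    args = build_parser().parse_args(argv)
    # dispatch to demo_compare / demo_threading_primitives / demo_process_pool here
    return args.mode

print(main(["--mode", "threading"]))   # threading
print(main([]))                        # compare
```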

Core source (verbatim from the full source at the end)

def main() -> None:
    parser = argparse.ArgumentParser(description="Concurrency model performance comparison")
    parser.add_argument("--mode", choices=["compare", "threading", "pool"],
                        default="compare", help="demo mode")
    args = parser.parse_args()

    print("GIL cheat sheet:")
    print("  IO-bound  → threading / asyncio  (GIL released while waiting on IO)")
    print("  CPU-bound → multiprocessing       (separate processes, no GIL contention)")

    if args.mode == "compare":
        demo_compare()
    elif args.mode == "threading":
        demo_threading_primitives()
    elif args.mode == "pool":
        demo_process_pool()

Runnable demo (with mock data and print feedback added)

import argparse
import threading
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def io_clean_task(doc_id: int) -> dict:
    """Simulated IO-bound task: download and clean one document"""
    time.sleep(0.05)   # simulate 50 ms of network IO
    word_count = (doc_id * 137 + 42) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

# CPU-bound: simulate hashing/tokenization work on the text
def cpu_clean_task(doc_id: int) -> dict:
    """Simulated CPU-bound task: compute-heavy processing of a document"""
    # pure computation, no sleep
    total = sum(i * i for i in range(50_000))
    word_count = (total + doc_id) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

DOC_IDS = list(range(24))

def run_sequential(task_fn) -> tuple[list[dict], float]:
    start = time.perf_counter()
    results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start

def run_thread_pool(task_fn, workers: int = 8) -> tuple[list[dict], float]:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as ex:
        results = list(ex.map(task_fn, DOC_IDS))
    return results, time.perf_counter() - start

def run_process_pool(task_fn, workers: int = 4) -> tuple[list[dict], float]:
    start = time.perf_counter()
    try:
        with ProcessPoolExecutor(max_workers=workers) as ex:
            results = list(ex.map(task_fn, DOC_IDS))
    except (OSError, PermissionError) as exc:
        print(f"  [fallback] This environment restricts multiprocessing: {exc}; running single-threaded instead.")
        results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start

def ascii_bar_chart(data: dict[str, float], title: str, unit: str = "s") -> None:
    max_val = max(data.values()) or 1
    bar_width = 30
    print(f"\n{'─'*50}")
    print(f"  {title}")
    print(f"{'─'*50}")
    for label, val in data.items():
        filled = int(val / max_val * bar_width)
        bar = "█" * filled + "░" * (bar_width - filled)
        print(f"  {label:<18} │{bar}│ {val:.3f}{unit}")
    print(f"{'─'*50}")
    fastest = min(data, key=data.get)  # type: ignore[arg-type]
    print(f"  Fastest: {fastest}  ({data[fastest]:.3f}{unit})")

def demo_threading_primitives() -> None:
    print("\n=== Thread synchronization primitives demo ===\n")

    # 1. Lock: a thread-safe counter
    counter = 0
    lock = threading.Lock()
    def safe_increment(n: int) -> None:
        nonlocal counter
        for _ in range(n):
            with lock:
                counter += 1

    threads = [threading.Thread(target=safe_increment, args=(1000,)) for _ in range(5)]
    for t in threads: t.start()
    for t in threads: t.join()
    print(f"Lock counter (expected 5000): {counter}")

    # 2. Event: producer-consumer
    data_ready = threading.Event()
    pipeline_result: list[str] = []

    def producer() -> None:
        time.sleep(0.1)
        pipeline_result.append("embedding vectors generated")
        data_ready.set()

    def consumer() -> None:
        data_ready.wait(timeout=2.0)
        if pipeline_result:
            print(f"Event consumer received: {pipeline_result[0]}")

    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start(); t2.start()
    t1.join(); t2.join()

    # 3. Semaphore: rate limiting
    sem = threading.Semaphore(2)   # at most 2 concurrent inference calls
    log: list[str] = []
    log_lock = threading.Lock()

    def inference_call(task_id: int) -> None:
        with sem:
            with log_lock:
                log.append(f"inference task {task_id} started")
            time.sleep(0.05)
            with log_lock:
                log.append(f"inference task {task_id} finished")

    workers = [threading.Thread(target=inference_call, args=(i,)) for i in range(4)]
    for w in workers: w.start()
    for w in workers: w.join()
    print(f"Semaphore inference log: {log}")

def demo_process_pool() -> None:
    print("\n=== Process pool demo (CPU-bound) ===\n")
    _, elapsed = run_process_pool(cpu_clean_task, workers=4)
    print(f"Process pool (4 processes) handled {len(DOC_IDS)} documents in {elapsed:.3f}s")

def demo_compare() -> None:
    print("\n=== IO-bound task comparison (simulated network requests) ===")
    print(f"Tasks: {len(DOC_IDS)} documents, each simulating 50 ms of IO\n")

    _, t_seq  = run_sequential(io_clean_task)
    _, t_thr  = run_thread_pool(io_clean_task, workers=8)
    _, t_proc = run_process_pool(io_clean_task, workers=4)

    ascii_bar_chart(
        {"Sequential": t_seq, "Thread pool(8)": t_thr, "Process pool(4)": t_proc},
        "IO-bound run time comparison",
    )

    print("\n=== CPU-bound task comparison (pure computation) ===")
    print(f"Tasks: {len(DOC_IDS)} documents, each summing 50,000 squares\n")

    _, t_seq2  = run_sequential(cpu_clean_task)
    _, t_thr2  = run_thread_pool(cpu_clean_task, workers=8)
    _, t_proc2 = run_process_pool(cpu_clean_task, workers=4)

    ascii_bar_chart(
        {"Sequential": t_seq2, "Thread pool(8)": t_thr2, "Process pool(4)": t_proc2},
        "CPU-bound run time comparison",
    )

    print("\nConclusion:")
    print(f"  IO-bound:  thread pool speedup  = {t_seq/t_thr:.1f}x")
    print(f"  CPU-bound: process pool speedup = {t_seq2/t_proc2:.1f}x")
    print(f"  CPU-bound: thread pool speedup  = {t_seq2/t_thr2:.1f}x  ← limited by the GIL, close to 1x")

# Step 8: main reads --mode and lets the user choose compare/threading/pool.
def main() -> None:
    parser = argparse.ArgumentParser(description="Concurrency model performance comparison")
    parser.add_argument("--mode", choices=["compare", "threading", "pool"],
                        default="compare", help="demo mode")
    args = parser.parse_args()

    print("GIL cheat sheet:")
    print("  IO-bound  → threading / asyncio  (GIL released while waiting on IO)")
    print("  CPU-bound → multiprocessing       (separate processes, no GIL contention)")

    if args.mode == "compare":
        demo_compare()
    elif args.mode == "threading":
        demo_threading_primitives()
    elif args.mode == "pool":
        demo_process_pool()

import sys
sys.argv = ["prog", "--mode", "threading"]   # simulate command-line arguments for this in-page demo
main()

Geek hands-on: full source and how to run it

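Now put the building blocks together. Paste the complete code below into your editor and run it. Look at the whole loop first, then go back and tweak the parameters section by section; that is the quickest way to build engineering intuition.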

#!/usr/bin/env python3
"""
Multithreading vs. multiprocessing performance comparison
Usage:
  python3 12-python-concurrency.py --mode compare   # compare all three models (default)
  python3 12-python-concurrency.py --mode threading # only demo the thread sync primitives
  python3 12-python-concurrency.py --mode pool      # only demo the process pool
"""
import argparse
import threading
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


# ── Simulated tasks ──────────────────────────────────────────
# IO-bound: simulate reading a remote knowledge-base document (sleep stands in for network latency)
def io_clean_task(doc_id: int) -> dict:
    """Simulated IO-bound task: download and clean one document"""
    time.sleep(0.05)   # simulate 50 ms of network IO
    word_count = (doc_id * 137 + 42) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

# CPU-bound: simulate hashing/tokenization work on the text
def cpu_clean_task(doc_id: int) -> dict:
    """Simulated CPU-bound task: compute-heavy processing of a document"""
    # pure computation, no sleep
    total = sum(i * i for i in range(50_000))
    word_count = (total + doc_id) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}


# ── Three ways to execute ────────────────────────────────────
DOC_IDS = list(range(24))

def run_sequential(task_fn) -> tuple[list[dict], float]:
    start = time.perf_counter()
    results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start

def run_thread_pool(task_fn, workers: int = 8) -> tuple[list[dict], float]:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as ex:
        results = list(ex.map(task_fn, DOC_IDS))
    return results, time.perf_counter() - start

def run_process_pool(task_fn, workers: int = 4) -> tuple[list[dict], float]:
    start = time.perf_counter()
    try:
        with ProcessPoolExecutor(max_workers=workers) as ex:
            results = list(ex.map(task_fn, DOC_IDS))
    except (OSError, PermissionError) as exc:
        print(f"  [fallback] This environment restricts multiprocessing: {exc}; running single-threaded instead.")
        results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start


# ── ASCII bar chart ──────────────────────────────────────────
def ascii_bar_chart(data: dict[str, float], title: str, unit: str = "s") -> None:
    max_val = max(data.values()) or 1
    bar_width = 30
    print(f"\n{'─'*50}")
    print(f"  {title}")
    print(f"{'─'*50}")
    for label, val in data.items():
        filled = int(val / max_val * bar_width)
        bar = "█" * filled + "░" * (bar_width - filled)
        print(f"  {label:<18} │{bar}│ {val:.3f}{unit}")
    print(f"{'─'*50}")
    fastest = min(data, key=data.get)  # type: ignore[arg-type]
    print(f"  Fastest: {fastest}  ({data[fastest]:.3f}{unit})")


# ── Thread synchronization primitives demo ───────────────────
def demo_threading_primitives() -> None:
    print("\n=== Thread synchronization primitives demo ===\n")

    # 1. Lock: a thread-safe counter
    counter = 0
    lock = threading.Lock()
    def safe_increment(n: int) -> None:
        nonlocal counter
        for _ in range(n):
            with lock:
                counter += 1

    threads = [threading.Thread(target=safe_increment, args=(1000,)) for _ in range(5)]
    for t in threads: t.start()
    for t in threads: t.join()
    print(f"Lock counter (expected 5000): {counter}")

    # 2. Event: producer-consumer
    data_ready = threading.Event()
    pipeline_result: list[str] = []

    def producer() -> None:
        time.sleep(0.1)
        pipeline_result.append("embedding vectors generated")
        data_ready.set()

    def consumer() -> None:
        data_ready.wait(timeout=2.0)
        if pipeline_result:
            print(f"Event consumer received: {pipeline_result[0]}")

    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start(); t2.start()
    t1.join(); t2.join()

    # 3. Semaphore: rate limiting
    sem = threading.Semaphore(2)   # at most 2 concurrent inference calls
    log: list[str] = []
    log_lock = threading.Lock()

    def inference_call(task_id: int) -> None:
        with sem:
            with log_lock:
                log.append(f"inference task {task_id} started")
            time.sleep(0.05)
            with log_lock:
                log.append(f"inference task {task_id} finished")

    workers = [threading.Thread(target=inference_call, args=(i,)) for i in range(4)]
    for w in workers: w.start()
    for w in workers: w.join()
    print(f"Semaphore inference log: {log}")


# ── Process pool demo ────────────────────────────────────────
def demo_process_pool() -> None:
    print("\n=== Process pool demo (CPU-bound) ===\n")
    _, elapsed = run_process_pool(cpu_clean_task, workers=4)
    print(f"Process pool (4 processes) handled {len(DOC_IDS)} documents in {elapsed:.3f}s")


# ── Full comparison ──────────────────────────────────────────
def demo_compare() -> None:
    print("\n=== IO-bound task comparison (simulated network requests) ===")
    print(f"Tasks: {len(DOC_IDS)} documents, each simulating 50 ms of IO\n")

    _, t_seq  = run_sequential(io_clean_task)
    _, t_thr  = run_thread_pool(io_clean_task, workers=8)
    _, t_proc = run_process_pool(io_clean_task, workers=4)

    ascii_bar_chart(
        {"Sequential": t_seq, "Thread pool(8)": t_thr, "Process pool(4)": t_proc},
        "IO-bound run time comparison",
    )

    print("\n=== CPU-bound task comparison (pure computation) ===")
    print(f"Tasks: {len(DOC_IDS)} documents, each summing 50,000 squares\n")

    _, t_seq2  = run_sequential(cpu_clean_task)
    _, t_thr2  = run_thread_pool(cpu_clean_task, workers=8)
    _, t_proc2 = run_process_pool(cpu_clean_task, workers=4)

    ascii_bar_chart(
        {"Sequential": t_seq2, "Thread pool(8)": t_thr2, "Process pool(4)": t_proc2},
        "CPU-bound run time comparison",
    )

    print("\nConclusion:")
    print(f"  IO-bound:  thread pool speedup  = {t_seq/t_thr:.1f}x")
    print(f"  CPU-bound: process pool speedup = {t_seq2/t_proc2:.1f}x")
    print(f"  CPU-bound: thread pool speedup  = {t_seq2/t_thr2:.1f}x  ← limited by the GIL, close to 1x")


# ── Entry point ──────────────────────────────────────────────
def main() -> None:
    parser = argparse.ArgumentParser(description="Concurrency model performance comparison")
    parser.add_argument("--mode", choices=["compare", "threading", "pool"],
                        default="compare", help="demo mode")
    args = parser.parse_args()

    print("GIL cheat sheet:")
    print("  IO-bound  → threading / asyncio  (GIL released while waiting on IO)")
    print("  CPU-bound → multiprocessing       (separate processes, no GIL contention)")

    if args.mode == "compare":
        demo_compare()
    elif args.mode == "threading":
        demo_threading_primitives()
    elif args.mode == "pool":
        demo_process_pool()

if __name__ == "__main__":
    main()

Example runs:

python3 12-python-concurrency.py --mode compare    # full comparison + ASCII charts
python3 12-python-concurrency.py --mode threading  # lock/event/semaphore demo
python3 12-python-concurrency.py --mode pool       # process pool demo

Concurrency model cheat sheet

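Scenario | Recommended | Why
Batch HTTP requests | ThreadPoolExecutor | IO-bound; the GIL is released during IO
Image / vector computation | ProcessPoolExecutor | CPU-bound; sidesteps the GIL
High-concurrency network services | asyncio | single-threaded event loop, very low overhead
Simple background tasks | threading.Thread | lightweight, fine for a small number of concurrent tasks
Large-scale data processing | multiprocessing.Pool | parallel across cores, with optional shared memory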
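The asyncio row can be sketched with the same workload as the benchmark: 24 simulated 50 ms waits, run on a single-threaded event loop. In this sketch, asyncio.sleep stands in for a real non-blocking network call, and `fetch_doc` and `fetch_all` are hypothetical names.

```python
import asyncio
import time

async def fetch_doc(doc_id: int) -> dict:
    await asyncio.sleep(0.05)   # non-blocking wait: the event loop runs other coroutines meanwhile
    return {"doc_id": doc_id, "status": "ok"}

async def fetch_all(doc_ids: list[int]) -> list[dict]:
    # gather runs every coroutine concurrently and returns results in input order
    return await asyncio.gather(*(fetch_doc(d) for d in doc_ids))

start = time.perf_counter()
results = asyncio.run(fetch_all(list(range(24))))
elapsed = time.perf_counter() - start
print(f"asyncio: {len(results)} docs in {elapsed:.3f}s")   # roughly 0.05 s plus loop overhead
```

This only helps when every IO call in the chain is awaitable; a blocking call such as time.sleep inside a coroutine would stall the whole loop.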

⏱ NexDo Time · 5-minute micro-exercises

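  1. Modify ascii_bar_chart to add a "speedup vs. sequential" column to the right of the bars (e.g. 3.2x).
  2. Give run_thread_pool a timeout parameter, use future.result(timeout=...) to catch tasks that time out, and mark them as TIMEOUT.
  3. Use threading.Timer to build a knowledge-base indexing task that stops automatically after 5 seconds, then prints how many documents it has processed.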

Don’t wait for next time, do it in the next moment.