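12 · Multiprocessing and Multithreading: Squeezing Every Drop out of the CPU
🔗 Knowledge graph navigation: before reading this article, it helps to learn or review the core concepts in "11 · Socket Programming: Low-Level TCP/UDP Communication"; this article builds on them. In the previous article we used sockets to put data on the network. This one comes back to the local machine and teaches you to run several things at once. You can only pick the right concurrency model for a given scenario once you understand the GIL.
Geek's take: concurrency is not about making code complicated. It is about being explicit about waiting, scheduling, and resource isolation. This article verifies each mechanism with mock scenarios that run on your own machine.
Pain point and architecture: APIs and concepts memorized in isolation are quickly forgotten. This article first pins down a real pain point, then breaks it into a three-stage loop: input data → core mechanism → runnable output. We look at how the GIL works, cover threading locks, events, and semaphores, multiprocessing process pools and shared memory, and the unified concurrent.futures interface, then compare the measured elapsed time of single-threaded, multithreaded, and multiprocess runs with an ASCII bar chart.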
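1. How the GIL works (ASCII diagram)
Inside the Python interpreter (standard CPython build):
Thread A ──▶ acquire GIL ──▶ run bytecode ──▶ release GIL ──▶ wait
Thread B ──▶ wait for GIL ─────────────────▶ acquire GIL ──▶ run
Takeaway: at any given moment only one thread is executing Python bytecode
↓
CPU-bound: multithreading ≈ single thread (the GIL is the bottleneck)
IO-bound: multithreading helps (the GIL is released while waiting on IO)
↓
CPU-bound → use multiprocessing (each process has its own GIL)
IO-bound → use threading or asyncio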
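The handoff in the diagram is not left to chance: CPython periodically asks the running thread to give up the GIL so that waiting threads get a turn. The snippet below is a minimal sketch, assuming a standard CPython interpreter, that simply reads that interval; the variable name `switch_interval` is only illustrative.

```python
import sys

# Assumption: running on CPython. The value is the interval, in seconds,
# after which the interpreter asks the running thread to let others run.
switch_interval = sys.getswitchinterval()
print(f"Thread switch interval: {switch_interval * 1000:.1f} ms")
```

A short interval keeps threads responsive, but it does not make CPU-bound threads run in parallel; they still take turns.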
2. threading: locks, events, and semaphores
import threading
import time

# ── Lock ──────────────────────────────────────────────────────
counter = 0
lock = threading.Lock()

def increment() -> None:
    global counter
    with lock:  # acquires on entry, releases on exit
        counter += 1

# ── Event ─────────────────────────────────────────────────────
ready = threading.Event()

def producer() -> None:
    # Notify the consumer once the data is ready
    ready.set()

def consumer() -> None:
    ready.wait()  # block until set() is called
    print("Data is ready, consuming")

# ── Semaphore ─────────────────────────────────────────────────
# Cap concurrent access to the AI inference service at 3
sem = threading.Semaphore(3)

def call_inference(task_id: int) -> None:
    with sem:
        print(f"Task {task_id} is running inference...")
        time.sleep(0.01)  # simulate a very short inference call

threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("Lock counter:", counter)

threading.Thread(target=producer).start()
consumer()

workers = [threading.Thread(target=call_inference, args=(i,)) for i in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()
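Why does `counter += 1` need a lock at all when the GIL already lets only one thread run at a time? Because the statement is not a single step. The sketch below, which assumes CPython and uses a hypothetical `unsafe_increment` function, disassembles the statement to show that it loads, adds, and stores in separate bytecode instructions; a thread switch between the load and the store can lose an update. Exact instruction names vary between Python versions, so none are hard-coded in the output.

```python
import dis

counter = 0

def unsafe_increment() -> None:
    global counter
    counter += 1  # compiles to several bytecode instructions, not one

# Collect the instruction names; the function is only inspected, never called.
op_names = [ins.opname for ins in dis.get_instructions(unsafe_increment)]
print(op_names)
```

The load of `counter` and the store back to it show up as distinct entries in the list, which is exactly the window a lock closes.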
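3. multiprocessing: process pools and shared memory
from multiprocessing import Lock, Pool, Process, Value
import ctypes

# ── Process pool ──────────────────────────────────────────────
def cpu_task(n: int) -> int:
    return sum(i * i for i in range(n))

# ── Shared memory (cross-process counter) ─────────────────────
def worker(shared, lk) -> None:
    # shared is a multiprocessing.Value, lk is a multiprocessing.Lock
    with lk:
        shared.value += 1

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        results = pool.map(cpu_task, [10_000] * 4)

    # Create the shared objects in the parent and pass them to real child
    # processes, so the counter is actually updated across process boundaries.
    shared_count = Value(ctypes.c_int, 0)
    shared_lock = Lock()
    procs = [Process(target=worker, args=(shared_count, shared_lock)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

    print("Process pool result count:", len(results))
    print("Shared counter:", shared_count.value)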
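4. concurrent.futures: a unified high-level interface
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

def io_task(t: int) -> int:
    time.sleep(0.01)
    return t * 2

def cpu_task(t: int) -> int:
    return sum(range(t * 1000))

tasks = list(range(8))

if __name__ == "__main__":
    # Thread pool (IO-bound). Kept inside the guard so that child processes
    # re-importing this module do not run it again.
    thread_results = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(io_task, t): t for t in tasks}
        for fut in as_completed(futures):
            thread_results.append(fut.result())

    # Process pool (CPU-bound)
    try:
        with ProcessPoolExecutor(max_workers=2) as executor:
            process_results = list(executor.map(cpu_task, tasks))
    except (OSError, PermissionError) as exc:
        print("Process pool restricted, falling back to sequential execution:", type(exc).__name__)
        process_results = [cpu_task(t) for t in tasks]

    print("Thread pool result count:", len(thread_results))
    print("Process pool result count:", len(process_results))
concurrent.futures hides most of the low-level differences between threads and processes behind a single Executor interface, which makes it a sensible default for production code. One difference remains visible: with ProcessPoolExecutor, the task function and its arguments must be picklable.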
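The two ways of collecting results used in this section behave differently: `executor.map` yields results in input order, while `as_completed` yields futures as they finish. The sketch below illustrates this with a hypothetical `slow_double` task whose sleep time is chosen so that later inputs finish first; the task and its timings are assumptions made for the illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_double(t: int) -> int:
    # Hypothetical task: larger inputs sleep less, so they tend to finish first.
    time.sleep((3 - t) * 0.02)
    return t * 2

inputs = [0, 1, 2, 3]
with ThreadPoolExecutor(max_workers=4) as executor:
    # map() returns results in the same order as the inputs.
    mapped = list(executor.map(slow_double, inputs))
    # as_completed() yields each future as soon as it is done.
    futures = [executor.submit(slow_double, t) for t in inputs]
    completed = [fut.result() for fut in as_completed(futures)]

print("map order:         ", mapped)
print("as_completed order:", completed)
```

Use `map` when each result must line up with its input, and `as_completed` when you want to react to whichever task finishes first.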
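Hands-on: performance comparison for batch-cleaning knowledge-base text
We simulate text cleaning for 24 knowledge-base documents, compare the elapsed time of three concurrency models, and show the results in an ASCII bar chart.
Step by step: breaking down the core logic
Concurrency is not "the more threads the better". First identify the task type: IO-bound tasks spend most of their time waiting, and a thread pool lets those waits overlap; CPU-bound tasks spend most of their time computing, and only a process pool gets around the GIL. The small experiments below build that intuition one step at a time.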
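Step 1: Tell IO-bound tasks from CPU-bound tasks first
Pain point and mechanism:
The first step in choosing a concurrency model is not reaching for a thread pool; it is identifying the task type. io_clean_task() uses sleep to simulate waiting on the network, and while it waits other threads can run. cpu_clean_task() is pure computation, so it is throttled by the GIL and is better suited to multiple processes.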
Core source (verbatim from the full script at the end):
def io_clean_task(doc_id: int) -> dict:
    """Simulate an IO-bound task: download and clean one document."""
    time.sleep(0.05)  # simulate 50 ms of network IO
    word_count = (doc_id * 137 + 42) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

# CPU-bound: simulate hashing/tokenizing the text
def cpu_clean_task(doc_id: int) -> dict:
    """Simulate a CPU-bound task: compute-heavy processing of one document."""
    # Pure computation, no sleep
    total = sum(i * i for i in range(50_000))
    word_count = (total + doc_id) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}
Runnable demo (with mock data and print feedback filled in):
import time

# Step 1: an IO task is like waiting for a delivery, a CPU task is like cooking
# a full meal yourself; they are optimized in completely different ways.
def io_clean_task(doc_id: int) -> dict:
    """Simulate an IO-bound task: download and clean one document."""
    time.sleep(0.05)  # simulate 50 ms of network IO
    word_count = (doc_id * 137 + 42) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

# CPU-bound: simulate hashing/tokenizing the text
def cpu_clean_task(doc_id: int) -> dict:
    """Simulate a CPU-bound task: compute-heavy processing of one document."""
    # Pure computation, no sleep
    total = sum(i * i for i in range(50_000))
    word_count = (total + doc_id) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

print("IO task result:", io_clean_task(1))
print("CPU task result:", cpu_clean_task(1))
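Step 2: Establish a performance baseline with sequential execution
Pain point and mechanism:
The baseline matters. Without the elapsed time of a sequential run, you cannot tell whether the thread pool or process pool is actually faster. Here each of the 24 IO tasks waits 50 ms, so a sequential run takes roughly 24 waits added together, about 1.2 s.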
Core source (verbatim from the full script at the end):
DOC_IDS = list(range(24))

def run_sequential(task_fn) -> tuple[list[dict], float]:
    start = time.perf_counter()
    results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start
Runnable demo (with mock data and print feedback filled in):
import time

def io_clean_task(doc_id: int) -> dict:
    """Simulate an IO-bound task: download and clean one document."""
    time.sleep(0.05)  # simulate 50 ms of network IO
    word_count = (doc_id * 137 + 42) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

# CPU-bound: simulate hashing/tokenizing the text
def cpu_clean_task(doc_id: int) -> dict:
    """Simulate a CPU-bound task: compute-heavy processing of one document."""
    # Pure computation, no sleep
    total = sum(i * i for i in range(50_000))
    word_count = (total + doc_id) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

DOC_IDS = list(range(24))

# Step 2: sequential execution is one person working through a queue; it is
# the baseline for every speedup figure that follows.
def run_sequential(task_fn) -> tuple[list[dict], float]:
    start = time.perf_counter()
    results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start

results, elapsed = run_sequential(io_clean_task)
print("Documents processed:", len(results))
print("First result:", results[0])
print(f"Sequential time: {elapsed:.3f}s")
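Step 3: Use ThreadPoolExecutor to overlap IO waits
Pain point and mechanism:
Thread pools suit IO-bound tasks because a thread releases the GIL while it waits on the network or disk, so other threads can keep running. Think of several people each waiting for a different parcel at the same time, rather than one person waiting for parcel A and only then for parcel B.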
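Before measuring, it helps to know what to expect. The sketch below is a back-of-the-envelope estimate, assuming every task does nothing but wait and that scheduling overhead is zero; `ideal_wall_time` is a hypothetical helper, not part of the article's script. The inputs (24 tasks, 50 ms each, 8 workers) are the ones used in this article.

```python
import math

def ideal_wall_time(n_tasks: int, workers: int, seconds_per_task: float) -> float:
    """Best-case wall time when tasks only wait and run in full batches."""
    batches = math.ceil(n_tasks / workers)  # 24 tasks on 8 workers -> 3 batches
    return batches * seconds_per_task

sequential = ideal_wall_time(24, 1, 0.05)  # one worker: 24 waits back to back
pooled = ideal_wall_time(24, 8, 0.05)      # eight workers: 3 overlapping rounds
print(f"sequential ≈ {sequential:.2f}s, 8 threads ≈ {pooled:.2f}s, "
      f"ideal speedup ≈ {sequential / pooled:.0f}x")
```

Real measurements will land somewhat above the ideal figure because of thread start-up and scheduling costs.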
Core source (verbatim from the full script at the end):
def run_thread_pool(task_fn, workers: int = 8) -> tuple[list[dict], float]:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as ex:
        results = list(ex.map(task_fn, DOC_IDS))
    return results, time.perf_counter() - start
Runnable demo (with mock data and print feedback filled in):
import time
from concurrent.futures import ThreadPoolExecutor

def io_clean_task(doc_id: int) -> dict:
    """Simulate an IO-bound task: download and clean one document."""
    time.sleep(0.05)  # simulate 50 ms of network IO
    word_count = (doc_id * 137 + 42) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

# CPU-bound: simulate hashing/tokenizing the text
def cpu_clean_task(doc_id: int) -> dict:
    """Simulate a CPU-bound task: compute-heavy processing of one document."""
    # Pure computation, no sleep
    total = sum(i * i for i in range(50_000))
    word_count = (total + doc_id) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

DOC_IDS = list(range(24))

# Step 3: a thread pool is like 8 colleagues waiting for deliveries at the
# same time, so the waits overlap.
def run_thread_pool(task_fn, workers: int = 8) -> tuple[list[dict], float]:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as ex:
        results = list(ex.map(task_fn, DOC_IDS))
    return results, time.perf_counter() - start

# Measure the sequential baseline inline, since run_sequential is not defined in this snippet.
start = time.perf_counter()
baseline = [io_clean_task(d) for d in DOC_IDS]
t_seq = time.perf_counter() - start
results, elapsed = run_thread_pool(io_clean_task, workers=8)
print("Documents processed by the thread pool:", len(results))
print(f"Sequential time: {t_seq:.3f}s")
print(f"Thread pool time: {elapsed:.3f}s")
print("Hint: IO-bound work is usually a good fit for a thread pool")
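Step 4: Understand what ProcessPoolExecutor is for, and keep a fallback for restricted environments
Pain point and mechanism:
Multiple processes get around the GIL because each process has its own Python interpreter and its own GIL. However, in-browser runtimes and sandboxes may forbid creating child processes, so this teaching snippet only explains what the function is responsible for; the full script runs it through the entry point and falls back when needed.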
Core source (verbatim from the full script at the end):
def run_process_pool(task_fn, workers: int = 4) -> tuple[list[dict], float]:
    start = time.perf_counter()
    try:
        with ProcessPoolExecutor(max_workers=workers) as ex:
            results = list(ex.map(task_fn, DOC_IDS))
    except (OSError, PermissionError) as exc:
        print(f" [fallback] Multiprocessing is restricted here: {exc}; running sequentially instead.")
        results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start
Runnable demo (with mock data and print feedback filled in):
import time
from concurrent.futures import ProcessPoolExecutor

def io_clean_task(doc_id: int) -> dict:
    """Simulate an IO-bound task: download and clean one document."""
    time.sleep(0.05)  # simulate 50 ms of network IO
    word_count = (doc_id * 137 + 42) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

# CPU-bound: simulate hashing/tokenizing the text
def cpu_clean_task(doc_id: int) -> dict:
    """Simulate a CPU-bound task: compute-heavy processing of one document."""
    # Pure computation, no sleep
    total = sum(i * i for i in range(50_000))
    word_count = (total + doc_id) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

DOC_IDS = list(range(24))

# Step 4: a process pool is like opening several kitchens, each with its own
# GIL, which suits CPU-bound work.
def run_process_pool(task_fn, workers: int = 4) -> tuple[list[dict], float]:
    start = time.perf_counter()
    try:
        with ProcessPoolExecutor(max_workers=workers) as ex:
            results = list(ex.map(task_fn, DOC_IDS))
    except (OSError, PermissionError) as exc:
        print(f" [fallback] Multiprocessing is restricted here: {exc}; running sequentially instead.")
        results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start

print("run_process_pool is defined, with an OSError/PermissionError fallback inside.")
print("Example usage: results, elapsed = run_process_pool(cpu_clean_task, workers=4)")
print("This page does not start child processes, so a browser sandbox cannot hang on it.")
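Step 5: Draw the timing gap with an ASCII bar chart
Pain point and mechanism:
Performance numbers are hard to read as bare decimals. ascii_bar_chart() turns elapsed times into a terminal bar chart: the longest entry fills all 30 cells and the others are shortened in proportion, so you can see at a glance that the thread pool is faster in the IO scenario.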
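The scaling rule is a one-liner worth seeing in isolation. The sketch below isolates just the bar-length arithmetic in a hypothetical `bar_lengths` helper (not part of the article's script) and applies it to illustrative sample timings, not measured results.

```python
def bar_lengths(data: dict[str, float], bar_width: int = 30) -> dict[str, int]:
    # "or 1" avoids dividing by zero if every value is 0
    max_val = max(data.values()) or 1
    # int() truncates, so a bar is never longer than its exact share
    return {label: int(val / max_val * bar_width) for label, val in data.items()}

# Illustrative sample timings in seconds
sample = {"sequential": 1.20, "thread pool": 0.18, "process pool": 0.55}
lengths = bar_lengths(sample)
print(lengths)  # {'sequential': 30, 'thread pool': 4, 'process pool': 13}
```

Because of the truncation, very small values can round down to an empty bar, which is why the chart also prints the numeric value next to each bar.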
Core source (verbatim from the full script at the end):
def ascii_bar_chart(data: dict[str, float], title: str, unit: str = "s") -> None:
    max_val = max(data.values()) or 1
    bar_width = 30
    print(f"\n{'─'*50}")
    print(f" {title}")
    print(f"{'─'*50}")
    for label, val in data.items():
        filled = int(val / max_val * bar_width)
        bar = "█" * filled + "░" * (bar_width - filled)
        print(f" {label:<18} │{bar}│ {val:.3f}{unit}")
    print(f"{'─'*50}")
    fastest = min(data, key=data.get)  # type: ignore[arg-type]
    print(f" Fastest: {fastest} ({data[fastest]:.3f}{unit})")
Runnable demo (with mock data and print feedback filled in):
# Step 5: a bar chart turns numbers into visual length, which makes it easier
# for beginners to see what is fast and what is slow.
def ascii_bar_chart(data: dict[str, float], title: str, unit: str = "s") -> None:
    max_val = max(data.values()) or 1
    bar_width = 30
    print(f"\n{'─'*50}")
    print(f" {title}")
    print(f"{'─'*50}")
    for label, val in data.items():
        filled = int(val / max_val * bar_width)
        bar = "█" * filled + "░" * (bar_width - filled)
        print(f" {label:<18} │{bar}│ {val:.3f}{unit}")
    print(f"{'─'*50}")
    fastest = min(data, key=data.get)  # type: ignore[arg-type]
    print(f" Fastest: {fastest} ({data[fastest]:.3f}{unit})")

ascii_bar_chart({"Sequential": 1.20, "Thread pool (8)": 0.18, "Process pool (4)": 0.55}, "IO-bound elapsed time (sample)")
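Step 6: Understand the three thread synchronization tools with Lock/Event/Semaphore
Pain point and mechanism:
Multithreading is not finished once you "start a few more threads". Lock stops several threads from modifying the same variable at the same time, Event signals that "the data is ready", and Semaphore limits how many callers can use a resource at once, for example at most 2 inference requests running concurrently.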
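One way to convince yourself that a Semaphore really caps concurrency is to record the peak number of threads inside the guarded section. The sketch below is a minimal illustration under assumed names (`limited_call`, `state`); the 20 ms sleep is a stand-in for real work.

```python
import threading
import time

sem = threading.Semaphore(2)          # at most 2 threads inside at once
state_lock = threading.Lock()         # protects the bookkeeping dict below
state = {"active": 0, "peak": 0}

def limited_call() -> None:
    with sem:
        with state_lock:
            state["active"] += 1
            state["peak"] = max(state["peak"], state["active"])
        time.sleep(0.02)              # stand-in for the real work
        with state_lock:
            state["active"] -= 1

threads = [threading.Thread(target=limited_call) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("peak concurrency:", state["peak"])
```

Six threads are started, yet the recorded peak never exceeds the semaphore's initial value of 2.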
Core source (verbatim from the full script at the end):
def demo_threading_primitives() -> None:
    print("\n=== Thread synchronization primitives demo ===\n")
    # 1. Lock: a thread-safe counter
    counter = 0
    lock = threading.Lock()

    def safe_increment(n: int) -> None:
        nonlocal counter
        for _ in range(n):
            with lock:
                counter += 1

    threads = [threading.Thread(target=safe_increment, args=(1000,)) for _ in range(5)]
    for t in threads: t.start()
    for t in threads: t.join()
    print(f"Lock counter (expected 5000): {counter}")

    # 2. Event: producer/consumer
    data_ready = threading.Event()
    pipeline_result: list[str] = []

    def producer() -> None:
        time.sleep(0.1)
        pipeline_result.append("embedding vector generated")
        data_ready.set()

    def consumer() -> None:
        data_ready.wait(timeout=2.0)
        if pipeline_result:
            print(f"Event consumer received: {pipeline_result[0]}")

    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start(); t2.start()
    t1.join(); t2.join()

    # 3. Semaphore: rate limiting
    sem = threading.Semaphore(2)  # at most 2 concurrent inference calls
    log: list[str] = []
    log_lock = threading.Lock()

    def inference_call(task_id: int) -> None:
        with sem:
            with log_lock:
                log.append(f"inference task {task_id} started")
            time.sleep(0.05)
            with log_lock:
                log.append(f"inference task {task_id} finished")

    workers = [threading.Thread(target=inference_call, args=(i,)) for i in range(4)]
    for w in workers: w.start()
    for w in workers: w.join()
    print(f"Semaphore inference log: {log}")
Runnable demo (with mock data and print feedback filled in):
import threading
import time

# Step 6: synchronization primitives are like traffic rules; they keep threads
# from colliding, jumbling shared state, or overloading a resource.
def demo_threading_primitives() -> None:
    print("\n=== Thread synchronization primitives demo ===\n")
    # 1. Lock: a thread-safe counter
    counter = 0
    lock = threading.Lock()

    def safe_increment(n: int) -> None:
        nonlocal counter
        for _ in range(n):
            with lock:
                counter += 1

    threads = [threading.Thread(target=safe_increment, args=(1000,)) for _ in range(5)]
    for t in threads: t.start()
    for t in threads: t.join()
    print(f"Lock counter (expected 5000): {counter}")

    # 2. Event: producer/consumer
    data_ready = threading.Event()
    pipeline_result: list[str] = []

    def producer() -> None:
        time.sleep(0.1)
        pipeline_result.append("embedding vector generated")
        data_ready.set()

    def consumer() -> None:
        data_ready.wait(timeout=2.0)
        if pipeline_result:
            print(f"Event consumer received: {pipeline_result[0]}")

    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start(); t2.start()
    t1.join(); t2.join()

    # 3. Semaphore: rate limiting
    sem = threading.Semaphore(2)  # at most 2 concurrent inference calls
    log: list[str] = []
    log_lock = threading.Lock()

    def inference_call(task_id: int) -> None:
        with sem:
            with log_lock:
                log.append(f"inference task {task_id} started")
            time.sleep(0.05)
            with log_lock:
                log.append(f"inference task {task_id} finished")

    workers = [threading.Thread(target=inference_call, args=(i,)) for i in range(4)]
    for w in workers: w.start()
    for w in workers: w.join()
    print(f"Semaphore inference log: {log}")

demo_threading_primitives()
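Step 7: Compare IO-bound and CPU-bound tasks with demo_compare
Pain point and mechanism:
demo_compare() is the test bench for the whole article: the same batch of tasks is run sequentially, with a thread pool, and with a process pool, and the results are charted side by side. Thread pools for IO, process pools for CPU: that conclusion should come from measurement rather than being taken on faith.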
Core source (verbatim from the full script at the end):
def demo_compare() -> None:
    print("\n=== IO-bound comparison (simulated network requests) ===")
    print(f"Tasks: {len(DOC_IDS)} documents, each simulating 50 ms of IO\n")
    _, t_seq = run_sequential(io_clean_task)
    _, t_thr = run_thread_pool(io_clean_task, workers=8)
    _, t_proc = run_process_pool(io_clean_task, workers=4)
    ascii_bar_chart(
        {"Sequential": t_seq, "Thread pool (8)": t_thr, "Process pool (4)": t_proc},
        "IO-bound elapsed time",
    )
    print("\n=== CPU-bound comparison (pure computation) ===")
    print(f"Tasks: {len(DOC_IDS)} documents, each summing 50,000 squares\n")
    _, t_seq2 = run_sequential(cpu_clean_task)
    _, t_thr2 = run_thread_pool(cpu_clean_task, workers=8)
    _, t_proc2 = run_process_pool(cpu_clean_task, workers=4)
    ascii_bar_chart(
        {"Sequential": t_seq2, "Thread pool (8)": t_thr2, "Process pool (4)": t_proc2},
        "CPU-bound elapsed time",
    )
    print("\nConclusions:")
    print(f" IO-bound: thread pool speedup = {t_seq/t_thr:.1f}x")
    print(f" CPU-bound: process pool speedup = {t_seq2/t_proc2:.1f}x")
    print(f" CPU-bound: thread pool speedup = {t_seq2/t_thr2:.1f}x ← limited by the GIL, close to 1x")
Runnable demo (with mock data and print feedback filled in):
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def io_clean_task(doc_id: int) -> dict:
    """Simulate an IO-bound task: download and clean one document."""
    time.sleep(0.05)  # simulate 50 ms of network IO
    word_count = (doc_id * 137 + 42) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

# CPU-bound: simulate hashing/tokenizing the text
def cpu_clean_task(doc_id: int) -> dict:
    """Simulate a CPU-bound task: compute-heavy processing of one document."""
    # Pure computation, no sleep
    total = sum(i * i for i in range(50_000))
    word_count = (total + doc_id) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

DOC_IDS = list(range(24))

def run_sequential(task_fn) -> tuple[list[dict], float]:
    start = time.perf_counter()
    results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start

def run_thread_pool(task_fn, workers: int = 8) -> tuple[list[dict], float]:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as ex:
        results = list(ex.map(task_fn, DOC_IDS))
    return results, time.perf_counter() - start

def run_process_pool(task_fn, workers: int = 4) -> tuple[list[dict], float]:
    start = time.perf_counter()
    try:
        with ProcessPoolExecutor(max_workers=workers) as ex:
            results = list(ex.map(task_fn, DOC_IDS))
    except (OSError, PermissionError) as exc:
        print(f" [fallback] Multiprocessing is restricted here: {exc}; running sequentially instead.")
        results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start

def ascii_bar_chart(data: dict[str, float], title: str, unit: str = "s") -> None:
    max_val = max(data.values()) or 1
    bar_width = 30
    print(f"\n{'─'*50}")
    print(f" {title}")
    print(f"{'─'*50}")
    for label, val in data.items():
        filled = int(val / max_val * bar_width)
        bar = "█" * filled + "░" * (bar_width - filled)
        print(f" {label:<18} │{bar}│ {val:.3f}{unit}")
    print(f"{'─'*50}")
    fastest = min(data, key=data.get)  # type: ignore[arg-type]
    print(f" Fastest: {fastest} ({data[fastest]:.3f}{unit})")

# Step 7: the full comparison runs both the IO and the CPU scenario and
# reports the speedups.
def demo_compare() -> None:
    print("\n=== IO-bound comparison (simulated network requests) ===")
    print(f"Tasks: {len(DOC_IDS)} documents, each simulating 50 ms of IO\n")
    _, t_seq = run_sequential(io_clean_task)
    _, t_thr = run_thread_pool(io_clean_task, workers=8)
    _, t_proc = run_process_pool(io_clean_task, workers=4)
    ascii_bar_chart(
        {"Sequential": t_seq, "Thread pool (8)": t_thr, "Process pool (4)": t_proc},
        "IO-bound elapsed time",
    )
    print("\n=== CPU-bound comparison (pure computation) ===")
    print(f"Tasks: {len(DOC_IDS)} documents, each summing 50,000 squares\n")
    _, t_seq2 = run_sequential(cpu_clean_task)
    _, t_thr2 = run_thread_pool(cpu_clean_task, workers=8)
    _, t_proc2 = run_process_pool(cpu_clean_task, workers=4)
    ascii_bar_chart(
        {"Sequential": t_seq2, "Thread pool (8)": t_thr2, "Process pool (4)": t_proc2},
        "CPU-bound elapsed time",
    )
    print("\nConclusions:")
    print(f" IO-bound: thread pool speedup = {t_seq/t_thr:.1f}x")
    print(f" CPU-bound: process pool speedup = {t_seq2/t_proc2:.1f}x")
    print(f" CPU-bound: thread pool speedup = {t_seq2/t_thr2:.1f}x ← limited by the GIL, close to 1x")

print("demo_compare is defined: the full script prints two elapsed-time bar charts when run.")
print("The process pool is not started here, so a restricted environment does not get in the way.")
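Step 8: Use main as a command-line remote control for the concurrency demos
Pain point and mechanism:
A command-line entry point gives the script a remote control: --mode threading shows only thread synchronization, --mode pool shows only the process pool, and --mode compare runs the full comparison. Beginners can switch experiments by changing an argument instead of editing the code.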
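The mode switch can be tried without a terminal, because `parse_args` accepts an explicit argument list. The sketch below assumes a hypothetical `build_parser` helper that mirrors the `--mode` option described above; passing a list avoids having to overwrite `sys.argv`.

```python
import argparse

def build_parser() -> argparse.ArgumentParser:
    # Hypothetical helper mirroring the --mode option described above.
    parser = argparse.ArgumentParser(description="concurrency demo")
    parser.add_argument("--mode", choices=["compare", "threading", "pool"],
                        default="compare", help="demo mode")
    return parser

default_args = build_parser().parse_args([])                        # no flags given
threading_args = build_parser().parse_args(["--mode", "threading"])
print(default_args.mode, threading_args.mode)  # compare threading
```

Any value outside the `choices` list makes argparse print a usage error and exit, so invalid modes are rejected at the entry point.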
Core source (verbatim from the full script at the end):
def main() -> None:
    parser = argparse.ArgumentParser(description="Concurrency model performance comparison")
    parser.add_argument("--mode", choices=["compare", "threading", "pool"],
                        default="compare", help="demo mode")
    args = parser.parse_args()
    print("GIL cheat sheet:")
    print(" IO-bound → threading / asyncio (GIL is released while waiting on IO)")
    print(" CPU-bound → multiprocessing (separate processes, no GIL contention)")
    if args.mode == "compare":
        demo_compare()
    elif args.mode == "threading":
        demo_threading_primitives()
    elif args.mode == "pool":
        demo_process_pool()
Runnable demo (with mock data and print feedback filled in):
import argparse
import threading
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def io_clean_task(doc_id: int) -> dict:
    """Simulate an IO-bound task: download and clean one document."""
    time.sleep(0.05)  # simulate 50 ms of network IO
    word_count = (doc_id * 137 + 42) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

# CPU-bound: simulate hashing/tokenizing the text
def cpu_clean_task(doc_id: int) -> dict:
    """Simulate a CPU-bound task: compute-heavy processing of one document."""
    # Pure computation, no sleep
    total = sum(i * i for i in range(50_000))
    word_count = (total + doc_id) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

DOC_IDS = list(range(24))

def run_sequential(task_fn) -> tuple[list[dict], float]:
    start = time.perf_counter()
    results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start

def run_thread_pool(task_fn, workers: int = 8) -> tuple[list[dict], float]:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as ex:
        results = list(ex.map(task_fn, DOC_IDS))
    return results, time.perf_counter() - start

def run_process_pool(task_fn, workers: int = 4) -> tuple[list[dict], float]:
    start = time.perf_counter()
    try:
        with ProcessPoolExecutor(max_workers=workers) as ex:
            results = list(ex.map(task_fn, DOC_IDS))
    except (OSError, PermissionError) as exc:
        print(f" [fallback] Multiprocessing is restricted here: {exc}; running sequentially instead.")
        results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start

def ascii_bar_chart(data: dict[str, float], title: str, unit: str = "s") -> None:
    max_val = max(data.values()) or 1
    bar_width = 30
    print(f"\n{'─'*50}")
    print(f" {title}")
    print(f"{'─'*50}")
    for label, val in data.items():
        filled = int(val / max_val * bar_width)
        bar = "█" * filled + "░" * (bar_width - filled)
        print(f" {label:<18} │{bar}│ {val:.3f}{unit}")
    print(f"{'─'*50}")
    fastest = min(data, key=data.get)  # type: ignore[arg-type]
    print(f" Fastest: {fastest} ({data[fastest]:.3f}{unit})")

def demo_threading_primitives() -> None:
    print("\n=== Thread synchronization primitives demo ===\n")
    # 1. Lock: a thread-safe counter
    counter = 0
    lock = threading.Lock()

    def safe_increment(n: int) -> None:
        nonlocal counter
        for _ in range(n):
            with lock:
                counter += 1

    threads = [threading.Thread(target=safe_increment, args=(1000,)) for _ in range(5)]
    for t in threads: t.start()
    for t in threads: t.join()
    print(f"Lock counter (expected 5000): {counter}")

    # 2. Event: producer/consumer
    data_ready = threading.Event()
    pipeline_result: list[str] = []

    def producer() -> None:
        time.sleep(0.1)
        pipeline_result.append("embedding vector generated")
        data_ready.set()

    def consumer() -> None:
        data_ready.wait(timeout=2.0)
        if pipeline_result:
            print(f"Event consumer received: {pipeline_result[0]}")

    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start(); t2.start()
    t1.join(); t2.join()

    # 3. Semaphore: rate limiting
    sem = threading.Semaphore(2)  # at most 2 concurrent inference calls
    log: list[str] = []
    log_lock = threading.Lock()

    def inference_call(task_id: int) -> None:
        with sem:
            with log_lock:
                log.append(f"inference task {task_id} started")
            time.sleep(0.05)
            with log_lock:
                log.append(f"inference task {task_id} finished")

    workers = [threading.Thread(target=inference_call, args=(i,)) for i in range(4)]
    for w in workers: w.start()
    for w in workers: w.join()
    print(f"Semaphore inference log: {log}")

def demo_process_pool() -> None:
    print("\n=== Process pool demo (CPU-bound) ===\n")
    _, elapsed = run_process_pool(cpu_clean_task, workers=4)
    print(f"Process pool (4 processes) handled {len(DOC_IDS)} documents in {elapsed:.3f}s")

def demo_compare() -> None:
    print("\n=== IO-bound comparison (simulated network requests) ===")
    print(f"Tasks: {len(DOC_IDS)} documents, each simulating 50 ms of IO\n")
    _, t_seq = run_sequential(io_clean_task)
    _, t_thr = run_thread_pool(io_clean_task, workers=8)
    _, t_proc = run_process_pool(io_clean_task, workers=4)
    ascii_bar_chart(
        {"Sequential": t_seq, "Thread pool (8)": t_thr, "Process pool (4)": t_proc},
        "IO-bound elapsed time",
    )
    print("\n=== CPU-bound comparison (pure computation) ===")
    print(f"Tasks: {len(DOC_IDS)} documents, each summing 50,000 squares\n")
    _, t_seq2 = run_sequential(cpu_clean_task)
    _, t_thr2 = run_thread_pool(cpu_clean_task, workers=8)
    _, t_proc2 = run_process_pool(cpu_clean_task, workers=4)
    ascii_bar_chart(
        {"Sequential": t_seq2, "Thread pool (8)": t_thr2, "Process pool (4)": t_proc2},
        "CPU-bound elapsed time",
    )
    print("\nConclusions:")
    print(f" IO-bound: thread pool speedup = {t_seq/t_thr:.1f}x")
    print(f" CPU-bound: process pool speedup = {t_seq2/t_proc2:.1f}x")
    print(f" CPU-bound: thread pool speedup = {t_seq2/t_thr2:.1f}x ← limited by the GIL, close to 1x")

# Step 8: main reads --mode and lets the user pick compare/threading/pool.
def main() -> None:
    parser = argparse.ArgumentParser(description="Concurrency model performance comparison")
    parser.add_argument("--mode", choices=["compare", "threading", "pool"],
                        default="compare", help="demo mode")
    args = parser.parse_args()
    print("GIL cheat sheet:")
    print(" IO-bound → threading / asyncio (GIL is released while waiting on IO)")
    print(" CPU-bound → multiprocessing (separate processes, no GIL contention)")
    if args.mode == "compare":
        demo_compare()
    elif args.mode == "threading":
        demo_threading_primitives()
    elif args.mode == "pool":
        demo_process_pool()

# Simulate the command line "prog --mode threading" so the demo runs without a terminal.
import sys
sys.argv = ["prog", "--mode", "threading"]
main()
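Geek hands-on: full source and how to run it
Now put the building blocks together: paste the complete code below into your editor and run it. Look at the whole loop first, then go back and change parameters section by section; that makes it easier to build engineering intuition.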
#!/usr/bin/env python3
"""
Multithreading vs. multiprocessing performance comparison
Usage:
    python3 12-python-concurrency.py --mode compare    # compare all three models (default)
    python3 12-python-concurrency.py --mode threading  # thread synchronization primitives only
    python3 12-python-concurrency.py --mode pool       # process pool only
"""
import argparse
import threading
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

# ── Mock tasks ────────────────────────────────────────────────
# IO-bound: simulate reading a remote knowledge-base document (sleep stands in for network latency)
def io_clean_task(doc_id: int) -> dict:
    """Simulate an IO-bound task: download and clean one document."""
    time.sleep(0.05)  # simulate 50 ms of network IO
    word_count = (doc_id * 137 + 42) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

# CPU-bound: simulate hashing/tokenizing the text
def cpu_clean_task(doc_id: int) -> dict:
    """Simulate a CPU-bound task: compute-heavy processing of one document."""
    # Pure computation, no sleep
    total = sum(i * i for i in range(50_000))
    word_count = (total + doc_id) % 500 + 100
    return {"doc_id": doc_id, "words": word_count, "status": "ok"}

# ── Three ways to execute ─────────────────────────────────────
DOC_IDS = list(range(24))

def run_sequential(task_fn) -> tuple[list[dict], float]:
    start = time.perf_counter()
    results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start

def run_thread_pool(task_fn, workers: int = 8) -> tuple[list[dict], float]:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as ex:
        results = list(ex.map(task_fn, DOC_IDS))
    return results, time.perf_counter() - start

def run_process_pool(task_fn, workers: int = 4) -> tuple[list[dict], float]:
    start = time.perf_counter()
    try:
        with ProcessPoolExecutor(max_workers=workers) as ex:
            results = list(ex.map(task_fn, DOC_IDS))
    except (OSError, PermissionError) as exc:
        print(f" [fallback] Multiprocessing is restricted here: {exc}; running sequentially instead.")
        results = [task_fn(d) for d in DOC_IDS]
    return results, time.perf_counter() - start

# ── ASCII bar chart ───────────────────────────────────────────
def ascii_bar_chart(data: dict[str, float], title: str, unit: str = "s") -> None:
    max_val = max(data.values()) or 1
    bar_width = 30
    print(f"\n{'─'*50}")
    print(f" {title}")
    print(f"{'─'*50}")
    for label, val in data.items():
        filled = int(val / max_val * bar_width)
        bar = "█" * filled + "░" * (bar_width - filled)
        print(f" {label:<18} │{bar}│ {val:.3f}{unit}")
    print(f"{'─'*50}")
    fastest = min(data, key=data.get)  # type: ignore[arg-type]
    print(f" Fastest: {fastest} ({data[fastest]:.3f}{unit})")

# ── Thread synchronization primitives demo ────────────────────
def demo_threading_primitives() -> None:
    print("\n=== Thread synchronization primitives demo ===\n")
    # 1. Lock: a thread-safe counter
    counter = 0
    lock = threading.Lock()

    def safe_increment(n: int) -> None:
        nonlocal counter
        for _ in range(n):
            with lock:
                counter += 1

    threads = [threading.Thread(target=safe_increment, args=(1000,)) for _ in range(5)]
    for t in threads: t.start()
    for t in threads: t.join()
    print(f"Lock counter (expected 5000): {counter}")

    # 2. Event: producer/consumer
    data_ready = threading.Event()
    pipeline_result: list[str] = []

    def producer() -> None:
        time.sleep(0.1)
        pipeline_result.append("embedding vector generated")
        data_ready.set()

    def consumer() -> None:
        data_ready.wait(timeout=2.0)
        if pipeline_result:
            print(f"Event consumer received: {pipeline_result[0]}")

    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start(); t2.start()
    t1.join(); t2.join()

    # 3. Semaphore: rate limiting
    sem = threading.Semaphore(2)  # at most 2 concurrent inference calls
    log: list[str] = []
    log_lock = threading.Lock()

    def inference_call(task_id: int) -> None:
        with sem:
            with log_lock:
                log.append(f"inference task {task_id} started")
            time.sleep(0.05)
            with log_lock:
                log.append(f"inference task {task_id} finished")

    workers = [threading.Thread(target=inference_call, args=(i,)) for i in range(4)]
    for w in workers: w.start()
    for w in workers: w.join()
    print(f"Semaphore inference log: {log}")

# ── Process pool demo ─────────────────────────────────────────
def demo_process_pool() -> None:
    print("\n=== Process pool demo (CPU-bound) ===\n")
    _, elapsed = run_process_pool(cpu_clean_task, workers=4)
    print(f"Process pool (4 processes) handled {len(DOC_IDS)} documents in {elapsed:.3f}s")

# ── Full comparison ───────────────────────────────────────────
def demo_compare() -> None:
    print("\n=== IO-bound comparison (simulated network requests) ===")
    print(f"Tasks: {len(DOC_IDS)} documents, each simulating 50 ms of IO\n")
    _, t_seq = run_sequential(io_clean_task)
    _, t_thr = run_thread_pool(io_clean_task, workers=8)
    _, t_proc = run_process_pool(io_clean_task, workers=4)
    ascii_bar_chart(
        {"Sequential": t_seq, "Thread pool (8)": t_thr, "Process pool (4)": t_proc},
        "IO-bound elapsed time",
    )
    print("\n=== CPU-bound comparison (pure computation) ===")
    print(f"Tasks: {len(DOC_IDS)} documents, each summing 50,000 squares\n")
    _, t_seq2 = run_sequential(cpu_clean_task)
    _, t_thr2 = run_thread_pool(cpu_clean_task, workers=8)
    _, t_proc2 = run_process_pool(cpu_clean_task, workers=4)
    ascii_bar_chart(
        {"Sequential": t_seq2, "Thread pool (8)": t_thr2, "Process pool (4)": t_proc2},
        "CPU-bound elapsed time",
    )
    print("\nConclusions:")
    print(f" IO-bound: thread pool speedup = {t_seq/t_thr:.1f}x")
    print(f" CPU-bound: process pool speedup = {t_seq2/t_proc2:.1f}x")
    print(f" CPU-bound: thread pool speedup = {t_seq2/t_thr2:.1f}x ← limited by the GIL, close to 1x")

# ── Entry point ───────────────────────────────────────────────
def main() -> None:
    parser = argparse.ArgumentParser(description="Concurrency model performance comparison")
    parser.add_argument("--mode", choices=["compare", "threading", "pool"],
                        default="compare", help="demo mode")
    args = parser.parse_args()
    print("GIL cheat sheet:")
    print(" IO-bound → threading / asyncio (GIL is released while waiting on IO)")
    print(" CPU-bound → multiprocessing (separate processes, no GIL contention)")
    if args.mode == "compare":
        demo_compare()
    elif args.mode == "threading":
        demo_threading_primitives()
    elif args.mode == "pool":
        demo_process_pool()

if __name__ == "__main__":
    main()
Example runs:
python3 12-python-concurrency.py --mode compare    # full comparison + ASCII charts
python3 12-python-concurrency.py --mode threading  # lock/event/semaphore demo
python3 12-python-concurrency.py --mode pool       # process pool demo
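Concurrency model quick reference
| Scenario | Recommended approach | Why |
|---|---|---|
| Batch HTTP requests | ThreadPoolExecutor | IO-bound; the GIL is released during IO |
| Image/vector computation | ProcessPoolExecutor | CPU-bound; sidesteps the GIL |
| High-concurrency network services | asyncio | Single-threaded event loop with very low overhead |
| Simple background tasks | threading.Thread | Lightweight; fine for a small number of concurrent jobs |
| Large-scale data processing | multiprocessing.Pool | Multi-core parallelism; shared memory is optional |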
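The asyncio row is the only option in the table that the article's script does not exercise. For orientation, here is a minimal sketch of the same "many overlapping waits" idea on a single-threaded event loop; `fetch_doc` and `fetch_all` are hypothetical names, and `asyncio.sleep` stands in for a real non-blocking network call.

```python
import asyncio

async def fetch_doc(doc_id: int) -> int:
    await asyncio.sleep(0.01)  # stand-in for a non-blocking network call
    return doc_id * 2

async def fetch_all(doc_ids: list[int]) -> list[int]:
    # gather() runs the coroutines concurrently and returns results in input order
    return await asyncio.gather(*(fetch_doc(d) for d in doc_ids))

results = asyncio.run(fetch_all(list(range(24))))
print(len(results), results[:3])
```

All 24 waits overlap on one thread, so no locks are needed for this simple case; the trade-off is that every library call on the path has to be non-blocking.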
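⏱ NexDo Time · 5-minute micro-exercises
- Modify ascii_bar_chart to add a column to the right of the bars showing the speedup relative to the sequential run (for example, 3.2x).
- Give run_thread_pool a timeout parameter, and use future.result(timeout=...) to catch tasks that time out and mark them as TIMEOUT.
- Use threading.Timer to implement a knowledge-base indexing task that stops automatically after 5 seconds and prints the number of documents processed when time is up.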
Don’t wait for next time, do it in the next moment.