15 · gevent/greenlet: Coroutine Libraries and High-Concurrency IO in Practice
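🔗 Knowledge-map navigation: before reading this article, it helps to review the core concepts in "14 · Coroutines in Practice: High Concurrency with asyncio"; this article builds on them.
Bridging note: article 14 used asyncio for coroutine concurrency. asyncio is part of the standard library, while gevent/greenlet are earlier third-party coroutine solutions. Understanding how they differ is what makes Python's coroutine machinery click.
Geek take: concurrency is not about writing more complicated code; it is about being explicit about waiting, scheduling, and resource isolation. This article verifies each mechanism with mock scenarios that run on your own machine.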
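Pain Points and Architecture
With asyncio, the IO code along the whole call chain has to be rewritten with async/await, which makes retrofitting expensive. gevent instead monkey-patches the standard library's blocking calls at runtime, so existing synchronous code can run cooperatively with almost no changes: usually one patch_all() call at program start, plus spawning the work as greenlets.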
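greenlet (manual switching):
    gr1 ──switch()──▶ gr2 ──switch()──▶ gr1
    The programmer decides when to switch, similar to yield

gevent (automatic scheduling):
    Blocking IO encountered ──▶ switch to another greenlet automatically
    IO completes            ──▶ switch back and resume
    Under the hood: a libev (or libuv) event loop

How monkey-patch works:
    import gevent.monkey; gevent.monkey.patch_all()
        ↓
    socket.connect   → gevent version (non-blocking)
    time.sleep       → gevent version (yields control)
    threading.Thread → gevent version (green thread)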
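| Approach | How it switches | Migration cost | Best suited for |
|---|---|---|---|
| greenlet | manual switch() | high | understanding how coroutines work |
| gevent | automatic (triggered by IO) | low (monkey-patch) | retrofitting legacy synchronous code |
| asyncio | await keyword | medium (requires a rewrite) | the standard choice for new projects |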
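Hands-On Playground
Step by Step: Breaking Down the Core Logic
Conclusions first: greenlet switches manually, gevent switches automatically when it hits IO, and monkey-patch swaps the old synchronous IO functions for coroutine-friendly versions. Every step below keeps a fallback path, so even without gevent installed you can still run the code, get output, and follow the mechanism.
Step 1: Detect whether gevent is installed, so the tutorial works either way
Pain point and mechanism:
gevent is a third-party library, so readers cannot be assumed to have it installed, and tutorial code should not die with an ImportError on the first line. This try/except absorbs the environment difference: with gevent present the real coroutines run; without it, generators and threading stand in so the lesson continues.
Core source (taken from the complete source at the end):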
try:
    import gevent
    import gevent.monkey
    import gevent.pool
    from greenlet import greenlet
    HAS_GEVENT = True
except ImportError:
    HAS_GEVENT = False
Runnable demo (with mock data and print feedback added):
try:
    import gevent
    import gevent.monkey
    import gevent.pool
    from greenlet import greenlet
    HAS_GEVENT = True
except ImportError:
    HAS_GEVENT = False

print("gevent installed:", HAS_GEVENT)
if not HAS_GEVENT:
    print("Falling back to a pure-Python simulation; the switching idea is still visible.")
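The try/except above detects the packages by importing them. As a hedged alternative, availability can also be probed without importing anything, using the standard library's `importlib.util.find_spec`. The helper name `has_module` is an assumption made for this sketch and is not part of the article's source.

```python
import importlib.util


def has_module(name: str) -> bool:
    """Return True if a top-level module can be located, without importing it."""
    return importlib.util.find_spec(name) is not None


# Hypothetical usage: decide which demo path to take.
HAS_GEVENT = has_module("gevent") and has_module("greenlet")
print("gevent available:", HAS_GEVENT)
```

Probing first keeps the import side effects out of the detection step; the actual `import gevent` can then happen only on the path that needs it.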
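Step 2: Use section and ts to unify the output format and keep the terminal readable
Pain point and mechanism:
Concurrency demos print a lot, so a uniform title and timestamp format keeps the reader from drowning in scattered output. section() acts as a chapter heading; ts() formats time.perf_counter(), a high-resolution clock whose absolute value is arbitrary, so only the difference between two readings is meaningful.
Core source (taken from the complete source at the end):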
def section(title: str) -> None:
    print(f"\n{'='*60}\n {title}\n{'='*60}")

def ts() -> str:
    return f"{time.perf_counter():.3f}s"
Runnable demo (with mock data and print feedback added):
import time

# Step 2: helper functions handle layout and timestamps only, no business logic.
def section(title: str) -> None:
    print(f"\n{'='*60}\n {title}\n{'='*60}")

def ts() -> str:
    return f"{time.perf_counter():.3f}s"

section("Output helper demo")
print("Current timestamp:", ts())
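Step 3: Use mode_greenlet to understand manual switching: whoever calls switch gives up the stage
Pain point and mechanism:
The heart of greenlet is the manual switch, like two people sharing one microphone: A talks for a while and hands the microphone to B, and B hands it back when done. When greenlet is not installed, the source falls back to generators, whose yield expresses a similar "give up control voluntarily" idea.
Core source (taken from the complete source at the end):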
def mode_greenlet() -> None:
    section("greenlet: manual coroutine switching")
    if not HAS_GEVENT:
        print("\n [simulation] greenlet not installed; showing a generator-based equivalent\n")
        # Emulate greenlet's manual hand-off with generators.
        execution_log: list[str] = []

        def task_a():
            execution_log.append("A: start")
            yield  # hand control back so B can run
            execution_log.append("A: resumed, continuing")
            yield  # hand control back so B can run
            execution_log.append("A: done")

        def task_b():
            execution_log.append("B: start")
            yield  # hand control back so A can run
            execution_log.append("B: resumed, continuing")

        gen_a, gen_b = task_a(), task_b()
        # Advance the two generators alternately by hand.
        for _ in range(3):
            try:
                next(gen_a)
            except StopIteration:
                pass
            try:
                next(gen_b)
            except StopIteration:
                pass
        print(" Execution order (manual switching):")
        for i, log in enumerate(execution_log, 1):
            print(f" {i}. {log}")
        print("\n 💡 greenlet.switch() plays the role of yield above: the programmer picks the switch points")
        return

    # Real greenlet demo
    execution_log: list[str] = []

    def fun1() -> None:
        execution_log.append("fun1: start")
        execution_log.append("fun1: switching to fun2")
        gr2.switch()
        execution_log.append("fun1: back from fun2, continuing")
        gr2.switch()
        execution_log.append("fun1: done")

    def fun2() -> None:
        execution_log.append("fun2: start")
        execution_log.append("fun2: switching back to fun1")
        gr1.switch()
        execution_log.append("fun2: running again, done")

    gr1 = greenlet(fun1)
    # parent=gr1: when fun2 returns, control goes back to fun1 instead of the
    # main greenlet, so fun1 can reach its last line and both greenlets finish.
    gr2 = greenlet(fun2, parent=gr1)
    gr1.switch()  # start fun1
    print("\n Execution order (greenlet manual switching):")
    for i, log in enumerate(execution_log, 1):
        print(f" {i}. {log}")
    print("\n Key point: everything runs in a single thread, with no OS thread switch")
    print(f" gr1.dead={gr1.dead} gr2.dead={gr2.dead}")
Runnable demo (with mock data and print feedback added):
import time
import threading
from typing import Callable

try:
    import gevent
    import gevent.monkey
    import gevent.pool
    from greenlet import greenlet
    HAS_GEVENT = True
except ImportError:
    HAS_GEVENT = False

def section(title: str) -> None:
    print(f"\n{'='*60}\n {title}\n{'='*60}")

def ts() -> str:
    return f"{time.perf_counter():.3f}s"

# Step 3: greenlet is the manual gearbox of coroutines; the programmer decides when to switch.
def mode_greenlet() -> None:
    section("greenlet: manual coroutine switching")
    if not HAS_GEVENT:
        print("\n [simulation] greenlet not installed; showing a generator-based equivalent\n")
        # Emulate greenlet's manual hand-off with generators.
        execution_log: list[str] = []

        def task_a():
            execution_log.append("A: start")
            yield  # hand control back so B can run
            execution_log.append("A: resumed, continuing")
            yield  # hand control back so B can run
            execution_log.append("A: done")

        def task_b():
            execution_log.append("B: start")
            yield  # hand control back so A can run
            execution_log.append("B: resumed, continuing")

        gen_a, gen_b = task_a(), task_b()
        # Advance the two generators alternately by hand.
        for _ in range(3):
            try:
                next(gen_a)
            except StopIteration:
                pass
            try:
                next(gen_b)
            except StopIteration:
                pass
        print(" Execution order (manual switching):")
        for i, log in enumerate(execution_log, 1):
            print(f" {i}. {log}")
        print("\n 💡 greenlet.switch() plays the role of yield above: the programmer picks the switch points")
        return

    # Real greenlet demo
    execution_log: list[str] = []

    def fun1() -> None:
        execution_log.append("fun1: start")
        execution_log.append("fun1: switching to fun2")
        gr2.switch()
        execution_log.append("fun1: back from fun2, continuing")
        gr2.switch()
        execution_log.append("fun1: done")

    def fun2() -> None:
        execution_log.append("fun2: start")
        execution_log.append("fun2: switching back to fun1")
        gr1.switch()
        execution_log.append("fun2: running again, done")

    gr1 = greenlet(fun1)
    # parent=gr1: when fun2 returns, control goes back to fun1 instead of the
    # main greenlet, so fun1 can reach its last line and both greenlets finish.
    gr2 = greenlet(fun2, parent=gr1)
    gr1.switch()  # start fun1
    print("\n Execution order (greenlet manual switching):")
    for i, log in enumerate(execution_log, 1):
        print(f" {i}. {log}")
    print("\n Key point: everything runs in a single thread, with no OS thread switch")
    print(f" gr1.dead={gr1.dead} gr2.dead={gr2.dead}")

mode_greenlet()
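Step 4: Use mode_gevent to watch automatic IO scheduling: control is yielded while waiting
Pain point and mechanism:
gevent feels more like an automatic transmission than greenlet: there is no need to sprinkle switch() calls around, because when a task waits on IO, gevent hands control to another greenlet. Without gevent installed, this step uses threading to imitate the effect of overlapping waits.
Core source (taken from the complete source at the end):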
def mode_gevent() -> None:
    section("gevent: automatic IO scheduling")
    if not HAS_GEVENT:
        print("\n [simulation] gevent not installed; using threading to imitate the concurrency\n")
        results: list[str] = []

        def simulated_task(task_id: int, delay: float) -> None:
            time.sleep(delay)  # simulated IO
            results.append(f"task {task_id} done (took {delay:.1f}s)")

        t0 = time.perf_counter()
        threads = [threading.Thread(target=simulated_task, args=(i, 0.1 * (i + 1)))
                   for i in range(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        elapsed = time.perf_counter() - t0
        print(f" 5 tasks ran concurrently in {elapsed:.2f}s (serial would take {0.1*15:.1f}s)")
        for r in sorted(results):
            print(f" {r}")
        return

    gevent.monkey.patch_all()
    task_log: list[tuple[str, float]] = []

    def io_task(task_id: int, duration: float) -> None:
        task_log.append((f"task {task_id} start", time.perf_counter()))
        gevent.sleep(duration)  # simulated blocking IO; yields control automatically
        # :g prints 0.15 as "0.15" (a .1f format would round it to one decimal)
        task_log.append((f"task {task_id} done ({duration:g}s)", time.perf_counter()))

    durations = [0.3, 0.1, 0.2, 0.15, 0.25]
    t0 = time.perf_counter()
    # gevent.spawn creates a greenlet; joinall waits for all of them to finish.
    greenlets = [gevent.spawn(io_task, i, d) for i, d in enumerate(durations)]
    gevent.joinall(greenlets)
    elapsed = time.perf_counter() - t0
    serial_time = sum(durations)
    print("\n 5 IO tasks running concurrently:")
    for msg, t in task_log:
        print(f" [{t - t0:.3f}s] {msg}")
    print(f"\n Concurrent total: {elapsed:.2f}s serial would take: {serial_time:.2f}s")
    print(f" Speedup: {serial_time/elapsed:.1f}x")
Runnable demo (with mock data and print feedback added):
import time
import threading
from typing import Callable

try:
    import gevent
    import gevent.monkey
    import gevent.pool
    from greenlet import greenlet
    HAS_GEVENT = True
except ImportError:
    HAS_GEVENT = False

def section(title: str) -> None:
    print(f"\n{'='*60}\n {title}\n{'='*60}")

def ts() -> str:
    return f"{time.perf_counter():.3f}s"

# Step 4: gevent is the automatic gearbox; waiting on IO switches to another task by itself.
def mode_gevent() -> None:
    section("gevent: automatic IO scheduling")
    if not HAS_GEVENT:
        print("\n [simulation] gevent not installed; using threading to imitate the concurrency\n")
        results: list[str] = []

        def simulated_task(task_id: int, delay: float) -> None:
            time.sleep(delay)  # simulated IO
            results.append(f"task {task_id} done (took {delay:.1f}s)")

        t0 = time.perf_counter()
        threads = [threading.Thread(target=simulated_task, args=(i, 0.1 * (i + 1)))
                   for i in range(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        elapsed = time.perf_counter() - t0
        print(f" 5 tasks ran concurrently in {elapsed:.2f}s (serial would take {0.1*15:.1f}s)")
        for r in sorted(results):
            print(f" {r}")
        return

    gevent.monkey.patch_all()
    task_log: list[tuple[str, float]] = []

    def io_task(task_id: int, duration: float) -> None:
        task_log.append((f"task {task_id} start", time.perf_counter()))
        gevent.sleep(duration)  # simulated blocking IO; yields control automatically
        # :g prints 0.15 as "0.15" (a .1f format would round it to one decimal)
        task_log.append((f"task {task_id} done ({duration:g}s)", time.perf_counter()))

    durations = [0.3, 0.1, 0.2, 0.15, 0.25]
    t0 = time.perf_counter()
    # gevent.spawn creates a greenlet; joinall waits for all of them to finish.
    greenlets = [gevent.spawn(io_task, i, d) for i, d in enumerate(durations)]
    gevent.joinall(greenlets)
    elapsed = time.perf_counter() - t0
    serial_time = sum(durations)
    print("\n 5 IO tasks running concurrently:")
    for msg, t in task_log:
        print(f" [{t - t0:.3f}s] {msg}")
    print(f"\n Concurrent total: {elapsed:.2f}s serial would take: {serial_time:.2f}s")
    print(f" Speedup: {serial_time/elapsed:.1f}x")

mode_gevent()
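To make "switch away while waiting, switch back when ready" concrete, here is a minimal sketch of a timer-driven cooperative scheduler built on generators. It is not how gevent is implemented (gevent relies on greenlets and a native event loop); the function `run_all` and the convention that a task yields the number of seconds it wants to wait are assumptions of this sketch.

```python
import heapq
import time


def run_all(tasks):
    """Run generator tasks that yield a delay (in seconds) whenever they 'wait'."""
    # Heap entries: (wake_time, tie_breaker, task). The unique tie_breaker
    # guarantees that generator objects are never compared with each other.
    ready = [(0.0, i, t) for i, t in enumerate(tasks)]
    heapq.heapify(ready)
    start = time.perf_counter()
    while ready:
        wake, i, task = heapq.heappop(ready)
        # Sleep only until the earliest task is due; the others stay parked in the heap.
        remaining = wake - (time.perf_counter() - start)
        if remaining > 0:
            time.sleep(remaining)
        try:
            delay = next(task)  # run the task up to its next waiting point
        except StopIteration:
            continue  # task finished
        heapq.heappush(ready, (time.perf_counter() - start + delay, i, task))


log = []


def io_task(name, delay):
    log.append(f"{name} start")
    yield delay  # the "blocking" point: hand control back to the scheduler
    log.append(f"{name} done")


t0 = time.perf_counter()
run_all([io_task("a", 0.03), io_task("b", 0.01), io_task("c", 0.02)])
elapsed = time.perf_counter() - t0
print(log, f"{elapsed:.3f}s")
```

All three tasks start before any of them finishes, and they complete in order of their wait time rather than their start order, which is the same pattern the gevent demo's task log shows.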
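Step 5: Use mode_patch to understand monkey-patch: how synchronous code gets swapped for a cooperative version
Pain point and mechanism:
Monkey-patching is gevent's signature feature: at runtime it replaces blocking functions such as those in socket and time.sleep, so that old synchronous code can run cooperatively. It is also easy to get wrong, hence the explicit warning: patch_all() must run as early as possible, ideally before any other network library is imported.
Core source (taken from the complete source at the end):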
def mode_patch() -> None:
    section("monkey-patch: make synchronous code cooperative with minimal changes")
    print("""
 How monkey-patching works:
 ──────────────────────────────────────────────────────
   import gevent.monkey
   gevent.monkey.patch_all()   <- run as early as possible

   Afterwards:
     socket.connect()  -> gevent cooperative version
     time.sleep()      -> gevent.sleep() (yields control)
     threading.Thread  -> backed by a greenlet
 ──────────────────────────────────────────────────────
 Comparison:
   asyncio | IO code must be rewritten with async/await
   gevent  | after patch_all(), existing sync code runs cooperatively

 Typical use:
   # Patch first, before importing requests
   import gevent.monkey; gevent.monkey.patch_all()
   import gevent
   import requests

   # Legacy synchronous code, unchanged
   def fetch(url): return requests.get(url).status_code

   jobs = [gevent.spawn(fetch, url) for url in urls]
   gevent.joinall(jobs)
   results = [j.value for j in jobs]
""")
    if not HAS_GEVENT:
        print(" [hint] install gevent to run the real demo: pip install gevent")
        return

    # Run the same patched time.sleep serially and inside spawned greenlets.
    gevent.monkey.patch_all()

    def worker(wid: int) -> float:
        t0 = time.perf_counter()
        time.sleep(0.1)  # after patching, this is gevent's cooperative sleep
        return time.perf_counter() - t0

    # Serial
    t0 = time.perf_counter()
    for i in range(5):
        worker(i)
    serial = time.perf_counter() - t0

    # Concurrent (gevent.spawn)
    t0 = time.perf_counter()
    jobs = [gevent.spawn(worker, i) for i in range(5)]
    gevent.joinall(jobs)
    concurrent = time.perf_counter() - t0

    print(" The same time.sleep(0.1) x 5:")
    print(f" Serial: {serial:.2f}s")
    print(f" gevent concurrent: {concurrent:.2f}s (waits overlap after patching)")
Runnable demo (with mock data and print feedback added):
import time
import threading
from typing import Callable

try:
    import gevent
    import gevent.monkey
    import gevent.pool
    from greenlet import greenlet
    HAS_GEVENT = True
except ImportError:
    HAS_GEVENT = False

def section(title: str) -> None:
    print(f"\n{'='*60}\n {title}\n{'='*60}")

def ts() -> str:
    return f"{time.perf_counter():.3f}s"

# Step 5: monkey-patch swaps parts of the standard library at runtime for cooperative versions.
def mode_patch() -> None:
    section("monkey-patch: make synchronous code cooperative with minimal changes")
    print("""
 How monkey-patching works:
 ──────────────────────────────────────────────────────
   import gevent.monkey
   gevent.monkey.patch_all()   <- run as early as possible

   Afterwards:
     socket.connect()  -> gevent cooperative version
     time.sleep()      -> gevent.sleep() (yields control)
     threading.Thread  -> backed by a greenlet
 ──────────────────────────────────────────────────────
 Comparison:
   asyncio | IO code must be rewritten with async/await
   gevent  | after patch_all(), existing sync code runs cooperatively

 Typical use:
   # Patch first, before importing requests
   import gevent.monkey; gevent.monkey.patch_all()
   import gevent
   import requests

   # Legacy synchronous code, unchanged
   def fetch(url): return requests.get(url).status_code

   jobs = [gevent.spawn(fetch, url) for url in urls]
   gevent.joinall(jobs)
   results = [j.value for j in jobs]
""")
    if not HAS_GEVENT:
        print(" [hint] install gevent to run the real demo: pip install gevent")
        return

    # Run the same patched time.sleep serially and inside spawned greenlets.
    gevent.monkey.patch_all()

    def worker(wid: int) -> float:
        t0 = time.perf_counter()
        time.sleep(0.1)  # after patching, this is gevent's cooperative sleep
        return time.perf_counter() - t0

    # Serial
    t0 = time.perf_counter()
    for i in range(5):
        worker(i)
    serial = time.perf_counter() - t0

    # Concurrent (gevent.spawn)
    t0 = time.perf_counter()
    jobs = [gevent.spawn(worker, i) for i in range(5)]
    gevent.joinall(jobs)
    concurrent = time.perf_counter() - t0

    print(" The same time.sleep(0.1) x 5:")
    print(f" Serial: {serial:.2f}s")
    print(f" gevent concurrent: {concurrent:.2f}s (waits overlap after patching)")

mode_patch()
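Why does the patch have to come early? A monkey patch rebinds an attribute on a module object, so any name that was bound to the old function beforehand (for example through `from time import sleep`) keeps pointing at the original. The sketch below shows that effect with a stand-in module and no gevent at all; `fake_time`, `blocking_sleep`, and `cooperative_sleep` are made-up names used only for illustration.

```python
import types

# A stand-in for a standard-library module (hypothetical, for illustration only).
fake_time = types.ModuleType("fake_time")


def blocking_sleep(seconds):
    return f"blocking sleep({seconds})"


def cooperative_sleep(seconds):
    return f"cooperative sleep({seconds})"


fake_time.sleep = blocking_sleep

# "Library A" did the equivalent of `from fake_time import sleep` BEFORE the patch.
early_binding = fake_time.sleep

# The "monkey patch": rebind the attribute on the module object.
fake_time.sleep = cooperative_sleep

# "Library B" looks the attribute up at call time, so it sees the replacement.
print(early_binding(1))    # still the original function
print(fake_time.sleep(1))  # the patched function
```

Code that captured the original function before the patch never notices the replacement, which is the failure mode that patching before other imports avoids.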
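Step 6: Use mode_benchmark to compare IO wait time for serial, threads, and gevent
Pain point and mechanism:
A fair comparison needs the same batch of tasks. mode_benchmark() has 20 tasks each wait 0.05 seconds: run serially the waits add up, while threads and gevent overlap them. The bar chart makes the speedup visible at a glance. One caveat: once patch_all() has run in the process (as happens earlier under --mode all), threading is patched too, so the "threads" row is then effectively measuring greenlets as well.
Core source (taken from the complete source at the end):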
def mode_benchmark() -> None:
    section("Benchmark: coroutines vs threads vs serial (IO-bound)")
    N = 20        # number of tasks
    DELAY = 0.05  # IO wait per task, in seconds

    def io_work(task_id: int) -> None:
        time.sleep(DELAY)

    results: list[tuple[str, float]] = []

    # Serial
    t0 = time.perf_counter()
    for i in range(N):
        io_work(i)
    results.append(("serial", time.perf_counter() - t0))

    # Threads
    t0 = time.perf_counter()
    threads = [threading.Thread(target=io_work, args=(i,)) for i in range(N)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    results.append(("threads", time.perf_counter() - t0))

    # gevent (if available)
    if HAS_GEVENT:
        gevent.monkey.patch_all()
        t0 = time.perf_counter()
        jobs = [gevent.spawn(io_work, i) for i in range(N)]
        gevent.joinall(jobs)
        results.append(("gevent", time.perf_counter() - t0))

    serial_time = results[0][1]
    print(f"\n {N} IO tasks ({DELAY}s each), theoretical serial time {N*DELAY:.1f}s\n")
    print(f" {'Approach':<12} {'Time':<10} {'Speedup':<10} Bar")
    print(f" {'─'*55}")
    for name, elapsed in results:
        speedup = serial_time / elapsed
        bar = "█" * int(speedup * 4)
        print(f" {name:<12} {elapsed:.3f}s {speedup:.1f}x {bar}")
    print("\n 💡 IO-bound work: coroutines ≈ threads (both overlap the waiting)")
    print(" CPU-bound work: coroutines do not help (GIL / single thread); use multiple processes")
Runnable demo (with mock data and print feedback added):
import time
import threading
from typing import Callable

try:
    import gevent
    import gevent.monkey
    import gevent.pool
    from greenlet import greenlet
    HAS_GEVENT = True
except ImportError:
    HAS_GEVENT = False

def section(title: str) -> None:
    print(f"\n{'='*60}\n {title}\n{'='*60}")

def ts() -> str:
    return f"{time.perf_counter():.3f}s"

# Step 6: the benchmark runs the same set of IO waits under each execution model.
def mode_benchmark() -> None:
    section("Benchmark: coroutines vs threads vs serial (IO-bound)")
    N = 20        # number of tasks
    DELAY = 0.05  # IO wait per task, in seconds

    def io_work(task_id: int) -> None:
        time.sleep(DELAY)

    results: list[tuple[str, float]] = []

    # Serial
    t0 = time.perf_counter()
    for i in range(N):
        io_work(i)
    results.append(("serial", time.perf_counter() - t0))

    # Threads
    t0 = time.perf_counter()
    threads = [threading.Thread(target=io_work, args=(i,)) for i in range(N)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    results.append(("threads", time.perf_counter() - t0))

    # gevent (if available)
    if HAS_GEVENT:
        gevent.monkey.patch_all()
        t0 = time.perf_counter()
        jobs = [gevent.spawn(io_work, i) for i in range(N)]
        gevent.joinall(jobs)
        results.append(("gevent", time.perf_counter() - t0))

    serial_time = results[0][1]
    print(f"\n {N} IO tasks ({DELAY}s each), theoretical serial time {N*DELAY:.1f}s\n")
    print(f" {'Approach':<12} {'Time':<10} {'Speedup':<10} Bar")
    print(f" {'─'*55}")
    for name, elapsed in results:
        speedup = serial_time / elapsed
        bar = "█" * int(speedup * 4)
        print(f" {name:<12} {elapsed:.3f}s {speedup:.1f}x {bar}")
    print("\n 💡 IO-bound work: coroutines ≈ threads (both overlap the waiting)")
    print(" CPU-bound work: coroutines do not help (GIL / single thread); use multiple processes")

mode_benchmark()
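The speedups in this benchmark follow from simple arithmetic: if every task only waits, the serial time is N × DELAY, while the fully overlapped time approaches a single DELAY. A small sketch of that bound follows; the function name `ideal_elapsed` and its `workers` parameter are assumptions introduced here, not part of the article's script.

```python
import math


def ideal_elapsed(n_tasks: int, delay: float, workers: int) -> float:
    """Best-case wall time when each task only waits `delay` seconds
    and at most `workers` tasks may wait at the same time."""
    if workers < 1:
        raise ValueError("workers must be >= 1")
    return math.ceil(n_tasks / workers) * delay


N, DELAY = 20, 0.05
serial = ideal_elapsed(N, DELAY, workers=1)      # waits add up
overlapped = ideal_elapsed(N, DELAY, workers=N)  # waits fully overlap
print(f"serial {serial:.2f}s, fully overlapped {overlapped:.2f}s, "
      f"upper bound on speedup {serial / overlapped:.0f}x")
```

Measured speedups land below this upper bound because starting and scheduling the tasks is not free.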
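Step 7: Use main as the command-line entry point for greenlet/gevent/patch/benchmark
Pain point and mechanism:
argparse ties everything together: --mode greenlet shows manual switching, --mode gevent shows automatic scheduling, --mode patch shows monkey-patching, and --mode benchmark shows the performance comparison. Beginners can switch experiments by changing a flag instead of editing the source.
Core source (taken from the complete source at the end):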
def main() -> None:
    parser = argparse.ArgumentParser(description="gevent/greenlet coroutine libraries in practice")
    parser.add_argument(
        "--mode",
        choices=["greenlet", "gevent", "patch", "benchmark", "all"],
        default="all",
    )
    args = parser.parse_args()
    dispatch = {
        "greenlet": mode_greenlet,
        "gevent": mode_gevent,
        "patch": mode_patch,
        "benchmark": mode_benchmark,
        "all": lambda: [mode_greenlet(), mode_gevent(),
                        mode_patch(), mode_benchmark()],
    }
    dispatch[args.mode]()
Runnable demo (with mock data and print feedback added):
import argparse
import time
import threading
from typing import Callable

try:
    import gevent
    import gevent.monkey
    import gevent.pool
    from greenlet import greenlet
    HAS_GEVENT = True
except ImportError:
    HAS_GEVENT = False

def section(title: str) -> None:
    print(f"\n{'='*60}\n {title}\n{'='*60}")

def ts() -> str:
    return f"{time.perf_counter():.3f}s"

def mode_greenlet() -> None:
    section("greenlet: manual coroutine switching")
    if not HAS_GEVENT:
        print("\n [simulation] greenlet not installed; showing a generator-based equivalent\n")
        # Emulate greenlet's manual hand-off with generators.
        execution_log: list[str] = []

        def task_a():
            execution_log.append("A: start")
            yield  # hand control back so B can run
            execution_log.append("A: resumed, continuing")
            yield  # hand control back so B can run
            execution_log.append("A: done")

        def task_b():
            execution_log.append("B: start")
            yield  # hand control back so A can run
            execution_log.append("B: resumed, continuing")

        gen_a, gen_b = task_a(), task_b()
        # Advance the two generators alternately by hand.
        for _ in range(3):
            try:
                next(gen_a)
            except StopIteration:
                pass
            try:
                next(gen_b)
            except StopIteration:
                pass
        print(" Execution order (manual switching):")
        for i, log in enumerate(execution_log, 1):
            print(f" {i}. {log}")
        print("\n 💡 greenlet.switch() plays the role of yield above: the programmer picks the switch points")
        return

    # Real greenlet demo
    execution_log: list[str] = []

    def fun1() -> None:
        execution_log.append("fun1: start")
        execution_log.append("fun1: switching to fun2")
        gr2.switch()
        execution_log.append("fun1: back from fun2, continuing")
        gr2.switch()
        execution_log.append("fun1: done")

    def fun2() -> None:
        execution_log.append("fun2: start")
        execution_log.append("fun2: switching back to fun1")
        gr1.switch()
        execution_log.append("fun2: running again, done")

    gr1 = greenlet(fun1)
    # parent=gr1: when fun2 returns, control goes back to fun1 instead of the
    # main greenlet, so fun1 can reach its last line and both greenlets finish.
    gr2 = greenlet(fun2, parent=gr1)
    gr1.switch()  # start fun1
    print("\n Execution order (greenlet manual switching):")
    for i, log in enumerate(execution_log, 1):
        print(f" {i}. {log}")
    print("\n Key point: everything runs in a single thread, with no OS thread switch")
    print(f" gr1.dead={gr1.dead} gr2.dead={gr2.dead}")

def mode_gevent() -> None:
    section("gevent: automatic IO scheduling")
    if not HAS_GEVENT:
        print("\n [simulation] gevent not installed; using threading to imitate the concurrency\n")
        results: list[str] = []

        def simulated_task(task_id: int, delay: float) -> None:
            time.sleep(delay)  # simulated IO
            results.append(f"task {task_id} done (took {delay:.1f}s)")

        t0 = time.perf_counter()
        threads = [threading.Thread(target=simulated_task, args=(i, 0.1 * (i + 1)))
                   for i in range(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        elapsed = time.perf_counter() - t0
        print(f" 5 tasks ran concurrently in {elapsed:.2f}s (serial would take {0.1*15:.1f}s)")
        for r in sorted(results):
            print(f" {r}")
        return

    gevent.monkey.patch_all()
    task_log: list[tuple[str, float]] = []

    def io_task(task_id: int, duration: float) -> None:
        task_log.append((f"task {task_id} start", time.perf_counter()))
        gevent.sleep(duration)  # simulated blocking IO; yields control automatically
        # :g prints 0.15 as "0.15" (a .1f format would round it to one decimal)
        task_log.append((f"task {task_id} done ({duration:g}s)", time.perf_counter()))

    durations = [0.3, 0.1, 0.2, 0.15, 0.25]
    t0 = time.perf_counter()
    # gevent.spawn creates a greenlet; joinall waits for all of them to finish.
    greenlets = [gevent.spawn(io_task, i, d) for i, d in enumerate(durations)]
    gevent.joinall(greenlets)
    elapsed = time.perf_counter() - t0
    serial_time = sum(durations)
    print("\n 5 IO tasks running concurrently:")
    for msg, t in task_log:
        print(f" [{t - t0:.3f}s] {msg}")
    print(f"\n Concurrent total: {elapsed:.2f}s serial would take: {serial_time:.2f}s")
    print(f" Speedup: {serial_time/elapsed:.1f}x")

def mode_patch() -> None:
    section("monkey-patch: make synchronous code cooperative with minimal changes")
    print("""
 How monkey-patching works:
 ──────────────────────────────────────────────────────
   import gevent.monkey
   gevent.monkey.patch_all()   <- run as early as possible

   Afterwards:
     socket.connect()  -> gevent cooperative version
     time.sleep()      -> gevent.sleep() (yields control)
     threading.Thread  -> backed by a greenlet
 ──────────────────────────────────────────────────────
 Comparison:
   asyncio | IO code must be rewritten with async/await
   gevent  | after patch_all(), existing sync code runs cooperatively

 Typical use:
   # Patch first, before importing requests
   import gevent.monkey; gevent.monkey.patch_all()
   import gevent
   import requests

   # Legacy synchronous code, unchanged
   def fetch(url): return requests.get(url).status_code

   jobs = [gevent.spawn(fetch, url) for url in urls]
   gevent.joinall(jobs)
   results = [j.value for j in jobs]
""")
    if not HAS_GEVENT:
        print(" [hint] install gevent to run the real demo: pip install gevent")
        return

    # Run the same patched time.sleep serially and inside spawned greenlets.
    gevent.monkey.patch_all()

    def worker(wid: int) -> float:
        t0 = time.perf_counter()
        time.sleep(0.1)  # after patching, this is gevent's cooperative sleep
        return time.perf_counter() - t0

    # Serial
    t0 = time.perf_counter()
    for i in range(5):
        worker(i)
    serial = time.perf_counter() - t0

    # Concurrent (gevent.spawn)
    t0 = time.perf_counter()
    jobs = [gevent.spawn(worker, i) for i in range(5)]
    gevent.joinall(jobs)
    concurrent = time.perf_counter() - t0

    print(" The same time.sleep(0.1) x 5:")
    print(f" Serial: {serial:.2f}s")
    print(f" gevent concurrent: {concurrent:.2f}s (waits overlap after patching)")

def mode_benchmark() -> None:
    section("Benchmark: coroutines vs threads vs serial (IO-bound)")
    N = 20        # number of tasks
    DELAY = 0.05  # IO wait per task, in seconds

    def io_work(task_id: int) -> None:
        time.sleep(DELAY)

    results: list[tuple[str, float]] = []

    # Serial
    t0 = time.perf_counter()
    for i in range(N):
        io_work(i)
    results.append(("serial", time.perf_counter() - t0))

    # Threads
    t0 = time.perf_counter()
    threads = [threading.Thread(target=io_work, args=(i,)) for i in range(N)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    results.append(("threads", time.perf_counter() - t0))

    # gevent (if available)
    if HAS_GEVENT:
        gevent.monkey.patch_all()
        t0 = time.perf_counter()
        jobs = [gevent.spawn(io_work, i) for i in range(N)]
        gevent.joinall(jobs)
        results.append(("gevent", time.perf_counter() - t0))

    serial_time = results[0][1]
    print(f"\n {N} IO tasks ({DELAY}s each), theoretical serial time {N*DELAY:.1f}s\n")
    print(f" {'Approach':<12} {'Time':<10} {'Speedup':<10} Bar")
    print(f" {'─'*55}")
    for name, elapsed in results:
        speedup = serial_time / elapsed
        bar = "█" * int(speedup * 4)
        print(f" {name:<12} {elapsed:.3f}s {speedup:.1f}x {bar}")
    print("\n 💡 IO-bound work: coroutines ≈ threads (both overlap the waiting)")
    print(" CPU-bound work: coroutines do not help (GIL / single thread); use multiple processes")

# Step 7: main is the command-line remote control; --mode picks the mechanism to watch.
def main() -> None:
    parser = argparse.ArgumentParser(description="gevent/greenlet coroutine libraries in practice")
    parser.add_argument(
        "--mode",
        choices=["greenlet", "gevent", "patch", "benchmark", "all"],
        default="all",
    )
    args = parser.parse_args()
    dispatch = {
        "greenlet": mode_greenlet,
        "gevent": mode_gevent,
        "patch": mode_patch,
        "benchmark": mode_benchmark,
        "all": lambda: [mode_greenlet(), mode_gevent(),
                        mode_patch(), mode_benchmark()],
    }
    dispatch[args.mode]()

import sys

# Drive main() twice by faking the command line.
for mode in ["greenlet", "benchmark"]:
    print(f"\n>>> mode={mode}")
    sys.argv = ["prog", "--mode", mode]
    main()
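The demo above drives main() by overwriting sys.argv. A hedged alternative is to let the entry point accept an argument list, which argparse supports directly through `parse_args(args)`. The stub modes returned by `build_dispatch` below are placeholders for illustration, not the article's real mode functions.

```python
import argparse
from typing import Callable, Optional, Sequence


def build_dispatch() -> dict[str, Callable[[], str]]:
    # Placeholder modes; a real script would map to mode_greenlet, mode_gevent, ...
    return {
        "greenlet": lambda: "ran greenlet",
        "benchmark": lambda: "ran benchmark",
    }


def main(argv: Optional[Sequence[str]] = None) -> str:
    dispatch = build_dispatch()
    parser = argparse.ArgumentParser(description="mode dispatcher sketch")
    parser.add_argument("--mode", choices=sorted(dispatch), default="greenlet")
    args = parser.parse_args(argv)  # argv=None falls back to sys.argv[1:]
    return dispatch[args.mode]()


for mode in ["greenlet", "benchmark"]:
    print(main(["--mode", mode]))
```

Passing the list explicitly keeps global state untouched, which makes the entry point easier to call from tests or notebooks.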
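Geek Lab: Complete Source and How to Run It
Now assemble the building blocks: paste the complete code below into your editor and run it. See the whole loop work first, then go back and tweak parameters section by section; that builds engineering intuition faster.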
#!/usr/bin/env python3
"""
15-python-gevent.py: gevent/greenlet coroutine libraries in practice

Usage:
    python3 15-python-gevent.py --mode greenlet   # greenlet manual switching demo
    python3 15-python-gevent.py --mode gevent     # gevent automatic scheduling demo
    python3 15-python-gevent.py --mode patch      # monkey-patch comparison
    python3 15-python-gevent.py --mode benchmark  # coroutines vs threads vs serial
    python3 15-python-gevent.py --mode all        # everything (default)

Dependencies: pip install gevent greenlet
If they are missing, the script falls back to a pure-Python simulation.
"""
import argparse
import time
import threading
from typing import Callable

# ─── Dependency detection ────────────────────────────────────────────────
try:
    import gevent
    import gevent.monkey
    import gevent.pool
    from greenlet import greenlet
    HAS_GEVENT = True
except ImportError:
    HAS_GEVENT = False

# ─── Helpers ─────────────────────────────────────────────────────────────
def section(title: str) -> None:
    print(f"\n{'='*60}\n {title}\n{'='*60}")

def ts() -> str:
    return f"{time.perf_counter():.3f}s"

# ─── Mode 1: greenlet manual switching ───────────────────────────────────
def mode_greenlet() -> None:
    section("greenlet: manual coroutine switching")
    if not HAS_GEVENT:
        print("\n [simulation] greenlet not installed; showing a generator-based equivalent\n")
        # Emulate greenlet's manual hand-off with generators.
        execution_log: list[str] = []

        def task_a():
            execution_log.append("A: start")
            yield  # hand control back so B can run
            execution_log.append("A: resumed, continuing")
            yield  # hand control back so B can run
            execution_log.append("A: done")

        def task_b():
            execution_log.append("B: start")
            yield  # hand control back so A can run
            execution_log.append("B: resumed, continuing")

        gen_a, gen_b = task_a(), task_b()
        # Advance the two generators alternately by hand.
        for _ in range(3):
            try:
                next(gen_a)
            except StopIteration:
                pass
            try:
                next(gen_b)
            except StopIteration:
                pass
        print(" Execution order (manual switching):")
        for i, log in enumerate(execution_log, 1):
            print(f" {i}. {log}")
        print("\n 💡 greenlet.switch() plays the role of yield above: the programmer picks the switch points")
        return

    # Real greenlet demo
    execution_log: list[str] = []

    def fun1() -> None:
        execution_log.append("fun1: start")
        execution_log.append("fun1: switching to fun2")
        gr2.switch()
        execution_log.append("fun1: back from fun2, continuing")
        gr2.switch()
        execution_log.append("fun1: done")

    def fun2() -> None:
        execution_log.append("fun2: start")
        execution_log.append("fun2: switching back to fun1")
        gr1.switch()
        execution_log.append("fun2: running again, done")

    gr1 = greenlet(fun1)
    # parent=gr1: when fun2 returns, control goes back to fun1 instead of the
    # main greenlet, so fun1 can reach its last line and both greenlets finish.
    gr2 = greenlet(fun2, parent=gr1)
    gr1.switch()  # start fun1
    print("\n Execution order (greenlet manual switching):")
    for i, log in enumerate(execution_log, 1):
        print(f" {i}. {log}")
    print("\n Key point: everything runs in a single thread, with no OS thread switch")
    print(f" gr1.dead={gr1.dead} gr2.dead={gr2.dead}")

# ─── Mode 2: gevent automatic scheduling ─────────────────────────────────
def mode_gevent() -> None:
    section("gevent: automatic IO scheduling")
    if not HAS_GEVENT:
        print("\n [simulation] gevent not installed; using threading to imitate the concurrency\n")
        results: list[str] = []

        def simulated_task(task_id: int, delay: float) -> None:
            time.sleep(delay)  # simulated IO
            results.append(f"task {task_id} done (took {delay:.1f}s)")

        t0 = time.perf_counter()
        threads = [threading.Thread(target=simulated_task, args=(i, 0.1 * (i + 1)))
                   for i in range(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        elapsed = time.perf_counter() - t0
        print(f" 5 tasks ran concurrently in {elapsed:.2f}s (serial would take {0.1*15:.1f}s)")
        for r in sorted(results):
            print(f" {r}")
        return

    gevent.monkey.patch_all()
    task_log: list[tuple[str, float]] = []

    def io_task(task_id: int, duration: float) -> None:
        task_log.append((f"task {task_id} start", time.perf_counter()))
        gevent.sleep(duration)  # simulated blocking IO; yields control automatically
        # :g prints 0.15 as "0.15" (a .1f format would round it to one decimal)
        task_log.append((f"task {task_id} done ({duration:g}s)", time.perf_counter()))

    durations = [0.3, 0.1, 0.2, 0.15, 0.25]
    t0 = time.perf_counter()
    # gevent.spawn creates a greenlet; joinall waits for all of them to finish.
    greenlets = [gevent.spawn(io_task, i, d) for i, d in enumerate(durations)]
    gevent.joinall(greenlets)
    elapsed = time.perf_counter() - t0
    serial_time = sum(durations)
    print("\n 5 IO tasks running concurrently:")
    for msg, t in task_log:
        print(f" [{t - t0:.3f}s] {msg}")
    print(f"\n Concurrent total: {elapsed:.2f}s serial would take: {serial_time:.2f}s")
    print(f" Speedup: {serial_time/elapsed:.1f}x")

# ─── Mode 3: monkey-patch comparison ─────────────────────────────────────
def mode_patch() -> None:
    section("monkey-patch: make synchronous code cooperative with minimal changes")
    print("""
 How monkey-patching works:
 ──────────────────────────────────────────────────────
   import gevent.monkey
   gevent.monkey.patch_all()   <- run as early as possible

   Afterwards:
     socket.connect()  -> gevent cooperative version
     time.sleep()      -> gevent.sleep() (yields control)
     threading.Thread  -> backed by a greenlet
 ──────────────────────────────────────────────────────
 Comparison:
   asyncio | IO code must be rewritten with async/await
   gevent  | after patch_all(), existing sync code runs cooperatively

 Typical use:
   # Patch first, before importing requests
   import gevent.monkey; gevent.monkey.patch_all()
   import gevent
   import requests

   # Legacy synchronous code, unchanged
   def fetch(url): return requests.get(url).status_code

   jobs = [gevent.spawn(fetch, url) for url in urls]
   gevent.joinall(jobs)
   results = [j.value for j in jobs]
""")
    if not HAS_GEVENT:
        print(" [hint] install gevent to run the real demo: pip install gevent")
        return

    # Run the same patched time.sleep serially and inside spawned greenlets.
    gevent.monkey.patch_all()

    def worker(wid: int) -> float:
        t0 = time.perf_counter()
        time.sleep(0.1)  # after patching, this is gevent's cooperative sleep
        return time.perf_counter() - t0

    # Serial
    t0 = time.perf_counter()
    for i in range(5):
        worker(i)
    serial = time.perf_counter() - t0

    # Concurrent (gevent.spawn)
    t0 = time.perf_counter()
    jobs = [gevent.spawn(worker, i) for i in range(5)]
    gevent.joinall(jobs)
    concurrent = time.perf_counter() - t0

    print(" The same time.sleep(0.1) x 5:")
    print(f" Serial: {serial:.2f}s")
    print(f" gevent concurrent: {concurrent:.2f}s (waits overlap after patching)")

# ─── Mode 4: benchmark ───────────────────────────────────────────────────
def mode_benchmark() -> None:
    section("Benchmark: coroutines vs threads vs serial (IO-bound)")
    N = 20        # number of tasks
    DELAY = 0.05  # IO wait per task, in seconds

    def io_work(task_id: int) -> None:
        time.sleep(DELAY)

    results: list[tuple[str, float]] = []

    # Serial
    t0 = time.perf_counter()
    for i in range(N):
        io_work(i)
    results.append(("serial", time.perf_counter() - t0))

    # Threads
    t0 = time.perf_counter()
    threads = [threading.Thread(target=io_work, args=(i,)) for i in range(N)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    results.append(("threads", time.perf_counter() - t0))

    # gevent (if available)
    if HAS_GEVENT:
        gevent.monkey.patch_all()
        t0 = time.perf_counter()
        jobs = [gevent.spawn(io_work, i) for i in range(N)]
        gevent.joinall(jobs)
        results.append(("gevent", time.perf_counter() - t0))

    serial_time = results[0][1]
    print(f"\n {N} IO tasks ({DELAY}s each), theoretical serial time {N*DELAY:.1f}s\n")
    print(f" {'Approach':<12} {'Time':<10} {'Speedup':<10} Bar")
    print(f" {'─'*55}")
    for name, elapsed in results:
        speedup = serial_time / elapsed
        bar = "█" * int(speedup * 4)
        print(f" {name:<12} {elapsed:.3f}s {speedup:.1f}x {bar}")
    print("\n 💡 IO-bound work: coroutines ≈ threads (both overlap the waiting)")
    print(" CPU-bound work: coroutines do not help (GIL / single thread); use multiple processes")

# ─── Entry point ─────────────────────────────────────────────────────────
def main() -> None:
    parser = argparse.ArgumentParser(description="gevent/greenlet coroutine libraries in practice")
    parser.add_argument(
        "--mode",
        choices=["greenlet", "gevent", "patch", "benchmark", "all"],
        default="all",
    )
    args = parser.parse_args()
    dispatch = {
        "greenlet": mode_greenlet,
        "gevent": mode_gevent,
        "patch": mode_patch,
        "benchmark": mode_benchmark,
        "all": lambda: [mode_greenlet(), mode_gevent(),
                        mode_patch(), mode_benchmark()],
    }
    dispatch[args.mode]()

if __name__ == "__main__":
    main()
# Install dependencies (optional; without them the script falls back to simulation mode)
$ pip install gevent greenlet
$ python3 15-python-gevent.py --mode all
============================================================
greenlet: manual coroutine switching
============================================================
Execution order (greenlet manual switching):
1. fun1: start
2. fun1: switching to fun2
3. fun2: start
4. fun2: switching back to fun1
5. fun1: back from fun2, continuing
6. fun2: running again, done
7. fun1: done
Key point: everything runs in a single thread, with no OS thread switch
gr1.dead=True gr2.dead=True
============================================================
gevent: automatic IO scheduling
============================================================
5 IO tasks running concurrently:
[0.000s] task 0 start
[0.001s] task 1 start
[0.001s] task 2 start
[0.001s] task 3 start
[0.001s] task 4 start
[0.101s] task 1 done (0.1s)
[0.151s] task 3 done (0.15s)
[0.201s] task 2 done (0.2s)
[0.251s] task 4 done (0.25s)
[0.301s] task 0 done (0.3s)
Concurrent total: 0.31s serial would take: 1.00s
Speedup: 3.2x
============================================================
Benchmark: coroutines vs threads vs serial (IO-bound)
============================================================
20 IO tasks (0.05s each), theoretical serial time 1.0s
Approach Time Speedup Bar
───────────────────────────────────────────────────────
serial 1.003s 1.0x ████
threads 0.058s 17.3x ████████████████████████████████████████████████████████████████████
gevent 0.052s 19.3x ████████████████████████████████████████████████████████████████████████████
💡 IO-bound work: coroutines ≈ threads (both overlap the waiting)
CPU-bound work: coroutines do not help (GIL / single thread); use multiple processes
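Key Takeaways
Choosing between greenlet, gevent, and asyncio:
New projects needing IO concurrency → asyncio (standard library, broadest ecosystem)
Retrofitting legacy synchronous code → gevent + monkey-patch (minimal code changes)
Understanding coroutine internals → greenlet (closest to the underlying mechanism)
Limitations of gevent:
- No benefit for CPU-bound work (single thread; coroutines cannot compute in parallel)
- monkey-patch must run as early as possible, ideally before any other import
- Some C extension libraries do not cooperate with it (for example, some numpy operations): code that blocks inside C cannot be patched, so no other greenlet runs until it returns
Limiting concurrency with gevent Pool: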
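import time

try:
    import gevent
    import gevent.pool
    HAS_GEVENT = True
    # Use gevent's cooperative sleep: without monkey-patching, a plain
    # time.sleep would block the whole thread and the pool would run serially.
    sleep = gevent.sleep
except ImportError:
    HAS_GEVENT = False
    sleep = time.sleep

def fetch(url: str) -> str:
    # Teaching demo: no real network access, sleep stands in for one IO wait.
    sleep(0.05)
    return f"OK: {url}"

urls = [f"https://example.com/{i}" for i in range(6)]
if HAS_GEVENT:
    pool = gevent.pool.Pool(3)  # at most 3 greenlets at a time
    jobs = [pool.spawn(fetch, url) for url in urls]
    gevent.joinall(jobs)
    results = [job.value for job in jobs]
else:
    # Without gevent, still produce visible output instead of an error.
    results = [fetch(url) for url in urls]

print("Number of results:", len(results))
print("First result:", results[0])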
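What "at most 3 at a time" means can be checked without gevent. The sketch below uses a standard-library thread pool as a stand-in for gevent.pool.Pool and records the highest number of tasks that were in flight together. `fetch` mirrors the mock above; the `stats` bookkeeping and the `LIMIT` name are assumptions of this sketch.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

LIMIT = 3
stats = {"in_flight": 0, "peak": 0}
lock = threading.Lock()


def fetch(url: str) -> str:
    with lock:
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
    time.sleep(0.05)  # mock IO wait, no real network access
    with lock:
        stats["in_flight"] -= 1
    return f"OK: {url}"


urls = [f"https://example.com/{i}" for i in range(6)]
# max_workers plays the role of the pool size: excess tasks queue up.
with ThreadPoolExecutor(max_workers=LIMIT) as pool:
    results = list(pool.map(fetch, urls))

print("results:", len(results), "peak concurrency:", stats["peak"])
```

The peak never exceeds the limit, while the total still comes in well under the serial time; a gevent Pool enforces the same kind of cap on greenlets.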
NexDo Time ⚡
Use gevent.pool.Pool to build a rate-limited batch URL checker with a maximum concurrency of 5:
import time

try:
    import gevent
    import gevent.pool
    HAS_GEVENT = True
    # Cooperative sleep; an unpatched time.sleep would block every greenlet.
    sleep = gevent.sleep
except ImportError:
    HAS_GEVENT = False
    sleep = time.sleep

def check(url: str) -> tuple[str, int]:
    # No external dependencies: instead of hitting httpbin, sleep simulates the network wait.
    sleep(0.05)
    return url, 200

urls = [f"https://mock.local/delay/{i}" for i in range(10)]
if HAS_GEVENT:
    pool = gevent.pool.Pool(5)  # maximum concurrency of 5
    results = pool.map(check, urls)
else:
    results = [check(url) for url in urls]

print("URLs checked:", len(results))
print("First 3:", results[:3])
Don’t wait for next time, do it in the next moment.