文章

15 · gevent/greenlet:协程库与高并发 IO 实战

#039 · 2026-04-17 · Python

🔗 知识图谱导航:阅读本文前,建议先掌握/回顾 《14 · 协程实战:asyncio 实现高并发》 中的核心概念;本文会在这个基础上继续推进。 承上启下:14篇用 asyncio 实现了协程并发。asyncio 是标准库,而 gevent/greenlet 是更早出现的第三方协程方案——理解它们的差异,才能真正搞懂 Python 协程的底层机制。

极客解析:并发不是把代码写复杂,而是把等待、调度和资源隔离讲清楚;本文用本机可运行的 Mock 场景验证机制。


痛点与架构

asyncio 要求代码全部用 async/await 重写,改造成本高。geventmonkey-patch 在运行时替换标准库的阻塞调用,让已有同步代码自动变成协程并发,零改动。

greenlet(手动切换):
  gr1 ──switch()──▶ gr2 ──switch()──▶ gr1
  程序员手动决定何时切换,类似 yield

gevent(自动调度):
  遇到 IO 阻塞 ──▶ 自动切换到其他协程
  IO 完成     ──▶ 自动切回继续执行
  底层:libev 事件循环

monkey-patch 原理:
  import gevent.monkey; gevent.monkey.patch_all()
  ↓
  socket.connect  →  gevent 版(非阻塞)
  time.sleep      →  gevent 版(让出控制权)
  threading.Thread →  gevent 版(绿色线程)
方案 切换方式 改造成本 适用场景
greenlet 手动 switch() 理解协程原理
gevent 自动(IO触发) 低(monkey-patch) 改造遗留同步代码
asyncio await 关键字 中(需重写) 新项目标准选择

实战演练场

步步为营:核心逻辑自适应拆解

这一篇先把结论说清楚:greenlet 是手动切换,gevent 是遇到 IO 自动切换,monkey-patch 是把旧同步 IO 函数替换成协程友好版本。下面每一步都保留降级路径,即使你本机没装 gevent,也能跑出结果、看懂机制。

Step 1:先检测 gevent 是否安装,保证没装也能学

痛点与机制

gevent 是第三方库,公网读者不一定已经安装。教程代码不能一上来就 ImportError。这段 try/except 把环境差异收住:有 gevent 就跑真实协程,没 gevent 就用 generator/threading 模拟,让学习不中断。

核心源码(逐字来自文末完整源码)

try:
    import gevent
    import gevent.monkey
    import gevent.pool
    from greenlet import greenlet
    HAS_GEVENT = True
except ImportError:
    HAS_GEVENT = False

可运行演示(补齐 Mock 数据与 print 反馈)


try:
    import gevent
    import gevent.monkey
    import gevent.pool
    from greenlet import greenlet
    HAS_GEVENT = True
except ImportError:
    HAS_GEVENT = False

print("是否安装 gevent:", HAS_GEVENT)
if not HAS_GEVENT:
    print("当前将使用纯 Python 模拟演示,仍然能理解协程切换思想。")

Step 2:用 section 和 ts 统一输出格式,让终端结果更好读

痛点与机制

并发演示输出很多,先统一标题和时间戳,读者才不会被散乱打印淹没。section() 像章节标题,ts()perf_counter() 给性能测量准备高精度时间。

核心源码(逐字来自文末完整源码)

def section(title: str) -> None:
    print(f"\n{'='*60}\n  {title}\n{'='*60}")

def ts() -> str:
    return f"{time.perf_counter():.3f}s"

可运行演示(补齐 Mock 数据与 print 反馈)

import time

# Step 2:工具函数负责排版和时间戳,不掺杂业务逻辑。
def section(title: str) -> None:
    print(f"\n{'='*60}\n  {title}\n{'='*60}")

def ts() -> str:
    return f"{time.perf_counter():.3f}s"

section("输出工具演示")
print("当前时间戳:", ts())

Step 3:用 mode_greenlet 理解手动切换:谁 switch,谁让出舞台

痛点与机制

greenlet 的核心是手动切换,就像两个人共用一个麦克风:A 说到一半主动把麦克风交给 B,B 说完再交回来。未安装 greenlet 时,源码用 generator 的 yield 模拟同样的“手动让出”语义。

核心源码(逐字来自文末完整源码)

def mode_greenlet() -> None:
    section("greenlet — 手动协程切换")

    if not HAS_GEVENT:
        print("\n  [模拟模式] greenlet 未安装,展示等效的 generator 实现\n")
        # 用 generator 模拟 greenlet 的手动切换语义
        execution_log: list[str] = []

        def task_a():
            execution_log.append("A: 开始")
            yield  # 切换到 B
            execution_log.append("A: 恢复,继续执行")
            yield  # 切换到 B
            execution_log.append("A: 完成")

        def task_b():
            execution_log.append("B: 开始")
            yield  # 切换回 A
            execution_log.append("B: 恢复,继续执行")

        gen_a, gen_b = task_a(), task_b()
        # 手动交替推进
        for _ in range(3):
            try: next(gen_a)
            except StopIteration: pass
            try: next(gen_b)
            except StopIteration: pass

        print("  执行顺序(手动切换):")
        for i, log in enumerate(execution_log, 1):
            print(f"  {i}. {log}")
        print("\n  💡 greenlet.switch() 等价于上面的 yield——程序员手动决定切换时机")
        return

    # 真实 greenlet 演示
    execution_log: list[str] = []

    def fun1() -> None:
        execution_log.append("fun1: 开始执行")
        execution_log.append("fun1: 切换到 fun2")
        gr2.switch()
        execution_log.append("fun1: 从 fun2 切回,继续执行")
        gr2.switch()
        execution_log.append("fun1: 完成")

    def fun2() -> None:
        execution_log.append("fun2: 开始执行")
        execution_log.append("fun2: 切换回 fun1")
        gr1.switch()
        execution_log.append("fun2: 再次执行,完成")

    gr1 = greenlet(fun1)
    gr2 = greenlet(fun2)
    gr1.switch()   # 启动 fun1

    print("\n  执行顺序(greenlet 手动切换):")
    for i, log in enumerate(execution_log, 1):
        print(f"  {i}. {log}")

    print(f"\n  关键:整个过程在单线程内完成,无任何系统调用")
    print(f"  gr1.dead={gr1.dead}  gr2.dead={gr2.dead}")

可运行演示(补齐 Mock 数据与 print 反馈)

import time
import threading
from typing import Callable

try:
    import gevent
    import gevent.monkey
    import gevent.pool
    from greenlet import greenlet
    HAS_GEVENT = True
except ImportError:
    HAS_GEVENT = False

def section(title: str) -> None:
    print(f"\n{'='*60}\n  {title}\n{'='*60}")

def ts() -> str:
    return f"{time.perf_counter():.3f}s"

# Step 3:greenlet 是手动挡协程,程序员决定什么时候切换。
def mode_greenlet() -> None:
    section("greenlet — 手动协程切换")

    if not HAS_GEVENT:
        print("\n  [模拟模式] greenlet 未安装,展示等效的 generator 实现\n")
        # 用 generator 模拟 greenlet 的手动切换语义
        execution_log: list[str] = []

        def task_a():
            execution_log.append("A: 开始")
            yield  # 切换到 B
            execution_log.append("A: 恢复,继续执行")
            yield  # 切换到 B
            execution_log.append("A: 完成")

        def task_b():
            execution_log.append("B: 开始")
            yield  # 切换回 A
            execution_log.append("B: 恢复,继续执行")

        gen_a, gen_b = task_a(), task_b()
        # 手动交替推进
        for _ in range(3):
            try: next(gen_a)
            except StopIteration: pass
            try: next(gen_b)
            except StopIteration: pass

        print("  执行顺序(手动切换):")
        for i, log in enumerate(execution_log, 1):
            print(f"  {i}. {log}")
        print("\n  💡 greenlet.switch() 等价于上面的 yield——程序员手动决定切换时机")
        return

    # 真实 greenlet 演示
    execution_log: list[str] = []

    def fun1() -> None:
        execution_log.append("fun1: 开始执行")
        execution_log.append("fun1: 切换到 fun2")
        gr2.switch()
        execution_log.append("fun1: 从 fun2 切回,继续执行")
        gr2.switch()
        execution_log.append("fun1: 完成")

    def fun2() -> None:
        execution_log.append("fun2: 开始执行")
        execution_log.append("fun2: 切换回 fun1")
        gr1.switch()
        execution_log.append("fun2: 再次执行,完成")

    gr1 = greenlet(fun1)
    gr2 = greenlet(fun2)
    gr1.switch()   # 启动 fun1

    print("\n  执行顺序(greenlet 手动切换):")
    for i, log in enumerate(execution_log, 1):
        print(f"  {i}. {log}")

    print(f"\n  关键:整个过程在单线程内完成,无任何系统调用")
    print(f"  gr1.dead={gr1.dead}  gr2.dead={gr2.dead}")

mode_greenlet()

Step 4:用 mode_gevent 看自动 IO 调度:等待时自动让出控制权

痛点与机制

geventgreenlet 更像自动挡:你不需要到处手写 switch(),当任务遇到 IO 等待时,gevent 会把控制权交给别的 greenlet。没装 gevent 时,这段会用 threading 模拟“等待可以并发重叠”的效果。

核心源码(逐字来自文末完整源码)

def mode_gevent() -> None:
    section("gevent — 自动 IO 调度")

    if not HAS_GEVENT:
        print("\n  [模拟模式] gevent 未安装,用 threading 模拟并发效果\n")
        results: list[str] = []

        def simulated_task(task_id: int, delay: float) -> None:
            time.sleep(delay)   # 模拟 IO
            results.append(f"任务{task_id} 完成(耗时 {delay:.1f}s)")

        t0 = time.perf_counter()
        threads = [threading.Thread(target=simulated_task, args=(i, 0.1*(i+1)))
                   for i in range(5)]
        [t.start() for t in threads]
        [t.join() for t in threads]
        elapsed = time.perf_counter() - t0

        print(f"  5个任务并发执行,总耗时: {elapsed:.2f}s(串行需 {0.1*15:.1f}s)")
        for r in sorted(results):
            print(f"  {r}")
        return

    gevent.monkey.patch_all()

    task_log: list[tuple[str, float]] = []

    def io_task(task_id: int, duration: float) -> None:
        start = time.perf_counter()
        task_log.append((f"任务{task_id} 开始", time.perf_counter()))
        gevent.sleep(duration)   # 模拟 IO 阻塞,自动让出控制权
        task_log.append((f"任务{task_id} 完成({duration:.1f}s)", time.perf_counter()))

    durations = [0.3, 0.1, 0.2, 0.15, 0.25]
    t0 = time.perf_counter()

    # gevent.spawn 创建协程,joinall 等待全部完成
    greenlets = [gevent.spawn(io_task, i, d) for i, d in enumerate(durations)]
    gevent.joinall(greenlets)

    elapsed = time.perf_counter() - t0
    serial_time = sum(durations)

    print(f"\n  5个 IO 任务并发执行:")
    for msg, t in task_log:
        print(f"  [{t - t0:.3f}s] {msg}")
    print(f"\n  并发总耗时: {elapsed:.2f}s  串行需: {serial_time:.2f}s")
    print(f"  加速比: {serial_time/elapsed:.1f}x")

可运行演示(补齐 Mock 数据与 print 反馈)

import time
import threading
from typing import Callable

try:
    import gevent
    import gevent.monkey
    import gevent.pool
    from greenlet import greenlet
    HAS_GEVENT = True
except ImportError:
    HAS_GEVENT = False

def section(title: str) -> None:
    print(f"\n{'='*60}\n  {title}\n{'='*60}")

def ts() -> str:
    return f"{time.perf_counter():.3f}s"

# Step 4:gevent 是自动挡协程,遇到 IO 等待会自动切到其他任务。
def mode_gevent() -> None:
    section("gevent — 自动 IO 调度")

    if not HAS_GEVENT:
        print("\n  [模拟模式] gevent 未安装,用 threading 模拟并发效果\n")
        results: list[str] = []

        def simulated_task(task_id: int, delay: float) -> None:
            time.sleep(delay)   # 模拟 IO
            results.append(f"任务{task_id} 完成(耗时 {delay:.1f}s)")

        t0 = time.perf_counter()
        threads = [threading.Thread(target=simulated_task, args=(i, 0.1*(i+1)))
                   for i in range(5)]
        [t.start() for t in threads]
        [t.join() for t in threads]
        elapsed = time.perf_counter() - t0

        print(f"  5个任务并发执行,总耗时: {elapsed:.2f}s(串行需 {0.1*15:.1f}s)")
        for r in sorted(results):
            print(f"  {r}")
        return

    gevent.monkey.patch_all()

    task_log: list[tuple[str, float]] = []

    def io_task(task_id: int, duration: float) -> None:
        start = time.perf_counter()
        task_log.append((f"任务{task_id} 开始", time.perf_counter()))
        gevent.sleep(duration)   # 模拟 IO 阻塞,自动让出控制权
        task_log.append((f"任务{task_id} 完成({duration:.1f}s)", time.perf_counter()))

    durations = [0.3, 0.1, 0.2, 0.15, 0.25]
    t0 = time.perf_counter()

    # gevent.spawn 创建协程,joinall 等待全部完成
    greenlets = [gevent.spawn(io_task, i, d) for i, d in enumerate(durations)]
    gevent.joinall(greenlets)

    elapsed = time.perf_counter() - t0
    serial_time = sum(durations)

    print(f"\n  5个 IO 任务并发执行:")
    for msg, t in task_log:
        print(f"  [{t - t0:.3f}s] {msg}")
    print(f"\n  并发总耗时: {elapsed:.2f}s  串行需: {serial_time:.2f}s")
    print(f"  加速比: {serial_time/elapsed:.1f}x")

mode_gevent()

Step 5:用 mode_patch 理解 monkey-patch:同步代码如何被替换成协程版

痛点与机制

monkey-patch 是 gevent 的招牌能力:它会在运行时替换 sockettime.sleep 等阻塞函数,让旧同步代码也能协作式并发。这个概念容易危险,所以文章里明确提醒:patch_all() 必须尽早执行,最好在其它网络库 import 之前。

核心源码(逐字来自文末完整源码)

def mode_patch() -> None:
    section("monkey-patch — 零改动让同步代码变并发")

    print("""
  monkey-patch 原理:
  ┌─────────────────────────────────────────────────────┐
  │  import gevent.monkey                               │
  │  gevent.monkey.patch_all()   ← 必须在所有 import 前 │
  │                                                     │
  │  之后:                                             │
  │  socket.connect()  →  gevent 非阻塞版本             │
  │  time.sleep()      →  gevent.sleep()(让出控制权)  │
  │  threading.Thread  →  gevent.Greenlet               │
  └─────────────────────────────────────────────────────┘

  对比:
  ┌──────────────┬──────────────────────────────────────┐
  │ asyncio      │ 必须用 async/await 重写所有 IO 代码   │
  │ gevent       │ patch_all() 后,原有同步代码自动并发  │
  └──────────────┴──────────────────────────────────────┘

  典型使用场景:
    # 遗留代码(同步 requests 库)
    import requests
    def fetch(url): return requests.get(url).status_code

    # 加一行 patch,自动并发
    import gevent.monkey; gevent.monkey.patch_all()
    import gevent
    jobs = [gevent.spawn(fetch, url) for url in urls]
    gevent.joinall(jobs)
    results = [j.value for j in jobs]
""")

    if not HAS_GEVENT:
        print("  [提示] 安装后可运行真实演示: pip install gevent")
        return

    # 演示 patch 前后 time.sleep 的行为差异
    gevent.monkey.patch_all()

    def worker(wid: int) -> float:
        t0 = time.perf_counter()
        time.sleep(0.1)   # patch 后自动变为 gevent.sleep
        return time.perf_counter() - t0

    # 串行
    t0 = time.perf_counter()
    for i in range(5):
        worker(i)
    serial = time.perf_counter() - t0

    # 并发(gevent.spawn)
    t0 = time.perf_counter()
    jobs = [gevent.spawn(worker, i) for i in range(5)]
    gevent.joinall(jobs)
    concurrent = time.perf_counter() - t0

    print(f"  同样的 time.sleep(0.1) × 5:")
    print(f"  串行执行: {serial:.2f}s")
    print(f"  gevent并发: {concurrent:.2f}s  (patch 后自动并发)")

可运行演示(补齐 Mock 数据与 print 反馈)

import time
import threading
from typing import Callable

try:
    import gevent
    import gevent.monkey
    import gevent.pool
    from greenlet import greenlet
    HAS_GEVENT = True
except ImportError:
    HAS_GEVENT = False

def section(title: str) -> None:
    print(f"\n{'='*60}\n  {title}\n{'='*60}")

def ts() -> str:
    return f"{time.perf_counter():.3f}s"

# Step 5:monkey-patch 像给标准库换零件,让同步 IO 在运行时变成协程友好版本。
def mode_patch() -> None:
    section("monkey-patch — 零改动让同步代码变并发")

    print("""
  monkey-patch 原理:
  ┌─────────────────────────────────────────────────────┐
  │  import gevent.monkey                               │
  │  gevent.monkey.patch_all()   ← 必须在所有 import 前 │
  │                                                     │
  │  之后:                                             │
  │  socket.connect()  →  gevent 非阻塞版本             │
  │  time.sleep()      →  gevent.sleep()(让出控制权)  │
  │  threading.Thread  →  gevent.Greenlet               │
  └─────────────────────────────────────────────────────┘

  对比:
  ┌──────────────┬──────────────────────────────────────┐
  │ asyncio      │ 必须用 async/await 重写所有 IO 代码   │
  │ gevent       │ patch_all() 后,原有同步代码自动并发  │
  └──────────────┴──────────────────────────────────────┘

  典型使用场景:
    # 遗留代码(同步 requests 库)
    import requests
    def fetch(url): return requests.get(url).status_code

    # 加一行 patch,自动并发
    import gevent.monkey; gevent.monkey.patch_all()
    import gevent
    jobs = [gevent.spawn(fetch, url) for url in urls]
    gevent.joinall(jobs)
    results = [j.value for j in jobs]
""")

    if not HAS_GEVENT:
        print("  [提示] 安装后可运行真实演示: pip install gevent")
        return

    # 演示 patch 前后 time.sleep 的行为差异
    gevent.monkey.patch_all()

    def worker(wid: int) -> float:
        t0 = time.perf_counter()
        time.sleep(0.1)   # patch 后自动变为 gevent.sleep
        return time.perf_counter() - t0

    # 串行
    t0 = time.perf_counter()
    for i in range(5):
        worker(i)
    serial = time.perf_counter() - t0

    # 并发(gevent.spawn)
    t0 = time.perf_counter()
    jobs = [gevent.spawn(worker, i) for i in range(5)]
    gevent.joinall(jobs)
    concurrent = time.perf_counter() - t0

    print(f"  同样的 time.sleep(0.1) × 5:")
    print(f"  串行执行: {serial:.2f}s")
    print(f"  gevent并发: {concurrent:.2f}s  (patch 后自动并发)")

mode_patch()

Step 6:用 mode_benchmark 对比串行、线程、gevent 的 IO 等待耗时

痛点与机制

性能对比要用同一批任务才公平。mode_benchmark() 让 20 个任务各等待 0.05 秒:串行会把等待时间叠加,线程和 gevent 则能把等待重叠起来。条形图让加速比一眼可见。

核心源码(逐字来自文末完整源码)

def mode_benchmark() -> None:
    section("性能基准:协程 vs 线程 vs 串行(IO密集型)")

    N = 20          # 任务数
    DELAY = 0.05    # 每个任务 IO 耗时

    def io_work(task_id: int) -> None:
        time.sleep(DELAY)

    results: list[tuple[str, float]] = []

    # 串行
    t0 = time.perf_counter()
    for i in range(N):
        io_work(i)
    results.append(("串行", time.perf_counter() - t0))

    # 多线程
    t0 = time.perf_counter()
    threads = [threading.Thread(target=io_work, args=(i,)) for i in range(N)]
    [t.start() for t in threads]
    [t.join() for t in threads]
    results.append(("多线程", time.perf_counter() - t0))

    # gevent(若可用)
    if HAS_GEVENT:
        gevent.monkey.patch_all()
        t0 = time.perf_counter()
        jobs = [gevent.spawn(io_work, i) for i in range(N)]
        gevent.joinall(jobs)
        results.append(("gevent协程", time.perf_counter() - t0))

    serial_time = results[0][1]
    print(f"\n  {N} 个 IO 任务(每个 {DELAY}s),理论串行 {N*DELAY:.1f}s\n")
    print(f"  {'方案':<12} {'耗时':<10} {'加速比':<10} 条形图")
    print(f"  {'─'*55}")
    for name, elapsed in results:
        speedup = serial_time / elapsed
        bar = "█" * int(speedup * 4)
        print(f"  {name:<12} {elapsed:.3f}s    {speedup:.1f}x       {bar}")

    print(f"\n  💡 IO 密集型任务:协程 ≈ 线程(都能并发等待)")
    print(f"     CPU 密集型任务:协程无效(GIL/单线程),需多进程")

可运行演示(补齐 Mock 数据与 print 反馈)

import time
import threading
from typing import Callable

try:
    import gevent
    import gevent.monkey
    import gevent.pool
    from greenlet import greenlet
    HAS_GEVENT = True
except ImportError:
    HAS_GEVENT = False

def section(title: str) -> None:
    print(f"\n{'='*60}\n  {title}\n{'='*60}")

def ts() -> str:
    return f"{time.perf_counter():.3f}s"

# Step 6:基准测试用同一组 IO 等待任务对比不同执行方式。
def mode_benchmark() -> None:
    section("性能基准:协程 vs 线程 vs 串行(IO密集型)")

    N = 20          # 任务数
    DELAY = 0.05    # 每个任务 IO 耗时

    def io_work(task_id: int) -> None:
        time.sleep(DELAY)

    results: list[tuple[str, float]] = []

    # 串行
    t0 = time.perf_counter()
    for i in range(N):
        io_work(i)
    results.append(("串行", time.perf_counter() - t0))

    # 多线程
    t0 = time.perf_counter()
    threads = [threading.Thread(target=io_work, args=(i,)) for i in range(N)]
    [t.start() for t in threads]
    [t.join() for t in threads]
    results.append(("多线程", time.perf_counter() - t0))

    # gevent(若可用)
    if HAS_GEVENT:
        gevent.monkey.patch_all()
        t0 = time.perf_counter()
        jobs = [gevent.spawn(io_work, i) for i in range(N)]
        gevent.joinall(jobs)
        results.append(("gevent协程", time.perf_counter() - t0))

    serial_time = results[0][1]
    print(f"\n  {N} 个 IO 任务(每个 {DELAY}s),理论串行 {N*DELAY:.1f}s\n")
    print(f"  {'方案':<12} {'耗时':<10} {'加速比':<10} 条形图")
    print(f"  {'─'*55}")
    for name, elapsed in results:
        speedup = serial_time / elapsed
        bar = "█" * int(speedup * 4)
        print(f"  {name:<12} {elapsed:.3f}s    {speedup:.1f}x       {bar}")

    print(f"\n  💡 IO 密集型任务:协程 ≈ 线程(都能并发等待)")
    print(f"     CPU 密集型任务:协程无效(GIL/单线程),需多进程")

mode_benchmark()

Step 7:用 main 做 greenlet/gevent/patch/benchmark 的命令行入口

痛点与机制

最后用 argparse 收口:--mode greenlet 看手动切换,--mode gevent 看自动调度,--mode patch 看猴子补丁,--mode benchmark 看性能对比。新手不用改源码,只改参数就能切换实验。

核心源码(逐字来自文末完整源码)

def main() -> None:
    parser = argparse.ArgumentParser(description="gevent/greenlet 协程库实战")
    parser.add_argument(
        "--mode",
        choices=["greenlet", "gevent", "patch", "benchmark", "all"],
        default="all",
    )
    args = parser.parse_args()
    dispatch = {
        "greenlet":  mode_greenlet,
        "gevent":    mode_gevent,
        "patch":     mode_patch,
        "benchmark": mode_benchmark,
        "all":       lambda: [mode_greenlet(), mode_gevent(),
                               mode_patch(), mode_benchmark()],
    }
    dispatch[args.mode]()

可运行演示(补齐 Mock 数据与 print 反馈)

import argparse
import time
import threading
from typing import Callable

try:
    import gevent
    import gevent.monkey
    import gevent.pool
    from greenlet import greenlet
    HAS_GEVENT = True
except ImportError:
    HAS_GEVENT = False

def section(title: str) -> None:
    print(f"\n{'='*60}\n  {title}\n{'='*60}")

def ts() -> str:
    return f"{time.perf_counter():.3f}s"

def mode_greenlet() -> None:
    section("greenlet — 手动协程切换")

    if not HAS_GEVENT:
        print("\n  [模拟模式] greenlet 未安装,展示等效的 generator 实现\n")
        # 用 generator 模拟 greenlet 的手动切换语义
        execution_log: list[str] = []

        def task_a():
            execution_log.append("A: 开始")
            yield  # 切换到 B
            execution_log.append("A: 恢复,继续执行")
            yield  # 切换到 B
            execution_log.append("A: 完成")

        def task_b():
            execution_log.append("B: 开始")
            yield  # 切换回 A
            execution_log.append("B: 恢复,继续执行")

        gen_a, gen_b = task_a(), task_b()
        # 手动交替推进
        for _ in range(3):
            try: next(gen_a)
            except StopIteration: pass
            try: next(gen_b)
            except StopIteration: pass

        print("  执行顺序(手动切换):")
        for i, log in enumerate(execution_log, 1):
            print(f"  {i}. {log}")
        print("\n  💡 greenlet.switch() 等价于上面的 yield——程序员手动决定切换时机")
        return

    # 真实 greenlet 演示
    execution_log: list[str] = []

    def fun1() -> None:
        execution_log.append("fun1: 开始执行")
        execution_log.append("fun1: 切换到 fun2")
        gr2.switch()
        execution_log.append("fun1: 从 fun2 切回,继续执行")
        gr2.switch()
        execution_log.append("fun1: 完成")

    def fun2() -> None:
        execution_log.append("fun2: 开始执行")
        execution_log.append("fun2: 切换回 fun1")
        gr1.switch()
        execution_log.append("fun2: 再次执行,完成")

    gr1 = greenlet(fun1)
    gr2 = greenlet(fun2)
    gr1.switch()   # 启动 fun1

    print("\n  执行顺序(greenlet 手动切换):")
    for i, log in enumerate(execution_log, 1):
        print(f"  {i}. {log}")

    print(f"\n  关键:整个过程在单线程内完成,无任何系统调用")
    print(f"  gr1.dead={gr1.dead}  gr2.dead={gr2.dead}")

def mode_gevent() -> None:
    section("gevent — 自动 IO 调度")

    if not HAS_GEVENT:
        print("\n  [模拟模式] gevent 未安装,用 threading 模拟并发效果\n")
        results: list[str] = []

        def simulated_task(task_id: int, delay: float) -> None:
            time.sleep(delay)   # 模拟 IO
            results.append(f"任务{task_id} 完成(耗时 {delay:.1f}s)")

        t0 = time.perf_counter()
        threads = [threading.Thread(target=simulated_task, args=(i, 0.1*(i+1)))
                   for i in range(5)]
        [t.start() for t in threads]
        [t.join() for t in threads]
        elapsed = time.perf_counter() - t0

        print(f"  5个任务并发执行,总耗时: {elapsed:.2f}s(串行需 {0.1*15:.1f}s)")
        for r in sorted(results):
            print(f"  {r}")
        return

    gevent.monkey.patch_all()

    task_log: list[tuple[str, float]] = []

    def io_task(task_id: int, duration: float) -> None:
        start = time.perf_counter()
        task_log.append((f"任务{task_id} 开始", time.perf_counter()))
        gevent.sleep(duration)   # 模拟 IO 阻塞,自动让出控制权
        task_log.append((f"任务{task_id} 完成({duration:.1f}s)", time.perf_counter()))

    durations = [0.3, 0.1, 0.2, 0.15, 0.25]
    t0 = time.perf_counter()

    # gevent.spawn 创建协程,joinall 等待全部完成
    greenlets = [gevent.spawn(io_task, i, d) for i, d in enumerate(durations)]
    gevent.joinall(greenlets)

    elapsed = time.perf_counter() - t0
    serial_time = sum(durations)

    print(f"\n  5个 IO 任务并发执行:")
    for msg, t in task_log:
        print(f"  [{t - t0:.3f}s] {msg}")
    print(f"\n  并发总耗时: {elapsed:.2f}s  串行需: {serial_time:.2f}s")
    print(f"  加速比: {serial_time/elapsed:.1f}x")

def mode_patch() -> None:
    section("monkey-patch — 零改动让同步代码变并发")

    print("""
  monkey-patch 原理:
  ┌─────────────────────────────────────────────────────┐
  │  import gevent.monkey                               │
  │  gevent.monkey.patch_all()   ← 必须在所有 import 前 │
  │                                                     │
  │  之后:                                             │
  │  socket.connect()  →  gevent 非阻塞版本             │
  │  time.sleep()      →  gevent.sleep()(让出控制权)  │
  │  threading.Thread  →  gevent.Greenlet               │
  └─────────────────────────────────────────────────────┘

  对比:
  ┌──────────────┬──────────────────────────────────────┐
  │ asyncio      │ 必须用 async/await 重写所有 IO 代码   │
  │ gevent       │ patch_all() 后,原有同步代码自动并发  │
  └──────────────┴──────────────────────────────────────┘

  典型使用场景:
    # 遗留代码(同步 requests 库)
    import requests
    def fetch(url): return requests.get(url).status_code

    # 加一行 patch,自动并发
    import gevent.monkey; gevent.monkey.patch_all()
    import gevent
    jobs = [gevent.spawn(fetch, url) for url in urls]
    gevent.joinall(jobs)
    results = [j.value for j in jobs]
""")

    if not HAS_GEVENT:
        print("  [提示] 安装后可运行真实演示: pip install gevent")
        return

    # 演示 patch 前后 time.sleep 的行为差异
    gevent.monkey.patch_all()

    def worker(wid: int) -> float:
        t0 = time.perf_counter()
        time.sleep(0.1)   # patch 后自动变为 gevent.sleep
        return time.perf_counter() - t0

    # 串行
    t0 = time.perf_counter()
    for i in range(5):
        worker(i)
    serial = time.perf_counter() - t0

    # 并发(gevent.spawn)
    t0 = time.perf_counter()
    jobs = [gevent.spawn(worker, i) for i in range(5)]
    gevent.joinall(jobs)
    concurrent = time.perf_counter() - t0

    print(f"  同样的 time.sleep(0.1) × 5:")
    print(f"  串行执行: {serial:.2f}s")
    print(f"  gevent并发: {concurrent:.2f}s  (patch 后自动并发)")

def mode_benchmark() -> None:
    section("性能基准:协程 vs 线程 vs 串行(IO密集型)")

    N = 20          # 任务数
    DELAY = 0.05    # 每个任务 IO 耗时

    def io_work(task_id: int) -> None:
        time.sleep(DELAY)

    results: list[tuple[str, float]] = []

    # 串行
    t0 = time.perf_counter()
    for i in range(N):
        io_work(i)
    results.append(("串行", time.perf_counter() - t0))

    # 多线程
    t0 = time.perf_counter()
    threads = [threading.Thread(target=io_work, args=(i,)) for i in range(N)]
    [t.start() for t in threads]
    [t.join() for t in threads]
    results.append(("多线程", time.perf_counter() - t0))

    # gevent(若可用)
    if HAS_GEVENT:
        gevent.monkey.patch_all()
        t0 = time.perf_counter()
        jobs = [gevent.spawn(io_work, i) for i in range(N)]
        gevent.joinall(jobs)
        results.append(("gevent协程", time.perf_counter() - t0))

    serial_time = results[0][1]
    print(f"\n  {N} 个 IO 任务(每个 {DELAY}s),理论串行 {N*DELAY:.1f}s\n")
    print(f"  {'方案':<12} {'耗时':<10} {'加速比':<10} 条形图")
    print(f"  {'─'*55}")
    for name, elapsed in results:
        speedup = serial_time / elapsed
        bar = "█" * int(speedup * 4)
        print(f"  {name:<12} {elapsed:.3f}s    {speedup:.1f}x       {bar}")

    print(f"\n  💡 IO 密集型任务:协程 ≈ 线程(都能并发等待)")
    print(f"     CPU 密集型任务:协程无效(GIL/单线程),需多进程")

# Step 7:main 是命令行遥控器,--mode 决定看哪一种协程机制。
def main() -> None:
    parser = argparse.ArgumentParser(description="gevent/greenlet 协程库实战")
    parser.add_argument(
        "--mode",
        choices=["greenlet", "gevent", "patch", "benchmark", "all"],
        default="all",
    )
    args = parser.parse_args()
    dispatch = {
        "greenlet":  mode_greenlet,
        "gevent":    mode_gevent,
        "patch":     mode_patch,
        "benchmark": mode_benchmark,
        "all":       lambda: [mode_greenlet(), mode_gevent(),
                               mode_patch(), mode_benchmark()],
    }
    dispatch[args.mode]()

import sys
for mode in ["greenlet", "benchmark"]:
    print(f"\n>>> mode={mode}")
    sys.argv = ["prog", "--mode", mode]
    main()

极客实战:完整源码与运行

现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。

#!/usr/bin/env python3
"""
15-python-gevent.py — gevent/greenlet 协程库实战

用法:
  python3 15-python-gevent.py --mode greenlet   # greenlet 手动切换演示
  python3 15-python-gevent.py --mode gevent     # gevent 自动调度演示
  python3 15-python-gevent.py --mode patch      # monkey-patch 效果对比
  python3 15-python-gevent.py --mode benchmark  # 协程 vs 线程 vs 串行 性能对比
  python3 15-python-gevent.py --mode all        # 全部(默认)

依赖:pip install gevent greenlet
若未安装,自动降级为纯 Python 模拟演示。
"""

import argparse
import time
import threading
from typing import Callable

# ─── 依赖检测 ──────────────────────────────────────────────────────────────────

try:
    import gevent
    import gevent.monkey
    import gevent.pool
    from greenlet import greenlet
    HAS_GEVENT = True
except ImportError:
    HAS_GEVENT = False

# ─── 工具 ──────────────────────────────────────────────────────────────────────

def section(title: str) -> None:
    print(f"\n{'='*60}\n  {title}\n{'='*60}")

def ts() -> str:
    return f"{time.perf_counter():.3f}s"

# ─── 模式1:greenlet 手动切换 ──────────────────────────────────────────────────

def mode_greenlet() -> None:
    section("greenlet — 手动协程切换")

    if not HAS_GEVENT:
        print("\n  [模拟模式] greenlet 未安装,展示等效的 generator 实现\n")
        # 用 generator 模拟 greenlet 的手动切换语义
        execution_log: list[str] = []

        def task_a():
            execution_log.append("A: 开始")
            yield  # 切换到 B
            execution_log.append("A: 恢复,继续执行")
            yield  # 切换到 B
            execution_log.append("A: 完成")

        def task_b():
            execution_log.append("B: 开始")
            yield  # 切换回 A
            execution_log.append("B: 恢复,继续执行")

        gen_a, gen_b = task_a(), task_b()
        # 手动交替推进
        for _ in range(3):
            try: next(gen_a)
            except StopIteration: pass
            try: next(gen_b)
            except StopIteration: pass

        print("  执行顺序(手动切换):")
        for i, log in enumerate(execution_log, 1):
            print(f"  {i}. {log}")
        print("\n  💡 greenlet.switch() 等价于上面的 yield——程序员手动决定切换时机")
        return

    # 真实 greenlet 演示
    execution_log: list[str] = []

    def fun1() -> None:
        execution_log.append("fun1: 开始执行")
        execution_log.append("fun1: 切换到 fun2")
        gr2.switch()
        execution_log.append("fun1: 从 fun2 切回,继续执行")
        gr2.switch()
        execution_log.append("fun1: 完成")

    def fun2() -> None:
        execution_log.append("fun2: 开始执行")
        execution_log.append("fun2: 切换回 fun1")
        gr1.switch()
        execution_log.append("fun2: 再次执行,完成")

    gr1 = greenlet(fun1)
    gr2 = greenlet(fun2)
    gr1.switch()   # 启动 fun1

    print("\n  执行顺序(greenlet 手动切换):")
    for i, log in enumerate(execution_log, 1):
        print(f"  {i}. {log}")

    print(f"\n  关键:整个过程在单线程内完成,无任何系统调用")
    print(f"  gr1.dead={gr1.dead}  gr2.dead={gr2.dead}")

# ─── 模式2:gevent 自动调度 ────────────────────────────────────────────────────

def mode_gevent() -> None:
    section("gevent — 自动 IO 调度")

    if not HAS_GEVENT:
        print("\n  [模拟模式] gevent 未安装,用 threading 模拟并发效果\n")
        results: list[str] = []

        def simulated_task(task_id: int, delay: float) -> None:
            time.sleep(delay)   # 模拟 IO
            results.append(f"任务{task_id} 完成(耗时 {delay:.1f}s)")

        t0 = time.perf_counter()
        threads = [threading.Thread(target=simulated_task, args=(i, 0.1*(i+1)))
                   for i in range(5)]
        [t.start() for t in threads]
        [t.join() for t in threads]
        elapsed = time.perf_counter() - t0

        print(f"  5个任务并发执行,总耗时: {elapsed:.2f}s(串行需 {0.1*15:.1f}s)")
        for r in sorted(results):
            print(f"  {r}")
        return

    gevent.monkey.patch_all()

    task_log: list[tuple[str, float]] = []

    def io_task(task_id: int, duration: float) -> None:
        start = time.perf_counter()
        task_log.append((f"任务{task_id} 开始", time.perf_counter()))
        gevent.sleep(duration)   # 模拟 IO 阻塞,自动让出控制权
        task_log.append((f"任务{task_id} 完成({duration:.1f}s)", time.perf_counter()))

    durations = [0.3, 0.1, 0.2, 0.15, 0.25]
    t0 = time.perf_counter()

    # gevent.spawn 创建协程,joinall 等待全部完成
    greenlets = [gevent.spawn(io_task, i, d) for i, d in enumerate(durations)]
    gevent.joinall(greenlets)

    elapsed = time.perf_counter() - t0
    serial_time = sum(durations)

    print(f"\n  5个 IO 任务并发执行:")
    for msg, t in task_log:
        print(f"  [{t - t0:.3f}s] {msg}")
    print(f"\n  并发总耗时: {elapsed:.2f}s  串行需: {serial_time:.2f}s")
    print(f"  加速比: {serial_time/elapsed:.1f}x")

# ─── 模式3:monkey-patch 效果对比 ─────────────────────────────────────────────

def mode_patch() -> None:
    section("monkey-patch — 零改动让同步代码变并发")

    print("""
  monkey-patch 原理:
  ┌─────────────────────────────────────────────────────┐
  │  import gevent.monkey                               │
  │  gevent.monkey.patch_all()   ← 必须在所有 import 前 │
  │                                                     │
  │  之后:                                             │
  │  socket.connect()  →  gevent 非阻塞版本             │
  │  time.sleep()      →  gevent.sleep()(让出控制权)  │
  │  threading.Thread  →  gevent.Greenlet               │
  └─────────────────────────────────────────────────────┘

  对比:
  ┌──────────────┬──────────────────────────────────────┐
  │ asyncio      │ 必须用 async/await 重写所有 IO 代码   │
  │ gevent       │ patch_all() 后,原有同步代码自动并发  │
  └──────────────┴──────────────────────────────────────┘

  典型使用场景:
    # 遗留代码(同步 requests 库)
    import requests
    def fetch(url): return requests.get(url).status_code

    # 加一行 patch,自动并发
    import gevent.monkey; gevent.monkey.patch_all()
    import gevent
    jobs = [gevent.spawn(fetch, url) for url in urls]
    gevent.joinall(jobs)
    results = [j.value for j in jobs]
""")

    if not HAS_GEVENT:
        print("  [提示] 安装后可运行真实演示: pip install gevent")
        return

    # 演示 patch 前后 time.sleep 的行为差异
    gevent.monkey.patch_all()

    def worker(wid: int) -> float:
        t0 = time.perf_counter()
        time.sleep(0.1)   # patch 后自动变为 gevent.sleep
        return time.perf_counter() - t0

    # 串行
    t0 = time.perf_counter()
    for i in range(5):
        worker(i)
    serial = time.perf_counter() - t0

    # 并发(gevent.spawn)
    t0 = time.perf_counter()
    jobs = [gevent.spawn(worker, i) for i in range(5)]
    gevent.joinall(jobs)
    concurrent = time.perf_counter() - t0

    print(f"  同样的 time.sleep(0.1) × 5:")
    print(f"  串行执行: {serial:.2f}s")
    print(f"  gevent并发: {concurrent:.2f}s  (patch 后自动并发)")

# ─── 模式4:性能基准对比 ───────────────────────────────────────────────────────

def mode_benchmark() -> None:
    section("性能基准:协程 vs 线程 vs 串行(IO密集型)")

    N = 20          # 任务数
    DELAY = 0.05    # 每个任务 IO 耗时

    def io_work(task_id: int) -> None:
        time.sleep(DELAY)

    results: list[tuple[str, float]] = []

    # 串行
    t0 = time.perf_counter()
    for i in range(N):
        io_work(i)
    results.append(("串行", time.perf_counter() - t0))

    # 多线程
    t0 = time.perf_counter()
    threads = [threading.Thread(target=io_work, args=(i,)) for i in range(N)]
    [t.start() for t in threads]
    [t.join() for t in threads]
    results.append(("多线程", time.perf_counter() - t0))

    # gevent(若可用)
    if HAS_GEVENT:
        gevent.monkey.patch_all()
        t0 = time.perf_counter()
        jobs = [gevent.spawn(io_work, i) for i in range(N)]
        gevent.joinall(jobs)
        results.append(("gevent协程", time.perf_counter() - t0))

    serial_time = results[0][1]
    print(f"\n  {N} 个 IO 任务(每个 {DELAY}s),理论串行 {N*DELAY:.1f}s\n")
    print(f"  {'方案':<12} {'耗时':<10} {'加速比':<10} 条形图")
    print(f"  {'─'*55}")
    for name, elapsed in results:
        speedup = serial_time / elapsed
        bar = "█" * int(speedup * 4)
        print(f"  {name:<12} {elapsed:.3f}s    {speedup:.1f}x       {bar}")

    print(f"\n  💡 IO 密集型任务:协程 ≈ 线程(都能并发等待)")
    print(f"     CPU 密集型任务:协程无效(GIL/单线程),需多进程")

# ─── 入口 ─────────────────────────────────────────────────────────────────────

def main() -> None:
    parser = argparse.ArgumentParser(description="gevent/greenlet 协程库实战")
    parser.add_argument(
        "--mode",
        choices=["greenlet", "gevent", "patch", "benchmark", "all"],
        default="all",
    )
    args = parser.parse_args()
    dispatch = {
        "greenlet":  mode_greenlet,
        "gevent":    mode_gevent,
        "patch":     mode_patch,
        "benchmark": mode_benchmark,
        "all":       lambda: [mode_greenlet(), mode_gevent(),
                               mode_patch(), mode_benchmark()],
    }
    dispatch[args.mode]()


if __name__ == "__main__":
    main()
# 安装依赖(可选,未安装时自动降级为模拟模式)
$ pip install gevent greenlet

$ python3 15-python-gevent.py --mode all

============================================================
  greenlet — 手动协程切换
============================================================
  执行顺序(greenlet 手动切换):
  1. fun1: 开始执行
  2. fun1: 切换到 fun2
  3. fun2: 开始执行
  4. fun2: 切换回 fun1
  5. fun1: 从 fun2 切回,继续执行
  6. fun2: 再次执行,完成
  7. fun1: 完成

  关键:整个过程在单线程内完成,无任何系统调用
  gr1.dead=True  gr2.dead=True

============================================================
  gevent — 自动 IO 调度
============================================================
  5个 IO 任务并发执行:
  [0.000s] 任务0 开始
  [0.001s] 任务1 开始
  [0.001s] 任务2 开始
  [0.001s] 任务3 开始
  [0.001s] 任务4 开始
  [0.101s] 任务1 完成(0.1s)
  [0.151s] 任务3 完成(0.15s)
  [0.201s] 任务2 完成(0.2s)
  [0.251s] 任务4 完成(0.25s)
  [0.301s] 任务0 完成(0.3s)

  并发总耗时: 0.31s  串行需: 1.00s
  加速比: 3.2x

============================================================
  性能基准:协程 vs 线程 vs 串行(IO密集型)
============================================================
  20 个 IO 任务(每个 0.05s),理论串行 1.0s

  方案         耗时       加速比     条形图
  ───────────────────────────────────────────────────────
  串行         1.003s     1.0x       ████
  多线程       0.058s     17.3x      ████████████████████████████████████████████████████████████████████
  gevent协程   0.052s     19.3x      ████████████████████████████████████████████████████████████████████████████
  💡 IO 密集型任务:协程 ≈ 线程(都能并发等待)
     CPU 密集型任务:协程无效(GIL/单线程),需多进程

核心要点

greenlet vs gevent vs asyncio 选择:

新项目 IO 并发    →  asyncio(标准库,生态最好)
改造遗留同步代码  →  gevent + monkey-patch(零改动)
理解协程底层      →  greenlet(最接近原理)

gevent 的限制:

  • CPU 密集型任务无效(单线程,协程不能并行计算)
  • monkey-patch 必须在所有 import 之前执行
  • 与某些 C 扩展库不兼容(如 numpy 的部分操作)

gevent Pool 控制并发数:

import time

try:
    import gevent
    import gevent.pool
    HAS_GEVENT = True
except ImportError:
    HAS_GEVENT = False

def fetch(url: str) -> str:
    # 教学演示不访问真实网络,用 sleep 模拟一次 IO 等待。
    time.sleep(0.05)
    return f"OK: {url}"

urls = [f"https://example.com/{i}" for i in range(6)]

if HAS_GEVENT:
    pool = gevent.pool.Pool(3)   # 最多同时 3 个协程
    jobs = [pool.spawn(fetch, url) for url in urls]
    gevent.joinall(jobs)
    results = [job.value for job in jobs]
else:
    # 没安装 gevent 时仍然给出可见反馈,避免新手运行报错。
    results = [fetch(url) for url in urls]

print("结果数量:", len(results))
print("第一条:", results[0])

NexDo Time ⚡

gevent.pool.Pool 实现一个并发限速的 URL 批量检测器,控制最大并发数为 5:

import time

try:
    import gevent
    import gevent.pool
    HAS_GEVENT = True
except ImportError:
    HAS_GEVENT = False

def check(url: str) -> tuple[str, int]:
    # 零外部依赖:不用真的访问 httpbin,用 sleep 模拟网络等待。
    time.sleep(0.05)
    return url, 200

urls = [f"https://mock.local/delay/{i}" for i in range(10)]

if HAS_GEVENT:
    pool = gevent.pool.Pool(5)  # 最大并发数为 5
    results = pool.map(check, urls)
else:
    results = [check(url) for url in urls]

print("检测数量:", len(results))
print("前3条:", results[:3])

Don’t wait for next time, do it in the next moment.