
14 · Coroutines in Practice: High Concurrency with asyncio

#014 · 2026-04-16 · Python

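🔗 Knowledge-graph navigation: before reading this article, review the core concepts from "13 · IO Multiplexing: Inside select/epoll"; this article builds on that foundation. The previous article managed an event loop by hand with selectors. Here asyncio takes over that job, and async/await lets you write highly concurrent IO code in a style that invites comparison with Go's goroutines.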

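Geek's take: concurrency is not about making code more complicated; it is about being explicit about waiting, scheduling, and resource isolation. Every mechanism in this article is verified with a mock scenario that runs on your own machine.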

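Pain point and architecture: memorizing an API or a concept in isolation rarely sticks. This article first pins down a real pain point, then breaks it into a closed loop of three stages: input data → core mechanism → runnable output. Moving from the selectors event loop to asyncio, you will learn coroutine syntax and high-concurrency task scheduling, and build a concurrent URL health checker.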

1. Comparing the Three Concurrency Models

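┌─────────────────┬──────────────────────────┬──────────────────────────────┬──────────────────────────┐
│ Dimension       │ Multiprocessing          │ Multithreading               │ Coroutines (asyncio)     │
├─────────────────┼──────────────────────────┼──────────────────────────────┼──────────────────────────┤
│ Switch cost     │ High (process switch)    │ Medium (thread switch)       │ Very low (user space)    │
│ Memory          │ High (own address space) │ Medium (1-8 MB stack/thread) │ Low (a few KB/coroutine) │
│ GIL impact      │ None (per-process GIL)   │ Yes (hurts CPU-bound)        │ Present, rarely matters  │
│ Best for        │ CPU-bound                │ IO-bound, moderate           │ IO-bound, massive        │
│ Max concurrency │ ~CPU cores               │ ~thousands                   │ ~tens of thousands+      │
│ Debugging       │ Medium                   │ Hard (race conditions)       │ Medium (single thread)   │
│ Python module   │ multiprocessing          │ threading                    │ asyncio (stdlib)         │
└─────────────────┴──────────────────────────┴──────────────────────────────┴──────────────────────────┘

Concurrency and memory figures are rough orders of magnitude; real limits depend on the OS, available memory, and workload.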
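To put the "tens of thousands" row in perspective, here is a rough sketch (not a benchmark) that starts 10,000 coroutines, each awaiting asyncio.sleep(0.1) as a stand-in for network IO. The names tiny_io and run_many are illustrative, and timings will vary by machine.

```python
import asyncio
import time


async def tiny_io(i: int) -> int:
    # asyncio.sleep stands in for one 0.1 s network wait
    await asyncio.sleep(0.1)
    return i


async def run_many(n: int) -> tuple[int, float]:
    t0 = time.perf_counter()
    # All n coroutines wait at the same time on a single thread
    results = await asyncio.gather(*(tiny_io(i) for i in range(n)))
    return len(results), time.perf_counter() - t0


if __name__ == "__main__":
    count, elapsed = asyncio.run(run_many(10_000))
    # Expect a total close to one 0.1 s wait, far below 10_000 * 0.1 s
    print(f"{count} coroutines finished in {elapsed:.2f}s")
```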

2. Core async/await Syntax

import asyncio


async def fetch(url: str) -> str:
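    # await does not freeze the thread; it hands control back to the event loop so other coroutines can run.
    await asyncio.sleep(0.2)   # a short wait simulates one network IO
    return f"200 OK: {url}"

# Run the coroutine
result = asyncio.run(fetch("https://example.com"))
print("Coroutine returned:", result)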

How the event loop works:

asyncio.run()
    │
    ▼
┌────────────────────────────────────────┐
│               Event Loop               │
│                                        │
│  Task1: fetch(url1) ──await──▶ paused  │
│  Task2: fetch(url2) ──await──▶ paused  │
│  Task3: fetch(url3) ──await──▶ paused  │
│                                        │
│  I/O ready ──▶ resume matching Task    │
│  Tasks interleave on one thread and    │
│  switch only at await points           │
└────────────────────────────────────────┘
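The interleaving in the diagram can be observed directly. In this minimal sketch (step and main are illustrative names), asyncio.sleep(0) serves purely as a yield point: each task logs when it starts and ends, and the log shows every task starting before any of them finishes.

```python
import asyncio


async def step(name: str, log: list[str]) -> None:
    log.append(f"{name}:start")
    # await hands control back to the loop; other ready tasks run before this one resumes
    await asyncio.sleep(0)
    log.append(f"{name}:end")


async def main() -> list[str]:
    log: list[str] = []
    await asyncio.gather(step("A", log), step("B", log), step("C", log))
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
    # ['A:start', 'B:start', 'C:start', 'A:end', 'B:end', 'C:end']
```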

3. Hands-on Code

Step by step: breaking down the core logic

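A coroutine is not a thread; it voluntarily yields control whenever it has to wait. Think of a cook who chops vegetables while the water comes to a boil: a single thread can keep many IO tasks moving at once. Each demo below comes with comments and printed output; we start with a single coroutine, then move on to batch scheduling.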

Step 1: Use CheckResult as a standard container for health-check results

Pain point and mechanism

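Once there are many concurrent tasks, return values can no longer be loose strings. CheckResult fixes the result of each health check into one object, so later code (table output, health-rate statistics, average latency) can read it field by field.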
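Because the fields are fixed, aggregating and exporting results is straightforward. A small sketch, using two hand-written CheckResult values as assumed sample data, that computes the health rate and average latency and serializes the list to JSON with dataclasses.asdict (useful if a dashboard consumes the output):

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class CheckResult:
    url: str
    status: int
    latency_ms: float
    ok: bool


# Assumed sample data, not real measurements
results = [
    CheckResult("https://a.example.com", 200, 120.0, True),
    CheckResult("https://b.example.com", 503, 80.0, False),
]
healthy = sum(r.ok for r in results)                     # True counts as 1
avg = sum(r.latency_ms for r in results) / len(results)  # field access, no string parsing
payload = json.dumps([asdict(r) for r in results])       # dataclass -> dict -> JSON
print(healthy, avg)  # 1 100.0
print(payload)
```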

Core source (verbatim from the full listing at the end)

@dataclass
class CheckResult:
    url: str
    status: int
    latency_ms: float
    ok: bool

Runnable demo (with mock data and print output added)

from dataclasses import dataclass

# Step 1: a dataclass is like a fixed-format checkup form with slots for the URL, status code, latency, and health flag.
@dataclass
class CheckResult:
    url: str
    status: int
    latency_ms: float
    ok: bool

result = CheckResult(url="https://api.example.com", status=200, latency_ms=123.4, ok=True)
print("Full result:", result)
print("Healthy:", result.ok)

Step 2: Simulate an awaitable network request with async mock_http_get

Pain point and mechanism

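The key to coroutines is await. The current task pauses, but the thread does not sit idle: the event loop can schedule other coroutines in the meantime. Here asyncio.sleep() simulates the network wait, so the demo has no real network dependency.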

Core source (verbatim from the full listing at the end)

async def mock_http_get(url: str) -> tuple[int, float]:
    """Simulate network latency: random 50-500 ms, occasional 503"""
    delay = random.uniform(0.05, 0.5)
    await asyncio.sleep(delay)
    status = 503 if random.random() < 0.15 else 200
    return status, delay * 1000

Runnable demo (with mock data and print output added)

import asyncio
import random

# Step 2: await asyncio.sleep does not sit idle; it hands control back to the event loop so other tasks can run first.
async def mock_http_get(url: str) -> tuple[int, float]:
    """Simulate network latency: random 50-500 ms, occasional 503"""
    delay = random.uniform(0.05, 0.5)
    await asyncio.sleep(delay)
    status = 503 if random.random() < 0.15 else 200
    return status, delay * 1000

async def main() -> None:
    random.seed(42)
    status, latency = await mock_http_get("https://api.example.com")
    print("Status code:", status)
    print(f"Simulated latency: {latency:.1f}ms")

asyncio.run(main())

Step 3: Wrap the request into one complete health check with check_url

Pain point and mechanism

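check_url() packages "send the request" and "build the result object" into one coroutine. Calling check_url(...) does not run anything yet: it returns a coroutine object. The body only runs once you await it (or wrap it in a Task with asyncio.create_task()), at which point the event loop schedules it.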
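The difference between a coroutine object and a scheduled Task can be checked directly. A minimal sketch, where check is a simplified stand-in for check_url:

```python
import asyncio
import inspect


async def check(url: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for mock_http_get
    return f"checked {url}"


async def main() -> tuple[bool, bool, str]:
    coro = check("https://example.com")   # nothing has run yet: just a coroutine object
    is_coro = inspect.iscoroutine(coro)
    task = asyncio.create_task(coro)      # now scheduled on the running event loop
    is_task = isinstance(task, asyncio.Task)
    result = await task                   # suspend main until the task finishes
    return is_coro, is_task, result


if __name__ == "__main__":
    print(asyncio.run(main()))  # (True, True, 'checked https://example.com')
```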

Core source (verbatim from the full listing at the end)

async def check_url(url: str) -> CheckResult:
    t0 = time.perf_counter()
    status, latency = await mock_http_get(url)
    return CheckResult(url=url, status=status, latency_ms=latency, ok=(status == 200))

Runnable demo (with mock data and print output added)

import asyncio
import random
import time
from dataclasses import dataclass

@dataclass
class CheckResult:
    url: str
    status: int
    latency_ms: float
    ok: bool

async def mock_http_get(url: str) -> tuple[int, float]:
    """Simulate network latency: random 50-500 ms, occasional 503"""
    delay = random.uniform(0.05, 0.5)
    await asyncio.sleep(delay)
    status = 503 if random.random() < 0.15 else 200
    return status, delay * 1000

# Step 3: check_url is the inspector for a single URL: it sends the request and packages the result.
async def check_url(url: str) -> CheckResult:
    t0 = time.perf_counter()
    status, latency = await mock_http_get(url)
    return CheckResult(url=url, status=status, latency_ms=latency, ok=(status == 200))

async def main() -> None:
    random.seed(7)
    result = await check_url("https://blog.21zhao.com/ping")
    print("URL:", result.url)
    print("Status:", result.status)
    print(f"Latency: {result.latency_ms:.1f}ms")
    print("Healthy:", result.ok)

asyncio.run(main())

Step 4: Compare serial await with concurrent gather in demo_gather

Pain point and mechanism

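The serial version checks A, then B, and so on, so its total time is roughly the sum of all waits. asyncio.gather() sends every check out at once, so its total time approaches that of the slowest single request. Note: this is not multithreading; a single-threaded event loop switches between tasks while they wait.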
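Two gather behaviors matter once results feed a report: results come back in argument order, not completion order, and return_exceptions=True turns a failure into a returned value instead of letting it propagate out of the gather. A minimal sketch with illustrative helpers delayed and boom:

```python
import asyncio


async def delayed(value: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return value


async def boom() -> str:
    await asyncio.sleep(0.01)
    raise RuntimeError("simulated failure")


async def main() -> tuple[list, list]:
    # "slow" finishes last, but gather still returns results in argument order
    ordered = await asyncio.gather(delayed("slow", 0.05), delayed("fast", 0.01))
    # return_exceptions=True places the exception object in the result list
    mixed = await asyncio.gather(delayed("ok", 0.01), boom(), return_exceptions=True)
    return ordered, mixed


if __name__ == "__main__":
    print(asyncio.run(main()))
```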

Core source (verbatim from the full listing at the end)

async def demo_gather() -> None:
    urls = [
        "https://api.21zhao.com/health",
        "https://blog.21zhao.com/ping",
        "https://todo.21zhao.com/status",
        "https://jupy.21zhao.com/api",
        "https://ai.21zhao.com/v1/models",
        "https://www.21zhao.com/sitemap.xml",
    ]

    # Serial
    t0 = time.perf_counter()
    serial_results: list[CheckResult] = []
    for url in urls:
        serial_results.append(await check_url(url))
    serial_time = time.perf_counter() - t0

    # Concurrent
    t0 = time.perf_counter()
    concurrent_results: list[CheckResult] = await asyncio.gather(
        *[check_url(url) for url in urls]
    )
    concurrent_time = time.perf_counter() - t0

    print("\n=== asyncio.gather: concurrent vs serial ===")
    print(f"{'URL':<42} {'Status':<6} {'Delay(ms)':<10} {'Result'}")
    print("-" * 70)
    for r in concurrent_results:
        icon = "✓" if r.ok else "✗"
        print(f"{r.url:<42} {r.status:<6} {r.latency_ms:<10.1f} {icon}")

    print(f"\n{'Serial time':<15}: {serial_time:.3f}s")
    print(f"{'Concurrent time':<15}: {concurrent_time:.3f}s")
    print(f"{'Speedup':<15}: {serial_time / concurrent_time:.1f}x")
    ok_count = sum(1 for r in concurrent_results if r.ok)
    print(f"{'Health rate':<15}: {ok_count}/{len(urls)}")

Runnable demo (with mock data and print output added)

import asyncio
import random
import time
from dataclasses import dataclass

@dataclass
class CheckResult:
    url: str
    status: int
    latency_ms: float
    ok: bool

async def mock_http_get(url: str) -> tuple[int, float]:
    """Simulate network latency: random 50-500 ms, occasional 503"""
    delay = random.uniform(0.05, 0.5)
    await asyncio.sleep(delay)
    status = 503 if random.random() < 0.15 else 200
    return status, delay * 1000

async def check_url(url: str) -> CheckResult:
    t0 = time.perf_counter()
    status, latency = await mock_http_get(url)
    return CheckResult(url=url, status=status, latency_ms=latency, ok=(status == 200))

# Step 4: gather is like dispatching several inspectors at once to check multiple URLs concurrently.
async def demo_gather() -> None:
    urls = [
        "https://api.21zhao.com/health",
        "https://blog.21zhao.com/ping",
        "https://todo.21zhao.com/status",
        "https://jupy.21zhao.com/api",
        "https://ai.21zhao.com/v1/models",
        "https://www.21zhao.com/sitemap.xml",
    ]

    # Serial
    t0 = time.perf_counter()
    serial_results: list[CheckResult] = []
    for url in urls:
        serial_results.append(await check_url(url))
    serial_time = time.perf_counter() - t0

    # Concurrent
    t0 = time.perf_counter()
    concurrent_results: list[CheckResult] = await asyncio.gather(
        *[check_url(url) for url in urls]
    )
    concurrent_time = time.perf_counter() - t0

    print("\n=== asyncio.gather: concurrent vs serial ===")
    print(f"{'URL':<42} {'Status':<6} {'Delay(ms)':<10} {'Result'}")
    print("-" * 70)
    for r in concurrent_results:
        icon = "✓" if r.ok else "✗"
        print(f"{r.url:<42} {r.status:<6} {r.latency_ms:<10.1f} {icon}")

    print(f"\n{'Serial time':<15}: {serial_time:.3f}s")
    print(f"{'Concurrent time':<15}: {concurrent_time:.3f}s")
    print(f"{'Speedup':<15}: {serial_time / concurrent_time:.1f}x")
    ok_count = sum(1 for r in concurrent_results if r.ok)
    print(f"{'Health rate':<15}: {ok_count}/{len(urls)}")

random.seed(42)
asyncio.run(demo_gather())

Step 5: Understand the Queue producer-consumer model through producer/worker

Pain point and mechanism

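producer() puts URLs into the queue; worker() takes URLs off the queue and checks them. __DONE__ is a sentinel that tells a worker "no more work, you can stop"; the producer sends one per worker, so its count (3) must match the number of workers. queue.task_done() marks one fetched item as processed. This demo never calls queue.join(), so it is not strictly required here, but it keeps the queue's count of unfinished items accurate, which is what join() relies on.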
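Where task_done() really pays off is with queue.join(). The sketch below is an alternative to the __DONE__ sentinel, not the article's approach: the main coroutine waits on join() until every item has been marked done, then cancels the workers. asyncio.sleep(0.01) stands in for check_url.

```python
import asyncio


async def worker(queue: asyncio.Queue[str], done: list[str]) -> None:
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)   # stand-in for check_url(item)
            done.append(item)
        finally:
            queue.task_done()           # lets queue.join() know this item is finished


async def main() -> list[str]:
    queue: asyncio.Queue[str] = asyncio.Queue()
    done: list[str] = []
    workers = [asyncio.create_task(worker(queue, done)) for _ in range(3)]
    for i in range(6):
        queue.put_nowait(f"url-{i}")
    await queue.join()          # returns once task_done() was called for every item
    for w in workers:
        w.cancel()              # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return done


if __name__ == "__main__":
    print(sorted(asyncio.run(main())))
```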

Core source (verbatim from the full listing at the end)

async def producer(queue: asyncio.Queue[str], urls: list[str]) -> None:
    for url in urls:
        await queue.put(url)
        await asyncio.sleep(0.02)  # simulate tasks arriving gradually
    # Send the end-of-input sentinel, one per worker
    for _ in range(3):  # 3 workers
        await queue.put("__DONE__")


async def worker(wid: int, queue: asyncio.Queue[str], results: list[CheckResult]) -> None:
    while True:
        url = await queue.get()
        if url == "__DONE__":
            queue.task_done()
            break
        result = await check_url(url)
        results.append(result)
        status_icon = "✓" if result.ok else "✗"
        print(f"  Worker-{wid} {status_icon} {url:<40} {result.latency_ms:.0f}ms")
        queue.task_done()

Runnable demo (with mock data and print output added)

import asyncio
import random
import time
from dataclasses import dataclass

@dataclass
class CheckResult:
    url: str
    status: int
    latency_ms: float
    ok: bool

async def mock_http_get(url: str) -> tuple[int, float]:
    """Simulate network latency: random 50-500 ms, occasional 503"""
    delay = random.uniform(0.05, 0.5)
    await asyncio.sleep(delay)
    status = 503 if random.random() < 0.15 else 200
    return status, delay * 1000

async def check_url(url: str) -> CheckResult:
    t0 = time.perf_counter()
    status, latency = await mock_http_get(url)
    return CheckResult(url=url, status=status, latency_ms=latency, ok=(status == 200))

# Step 5: the Queue is like a restaurant pickup counter: producer puts tasks in, workers take them out and process them.
async def producer(queue: asyncio.Queue[str], urls: list[str]) -> None:
    for url in urls:
        await queue.put(url)
        await asyncio.sleep(0.02)  # simulate tasks arriving gradually
    # Send the end-of-input sentinel, one per worker
    for _ in range(3):  # 3 workers
        await queue.put("__DONE__")


async def worker(wid: int, queue: asyncio.Queue[str], results: list[CheckResult]) -> None:
    while True:
        url = await queue.get()
        if url == "__DONE__":
            queue.task_done()
            break
        result = await check_url(url)
        results.append(result)
        status_icon = "✓" if result.ok else "✗"
        print(f"  Worker-{wid} {status_icon} {url:<40} {result.latency_ms:.0f}ms")
        queue.task_done()

async def main() -> None:
    random.seed(1)
    queue: asyncio.Queue[str] = asyncio.Queue(maxsize=2)
    results: list[CheckResult] = []
    urls = [f"https://service-{i}.example.com" for i in range(1, 4)]
    await asyncio.gather(producer(queue, urls), worker(1, queue, results), worker(2, queue, results), worker(3, queue, results))
    print("Completed:", len(results))

asyncio.run(main())

Step 6: Run the complete async task queue with demo_queue

Pain point and mechanism

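A task queue fits scenarios where tasks keep arriving and several workers process them concurrently. Here the queue capacity is 5 and there are 3 workers. You will see the workers' results interleave in the output, which is coroutine concurrency made visible. When the queue is full, the producer's await queue.put() waits until a worker frees a slot, so the capacity also acts as backpressure.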
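The maxsize capacity is what makes put() wait. A minimal sketch, using capacity 2 instead of 5 to keep it short, showing a put() that stays pending until a get() frees a slot:

```python
import asyncio


async def main() -> tuple[bool, int, int]:
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=2)
    queue.put_nowait(1)
    queue.put_nowait(2)
    was_full = queue.full()                        # capacity reached
    pending_put = asyncio.create_task(queue.put(3))
    await asyncio.sleep(0.01)                      # give the put a chance to run
    still_waiting = not pending_put.done()         # parked, waiting for a free slot
    first = await queue.get()                      # frees a slot and wakes the waiting put
    await pending_put
    return was_full and still_waiting, first, queue.qsize()


if __name__ == "__main__":
    print(asyncio.run(main()))  # (True, 1, 2)
```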

Core source (verbatim from the full listing at the end)

async def demo_queue() -> None:
    urls = [f"https://service-{i}.21zhao.com/health" for i in range(1, 10)]
    queue: asyncio.Queue[str] = asyncio.Queue(maxsize=5)
    results: list[CheckResult] = []

    print("\n=== asyncio.Queue producer-consumer pattern ===")
    print(f"Tasks: {len(urls)}  Workers: 3  Queue capacity: 5\n")

    await asyncio.gather(
        producer(queue, urls),
        worker(1, queue, results),
        worker(2, queue, results),
        worker(3, queue, results),
    )

    ok = sum(1 for r in results if r.ok)
    avg_latency = sum(r.latency_ms for r in results) / len(results)
    print(f"\n{'Completed':<12}: {len(results)}")
    print(f"{'Succeeded':<12}: {ok}  Failed: {len(results) - ok}")
    print(f"{'Avg latency':<12}: {avg_latency:.1f}ms")

Runnable demo (with mock data and print output added)

import asyncio
import random
import time
from dataclasses import dataclass

@dataclass
class CheckResult:
    url: str
    status: int
    latency_ms: float
    ok: bool

async def mock_http_get(url: str) -> tuple[int, float]:
    """Simulate network latency: random 50-500 ms, occasional 503"""
    delay = random.uniform(0.05, 0.5)
    await asyncio.sleep(delay)
    status = 503 if random.random() < 0.15 else 200
    return status, delay * 1000

async def check_url(url: str) -> CheckResult:
    t0 = time.perf_counter()
    status, latency = await mock_http_get(url)
    return CheckResult(url=url, status=status, latency_ms=latency, ok=(status == 200))

async def producer(queue: asyncio.Queue[str], urls: list[str]) -> None:
    for url in urls:
        await queue.put(url)
        await asyncio.sleep(0.02)  # simulate tasks arriving gradually
    # Send the end-of-input sentinel, one per worker
    for _ in range(3):  # 3 workers
        await queue.put("__DONE__")


async def worker(wid: int, queue: asyncio.Queue[str], results: list[CheckResult]) -> None:
    while True:
        url = await queue.get()
        if url == "__DONE__":
            queue.task_done()
            break
        result = await check_url(url)
        results.append(result)
        status_icon = "✓" if result.ok else "✗"
        print(f"  Worker-{wid} {status_icon} {url:<40} {result.latency_ms:.0f}ms")
        queue.task_done()

# Step 6: the full queue demo starts 1 producer + 3 workers.
async def demo_queue() -> None:
    urls = [f"https://service-{i}.21zhao.com/health" for i in range(1, 10)]
    queue: asyncio.Queue[str] = asyncio.Queue(maxsize=5)
    results: list[CheckResult] = []

    print("\n=== asyncio.Queue producer-consumer pattern ===")
    print(f"Tasks: {len(urls)}  Workers: 3  Queue capacity: 5\n")

    await asyncio.gather(
        producer(queue, urls),
        worker(1, queue, results),
        worker(2, queue, results),
        worker(3, queue, results),
    )

    ok = sum(1 for r in results if r.ok)
    avg_latency = sum(r.latency_ms for r in results) / len(results)
    print(f"\n{'Completed':<12}: {len(results)}")
    print(f"{'Succeeded':<12}: {ok}  Failed: {len(results) - ok}")
    print(f"{'Avg latency':<12}: {avg_latency:.1f}ms")

random.seed(42)
asyncio.run(demo_queue())

Step 7: Add timeouts in demo_health so slow requests cannot drag everything down

Pain point and mechanism

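A real health check cannot wait forever. asyncio.wait_for() puts a timeout on each check; any check slower than the threshold is cancelled and recorded as 408 (Request Timeout). That way one hung service cannot stall the whole dashboard.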
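A detail worth knowing: on timeout, wait_for does not just stop waiting, it cancels the inner coroutine, which sees asyncio.CancelledError at its current await. A minimal sketch with an illustrative slow_check that logs the cancellation:

```python
import asyncio


async def slow_check(log: list[str]) -> int:
    try:
        await asyncio.sleep(1.0)        # slower than the timeout below
        return 200
    except asyncio.CancelledError:
        log.append("inner cancelled")   # wait_for cancels the inner coroutine on timeout
        raise                           # re-raise so the cancellation completes normally


async def main() -> tuple[int, list[str]]:
    log: list[str] = []
    try:
        status = await asyncio.wait_for(slow_check(log), timeout=0.05)
    except asyncio.TimeoutError:
        status = 408                    # same synthetic code the dashboard uses
    return status, log


if __name__ == "__main__":
    print(asyncio.run(main()))  # (408, ['inner cancelled'])
```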

Core source (verbatim from the full listing at the end)

async def demo_health() -> None:
    """Health check with timeout control"""

    async def check_with_timeout(url: str, timeout: float) -> CheckResult:
        try:
            return await asyncio.wait_for(check_url(url), timeout=timeout)
        except asyncio.TimeoutError:
            return CheckResult(url=url, status=408, latency_ms=timeout * 1000, ok=False)

    endpoints = {
        "Main": "https://21zhao.com",
        "Blog": "https://blog.21zhao.com",
        "Todo": "https://todo.21zhao.com",
        "Jupyter": "https://jupy.21zhao.com",
        "AI proxy": "https://ai.21zhao.com",
        "YouCare": "https://21zhao.com/youcare/",
    }

    print("\n=== Service health dashboard ===")
    t0 = time.perf_counter()

    tasks = [check_with_timeout(url, timeout=0.3) for url in endpoints.values()]
    results = await asyncio.gather(*tasks)

    elapsed = time.perf_counter() - t0

    print(f"\n{'Service':<10} {'URL':<38} {'Status':<8} {'Latency':<10} {'Health'}")
    print("-" * 75)
    for name, result in zip(endpoints.keys(), results):
        health = "🟢 UP" if result.ok else "🔴 DOWN"
        print(f"{name:<10} {result.url:<38} {result.status:<8} {result.latency_ms:<10.0f}ms {health}")

    total = len(results)
    healthy = sum(1 for r in results if r.ok)
    print(f"\nCheck finished: {elapsed:.3f}s | Healthy: {healthy}/{total} | "
          f"Availability: {healthy/total*100:.0f}%")

Runnable demo (with mock data and print output added)

import asyncio
import random
import time
from dataclasses import dataclass

@dataclass
class CheckResult:
    url: str
    status: int
    latency_ms: float
    ok: bool

async def mock_http_get(url: str) -> tuple[int, float]:
    """Simulate network latency: random 50-500 ms, occasional 503"""
    delay = random.uniform(0.05, 0.5)
    await asyncio.sleep(delay)
    status = 503 if random.random() < 0.15 else 200
    return status, delay * 1000

async def check_url(url: str) -> CheckResult:
    t0 = time.perf_counter()
    status, latency = await mock_http_get(url)
    return CheckResult(url=url, status=status, latency_ms=latency, ok=(status == 200))

# Step 7: wait_for works like an alarm clock on the request: once time runs out, the check counts as timed out.
async def demo_health() -> None:
    """Health check with timeout control"""

    async def check_with_timeout(url: str, timeout: float) -> CheckResult:
        try:
            return await asyncio.wait_for(check_url(url), timeout=timeout)
        except asyncio.TimeoutError:
            return CheckResult(url=url, status=408, latency_ms=timeout * 1000, ok=False)

    endpoints = {
        "Main": "https://21zhao.com",
        "Blog": "https://blog.21zhao.com",
        "Todo": "https://todo.21zhao.com",
        "Jupyter": "https://jupy.21zhao.com",
        "AI proxy": "https://ai.21zhao.com",
        "YouCare": "https://21zhao.com/youcare/",
    }

    print("\n=== Service health dashboard ===")
    t0 = time.perf_counter()

    tasks = [check_with_timeout(url, timeout=0.3) for url in endpoints.values()]
    results = await asyncio.gather(*tasks)

    elapsed = time.perf_counter() - t0

    print(f"\n{'Service':<10} {'URL':<38} {'Status':<8} {'Latency':<10} {'Health'}")
    print("-" * 75)
    for name, result in zip(endpoints.keys(), results):
        health = "🟢 UP" if result.ok else "🔴 DOWN"
        print(f"{name:<10} {result.url:<38} {result.status:<8} {result.latency_ms:<10.0f}ms {health}")

    total = len(results)
    healthy = sum(1 for r in results if r.ok)
    print(f"\nCheck finished: {elapsed:.3f}s | Healthy: {healthy}/{total} | "
          f"Availability: {healthy/total*100:.0f}%")

random.seed(42)
asyncio.run(demo_health())

Step 8: Use main as the single entry point for the gather/queue/health modes

Pain point and mechanism

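asyncio.run() is the event-loop entry point: it creates a fresh loop, runs the given coroutine to completion, then closes the loop. main() uses --mode to decide which group of coroutines to run: gather for the concurrency comparison, queue for producer-consumer, and health for the health-check dashboard.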

Core source (verbatim from the full listing at the end)

def main() -> None:
    parser = argparse.ArgumentParser(description="asyncio high-concurrency demo")
    parser.add_argument(
        "--mode",
        choices=["gather", "queue", "health"],
        default="health",
        help="gather=concurrent vs serial, queue=task queue, health=health dashboard",
    )
    args = parser.parse_args()

    random.seed(42)  # fixed random seed so results are reproducible

    if args.mode == "gather":
        asyncio.run(demo_gather())
    elif args.mode == "queue":
        asyncio.run(demo_queue())
    else:
        asyncio.run(demo_health())

Runnable demo (with mock data and print output added)

import argparse
import asyncio
import random
import time
from dataclasses import dataclass

@dataclass
class CheckResult:
    url: str
    status: int
    latency_ms: float
    ok: bool

async def mock_http_get(url: str) -> tuple[int, float]:
    """Simulate network latency: random 50-500 ms, occasional 503"""
    delay = random.uniform(0.05, 0.5)
    await asyncio.sleep(delay)
    status = 503 if random.random() < 0.15 else 200
    return status, delay * 1000

async def check_url(url: str) -> CheckResult:
    t0 = time.perf_counter()
    status, latency = await mock_http_get(url)
    return CheckResult(url=url, status=status, latency_ms=latency, ok=(status == 200))

async def demo_gather() -> None:
    urls = [
        "https://api.21zhao.com/health",
        "https://blog.21zhao.com/ping",
        "https://todo.21zhao.com/status",
        "https://jupy.21zhao.com/api",
        "https://ai.21zhao.com/v1/models",
        "https://www.21zhao.com/sitemap.xml",
    ]

    # Serial
    t0 = time.perf_counter()
    serial_results: list[CheckResult] = []
    for url in urls:
        serial_results.append(await check_url(url))
    serial_time = time.perf_counter() - t0

    # Concurrent
    t0 = time.perf_counter()
    concurrent_results: list[CheckResult] = await asyncio.gather(
        *[check_url(url) for url in urls]
    )
    concurrent_time = time.perf_counter() - t0

    print("\n=== asyncio.gather: concurrent vs serial ===")
    print(f"{'URL':<42} {'Status':<6} {'Delay(ms)':<10} {'Result'}")
    print("-" * 70)
    for r in concurrent_results:
        icon = "✓" if r.ok else "✗"
        print(f"{r.url:<42} {r.status:<6} {r.latency_ms:<10.1f} {icon}")

    print(f"\n{'Serial time':<15}: {serial_time:.3f}s")
    print(f"{'Concurrent time':<15}: {concurrent_time:.3f}s")
    print(f"{'Speedup':<15}: {serial_time / concurrent_time:.1f}x")
    ok_count = sum(1 for r in concurrent_results if r.ok)
    print(f"{'Health rate':<15}: {ok_count}/{len(urls)}")

async def producer(queue: asyncio.Queue[str], urls: list[str]) -> None:
    for url in urls:
        await queue.put(url)
        await asyncio.sleep(0.02)  # simulate tasks arriving gradually
    # Send the end-of-input sentinel, one per worker
    for _ in range(3):  # 3 workers
        await queue.put("__DONE__")


async def worker(wid: int, queue: asyncio.Queue[str], results: list[CheckResult]) -> None:
    while True:
        url = await queue.get()
        if url == "__DONE__":
            queue.task_done()
            break
        result = await check_url(url)
        results.append(result)
        status_icon = "✓" if result.ok else "✗"
        print(f"  Worker-{wid} {status_icon} {url:<40} {result.latency_ms:.0f}ms")
        queue.task_done()

async def demo_queue() -> None:
    urls = [f"https://service-{i}.21zhao.com/health" for i in range(1, 10)]
    queue: asyncio.Queue[str] = asyncio.Queue(maxsize=5)
    results: list[CheckResult] = []

    print("\n=== asyncio.Queue producer-consumer pattern ===")
    print(f"Tasks: {len(urls)}  Workers: 3  Queue capacity: 5\n")

    await asyncio.gather(
        producer(queue, urls),
        worker(1, queue, results),
        worker(2, queue, results),
        worker(3, queue, results),
    )

    ok = sum(1 for r in results if r.ok)
    avg_latency = sum(r.latency_ms for r in results) / len(results)
    print(f"\n{'Completed':<12}: {len(results)}")
    print(f"{'Succeeded':<12}: {ok}  Failed: {len(results) - ok}")
    print(f"{'Avg latency':<12}: {avg_latency:.1f}ms")

async def demo_health() -> None:
    """Health check with timeout control"""

    async def check_with_timeout(url: str, timeout: float) -> CheckResult:
        try:
            return await asyncio.wait_for(check_url(url), timeout=timeout)
        except asyncio.TimeoutError:
            return CheckResult(url=url, status=408, latency_ms=timeout * 1000, ok=False)

    endpoints = {
        "Main": "https://21zhao.com",
        "Blog": "https://blog.21zhao.com",
        "Todo": "https://todo.21zhao.com",
        "Jupyter": "https://jupy.21zhao.com",
        "AI proxy": "https://ai.21zhao.com",
        "YouCare": "https://21zhao.com/youcare/",
    }

    print("\n=== Service health dashboard ===")
    t0 = time.perf_counter()

    tasks = [check_with_timeout(url, timeout=0.3) for url in endpoints.values()]
    results = await asyncio.gather(*tasks)

    elapsed = time.perf_counter() - t0

    print(f"\n{'Service':<10} {'URL':<38} {'Status':<8} {'Latency':<10} {'Health'}")
    print("-" * 75)
    for name, result in zip(endpoints.keys(), results):
        health = "🟢 UP" if result.ok else "🔴 DOWN"
        print(f"{name:<10} {result.url:<38} {result.status:<8} {result.latency_ms:<10.0f}ms {health}")

    total = len(results)
    healthy = sum(1 for r in results if r.ok)
    print(f"\nCheck finished: {elapsed:.3f}s | Healthy: {healthy}/{total} | "
          f"Availability: {healthy/total*100:.0f}%")

# Step 8: main is the command-line remote control; --mode picks which async experiment to run.
def main() -> None:
    parser = argparse.ArgumentParser(description="asyncio high-concurrency demo")
    parser.add_argument(
        "--mode",
        choices=["gather", "queue", "health"],
        default="health",
        help="gather=concurrent vs serial, queue=task queue, health=health dashboard",
    )
    args = parser.parse_args()

    random.seed(42)  # fixed random seed so results are reproducible

    if args.mode == "gather":
        asyncio.run(demo_gather())
    elif args.mode == "queue":
        asyncio.run(demo_queue())
    else:
        asyncio.run(demo_health())

import sys
for mode in ["health", "queue"]:
    print(f"\n>>> mode={mode}")
    sys.argv = ["prog", "--mode", mode]
    main()
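The demo above drives main() by overwriting sys.argv. An alternative sketch, using a hypothetical parse_mode helper that is not part of the article's listing, passes the argument list explicitly so each mode can be exercised without touching global state:

```python
import argparse
from typing import Optional, Sequence


def parse_mode(argv: Optional[Sequence[str]] = None) -> str:
    # Hypothetical helper mirroring the article's --mode option
    parser = argparse.ArgumentParser(description="asyncio high-concurrency demo")
    parser.add_argument("--mode", choices=["gather", "queue", "health"], default="health")
    # parse_args(None) falls back to sys.argv[1:]; an explicit list avoids mutating sys.argv
    return parser.parse_args(argv).mode


if __name__ == "__main__":
    for argv in (["--mode", "queue"], []):
        print(argv, "->", parse_mode(argv))
```

A main(argv=None) built on this could call parse_mode(argv) and then pick demo_gather, demo_queue, or demo_health for asyncio.run(); an invalid mode still exits with argparse's usual error message.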

Geek hands-on: the complete source and how to run it

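Now put the building blocks together: paste the complete code below into your editor and run it. Look at the whole loop first, then go back and tweak the parameters section by section; that builds engineering intuition much faster.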

# File name: 14-python-coroutine.py
# Run: python3 14-python-coroutine.py --mode health
#       python3 14-python-coroutine.py --mode queue
#       python3 14-python-coroutine.py --mode gather

import argparse
import asyncio
import random
import time
from dataclasses import dataclass


# ── Data model ──────────────────────────────────────────────
@dataclass
class CheckResult:
    url: str
    status: int
    latency_ms: float
    ok: bool


# ── Mock HTTP request (no network dependency) ───────────────
async def mock_http_get(url: str) -> tuple[int, float]:
    """Simulate network latency: random 50-500 ms, occasional 503"""
    delay = random.uniform(0.05, 0.5)
    await asyncio.sleep(delay)
    status = 503 if random.random() < 0.15 else 200
    return status, delay * 1000


# ── Check a single URL ──────────────────────────────────────
async def check_url(url: str) -> CheckResult:
    t0 = time.perf_counter()
    status, latency = await mock_http_get(url)
    return CheckResult(url=url, status=status, latency_ms=latency, ok=(status == 200))


# ── Mode 1: gather concurrency vs serial ────────────────────
async def demo_gather() -> None:
    urls = [
        "https://api.21zhao.com/health",
        "https://blog.21zhao.com/ping",
        "https://todo.21zhao.com/status",
        "https://jupy.21zhao.com/api",
        "https://ai.21zhao.com/v1/models",
        "https://www.21zhao.com/sitemap.xml",
    ]

    # Serial
    t0 = time.perf_counter()
    serial_results: list[CheckResult] = []
    for url in urls:
        serial_results.append(await check_url(url))
    serial_time = time.perf_counter() - t0

    # Concurrent
    t0 = time.perf_counter()
    concurrent_results: list[CheckResult] = await asyncio.gather(
        *[check_url(url) for url in urls]
    )
    concurrent_time = time.perf_counter() - t0

    print("\n=== asyncio.gather: concurrent vs serial ===")
    print(f"{'URL':<42} {'Status':<6} {'Delay(ms)':<10} {'Result'}")
    print("-" * 70)
    for r in concurrent_results:
        icon = "✓" if r.ok else "✗"
        print(f"{r.url:<42} {r.status:<6} {r.latency_ms:<10.1f} {icon}")

    print(f"\n{'Serial time':<15}: {serial_time:.3f}s")
    print(f"{'Concurrent time':<15}: {concurrent_time:.3f}s")
    print(f"{'Speedup':<15}: {serial_time / concurrent_time:.1f}x")
    ok_count = sum(1 for r in concurrent_results if r.ok)
    print(f"{'Health rate':<15}: {ok_count}/{len(urls)}")


# ── Mode 2: asyncio.Queue task queue ────────────────────────
async def producer(queue: asyncio.Queue[str], urls: list[str]) -> None:
    for url in urls:
        await queue.put(url)
        await asyncio.sleep(0.02)  # simulate tasks arriving gradually
    # Send the end-of-input sentinel, one per worker
    for _ in range(3):  # 3 workers
        await queue.put("__DONE__")


async def worker(wid: int, queue: asyncio.Queue[str], results: list[CheckResult]) -> None:
    while True:
        url = await queue.get()
        if url == "__DONE__":
            queue.task_done()
            break
        result = await check_url(url)
        results.append(result)
        status_icon = "✓" if result.ok else "✗"
        print(f"  Worker-{wid} {status_icon} {url:<40} {result.latency_ms:.0f}ms")
        queue.task_done()


async def demo_queue() -> None:
    urls = [f"https://service-{i}.21zhao.com/health" for i in range(1, 10)]
    queue: asyncio.Queue[str] = asyncio.Queue(maxsize=5)
    results: list[CheckResult] = []

    print("\n=== asyncio.Queue producer-consumer pattern ===")
    print(f"Tasks: {len(urls)}  Workers: 3  Queue capacity: 5\n")

    await asyncio.gather(
        producer(queue, urls),
        worker(1, queue, results),
        worker(2, queue, results),
        worker(3, queue, results),
    )

    ok = sum(1 for r in results if r.ok)
    avg_latency = sum(r.latency_ms for r in results) / len(results)
    print(f"\n{'Completed':<12}: {len(results)}")
    print(f"{'Succeeded':<12}: {ok}  Failed: {len(results) - ok}")
    print(f"{'Avg latency':<12}: {avg_latency:.1f}ms")


# ── Mode 3: health-check dashboard ──────────────────────────
async def demo_health() -> None:
    """Health check with timeout control"""

    async def check_with_timeout(url: str, timeout: float) -> CheckResult:
        try:
            return await asyncio.wait_for(check_url(url), timeout=timeout)
        except asyncio.TimeoutError:
            return CheckResult(url=url, status=408, latency_ms=timeout * 1000, ok=False)

    endpoints = {
        "Main": "https://21zhao.com",
        "Blog": "https://blog.21zhao.com",
        "Todo": "https://todo.21zhao.com",
        "Jupyter": "https://jupy.21zhao.com",
        "AI proxy": "https://ai.21zhao.com",
        "YouCare": "https://21zhao.com/youcare/",
    }

    print("\n=== Service health dashboard ===")
    t0 = time.perf_counter()

    tasks = [check_with_timeout(url, timeout=0.3) for url in endpoints.values()]
    results = await asyncio.gather(*tasks)

    elapsed = time.perf_counter() - t0

    print(f"\n{'Service':<10} {'URL':<38} {'Status':<8} {'Latency':<10} {'Health'}")
    print("-" * 75)
    for name, result in zip(endpoints.keys(), results):
        health = "🟢 UP" if result.ok else "🔴 DOWN"
        print(f"{name:<10} {result.url:<38} {result.status:<8} {result.latency_ms:<10.0f}ms {health}")

    total = len(results)
    healthy = sum(1 for r in results if r.ok)
    print(f"\nCheck finished: {elapsed:.3f}s | Healthy: {healthy}/{total} | "
          f"Availability: {healthy/total*100:.0f}%")


def main() -> None:
    parser = argparse.ArgumentParser(description="asyncio high-concurrency demo")
    parser.add_argument(
        "--mode",
        choices=["gather", "queue", "health"],
        default="health",
        help="gather=concurrent vs serial, queue=task queue, health=health dashboard",
    )
    args = parser.parse_args()

    random.seed(42)  # fixed random seed so results are reproducible

    if args.mode == "gather":
        asyncio.run(demo_gather())
    elif args.mode == "queue":
        asyncio.run(demo_queue())
    else:
        asyncio.run(demo_health())


if __name__ == "__main__":
    main()

After running it, you should see output similar to this:

$ python3 14-python-coroutine.py
=== Service health dashboard ===

Service    URL                                    Status   Latency    Health
---------------------------------------------------------------------------
Main       https://21zhao.com                     408      300       ms 🔴 DOWN
Blog       https://blog.21zhao.com                200      61        ms 🟢 UP
Todo       https://todo.21zhao.com                200      174       ms 🟢 UP
Jupyter    https://jupy.21zhao.com                503      150       ms 🔴 DOWN
AI proxy   https://ai.21zhao.com                  408      300       ms 🔴 DOWN
YouCare    https://21zhao.com/youcare/            408      300       ms 🔴 DOWN

Check finished: 0.302s | Healthy: 2/6 | Availability: 33%

4. Running the Examples

python3 14-python-coroutine.py --mode health   # health-check dashboard (default)
python3 14-python-coroutine.py --mode gather   # concurrent vs serial timing
python3 14-python-coroutine.py --mode queue    # producer-consumer queue

5. Key Takeaways

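┌────────────────────┬───────────────────────────────────────────────────────────┐
│ Concept            │ Meaning                                                   │
├────────────────────┼───────────────────────────────────────────────────────────┤
│ async def          │ Coroutine function; calling it returns a coroutine object │
│ await              │ Suspends this coroutine, yielding to the event loop       │
│ asyncio.gather()   │ Runs awaitables concurrently; results in argument order   │
│ asyncio.Queue      │ Coroutine-safe queue, no locks needed; not thread-safe    │
│ asyncio.wait_for() │ Timeout for an awaitable; cancels it on expiry            │
│ asyncio.run()      │ Entry point: creates, runs, then closes an event loop     │
└────────────────────┴───────────────────────────────────────────────────────────┘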

Common pitfalls:

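  • await can only be used inside an async def
  • Don't call blocking functions (such as time.sleep) inside a coroutine: they freeze the whole event loop. Use asyncio.sleep instead
  • Offload blocking IO with asyncio.to_thread() or loop.run_in_executor() on a thread pool; for CPU-bound work, pass a ProcessPoolExecutor to run_in_executor(), because threads do not get around the GIL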
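The blocking-call pitfall can be measured. In this sketch, a heartbeat coroutine records a timestamp about every 50 ms while a 0.2 s time.sleep runs either inline or through asyncio.to_thread() (Python 3.9+). The largest gap between heartbeats roughly shows how long the loop was stalled. The ProcessPoolExecutor route for CPU-bound work follows the same offloading idea but is not shown here.

```python
import asyncio
import time


def blocking_io() -> str:
    time.sleep(0.2)              # stands in for a blocking call (sync client, file IO...)
    return "done"


async def heartbeat(ticks: list[float]) -> None:
    # Records a timestamp roughly every 50 ms whenever the event loop is free
    for _ in range(5):
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.05)


async def run(offload: bool) -> float:
    ticks: list[float] = []

    async def call() -> str:
        if offload:
            return await asyncio.to_thread(blocking_io)   # runs in the default thread pool
        return blocking_io()                              # freezes the whole event loop

    await asyncio.gather(heartbeat(ticks), call())
    # Largest gap between heartbeats ~ how long the loop was starved
    return max(b - a for a, b in zip(ticks, ticks[1:]))


if __name__ == "__main__":
    print(f"inline    max gap: {asyncio.run(run(offload=False)):.3f}s")
    print(f"to_thread max gap: {asyncio.run(run(offload=True)):.3f}s")
```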

⏱ NexDo Time (5 minutes)

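  1. Rate limiter: use asyncio.Semaphore(3) to cap concurrency at 3 and observe how the behavior changes. Queue mode already runs exactly 3 workers, so the effect is easiest to see in gather mode or with a smaller limit.
  2. Retry: give check_url up to 2 retries, waiting 0.1 s before each retry when a check fails.
  3. Sorting: sort demo_health's results by latency from lowest to highest before printing, and find the slowest service.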

Don’t wait for next time, do it in the next moment.