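14 · Coroutines in Practice: High Concurrency with asyncio
🔗 Knowledge-map navigation: before reading, review the core concepts in "13 · IO Multiplexing: select/epoll Explained"; this article builds on them. The previous article drove an event loop by hand with selectors; here asyncio takes over, and async/await lets you write high-concurrency code in a style comparable to Go.
Geek's take: concurrency is not about making code complicated; it is about being clear on waiting, scheduling, and resource isolation. This article verifies each mechanism with mock scenarios that run on your own machine.
Pain point and architecture: APIs and concepts memorized in isolation are quickly forgotten, so this article first pins down a real pain point and then breaks it into a three-part loop: "input data → core mechanism → runnable output". Moving from the selectors event loop to asyncio, you will learn coroutine syntax and high-concurrency task scheduling, and build a concurrent URL health checker.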
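1. Three-Way Comparison of Concurrency Models
┌─────────────────┬────────────────────────┬─────────────────────────┬────────────────────────────┐
│ Dimension       │ Multiprocessing        │ Multithreading          │ Coroutines (asyncio)       │
├─────────────────┼────────────────────────┼─────────────────────────┼────────────────────────────┤
│ Switch cost     │ High (process context) │ Medium (thread context) │ Very low (user space)      │
│ Memory          │ High (own heap/stack)  │ Medium (~1 MB/thread)   │ Low (~a few KB)            │
│ GIL impact      │ None                   │ Yes (CPU-bound work)    │ Yes (IO waits don't block) │
│ Best fit        │ CPU-bound              │ IO-bound, few tasks     │ IO-bound, many tasks       │
│ Max concurrency │ ~CPU core count        │ ~thousands              │ ~tens of thousands         │
│ Debugging       │ Medium                 │ Hard (race conditions)  │ Medium (single thread)     │
│ Python support  │ multiprocessing        │ threading               │ asyncio (built in)         │
└─────────────────┴────────────────────────┴─────────────────────────┴────────────────────────────┘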
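2. Core async/await Syntax
import asyncio

async def fetch(url: str) -> str:
    # await does not freeze the thread; it hands control back to the event loop so other coroutines can run.
    await asyncio.sleep(0.2)  # a short wait standing in for one network IO call
    return f"200 OK: {url}"

# Run the coroutine
result = asyncio.run(fetch("https://example.com"))
print("Coroutine returned:", result)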
How the event loop works:
asyncio.run()
      │
      ▼
┌────────────────────────────────────────────────────┐
│                     Event Loop                     │
│                                                    │
│  Task1: fetch(url1) ──await──▶ suspended           │
│  Task2: fetch(url2) ──await──▶ suspended           │
│  Task3: fetch(url3) ──await──▶ suspended           │
│                                                    │
│  IO-ready callback ──▶ resumes the matching Task   │
│  One thread; Tasks switch only at await points     │
└────────────────────────────────────────────────────┘
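The hand-off in the diagram can be observed directly. The following is a minimal sketch, not part of the article's source: `step` and `trace` are illustrative names, and the delays are arbitrary. It records the order in which two tasks start and finish on a single thread.

```python
import asyncio

trace: list[str] = []  # illustrative log of the order in which things run


async def step(name: str, delay: float) -> None:
    trace.append(f"{name} start")
    await asyncio.sleep(delay)  # suspend here; the loop is free to run other tasks
    trace.append(f"{name} done")


async def main() -> None:
    # create_task schedules each coroutine on the running event loop
    tasks = [
        asyncio.create_task(step("Task1", 0.03)),
        asyncio.create_task(step("Task2", 0.01)),
    ]
    await asyncio.gather(*tasks)


asyncio.run(main())
print(trace)
# → ['Task1 start', 'Task2 start', 'Task2 done', 'Task1 done']
```

Both tasks start before either finishes, and the one with the shorter wait resumes first, even though everything runs on one thread.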
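3. Hands-On Code
Step by step: breaking down the core logic
A coroutine is not a thread; it "voluntarily hands over control whenever it has to wait". Think of a cook who chops vegetables while the water boils: a single thread can keep many IO tasks moving at once. Every demo below comes with comments and printed output. We start with a single coroutine and then move on to batch scheduling.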
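Step 1: Pack health-check results into a standard data box with CheckResult
Pain point and mechanism:
Once there are many concurrent tasks, return values can no longer be loose strings. CheckResult fixes the result of each health check as one object, so table rendering, health-rate statistics, and average-latency calculations can all read it by field.
Core source (verbatim from the full source at the end):
@dataclass
class CheckResult:
    url: str
    status: int
    latency_ms: float
    ok: bool
Runnable demo (with mock data and print feedback filled in):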
from dataclasses import dataclass

# Step 1: a dataclass is like a fixed-format checkup form, with a slot each for URL, status code, latency, and health.
@dataclass
class CheckResult:
    url: str
    status: int
    latency_ms: float
    ok: bool

result = CheckResult(url="https://api.example.com", status=200, latency_ms=123.4, ok=True)
print("Full result:", result)
print("Healthy:", result.ok)
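Step 2: Simulate an awaitable network request with async mock_http_get
Pain point and mechanism:
The key to coroutines is await. Like the cook who chops vegetables while the water boils, the current task pauses but the thread does not sit idle: the event loop can schedule other coroutines. Here asyncio.sleep() stands in for the network wait, so there is no real network dependency.
Core source (verbatim from the full source at the end):
async def mock_http_get(url: str) -> tuple[int, float]:
    """Simulate network latency: random 50-500 ms, with an occasional 503."""
    delay = random.uniform(0.05, 0.5)
    await asyncio.sleep(delay)
    status = 503 if random.random() < 0.15 else 200
    return status, delay * 1000
Runnable demo (with mock data and print feedback filled in):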
import asyncio
import random

# Step 2: await asyncio.sleep does not wait idly; it hands control back to the event loop so other tasks can run first.
async def mock_http_get(url: str) -> tuple[int, float]:
    """Simulate network latency: random 50-500 ms, with an occasional 503."""
    delay = random.uniform(0.05, 0.5)
    await asyncio.sleep(delay)
    status = 503 if random.random() < 0.15 else 200
    return status, delay * 1000

async def main() -> None:
    random.seed(42)
    status, latency = await mock_http_get("https://api.example.com")
    print("Status code:", status)
    print(f"Simulated latency: {latency:.1f}ms")

asyncio.run(main())
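Step 3: Wrap the request into one complete health check with check_url
Pain point and mechanism:
check_url() wraps "send the request" and "build the result object" into a single coroutine. Calling it does not return a result right away; it returns an awaitable coroutine object, and its body only runs once you await check_url(...) or wrap it in a Task that the event loop schedules.
Core source (verbatim from the full source at the end):
async def check_url(url: str) -> CheckResult:
    t0 = time.perf_counter()
    status, latency = await mock_http_get(url)
    return CheckResult(url=url, status=status, latency_ms=latency, ok=(status == 200))
Runnable demo (with mock data and print feedback filled in):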
import asyncio
import random
import time
from dataclasses import dataclass

@dataclass
class CheckResult:
    url: str
    status: int
    latency_ms: float
    ok: bool

async def mock_http_get(url: str) -> tuple[int, float]:
    """Simulate network latency: random 50-500 ms, with an occasional 503."""
    delay = random.uniform(0.05, 0.5)
    await asyncio.sleep(delay)
    status = 503 if random.random() < 0.15 else 200
    return status, delay * 1000

# Step 3: check_url is the inspector for a single URL; it sends the request and packages the result.
async def check_url(url: str) -> CheckResult:
    t0 = time.perf_counter()
    status, latency = await mock_http_get(url)
    return CheckResult(url=url, status=status, latency_ms=latency, ok=(status == 200))

async def main() -> None:
    random.seed(7)
    result = await check_url("https://blog.21zhao.com/ping")
    print("URL:", result.url)
    print("Status:", result.status)
    print(f"Latency: {result.latency_ms:.1f}ms")
    print("Healthy:", result.ok)

asyncio.run(main())
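A point that is easy to miss in Step 3: calling a coroutine function does not execute its body. The sketch below is illustrative only; `ping` and `calls` are hypothetical names standing in for check_url and a call log. It checks that nothing has run until the event loop drives the coroutine object.

```python
import asyncio
import inspect

calls: list[str] = []  # records when the body actually executes


async def ping(url: str) -> str:  # hypothetical stand-in for check_url
    calls.append(url)
    await asyncio.sleep(0)
    return f"checked {url}"


coro = ping("https://example.com")  # creates a coroutine object; the body has not run
is_coroutine = inspect.iscoroutine(coro)
ran_before_await = bool(calls)

result = asyncio.run(coro)  # the event loop drives the body now
print(is_coroutine, ran_before_await, result)
# → True False checked https://example.com
```

Forgetting the await therefore produces no error at the call site, only a coroutine object that never runs (Python emits a "never awaited" RuntimeWarning when it is garbage-collected).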
Step 4: Compare serial await with concurrent gather using demo_gather
Pain point and mechanism:
The serial version "checks A, then checks B", so total time is close to the sum of all the waits. asyncio.gather() "sends them all out at once", so total time is closer to the slowest single request. Note that this is not multithreading; it is a single-threaded event loop switching between tasks while they wait.
Core source (verbatim from the full source at the end):
async def demo_gather() -> None:
    urls = [
        "https://api.21zhao.com/health",
        "https://blog.21zhao.com/ping",
        "https://todo.21zhao.com/status",
        "https://jupy.21zhao.com/api",
        "https://ai.21zhao.com/v1/models",
        "https://www.21zhao.com/sitemap.xml",
    ]
    # Serial
    t0 = time.perf_counter()
    serial_results: list[CheckResult] = []
    for url in urls:
        serial_results.append(await check_url(url))
    serial_time = time.perf_counter() - t0
    # Concurrent
    t0 = time.perf_counter()
    concurrent_results: list[CheckResult] = await asyncio.gather(
        *[check_url(url) for url in urls]
    )
    concurrent_time = time.perf_counter() - t0
    print("\n=== asyncio.gather: concurrent vs serial ===")
    print(f"{'URL':<42} {'Status':<6} {'Delay(ms)':<10} {'Result'}")
    print("-" * 70)
    for r in concurrent_results:
        icon = "✓" if r.ok else "✗"
        print(f"{r.url:<42} {r.status:<6} {r.latency_ms:<10.1f} {icon}")
    print(f"\n{'Serial total':<12}: {serial_time:.3f}s")
    print(f"{'Concurrent':<12}: {concurrent_time:.3f}s")
    print(f"{'Speedup':<12}: {serial_time / concurrent_time:.1f}x")
    ok_count = sum(1 for r in concurrent_results if r.ok)
    print(f"{'Healthy':<12}: {ok_count}/{len(urls)}")
Runnable demo (with mock data and print feedback filled in):
import asyncio
import random
import time
from dataclasses import dataclass

@dataclass
class CheckResult:
    url: str
    status: int
    latency_ms: float
    ok: bool

async def mock_http_get(url: str) -> tuple[int, float]:
    """Simulate network latency: random 50-500 ms, with an occasional 503."""
    delay = random.uniform(0.05, 0.5)
    await asyncio.sleep(delay)
    status = 503 if random.random() < 0.15 else 200
    return status, delay * 1000

async def check_url(url: str) -> CheckResult:
    t0 = time.perf_counter()
    status, latency = await mock_http_get(url)
    return CheckResult(url=url, status=status, latency_ms=latency, ok=(status == 200))

# Step 4: gather is like dispatching several inspectors at once to check multiple URLs simultaneously.
async def demo_gather() -> None:
    urls = [
        "https://api.21zhao.com/health",
        "https://blog.21zhao.com/ping",
        "https://todo.21zhao.com/status",
        "https://jupy.21zhao.com/api",
        "https://ai.21zhao.com/v1/models",
        "https://www.21zhao.com/sitemap.xml",
    ]
    # Serial
    t0 = time.perf_counter()
    serial_results: list[CheckResult] = []
    for url in urls:
        serial_results.append(await check_url(url))
    serial_time = time.perf_counter() - t0
    # Concurrent
    t0 = time.perf_counter()
    concurrent_results: list[CheckResult] = await asyncio.gather(
        *[check_url(url) for url in urls]
    )
    concurrent_time = time.perf_counter() - t0
    print("\n=== asyncio.gather: concurrent vs serial ===")
    print(f"{'URL':<42} {'Status':<6} {'Delay(ms)':<10} {'Result'}")
    print("-" * 70)
    for r in concurrent_results:
        icon = "✓" if r.ok else "✗"
        print(f"{r.url:<42} {r.status:<6} {r.latency_ms:<10.1f} {icon}")
    print(f"\n{'Serial total':<12}: {serial_time:.3f}s")
    print(f"{'Concurrent':<12}: {concurrent_time:.3f}s")
    print(f"{'Speedup':<12}: {serial_time / concurrent_time:.1f}x")
    ok_count = sum(1 for r in concurrent_results if r.ok)
    print(f"{'Healthy':<12}: {ok_count}/{len(urls)}")

random.seed(42)
asyncio.run(demo_gather())
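Two properties of asyncio.gather() are worth knowing once real requests replace the mock: results come back in argument order rather than completion order, and with return_exceptions=True a failure is returned as a value instead of propagating. This is a small sketch under those assumptions; `probe` is a hypothetical helper that does not appear in the article's source.

```python
import asyncio


async def probe(name: str, delay: float, fail: bool = False) -> str:
    # Hypothetical check: sleeps, then either returns its name or raises.
    await asyncio.sleep(delay)
    if fail:
        raise ConnectionError(f"{name} unreachable")
    return name


async def main() -> list[object]:
    # Results follow the order of the arguments, not the order of completion.
    return await asyncio.gather(
        probe("slow", 0.03),
        probe("broken", 0.01, fail=True),
        probe("fast", 0.0),
        return_exceptions=True,  # exceptions are returned as list items
    )


results = asyncio.run(main())
for item in results:
    kind = "error" if isinstance(item, Exception) else "ok"
    print(kind, item)
```

Without return_exceptions=True, the first exception would propagate out of the await on gather() and the caller would not receive the other results from that call.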
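Step 5: Understand the Queue producer-consumer model with producer/worker
Pain point and mechanism:
producer() puts URLs into the queue, and worker() takes URLs out and checks them. __DONE__ is the shutdown sentinel that tells a worker "no need to wait any longer, you can clock off". queue.task_done() marks one item as processed; it is what queue.join() counts to know when every queued item has been handled. This code never calls join(), so here it is simply good bookkeeping.
Core source (verbatim from the full source at the end):
async def producer(queue: asyncio.Queue[str], urls: list[str]) -> None:
    for url in urls:
        await queue.put(url)
        await asyncio.sleep(0.02)  # simulate tasks arriving gradually
    # Send the shutdown signal
    for _ in range(3):  # 3 workers
        await queue.put("__DONE__")

async def worker(wid: int, queue: asyncio.Queue[str], results: list[CheckResult]) -> None:
    while True:
        url = await queue.get()
        if url == "__DONE__":
            queue.task_done()
            break
        result = await check_url(url)
        results.append(result)
        status_icon = "✓" if result.ok else "✗"
        print(f" Worker-{wid} {status_icon} {url:<40} {result.latency_ms:.0f}ms")
        queue.task_done()
Runnable demo (with mock data and print feedback filled in):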
import asyncio
import random
import time
from dataclasses import dataclass

@dataclass
class CheckResult:
    url: str
    status: int
    latency_ms: float
    ok: bool

async def mock_http_get(url: str) -> tuple[int, float]:
    """Simulate network latency: random 50-500 ms, with an occasional 503."""
    delay = random.uniform(0.05, 0.5)
    await asyncio.sleep(delay)
    status = 503 if random.random() < 0.15 else 200
    return status, delay * 1000

async def check_url(url: str) -> CheckResult:
    t0 = time.perf_counter()
    status, latency = await mock_http_get(url)
    return CheckResult(url=url, status=status, latency_ms=latency, ok=(status == 200))

# Step 5: Queue is like a restaurant's pickup counter: the producer places tasks, workers take them and process them.
async def producer(queue: asyncio.Queue[str], urls: list[str]) -> None:
    for url in urls:
        await queue.put(url)
        await asyncio.sleep(0.02)  # simulate tasks arriving gradually
    # Send the shutdown signal
    for _ in range(3):  # 3 workers
        await queue.put("__DONE__")

async def worker(wid: int, queue: asyncio.Queue[str], results: list[CheckResult]) -> None:
    while True:
        url = await queue.get()
        if url == "__DONE__":
            queue.task_done()
            break
        result = await check_url(url)
        results.append(result)
        status_icon = "✓" if result.ok else "✗"
        print(f" Worker-{wid} {status_icon} {url:<40} {result.latency_ms:.0f}ms")
        queue.task_done()

async def main() -> None:
    random.seed(1)
    queue: asyncio.Queue[str] = asyncio.Queue(maxsize=2)
    results: list[CheckResult] = []
    urls = [f"https://service-{i}.example.com" for i in range(1, 4)]
    await asyncio.gather(
        producer(queue, urls),
        worker(1, queue, results),
        worker(2, queue, results),
        worker(3, queue, results),
    )
    print("Completed:", len(results))

asyncio.run(main())
Step 6: Run the complete async task queue with demo_queue
Pain point and mechanism:
A task queue suits scenarios where tasks keep arriving and several workers process them concurrently. Here the queue capacity is 5 and there are 3 workers. You can watch the workers' output interleave, which is the most direct view of coroutine concurrency.
Core source (verbatim from the full source at the end):
async def demo_queue() -> None:
    urls = [f"https://service-{i}.21zhao.com/health" for i in range(1, 10)]
    queue: asyncio.Queue[str] = asyncio.Queue(maxsize=5)
    results: list[CheckResult] = []
    print("\n=== asyncio.Queue producer-consumer pattern ===")
    print(f"Tasks: {len(urls)} Workers: 3 Queue capacity: 5\n")
    await asyncio.gather(
        producer(queue, urls),
        worker(1, queue, results),
        worker(2, queue, results),
        worker(3, queue, results),
    )
    ok = sum(1 for r in results if r.ok)
    avg_latency = sum(r.latency_ms for r in results) / len(results)
    print(f"\n{'Completed':<10}: {len(results)}")
    print(f"{'Succeeded':<10}: {ok} Failed: {len(results) - ok}")
    print(f"{'Avg delay':<10}: {avg_latency:.1f}ms")
Runnable demo (with mock data and print feedback filled in):
import asyncio
import random
import time
from dataclasses import dataclass

@dataclass
class CheckResult:
    url: str
    status: int
    latency_ms: float
    ok: bool

async def mock_http_get(url: str) -> tuple[int, float]:
    """Simulate network latency: random 50-500 ms, with an occasional 503."""
    delay = random.uniform(0.05, 0.5)
    await asyncio.sleep(delay)
    status = 503 if random.random() < 0.15 else 200
    return status, delay * 1000

async def check_url(url: str) -> CheckResult:
    t0 = time.perf_counter()
    status, latency = await mock_http_get(url)
    return CheckResult(url=url, status=status, latency_ms=latency, ok=(status == 200))

async def producer(queue: asyncio.Queue[str], urls: list[str]) -> None:
    for url in urls:
        await queue.put(url)
        await asyncio.sleep(0.02)  # simulate tasks arriving gradually
    # Send the shutdown signal
    for _ in range(3):  # 3 workers
        await queue.put("__DONE__")

async def worker(wid: int, queue: asyncio.Queue[str], results: list[CheckResult]) -> None:
    while True:
        url = await queue.get()
        if url == "__DONE__":
            queue.task_done()
            break
        result = await check_url(url)
        results.append(result)
        status_icon = "✓" if result.ok else "✗"
        print(f" Worker-{wid} {status_icon} {url:<40} {result.latency_ms:.0f}ms")
        queue.task_done()

# Step 6: the full queue demo starts 1 producer + 3 workers.
async def demo_queue() -> None:
    urls = [f"https://service-{i}.21zhao.com/health" for i in range(1, 10)]
    queue: asyncio.Queue[str] = asyncio.Queue(maxsize=5)
    results: list[CheckResult] = []
    print("\n=== asyncio.Queue producer-consumer pattern ===")
    print(f"Tasks: {len(urls)} Workers: 3 Queue capacity: 5\n")
    await asyncio.gather(
        producer(queue, urls),
        worker(1, queue, results),
        worker(2, queue, results),
        worker(3, queue, results),
    )
    ok = sum(1 for r in results if r.ok)
    avg_latency = sum(r.latency_ms for r in results) / len(results)
    print(f"\n{'Completed':<10}: {len(results)}")
    print(f"{'Succeeded':<10}: {ok} Failed: {len(results) - ok}")
    print(f"{'Avg delay':<10}: {avg_latency:.1f}ms")

random.seed(42)
asyncio.run(demo_queue())
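The article's queue uses a __DONE__ sentinel for shutdown. A commonly used alternative, shown here as a hedged sketch rather than as the article's method, relies on queue.join() together with task_done() and then cancels the idle workers. `join_worker` and the integer items are illustrative assumptions; the sleep stands in for check_url.

```python
import asyncio


async def join_worker(queue: asyncio.Queue[int], done: list[int]) -> None:
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # stand-in for check_url
            done.append(item)
        finally:
            queue.task_done()  # exactly one task_done() per get()


async def main() -> list[int]:
    queue: asyncio.Queue[int] = asyncio.Queue()
    done: list[int] = []
    workers = [asyncio.create_task(join_worker(queue, done)) for _ in range(3)]
    for item in range(6):
        queue.put_nowait(item)
    await queue.join()  # returns once every queued item has been marked done
    for w in workers:
        w.cancel()  # workers are now idle inside queue.get()
    await asyncio.gather(*workers, return_exceptions=True)
    return done


done = asyncio.run(main())
print(sorted(done))
# → [0, 1, 2, 3, 4, 5]
```

With this shape the producer does not need to know how many workers exist, which is the coupling that `for _ in range(3)` introduces in the sentinel version.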
Step 7: Add timeout control with demo_health so slow requests cannot drag down the whole run
Pain point and mechanism:
A real health check cannot wait forever. asyncio.wait_for() puts a timeout on each check; a check that runs past the threshold is cancelled and reported as 408. This keeps the dashboard from being stalled by one stuck service.
Core source (verbatim from the full source at the end):
async def demo_health() -> None:
    """Health check with timeout control."""
    async def check_with_timeout(url: str, timeout: float) -> CheckResult:
        try:
            return await asyncio.wait_for(check_url(url), timeout=timeout)
        except asyncio.TimeoutError:
            return CheckResult(url=url, status=408, latency_ms=timeout * 1000, ok=False)
    endpoints = {
        "Main site": "https://21zhao.com",
        "Blog": "https://blog.21zhao.com",
        "Todo": "https://todo.21zhao.com",
        "Jupyter": "https://jupy.21zhao.com",
        "AI proxy": "https://ai.21zhao.com",
        "YouCare": "https://21zhao.com/youcare/",
    }
    print("\n=== Service health-check dashboard ===")
    t0 = time.perf_counter()
    tasks = [check_with_timeout(url, timeout=0.3) for url in endpoints.values()]
    results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - t0
    print(f"\n{'Service':<10} {'URL':<38} {'Status':<8} {'Latency':<10} {'Health'}")
    print("-" * 75)
    for name, result in zip(endpoints.keys(), results):
        health = "🟢 UP" if result.ok else "🔴 DOWN"
        print(f"{name:<10} {result.url:<38} {result.status:<8} {result.latency_ms:<10.0f}ms {health}")
    total = len(results)
    healthy = sum(1 for r in results if r.ok)
    print(f"\nDone in {elapsed:.3f}s | Healthy: {healthy}/{total} | "
          f"Availability: {healthy/total*100:.0f}%")
Runnable demo (with mock data and print feedback filled in):
import asyncio
import random
import time
from dataclasses import dataclass

@dataclass
class CheckResult:
    url: str
    status: int
    latency_ms: float
    ok: bool

async def mock_http_get(url: str) -> tuple[int, float]:
    """Simulate network latency: random 50-500 ms, with an occasional 503."""
    delay = random.uniform(0.05, 0.5)
    await asyncio.sleep(delay)
    status = 503 if random.random() < 0.15 else 200
    return status, delay * 1000

async def check_url(url: str) -> CheckResult:
    t0 = time.perf_counter()
    status, latency = await mock_http_get(url)
    return CheckResult(url=url, status=status, latency_ms=latency, ok=(status == 200))

# Step 7: wait_for is like setting an alarm on the request; once time is up, it is judged a timeout.
async def demo_health() -> None:
    """Health check with timeout control."""
    async def check_with_timeout(url: str, timeout: float) -> CheckResult:
        try:
            return await asyncio.wait_for(check_url(url), timeout=timeout)
        except asyncio.TimeoutError:
            return CheckResult(url=url, status=408, latency_ms=timeout * 1000, ok=False)
    endpoints = {
        "Main site": "https://21zhao.com",
        "Blog": "https://blog.21zhao.com",
        "Todo": "https://todo.21zhao.com",
        "Jupyter": "https://jupy.21zhao.com",
        "AI proxy": "https://ai.21zhao.com",
        "YouCare": "https://21zhao.com/youcare/",
    }
    print("\n=== Service health-check dashboard ===")
    t0 = time.perf_counter()
    tasks = [check_with_timeout(url, timeout=0.3) for url in endpoints.values()]
    results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - t0
    print(f"\n{'Service':<10} {'URL':<38} {'Status':<8} {'Latency':<10} {'Health'}")
    print("-" * 75)
    for name, result in zip(endpoints.keys(), results):
        health = "🟢 UP" if result.ok else "🔴 DOWN"
        print(f"{name:<10} {result.url:<38} {result.status:<8} {result.latency_ms:<10.0f}ms {health}")
    total = len(results)
    healthy = sum(1 for r in results if r.ok)
    print(f"\nDone in {elapsed:.3f}s | Healthy: {healthy}/{total} | "
          f"Availability: {healthy/total*100:.0f}%")

random.seed(42)
asyncio.run(demo_health())
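On timeout, asyncio.wait_for() does more than raise: it cancels the awaitable it was waiting on, so the slow check stops consuming resources. The sketch below makes that visible; `slow_check` and `events` are hypothetical names, and the 1-second sleep stands in for a stuck service.

```python
import asyncio

events: list[str] = []  # illustrative log of what happened, in order


async def slow_check() -> str:  # hypothetical stuck service
    try:
        await asyncio.sleep(1.0)
        return "ok"
    except asyncio.CancelledError:
        events.append("cancelled")  # wait_for cancels the inner coroutine on timeout
        raise  # always re-raise so cancellation completes


async def main() -> str:
    try:
        return await asyncio.wait_for(slow_check(), timeout=0.05)
    except asyncio.TimeoutError:
        events.append("timeout")
        return "408"


outcome = asyncio.run(main())
print(outcome, events)
# → 408 ['cancelled', 'timeout']
```

If a check holds a connection or file handle, the except or finally path around its await is where cleanup belongs, since that is where the cancellation arrives.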
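Step 8: Use main as the single entry point for the gather/queue/health modes
Pain point and mechanism:
asyncio.run() is the event-loop entry point. main() uses --mode to decide which group of coroutines to run: gather shows the concurrency comparison, queue shows producer-consumer, and health shows the health-check dashboard.
Core source (verbatim from the full source at the end):
def main() -> None:
    parser = argparse.ArgumentParser(description="asyncio high-concurrency demo")
    parser.add_argument(
        "--mode",
        choices=["gather", "queue", "health"],
        default="health",
        help="gather=concurrency comparison, queue=task queue, health=health-check dashboard",
    )
    args = parser.parse_args()
    random.seed(42)  # fixed random seed so results are reproducible
    if args.mode == "gather":
        asyncio.run(demo_gather())
    elif args.mode == "queue":
        asyncio.run(demo_queue())
    else:
        asyncio.run(demo_health())
Runnable demo (with mock data and print feedback filled in):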
import argparse
import asyncio
import random
import time
from dataclasses import dataclass

@dataclass
class CheckResult:
    url: str
    status: int
    latency_ms: float
    ok: bool

async def mock_http_get(url: str) -> tuple[int, float]:
    """Simulate network latency: random 50-500 ms, with an occasional 503."""
    delay = random.uniform(0.05, 0.5)
    await asyncio.sleep(delay)
    status = 503 if random.random() < 0.15 else 200
    return status, delay * 1000

async def check_url(url: str) -> CheckResult:
    t0 = time.perf_counter()
    status, latency = await mock_http_get(url)
    return CheckResult(url=url, status=status, latency_ms=latency, ok=(status == 200))

async def demo_gather() -> None:
    urls = [
        "https://api.21zhao.com/health",
        "https://blog.21zhao.com/ping",
        "https://todo.21zhao.com/status",
        "https://jupy.21zhao.com/api",
        "https://ai.21zhao.com/v1/models",
        "https://www.21zhao.com/sitemap.xml",
    ]
    # Serial
    t0 = time.perf_counter()
    serial_results: list[CheckResult] = []
    for url in urls:
        serial_results.append(await check_url(url))
    serial_time = time.perf_counter() - t0
    # Concurrent
    t0 = time.perf_counter()
    concurrent_results: list[CheckResult] = await asyncio.gather(
        *[check_url(url) for url in urls]
    )
    concurrent_time = time.perf_counter() - t0
    print("\n=== asyncio.gather: concurrent vs serial ===")
    print(f"{'URL':<42} {'Status':<6} {'Delay(ms)':<10} {'Result'}")
    print("-" * 70)
    for r in concurrent_results:
        icon = "✓" if r.ok else "✗"
        print(f"{r.url:<42} {r.status:<6} {r.latency_ms:<10.1f} {icon}")
    print(f"\n{'Serial total':<12}: {serial_time:.3f}s")
    print(f"{'Concurrent':<12}: {concurrent_time:.3f}s")
    print(f"{'Speedup':<12}: {serial_time / concurrent_time:.1f}x")
    ok_count = sum(1 for r in concurrent_results if r.ok)
    print(f"{'Healthy':<12}: {ok_count}/{len(urls)}")

async def producer(queue: asyncio.Queue[str], urls: list[str]) -> None:
    for url in urls:
        await queue.put(url)
        await asyncio.sleep(0.02)  # simulate tasks arriving gradually
    # Send the shutdown signal
    for _ in range(3):  # 3 workers
        await queue.put("__DONE__")

async def worker(wid: int, queue: asyncio.Queue[str], results: list[CheckResult]) -> None:
    while True:
        url = await queue.get()
        if url == "__DONE__":
            queue.task_done()
            break
        result = await check_url(url)
        results.append(result)
        status_icon = "✓" if result.ok else "✗"
        print(f" Worker-{wid} {status_icon} {url:<40} {result.latency_ms:.0f}ms")
        queue.task_done()

async def demo_queue() -> None:
    urls = [f"https://service-{i}.21zhao.com/health" for i in range(1, 10)]
    queue: asyncio.Queue[str] = asyncio.Queue(maxsize=5)
    results: list[CheckResult] = []
    print("\n=== asyncio.Queue producer-consumer pattern ===")
    print(f"Tasks: {len(urls)} Workers: 3 Queue capacity: 5\n")
    await asyncio.gather(
        producer(queue, urls),
        worker(1, queue, results),
        worker(2, queue, results),
        worker(3, queue, results),
    )
    ok = sum(1 for r in results if r.ok)
    avg_latency = sum(r.latency_ms for r in results) / len(results)
    print(f"\n{'Completed':<10}: {len(results)}")
    print(f"{'Succeeded':<10}: {ok} Failed: {len(results) - ok}")
    print(f"{'Avg delay':<10}: {avg_latency:.1f}ms")

async def demo_health() -> None:
    """Health check with timeout control."""
    async def check_with_timeout(url: str, timeout: float) -> CheckResult:
        try:
            return await asyncio.wait_for(check_url(url), timeout=timeout)
        except asyncio.TimeoutError:
            return CheckResult(url=url, status=408, latency_ms=timeout * 1000, ok=False)
    endpoints = {
        "Main site": "https://21zhao.com",
        "Blog": "https://blog.21zhao.com",
        "Todo": "https://todo.21zhao.com",
        "Jupyter": "https://jupy.21zhao.com",
        "AI proxy": "https://ai.21zhao.com",
        "YouCare": "https://21zhao.com/youcare/",
    }
    print("\n=== Service health-check dashboard ===")
    t0 = time.perf_counter()
    tasks = [check_with_timeout(url, timeout=0.3) for url in endpoints.values()]
    results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - t0
    print(f"\n{'Service':<10} {'URL':<38} {'Status':<8} {'Latency':<10} {'Health'}")
    print("-" * 75)
    for name, result in zip(endpoints.keys(), results):
        health = "🟢 UP" if result.ok else "🔴 DOWN"
        print(f"{name:<10} {result.url:<38} {result.status:<8} {result.latency_ms:<10.0f}ms {health}")
    total = len(results)
    healthy = sum(1 for r in results if r.ok)
    print(f"\nDone in {elapsed:.3f}s | Healthy: {healthy}/{total} | "
          f"Availability: {healthy/total*100:.0f}%")

# Step 8: main is the command-line remote control; --mode decides which async experiment to run.
def main() -> None:
    parser = argparse.ArgumentParser(description="asyncio high-concurrency demo")
    parser.add_argument(
        "--mode",
        choices=["gather", "queue", "health"],
        default="health",
        help="gather=concurrency comparison, queue=task queue, health=health-check dashboard",
    )
    args = parser.parse_args()
    random.seed(42)  # fixed random seed so results are reproducible
    if args.mode == "gather":
        asyncio.run(demo_gather())
    elif args.mode == "queue":
        asyncio.run(demo_queue())
    else:
        asyncio.run(demo_health())

import sys

for mode in ["health", "queue"]:
    print(f"\n>>> mode={mode}")
    sys.argv = ["prog", "--mode", mode]
    main()
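Geek practice: full source and how to run it
Now put the building blocks above together: paste the full code below into your editor and run it. Look at the whole loop first, then go back and tweak parameters section by section; that makes it easier to build engineering intuition.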
# File: 14-python-coroutine.py
# Run:  python3 14-python-coroutine.py --mode health
#       python3 14-python-coroutine.py --mode queue
#       python3 14-python-coroutine.py --mode gather
import argparse
import asyncio
import random
import time
from dataclasses import dataclass

# ── Data model ──────────────────────────────────────────────
@dataclass
class CheckResult:
    url: str
    status: int
    latency_ms: float
    ok: bool

# ── Mock HTTP request (no network dependency) ───────────────
async def mock_http_get(url: str) -> tuple[int, float]:
    """Simulate network latency: random 50-500 ms, with an occasional 503."""
    delay = random.uniform(0.05, 0.5)
    await asyncio.sleep(delay)
    status = 503 if random.random() < 0.15 else 200
    return status, delay * 1000

# ── Single-URL check ────────────────────────────────────────
async def check_url(url: str) -> CheckResult:
    t0 = time.perf_counter()
    status, latency = await mock_http_get(url)
    return CheckResult(url=url, status=status, latency_ms=latency, ok=(status == 200))

# ── Mode 1: gather concurrency vs serial ────────────────────
async def demo_gather() -> None:
    urls = [
        "https://api.21zhao.com/health",
        "https://blog.21zhao.com/ping",
        "https://todo.21zhao.com/status",
        "https://jupy.21zhao.com/api",
        "https://ai.21zhao.com/v1/models",
        "https://www.21zhao.com/sitemap.xml",
    ]
    # Serial
    t0 = time.perf_counter()
    serial_results: list[CheckResult] = []
    for url in urls:
        serial_results.append(await check_url(url))
    serial_time = time.perf_counter() - t0
    # Concurrent
    t0 = time.perf_counter()
    concurrent_results: list[CheckResult] = await asyncio.gather(
        *[check_url(url) for url in urls]
    )
    concurrent_time = time.perf_counter() - t0
    print("\n=== asyncio.gather: concurrent vs serial ===")
    print(f"{'URL':<42} {'Status':<6} {'Delay(ms)':<10} {'Result'}")
    print("-" * 70)
    for r in concurrent_results:
        icon = "✓" if r.ok else "✗"
        print(f"{r.url:<42} {r.status:<6} {r.latency_ms:<10.1f} {icon}")
    print(f"\n{'Serial total':<12}: {serial_time:.3f}s")
    print(f"{'Concurrent':<12}: {concurrent_time:.3f}s")
    print(f"{'Speedup':<12}: {serial_time / concurrent_time:.1f}x")
    ok_count = sum(1 for r in concurrent_results if r.ok)
    print(f"{'Healthy':<12}: {ok_count}/{len(urls)}")

# ── Mode 2: asyncio.Queue task queue ────────────────────────
async def producer(queue: asyncio.Queue[str], urls: list[str]) -> None:
    for url in urls:
        await queue.put(url)
        await asyncio.sleep(0.02)  # simulate tasks arriving gradually
    # Send the shutdown signal
    for _ in range(3):  # 3 workers
        await queue.put("__DONE__")

async def worker(wid: int, queue: asyncio.Queue[str], results: list[CheckResult]) -> None:
    while True:
        url = await queue.get()
        if url == "__DONE__":
            queue.task_done()
            break
        result = await check_url(url)
        results.append(result)
        status_icon = "✓" if result.ok else "✗"
        print(f" Worker-{wid} {status_icon} {url:<40} {result.latency_ms:.0f}ms")
        queue.task_done()

async def demo_queue() -> None:
    urls = [f"https://service-{i}.21zhao.com/health" for i in range(1, 10)]
    queue: asyncio.Queue[str] = asyncio.Queue(maxsize=5)
    results: list[CheckResult] = []
    print("\n=== asyncio.Queue producer-consumer pattern ===")
    print(f"Tasks: {len(urls)} Workers: 3 Queue capacity: 5\n")
    await asyncio.gather(
        producer(queue, urls),
        worker(1, queue, results),
        worker(2, queue, results),
        worker(3, queue, results),
    )
    ok = sum(1 for r in results if r.ok)
    avg_latency = sum(r.latency_ms for r in results) / len(results)
    print(f"\n{'Completed':<10}: {len(results)}")
    print(f"{'Succeeded':<10}: {ok} Failed: {len(results) - ok}")
    print(f"{'Avg delay':<10}: {avg_latency:.1f}ms")

# ── Mode 3: health-check dashboard ──────────────────────────
async def demo_health() -> None:
    """Health check with timeout control."""
    async def check_with_timeout(url: str, timeout: float) -> CheckResult:
        try:
            return await asyncio.wait_for(check_url(url), timeout=timeout)
        except asyncio.TimeoutError:
            return CheckResult(url=url, status=408, latency_ms=timeout * 1000, ok=False)
    endpoints = {
        "Main site": "https://21zhao.com",
        "Blog": "https://blog.21zhao.com",
        "Todo": "https://todo.21zhao.com",
        "Jupyter": "https://jupy.21zhao.com",
        "AI proxy": "https://ai.21zhao.com",
        "YouCare": "https://21zhao.com/youcare/",
    }
    print("\n=== Service health-check dashboard ===")
    t0 = time.perf_counter()
    tasks = [check_with_timeout(url, timeout=0.3) for url in endpoints.values()]
    results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - t0
    print(f"\n{'Service':<10} {'URL':<38} {'Status':<8} {'Latency':<10} {'Health'}")
    print("-" * 75)
    for name, result in zip(endpoints.keys(), results):
        health = "🟢 UP" if result.ok else "🔴 DOWN"
        print(f"{name:<10} {result.url:<38} {result.status:<8} {result.latency_ms:<10.0f}ms {health}")
    total = len(results)
    healthy = sum(1 for r in results if r.ok)
    print(f"\nDone in {elapsed:.3f}s | Healthy: {healthy}/{total} | "
          f"Availability: {healthy/total*100:.0f}%")

def main() -> None:
    parser = argparse.ArgumentParser(description="asyncio high-concurrency demo")
    parser.add_argument(
        "--mode",
        choices=["gather", "queue", "health"],
        default="health",
        help="gather=concurrency comparison, queue=task queue, health=health-check dashboard",
    )
    args = parser.parse_args()
    random.seed(42)  # fixed random seed so results are reproducible
    if args.mode == "gather":
        asyncio.run(demo_gather())
    elif args.mode == "queue":
        asyncio.run(demo_queue())
    else:
        asyncio.run(demo_health())

if __name__ == "__main__":
    main()
After running it you will see output similar to:
$ python3 14-python-coroutine.py

=== Service health-check dashboard ===

Service    URL                                    Status   Latency    Health
---------------------------------------------------------------------------
Main site  https://21zhao.com                     408      300       ms 🔴 DOWN
Blog       https://blog.21zhao.com                200      61        ms 🟢 UP
Todo       https://todo.21zhao.com                200      174       ms 🟢 UP
Jupyter    https://jupy.21zhao.com                503      150       ms 🔴 DOWN
AI proxy   https://ai.21zhao.com                  408      300       ms 🔴 DOWN
YouCare    https://21zhao.com/youcare/            408      300       ms 🔴 DOWN

Done in 0.302s | Healthy: 2/6 | Availability: 33%
4. Running the Examples
python3 14-python-coroutine.py --mode health  # health-check dashboard (default)
python3 14-python-coroutine.py --mode gather  # concurrent vs serial timing comparison
python3 14-python-coroutine.py --mode queue   # producer-consumer queue
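5. Key Takeaways
| Concept | Description |
|---|---|
| async def | Defines a coroutine function; calling it returns a coroutine object |
| await | Suspends the current coroutine and yields control to the event loop |
| asyncio.gather() | Runs multiple awaitables concurrently and waits for all of them to finish |
| asyncio.Queue | Task queue for coroutines; needs no locks inside one event loop, but is not thread-safe |
| asyncio.wait_for() | Adds a timeout to an awaitable |
| asyncio.run() | Program entry point; creates and runs the event loop |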
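Common pitfalls:
- await can only be used inside async def
- Do not call blocking functions (such as time.sleep) inside a coroutine; use asyncio.sleep
- Offload blocking calls to a thread pool with loop.run_in_executor(); for CPU-bound work, pass it a process pool instead, because threads still share the GIL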
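The last two pitfalls can be seen together in a short sketch. It assumes a hypothetical `blocking_io` function built on time.sleep and a `heartbeat` coroutine that shows whether the loop stays responsive; neither name comes from the article's source.

```python
import asyncio
import time


def blocking_io() -> str:
    time.sleep(0.1)  # a blocking call; run directly in a coroutine it would stall the loop
    return "done"


async def heartbeat(ticks: list[float]) -> None:
    for _ in range(3):
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.02)


async def main() -> tuple[str, int]:
    loop = asyncio.get_running_loop()
    ticks: list[float] = []
    beat = asyncio.create_task(heartbeat(ticks))
    # None selects the loop's default ThreadPoolExecutor
    result = await loop.run_in_executor(None, blocking_io)
    ticks_while_blocked = len(ticks)  # heartbeats that ran during the blocking call
    await beat
    return result, ticks_while_blocked


result, ticks_while_blocked = asyncio.run(main())
print(result, ticks_while_blocked)
```

The heartbeat keeps ticking while blocking_io sleeps in a worker thread, so the second number printed is normally 3. Calling blocking_io() directly inside main() would instead freeze the heartbeat for the full 0.1 s.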
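⏱ NexDo Time (5 minutes)
- Rate limiter: use asyncio.Semaphore(3) to cap the maximum concurrency at 3 and observe how behavior changes in queue mode.
- Retry: give check_url up to 2 retries, waiting 0.1s before each retry after a failure.
- Sort results: sort the results of demo_health by latency from low to high before printing, and find the slowest service.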
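As a possible starting point for the rate-limiter exercise, here is a minimal sketch. It uses gather rather than the queue mode the exercise asks about, and `limited_check`, `state`, and the example.com URLs are illustrative assumptions; the sleep stands in for check_url.

```python
import asyncio


async def limited_check(sem: asyncio.Semaphore, url: str, state: dict[str, int]) -> str:
    async with sem:  # at most 3 coroutines hold the semaphore at once
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stand-in for check_url
        state["active"] -= 1
    return url


async def main() -> tuple[list[str], int]:
    sem = asyncio.Semaphore(3)
    state = {"active": 0, "peak": 0}
    urls = [f"https://service-{i}.example.com" for i in range(8)]
    results = await asyncio.gather(*(limited_check(sem, u, state) for u in urls))
    return results, state["peak"]


results, peak = asyncio.run(main())
print(len(results), peak)
# → 8 3
```

All 8 checks complete, but the peak number running at the same time never exceeds 3. Adapting this to queue mode means acquiring the semaphore inside worker() around the call to check_url.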
Don’t wait for next time, do it in the next moment.