13 · IO 多路复用:select/epoll 剖析
🔗 知识图谱导航:阅读本文前,建议先掌握/回顾 《12 · 多进程与多线程:榨干 CPU 性能》 中的核心概念;本文会在这个基础上继续推进。 上一篇我们掌握了多线程与多进程的并发模型;本篇深入操作系统层,剖析 IO 多路复用如何用单线程驾驭海量连接。
极客解析:并发不是把代码写复杂,而是把等待、调度和资源隔离讲清楚;本文用本机可运行的 Mock 场景验证机制。
痛点与架构:单独记 API 或概念很容易学完就忘;本文先锁定真实痛点,再把它拆成“输入数据 → 核心机制 → 可运行输出”三段闭环。从阻塞IO到epoll,彻底搞懂IO多路复用原理,用selectors模块实现多客户端聊天服务器。
1. 三种 IO 模型对比
┌─────────────────┬──────────────────┬──────────────────┬──────────────────┐
│ 模型 │ 阻塞IO │ 非阻塞IO │ IO多路复用 │
├─────────────────┼──────────────────┼──────────────────┼──────────────────┤
│ 等待数据时 │ 线程挂起 │ 轮询返回EAGAIN │ select/epoll等待 │
│ 拷贝数据时 │ 阻塞 │ 阻塞 │ 阻塞 │
│ CPU利用率 │ 低(睡眠) │ 高(空转) │ 高(事件驱动) │
│ 适合连接数 │ 少量 │ 少量 │ 海量(C10K+) │
│ 编程复杂度 │ 低 │ 中 │ 中 │
│ 典型代表 │ 传统BIO │ NIO轮询 │ Nginx/Redis │
└─────────────────┴──────────────────┴──────────────────┴──────────────────┘
2. select 原理
用户空间 内核空间
┌──────────┐ ┌─────────────────────────┐
│ fd_set │──copy──▶ │ 遍历所有fd (O(n)) │
│ [1,3,7,] │ │ fd1: 无数据 │
│ │ │ fd3: 有数据 ✓ │
│ │◀──copy── │ fd7: 无数据 │
│ [0,1,0,] │ └─────────────────────────┘
└──────────┘
│
▼
应用层再次遍历fd_set找到就绪fd
缺点:
• fd数量上限 1024(FD_SETSIZE)
• 每次调用都要把fd_set从用户态拷贝到内核态
• 内核返回后应用层还要O(n)扫描
3. epoll 原理(Linux)
epoll_create()
│
▼
┌─────────────┐ epoll_ctl(ADD) ┌──────────────────┐
│ epoll实例 │◀────────────────────│ 注册感兴趣的fd │
│ (红黑树) │ └──────────────────┘
└──────┬──────┘
│ 内核回调(数据就绪时)
▼
┌─────────────┐ epoll_wait() ┌──────────────────┐
│ 就绪链表 │────────────────────▶│ 只返回就绪的fd │
│ (O(1)返回) │ │ 无需遍历全部 │
└─────────────┘ └──────────────────┘
触发模式:
LT(水平触发):缓冲区有数据就一直通知,默认模式,不易漏事件
ET(边缘触发):仅在状态变化时通知一次,需一次读完,性能更高
4. selectors 模块(跨平台)
Python selectors 自动选择当前平台最优实现:Linux→epoll,macOS→kqueue,Windows→select。
5. 实战代码
步步为营:核心逻辑自适应拆解
IO 多路复用可以先想成“一个服务员看一块叫号屏”:不用给每个客人配一个服务员,也不用挨桌空问,谁准备好就处理谁。下面每段演示都带注释,先用不占端口的方式讲清机制,再保留完整源码里的真实聊天服务器。
Step 1:先创建 selector 和聊天日志,准备事件总控台
痛点与机制:
selectors.DefaultSelector() 会按系统自动选择最佳实现:Linux 常见是 epoll,macOS 常见是 kqueue。你可以把 selector 理解成“事件叫号器”:程序不用挨个问每个连接有没有数据,而是等叫号器告诉你哪个连接准备好了。
核心源码(逐字来自文末完整源码):
sel = selectors.DefaultSelector()
chat_log: list[str] = []
可运行演示(补齐 Mock 数据与 print 反馈):
import selectors
sel = selectors.DefaultSelector()
chat_log: list[str] = []
print("当前选择器类型:", type(sel).__name__)
print("初始注册fd数量:", len(sel.get_map()))
print("聊天日志条数:", len(chat_log))
sel.close()
Step 2:用 accept_conn 理解新连接如何登记到 selector
痛点与机制:
accept_conn() 是“新客入座”的流程:服务端接到连接,把连接改成非阻塞,再把这个连接注册到 selector。这样后续不用开一个线程死等这个客户端,selector 会在它可读或可写时通知我们。
核心源码(逐字来自文末完整源码):
def accept_conn(sock: socket.socket) -> None:
conn, addr = sock.accept()
conn.setblocking(False)
data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
sel.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data)
可运行演示(补齐 Mock 数据与 print 反馈):
import selectors
import socket
import types
sel = selectors.DefaultSelector()
chat_log: list[str] = []
# Step 2:accept_conn 负责把新来的客户端登记进 selector。
def accept_conn(sock: socket.socket) -> None:
conn, addr = sock.accept()
conn.setblocking(False)
data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
sel.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data)
print("accept_conn 已定义")
print("它做三件事: accept连接 -> 设为非阻塞 -> register读写事件")
print("演示环境不绑定真实端口,避免浏览器沙箱权限问题。")
sel.close()
Step 3:用 service_conn 处理读写事件,完成一次回显
痛点与机制:
mask 是事件标签:有 EVENT_READ 就读数据,有 EVENT_WRITE 就写数据。这里用 socketpair() 造一对本地 socket,不占端口也能演示“客户端发消息 -> 服务端读 -> 写回显”。注释里特意分成读事件和写事件,方便新手看到状态怎么流转。
核心源码(逐字来自文末完整源码):
def service_conn(key: selectors.SelectorKey, mask: int) -> None:
sock: socket.socket = key.fileobj # type: ignore
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(256)
if recv_data:
msg = recv_data.decode()
chat_log.append(f"[{data.addr[1]}] {msg.strip()}")
data.outb += f"ECHO: {msg}".encode()
else:
sel.unregister(sock)
sock.close()
if mask & selectors.EVENT_WRITE:
if data.outb:
sent = sock.send(data.outb)
data.outb = data.outb[sent:]
可运行演示(补齐 Mock 数据与 print 反馈):
import selectors
import socket
import types
sel = selectors.DefaultSelector()
chat_log: list[str] = []
# Step 3:service_conn 是真正干活的服务员,读到消息后准备回显。
def service_conn(key: selectors.SelectorKey, mask: int) -> None:
sock: socket.socket = key.fileobj # type: ignore
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(256)
if recv_data:
msg = recv_data.decode()
chat_log.append(f"[{data.addr[1]}] {msg.strip()}")
data.outb += f"ECHO: {msg}".encode()
else:
sel.unregister(sock)
sock.close()
if mask & selectors.EVENT_WRITE:
if data.outb:
sent = sock.send(data.outb)
data.outb = data.outb[sent:]
server_sock, client_sock = socket.socketpair()
try:
server_sock.setblocking(False)
client_sock.sendall(b"hello selector\n")
data = types.SimpleNamespace(addr=("local", 12345), inb=b"", outb=b"")
key = selectors.SelectorKey(fileobj=server_sock, fd=server_sock.fileno(), events=0, data=data)
service_conn(key, selectors.EVENT_READ)
print("聊天日志:", chat_log)
print("待发送缓冲区:", data.outb.decode().strip())
service_conn(key, selectors.EVENT_WRITE)
print("客户端收到:", client_sock.recv(256).decode().strip())
finally:
server_sock.close(); client_sock.close(); sel.close()
Step 4:用 run_server 看懂单线程事件循环的骨架
痛点与机制:
run_server() 的关键不是代码长,而是事件循环:sel.select() 等待“谁就绪了”,如果是监听 socket 就接入新连接,如果是客户端 socket 就处理读写。一个线程能管理多个连接,靠的就是这个叫号机制。
核心源码(逐字来自文末完整源码):
def run_server(host: str, port: int, stop_event: threading.Event) -> None:
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
lsock.bind((host, port))
lsock.listen()
lsock.setblocking(False)
sel.register(lsock, selectors.EVENT_READ, data=None)
while not stop_event.is_set():
events = sel.select(timeout=0.1)
for key, mask in events:
if key.data is None:
accept_conn(key.fileobj) # type: ignore
else:
service_conn(key, mask)
lsock.close()
可运行演示(补齐 Mock 数据与 print 反馈):
import selectors
import socket
import threading
import types
sel = selectors.DefaultSelector()
chat_log: list[str] = []
def accept_conn(sock: socket.socket) -> None:
conn, addr = sock.accept()
conn.setblocking(False)
data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
sel.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data)
def service_conn(key: selectors.SelectorKey, mask: int) -> None:
sock: socket.socket = key.fileobj # type: ignore
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(256)
if recv_data:
msg = recv_data.decode()
chat_log.append(f"[{data.addr[1]}] {msg.strip()}")
data.outb += f"ECHO: {msg}".encode()
else:
sel.unregister(sock)
sock.close()
if mask & selectors.EVENT_WRITE:
if data.outb:
sent = sock.send(data.outb)
data.outb = data.outb[sent:]
# Step 4:run_server 是事件循环,select 一次拿一批就绪事件处理。
def run_server(host: str, port: int, stop_event: threading.Event) -> None:
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
lsock.bind((host, port))
lsock.listen()
lsock.setblocking(False)
sel.register(lsock, selectors.EVENT_READ, data=None)
while not stop_event.is_set():
events = sel.select(timeout=0.1)
for key, mask in events:
if key.data is None:
accept_conn(key.fileobj) # type: ignore
else:
service_conn(key, mask)
lsock.close()
print("run_server 已定义")
print("核心循环: sel.select(timeout=0.1) -> accept_conn 或 service_conn")
print("这里不直接启动监听,避免端口权限影响页面运行。")
sel.close()
Step 5:用 run_client 看懂客户端消息发送流程
痛点与机制:
客户端流程比服务端简单:先 connect(),再 sendall(),然后 recv() 读回显。这里不直接连接端口,是为了让页面在受限环境也稳定运行;真正网络闭环放在完整脚本和 demo_chat() 里。
核心源码(逐字来自文末完整源码):
def run_client(host: str, port: int, client_id: int, messages: list[str]) -> None:
time.sleep(0.2) # 等服务端就绪
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((host, port))
for msg in messages:
s.sendall(f"Client-{client_id}: {msg}\n".encode())
time.sleep(0.05)
s.recv(512) # 读回显
可运行演示(补齐 Mock 数据与 print 反馈):
import socket
import time
# Step 5:run_client 是真实客户端流程:连接、发送、读取回显。
def run_client(host: str, port: int, client_id: int, messages: list[str]) -> None:
time.sleep(0.2) # 等服务端就绪
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((host, port))
for msg in messages:
s.sendall(f"Client-{client_id}: {msg}\n".encode())
time.sleep(0.05)
s.recv(512) # 读回显
messages = ["msg0", "msg1", "msg2"]
for i, msg in enumerate(messages, 1):
print(f"{i}. 准备发送: Client-1: {msg} -> 等待服务端 ECHO")
print("真实联网流程由 demo_chat() 统一启动;本步骤先读懂发送顺序。")
Step 6:用 demo_compare 对比阻塞等待和多路复用批处理
痛点与机制:
阻塞 IO 像一个服务员守着一桌客人等上菜,等完才去下一桌;多路复用像叫号器,哪桌好了就处理哪桌。demo_compare() 用 sleep 模拟等待差异,把“串行等待”和“批量处理就绪事件”的耗时打印出来。
核心源码(逐字来自文末完整源码):
def demo_compare() -> None:
"""模拟阻塞IO vs 多路复用的响应时间差异"""
def blocking_handler(n: int) -> float:
t0 = time.perf_counter()
for _ in range(n):
time.sleep(0.01) # 模拟阻塞等待
return time.perf_counter() - t0
def mux_handler(n: int) -> float:
"""用selectors事件循环模拟:就绪才处理,不等待"""
t0 = time.perf_counter()
ready_queue: list[int] = list(range(n))
processed = 0
while ready_queue:
# 模拟epoll_wait:批量取就绪事件
batch = ready_queue[:min(10, len(ready_queue))]
ready_queue = ready_queue[len(batch):]
for _ in batch:
time.sleep(0.001) # 仅处理时间,无等待
processed += len(batch)
return time.perf_counter() - t0
N = 20
t_block = blocking_handler(N)
t_mux = mux_handler(N)
print("\n=== IO模型耗时对比 ===")
print(f"{'模型':<16} {'连接数':<8} {'耗时(s)':<10} {'说明'}")
print("-" * 55)
print(f"{'阻塞IO':<16} {N:<8} {t_block:<10.3f} 串行等待每个IO完成")
print(f"{'IO多路复用':<14} {N:<8} {t_mux:<10.3f} 批量处理就绪事件")
print(f"\n加速比: {t_block / t_mux:.1f}x")
print(f"\n当前平台选择器: {type(sel).__name__}")
可运行演示(补齐 Mock 数据与 print 反馈):
import selectors
import time
sel = selectors.DefaultSelector()
# Step 6:这个函数用时间模拟说明:多路复用只处理“已就绪”的事件。
def demo_compare() -> None:
"""模拟阻塞IO vs 多路复用的响应时间差异"""
def blocking_handler(n: int) -> float:
t0 = time.perf_counter()
for _ in range(n):
time.sleep(0.01) # 模拟阻塞等待
return time.perf_counter() - t0
def mux_handler(n: int) -> float:
"""用selectors事件循环模拟:就绪才处理,不等待"""
t0 = time.perf_counter()
ready_queue: list[int] = list(range(n))
processed = 0
while ready_queue:
# 模拟epoll_wait:批量取就绪事件
batch = ready_queue[:min(10, len(ready_queue))]
ready_queue = ready_queue[len(batch):]
for _ in batch:
time.sleep(0.001) # 仅处理时间,无等待
processed += len(batch)
return time.perf_counter() - t0
N = 20
t_block = blocking_handler(N)
t_mux = mux_handler(N)
print("\n=== IO模型耗时对比 ===")
print(f"{'模型':<16} {'连接数':<8} {'耗时(s)':<10} {'说明'}")
print("-" * 55)
print(f"{'阻塞IO':<16} {N:<8} {t_block:<10.3f} 串行等待每个IO完成")
print(f"{'IO多路复用':<14} {N:<8} {t_mux:<10.3f} 批量处理就绪事件")
print(f"\n加速比: {t_block / t_mux:.1f}x")
print(f"\n当前平台选择器: {type(sel).__name__}")
demo_compare()
sel.close()
Step 7:用 print_theory 速记 select/poll/epoll/kqueue 差异
痛点与机制:
select、poll、epoll、kqueue 都是在解决“我怎么知道哪个连接有数据”的问题。Python 的 selectors 把平台差异包起来,让我们写一套代码,在不同系统上自动选合适后端。
核心源码(逐字来自文末完整源码):
def print_theory() -> None:
print("""
=== IO多路复用核心概念速查 ===
┌──────────┬────────────┬────────────┬────────────┐
│ 系统调用 │ 最大fd数 │ 时间复杂度 │ 平台 │
├──────────┼────────────┼────────────┼────────────┤
│ select │ 1024 │ O(n) │ 全平台 │
│ poll │ 无限制 │ O(n) │ Linux/Mac │
│ epoll │ 无限制 │ O(1) │ Linux only │
│ kqueue │ 无限制 │ O(1) │ macOS/BSD │
└──────────┴────────────┴────────────┴────────────┘
Python selectors.DefaultSelector() 自动选最优实现。
""")
可运行演示(补齐 Mock 数据与 print 反馈):
# Step 7:理论速查表帮你把几个系统调用的差别一次看清。
def print_theory() -> None:
print("""
=== IO多路复用核心概念速查 ===
┌──────────┬────────────┬────────────┬────────────┐
│ 系统调用 │ 最大fd数 │ 时间复杂度 │ 平台 │
├──────────┼────────────┼────────────┼────────────┤
│ select │ 1024 │ O(n) │ 全平台 │
│ poll │ 无限制 │ O(n) │ Linux/Mac │
│ epoll │ 无限制 │ O(1) │ Linux only │
│ kqueue │ 无限制 │ O(1) │ macOS/BSD │
└──────────┴────────────┴────────────┴────────────┘
Python selectors.DefaultSelector() 自动选最优实现。
""")
print_theory()
Step 8:用 main 做 chat/compare/theory 三种模式总入口
痛点与机制:
main() 把教学入口收束到 --mode 参数:theory 看表,compare 看耗时对比,chat 跑真实多客户端聊天。由于网页运行环境可能限制端口,这里的演示只跑不会绑定端口的模式。
核心源码(逐字来自文末完整源码):
def main() -> None:
parser = argparse.ArgumentParser(description="IO多路复用演示")
parser.add_argument(
"--mode",
choices=["chat", "compare", "theory"],
default="theory",
help="chat=聊天服务器演示, compare=耗时对比, theory=原理速查",
)
args = parser.parse_args()
if args.mode == "chat":
demo_chat()
elif args.mode == "compare":
demo_compare()
else:
print_theory()
可运行演示(补齐 Mock 数据与 print 反馈):
import argparse
import selectors
import time
sel = selectors.DefaultSelector()
def print_theory() -> None:
print("""
=== IO多路复用核心概念速查 ===
┌──────────┬────────────┬────────────┬────────────┐
│ 系统调用 │ 最大fd数 │ 时间复杂度 │ 平台 │
├──────────┼────────────┼────────────┼────────────┤
│ select │ 1024 │ O(n) │ 全平台 │
│ poll │ 无限制 │ O(n) │ Linux/Mac │
│ epoll │ 无限制 │ O(1) │ Linux only │
│ kqueue │ 无限制 │ O(1) │ macOS/BSD │
└──────────┴────────────┴────────────┴────────────┘
Python selectors.DefaultSelector() 自动选最优实现。
""")
def demo_compare() -> None:
"""模拟阻塞IO vs 多路复用的响应时间差异"""
def blocking_handler(n: int) -> float:
t0 = time.perf_counter()
for _ in range(n):
time.sleep(0.01) # 模拟阻塞等待
return time.perf_counter() - t0
def mux_handler(n: int) -> float:
"""用selectors事件循环模拟:就绪才处理,不等待"""
t0 = time.perf_counter()
ready_queue: list[int] = list(range(n))
processed = 0
while ready_queue:
# 模拟epoll_wait:批量取就绪事件
batch = ready_queue[:min(10, len(ready_queue))]
ready_queue = ready_queue[len(batch):]
for _ in batch:
time.sleep(0.001) # 仅处理时间,无等待
processed += len(batch)
return time.perf_counter() - t0
N = 20
t_block = blocking_handler(N)
t_mux = mux_handler(N)
print("\n=== IO模型耗时对比 ===")
print(f"{'模型':<16} {'连接数':<8} {'耗时(s)':<10} {'说明'}")
print("-" * 55)
print(f"{'阻塞IO':<16} {N:<8} {t_block:<10.3f} 串行等待每个IO完成")
print(f"{'IO多路复用':<14} {N:<8} {t_mux:<10.3f} 批量处理就绪事件")
print(f"\n加速比: {t_block / t_mux:.1f}x")
print(f"\n当前平台选择器: {type(sel).__name__}")
# Step 8:main 像遥控器,--mode 决定看聊天、对比还是理论表。
def main() -> None:
parser = argparse.ArgumentParser(description="IO多路复用演示")
parser.add_argument(
"--mode",
choices=["chat", "compare", "theory"],
default="theory",
help="chat=聊天服务器演示, compare=耗时对比, theory=原理速查",
)
args = parser.parse_args()
if args.mode == "chat":
demo_chat()
elif args.mode == "compare":
demo_compare()
else:
print_theory()
import sys
for mode in ["theory", "compare"]:
print(f"\n>>> mode={mode}")
sys.argv = ["prog", "--mode", mode]
main()
print("提示: chat 模式会真实绑定本机端口,适合在本地终端单独运行。")
sel.close()
极客实战:完整源码与运行
现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。
# 文件名: 13-python-iomux.py
# 运行: python3 13-python-iomux.py --mode chat
# python3 13-python-iomux.py --mode compare
import argparse
import selectors
import socket
import threading
import time
import types
from typing import Any
sel = selectors.DefaultSelector()
chat_log: list[str] = []
# ── 服务端逻辑 ──────────────────────────────────────────────
def accept_conn(sock: socket.socket) -> None:
conn, addr = sock.accept()
conn.setblocking(False)
data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
sel.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data)
def service_conn(key: selectors.SelectorKey, mask: int) -> None:
sock: socket.socket = key.fileobj # type: ignore
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(256)
if recv_data:
msg = recv_data.decode()
chat_log.append(f"[{data.addr[1]}] {msg.strip()}")
data.outb += f"ECHO: {msg}".encode()
else:
sel.unregister(sock)
sock.close()
if mask & selectors.EVENT_WRITE:
if data.outb:
sent = sock.send(data.outb)
data.outb = data.outb[sent:]
def run_server(host: str, port: int, stop_event: threading.Event) -> None:
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
lsock.bind((host, port))
lsock.listen()
lsock.setblocking(False)
sel.register(lsock, selectors.EVENT_READ, data=None)
while not stop_event.is_set():
events = sel.select(timeout=0.1)
for key, mask in events:
if key.data is None:
accept_conn(key.fileobj) # type: ignore
else:
service_conn(key, mask)
lsock.close()
# ── 模拟客户端 ──────────────────────────────────────────────
def run_client(host: str, port: int, client_id: int, messages: list[str]) -> None:
time.sleep(0.2) # 等服务端就绪
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((host, port))
for msg in messages:
s.sendall(f"Client-{client_id}: {msg}\n".encode())
time.sleep(0.05)
s.recv(512) # 读回显
def demo_chat() -> None:
HOST, PORT = "127.0.0.1", 19876
stop = threading.Event()
server_thread = threading.Thread(target=run_server, args=(HOST, PORT, stop), daemon=True)
server_thread.start()
clients = [
threading.Thread(target=run_client, args=(HOST, PORT, i, [f"msg{j}" for j in range(3)]))
for i in range(1, 4)
]
for c in clients:
c.start()
for c in clients:
c.join()
time.sleep(0.3)
stop.set()
server_thread.join(timeout=2)
print("\n=== selectors 多客户端聊天演示 ===")
print(f"{'序号':<4} {'消息'}")
print("-" * 40)
for i, line in enumerate(chat_log, 1):
print(f"{i:<4} {line}")
print(f"\n共处理 {len(chat_log)} 条消息,单线程服务端,零阻塞")
# ── IO模型耗时对比 ──────────────────────────────────────────
def demo_compare() -> None:
"""模拟阻塞IO vs 多路复用的响应时间差异"""
def blocking_handler(n: int) -> float:
t0 = time.perf_counter()
for _ in range(n):
time.sleep(0.01) # 模拟阻塞等待
return time.perf_counter() - t0
def mux_handler(n: int) -> float:
"""用selectors事件循环模拟:就绪才处理,不等待"""
t0 = time.perf_counter()
ready_queue: list[int] = list(range(n))
processed = 0
while ready_queue:
# 模拟epoll_wait:批量取就绪事件
batch = ready_queue[:min(10, len(ready_queue))]
ready_queue = ready_queue[len(batch):]
for _ in batch:
time.sleep(0.001) # 仅处理时间,无等待
processed += len(batch)
return time.perf_counter() - t0
N = 20
t_block = blocking_handler(N)
t_mux = mux_handler(N)
print("\n=== IO模型耗时对比 ===")
print(f"{'模型':<16} {'连接数':<8} {'耗时(s)':<10} {'说明'}")
print("-" * 55)
print(f"{'阻塞IO':<16} {N:<8} {t_block:<10.3f} 串行等待每个IO完成")
print(f"{'IO多路复用':<14} {N:<8} {t_mux:<10.3f} 批量处理就绪事件")
print(f"\n加速比: {t_block / t_mux:.1f}x")
print(f"\n当前平台选择器: {type(sel).__name__}")
def print_theory() -> None:
print("""
=== IO多路复用核心概念速查 ===
┌──────────┬────────────┬────────────┬────────────┐
│ 系统调用 │ 最大fd数 │ 时间复杂度 │ 平台 │
├──────────┼────────────┼────────────┼────────────┤
│ select │ 1024 │ O(n) │ 全平台 │
│ poll │ 无限制 │ O(n) │ Linux/Mac │
│ epoll │ 无限制 │ O(1) │ Linux only │
│ kqueue │ 无限制 │ O(1) │ macOS/BSD │
└──────────┴────────────┴────────────┴────────────┘
Python selectors.DefaultSelector() 自动选最优实现。
""")
def main() -> None:
parser = argparse.ArgumentParser(description="IO多路复用演示")
parser.add_argument(
"--mode",
choices=["chat", "compare", "theory"],
default="theory",
help="chat=聊天服务器演示, compare=耗时对比, theory=原理速查",
)
args = parser.parse_args()
if args.mode == "chat":
demo_chat()
elif args.mode == "compare":
demo_compare()
else:
print_theory()
if __name__ == "__main__":
main()
运行后你会看到类似输出:
$ python3 13-python-iomux.py
=== IO多路复用核心概念速查 ===
┌──────────┬────────────┬────────────┬────────────┐
│ 系统调用 │ 最大fd数 │ 时间复杂度 │ 平台 │
├──────────┼────────────┼────────────┼────────────┤
│ select │ 1024 │ O(n) │ 全平台 │
│ poll │ 无限制 │ O(n) │ Linux/Mac │
│ epoll │ 无限制 │ O(1) │ Linux only │
│ kqueue │ 无限制 │ O(1) │ macOS/BSD │
└──────────┴────────────┴────────────┴────────────┘
Python selectors.DefaultSelector() 自动选最优实现。
6. 运行示例
python3 13-python-iomux.py --mode theory # 原理速查表
python3 13-python-iomux.py --mode chat # 多客户端聊天演示
python3 13-python-iomux.py --mode compare # 阻塞vs多路复用耗时对比
输出示例(chat 模式):
=== selectors 多客户端聊天演示 ===
序号 消息
----------------------------------------
1 [xxxxx] Client-1: msg0
2 [xxxxx] Client-2: msg0
3 [xxxxx] Client-3: msg0
...
共处理 9 条消息,单线程服务端,零阻塞
7. 关键要点
selectors.DefaultSelector()跨平台,生产代码首选- 注册 fd 时同时监听
EVENT_READ | EVENT_WRITE,写缓冲为空时应取消写监听(避免空转) - epoll ET 模式需循环读到
EAGAIN,否则会漏事件 select的 1024 fd 上限是编译期常量,高并发场景必须换 epoll/kqueue
⏱ NexDo Time(5 分钟)
- 改造写监听:修改
service_conn,当data.outb为空时用sel.modify()取消EVENT_WRITE监听,观察 CPU 占用变化。 - 统计连接数:在服务端维护一个
active_connections: int计数器,连接建立 +1,断开 -1,每秒打印一次。 - 压力测试:把客户端数量从 3 改到 50,观察单线程服务端是否仍能正常处理所有消息。
Don’t wait for next time, do it in the next moment.