文章

13 · IO 多路复用:select/epoll 剖析

#013 · 2026-04-16 · Python

🔗 知识图谱导航:阅读本文前,建议先掌握/回顾 《12 · 多进程与多线程:榨干 CPU 性能》 中的核心概念;本文会在这个基础上继续推进。 上一篇我们掌握了多线程与多进程的并发模型;本篇深入操作系统层,剖析 IO 多路复用如何用单线程驾驭海量连接。

极客解析:并发不是把代码写复杂,而是把等待、调度和资源隔离讲清楚;本文用本机可运行的 Mock 场景验证机制。

痛点与架构:单独记 API 或概念很容易学完就忘;本文先锁定真实痛点,再把它拆成“输入数据 → 核心机制 → 可运行输出”三段闭环。从阻塞IO到epoll,彻底搞懂IO多路复用原理,用selectors模块实现多客户端聊天服务器。

1. 三种 IO 模型对比

┌─────────────────┬──────────────────┬──────────────────┬──────────────────┐
│ 模型            │ 阻塞IO           │ 非阻塞IO         │ IO多路复用       │
├─────────────────┼──────────────────┼──────────────────┼──────────────────┤
│ 等待数据时      │ 线程挂起         │ 轮询返回EAGAIN   │ select/epoll等待 │
│ 拷贝数据时      │ 阻塞             │ 阻塞             │ 阻塞             │
│ CPU利用率       │ 低(睡眠)       │ 高(空转)       │ 高(事件驱动)   │
│ 适合连接数      │ 少量             │ 少量             │ 海量(C10K+)      │
│ 编程复杂度      │ 低               │ 中               │ 中               │
│ 典型代表        │ 传统BIO          │ NIO轮询          │ Nginx/Redis      │
└─────────────────┴──────────────────┴──────────────────┴──────────────────┘

2. select 原理

用户空间                    内核空间
┌──────────┐               ┌─────────────────────────┐
│ fd_set   │──copy──▶      │  遍历所有fd (O(n))       │
│ [1,3,7,] │               │  fd1: 无数据             │
│          │               │  fd3: 有数据 ✓           │
│          │◀──copy──       │  fd7: 无数据             │
│ [0,1,0,] │               └─────────────────────────┘
└──────────┘
     │
     ▼
应用层再次遍历fd_set找到就绪fd

缺点:
  • fd数量上限 1024(FD_SETSIZE)
  • 每次调用都要把fd_set从用户态拷贝到内核态
  • 内核返回后应用层还要O(n)扫描

3. epoll 原理(Linux)

epoll_create()
     │
     ▼
┌─────────────┐    epoll_ctl(ADD)    ┌──────────────────┐
│  epoll实例  │◀────────────────────│  注册感兴趣的fd   │
│  (红黑树)   │                      └──────────────────┘
└──────┬──────┘
       │ 内核回调(数据就绪时)
       ▼
┌─────────────┐    epoll_wait()      ┌──────────────────┐
│  就绪链表   │────────────────────▶│  只返回就绪的fd  │
│  (O(1)返回) │                      │  无需遍历全部    │
└─────────────┘                      └──────────────────┘

触发模式:
  LT(水平触发):缓冲区有数据就一直通知,默认模式,不易漏事件
  ET(边缘触发):仅在状态变化时通知一次,需一次读完,性能更高

4. selectors 模块(跨平台)

Python selectors 自动选择当前平台最优实现:Linux→epoll,macOS→kqueue,Windows→select。

5. 实战代码

步步为营:核心逻辑自适应拆解

IO 多路复用可以先想成“一个服务员看一块叫号屏”:不用给每个客人配一个服务员,也不用挨桌空问,谁准备好就处理谁。下面每段演示都带注释,先用不占端口的方式讲清机制,再保留完整源码里的真实聊天服务器。

Step 1:先创建 selector 和聊天日志,准备事件总控台

痛点与机制

selectors.DefaultSelector() 会按系统自动选择最佳实现:Linux 常见是 epoll,macOS 常见是 kqueue。你可以把 selector 理解成“事件叫号器”:程序不用挨个问每个连接有没有数据,而是等叫号器告诉你哪个连接准备好了。

核心源码(逐字来自文末完整源码)

sel = selectors.DefaultSelector()
chat_log: list[str] = []

可运行演示(补齐 Mock 数据与 print 反馈)

import selectors


sel = selectors.DefaultSelector()
chat_log: list[str] = []

print("当前选择器类型:", type(sel).__name__)
print("初始注册fd数量:", len(sel.get_map()))
print("聊天日志条数:", len(chat_log))
sel.close()

Step 2:用 accept_conn 理解新连接如何登记到 selector

痛点与机制

accept_conn() 是“新客入座”的流程:服务端接到连接,把连接改成非阻塞,再把这个连接注册到 selector。这样后续不用开一个线程死等这个客户端,selector 会在它可读或可写时通知我们。

核心源码(逐字来自文末完整源码)

def accept_conn(sock: socket.socket) -> None:
    conn, addr = sock.accept()
    conn.setblocking(False)
    data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
    sel.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data)

可运行演示(补齐 Mock 数据与 print 反馈)

import selectors
import socket
import types

sel = selectors.DefaultSelector()
chat_log: list[str] = []

# Step 2:accept_conn 负责把新来的客户端登记进 selector。
def accept_conn(sock: socket.socket) -> None:
    conn, addr = sock.accept()
    conn.setblocking(False)
    data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
    sel.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data)

print("accept_conn 已定义")
print("它做三件事: accept连接 -> 设为非阻塞 -> register读写事件")
print("演示环境不绑定真实端口,避免浏览器沙箱权限问题。")
sel.close()

Step 3:用 service_conn 处理读写事件,完成一次回显

痛点与机制

mask 是事件标签:有 EVENT_READ 就读数据,有 EVENT_WRITE 就写数据。这里用 socketpair() 造一对本地 socket,不占端口也能演示“客户端发消息 -> 服务端读 -> 写回显”。注释里特意分成读事件和写事件,方便新手看到状态怎么流转。

核心源码(逐字来自文末完整源码)

def service_conn(key: selectors.SelectorKey, mask: int) -> None:
    sock: socket.socket = key.fileobj  # type: ignore
    data = key.data
    if mask & selectors.EVENT_READ:
        recv_data = sock.recv(256)
        if recv_data:
            msg = recv_data.decode()
            chat_log.append(f"[{data.addr[1]}] {msg.strip()}")
            data.outb += f"ECHO: {msg}".encode()
        else:
            sel.unregister(sock)
            sock.close()
    if mask & selectors.EVENT_WRITE:
        if data.outb:
            sent = sock.send(data.outb)
            data.outb = data.outb[sent:]

可运行演示(补齐 Mock 数据与 print 反馈)

import selectors
import socket
import types

sel = selectors.DefaultSelector()
chat_log: list[str] = []

# Step 3:service_conn 是真正干活的服务员,读到消息后准备回显。
def service_conn(key: selectors.SelectorKey, mask: int) -> None:
    sock: socket.socket = key.fileobj  # type: ignore
    data = key.data
    if mask & selectors.EVENT_READ:
        recv_data = sock.recv(256)
        if recv_data:
            msg = recv_data.decode()
            chat_log.append(f"[{data.addr[1]}] {msg.strip()}")
            data.outb += f"ECHO: {msg}".encode()
        else:
            sel.unregister(sock)
            sock.close()
    if mask & selectors.EVENT_WRITE:
        if data.outb:
            sent = sock.send(data.outb)
            data.outb = data.outb[sent:]

server_sock, client_sock = socket.socketpair()
try:
    server_sock.setblocking(False)
    client_sock.sendall(b"hello selector\n")
    data = types.SimpleNamespace(addr=("local", 12345), inb=b"", outb=b"")
    key = selectors.SelectorKey(fileobj=server_sock, fd=server_sock.fileno(), events=0, data=data)
    service_conn(key, selectors.EVENT_READ)
    print("聊天日志:", chat_log)
    print("待发送缓冲区:", data.outb.decode().strip())
    service_conn(key, selectors.EVENT_WRITE)
    print("客户端收到:", client_sock.recv(256).decode().strip())
finally:
    server_sock.close(); client_sock.close(); sel.close()

Step 4:用 run_server 看懂单线程事件循环的骨架

痛点与机制

run_server() 的关键不是代码长,而是事件循环:sel.select() 等待“谁就绪了”,如果是监听 socket 就接入新连接,如果是客户端 socket 就处理读写。一个线程能管理多个连接,靠的就是这个叫号机制。

核心源码(逐字来自文末完整源码)

def run_server(host: str, port: int, stop_event: threading.Event) -> None:
    lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    lsock.bind((host, port))
    lsock.listen()
    lsock.setblocking(False)
    sel.register(lsock, selectors.EVENT_READ, data=None)
    while not stop_event.is_set():
        events = sel.select(timeout=0.1)
        for key, mask in events:
            if key.data is None:
                accept_conn(key.fileobj)  # type: ignore
            else:
                service_conn(key, mask)
    lsock.close()

可运行演示(补齐 Mock 数据与 print 反馈)

import selectors
import socket
import threading
import types

sel = selectors.DefaultSelector()
chat_log: list[str] = []

def accept_conn(sock: socket.socket) -> None:
    conn, addr = sock.accept()
    conn.setblocking(False)
    data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
    sel.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data)

def service_conn(key: selectors.SelectorKey, mask: int) -> None:
    sock: socket.socket = key.fileobj  # type: ignore
    data = key.data
    if mask & selectors.EVENT_READ:
        recv_data = sock.recv(256)
        if recv_data:
            msg = recv_data.decode()
            chat_log.append(f"[{data.addr[1]}] {msg.strip()}")
            data.outb += f"ECHO: {msg}".encode()
        else:
            sel.unregister(sock)
            sock.close()
    if mask & selectors.EVENT_WRITE:
        if data.outb:
            sent = sock.send(data.outb)
            data.outb = data.outb[sent:]

# Step 4:run_server 是事件循环,select 一次拿一批就绪事件处理。
def run_server(host: str, port: int, stop_event: threading.Event) -> None:
    lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    lsock.bind((host, port))
    lsock.listen()
    lsock.setblocking(False)
    sel.register(lsock, selectors.EVENT_READ, data=None)
    while not stop_event.is_set():
        events = sel.select(timeout=0.1)
        for key, mask in events:
            if key.data is None:
                accept_conn(key.fileobj)  # type: ignore
            else:
                service_conn(key, mask)
    lsock.close()

print("run_server 已定义")
print("核心循环: sel.select(timeout=0.1) -> accept_conn 或 service_conn")
print("这里不直接启动监听,避免端口权限影响页面运行。")
sel.close()

Step 5:用 run_client 看懂客户端消息发送流程

痛点与机制

客户端流程比服务端简单:先 connect(),再 sendall(),然后 recv() 读回显。这里不直接连接端口,是为了让页面在受限环境也稳定运行;真正网络闭环放在完整脚本和 demo_chat() 里。

核心源码(逐字来自文末完整源码)

def run_client(host: str, port: int, client_id: int, messages: list[str]) -> None:
    time.sleep(0.2)  # 等服务端就绪
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((host, port))
        for msg in messages:
            s.sendall(f"Client-{client_id}: {msg}\n".encode())
            time.sleep(0.05)
            s.recv(512)  # 读回显

可运行演示(补齐 Mock 数据与 print 反馈)

import socket
import time

# Step 5:run_client 是真实客户端流程:连接、发送、读取回显。
def run_client(host: str, port: int, client_id: int, messages: list[str]) -> None:
    time.sleep(0.2)  # 等服务端就绪
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((host, port))
        for msg in messages:
            s.sendall(f"Client-{client_id}: {msg}\n".encode())
            time.sleep(0.05)
            s.recv(512)  # 读回显

messages = ["msg0", "msg1", "msg2"]
for i, msg in enumerate(messages, 1):
    print(f"{i}. 准备发送: Client-1: {msg} -> 等待服务端 ECHO")
print("真实联网流程由 demo_chat() 统一启动;本步骤先读懂发送顺序。")

Step 6:用 demo_compare 对比阻塞等待和多路复用批处理

痛点与机制

阻塞 IO 像一个服务员守着一桌客人等上菜,等完才去下一桌;多路复用像叫号器,哪桌好了就处理哪桌。demo_compare() 用 sleep 模拟等待差异,把“串行等待”和“批量处理就绪事件”的耗时打印出来。

核心源码(逐字来自文末完整源码)

def demo_compare() -> None:
    """模拟阻塞IO vs 多路复用的响应时间差异"""

    def blocking_handler(n: int) -> float:
        t0 = time.perf_counter()
        for _ in range(n):
            time.sleep(0.01)  # 模拟阻塞等待
        return time.perf_counter() - t0

    def mux_handler(n: int) -> float:
        """用selectors事件循环模拟:就绪才处理,不等待"""
        t0 = time.perf_counter()
        ready_queue: list[int] = list(range(n))
        processed = 0
        while ready_queue:
            # 模拟epoll_wait:批量取就绪事件
            batch = ready_queue[:min(10, len(ready_queue))]
            ready_queue = ready_queue[len(batch):]
            for _ in batch:
                time.sleep(0.001)  # 仅处理时间,无等待
            processed += len(batch)
        return time.perf_counter() - t0

    N = 20
    t_block = blocking_handler(N)
    t_mux = mux_handler(N)

    print("\n=== IO模型耗时对比 ===")
    print(f"{'模型':<16} {'连接数':<8} {'耗时(s)':<10} {'说明'}")
    print("-" * 55)
    print(f"{'阻塞IO':<16} {N:<8} {t_block:<10.3f} 串行等待每个IO完成")
    print(f"{'IO多路复用':<14} {N:<8} {t_mux:<10.3f} 批量处理就绪事件")
    print(f"\n加速比: {t_block / t_mux:.1f}x")
    print(f"\n当前平台选择器: {type(sel).__name__}")

可运行演示(补齐 Mock 数据与 print 反馈)

import selectors
import time

sel = selectors.DefaultSelector()

# Step 6:这个函数用时间模拟说明:多路复用只处理“已就绪”的事件。
def demo_compare() -> None:
    """模拟阻塞IO vs 多路复用的响应时间差异"""

    def blocking_handler(n: int) -> float:
        t0 = time.perf_counter()
        for _ in range(n):
            time.sleep(0.01)  # 模拟阻塞等待
        return time.perf_counter() - t0

    def mux_handler(n: int) -> float:
        """用selectors事件循环模拟:就绪才处理,不等待"""
        t0 = time.perf_counter()
        ready_queue: list[int] = list(range(n))
        processed = 0
        while ready_queue:
            # 模拟epoll_wait:批量取就绪事件
            batch = ready_queue[:min(10, len(ready_queue))]
            ready_queue = ready_queue[len(batch):]
            for _ in batch:
                time.sleep(0.001)  # 仅处理时间,无等待
            processed += len(batch)
        return time.perf_counter() - t0

    N = 20
    t_block = blocking_handler(N)
    t_mux = mux_handler(N)

    print("\n=== IO模型耗时对比 ===")
    print(f"{'模型':<16} {'连接数':<8} {'耗时(s)':<10} {'说明'}")
    print("-" * 55)
    print(f"{'阻塞IO':<16} {N:<8} {t_block:<10.3f} 串行等待每个IO完成")
    print(f"{'IO多路复用':<14} {N:<8} {t_mux:<10.3f} 批量处理就绪事件")
    print(f"\n加速比: {t_block / t_mux:.1f}x")
    print(f"\n当前平台选择器: {type(sel).__name__}")

demo_compare()
sel.close()

Step 7:用 print_theory 速记 select/poll/epoll/kqueue 差异

痛点与机制

selectpollepollkqueue 都是在解决“我怎么知道哪个连接有数据”的问题。Python 的 selectors 把平台差异包起来,让我们写一套代码,在不同系统上自动选合适后端。

核心源码(逐字来自文末完整源码)

def print_theory() -> None:
    print("""
=== IO多路复用核心概念速查 ===

┌──────────┬────────────┬────────────┬────────────┐
│ 系统调用 │ 最大fd数   │ 时间复杂度 │ 平台       │
├──────────┼────────────┼────────────┼────────────┤
│ select   │ 1024       │ O(n)       │ 全平台     │
│ poll     │ 无限制     │ O(n)       │ Linux/Mac  │
│ epoll    │ 无限制     │ O(1)       │ Linux only │
│ kqueue   │ 无限制     │ O(1)       │ macOS/BSD  │
└──────────┴────────────┴────────────┴────────────┘

Python selectors.DefaultSelector() 自动选最优实现。
""")

可运行演示(补齐 Mock 数据与 print 反馈)

# Step 7:理论速查表帮你把几个系统调用的差别一次看清。
def print_theory() -> None:
    print("""
=== IO多路复用核心概念速查 ===

┌──────────┬────────────┬────────────┬────────────┐
│ 系统调用 │ 最大fd数   │ 时间复杂度 │ 平台       │
├──────────┼────────────┼────────────┼────────────┤
│ select   │ 1024       │ O(n)       │ 全平台     │
│ poll     │ 无限制     │ O(n)       │ Linux/Mac  │
│ epoll    │ 无限制     │ O(1)       │ Linux only │
│ kqueue   │ 无限制     │ O(1)       │ macOS/BSD  │
└──────────┴────────────┴────────────┴────────────┘

Python selectors.DefaultSelector() 自动选最优实现。
""")

print_theory()

Step 8:用 main 做 chat/compare/theory 三种模式总入口

痛点与机制

main() 把教学入口收束到 --mode 参数:theory 看表,compare 看耗时对比,chat 跑真实多客户端聊天。由于网页运行环境可能限制端口,这里的演示只跑不会绑定端口的模式。

核心源码(逐字来自文末完整源码)

def main() -> None:
    parser = argparse.ArgumentParser(description="IO多路复用演示")
    parser.add_argument(
        "--mode",
        choices=["chat", "compare", "theory"],
        default="theory",
        help="chat=聊天服务器演示, compare=耗时对比, theory=原理速查",
    )
    args = parser.parse_args()

    if args.mode == "chat":
        demo_chat()
    elif args.mode == "compare":
        demo_compare()
    else:
        print_theory()

可运行演示(补齐 Mock 数据与 print 反馈)

import argparse
import selectors
import time

sel = selectors.DefaultSelector()

def print_theory() -> None:
    print("""
=== IO多路复用核心概念速查 ===

┌──────────┬────────────┬────────────┬────────────┐
│ 系统调用 │ 最大fd数   │ 时间复杂度 │ 平台       │
├──────────┼────────────┼────────────┼────────────┤
│ select   │ 1024       │ O(n)       │ 全平台     │
│ poll     │ 无限制     │ O(n)       │ Linux/Mac  │
│ epoll    │ 无限制     │ O(1)       │ Linux only │
│ kqueue   │ 无限制     │ O(1)       │ macOS/BSD  │
└──────────┴────────────┴────────────┴────────────┘

Python selectors.DefaultSelector() 自动选最优实现。
""")

def demo_compare() -> None:
    """模拟阻塞IO vs 多路复用的响应时间差异"""

    def blocking_handler(n: int) -> float:
        t0 = time.perf_counter()
        for _ in range(n):
            time.sleep(0.01)  # 模拟阻塞等待
        return time.perf_counter() - t0

    def mux_handler(n: int) -> float:
        """用selectors事件循环模拟:就绪才处理,不等待"""
        t0 = time.perf_counter()
        ready_queue: list[int] = list(range(n))
        processed = 0
        while ready_queue:
            # 模拟epoll_wait:批量取就绪事件
            batch = ready_queue[:min(10, len(ready_queue))]
            ready_queue = ready_queue[len(batch):]
            for _ in batch:
                time.sleep(0.001)  # 仅处理时间,无等待
            processed += len(batch)
        return time.perf_counter() - t0

    N = 20
    t_block = blocking_handler(N)
    t_mux = mux_handler(N)

    print("\n=== IO模型耗时对比 ===")
    print(f"{'模型':<16} {'连接数':<8} {'耗时(s)':<10} {'说明'}")
    print("-" * 55)
    print(f"{'阻塞IO':<16} {N:<8} {t_block:<10.3f} 串行等待每个IO完成")
    print(f"{'IO多路复用':<14} {N:<8} {t_mux:<10.3f} 批量处理就绪事件")
    print(f"\n加速比: {t_block / t_mux:.1f}x")
    print(f"\n当前平台选择器: {type(sel).__name__}")

# Step 8:main 像遥控器,--mode 决定看聊天、对比还是理论表。
def main() -> None:
    parser = argparse.ArgumentParser(description="IO多路复用演示")
    parser.add_argument(
        "--mode",
        choices=["chat", "compare", "theory"],
        default="theory",
        help="chat=聊天服务器演示, compare=耗时对比, theory=原理速查",
    )
    args = parser.parse_args()

    if args.mode == "chat":
        demo_chat()
    elif args.mode == "compare":
        demo_compare()
    else:
        print_theory()

import sys
for mode in ["theory", "compare"]:
    print(f"\n>>> mode={mode}")
    sys.argv = ["prog", "--mode", mode]
    main()
print("提示: chat 模式会真实绑定本机端口,适合在本地终端单独运行。")
sel.close()

极客实战:完整源码与运行

现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。

# 文件名: 13-python-iomux.py




# 运行: python3 13-python-iomux.py --mode chat
#       python3 13-python-iomux.py --mode compare

import argparse
import selectors
import socket
import threading
import time
import types
from typing import Any

sel = selectors.DefaultSelector()
chat_log: list[str] = []


# ── 服务端逻辑 ──────────────────────────────────────────────
def accept_conn(sock: socket.socket) -> None:
    conn, addr = sock.accept()
    conn.setblocking(False)
    data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
    sel.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data)


def service_conn(key: selectors.SelectorKey, mask: int) -> None:
    sock: socket.socket = key.fileobj  # type: ignore
    data = key.data
    if mask & selectors.EVENT_READ:
        recv_data = sock.recv(256)
        if recv_data:
            msg = recv_data.decode()
            chat_log.append(f"[{data.addr[1]}] {msg.strip()}")
            data.outb += f"ECHO: {msg}".encode()
        else:
            sel.unregister(sock)
            sock.close()
    if mask & selectors.EVENT_WRITE:
        if data.outb:
            sent = sock.send(data.outb)
            data.outb = data.outb[sent:]


def run_server(host: str, port: int, stop_event: threading.Event) -> None:
    lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    lsock.bind((host, port))
    lsock.listen()
    lsock.setblocking(False)
    sel.register(lsock, selectors.EVENT_READ, data=None)
    while not stop_event.is_set():
        events = sel.select(timeout=0.1)
        for key, mask in events:
            if key.data is None:
                accept_conn(key.fileobj)  # type: ignore
            else:
                service_conn(key, mask)
    lsock.close()


# ── 模拟客户端 ──────────────────────────────────────────────
def run_client(host: str, port: int, client_id: int, messages: list[str]) -> None:
    time.sleep(0.2)  # 等服务端就绪
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((host, port))
        for msg in messages:
            s.sendall(f"Client-{client_id}: {msg}\n".encode())
            time.sleep(0.05)
            s.recv(512)  # 读回显


def demo_chat() -> None:
    HOST, PORT = "127.0.0.1", 19876
    stop = threading.Event()

    server_thread = threading.Thread(target=run_server, args=(HOST, PORT, stop), daemon=True)
    server_thread.start()

    clients = [
        threading.Thread(target=run_client, args=(HOST, PORT, i, [f"msg{j}" for j in range(3)]))
        for i in range(1, 4)
    ]
    for c in clients:
        c.start()
    for c in clients:
        c.join()

    time.sleep(0.3)
    stop.set()
    server_thread.join(timeout=2)

    print("\n=== selectors 多客户端聊天演示 ===")
    print(f"{'序号':<4} {'消息'}")
    print("-" * 40)
    for i, line in enumerate(chat_log, 1):
        print(f"{i:<4} {line}")
    print(f"\n共处理 {len(chat_log)} 条消息,单线程服务端,零阻塞")


# ── IO模型耗时对比 ──────────────────────────────────────────
def demo_compare() -> None:
    """模拟阻塞IO vs 多路复用的响应时间差异"""

    def blocking_handler(n: int) -> float:
        t0 = time.perf_counter()
        for _ in range(n):
            time.sleep(0.01)  # 模拟阻塞等待
        return time.perf_counter() - t0

    def mux_handler(n: int) -> float:
        """用selectors事件循环模拟:就绪才处理,不等待"""
        t0 = time.perf_counter()
        ready_queue: list[int] = list(range(n))
        processed = 0
        while ready_queue:
            # 模拟epoll_wait:批量取就绪事件
            batch = ready_queue[:min(10, len(ready_queue))]
            ready_queue = ready_queue[len(batch):]
            for _ in batch:
                time.sleep(0.001)  # 仅处理时间,无等待
            processed += len(batch)
        return time.perf_counter() - t0

    N = 20
    t_block = blocking_handler(N)
    t_mux = mux_handler(N)

    print("\n=== IO模型耗时对比 ===")
    print(f"{'模型':<16} {'连接数':<8} {'耗时(s)':<10} {'说明'}")
    print("-" * 55)
    print(f"{'阻塞IO':<16} {N:<8} {t_block:<10.3f} 串行等待每个IO完成")
    print(f"{'IO多路复用':<14} {N:<8} {t_mux:<10.3f} 批量处理就绪事件")
    print(f"\n加速比: {t_block / t_mux:.1f}x")
    print(f"\n当前平台选择器: {type(sel).__name__}")


def print_theory() -> None:
    print("""
=== IO多路复用核心概念速查 ===

┌──────────┬────────────┬────────────┬────────────┐
│ 系统调用 │ 最大fd数   │ 时间复杂度 │ 平台       │
├──────────┼────────────┼────────────┼────────────┤
│ select   │ 1024       │ O(n)       │ 全平台     │
│ poll     │ 无限制     │ O(n)       │ Linux/Mac  │
│ epoll    │ 无限制     │ O(1)       │ Linux only │
│ kqueue   │ 无限制     │ O(1)       │ macOS/BSD  │
└──────────┴────────────┴────────────┴────────────┘

Python selectors.DefaultSelector() 自动选最优实现。
""")


def main() -> None:
    parser = argparse.ArgumentParser(description="IO多路复用演示")
    parser.add_argument(
        "--mode",
        choices=["chat", "compare", "theory"],
        default="theory",
        help="chat=聊天服务器演示, compare=耗时对比, theory=原理速查",
    )
    args = parser.parse_args()

    if args.mode == "chat":
        demo_chat()
    elif args.mode == "compare":
        demo_compare()
    else:
        print_theory()


if __name__ == "__main__":
    main()

运行后你会看到类似输出:

$ python3 13-python-iomux.py
=== IO多路复用核心概念速查 ===

┌──────────┬────────────┬────────────┬────────────┐
│ 系统调用 │ 最大fd数   │ 时间复杂度 │ 平台       │
├──────────┼────────────┼────────────┼────────────┤
select1024       │ O(n)       │ 全平台     │
│ poll     │ 无限制     │ O(n)       │ Linux/Mac  │
│ epoll    │ 无限制     │ O(1)       │ Linux only │
│ kqueue   │ 无限制     │ O(1)       │ macOS/BSD  │
└──────────┴────────────┴────────────┴────────────┘

Python selectors.DefaultSelector() 自动选最优实现。

6. 运行示例

python3 13-python-iomux.py --mode theory   # 原理速查表
python3 13-python-iomux.py --mode chat     # 多客户端聊天演示
python3 13-python-iomux.py --mode compare  # 阻塞vs多路复用耗时对比

输出示例(chat 模式):

=== selectors 多客户端聊天演示 ===
序号 消息
----------------------------------------
1    [xxxxx] Client-1: msg0
2    [xxxxx] Client-2: msg0
3    [xxxxx] Client-3: msg0
...
共处理 9 条消息,单线程服务端,零阻塞

7. 关键要点

  • selectors.DefaultSelector() 跨平台,生产代码首选
  • 注册 fd 时同时监听 EVENT_READ | EVENT_WRITE,写缓冲为空时应取消写监听(避免空转)
  • epoll ET 模式需循环读到 EAGAIN,否则会漏事件
  • select 的 1024 fd 上限是编译期常量,高并发场景必须换 epoll/kqueue

⏱ NexDo Time(5 分钟)

  1. 改造写监听:修改 service_conn,当 data.outb 为空时用 sel.modify() 取消 EVENT_WRITE 监听,观察 CPU 占用变化。
  2. 统计连接数:在服务端维护一个 active_connections: int 计数器,连接建立 +1,断开 -1,每秒打印一次。
  3. 压力测试:把客户端数量从 3 改到 50,观察单线程服务端是否仍能正常处理所有消息。

Don’t wait for next time, do it in the next moment.