
11 · Socket Programming: Low-Level TCP/UDP Communication

#011 · 2026-04-16 · Python

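🔗 Knowledge map: before reading this article, it helps to review the core concepts in "10 · Core Data Structures: Implementing Stacks, Queues, and Trees", which this article builds on. In the previous article we used data structures to build a scheduling skeleton; this one puts the data on the network. Understanding sockets is an essential step toward building distributed AI inference services.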

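Geek's take: concurrency is not about making code more complicated; it is about being explicit about waiting, scheduling, and resource isolation. This article checks each mechanism with mock scenarios that run entirely on your own machine.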

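Pain point and architecture: an API or a concept learned in isolation is easy to forget. This article first pins down a real pain point, then breaks it into a three-stage loop: "input data → core mechanism → runnable output". It goes from how the TCP three-way handshake works to complete server and client implementations, covers UDP broadcast and length-prefix framing with struct (the "sticky packet" problem), and demonstrates a concurrent echo server built with threading, all in a single file.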

1. The TCP Three-Way Handshake (ASCII diagram)

Client                          Server
  │                               │
  │──── SYN (seq=x) ────────────▶│  1st: I want to connect
  │                               │
  │◀─── SYN-ACK (seq=y,ack=x+1) ─│  2nd: OK, I'm ready too
  │                               │
  │──── ACK (ack=y+1) ──────────▶│  3rd: Got it, let's start
  │                               │
  │════════ Data transfer ═════════│
  │                               │
  │──── FIN ────────────────────▶│  Four-way teardown (close)
  │◀─── ACK ─────────────────────│
  │◀─── FIN ─────────────────────│
  │──── ACK ────────────────────▶│

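The three-way handshake confirms that both sides can send and receive, and it lets each side learn the other's initial sequence number (x and y in the diagram). That synchronization is the foundation of TCP's reliable, ordered delivery.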
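The handshake is performed by the operating system, not by your Python code. The sketch below (the function name handshake_completes_before_accept is hypothetical) lets a client connect and even send data before the server has called accept(). On mainstream OSes, connect() returns once SYN, SYN-ACK, and ACK are done, and the finished connection waits in the listen backlog; accept() only dequeues it. It binds port 0 on the loopback interface so the OS picks a free port.

```python
import socket


def handshake_completes_before_accept() -> tuple[bool, bytes]:
    """Hypothetical demo: connect() succeeds before the server ever calls accept()."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        listener.bind(("127.0.0.1", 0))   # port 0: let the OS pick a free port
        listener.listen(1)                 # from now on the kernel answers SYNs by itself
        port = listener.getsockname()[1]

        # create_connection() returns only after SYN / SYN-ACK / ACK have completed
        with socket.create_connection(("127.0.0.1", port), timeout=2) as client:
            client.sendall(b"queued")      # buffered server-side; nobody has accepted yet
            conn, addr = listener.accept() # just takes the finished connection off the backlog
            with conn:
                conn.settimeout(2)
                data = b""
                while len(data) < 6:       # recv() may return fewer bytes than requested
                    chunk = conn.recv(6 - len(data))
                    if not chunk:
                        break
                    data += chunk
                same_peer = addr == client.getsockname()
        return same_peer, data
    finally:
        listener.close()


print(handshake_completes_before_accept())  # (True, b'queued')
```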

2. A Basic TCP Server

import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # allow a quick restart on the same port
print(f"Socket type: {server.type}")
print(f"Address family: {server.family}")
# server.bind(("127.0.0.1", 9000))  # uncomment for real use
# server.listen(5)                  # start listening; backlog of pending connections
# conn, addr = server.accept()      # blocks until a client connects
server.close()
print("Socket demo finished")

3. Message Boundaries ("Sticky Packets") and a struct-Based Fix

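TCP is a byte-stream protocol with no message boundaries. Two messages sent back to back may be merged and returned by a single recv() call (and, conversely, one message may be split across several recv() calls):

Sender:    [message A = 5 bytes][message B = 8 bytes]
Receiver:  recv(1024) → [message A + message B = 13 bytes]  ← sticky packet!

Solution: a 4-byte length header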

import socket
import struct

def send_msg(sock: socket.socket, data: bytes) -> None:
    """Send a 4-byte length header first, then the payload."""
    header = struct.pack(">I", len(data))   # big-endian uint32
    sock.sendall(header + data)

def recv_msg(sock: socket.socket) -> bytes:
    """Read the 4-byte length header first, then read exactly that many payload bytes."""
    raw_len = _recv_exact(sock, 4)
    msg_len = struct.unpack(">I", raw_len)[0]
    return _recv_exact(sock, msg_len)

def _recv_exact(sock: socket.socket, n: int) -> bytes:
    """Loop until exactly n bytes have arrived; a single recv() may return fewer."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("connection closed")
        buf += chunk
    return buf

print("Framing helpers defined")
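send_msg() and recv_msg() above read from a blocking socket. The same length-prefix format can also be parsed incrementally, which is what you would need with non-blocking sockets or callback-style I/O. The FrameDecoder class below is a hypothetical sketch, not part of the article's code. It feeds in the 5-byte and 8-byte messages from the diagram, split at arbitrary points:

```python
import struct


class FrameDecoder:
    """Hypothetical incremental decoder for the 4-byte big-endian length-prefix format."""

    HEADER = struct.Struct(">I")  # same layout as struct.pack(">I", len(data))

    def __init__(self) -> None:
        self._buf = bytearray()

    def feed(self, chunk: bytes) -> list[bytes]:
        """Append one raw recv() chunk and return every message that is now complete."""
        self._buf += chunk
        messages: list[bytes] = []
        while len(self._buf) >= self.HEADER.size:
            (length,) = self.HEADER.unpack_from(self._buf, 0)
            end = self.HEADER.size + length
            if len(self._buf) < end:
                break                      # payload still incomplete: wait for more bytes
            messages.append(bytes(self._buf[self.HEADER.size:end]))
            del self._buf[:end]            # drop the consumed frame
        return messages


def frame(data: bytes) -> bytes:
    return FrameDecoder.HEADER.pack(len(data)) + data


print(FrameDecoder.HEADER.pack(5))              # b'\x00\x00\x00\x05'
stream = frame(b"hello") + frame(b"world!!!")   # 9 + 12 = 21 bytes glued together
decoder = FrameDecoder()
print(decoder.feed(stream[:3]))    # [] (not even the header is complete)
print(decoder.feed(stream[3:12]))  # [b'hello']
print(decoder.feed(stream[12:]))   # [b'world!!!']
```

However the byte stream is cut, the decoder emits each message exactly once, and only when it is complete.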

4. UDP Broadcast

import socket
# UDP broadcast demo (the sandbox has no network, so this only shows the API)
sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sender.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)  # broadcasting must be enabled explicitly
print("UDP broadcast socket created")
print(f"Socket type: SOCK_DGRAM = {socket.SOCK_DGRAM}")
# sender.sendto(b"hello", ("255.255.255.255", 9001))  # requires a network
sender.close()

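UDP is connectionless and guarantees neither delivery nor ordering, but each datagram keeps its own boundaries. That makes it a good fit for log broadcasting, service discovery, and real-time audio/video.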
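Compare this with the TCP diagram in section 3: UDP never merges datagrams. The sketch below (the function name udp_keeps_boundaries is hypothetical) sends the same 5-byte and 8-byte messages over the loopback interface. Each recvfrom() returns exactly one datagram even though the buffer is 1024 bytes, so on a quiet loopback you should see two separate results. Over a real network a datagram could be lost or reordered, which is why the receiver sets a timeout.

```python
import socket


def udp_keeps_boundaries() -> list[bytes]:
    """Send two datagrams over loopback and read them back one recvfrom() at a time."""
    receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        receiver.bind(("127.0.0.1", 0))      # port 0: OS picks a free port
        receiver.settimeout(2)               # UDP may drop packets; never wait forever
        target = receiver.getsockname()
        sender.sendto(b"hello", target)      # datagram 1: 5 bytes
        sender.sendto(b"world!!!", target)   # datagram 2: 8 bytes
        # each recvfrom() returns exactly one datagram, even with a 1024-byte buffer
        return [receiver.recvfrom(1024)[0] for _ in range(2)]
    finally:
        sender.close()
        receiver.close()


print(udp_keeps_boundaries())
```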

5. A Simple HTTP Server from Scratch

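import socket

def handle_http(conn: socket.socket) -> None:
    try:
        # One recv() is enough for a tiny demo request; real servers loop until "\r\n\r\n"
        raw = conn.recv(4096).decode("utf-8", errors="replace")
        first_line = raw.split("\r\n")[0]   # e.g. "GET / HTTP/1.1"
        parts = first_line.split(" ")
        path = parts[1] if len(parts) >= 2 else "/"   # tolerate empty or malformed request lines
        body = f"<h1>Hello from Python HTTP</h1><p>PATH: {path}</p>"
        response = (
            "HTTP/1.1 200 OK\r\n"
            "Content-Type: text/html; charset=utf-8\r\n"
            f"Content-Length: {len(body.encode())}\r\n"   # length in bytes, not characters
            "Connection: close\r\n\r\n"
            + body
        )
        conn.sendall(response.encode())
    finally:
        conn.close()   # release the connection even if parsing fails


print("HTTP handler defined")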
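Note that the handler writes len(body.encode()) rather than len(body): Content-Length is measured in bytes. A tiny sketch (the helper name content_length is hypothetical) shows the difference for non-ASCII text. If the header said 9 here, a client reading exactly Content-Length bytes would cut the body short.

```python
def content_length(body: str, encoding: str = "utf-8") -> int:
    """Content-Length counts encoded bytes, not Python str characters."""
    return len(body.encode(encoding))


body = "<p>你好</p>"
print(len(body))             # 9 characters
print(content_length(body))  # 13 bytes: each of the two CJK characters is 3 bytes in UTF-8
```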

Hands-on: A Concurrent TCP Echo Server (single-file demo)

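The server and client live in the same file. With --mode demo, threading starts a server thread inside the same process and the client then sends several messages to it, so you never need to open two terminals.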

Step by Step: Breaking Down the Core Logic

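Think of a socket as a "phone line between programs": the server claims a number and waits for calls, the client dials in, and then both sides exchange bytes. We will take it apart in this order: "address and port -> message framing -> server-side handling -> client sending -> fallback protection -> HTTP text protocol -> CLI controller".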

Step 1: Fix the local address and port first, so the client knows where to connect

Pain point and mechanism

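Socket communication always starts by stating where to connect. 127.0.0.1 is the loopback address: traffic never leaves the local machine, which suits teaching and testing. A port number works like a room number: different services on the same machine are told apart by their ports.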
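The article hard-codes a high port (19876) to make clashes unlikely. Another common option is to let the OS choose: binding to port 0 assigns any free port, and getsockname() reports which one. The helper pick_free_port below is a hypothetical sketch. Because it closes its socket before returning, the port is only "probably free" afterwards; a server that binds port 0 itself and then reports the port avoids that race.

```python
import socket


def pick_free_port(host: str = "127.0.0.1") -> int:
    """Hypothetical helper: let the OS choose an unused port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, 0))           # port 0 means "any free port"
        return s.getsockname()[1]   # the port the OS actually assigned


port = pick_free_port()
print(f"OS-assigned port: 127.0.0.1:{port}")
```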

Core source (taken from the full listing at the end)

HOST = "127.0.0.1"
PORT = 19876   # high port number to avoid conflicts
HTTP_PORT = 19877

Runnable demo (with mock data and print feedback added)

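# Step 1: HOST is like a street address and PORT like a room number; the client must use this exact address to find the server.
HOST = "127.0.0.1"
PORT = 19876   # high port number to avoid conflicts
HTTP_PORT = 19877

print("Server address:", HOST)
print("TCP port:", PORT)
print("HTTP port:", HTTP_PORT)
print(f"Full TCP endpoint: {HOST}:{PORT}")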

Step 2: Use a 4-byte length header to make up for TCP's lack of message boundaries

Pain point and mechanism

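TCP has no idea where "this message" ends; it is just a pipe of bytes. send_msg() writes a 4-byte length first and then the body; recv_msg() reads the length first and then reads exactly that many body bytes. _recv_exact() loops because a single recv() may return fewer bytes than requested. This way, several messages sent back to back can never run together and be misread.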
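Why _recv_exact() needs its loop is easiest to see with a socket that delivers data in tiny pieces. ChoppySocket below is a hypothetical stand-in whose recv() never returns more than 3 bytes. A single recv() then yields only part of the header, while the length-prefix helpers still reassemble both messages correctly:

```python
import struct


class ChoppySocket:
    """Hypothetical fake socket whose recv() hands out at most 3 bytes per call,
    imitating how TCP may deliver one message in arbitrary pieces."""

    def __init__(self, data: bytes) -> None:
        self._data = data

    def recv(self, n: int) -> bytes:
        k = min(n, 3)
        chunk, self._data = self._data[:k], self._data[k:]
        return chunk  # b"" once drained, just like a peer that closed the connection


def _recv_exact(sock, n: int) -> bytes:  # same loop as the article's helper
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("connection closed")
        buf += chunk
    return buf


def recv_msg(sock) -> bytes:
    length = struct.unpack(">I", _recv_exact(sock, 4))[0]
    return _recv_exact(sock, length)


wire = b"".join(struct.pack(">I", len(m)) + m for m in (b"first", b"second message"))
print(ChoppySocket(wire).recv(1024))   # b'\x00\x00\x00': one recv() is not a message
sock = ChoppySocket(wire)
print(recv_msg(sock), recv_msg(sock))  # b'first' b'second message'
```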

Core source (taken from the full listing at the end)

def _recv_exact(sock: socket.socket, n: int) -> bytes:
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("connection closed")
        buf += chunk
    return buf

def send_msg(sock: socket.socket, data: bytes) -> None:
    sock.sendall(struct.pack(">I", len(data)) + data)

def recv_msg(sock: socket.socket) -> bytes:
    length = struct.unpack(">I", _recv_exact(sock, 4))[0]
    return _recv_exact(sock, length)

Runnable demo (with mock data and print feedback added)

import socket
import struct

# Step 2: TCP is like a water pipe that only guarantees bytes arrive in order; the length header is the shipping label on each parcel.
def _recv_exact(sock: socket.socket, n: int) -> bytes:
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("connection closed")
        buf += chunk
    return buf

def send_msg(sock: socket.socket, data: bytes) -> None:
    sock.sendall(struct.pack(">I", len(data)) + data)

def recv_msg(sock: socket.socket) -> bytes:
    length = struct.unpack(">I", _recv_exact(sock, 4))[0]
    return _recv_exact(sock, length)

left, right = socket.socketpair()
try:
    send_msg(left, "first message".encode("utf-8"))
    send_msg(left, "a second, longer message".encode("utf-8"))
    print("Received 1:", recv_msg(right).decode("utf-8"))
    print("Received 2:", recv_msg(right).decode("utf-8"))
finally:
    left.close(); right.close()

Step 3: Echo every message on a connection with handle_client

Pain point and mechanism

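The part of the server that does the real work is not listen() but handle_client(), which runs once a connection has been accepted. Here socket.socketpair() creates a pair of already-connected sockets inside one process, so we can demonstrate "send -> echo -> count" without opening two terminals.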
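handle_client() stops when recv_msg() raises ConnectionError. That happens because recv() returns b"" once the peer has finished sending, and _recv_exact() turns that empty read into an exception. The sketch below (the function name half_close_round_trip is hypothetical) shows this end-of-stream signal on a socketpair. It uses shutdown(SHUT_WR), a half-close that keeps the reply direction open. The article's client closes the whole socket instead, and the server's reads see the same b"".

```python
import socket


def half_close_round_trip() -> tuple[bytes, bytes]:
    """Hypothetical demo of end-of-stream on a socketpair, using a half-close."""
    left, right = socket.socketpair()   # two already-connected stream sockets
    try:
        left.sendall(b"last words")
        left.shutdown(socket.SHUT_WR)   # half-close: "I will send nothing more"
        received = b""
        while chunk := right.recv(1024):  # b"" = end of stream, which ends the loop
            received += chunk
        right.sendall(b"ack")           # the right -> left direction is still open
        right.shutdown(socket.SHUT_WR)
        reply = b""
        while chunk := left.recv(1024):
            reply += chunk
        return received, reply
    finally:
        left.close()
        right.close()


print(half_close_round_trip())  # (b'last words', b'ack')
```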

Core source (taken from the full listing at the end)

def handle_client(conn: socket.socket, addr: tuple[str, int], stats: dict) -> None:
    """Handle one client connection (runs in its own thread)."""
    with conn:
        while True:
            try:
                data = recv_msg(conn)
            except (ConnectionError, struct.error):
                break
            msg = data.decode("utf-8")
            echo = f"[ECHO] {msg}"
            send_msg(conn, echo.encode("utf-8"))
            stats["count"] = stats.get("count", 0) + 1

Runnable demo (with mock data and print feedback added)

import socket
import struct
import threading
from typing import Optional

def _recv_exact(sock: socket.socket, n: int) -> bytes:
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("connection closed")
        buf += chunk
    return buf

def send_msg(sock: socket.socket, data: bytes) -> None:
    sock.sendall(struct.pack(">I", len(data)) + data)

def recv_msg(sock: socket.socket) -> bytes:
    length = struct.unpack(">I", _recv_exact(sock, 4))[0]
    return _recv_exact(sock, length)

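# Step 3: handle_client is like a customer-service agent: for every message it receives, it replies with an [ECHO] confirmation.
def handle_client(conn: socket.socket, addr: tuple[str, int], stats: dict) -> None:
    """Handle one client connection (runs in its own thread)."""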
    with conn:
        while True:
            try:
                data = recv_msg(conn)
            except (ConnectionError, struct.error):
                break
            msg = data.decode("utf-8")
            echo = f"[ECHO] {msg}"
            send_msg(conn, echo.encode("utf-8"))
            stats["count"] = stats.get("count", 0) + 1

server_sock, client_sock = socket.socketpair()
stats: dict = {}
thread = threading.Thread(target=handle_client, args=(server_sock, ("local", 0), stats))
thread.start()
try:
    send_msg(client_sock, b"hello socket")
    print("Client received:", recv_msg(client_sock).decode("utf-8"))
finally:
    client_sock.close(); thread.join(timeout=2)
print("Messages handled:", stats.get("count", 0))

Step 4: Build a stoppable multithreaded TCP server with run_server

Pain point and mechanism

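run_server() is the switchboard: it binds the address, starts listening, and loops accepting connections, handing each new client to a separate thread that runs handle_client(). settimeout(0.5) on the listening socket makes accept() give up every half second, so the loop gets a chance to check stop_event. This demo does not actually start the server, so running it will not leave a server occupying your terminal; the full end-to-end loop is in demo_mode().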
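All handler threads share one stats dict. The update stats["count"] = stats.get("count", 0) + 1 is a read-modify-write that is not atomic: two threads can read the same old value, and one increment is lost. CPython's GIL makes this rare but does not rule it out. The sketch below shows the usual fix, a threading.Lock around the update. The function count_with_lock and its parameters are hypothetical, and the article's handle_client has no lock.

```python
import threading


def count_with_lock(workers: int = 8, per_worker: int = 10_000) -> int:
    stats: dict = {}
    lock = threading.Lock()

    def work() -> None:
        for _ in range(per_worker):
            with lock:  # makes get + add + store one indivisible step
                stats["count"] = stats.get("count", 0) + 1

    threads = [threading.Thread(target=work) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return stats["count"]


print(count_with_lock())  # 80000, every increment accounted for
```

In run_server() you would create one lock next to stats and pass it to every handle_client thread. That is also a natural starting point for exercise 3 at the end.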

Core source (taken from the full listing at the end)

def run_server(stop_event: Optional[threading.Event] = None) -> None:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((HOST, PORT))
    server.listen(10)
    server.settimeout(0.5)
    stats: dict = {}
    print(f"[server] listening on {HOST}:{PORT}")
    while True:
        if stop_event and stop_event.is_set():
            break
        try:
            conn, addr = server.accept()
        except socket.timeout:
            continue
        t = threading.Thread(target=handle_client, args=(conn, addr, stats), daemon=True)
        t.start()
    server.close()
    print(f"[server] stopped, messages handled: {stats.get('count', 0)}")

Runnable demo (with mock data and print feedback added)

import socket
import struct
import threading
from typing import Optional

HOST = "127.0.0.1"
PORT = 19876   # high port number to avoid conflicts
HTTP_PORT = 19877

def _recv_exact(sock: socket.socket, n: int) -> bytes:
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("connection closed")
        buf += chunk
    return buf

def send_msg(sock: socket.socket, data: bytes) -> None:
    sock.sendall(struct.pack(">I", len(data)) + data)

def recv_msg(sock: socket.socket) -> bytes:
    length = struct.unpack(">I", _recv_exact(sock, 4))[0]
    return _recv_exact(sock, length)

def handle_client(conn: socket.socket, addr: tuple[str, int], stats: dict) -> None:
    """Handle one client connection (runs in its own thread)."""
    with conn:
        while True:
            try:
                data = recv_msg(conn)
            except (ConnectionError, struct.error):
                break
            msg = data.decode("utf-8")
            echo = f"[ECHO] {msg}"
            send_msg(conn, echo.encode("utf-8"))
            stats["count"] = stats.get("count", 0) + 1

# Step 4: the server is a switchboard: each connection that accept() picks up is handed to its own thread.
def run_server(stop_event: Optional[threading.Event] = None) -> None:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((HOST, PORT))
    server.listen(10)
    server.settimeout(0.5)
    stats: dict = {}
    print(f"[server] listening on {HOST}:{PORT}")
    while True:
        if stop_event and stop_event.is_set():
            break
        try:
            conn, addr = server.accept()
        except socket.timeout:
            continue
        t = threading.Thread(target=handle_client, args=(conn, addr, stats), daemon=True)
        t.start()
    server.close()
    print(f"[server] stopped, messages handled: {stats.get('count', 0)}")

print("run_server defined: stoppable via stop_event, one handle_client thread per connection")
print("Listening target:", f"{HOST}:{PORT}")

Step 5: Follow run_client's sending flow and check the results with an offline echo

Pain point and mechanism

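run_client() connects to the server, sends the messages in order, and collects the echoes. Because some runtime environments forbid binding ports, this step first replays the same messages through an offline echo so you can follow the client's data flow. Step 6 covers the port pre-check, and Step 8 automatically chooses between the real network and the simulated fallback.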
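run_client() uses a plain socket with no timeout, so a server that accepts the connection but never replies would block it forever. The standard library's socket.create_connection() connects and applies a timeout to the socket in one call, and that timeout also covers later send and recv calls. The sketch below is a hypothetical variant: run_client_with_timeout and echo_one_client are illustrative names. It exercises the variant against a one-shot echo server on an OS-assigned port.

```python
import socket
import struct
import threading


def _recv_exact(sock: socket.socket, n: int) -> bytes:
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("connection closed")
        buf += chunk
    return buf


def send_msg(sock: socket.socket, data: bytes) -> None:
    sock.sendall(struct.pack(">I", len(data)) + data)


def recv_msg(sock: socket.socket) -> bytes:
    return _recv_exact(sock, struct.unpack(">I", _recv_exact(sock, 4))[0])


def echo_one_client(listener: socket.socket) -> None:
    """Accept a single client and echo its framed messages until it disconnects."""
    conn, _ = listener.accept()
    with conn:
        try:
            while True:
                send_msg(conn, b"[ECHO] " + recv_msg(conn))
        except ConnectionError:
            pass  # the client closed the connection


def run_client_with_timeout(host: str, port: int, messages: list[str],
                            timeout: float = 2.0) -> list[str]:
    """Like run_client(), but raises a timeout error (socket.timeout) instead of hanging."""
    with socket.create_connection((host, port), timeout=timeout) as s:
        replies: list[str] = []
        for msg in messages:
            send_msg(s, msg.encode("utf-8"))
            replies.append(recv_msg(s).decode("utf-8"))
        return replies


def demo() -> list[str]:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
        listener.bind(("127.0.0.1", 0))      # ephemeral port, no clash with PORT
        listener.listen(1)
        server = threading.Thread(target=echo_one_client, args=(listener,), daemon=True)
        server.start()
        replies = run_client_with_timeout("127.0.0.1", listener.getsockname()[1],
                                          ["ping", "pong"])
        server.join(timeout=2)
        return replies


print(demo())  # ['[ECHO] ping', '[ECHO] pong']
```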

Core source (taken from the full listing at the end)

def run_client(messages: Optional[list[str]] = None) -> list[str]:
    msgs = messages or MESSAGES
    responses: list[str] = []
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((HOST, PORT))
        for msg in msgs:
            send_msg(s, msg.encode("utf-8"))
            resp = recv_msg(s).decode("utf-8")
            responses.append(resp)
    return responses

Runnable demo (with mock data and print feedback added)

import socket
import struct
from typing import Optional

HOST = "127.0.0.1"
PORT = 19876   # high port number to avoid conflicts
HTTP_PORT = 19877

def _recv_exact(sock: socket.socket, n: int) -> bytes:
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("connection closed")
        buf += chunk
    return buf

def send_msg(sock: socket.socket, data: bytes) -> None:
    sock.sendall(struct.pack(">I", len(data)) + data)

def recv_msg(sock: socket.socket) -> bytes:
    length = struct.unpack(">I", _recv_exact(sock, 4))[0]
    return _recv_exact(sock, length)

MESSAGES = [
    "AI pipeline started",
    "Data cleaned: 1024 records",
    "Generating embeddings...",
    "Vector store write OK",
    "Semantic search: Top-5 results",
]

# Step 5: run_client makes the real call; this demo does not occupy a port yet and echoes the same messages offline.
def run_client(messages: Optional[list[str]] = None) -> list[str]:
    msgs = messages or MESSAGES
    responses: list[str] = []
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((HOST, PORT))
        for msg in msgs:
            send_msg(s, msg.encode("utf-8"))
            resp = recv_msg(s).decode("utf-8")
            responses.append(resp)
    return responses

print("Messages queued on the client:", len(MESSAGES))
for i, msg in enumerate(MESSAGES[:3], 1):
    print(f"{i}. sent: {msg} -> echo: [ECHO] {msg}")
print("Note: demo_mode() completes the real network round trip automatically when the port is available.")

Step 6: Use an offline fallback and a port pre-check so the network demo runs in restricted environments too

Pain point and mechanism

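Network access may be restricted by sandboxes, permissions, or ports that are already in use. can_bind_port() first tries to bind the port. If that fails, offline_echo_demo() keeps the business output intact. This matters for a tutorial, because an environment problem should not block learning. Note that the check is only a snapshot: another process can still take the port between the probe and the real bind().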
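One detail in can_bind_port(): if bind() raises, the probe socket is never closed explicitly. The variant below is a sketch that uses a with block so the socket is closed on both paths, and probe_busy_port is a hypothetical demo. It holds a port with a listening socket, probes it, releases it, and probes again. The expected (False, True) reflects Linux semantics, where SO_REUSEADDR does not let you bind a port that has an active listener. On Windows, SO_REUSEADDR behaves differently and the first probe may succeed.

```python
import socket


def can_bind_port(port: int, host: str = "127.0.0.1") -> tuple[bool, str]:
    """Variant of the article's probe: the with block closes the socket on both paths."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            probe.bind((host, port))
        return True, ""
    except OSError as exc:
        return False, str(exc)


def probe_busy_port() -> tuple[bool, bool]:
    """Hold a listening socket on a port, probe it, release it, probe again."""
    holder = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        holder.bind(("127.0.0.1", 0))
        holder.listen(1)
        port = holder.getsockname()[1]
        busy, _ = can_bind_port(port)   # a live listener owns the port
    finally:
        holder.close()
    free, _ = can_bind_port(port)       # released again
    return busy, free


print(probe_busy_port())  # on Linux: (False, True)
```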

Core source (taken from the full listing at the end)

def offline_echo_demo(messages: Optional[list[str]] = None) -> list[str]:
    """Offline fallback when the port is unavailable: keeps the protocol demo without a real socket bind."""
    return [f"[ECHO] {msg}" for msg in (messages or MESSAGES)]


def can_bind_port(port: int) -> tuple[bool, str]:
    """Pre-check the port so restricted environments fail before any background thread is started."""
    try:
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        probe.bind((HOST, port))
        probe.close()
        return True, ""
    except OSError as exc:
        return False, str(exc)

Runnable demo (with mock data and print feedback added)

import socket
from typing import Optional

HOST = "127.0.0.1"
PORT = 19876   # high port number to avoid conflicts
HTTP_PORT = 19877

MESSAGES = [
    "AI pipeline started",
    "Data cleaned: 1024 records",
    "Generating embeddings...",
    "Vector store write OK",
    "Semantic search: Top-5 results",
]

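# Step 6: the port pre-check is like checking traffic before driving; if the port cannot be bound, take the offline simulation route.
def offline_echo_demo(messages: Optional[list[str]] = None) -> list[str]:
    """Offline fallback when the port is unavailable: keeps the protocol demo without a real socket bind."""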
    return [f"[ECHO] {msg}" for msg in (messages or MESSAGES)]


def can_bind_port(port: int) -> tuple[bool, str]:
    """Pre-check the port so restricted environments fail before any background thread is started."""
    try:
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        probe.bind((HOST, port))
        probe.close()
        return True, ""
    except OSError as exc:
        return False, str(exc)

ok, reason = can_bind_port(PORT)
print("Port bindable:", ok)
if reason:
    print("Reason:", reason)
print("Offline echo:", offline_echo_demo(["hello", "world"]))

Step 7: Hand-write an HTTP response over a socket to understand the text protocol behind the browser

Pain point and mechanism

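HTTP looks sophisticated, but underneath it is still bytes sent and received over a socket. The browser sends GET /demo HTTP/1.1. The server assembles a status line, response headers, a blank line, and the HTML body, then sends them back. socketpair() lets us try this without occupying a real port.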
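handle_http() relies on a single conn.recv(4096). That works for the tiny request in this demo but is not guaranteed in general: TCP may deliver the request head in several pieces, and the head ends only at the blank line \r\n\r\n. Below is a minimal sketch of a reading loop. read_http_head and its 8192-byte limit are hypothetical choices, not part of the article's code.

```python
import socket


def read_http_head(sock: socket.socket, limit: int = 8192) -> bytes:
    """Hypothetical helper: read until the blank line (CRLF CRLF) that ends the headers."""
    buf = b""
    while b"\r\n\r\n" not in buf:
        if len(buf) > limit:
            raise ValueError("request head too large")
        chunk = sock.recv(1024)
        if not chunk:
            raise ConnectionError("connection closed before the headers ended")
        buf += chunk
    head, _, _rest = buf.partition(b"\r\n\r\n")  # _rest would be the start of a body
    return head


def demo() -> tuple[str, list[str]]:
    client, server = socket.socketpair()
    try:
        client.sendall(b"GET /demo HTTP/1.1\r\nHo")   # the request is sent in two pieces
        client.sendall(b"st: localhost\r\n\r\n")
        request_line, *headers = read_http_head(server).decode("ascii").split("\r\n")
        return request_line, headers
    finally:
        client.close()
        server.close()


print(demo())  # ('GET /demo HTTP/1.1', ['Host: localhost'])
```

The loop gives the same result whether the two sendall() calls reach the server as one recv() or two. A real server would also keep the bytes after the blank line as the start of the request body.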

Core source (taken from the full listing at the end)

def handle_http(conn: socket.socket) -> None:
    try:
        raw = conn.recv(4096).decode("utf-8", errors="replace")
        if not raw:
            return
        first_line = raw.split("\r\n")[0]
        parts = first_line.split(" ")
        path = parts[1] if len(parts) >= 2 else "/"
        body = f"<h1>Python HTTP Server</h1><p>PATH: {path}</p><p>Powered by socket only</p>"
        resp = (
            "HTTP/1.1 200 OK\r\n"
            "Content-Type: text/html; charset=utf-8\r\n"
            f"Content-Length: {len(body.encode())}\r\n"
            "Connection: close\r\n\r\n" + body
        )
        conn.sendall(resp.encode())
    finally:
        conn.close()

def run_http_server(stop_event: Optional[threading.Event] = None) -> None:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((HOST, HTTP_PORT))
    server.listen(5)
    server.settimeout(0.5)
    print(f"[HTTP] listening on http://{HOST}:{HTTP_PORT}  (open it in a browser)")
    while True:
        if stop_event and stop_event.is_set():
            break
        try:
            conn, _ = server.accept()
            threading.Thread(target=handle_http, args=(conn,), daemon=True).start()
        except socket.timeout:
            continue
    server.close()

Runnable demo (with mock data and print feedback added)

import socket
import threading
from typing import Optional

HOST = "127.0.0.1"
PORT = 19876   # high port number to avoid conflicts
HTTP_PORT = 19877

# Step 7: at heart, HTTP is just a text request and a text response over a socket.
def handle_http(conn: socket.socket) -> None:
    try:
        raw = conn.recv(4096).decode("utf-8", errors="replace")
        if not raw:
            return
        first_line = raw.split("\r\n")[0]
        parts = first_line.split(" ")
        path = parts[1] if len(parts) >= 2 else "/"
        body = f"<h1>Python HTTP Server</h1><p>PATH: {path}</p><p>Powered by socket only</p>"
        resp = (
            "HTTP/1.1 200 OK\r\n"
            "Content-Type: text/html; charset=utf-8\r\n"
            f"Content-Length: {len(body.encode())}\r\n"
            "Connection: close\r\n\r\n" + body
        )
        conn.sendall(resp.encode())
    finally:
        conn.close()

def run_http_server(stop_event: Optional[threading.Event] = None) -> None:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((HOST, HTTP_PORT))
    server.listen(5)
    server.settimeout(0.5)
    print(f"[HTTP] listening on http://{HOST}:{HTTP_PORT}  (open it in a browser)")
    while True:
        if stop_event and stop_event.is_set():
            break
        try:
            conn, _ = server.accept()
            threading.Thread(target=handle_http, args=(conn,), daemon=True).start()
        except socket.timeout:
            continue
    server.close()

server_sock, client_sock = socket.socketpair()
try:
    thread = threading.Thread(target=handle_http, args=(server_sock,))
    thread.start()
    client_sock.sendall(b"GET /demo HTTP/1.1\r\nHost: localhost\r\n\r\n")
    response = client_sock.recv(4096).decode("utf-8", errors="replace")
    print(response.split("\r\n")[0])
    print(response.split("\r\n\r\n", 1)[1])
    thread.join(timeout=2)
finally:
    client_sock.close()

Step 8: Use demo_mode and main for a one-command, end-to-end network demo

Pain point and mechanism

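Finally, assemble all the pieces. print_tcp_diagram() first explains the protocol layout, and main() reads --mode. The default mode, demo, automatically starts a server thread, runs the client, and stops the server. If the port is unavailable, it falls back to the offline echo.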
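demo_mode() waits for the server with time.sleep(0.3), which is a guess. On a slow machine the client could connect before listen() has run and get ConnectionRefusedError; on a fast one the wait is wasted. A common alternative is a threading.Event that the server sets right after listen(). The sketch below is hypothetical (serve_once and the info dict are illustrative names). It also binds port 0 and reports the real port back, which avoids the fixed PORT clashing with another process.

```python
import socket
import threading


def serve_once(ready: threading.Event, info: dict) -> None:
    """Hypothetical one-shot server that announces readiness instead of relying on sleep()."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.bind(("127.0.0.1", 0))
        srv.listen(1)
        info["port"] = srv.getsockname()[1]
        ready.set()                        # only after listen() can clients connect
        conn, _ = srv.accept()
        with conn:
            conn.sendall(b"hello")


def demo() -> bytes:
    ready = threading.Event()
    info: dict = {}
    t = threading.Thread(target=serve_once, args=(ready, info), daemon=True)
    t.start()
    if not ready.wait(timeout=2):          # wait exactly as long as needed, capped at 2 s
        raise RuntimeError("server did not start in time")
    data = b""
    with socket.create_connection(("127.0.0.1", info["port"]), timeout=2) as c:
        while chunk := c.recv(1024):       # read until the server closes the connection
            data += chunk
    t.join(timeout=2)
    return data


print(demo())  # b'hello'
```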

Core source (taken from the full listing at the end)

def print_tcp_diagram() -> None:
    diagram = """
TCP three-way handshake:
  Client                    Server
    │──── SYN ────────────▶│
    │◀─── SYN-ACK ─────────│
    │──── ACK ────────────▶│
    │════ Data transfer ═══│

Framing fix (4-byte length header packed with struct.pack(">I")):
  ┌──────────┬──────────────────────┐
  │ length   │  payload             │
  │ 4 bytes  │  N bytes             │
  └──────────┴──────────────────────┘
"""
    print(diagram)


# ── Entry point ───────────────────────────────────────────────
def main() -> None:
    parser = argparse.ArgumentParser(description="TCP socket demo")
    parser.add_argument("--mode", choices=["demo", "server", "client", "http"],
                        default="demo", help="run mode")
    args = parser.parse_args()

    print_tcp_diagram()

    if args.mode == "demo":
        demo_mode()
    elif args.mode == "server":
        run_server()
    elif args.mode == "client":
        responses = run_client()
        for r in responses:
            print(r)
    elif args.mode == "http":
        stop = threading.Event()
        t = threading.Thread(target=run_http_server, args=(stop,), daemon=True)
        t.start()
        print(f"Open http://{HOST}:{HTTP_PORT} in a browser to see it; press Ctrl+C to stop")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            stop.set()

Runnable demo (with mock data and print feedback added)

import argparse
import socket
import struct
import threading
import time
from typing import Optional

HOST = "127.0.0.1"
PORT = 19876   # high port number to avoid conflicts
HTTP_PORT = 19877

def _recv_exact(sock: socket.socket, n: int) -> bytes:
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("connection closed")
        buf += chunk
    return buf

def send_msg(sock: socket.socket, data: bytes) -> None:
    sock.sendall(struct.pack(">I", len(data)) + data)

def recv_msg(sock: socket.socket) -> bytes:
    length = struct.unpack(">I", _recv_exact(sock, 4))[0]
    return _recv_exact(sock, length)

def handle_client(conn: socket.socket, addr: tuple[str, int], stats: dict) -> None:
    """Handle one client connection (runs in its own thread)."""
    with conn:
        while True:
            try:
                data = recv_msg(conn)
            except (ConnectionError, struct.error):
                break
            msg = data.decode("utf-8")
            echo = f"[ECHO] {msg}"
            send_msg(conn, echo.encode("utf-8"))
            stats["count"] = stats.get("count", 0) + 1

def run_server(stop_event: Optional[threading.Event] = None) -> None:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((HOST, PORT))
    server.listen(10)
    server.settimeout(0.5)
    stats: dict = {}
    print(f"[server] listening on {HOST}:{PORT}")
    while True:
        if stop_event and stop_event.is_set():
            break
        try:
            conn, addr = server.accept()
        except socket.timeout:
            continue
        t = threading.Thread(target=handle_client, args=(conn, addr, stats), daemon=True)
        t.start()
    server.close()
    print(f"[server] stopped, messages handled: {stats.get('count', 0)}")

MESSAGES = [
    "AI pipeline started",
    "Data cleaned: 1024 records",
    "Generating embeddings...",
    "Vector store write OK",
    "Semantic search: Top-5 results",
]

def run_client(messages: Optional[list[str]] = None) -> list[str]:
    msgs = messages or MESSAGES
    responses: list[str] = []
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((HOST, PORT))
        for msg in msgs:
            send_msg(s, msg.encode("utf-8"))
            resp = recv_msg(s).decode("utf-8")
            responses.append(resp)
    return responses

def offline_echo_demo(messages: Optional[list[str]] = None) -> list[str]:
    """Offline fallback when the port is unavailable: keeps the protocol demo without a real socket bind."""
    return [f"[ECHO] {msg}" for msg in (messages or MESSAGES)]


def can_bind_port(port: int) -> tuple[bool, str]:
    """Pre-check the port so restricted environments fail before any background thread is started."""
    try:
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        probe.bind((HOST, port))
        probe.close()
        return True, ""
    except OSError as exc:
        return False, str(exc)

def handle_http(conn: socket.socket) -> None:
    try:
        raw = conn.recv(4096).decode("utf-8", errors="replace")
        if not raw:
            return
        first_line = raw.split("\r\n")[0]
        parts = first_line.split(" ")
        path = parts[1] if len(parts) >= 2 else "/"
        body = f"<h1>Python HTTP Server</h1><p>PATH: {path}</p><p>Powered by socket only</p>"
        resp = (
            "HTTP/1.1 200 OK\r\n"
            "Content-Type: text/html; charset=utf-8\r\n"
            f"Content-Length: {len(body.encode())}\r\n"
            "Connection: close\r\n\r\n" + body
        )
        conn.sendall(resp.encode())
    finally:
        conn.close()

def run_http_server(stop_event: Optional[threading.Event] = None) -> None:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((HOST, HTTP_PORT))
    server.listen(5)
    server.settimeout(0.5)
    print(f"[HTTP] listening on http://{HOST}:{HTTP_PORT}  (open it in a browser)")
    while True:
        if stop_event and stop_event.is_set():
            break
        try:
            conn, _ = server.accept()
            threading.Thread(target=handle_http, args=(conn,), daemon=True).start()
        except socket.timeout:
            continue
    server.close()

def demo_mode() -> None:
    ok, reason = can_bind_port(PORT)
    if not ok:
        print(f"\n[fallback] This environment does not allow local socket communication: {reason}")
        print("[fallback] Switching to an offline protocol simulation that still demonstrates the length-header + echo round trip.")
        responses = offline_echo_demo()
        print("\n[client] Sending messages and receiving echoes:")
        print(f"{'#':<4} {'Sent':<35} {'Echo'}")
        print("─" * 75)
        for i, (sent, resp) in enumerate(zip(MESSAGES, responses), 1):
            print(f"{i:<4} {sent:<35} {resp}")
        print("\n[demo finished]")
        return

    stop = threading.Event()
    srv_thread = threading.Thread(target=run_server, args=(stop,), daemon=True)
    srv_thread.start()
    time.sleep(0.3)   # give the server time to start listening

    print("\n[client] Sending messages and receiving echoes:")
    print(f"{'#':<4} {'Sent':<35} {'Echo'}")
    print("─" * 75)
    responses = run_client()
    for i, (sent, resp) in enumerate(zip(MESSAGES, responses), 1):
        print(f"{i:<4} {sent:<35} {resp}")

    stop.set()
    srv_thread.join(timeout=2)
    print("\n[demo finished]")

# Step 8: main is the remote control; --mode demo/server/client/http selects which network mode runs.
def print_tcp_diagram() -> None:
    diagram = """
TCP three-way handshake:
  Client                    Server
    │──── SYN ────────────▶│
    │◀─── SYN-ACK ─────────│
    │──── ACK ────────────▶│
    │════ Data transfer ═══│

Framing fix (4-byte length header packed with struct.pack(">I")):
  ┌──────────┬──────────────────────┐
  │ length   │  payload             │
  │ 4 bytes  │  N bytes             │
  └──────────┴──────────────────────┘
"""
    print(diagram)


# ── Entry point ───────────────────────────────────────────────
def main() -> None:
    parser = argparse.ArgumentParser(description="TCP socket demo")
    parser.add_argument("--mode", choices=["demo", "server", "client", "http"],
                        default="demo", help="run mode")
    args = parser.parse_args()

    print_tcp_diagram()

    if args.mode == "demo":
        demo_mode()
    elif args.mode == "server":
        run_server()
    elif args.mode == "client":
        responses = run_client()
        for r in responses:
            print(r)
    elif args.mode == "http":
        stop = threading.Event()
        t = threading.Thread(target=run_http_server, args=(stop,), daemon=True)
        t.start()
        print(f"Open http://{HOST}:{HTTP_PORT} in a browser to see it; press Ctrl+C to stop")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            stop.set()

import sys
sys.argv = ["prog", "--mode", "demo"]   # simulate command-line arguments so main() also runs in a notebook or REPL
main()

Hands-on: Full Source Code and How to Run It

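Now put the building blocks above together: paste the complete code below into your editor and run it. Look at the whole loop first, then go back and tweak the parameters section by section; that makes it easier to build engineering intuition.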

#!/usr/bin/env python3
"""
TCP echo server + client (single-file demo)
Usage:
  python3 11-python-socket.py --mode demo    # in-process demo (default)
  python3 11-python-socket.py --mode server  # start only the server
  python3 11-python-socket.py --mode client  # start only the client
  python3 11-python-socket.py --mode http    # simple HTTP server demo
"""
import argparse
import socket
import struct
import threading
import time
from typing import Optional


HOST = "127.0.0.1"
PORT = 19876   # high port number to avoid conflicts
HTTP_PORT = 19877


# ── Framing helpers ("sticky packet" fix) ─────────────────────
def _recv_exact(sock: socket.socket, n: int) -> bytes:
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("connection closed")
        buf += chunk
    return buf

def send_msg(sock: socket.socket, data: bytes) -> None:
    sock.sendall(struct.pack(">I", len(data)) + data)

def recv_msg(sock: socket.socket) -> bytes:
    length = struct.unpack(">I", _recv_exact(sock, 4))[0]
    return _recv_exact(sock, length)


# ── Server ────────────────────────────────────────────────────
def handle_client(conn: socket.socket, addr: tuple[str, int], stats: dict) -> None:
    """Handle one client connection (runs in its own thread)."""
    with conn:
        while True:
            try:
                data = recv_msg(conn)
            except (ConnectionError, struct.error):
                break
            msg = data.decode("utf-8")
            echo = f"[ECHO] {msg}"
            send_msg(conn, echo.encode("utf-8"))
            stats["count"] = stats.get("count", 0) + 1

def run_server(stop_event: Optional[threading.Event] = None) -> None:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((HOST, PORT))
    server.listen(10)
    server.settimeout(0.5)
    stats: dict = {}
    print(f"[server] listening on {HOST}:{PORT}")
    while True:
        if stop_event and stop_event.is_set():
            break
        try:
            conn, addr = server.accept()
        except socket.timeout:
            continue
        t = threading.Thread(target=handle_client, args=(conn, addr, stats), daemon=True)
        t.start()
    server.close()
    print(f"[server] stopped, messages handled: {stats.get('count', 0)}")


# ── Client ────────────────────────────────────────────────────
MESSAGES = [
    "AI pipeline started",
    "Data cleaned: 1024 records",
    "Generating embeddings...",
    "Vector store write OK",
    "Semantic search: Top-5 results",
]

def run_client(messages: Optional[list[str]] = None) -> list[str]:
    msgs = messages or MESSAGES
    responses: list[str] = []
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((HOST, PORT))
        for msg in msgs:
            send_msg(s, msg.encode("utf-8"))
            resp = recv_msg(s).decode("utf-8")
            responses.append(resp)
    return responses


def offline_echo_demo(messages: Optional[list[str]] = None) -> list[str]:
    """Offline fallback when the port is unavailable: keeps the protocol demo without a real socket bind."""
    return [f"[ECHO] {msg}" for msg in (messages or MESSAGES)]


def can_bind_port(port: int) -> tuple[bool, str]:
    """Pre-check the port so restricted environments fail before any background thread is started."""
    try:
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        probe.bind((HOST, port))
        probe.close()
        return True, ""
    except OSError as exc:
        return False, str(exc)


# ── Simple HTTP server ────────────────────────────────────────
def handle_http(conn: socket.socket) -> None:
    try:
        raw = conn.recv(4096).decode("utf-8", errors="replace")
        if not raw:
            return
        first_line = raw.split("\r\n")[0]
        parts = first_line.split(" ")
        path = parts[1] if len(parts) >= 2 else "/"
        body = f"<h1>Python HTTP Server</h1><p>PATH: {path}</p><p>Powered by socket only</p>"
        resp = (
            "HTTP/1.1 200 OK\r\n"
            "Content-Type: text/html; charset=utf-8\r\n"
            f"Content-Length: {len(body.encode())}\r\n"
            "Connection: close\r\n\r\n" + body
        )
        conn.sendall(resp.encode())
    finally:
        conn.close()

def run_http_server(stop_event: Optional[threading.Event] = None) -> None:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((HOST, HTTP_PORT))
    server.listen(5)
    server.settimeout(0.5)
    print(f"[HTTP] listening on http://{HOST}:{HTTP_PORT}  (open it in a browser)")
    while True:
        if stop_event and stop_event.is_set():
            break
        try:
            conn, _ = server.accept()
            threading.Thread(target=handle_http, args=(conn,), daemon=True).start()
        except socket.timeout:
            continue
    server.close()


# ── Demo mode ─────────────────────────────────────────────────
def demo_mode() -> None:
    ok, reason = can_bind_port(PORT)
    if not ok:
        print(f"\n[fallback] This environment does not allow local socket communication: {reason}")
        print("[fallback] Switching to an offline protocol simulation that still demonstrates the length-header + echo round trip.")
        responses = offline_echo_demo()
        print("\n[client] Sending messages and receiving echoes:")
        print(f"{'#':<4} {'Sent':<35} {'Echo'}")
        print("─" * 75)
        for i, (sent, resp) in enumerate(zip(MESSAGES, responses), 1):
            print(f"{i:<4} {sent:<35} {resp}")
        print("\n[demo finished]")
        return

    stop = threading.Event()
    srv_thread = threading.Thread(target=run_server, args=(stop,), daemon=True)
    srv_thread.start()
    time.sleep(0.3)   # give the server time to start listening

    print("\n[client] Sending messages and receiving echoes:")
    print(f"{'#':<4} {'Sent':<35} {'Echo'}")
    print("─" * 75)
    responses = run_client()
    for i, (sent, resp) in enumerate(zip(MESSAGES, responses), 1):
        print(f"{i:<4} {sent:<35} {resp}")

    stop.set()
    srv_thread.join(timeout=2)
    print("\n[demo finished]")

def print_tcp_diagram() -> None:
    diagram = """
TCP three-way handshake:
  Client                    Server
    │──── SYN ────────────▶│
    │◀─── SYN-ACK ─────────│
    │──── ACK ────────────▶│
    │════ Data transfer ═══│

Framing fix (4-byte length header packed with struct.pack(">I")):
  ┌──────────┬──────────────────────┐
  │ length   │  payload             │
  │ 4 bytes  │  N bytes             │
  └──────────┴──────────────────────┘
"""
    print(diagram)


# ── Entry point ───────────────────────────────────────────────
def main() -> None:
    parser = argparse.ArgumentParser(description="TCP socket demo")
    parser.add_argument("--mode", choices=["demo", "server", "client", "http"],
                        default="demo", help="run mode")
    args = parser.parse_args()

    print_tcp_diagram()

    if args.mode == "demo":
        demo_mode()
    elif args.mode == "server":
        run_server()
    elif args.mode == "client":
        responses = run_client()
        for r in responses:
            print(r)
    elif args.mode == "http":
        stop = threading.Event()
        t = threading.Thread(target=run_http_server, args=(stop,), daemon=True)
        t.start()
        print(f"Open http://{HOST}:{HTTP_PORT} in a browser to see it; press Ctrl+C to stop")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            stop.set()

if __name__ == "__main__":
    main()

Example runs:

python3 11-python-socket.py --mode demo    # in-process demo
python3 11-python-socket.py --mode http    # start the HTTP server

Protocol Comparison Cheat Sheet

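Feature        | TCP                                       | UDP
Connection     | Connection-oriented (three-way handshake) | Connectionless
Reliability    | Guarantees ordering and delivery          | No guarantees
Speed          | Slower (acknowledgment mechanism)         | Faster (no acknowledgments)
Sticky packets | Occur; must be handled                    | Do not occur (datagrams)
Use cases      | HTTP, databases, file transfer            | DNS, video streaming, games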

⏱ NexDo Time · 5-Minute Micro-Exercises

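  1. Modify handle_client so the server counts messages per client IP and prints an IP statistics table when it stops.
  2. Add a message-type field to send_msg / recv_msg (one extra type byte before the length header) so that TEXT=0x01 and JSON=0x02 can be told apart.
  3. In demo_mode, start 3 client threads that send concurrently, and check whether the server handles them correctly in parallel.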

Don’t wait for next time, do it in the next moment.