11 · Socket 编程:TCP/UDP 底层通信
🔗 知识图谱导航:阅读本文前,建议先掌握/回顾 《10 · 核心数据结构:栈、队列与树的实现》 中的核心概念;本文会在这个基础上继续推进。 上一篇我们用数据结构搭建了调度骨架;本篇把数据送上网络——理解 Socket 是构建分布式 AI 推理服务的必经之路。
极客解析:并发不是把代码写复杂,而是把等待、调度和资源隔离讲清楚;本文用本机可运行的 Mock 场景验证机制。
痛点与架构:单独记 API 或概念很容易学完就忘;本文先锁定真实痛点,再把它拆成“输入数据 → 核心机制 → 可运行输出”三段闭环。从 TCP 三次握手原理到完整服务端/客户端实现,覆盖 UDP 广播、struct 粘包处理,并在同一文件内用 threading 演示并发回显服务器。
1. TCP 三次握手(ASCII 图)
客户端 服务端
│ │
│──── SYN (seq=x) ────────────▶│ 第1次:我想连接
│ │
│◀─── SYN-ACK (seq=y,ack=x+1) ─│ 第2次:好的,我也准备好了
│ │
│──── ACK (ack=y+1) ──────────▶│ 第3次:收到,开始通信
│ │
│════════ 数据传输 ══════════════│
│ │
│──── FIN ────────────────────▶│ 四次挥手(关闭)
│◀─── ACK ────────────────────│
│◀─── FIN ────────────────────│
│──── ACK ────────────────────▶│
三次握手确保双方都具备发送和接收能力,是可靠传输的基础。
2. 基础 TCP 服务端
import socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
print(f"Socket 类型: {server.type}")
print(f"地址族: {server.family}")
# server.bind(("127.0.0.1", 9000)) # 实际使用时取消注释
# server.listen(5)
# conn, addr = server.accept() # 阻塞等待客户端
server.close()
print("Socket 演示完成")
3. 粘包问题与 struct 解决方案
TCP 是字节流协议,没有消息边界。连续发两条消息可能被合并成一个 recv。
发送方: [消息A=5字节][消息B=8字节]
接收方: recv(1024) → [消息A消息B=13字节] ← 粘包!
解决方案:4字节长度头
import socket
import struct
def send_msg(sock: socket.socket, data: bytes) -> None:
"""先发4字节长度,再发数据"""
header = struct.pack(">I", len(data)) # big-endian uint32
sock.sendall(header + data)
def recv_msg(sock: socket.socket) -> bytes:
"""先读4字节长度,再精确读取数据"""
raw_len = _recv_exact(sock, 4)
msg_len = struct.unpack(">I", raw_len)[0]
return _recv_exact(sock, msg_len)
def _recv_exact(sock: socket.socket, n: int) -> bytes:
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
raise ConnectionError("连接已断开")
buf += chunk
return buf
def _recv_exact(sock, n):
data = b""
while len(data) < n:
chunk = sock.recv(n - len(data))
if not chunk: raise ConnectionError("连接断开")
data += chunk
return data
print("粘包处理函数定义完成")
4. UDP 广播
import socket
# UDP 广播演示(沙箱无网络,仅展示 API 用法)
sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sender.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
print("UDP 广播 Socket 创建成功")
print(f"Socket 类型: SOCK_DGRAM = {socket.SOCK_DGRAM}")
# sender.sendto(b"hello", ("255.255.255.255", 9001)) # 需要网络
sender.close()
UDP 无连接、无保证,适合日志广播、服务发现、实时音视频。
5. 从零实现简单 HTTP 服务器
import socket
def handle_http(conn: socket.socket) -> None:
raw = conn.recv(4096).decode("utf-8", errors="replace")
first_line = raw.split("\r\n")[0] # e.g. "GET / HTTP/1.1"
method, path, _ = first_line.split(" ", 2)
body = f"<h1>Hello from Python HTTP</h1><p>PATH: {path}</p>"
response = (
"HTTP/1.1 200 OK\r\n"
"Content-Type: text/html; charset=utf-8\r\n"
f"Content-Length: {len(body.encode())}\r\n"
"Connection: close\r\n\r\n"
+ body
)
conn.sendall(response.encode())
conn.close()
print("HTTP 处理函数定义完成")
实战:并发 TCP 回显服务器(同文件演示)
服务端和客户端写在同一文件,--mode demo 用 threading 在同一进程内启动服务端线程,再用客户端线程发送多条消息,全程无需开两个终端。
步步为营:核心逻辑自适应拆解
Socket 可以先理解成“程序之间的电话线”:服务端先占一个号码等电话,客户端拨号连接,然后双方收发字节。我们按“地址端口 -> 消息封包 -> 服务端处理 -> 客户端发送 -> 降级保护 -> HTTP 文本协议 -> CLI 总控”逐步拆。
Step 1:先固定本机地址和端口,网络通信才知道找谁
痛点与机制:
Socket 通信一定要先说清楚“连到哪里”。127.0.0.1 表示只在本机通信,适合教学和测试;端口号像房间号,同一台机器上不同服务用不同端口区分。
核心源码(逐字来自文末完整源码):
HOST = "127.0.0.1"
PORT = 19876 # 使用高位端口避免冲突
HTTP_PORT = 19877
可运行演示(补齐 Mock 数据与 print 反馈):
# Step 1:HOST 像门牌号,PORT 像房间号,客户端必须按这个地址去找服务端。
HOST = "127.0.0.1"
PORT = 19876 # 使用高位端口避免冲突
HTTP_PORT = 19877
print("服务地址:", HOST)
print("TCP端口:", PORT)
print("HTTP端口:", HTTP_PORT)
print(f"完整TCP入口: {HOST}:{PORT}")
Step 2:用 4 字节长度头解决 TCP 没有消息边界的问题
痛点与机制:
TCP 不知道“这一条消息到哪里结束”,它只是一根字节水管。send_msg() 先写 4 字节长度,再写正文;recv_msg() 先读长度,再精确读正文。这样连续发送多条消息也不会粘在一起读错。
核心源码(逐字来自文末完整源码):
def _recv_exact(sock: socket.socket, n: int) -> bytes:
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
raise ConnectionError("连接断开")
buf += chunk
return buf
def send_msg(sock: socket.socket, data: bytes) -> None:
sock.sendall(struct.pack(">I", len(data)) + data)
def recv_msg(sock: socket.socket) -> bytes:
length = struct.unpack(">I", _recv_exact(sock, 4))[0]
return _recv_exact(sock, length)
可运行演示(补齐 Mock 数据与 print 反馈):
import socket
import struct
# Step 2:TCP 像水管,只保证字节按顺序流动;长度头像包裹外面的快递单。
def _recv_exact(sock: socket.socket, n: int) -> bytes:
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
raise ConnectionError("连接断开")
buf += chunk
return buf
def send_msg(sock: socket.socket, data: bytes) -> None:
sock.sendall(struct.pack(">I", len(data)) + data)
def recv_msg(sock: socket.socket) -> bytes:
length = struct.unpack(">I", _recv_exact(sock, 4))[0]
return _recv_exact(sock, length)
left, right = socket.socketpair()
try:
send_msg(left, "第一条消息".encode("utf-8"))
send_msg(left, "第二条更长的消息".encode("utf-8"))
print("接收1:", recv_msg(right).decode("utf-8"))
print("接收2:", recv_msg(right).decode("utf-8"))
finally:
left.close(); right.close()
Step 3:用 handle_client 给每个连接做回显处理
痛点与机制:
服务端真正处理业务的不是 listen(),而是拿到连接后的 handle_client()。这里用 socket.socketpair() 在一个进程里造出一对已连接的 socket,不用开两个终端也能演示“发送 -> 回显 -> 统计”。
核心源码(逐字来自文末完整源码):
def handle_client(conn: socket.socket, addr: tuple[str, int], stats: dict) -> None:
"""处理单个客户端连接(在独立线程中运行)"""
with conn:
while True:
try:
data = recv_msg(conn)
except (ConnectionError, struct.error):
break
msg = data.decode("utf-8")
echo = f"[ECHO] {msg}"
send_msg(conn, echo.encode("utf-8"))
stats["count"] = stats.get("count", 0) + 1
可运行演示(补齐 Mock 数据与 print 反馈):
import socket
import struct
import threading
from typing import Optional
def _recv_exact(sock: socket.socket, n: int) -> bytes:
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
raise ConnectionError("连接断开")
buf += chunk
return buf
def send_msg(sock: socket.socket, data: bytes) -> None:
sock.sendall(struct.pack(">I", len(data)) + data)
def recv_msg(sock: socket.socket) -> bytes:
length = struct.unpack(">I", _recv_exact(sock, 4))[0]
return _recv_exact(sock, length)
# Step 3:handle_client 像客服坐席,拿到一句话,就回一句带 [ECHO] 的确认。
def handle_client(conn: socket.socket, addr: tuple[str, int], stats: dict) -> None:
"""处理单个客户端连接(在独立线程中运行)"""
with conn:
while True:
try:
data = recv_msg(conn)
except (ConnectionError, struct.error):
break
msg = data.decode("utf-8")
echo = f"[ECHO] {msg}"
send_msg(conn, echo.encode("utf-8"))
stats["count"] = stats.get("count", 0) + 1
server_sock, client_sock = socket.socketpair()
stats: dict = {}
thread = threading.Thread(target=handle_client, args=(server_sock, ("local", 0), stats))
thread.start()
try:
send_msg(client_sock, b"hello socket")
print("客户端收到:", recv_msg(client_sock).decode("utf-8"))
finally:
client_sock.close(); thread.join(timeout=2)
print("处理消息数:", stats.get("count", 0))
Step 4:用 run_server 搭一个可停止的多线程 TCP 服务端
痛点与机制:
run_server() 是总机:绑定地址、开始监听、循环接收连接。每来一个客户端,就开一个线程交给 handle_client()。演示里不直接启动它,避免读者点运行后服务端一直占着窗口。完整闭环会放到 demo_mode()。
核心源码(逐字来自文末完整源码):
def run_server(stop_event: Optional[threading.Event] = None) -> None:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((HOST, PORT))
server.listen(10)
server.settimeout(0.5)
stats: dict = {}
print(f"[服务端] 监听 {HOST}:{PORT}")
while True:
if stop_event and stop_event.is_set():
break
try:
conn, addr = server.accept()
except socket.timeout:
continue
t = threading.Thread(target=handle_client, args=(conn, addr, stats), daemon=True)
t.start()
server.close()
print(f"[服务端] 已停止,共处理消息: {stats.get('count', 0)} 条")
可运行演示(补齐 Mock 数据与 print 反馈):
import socket
import struct
import threading
from typing import Optional
HOST = "127.0.0.1"
PORT = 19876 # 使用高位端口避免冲突
HTTP_PORT = 19877
def _recv_exact(sock: socket.socket, n: int) -> bytes:
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
raise ConnectionError("连接断开")
buf += chunk
return buf
def send_msg(sock: socket.socket, data: bytes) -> None:
sock.sendall(struct.pack(">I", len(data)) + data)
def recv_msg(sock: socket.socket) -> bytes:
length = struct.unpack(">I", _recv_exact(sock, 4))[0]
return _recv_exact(sock, length)
def handle_client(conn: socket.socket, addr: tuple[str, int], stats: dict) -> None:
"""处理单个客户端连接(在独立线程中运行)"""
with conn:
while True:
try:
data = recv_msg(conn)
except (ConnectionError, struct.error):
break
msg = data.decode("utf-8")
echo = f"[ECHO] {msg}"
send_msg(conn, echo.encode("utf-8"))
stats["count"] = stats.get("count", 0) + 1
# Step 4:服务端像总机,accept 接到一个连接,就分配一个线程处理。
def run_server(stop_event: Optional[threading.Event] = None) -> None:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((HOST, PORT))
server.listen(10)
server.settimeout(0.5)
stats: dict = {}
print(f"[服务端] 监听 {HOST}:{PORT}")
while True:
if stop_event and stop_event.is_set():
break
try:
conn, addr = server.accept()
except socket.timeout:
continue
t = threading.Thread(target=handle_client, args=(conn, addr, stats), daemon=True)
t.start()
server.close()
print(f"[服务端] 已停止,共处理消息: {stats.get('count', 0)} 条")
print("run_server 已定义: 支持 stop_event 停止,支持多线程 handle_client")
print("监听目标:", f"{HOST}:{PORT}")
Step 5:读懂 run_client 的发送流程,并用离线回显看结果
痛点与机制:
run_client() 的职责是连接服务端、按顺序发送消息、收集回显。因为不同运行环境可能禁止绑定端口,这一步先用同一组消息做离线回显,让读者看懂客户端的数据流;第 6 步会讲端口预检,第 8 步会自动选择真实网络或降级模拟。
核心源码(逐字来自文末完整源码):
def run_client(messages: Optional[list[str]] = None) -> list[str]:
msgs = messages or MESSAGES
responses: list[str] = []
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
for msg in msgs:
send_msg(s, msg.encode("utf-8"))
resp = recv_msg(s).decode("utf-8")
responses.append(resp)
return responses
可运行演示(补齐 Mock 数据与 print 反馈):
import socket
import struct
from typing import Optional
HOST = "127.0.0.1"
PORT = 19876 # 使用高位端口避免冲突
HTTP_PORT = 19877
def _recv_exact(sock: socket.socket, n: int) -> bytes:
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
raise ConnectionError("连接断开")
buf += chunk
return buf
def send_msg(sock: socket.socket, data: bytes) -> None:
sock.sendall(struct.pack(">I", len(data)) + data)
def recv_msg(sock: socket.socket) -> bytes:
length = struct.unpack(">I", _recv_exact(sock, 4))[0]
return _recv_exact(sock, length)
MESSAGES = [
"AI pipeline 启动",
"数据清洗完成,共 1024 条记录",
"embedding 生成中...",
"向量存储写入成功",
"语义检索返回 Top-5 结果",
]
# Step 5:run_client 负责真实拨号;当前演示先不占端口,用同样消息做离线回显。
def run_client(messages: Optional[list[str]] = None) -> list[str]:
msgs = messages or MESSAGES
responses: list[str] = []
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
for msg in msgs:
send_msg(s, msg.encode("utf-8"))
resp = recv_msg(s).decode("utf-8")
responses.append(resp)
return responses
print("客户端待发送消息数:", len(MESSAGES))
for i, msg in enumerate(MESSAGES[:3], 1):
print(f"{i}. 发送: {msg} -> 回显: [ECHO] {msg}")
print("提示: 真实网络闭环由 demo_mode() 在端口可用时自动完成。")
Step 6:用离线降级和端口预检,让网络演示在受限环境也能跑
痛点与机制:
网络环境可能被沙箱、权限或端口占用限制。can_bind_port() 先试一下端口能不能用;不行就用 offline_echo_demo() 保留业务输出。这对教程很重要:不因为环境问题卡住学习。
核心源码(逐字来自文末完整源码):
def offline_echo_demo(messages: Optional[list[str]] = None) -> list[str]:
"""端口不可用时的离线降级:保留协议演示,不依赖真实 socket bind。"""
return [f"[ECHO] {msg}" for msg in (messages or MESSAGES)]
def can_bind_port(port: int) -> tuple[bool, str]:
"""预检端口权限,避免受限环境里启动后台线程后才抛异常。"""
try:
probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
probe.bind((HOST, port))
probe.close()
return True, ""
except OSError as exc:
return False, str(exc)
可运行演示(补齐 Mock 数据与 print 反馈):
import socket
from typing import Optional
HOST = "127.0.0.1"
PORT = 19876 # 使用高位端口避免冲突
HTTP_PORT = 19877
MESSAGES = [
"AI pipeline 启动",
"数据清洗完成,共 1024 条记录",
"embedding 生成中...",
"向量存储写入成功",
"语义检索返回 Top-5 结果",
]
# Step 6:端口预检像开车前看路况;不能绑定端口时,就走离线模拟路线。
def offline_echo_demo(messages: Optional[list[str]] = None) -> list[str]:
"""端口不可用时的离线降级:保留协议演示,不依赖真实 socket bind。"""
return [f"[ECHO] {msg}" for msg in (messages or MESSAGES)]
def can_bind_port(port: int) -> tuple[bool, str]:
"""预检端口权限,避免受限环境里启动后台线程后才抛异常。"""
try:
probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
probe.bind((HOST, port))
probe.close()
return True, ""
except OSError as exc:
return False, str(exc)
ok, reason = can_bind_port(PORT)
print("端口可绑定:", ok)
if reason:
print("原因:", reason)
print("离线回显:", offline_echo_demo(["hello", "world"]))
Step 7:用 socket 手写 HTTP 响应,理解浏览器背后的文本协议
痛点与机制:
HTTP 看起来很高级,但底层仍然是 socket 收发字节。浏览器发来 GET /demo HTTP/1.1,服务端拼出状态行、响应头、空行和 HTML 正文发回去。用 socketpair() 可以不用真的占用端口。
核心源码(逐字来自文末完整源码):
def handle_http(conn: socket.socket) -> None:
try:
raw = conn.recv(4096).decode("utf-8", errors="replace")
if not raw:
return
first_line = raw.split("\r\n")[0]
parts = first_line.split(" ")
path = parts[1] if len(parts) >= 2 else "/"
body = f"<h1>Python HTTP Server</h1><p>PATH: {path}</p><p>Powered by socket only</p>"
resp = (
"HTTP/1.1 200 OK\r\n"
"Content-Type: text/html; charset=utf-8\r\n"
f"Content-Length: {len(body.encode())}\r\n"
"Connection: close\r\n\r\n" + body
)
conn.sendall(resp.encode())
finally:
conn.close()
def run_http_server(stop_event: Optional[threading.Event] = None) -> None:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((HOST, HTTP_PORT))
server.listen(5)
server.settimeout(0.5)
print(f"[HTTP] 监听 http://{HOST}:{HTTP_PORT} (浏览器可访问)")
while True:
if stop_event and stop_event.is_set():
break
try:
conn, _ = server.accept()
threading.Thread(target=handle_http, args=(conn,), daemon=True).start()
except socket.timeout:
continue
server.close()
可运行演示(补齐 Mock 数据与 print 反馈):
import socket
import threading
from typing import Optional
HOST = "127.0.0.1"
PORT = 19876 # 使用高位端口避免冲突
HTTP_PORT = 19877
# Step 7:HTTP 本质也是 socket 上的一段文本请求和文本响应。
def handle_http(conn: socket.socket) -> None:
try:
raw = conn.recv(4096).decode("utf-8", errors="replace")
if not raw:
return
first_line = raw.split("\r\n")[0]
parts = first_line.split(" ")
path = parts[1] if len(parts) >= 2 else "/"
body = f"<h1>Python HTTP Server</h1><p>PATH: {path}</p><p>Powered by socket only</p>"
resp = (
"HTTP/1.1 200 OK\r\n"
"Content-Type: text/html; charset=utf-8\r\n"
f"Content-Length: {len(body.encode())}\r\n"
"Connection: close\r\n\r\n" + body
)
conn.sendall(resp.encode())
finally:
conn.close()
def run_http_server(stop_event: Optional[threading.Event] = None) -> None:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((HOST, HTTP_PORT))
server.listen(5)
server.settimeout(0.5)
print(f"[HTTP] 监听 http://{HOST}:{HTTP_PORT} (浏览器可访问)")
while True:
if stop_event and stop_event.is_set():
break
try:
conn, _ = server.accept()
threading.Thread(target=handle_http, args=(conn,), daemon=True).start()
except socket.timeout:
continue
server.close()
server_sock, client_sock = socket.socketpair()
try:
thread = threading.Thread(target=handle_http, args=(server_sock,))
thread.start()
client_sock.sendall(b"GET /demo HTTP/1.1\r\nHost: localhost\r\n\r\n")
response = client_sock.recv(4096).decode("utf-8", errors="replace")
print(response.split("\r\n")[0])
print(response.split("\r\n\r\n", 1)[1])
thread.join(timeout=2)
finally:
client_sock.close()
Step 8:用 demo_mode 和 main 做一键可运行的网络闭环
痛点与机制:
最后把所有零件装起来:print_tcp_diagram() 先解释协议结构,main() 读取 --mode,默认 demo 会自动启动服务端线程、运行客户端、停止服务端;如果端口不可用,会自动降级到离线回显。
核心源码(逐字来自文末完整源码):
def print_tcp_diagram() -> None:
diagram = """
TCP 三次握手:
客户端 服务端
│──── SYN ────────────▶│
│◀─── SYN-ACK ─────────│
│──── ACK ────────────▶│
│════ 数据传输 ══════════│
粘包解决方案(4字节长度头):
┌────────┬──────────────────────┐
│ 4字节 │ N字节数据 │
│ 长度头 │ (struct.pack ">I") │
└────────┴──────────────────────┘
"""
print(diagram)
# ── 入口 ──────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(description="TCP Socket 演示")
parser.add_argument("--mode", choices=["demo", "server", "client", "http"],
default="demo", help="运行模式")
args = parser.parse_args()
print_tcp_diagram()
if args.mode == "demo":
demo_mode()
elif args.mode == "server":
run_server()
elif args.mode == "client":
responses = run_client()
for r in responses:
print(r)
elif args.mode == "http":
stop = threading.Event()
t = threading.Thread(target=run_http_server, args=(stop,), daemon=True)
t.start()
print(f"访问 http://{HOST}:{HTTP_PORT} 查看效果,按 Ctrl+C 停止")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
stop.set()
可运行演示(补齐 Mock 数据与 print 反馈):
import argparse
import socket
import struct
import threading
import time
from typing import Optional
HOST = "127.0.0.1"
PORT = 19876 # 使用高位端口避免冲突
HTTP_PORT = 19877
def _recv_exact(sock: socket.socket, n: int) -> bytes:
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
raise ConnectionError("连接断开")
buf += chunk
return buf
def send_msg(sock: socket.socket, data: bytes) -> None:
sock.sendall(struct.pack(">I", len(data)) + data)
def recv_msg(sock: socket.socket) -> bytes:
length = struct.unpack(">I", _recv_exact(sock, 4))[0]
return _recv_exact(sock, length)
def handle_client(conn: socket.socket, addr: tuple[str, int], stats: dict) -> None:
"""处理单个客户端连接(在独立线程中运行)"""
with conn:
while True:
try:
data = recv_msg(conn)
except (ConnectionError, struct.error):
break
msg = data.decode("utf-8")
echo = f"[ECHO] {msg}"
send_msg(conn, echo.encode("utf-8"))
stats["count"] = stats.get("count", 0) + 1
def run_server(stop_event: Optional[threading.Event] = None) -> None:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((HOST, PORT))
server.listen(10)
server.settimeout(0.5)
stats: dict = {}
print(f"[服务端] 监听 {HOST}:{PORT}")
while True:
if stop_event and stop_event.is_set():
break
try:
conn, addr = server.accept()
except socket.timeout:
continue
t = threading.Thread(target=handle_client, args=(conn, addr, stats), daemon=True)
t.start()
server.close()
print(f"[服务端] 已停止,共处理消息: {stats.get('count', 0)} 条")
MESSAGES = [
"AI pipeline 启动",
"数据清洗完成,共 1024 条记录",
"embedding 生成中...",
"向量存储写入成功",
"语义检索返回 Top-5 结果",
]
def run_client(messages: Optional[list[str]] = None) -> list[str]:
msgs = messages or MESSAGES
responses: list[str] = []
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
for msg in msgs:
send_msg(s, msg.encode("utf-8"))
resp = recv_msg(s).decode("utf-8")
responses.append(resp)
return responses
def offline_echo_demo(messages: Optional[list[str]] = None) -> list[str]:
"""端口不可用时的离线降级:保留协议演示,不依赖真实 socket bind。"""
return [f"[ECHO] {msg}" for msg in (messages or MESSAGES)]
def can_bind_port(port: int) -> tuple[bool, str]:
"""预检端口权限,避免受限环境里启动后台线程后才抛异常。"""
try:
probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
probe.bind((HOST, port))
probe.close()
return True, ""
except OSError as exc:
return False, str(exc)
def handle_http(conn: socket.socket) -> None:
try:
raw = conn.recv(4096).decode("utf-8", errors="replace")
if not raw:
return
first_line = raw.split("\r\n")[0]
parts = first_line.split(" ")
path = parts[1] if len(parts) >= 2 else "/"
body = f"<h1>Python HTTP Server</h1><p>PATH: {path}</p><p>Powered by socket only</p>"
resp = (
"HTTP/1.1 200 OK\r\n"
"Content-Type: text/html; charset=utf-8\r\n"
f"Content-Length: {len(body.encode())}\r\n"
"Connection: close\r\n\r\n" + body
)
conn.sendall(resp.encode())
finally:
conn.close()
def run_http_server(stop_event: Optional[threading.Event] = None) -> None:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((HOST, HTTP_PORT))
server.listen(5)
server.settimeout(0.5)
print(f"[HTTP] 监听 http://{HOST}:{HTTP_PORT} (浏览器可访问)")
while True:
if stop_event and stop_event.is_set():
break
try:
conn, _ = server.accept()
threading.Thread(target=handle_http, args=(conn,), daemon=True).start()
except socket.timeout:
continue
server.close()
def demo_mode() -> None:
ok, reason = can_bind_port(PORT)
if not ok:
print(f"\n[降级] 当前环境不允许本地 socket 通信:{reason}")
print("[降级] 改用离线协议模拟,仍然演示长度头 + 回显协议的业务闭环。")
responses = offline_echo_demo()
print("\n[客户端] 发送消息并接收回显:")
print(f"{'#':<4} {'发送':<35} {'回显'}")
print("─" * 75)
for i, (sent, resp) in enumerate(zip(MESSAGES, responses), 1):
print(f"{i:<4} {sent:<35} {resp}")
print("\n[演示完成]")
return
stop = threading.Event()
srv_thread = threading.Thread(target=run_server, args=(stop,), daemon=True)
srv_thread.start()
time.sleep(0.3) # 等服务端就绪
print("\n[客户端] 发送消息并接收回显:")
print(f"{'#':<4} {'发送':<35} {'回显'}")
print("─" * 75)
responses = run_client()
for i, (sent, resp) in enumerate(zip(MESSAGES, responses), 1):
print(f"{i:<4} {sent:<35} {resp}")
stop.set()
srv_thread.join(timeout=2)
print("\n[演示完成]")
# Step 8:main 是遥控器,--mode demo/server/client/http 决定跑哪种网络模式。
def print_tcp_diagram() -> None:
diagram = """
TCP 三次握手:
客户端 服务端
│──── SYN ────────────▶│
│◀─── SYN-ACK ─────────│
│──── ACK ────────────▶│
│════ 数据传输 ══════════│
粘包解决方案(4字节长度头):
┌────────┬──────────────────────┐
│ 4字节 │ N字节数据 │
│ 长度头 │ (struct.pack ">I") │
└────────┴──────────────────────┘
"""
print(diagram)
# ── 入口 ──────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(description="TCP Socket 演示")
parser.add_argument("--mode", choices=["demo", "server", "client", "http"],
default="demo", help="运行模式")
args = parser.parse_args()
print_tcp_diagram()
if args.mode == "demo":
demo_mode()
elif args.mode == "server":
run_server()
elif args.mode == "client":
responses = run_client()
for r in responses:
print(r)
elif args.mode == "http":
stop = threading.Event()
t = threading.Thread(target=run_http_server, args=(stop,), daemon=True)
t.start()
print(f"访问 http://{HOST}:{HTTP_PORT} 查看效果,按 Ctrl+C 停止")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
stop.set()
import sys
sys.argv = ["prog", "--mode", "demo"]
main()
极客实战:完整源码与运行
现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。
#!/usr/bin/env python3
"""
TCP 回显服务器 + 客户端(同文件演示)
用法:
python3 11-python-socket.py --mode demo # 同进程演示(默认)
python3 11-python-socket.py --mode server # 单独启动服务端
python3 11-python-socket.py --mode client # 单独启动客户端
python3 11-python-socket.py --mode http # 简易 HTTP 服务器演示
"""
import argparse
import socket
import struct
import threading
import time
from typing import Optional
HOST = "127.0.0.1"
PORT = 19876 # 使用高位端口避免冲突
HTTP_PORT = 19877
# ── 粘包工具 ──────────────────────────────────────────────────
def _recv_exact(sock: socket.socket, n: int) -> bytes:
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
raise ConnectionError("连接断开")
buf += chunk
return buf
def send_msg(sock: socket.socket, data: bytes) -> None:
sock.sendall(struct.pack(">I", len(data)) + data)
def recv_msg(sock: socket.socket) -> bytes:
length = struct.unpack(">I", _recv_exact(sock, 4))[0]
return _recv_exact(sock, length)
# ── 服务端 ────────────────────────────────────────────────────
def handle_client(conn: socket.socket, addr: tuple[str, int], stats: dict) -> None:
"""处理单个客户端连接(在独立线程中运行)"""
with conn:
while True:
try:
data = recv_msg(conn)
except (ConnectionError, struct.error):
break
msg = data.decode("utf-8")
echo = f"[ECHO] {msg}"
send_msg(conn, echo.encode("utf-8"))
stats["count"] = stats.get("count", 0) + 1
def run_server(stop_event: Optional[threading.Event] = None) -> None:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((HOST, PORT))
server.listen(10)
server.settimeout(0.5)
stats: dict = {}
print(f"[服务端] 监听 {HOST}:{PORT}")
while True:
if stop_event and stop_event.is_set():
break
try:
conn, addr = server.accept()
except socket.timeout:
continue
t = threading.Thread(target=handle_client, args=(conn, addr, stats), daemon=True)
t.start()
server.close()
print(f"[服务端] 已停止,共处理消息: {stats.get('count', 0)} 条")
# ── 客户端 ────────────────────────────────────────────────────
MESSAGES = [
"AI pipeline 启动",
"数据清洗完成,共 1024 条记录",
"embedding 生成中...",
"向量存储写入成功",
"语义检索返回 Top-5 结果",
]
def run_client(messages: Optional[list[str]] = None) -> list[str]:
msgs = messages or MESSAGES
responses: list[str] = []
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
for msg in msgs:
send_msg(s, msg.encode("utf-8"))
resp = recv_msg(s).decode("utf-8")
responses.append(resp)
return responses
def offline_echo_demo(messages: Optional[list[str]] = None) -> list[str]:
"""端口不可用时的离线降级:保留协议演示,不依赖真实 socket bind。"""
return [f"[ECHO] {msg}" for msg in (messages or MESSAGES)]
def can_bind_port(port: int) -> tuple[bool, str]:
"""预检端口权限,避免受限环境里启动后台线程后才抛异常。"""
try:
probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
probe.bind((HOST, port))
probe.close()
return True, ""
except OSError as exc:
return False, str(exc)
# ── 简易 HTTP 服务端 ───────────────────────────────────────────
def handle_http(conn: socket.socket) -> None:
try:
raw = conn.recv(4096).decode("utf-8", errors="replace")
if not raw:
return
first_line = raw.split("\r\n")[0]
parts = first_line.split(" ")
path = parts[1] if len(parts) >= 2 else "/"
body = f"<h1>Python HTTP Server</h1><p>PATH: {path}</p><p>Powered by socket only</p>"
resp = (
"HTTP/1.1 200 OK\r\n"
"Content-Type: text/html; charset=utf-8\r\n"
f"Content-Length: {len(body.encode())}\r\n"
"Connection: close\r\n\r\n" + body
)
conn.sendall(resp.encode())
finally:
conn.close()
def run_http_server(stop_event: Optional[threading.Event] = None) -> None:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((HOST, HTTP_PORT))
server.listen(5)
server.settimeout(0.5)
print(f"[HTTP] 监听 http://{HOST}:{HTTP_PORT} (浏览器可访问)")
while True:
if stop_event and stop_event.is_set():
break
try:
conn, _ = server.accept()
threading.Thread(target=handle_http, args=(conn,), daemon=True).start()
except socket.timeout:
continue
server.close()
# ── 演示模式 ──────────────────────────────────────────────────
def demo_mode() -> None:
ok, reason = can_bind_port(PORT)
if not ok:
print(f"\n[降级] 当前环境不允许本地 socket 通信:{reason}")
print("[降级] 改用离线协议模拟,仍然演示长度头 + 回显协议的业务闭环。")
responses = offline_echo_demo()
print("\n[客户端] 发送消息并接收回显:")
print(f"{'#':<4} {'发送':<35} {'回显'}")
print("─" * 75)
for i, (sent, resp) in enumerate(zip(MESSAGES, responses), 1):
print(f"{i:<4} {sent:<35} {resp}")
print("\n[演示完成]")
return
stop = threading.Event()
srv_thread = threading.Thread(target=run_server, args=(stop,), daemon=True)
srv_thread.start()
time.sleep(0.3) # 等服务端就绪
print("\n[客户端] 发送消息并接收回显:")
print(f"{'#':<4} {'发送':<35} {'回显'}")
print("─" * 75)
responses = run_client()
for i, (sent, resp) in enumerate(zip(MESSAGES, responses), 1):
print(f"{i:<4} {sent:<35} {resp}")
stop.set()
srv_thread.join(timeout=2)
print("\n[演示完成]")
def print_tcp_diagram() -> None:
diagram = """
TCP 三次握手:
客户端 服务端
│──── SYN ────────────▶│
│◀─── SYN-ACK ─────────│
│──── ACK ────────────▶│
│════ 数据传输 ══════════│
粘包解决方案(4字节长度头):
┌────────┬──────────────────────┐
│ 4字节 │ N字节数据 │
│ 长度头 │ (struct.pack ">I") │
└────────┴──────────────────────┘
"""
print(diagram)
# ── 入口 ──────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(description="TCP Socket 演示")
parser.add_argument("--mode", choices=["demo", "server", "client", "http"],
default="demo", help="运行模式")
args = parser.parse_args()
print_tcp_diagram()
if args.mode == "demo":
demo_mode()
elif args.mode == "server":
run_server()
elif args.mode == "client":
responses = run_client()
for r in responses:
print(r)
elif args.mode == "http":
stop = threading.Event()
t = threading.Thread(target=run_http_server, args=(stop,), daemon=True)
t.start()
print(f"访问 http://{HOST}:{HTTP_PORT} 查看效果,按 Ctrl+C 停止")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
stop.set()
if __name__ == "__main__":
main()
运行示例:
python3 11-python-socket.py --mode demo # 同进程演示
python3 11-python-socket.py --mode http # 启动 HTTP 服务器
协议对比速查
| 特性 | TCP | UDP |
|---|---|---|
| 连接 | 有连接(三次握手) | 无连接 |
| 可靠性 | 保证顺序与到达 | 不保证 |
| 速度 | 较慢(有确认机制) | 快 |
| 粘包 | 存在,需处理 | 不存在(数据报) |
| 适用场景 | HTTP、数据库、文件传输 | DNS、视频流、游戏 |
⏱ NexDo Time · 5 分钟微操
- 修改
handle_client,让服务端统计每个客户端 IP 的消息数,并在停止时打印 IP 统计表。 - 给
send_msg/recv_msg加上消息类型字段(在长度头前再加 1 字节 type),支持区分TEXT=0x01和JSON=0x02。 - 在
demo_mode中同时启动 3 个客户端线程并发发送,观察服务端是否能正确并发处理。
Don’t wait for next time, do it in the next moment.