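18 · HTTP and the Foundations of Web Frameworks: Writing a Server from Scratch
🔗 Knowledge map: before reading this article, it helps to know the socket basics from "13 · IO Multiplexing: select/epoll and Event-Driven Design" and the data-persistence material from "16 · Database Foundations: Core SQLite Operations". This article puts the two together and uses raw sockets to hand-write a server that handles real HTTP requests.
Geek's take: underneath Django and FastAPI, the server that hosts them follows the same pipeline:
socket.accept() → parse the request message → route dispatch → business logic → assemble the response message → socket.sendall(). Write this pipeline by hand once and the framework "magic" largely disappears.
Pain points and architecture: newcomers to web development usually get stuck on three questions: ① What does an HTTP request actually look like? ② How does a framework's routing work? ③ Why does a POST request need a Content-Length header? This article answers all three in about 200 lines of pure Python.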
HTTP message structure
┌───────────────────────────────────────────────────────────────────────┐
│ HTTP/1.1 request message                                              │
├────────────────────┬──────────────────────────────────────────────────┤
│ Request line       │ POST /tasks HTTP/1.1                             │
├────────────────────┼──────────────────────────────────────────────────┤
│                    │ Host: localhost:8000                             │
│ Request headers    │ Content-Type: application/json                   │
│                    │ Content-Length: 45                               │
│                    │ Authorization: Bearer <token>                    │
├────────────────────┼──────────────────────────────────────────────────┤
│ Blank line         │ \r\n                                             │
├────────────────────┼──────────────────────────────────────────────────┤
│ Request body       │ {"title": "Write a blog post", "done": false}    │
└────────────────────┴──────────────────────────────────────────────────┘
┌───────────────────────────────────────────────────────────────────────┐
│ HTTP/1.1 response message                                             │
├────────────────────┬──────────────────────────────────────────────────┤
│ Status line        │ HTTP/1.1 200 OK                                  │
├────────────────────┼──────────────────────────────────────────────────┤
│                    │ Content-Type: application/json                   │
│ Response headers   │ Content-Length: 42                               │
│                    │ Connection: close                                │
├────────────────────┼──────────────────────────────────────────────────┤
│ Blank line         │ \r\n                                             │
├────────────────────┼──────────────────────────────────────────────────┤
│ Response body      │ {"status": "ok", "data": [...]}                  │
└────────────────────┴──────────────────────────────────────────────────┘
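A minimal sketch of how the parts in the diagrams above map onto bytes on the wire. The payload and header values are illustrative assumptions, not taken from a real capture.

```python
import json

# Hypothetical payload, used only for illustration.
body = json.dumps({"title": "Write a blog post", "done": False}).encode("utf-8")

request = (
    b"POST /tasks HTTP/1.1\r\n"                            # request line
    b"Host: localhost:8000\r\n"                            # headers, one per line
    b"Content-Type: application/json\r\n"
    + f"Content-Length: {len(body)}\r\n".encode("ascii")
    + b"\r\n"                                              # blank line: end of headers
    + body                                                 # request body
)

# Split the message back into its parts at the first blank line.
head, _, payload = request.partition(b"\r\n\r\n")
request_line, *header_lines = head.split(b"\r\n")

print(request_line.decode())
print([h.decode() for h in header_lines])
print(payload.decode())
```

Every line in the head ends with \r\n, so the blank line appears as two consecutive \r\n pairs in the byte stream.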
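Request method semantics
┌──────────┬────────────┬──────┬──────────────────────────────────────────┐
│ Method   │ Idempotent │ Safe │ Typical use                              │
├──────────┼────────────┼──────┼──────────────────────────────────────────┤
│ GET      │ yes        │ yes  │ Read a resource                          │
│ POST     │ no         │ no   │ Create a resource                        │
│ PUT      │ yes        │ no   │ Replace a resource in full               │
│ PATCH    │ no         │ no   │ Partially update a resource              │
│ DELETE   │ yes        │ no   │ Delete a resource                        │
│ OPTIONS  │ yes        │ yes  │ Query supported methods (CORS preflight) │
└──────────┴────────────┴──────┴──────────────────────────────────────────┘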
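One practical use of the table above is deciding whether a failed request may be retried automatically. The sketch below assumes a hypothetical helper, can_retry_automatically, and simply encodes the table as a dictionary; it is not part of the server built in this article.

```python
# Properties copied from the table above.
METHOD_PROPERTIES = {
    "GET":     {"idempotent": True,  "safe": True},
    "POST":    {"idempotent": False, "safe": False},
    "PUT":     {"idempotent": True,  "safe": False},
    "PATCH":   {"idempotent": False, "safe": False},
    "DELETE":  {"idempotent": True,  "safe": False},
    "OPTIONS": {"idempotent": True,  "safe": True},
}


def can_retry_automatically(method: str) -> bool:
    """Hypothetical helper: retry only if repeating the request has the same effect."""
    props = METHOD_PROPERTIES.get(method.upper())
    return bool(props and props["idempotent"])


for m in ("GET", "POST", "PUT", "PATCH"):
    print(m, can_retry_automatically(m))
```

Unknown methods are treated as not retryable, which is the conservative choice.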
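Common status codes at a glance
2xx success: 200 OK | 201 Created | 204 No Content
3xx redirection: 301 Moved Permanently | 302 Found (temporary)
4xx client error: 400 Bad Request | 401 Unauthorized (not authenticated) | 403 Forbidden (no permission) | 404 Not Found | 429 Too Many Requests (rate limited)
5xx server error: 500 Internal Server Error | 502 Bad Gateway | 503 Service Unavailable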
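The reason phrases listed above do not have to be typed by hand: Python's standard library ships them in http.HTTPStatus. The sketch below wraps the lookup in a hypothetical helper, reason_phrase, that falls back to "Unknown" for codes the enum does not define.

```python
from http import HTTPStatus


def reason_phrase(code: int) -> str:
    """Hypothetical helper: look up the standard reason phrase for a status code."""
    try:
        return HTTPStatus(code).phrase
    except ValueError:
        # HTTPStatus raises ValueError for codes it does not know.
        return "Unknown"


for code in (200, 201, 404, 429, 503, 799):
    print(code, reason_phrase(code))
```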
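Server processing pipeline
socket.bind() → socket.listen() → socket.accept()
↓
Receive the raw byte stream (TCP does not preserve message boundaries)
↓
Parse request line + headers + body (body cut to Content-Length)
↓
Look up the route table (a dict) → business handler
↓
Assemble the response message → socket.sendall()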
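Step by step: breaking down the core logic
The heart of this article is one pipeline: socket.accept() → parse_request → dispatch → handle_* → build_response → socket.sendall(). Each step below focuses on one link in that pipeline and produces visible output when run.
Step 1: use parse_request to split the raw byte stream into a 5-tuple
Pain point and mechanism:
At the TCP level an HTTP message is just a run of bytes with no structure. parse_request acts like a parcel unpacker: it locates \r\n\r\n (the divider between headers and body), splits the header section into lines, then slices the body according to Content-Length. raw.find(b"\r\n\r\n") is the key call: HTTP requires an empty line between the headers and the body, and because the last header line also ends in \r\n, that empty line shows up in the byte stream as \r\n\r\n. Note that find returns -1 when the terminator is absent, a case this minimal version does not handle.
Core source (taken from the complete listing at the end of the article):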
def parse_request(raw: bytes) -> Tuple[str, str, str, Dict[str, str], bytes]:
    """Parse a raw HTTP request into (method, path, version, headers, body)."""
    header_end = raw.find(b"\r\n\r\n")
    header_part = raw[:header_end].decode("utf-8", errors="replace")
    body = raw[header_end + 4:]
    lines = header_part.split("\r\n")
    method, path, version = lines[0].split(" ", 2)
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        if ":" in line:
            k, v = line.split(":", 1)
            headers[k.strip().lower()] = v.strip()
    content_length = int(headers.get("content-length", 0))
    return method, path, version, headers, body[:content_length]
Runnable demo (with mock data and print feedback):
#!/usr/bin/env python3
from typing import Dict, Tuple


def parse_request(raw: bytes) -> Tuple[str, str, str, Dict[str, str], bytes]:
    """Parse a raw HTTP request into (method, path, version, headers, body)."""
    header_end = raw.find(b"\r\n\r\n")
    header_part = raw[:header_end].decode("utf-8", errors="replace")
    body = raw[header_end + 4:]
    lines = header_part.split("\r\n")
    method, path, version = lines[0].split(" ", 2)
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        if ":" in line:
            k, v = line.split(":", 1)
            headers[k.strip().lower()] = v.strip()
    content_length = int(headers.get("content-length", 0))
    return method, path, version, headers, body[:content_length]


# parse_request splits the raw byte stream into method/path/version/headers/body
raw = b"GET /tasks?status=done HTTP/1.1\r\nHost: localhost:8000\r\nUser-Agent: curl\r\n\r\n"
method, path, version, headers, body = parse_request(raw)
print(f"method={method}, path={path}, version={version}")
print(f"headers: {dict(list(headers.items())[:2])}")
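parse_request assumes the header terminator is always present and that Content-Length is a clean integer. The sketch below shows what goes wrong when the terminator is missing, plus two hypothetical helpers (split_head_and_body and parse_content_length) that reject malformed input instead. They are an illustration of one possible hardening, not the article's implementation.

```python
from typing import Dict, Tuple

# Pitfall: find() returns -1 when the blank line is missing,
# and slicing with -1 silently drops the last byte.
truncated = b"GET / HTTP/1.1\r\nHost: x"
silently_cut = truncated[:truncated.find(b"\r\n\r\n")]
print(silently_cut)


def split_head_and_body(raw: bytes) -> Tuple[bytes, bytes]:
    """Hypothetical helper: split at the blank line, or fail loudly if it is absent."""
    head, sep, body = raw.partition(b"\r\n\r\n")
    if not sep:
        raise ValueError("incomplete request: header terminator not found")
    return head, body


def parse_content_length(headers: Dict[str, str]) -> int:
    """Hypothetical helper: accept only a plain non-negative decimal number."""
    value = headers.get("content-length", "0")
    if not (value.isascii() and value.isdigit()):
        raise ValueError(f"invalid Content-Length: {value!r}")
    return int(value)


head, body = split_head_and_body(b"GET / HTTP/1.1\r\nHost: x\r\n\r\n")
print(head, body)
print(parse_content_length({"content-length": "12"}))
```

A caller would typically turn either ValueError into a 400 Bad Request response.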
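Step 2: use build_response to turn a status code and a dict into a valid response message
Pain point and mechanism:
The format of an HTTP response is fixed: status line + headers + blank line + body. build_response acts like a parcel packer: it serializes the dict to JSON bytes, writes the byte length into the Content-Length header, and joins the parts with \r\n. Content-Length is the key piece: it tells the client where the body ends. Without it (and without chunked transfer encoding) the client can only treat connection close as the end of the body, and on a keep-alive connection it would keep waiting.
Core source (taken from the complete listing at the end of the article):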
def build_response(status: int, body: dict) -> bytes:
    """Build a JSON response message."""
    status_text = {200: "OK", 201: "Created", 404: "Not Found",
                   405: "Method Not Allowed", 400: "Bad Request"}
    payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
    header = (
        f"HTTP/1.1 {status} {status_text.get(status, 'Unknown')}\r\n"
        f"Content-Type: application/json; charset=utf-8\r\n"
        f"Content-Length: {len(payload)}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    ).encode("utf-8")
    return header + payload
Runnable demo (with mock data and print feedback):
#!/usr/bin/env python3
import json


def build_response(status: int, body: dict) -> bytes:
    """Build a JSON response message."""
    status_text = {200: "OK", 201: "Created", 404: "Not Found",
                   405: "Method Not Allowed", 400: "Bad Request"}
    payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
    header = (
        f"HTTP/1.1 {status} {status_text.get(status, 'Unknown')}\r\n"
        f"Content-Type: application/json; charset=utf-8\r\n"
        f"Content-Length: {len(payload)}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    ).encode("utf-8")
    return header + payload


# build_response turns a status code plus a dict into the bytes of a valid HTTP response.
# The split happens outside the f-strings: before Python 3.12 a backslash
# inside an f-string expression is a SyntaxError.
resp = build_response(200, {"message": "ok", "count": 42})
resp_lines = resp.split(b"\r\n")
print(f"Status line: {resp_lines[0].decode()}")
length_header = [l for l in resp_lines if b"Content-Length" in l][0]
print(f"Content-Length header: {length_header.decode()}")
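Content-Length counts bytes, not characters, which is why build_response measures the payload after encoding it. A small sketch with a non-ASCII title (the sample value is an assumption chosen for illustration) makes the difference visible:

```python
import json

body = {"title": "写博客"}  # three CJK characters, three bytes each in UTF-8

text = json.dumps(body, ensure_ascii=False)
payload = text.encode("utf-8")

char_count = len(text)      # length of the str, in characters
byte_count = len(payload)   # length on the wire, in bytes

print(f"characters={char_count}, bytes={byte_count}")
```

Using the character count as Content-Length here would make the client stop reading before the body is complete.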
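Step 3: implement the route handlers handle_health and handle_tasks
Pain point and mechanism:
A route handler is like a clerk at a service window: each window (path) handles only one kind of business. The try/except around json.loads in handle_tasks_post is defensive programming: a client may send anything, so the body cannot be assumed to be valid JSON, or even valid UTF-8. with _lock: serializes concurrent writes to the _tasks list, much like a bank counter where only one person may update the ledger at a time.
Core source (taken from the complete listing at the end of the article):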
def handle_tasks_post(body: bytes) -> bytes:
    global _next_id
    try:
        data = json.loads(body.decode("utf-8"))
        title = data.get("title", "").strip()
        if not title:
            return build_response(400, {"error": "title must not be empty"})
    except (json.JSONDecodeError, UnicodeDecodeError):
        return build_response(400, {"error": "request body must be valid JSON"})
    with _lock:
        task = {"id": _next_id, "title": title, "done": False}
        _tasks.append(task)
        _next_id += 1
    return build_response(201, {"task": task})
Runnable demo (with mock data and print feedback):
#!/usr/bin/env python3
import json
import threading
from datetime import datetime
from typing import Dict, List

# ─── Data store (in memory) ──────────────────────────────────────────────────
_tasks: List[Dict] = [
    {"id": 1, "title": "Read the HTTP RFC", "done": False},
    {"id": 2, "title": "Hand-write a socket server", "done": True},
]
_next_id = 3
_lock = threading.Lock()


def build_response(status: int, body: dict) -> bytes:
    """Build a JSON response message."""
    status_text = {200: "OK", 201: "Created", 404: "Not Found",
                   405: "Method Not Allowed", 400: "Bad Request"}
    payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
    header = (
        f"HTTP/1.1 {status} {status_text.get(status, 'Unknown')}\r\n"
        f"Content-Type: application/json; charset=utf-8\r\n"
        f"Content-Length: {len(payload)}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    ).encode("utf-8")
    return header + payload


def handle_health(method: str, _path: str, _headers: Dict, _body: bytes) -> bytes:
    if method != "GET":
        return build_response(405, {"error": "Method Not Allowed"})
    return build_response(200, {
        "status": "ok",
        "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "tasks_count": len(_tasks),
    })


def handle_tasks_get() -> bytes:
    with _lock:
        return build_response(200, {"tasks": list(_tasks), "total": len(_tasks)})


def handle_tasks_post(body: bytes) -> bytes:
    global _next_id
    try:
        data = json.loads(body.decode("utf-8"))
        title = data.get("title", "").strip()
        if not title:
            return build_response(400, {"error": "title must not be empty"})
    except (json.JSONDecodeError, UnicodeDecodeError):
        return build_response(400, {"error": "request body must be valid JSON"})
    with _lock:
        task = {"id": _next_id, "title": title, "done": False}
        _tasks.append(task)
        _next_id += 1
    return build_response(201, {"task": task})


def handle_tasks(method: str, _path: str, _headers: Dict, body: bytes) -> bytes:
    if method == "GET":
        return handle_tasks_get()
    elif method == "POST":
        return handle_tasks_post(body)
    return build_response(405, {"error": "Method Not Allowed"})


# handle_health/handle_tasks are route handlers: they take (method, path, headers, body)
# and return the response bytes. The status code is the second space-separated token.
resp_health = handle_health("GET", "/health", {}, b"")
print(f"/health GET → {resp_health.split(b' ')[1].decode()}")
resp_tasks = handle_tasks_get()
print(f"/tasks GET → {resp_tasks.split(b' ')[1].decode()}")
body_post = json.dumps({"title": "New task"}).encode()
resp_post = handle_tasks_post(body_post)
print(f"/tasks POST → {resp_post.split(b' ')[1].decode()}")
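One gap remains in handle_tasks_post: a body such as [1, 2] or {"title": 5} is valid JSON, so json.loads succeeds, but data.get or .strip() then raises an exception that the except clause does not catch. The sketch below shows one way to validate the shape as well as the syntax; extract_title is a hypothetical helper, not part of the original listing.

```python
import json
from typing import Optional


def extract_title(body: bytes) -> Optional[str]:
    """Hypothetical helper: return a non-empty title, or None if the body is unusable."""
    try:
        data = json.loads(body.decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError):
        return None                      # not UTF-8 or not JSON at all
    if not isinstance(data, dict):
        return None                      # valid JSON, but not an object
    title = data.get("title")
    if not isinstance(title, str) or not title.strip():
        return None                      # missing, wrong type, or blank
    return title.strip()


samples = [b'{"title": " New task "}', b"[1, 2]", b'{"title": 5}', b"\xff", b"not json"]
for sample in samples:
    print(sample, "→", extract_title(sample))
```

A handler built on this helper could answer 400 whenever it returns None and never reach the code that assumes a string title.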
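Step 4: use a route-table dict plus dispatch for O(1) URL dispatch
Pain point and mechanism:
The ROUTES dictionary works like a floor directory: {"/health": handle_health, "/tasks": handle_tasks}. Lookup is a hash lookup, O(1) on average, so it stays fast no matter how many routes are registered. dispatch first strips the query string with path.split("?")[0] and then looks up the dictionary. This is the core idea behind route registration in Flask and FastAPI; the frameworks add regex matching and path-parameter extraction on top.
Core source (taken from the complete listing at the end of the article):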
Router = Dict[str, Callable[[str, str, Dict, bytes], bytes]]
ROUTES: Router = {
    "/health": handle_health,
    "/tasks": handle_tasks,
}


def dispatch(method: str, path: str, headers: Dict, body: bytes) -> bytes:
    # Match on the path only; ignore the query string
    clean_path = path.split("?")[0]
    handler = ROUTES.get(clean_path)
    if handler is None:
        return build_response(404, {"error": f"path {clean_path} not found"})
    return handler(method, clean_path, headers, body)
Runnable demo (with mock data and print feedback):
#!/usr/bin/env python3
import json
import threading
from datetime import datetime
from typing import Callable, Dict, List, Tuple

# ─── Data store (in memory) ──────────────────────────────────────────────────
_tasks: List[Dict] = [
    {"id": 1, "title": "Read the HTTP RFC", "done": False},
    {"id": 2, "title": "Hand-write a socket server", "done": True},
]
_next_id = 3
_lock = threading.Lock()


def parse_request(raw: bytes) -> Tuple[str, str, str, Dict[str, str], bytes]:
    """Parse a raw HTTP request into (method, path, version, headers, body)."""
    header_end = raw.find(b"\r\n\r\n")
    header_part = raw[:header_end].decode("utf-8", errors="replace")
    body = raw[header_end + 4:]
    lines = header_part.split("\r\n")
    method, path, version = lines[0].split(" ", 2)
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        if ":" in line:
            k, v = line.split(":", 1)
            headers[k.strip().lower()] = v.strip()
    content_length = int(headers.get("content-length", 0))
    return method, path, version, headers, body[:content_length]


def build_response(status: int, body: dict) -> bytes:
    """Build a JSON response message."""
    status_text = {200: "OK", 201: "Created", 404: "Not Found",
                   405: "Method Not Allowed", 400: "Bad Request"}
    payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
    header = (
        f"HTTP/1.1 {status} {status_text.get(status, 'Unknown')}\r\n"
        f"Content-Type: application/json; charset=utf-8\r\n"
        f"Content-Length: {len(payload)}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    ).encode("utf-8")
    return header + payload


def handle_health(method: str, _path: str, _headers: Dict, _body: bytes) -> bytes:
    if method != "GET":
        return build_response(405, {"error": "Method Not Allowed"})
    return build_response(200, {
        "status": "ok",
        "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "tasks_count": len(_tasks),
    })


def handle_tasks_get() -> bytes:
    with _lock:
        return build_response(200, {"tasks": list(_tasks), "total": len(_tasks)})


def handle_tasks_post(body: bytes) -> bytes:
    global _next_id
    try:
        data = json.loads(body.decode("utf-8"))
        title = data.get("title", "").strip()
        if not title:
            return build_response(400, {"error": "title must not be empty"})
    except (json.JSONDecodeError, UnicodeDecodeError):
        return build_response(400, {"error": "request body must be valid JSON"})
    with _lock:
        task = {"id": _next_id, "title": title, "done": False}
        _tasks.append(task)
        _next_id += 1
    return build_response(201, {"task": task})


def handle_tasks(method: str, _path: str, _headers: Dict, body: bytes) -> bytes:
    if method == "GET":
        return handle_tasks_get()
    elif method == "POST":
        return handle_tasks_post(body)
    return build_response(405, {"error": "Method Not Allowed"})


Router = Dict[str, Callable[[str, str, Dict, bytes], bytes]]
ROUTES: Router = {
    "/health": handle_health,
    "/tasks": handle_tasks,
}


def dispatch(method: str, path: str, headers: Dict, body: bytes) -> bytes:
    # Match on the path only; ignore the query string
    clean_path = path.split("?")[0]
    handler = ROUTES.get(clean_path)
    if handler is None:
        return build_response(404, {"error": f"path {clean_path} not found"})
    return handler(method, clean_path, headers, body)


# dispatch does an O(1) lookup in the route dict, ignores the query string,
# and returns 404 when nothing matches.
method, path, _, headers, body = parse_request(b"GET /health HTTP/1.1\r\n\r\n")
resp = dispatch(method, path, headers, body)
status_line = resp.split(b"\r\n")[0].decode()
print(f"dispatch /health → {status_line}")
resp2 = dispatch("GET", "/nonexistent", {}, b"")
status_line2 = resp2.split(b"\r\n")[0].decode()
print(f"dispatch /nonexistent → {status_line2}")
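The exact-match dictionary cannot express a route such as /tasks/42, where part of the path is a parameter. The sketch below illustrates the general idea of pattern-based routing with named regex groups. The names PATTERN_ROUTES, match_route, show_task and list_tasks are hypothetical, and real frameworks implement this differently and with far more features.

```python
import re
from typing import Dict, Optional


def list_tasks(params: Dict[str, str]) -> str:
    return "all tasks"


def show_task(params: Dict[str, str]) -> str:
    return f"task {params['task_id']}"


# Each entry pairs a compiled pattern with a handler; named groups become parameters.
PATTERN_ROUTES = [
    (re.compile(r"/tasks"), list_tasks),
    (re.compile(r"/tasks/(?P<task_id>\d+)"), show_task),
]


def match_route(path: str) -> Optional[str]:
    """Hypothetical helper: try each pattern in order; None means 404."""
    clean_path = path.split("?", 1)[0]           # drop the query string first
    for pattern, handler in PATTERN_ROUTES:
        m = pattern.fullmatch(clean_path)        # whole path must match
        if m:
            return handler(m.groupdict())
    return None


for p in ("/tasks", "/tasks/42?verbose=1", "/tasks/abc"):
    print(p, "→", match_route(p))
```

The trade-off is that this scan is linear in the number of routes, whereas the dictionary lookup stays O(1).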
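Step 5: use handle_connection to process one TCP connection in its own thread
Pain point and mechanism:
TCP is a stream protocol and does not guarantee that a single recv returns a complete HTTP request; like water in a pipe, the data may arrive in several bursts. handle_connection calls recv in a loop, and after each chunk checks whether \r\n\r\n (end of headers) has arrived, then uses Content-Length to decide whether the body is complete. The daemon=True flag, set where the thread is created in run_server, lets the threads die with the main process, so a client that never disconnects cannot keep the program from exiting.
Core source (taken from the complete listing at the end of the article):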
def handle_connection(conn: socket.socket, addr: Tuple[str, int]) -> None:
    try:
        chunks = []
        while True:
            chunk = conn.recv(4096)
            if not chunk:
                break
            chunks.append(chunk)
            # Simple check: once \r\n\r\n has arrived, read Content-Length more bytes
            raw = b"".join(chunks)
            if b"\r\n\r\n" in raw:
                header_end = raw.find(b"\r\n\r\n")
                header_part = raw[:header_end].decode("utf-8", errors="replace")
                content_length = 0
                for line in header_part.split("\r\n")[1:]:
                    if line.lower().startswith("content-length:"):
                        content_length = int(line.split(":", 1)[1].strip())
                body_received = len(raw) - header_end - 4
                if body_received >= content_length:
                    break
        if not chunks:
            return
        raw = b"".join(chunks)
        method, path, _version, headers, body = parse_request(raw)
        response = dispatch(method, path, headers, body)
        conn.sendall(response)
        print(f"  [{datetime.now().strftime('%H:%M:%S')}] {addr[0]} {method} {path}")
    except Exception as e:
        print(f"  [error] {addr}: {e}")
    finally:
        conn.close()
Runnable demo (with mock data and print feedback):
#!/usr/bin/env python3
import json
import socket
import threading
from datetime import datetime
from typing import Callable, Dict, List, Tuple

# ─── Data store (in memory) ──────────────────────────────────────────────────
_tasks: List[Dict] = [
    {"id": 1, "title": "Read the HTTP RFC", "done": False},
    {"id": 2, "title": "Hand-write a socket server", "done": True},
]
_next_id = 3
_lock = threading.Lock()


# ─── HTTP parsing ────────────────────────────────────────────────────────────
def parse_request(raw: bytes) -> Tuple[str, str, str, Dict[str, str], bytes]:
    """Parse a raw HTTP request into (method, path, version, headers, body)."""
    header_end = raw.find(b"\r\n\r\n")
    header_part = raw[:header_end].decode("utf-8", errors="replace")
    body = raw[header_end + 4:]
    lines = header_part.split("\r\n")
    method, path, version = lines[0].split(" ", 2)
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        if ":" in line:
            k, v = line.split(":", 1)
            headers[k.strip().lower()] = v.strip()
    content_length = int(headers.get("content-length", 0))
    return method, path, version, headers, body[:content_length]


def build_response(status: int, body: dict) -> bytes:
    """Build a JSON response message."""
    status_text = {200: "OK", 201: "Created", 404: "Not Found",
                   405: "Method Not Allowed", 400: "Bad Request"}
    payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
    header = (
        f"HTTP/1.1 {status} {status_text.get(status, 'Unknown')}\r\n"
        f"Content-Type: application/json; charset=utf-8\r\n"
        f"Content-Length: {len(payload)}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    ).encode("utf-8")
    return header + payload


# ─── Route handlers ──────────────────────────────────────────────────────────
def handle_health(method: str, _path: str, _headers: Dict, _body: bytes) -> bytes:
    if method != "GET":
        return build_response(405, {"error": "Method Not Allowed"})
    return build_response(200, {
        "status": "ok",
        "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "tasks_count": len(_tasks),
    })


def handle_tasks_get() -> bytes:
    with _lock:
        return build_response(200, {"tasks": list(_tasks), "total": len(_tasks)})


def handle_tasks_post(body: bytes) -> bytes:
    global _next_id
    try:
        data = json.loads(body.decode("utf-8"))
        title = data.get("title", "").strip()
        if not title:
            return build_response(400, {"error": "title must not be empty"})
    except (json.JSONDecodeError, UnicodeDecodeError):
        return build_response(400, {"error": "request body must be valid JSON"})
    with _lock:
        task = {"id": _next_id, "title": title, "done": False}
        _tasks.append(task)
        _next_id += 1
    return build_response(201, {"task": task})


def handle_tasks(method: str, _path: str, _headers: Dict, body: bytes) -> bytes:
    if method == "GET":
        return handle_tasks_get()
    elif method == "POST":
        return handle_tasks_post(body)
    return build_response(405, {"error": "Method Not Allowed"})


# ─── Route table ─────────────────────────────────────────────────────────────
Router = Dict[str, Callable[[str, str, Dict, bytes], bytes]]
ROUTES: Router = {
    "/health": handle_health,
    "/tasks": handle_tasks,
}


def dispatch(method: str, path: str, headers: Dict, body: bytes) -> bytes:
    # Match on the path only; ignore the query string
    clean_path = path.split("?")[0]
    handler = ROUTES.get(clean_path)
    if handler is None:
        return build_response(404, {"error": f"path {clean_path} not found"})
    return handler(method, clean_path, headers, body)


# ─── Connection handling (one thread per connection) ─────────────────────────
def handle_connection(conn: socket.socket, addr: Tuple[str, int]) -> None:
    try:
        chunks = []
        while True:
            chunk = conn.recv(4096)
            if not chunk:
                break
            chunks.append(chunk)
            # Simple check: once \r\n\r\n has arrived, read Content-Length more bytes
            raw = b"".join(chunks)
            if b"\r\n\r\n" in raw:
                header_end = raw.find(b"\r\n\r\n")
                header_part = raw[:header_end].decode("utf-8", errors="replace")
                content_length = 0
                for line in header_part.split("\r\n")[1:]:
                    if line.lower().startswith("content-length:"):
                        content_length = int(line.split(":", 1)[1].strip())
                body_received = len(raw) - header_end - 4
                if body_received >= content_length:
                    break
        if not chunks:
            return
        raw = b"".join(chunks)
        method, path, _version, headers, body = parse_request(raw)
        response = dispatch(method, path, headers, body)
        conn.sendall(response)
        print(f"  [{datetime.now().strftime('%H:%M:%S')}] {addr[0]} {method} {path}")
    except Exception as e:
        print(f"  [error] {addr}: {e}")
    finally:
        conn.close()


# Step 5 demo: instead of binding a real port, socketpair() creates a pair of
# already-connected sockets, a kind of local phone line. In-page runners, locked-down
# work machines and beginners' laptops can all run this without opening a port.
server_sock, client_sock = socket.socketpair()
worker = threading.Thread(
    target=handle_connection,
    args=(server_sock, ("local-client", 0)),
    daemon=True,
)
worker.start()
# The client end sends a minimal HTTP request: request line, one header,
# then a blank line, which means there is no body.
client_sock.sendall(b"GET /health HTTP/1.1\r\nHost: local\r\n\r\n")
response = client_sock.recv(4096)
client_sock.close()
worker.join(timeout=2)
head, body = response.decode("utf-8").split("\r\n\r\n", 1)
print("Status line:", head.splitlines()[0])
print("Response body:", body)
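To see the framing logic in isolation, here is a sketch of a reader that tolerates a request arriving in pieces. read_http_message is a hypothetical helper with an assumed size limit of 64 KiB; the fragments are written before the read, so the kernel may hand them over in one recv call or several, and the function has to cope with both.

```python
import socket


def read_http_message(sock: socket.socket, max_bytes: int = 65536) -> bytes:
    """Hypothetical helper: read one request (headers plus Content-Length body bytes)."""
    buf = b""
    # Phase 1: keep reading until the blank line that ends the headers shows up.
    while b"\r\n\r\n" not in buf:
        chunk = sock.recv(4096)
        if not chunk:
            raise ConnectionError("peer closed before headers were complete")
        buf += chunk
        if len(buf) > max_bytes:
            raise ValueError("request too large")
    head, _, body = buf.partition(b"\r\n\r\n")
    # Phase 2: find Content-Length (header names are case-insensitive).
    content_length = 0
    for line in head.split(b"\r\n")[1:]:
        name, _, value = line.partition(b":")
        if name.strip().lower() == b"content-length":
            content_length = int(value.strip())
    # Phase 3: keep reading until the whole body has arrived.
    while len(body) < content_length:
        chunk = sock.recv(4096)
        if not chunk:
            raise ConnectionError("peer closed before body was complete")
        body += chunk
    return head + b"\r\n\r\n" + body[:content_length]


sender, receiver = socket.socketpair()
receiver.settimeout(2)                      # never block forever in a demo
sender.sendall(b"POST /tasks HTTP/1.1\r\nContent-Le")   # header split mid-name
sender.sendall(b"ngth: 5\r\n\r\nhel")                   # body split too
sender.sendall(b"lo")
message = read_http_message(receiver)
print(message)
sender.close()
receiver.close()
```

Setting a timeout on the socket is one simple way to keep a stalled client from tying up a thread indefinitely.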
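Step 6: use run_server to start the main loop, one daemon thread per connection
Pain point and mechanism:
run_server is the server's front-of-house manager: listen(10) sets the backlog of pending connections, accept() blocks until a new connection arrives, and each connection gets its own thread. SO_REUSEADDR matters: without it, restarting the server shortly after stopping it can fail with "Address already in use", because connections from the previous run are still in TCP's TIME_WAIT state.
Core source (taken from the complete listing at the end of the article):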
def run_server(host: str = "127.0.0.1", port: int = 8000) -> None:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(10)
    print(f"🚀 HTTP server started: http://{host}:{port}")
    print("   Routes: GET /health | GET /tasks | POST /tasks")
    print("   Press Ctrl+C to stop\n")
    try:
        while True:
            conn, addr = server.accept()
            t = threading.Thread(target=handle_connection, args=(conn, addr), daemon=True)
            t.start()
    except KeyboardInterrupt:
        print("\nServer stopped.")
    finally:
        server.close()
可运行演示(补齐 Mock 数据与 print 反馈):
#!/usr/bin/env python3
"""
18-http-server.py
从零手写 HTTP/1.1 服务器 + urllib 演示客户端
用法:
python3 18-http-server.py --mode server # 启动服务器(Ctrl+C 停止)
python3 18-http-server.py --mode demo # 发请求演示(需先启动服务器)
"""
import argparse
import json
import socket
import threading
import time
import urllib.request
import urllib.error
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple
# ─── 数据存储(内存) ────────────────────────────────────────────────────────
_tasks: List[Dict] = [
{"id": 1, "title": "阅读 HTTP RFC", "done": False},
{"id": 2, "title": "手写 socket 服务器", "done": True},
]
_next_id = 3
_lock = threading.Lock()
# ─── HTTP 解析 ───────────────────────────────────────────────────────────────
def parse_request(raw: bytes) -> Tuple[str, str, str, Dict[str, str], bytes]:
"""解析原始 HTTP 请求,返回 (method, path, version, headers, body)"""
header_end = raw.find(b"\r\n\r\n")
header_part = raw[:header_end].decode("utf-8", errors="replace")
body = raw[header_end + 4:]
lines = header_part.split("\r\n")
method, path, version = lines[0].split(" ", 2)
headers: Dict[str, str] = {}
for line in lines[1:]:
if ":" in line:
k, v = line.split(":", 1)
headers[k.strip().lower()] = v.strip()
content_length = int(headers.get("content-length", 0))
return method, path, version, headers, body[:content_length]
def build_response(status: int, body: dict) -> bytes:
"""构造 JSON 响应报文"""
status_text = {200: "OK", 201: "Created", 404: "Not Found",
405: "Method Not Allowed", 400: "Bad Request"}
payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
header = (
f"HTTP/1.1 {status} {status_text.get(status, 'Unknown')}\r\n"
f"Content-Type: application/json; charset=utf-8\r\n"
f"Content-Length: {len(payload)}\r\n"
f"Connection: close\r\n"
f"\r\n"
).encode("utf-8")
return header + payload
# ─── 路由处理器 ──────────────────────────────────────────────────────────────
def handle_health(method: str, _path: str, _headers: Dict, _body: bytes) -> bytes:
if method != "GET":
return build_response(405, {"error": "Method Not Allowed"})
return build_response(200, {
"status": "ok",
"time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"tasks_count": len(_tasks),
})
def handle_tasks_get() -> bytes:
with _lock:
return build_response(200, {"tasks": list(_tasks), "total": len(_tasks)})
def handle_tasks_post(body: bytes) -> bytes:
global _next_id
try:
data = json.loads(body.decode("utf-8"))
title = data.get("title", "").strip()
if not title:
return build_response(400, {"error": "title 不能为空"})
except (json.JSONDecodeError, UnicodeDecodeError):
return build_response(400, {"error": "请求体必须是合法 JSON"})
with _lock:
task = {"id": _next_id, "title": title, "done": False}
_tasks.append(task)
_next_id += 1
return build_response(201, {"task": task})
def handle_tasks(method: str, _path: str, _headers: Dict, body: bytes) -> bytes:
if method == "GET":
return handle_tasks_get()
elif method == "POST":
return handle_tasks_post(body)
return build_response(405, {"error": "Method Not Allowed"})
# ─── 路由表 ──────────────────────────────────────────────────────────────────
Router = Dict[str, Callable[[str, str, Dict, bytes], bytes]]
ROUTES: Router = {
"/health": handle_health,
"/tasks": handle_tasks,
}
def dispatch(method: str, path: str, headers: Dict, body: bytes) -> bytes:
# 只匹配路径,忽略 query string
clean_path = path.split("?")[0]
handler = ROUTES.get(clean_path)
if handler is None:
return build_response(404, {"error": f"路径 {clean_path} 不存在"})
return handler(method, clean_path, headers, body)
# ─── 连接处理(每个连接一个线程) ────────────────────────────────────────────
def handle_connection(conn: socket.socket, addr: Tuple[str, int]) -> None:
try:
chunks = []
while True:
chunk = conn.recv(4096)
if not chunk:
break
chunks.append(chunk)
# 简单判断:收到 \r\n\r\n 后再读 Content-Length 字节
raw = b"".join(chunks)
if b"\r\n\r\n" in raw:
header_end = raw.find(b"\r\n\r\n")
header_part = raw[:header_end].decode("utf-8", errors="replace")
content_length = 0
for line in header_part.split("\r\n")[1:]:
if line.lower().startswith("content-length:"):
content_length = int(line.split(":", 1)[1].strip())
body_received = len(raw) - header_end - 4
if body_received >= content_length:
break
if not chunks:
return
raw = b"".join(chunks)
method, path, _version, headers, body = parse_request(raw)
response = dispatch(method, path, headers, body)
conn.sendall(response)
print(f" [{datetime.now().strftime('%H:%M:%S')}] {addr[0]} {method} {path}")
except Exception as e:
print(f" [错误] {addr}: {e}")
finally:
conn.close()
# ─── 服务器主循环 ────────────────────────────────────────────────────────────
def run_server(host: str = "127.0.0.1", port: int = 8000) -> None:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((host, port))
server.listen(10)
print(f"🚀 HTTP 服务器启动:http://{host}:{port}")
print(" 路由:GET /health | GET /tasks | POST /tasks")
print(" 按 Ctrl+C 停止\n")
try:
while True:
conn, addr = server.accept()
t = threading.Thread(target=handle_connection, args=(conn, addr), daemon=True)
            t.start()
    except KeyboardInterrupt:
        print("\nServer stopped.")
    finally:
        server.close()


# ─── Demo client ─────────────────────────────────────────────────────────────
def print_table(title: str, rows: List[Dict]) -> None:
    print(f"\n{'─'*50}")
    print(f" {title}")
    print(f"{'─'*50}")
    for row in rows:
        status_icon = "✅" if row.get("done") else "⬜"
        print(f" [{row['id']:>2}] {status_icon} {row['title']}")
    print(f"{'─'*50}")


def demo_client(base: str = "http://127.0.0.1:8000") -> None:
    def get(path: str) -> dict:
        with urllib.request.urlopen(f"{base}{path}") as r:
            return json.loads(r.read())

    def post(path: str, data: dict) -> dict:
        payload = json.dumps(data).encode()
        req = urllib.request.Request(
            f"{base}{path}", data=payload,
            headers={"Content-Type": "application/json"}, method="POST"
        )
        try:
            with urllib.request.urlopen(req) as r:
                return json.loads(r.read())
        except urllib.error.HTTPError as e:
            return json.loads(e.read())

    print("\n=== HTTP server demo ===")
    # 1. Health check
    health = get("/health")
    print(f"\n[GET /health] → {health}")
    # 2. Fetch the task list
    result = get("/tasks")
    print_table("Initial task list", result["tasks"])
    # 3. Create new tasks
    new_tasks = [
        {"title": "Deploy to production"},
        {"title": "Write unit tests"},
        {"title": ""},  # deliberately triggers a 400
    ]
    print("\n[POST /tasks] Creating tasks:")
    for t in new_tasks:
        resp = post("/tasks", t)
        if "task" in resp:
            print(f" ✅ Created: {resp['task']}")
        else:
            print(f" ❌ Failed: {resp['error']}")
    # 4. Fetch the list again
    result = get("/tasks")
    print_table("Updated task list", result["tasks"])
    # 5. 404 demo
    try:
        urllib.request.urlopen(f"{base}/nonexistent")
    except urllib.error.HTTPError as e:
        body = json.loads(e.read())
        print(f"\n[GET /nonexistent] → 404: {body['error']}")


# ─── Entry point ─────────────────────────────────────────────────────────────
def main() -> None:
    parser = argparse.ArgumentParser(description="A hand-written HTTP/1.1 server from scratch")
    parser.add_argument("--mode", choices=["server", "demo"], default="server",
                        help="server = start the server, demo = send demo requests")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=8000)
    args = parser.parse_args()
    if args.mode == "server":
        run_server(args.host, args.port)
    else:
        demo_client(f"http://{args.host}:{args.port}")
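# Step 6 demo: run_server enters an infinite accept loop, which suits a real terminal but not an in-page runner.
# So this demo prints the startup steps one by one instead of actually starting the server.
print("run_server startup checklist:")
print("1. socket.socket(...) creates the TCP listening socket")
print("2. setsockopt(SO_REUSEADDR) lets a restarted server rebind the port right away")
print("3. bind(('127.0.0.1', 8000)) attaches the server to the local address")
print("4. listen(10) starts queueing connections, with a backlog of up to 10 pending connections")
print("5. accept() hands each new client to a daemon thread running handle_connection")
print("To run it for real: python3 18-http-server.py --mode server --port 8000")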
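The first four items of that checklist can also be exercised for real without blocking, as long as accept() is never called. The following is a minimal sketch, not the article's run_server: the helper name open_listener is hypothetical, and it binds to port 0 (an assumption made here so the OS picks any free port and the demo cannot collide with a running server).

```python
import socket


def open_listener(host: str = "127.0.0.1", port: int = 0, backlog: int = 10) -> socket.socket:
    """Run checklist steps 1-4; port=0 asks the OS for any free port."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)      # step 1
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)    # step 2
    server.bind((host, port))                                       # step 3
    server.listen(backlog)                                          # step 4
    return server


listener = open_listener()
bound_host, bound_port = listener.getsockname()
print(f"listening on {bound_host}:{bound_port}")
# Step 5 (accept) is skipped on purpose: it would block until a client connects.
listener.close()
```

Because nothing calls accept(), the script returns immediately; the real server only differs in looping over accept() and handing each connection to a thread.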
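Step 7: Use demo_client to send real HTTP requests and test the full GET/POST/404 flow
Pain points and mechanism:
demo_client sends HTTP requests with Python's built-in urllib.request, so no third-party library is needed. urllib.error.HTTPError is raised for error status codes (>= 400), but the response body can still be read through e.read(), which is the usual way to inspect 400/404 error responses in a test. print_table uses emoji and aligned columns so the terminal output is easy to scan.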
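To see the e.read() behaviour in isolation, here is a small offline sketch. The names read_json and fake_opener are hypothetical helpers introduced only for this illustration; fake_opener stands in for urlopen and raises an HTTPError built by hand, so no server needs to be running.

```python
import io
import json
import urllib.error
import urllib.request
from email.message import Message


def read_json(url: str, opener=urllib.request.urlopen):
    """Return (status, parsed JSON body) for success and for HTTP error responses."""
    try:
        with opener(url) as resp:
            return resp.status, json.loads(resp.read())
    except urllib.error.HTTPError as e:
        # The error object still wraps the response stream, so the body is readable.
        return e.code, json.loads(e.read())


def fake_opener(url: str):
    """Stand-in for urlopen (assumption for this demo): always answers 404 with a JSON body."""
    body = json.dumps({"error": "path /nonexistent does not exist"}).encode("utf-8")
    raise urllib.error.HTTPError(url, 404, "Not Found", Message(), io.BytesIO(body))


status, body = read_json("http://127.0.0.1:8000/nonexistent", opener=fake_opener)
print(status, body["error"])
```

Passing the opener as a parameter keeps the helper testable; with the default argument it would go through the real urlopen.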
Core source (verbatim from the complete source at the end of this article):
def demo_client(base: str = "http://127.0.0.1:8000") -> None:
    def get(path: str) -> dict:
        with urllib.request.urlopen(f"{base}{path}") as r:
            return json.loads(r.read())

    def post(path: str, data: dict) -> dict:
        payload = json.dumps(data).encode()
        req = urllib.request.Request(
            f"{base}{path}", data=payload,
            headers={"Content-Type": "application/json"}, method="POST"
        )
        try:
            with urllib.request.urlopen(req) as r:
                return json.loads(r.read())
        except urllib.error.HTTPError as e:
            return json.loads(e.read())

    print("\n=== HTTP server demo ===")
    # 1. Health check
    health = get("/health")
    print(f"\n[GET /health] → {health}")
    # 2. Fetch the task list
    result = get("/tasks")
    print_table("Initial task list", result["tasks"])
    # 3. Create new tasks
    new_tasks = [
        {"title": "Deploy to production"},
        {"title": "Write unit tests"},
        {"title": ""},  # deliberately triggers a 400
    ]
    print("\n[POST /tasks] Creating tasks:")
    for t in new_tasks:
        resp = post("/tasks", t)
        if "task" in resp:
            print(f" ✅ Created: {resp['task']}")
        else:
            print(f" ❌ Failed: {resp['error']}")
    # 4. Fetch the list again
    result = get("/tasks")
    print_table("Updated task list", result["tasks"])
    # 5. 404 demo
    try:
        urllib.request.urlopen(f"{base}/nonexistent")
    except urllib.error.HTTPError as e:
        body = json.loads(e.read())
        print(f"\n[GET /nonexistent] → 404: {body['error']}")
Runnable demo (mock data and print feedback added; the snippet relies on the definitions in the complete source at the end of this article, so run it in place of that listing's if __name__ == "__main__" block):
# Step 7 demo: the real demo_client goes through urllib and HTTP.
# To keep the in-page runner free of any network dependency, this calls dispatch directly and simulates the same GET/POST/404 flow.
print("=== Offline HTTP client demo ===")
health = json.loads(dispatch("GET", "/health", {}, b"").split(b"\r\n\r\n", 1)[1])
print("[GET /health] ->", health["status"], "tasks_count=", health["tasks_count"])
before = json.loads(dispatch("GET", "/tasks", {}, b"").split(b"\r\n\r\n", 1)[1])
print_table("Initial task list", before["tasks"])
payload = json.dumps({"title": "Write an offline HTTP test"}, ensure_ascii=False).encode("utf-8")
created = json.loads(dispatch("POST", "/tasks", {}, payload).split(b"\r\n\r\n", 1)[1])
print("[POST /tasks] ->", created["task"])
missing = json.loads(dispatch("GET", "/missing", {}, b"").split(b"\r\n\r\n", 1)[1])
print("[GET /missing] ->", missing["error"])
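The offline demo keeps only the part after the blank line and so discards the status code and headers. If those are needed as well, the raw response can be split into its three parts. The sketch below is one possible way to do it; split_response is a hypothetical helper, not part of the article's source, and the sample bytes are hand-built to mimic what a build_response-style function would return.

```python
import json
from typing import Dict, Tuple


def split_response(raw: bytes) -> Tuple[int, Dict[str, str], bytes]:
    """Split a raw HTTP response into (status code, lower-cased headers, body)."""
    head, _, body = raw.partition(b"\r\n\r\n")
    lines = head.decode("utf-8").split("\r\n")
    status = int(lines[0].split(" ", 2)[1])  # "HTTP/1.1 404 Not Found" -> 404
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        name, _, value = line.partition(":")
        headers[name.strip().lower()] = value.strip()
    return status, headers, body


# Hand-built sample response (assumption: same layout as the article's responses).
sample_body = json.dumps({"error": "not found"}).encode("utf-8")
sample = b"".join([
    b"HTTP/1.1 404 Not Found\r\n",
    b"Content-Type: application/json; charset=utf-8\r\n",
    f"Content-Length: {len(sample_body)}\r\n".encode("ascii"),
    b"Connection: close\r\n",
    b"\r\n",
    sample_body,
])

status, headers, body = split_response(sample)
print(status, headers["content-length"], json.loads(body))
```

Checking that the Content-Length header equals len(body) is a quick sanity test for any hand-written response builder.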
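Step 8: Use main as the single CLI entry point for the server and demo modes
Pain points and mechanism:
main uses argparse as the CLI entry point: --mode server starts the server (it blocks until Ctrl+C), and --mode demo sends the demo requests (the server must already be running). The --host and --port options let readers switch ports without editing the code, which avoids port conflicts. Splitting one script into a server mode and a client mode is a common pattern among network tools.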
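One way to check this kind of argument handling without touching sys.argv is to pass an explicit argument list to parse_args. The sketch below mirrors the three options described above; build_parser and describe are hypothetical names used only for this illustration, and describe returns a string instead of starting anything.

```python
import argparse
from typing import List, Optional


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="CLI sketch with server and demo modes")
    parser.add_argument("--mode", choices=["server", "demo"], default="server")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=8000)
    return parser


def describe(argv: Optional[List[str]] = None) -> str:
    """Parse argv (None means sys.argv[1:]) and report which branch would run."""
    args = build_parser().parse_args(argv)
    if args.mode == "server":
        return f"would start server on {args.host}:{args.port}"
    return f"would run demo against http://{args.host}:{args.port}"


print(describe([]))
print(describe(["--mode", "demo", "--port", "18080"]))
```

An empty list exercises the defaults, while None would fall back to the real command line.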
Core source (verbatim from the complete source at the end of this article):
def main() -> None:
    parser = argparse.ArgumentParser(description="A hand-written HTTP/1.1 server from scratch")
    parser.add_argument("--mode", choices=["server", "demo"], default="server",
                        help="server = start the server, demo = send demo requests")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=8000)
    args = parser.parse_args()
    if args.mode == "server":
        run_server(args.host, args.port)
    else:
        demo_client(f"http://{args.host}:{args.port}")
Runnable demo (mock data and print feedback added; the snippet relies on the definitions in the complete source at the end of this article, so run it in place of that listing's if __name__ == "__main__" block):
# Step 8 demo: main() calls run_server or demo_client depending on the command-line arguments.
# Here both real functions are temporarily replaced with harmless fakes to check the CLI dispatch
# without binding a port or touching the network.
import sys


def fake_run_server(host: str, port: int) -> None:
    print(f"server mode dispatched: host={host}, port={port}")


def fake_demo_client(base: str) -> None:
    print(f"demo mode dispatched: base={base}")


run_server = fake_run_server
demo_client = fake_demo_client
for mode in ("server", "demo"):
    sys.argv = ["prog", "--mode", mode, "--host", "127.0.0.1", "--port", "18080"]
    main()
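Geek in practice: complete source and how to run it
Now put the building blocks together: paste the complete code below into your editor and run it. Look at the whole loop end to end first, then go back and tweak parameters section by section; that makes it easier to build engineering intuition.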
#!/usr/bin/env python3
"""
18-http-server.py
A hand-written HTTP/1.1 server from scratch, plus a urllib demo client
Usage:
    python3 18-http-server.py --mode server   # start the server (Ctrl+C to stop)
    python3 18-http-server.py --mode demo     # send demo requests (start the server first)
"""
import argparse
import json
import socket
import threading
import time
import urllib.request
import urllib.error
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple

# ─── Data store (in memory) ──────────────────────────────────────────────────
_tasks: List[Dict] = [
    {"id": 1, "title": "Read the HTTP RFC", "done": False},
    {"id": 2, "title": "Hand-write a socket server", "done": True},
]
_next_id = 3
_lock = threading.Lock()


# ─── HTTP parsing ────────────────────────────────────────────────────────────
def parse_request(raw: bytes) -> Tuple[str, str, str, Dict[str, str], bytes]:
    """Parse a raw HTTP request and return (method, path, version, headers, body)."""
    header_end = raw.find(b"\r\n\r\n")
    header_part = raw[:header_end].decode("utf-8", errors="replace")
    body = raw[header_end + 4:]
    lines = header_part.split("\r\n")
    method, path, version = lines[0].split(" ", 2)
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        if ":" in line:
            k, v = line.split(":", 1)
            headers[k.strip().lower()] = v.strip()
    content_length = int(headers.get("content-length", 0))
    return method, path, version, headers, body[:content_length]


def build_response(status: int, body: dict) -> bytes:
    """Build a JSON response message."""
    status_text = {200: "OK", 201: "Created", 404: "Not Found",
                   405: "Method Not Allowed", 400: "Bad Request"}
    payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
    header = (
        f"HTTP/1.1 {status} {status_text.get(status, 'Unknown')}\r\n"
        f"Content-Type: application/json; charset=utf-8\r\n"
        f"Content-Length: {len(payload)}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    ).encode("utf-8")
    return header + payload


# ─── Route handlers ──────────────────────────────────────────────────────────
def handle_health(method: str, _path: str, _headers: Dict, _body: bytes) -> bytes:
    if method != "GET":
        return build_response(405, {"error": "Method Not Allowed"})
    return build_response(200, {
        "status": "ok",
        "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "tasks_count": len(_tasks),
    })


def handle_tasks_get() -> bytes:
    with _lock:
        return build_response(200, {"tasks": list(_tasks), "total": len(_tasks)})


def handle_tasks_post(body: bytes) -> bytes:
    global _next_id
    try:
        data = json.loads(body.decode("utf-8"))
        title = data.get("title", "").strip()
        if not title:
            return build_response(400, {"error": "title must not be empty"})
    except (json.JSONDecodeError, UnicodeDecodeError):
        return build_response(400, {"error": "request body must be valid JSON"})
    with _lock:
        task = {"id": _next_id, "title": title, "done": False}
        _tasks.append(task)
        _next_id += 1
    return build_response(201, {"task": task})


def handle_tasks(method: str, _path: str, _headers: Dict, body: bytes) -> bytes:
    if method == "GET":
        return handle_tasks_get()
    elif method == "POST":
        return handle_tasks_post(body)
    return build_response(405, {"error": "Method Not Allowed"})


# ─── Route table ─────────────────────────────────────────────────────────────
Router = Dict[str, Callable[[str, str, Dict, bytes], bytes]]
ROUTES: Router = {
    "/health": handle_health,
    "/tasks": handle_tasks,
}


def dispatch(method: str, path: str, headers: Dict, body: bytes) -> bytes:
    # Match on the path only and ignore the query string
    clean_path = path.split("?")[0]
    handler = ROUTES.get(clean_path)
    if handler is None:
        return build_response(404, {"error": f"path {clean_path} does not exist"})
    return handler(method, clean_path, headers, body)


# ─── Connection handling (one thread per connection) ─────────────────────────
def handle_connection(conn: socket.socket, addr: Tuple[str, int]) -> None:
    try:
        chunks = []
        while True:
            chunk = conn.recv(4096)
            if not chunk:
                break
            chunks.append(chunk)
            # Simple check: once \r\n\r\n has arrived, keep reading until Content-Length bytes of body are in
            raw = b"".join(chunks)
            if b"\r\n\r\n" in raw:
                header_end = raw.find(b"\r\n\r\n")
                header_part = raw[:header_end].decode("utf-8", errors="replace")
                content_length = 0
                for line in header_part.split("\r\n")[1:]:
                    if line.lower().startswith("content-length:"):
                        content_length = int(line.split(":", 1)[1].strip())
                body_received = len(raw) - header_end - 4
                if body_received >= content_length:
                    break
        if not chunks:
            return
        raw = b"".join(chunks)
        method, path, _version, headers, body = parse_request(raw)
        response = dispatch(method, path, headers, body)
        conn.sendall(response)
        print(f" [{datetime.now().strftime('%H:%M:%S')}] {addr[0]} {method} {path}")
    except Exception as e:
        print(f" [error] {addr}: {e}")
    finally:
        conn.close()


# ─── Server main loop ────────────────────────────────────────────────────────
def run_server(host: str = "127.0.0.1", port: int = 8000) -> None:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(10)
    print(f"🚀 HTTP server started: http://{host}:{port}")
    print(" Routes: GET /health | GET /tasks | POST /tasks")
    print(" Press Ctrl+C to stop\n")
    try:
        while True:
            conn, addr = server.accept()
            t = threading.Thread(target=handle_connection, args=(conn, addr), daemon=True)
            t.start()
    except KeyboardInterrupt:
        print("\nServer stopped.")
    finally:
        server.close()


# ─── Demo client ─────────────────────────────────────────────────────────────
def print_table(title: str, rows: List[Dict]) -> None:
    print(f"\n{'─'*50}")
    print(f" {title}")
    print(f"{'─'*50}")
    for row in rows:
        status_icon = "✅" if row.get("done") else "⬜"
        print(f" [{row['id']:>2}] {status_icon} {row['title']}")
    print(f"{'─'*50}")


def demo_client(base: str = "http://127.0.0.1:8000") -> None:
    def get(path: str) -> dict:
        with urllib.request.urlopen(f"{base}{path}") as r:
            return json.loads(r.read())

    def post(path: str, data: dict) -> dict:
        payload = json.dumps(data).encode()
        req = urllib.request.Request(
            f"{base}{path}", data=payload,
            headers={"Content-Type": "application/json"}, method="POST"
        )
        try:
            with urllib.request.urlopen(req) as r:
                return json.loads(r.read())
        except urllib.error.HTTPError as e:
            return json.loads(e.read())

    print("\n=== HTTP server demo ===")
    # 1. Health check
    health = get("/health")
    print(f"\n[GET /health] → {health}")
    # 2. Fetch the task list
    result = get("/tasks")
    print_table("Initial task list", result["tasks"])
    # 3. Create new tasks
    new_tasks = [
        {"title": "Deploy to production"},
        {"title": "Write unit tests"},
        {"title": ""},  # deliberately triggers a 400
    ]
    print("\n[POST /tasks] Creating tasks:")
    for t in new_tasks:
        resp = post("/tasks", t)
        if "task" in resp:
            print(f" ✅ Created: {resp['task']}")
        else:
            print(f" ❌ Failed: {resp['error']}")
    # 4. Fetch the list again
    result = get("/tasks")
    print_table("Updated task list", result["tasks"])
    # 5. 404 demo
    try:
        urllib.request.urlopen(f"{base}/nonexistent")
    except urllib.error.HTTPError as e:
        body = json.loads(e.read())
        print(f"\n[GET /nonexistent] → 404: {body['error']}")


# ─── Entry point ─────────────────────────────────────────────────────────────
def main() -> None:
    parser = argparse.ArgumentParser(description="A hand-written HTTP/1.1 server from scratch")
    parser.add_argument("--mode", choices=["server", "demo"], default="server",
                        help="server = start the server, demo = send demo requests")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=8000)
    args = parser.parse_args()
    if args.mode == "server":
        run_server(args.host, args.port)
    else:
        demo_client(f"http://{args.host}:{args.port}")


if __name__ == "__main__":
    # Needs a real terminal with loopback networking; an in-page sandbox without network cannot run it.
    main()

$ python3 18-http-server.py --mode server
🚀 HTTP server started: http://127.0.0.1:8000
 Routes: GET /health | GET /tasks | POST /tasks
 Press Ctrl+C to stop
 [22:54:12] 127.0.0.1 GET /health
 [22:54:12] 127.0.0.1 GET /tasks
 [22:54:12] 127.0.0.1 POST /tasks
 [22:54:12] 127.0.0.1 POST /tasks
 [22:54:12] 127.0.0.1 POST /tasks
 [22:54:12] 127.0.0.1 GET /tasks
 [22:54:12] 127.0.0.1 GET /nonexistent

$ python3 18-http-server.py --mode demo
=== HTTP server demo ===
[GET /health] → {'status': 'ok', 'time': '2026-04-17 22:54:12', 'tasks_count': 2}
──────────────────────────────────────────────────
 Initial task list
──────────────────────────────────────────────────
 [ 1] ⬜ Read the HTTP RFC
 [ 2] ✅ Hand-write a socket server
──────────────────────────────────────────────────
[POST /tasks] Creating tasks:
 ✅ Created: {'id': 3, 'title': 'Deploy to production', 'done': False}
 ✅ Created: {'id': 4, 'title': 'Write unit tests', 'done': False}
 ❌ Failed: title must not be empty
──────────────────────────────────────────────────
 Updated task list
──────────────────────────────────────────────────
 [ 1] ⬜ Read the HTTP RFC
 [ 2] ✅ Hand-write a socket server
 [ 3] ⬜ Deploy to production
 [ 4] ⬜ Write unit tests
──────────────────────────────────────────────────
[GET /nonexistent] → 404: path /nonexistent does not exist
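Summary
| Concept | One-line takeaway |
|---|---|
| parse_request | Find \r\n\r\n to split off the headers, then cut the body to Content-Length bytes |
| build_response | Status line + response headers + blank line + JSON body; Content-Length must be the exact byte length of the body |
| ROUTES dict | Average O(1) lookup for exact-match paths; the same path-to-handler mapping is the core idea behind route registration in Flask/FastAPI |
| dispatch | Strip the query string, look the path up in the dict, and return 404 if it is missing |
| handle_connection | Loop on recv until the complete request has arrived; one thread per connection |
| SO_REUSEADDR | Restarting the server does not fail with "address already in use" |
| daemon=True | Daemon threads exit with the main process and do not block shutdown |
| urllib.error.HTTPError | Raised for status codes >= 400, but e.read() still returns the response body |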
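To make the ROUTES row more concrete, here is a minimal sketch of decorator-style registration on top of a plain dict. It illustrates the general idea only and does not claim to show how Flask or FastAPI work internally; the route decorator and the simplified handler signature (method in, dict out) are assumptions made for this example.

```python
from typing import Callable, Dict

Handler = Callable[[str], dict]
ROUTES: Dict[str, Handler] = {}


def route(path: str) -> Callable[[Handler], Handler]:
    """Decorator factory: registering a handler is just a dict assignment."""
    def register(func: Handler) -> Handler:
        ROUTES[path] = func
        return func
    return register


@route("/health")
def health(method: str) -> dict:
    return {"status": "ok", "method": method}


@route("/tasks")
def tasks(method: str) -> dict:
    return {"tasks": [], "method": method}


def dispatch(method: str, path: str) -> dict:
    # Exact-match lookup after dropping the query string.
    handler = ROUTES.get(path.split("?")[0])
    if handler is None:
        return {"error": f"path {path} does not exist"}
    return handler(method)


print(dispatch("GET", "/health?verbose=1"))
print(dispatch("GET", "/missing"))
```

A dict only covers exact paths; patterns such as /tasks/<id> need extra matching logic on top of it.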
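⏱ NexDo Time (5 minutes)
Challenge: add a DELETE /tasks/<id> route to the server so that a task can be deleted by ID.
Steps:
- In dispatch, add path-parameter parsing: check whether the path starts with the /tasks/ prefix and extract the numeric ID at the end
- Add a handle_task_delete(task_id: int) -> bytes function: find the task with that ID in the _tasks list and delete it, returning 404 if it is not found
- Register the new handler in ROUTES (or special-case it in dispatch)
- Send a delete request with urllib.request.Request(..., method="DELETE") and verify that the task is really gone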
Don’t wait for next time, do it in the next moment.