文章

18 · HTTP 与 Web 框架基石:从零手写服务器

#017 · 2026-04-16 · Python

🔗 知识图谱导航:阅读本文前,建议先掌握《13 · IO 多路复用:select/epoll 与事件驱动》中的 socket 基础和《16 · 数据库底座:SQLite 核心操作》中的数据持久化——本文把这两块拼在一起,用 socket 手写一个能处理真实 HTTP 请求的服务器。

极客解析:Django、FastAPI 的底层都是这条链路:socket.accept() → 解析请求报文 → 路由分发 → 业务处理 → 拼装响应报文 → socket.send()。把这条链路手写一遍,框架的"魔法"就全部消失了。

痛点与架构:新手学 Web 开发最常见的困惑:① HTTP 请求到底长什么样?② 框架的路由是怎么工作的?③ 为什么 POST 请求要带 Content-Length?本文用 200 行纯 Python 把这三个问题全部答透。

HTTP 协议结构

┌─────────────────────────────────────────────────────┐
│                   HTTP/1.1 请求报文                   │
├─────────────────────────────────────────────────────┤
│  请求行   │ GET /tasks HTTP/1.1                      │
├─────────────────────────────────────────────────────┤
│           │ Host: localhost:8000                     │
│  请求头   │ Content-Type: application/json           │
│           │ Authorization: Bearer <token>            │
├─────────────────────────────────────────────────────┤
│  空行     │ \r\n                                     │
├─────────────────────────────────────────────────────┤
│  请求体   │ {"title": "写博客", "done": false}       │
└─────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐
│                   HTTP/1.1 响应报文                   │
├─────────────────────────────────────────────────────┤
│  状态行   │ HTTP/1.1 200 OK                          │
├─────────────────────────────────────────────────────┤
│           │ Content-Type: application/json           │
│  响应头   │ Content-Length: 42                       │
│           │ Connection: close                        │
├─────────────────────────────────────────────────────┤
│  空行     │ \r\n                                     │
├─────────────────────────────────────────────────────┤
│  响应体   │ {"status": "ok", "data": [...]}          │
└─────────────────────────────────────────────────────┘

请求方法语义

┌──────────┬────────────┬──────────┬──────────────────────────┐
│  方法    │  幂等性    │  安全性  │  典型用途                │
├──────────┼────────────┼──────────┼──────────────────────────┤
│  GET     │    是      │    是    │  读取资源                │
│  POST    │    否      │    否    │  创建资源                │
│  PUT     │    是      │    否    │  全量更新资源            │
│  PATCH   │    否      │    否    │  部分更新资源            │
│  DELETE  │    是      │    否    │  删除资源                │
│  OPTIONS │    是      │    是    │  查询支持的方法(CORS预检)│
└──────────┴────────────┴──────────┴──────────────────────────┘

常用状态码速查

2xx 成功:200 OK | 201 Created | 204 No Content
3xx 重定向:301 永久 | 302 临时
4xx 客户端错误:400 Bad Request | 401 未认证 | 403 无权限 | 404 Not Found | 429 限流
5xx 服务端错误:500 Internal Server Error | 502 Bad Gateway | 503 Service Unavailable

服务器处理链路

socket.bind() → socket.listen() → socket.accept()
       ↓
  接收原始字节流(TCP 不保证边界)
       ↓
  解析请求行 + 请求头 + 请求体(按 Content-Length 截断)
       ↓
  路由表字典查找 → 业务处理函数
       ↓
  拼装响应报文 → socket.sendall()

步步为营:核心逻辑自适应拆解

这一篇的核心是一条链路:socket.accept()parse_requestdispatchhandle_*build_responsesocket.sendall()。下面每一步都聚焦链路上的一个环节,跑完就能看到结果。

Step 1:用 parse_request 把原始字节流拆成五元组

痛点与机制

HTTP 报文在 TCP 层面只是一串字节,没有任何结构。parse_request 就像"快递拆包工":先找到 \r\n\r\n(头部和正文的分隔线),把头部按行切开,再按 Content-Length 截取正文。raw.find(b"\r\n\r\n") 是关键——HTTP 协议规定头部和正文之间必须有一个空行,这个空行在字节层面就是 \r\n\r\n

核心源码(逐字来自文末完整源码)

def parse_request(raw: bytes) -> Tuple[str, str, str, Dict[str, str], bytes]:
    """解析原始 HTTP 请求,返回 (method, path, version, headers, body)"""
    header_end = raw.find(b"\r\n\r\n")
    header_part = raw[:header_end].decode("utf-8", errors="replace")
    body = raw[header_end + 4:]

    lines = header_part.split("\r\n")
    method, path, version = lines[0].split(" ", 2)
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        if ":" in line:
            k, v = line.split(":", 1)
            headers[k.strip().lower()] = v.strip()

    content_length = int(headers.get("content-length", 0))
    return method, path, version, headers, body[:content_length]

可运行演示(补齐 Mock 数据与 print 反馈)

#!/usr/bin/env python3
"""
18-http-server.py
从零手写 HTTP/1.1 服务器 + urllib 演示客户端

用法:
  python3 18-http-server.py --mode server   # 启动服务器(Ctrl+C 停止)
  python3 18-http-server.py --mode demo     # 发请求演示(需先启动服务器)
"""

import argparse
import json
import socket
import threading
import time
import urllib.request
import urllib.error
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple


_tasks: List[Dict] = [
    {"id": 1, "title": "阅读 HTTP RFC", "done": False},
    {"id": 2, "title": "手写 socket 服务器", "done": True},
]
_next_id = 3
_lock = threading.Lock()

def parse_request(raw: bytes) -> Tuple[str, str, str, Dict[str, str], bytes]:
    """解析原始 HTTP 请求,返回 (method, path, version, headers, body)"""
    header_end = raw.find(b"\r\n\r\n")
    header_part = raw[:header_end].decode("utf-8", errors="replace")
    body = raw[header_end + 4:]

    lines = header_part.split("\r\n")
    method, path, version = lines[0].split(" ", 2)
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        if ":" in line:
            k, v = line.split(":", 1)
            headers[k.strip().lower()] = v.strip()

    content_length = int(headers.get("content-length", 0))
    return method, path, version, headers, body[:content_length]
# parse_request 把原始字节流拆成 method/path/version/headers/body 五元组
raw = b"GET /tasks?status=done HTTP/1.1\r\nHost: localhost:8000\r\nUser-Agent: curl\r\n\r\n"
method, path, version, headers, body = parse_request(raw)
print(f"method={method}, path={path}, version={version}")
print(f"headers: {dict(list(headers.items())[:2])}")

Step 2:用 build_response 把状态码和字典拼成合法响应报文

痛点与机制

HTTP 响应报文的格式是固定的:状态行 + 响应头 + 空行 + 响应体。build_response 就像"快递打包工":先把字典序列化成 JSON 字节,算出字节长度写入 Content-Length 头,再把所有部分用 \r\n 拼接起来。Content-Length 是关键——客户端靠它知道响应体读到哪里结束,少了这个头浏览器会一直等待。

核心源码(逐字来自文末完整源码)

def build_response(status: int, body: dict) -> bytes:
    """构造 JSON 响应报文"""
    status_text = {200: "OK", 201: "Created", 404: "Not Found",
                   405: "Method Not Allowed", 400: "Bad Request"}
    payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
    header = (
        f"HTTP/1.1 {status} {status_text.get(status, 'Unknown')}\r\n"
        f"Content-Type: application/json; charset=utf-8\r\n"
        f"Content-Length: {len(payload)}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    ).encode("utf-8")
    return header + payload

可运行演示(补齐 Mock 数据与 print 反馈)

#!/usr/bin/env python3
"""
18-http-server.py
从零手写 HTTP/1.1 服务器 + urllib 演示客户端

用法:
  python3 18-http-server.py --mode server   # 启动服务器(Ctrl+C 停止)
  python3 18-http-server.py --mode demo     # 发请求演示(需先启动服务器)
"""

import argparse
import json
import socket
import threading
import time
import urllib.request
import urllib.error
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple

# ─── 数据存储(内存) ────────────────────────────────────────────────────────
_tasks: List[Dict] = [
    {"id": 1, "title": "阅读 HTTP RFC", "done": False},
    {"id": 2, "title": "手写 socket 服务器", "done": True},
]
_next_id = 3
_lock = threading.Lock()

def build_response(status: int, body: dict) -> bytes:
    """构造 JSON 响应报文"""
    status_text = {200: "OK", 201: "Created", 404: "Not Found",
                   405: "Method Not Allowed", 400: "Bad Request"}
    payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
    header = (
        f"HTTP/1.1 {status} {status_text.get(status, 'Unknown')}\r\n"
        f"Content-Type: application/json; charset=utf-8\r\n"
        f"Content-Length: {len(payload)}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    ).encode("utf-8")
    return header + payload
# build_response 把状态码+字典拼成合法的 HTTP 响应报文字节流
resp = build_response(200, {"message": "ok", "count": 42})
print(f"响应首行: {resp.split(b'\r\n')[0].decode()}")
print(f"Content-Length 头: {[l for l in resp.split(b'\r\n') if b'Content-Length' in l][0].decode()}")

Step 3:用 handle_health / handle_tasks 实现路由处理器

痛点与机制

路由处理器就像"窗口服务员":每个窗口(路径)只处理特定类型的业务。handle_tasks_post 里的 try/except json.JSONDecodeError 是防御性编程——客户端可能发来任何内容,不能假设它一定是合法 JSON。with _lock: 保护 _tasks 列表的并发写入,就像银行柜台同一时刻只能一个人操作账本。

核心源码(逐字来自文末完整源码)

def handle_tasks_post(body: bytes) -> bytes:
    global _next_id
    try:
        data = json.loads(body.decode("utf-8"))
        title = data.get("title", "").strip()
        if not title:
            return build_response(400, {"error": "title 不能为空"})
    except (json.JSONDecodeError, UnicodeDecodeError):
        return build_response(400, {"error": "请求体必须是合法 JSON"})

    with _lock:
        task = {"id": _next_id, "title": title, "done": False}
        _tasks.append(task)
        _next_id += 1
    return build_response(201, {"task": task})

可运行演示(补齐 Mock 数据与 print 反馈)

#!/usr/bin/env python3
"""
18-http-server.py
从零手写 HTTP/1.1 服务器 + urllib 演示客户端

用法:
  python3 18-http-server.py --mode server   # 启动服务器(Ctrl+C 停止)
  python3 18-http-server.py --mode demo     # 发请求演示(需先启动服务器)
"""

import argparse
import json
import socket
import threading
import time
import urllib.request
import urllib.error
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple

# ─── 数据存储(内存) ────────────────────────────────────────────────────────
_tasks: List[Dict] = [
    {"id": 1, "title": "阅读 HTTP RFC", "done": False},
    {"id": 2, "title": "手写 socket 服务器", "done": True},
]
_next_id = 3
_lock = threading.Lock()

def parse_request(raw: bytes) -> Tuple[str, str, str, Dict[str, str], bytes]:
    """解析原始 HTTP 请求,返回 (method, path, version, headers, body)"""
    header_end = raw.find(b"\r\n\r\n")
    header_part = raw[:header_end].decode("utf-8", errors="replace")
    body = raw[header_end + 4:]

    lines = header_part.split("\r\n")
    method, path, version = lines[0].split(" ", 2)
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        if ":" in line:
            k, v = line.split(":", 1)
            headers[k.strip().lower()] = v.strip()

    content_length = int(headers.get("content-length", 0))
    return method, path, version, headers, body[:content_length]

def build_response(status: int, body: dict) -> bytes:
    """构造 JSON 响应报文"""
    status_text = {200: "OK", 201: "Created", 404: "Not Found",
                   405: "Method Not Allowed", 400: "Bad Request"}
    payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
    header = (
        f"HTTP/1.1 {status} {status_text.get(status, 'Unknown')}\r\n"
        f"Content-Type: application/json; charset=utf-8\r\n"
        f"Content-Length: {len(payload)}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    ).encode("utf-8")
    return header + payload

def handle_health(method: str, _path: str, _headers: Dict, _body: bytes) -> bytes:
    if method != "GET":
        return build_response(405, {"error": "Method Not Allowed"})
    return build_response(200, {
        "status": "ok",
        "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "tasks_count": len(_tasks),
    })

def handle_tasks_get() -> bytes:
    with _lock:
        return build_response(200, {"tasks": list(_tasks), "total": len(_tasks)})

def handle_tasks_post(body: bytes) -> bytes:
    global _next_id
    try:
        data = json.loads(body.decode("utf-8"))
        title = data.get("title", "").strip()
        if not title:
            return build_response(400, {"error": "title 不能为空"})
    except (json.JSONDecodeError, UnicodeDecodeError):
        return build_response(400, {"error": "请求体必须是合法 JSON"})

    with _lock:
        task = {"id": _next_id, "title": title, "done": False}
        _tasks.append(task)
        _next_id += 1
    return build_response(201, {"task": task})

def handle_tasks(method: str, _path: str, _headers: Dict, body: bytes) -> bytes:
    if method == "GET":
        return handle_tasks_get()
    elif method == "POST":
        return handle_tasks_post(body)
    return build_response(405, {"error": "Method Not Allowed"})
# handle_health/handle_tasks 是路由处理器,接收 (method, path, headers, body) 返回响应字节
resp_health = handle_health("GET", "/health", {}, b"")
print(f"/health GET → {resp_health.split(b' ')[1].decode()}")
resp_tasks = handle_tasks_get()
print(f"/tasks GET → {resp_tasks.split(b' ')[1].decode()}")
body_post = __import__('json').dumps({"title": "新任务"}).encode()
resp_post = handle_tasks_post(body_post)
print(f"/tasks POST → {resp_post.split(b' ')[1].decode()}")

Step 4:用路由表字典 + dispatch 实现 O(1) URL 分发

痛点与机制

ROUTES 字典就像"楼层导览图":{"/health": handle_health, "/tasks": handle_tasks},查找是 O(1) 的哈希查找,不管有多少路由都一样快。dispatch 先用 path.split("?")[0] 去掉 query string,再查字典——这正是 Flask/FastAPI 路由注册的核心思想,只是框架在此基础上加了正则匹配和路径参数提取。

核心源码(逐字来自文末完整源码)

Router = Dict[str, Callable[[str, str, Dict, bytes], bytes]]

ROUTES: Router = {
    "/health": handle_health,
    "/tasks": handle_tasks,
}


def dispatch(method: str, path: str, headers: Dict, body: bytes) -> bytes:
    # 只匹配路径,忽略 query string
    clean_path = path.split("?")[0]
    handler = ROUTES.get(clean_path)
    if handler is None:
        return build_response(404, {"error": f"路径 {clean_path} 不存在"})
    return handler(method, clean_path, headers, body)

可运行演示(补齐 Mock 数据与 print 反馈)

#!/usr/bin/env python3
"""
18-http-server.py
从零手写 HTTP/1.1 服务器 + urllib 演示客户端

用法:
  python3 18-http-server.py --mode server   # 启动服务器(Ctrl+C 停止)
  python3 18-http-server.py --mode demo     # 发请求演示(需先启动服务器)
"""

import argparse
import json
import socket
import threading
import time
import urllib.request
import urllib.error
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple

# ─── 数据存储(内存) ────────────────────────────────────────────────────────
_tasks: List[Dict] = [
    {"id": 1, "title": "阅读 HTTP RFC", "done": False},
    {"id": 2, "title": "手写 socket 服务器", "done": True},
]
_next_id = 3
_lock = threading.Lock()

def parse_request(raw: bytes) -> Tuple[str, str, str, Dict[str, str], bytes]:
    """解析原始 HTTP 请求,返回 (method, path, version, headers, body)"""
    header_end = raw.find(b"\r\n\r\n")
    header_part = raw[:header_end].decode("utf-8", errors="replace")
    body = raw[header_end + 4:]

    lines = header_part.split("\r\n")
    method, path, version = lines[0].split(" ", 2)
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        if ":" in line:
            k, v = line.split(":", 1)
            headers[k.strip().lower()] = v.strip()

    content_length = int(headers.get("content-length", 0))
    return method, path, version, headers, body[:content_length]

def build_response(status: int, body: dict) -> bytes:
    """构造 JSON 响应报文"""
    status_text = {200: "OK", 201: "Created", 404: "Not Found",
                   405: "Method Not Allowed", 400: "Bad Request"}
    payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
    header = (
        f"HTTP/1.1 {status} {status_text.get(status, 'Unknown')}\r\n"
        f"Content-Type: application/json; charset=utf-8\r\n"
        f"Content-Length: {len(payload)}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    ).encode("utf-8")
    return header + payload

def handle_health(method: str, _path: str, _headers: Dict, _body: bytes) -> bytes:
    if method != "GET":
        return build_response(405, {"error": "Method Not Allowed"})
    return build_response(200, {
        "status": "ok",
        "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "tasks_count": len(_tasks),
    })

def handle_tasks_get() -> bytes:
    with _lock:
        return build_response(200, {"tasks": list(_tasks), "total": len(_tasks)})

def handle_tasks_post(body: bytes) -> bytes:
    global _next_id
    try:
        data = json.loads(body.decode("utf-8"))
        title = data.get("title", "").strip()
        if not title:
            return build_response(400, {"error": "title 不能为空"})
    except (json.JSONDecodeError, UnicodeDecodeError):
        return build_response(400, {"error": "请求体必须是合法 JSON"})

    with _lock:
        task = {"id": _next_id, "title": title, "done": False}
        _tasks.append(task)
        _next_id += 1
    return build_response(201, {"task": task})

def handle_tasks(method: str, _path: str, _headers: Dict, body: bytes) -> bytes:
    if method == "GET":
        return handle_tasks_get()
    elif method == "POST":
        return handle_tasks_post(body)
    return build_response(405, {"error": "Method Not Allowed"})

Router = Dict[str, Callable[[str, str, Dict, bytes], bytes]]

ROUTES: Router = {
    "/health": handle_health,
    "/tasks": handle_tasks,
}


def dispatch(method: str, path: str, headers: Dict, body: bytes) -> bytes:
    # 只匹配路径,忽略 query string
    clean_path = path.split("?")[0]
    handler = ROUTES.get(clean_path)
    if handler is None:
        return build_response(404, {"error": f"路径 {clean_path} 不存在"})
    return handler(method, clean_path, headers, body)
# dispatch 用字典路由表做 O(1) 查找,忽略 query string,找不到返回 404
method, path, _, headers, body = parse_request(b"GET /health HTTP/1.1\r\n\r\n")
resp = dispatch(method, path, headers, body)
print(f"dispatch /health → {resp.split(b'\r\n')[0].decode()}")
resp2 = dispatch("GET", "/nonexistent", {}, b"")
print(f"dispatch /nonexistent → {resp2.split(b'\r\n')[0].decode()}")

Step 5:用 handle_connection 在独立线程里处理一个 TCP 连接

痛点与机制

TCP 是流式协议,不保证一次 recv 能收到完整的 HTTP 请求——就像水管里的水,可能分多次流过来。handle_connection 用循环 recv 收数据,每次收到后检查是否已经收到 \r\n\r\n(头部结束),再按 Content-Length 判断正文是否收完。daemon=True 让线程随主进程退出,不会因为客户端连接没断而阻止程序退出。

核心源码(逐字来自文末完整源码)

def handle_connection(conn: socket.socket, addr: Tuple[str, int]) -> None:
    try:
        chunks = []
        while True:
            chunk = conn.recv(4096)
            if not chunk:
                break
            chunks.append(chunk)
            # 简单判断:收到 \r\n\r\n 后再读 Content-Length 字节
            raw = b"".join(chunks)
            if b"\r\n\r\n" in raw:
                header_end = raw.find(b"\r\n\r\n")
                header_part = raw[:header_end].decode("utf-8", errors="replace")
                content_length = 0
                for line in header_part.split("\r\n")[1:]:
                    if line.lower().startswith("content-length:"):
                        content_length = int(line.split(":", 1)[1].strip())
                body_received = len(raw) - header_end - 4
                if body_received >= content_length:
                    break

        if not chunks:
            return

        raw = b"".join(chunks)
        method, path, _version, headers, body = parse_request(raw)
        response = dispatch(method, path, headers, body)
        conn.sendall(response)
        print(f"  [{datetime.now().strftime('%H:%M:%S')}] {addr[0]} {method} {path}")
    except Exception as e:
        print(f"  [错误] {addr}: {e}")
    finally:
        conn.close()

可运行演示(补齐 Mock 数据与 print 反馈)

#!/usr/bin/env python3
"""
18-http-server.py
从零手写 HTTP/1.1 服务器 + urllib 演示客户端

用法:
  python3 18-http-server.py --mode server   # 启动服务器(Ctrl+C 停止)
  python3 18-http-server.py --mode demo     # 发请求演示(需先启动服务器)
"""

import argparse
import json
import socket
import threading
import time
import urllib.request
import urllib.error
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple

# ─── 数据存储(内存) ────────────────────────────────────────────────────────
_tasks: List[Dict] = [
    {"id": 1, "title": "阅读 HTTP RFC", "done": False},
    {"id": 2, "title": "手写 socket 服务器", "done": True},
]
_next_id = 3
_lock = threading.Lock()

# ─── HTTP 解析 ───────────────────────────────────────────────────────────────

def parse_request(raw: bytes) -> Tuple[str, str, str, Dict[str, str], bytes]:
    """解析原始 HTTP 请求,返回 (method, path, version, headers, body)"""
    header_end = raw.find(b"\r\n\r\n")
    header_part = raw[:header_end].decode("utf-8", errors="replace")
    body = raw[header_end + 4:]

    lines = header_part.split("\r\n")
    method, path, version = lines[0].split(" ", 2)
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        if ":" in line:
            k, v = line.split(":", 1)
            headers[k.strip().lower()] = v.strip()

    content_length = int(headers.get("content-length", 0))
    return method, path, version, headers, body[:content_length]


def build_response(status: int, body: dict) -> bytes:
    """构造 JSON 响应报文"""
    status_text = {200: "OK", 201: "Created", 404: "Not Found",
                   405: "Method Not Allowed", 400: "Bad Request"}
    payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
    header = (
        f"HTTP/1.1 {status} {status_text.get(status, 'Unknown')}\r\n"
        f"Content-Type: application/json; charset=utf-8\r\n"
        f"Content-Length: {len(payload)}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    ).encode("utf-8")
    return header + payload

# ─── 路由处理器 ──────────────────────────────────────────────────────────────

def handle_health(method: str, _path: str, _headers: Dict, _body: bytes) -> bytes:
    if method != "GET":
        return build_response(405, {"error": "Method Not Allowed"})
    return build_response(200, {
        "status": "ok",
        "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "tasks_count": len(_tasks),
    })


def handle_tasks_get() -> bytes:
    with _lock:
        return build_response(200, {"tasks": list(_tasks), "total": len(_tasks)})


def handle_tasks_post(body: bytes) -> bytes:
    global _next_id
    try:
        data = json.loads(body.decode("utf-8"))
        title = data.get("title", "").strip()
        if not title:
            return build_response(400, {"error": "title 不能为空"})
    except (json.JSONDecodeError, UnicodeDecodeError):
        return build_response(400, {"error": "请求体必须是合法 JSON"})

    with _lock:
        task = {"id": _next_id, "title": title, "done": False}
        _tasks.append(task)
        _next_id += 1
    return build_response(201, {"task": task})


def handle_tasks(method: str, _path: str, _headers: Dict, body: bytes) -> bytes:
    if method == "GET":
        return handle_tasks_get()
    elif method == "POST":
        return handle_tasks_post(body)
    return build_response(405, {"error": "Method Not Allowed"})

# ─── 路由表 ──────────────────────────────────────────────────────────────────

Router = Dict[str, Callable[[str, str, Dict, bytes], bytes]]

ROUTES: Router = {
    "/health": handle_health,
    "/tasks": handle_tasks,
}


def dispatch(method: str, path: str, headers: Dict, body: bytes) -> bytes:
    # 只匹配路径,忽略 query string
    clean_path = path.split("?")[0]
    handler = ROUTES.get(clean_path)
    if handler is None:
        return build_response(404, {"error": f"路径 {clean_path} 不存在"})
    return handler(method, clean_path, headers, body)

# ─── 连接处理(每个连接一个线程) ────────────────────────────────────────────

def handle_connection(conn: socket.socket, addr: Tuple[str, int]) -> None:
    try:
        chunks = []
        while True:
            chunk = conn.recv(4096)
            if not chunk:
                break
            chunks.append(chunk)
            # 简单判断:收到 \r\n\r\n 后再读 Content-Length 字节
            raw = b"".join(chunks)
            if b"\r\n\r\n" in raw:
                header_end = raw.find(b"\r\n\r\n")
                header_part = raw[:header_end].decode("utf-8", errors="replace")
                content_length = 0
                for line in header_part.split("\r\n")[1:]:
                    if line.lower().startswith("content-length:"):
                        content_length = int(line.split(":", 1)[1].strip())
                body_received = len(raw) - header_end - 4
                if body_received >= content_length:
                    break

        if not chunks:
            return

        raw = b"".join(chunks)
        method, path, _version, headers, body = parse_request(raw)
        response = dispatch(method, path, headers, body)
        conn.sendall(response)
        print(f"  [{datetime.now().strftime('%H:%M:%S')}] {addr[0]} {method} {path}")
    except Exception as e:
        print(f"  [错误] {addr}: {e}")
    finally:
        conn.close()

# ─── 服务器主循环 ────────────────────────────────────────────────────────────

def run_server(host: str = "127.0.0.1", port: int = 8000) -> None:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(10)
    print(f"🚀 HTTP 服务器启动:http://{host}:{port}")
    print("   路由:GET /health | GET /tasks | POST /tasks")
    print("   按 Ctrl+C 停止\n")
    try:
        while True:
            conn, addr = server.accept()
            t = threading.Thread(target=handle_connection, args=(conn, addr), daemon=True)
            t.start()
    except KeyboardInterrupt:
        print("\n服务器已停止。")
    finally:
        server.close()

# ─── 演示客户端 ──────────────────────────────────────────────────────────────

def print_table(title: str, rows: List[Dict]) -> None:
    print(f"\n{'─'*50}")
    print(f"  {title}")
    print(f"{'─'*50}")
    for row in rows:
        status_icon = "✅" if row.get("done") else "⬜"
        print(f"  [{row['id']:>2}] {status_icon} {row['title']}")
    print(f"{'─'*50}")


def demo_client(base: str = "http://127.0.0.1:8000") -> None:
    def get(path: str) -> dict:
        with urllib.request.urlopen(f"{base}{path}") as r:
            return json.loads(r.read())

    def post(path: str, data: dict) -> dict:
        payload = json.dumps(data).encode()
        req = urllib.request.Request(
            f"{base}{path}", data=payload,
            headers={"Content-Type": "application/json"}, method="POST"
        )
        try:
            with urllib.request.urlopen(req) as r:
                return json.loads(r.read())
        except urllib.error.HTTPError as e:
            return json.loads(e.read())

    print("\n=== HTTP 服务器演示 ===")

    # 1. 健康检查
    health = get("/health")
    print(f"\n[GET /health] → {health}")

    # 2. 获取任务列表
    result = get("/tasks")
    print_table("初始任务列表", result["tasks"])

    # 3. 创建新任务
    new_tasks = [
        {"title": "部署到生产环境"},
        {"title": "写单元测试"},
        {"title": ""},  # 故意触发 400
    ]
    print("\n[POST /tasks] 创建任务:")
    for t in new_tasks:
        resp = post("/tasks", t)
        if "task" in resp:
            print(f"  ✅ 创建成功:{resp['task']}")
        else:
            print(f"  ❌ 失败:{resp['error']}")

    # 4. 再次获取列表
    result = get("/tasks")
    print_table("更新后任务列表", result["tasks"])

    # 5. 404 演示
    try:
        urllib.request.urlopen(f"{base}/nonexistent")
    except urllib.error.HTTPError as e:
        body = json.loads(e.read())
        print(f"\n[GET /nonexistent] → 404: {body['error']}")


# ─── 入口 ────────────────────────────────────────────────────────────────────

def main() -> None:
    parser = argparse.ArgumentParser(description="从零手写 HTTP/1.1 服务器")
    parser.add_argument("--mode", choices=["server", "demo"], default="server",
                        help="server=启动服务器, demo=发请求演示")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=8000)
    args = parser.parse_args()

    if args.mode == "server":
        run_server(args.host, args.port)
    else:
        demo_client(f"http://{args.host}:{args.port}")


# Step 5 演示:不用占用真实端口,用 socketpair() 创建一对已经连通的“本地电话线”。
# 这样网页运行器、公司电脑、初学者电脑都不需要开放端口,也能看到完整请求响应。
server_sock, client_sock = socket.socketpair()
worker = threading.Thread(
    target=handle_connection,
    args=(server_sock, ("local-client", 0)),
    daemon=True,
)
worker.start()

# 客户端这头发出一个最小 HTTP 请求:请求行 + 空行,表示没有请求体。
client_sock.sendall(b"GET /health HTTP/1.1\r\nHost: local\r\n\r\n")
response = client_sock.recv(4096)
client_sock.close()
worker.join(timeout=2)

head, body = response.decode("utf-8").split("\r\n\r\n", 1)
print("响应首行:", head.splitlines()[0])
print("响应正文:", body)

Step 6:用 run_server 启动主循环,每个连接分配一个守护线程

痛点与机制

run_server 是服务器的"大堂经理":socket.listen(10) 设置等待队列长度,socket.accept() 阻塞等待新连接,每来一个连接就开一个线程去处理。SO_REUSEADDR 是关键——没有它,服务器重启后会报"地址已被占用",因为 TCP 的 TIME_WAIT 状态还没结束。

核心源码(逐字来自文末完整源码)

def run_server(host: str = "127.0.0.1", port: int = 8000) -> None:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(10)
    print(f"🚀 HTTP 服务器启动:http://{host}:{port}")
    print("   路由:GET /health | GET /tasks | POST /tasks")
    print("   按 Ctrl+C 停止\n")
    try:
        while True:
            conn, addr = server.accept()
            t = threading.Thread(target=handle_connection, args=(conn, addr), daemon=True)
            t.start()
    except KeyboardInterrupt:
        print("\n服务器已停止。")
    finally:
        server.close()

可运行演示(补齐 Mock 数据与 print 反馈)

#!/usr/bin/env python3
"""
18-http-server.py
从零手写 HTTP/1.1 服务器 + urllib 演示客户端

用法:
  python3 18-http-server.py --mode server   # 启动服务器(Ctrl+C 停止)
  python3 18-http-server.py --mode demo     # 发请求演示(需先启动服务器)
"""

import argparse
import json
import socket
import threading
import time
import urllib.request
import urllib.error
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple

# ─── 数据存储(内存) ────────────────────────────────────────────────────────
_tasks: List[Dict] = [
    {"id": 1, "title": "阅读 HTTP RFC", "done": False},
    {"id": 2, "title": "手写 socket 服务器", "done": True},
]
_next_id = 3
_lock = threading.Lock()

# ─── HTTP 解析 ───────────────────────────────────────────────────────────────

def parse_request(raw: bytes) -> Tuple[str, str, str, Dict[str, str], bytes]:
    """解析原始 HTTP 请求,返回 (method, path, version, headers, body)"""
    header_end = raw.find(b"\r\n\r\n")
    header_part = raw[:header_end].decode("utf-8", errors="replace")
    body = raw[header_end + 4:]

    lines = header_part.split("\r\n")
    method, path, version = lines[0].split(" ", 2)
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        if ":" in line:
            k, v = line.split(":", 1)
            headers[k.strip().lower()] = v.strip()

    content_length = int(headers.get("content-length", 0))
    return method, path, version, headers, body[:content_length]


def build_response(status: int, body: dict) -> bytes:
    """构造 JSON 响应报文"""
    status_text = {200: "OK", 201: "Created", 404: "Not Found",
                   405: "Method Not Allowed", 400: "Bad Request"}
    payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
    header = (
        f"HTTP/1.1 {status} {status_text.get(status, 'Unknown')}\r\n"
        f"Content-Type: application/json; charset=utf-8\r\n"
        f"Content-Length: {len(payload)}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    ).encode("utf-8")
    return header + payload

# ─── 路由处理器 ──────────────────────────────────────────────────────────────

def handle_health(method: str, _path: str, _headers: Dict, _body: bytes) -> bytes:
    if method != "GET":
        return build_response(405, {"error": "Method Not Allowed"})
    return build_response(200, {
        "status": "ok",
        "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "tasks_count": len(_tasks),
    })


def handle_tasks_get() -> bytes:
    with _lock:
        return build_response(200, {"tasks": list(_tasks), "total": len(_tasks)})


def handle_tasks_post(body: bytes) -> bytes:
    global _next_id
    try:
        data = json.loads(body.decode("utf-8"))
        title = data.get("title", "").strip()
        if not title:
            return build_response(400, {"error": "title 不能为空"})
    except (json.JSONDecodeError, UnicodeDecodeError):
        return build_response(400, {"error": "请求体必须是合法 JSON"})

    with _lock:
        task = {"id": _next_id, "title": title, "done": False}
        _tasks.append(task)
        _next_id += 1
    return build_response(201, {"task": task})


def handle_tasks(method: str, _path: str, _headers: Dict, body: bytes) -> bytes:
    if method == "GET":
        return handle_tasks_get()
    elif method == "POST":
        return handle_tasks_post(body)
    return build_response(405, {"error": "Method Not Allowed"})

# ─── 路由表 ──────────────────────────────────────────────────────────────────

Router = Dict[str, Callable[[str, str, Dict, bytes], bytes]]

ROUTES: Router = {
    "/health": handle_health,
    "/tasks": handle_tasks,
}


def dispatch(method: str, path: str, headers: Dict, body: bytes) -> bytes:
    # 只匹配路径,忽略 query string
    clean_path = path.split("?")[0]
    handler = ROUTES.get(clean_path)
    if handler is None:
        return build_response(404, {"error": f"路径 {clean_path} 不存在"})
    return handler(method, clean_path, headers, body)

# ─── 连接处理(每个连接一个线程) ────────────────────────────────────────────

def handle_connection(conn: socket.socket, addr: Tuple[str, int]) -> None:
    try:
        chunks = []
        while True:
            chunk = conn.recv(4096)
            if not chunk:
                break
            chunks.append(chunk)
            # 简单判断:收到 \r\n\r\n 后再读 Content-Length 字节
            raw = b"".join(chunks)
            if b"\r\n\r\n" in raw:
                header_end = raw.find(b"\r\n\r\n")
                header_part = raw[:header_end].decode("utf-8", errors="replace")
                content_length = 0
                for line in header_part.split("\r\n")[1:]:
                    if line.lower().startswith("content-length:"):
                        content_length = int(line.split(":", 1)[1].strip())
                body_received = len(raw) - header_end - 4
                if body_received >= content_length:
                    break

        if not chunks:
            return

        raw = b"".join(chunks)
        method, path, _version, headers, body = parse_request(raw)
        response = dispatch(method, path, headers, body)
        conn.sendall(response)
        print(f"  [{datetime.now().strftime('%H:%M:%S')}] {addr[0]} {method} {path}")
    except Exception as e:
        print(f"  [错误] {addr}: {e}")
    finally:
        conn.close()

# ─── 服务器主循环 ────────────────────────────────────────────────────────────

def run_server(host: str = "127.0.0.1", port: int = 8000) -> None:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(10)
    print(f"🚀 HTTP 服务器启动:http://{host}:{port}")
    print("   路由:GET /health | GET /tasks | POST /tasks")
    print("   按 Ctrl+C 停止\n")
    try:
        while True:
            conn, addr = server.accept()
            t = threading.Thread(target=handle_connection, args=(conn, addr), daemon=True)
            t.start()
    except KeyboardInterrupt:
        print("\n服务器已停止。")
    finally:
        server.close()

# ─── 演示客户端 ──────────────────────────────────────────────────────────────

def print_table(title: str, rows: List[Dict]) -> None:
    print(f"\n{'─'*50}")
    print(f"  {title}")
    print(f"{'─'*50}")
    for row in rows:
        status_icon = "✅" if row.get("done") else "⬜"
        print(f"  [{row['id']:>2}] {status_icon} {row['title']}")
    print(f"{'─'*50}")


def demo_client(base: str = "http://127.0.0.1:8000") -> None:
    def get(path: str) -> dict:
        with urllib.request.urlopen(f"{base}{path}") as r:
            return json.loads(r.read())

    def post(path: str, data: dict) -> dict:
        payload = json.dumps(data).encode()
        req = urllib.request.Request(
            f"{base}{path}", data=payload,
            headers={"Content-Type": "application/json"}, method="POST"
        )
        try:
            with urllib.request.urlopen(req) as r:
                return json.loads(r.read())
        except urllib.error.HTTPError as e:
            return json.loads(e.read())

    print("\n=== HTTP 服务器演示 ===")

    # 1. 健康检查
    health = get("/health")
    print(f"\n[GET /health] → {health}")

    # 2. 获取任务列表
    result = get("/tasks")
    print_table("初始任务列表", result["tasks"])

    # 3. 创建新任务
    new_tasks = [
        {"title": "部署到生产环境"},
        {"title": "写单元测试"},
        {"title": ""},  # 故意触发 400
    ]
    print("\n[POST /tasks] 创建任务:")
    for t in new_tasks:
        resp = post("/tasks", t)
        if "task" in resp:
            print(f"  ✅ 创建成功:{resp['task']}")
        else:
            print(f"  ❌ 失败:{resp['error']}")

    # 4. 再次获取列表
    result = get("/tasks")
    print_table("更新后任务列表", result["tasks"])

    # 5. 404 演示
    try:
        urllib.request.urlopen(f"{base}/nonexistent")
    except urllib.error.HTTPError as e:
        body = json.loads(e.read())
        print(f"\n[GET /nonexistent] → 404: {body['error']}")


# ─── 入口 ────────────────────────────────────────────────────────────────────

def main() -> None:
    parser = argparse.ArgumentParser(description="从零手写 HTTP/1.1 服务器")
    parser.add_argument("--mode", choices=["server", "demo"], default="server",
                        help="server=启动服务器, demo=发请求演示")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=8000)
    args = parser.parse_args()

    if args.mode == "server":
        run_server(args.host, args.port)
    else:
        demo_client(f"http://{args.host}:{args.port}")


# Step 6 演示:run_server 会进入无限监听循环,适合真实终端,不适合网页里直接运行。
# 所以这里用“拆解打印”的方式让你看懂它启动时做了哪几件事。
print("run_server 的启动清单:")
print("1. socket.socket(...) 创建 TCP 监听套接字")
print("2. setsockopt(SO_REUSEADDR) 允许服务重启后快速复用端口")
print("3. bind(('127.0.0.1', 8000)) 把服务器挂到本机地址")
print("4. listen(10) 开始排队接客,最多允许 10 个连接等待")
print("5. accept() 每接到一个客户端,就交给守护线程 handle_connection")
print("真实运行命令: python3 18-http-server.py --mode server --port 8000")

Step 7:用 demo_client 发真实 HTTP 请求,测试 GET/POST/404 全流程

痛点与机制

demo_client 用 Python 内置的 urllib.request 发 HTTP 请求,不需要安装任何第三方库。urllib.error.HTTPError 会在状态码 >= 400 时抛出,但响应体仍然可以通过 e.read() 读取——这是测试 400/404 错误响应的标准姿势。print_table 用 emoji 和对齐格式让终端输出一眼可辨。

核心源码(逐字来自文末完整源码)

def demo_client(base: str = "http://127.0.0.1:8000") -> None:
    def get(path: str) -> dict:
        with urllib.request.urlopen(f"{base}{path}") as r:
            return json.loads(r.read())

    def post(path: str, data: dict) -> dict:
        payload = json.dumps(data).encode()
        req = urllib.request.Request(
            f"{base}{path}", data=payload,
            headers={"Content-Type": "application/json"}, method="POST"
        )
        try:
            with urllib.request.urlopen(req) as r:
                return json.loads(r.read())
        except urllib.error.HTTPError as e:
            return json.loads(e.read())

    print("\n=== HTTP 服务器演示 ===")

    # 1. 健康检查
    health = get("/health")
    print(f"\n[GET /health] → {health}")

    # 2. 获取任务列表
    result = get("/tasks")
    print_table("初始任务列表", result["tasks"])

    # 3. 创建新任务
    new_tasks = [
        {"title": "部署到生产环境"},
        {"title": "写单元测试"},
        {"title": ""},  # 故意触发 400
    ]
    print("\n[POST /tasks] 创建任务:")
    for t in new_tasks:
        resp = post("/tasks", t)
        if "task" in resp:
            print(f"  ✅ 创建成功:{resp['task']}")
        else:
            print(f"  ❌ 失败:{resp['error']}")

    # 4. 再次获取列表
    result = get("/tasks")
    print_table("更新后任务列表", result["tasks"])

    # 5. 404 演示
    try:
        urllib.request.urlopen(f"{base}/nonexistent")
    except urllib.error.HTTPError as e:
        body = json.loads(e.read())
        print(f"\n[GET /nonexistent] → 404: {body['error']}")

可运行演示(补齐 Mock 数据与 print 反馈)

#!/usr/bin/env python3
"""
18-http-server.py
从零手写 HTTP/1.1 服务器 + urllib 演示客户端

用法:
  python3 18-http-server.py --mode server   # 启动服务器(Ctrl+C 停止)
  python3 18-http-server.py --mode demo     # 发请求演示(需先启动服务器)
"""

import argparse
import json
import socket
import threading
import time
import urllib.request
import urllib.error
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple

# ─── 数据存储(内存) ────────────────────────────────────────────────────────
_tasks: List[Dict] = [
    {"id": 1, "title": "阅读 HTTP RFC", "done": False},
    {"id": 2, "title": "手写 socket 服务器", "done": True},
]
_next_id = 3
_lock = threading.Lock()

# ─── HTTP 解析 ───────────────────────────────────────────────────────────────

def parse_request(raw: bytes) -> Tuple[str, str, str, Dict[str, str], bytes]:
    """解析原始 HTTP 请求,返回 (method, path, version, headers, body)"""
    header_end = raw.find(b"\r\n\r\n")
    header_part = raw[:header_end].decode("utf-8", errors="replace")
    body = raw[header_end + 4:]

    lines = header_part.split("\r\n")
    method, path, version = lines[0].split(" ", 2)
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        if ":" in line:
            k, v = line.split(":", 1)
            headers[k.strip().lower()] = v.strip()

    content_length = int(headers.get("content-length", 0))
    return method, path, version, headers, body[:content_length]


def build_response(status: int, body: dict) -> bytes:
    """构造 JSON 响应报文"""
    status_text = {200: "OK", 201: "Created", 404: "Not Found",
                   405: "Method Not Allowed", 400: "Bad Request"}
    payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
    header = (
        f"HTTP/1.1 {status} {status_text.get(status, 'Unknown')}\r\n"
        f"Content-Type: application/json; charset=utf-8\r\n"
        f"Content-Length: {len(payload)}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    ).encode("utf-8")
    return header + payload

# ─── 路由处理器 ──────────────────────────────────────────────────────────────

def handle_health(method: str, _path: str, _headers: Dict, _body: bytes) -> bytes:
    if method != "GET":
        return build_response(405, {"error": "Method Not Allowed"})
    return build_response(200, {
        "status": "ok",
        "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "tasks_count": len(_tasks),
    })


def handle_tasks_get() -> bytes:
    with _lock:
        return build_response(200, {"tasks": list(_tasks), "total": len(_tasks)})


def handle_tasks_post(body: bytes) -> bytes:
    global _next_id
    try:
        data = json.loads(body.decode("utf-8"))
        title = data.get("title", "").strip()
        if not title:
            return build_response(400, {"error": "title 不能为空"})
    except (json.JSONDecodeError, UnicodeDecodeError):
        return build_response(400, {"error": "请求体必须是合法 JSON"})

    with _lock:
        task = {"id": _next_id, "title": title, "done": False}
        _tasks.append(task)
        _next_id += 1
    return build_response(201, {"task": task})


def handle_tasks(method: str, _path: str, _headers: Dict, body: bytes) -> bytes:
    if method == "GET":
        return handle_tasks_get()
    elif method == "POST":
        return handle_tasks_post(body)
    return build_response(405, {"error": "Method Not Allowed"})

# ─── 路由表 ──────────────────────────────────────────────────────────────────

Router = Dict[str, Callable[[str, str, Dict, bytes], bytes]]

ROUTES: Router = {
    "/health": handle_health,
    "/tasks": handle_tasks,
}


def dispatch(method: str, path: str, headers: Dict, body: bytes) -> bytes:
    # 只匹配路径,忽略 query string
    clean_path = path.split("?")[0]
    handler = ROUTES.get(clean_path)
    if handler is None:
        return build_response(404, {"error": f"路径 {clean_path} 不存在"})
    return handler(method, clean_path, headers, body)

# ─── 连接处理(每个连接一个线程) ────────────────────────────────────────────

def handle_connection(conn: socket.socket, addr: Tuple[str, int]) -> None:
    try:
        chunks = []
        while True:
            chunk = conn.recv(4096)
            if not chunk:
                break
            chunks.append(chunk)
            # 简单判断:收到 \r\n\r\n 后再读 Content-Length 字节
            raw = b"".join(chunks)
            if b"\r\n\r\n" in raw:
                header_end = raw.find(b"\r\n\r\n")
                header_part = raw[:header_end].decode("utf-8", errors="replace")
                content_length = 0
                for line in header_part.split("\r\n")[1:]:
                    if line.lower().startswith("content-length:"):
                        content_length = int(line.split(":", 1)[1].strip())
                body_received = len(raw) - header_end - 4
                if body_received >= content_length:
                    break

        if not chunks:
            return

        raw = b"".join(chunks)
        method, path, _version, headers, body = parse_request(raw)
        response = dispatch(method, path, headers, body)
        conn.sendall(response)
        print(f"  [{datetime.now().strftime('%H:%M:%S')}] {addr[0]} {method} {path}")
    except Exception as e:
        print(f"  [错误] {addr}: {e}")
    finally:
        conn.close()

# ─── 服务器主循环 ────────────────────────────────────────────────────────────

def run_server(host: str = "127.0.0.1", port: int = 8000) -> None:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(10)
    print(f"🚀 HTTP 服务器启动:http://{host}:{port}")
    print("   路由:GET /health | GET /tasks | POST /tasks")
    print("   按 Ctrl+C 停止\n")
    try:
        while True:
            conn, addr = server.accept()
            t = threading.Thread(target=handle_connection, args=(conn, addr), daemon=True)
            t.start()
    except KeyboardInterrupt:
        print("\n服务器已停止。")
    finally:
        server.close()

# ─── 演示客户端 ──────────────────────────────────────────────────────────────

def print_table(title: str, rows: List[Dict]) -> None:
    print(f"\n{'─'*50}")
    print(f"  {title}")
    print(f"{'─'*50}")
    for row in rows:
        status_icon = "✅" if row.get("done") else "⬜"
        print(f"  [{row['id']:>2}] {status_icon} {row['title']}")
    print(f"{'─'*50}")


def demo_client(base: str = "http://127.0.0.1:8000") -> None:
    def get(path: str) -> dict:
        with urllib.request.urlopen(f"{base}{path}") as r:
            return json.loads(r.read())

    def post(path: str, data: dict) -> dict:
        payload = json.dumps(data).encode()
        req = urllib.request.Request(
            f"{base}{path}", data=payload,
            headers={"Content-Type": "application/json"}, method="POST"
        )
        try:
            with urllib.request.urlopen(req) as r:
                return json.loads(r.read())
        except urllib.error.HTTPError as e:
            return json.loads(e.read())

    print("\n=== HTTP 服务器演示 ===")

    # 1. 健康检查
    health = get("/health")
    print(f"\n[GET /health] → {health}")

    # 2. 获取任务列表
    result = get("/tasks")
    print_table("初始任务列表", result["tasks"])

    # 3. 创建新任务
    new_tasks = [
        {"title": "部署到生产环境"},
        {"title": "写单元测试"},
        {"title": ""},  # 故意触发 400
    ]
    print("\n[POST /tasks] 创建任务:")
    for t in new_tasks:
        resp = post("/tasks", t)
        if "task" in resp:
            print(f"  ✅ 创建成功:{resp['task']}")
        else:
            print(f"  ❌ 失败:{resp['error']}")

    # 4. 再次获取列表
    result = get("/tasks")
    print_table("更新后任务列表", result["tasks"])

    # 5. 404 演示
    try:
        urllib.request.urlopen(f"{base}/nonexistent")
    except urllib.error.HTTPError as e:
        body = json.loads(e.read())
        print(f"\n[GET /nonexistent] → 404: {body['error']}")


# ─── 入口 ────────────────────────────────────────────────────────────────────

def main() -> None:
    parser = argparse.ArgumentParser(description="从零手写 HTTP/1.1 服务器")
    parser.add_argument("--mode", choices=["server", "demo"], default="server",
                        help="server=启动服务器, demo=发请求演示")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=8000)
    args = parser.parse_args()

    if args.mode == "server":
        run_server(args.host, args.port)
    else:
        demo_client(f"http://{args.host}:{args.port}")


# Step 7 演示:demo_client 的真实版本走 urllib + HTTP。
# 为了让网页运行器零网络依赖,这里直接调用 dispatch,模拟同样的 GET/POST/404 流程。
print("=== 离线版 HTTP 客户端演示 ===")

health = json.loads(dispatch("GET", "/health", {}, b"").split(b"\r\n\r\n", 1)[1])
print("[GET /health] ->", health["status"], "tasks_count=", health["tasks_count"])

before = json.loads(dispatch("GET", "/tasks", {}, b"").split(b"\r\n\r\n", 1)[1])
print_table("初始任务列表", before["tasks"])

payload = json.dumps({"title": "写一个离线 HTTP 测试"}, ensure_ascii=False).encode("utf-8")
created = json.loads(dispatch("POST", "/tasks", {}, payload).split(b"\r\n\r\n", 1)[1])
print("[POST /tasks] ->", created["task"])

missing = json.loads(dispatch("GET", "/missing", {}, b"").split(b"\r\n\r\n", 1)[1])
print("[GET /missing] ->", missing["error"])

Step 8:用 main 做 server/demo 两种模式的 CLI 总入口

痛点与机制

mainargparse 做 CLI 入口:--mode server 启动服务器(阻塞,Ctrl+C 停止),--mode demo 发请求演示(需要服务器已在运行)。--host--port 参数让读者不改代码就能换端口,避免端口冲突。这个"server/client 分离"的设计模式在所有网络工具里都很常见。

核心源码(逐字来自文末完整源码)

def main() -> None:
    parser = argparse.ArgumentParser(description="从零手写 HTTP/1.1 服务器")
    parser.add_argument("--mode", choices=["server", "demo"], default="server",
                        help="server=启动服务器, demo=发请求演示")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=8000)
    args = parser.parse_args()

    if args.mode == "server":
        run_server(args.host, args.port)
    else:
        demo_client(f"http://{args.host}:{args.port}")

可运行演示(补齐 Mock 数据与 print 反馈)

#!/usr/bin/env python3
"""
18-http-server.py
从零手写 HTTP/1.1 服务器 + urllib 演示客户端

用法:
  python3 18-http-server.py --mode server   # 启动服务器(Ctrl+C 停止)
  python3 18-http-server.py --mode demo     # 发请求演示(需先启动服务器)
"""

import argparse
import json
import socket
import threading
import time
import urllib.request
import urllib.error
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple

# ─── 数据存储(内存) ────────────────────────────────────────────────────────
_tasks: List[Dict] = [
    {"id": 1, "title": "阅读 HTTP RFC", "done": False},
    {"id": 2, "title": "手写 socket 服务器", "done": True},
]
_next_id = 3
_lock = threading.Lock()

# ─── HTTP 解析 ───────────────────────────────────────────────────────────────

def parse_request(raw: bytes) -> Tuple[str, str, str, Dict[str, str], bytes]:
    """解析原始 HTTP 请求,返回 (method, path, version, headers, body)"""
    header_end = raw.find(b"\r\n\r\n")
    header_part = raw[:header_end].decode("utf-8", errors="replace")
    body = raw[header_end + 4:]

    lines = header_part.split("\r\n")
    method, path, version = lines[0].split(" ", 2)
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        if ":" in line:
            k, v = line.split(":", 1)
            headers[k.strip().lower()] = v.strip()

    content_length = int(headers.get("content-length", 0))
    return method, path, version, headers, body[:content_length]


def build_response(status: int, body: dict) -> bytes:
    """构造 JSON 响应报文"""
    status_text = {200: "OK", 201: "Created", 404: "Not Found",
                   405: "Method Not Allowed", 400: "Bad Request"}
    payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
    header = (
        f"HTTP/1.1 {status} {status_text.get(status, 'Unknown')}\r\n"
        f"Content-Type: application/json; charset=utf-8\r\n"
        f"Content-Length: {len(payload)}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    ).encode("utf-8")
    return header + payload

# ─── 路由处理器 ──────────────────────────────────────────────────────────────

def handle_health(method: str, _path: str, _headers: Dict, _body: bytes) -> bytes:
    if method != "GET":
        return build_response(405, {"error": "Method Not Allowed"})
    return build_response(200, {
        "status": "ok",
        "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "tasks_count": len(_tasks),
    })


def handle_tasks_get() -> bytes:
    with _lock:
        return build_response(200, {"tasks": list(_tasks), "total": len(_tasks)})


def handle_tasks_post(body: bytes) -> bytes:
    global _next_id
    try:
        data = json.loads(body.decode("utf-8"))
        title = data.get("title", "").strip()
        if not title:
            return build_response(400, {"error": "title 不能为空"})
    except (json.JSONDecodeError, UnicodeDecodeError):
        return build_response(400, {"error": "请求体必须是合法 JSON"})

    with _lock:
        task = {"id": _next_id, "title": title, "done": False}
        _tasks.append(task)
        _next_id += 1
    return build_response(201, {"task": task})


def handle_tasks(method: str, _path: str, _headers: Dict, body: bytes) -> bytes:
    if method == "GET":
        return handle_tasks_get()
    elif method == "POST":
        return handle_tasks_post(body)
    return build_response(405, {"error": "Method Not Allowed"})

# ─── 路由表 ──────────────────────────────────────────────────────────────────

Router = Dict[str, Callable[[str, str, Dict, bytes], bytes]]

ROUTES: Router = {
    "/health": handle_health,
    "/tasks": handle_tasks,
}


def dispatch(method: str, path: str, headers: Dict, body: bytes) -> bytes:
    # 只匹配路径,忽略 query string
    clean_path = path.split("?")[0]
    handler = ROUTES.get(clean_path)
    if handler is None:
        return build_response(404, {"error": f"路径 {clean_path} 不存在"})
    return handler(method, clean_path, headers, body)

# ─── 连接处理(每个连接一个线程) ────────────────────────────────────────────

def handle_connection(conn: socket.socket, addr: Tuple[str, int]) -> None:
    try:
        chunks = []
        while True:
            chunk = conn.recv(4096)
            if not chunk:
                break
            chunks.append(chunk)
            # 简单判断:收到 \r\n\r\n 后再读 Content-Length 字节
            raw = b"".join(chunks)
            if b"\r\n\r\n" in raw:
                header_end = raw.find(b"\r\n\r\n")
                header_part = raw[:header_end].decode("utf-8", errors="replace")
                content_length = 0
                for line in header_part.split("\r\n")[1:]:
                    if line.lower().startswith("content-length:"):
                        content_length = int(line.split(":", 1)[1].strip())
                body_received = len(raw) - header_end - 4
                if body_received >= content_length:
                    break

        if not chunks:
            return

        raw = b"".join(chunks)
        method, path, _version, headers, body = parse_request(raw)
        response = dispatch(method, path, headers, body)
        conn.sendall(response)
        print(f"  [{datetime.now().strftime('%H:%M:%S')}] {addr[0]} {method} {path}")
    except Exception as e:
        print(f"  [错误] {addr}: {e}")
    finally:
        conn.close()

# ─── 服务器主循环 ────────────────────────────────────────────────────────────

def run_server(host: str = "127.0.0.1", port: int = 8000) -> None:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(10)
    print(f"🚀 HTTP 服务器启动:http://{host}:{port}")
    print("   路由:GET /health | GET /tasks | POST /tasks")
    print("   按 Ctrl+C 停止\n")
    try:
        while True:
            conn, addr = server.accept()
            t = threading.Thread(target=handle_connection, args=(conn, addr), daemon=True)
            t.start()
    except KeyboardInterrupt:
        print("\n服务器已停止。")
    finally:
        server.close()

# ─── 演示客户端 ──────────────────────────────────────────────────────────────

def print_table(title: str, rows: List[Dict]) -> None:
    print(f"\n{'─'*50}")
    print(f"  {title}")
    print(f"{'─'*50}")
    for row in rows:
        status_icon = "✅" if row.get("done") else "⬜"
        print(f"  [{row['id']:>2}] {status_icon} {row['title']}")
    print(f"{'─'*50}")


def demo_client(base: str = "http://127.0.0.1:8000") -> None:
    def get(path: str) -> dict:
        with urllib.request.urlopen(f"{base}{path}") as r:
            return json.loads(r.read())

    def post(path: str, data: dict) -> dict:
        payload = json.dumps(data).encode()
        req = urllib.request.Request(
            f"{base}{path}", data=payload,
            headers={"Content-Type": "application/json"}, method="POST"
        )
        try:
            with urllib.request.urlopen(req) as r:
                return json.loads(r.read())
        except urllib.error.HTTPError as e:
            return json.loads(e.read())

    print("\n=== HTTP 服务器演示 ===")

    # 1. 健康检查
    health = get("/health")
    print(f"\n[GET /health] → {health}")

    # 2. 获取任务列表
    result = get("/tasks")
    print_table("初始任务列表", result["tasks"])

    # 3. 创建新任务
    new_tasks = [
        {"title": "部署到生产环境"},
        {"title": "写单元测试"},
        {"title": ""},  # 故意触发 400
    ]
    print("\n[POST /tasks] 创建任务:")
    for t in new_tasks:
        resp = post("/tasks", t)
        if "task" in resp:
            print(f"  ✅ 创建成功:{resp['task']}")
        else:
            print(f"  ❌ 失败:{resp['error']}")

    # 4. 再次获取列表
    result = get("/tasks")
    print_table("更新后任务列表", result["tasks"])

    # 5. 404 演示
    try:
        urllib.request.urlopen(f"{base}/nonexistent")
    except urllib.error.HTTPError as e:
        body = json.loads(e.read())
        print(f"\n[GET /nonexistent] → 404: {body['error']}")


# ─── 入口 ────────────────────────────────────────────────────────────────────

def main() -> None:
    parser = argparse.ArgumentParser(description="从零手写 HTTP/1.1 服务器")
    parser.add_argument("--mode", choices=["server", "demo"], default="server",
                        help="server=启动服务器, demo=发请求演示")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=8000)
    args = parser.parse_args()

    if args.mode == "server":
        run_server(args.host, args.port)
    else:
        demo_client(f"http://{args.host}:{args.port}")


# Step 8 演示:main() 会根据命令行参数调用 run_server 或 demo_client。
# 这里把两个真实函数临时替换成安全的假函数,验证 CLI 分发逻辑,不占端口、不访问网络。
def fake_run_server(host: str, port: int) -> None:
    print(f"server 模式已分发: host={host}, port={port}")


def fake_demo_client(base: str) -> None:
    print(f"demo 模式已分发: base={base}")


run_server = fake_run_server
demo_client = fake_demo_client

import sys
for mode in ("server", "demo"):
    sys.argv = ["prog", "--mode", mode, "--host", "127.0.0.1", "--port", "18080"]
    main()

极客实战:完整源码与运行

现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。

#!/usr/bin/env python3
"""
18-http-server.py
从零手写 HTTP/1.1 服务器 + urllib 演示客户端

用法:
  python3 18-http-server.py --mode server   # 启动服务器(Ctrl+C 停止)
  python3 18-http-server.py --mode demo     # 发请求演示(需先启动服务器)
"""

import argparse
import json
import socket
import threading
import time
import urllib.request
import urllib.error
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple

# ─── 数据存储(内存) ────────────────────────────────────────────────────────
_tasks: List[Dict] = [
    {"id": 1, "title": "阅读 HTTP RFC", "done": False},
    {"id": 2, "title": "手写 socket 服务器", "done": True},
]
_next_id = 3
_lock = threading.Lock()

# ─── HTTP 解析 ───────────────────────────────────────────────────────────────

def parse_request(raw: bytes) -> Tuple[str, str, str, Dict[str, str], bytes]:
    """解析原始 HTTP 请求,返回 (method, path, version, headers, body)"""
    header_end = raw.find(b"\r\n\r\n")
    header_part = raw[:header_end].decode("utf-8", errors="replace")
    body = raw[header_end + 4:]

    lines = header_part.split("\r\n")
    method, path, version = lines[0].split(" ", 2)
    headers: Dict[str, str] = {}
    for line in lines[1:]:
        if ":" in line:
            k, v = line.split(":", 1)
            headers[k.strip().lower()] = v.strip()

    content_length = int(headers.get("content-length", 0))
    return method, path, version, headers, body[:content_length]


def build_response(status: int, body: dict) -> bytes:
    """构造 JSON 响应报文"""
    status_text = {200: "OK", 201: "Created", 404: "Not Found",
                   405: "Method Not Allowed", 400: "Bad Request"}
    payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
    header = (
        f"HTTP/1.1 {status} {status_text.get(status, 'Unknown')}\r\n"
        f"Content-Type: application/json; charset=utf-8\r\n"
        f"Content-Length: {len(payload)}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    ).encode("utf-8")
    return header + payload

# ─── 路由处理器 ──────────────────────────────────────────────────────────────

def handle_health(method: str, _path: str, _headers: Dict, _body: bytes) -> bytes:
    if method != "GET":
        return build_response(405, {"error": "Method Not Allowed"})
    return build_response(200, {
        "status": "ok",
        "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "tasks_count": len(_tasks),
    })


def handle_tasks_get() -> bytes:
    with _lock:
        return build_response(200, {"tasks": list(_tasks), "total": len(_tasks)})


def handle_tasks_post(body: bytes) -> bytes:
    global _next_id
    try:
        data = json.loads(body.decode("utf-8"))
        title = data.get("title", "").strip()
        if not title:
            return build_response(400, {"error": "title 不能为空"})
    except (json.JSONDecodeError, UnicodeDecodeError):
        return build_response(400, {"error": "请求体必须是合法 JSON"})

    with _lock:
        task = {"id": _next_id, "title": title, "done": False}
        _tasks.append(task)
        _next_id += 1
    return build_response(201, {"task": task})


def handle_tasks(method: str, _path: str, _headers: Dict, body: bytes) -> bytes:
    if method == "GET":
        return handle_tasks_get()
    elif method == "POST":
        return handle_tasks_post(body)
    return build_response(405, {"error": "Method Not Allowed"})

# ─── 路由表 ──────────────────────────────────────────────────────────────────

Router = Dict[str, Callable[[str, str, Dict, bytes], bytes]]

ROUTES: Router = {
    "/health": handle_health,
    "/tasks": handle_tasks,
}


def dispatch(method: str, path: str, headers: Dict, body: bytes) -> bytes:
    # 只匹配路径,忽略 query string
    clean_path = path.split("?")[0]
    handler = ROUTES.get(clean_path)
    if handler is None:
        return build_response(404, {"error": f"路径 {clean_path} 不存在"})
    return handler(method, clean_path, headers, body)

# ─── 连接处理(每个连接一个线程) ────────────────────────────────────────────

def handle_connection(conn: socket.socket, addr: Tuple[str, int]) -> None:
    try:
        chunks = []
        while True:
            chunk = conn.recv(4096)
            if not chunk:
                break
            chunks.append(chunk)
            # 简单判断:收到 \r\n\r\n 后再读 Content-Length 字节
            raw = b"".join(chunks)
            if b"\r\n\r\n" in raw:
                header_end = raw.find(b"\r\n\r\n")
                header_part = raw[:header_end].decode("utf-8", errors="replace")
                content_length = 0
                for line in header_part.split("\r\n")[1:]:
                    if line.lower().startswith("content-length:"):
                        content_length = int(line.split(":", 1)[1].strip())
                body_received = len(raw) - header_end - 4
                if body_received >= content_length:
                    break

        if not chunks:
            return

        raw = b"".join(chunks)
        method, path, _version, headers, body = parse_request(raw)
        response = dispatch(method, path, headers, body)
        conn.sendall(response)
        print(f"  [{datetime.now().strftime('%H:%M:%S')}] {addr[0]} {method} {path}")
    except Exception as e:
        print(f"  [错误] {addr}: {e}")
    finally:
        conn.close()

# ─── 服务器主循环 ────────────────────────────────────────────────────────────

def run_server(host: str = "127.0.0.1", port: int = 8000) -> None:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(10)
    print(f"🚀 HTTP 服务器启动:http://{host}:{port}")
    print("   路由:GET /health | GET /tasks | POST /tasks")
    print("   按 Ctrl+C 停止\n")
    try:
        while True:
            conn, addr = server.accept()
            t = threading.Thread(target=handle_connection, args=(conn, addr), daemon=True)
            t.start()
    except KeyboardInterrupt:
        print("\n服务器已停止。")
    finally:
        server.close()

# ─── 演示客户端 ──────────────────────────────────────────────────────────────

def print_table(title: str, rows: List[Dict]) -> None:
    print(f"\n{'─'*50}")
    print(f"  {title}")
    print(f"{'─'*50}")
    for row in rows:
        status_icon = "✅" if row.get("done") else "⬜"
        print(f"  [{row['id']:>2}] {status_icon} {row['title']}")
    print(f"{'─'*50}")


def demo_client(base: str = "http://127.0.0.1:8000") -> None:
    def get(path: str) -> dict:
        with urllib.request.urlopen(f"{base}{path}") as r:
            return json.loads(r.read())

    def post(path: str, data: dict) -> dict:
        payload = json.dumps(data).encode()
        req = urllib.request.Request(
            f"{base}{path}", data=payload,
            headers={"Content-Type": "application/json"}, method="POST"
        )
        try:
            with urllib.request.urlopen(req) as r:
                return json.loads(r.read())
        except urllib.error.HTTPError as e:
            return json.loads(e.read())

    print("\n=== HTTP 服务器演示 ===")

    # 1. 健康检查
    health = get("/health")
    print(f"\n[GET /health] → {health}")

    # 2. 获取任务列表
    result = get("/tasks")
    print_table("初始任务列表", result["tasks"])

    # 3. 创建新任务
    new_tasks = [
        {"title": "部署到生产环境"},
        {"title": "写单元测试"},
        {"title": ""},  # 故意触发 400
    ]
    print("\n[POST /tasks] 创建任务:")
    for t in new_tasks:
        resp = post("/tasks", t)
        if "task" in resp:
            print(f"  ✅ 创建成功:{resp['task']}")
        else:
            print(f"  ❌ 失败:{resp['error']}")

    # 4. 再次获取列表
    result = get("/tasks")
    print_table("更新后任务列表", result["tasks"])

    # 5. 404 演示
    try:
        urllib.request.urlopen(f"{base}/nonexistent")
    except urllib.error.HTTPError as e:
        body = json.loads(e.read())
        print(f"\n[GET /nonexistent] → 404: {body['error']}")


# ─── 入口 ────────────────────────────────────────────────────────────────────

def main() -> None:
    parser = argparse.ArgumentParser(description="从零手写 HTTP/1.1 服务器")
    parser.add_argument("--mode", choices=["server", "demo"], default="server",
                        help="server=启动服务器, demo=发请求演示")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=8000)
    args = parser.parse_args()

    if args.mode == "server":
        run_server(args.host, args.port)
    else:
        demo_client(f"http://{args.host}:{args.port}")


if __name__ == "__main__":
    import sys
    sys.argv = ["", "--mode", "demo"]
    # main()  # 需要网络环境
    print("HTTP Server 演示(沙箱无网络,跳过实际启动)")
$ python3 18-python-http-server.py --mode server
🚀 HTTP 服务器启动:http://127.0.0.1:8000
   路由:GET /health | GET /tasks | POST /tasks
   按 Ctrl+C 停止

  [22:54:12] 127.0.0.1 GET /health
  [22:54:12] 127.0.0.1 GET /tasks
  [22:54:12] 127.0.0.1 POST /tasks
  [22:54:12] 127.0.0.1 POST /tasks
  [22:54:12] 127.0.0.1 POST /tasks
  [22:54:12] 127.0.0.1 GET /tasks
  [22:54:12] 127.0.0.1 GET /nonexistent

$ python3 18-python-http-server.py --mode demo

=== HTTP 服务器演示 ===

[GET /health]{'status': 'ok', 'time': '2026-04-17 22:54:12', 'tasks_count': 2}

──────────────────────────────────────────────────
  初始任务列表
──────────────────────────────────────────────────
  [ 1] ⬜ 阅读 HTTP RFC
  [ 2] ✅ 手写 socket 服务器
──────────────────────────────────────────────────

[POST /tasks] 创建任务:
  ✅ 创建成功:{'id': 3, 'title': '部署到生产环境', 'done': False}
  ✅ 创建成功:{'id': 4, 'title': '写单元测试', 'done': False}
  ❌ 失败:title 不能为空

──────────────────────────────────────────────────
  更新后任务列表
──────────────────────────────────────────────────
  [ 1] ⬜ 阅读 HTTP RFC
  [ 2] ✅ 手写 socket 服务器
  [ 3] ⬜ 部署到生产环境
  [ 4] ⬜ 写单元测试
──────────────────────────────────────────────────

[GET /nonexistent] → 404: 路径 /nonexistent 不存在

小结

概念 一句话记忆
parse_request \r\n\r\n 切头部,按 Content-Length 截正文
build_response 状态行 + 响应头 + 空行 + JSON 正文,Content-Length 必须准确
ROUTES 字典 O(1) 路由查找,Flask/FastAPI 路由注册的核心思想
dispatch 去掉 query string 后查字典,找不到返回 404
handle_connection 循环 recv 直到收完完整请求,每个连接独立线程
SO_REUSEADDR 服务器重启不报"地址已被占用"
daemon=True 守护线程随主进程退出,不阻塞程序关闭
urllib.error.HTTPError 状态码 ≥ 400 时抛出,但 e.read() 仍可读响应体

⏱ NexDo Time(5 分钟)

挑战:给服务器加一个 DELETE /tasks/<id> 路由,支持按 ID 删除任务。

具体步骤:

  1. dispatch 里加路径参数解析:检测路径是否匹配 /tasks/ 前缀,提取末尾的数字 ID
  2. 新增 handle_task_delete(task_id: int) -> bytes 函数:在 _tasks 列表里找到对应 ID 的任务并删除,找不到返回 404
  3. ROUTES 里注册这个新处理器(或在 dispatch 里特殊处理)
  4. urllib.request.Request(..., method="DELETE") 发一个删除请求,验证任务确实消失了

Don’t wait for next time, do it in the next moment.