文章

24 · JWT 鉴权:Token 签发与验证原理

#026 · 2026-04-16 · Python

🔗 知识图谱导航:阅读本文前,建议先掌握《21 · 表单、会话与中间件原理》中的 Cookie/Session 机制——本文是它的替代方案:JWT 把用户状态编码进 Token,服务端不需要存储 Session,适合前后端分离和微服务场景。

运行环境:Python 3.12+ 标准库,零额外依赖,直接运行。

极客解析:JWT 的本质是"带防伪钢印的游乐园门票":门票上印着你的信息(payload),钢印是用密钥签的名(signature)。验票员不需要查数据库,只需要验钢印——这就是"无状态鉴权"的核心。

JWT 结构

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9   ← header(Base64URL 编码)
.
eyJzdWIiOiJhbGljZSIsImV4cCI6MTc3NjQ0fQ  ← payload(Base64URL 编码)
.
SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c  ← signature(HMAC-SHA256)

header  = {"alg": "HS256", "typ": "JWT"}
payload = {"sub": "alice", "exp": 1776444000, "role": "admin"}
signature = HMAC-SHA256(secret, "header.payload")

Session vs JWT

Session(有状态):
  客户端 → Cookie: sessionid=abc → 服务端查 _sessions["abc"] → 得到用户信息
  缺点:服务端需要存储所有 Session,水平扩展时需要共享 Session 存储

JWT(无状态):
  客户端 → Authorization: Bearer <token> → 服务端验签 → 从 payload 得到用户信息
  优点:服务端不存储任何状态,任意节点都能验证
  缺点:Token 无法主动吊销(只能等过期)

安全要点

hmac.compare_digest  时序安全比较,防止通过响应时间推断签名内容
HttpOnly Cookie      存 Token 防 XSS(比 localStorage 更安全)
短过期时间           Access Token 1小时,Refresh Token 7天
HTTPS 传输           防止 Token 被中间人截获
不在 payload 存密码  payload 只是 Base64 编码,不是加密

步步为营:核心逻辑自适应拆解

这一篇的核心是 JWT 的三层结构:Base64URL 编码(可逆)→ HMAC-SHA256 签名(不可逆)→ 时序安全验证(防攻击)。下面每一步都聚焦一个机制,零依赖可直接运行。

Step 1:用 _b64url_encode/_b64url_decode 理解 JWT 的基础编码层

痛点与机制

Base64URL 是 Base64 的变体:把 + 换成 -/ 换成 _,去掉末尾的 = 填充。这样编码结果可以安全地放在 URL 里,不需要 URL 编码。注意:Base64URL 是编码不是加密——任何人都能解码 JWT 的 header 和 payload,所以 payload 里绝对不能存密码或敏感信息。

核心源码(逐字来自文末完整源码)

def _b64url_encode(data: bytes) -> str:
    """Base64URL 编码(无填充)"""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(s: str) -> bytes:
    """Base64URL 解码(补齐填充)"""
    padding = 4 - len(s) % 4
    if padding != 4:
        s += "=" * padding
    return base64.urlsafe_b64decode(s)

可运行演示(补齐 Mock 数据与 print 反馈)

import base64
import json


def _b64url_encode(data: bytes) -> str:
    """Base64URL 编码(无填充)"""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(s: str) -> bytes:
    """Base64URL 解码(补齐填充)"""
    padding = 4 - len(s) % 4
    if padding != 4:
        s += "=" * padding
    return base64.urlsafe_b64decode(s)


header = {"alg": "HS256", "typ": "JWT"}
payload = {"sub": "alice", "role": "admin"}
header_b64 = _b64url_encode(json.dumps(header).encode())
payload_b64 = _b64url_encode(json.dumps(payload).encode())
print("header 编码:", header_b64)
print("payload 编码:", payload_b64)
print("header 解码:", json.loads(_b64url_decode(header_b64)))
print("payload 解码:", json.loads(_b64url_decode(payload_b64)))

Step 2:用 JWT.encode 实现 HMAC-SHA256 签名,生成三段式 Token

痛点与机制

JWT.encode 的签名过程:① 把 header 和 payload 分别 Base64URL 编码;② 用 hmac.new(secret, "header.payload", sha256) 计算签名;③ 把三段用 . 拼接。exp(expiration)是 JWT 标准字段,值是 Unix 时间戳,time.time() + expires_in 计算过期时间。签名的输入是 "header.payload" 字符串,任何一个字符改变都会导致签名不匹配。

核心源码(逐字来自文末完整源码)

    @classmethod
    def encode(cls, payload: Dict, expires_in: int = 3600) -> str:
        """
        生成 JWT Token
        :param payload: 自定义声明字典
        :param expires_in: 有效期(秒),默认 1 小时
        """
        now = int(time.time())
        full_payload = {**payload, "iat": now, "exp": now + expires_in}
        payload_b64 = _b64url_encode(json.dumps(full_payload, separators=(",", ":")).encode())
        signing_input = f"{cls.HEADER}.{payload_b64}"
        sig = hmac.new(
            SECRET_KEY.encode("utf-8"),
            signing_input.encode("utf-8"),
            hashlib.sha256
        ).digest()
        return f"{signing_input}.{_b64url_encode(sig)}"

可运行演示(补齐 Mock 数据与 print 反馈)

import base64
import hashlib
import hmac
import json
import time
from typing import Dict

SECRET_KEY = "nexdo-secret-2026"

def _b64url_encode(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")

class JWT:
    HEADER = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())

    @classmethod
    def encode(cls, payload: Dict, expires_in: int = 3600) -> str:
        now = int(time.time())
        full_payload = {**payload, "iat": now, "exp": now + expires_in}
        payload_b64 = _b64url_encode(json.dumps(full_payload, separators=(",", ":")).encode())
        signing_input = f"{cls.HEADER}.{payload_b64}"
        sig = hmac.new(SECRET_KEY.encode("utf-8"), signing_input.encode("utf-8"), hashlib.sha256).digest()
        return f"{signing_input}.{_b64url_encode(sig)}"

token = JWT.encode({"sub": "alice", "role": "admin"}, expires_in=3600)
header_b64, payload_b64, sig_b64 = token.split(".")
print("JWT 三段式:")
print("1 header   =", header_b64)
print("2 payload  =", payload_b64)
print("3 signature=", sig_b64[:24] + "...")
print("完整 token 前 80 字符:", token[:80] + "...")

Step 3:用 hmac.compare_digest 做时序安全验证,防止签名伪造

痛点与机制

hmac.compare_digest 是时序安全比较——普通的 == 在字符串不同时会提前返回,攻击者可以通过测量响应时间推断签名的正确字符数(时序攻击)。compare_digest 始终比较完整字符串,响应时间不泄露任何信息。JWT.decode 还检查 exp 字段:如果签名不匹配、格式错误或 Token 已过期,就返回 None。这个设计让调用方只需要判断“有没有拿到 payload”。

核心源码(逐字来自文末完整源码)

    @classmethod
    def decode(cls, token: str) -> Optional[Dict]:
        """
        验证并解码 JWT Token
        :return: payload 字典,验证失败或过期返回 None
        """
        parts = token.split(".")
        if len(parts) != 3:
            return None

        header_b64, payload_b64, sig_b64 = parts
        signing_input = f"{header_b64}.{payload_b64}"

        # 验证签名
        expected_sig = hmac.new(
            SECRET_KEY.encode("utf-8"),
            signing_input.encode("utf-8"),
            hashlib.sha256
        ).digest()
        try:
            actual_sig = _b64url_decode(sig_b64)
        except Exception:
            return None

        if not hmac.compare_digest(expected_sig, actual_sig):
            return None  # 签名不匹配

        # 解码 Payload
        try:
            payload = json.loads(_b64url_decode(payload_b64))
        except Exception:
            return None

        # 检查过期
        if payload.get("exp", 0) < int(time.time()):
            return None  # Token 已过期

        return payload

可运行演示(补齐 Mock 数据与 print 反馈)

import base64
import hashlib
import hmac
import json
import time
from typing import Dict, Optional

SECRET_KEY = "nexdo-secret-2026"

def _b64url_encode(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")

def _b64url_decode(s: str) -> bytes:
    padding = 4 - len(s) % 4
    if padding != 4:
        s += "=" * padding
    return base64.urlsafe_b64decode(s)

class JWT:
    HEADER = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())

    @classmethod
    def encode(cls, payload: Dict, expires_in: int = 3600) -> str:
        now = int(time.time())
        full_payload = {**payload, "iat": now, "exp": now + expires_in}
        payload_b64 = _b64url_encode(json.dumps(full_payload, separators=(",", ":")).encode())
        signing_input = f"{cls.HEADER}.{payload_b64}"
        sig = hmac.new(SECRET_KEY.encode(), signing_input.encode(), hashlib.sha256).digest()
        return f"{signing_input}.{_b64url_encode(sig)}"

    @classmethod
    def decode(cls, token: str) -> Optional[Dict]:
        parts = token.split(".")
        if len(parts) != 3:
            return None
        header_b64, payload_b64, sig_b64 = parts
        signing_input = f"{header_b64}.{payload_b64}"
        expected_sig = hmac.new(SECRET_KEY.encode(), signing_input.encode(), hashlib.sha256).digest()
        try:
            actual_sig = _b64url_decode(sig_b64)
            payload = json.loads(_b64url_decode(payload_b64))
        except Exception:
            return None
        if not hmac.compare_digest(expected_sig, actual_sig):
            return None
        if payload.get("exp", 0) < int(time.time()):
            return None
        return payload

valid = JWT.encode({"sub": "alice"}, expires_in=60)
tampered = valid[:-4] + "AAAA"
expired = JWT.encode({"sub": "bob"}, expires_in=-1)
print("合法 token:", JWT.decode(valid))
print("篡改 token:", JWT.decode(tampered))
print("过期 token:", JWT.decode(expired))

Step 4:用 JWT.inspect 实现不验签的调试解码

痛点与机制

JWT.inspect 只解码 payload,不验证签名——用于调试和日志,让你看到 Token 里存了什么。生产环境永远不能inspect 替代 decode 做鉴权,否则任何人都能伪造 payload 欺骗日志和页面展示;真正授权必须用 decode 验签。这个方法的存在是为了演示"Base64URL 是编码不是加密"——即使不知道密钥,也能看到 payload 内容。

核心源码(逐字来自文末完整源码)

    @classmethod
    def inspect(cls, token: str) -> Dict:
        """解码 Token 内容(不验证签名,用于调试)"""
        parts = token.split(".")
        if len(parts) != 3:
            return {"error": "格式错误"}
        try:
            header = json.loads(_b64url_decode(parts[0]))
            payload = json.loads(_b64url_decode(parts[1]))
            return {"header": header, "payload": payload, "signature": parts[2][:16] + "..."}
        except Exception as e:
            return {"error": str(e)}

可运行演示(补齐 Mock 数据与 print 反馈)

import base64
import hashlib
import hmac
import json
import time
from typing import Dict

SECRET_KEY = "nexdo-secret-2026"

def _b64url_encode(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")

def _b64url_decode(s: str) -> bytes:
    padding = 4 - len(s) % 4
    if padding != 4:
        s += "=" * padding
    return base64.urlsafe_b64decode(s)

class JWT:
    HEADER = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())

    @classmethod
    def encode(cls, payload: Dict, expires_in: int = 3600) -> str:
        now = int(time.time())
        full_payload = {**payload, "iat": now, "exp": now + expires_in}
        payload_b64 = _b64url_encode(json.dumps(full_payload, separators=(",", ":")).encode())
        signing_input = f"{cls.HEADER}.{payload_b64}"
        sig = hmac.new(SECRET_KEY.encode(), signing_input.encode(), hashlib.sha256).digest()
        return f"{signing_input}.{_b64url_encode(sig)}"

    @classmethod
    def inspect(cls, token: str) -> Dict:
        parts = token.split(".")
        if len(parts) != 3:
            return {"error": "格式错误"}
        try:
            header = json.loads(_b64url_decode(parts[0]))
            payload = json.loads(_b64url_decode(parts[1]))
            return {"header": header, "payload": payload, "signature": parts[2][:16] + "..."}
        except Exception as e:
            return {"error": str(e)}

token = JWT.encode({"sub": "alice", "role": "admin"})
info = JWT.inspect(token)
print("inspect 只负责看内容,不负责鉴权:")
print("header:", info["header"])
print("payload:", info["payload"])
print("signature 摘要:", info["signature"])

Step 5:用 JWTHandler 启动完整的 JWT 鉴权服务,测试登录接口

痛点与机制

JWTHandler 实现了完整的 JWT 鉴权流程:POST /api/auth/login 验证用户名密码,成功后签发 access_token(1小时)和 refresh_token(7天)。access_token 用于日常 API 调用,refresh_token 用于在 access_token 过期后换取新的 access_token,避免用户频繁重新登录。

核心源码(逐字来自文末完整源码)

class JWTHandler(http.server.BaseHTTPRequestHandler):
    def log_message(self, *args): pass

    def _send(self, status: int, body: Dict) -> None:
        payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _read_body(self) -> Optional[Dict]:
        length = int(self.headers.get("Content-Length", 0))
        if not length:
            return {}
        try:
            return json.loads(self.rfile.read(length))
        except json.JSONDecodeError:
            return None

    def _get_token(self) -> Optional[str]:
        auth = self.headers.get("Authorization", "")
        if auth.startswith("Bearer "):
            return auth[7:]
        return None

    def _require_auth(self) -> Optional[Dict]:
        """验证 Token,返回 payload 或发送 401"""
        token = self._get_token()
        if not token:
            self._send(401, {"error": "缺少 Authorization 头"})
            return None
        payload = JWT.decode(token)
        if payload is None:
            self._send(401, {"error": "Token 无效或已过期"})
            return None
        return payload

    def do_POST(self) -> None:
        if self.path == "/api/auth/login":
            self._login()
        elif self.path == "/api/auth/refresh":
            self._refresh()
        else:
            self._send(404, {"error": "路径不存在"})

    def do_GET(self) -> None:
        if self.path == "/api/me":
            self._me()
        elif self.path == "/api/admin/users":
            self._admin_users()
        elif self.path == "/api/health":
            self._send(200, {"status": "ok"})
        else:
            self._send(404, {"error": "路径不存在"})

    def _login(self) -> None:
        body = self._read_body()
        if body is None:
            self._send(400, {"error": "请求体必须是 JSON"})
            return
        username = body.get("username", "").strip()
        password = body.get("password", "")
        user = _users.get(username)
        if not user or user["password"] != password:
            self._send(401, {"error": "用户名或密码错误"})
            return

        # 生成 Access Token(1小时)和 Refresh Token(7天)
        access_token = JWT.encode(
            {"user_id": user["id"], "username": username, "role": user["role"]},
            expires_in=3600
        )
        refresh_token = JWT.encode(
            {"user_id": user["id"], "username": username, "type": "refresh"},
            expires_in=7 * 24 * 3600
        )
        self._send(200, {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "token_type": "Bearer",
            "expires_in": 3600,
        })
        print(f"  [POST /api/auth/login] 用户 {username} 登录成功")

    def _refresh(self) -> None:
        body = self._read_body()
        if body is None:
            self._send(400, {"error": "请求体必须是 JSON"})
            return
        refresh_token = body.get("refresh_token", "")
        payload = JWT.decode(refresh_token)
        if payload is None or payload.get("type") != "refresh":
            self._send(401, {"error": "Refresh Token 无效或已过期"})
            return
        username = payload["username"]
        user = _users.get(username)
        if not user:
            self._send(401, {"error": "用户不存在"})
            return
        new_token = JWT.encode(
            {"user_id": user["id"], "username": username, "role": user["role"]},
            expires_in=3600
        )
        self._send(200, {"access_token": new_token, "expires_in": 3600})

    def _me(self) -> None:
        payload = self._require_auth()
        if payload is None:
            return
        self._send(200, {
            "user_id": payload["user_id"],
            "username": payload["username"],
            "role": payload["role"],
            "token_expires_at": datetime.fromtimestamp(payload["exp"]).isoformat(),
        })

    def _admin_users(self) -> None:
        payload = self._require_auth()
        if payload is None:
            return
        if payload.get("role") != "admin":
            self._send(403, {"error": "需要管理员权限"})
            return
        users = [{"id": u["id"], "username": name, "role": u["role"]}
                 for name, u in _users.items()]
        self._send(200, {"users": users})

可运行演示(补齐 Mock 数据与 print 反馈)

import base64
import hashlib
import hmac
import json
import time
from typing import Dict

SECRET_KEY = "nexdo-secret-2026"
_users = {
    "alice": {"password": "pass123", "role": "admin", "id": 1},
    "bob": {"password": "secret", "role": "user", "id": 2},
}

def b64(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def sign(payload: Dict, expires_in: int) -> str:
    header = b64(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    now = int(time.time())
    payload_b64 = b64(json.dumps({**payload, "iat": now, "exp": now + expires_in}, separators=(",", ":")).encode())
    msg = f"{header}.{payload_b64}"
    sig = b64(hmac.new(SECRET_KEY.encode(), msg.encode(), hashlib.sha256).digest())
    return f"{msg}.{sig}"

def login(username: str, password: str) -> tuple[int, Dict]:
    user = _users.get(username)
    if not user or user["password"] != password:
        return 401, {"error": "用户名或密码错误"}
    access_token = sign({"user_id": user["id"], "username": username, "role": user["role"]}, 3600)
    refresh_token = sign({"user_id": user["id"], "username": username, "type": "refresh"}, 7 * 24 * 3600)
    return 200, {"access_token": access_token, "refresh_token": refresh_token, "token_type": "Bearer", "expires_in": 3600}

for username, password in [("alice", "pass123"), ("alice", "wrong")]:
    status, body = login(username, password)
    print(f"POST /api/auth/login {username!r} -> {status}")
    print("响应字段:", list(body.keys()))
    if "access_token" in body:
        print("access_token 前 40 字符:", body["access_token"][:40] + "...")

Step 6:用 Authorization: Bearer 头携带 Token,测试 /api/me 鉴权

痛点与机制

_require_auth 是鉴权中间件:从 Authorization 请求头里提取 Bearer <token>,调用 JWT.decode 验证签名和过期时间,把 payload 写入上下文。不带 Token 或 Token 无效时返回 401。这个模式对应 FastAPI 的 Depends(get_current_user) 依赖注入——把鉴权逻辑抽成可复用的函数,每个需要鉴权的路由都调用它。

核心源码(逐字来自文末完整源码)

class JWTHandler(http.server.BaseHTTPRequestHandler):
    def log_message(self, *args): pass

    def _send(self, status: int, body: Dict) -> None:
        payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _read_body(self) -> Optional[Dict]:
        length = int(self.headers.get("Content-Length", 0))
        if not length:
            return {}
        try:
            return json.loads(self.rfile.read(length))
        except json.JSONDecodeError:
            return None

    def _get_token(self) -> Optional[str]:
        auth = self.headers.get("Authorization", "")
        if auth.startswith("Bearer "):
            return auth[7:]
        return None

    def _require_auth(self) -> Optional[Dict]:
        """验证 Token,返回 payload 或发送 401"""
        token = self._get_token()
        if not token:
            self._send(401, {"error": "缺少 Authorization 头"})
            return None
        payload = JWT.decode(token)
        if payload is None:
            self._send(401, {"error": "Token 无效或已过期"})
            return None
        return payload

    def do_POST(self) -> None:
        if self.path == "/api/auth/login":
            self._login()
        elif self.path == "/api/auth/refresh":
            self._refresh()
        else:
            self._send(404, {"error": "路径不存在"})

    def do_GET(self) -> None:
        if self.path == "/api/me":
            self._me()
        elif self.path == "/api/admin/users":
            self._admin_users()
        elif self.path == "/api/health":
            self._send(200, {"status": "ok"})
        else:
            self._send(404, {"error": "路径不存在"})

    def _login(self) -> None:
        body = self._read_body()
        if body is None:
            self._send(400, {"error": "请求体必须是 JSON"})
            return
        username = body.get("username", "").strip()
        password = body.get("password", "")
        user = _users.get(username)
        if not user or user["password"] != password:
            self._send(401, {"error": "用户名或密码错误"})
            return

        # 生成 Access Token(1小时)和 Refresh Token(7天)
        access_token = JWT.encode(
            {"user_id": user["id"], "username": username, "role": user["role"]},
            expires_in=3600
        )
        refresh_token = JWT.encode(
            {"user_id": user["id"], "username": username, "type": "refresh"},
            expires_in=7 * 24 * 3600
        )
        self._send(200, {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "token_type": "Bearer",
            "expires_in": 3600,
        })
        print(f"  [POST /api/auth/login] 用户 {username} 登录成功")

    def _refresh(self) -> None:
        body = self._read_body()
        if body is None:
            self._send(400, {"error": "请求体必须是 JSON"})
            return
        refresh_token = body.get("refresh_token", "")
        payload = JWT.decode(refresh_token)
        if payload is None or payload.get("type") != "refresh":
            self._send(401, {"error": "Refresh Token 无效或已过期"})
            return
        username = payload["username"]
        user = _users.get(username)
        if not user:
            self._send(401, {"error": "用户不存在"})
            return
        new_token = JWT.encode(
            {"user_id": user["id"], "username": username, "role": user["role"]},
            expires_in=3600
        )
        self._send(200, {"access_token": new_token, "expires_in": 3600})

    def _me(self) -> None:
        payload = self._require_auth()
        if payload is None:
            return
        self._send(200, {
            "user_id": payload["user_id"],
            "username": payload["username"],
            "role": payload["role"],
            "token_expires_at": datetime.fromtimestamp(payload["exp"]).isoformat(),
        })

    def _admin_users(self) -> None:
        payload = self._require_auth()
        if payload is None:
            return
        if payload.get("role") != "admin":
            self._send(403, {"error": "需要管理员权限"})
            return
        users = [{"id": u["id"], "username": name, "role": u["role"]}
                 for name, u in _users.items()]
        self._send(200, {"users": users})

可运行演示(补齐 Mock 数据与 print 反馈)

import base64
import hashlib
import hmac
import json
import time
from typing import Dict, Optional

SECRET_KEY = "nexdo-secret-2026"

def enc(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def dec(s: str) -> bytes:
    pad = 4 - len(s) % 4
    if pad != 4:
        s += "=" * pad
    return base64.urlsafe_b64decode(s)

def issue(payload: Dict, expires_in: int = 3600) -> str:
    header = enc(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    now = int(time.time())
    payload_b64 = enc(json.dumps({**payload, "iat": now, "exp": now + expires_in}, separators=(",", ":")).encode())
    msg = f"{header}.{payload_b64}"
    sig = enc(hmac.new(SECRET_KEY.encode(), msg.encode(), hashlib.sha256).digest())
    return f"{msg}.{sig}"

def verify(token: str) -> Optional[Dict]:
    try:
        h, p, s = token.split(".")
        msg = f"{h}.{p}"
        expected = hmac.new(SECRET_KEY.encode(), msg.encode(), hashlib.sha256).digest()
        if not hmac.compare_digest(expected, dec(s)):
            return None
        payload = json.loads(dec(p))
        return payload if payload["exp"] >= int(time.time()) else None
    except Exception:
        return None

def api_me(authorization: str) -> tuple[int, Dict]:
    if not authorization.startswith("Bearer "):
        return 401, {"error": "缺少 Authorization 头"}
    payload = verify(authorization[7:])
    if not payload:
        return 401, {"error": "Token 无效或已过期"}
    return 200, {"user_id": payload["user_id"], "username": payload["username"], "role": payload["role"]}

token = issue({"user_id": 1, "username": "alice", "role": "admin"})
for header in [f"Bearer {token}", token, "Bearer bad.token.value"]:
    print("Authorization:", header[:32] + "...")
    print("GET /api/me ->", api_me(header))

Step 7:用 demo_jwt_offline 演示完整的 JWT 生命周期

痛点与机制

demo_jwt_offline 不启动服务器,直接调用 JWT.encode/JWT.decode/JWT.inspect,演示 Token 的完整生命周期:生成 → 解码 → 篡改检测 → 过期检测。这是理解 JWT 原理的最快路径——不需要网络,不需要服务器,只需要看代码输出。

核心源码(逐字来自文末完整源码)

def demo_jwt_offline() -> None:
    print("\n=== JWT 原理演示(离线)===\n")

    # 生成 Token
    payload = {"user_id": 42, "username": "alice", "role": "admin"}
    token = JWT.encode(payload, expires_in=60)
    print(f"生成 Token:\n  {token}\n")

    # 解码
    info = JWT.inspect(token)
    print(f"Header:  {info['header']}")
    print(f"Payload: {info['payload']}")
    print(f"Sig:     {info['signature']}\n")

    # 验证
    decoded = JWT.decode(token)
    print(f"验证结果:{'✅ 有效' if decoded else '❌ 无效'}")
    if decoded:
        exp_str = datetime.fromtimestamp(decoded["exp"]).strftime("%Y-%m-%d %H:%M:%S")
        print(f"  用户: {decoded['username']},角色: {decoded['role']},过期: {exp_str}")

    # 篡改测试
    parts = token.split(".")
    fake_payload = _b64url_encode(
        json.dumps({"user_id": 42, "username": "alice", "role": "super_admin",
                    "iat": int(time.time()), "exp": int(time.time()) + 60}).encode()
    )
    tampered = f"{parts[0]}.{fake_payload}.{parts[2]}"
    result = JWT.decode(tampered)
    print(f"\n篡改 Payload(role→super_admin)后验证:{'✅ 有效' if result else '❌ 签名不匹配,拒绝'}")

    # 过期测试
    expired_token = JWT.encode({"user_id": 1}, expires_in=-1)  # 立即过期
    result = JWT.decode(expired_token)
    print(f"过期 Token 验证:{'✅ 有效' if result else '❌ 已过期,拒绝'}")

    # Token 生命周期表
    print(f"\n{'─'*55}")
    print(f"  {'Token 类型':<20} {'有效期':<12} {'用途'}")
    print(f"{'─'*55}")
    rows = [
        ("Access Token",   "15分钟~1小时", "携带在每次 API 请求头"),
        ("Refresh Token",  "7天~30天",     "仅用于换取新 Access Token"),
        ("Remember Token", "30天~90天",    "记住登录状态(可选)"),
    ]
    for name, ttl, usage in rows:
        print(f"  {name:<20} {ttl:<12} {usage}")
    print(f"{'─'*55}")

可运行演示(补齐 Mock 数据与 print 反馈)

import base64
import hashlib
import hmac
import json
import time
from typing import Dict, Optional

SECRET_KEY = "nexdo-secret-2026"

def b64(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def unb64(s: str) -> bytes:
    pad = 4 - len(s) % 4
    if pad != 4:
        s += "=" * pad
    return base64.urlsafe_b64decode(s)

def encode(payload: Dict, expires_in: int = 3600) -> str:
    header = b64(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    now = int(time.time())
    payload_b64 = b64(json.dumps({**payload, "iat": now, "exp": now + expires_in}, separators=(",", ":")).encode())
    msg = f"{header}.{payload_b64}"
    sig = b64(hmac.new(SECRET_KEY.encode(), msg.encode(), hashlib.sha256).digest())
    return f"{msg}.{sig}"

def decode(token: str) -> Optional[Dict]:
    try:
        h, p, s = token.split(".")
        expected = hmac.new(SECRET_KEY.encode(), f"{h}.{p}".encode(), hashlib.sha256).digest()
        if not hmac.compare_digest(expected, unb64(s)):
            return None
        payload = json.loads(unb64(p))
        return payload if payload["exp"] >= int(time.time()) else None
    except Exception:
        return None

print("=== JWT 原理演示(离线)===")
token = encode({"user_id": 1, "username": "alice", "role": "admin"}, expires_in=60)
print("1. 签发 token:", token[:60] + "...")
print("2. 调试解码 payload:", json.loads(unb64(token.split('.')[1])))
print("3. 验签结果:", decode(token))
print("4. 篡改签名后:", decode(token[:-3] + "xxx"))
print("5. 过期 token:", decode(encode({"username": "bob"}, expires_in=-1)))

Step 8:用 main 做 server/demo/jwt 三种模式的 CLI 总入口

痛点与机制

mainargparse 做 CLI 入口:--mode server 启动服务器,--mode demo 发请求演示完整的登录→鉴权→刷新流程,--mode jwt 运行离线 JWT 原理演示。三种模式覆盖了 JWT 的三个学习层次:原理理解(jwt)→ 接口测试(demo)→ 生产部署(server)。

核心源码(逐字来自文末完整源码)

def main() -> None:
    parser = argparse.ArgumentParser(description="JWT 鉴权演示(零依赖)")
    parser.add_argument("--mode", choices=["server", "demo", "jwt"], default="jwt",
                        help="server=启动服务器, demo=演示客户端, jwt=JWT原理离线演示")
    parser.add_argument("--port", type=int, default=8004)
    args = parser.parse_args()

    if args.mode == "server":
        run_server(args.port)
    elif args.mode == "demo":
        demo_client(f"http://127.0.0.1:{args.port}")
    else:
        demo_jwt_offline()

可运行演示(补齐 Mock 数据与 print 反馈)

import argparse
import sys

def run_server(port: int = 8004) -> None:
    print(f"server 模式: 将启动 JWT 鉴权服务 http://127.0.0.1:{port}")

def demo_client(base: str = "http://127.0.0.1:8004") -> None:
    print(f"demo 模式: 将测试 {base}/api/auth/login、/api/me、/api/admin/users")

def demo_jwt_offline() -> None:
    print("jwt 模式: 离线演示签发、inspect、decode、篡改检测、过期检测")

def main() -> None:
    parser = argparse.ArgumentParser(description="JWT Token 签发与验证演示")
    parser.add_argument("--mode", choices=["server", "demo", "jwt"], default="jwt")
    parser.add_argument("--port", type=int, default=8004)
    args = parser.parse_args()
    if args.mode == "server":
        run_server(args.port)
    elif args.mode == "demo":
        demo_client(f"http://127.0.0.1:{args.port}")
    else:
        demo_jwt_offline()

for mode in ["jwt", "server", "demo"]:
    sys.argv = ["prog", "--mode", mode, "--port", "18084"]
    print(f">>> python3 24-python-jwt-fastapi.py --mode {mode}")
    main()

极客实战:完整源码与运行

现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。

#!/usr/bin/env python3
"""
24-jwt-fastapi.py
用标准库从零实现 JWT(hmac/base64/json/time)
配合 http.server 实现登录/验证接口
展示 Token 生成、验证、过期检测的完整流程

用法:
  python3 24-jwt-fastapi.py --mode server   # 启动服务器
  python3 24-jwt-fastapi.py --mode demo     # 演示(需先启动服务器)
  python3 24-jwt-fastapi.py --mode jwt      # JWT 原理演示(离线)
"""

import argparse
import base64
import hashlib
import hmac
import http.server
import json
import time
import urllib.error
import urllib.request
from datetime import datetime
from typing import Any, Dict, Optional, Tuple

# ─── JWT 实现(零外部依赖) ───────────────────────────────────────────────────

SECRET_KEY = "nexdo-secret-2026"  # 生产环境应从环境变量读取


def _b64url_encode(data: bytes) -> str:
    """Base64URL 编码(无填充)"""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(s: str) -> bytes:
    """Base64URL 解码(补齐填充)"""
    padding = 4 - len(s) % 4
    if padding != 4:
        s += "=" * padding
    return base64.urlsafe_b64decode(s)


class JWT:
    """最小化 JWT 实现,支持 HS256"""

    HEADER = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())

    @classmethod
    def encode(cls, payload: Dict, expires_in: int = 3600) -> str:
        """
        生成 JWT Token
        :param payload: 自定义声明字典
        :param expires_in: 有效期(秒),默认 1 小时
        """
        now = int(time.time())
        full_payload = {**payload, "iat": now, "exp": now + expires_in}
        payload_b64 = _b64url_encode(json.dumps(full_payload, separators=(",", ":")).encode())
        signing_input = f"{cls.HEADER}.{payload_b64}"
        sig = hmac.new(
            SECRET_KEY.encode("utf-8"),
            signing_input.encode("utf-8"),
            hashlib.sha256
        ).digest()
        return f"{signing_input}.{_b64url_encode(sig)}"

    @classmethod
    def decode(cls, token: str) -> Optional[Dict]:
        """
        验证并解码 JWT Token
        :return: payload 字典,验证失败或过期返回 None
        """
        parts = token.split(".")
        if len(parts) != 3:
            return None

        header_b64, payload_b64, sig_b64 = parts
        signing_input = f"{header_b64}.{payload_b64}"

        # 验证签名
        expected_sig = hmac.new(
            SECRET_KEY.encode("utf-8"),
            signing_input.encode("utf-8"),
            hashlib.sha256
        ).digest()
        try:
            actual_sig = _b64url_decode(sig_b64)
        except Exception:
            return None

        if not hmac.compare_digest(expected_sig, actual_sig):
            return None  # 签名不匹配

        # 解码 Payload
        try:
            payload = json.loads(_b64url_decode(payload_b64))
        except Exception:
            return None

        # 检查过期
        if payload.get("exp", 0) < int(time.time()):
            return None  # Token 已过期

        return payload

    @classmethod
    def inspect(cls, token: str) -> Dict:
        """解码 Token 内容(不验证签名,用于调试)"""
        parts = token.split(".")
        if len(parts) != 3:
            return {"error": "格式错误"}
        try:
            header = json.loads(_b64url_decode(parts[0]))
            payload = json.loads(_b64url_decode(parts[1]))
            return {"header": header, "payload": payload, "signature": parts[2][:16] + "..."}
        except Exception as e:
            return {"error": str(e)}


# ─── 用户数据(内存) ─────────────────────────────────────────────────────────

_users = {
    "alice": {"password": "pass123", "role": "admin",  "id": 1},
    "bob":   {"password": "secret",  "role": "user",   "id": 2},
    "carol": {"password": "hello6",  "role": "user",   "id": 3},
}

# ─── HTTP 服务器 ──────────────────────────────────────────────────────────────

STATUS_TEXT = {200: "OK", 201: "Created", 400: "Bad Request",
               401: "Unauthorized", 403: "Forbidden", 404: "Not Found",
               405: "Method Not Allowed"}


class JWTHandler(http.server.BaseHTTPRequestHandler):
    def log_message(self, *args): pass

    def _send(self, status: int, body: Dict) -> None:
        payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _read_body(self) -> Optional[Dict]:
        length = int(self.headers.get("Content-Length", 0))
        if not length:
            return {}
        try:
            return json.loads(self.rfile.read(length))
        except json.JSONDecodeError:
            return None

    def _get_token(self) -> Optional[str]:
        auth = self.headers.get("Authorization", "")
        if auth.startswith("Bearer "):
            return auth[7:]
        return None

    def _require_auth(self) -> Optional[Dict]:
        """验证 Token,返回 payload 或发送 401"""
        token = self._get_token()
        if not token:
            self._send(401, {"error": "缺少 Authorization 头"})
            return None
        payload = JWT.decode(token)
        if payload is None:
            self._send(401, {"error": "Token 无效或已过期"})
            return None
        return payload

    def do_POST(self) -> None:
        if self.path == "/api/auth/login":
            self._login()
        elif self.path == "/api/auth/refresh":
            self._refresh()
        else:
            self._send(404, {"error": "路径不存在"})

    def do_GET(self) -> None:
        if self.path == "/api/me":
            self._me()
        elif self.path == "/api/admin/users":
            self._admin_users()
        elif self.path == "/api/health":
            self._send(200, {"status": "ok"})
        else:
            self._send(404, {"error": "路径不存在"})

    def _login(self) -> None:
        body = self._read_body()
        if body is None:
            self._send(400, {"error": "请求体必须是 JSON"})
            return
        username = body.get("username", "").strip()
        password = body.get("password", "")
        user = _users.get(username)
        if not user or user["password"] != password:
            self._send(401, {"error": "用户名或密码错误"})
            return

        # 生成 Access Token(1小时)和 Refresh Token(7天)
        access_token = JWT.encode(
            {"user_id": user["id"], "username": username, "role": user["role"]},
            expires_in=3600
        )
        refresh_token = JWT.encode(
            {"user_id": user["id"], "username": username, "type": "refresh"},
            expires_in=7 * 24 * 3600
        )
        self._send(200, {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "token_type": "Bearer",
            "expires_in": 3600,
        })
        print(f"  [POST /api/auth/login] 用户 {username} 登录成功")

    def _refresh(self) -> None:
        body = self._read_body()
        if body is None:
            self._send(400, {"error": "请求体必须是 JSON"})
            return
        refresh_token = body.get("refresh_token", "")
        payload = JWT.decode(refresh_token)
        if payload is None or payload.get("type") != "refresh":
            self._send(401, {"error": "Refresh Token 无效或已过期"})
            return
        username = payload["username"]
        user = _users.get(username)
        if not user:
            self._send(401, {"error": "用户不存在"})
            return
        new_token = JWT.encode(
            {"user_id": user["id"], "username": username, "role": user["role"]},
            expires_in=3600
        )
        self._send(200, {"access_token": new_token, "expires_in": 3600})

    def _me(self) -> None:
        payload = self._require_auth()
        if payload is None:
            return
        self._send(200, {
            "user_id": payload["user_id"],
            "username": payload["username"],
            "role": payload["role"],
            "token_expires_at": datetime.fromtimestamp(payload["exp"]).isoformat(),
        })

    def _admin_users(self) -> None:
        payload = self._require_auth()
        if payload is None:
            return
        if payload.get("role") != "admin":
            self._send(403, {"error": "需要管理员权限"})
            return
        users = [{"id": u["id"], "username": name, "role": u["role"]}
                 for name, u in _users.items()]
        self._send(200, {"users": users})


def run_server(port: int = 8004) -> None:
    print(f"🚀 JWT 鉴权服务器启动:http://127.0.0.1:{port}")
    print("   POST /api/auth/login | POST /api/auth/refresh")
    print("   GET  /api/me         | GET  /api/admin/users")
    print("   测试账号:alice/pass123(admin), bob/secret(user)\n")
    try:
        http.server.HTTPServer(("127.0.0.1", port), JWTHandler).serve_forever()
    except KeyboardInterrupt:
        print("\n服务器已停止。")

# ─── 演示客户端 ───────────────────────────────────────────────────────────────

def demo_client(base: str = "http://127.0.0.1:8004") -> None:
    def req(method: str, path: str, data: Dict = None,
            token: str = None) -> Tuple[int, Dict]:
        payload = json.dumps(data).encode() if data else None
        headers: Dict[str, str] = {}
        if payload:
            headers["Content-Type"] = "application/json"
        if token:
            headers["Authorization"] = f"Bearer {token}"
        r = urllib.request.Request(f"{base}{path}", data=payload,
                                   headers=headers, method=method)
        try:
            with urllib.request.urlopen(r) as resp:
                return resp.status, json.loads(resp.read())
        except urllib.error.HTTPError as e:
            return e.code, json.loads(e.read())

    print("\n=== JWT 鉴权演示 ===\n")

    # 1. 未携带 Token 访问受保护接口
    status, body = req("GET", "/api/me")
    print(f"[未登录 GET /api/me] HTTP {status}{body}")

    # 2. 错误密码登录
    status, body = req("POST", "/api/auth/login", {"username": "alice", "password": "wrong"})
    print(f"\n[错误密码登录] HTTP {status}{body}")

    # 3. 正确登录
    status, body = req("POST", "/api/auth/login", {"username": "alice", "password": "pass123"})
    print(f"\n[alice 登录] HTTP {status}")
    access_token = body["access_token"]
    refresh_token = body["refresh_token"]
    print(f"  Access Token: {access_token[:40]}...")
    print(f"  Refresh Token: {refresh_token[:40]}...")

    # 4. 解码 Token 内容
    print(f"\n[Token 内容解析]")
    info = JWT.inspect(access_token)
    print(f"  Header:  {info['header']}")
    print(f"  Payload: {info['payload']}")
    print(f"  Sig:     {info['signature']}")

    # 5. 携带 Token 访问 /api/me
    status, body = req("GET", "/api/me", token=access_token)
    print(f"\n[GET /api/me(已登录)] HTTP {status}{body}")

    # 6. 管理员访问 /api/admin/users
    status, body = req("GET", "/api/admin/users", token=access_token)
    print(f"\n[GET /api/admin/users(alice=admin)] HTTP {status}")
    if "users" in body:
        for u in body["users"]:
            print(f"  {u}")

    # 7. 普通用户访问管理员接口
    status, body = req("POST", "/api/auth/login", {"username": "bob", "password": "secret"})
    bob_token = body["access_token"]
    status, body = req("GET", "/api/admin/users", token=bob_token)
    print(f"\n[GET /api/admin/users(bob=user)] HTTP {status}{body}")

    # 8. 篡改 Token
    tampered = access_token[:-5] + "XXXXX"
    status, body = req("GET", "/api/me", token=tampered)
    print(f"\n[篡改 Token 访问 /api/me] HTTP {status}{body}")

    # 9. 用 Refresh Token 换新 Access Token
    status, body = req("POST", "/api/auth/refresh", {"refresh_token": refresh_token})
    print(f"\n[POST /api/auth/refresh] HTTP {status}")
    if "access_token" in body:
        print(f"  新 Access Token: {body['access_token'][:40]}...")

# ─── JWT 原理离线演示 ─────────────────────────────────────────────────────────

def demo_jwt_offline() -> None:
    print("\n=== JWT 原理演示(离线)===\n")

    # 生成 Token
    payload = {"user_id": 42, "username": "alice", "role": "admin"}
    token = JWT.encode(payload, expires_in=60)
    print(f"生成 Token:\n  {token}\n")

    # 解码
    info = JWT.inspect(token)
    print(f"Header:  {info['header']}")
    print(f"Payload: {info['payload']}")
    print(f"Sig:     {info['signature']}\n")

    # 验证
    decoded = JWT.decode(token)
    print(f"验证结果:{'✅ 有效' if decoded else '❌ 无效'}")
    if decoded:
        exp_str = datetime.fromtimestamp(decoded["exp"]).strftime("%Y-%m-%d %H:%M:%S")
        print(f"  用户: {decoded['username']},角色: {decoded['role']},过期: {exp_str}")

    # 篡改测试
    parts = token.split(".")
    fake_payload = _b64url_encode(
        json.dumps({"user_id": 42, "username": "alice", "role": "super_admin",
                    "iat": int(time.time()), "exp": int(time.time()) + 60}).encode()
    )
    tampered = f"{parts[0]}.{fake_payload}.{parts[2]}"
    result = JWT.decode(tampered)
    print(f"\n篡改 Payload(role→super_admin)后验证:{'✅ 有效' if result else '❌ 签名不匹配,拒绝'}")

    # 过期测试
    expired_token = JWT.encode({"user_id": 1}, expires_in=-1)  # 立即过期
    result = JWT.decode(expired_token)
    print(f"过期 Token 验证:{'✅ 有效' if result else '❌ 已过期,拒绝'}")

    # Token 生命周期表
    print(f"\n{'─'*55}")
    print(f"  {'Token 类型':<20} {'有效期':<12} {'用途'}")
    print(f"{'─'*55}")
    rows = [
        ("Access Token",   "15分钟~1小时", "携带在每次 API 请求头"),
        ("Refresh Token",  "7天~30天",     "仅用于换取新 Access Token"),
        ("Remember Token", "30天~90天",    "记住登录状态(可选)"),
    ]
    for name, ttl, usage in rows:
        print(f"  {name:<20} {ttl:<12} {usage}")
    print(f"{'─'*55}")

# ─── 入口 ─────────────────────────────────────────────────────────────────────

def main() -> None:
    parser = argparse.ArgumentParser(description="JWT 鉴权演示(零依赖)")
    parser.add_argument("--mode", choices=["server", "demo", "jwt"], default="jwt",
                        help="server=启动服务器, demo=演示客户端, jwt=JWT原理离线演示")
    parser.add_argument("--port", type=int, default=8004)
    args = parser.parse_args()

    if args.mode == "server":
        run_server(args.port)
    elif args.mode == "demo":
        demo_client(f"http://127.0.0.1:{args.port}")
    else:
        demo_jwt_offline()


if __name__ == "__main__":
    main()
$ python3 24-python-jwt-fastapi.py --mode jwt

=== JWT 原理演示(离线)===

[1] 生成 Token
  token: eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJ1c2VyX2lkIjogMSwgInVzZXJuYW1lIjogImFsaWNlIiwgInJvbGUiOiAiYWRtaW4iLCAiZXhwIjogMTc3NjQ0NDA5MX0.xxx
  三段结构:header.payload.signature

[2] 验证 Token(decode)
  user_id=1, username=alice, role=admin

[3] 篡改检测
  ✅ 篡改被检测:签名验证失败

[4] 调试解码(inspect,不验签)
  header: {'alg': 'HS256', 'typ': 'JWT'}
  payload: {'user_id': 1, 'username': 'alice', 'role': 'admin', 'exp': ...}

$ python3 24-python-jwt-fastapi.py --mode demo

=== JWT 鉴权演示 ===

  ❌ 未登录访问 /api/me → 401 {"error": "缺少 Authorization 头"}
  ❌ 错误密码登录 → 401 {"error": "用户名或密码错误"}
  ✅ 正确登录 → 200 {"access_token": "eyJ...", "token_type": "Bearer"}
  ✅ 登录后访问 /api/me → 200 {"username": "alice", "role": "admin"}
  ✅ 刷新 Token → 200 {"access_token": "eyJ...(新 token)"}

小结

概念 一句话记忆
Base64URL Base64 变体,+→-/→_,去掉 =,可安全放 URL
JWT 三段 header.payload.signature,前两段是编码不是加密
HMAC-SHA256 用密钥对 "header.payload" 签名,任何字符改变签名就变
hmac.compare_digest 时序安全比较,防止通过响应时间推断签名内容
exp 字段 Unix 时间戳,JWT.decode 自动检查是否过期
JWT.inspect 不验签只解码,用于调试,生产环境不能用于鉴权
Access Token 短期(1小时),用于 API 调用
Refresh Token 长期(7天),用于换取新的 Access Token
Authorization: Bearer JWT 的标准传输方式,比 Cookie 更适合前后端分离

⏱ NexDo Time(5 分钟)

挑战:给 JWT 加一个 nbf(Not Before)字段,让 Token 在指定时间之前无效。

具体步骤:

  1. JWT.encode 里加 nbf 参数(默认为当前时间),写入 payload
  2. JWT.decode 里加检查:如果 time.time() < payload["nbf"],抛出 ValueError("Token 尚未生效")
  3. 测试:生成一个 nbf = time.time() + 60(60秒后才生效)的 Token,立刻 decode 应该失败

Don’t wait for next time, do it in the next moment.