24 · JWT 鉴权:Token 签发与验证原理
🔗 知识图谱导航:阅读本文前,建议先掌握《21 · 表单、会话与中间件原理》中的 Cookie/Session 机制——本文是它的替代方案:JWT 把用户状态编码进 Token,服务端不需要存储 Session,适合前后端分离和微服务场景。
运行环境:Python 3.12+ 标准库,零额外依赖,直接运行。
极客解析:JWT 的本质是"带防伪钢印的游乐园门票":门票上印着你的信息(payload),钢印是用密钥签的名(signature)。验票员不需要查数据库,只需要验钢印——这就是"无状态鉴权"的核心。
JWT 结构
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9 ← header(Base64URL 编码)
.
eyJzdWIiOiJhbGljZSIsImV4cCI6MTc3NjQ0fQ ← payload(Base64URL 编码)
.
SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c ← signature(HMAC-SHA256)
header = {"alg": "HS256", "typ": "JWT"}
payload = {"sub": "alice", "exp": 1776444000, "role": "admin"}
signature = HMAC-SHA256(secret, "header.payload")
Session vs JWT
Session(有状态):
客户端 → Cookie: sessionid=abc → 服务端查 _sessions["abc"] → 得到用户信息
缺点:服务端需要存储所有 Session,水平扩展时需要共享 Session 存储
JWT(无状态):
客户端 → Authorization: Bearer <token> → 服务端验签 → 从 payload 得到用户信息
优点:服务端不存储任何状态,任意节点都能验证
缺点:Token 无法主动吊销(只能等过期)
安全要点
hmac.compare_digest 时序安全比较,防止通过响应时间推断签名内容
HttpOnly Cookie 存 Token 防 XSS(比 localStorage 更安全)
短过期时间 Access Token 1小时,Refresh Token 7天
HTTPS 传输 防止 Token 被中间人截获
不在 payload 存密码 payload 只是 Base64 编码,不是加密
步步为营:核心逻辑自适应拆解
这一篇的核心是 JWT 的三层结构:Base64URL 编码(可逆)→ HMAC-SHA256 签名(不可逆)→ 时序安全验证(防攻击)。下面每一步都聚焦一个机制,零依赖可直接运行。
Step 1:用 _b64url_encode/_b64url_decode 理解 JWT 的基础编码层
痛点与机制:
Base64URL 是 Base64 的变体:把 + 换成 -,/ 换成 _,去掉末尾的 = 填充。这样编码结果可以安全地放在 URL 里,不需要 URL 编码。注意:Base64URL 是编码不是加密——任何人都能解码 JWT 的 header 和 payload,所以 payload 里绝对不能存密码或敏感信息。
核心源码(逐字来自文末完整源码):
def _b64url_encode(data: bytes) -> str:
"""Base64URL 编码(无填充)"""
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
def _b64url_decode(s: str) -> bytes:
"""Base64URL 解码(补齐填充)"""
padding = 4 - len(s) % 4
if padding != 4:
s += "=" * padding
return base64.urlsafe_b64decode(s)
可运行演示(补齐 Mock 数据与 print 反馈):
import base64
import json
def _b64url_encode(data: bytes) -> str:
"""Base64URL 编码(无填充)"""
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
def _b64url_decode(s: str) -> bytes:
"""Base64URL 解码(补齐填充)"""
padding = 4 - len(s) % 4
if padding != 4:
s += "=" * padding
return base64.urlsafe_b64decode(s)
header = {"alg": "HS256", "typ": "JWT"}
payload = {"sub": "alice", "role": "admin"}
header_b64 = _b64url_encode(json.dumps(header).encode())
payload_b64 = _b64url_encode(json.dumps(payload).encode())
print("header 编码:", header_b64)
print("payload 编码:", payload_b64)
print("header 解码:", json.loads(_b64url_decode(header_b64)))
print("payload 解码:", json.loads(_b64url_decode(payload_b64)))
Step 2:用 JWT.encode 实现 HMAC-SHA256 签名,生成三段式 Token
痛点与机制:
JWT.encode 的签名过程:① 把 header 和 payload 分别 Base64URL 编码;② 用 hmac.new(secret, "header.payload", sha256) 计算签名;③ 把三段用 . 拼接。exp(expiration)是 JWT 标准字段,值是 Unix 时间戳,time.time() + expires_in 计算过期时间。签名的输入是 "header.payload" 字符串,任何一个字符改变都会导致签名不匹配。
核心源码(逐字来自文末完整源码):
@classmethod
def encode(cls, payload: Dict, expires_in: int = 3600) -> str:
"""
生成 JWT Token
:param payload: 自定义声明字典
:param expires_in: 有效期(秒),默认 1 小时
"""
now = int(time.time())
full_payload = {**payload, "iat": now, "exp": now + expires_in}
payload_b64 = _b64url_encode(json.dumps(full_payload, separators=(",", ":")).encode())
signing_input = f"{cls.HEADER}.{payload_b64}"
sig = hmac.new(
SECRET_KEY.encode("utf-8"),
signing_input.encode("utf-8"),
hashlib.sha256
).digest()
return f"{signing_input}.{_b64url_encode(sig)}"
可运行演示(补齐 Mock 数据与 print 反馈):
import base64
import hashlib
import hmac
import json
import time
from typing import Dict
SECRET_KEY = "nexdo-secret-2026"
def _b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
class JWT:
HEADER = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
@classmethod
def encode(cls, payload: Dict, expires_in: int = 3600) -> str:
now = int(time.time())
full_payload = {**payload, "iat": now, "exp": now + expires_in}
payload_b64 = _b64url_encode(json.dumps(full_payload, separators=(",", ":")).encode())
signing_input = f"{cls.HEADER}.{payload_b64}"
sig = hmac.new(SECRET_KEY.encode("utf-8"), signing_input.encode("utf-8"), hashlib.sha256).digest()
return f"{signing_input}.{_b64url_encode(sig)}"
token = JWT.encode({"sub": "alice", "role": "admin"}, expires_in=3600)
header_b64, payload_b64, sig_b64 = token.split(".")
print("JWT 三段式:")
print("1 header =", header_b64)
print("2 payload =", payload_b64)
print("3 signature=", sig_b64[:24] + "...")
print("完整 token 前 80 字符:", token[:80] + "...")
Step 3:用 hmac.compare_digest 做时序安全验证,防止签名伪造
痛点与机制:
hmac.compare_digest 是时序安全比较——普通的 == 在字符串不同时会提前返回,攻击者可以通过测量响应时间推断签名的正确字符数(时序攻击)。compare_digest 始终比较完整字符串,响应时间不泄露任何信息。JWT.decode 还检查 exp 字段:如果签名不匹配、格式错误或 Token 已过期,就返回 None。这个设计让调用方只需要判断“有没有拿到 payload”。
核心源码(逐字来自文末完整源码):
@classmethod
def decode(cls, token: str) -> Optional[Dict]:
"""
验证并解码 JWT Token
:return: payload 字典,验证失败或过期返回 None
"""
parts = token.split(".")
if len(parts) != 3:
return None
header_b64, payload_b64, sig_b64 = parts
signing_input = f"{header_b64}.{payload_b64}"
# 验证签名
expected_sig = hmac.new(
SECRET_KEY.encode("utf-8"),
signing_input.encode("utf-8"),
hashlib.sha256
).digest()
try:
actual_sig = _b64url_decode(sig_b64)
except Exception:
return None
if not hmac.compare_digest(expected_sig, actual_sig):
return None # 签名不匹配
# 解码 Payload
try:
payload = json.loads(_b64url_decode(payload_b64))
except Exception:
return None
# 检查过期
if payload.get("exp", 0) < int(time.time()):
return None # Token 已过期
return payload
可运行演示(补齐 Mock 数据与 print 反馈):
import base64
import hashlib
import hmac
import json
import time
from typing import Dict, Optional
SECRET_KEY = "nexdo-secret-2026"
def _b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
def _b64url_decode(s: str) -> bytes:
padding = 4 - len(s) % 4
if padding != 4:
s += "=" * padding
return base64.urlsafe_b64decode(s)
class JWT:
HEADER = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
@classmethod
def encode(cls, payload: Dict, expires_in: int = 3600) -> str:
now = int(time.time())
full_payload = {**payload, "iat": now, "exp": now + expires_in}
payload_b64 = _b64url_encode(json.dumps(full_payload, separators=(",", ":")).encode())
signing_input = f"{cls.HEADER}.{payload_b64}"
sig = hmac.new(SECRET_KEY.encode(), signing_input.encode(), hashlib.sha256).digest()
return f"{signing_input}.{_b64url_encode(sig)}"
@classmethod
def decode(cls, token: str) -> Optional[Dict]:
parts = token.split(".")
if len(parts) != 3:
return None
header_b64, payload_b64, sig_b64 = parts
signing_input = f"{header_b64}.{payload_b64}"
expected_sig = hmac.new(SECRET_KEY.encode(), signing_input.encode(), hashlib.sha256).digest()
try:
actual_sig = _b64url_decode(sig_b64)
payload = json.loads(_b64url_decode(payload_b64))
except Exception:
return None
if not hmac.compare_digest(expected_sig, actual_sig):
return None
if payload.get("exp", 0) < int(time.time()):
return None
return payload
valid = JWT.encode({"sub": "alice"}, expires_in=60)
tampered = valid[:-4] + "AAAA"
expired = JWT.encode({"sub": "bob"}, expires_in=-1)
print("合法 token:", JWT.decode(valid))
print("篡改 token:", JWT.decode(tampered))
print("过期 token:", JWT.decode(expired))
Step 4:用 JWT.inspect 实现不验签的调试解码
痛点与机制:
JWT.inspect 只解码 payload,不验证签名——用于调试和日志,让你看到 Token 里存了什么。生产环境永远不能用 inspect 替代 decode 做鉴权,否则任何人都能伪造 payload 欺骗日志和页面展示;真正授权必须用 decode 验签。这个方法的存在是为了演示"Base64URL 是编码不是加密"——即使不知道密钥,也能看到 payload 内容。
核心源码(逐字来自文末完整源码):
@classmethod
def inspect(cls, token: str) -> Dict:
"""解码 Token 内容(不验证签名,用于调试)"""
parts = token.split(".")
if len(parts) != 3:
return {"error": "格式错误"}
try:
header = json.loads(_b64url_decode(parts[0]))
payload = json.loads(_b64url_decode(parts[1]))
return {"header": header, "payload": payload, "signature": parts[2][:16] + "..."}
except Exception as e:
return {"error": str(e)}
可运行演示(补齐 Mock 数据与 print 反馈):
import base64
import hashlib
import hmac
import json
import time
from typing import Dict
SECRET_KEY = "nexdo-secret-2026"
def _b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
def _b64url_decode(s: str) -> bytes:
padding = 4 - len(s) % 4
if padding != 4:
s += "=" * padding
return base64.urlsafe_b64decode(s)
class JWT:
HEADER = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
@classmethod
def encode(cls, payload: Dict, expires_in: int = 3600) -> str:
now = int(time.time())
full_payload = {**payload, "iat": now, "exp": now + expires_in}
payload_b64 = _b64url_encode(json.dumps(full_payload, separators=(",", ":")).encode())
signing_input = f"{cls.HEADER}.{payload_b64}"
sig = hmac.new(SECRET_KEY.encode(), signing_input.encode(), hashlib.sha256).digest()
return f"{signing_input}.{_b64url_encode(sig)}"
@classmethod
def inspect(cls, token: str) -> Dict:
parts = token.split(".")
if len(parts) != 3:
return {"error": "格式错误"}
try:
header = json.loads(_b64url_decode(parts[0]))
payload = json.loads(_b64url_decode(parts[1]))
return {"header": header, "payload": payload, "signature": parts[2][:16] + "..."}
except Exception as e:
return {"error": str(e)}
token = JWT.encode({"sub": "alice", "role": "admin"})
info = JWT.inspect(token)
print("inspect 只负责看内容,不负责鉴权:")
print("header:", info["header"])
print("payload:", info["payload"])
print("signature 摘要:", info["signature"])
Step 5:用 JWTHandler 启动完整的 JWT 鉴权服务,测试登录接口
痛点与机制:
JWTHandler 实现了完整的 JWT 鉴权流程:POST /api/auth/login 验证用户名密码,成功后签发 access_token(1小时)和 refresh_token(7天)。access_token 用于日常 API 调用,refresh_token 用于在 access_token 过期后换取新的 access_token,避免用户频繁重新登录。
核心源码(逐字来自文末完整源码):
class JWTHandler(http.server.BaseHTTPRequestHandler):
def log_message(self, *args): pass
def _send(self, status: int, body: Dict) -> None:
payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(payload)))
self.end_headers()
self.wfile.write(payload)
def _read_body(self) -> Optional[Dict]:
length = int(self.headers.get("Content-Length", 0))
if not length:
return {}
try:
return json.loads(self.rfile.read(length))
except json.JSONDecodeError:
return None
def _get_token(self) -> Optional[str]:
auth = self.headers.get("Authorization", "")
if auth.startswith("Bearer "):
return auth[7:]
return None
def _require_auth(self) -> Optional[Dict]:
"""验证 Token,返回 payload 或发送 401"""
token = self._get_token()
if not token:
self._send(401, {"error": "缺少 Authorization 头"})
return None
payload = JWT.decode(token)
if payload is None:
self._send(401, {"error": "Token 无效或已过期"})
return None
return payload
def do_POST(self) -> None:
if self.path == "/api/auth/login":
self._login()
elif self.path == "/api/auth/refresh":
self._refresh()
else:
self._send(404, {"error": "路径不存在"})
def do_GET(self) -> None:
if self.path == "/api/me":
self._me()
elif self.path == "/api/admin/users":
self._admin_users()
elif self.path == "/api/health":
self._send(200, {"status": "ok"})
else:
self._send(404, {"error": "路径不存在"})
def _login(self) -> None:
body = self._read_body()
if body is None:
self._send(400, {"error": "请求体必须是 JSON"})
return
username = body.get("username", "").strip()
password = body.get("password", "")
user = _users.get(username)
if not user or user["password"] != password:
self._send(401, {"error": "用户名或密码错误"})
return
# 生成 Access Token(1小时)和 Refresh Token(7天)
access_token = JWT.encode(
{"user_id": user["id"], "username": username, "role": user["role"]},
expires_in=3600
)
refresh_token = JWT.encode(
{"user_id": user["id"], "username": username, "type": "refresh"},
expires_in=7 * 24 * 3600
)
self._send(200, {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "Bearer",
"expires_in": 3600,
})
print(f" [POST /api/auth/login] 用户 {username} 登录成功")
def _refresh(self) -> None:
body = self._read_body()
if body is None:
self._send(400, {"error": "请求体必须是 JSON"})
return
refresh_token = body.get("refresh_token", "")
payload = JWT.decode(refresh_token)
if payload is None or payload.get("type") != "refresh":
self._send(401, {"error": "Refresh Token 无效或已过期"})
return
username = payload["username"]
user = _users.get(username)
if not user:
self._send(401, {"error": "用户不存在"})
return
new_token = JWT.encode(
{"user_id": user["id"], "username": username, "role": user["role"]},
expires_in=3600
)
self._send(200, {"access_token": new_token, "expires_in": 3600})
def _me(self) -> None:
payload = self._require_auth()
if payload is None:
return
self._send(200, {
"user_id": payload["user_id"],
"username": payload["username"],
"role": payload["role"],
"token_expires_at": datetime.fromtimestamp(payload["exp"]).isoformat(),
})
def _admin_users(self) -> None:
payload = self._require_auth()
if payload is None:
return
if payload.get("role") != "admin":
self._send(403, {"error": "需要管理员权限"})
return
users = [{"id": u["id"], "username": name, "role": u["role"]}
for name, u in _users.items()]
self._send(200, {"users": users})
可运行演示(补齐 Mock 数据与 print 反馈):
import base64
import hashlib
import hmac
import json
import time
from typing import Dict
SECRET_KEY = "nexdo-secret-2026"
_users = {
"alice": {"password": "pass123", "role": "admin", "id": 1},
"bob": {"password": "secret", "role": "user", "id": 2},
}
def b64(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
def sign(payload: Dict, expires_in: int) -> str:
header = b64(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
now = int(time.time())
payload_b64 = b64(json.dumps({**payload, "iat": now, "exp": now + expires_in}, separators=(",", ":")).encode())
msg = f"{header}.{payload_b64}"
sig = b64(hmac.new(SECRET_KEY.encode(), msg.encode(), hashlib.sha256).digest())
return f"{msg}.{sig}"
def login(username: str, password: str) -> tuple[int, Dict]:
user = _users.get(username)
if not user or user["password"] != password:
return 401, {"error": "用户名或密码错误"}
access_token = sign({"user_id": user["id"], "username": username, "role": user["role"]}, 3600)
refresh_token = sign({"user_id": user["id"], "username": username, "type": "refresh"}, 7 * 24 * 3600)
return 200, {"access_token": access_token, "refresh_token": refresh_token, "token_type": "Bearer", "expires_in": 3600}
for username, password in [("alice", "pass123"), ("alice", "wrong")]:
status, body = login(username, password)
print(f"POST /api/auth/login {username!r} -> {status}")
print("响应字段:", list(body.keys()))
if "access_token" in body:
print("access_token 前 40 字符:", body["access_token"][:40] + "...")
Step 6:用 Authorization: Bearer 头携带 Token,测试 /api/me 鉴权
痛点与机制:
_require_auth 是鉴权中间件:从 Authorization 请求头里提取 Bearer <token>,调用 JWT.decode 验证签名和过期时间,把 payload 写入上下文。不带 Token 或 Token 无效时返回 401。这个模式对应 FastAPI 的 Depends(get_current_user) 依赖注入——把鉴权逻辑抽成可复用的函数,每个需要鉴权的路由都调用它。
核心源码(逐字来自文末完整源码):
class JWTHandler(http.server.BaseHTTPRequestHandler):
def log_message(self, *args): pass
def _send(self, status: int, body: Dict) -> None:
payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(payload)))
self.end_headers()
self.wfile.write(payload)
def _read_body(self) -> Optional[Dict]:
length = int(self.headers.get("Content-Length", 0))
if not length:
return {}
try:
return json.loads(self.rfile.read(length))
except json.JSONDecodeError:
return None
def _get_token(self) -> Optional[str]:
auth = self.headers.get("Authorization", "")
if auth.startswith("Bearer "):
return auth[7:]
return None
def _require_auth(self) -> Optional[Dict]:
"""验证 Token,返回 payload 或发送 401"""
token = self._get_token()
if not token:
self._send(401, {"error": "缺少 Authorization 头"})
return None
payload = JWT.decode(token)
if payload is None:
self._send(401, {"error": "Token 无效或已过期"})
return None
return payload
def do_POST(self) -> None:
if self.path == "/api/auth/login":
self._login()
elif self.path == "/api/auth/refresh":
self._refresh()
else:
self._send(404, {"error": "路径不存在"})
def do_GET(self) -> None:
if self.path == "/api/me":
self._me()
elif self.path == "/api/admin/users":
self._admin_users()
elif self.path == "/api/health":
self._send(200, {"status": "ok"})
else:
self._send(404, {"error": "路径不存在"})
def _login(self) -> None:
body = self._read_body()
if body is None:
self._send(400, {"error": "请求体必须是 JSON"})
return
username = body.get("username", "").strip()
password = body.get("password", "")
user = _users.get(username)
if not user or user["password"] != password:
self._send(401, {"error": "用户名或密码错误"})
return
# 生成 Access Token(1小时)和 Refresh Token(7天)
access_token = JWT.encode(
{"user_id": user["id"], "username": username, "role": user["role"]},
expires_in=3600
)
refresh_token = JWT.encode(
{"user_id": user["id"], "username": username, "type": "refresh"},
expires_in=7 * 24 * 3600
)
self._send(200, {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "Bearer",
"expires_in": 3600,
})
print(f" [POST /api/auth/login] 用户 {username} 登录成功")
def _refresh(self) -> None:
body = self._read_body()
if body is None:
self._send(400, {"error": "请求体必须是 JSON"})
return
refresh_token = body.get("refresh_token", "")
payload = JWT.decode(refresh_token)
if payload is None or payload.get("type") != "refresh":
self._send(401, {"error": "Refresh Token 无效或已过期"})
return
username = payload["username"]
user = _users.get(username)
if not user:
self._send(401, {"error": "用户不存在"})
return
new_token = JWT.encode(
{"user_id": user["id"], "username": username, "role": user["role"]},
expires_in=3600
)
self._send(200, {"access_token": new_token, "expires_in": 3600})
def _me(self) -> None:
payload = self._require_auth()
if payload is None:
return
self._send(200, {
"user_id": payload["user_id"],
"username": payload["username"],
"role": payload["role"],
"token_expires_at": datetime.fromtimestamp(payload["exp"]).isoformat(),
})
def _admin_users(self) -> None:
payload = self._require_auth()
if payload is None:
return
if payload.get("role") != "admin":
self._send(403, {"error": "需要管理员权限"})
return
users = [{"id": u["id"], "username": name, "role": u["role"]}
for name, u in _users.items()]
self._send(200, {"users": users})
可运行演示(补齐 Mock 数据与 print 反馈):
import base64
import hashlib
import hmac
import json
import time
from typing import Dict, Optional
SECRET_KEY = "nexdo-secret-2026"
def enc(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
def dec(s: str) -> bytes:
pad = 4 - len(s) % 4
if pad != 4:
s += "=" * pad
return base64.urlsafe_b64decode(s)
def issue(payload: Dict, expires_in: int = 3600) -> str:
header = enc(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
now = int(time.time())
payload_b64 = enc(json.dumps({**payload, "iat": now, "exp": now + expires_in}, separators=(",", ":")).encode())
msg = f"{header}.{payload_b64}"
sig = enc(hmac.new(SECRET_KEY.encode(), msg.encode(), hashlib.sha256).digest())
return f"{msg}.{sig}"
def verify(token: str) -> Optional[Dict]:
try:
h, p, s = token.split(".")
msg = f"{h}.{p}"
expected = hmac.new(SECRET_KEY.encode(), msg.encode(), hashlib.sha256).digest()
if not hmac.compare_digest(expected, dec(s)):
return None
payload = json.loads(dec(p))
return payload if payload["exp"] >= int(time.time()) else None
except Exception:
return None
def api_me(authorization: str) -> tuple[int, Dict]:
if not authorization.startswith("Bearer "):
return 401, {"error": "缺少 Authorization 头"}
payload = verify(authorization[7:])
if not payload:
return 401, {"error": "Token 无效或已过期"}
return 200, {"user_id": payload["user_id"], "username": payload["username"], "role": payload["role"]}
token = issue({"user_id": 1, "username": "alice", "role": "admin"})
for header in [f"Bearer {token}", token, "Bearer bad.token.value"]:
print("Authorization:", header[:32] + "...")
print("GET /api/me ->", api_me(header))
Step 7:用 demo_jwt_offline 演示完整的 JWT 生命周期
痛点与机制:
demo_jwt_offline 不启动服务器,直接调用 JWT.encode/JWT.decode/JWT.inspect,演示 Token 的完整生命周期:生成 → 解码 → 篡改检测 → 过期检测。这是理解 JWT 原理的最快路径——不需要网络,不需要服务器,只需要看代码输出。
核心源码(逐字来自文末完整源码):
def demo_jwt_offline() -> None:
print("\n=== JWT 原理演示(离线)===\n")
# 生成 Token
payload = {"user_id": 42, "username": "alice", "role": "admin"}
token = JWT.encode(payload, expires_in=60)
print(f"生成 Token:\n {token}\n")
# 解码
info = JWT.inspect(token)
print(f"Header: {info['header']}")
print(f"Payload: {info['payload']}")
print(f"Sig: {info['signature']}\n")
# 验证
decoded = JWT.decode(token)
print(f"验证结果:{'✅ 有效' if decoded else '❌ 无效'}")
if decoded:
exp_str = datetime.fromtimestamp(decoded["exp"]).strftime("%Y-%m-%d %H:%M:%S")
print(f" 用户: {decoded['username']},角色: {decoded['role']},过期: {exp_str}")
# 篡改测试
parts = token.split(".")
fake_payload = _b64url_encode(
json.dumps({"user_id": 42, "username": "alice", "role": "super_admin",
"iat": int(time.time()), "exp": int(time.time()) + 60}).encode()
)
tampered = f"{parts[0]}.{fake_payload}.{parts[2]}"
result = JWT.decode(tampered)
print(f"\n篡改 Payload(role→super_admin)后验证:{'✅ 有效' if result else '❌ 签名不匹配,拒绝'}")
# 过期测试
expired_token = JWT.encode({"user_id": 1}, expires_in=-1) # 立即过期
result = JWT.decode(expired_token)
print(f"过期 Token 验证:{'✅ 有效' if result else '❌ 已过期,拒绝'}")
# Token 生命周期表
print(f"\n{'─'*55}")
print(f" {'Token 类型':<20} {'有效期':<12} {'用途'}")
print(f"{'─'*55}")
rows = [
("Access Token", "15分钟~1小时", "携带在每次 API 请求头"),
("Refresh Token", "7天~30天", "仅用于换取新 Access Token"),
("Remember Token", "30天~90天", "记住登录状态(可选)"),
]
for name, ttl, usage in rows:
print(f" {name:<20} {ttl:<12} {usage}")
print(f"{'─'*55}")
可运行演示(补齐 Mock 数据与 print 反馈):
import base64
import hashlib
import hmac
import json
import time
from typing import Dict, Optional
SECRET_KEY = "nexdo-secret-2026"
def b64(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
def unb64(s: str) -> bytes:
pad = 4 - len(s) % 4
if pad != 4:
s += "=" * pad
return base64.urlsafe_b64decode(s)
def encode(payload: Dict, expires_in: int = 3600) -> str:
header = b64(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
now = int(time.time())
payload_b64 = b64(json.dumps({**payload, "iat": now, "exp": now + expires_in}, separators=(",", ":")).encode())
msg = f"{header}.{payload_b64}"
sig = b64(hmac.new(SECRET_KEY.encode(), msg.encode(), hashlib.sha256).digest())
return f"{msg}.{sig}"
def decode(token: str) -> Optional[Dict]:
try:
h, p, s = token.split(".")
expected = hmac.new(SECRET_KEY.encode(), f"{h}.{p}".encode(), hashlib.sha256).digest()
if not hmac.compare_digest(expected, unb64(s)):
return None
payload = json.loads(unb64(p))
return payload if payload["exp"] >= int(time.time()) else None
except Exception:
return None
print("=== JWT 原理演示(离线)===")
token = encode({"user_id": 1, "username": "alice", "role": "admin"}, expires_in=60)
print("1. 签发 token:", token[:60] + "...")
print("2. 调试解码 payload:", json.loads(unb64(token.split('.')[1])))
print("3. 验签结果:", decode(token))
print("4. 篡改签名后:", decode(token[:-3] + "xxx"))
print("5. 过期 token:", decode(encode({"username": "bob"}, expires_in=-1)))
Step 8:用 main 做 server/demo/jwt 三种模式的 CLI 总入口
痛点与机制:
main 用 argparse 做 CLI 入口:--mode server 启动服务器,--mode demo 发请求演示完整的登录→鉴权→刷新流程,--mode jwt 运行离线 JWT 原理演示。三种模式覆盖了 JWT 的三个学习层次:原理理解(jwt)→ 接口测试(demo)→ 生产部署(server)。
核心源码(逐字来自文末完整源码):
def main() -> None:
parser = argparse.ArgumentParser(description="JWT 鉴权演示(零依赖)")
parser.add_argument("--mode", choices=["server", "demo", "jwt"], default="jwt",
help="server=启动服务器, demo=演示客户端, jwt=JWT原理离线演示")
parser.add_argument("--port", type=int, default=8004)
args = parser.parse_args()
if args.mode == "server":
run_server(args.port)
elif args.mode == "demo":
demo_client(f"http://127.0.0.1:{args.port}")
else:
demo_jwt_offline()
可运行演示(补齐 Mock 数据与 print 反馈):
import argparse
import sys
def run_server(port: int = 8004) -> None:
print(f"server 模式: 将启动 JWT 鉴权服务 http://127.0.0.1:{port}")
def demo_client(base: str = "http://127.0.0.1:8004") -> None:
print(f"demo 模式: 将测试 {base}/api/auth/login、/api/me、/api/admin/users")
def demo_jwt_offline() -> None:
print("jwt 模式: 离线演示签发、inspect、decode、篡改检测、过期检测")
def main() -> None:
parser = argparse.ArgumentParser(description="JWT Token 签发与验证演示")
parser.add_argument("--mode", choices=["server", "demo", "jwt"], default="jwt")
parser.add_argument("--port", type=int, default=8004)
args = parser.parse_args()
if args.mode == "server":
run_server(args.port)
elif args.mode == "demo":
demo_client(f"http://127.0.0.1:{args.port}")
else:
demo_jwt_offline()
for mode in ["jwt", "server", "demo"]:
sys.argv = ["prog", "--mode", mode, "--port", "18084"]
print(f">>> python3 24-python-jwt-fastapi.py --mode {mode}")
main()
极客实战:完整源码与运行
现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。
#!/usr/bin/env python3
"""
24-jwt-fastapi.py
用标准库从零实现 JWT(hmac/base64/json/time)
配合 http.server 实现登录/验证接口
展示 Token 生成、验证、过期检测的完整流程
用法:
python3 24-jwt-fastapi.py --mode server # 启动服务器
python3 24-jwt-fastapi.py --mode demo # 演示(需先启动服务器)
python3 24-jwt-fastapi.py --mode jwt # JWT 原理演示(离线)
"""
import argparse
import base64
import hashlib
import hmac
import http.server
import json
import time
import urllib.error
import urllib.request
from datetime import datetime
from typing import Any, Dict, Optional, Tuple
# ─── JWT 实现(零外部依赖) ───────────────────────────────────────────────────
SECRET_KEY = "nexdo-secret-2026" # 生产环境应从环境变量读取
def _b64url_encode(data: bytes) -> str:
"""Base64URL 编码(无填充)"""
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
def _b64url_decode(s: str) -> bytes:
"""Base64URL 解码(补齐填充)"""
padding = 4 - len(s) % 4
if padding != 4:
s += "=" * padding
return base64.urlsafe_b64decode(s)
class JWT:
"""最小化 JWT 实现,支持 HS256"""
HEADER = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
@classmethod
def encode(cls, payload: Dict, expires_in: int = 3600) -> str:
"""
生成 JWT Token
:param payload: 自定义声明字典
:param expires_in: 有效期(秒),默认 1 小时
"""
now = int(time.time())
full_payload = {**payload, "iat": now, "exp": now + expires_in}
payload_b64 = _b64url_encode(json.dumps(full_payload, separators=(",", ":")).encode())
signing_input = f"{cls.HEADER}.{payload_b64}"
sig = hmac.new(
SECRET_KEY.encode("utf-8"),
signing_input.encode("utf-8"),
hashlib.sha256
).digest()
return f"{signing_input}.{_b64url_encode(sig)}"
@classmethod
def decode(cls, token: str) -> Optional[Dict]:
"""
验证并解码 JWT Token
:return: payload 字典,验证失败或过期返回 None
"""
parts = token.split(".")
if len(parts) != 3:
return None
header_b64, payload_b64, sig_b64 = parts
signing_input = f"{header_b64}.{payload_b64}"
# 验证签名
expected_sig = hmac.new(
SECRET_KEY.encode("utf-8"),
signing_input.encode("utf-8"),
hashlib.sha256
).digest()
try:
actual_sig = _b64url_decode(sig_b64)
except Exception:
return None
if not hmac.compare_digest(expected_sig, actual_sig):
return None # 签名不匹配
# 解码 Payload
try:
payload = json.loads(_b64url_decode(payload_b64))
except Exception:
return None
# 检查过期
if payload.get("exp", 0) < int(time.time()):
return None # Token 已过期
return payload
@classmethod
def inspect(cls, token: str) -> Dict:
"""解码 Token 内容(不验证签名,用于调试)"""
parts = token.split(".")
if len(parts) != 3:
return {"error": "格式错误"}
try:
header = json.loads(_b64url_decode(parts[0]))
payload = json.loads(_b64url_decode(parts[1]))
return {"header": header, "payload": payload, "signature": parts[2][:16] + "..."}
except Exception as e:
return {"error": str(e)}
# ─── 用户数据(内存) ─────────────────────────────────────────────────────────
_users = {
"alice": {"password": "pass123", "role": "admin", "id": 1},
"bob": {"password": "secret", "role": "user", "id": 2},
"carol": {"password": "hello6", "role": "user", "id": 3},
}
# ─── HTTP 服务器 ──────────────────────────────────────────────────────────────
STATUS_TEXT = {200: "OK", 201: "Created", 400: "Bad Request",
401: "Unauthorized", 403: "Forbidden", 404: "Not Found",
405: "Method Not Allowed"}
class JWTHandler(http.server.BaseHTTPRequestHandler):
def log_message(self, *args): pass
def _send(self, status: int, body: Dict) -> None:
payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(payload)))
self.end_headers()
self.wfile.write(payload)
def _read_body(self) -> Optional[Dict]:
length = int(self.headers.get("Content-Length", 0))
if not length:
return {}
try:
return json.loads(self.rfile.read(length))
except json.JSONDecodeError:
return None
def _get_token(self) -> Optional[str]:
auth = self.headers.get("Authorization", "")
if auth.startswith("Bearer "):
return auth[7:]
return None
def _require_auth(self) -> Optional[Dict]:
"""验证 Token,返回 payload 或发送 401"""
token = self._get_token()
if not token:
self._send(401, {"error": "缺少 Authorization 头"})
return None
payload = JWT.decode(token)
if payload is None:
self._send(401, {"error": "Token 无效或已过期"})
return None
return payload
def do_POST(self) -> None:
if self.path == "/api/auth/login":
self._login()
elif self.path == "/api/auth/refresh":
self._refresh()
else:
self._send(404, {"error": "路径不存在"})
def do_GET(self) -> None:
if self.path == "/api/me":
self._me()
elif self.path == "/api/admin/users":
self._admin_users()
elif self.path == "/api/health":
self._send(200, {"status": "ok"})
else:
self._send(404, {"error": "路径不存在"})
def _login(self) -> None:
body = self._read_body()
if body is None:
self._send(400, {"error": "请求体必须是 JSON"})
return
username = body.get("username", "").strip()
password = body.get("password", "")
user = _users.get(username)
if not user or user["password"] != password:
self._send(401, {"error": "用户名或密码错误"})
return
# 生成 Access Token(1小时)和 Refresh Token(7天)
access_token = JWT.encode(
{"user_id": user["id"], "username": username, "role": user["role"]},
expires_in=3600
)
refresh_token = JWT.encode(
{"user_id": user["id"], "username": username, "type": "refresh"},
expires_in=7 * 24 * 3600
)
self._send(200, {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "Bearer",
"expires_in": 3600,
})
print(f" [POST /api/auth/login] 用户 {username} 登录成功")
def _refresh(self) -> None:
body = self._read_body()
if body is None:
self._send(400, {"error": "请求体必须是 JSON"})
return
refresh_token = body.get("refresh_token", "")
payload = JWT.decode(refresh_token)
if payload is None or payload.get("type") != "refresh":
self._send(401, {"error": "Refresh Token 无效或已过期"})
return
username = payload["username"]
user = _users.get(username)
if not user:
self._send(401, {"error": "用户不存在"})
return
new_token = JWT.encode(
{"user_id": user["id"], "username": username, "role": user["role"]},
expires_in=3600
)
self._send(200, {"access_token": new_token, "expires_in": 3600})
def _me(self) -> None:
payload = self._require_auth()
if payload is None:
return
self._send(200, {
"user_id": payload["user_id"],
"username": payload["username"],
"role": payload["role"],
"token_expires_at": datetime.fromtimestamp(payload["exp"]).isoformat(),
})
def _admin_users(self) -> None:
payload = self._require_auth()
if payload is None:
return
if payload.get("role") != "admin":
self._send(403, {"error": "需要管理员权限"})
return
users = [{"id": u["id"], "username": name, "role": u["role"]}
for name, u in _users.items()]
self._send(200, {"users": users})
def run_server(port: int = 8004) -> None:
print(f"🚀 JWT 鉴权服务器启动:http://127.0.0.1:{port}")
print(" POST /api/auth/login | POST /api/auth/refresh")
print(" GET /api/me | GET /api/admin/users")
print(" 测试账号:alice/pass123(admin), bob/secret(user)\n")
try:
http.server.HTTPServer(("127.0.0.1", port), JWTHandler).serve_forever()
except KeyboardInterrupt:
print("\n服务器已停止。")
# ─── 演示客户端 ───────────────────────────────────────────────────────────────
def demo_client(base: str = "http://127.0.0.1:8004") -> None:
def req(method: str, path: str, data: Dict = None,
token: str = None) -> Tuple[int, Dict]:
payload = json.dumps(data).encode() if data else None
headers: Dict[str, str] = {}
if payload:
headers["Content-Type"] = "application/json"
if token:
headers["Authorization"] = f"Bearer {token}"
r = urllib.request.Request(f"{base}{path}", data=payload,
headers=headers, method=method)
try:
with urllib.request.urlopen(r) as resp:
return resp.status, json.loads(resp.read())
except urllib.error.HTTPError as e:
return e.code, json.loads(e.read())
print("\n=== JWT 鉴权演示 ===\n")
# 1. 未携带 Token 访问受保护接口
status, body = req("GET", "/api/me")
print(f"[未登录 GET /api/me] HTTP {status} → {body}")
# 2. 错误密码登录
status, body = req("POST", "/api/auth/login", {"username": "alice", "password": "wrong"})
print(f"\n[错误密码登录] HTTP {status} → {body}")
# 3. 正确登录
status, body = req("POST", "/api/auth/login", {"username": "alice", "password": "pass123"})
print(f"\n[alice 登录] HTTP {status}")
access_token = body["access_token"]
refresh_token = body["refresh_token"]
print(f" Access Token: {access_token[:40]}...")
print(f" Refresh Token: {refresh_token[:40]}...")
# 4. 解码 Token 内容
print(f"\n[Token 内容解析]")
info = JWT.inspect(access_token)
print(f" Header: {info['header']}")
print(f" Payload: {info['payload']}")
print(f" Sig: {info['signature']}")
# 5. 携带 Token 访问 /api/me
status, body = req("GET", "/api/me", token=access_token)
print(f"\n[GET /api/me(已登录)] HTTP {status} → {body}")
# 6. 管理员访问 /api/admin/users
status, body = req("GET", "/api/admin/users", token=access_token)
print(f"\n[GET /api/admin/users(alice=admin)] HTTP {status}")
if "users" in body:
for u in body["users"]:
print(f" {u}")
# 7. 普通用户访问管理员接口
status, body = req("POST", "/api/auth/login", {"username": "bob", "password": "secret"})
bob_token = body["access_token"]
status, body = req("GET", "/api/admin/users", token=bob_token)
print(f"\n[GET /api/admin/users(bob=user)] HTTP {status} → {body}")
# 8. 篡改 Token
tampered = access_token[:-5] + "XXXXX"
status, body = req("GET", "/api/me", token=tampered)
print(f"\n[篡改 Token 访问 /api/me] HTTP {status} → {body}")
# 9. 用 Refresh Token 换新 Access Token
status, body = req("POST", "/api/auth/refresh", {"refresh_token": refresh_token})
print(f"\n[POST /api/auth/refresh] HTTP {status}")
if "access_token" in body:
print(f" 新 Access Token: {body['access_token'][:40]}...")
# ─── JWT 原理离线演示 ─────────────────────────────────────────────────────────
def demo_jwt_offline() -> None:
print("\n=== JWT 原理演示(离线)===\n")
# 生成 Token
payload = {"user_id": 42, "username": "alice", "role": "admin"}
token = JWT.encode(payload, expires_in=60)
print(f"生成 Token:\n {token}\n")
# 解码
info = JWT.inspect(token)
print(f"Header: {info['header']}")
print(f"Payload: {info['payload']}")
print(f"Sig: {info['signature']}\n")
# 验证
decoded = JWT.decode(token)
print(f"验证结果:{'✅ 有效' if decoded else '❌ 无效'}")
if decoded:
exp_str = datetime.fromtimestamp(decoded["exp"]).strftime("%Y-%m-%d %H:%M:%S")
print(f" 用户: {decoded['username']},角色: {decoded['role']},过期: {exp_str}")
# 篡改测试
parts = token.split(".")
fake_payload = _b64url_encode(
json.dumps({"user_id": 42, "username": "alice", "role": "super_admin",
"iat": int(time.time()), "exp": int(time.time()) + 60}).encode()
)
tampered = f"{parts[0]}.{fake_payload}.{parts[2]}"
result = JWT.decode(tampered)
print(f"\n篡改 Payload(role→super_admin)后验证:{'✅ 有效' if result else '❌ 签名不匹配,拒绝'}")
# 过期测试
expired_token = JWT.encode({"user_id": 1}, expires_in=-1) # 立即过期
result = JWT.decode(expired_token)
print(f"过期 Token 验证:{'✅ 有效' if result else '❌ 已过期,拒绝'}")
# Token 生命周期表
print(f"\n{'─'*55}")
print(f" {'Token 类型':<20} {'有效期':<12} {'用途'}")
print(f"{'─'*55}")
rows = [
("Access Token", "15分钟~1小时", "携带在每次 API 请求头"),
("Refresh Token", "7天~30天", "仅用于换取新 Access Token"),
("Remember Token", "30天~90天", "记住登录状态(可选)"),
]
for name, ttl, usage in rows:
print(f" {name:<20} {ttl:<12} {usage}")
print(f"{'─'*55}")
# ─── 入口 ─────────────────────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(description="JWT 鉴权演示(零依赖)")
parser.add_argument("--mode", choices=["server", "demo", "jwt"], default="jwt",
help="server=启动服务器, demo=演示客户端, jwt=JWT原理离线演示")
parser.add_argument("--port", type=int, default=8004)
args = parser.parse_args()
if args.mode == "server":
run_server(args.port)
elif args.mode == "demo":
demo_client(f"http://127.0.0.1:{args.port}")
else:
demo_jwt_offline()
if __name__ == "__main__":
main()
$ python3 24-python-jwt-fastapi.py --mode jwt
=== JWT 原理演示(离线)===
[1] 生成 Token
token: eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJ1c2VyX2lkIjogMSwgInVzZXJuYW1lIjogImFsaWNlIiwgInJvbGUiOiAiYWRtaW4iLCAiZXhwIjogMTc3NjQ0NDA5MX0.xxx
三段结构:header.payload.signature
[2] 验证 Token(decode)
user_id=1, username=alice, role=admin
[3] 篡改检测
✅ 篡改被检测:签名验证失败
[4] 调试解码(inspect,不验签)
header: {'alg': 'HS256', 'typ': 'JWT'}
payload: {'user_id': 1, 'username': 'alice', 'role': 'admin', 'exp': ...}
$ python3 24-python-jwt-fastapi.py --mode demo
=== JWT 鉴权演示 ===
❌ 未登录访问 /api/me → 401 {"error": "缺少 Authorization 头"}
❌ 错误密码登录 → 401 {"error": "用户名或密码错误"}
✅ 正确登录 → 200 {"access_token": "eyJ...", "token_type": "Bearer"}
✅ 登录后访问 /api/me → 200 {"username": "alice", "role": "admin"}
✅ 刷新 Token → 200 {"access_token": "eyJ...(新 token)"}
小结
| 概念 | 一句话记忆 |
|---|---|
| Base64URL | Base64 变体,+→-,/→_,去掉 =,可安全放 URL |
| JWT 三段 | header.payload.signature,前两段是编码不是加密 |
| HMAC-SHA256 | 用密钥对 "header.payload" 签名,任何字符改变签名就变 |
hmac.compare_digest |
时序安全比较,防止通过响应时间推断签名内容 |
exp 字段 |
Unix 时间戳,JWT.decode 自动检查是否过期 |
JWT.inspect |
不验签只解码,用于调试,生产环境不能用于鉴权 |
| Access Token | 短期(1小时),用于 API 调用 |
| Refresh Token | 长期(7天),用于换取新的 Access Token |
Authorization: Bearer |
JWT 的标准传输方式,比 Cookie 更适合前后端分离 |
⏱ NexDo Time(5 分钟)
挑战:给 JWT 加一个 nbf(Not Before)字段,让 Token 在指定时间之前无效。
具体步骤:
- 在
JWT.encode里加nbf参数(默认为当前时间),写入 payload - 在
JWT.decode里加检查:如果time.time() < payload["nbf"],抛出ValueError("Token 尚未生效") - 测试:生成一个
nbf = time.time() + 60(60秒后才生效)的 Token,立刻 decode 应该失败
Don’t wait for next time, do it in the next moment.