55 · 综合项目:网络词典(TCP + 多进程 + SQLite)
🔗 知识图谱导航:阅读本文前,建议先回顾《11 · Socket 编程:TCP/UDP 底层通信》《12 · 多进程与多线程:榨干 CPU 性能》《16 · 数据库底座:SQLite 核心操作》中的核心概念。本文会把网络、并发和数据库接成一个端到端项目。 NexDo Time · 2026-04-17 · 预计阅读 36 分钟
痛点与架构
学完 Socket、进程和 SQLite,如果没有综合项目,很容易只记住零散 API。网络词典是一个合适的小系统:客户端发命令,服务端解析协议,数据库保存用户、词条和查询历史,多进程负责让多个客户端互不阻塞。
这一篇按真实项目分层来理解:数据层先能独立工作,协议层把文本命令翻译成业务动作,网络层负责收发消息,并发层负责同时服务多个连接。为了照顾没有网络调试经验的新手,步骤区全部提供离线或 Mock 演示;文末完整源码再给出真实 TCP 版本。
客户端命令 R/L/Q/H/E
-> TCP 收发
-> handle_command 协议解析
-> SQLite 用户/词条/历史表
-> 响应 OK/ERR/BYE
步步为营:核心逻辑自适应拆解
综合项目最怕一上来甩完整代码。下面拆成 9 个步骤,先跑通数据库和协议,再理解客户端、服务端和 CLI 入口。
Step 1:用 build_db 搭好用户、词条和查询历史三张表
痛点与机制:
综合项目不能一上来写网络,先要有稳定的数据底座。build_db() 像开店前整理仓库:先建用户表、词条表、历史表,再放入初始词库。后续注册、登录、查词都围绕这三张表工作。
核心源码(逐字来自文末完整源码):
def build_db() -> sqlite3.Connection:
"""初始化内存数据库,建表并写入种子词典。"""
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.executescript("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS words (
id INTEGER PRIMARY KEY AUTOINCREMENT,
word TEXT UNIQUE NOT NULL,
definition TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL,
word TEXT NOT NULL,
queried_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
""")
conn.executemany(
"INSERT OR IGNORE INTO words (word, definition) VALUES (?, ?)",
WORD_SEED,
)
conn.commit()
return conn
可运行演示(补齐 Mock 数据与 print 反馈):
import argparse
import hashlib
import multiprocessing
import socket
import sqlite3
import sys
import threading
import time
from typing import Optional
HOST: str = "127.0.0.1"
PORT: int = 16000
BUF: int = 1024
DB_PATH: str = ":memory:" # 改为文件路径即可持久化,如 "dict.db"
WORD_SEED: list[tuple[str, str]] = [
("python", "A high-level, interpreted programming language known for readability."),
("socket", "An endpoint for network communication between two processes."),
("process", "An instance of a program in execution, with its own memory space."),
("thread", "A lightweight unit of execution within a process, sharing memory."),
("database", "An organized collection of structured data stored electronically."),
("algorithm", "A step-by-step procedure for solving a problem or accomplishing a task."),
("recursion", "A function that calls itself to solve a smaller instance of the same problem."),
("iterator", "An object that enables traversal through a container one element at a time."),
("decorator", "A function that wraps another function to extend its behavior."),
("coroutine", "A generalization of subroutines for cooperative multitasking via yield/await."),
]
def build_db() -> sqlite3.Connection:
"""初始化内存数据库,建表并写入种子词典。"""
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.executescript("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS words (
id INTEGER PRIMARY KEY AUTOINCREMENT,
word TEXT UNIQUE NOT NULL,
definition TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL,
word TEXT NOT NULL,
queried_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
""")
conn.executemany(
"INSERT OR IGNORE INTO words (word, definition) VALUES (?, ?)",
WORD_SEED,
)
conn.commit()
return conn
conn = build_db()
print("用户表数量:", conn.execute("SELECT COUNT(*) FROM users").fetchone()[0])
print("词条数量:", conn.execute("SELECT COUNT(*) FROM words").fetchone()[0])
print("前 3 个词:", conn.execute("SELECT word FROM words ORDER BY word LIMIT 3").fetchall())
conn.close()
Step 2:用 db_register/db_login 完成账号注册和登录校验
痛点与机制:
用户系统的核心是“注册时写入,登录时核对”。这里使用参数化 SQL,像把用户输入放进安全信封,而不是直接拼进 SQL 字符串里。重复注册会被 UNIQUE 约束拦住,错误密码也不会登录成功。
核心源码(逐字来自文末完整源码):
def db_register(conn: sqlite3.Connection, username: str, password: str) -> str:
try:
conn.execute("INSERT INTO users (username, password) VALUES (?, ?)", (username, password))
conn.commit()
return "OK:注册成功"
except sqlite3.IntegrityError:
return "ERR:用户已存在"
def db_login(conn: sqlite3.Connection, username: str, password: str) -> str:
row = conn.execute(
"SELECT id FROM users WHERE username=? AND password=?", (username, password)
).fetchone()
return "OK:登录成功" if row else "ERR:用户名或密码错误"
可运行演示(补齐 Mock 数据与 print 反馈):
import argparse
import hashlib
import multiprocessing
import socket
import sqlite3
import sys
import threading
import time
from typing import Optional
HOST: str = "127.0.0.1"
PORT: int = 16000
BUF: int = 1024
DB_PATH: str = ":memory:" # 改为文件路径即可持久化,如 "dict.db"
WORD_SEED: list[tuple[str, str]] = [
("python", "A high-level, interpreted programming language known for readability."),
("socket", "An endpoint for network communication between two processes."),
("process", "An instance of a program in execution, with its own memory space."),
("thread", "A lightweight unit of execution within a process, sharing memory."),
("database", "An organized collection of structured data stored electronically."),
("algorithm", "A step-by-step procedure for solving a problem or accomplishing a task."),
("recursion", "A function that calls itself to solve a smaller instance of the same problem."),
("iterator", "An object that enables traversal through a container one element at a time."),
("decorator", "A function that wraps another function to extend its behavior."),
("coroutine", "A generalization of subroutines for cooperative multitasking via yield/await."),
]
def build_db() -> sqlite3.Connection:
"""初始化内存数据库,建表并写入种子词典。"""
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.executescript("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS words (
id INTEGER PRIMARY KEY AUTOINCREMENT,
word TEXT UNIQUE NOT NULL,
definition TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL,
word TEXT NOT NULL,
queried_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
""")
conn.executemany(
"INSERT OR IGNORE INTO words (word, definition) VALUES (?, ?)",
WORD_SEED,
)
conn.commit()
return conn
def db_register(conn: sqlite3.Connection, username: str, password: str) -> str:
try:
conn.execute("INSERT INTO users (username, password) VALUES (?, ?)", (username, password))
conn.commit()
return "OK:注册成功"
except sqlite3.IntegrityError:
return "ERR:用户已存在"
def db_login(conn: sqlite3.Connection, username: str, password: str) -> str:
row = conn.execute(
"SELECT id FROM users WHERE username=? AND password=?", (username, password)
).fetchone()
return "OK:登录成功" if row else "ERR:用户名或密码错误"
conn = build_db()
print(db_register(conn, "alice", "pass123"))
print(db_register(conn, "alice", "pass123"))
print(db_login(conn, "alice", "pass123"))
print(db_login(conn, "alice", "wrong"))
conn.close()
Step 3:用 db_query/db_history 串起查词和历史记录
痛点与机制:
词典项目不只是查一次返回释义,还要记住用户查过什么。db_query() 查到释义后会写入 history,db_history() 再按时间倒序取最近记录。它像图书馆借阅记录,既能查书,也能回看借过什么。
核心源码(逐字来自文末完整源码):
def db_query(conn: sqlite3.Connection, username: str, word: str) -> str:
row = conn.execute(
"SELECT definition FROM words WHERE word=?", (word.lower(),)
).fetchone()
if not row:
return "ERR:未找到该词"
conn.execute("INSERT INTO history (username, word) VALUES (?, ?)", (username, word))
conn.commit()
return f"OK:{row[0]}"
def db_history(conn: sqlite3.Connection, username: str) -> str:
rows = conn.execute(
"SELECT word FROM history WHERE username=? ORDER BY queried_at DESC LIMIT 10",
(username,),
).fetchall()
if not rows:
return "OK:暂无查询记录"
return "OK:" + "|".join(r[0] for r in rows)
可运行演示(补齐 Mock 数据与 print 反馈):
import argparse
import hashlib
import multiprocessing
import socket
import sqlite3
import sys
import threading
import time
from typing import Optional
HOST: str = "127.0.0.1"
PORT: int = 16000
BUF: int = 1024
DB_PATH: str = ":memory:" # 改为文件路径即可持久化,如 "dict.db"
WORD_SEED: list[tuple[str, str]] = [
("python", "A high-level, interpreted programming language known for readability."),
("socket", "An endpoint for network communication between two processes."),
("process", "An instance of a program in execution, with its own memory space."),
("thread", "A lightweight unit of execution within a process, sharing memory."),
("database", "An organized collection of structured data stored electronically."),
("algorithm", "A step-by-step procedure for solving a problem or accomplishing a task."),
("recursion", "A function that calls itself to solve a smaller instance of the same problem."),
("iterator", "An object that enables traversal through a container one element at a time."),
("decorator", "A function that wraps another function to extend its behavior."),
("coroutine", "A generalization of subroutines for cooperative multitasking via yield/await."),
]
def build_db() -> sqlite3.Connection:
"""初始化内存数据库,建表并写入种子词典。"""
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.executescript("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS words (
id INTEGER PRIMARY KEY AUTOINCREMENT,
word TEXT UNIQUE NOT NULL,
definition TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL,
word TEXT NOT NULL,
queried_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
""")
conn.executemany(
"INSERT OR IGNORE INTO words (word, definition) VALUES (?, ?)",
WORD_SEED,
)
conn.commit()
return conn
def db_register(conn: sqlite3.Connection, username: str, password: str) -> str:
try:
conn.execute("INSERT INTO users (username, password) VALUES (?, ?)", (username, password))
conn.commit()
return "OK:注册成功"
except sqlite3.IntegrityError:
return "ERR:用户已存在"
def db_query(conn: sqlite3.Connection, username: str, word: str) -> str:
row = conn.execute(
"SELECT definition FROM words WHERE word=?", (word.lower(),)
).fetchone()
if not row:
return "ERR:未找到该词"
conn.execute("INSERT INTO history (username, word) VALUES (?, ?)", (username, word))
conn.commit()
return f"OK:{row[0]}"
def db_history(conn: sqlite3.Connection, username: str) -> str:
rows = conn.execute(
"SELECT word FROM history WHERE username=? ORDER BY queried_at DESC LIMIT 10",
(username,),
).fetchall()
if not rows:
return "OK:暂无查询记录"
return "OK:" + "|".join(r[0] for r in rows)
conn = build_db()
print(db_register(conn, "alice", "pass123"))
print("查 python:", db_query(conn, "alice", "python"))
print("查 socket:", db_query(conn, "alice", "socket"))
print("查 unknown:", db_query(conn, "alice", "unknown"))
print("历史:", db_history(conn, "alice"))
conn.close()
Step 4:用 handle_command 把文本协议翻译成数据库操作
痛点与机制:
TCP 传过来的是字符串,服务端要先读懂指令。R/L/Q/H/E 就像柜台按钮:注册、登录、查询、历史、退出。handle_command() 是协议翻译员,把文本命令转成对应的数据层函数调用。
核心源码(逐字来自文末完整源码):
def handle_command(
conn: sqlite3.Connection,
raw: str,
current_user: Optional[str],
) -> tuple[str, Optional[str]]:
"""
解析一条客户端指令,返回 (响应文本, 更新后的登录用户名)。
协议:R:<user>:<pwd> | L:<user>:<pwd> | Q:<word> | H | E
"""
parts = raw.strip().split(":", 2)
cmd = parts[0].upper()
if cmd == "R" and len(parts) == 3:
return db_register(conn, parts[1], parts[2]), current_user
if cmd == "L" and len(parts) == 3:
resp = db_login(conn, parts[1], parts[2])
new_user = parts[1] if resp.startswith("OK") else current_user
return resp, new_user
if cmd == "Q" and len(parts) >= 2:
if not current_user:
return "ERR:请先登录", current_user
return db_query(conn, current_user, parts[1]), current_user
if cmd == "H":
if not current_user:
return "ERR:请先登录", current_user
return db_history(conn, current_user), current_user
if cmd == "E":
return "BYE", current_user
return "ERR:未知指令", current_user
可运行演示(补齐 Mock 数据与 print 反馈):
import argparse
import hashlib
import multiprocessing
import socket
import sqlite3
import sys
import threading
import time
from typing import Optional
HOST: str = "127.0.0.1"
PORT: int = 16000
BUF: int = 1024
DB_PATH: str = ":memory:" # 改为文件路径即可持久化,如 "dict.db"
WORD_SEED: list[tuple[str, str]] = [
("python", "A high-level, interpreted programming language known for readability."),
("socket", "An endpoint for network communication between two processes."),
("process", "An instance of a program in execution, with its own memory space."),
("thread", "A lightweight unit of execution within a process, sharing memory."),
("database", "An organized collection of structured data stored electronically."),
("algorithm", "A step-by-step procedure for solving a problem or accomplishing a task."),
("recursion", "A function that calls itself to solve a smaller instance of the same problem."),
("iterator", "An object that enables traversal through a container one element at a time."),
("decorator", "A function that wraps another function to extend its behavior."),
("coroutine", "A generalization of subroutines for cooperative multitasking via yield/await."),
]
def build_db() -> sqlite3.Connection:
"""初始化内存数据库,建表并写入种子词典。"""
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.executescript("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS words (
id INTEGER PRIMARY KEY AUTOINCREMENT,
word TEXT UNIQUE NOT NULL,
definition TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL,
word TEXT NOT NULL,
queried_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
""")
conn.executemany(
"INSERT OR IGNORE INTO words (word, definition) VALUES (?, ?)",
WORD_SEED,
)
conn.commit()
return conn
def db_register(conn: sqlite3.Connection, username: str, password: str) -> str:
try:
conn.execute("INSERT INTO users (username, password) VALUES (?, ?)", (username, password))
conn.commit()
return "OK:注册成功"
except sqlite3.IntegrityError:
return "ERR:用户已存在"
def db_login(conn: sqlite3.Connection, username: str, password: str) -> str:
row = conn.execute(
"SELECT id FROM users WHERE username=? AND password=?", (username, password)
).fetchone()
return "OK:登录成功" if row else "ERR:用户名或密码错误"
def db_query(conn: sqlite3.Connection, username: str, word: str) -> str:
row = conn.execute(
"SELECT definition FROM words WHERE word=?", (word.lower(),)
).fetchone()
if not row:
return "ERR:未找到该词"
conn.execute("INSERT INTO history (username, word) VALUES (?, ?)", (username, word))
conn.commit()
return f"OK:{row[0]}"
def db_history(conn: sqlite3.Connection, username: str) -> str:
rows = conn.execute(
"SELECT word FROM history WHERE username=? ORDER BY queried_at DESC LIMIT 10",
(username,),
).fetchall()
if not rows:
return "OK:暂无查询记录"
return "OK:" + "|".join(r[0] for r in rows)
def handle_command(
conn: sqlite3.Connection,
raw: str,
current_user: Optional[str],
) -> tuple[str, Optional[str]]:
"""
解析一条客户端指令,返回 (响应文本, 更新后的登录用户名)。
协议:R:<user>:<pwd> | L:<user>:<pwd> | Q:<word> | H | E
"""
parts = raw.strip().split(":", 2)
cmd = parts[0].upper()
if cmd == "R" and len(parts) == 3:
return db_register(conn, parts[1], parts[2]), current_user
if cmd == "L" and len(parts) == 3:
resp = db_login(conn, parts[1], parts[2])
new_user = parts[1] if resp.startswith("OK") else current_user
return resp, new_user
if cmd == "Q" and len(parts) >= 2:
if not current_user:
return "ERR:请先登录", current_user
return db_query(conn, current_user, parts[1]), current_user
if cmd == "H":
if not current_user:
return "ERR:请先登录", current_user
return db_history(conn, current_user), current_user
if cmd == "E":
return "BYE", current_user
return "ERR:未知指令", current_user
conn = build_db()
current_user: Optional[str] = None
for cmd in ["Q:python", "R:alice:pass123", "L:alice:pass123", "Q:python", "H", "E"]:
resp, current_user = handle_command(conn, cmd, current_user)
print(f"{cmd:<18} -> {resp} | 当前用户={current_user}")
conn.close()
Step 5:用 handle_client 处理单个客户端的完整会话
痛点与机制:
一个客户端连接不是只发一条消息,而是一段会话。handle_client() 维护 current_user,循环接收命令、调用协议解析、发回响应。这里用 FakeSocket 模拟网络,避免新手必须先启动服务器。
核心源码(逐字来自文末完整源码):
def handle_client(client_sock: socket.socket, addr: tuple, db_conn: sqlite3.Connection) -> None:
"""每个子进程运行此函数,独立处理一个客户端连接。"""
current_user: Optional[str] = None
print(f" [子进程 {multiprocessing.current_process().pid}] 连接来自 {addr}")
try:
while True:
data = client_sock.recv(BUF)
if not data:
break
raw = data.decode("utf-8", errors="ignore")
response, current_user = handle_command(db_conn, raw, current_user)
client_sock.sendall(response.encode("utf-8"))
if response == "BYE":
break
finally:
client_sock.close()
print(f" [子进程 {multiprocessing.current_process().pid}] 连接断开 {addr}")
可运行演示(补齐 Mock 数据与 print 反馈):
import argparse
import hashlib
import multiprocessing
import socket
import sqlite3
import sys
import threading
import time
from typing import Optional
HOST: str = "127.0.0.1"
PORT: int = 16000
BUF: int = 1024
DB_PATH: str = ":memory:" # 改为文件路径即可持久化,如 "dict.db"
WORD_SEED: list[tuple[str, str]] = [
("python", "A high-level, interpreted programming language known for readability."),
("socket", "An endpoint for network communication between two processes."),
("process", "An instance of a program in execution, with its own memory space."),
("thread", "A lightweight unit of execution within a process, sharing memory."),
("database", "An organized collection of structured data stored electronically."),
("algorithm", "A step-by-step procedure for solving a problem or accomplishing a task."),
("recursion", "A function that calls itself to solve a smaller instance of the same problem."),
("iterator", "An object that enables traversal through a container one element at a time."),
("decorator", "A function that wraps another function to extend its behavior."),
("coroutine", "A generalization of subroutines for cooperative multitasking via yield/await."),
]
def build_db() -> sqlite3.Connection:
"""初始化内存数据库,建表并写入种子词典。"""
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.executescript("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS words (
id INTEGER PRIMARY KEY AUTOINCREMENT,
word TEXT UNIQUE NOT NULL,
definition TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL,
word TEXT NOT NULL,
queried_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
""")
conn.executemany(
"INSERT OR IGNORE INTO words (word, definition) VALUES (?, ?)",
WORD_SEED,
)
conn.commit()
return conn
def db_register(conn: sqlite3.Connection, username: str, password: str) -> str:
try:
conn.execute("INSERT INTO users (username, password) VALUES (?, ?)", (username, password))
conn.commit()
return "OK:注册成功"
except sqlite3.IntegrityError:
return "ERR:用户已存在"
def db_login(conn: sqlite3.Connection, username: str, password: str) -> str:
row = conn.execute(
"SELECT id FROM users WHERE username=? AND password=?", (username, password)
).fetchone()
return "OK:登录成功" if row else "ERR:用户名或密码错误"
def db_query(conn: sqlite3.Connection, username: str, word: str) -> str:
row = conn.execute(
"SELECT definition FROM words WHERE word=?", (word.lower(),)
).fetchone()
if not row:
return "ERR:未找到该词"
conn.execute("INSERT INTO history (username, word) VALUES (?, ?)", (username, word))
conn.commit()
return f"OK:{row[0]}"
def db_history(conn: sqlite3.Connection, username: str) -> str:
rows = conn.execute(
"SELECT word FROM history WHERE username=? ORDER BY queried_at DESC LIMIT 10",
(username,),
).fetchall()
if not rows:
return "OK:暂无查询记录"
return "OK:" + "|".join(r[0] for r in rows)
def handle_command(
conn: sqlite3.Connection,
raw: str,
current_user: Optional[str],
) -> tuple[str, Optional[str]]:
"""
解析一条客户端指令,返回 (响应文本, 更新后的登录用户名)。
协议:R:<user>:<pwd> | L:<user>:<pwd> | Q:<word> | H | E
"""
parts = raw.strip().split(":", 2)
cmd = parts[0].upper()
if cmd == "R" and len(parts) == 3:
return db_register(conn, parts[1], parts[2]), current_user
if cmd == "L" and len(parts) == 3:
resp = db_login(conn, parts[1], parts[2])
new_user = parts[1] if resp.startswith("OK") else current_user
return resp, new_user
if cmd == "Q" and len(parts) >= 2:
if not current_user:
return "ERR:请先登录", current_user
return db_query(conn, current_user, parts[1]), current_user
if cmd == "H":
if not current_user:
return "ERR:请先登录", current_user
return db_history(conn, current_user), current_user
if cmd == "E":
return "BYE", current_user
return "ERR:未知指令", current_user
class FakeSocket:
def __init__(self, messages: list[str]) -> None:
self.messages = [m.encode("utf-8") for m in messages]
self.sent: list[str] = []
def recv(self, size: int) -> bytes:
return self.messages.pop(0) if self.messages else b""
def sendall(self, data: bytes) -> None:
self.sent.append(data.decode("utf-8"))
def close(self) -> None:
print("FakeSocket 已关闭")
def handle_client(client_sock: socket.socket, addr: tuple, db_conn: sqlite3.Connection) -> None:
"""每个子进程运行此函数,独立处理一个客户端连接。"""
current_user: Optional[str] = None
print(f" [子进程 {multiprocessing.current_process().pid}] 连接来自 {addr}")
try:
while True:
data = client_sock.recv(BUF)
if not data:
break
raw = data.decode("utf-8", errors="ignore")
response, current_user = handle_command(db_conn, raw, current_user)
client_sock.sendall(response.encode("utf-8"))
if response == "BYE":
break
finally:
client_sock.close()
print(f" [子进程 {multiprocessing.current_process().pid}] 连接断开 {addr}")
conn = build_db()
fake = FakeSocket(["R:alice:pass123", "L:alice:pass123", "Q:python", "H", "E"])
handle_client(fake, ("127.0.0.1", 6000), conn)
print("客户端收到:", fake.sent)
conn.close()
Step 6:用 run_server 理解 TCP 监听和多进程并发模型
痛点与机制:
真实服务端像前台接线员:主进程只负责接电话,接到一个客户端就交给子进程处理。这样 A 客户端查词时,B 客户端不用排队等它结束。演示片段只解释流程,不占用本机端口。
核心源码(逐字来自文末完整源码):
def run_server() -> None:
db_conn = build_db()
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_sock.bind((HOST, PORT))
server_sock.listen(5)
print(f"🚀 词典服务器启动:{HOST}:{PORT} (Ctrl+C 停止)")
print(f" 词库:{len(WORD_SEED)} 个词条 | 数据库:{DB_PATH}")
try:
while True:
client_sock, addr = server_sock.accept()
# 每个连接 fork 一个子进程
proc = multiprocessing.Process(
target=handle_client,
args=(client_sock, addr, db_conn),
daemon=True,
)
proc.start()
client_sock.close() # 主进程关闭副本,子进程持有原始 fd
except KeyboardInterrupt:
print("\n服务器已停止。")
finally:
server_sock.close()
可运行演示(补齐 Mock 数据与 print 反馈):
import argparse
import hashlib
import multiprocessing
import socket
import sqlite3
import sys
import threading
import time
from typing import Optional
HOST: str = "127.0.0.1"
PORT: int = 16000
BUF: int = 1024
DB_PATH: str = ":memory:" # 改为文件路径即可持久化,如 "dict.db"
WORD_SEED: list[tuple[str, str]] = [
("python", "A high-level, interpreted programming language known for readability."),
("socket", "An endpoint for network communication between two processes."),
("process", "An instance of a program in execution, with its own memory space."),
("thread", "A lightweight unit of execution within a process, sharing memory."),
("database", "An organized collection of structured data stored electronically."),
("algorithm", "A step-by-step procedure for solving a problem or accomplishing a task."),
("recursion", "A function that calls itself to solve a smaller instance of the same problem."),
("iterator", "An object that enables traversal through a container one element at a time."),
("decorator", "A function that wraps another function to extend its behavior."),
("coroutine", "A generalization of subroutines for cooperative multitasking via yield/await."),
]
def build_db() -> sqlite3.Connection:
"""初始化内存数据库,建表并写入种子词典。"""
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.executescript("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS words (
id INTEGER PRIMARY KEY AUTOINCREMENT,
word TEXT UNIQUE NOT NULL,
definition TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL,
word TEXT NOT NULL,
queried_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
""")
conn.executemany(
"INSERT OR IGNORE INTO words (word, definition) VALUES (?, ?)",
WORD_SEED,
)
conn.commit()
return conn
def handle_client(client_sock: socket.socket, addr: tuple, db_conn: sqlite3.Connection) -> None:
"""每个子进程运行此函数,独立处理一个客户端连接。"""
current_user: Optional[str] = None
print(f" [子进程 {multiprocessing.current_process().pid}] 连接来自 {addr}")
try:
while True:
data = client_sock.recv(BUF)
if not data:
break
raw = data.decode("utf-8", errors="ignore")
response, current_user = handle_command(db_conn, raw, current_user)
client_sock.sendall(response.encode("utf-8"))
if response == "BYE":
break
finally:
client_sock.close()
print(f" [子进程 {multiprocessing.current_process().pid}] 连接断开 {addr}")
def run_server() -> None:
db_conn = build_db()
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_sock.bind((HOST, PORT))
server_sock.listen(5)
print(f"🚀 词典服务器启动:{HOST}:{PORT} (Ctrl+C 停止)")
print(f" 词库:{len(WORD_SEED)} 个词条 | 数据库:{DB_PATH}")
try:
while True:
client_sock, addr = server_sock.accept()
# 每个连接 fork 一个子进程
proc = multiprocessing.Process(
target=handle_client,
args=(client_sock, addr, db_conn),
daemon=True,
)
proc.start()
client_sock.close() # 主进程关闭副本,子进程持有原始 fd
except KeyboardInterrupt:
print("\n服务器已停止。")
finally:
server_sock.close()
print("run_server 会:")
print("1. build_db() 初始化词库")
print("2. socket.bind/listen 开始监听")
print("3. 每 accept 一个客户端就 multiprocessing.Process(...) 分配子进程")
print("演示片段不真实启动服务器,避免占用端口;完整源码可用 --mode server 启动。")
Step 7:用 run_client 做可复现的脚本化客户端
痛点与机制:
教程不适合依赖人工输入,因为读者和测试脚本都很难复现。run_client(commands) 把客户端命令提前写成列表,按顺序发送,这样每次运行都能得到相同流程。
核心源码(逐字来自文末完整源码):
def run_client(commands: list[str]) -> None:
"""按给定指令列表顺序发送请求,避免教程依赖人工交互输入。"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((HOST, PORT))
except ConnectionRefusedError:
print(f"ERR: 无法连接 {HOST}:{PORT},请先启动服务器。")
sys.exit(1)
print(f"✅ 已连接 {HOST}:{PORT}")
print("指令:R:<用户名>:<密码> L:<用户名>:<密码> Q:<单词> H E")
try:
for cmd in commands:
cmd = cmd.strip()
if not cmd:
continue
print(f">>> {cmd}")
sock.sendall(cmd.encode("utf-8"))
resp = sock.recv(BUF).decode("utf-8")
print(f" {resp}")
if resp == "BYE":
break
except KeyboardInterrupt:
print("客户端已中断。")
finally:
sock.close()
可运行演示(补齐 Mock 数据与 print 反馈):
import argparse
import hashlib
import multiprocessing
import socket
import sqlite3
import sys
import threading
import time
from typing import Optional
HOST: str = "127.0.0.1"
PORT: int = 16000
BUF: int = 1024
DB_PATH: str = ":memory:" # 改为文件路径即可持久化,如 "dict.db"
WORD_SEED: list[tuple[str, str]] = [
("python", "A high-level, interpreted programming language known for readability."),
("socket", "An endpoint for network communication between two processes."),
("process", "An instance of a program in execution, with its own memory space."),
("thread", "A lightweight unit of execution within a process, sharing memory."),
("database", "An organized collection of structured data stored electronically."),
("algorithm", "A step-by-step procedure for solving a problem or accomplishing a task."),
("recursion", "A function that calls itself to solve a smaller instance of the same problem."),
("iterator", "An object that enables traversal through a container one element at a time."),
("decorator", "A function that wraps another function to extend its behavior."),
("coroutine", "A generalization of subroutines for cooperative multitasking via yield/await."),
]
def run_client(commands: list[str]) -> None:
"""按给定指令列表顺序发送请求,避免教程依赖人工交互输入。"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((HOST, PORT))
except ConnectionRefusedError:
print(f"ERR: 无法连接 {HOST}:{PORT},请先启动服务器。")
sys.exit(1)
print(f"✅ 已连接 {HOST}:{PORT}")
print("指令:R:<用户名>:<密码> L:<用户名>:<密码> Q:<单词> H E")
try:
for cmd in commands:
cmd = cmd.strip()
if not cmd:
continue
print(f">>> {cmd}")
sock.sendall(cmd.encode("utf-8"))
resp = sock.recv(BUF).decode("utf-8")
print(f" {resp}")
if resp == "BYE":
break
except KeyboardInterrupt:
print("客户端已中断。")
finally:
sock.close()
print("run_client 的脚本化指令示例:")
commands = "R:bob:pw,L:bob:pw,Q:python,H,E".split(",")
for cmd in commands:
print("准备发送:", cmd)
print("完整源码中 run_client 会连接服务器后逐条 sendall,并打印服务端响应。")
Step 8:用 run_offline_demo 在无端口环境跑完整闭环
痛点与机制:
有些环境不允许绑定端口,或者端口已被占用。离线演示直接调用协议解析层,仍然覆盖注册、登录、查词、历史和退出。它像不开门营业时的内部排练,流程照样完整。
核心源码(逐字来自文末完整源码):
def run_offline_demo() -> None:
"""无端口权限时的离线演示:直接调用协议解析层,保证 python main.py 不报错。"""
db_conn = build_db()
current_user: Optional[str] = None
steps = [
"R:alice:pass123",
"L:alice:pass123",
"Q:python",
"Q:socket",
"Q:recursion",
"H",
"E",
]
print("=" * 60)
print(" 网络词典 — 离线协议演示")
print("=" * 60)
for cmd in steps:
resp, current_user = handle_command(db_conn, cmd, current_user)
print(f" 发送: {cmd:<30} 响应: {resp}")
print("\n✅ 离线演示完成")
可运行演示(补齐 Mock 数据与 print 反馈):
import argparse
import hashlib
import multiprocessing
import socket
import sqlite3
import sys
import threading
import time
from typing import Optional
HOST: str = "127.0.0.1"
PORT: int = 16000
BUF: int = 1024
DB_PATH: str = ":memory:" # 改为文件路径即可持久化,如 "dict.db"
WORD_SEED: list[tuple[str, str]] = [
("python", "A high-level, interpreted programming language known for readability."),
("socket", "An endpoint for network communication between two processes."),
("process", "An instance of a program in execution, with its own memory space."),
("thread", "A lightweight unit of execution within a process, sharing memory."),
("database", "An organized collection of structured data stored electronically."),
("algorithm", "A step-by-step procedure for solving a problem or accomplishing a task."),
("recursion", "A function that calls itself to solve a smaller instance of the same problem."),
("iterator", "An object that enables traversal through a container one element at a time."),
("decorator", "A function that wraps another function to extend its behavior."),
("coroutine", "A generalization of subroutines for cooperative multitasking via yield/await."),
]
def build_db() -> sqlite3.Connection:
"""初始化内存数据库,建表并写入种子词典。"""
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.executescript("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS words (
id INTEGER PRIMARY KEY AUTOINCREMENT,
word TEXT UNIQUE NOT NULL,
definition TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL,
word TEXT NOT NULL,
queried_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
""")
conn.executemany(
"INSERT OR IGNORE INTO words (word, definition) VALUES (?, ?)",
WORD_SEED,
)
conn.commit()
return conn
def db_register(conn: sqlite3.Connection, username: str, password: str) -> str:
try:
conn.execute("INSERT INTO users (username, password) VALUES (?, ?)", (username, password))
conn.commit()
return "OK:注册成功"
except sqlite3.IntegrityError:
return "ERR:用户已存在"
def db_login(conn: sqlite3.Connection, username: str, password: str) -> str:
row = conn.execute(
"SELECT id FROM users WHERE username=? AND password=?", (username, password)
).fetchone()
return "OK:登录成功" if row else "ERR:用户名或密码错误"
def db_query(conn: sqlite3.Connection, username: str, word: str) -> str:
row = conn.execute(
"SELECT definition FROM words WHERE word=?", (word.lower(),)
).fetchone()
if not row:
return "ERR:未找到该词"
conn.execute("INSERT INTO history (username, word) VALUES (?, ?)", (username, word))
conn.commit()
return f"OK:{row[0]}"
def db_history(conn: sqlite3.Connection, username: str) -> str:
rows = conn.execute(
"SELECT word FROM history WHERE username=? ORDER BY queried_at DESC LIMIT 10",
(username,),
).fetchall()
if not rows:
return "OK:暂无查询记录"
return "OK:" + "|".join(r[0] for r in rows)
def handle_command(
conn: sqlite3.Connection,
raw: str,
current_user: Optional[str],
) -> tuple[str, Optional[str]]:
"""
解析一条客户端指令,返回 (响应文本, 更新后的登录用户名)。
协议:R:<user>:<pwd> | L:<user>:<pwd> | Q:<word> | H | E
"""
parts = raw.strip().split(":", 2)
cmd = parts[0].upper()
if cmd == "R" and len(parts) == 3:
return db_register(conn, parts[1], parts[2]), current_user
if cmd == "L" and len(parts) == 3:
resp = db_login(conn, parts[1], parts[2])
new_user = parts[1] if resp.startswith("OK") else current_user
return resp, new_user
if cmd == "Q" and len(parts) >= 2:
if not current_user:
return "ERR:请先登录", current_user
return db_query(conn, current_user, parts[1]), current_user
if cmd == "H":
if not current_user:
return "ERR:请先登录", current_user
return db_history(conn, current_user), current_user
if cmd == "E":
return "BYE", current_user
return "ERR:未知指令", current_user
def run_offline_demo() -> None:
"""无端口权限时的离线演示:直接调用协议解析层,保证 python main.py 不报错。"""
db_conn = build_db()
current_user: Optional[str] = None
steps = [
"R:alice:pass123",
"L:alice:pass123",
"Q:python",
"Q:socket",
"Q:recursion",
"H",
"E",
]
print("=" * 60)
print(" 网络词典 — 离线协议演示")
print("=" * 60)
for cmd in steps:
resp, current_user = handle_command(db_conn, cmd, current_user)
print(f" 发送: {cmd:<30} 响应: {resp}")
print("\n✅ 离线演示完成")
run_offline_demo()
Step 9:用 main 做 server/client/demo 三种运行模式
痛点与机制:
综合项目最终要变成命令行工具。--mode server 启服务,--mode client 发脚本化命令,--mode demo 自动跑完整演示。用户不需要改源码,只需要换参数。
核心源码(逐字来自文末完整源码):
def main() -> None:
parser = argparse.ArgumentParser(description="网络词典(TCP + 多进程 + SQLite)")
parser.add_argument(
"--mode",
choices=["server", "client", "demo"],
default="demo",
help="server=启动服务器 | client=交互客户端 | demo=自动演示(默认)",
)
parser.add_argument(
"--commands",
default="R:bob:pw,L:bob:pw,Q:python,H,E",
help="client 模式下要顺序发送的逗号分隔指令,避免人工交互输入",
)
args = parser.parse_args()
if args.mode == "server":
run_server()
elif args.mode == "client":
run_client(args.commands.split(","))
else:
run_demo()
可运行演示(补齐 Mock 数据与 print 反馈):
import argparse
import hashlib
import multiprocessing
import socket
import sqlite3
import sys
import threading
import time
from typing import Optional
HOST: str = "127.0.0.1"
PORT: int = 16000
BUF: int = 1024
DB_PATH: str = ":memory:" # 改为文件路径即可持久化,如 "dict.db"
WORD_SEED: list[tuple[str, str]] = [
("python", "A high-level, interpreted programming language known for readability."),
("socket", "An endpoint for network communication between two processes."),
("process", "An instance of a program in execution, with its own memory space."),
("thread", "A lightweight unit of execution within a process, sharing memory."),
("database", "An organized collection of structured data stored electronically."),
("algorithm", "A step-by-step procedure for solving a problem or accomplishing a task."),
("recursion", "A function that calls itself to solve a smaller instance of the same problem."),
("iterator", "An object that enables traversal through a container one element at a time."),
("decorator", "A function that wraps another function to extend its behavior."),
("coroutine", "A generalization of subroutines for cooperative multitasking via yield/await."),
]
def main() -> None:
parser = argparse.ArgumentParser(description="网络词典(TCP + 多进程 + SQLite)")
parser.add_argument(
"--mode",
choices=["server", "client", "demo"],
default="demo",
help="server=启动服务器 | client=交互客户端 | demo=自动演示(默认)",
)
parser.add_argument(
"--commands",
default="R:bob:pw,L:bob:pw,Q:python,H,E",
help="client 模式下要顺序发送的逗号分隔指令,避免人工交互输入",
)
args = parser.parse_args()
if args.mode == "server":
run_server()
elif args.mode == "client":
run_client(args.commands.split(","))
else:
run_demo()
def run_server() -> None:
print("启动真实 TCP 服务器")
def run_client(commands: list[str]) -> None:
print("脚本化客户端命令:", commands)
def run_demo() -> None:
print("运行自动演示")
for mode in ["demo", "client", "server"]:
import sys
print(f"\n$ python 55-python-project-dict.py --mode {mode}")
sys.argv = ["55-python-project-dict.py", "--mode", mode]
if mode == "client":
sys.argv += ["--commands", "R:bob:pw,L:bob:pw,Q:python,H,E"]
main()
极客实战:完整源码与运行
现在,把上面的积木拼起来,将下面完整代码保存为 55-python-project-dict.py。默认 --mode demo 会自动演示完整流程;如果环境不允许绑定端口,代码会降级到离线协议演示,保证脚本仍然能跑完。
#!/usr/bin/env python3
"""
55-python-project-dict.py — 网络词典(TCP + 多进程 + SQLite)
用法:
python3 55-python-project-dict.py --mode server # 启动服务器(后台监听)
python3 55-python-project-dict.py --mode client --commands "R:bob:pw,L:bob:pw,Q:python,H,E"
python3 55-python-project-dict.py --mode demo # 自动演示完整流程(默认)
零外部依赖,直接运行。服务器使用 SQLite 内存库,重启后数据清空(演示用)。
"""
import argparse
import multiprocessing
import socket
import sqlite3
import sys
import threading
import time
from typing import Optional
HOST: str = "127.0.0.1"
PORT: int = 16000
BUF: int = 1024
DB_PATH: str = ":memory:" # 改为文件路径即可持久化,如 "dict.db"
# ─── 词典 Mock 数据 ────────────────────────────────────────────────────────────
WORD_SEED: list[tuple[str, str]] = [
("python", "A high-level, interpreted programming language known for readability."),
("socket", "An endpoint for network communication between two processes."),
("process", "An instance of a program in execution, with its own memory space."),
("thread", "A lightweight unit of execution within a process, sharing memory."),
("database", "An organized collection of structured data stored electronically."),
("algorithm", "A step-by-step procedure for solving a problem or accomplishing a task."),
("recursion", "A function that calls itself to solve a smaller instance of the same problem."),
("iterator", "An object that enables traversal through a container one element at a time."),
("decorator", "A function that wraps another function to extend its behavior."),
("coroutine", "A generalization of subroutines for cooperative multitasking via yield/await."),
]
# ─── 数据库层 ──────────────────────────────────────────────────────────────────
def build_db() -> sqlite3.Connection:
"""初始化内存数据库,建表并写入种子词典。"""
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.executescript("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS words (
id INTEGER PRIMARY KEY AUTOINCREMENT,
word TEXT UNIQUE NOT NULL,
definition TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL,
word TEXT NOT NULL,
queried_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
""")
conn.executemany(
"INSERT OR IGNORE INTO words (word, definition) VALUES (?, ?)",
WORD_SEED,
)
conn.commit()
return conn
def db_register(conn: sqlite3.Connection, username: str, password: str) -> str:
try:
conn.execute("INSERT INTO users (username, password) VALUES (?, ?)", (username, password))
conn.commit()
return "OK:注册成功"
except sqlite3.IntegrityError:
return "ERR:用户已存在"
def db_login(conn: sqlite3.Connection, username: str, password: str) -> str:
row = conn.execute(
"SELECT id FROM users WHERE username=? AND password=?", (username, password)
).fetchone()
return "OK:登录成功" if row else "ERR:用户名或密码错误"
def db_query(conn: sqlite3.Connection, username: str, word: str) -> str:
row = conn.execute(
"SELECT definition FROM words WHERE word=?", (word.lower(),)
).fetchone()
if not row:
return "ERR:未找到该词"
conn.execute("INSERT INTO history (username, word) VALUES (?, ?)", (username, word))
conn.commit()
return f"OK:{row[0]}"
def db_history(conn: sqlite3.Connection, username: str) -> str:
rows = conn.execute(
"SELECT word FROM history WHERE username=? ORDER BY queried_at DESC LIMIT 10",
(username,),
).fetchall()
if not rows:
return "OK:暂无查询记录"
return "OK:" + "|".join(r[0] for r in rows)
# ─── 协议解析层 ────────────────────────────────────────────────────────────────
def handle_command(
conn: sqlite3.Connection,
raw: str,
current_user: Optional[str],
) -> tuple[str, Optional[str]]:
"""
解析一条客户端指令,返回 (响应文本, 更新后的登录用户名)。
协议:R:<user>:<pwd> | L:<user>:<pwd> | Q:<word> | H | E
"""
parts = raw.strip().split(":", 2)
cmd = parts[0].upper()
if cmd == "R" and len(parts) == 3:
return db_register(conn, parts[1], parts[2]), current_user
if cmd == "L" and len(parts) == 3:
resp = db_login(conn, parts[1], parts[2])
new_user = parts[1] if resp.startswith("OK") else current_user
return resp, new_user
if cmd == "Q" and len(parts) >= 2:
if not current_user:
return "ERR:请先登录", current_user
return db_query(conn, current_user, parts[1]), current_user
if cmd == "H":
if not current_user:
return "ERR:请先登录", current_user
return db_history(conn, current_user), current_user
if cmd == "E":
return "BYE", current_user
return "ERR:未知指令", current_user
# ─── 子进程:处理单个客户端 ────────────────────────────────────────────────────
def handle_client(client_sock: socket.socket, addr: tuple, db_conn: sqlite3.Connection) -> None:
"""每个子进程运行此函数,独立处理一个客户端连接。"""
current_user: Optional[str] = None
print(f" [子进程 {multiprocessing.current_process().pid}] 连接来自 {addr}")
try:
while True:
data = client_sock.recv(BUF)
if not data:
break
raw = data.decode("utf-8", errors="ignore")
response, current_user = handle_command(db_conn, raw, current_user)
client_sock.sendall(response.encode("utf-8"))
if response == "BYE":
break
finally:
client_sock.close()
print(f" [子进程 {multiprocessing.current_process().pid}] 连接断开 {addr}")
# ─── 服务端主进程 ──────────────────────────────────────────────────────────────
def run_server() -> None:
db_conn = build_db()
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_sock.bind((HOST, PORT))
server_sock.listen(5)
print(f"🚀 词典服务器启动:{HOST}:{PORT} (Ctrl+C 停止)")
print(f" 词库:{len(WORD_SEED)} 个词条 | 数据库:{DB_PATH}")
try:
while True:
client_sock, addr = server_sock.accept()
# 每个连接 fork 一个子进程
proc = multiprocessing.Process(
target=handle_client,
args=(client_sock, addr, db_conn),
daemon=True,
)
proc.start()
client_sock.close() # 主进程关闭副本,子进程持有原始 fd
except KeyboardInterrupt:
print("\n服务器已停止。")
finally:
server_sock.close()
# ─── 客户端(脚本化模式)──────────────────────────────────────────────────────
def run_client(commands: list[str]) -> None:
"""按给定指令列表顺序发送请求,避免教程依赖人工交互输入。"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((HOST, PORT))
except ConnectionRefusedError:
print(f"ERR: 无法连接 {HOST}:{PORT},请先启动服务器。")
sys.exit(1)
print(f"✅ 已连接 {HOST}:{PORT}")
print("指令:R:<用户名>:<密码> L:<用户名>:<密码> Q:<单词> H E")
try:
for cmd in commands:
cmd = cmd.strip()
if not cmd:
continue
print(f">>> {cmd}")
sock.sendall(cmd.encode("utf-8"))
resp = sock.recv(BUF).decode("utf-8")
print(f" {resp}")
if resp == "BYE":
break
except KeyboardInterrupt:
print("客户端已中断。")
finally:
sock.close()
# ─── 自动演示(demo 模式)─────────────────────────────────────────────────────
def run_demo() -> None:
"""在同一进程内启动服务器线程 + 模拟客户端,展示完整交互流程。"""
db_conn = build_db()
# 服务器线程
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
server_sock.bind((HOST, PORT))
except OSError as exc:
print(f"[降级] 当前环境不允许绑定本地端口:{exc}")
print("[降级] 改用离线协议模拟,仍然覆盖注册、登录、查词、历史四个闭环。")
run_offline_demo()
return
server_sock.listen(5)
server_sock.settimeout(3.0)
def _server_loop() -> None:
while True:
try:
client_sock, addr = server_sock.accept()
except socket.timeout:
break
t = threading.Thread(
target=handle_client, args=(client_sock, addr, db_conn), daemon=True
)
t.start()
srv_thread = threading.Thread(target=_server_loop, daemon=True)
srv_thread.start()
time.sleep(0.1)
# 模拟客户端会话
def session(steps: list[str]) -> None:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((HOST, PORT))
for cmd in steps:
sock.sendall(cmd.encode())
resp = sock.recv(BUF).decode()
print(f" 发送: {cmd:<30} 响应: {resp}")
sock.close()
print("=" * 60)
print(" 网络词典 — 自动演示")
print("=" * 60)
print("\n[场景1] 注册 + 登录 + 查词 + 历史")
session([
"R:alice:pass123",
"L:alice:pass123",
"Q:python",
"Q:socket",
"Q:recursion",
"H",
"E",
])
print("\n[场景2] 重复注册 + 密码错误")
session([
"R:alice:pass123",
"L:alice:wrongpwd",
"E",
])
print("\n[场景3] 未登录直接查词")
session([
"Q:python",
"E",
])
print("\n[场景4] 查不存在的词")
session([
"L:alice:pass123",
"Q:blockchain",
"E",
])
srv_thread.join(timeout=4)
server_sock.close()
print("\n✅ 演示完成")
def run_offline_demo() -> None:
"""无端口权限时的离线演示:直接调用协议解析层,保证 python main.py 不报错。"""
db_conn = build_db()
current_user: Optional[str] = None
steps = [
"R:alice:pass123",
"L:alice:pass123",
"Q:python",
"Q:socket",
"Q:recursion",
"H",
"E",
]
print("=" * 60)
print(" 网络词典 — 离线协议演示")
print("=" * 60)
for cmd in steps:
resp, current_user = handle_command(db_conn, cmd, current_user)
print(f" 发送: {cmd:<30} 响应: {resp}")
print("\n✅ 离线演示完成")
# ─── 入口 ─────────────────────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(description="网络词典(TCP + 多进程 + SQLite)")
parser.add_argument(
"--mode",
choices=["server", "client", "demo"],
default="demo",
help="server=启动服务器 | client=交互客户端 | demo=自动演示(默认)",
)
parser.add_argument(
"--commands",
default="R:bob:pw,L:bob:pw,Q:python,H,E",
help="client 模式下要顺序发送的逗号分隔指令,避免人工交互输入",
)
args = parser.parse_args()
if args.mode == "server":
run_server()
elif args.mode == "client":
run_client(args.commands.split(","))
else:
run_demo()
if __name__ == "__main__":
main()
$ python 55-python-project-dict.py --mode demo
[降级] 当前环境不允许绑定本地端口:[Errno 1] Operation not permitted
[降级] 改用离线协议模拟,仍然覆盖注册、登录、查词、历史四个闭环。
============================================================
网络词典 — 离线协议演示
============================================================
发送: R:alice:pass123 响应: OK:注册成功
发送: L:alice:pass123 响应: OK:登录成功
发送: Q:python 响应: OK:A high-level, interpreted programming language known for readability.
发送: Q:socket 响应: OK:An endpoint for network communication between two processes.
发送: Q:recursion 响应: OK:A function that calls itself to solve a smaller instance of the same problem.
发送: H 响应: OK:python|socket|recursion
发送: E 响应: BYE
✅ 离线演示完成
小结与 NexDo Time ⚡
这一篇你完成了一个端到端网络词典:SQLite 管数据,文本协议管命令,Socket 管传输,多进程管并发,CLI 管运行模式。更重要的是,你学会了把综合项目拆成可独立验证的小层次,而不是一次性调试一坨系统。
5 分钟微操挑战:给 WORD_SEED 增加一个新词,比如 asyncio,然后运行 --mode demo,确认 Q:asyncio 能返回你写入的释义。
Don’t wait for next time, do it in the next moment.