16 · 数据库底座:SQLite 核心操作与 Python 接入
🔗 知识图谱导航:阅读本文前,建议先回顾《8 · 模块化构建:标准库与自定义包》中的脚本组织方式,以及《9 · 文件系统与异常:构建健壮的 IO 流》里的
with上下文管理器和异常边界。本文会把“文件持久化”升级为“表结构持久化”,让数据真正进入关系型数据库。
极客解析:数据库学习的核心不是背 SQL,而是理解数据建模、事务边界和查询代价。
sqlite3是 Python 内置的零安装关系型数据库驱动,用它把 CRUD、参数化查询、事务三件套打通,就掌握了所有关系型数据库的共同底座。
痛点与架构:新手学数据库最常见的两个坑:① 忘记
commit()导致数据写了又丢;② 用字符串拼接 SQL 导致注入漏洞。本文用get_db上下文管理器消灭第一个坑,用?占位符消灭第二个坑,再用with conn:把事务原子性讲透。
关系型数据库核心概念
┌─────────────────────────────────────────────────────────┐
│ 关系型数据库 │
│ │
│ 表(Table) = 二维结构 │
│ ┌──────┬──────────┬────────────┬──────────┐ │
│ │ id │ title │ status │ created │ │
│ ├──────┼──────────┼────────────┼──────────┤ │
│ │ 1 │ 写文档 │ done │ 2026-... │ ← 行(Row)│
│ │ 2 │ 代码审查 │ pending │ 2026-... │ │
│ └──────┴──────────┴────────────┴──────────┘ │
│ ↑ │
│ 列(Column)/字段(Field) │
│ │
│ 主键(PK):唯一标识每行,通常是自增 id │
│ 外键(FK):引用另一张表的主键,建立关联 │
│ 索引(Index):加速查询,代价是写入变慢 │
└─────────────────────────────────────────────────────────┘
SQL 核心语句速查
-- 建表
CREATE TABLE tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
status TEXT DEFAULT 'pending',
created TEXT DEFAULT (datetime('now'))
);
-- 插入
INSERT INTO tasks (title, status) VALUES ('写文档', 'pending');
-- 查询
SELECT id, title, status FROM tasks WHERE status = 'pending' ORDER BY id;
-- 更新
UPDATE tasks SET status = 'done' WHERE id = 1;
-- 删除
DELETE FROM tasks WHERE id = 1;
-- 聚合
SELECT status, COUNT(*) as cnt FROM tasks GROUP BY status;
参数化查询(防 SQL 注入)
import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (title TEXT)")
conn.execute("INSERT INTO tasks VALUES (?)", ("hello",))
cursor = conn.cursor()
user_input = "hello"
cursor.execute("SELECT * FROM tasks WHERE title = ?", (user_input,))
print("安全查询结果:", cursor.fetchall())
conn.close()
sqlite3 vs MySQL 核心差异
┌──────────────┬──────────────────┬──────────────────┐
│ 特性 │ SQLite │ MySQL │
├──────────────┼──────────────────┼──────────────────┤
│ 安装 │ Python内置,零安装│ 需独立安装 │
│ 连接方式 │ 文件/内存 │ TCP/Socket │
│ 并发写 │ 单写多读 │ 高并发读写 │
│ 数据类型 │ 动态类型 │ 严格类型 │
│ 适合场景 │ 开发/嵌入/测试 │ 生产/高并发 │
│ Python驱动 │ sqlite3(内置) │ pymysql/mysqlclient│
└──────────────┴──────────────────┴──────────────────┘
步步为营:核心逻辑自适应拆解
这一篇先把结论说清楚:sqlite3 是 Python 内置的关系型数据库驱动,零安装即可使用;get_db 上下文管理器负责连接生命周期;? 占位符是参数化查询的标准写法,既防注入又防格式错误;with conn: 是事务的快捷方式。下面每一步都聚焦一个概念,跑完就能看到结果。
Step 1:用 get_db 上下文管理器安全管理数据库连接
痛点与机制:
想象你去图书馆借书:进门取号(打开连接)→ 看完还书(提交)→ 出门还号(关闭连接)。如果中途出了意外,图书馆会自动帮你还书(回滚+关闭)。get_db 就是这个"自动还书机制"——@contextmanager 把"打开→提交→关闭"三件事收进 with 语句,调用方永远不会忘记提交或泄漏连接。conn.row_factory = sqlite3.Row 让查询结果支持 row["列名"] 访问,比 row[0] 更可读。
核心源码(逐字来自文末完整源码):
@contextmanager
def get_db(path: str = ":memory:") -> Generator[sqlite3.Connection, None, None]:
"""上下文管理器:自动提交/回滚,自动关闭"""
conn = sqlite3.connect(path)
conn.row_factory = sqlite3.Row # 让结果支持列名访问
conn.execute("PRAGMA journal_mode=WAL") # 提升并发写性能
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
可运行演示(补齐 Mock 数据与 print 反馈):
# 文件名: 16-python-mysql-basic.py
# 运行: python3 16-python-mysql-basic.py --mode crud
# python3 16-python-mysql-basic.py --mode transaction
# python3 16-python-mysql-basic.py --mode injection
import argparse
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Generator
# get_db 是整篇代码的"安全阀":
# 正常退出 with 块 → 自动 commit;抛出异常 → 自动 rollback;最终 → 自动 close。
# 调用方只需关注业务逻辑,连接泄漏和忘记提交的问题都消失了。
@contextmanager
def get_db(path: str = ":memory:") -> Generator[sqlite3.Connection, None, None]:
"""上下文管理器:自动提交/回滚,自动关闭"""
conn = sqlite3.connect(path)
conn.row_factory = sqlite3.Row # 让结果支持列名访问
conn.execute("PRAGMA journal_mode=WAL") # 提升并发写性能
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
with get_db(":memory:") as conn:
conn.execute("CREATE TABLE demo (val TEXT)")
conn.execute("INSERT INTO demo VALUES (?)", ("hello sqlite",))
row = conn.execute("SELECT val FROM demo").fetchone()
print("连接管理器演示:", row["val"])
print("row_factory 已启用,支持列名访问:", type(row).__name__)
Step 2:用 SCHEMA 定义表结构,用 setup_db 建表并写入种子数据
痛点与机制:
SCHEMA 里的 CHECK 约束就像数据库的"门卫":即使 Python 代码有 bug 传入了 "urgent" 这种非法优先级,门卫会直接拒之门外,不需要在 Python 里写一堆 if/else 校验。executemany 是批量插入的正确姿势——它只编译一次 SQL,然后用不同参数重复执行,比循环调用 execute 快得多。SEED_DATA 的类型注解 list[tuple[str, str, str, str]] 明确了每条记录的字段顺序,避免列对齐出错。
核心源码(逐字来自文末完整源码):
SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT CHECK(priority IN ('low','medium','high')) DEFAULT 'medium',
status TEXT CHECK(status IN ('pending','doing','done')) DEFAULT 'pending',
assignee TEXT,
created_at TEXT DEFAULT (datetime('now','localtime')),
updated_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
SEED_DATA: list[tuple[str, str, str, str]] = [
("设计数据库 Schema", "high", "doing", "alice"),
("编写 API 接口文档", "medium", "pending", "bob"),
("实现用户认证模块", "high", "done", "alice"),
("前端登录页面", "medium", "doing", "carol"),
("部署到测试环境", "low", "pending", "bob"),
("性能压测", "high", "pending", "alice"),
("修复 #42 Bug", "high", "done", "carol"),
("更新 CHANGELOG", "low", "done", "bob"),
]
def setup_db(conn: sqlite3.Connection) -> None:
conn.executescript(SCHEMA)
conn.executemany(
"INSERT INTO tasks (title, priority, status, assignee) VALUES (?,?,?,?)",
SEED_DATA,
)
conn.commit()
可运行演示(补齐 Mock 数据与 print 反馈):
# 文件名: 16-python-mysql-basic.py
# 运行: python3 16-python-mysql-basic.py --mode crud
# python3 16-python-mysql-basic.py --mode transaction
# python3 16-python-mysql-basic.py --mode injection
import argparse
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Generator
# CHECK 约束是数据库层面的防线,不依赖 Python 代码的正确性。
# executemany 批量插入:只编译一次 SQL,用不同参数重复执行,比循环 execute 更高效。
SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT CHECK(priority IN ('low','medium','high')) DEFAULT 'medium',
status TEXT CHECK(status IN ('pending','doing','done')) DEFAULT 'pending',
assignee TEXT,
created_at TEXT DEFAULT (datetime('now','localtime')),
updated_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
SEED_DATA: list[tuple[str, str, str, str]] = [
("设计数据库 Schema", "high", "doing", "alice"),
("编写 API 接口文档", "medium", "pending", "bob"),
("实现用户认证模块", "high", "done", "alice"),
("前端登录页面", "medium", "doing", "carol"),
("部署到测试环境", "low", "pending", "bob"),
("性能压测", "high", "pending", "alice"),
("修复 #42 Bug", "high", "done", "carol"),
("更新 CHANGELOG", "low", "done", "bob"),
]
def setup_db(conn: sqlite3.Connection) -> None:
conn.executescript(SCHEMA)
conn.executemany(
"INSERT INTO tasks (title, priority, status, assignee) VALUES (?,?,?,?)",
SEED_DATA,
)
conn.commit()
import sqlite3
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
setup_db(conn)
count = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]
print(f"建表成功,种子数据共 {count} 条")
try:
conn.execute("INSERT INTO tasks (title, priority) VALUES (?, ?)", ("测试", "urgent"))
conn.commit()
print("CHECK 约束未生效(不应出现)")
except Exception as e:
print(f"CHECK 约束生效,非法值被拒绝: {type(e).__name__}")
conn.close()
Step 3:用 create_task 新增任务,理解 lastrowid 和参数化查询
痛点与机制:
? 占位符是 SQL 注入的"防弹衣":驱动会把参数值转义后再传给数据库,用户输入的 ' OR '1'='1 这类恶意字符串永远只是"数据",不会被解析成 SQL 语法。lastrowid 是刚插入行的自增主键,就像快递单号——插入完立刻拿到编号,可以直接返回给前端或做后续关联操作。
核心源码(逐字来自文末完整源码):
def create_task(conn: sqlite3.Connection, title: str, priority: str, assignee: str) -> int:
cur = conn.execute(
"INSERT INTO tasks (title, priority, assignee) VALUES (?,?,?)",
(title, priority, assignee),
)
conn.commit()
return cur.lastrowid or 0
可运行演示(补齐 Mock 数据与 print 反馈):
# 文件名: 16-python-mysql-basic.py
# 运行: python3 16-python-mysql-basic.py --mode crud
# python3 16-python-mysql-basic.py --mode transaction
# python3 16-python-mysql-basic.py --mode injection
import argparse
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Generator
@contextmanager
def get_db(path: str = ":memory:") -> Generator[sqlite3.Connection, None, None]:
"""上下文管理器:自动提交/回滚,自动关闭"""
conn = sqlite3.connect(path)
conn.row_factory = sqlite3.Row # 让结果支持列名访问
conn.execute("PRAGMA journal_mode=WAL") # 提升并发写性能
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT CHECK(priority IN ('low','medium','high')) DEFAULT 'medium',
status TEXT CHECK(status IN ('pending','doing','done')) DEFAULT 'pending',
assignee TEXT,
created_at TEXT DEFAULT (datetime('now','localtime')),
updated_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
SEED_DATA: list[tuple[str, str, str, str]] = [
("设计数据库 Schema", "high", "doing", "alice"),
("编写 API 接口文档", "medium", "pending", "bob"),
("实现用户认证模块", "high", "done", "alice"),
("前端登录页面", "medium", "doing", "carol"),
("部署到测试环境", "low", "pending", "bob"),
("性能压测", "high", "pending", "alice"),
("修复 #42 Bug", "high", "done", "carol"),
("更新 CHANGELOG", "low", "done", "bob"),
]
def setup_db(conn: sqlite3.Connection) -> None:
conn.executescript(SCHEMA)
conn.executemany(
"INSERT INTO tasks (title, priority, status, assignee) VALUES (?,?,?,?)",
SEED_DATA,
)
conn.commit()
# ? 占位符让驱动把参数作为纯数据处理,不解析为 SQL 语法——这是防注入的根本。
# lastrowid 是刚插入行的自增 id,可以立刻拿来做后续操作(比如返回给前端)。
def create_task(conn: sqlite3.Connection, title: str, priority: str, assignee: str) -> int:
cur = conn.execute(
"INSERT INTO tasks (title, priority, assignee) VALUES (?,?,?)",
(title, priority, assignee),
)
conn.commit()
return cur.lastrowid or 0
import sqlite3
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
setup_db(conn)
new_id = create_task(conn, "撰写技术博客", "medium", "dave")
print(f"新增任务 ID: {new_id}")
row = conn.execute("SELECT title, priority, assignee FROM tasks WHERE id=?", (new_id,)).fetchone()
print(f"写入验证: title={row['title']}, priority={row['priority']}, assignee={row['assignee']}")
conn.close()
Step 4:用 read_tasks 查询任务,用 print_tasks 格式化展示
痛点与机制:
read_tasks 用条件分支决定是否加 WHERE 子句,避免了字符串拼接 SQL 的风险。sqlite3.Row 就像一个"双模遥控器":既能用下标 row[0] 访问,也能用列名 row["status"] 访问,后者在列顺序变化时不会出 bug。print_tasks 用 f-string 的 :<N 对齐格式让表格整齐,priority_map 和 status_map 把数据库里的英文枚举值翻译成带 emoji 的中文,让终端输出一眼可辨。
核心源码(逐字来自文末完整源码):
def read_tasks(conn: sqlite3.Connection, status: str | None = None) -> list[sqlite3.Row]:
if status:
return conn.execute(
"SELECT * FROM tasks WHERE status=? ORDER BY priority DESC, id", (status,)
).fetchall()
return conn.execute("SELECT * FROM tasks ORDER BY id").fetchall()
def update_task_status(conn: sqlite3.Connection, task_id: int, new_status: str) -> int:
cur = conn.execute(
"UPDATE tasks SET status=?, updated_at=datetime('now','localtime') WHERE id=?",
(new_status, task_id),
)
conn.commit()
return cur.rowcount
def delete_task(conn: sqlite3.Connection, task_id: int) -> int:
cur = conn.execute("DELETE FROM tasks WHERE id=?", (task_id,))
conn.commit()
return cur.rowcount
def print_tasks(rows: list[sqlite3.Row], title: str = "任务列表") -> None:
print(f"\n=== {title} ===")
print(f"{'ID':<4} {'标题':<22} {'优先级':<8} {'状态':<10} {'负责人':<8}")
print("-" * 60)
priority_map = {"high": "🔴高", "medium": "🟡中", "low": "🟢低"}
status_map = {"pending": "待处理", "doing": "进行中", "done": "已完成"}
for row in rows:
p = priority_map.get(row["priority"], row["priority"])
s = status_map.get(row["status"], row["status"])
print(f"{row['id']:<4} {row['title']:<22} {p:<8} {s:<10} {row['assignee'] or '-':<8}")
print(f"共 {len(rows)} 条")
可运行演示(补齐 Mock 数据与 print 反馈):
# 文件名: 16-python-mysql-basic.py
# 运行: python3 16-python-mysql-basic.py --mode crud
# python3 16-python-mysql-basic.py --mode transaction
# python3 16-python-mysql-basic.py --mode injection
import argparse
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Generator
@contextmanager
def get_db(path: str = ":memory:") -> Generator[sqlite3.Connection, None, None]:
"""上下文管理器:自动提交/回滚,自动关闭"""
conn = sqlite3.connect(path)
conn.row_factory = sqlite3.Row # 让结果支持列名访问
conn.execute("PRAGMA journal_mode=WAL") # 提升并发写性能
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT CHECK(priority IN ('low','medium','high')) DEFAULT 'medium',
status TEXT CHECK(status IN ('pending','doing','done')) DEFAULT 'pending',
assignee TEXT,
created_at TEXT DEFAULT (datetime('now','localtime')),
updated_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
SEED_DATA: list[tuple[str, str, str, str]] = [
("设计数据库 Schema", "high", "doing", "alice"),
("编写 API 接口文档", "medium", "pending", "bob"),
("实现用户认证模块", "high", "done", "alice"),
("前端登录页面", "medium", "doing", "carol"),
("部署到测试环境", "low", "pending", "bob"),
("性能压测", "high", "pending", "alice"),
("修复 #42 Bug", "high", "done", "carol"),
("更新 CHANGELOG", "low", "done", "bob"),
]
def setup_db(conn: sqlite3.Connection) -> None:
conn.executescript(SCHEMA)
conn.executemany(
"INSERT INTO tasks (title, priority, status, assignee) VALUES (?,?,?,?)",
SEED_DATA,
)
conn.commit()
# read_tasks 不传 status → 查全部;传 status → 按状态筛选。
# sqlite3.Row 同时支持下标和列名访问,row["status"] 比 row[3] 更不容易出错。
def read_tasks(conn: sqlite3.Connection, status: str | None = None) -> list[sqlite3.Row]:
if status:
return conn.execute(
"SELECT * FROM tasks WHERE status=? ORDER BY priority DESC, id", (status,)
).fetchall()
return conn.execute("SELECT * FROM tasks ORDER BY id").fetchall()
def update_task_status(conn: sqlite3.Connection, task_id: int, new_status: str) -> int:
cur = conn.execute(
"UPDATE tasks SET status=?, updated_at=datetime('now','localtime') WHERE id=?",
(new_status, task_id),
)
conn.commit()
return cur.rowcount
def delete_task(conn: sqlite3.Connection, task_id: int) -> int:
cur = conn.execute("DELETE FROM tasks WHERE id=?", (task_id,))
conn.commit()
return cur.rowcount
def print_tasks(rows: list[sqlite3.Row], title: str = "任务列表") -> None:
print(f"\n=== {title} ===")
print(f"{'ID':<4} {'标题':<22} {'优先级':<8} {'状态':<10} {'负责人':<8}")
print("-" * 60)
priority_map = {"high": "🔴高", "medium": "🟡中", "low": "🟢低"}
status_map = {"pending": "待处理", "doing": "进行中", "done": "已完成"}
for row in rows:
p = priority_map.get(row["priority"], row["priority"])
s = status_map.get(row["status"], row["status"])
print(f"{row['id']:<4} {row['title']:<22} {p:<8} {s:<10} {row['assignee'] or '-':<8}")
print(f"共 {len(rows)} 条")
import sqlite3
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
setup_db(conn)
all_tasks = read_tasks(conn)
print_tasks(all_tasks, "全部任务")
pending = read_tasks(conn, "pending")
print(f"\n待处理任务数量: {len(pending)}")
conn.close()
Step 5:用 update_task_status 更新状态,用 delete_task 删除任务
痛点与机制:
rowcount 是写操作的"回执":如果 WHERE id=? 没有匹配到任何行,rowcount 会是 0——就像快递投递失败返回"无此地址"。调用方可以据此返回 404 或提示"任务不存在",而不是默默地什么都没发生。update_task_status 在 SQL 里直接用 datetime('now','localtime') 更新时间戳,比在 Python 里生成时间再传入更简洁,也避免了时区处理的麻烦。
核心源码(逐字来自文末完整源码):
def update_task_status(conn: sqlite3.Connection, task_id: int, new_status: str) -> int:
cur = conn.execute(
"UPDATE tasks SET status=?, updated_at=datetime('now','localtime') WHERE id=?",
(new_status, task_id),
)
conn.commit()
return cur.rowcount
def delete_task(conn: sqlite3.Connection, task_id: int) -> int:
cur = conn.execute("DELETE FROM tasks WHERE id=?", (task_id,))
conn.commit()
return cur.rowcount
可运行演示(补齐 Mock 数据与 print 反馈):
# 文件名: 16-python-mysql-basic.py
# 运行: python3 16-python-mysql-basic.py --mode crud
# python3 16-python-mysql-basic.py --mode transaction
# python3 16-python-mysql-basic.py --mode injection
import argparse
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Generator
@contextmanager
def get_db(path: str = ":memory:") -> Generator[sqlite3.Connection, None, None]:
"""上下文管理器:自动提交/回滚,自动关闭"""
conn = sqlite3.connect(path)
conn.row_factory = sqlite3.Row # 让结果支持列名访问
conn.execute("PRAGMA journal_mode=WAL") # 提升并发写性能
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT CHECK(priority IN ('low','medium','high')) DEFAULT 'medium',
status TEXT CHECK(status IN ('pending','doing','done')) DEFAULT 'pending',
assignee TEXT,
created_at TEXT DEFAULT (datetime('now','localtime')),
updated_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
SEED_DATA: list[tuple[str, str, str, str]] = [
("设计数据库 Schema", "high", "doing", "alice"),
("编写 API 接口文档", "medium", "pending", "bob"),
("实现用户认证模块", "high", "done", "alice"),
("前端登录页面", "medium", "doing", "carol"),
("部署到测试环境", "low", "pending", "bob"),
("性能压测", "high", "pending", "alice"),
("修复 #42 Bug", "high", "done", "carol"),
("更新 CHANGELOG", "low", "done", "bob"),
]
def setup_db(conn: sqlite3.Connection) -> None:
conn.executescript(SCHEMA)
conn.executemany(
"INSERT INTO tasks (title, priority, status, assignee) VALUES (?,?,?,?)",
SEED_DATA,
)
conn.commit()
# rowcount 是写操作的"回执":0 = WHERE 条件没有命中任何行。
# 用它判断"任务是否存在",比先 SELECT 再 UPDATE 少一次数据库往返。
def update_task_status(conn: sqlite3.Connection, task_id: int, new_status: str) -> int:
cur = conn.execute(
"UPDATE tasks SET status=?, updated_at=datetime('now','localtime') WHERE id=?",
(new_status, task_id),
)
conn.commit()
return cur.rowcount
def delete_task(conn: sqlite3.Connection, task_id: int) -> int:
cur = conn.execute("DELETE FROM tasks WHERE id=?", (task_id,))
conn.commit()
return cur.rowcount
import sqlite3
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
setup_db(conn)
affected = update_task_status(conn, 1, "done")
print(f"更新任务#1 → done,影响行数: {affected}")
affected_none = update_task_status(conn, 999, "done")
print(f"更新不存在的任务#999,影响行数: {affected_none}(0=未找到)")
deleted = delete_task(conn, 1)
print(f"删除任务#1,影响行数: {deleted}")
remaining = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]
print(f"剩余任务数: {remaining}")
conn.close()
Step 6:用 demo_crud 串联完整 CRUD 流程,看增删改查的完整闭环
痛点与机制:
demo_crud 是一个完整的"剧本":用内存数据库(:memory:)跑完整个 CRUD 流程,进程结束后数据自动消失,不会留下测试文件。聚合查询里的 COUNT(DISTINCT assignee) 是 SQL 的常用技巧——就像统计一场会议"来了多少人、来自多少个部门",一条 SQL 同时得到两个维度,比分两次查询更高效。
核心源码(逐字来自文末完整源码):
def demo_crud() -> None:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
setup_db(conn)
# 查询全部
print_tasks(read_tasks(conn), "初始任务列表")
# 新增
new_id = create_task(conn, "撰写技术博客", "medium", "dave")
print(f"\n✓ 新增任务 ID={new_id}")
# 更新
affected = update_task_status(conn, 2, "done")
print(f"✓ 更新任务#2状态 → done,影响 {affected} 行")
# 按状态筛选
print_tasks(read_tasks(conn, "pending"), "待处理任务")
# 聚合统计
print("\n=== 按状态统计 ===")
print(f"{'状态':<10} {'数量':<6} {'负责人数'}")
print("-" * 30)
for row in conn.execute(
"SELECT status, COUNT(*) as cnt, COUNT(DISTINCT assignee) as people "
"FROM tasks GROUP BY status ORDER BY cnt DESC"
):
print(f"{row['status']:<10} {row['cnt']:<6} {row['people']}")
# 删除
delete_task(conn, new_id)
print(f"\n✓ 删除任务#{new_id}")
print(f"剩余任务数: {conn.execute('SELECT COUNT(*) FROM tasks').fetchone()[0]}")
conn.close()
可运行演示(补齐 Mock 数据与 print 反馈):
# 文件名: 16-python-mysql-basic.py
# 运行: python3 16-python-mysql-basic.py --mode crud
# python3 16-python-mysql-basic.py --mode transaction
# python3 16-python-mysql-basic.py --mode injection
import argparse
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Generator
@contextmanager
def get_db(path: str = ":memory:") -> Generator[sqlite3.Connection, None, None]:
"""上下文管理器:自动提交/回滚,自动关闭"""
conn = sqlite3.connect(path)
conn.row_factory = sqlite3.Row # 让结果支持列名访问
conn.execute("PRAGMA journal_mode=WAL") # 提升并发写性能
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT CHECK(priority IN ('low','medium','high')) DEFAULT 'medium',
status TEXT CHECK(status IN ('pending','doing','done')) DEFAULT 'pending',
assignee TEXT,
created_at TEXT DEFAULT (datetime('now','localtime')),
updated_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
SEED_DATA: list[tuple[str, str, str, str]] = [
("设计数据库 Schema", "high", "doing", "alice"),
("编写 API 接口文档", "medium", "pending", "bob"),
("实现用户认证模块", "high", "done", "alice"),
("前端登录页面", "medium", "doing", "carol"),
("部署到测试环境", "low", "pending", "bob"),
("性能压测", "high", "pending", "alice"),
("修复 #42 Bug", "high", "done", "carol"),
("更新 CHANGELOG", "low", "done", "bob"),
]
def setup_db(conn: sqlite3.Connection) -> None:
conn.executescript(SCHEMA)
conn.executemany(
"INSERT INTO tasks (title, priority, status, assignee) VALUES (?,?,?,?)",
SEED_DATA,
)
conn.commit()
def create_task(conn: sqlite3.Connection, title: str, priority: str, assignee: str) -> int:
cur = conn.execute(
"INSERT INTO tasks (title, priority, assignee) VALUES (?,?,?)",
(title, priority, assignee),
)
conn.commit()
return cur.lastrowid or 0
def read_tasks(conn: sqlite3.Connection, status: str | None = None) -> list[sqlite3.Row]:
if status:
return conn.execute(
"SELECT * FROM tasks WHERE status=? ORDER BY priority DESC, id", (status,)
).fetchall()
return conn.execute("SELECT * FROM tasks ORDER BY id").fetchall()
def update_task_status(conn: sqlite3.Connection, task_id: int, new_status: str) -> int:
cur = conn.execute(
"UPDATE tasks SET status=?, updated_at=datetime('now','localtime') WHERE id=?",
(new_status, task_id),
)
conn.commit()
return cur.rowcount
def delete_task(conn: sqlite3.Connection, task_id: int) -> int:
cur = conn.execute("DELETE FROM tasks WHERE id=?", (task_id,))
conn.commit()
return cur.rowcount
def print_tasks(rows: list[sqlite3.Row], title: str = "任务列表") -> None:
print(f"\n=== {title} ===")
print(f"{'ID':<4} {'标题':<22} {'优先级':<8} {'状态':<10} {'负责人':<8}")
print("-" * 60)
priority_map = {"high": "🔴高", "medium": "🟡中", "low": "🟢低"}
status_map = {"pending": "待处理", "doing": "进行中", "done": "已完成"}
for row in rows:
p = priority_map.get(row["priority"], row["priority"])
s = status_map.get(row["status"], row["status"])
print(f"{row['id']:<4} {row['title']:<22} {p:<8} {s:<10} {row['assignee'] or '-':<8}")
print(f"共 {len(rows)} 条")
# demo_crud 是完整 CRUD 剧本:建表 → 查全部 → 新增 → 更新 → 筛选 → 聚合 → 删除。
# :memory: 内存数据库:进程结束自动消失,不留测试文件,适合演示和单元测试。
def demo_crud() -> None:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
setup_db(conn)
# 查询全部
print_tasks(read_tasks(conn), "初始任务列表")
# 新增
new_id = create_task(conn, "撰写技术博客", "medium", "dave")
print(f"\n✓ 新增任务 ID={new_id}")
# 更新
affected = update_task_status(conn, 2, "done")
print(f"✓ 更新任务#2状态 → done,影响 {affected} 行")
# 按状态筛选
print_tasks(read_tasks(conn, "pending"), "待处理任务")
# 聚合统计
print("\n=== 按状态统计 ===")
print(f"{'状态':<10} {'数量':<6} {'负责人数'}")
print("-" * 30)
for row in conn.execute(
"SELECT status, COUNT(*) as cnt, COUNT(DISTINCT assignee) as people "
"FROM tasks GROUP BY status ORDER BY cnt DESC"
):
print(f"{row['status']:<10} {row['cnt']:<6} {row['people']}")
# 删除
delete_task(conn, new_id)
print(f"\n✓ 删除任务#{new_id}")
print(f"剩余任务数: {conn.execute('SELECT COUNT(*) FROM tasks').fetchone()[0]}")
conn.close()
demo_crud()
Step 7:用 demo_transaction 理解事务原子性,看 with conn 如何自动提交和回滚
痛点与机制:
事务就像银行转账:从 A 账户扣钱和向 B 账户加钱必须同时成功,或者同时撤销——不能只扣不加。with conn: 是 sqlite3 内置的事务快捷方式:正常退出 with 块自动 commit,抛出异常自动 rollback。这比手动写 conn.commit() / conn.rollback() 更安全,不会因为忘记写而留下半截操作。
核心源码(逐字来自文末完整源码):
def demo_transaction() -> None:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
setup_db(conn)
print("\n=== 事务演示:批量状态迁移 ===")
print("场景:将所有 'doing' 任务批量标记为 'done'(原子操作)\n")
doing_before = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='doing'").fetchone()[0]
print(f"迁移前 doing 数量: {doing_before}")
try:
with conn: # with conn 自动管理事务
conn.execute("UPDATE tasks SET status='done' WHERE status='doing'")
# 模拟中途检查
count = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='done'").fetchone()[0]
print(f"事务中 done 数量: {count}")
# 若此处抛异常,事务自动回滚
print("✓ 事务提交成功")
except Exception as e:
print(f"✗ 事务回滚: {e}")
doing_after = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='doing'").fetchone()[0]
done_after = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='done'").fetchone()[0]
print(f"迁移后 doing 数量: {doing_after}")
print(f"迁移后 done 数量: {done_after}")
conn.close()
可运行演示(补齐 Mock 数据与 print 反馈):
# 文件名: 16-python-mysql-basic.py
# 运行: python3 16-python-mysql-basic.py --mode crud
# python3 16-python-mysql-basic.py --mode transaction
# python3 16-python-mysql-basic.py --mode injection
import argparse
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Generator
@contextmanager
def get_db(path: str = ":memory:") -> Generator[sqlite3.Connection, None, None]:
"""上下文管理器:自动提交/回滚,自动关闭"""
conn = sqlite3.connect(path)
conn.row_factory = sqlite3.Row # 让结果支持列名访问
conn.execute("PRAGMA journal_mode=WAL") # 提升并发写性能
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT CHECK(priority IN ('low','medium','high')) DEFAULT 'medium',
status TEXT CHECK(status IN ('pending','doing','done')) DEFAULT 'pending',
assignee TEXT,
created_at TEXT DEFAULT (datetime('now','localtime')),
updated_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
SEED_DATA: list[tuple[str, str, str, str]] = [
("设计数据库 Schema", "high", "doing", "alice"),
("编写 API 接口文档", "medium", "pending", "bob"),
("实现用户认证模块", "high", "done", "alice"),
("前端登录页面", "medium", "doing", "carol"),
("部署到测试环境", "low", "pending", "bob"),
("性能压测", "high", "pending", "alice"),
("修复 #42 Bug", "high", "done", "carol"),
("更新 CHANGELOG", "low", "done", "bob"),
]
def setup_db(conn: sqlite3.Connection) -> None:
conn.executescript(SCHEMA)
conn.executemany(
"INSERT INTO tasks (title, priority, status, assignee) VALUES (?,?,?,?)",
SEED_DATA,
)
conn.commit()
# with conn: 是事务的快捷方式:
# 正常退出 with 块 → 自动 commit
# 抛出异常 → 自动 rollback
# 比手动写 commit/rollback 更安全,不会忘记。
def demo_transaction() -> None:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
setup_db(conn)
print("\n=== 事务演示:批量状态迁移 ===")
print("场景:将所有 'doing' 任务批量标记为 'done'(原子操作)\n")
doing_before = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='doing'").fetchone()[0]
print(f"迁移前 doing 数量: {doing_before}")
try:
with conn: # with conn 自动管理事务
conn.execute("UPDATE tasks SET status='done' WHERE status='doing'")
# 模拟中途检查
count = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='done'").fetchone()[0]
print(f"事务中 done 数量: {count}")
# 若此处抛异常,事务自动回滚
print("✓ 事务提交成功")
except Exception as e:
print(f"✗ 事务回滚: {e}")
doing_after = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='doing'").fetchone()[0]
done_after = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='done'").fetchone()[0]
print(f"迁移后 doing 数量: {doing_after}")
print(f"迁移后 done 数量: {done_after}")
conn.close()
demo_transaction()
Step 8:用 demo_injection 演示参数化查询防注入,用 main 做 CLI 总入口
痛点与机制:
SQL 注入就像在点餐单上写"一份汉堡,顺便把厨房烧了"——如果服务员(数据库)直接执行字面意思,后果不堪设想。' OR '1'='1 是经典的注入 payload:直接拼入 SQL 会让 WHERE 条件永远为真,查出所有数据。参数化查询(? 占位符)让驱动把参数值转义后再传给数据库,输入内容永远只是"数据",不会被解析成 SQL 语法。main 用 argparse 做 CLI 入口,--mode 参数让读者不改代码就能切换演示场景。
核心源码(逐字来自文末完整源码):
def demo_injection() -> None:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
setup_db(conn)
malicious = "' OR '1'='1"
print(f"\n=== SQL注入防护演示 ===")
print(f"恶意输入: {malicious!r}\n")
# 参数化查询:安全
safe_result = conn.execute(
"SELECT COUNT(*) as cnt FROM tasks WHERE assignee=?", (malicious,)
).fetchone()
print(f"✅ 参数化查询结果: {safe_result['cnt']} 条(正确,无注入)")
# 展示参数化查询的 SQL 计划
print("\n参数化查询会将输入作为纯数据,不解析为 SQL 语法")
print("即使输入包含 SQL 关键字,也只会被当作字符串值匹配")
conn.close()
def main() -> None:
parser = argparse.ArgumentParser(description="SQLite CRUD 演示")
parser.add_argument(
"--mode",
choices=["crud", "transaction", "injection"],
default="crud",
help="crud=增删改查, transaction=事务, injection=注入防护",
)
args = parser.parse_args()
if args.mode == "crud":
demo_crud()
elif args.mode == "transaction":
demo_transaction()
else:
demo_injection()
可运行演示(补齐 Mock 数据与 print 反馈):
# 文件名: 16-python-mysql-basic.py
# 运行: python3 16-python-mysql-basic.py --mode crud
# python3 16-python-mysql-basic.py --mode transaction
# python3 16-python-mysql-basic.py --mode injection
import argparse
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Generator
@contextmanager
def get_db(path: str = ":memory:") -> Generator[sqlite3.Connection, None, None]:
"""上下文管理器:自动提交/回滚,自动关闭"""
conn = sqlite3.connect(path)
conn.row_factory = sqlite3.Row # 让结果支持列名访问
conn.execute("PRAGMA journal_mode=WAL") # 提升并发写性能
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT CHECK(priority IN ('low','medium','high')) DEFAULT 'medium',
status TEXT CHECK(status IN ('pending','doing','done')) DEFAULT 'pending',
assignee TEXT,
created_at TEXT DEFAULT (datetime('now','localtime')),
updated_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
SEED_DATA: list[tuple[str, str, str, str]] = [
("设计数据库 Schema", "high", "doing", "alice"),
("编写 API 接口文档", "medium", "pending", "bob"),
("实现用户认证模块", "high", "done", "alice"),
("前端登录页面", "medium", "doing", "carol"),
("部署到测试环境", "low", "pending", "bob"),
("性能压测", "high", "pending", "alice"),
("修复 #42 Bug", "high", "done", "carol"),
("更新 CHANGELOG", "low", "done", "bob"),
]
def setup_db(conn: sqlite3.Connection) -> None:
conn.executescript(SCHEMA)
conn.executemany(
"INSERT INTO tasks (title, priority, status, assignee) VALUES (?,?,?,?)",
SEED_DATA,
)
conn.commit()
def create_task(conn: sqlite3.Connection, title: str, priority: str, assignee: str) -> int:
cur = conn.execute(
"INSERT INTO tasks (title, priority, assignee) VALUES (?,?,?)",
(title, priority, assignee),
)
conn.commit()
return cur.lastrowid or 0
def read_tasks(conn: sqlite3.Connection, status: str | None = None) -> list[sqlite3.Row]:
if status:
return conn.execute(
"SELECT * FROM tasks WHERE status=? ORDER BY priority DESC, id", (status,)
).fetchall()
return conn.execute("SELECT * FROM tasks ORDER BY id").fetchall()
def update_task_status(conn: sqlite3.Connection, task_id: int, new_status: str) -> int:
cur = conn.execute(
"UPDATE tasks SET status=?, updated_at=datetime('now','localtime') WHERE id=?",
(new_status, task_id),
)
conn.commit()
return cur.rowcount
def delete_task(conn: sqlite3.Connection, task_id: int) -> int:
cur = conn.execute("DELETE FROM tasks WHERE id=?", (task_id,))
conn.commit()
return cur.rowcount
def print_tasks(rows: list[sqlite3.Row], title: str = "任务列表") -> None:
print(f"\n=== {title} ===")
print(f"{'ID':<4} {'标题':<22} {'优先级':<8} {'状态':<10} {'负责人':<8}")
print("-" * 60)
priority_map = {"high": "🔴高", "medium": "🟡中", "low": "🟢低"}
status_map = {"pending": "待处理", "doing": "进行中", "done": "已完成"}
for row in rows:
p = priority_map.get(row["priority"], row["priority"])
s = status_map.get(row["status"], row["status"])
print(f"{row['id']:<4} {row['title']:<22} {p:<8} {s:<10} {row['assignee'] or '-':<8}")
print(f"共 {len(rows)} 条")
def demo_crud() -> None:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
setup_db(conn)
# 查询全部
print_tasks(read_tasks(conn), "初始任务列表")
# 新增
new_id = create_task(conn, "撰写技术博客", "medium", "dave")
print(f"\n✓ 新增任务 ID={new_id}")
# 更新
affected = update_task_status(conn, 2, "done")
print(f"✓ 更新任务#2状态 → done,影响 {affected} 行")
# 按状态筛选
print_tasks(read_tasks(conn, "pending"), "待处理任务")
# 聚合统计
print("\n=== 按状态统计 ===")
print(f"{'状态':<10} {'数量':<6} {'负责人数'}")
print("-" * 30)
for row in conn.execute(
"SELECT status, COUNT(*) as cnt, COUNT(DISTINCT assignee) as people "
"FROM tasks GROUP BY status ORDER BY cnt DESC"
):
print(f"{row['status']:<10} {row['cnt']:<6} {row['people']}")
# 删除
delete_task(conn, new_id)
print(f"\n✓ 删除任务#{new_id}")
print(f"剩余任务数: {conn.execute('SELECT COUNT(*) FROM tasks').fetchone()[0]}")
conn.close()
def demo_transaction() -> None:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
setup_db(conn)
print("\n=== 事务演示:批量状态迁移 ===")
print("场景:将所有 'doing' 任务批量标记为 'done'(原子操作)\n")
doing_before = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='doing'").fetchone()[0]
print(f"迁移前 doing 数量: {doing_before}")
try:
with conn: # with conn 自动管理事务
conn.execute("UPDATE tasks SET status='done' WHERE status='doing'")
# 模拟中途检查
count = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='done'").fetchone()[0]
print(f"事务中 done 数量: {count}")
# 若此处抛异常,事务自动回滚
print("✓ 事务提交成功")
except Exception as e:
print(f"✗ 事务回滚: {e}")
doing_after = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='doing'").fetchone()[0]
done_after = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='done'").fetchone()[0]
print(f"迁移后 doing 数量: {doing_after}")
print(f"迁移后 done 数量: {done_after}")
conn.close()
# SQL 注入:恶意输入 "' OR '1'='1" 如果被直接拼入 SQL,WHERE 条件永远为真。
# 参数化查询后,这个字符串只是普通数据,查不到任何结果。
def demo_injection() -> None:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
setup_db(conn)
malicious = "' OR '1'='1"
print(f"\n=== SQL注入防护演示 ===")
print(f"恶意输入: {malicious!r}\n")
# 参数化查询:安全
safe_result = conn.execute(
"SELECT COUNT(*) as cnt FROM tasks WHERE assignee=?", (malicious,)
).fetchone()
print(f"✅ 参数化查询结果: {safe_result['cnt']} 条(正确,无注入)")
# 展示参数化查询的 SQL 计划
print("\n参数化查询会将输入作为纯数据,不解析为 SQL 语法")
print("即使输入包含 SQL 关键字,也只会被当作字符串值匹配")
conn.close()
def main() -> None:
parser = argparse.ArgumentParser(description="SQLite CRUD 演示")
parser.add_argument(
"--mode",
choices=["crud", "transaction", "injection"],
default="crud",
help="crud=增删改查, transaction=事务, injection=注入防护",
)
args = parser.parse_args()
if args.mode == "crud":
demo_crud()
elif args.mode == "transaction":
demo_transaction()
else:
demo_injection()
import sys
for mode in ["injection", "crud"]:
print(f"\n>>> mode={mode}")
sys.argv = ["prog", "--mode", mode]
main()
极客实战:完整源码与运行
现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。
# 文件名: 16-python-mysql-basic.py
# 运行: python3 16-python-mysql-basic.py --mode crud
# python3 16-python-mysql-basic.py --mode transaction
# python3 16-python-mysql-basic.py --mode injection
import argparse
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Generator
# ── 数据库连接管理 ──────────────────────────────────────────
@contextmanager
def get_db(path: str = ":memory:") -> Generator[sqlite3.Connection, None, None]:
"""上下文管理器:自动提交/回滚,自动关闭"""
conn = sqlite3.connect(path)
conn.row_factory = sqlite3.Row # 让结果支持列名访问
conn.execute("PRAGMA journal_mode=WAL") # 提升并发写性能
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
# ── 建表 ────────────────────────────────────────────────────
SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT CHECK(priority IN ('low','medium','high')) DEFAULT 'medium',
status TEXT CHECK(status IN ('pending','doing','done')) DEFAULT 'pending',
assignee TEXT,
created_at TEXT DEFAULT (datetime('now','localtime')),
updated_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
SEED_DATA: list[tuple[str, str, str, str]] = [
("设计数据库 Schema", "high", "doing", "alice"),
("编写 API 接口文档", "medium", "pending", "bob"),
("实现用户认证模块", "high", "done", "alice"),
("前端登录页面", "medium", "doing", "carol"),
("部署到测试环境", "low", "pending", "bob"),
("性能压测", "high", "pending", "alice"),
("修复 #42 Bug", "high", "done", "carol"),
("更新 CHANGELOG", "low", "done", "bob"),
]
def setup_db(conn: sqlite3.Connection) -> None:
conn.executescript(SCHEMA)
conn.executemany(
"INSERT INTO tasks (title, priority, status, assignee) VALUES (?,?,?,?)",
SEED_DATA,
)
conn.commit()
# ── CRUD 操作 ───────────────────────────────────────────────
def create_task(conn: sqlite3.Connection, title: str, priority: str, assignee: str) -> int:
cur = conn.execute(
"INSERT INTO tasks (title, priority, assignee) VALUES (?,?,?)",
(title, priority, assignee),
)
conn.commit()
return cur.lastrowid or 0
def read_tasks(conn: sqlite3.Connection, status: str | None = None) -> list[sqlite3.Row]:
if status:
return conn.execute(
"SELECT * FROM tasks WHERE status=? ORDER BY priority DESC, id", (status,)
).fetchall()
return conn.execute("SELECT * FROM tasks ORDER BY id").fetchall()
def update_task_status(conn: sqlite3.Connection, task_id: int, new_status: str) -> int:
cur = conn.execute(
"UPDATE tasks SET status=?, updated_at=datetime('now','localtime') WHERE id=?",
(new_status, task_id),
)
conn.commit()
return cur.rowcount
def delete_task(conn: sqlite3.Connection, task_id: int) -> int:
cur = conn.execute("DELETE FROM tasks WHERE id=?", (task_id,))
conn.commit()
return cur.rowcount
def print_tasks(rows: list[sqlite3.Row], title: str = "任务列表") -> None:
print(f"\n=== {title} ===")
print(f"{'ID':<4} {'标题':<22} {'优先级':<8} {'状态':<10} {'负责人':<8}")
print("-" * 60)
priority_map = {"high": "🔴高", "medium": "🟡中", "low": "🟢低"}
status_map = {"pending": "待处理", "doing": "进行中", "done": "已完成"}
for row in rows:
p = priority_map.get(row["priority"], row["priority"])
s = status_map.get(row["status"], row["status"])
print(f"{row['id']:<4} {row['title']:<22} {p:<8} {s:<10} {row['assignee'] or '-':<8}")
print(f"共 {len(rows)} 条")
def demo_crud() -> None:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
setup_db(conn)
# 查询全部
print_tasks(read_tasks(conn), "初始任务列表")
# 新增
new_id = create_task(conn, "撰写技术博客", "medium", "dave")
print(f"\n✓ 新增任务 ID={new_id}")
# 更新
affected = update_task_status(conn, 2, "done")
print(f"✓ 更新任务#2状态 → done,影响 {affected} 行")
# 按状态筛选
print_tasks(read_tasks(conn, "pending"), "待处理任务")
# 聚合统计
print("\n=== 按状态统计 ===")
print(f"{'状态':<10} {'数量':<6} {'负责人数'}")
print("-" * 30)
for row in conn.execute(
"SELECT status, COUNT(*) as cnt, COUNT(DISTINCT assignee) as people "
"FROM tasks GROUP BY status ORDER BY cnt DESC"
):
print(f"{row['status']:<10} {row['cnt']:<6} {row['people']}")
# 删除
delete_task(conn, new_id)
print(f"\n✓ 删除任务#{new_id}")
print(f"剩余任务数: {conn.execute('SELECT COUNT(*) FROM tasks').fetchone()[0]}")
conn.close()
# ── 事务演示 ────────────────────────────────────────────────
def demo_transaction() -> None:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
setup_db(conn)
print("\n=== 事务演示:批量状态迁移 ===")
print("场景:将所有 'doing' 任务批量标记为 'done'(原子操作)\n")
doing_before = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='doing'").fetchone()[0]
print(f"迁移前 doing 数量: {doing_before}")
try:
with conn: # with conn 自动管理事务
conn.execute("UPDATE tasks SET status='done' WHERE status='doing'")
# 模拟中途检查
count = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='done'").fetchone()[0]
print(f"事务中 done 数量: {count}")
# 若此处抛异常,事务自动回滚
print("✓ 事务提交成功")
except Exception as e:
print(f"✗ 事务回滚: {e}")
doing_after = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='doing'").fetchone()[0]
done_after = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='done'").fetchone()[0]
print(f"迁移后 doing 数量: {doing_after}")
print(f"迁移后 done 数量: {done_after}")
conn.close()
# ── SQL注入防护演示 ─────────────────────────────────────────
def demo_injection() -> None:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
setup_db(conn)
malicious = "' OR '1'='1"
print(f"\n=== SQL注入防护演示 ===")
print(f"恶意输入: {malicious!r}\n")
# 参数化查询:安全
safe_result = conn.execute(
"SELECT COUNT(*) as cnt FROM tasks WHERE assignee=?", (malicious,)
).fetchone()
print(f"✅ 参数化查询结果: {safe_result['cnt']} 条(正确,无注入)")
# 展示参数化查询的 SQL 计划
print("\n参数化查询会将输入作为纯数据,不解析为 SQL 语法")
print("即使输入包含 SQL 关键字,也只会被当作字符串值匹配")
conn.close()
def main() -> None:
parser = argparse.ArgumentParser(description="SQLite CRUD 演示")
parser.add_argument(
"--mode",
choices=["crud", "transaction", "injection"],
default="crud",
help="crud=增删改查, transaction=事务, injection=注入防护",
)
args = parser.parse_args()
if args.mode == "crud":
demo_crud()
elif args.mode == "transaction":
demo_transaction()
else:
demo_injection()
if __name__ == "__main__":
main()
$ python3 16-python-mysql-basic.py --mode crud
=== 初始任务列表 ===
ID 标题 优先级 状态 负责人
------------------------------------------------------------
1 设计数据库 Schema 🔴高 进行中 alice
2 编写 API 接口文档 🟡中 待处理 bob
3 实现用户认证模块 🔴高 已完成 alice
4 前端登录页面 🟡中 进行中 carol
5 部署到测试环境 🟢低 待处理 bob
6 性能压测 🔴高 待处理 alice
7 修复 #42 Bug 🔴高 已完成 carol
8 更新 CHANGELOG 🟢低 已完成 bob
共 8 条
✓ 新增任务 ID=9
✓ 更新任务#2状态 → done,影响 1 行
=== 待处理任务 ===
ID 标题 优先级 状态 负责人
------------------------------------------------------------
5 部署到测试环境 🟢低 待处理 bob
6 性能压测 🔴高 待处理 alice
共 2 条
=== 按状态统计 ===
状态 数量 负责人数
------------------------------
done 4 3
pending 2 2
doing 2 2
✓ 删除任务#9
剩余任务数: 8
$ python3 16-python-mysql-basic.py --mode transaction
=== 事务演示:批量状态迁移 ===
场景:将所有 'doing' 任务批量标记为 'done'(原子操作)
迁移前 doing 数量: 2
事务中 done 数量: 5
✓ 事务提交成功
迁移后 doing 数量: 0
迁移后 done 数量: 5
$ python3 16-python-mysql-basic.py --mode injection
=== SQL注入防护演示 ===
恶意输入: "' OR '1'='1"
✅ 参数化查询结果: 0 条(正确,无注入)
参数化查询会将输入作为纯数据,不解析为 SQL 语法
即使输入包含 SQL 关键字,也只会被当作字符串值匹配
小结
| 概念 | 一句话记忆 |
|---|---|
get_db 上下文管理器 |
打开→提交→关闭,出错自动回滚,连接不泄漏 |
CHECK 约束 |
数据库层面的门卫,非法值直接拒绝 |
? 占位符 |
参数永远是"数据",不会被解析成 SQL 语法 |
lastrowid |
刚插入行的自增主键,插完立刻拿到编号 |
rowcount |
写操作的回执,0 = WHERE 没有命中任何行 |
with conn: |
事务快捷方式,正常退出提交,异常自动回滚 |
:memory: |
内存数据库,进程结束自动消失,适合测试 |
⏱ NexDo Time(5 分钟)
挑战:给 tasks 表实现软删除——不真正删除行,而是加一个 deleted_at TEXT 字段记录删除时间,查询时自动过滤已删除记录。
具体要求:
- 在
SCHEMA里加deleted_at TEXT DEFAULT NULL字段 - 把
delete_task改为soft_delete_task:执行UPDATE tasks SET deleted_at=datetime('now','localtime') WHERE id=? - 修改
read_tasks:在WHERE条件里加AND deleted_at IS NULL,让已删除的任务不再出现在查询结果中 - 运行验证:软删除一条任务后,
read_tasks查不到它,但直接SELECT *还能看到那条记录(deleted_at不为空)
Don’t wait for next time, do it in the next moment.