文章

16 · 数据库底座:SQLite 核心操作与 Python 接入

#015 · 2026-04-16 · Python

🔗 知识图谱导航:阅读本文前,建议先回顾《8 · 模块化构建:标准库与自定义包》中的脚本组织方式,以及《9 · 文件系统与异常:构建健壮的 IO 流》里的 with 上下文管理器和异常边界。本文会把“文件持久化”升级为“表结构持久化”,让数据真正进入关系型数据库。

极客解析:数据库学习的核心不是背 SQL,而是理解数据建模、事务边界和查询代价。sqlite3 是 Python 内置的零安装关系型数据库驱动,用它把 CRUD、参数化查询、事务三件套打通,就掌握了所有关系型数据库的共同底座。

痛点与架构:新手学数据库最常见的两个坑:① 忘记 commit() 导致数据写了又丢;② 用字符串拼接 SQL 导致注入漏洞。本文用 get_db 上下文管理器消灭第一个坑,用 ? 占位符消灭第二个坑,再用 with conn: 把事务原子性讲透。

关系型数据库核心概念

┌─────────────────────────────────────────────────────────┐
│                    关系型数据库                          │
│                                                         │
│  表(Table) = 二维结构                                   │
│  ┌──────┬──────────┬────────────┬──────────┐           │
│  │ id   │ title    │ status     │ created  │           │
│  ├──────┼──────────┼────────────┼──────────┤           │
│  │  1   │ 写文档   │ done       │ 2026-... │  ← 行(Row)│
│  │  2   │ 代码审查 │ pending    │ 2026-... │           │
│  └──────┴──────────┴────────────┴──────────┘           │
│       ↑                                                 │
│    列(Column)/字段(Field)                               │
│                                                         │
│  主键(PK):唯一标识每行,通常是自增 id                  │
│  外键(FK):引用另一张表的主键,建立关联                  │
│  索引(Index):加速查询,代价是写入变慢                   │
└─────────────────────────────────────────────────────────┘

SQL 核心语句速查

-- 建表
CREATE TABLE tasks (
    id      INTEGER PRIMARY KEY AUTOINCREMENT,
    title   TEXT    NOT NULL,
    status  TEXT    DEFAULT 'pending',
    created TEXT    DEFAULT (datetime('now'))
);

-- 插入
INSERT INTO tasks (title, status) VALUES ('写文档', 'pending');

-- 查询
SELECT id, title, status FROM tasks WHERE status = 'pending' ORDER BY id;

-- 更新
UPDATE tasks SET status = 'done' WHERE id = 1;

-- 删除
DELETE FROM tasks WHERE id = 1;

-- 聚合
SELECT status, COUNT(*) as cnt FROM tasks GROUP BY status;

参数化查询(防 SQL 注入)

import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (title TEXT)")
conn.execute("INSERT INTO tasks VALUES (?)", ("hello",))
cursor = conn.cursor()
user_input = "hello"


cursor.execute("SELECT * FROM tasks WHERE title = ?", (user_input,))
print("安全查询结果:", cursor.fetchall())
conn.close()

sqlite3 vs MySQL 核心差异

┌──────────────┬──────────────────┬──────────────────┐
│ 特性         │ SQLite           │ MySQL            │
├──────────────┼──────────────────┼──────────────────┤
│ 安装         │ Python内置,零安装│ 需独立安装       │
│ 连接方式     │ 文件/内存        │ TCP/Socket       │
│ 并发写       │ 单写多读         │ 高并发读写       │
│ 数据类型     │ 动态类型         │ 严格类型         │
│ 适合场景     │ 开发/嵌入/测试   │ 生产/高并发      │
│ Python驱动   │ sqlite3(内置)    │ pymysql/mysqlclient│
└──────────────┴──────────────────┴──────────────────┘

步步为营:核心逻辑自适应拆解

这一篇先把结论说清楚:sqlite3 是 Python 内置的关系型数据库驱动,零安装即可使用;get_db 上下文管理器负责连接生命周期;? 占位符是参数化查询的标准写法,既防注入又防格式错误;with conn: 是事务的快捷方式。下面每一步都聚焦一个概念,跑完就能看到结果。

Step 1:用 get_db 上下文管理器安全管理数据库连接

痛点与机制

想象你去图书馆借书:进门取号(打开连接)→ 看完还书(提交)→ 出门还号(关闭连接)。如果中途出了意外,图书馆会自动帮你还书(回滚+关闭)。get_db 就是这个"自动还书机制"——@contextmanager 把"打开→提交→关闭"三件事收进 with 语句,调用方永远不会忘记提交或泄漏连接。conn.row_factory = sqlite3.Row 让查询结果支持 row["列名"] 访问,比 row[0] 更可读。

核心源码(逐字来自文末完整源码)

@contextmanager
def get_db(path: str = ":memory:") -> Generator[sqlite3.Connection, None, None]:
    """上下文管理器:自动提交/回滚,自动关闭"""
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row  # 让结果支持列名访问
    conn.execute("PRAGMA journal_mode=WAL")  # 提升并发写性能
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

可运行演示(补齐 Mock 数据与 print 反馈)

# 文件名: 16-python-mysql-basic.py
# 运行: python3 16-python-mysql-basic.py --mode crud
#       python3 16-python-mysql-basic.py --mode transaction
#       python3 16-python-mysql-basic.py --mode injection

import argparse
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Generator

# get_db 是整篇代码的"安全阀":
# 正常退出 with 块 → 自动 commit;抛出异常 → 自动 rollback;最终 → 自动 close。
# 调用方只需关注业务逻辑,连接泄漏和忘记提交的问题都消失了。
@contextmanager
def get_db(path: str = ":memory:") -> Generator[sqlite3.Connection, None, None]:
    """上下文管理器:自动提交/回滚,自动关闭"""
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row  # 让结果支持列名访问
    conn.execute("PRAGMA journal_mode=WAL")  # 提升并发写性能
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

with get_db(":memory:") as conn:
    conn.execute("CREATE TABLE demo (val TEXT)")
    conn.execute("INSERT INTO demo VALUES (?)", ("hello sqlite",))
    row = conn.execute("SELECT val FROM demo").fetchone()
    print("连接管理器演示:", row["val"])
    print("row_factory 已启用,支持列名访问:", type(row).__name__)

Step 2:用 SCHEMA 定义表结构,用 setup_db 建表并写入种子数据

痛点与机制

SCHEMA 里的 CHECK 约束就像数据库的"门卫":即使 Python 代码有 bug 传入了 "urgent" 这种非法优先级,门卫会直接拒之门外,不需要在 Python 里写一堆 if/else 校验。executemany 是批量插入的正确姿势——它只编译一次 SQL,然后用不同参数重复执行,比循环调用 execute 快得多。SEED_DATA 的类型注解 list[tuple[str, str, str, str]] 明确了每条记录的字段顺序,避免列对齐出错。

核心源码(逐字来自文末完整源码)

SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    CHECK(priority IN ('low','medium','high')) DEFAULT 'medium',
    status      TEXT    CHECK(status IN ('pending','doing','done')) DEFAULT 'pending',
    assignee    TEXT,
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    updated_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

SEED_DATA: list[tuple[str, str, str, str]] = [
    ("设计数据库 Schema",    "high",   "doing",   "alice"),
    ("编写 API 接口文档",    "medium", "pending", "bob"),
    ("实现用户认证模块",     "high",   "done",    "alice"),
    ("前端登录页面",         "medium", "doing",   "carol"),
    ("部署到测试环境",       "low",    "pending", "bob"),
    ("性能压测",             "high",   "pending", "alice"),
    ("修复 #42 Bug",         "high",   "done",    "carol"),
    ("更新 CHANGELOG",       "low",    "done",    "bob"),
]


def setup_db(conn: sqlite3.Connection) -> None:
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT INTO tasks (title, priority, status, assignee) VALUES (?,?,?,?)",
        SEED_DATA,
    )
    conn.commit()

可运行演示(补齐 Mock 数据与 print 反馈)

# 文件名: 16-python-mysql-basic.py
# 运行: python3 16-python-mysql-basic.py --mode crud
#       python3 16-python-mysql-basic.py --mode transaction
#       python3 16-python-mysql-basic.py --mode injection

import argparse
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Generator

# CHECK 约束是数据库层面的防线,不依赖 Python 代码的正确性。
# executemany 批量插入:只编译一次 SQL,用不同参数重复执行,比循环 execute 更高效。
SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    CHECK(priority IN ('low','medium','high')) DEFAULT 'medium',
    status      TEXT    CHECK(status IN ('pending','doing','done')) DEFAULT 'pending',
    assignee    TEXT,
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    updated_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

SEED_DATA: list[tuple[str, str, str, str]] = [
    ("设计数据库 Schema",    "high",   "doing",   "alice"),
    ("编写 API 接口文档",    "medium", "pending", "bob"),
    ("实现用户认证模块",     "high",   "done",    "alice"),
    ("前端登录页面",         "medium", "doing",   "carol"),
    ("部署到测试环境",       "low",    "pending", "bob"),
    ("性能压测",             "high",   "pending", "alice"),
    ("修复 #42 Bug",         "high",   "done",    "carol"),
    ("更新 CHANGELOG",       "low",    "done",    "bob"),
]


def setup_db(conn: sqlite3.Connection) -> None:
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT INTO tasks (title, priority, status, assignee) VALUES (?,?,?,?)",
        SEED_DATA,
    )
    conn.commit()

import sqlite3
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
setup_db(conn)
count = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]
print(f"建表成功,种子数据共 {count} 条")
try:
    conn.execute("INSERT INTO tasks (title, priority) VALUES (?, ?)", ("测试", "urgent"))
    conn.commit()
    print("CHECK 约束未生效(不应出现)")
except Exception as e:
    print(f"CHECK 约束生效,非法值被拒绝: {type(e).__name__}")
conn.close()

Step 3:用 create_task 新增任务,理解 lastrowid 和参数化查询

痛点与机制

? 占位符是 SQL 注入的"防弹衣":驱动会把参数值转义后再传给数据库,用户输入的 ' OR '1'='1 这类恶意字符串永远只是"数据",不会被解析成 SQL 语法。lastrowid 是刚插入行的自增主键,就像快递单号——插入完立刻拿到编号,可以直接返回给前端或做后续关联操作。

核心源码(逐字来自文末完整源码)

def create_task(conn: sqlite3.Connection, title: str, priority: str, assignee: str) -> int:
    cur = conn.execute(
        "INSERT INTO tasks (title, priority, assignee) VALUES (?,?,?)",
        (title, priority, assignee),
    )
    conn.commit()
    return cur.lastrowid or 0

可运行演示(补齐 Mock 数据与 print 反馈)

# 文件名: 16-python-mysql-basic.py
# 运行: python3 16-python-mysql-basic.py --mode crud
#       python3 16-python-mysql-basic.py --mode transaction
#       python3 16-python-mysql-basic.py --mode injection

import argparse
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Generator

@contextmanager
def get_db(path: str = ":memory:") -> Generator[sqlite3.Connection, None, None]:
    """上下文管理器:自动提交/回滚,自动关闭"""
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row  # 让结果支持列名访问
    conn.execute("PRAGMA journal_mode=WAL")  # 提升并发写性能
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    CHECK(priority IN ('low','medium','high')) DEFAULT 'medium',
    status      TEXT    CHECK(status IN ('pending','doing','done')) DEFAULT 'pending',
    assignee    TEXT,
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    updated_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

SEED_DATA: list[tuple[str, str, str, str]] = [
    ("设计数据库 Schema",    "high",   "doing",   "alice"),
    ("编写 API 接口文档",    "medium", "pending", "bob"),
    ("实现用户认证模块",     "high",   "done",    "alice"),
    ("前端登录页面",         "medium", "doing",   "carol"),
    ("部署到测试环境",       "low",    "pending", "bob"),
    ("性能压测",             "high",   "pending", "alice"),
    ("修复 #42 Bug",         "high",   "done",    "carol"),
    ("更新 CHANGELOG",       "low",    "done",    "bob"),
]


def setup_db(conn: sqlite3.Connection) -> None:
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT INTO tasks (title, priority, status, assignee) VALUES (?,?,?,?)",
        SEED_DATA,
    )
    conn.commit()

# ? 占位符让驱动把参数作为纯数据处理,不解析为 SQL 语法——这是防注入的根本。
# lastrowid 是刚插入行的自增 id,可以立刻拿来做后续操作(比如返回给前端)。
def create_task(conn: sqlite3.Connection, title: str, priority: str, assignee: str) -> int:
    cur = conn.execute(
        "INSERT INTO tasks (title, priority, assignee) VALUES (?,?,?)",
        (title, priority, assignee),
    )
    conn.commit()
    return cur.lastrowid or 0

import sqlite3
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
setup_db(conn)
new_id = create_task(conn, "撰写技术博客", "medium", "dave")
print(f"新增任务 ID: {new_id}")
row = conn.execute("SELECT title, priority, assignee FROM tasks WHERE id=?", (new_id,)).fetchone()
print(f"写入验证: title={row['title']}, priority={row['priority']}, assignee={row['assignee']}")
conn.close()

Step 4:用 read_tasks 查询任务,用 print_tasks 格式化展示

痛点与机制

read_tasks 用条件分支决定是否加 WHERE 子句,避免了字符串拼接 SQL 的风险。sqlite3.Row 就像一个"双模遥控器":既能用下标 row[0] 访问,也能用列名 row["status"] 访问,后者在列顺序变化时不会出 bug。print_tasks 用 f-string 的 :<N 对齐格式让表格整齐,priority_mapstatus_map 把数据库里的英文枚举值翻译成带 emoji 的中文,让终端输出一眼可辨。

核心源码(逐字来自文末完整源码)

def read_tasks(conn: sqlite3.Connection, status: str | None = None) -> list[sqlite3.Row]:
    if status:
        return conn.execute(
            "SELECT * FROM tasks WHERE status=? ORDER BY priority DESC, id", (status,)
        ).fetchall()
    return conn.execute("SELECT * FROM tasks ORDER BY id").fetchall()


def update_task_status(conn: sqlite3.Connection, task_id: int, new_status: str) -> int:
    cur = conn.execute(
        "UPDATE tasks SET status=?, updated_at=datetime('now','localtime') WHERE id=?",
        (new_status, task_id),
    )
    conn.commit()
    return cur.rowcount


def delete_task(conn: sqlite3.Connection, task_id: int) -> int:
    cur = conn.execute("DELETE FROM tasks WHERE id=?", (task_id,))
    conn.commit()
    return cur.rowcount


def print_tasks(rows: list[sqlite3.Row], title: str = "任务列表") -> None:
    print(f"\n=== {title} ===")
    print(f"{'ID':<4} {'标题':<22} {'优先级':<8} {'状态':<10} {'负责人':<8}")
    print("-" * 60)
    priority_map = {"high": "🔴高", "medium": "🟡中", "low": "🟢低"}
    status_map = {"pending": "待处理", "doing": "进行中", "done": "已完成"}
    for row in rows:
        p = priority_map.get(row["priority"], row["priority"])
        s = status_map.get(row["status"], row["status"])
        print(f"{row['id']:<4} {row['title']:<22} {p:<8} {s:<10} {row['assignee'] or '-':<8}")
    print(f"共 {len(rows)} 条")

可运行演示(补齐 Mock 数据与 print 反馈)

# 文件名: 16-python-mysql-basic.py
# 运行: python3 16-python-mysql-basic.py --mode crud
#       python3 16-python-mysql-basic.py --mode transaction
#       python3 16-python-mysql-basic.py --mode injection

import argparse
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Generator

@contextmanager
def get_db(path: str = ":memory:") -> Generator[sqlite3.Connection, None, None]:
    """上下文管理器:自动提交/回滚,自动关闭"""
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row  # 让结果支持列名访问
    conn.execute("PRAGMA journal_mode=WAL")  # 提升并发写性能
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    CHECK(priority IN ('low','medium','high')) DEFAULT 'medium',
    status      TEXT    CHECK(status IN ('pending','doing','done')) DEFAULT 'pending',
    assignee    TEXT,
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    updated_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

SEED_DATA: list[tuple[str, str, str, str]] = [
    ("设计数据库 Schema",    "high",   "doing",   "alice"),
    ("编写 API 接口文档",    "medium", "pending", "bob"),
    ("实现用户认证模块",     "high",   "done",    "alice"),
    ("前端登录页面",         "medium", "doing",   "carol"),
    ("部署到测试环境",       "low",    "pending", "bob"),
    ("性能压测",             "high",   "pending", "alice"),
    ("修复 #42 Bug",         "high",   "done",    "carol"),
    ("更新 CHANGELOG",       "low",    "done",    "bob"),
]


def setup_db(conn: sqlite3.Connection) -> None:
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT INTO tasks (title, priority, status, assignee) VALUES (?,?,?,?)",
        SEED_DATA,
    )
    conn.commit()

# read_tasks 不传 status → 查全部;传 status → 按状态筛选。
# sqlite3.Row 同时支持下标和列名访问,row["status"] 比 row[3] 更不容易出错。
def read_tasks(conn: sqlite3.Connection, status: str | None = None) -> list[sqlite3.Row]:
    if status:
        return conn.execute(
            "SELECT * FROM tasks WHERE status=? ORDER BY priority DESC, id", (status,)
        ).fetchall()
    return conn.execute("SELECT * FROM tasks ORDER BY id").fetchall()


def update_task_status(conn: sqlite3.Connection, task_id: int, new_status: str) -> int:
    cur = conn.execute(
        "UPDATE tasks SET status=?, updated_at=datetime('now','localtime') WHERE id=?",
        (new_status, task_id),
    )
    conn.commit()
    return cur.rowcount


def delete_task(conn: sqlite3.Connection, task_id: int) -> int:
    cur = conn.execute("DELETE FROM tasks WHERE id=?", (task_id,))
    conn.commit()
    return cur.rowcount


def print_tasks(rows: list[sqlite3.Row], title: str = "任务列表") -> None:
    print(f"\n=== {title} ===")
    print(f"{'ID':<4} {'标题':<22} {'优先级':<8} {'状态':<10} {'负责人':<8}")
    print("-" * 60)
    priority_map = {"high": "🔴高", "medium": "🟡中", "low": "🟢低"}
    status_map = {"pending": "待处理", "doing": "进行中", "done": "已完成"}
    for row in rows:
        p = priority_map.get(row["priority"], row["priority"])
        s = status_map.get(row["status"], row["status"])
        print(f"{row['id']:<4} {row['title']:<22} {p:<8} {s:<10} {row['assignee'] or '-':<8}")
    print(f"共 {len(rows)} 条")

import sqlite3
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
setup_db(conn)
all_tasks = read_tasks(conn)
print_tasks(all_tasks, "全部任务")
pending = read_tasks(conn, "pending")
print(f"\n待处理任务数量: {len(pending)}")
conn.close()

Step 5:用 update_task_status 更新状态,用 delete_task 删除任务

痛点与机制

rowcount 是写操作的"回执":如果 WHERE id=? 没有匹配到任何行,rowcount 会是 0——就像快递投递失败返回"无此地址"。调用方可以据此返回 404 或提示"任务不存在",而不是默默地什么都没发生。update_task_status 在 SQL 里直接用 datetime('now','localtime') 更新时间戳,比在 Python 里生成时间再传入更简洁,也避免了时区处理的麻烦。

核心源码(逐字来自文末完整源码)

def update_task_status(conn: sqlite3.Connection, task_id: int, new_status: str) -> int:
    cur = conn.execute(
        "UPDATE tasks SET status=?, updated_at=datetime('now','localtime') WHERE id=?",
        (new_status, task_id),
    )
    conn.commit()
    return cur.rowcount


def delete_task(conn: sqlite3.Connection, task_id: int) -> int:
    cur = conn.execute("DELETE FROM tasks WHERE id=?", (task_id,))
    conn.commit()
    return cur.rowcount

可运行演示(补齐 Mock 数据与 print 反馈)

# 文件名: 16-python-mysql-basic.py
# 运行: python3 16-python-mysql-basic.py --mode crud
#       python3 16-python-mysql-basic.py --mode transaction
#       python3 16-python-mysql-basic.py --mode injection

import argparse
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Generator

@contextmanager
def get_db(path: str = ":memory:") -> Generator[sqlite3.Connection, None, None]:
    """上下文管理器:自动提交/回滚,自动关闭"""
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row  # 让结果支持列名访问
    conn.execute("PRAGMA journal_mode=WAL")  # 提升并发写性能
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    CHECK(priority IN ('low','medium','high')) DEFAULT 'medium',
    status      TEXT    CHECK(status IN ('pending','doing','done')) DEFAULT 'pending',
    assignee    TEXT,
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    updated_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

SEED_DATA: list[tuple[str, str, str, str]] = [
    ("设计数据库 Schema",    "high",   "doing",   "alice"),
    ("编写 API 接口文档",    "medium", "pending", "bob"),
    ("实现用户认证模块",     "high",   "done",    "alice"),
    ("前端登录页面",         "medium", "doing",   "carol"),
    ("部署到测试环境",       "low",    "pending", "bob"),
    ("性能压测",             "high",   "pending", "alice"),
    ("修复 #42 Bug",         "high",   "done",    "carol"),
    ("更新 CHANGELOG",       "low",    "done",    "bob"),
]


def setup_db(conn: sqlite3.Connection) -> None:
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT INTO tasks (title, priority, status, assignee) VALUES (?,?,?,?)",
        SEED_DATA,
    )
    conn.commit()

# rowcount 是写操作的"回执":0 = WHERE 条件没有命中任何行。
# 用它判断"任务是否存在",比先 SELECT 再 UPDATE 少一次数据库往返。
def update_task_status(conn: sqlite3.Connection, task_id: int, new_status: str) -> int:
    cur = conn.execute(
        "UPDATE tasks SET status=?, updated_at=datetime('now','localtime') WHERE id=?",
        (new_status, task_id),
    )
    conn.commit()
    return cur.rowcount


def delete_task(conn: sqlite3.Connection, task_id: int) -> int:
    cur = conn.execute("DELETE FROM tasks WHERE id=?", (task_id,))
    conn.commit()
    return cur.rowcount

import sqlite3
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
setup_db(conn)
affected = update_task_status(conn, 1, "done")
print(f"更新任务#1 → done,影响行数: {affected}")
affected_none = update_task_status(conn, 999, "done")
print(f"更新不存在的任务#999,影响行数: {affected_none}(0=未找到)")
deleted = delete_task(conn, 1)
print(f"删除任务#1,影响行数: {deleted}")
remaining = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]
print(f"剩余任务数: {remaining}")
conn.close()

Step 6:用 demo_crud 串联完整 CRUD 流程,看增删改查的完整闭环

痛点与机制

demo_crud 是一个完整的"剧本":用内存数据库(:memory:)跑完整个 CRUD 流程,进程结束后数据自动消失,不会留下测试文件。聚合查询里的 COUNT(DISTINCT assignee) 是 SQL 的常用技巧——就像统计一场会议"来了多少人、来自多少个部门",一条 SQL 同时得到两个维度,比分两次查询更高效。

核心源码(逐字来自文末完整源码)

def demo_crud() -> None:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    setup_db(conn)

    # 查询全部
    print_tasks(read_tasks(conn), "初始任务列表")

    # 新增
    new_id = create_task(conn, "撰写技术博客", "medium", "dave")
    print(f"\n✓ 新增任务 ID={new_id}")

    # 更新
    affected = update_task_status(conn, 2, "done")
    print(f"✓ 更新任务#2状态 → done,影响 {affected} 行")

    # 按状态筛选
    print_tasks(read_tasks(conn, "pending"), "待处理任务")

    # 聚合统计
    print("\n=== 按状态统计 ===")
    print(f"{'状态':<10} {'数量':<6} {'负责人数'}")
    print("-" * 30)
    for row in conn.execute(
        "SELECT status, COUNT(*) as cnt, COUNT(DISTINCT assignee) as people "
        "FROM tasks GROUP BY status ORDER BY cnt DESC"
    ):
        print(f"{row['status']:<10} {row['cnt']:<6} {row['people']}")

    # 删除
    delete_task(conn, new_id)
    print(f"\n✓ 删除任务#{new_id}")
    print(f"剩余任务数: {conn.execute('SELECT COUNT(*) FROM tasks').fetchone()[0]}")
    conn.close()

可运行演示(补齐 Mock 数据与 print 反馈)

# 文件名: 16-python-mysql-basic.py
# 运行: python3 16-python-mysql-basic.py --mode crud
#       python3 16-python-mysql-basic.py --mode transaction
#       python3 16-python-mysql-basic.py --mode injection

import argparse
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Generator

@contextmanager
def get_db(path: str = ":memory:") -> Generator[sqlite3.Connection, None, None]:
    """上下文管理器:自动提交/回滚,自动关闭"""
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row  # 让结果支持列名访问
    conn.execute("PRAGMA journal_mode=WAL")  # 提升并发写性能
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    CHECK(priority IN ('low','medium','high')) DEFAULT 'medium',
    status      TEXT    CHECK(status IN ('pending','doing','done')) DEFAULT 'pending',
    assignee    TEXT,
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    updated_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

SEED_DATA: list[tuple[str, str, str, str]] = [
    ("设计数据库 Schema",    "high",   "doing",   "alice"),
    ("编写 API 接口文档",    "medium", "pending", "bob"),
    ("实现用户认证模块",     "high",   "done",    "alice"),
    ("前端登录页面",         "medium", "doing",   "carol"),
    ("部署到测试环境",       "low",    "pending", "bob"),
    ("性能压测",             "high",   "pending", "alice"),
    ("修复 #42 Bug",         "high",   "done",    "carol"),
    ("更新 CHANGELOG",       "low",    "done",    "bob"),
]


def setup_db(conn: sqlite3.Connection) -> None:
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT INTO tasks (title, priority, status, assignee) VALUES (?,?,?,?)",
        SEED_DATA,
    )
    conn.commit()

def create_task(conn: sqlite3.Connection, title: str, priority: str, assignee: str) -> int:
    cur = conn.execute(
        "INSERT INTO tasks (title, priority, assignee) VALUES (?,?,?)",
        (title, priority, assignee),
    )
    conn.commit()
    return cur.lastrowid or 0

def read_tasks(conn: sqlite3.Connection, status: str | None = None) -> list[sqlite3.Row]:
    if status:
        return conn.execute(
            "SELECT * FROM tasks WHERE status=? ORDER BY priority DESC, id", (status,)
        ).fetchall()
    return conn.execute("SELECT * FROM tasks ORDER BY id").fetchall()


def update_task_status(conn: sqlite3.Connection, task_id: int, new_status: str) -> int:
    cur = conn.execute(
        "UPDATE tasks SET status=?, updated_at=datetime('now','localtime') WHERE id=?",
        (new_status, task_id),
    )
    conn.commit()
    return cur.rowcount


def delete_task(conn: sqlite3.Connection, task_id: int) -> int:
    cur = conn.execute("DELETE FROM tasks WHERE id=?", (task_id,))
    conn.commit()
    return cur.rowcount


def print_tasks(rows: list[sqlite3.Row], title: str = "任务列表") -> None:
    print(f"\n=== {title} ===")
    print(f"{'ID':<4} {'标题':<22} {'优先级':<8} {'状态':<10} {'负责人':<8}")
    print("-" * 60)
    priority_map = {"high": "🔴高", "medium": "🟡中", "low": "🟢低"}
    status_map = {"pending": "待处理", "doing": "进行中", "done": "已完成"}
    for row in rows:
        p = priority_map.get(row["priority"], row["priority"])
        s = status_map.get(row["status"], row["status"])
        print(f"{row['id']:<4} {row['title']:<22} {p:<8} {s:<10} {row['assignee'] or '-':<8}")
    print(f"共 {len(rows)} 条")


# demo_crud 是完整 CRUD 剧本:建表 → 查全部 → 新增 → 更新 → 筛选 → 聚合 → 删除。
# :memory: 内存数据库:进程结束自动消失,不留测试文件,适合演示和单元测试。
def demo_crud() -> None:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    setup_db(conn)

    # 查询全部
    print_tasks(read_tasks(conn), "初始任务列表")

    # 新增
    new_id = create_task(conn, "撰写技术博客", "medium", "dave")
    print(f"\n✓ 新增任务 ID={new_id}")

    # 更新
    affected = update_task_status(conn, 2, "done")
    print(f"✓ 更新任务#2状态 → done,影响 {affected} 行")

    # 按状态筛选
    print_tasks(read_tasks(conn, "pending"), "待处理任务")

    # 聚合统计
    print("\n=== 按状态统计 ===")
    print(f"{'状态':<10} {'数量':<6} {'负责人数'}")
    print("-" * 30)
    for row in conn.execute(
        "SELECT status, COUNT(*) as cnt, COUNT(DISTINCT assignee) as people "
        "FROM tasks GROUP BY status ORDER BY cnt DESC"
    ):
        print(f"{row['status']:<10} {row['cnt']:<6} {row['people']}")

    # 删除
    delete_task(conn, new_id)
    print(f"\n✓ 删除任务#{new_id}")
    print(f"剩余任务数: {conn.execute('SELECT COUNT(*) FROM tasks').fetchone()[0]}")
    conn.close()

demo_crud()

Step 7:用 demo_transaction 理解事务原子性,看 with conn 如何自动提交和回滚

痛点与机制

事务就像银行转账:从 A 账户扣钱和向 B 账户加钱必须同时成功,或者同时撤销——不能只扣不加。with conn: 是 sqlite3 内置的事务快捷方式:正常退出 with 块自动 commit,抛出异常自动 rollback。这比手动写 conn.commit() / conn.rollback() 更安全,不会因为忘记写而留下半截操作。

核心源码(逐字来自文末完整源码)

def demo_transaction() -> None:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    setup_db(conn)

    print("\n=== 事务演示:批量状态迁移 ===")
    print("场景:将所有 'doing' 任务批量标记为 'done'(原子操作)\n")

    doing_before = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='doing'").fetchone()[0]
    print(f"迁移前 doing 数量: {doing_before}")

    try:
        with conn:  # with conn 自动管理事务
            conn.execute("UPDATE tasks SET status='done' WHERE status='doing'")
            # 模拟中途检查
            count = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='done'").fetchone()[0]
            print(f"事务中 done 数量: {count}")
            # 若此处抛异常,事务自动回滚
        print("✓ 事务提交成功")
    except Exception as e:
        print(f"✗ 事务回滚: {e}")

    doing_after = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='doing'").fetchone()[0]
    done_after = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='done'").fetchone()[0]
    print(f"迁移后 doing 数量: {doing_after}")
    print(f"迁移后 done  数量: {done_after}")
    conn.close()

可运行演示(补齐 Mock 数据与 print 反馈)

# 文件名: 16-python-mysql-basic.py
# 运行: python3 16-python-mysql-basic.py --mode crud
#       python3 16-python-mysql-basic.py --mode transaction
#       python3 16-python-mysql-basic.py --mode injection

import argparse
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Generator

@contextmanager
def get_db(path: str = ":memory:") -> Generator[sqlite3.Connection, None, None]:
    """上下文管理器:自动提交/回滚,自动关闭"""
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row  # 让结果支持列名访问
    conn.execute("PRAGMA journal_mode=WAL")  # 提升并发写性能
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    CHECK(priority IN ('low','medium','high')) DEFAULT 'medium',
    status      TEXT    CHECK(status IN ('pending','doing','done')) DEFAULT 'pending',
    assignee    TEXT,
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    updated_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

SEED_DATA: list[tuple[str, str, str, str]] = [
    ("设计数据库 Schema",    "high",   "doing",   "alice"),
    ("编写 API 接口文档",    "medium", "pending", "bob"),
    ("实现用户认证模块",     "high",   "done",    "alice"),
    ("前端登录页面",         "medium", "doing",   "carol"),
    ("部署到测试环境",       "low",    "pending", "bob"),
    ("性能压测",             "high",   "pending", "alice"),
    ("修复 #42 Bug",         "high",   "done",    "carol"),
    ("更新 CHANGELOG",       "low",    "done",    "bob"),
]


def setup_db(conn: sqlite3.Connection) -> None:
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT INTO tasks (title, priority, status, assignee) VALUES (?,?,?,?)",
        SEED_DATA,
    )
    conn.commit()

# with conn: 是事务的快捷方式:
#   正常退出 with 块 → 自动 commit
#   抛出异常         → 自动 rollback
# 比手动写 commit/rollback 更安全,不会忘记。
def demo_transaction() -> None:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    setup_db(conn)

    print("\n=== 事务演示:批量状态迁移 ===")
    print("场景:将所有 'doing' 任务批量标记为 'done'(原子操作)\n")

    doing_before = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='doing'").fetchone()[0]
    print(f"迁移前 doing 数量: {doing_before}")

    try:
        with conn:  # with conn 自动管理事务
            conn.execute("UPDATE tasks SET status='done' WHERE status='doing'")
            # 模拟中途检查
            count = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='done'").fetchone()[0]
            print(f"事务中 done 数量: {count}")
            # 若此处抛异常,事务自动回滚
        print("✓ 事务提交成功")
    except Exception as e:
        print(f"✗ 事务回滚: {e}")

    doing_after = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='doing'").fetchone()[0]
    done_after = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='done'").fetchone()[0]
    print(f"迁移后 doing 数量: {doing_after}")
    print(f"迁移后 done  数量: {done_after}")
    conn.close()

demo_transaction()

Step 8:用 demo_injection 演示参数化查询防注入,用 main 做 CLI 总入口

痛点与机制

SQL 注入就像在点餐单上写"一份汉堡,顺便把厨房烧了"——如果服务员(数据库)直接执行字面意思,后果不堪设想。' OR '1'='1 是经典的注入 payload:直接拼入 SQL 会让 WHERE 条件永远为真,查出所有数据。参数化查询(? 占位符)让驱动把参数值转义后再传给数据库,输入内容永远只是"数据",不会被解析成 SQL 语法。mainargparse 做 CLI 入口,--mode 参数让读者不改代码就能切换演示场景。

核心源码(逐字来自文末完整源码)

def demo_injection() -> None:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    setup_db(conn)

    malicious = "' OR '1'='1"
    print(f"\n=== SQL注入防护演示 ===")
    print(f"恶意输入: {malicious!r}\n")

    # 参数化查询:安全
    safe_result = conn.execute(
        "SELECT COUNT(*) as cnt FROM tasks WHERE assignee=?", (malicious,)
    ).fetchone()
    print(f"✅ 参数化查询结果: {safe_result['cnt']} 条(正确,无注入)")

    # 展示参数化查询的 SQL 计划
    print("\n参数化查询会将输入作为纯数据,不解析为 SQL 语法")
    print("即使输入包含 SQL 关键字,也只会被当作字符串值匹配")
    conn.close()


def main() -> None:
    parser = argparse.ArgumentParser(description="SQLite CRUD 演示")
    parser.add_argument(
        "--mode",
        choices=["crud", "transaction", "injection"],
        default="crud",
        help="crud=增删改查, transaction=事务, injection=注入防护",
    )
    args = parser.parse_args()

    if args.mode == "crud":
        demo_crud()
    elif args.mode == "transaction":
        demo_transaction()
    else:
        demo_injection()

可运行演示(补齐 Mock 数据与 print 反馈)

# 文件名: 16-python-mysql-basic.py
# 运行: python3 16-python-mysql-basic.py --mode crud
#       python3 16-python-mysql-basic.py --mode transaction
#       python3 16-python-mysql-basic.py --mode injection

import argparse
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Generator

@contextmanager
def get_db(path: str = ":memory:") -> Generator[sqlite3.Connection, None, None]:
    """上下文管理器:自动提交/回滚,自动关闭"""
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row  # 让结果支持列名访问
    conn.execute("PRAGMA journal_mode=WAL")  # 提升并发写性能
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    CHECK(priority IN ('low','medium','high')) DEFAULT 'medium',
    status      TEXT    CHECK(status IN ('pending','doing','done')) DEFAULT 'pending',
    assignee    TEXT,
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    updated_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

SEED_DATA: list[tuple[str, str, str, str]] = [
    ("设计数据库 Schema",    "high",   "doing",   "alice"),
    ("编写 API 接口文档",    "medium", "pending", "bob"),
    ("实现用户认证模块",     "high",   "done",    "alice"),
    ("前端登录页面",         "medium", "doing",   "carol"),
    ("部署到测试环境",       "low",    "pending", "bob"),
    ("性能压测",             "high",   "pending", "alice"),
    ("修复 #42 Bug",         "high",   "done",    "carol"),
    ("更新 CHANGELOG",       "low",    "done",    "bob"),
]


def setup_db(conn: sqlite3.Connection) -> None:
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT INTO tasks (title, priority, status, assignee) VALUES (?,?,?,?)",
        SEED_DATA,
    )
    conn.commit()

def create_task(conn: sqlite3.Connection, title: str, priority: str, assignee: str) -> int:
    cur = conn.execute(
        "INSERT INTO tasks (title, priority, assignee) VALUES (?,?,?)",
        (title, priority, assignee),
    )
    conn.commit()
    return cur.lastrowid or 0

def read_tasks(conn: sqlite3.Connection, status: str | None = None) -> list[sqlite3.Row]:
    if status:
        return conn.execute(
            "SELECT * FROM tasks WHERE status=? ORDER BY priority DESC, id", (status,)
        ).fetchall()
    return conn.execute("SELECT * FROM tasks ORDER BY id").fetchall()


def update_task_status(conn: sqlite3.Connection, task_id: int, new_status: str) -> int:
    cur = conn.execute(
        "UPDATE tasks SET status=?, updated_at=datetime('now','localtime') WHERE id=?",
        (new_status, task_id),
    )
    conn.commit()
    return cur.rowcount


def delete_task(conn: sqlite3.Connection, task_id: int) -> int:
    cur = conn.execute("DELETE FROM tasks WHERE id=?", (task_id,))
    conn.commit()
    return cur.rowcount


def print_tasks(rows: list[sqlite3.Row], title: str = "任务列表") -> None:
    print(f"\n=== {title} ===")
    print(f"{'ID':<4} {'标题':<22} {'优先级':<8} {'状态':<10} {'负责人':<8}")
    print("-" * 60)
    priority_map = {"high": "🔴高", "medium": "🟡中", "low": "🟢低"}
    status_map = {"pending": "待处理", "doing": "进行中", "done": "已完成"}
    for row in rows:
        p = priority_map.get(row["priority"], row["priority"])
        s = status_map.get(row["status"], row["status"])
        print(f"{row['id']:<4} {row['title']:<22} {p:<8} {s:<10} {row['assignee'] or '-':<8}")
    print(f"共 {len(rows)} 条")


def demo_crud() -> None:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    setup_db(conn)

    # 查询全部
    print_tasks(read_tasks(conn), "初始任务列表")

    # 新增
    new_id = create_task(conn, "撰写技术博客", "medium", "dave")
    print(f"\n✓ 新增任务 ID={new_id}")

    # 更新
    affected = update_task_status(conn, 2, "done")
    print(f"✓ 更新任务#2状态 → done,影响 {affected} 行")

    # 按状态筛选
    print_tasks(read_tasks(conn, "pending"), "待处理任务")

    # 聚合统计
    print("\n=== 按状态统计 ===")
    print(f"{'状态':<10} {'数量':<6} {'负责人数'}")
    print("-" * 30)
    for row in conn.execute(
        "SELECT status, COUNT(*) as cnt, COUNT(DISTINCT assignee) as people "
        "FROM tasks GROUP BY status ORDER BY cnt DESC"
    ):
        print(f"{row['status']:<10} {row['cnt']:<6} {row['people']}")

    # 删除
    delete_task(conn, new_id)
    print(f"\n✓ 删除任务#{new_id}")
    print(f"剩余任务数: {conn.execute('SELECT COUNT(*) FROM tasks').fetchone()[0]}")
    conn.close()

def demo_transaction() -> None:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    setup_db(conn)

    print("\n=== 事务演示:批量状态迁移 ===")
    print("场景:将所有 'doing' 任务批量标记为 'done'(原子操作)\n")

    doing_before = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='doing'").fetchone()[0]
    print(f"迁移前 doing 数量: {doing_before}")

    try:
        with conn:  # with conn 自动管理事务
            conn.execute("UPDATE tasks SET status='done' WHERE status='doing'")
            # 模拟中途检查
            count = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='done'").fetchone()[0]
            print(f"事务中 done 数量: {count}")
            # 若此处抛异常,事务自动回滚
        print("✓ 事务提交成功")
    except Exception as e:
        print(f"✗ 事务回滚: {e}")

    doing_after = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='doing'").fetchone()[0]
    done_after = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='done'").fetchone()[0]
    print(f"迁移后 doing 数量: {doing_after}")
    print(f"迁移后 done  数量: {done_after}")
    conn.close()

# SQL 注入:恶意输入 "' OR '1'='1" 如果被直接拼入 SQL,WHERE 条件永远为真。
# 参数化查询后,这个字符串只是普通数据,查不到任何结果。
def demo_injection() -> None:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    setup_db(conn)

    malicious = "' OR '1'='1"
    print(f"\n=== SQL注入防护演示 ===")
    print(f"恶意输入: {malicious!r}\n")

    # 参数化查询:安全
    safe_result = conn.execute(
        "SELECT COUNT(*) as cnt FROM tasks WHERE assignee=?", (malicious,)
    ).fetchone()
    print(f"✅ 参数化查询结果: {safe_result['cnt']} 条(正确,无注入)")

    # 展示参数化查询的 SQL 计划
    print("\n参数化查询会将输入作为纯数据,不解析为 SQL 语法")
    print("即使输入包含 SQL 关键字,也只会被当作字符串值匹配")
    conn.close()


def main() -> None:
    parser = argparse.ArgumentParser(description="SQLite CRUD 演示")
    parser.add_argument(
        "--mode",
        choices=["crud", "transaction", "injection"],
        default="crud",
        help="crud=增删改查, transaction=事务, injection=注入防护",
    )
    args = parser.parse_args()

    if args.mode == "crud":
        demo_crud()
    elif args.mode == "transaction":
        demo_transaction()
    else:
        demo_injection()

import sys
for mode in ["injection", "crud"]:
    print(f"\n>>> mode={mode}")
    sys.argv = ["prog", "--mode", mode]
    main()

极客实战:完整源码与运行

现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。

# 文件名: 16-python-mysql-basic.py
# 运行: python3 16-python-mysql-basic.py --mode crud
#       python3 16-python-mysql-basic.py --mode transaction
#       python3 16-python-mysql-basic.py --mode injection

import argparse
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from typing import Generator


# ── 数据库连接管理 ──────────────────────────────────────────
@contextmanager
def get_db(path: str = ":memory:") -> Generator[sqlite3.Connection, None, None]:
    """上下文管理器:自动提交/回滚,自动关闭"""
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row  # 让结果支持列名访问
    conn.execute("PRAGMA journal_mode=WAL")  # 提升并发写性能
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


# ── 建表 ────────────────────────────────────────────────────
SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    CHECK(priority IN ('low','medium','high')) DEFAULT 'medium',
    status      TEXT    CHECK(status IN ('pending','doing','done')) DEFAULT 'pending',
    assignee    TEXT,
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    updated_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

SEED_DATA: list[tuple[str, str, str, str]] = [
    ("设计数据库 Schema",    "high",   "doing",   "alice"),
    ("编写 API 接口文档",    "medium", "pending", "bob"),
    ("实现用户认证模块",     "high",   "done",    "alice"),
    ("前端登录页面",         "medium", "doing",   "carol"),
    ("部署到测试环境",       "low",    "pending", "bob"),
    ("性能压测",             "high",   "pending", "alice"),
    ("修复 #42 Bug",         "high",   "done",    "carol"),
    ("更新 CHANGELOG",       "low",    "done",    "bob"),
]


def setup_db(conn: sqlite3.Connection) -> None:
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT INTO tasks (title, priority, status, assignee) VALUES (?,?,?,?)",
        SEED_DATA,
    )
    conn.commit()


# ── CRUD 操作 ───────────────────────────────────────────────
def create_task(conn: sqlite3.Connection, title: str, priority: str, assignee: str) -> int:
    cur = conn.execute(
        "INSERT INTO tasks (title, priority, assignee) VALUES (?,?,?)",
        (title, priority, assignee),
    )
    conn.commit()
    return cur.lastrowid or 0


def read_tasks(conn: sqlite3.Connection, status: str | None = None) -> list[sqlite3.Row]:
    if status:
        return conn.execute(
            "SELECT * FROM tasks WHERE status=? ORDER BY priority DESC, id", (status,)
        ).fetchall()
    return conn.execute("SELECT * FROM tasks ORDER BY id").fetchall()


def update_task_status(conn: sqlite3.Connection, task_id: int, new_status: str) -> int:
    cur = conn.execute(
        "UPDATE tasks SET status=?, updated_at=datetime('now','localtime') WHERE id=?",
        (new_status, task_id),
    )
    conn.commit()
    return cur.rowcount


def delete_task(conn: sqlite3.Connection, task_id: int) -> int:
    cur = conn.execute("DELETE FROM tasks WHERE id=?", (task_id,))
    conn.commit()
    return cur.rowcount


def print_tasks(rows: list[sqlite3.Row], title: str = "任务列表") -> None:
    print(f"\n=== {title} ===")
    print(f"{'ID':<4} {'标题':<22} {'优先级':<8} {'状态':<10} {'负责人':<8}")
    print("-" * 60)
    priority_map = {"high": "🔴高", "medium": "🟡中", "low": "🟢低"}
    status_map = {"pending": "待处理", "doing": "进行中", "done": "已完成"}
    for row in rows:
        p = priority_map.get(row["priority"], row["priority"])
        s = status_map.get(row["status"], row["status"])
        print(f"{row['id']:<4} {row['title']:<22} {p:<8} {s:<10} {row['assignee'] or '-':<8}")
    print(f"共 {len(rows)} 条")


def demo_crud() -> None:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    setup_db(conn)

    # 查询全部
    print_tasks(read_tasks(conn), "初始任务列表")

    # 新增
    new_id = create_task(conn, "撰写技术博客", "medium", "dave")
    print(f"\n✓ 新增任务 ID={new_id}")

    # 更新
    affected = update_task_status(conn, 2, "done")
    print(f"✓ 更新任务#2状态 → done,影响 {affected} 行")

    # 按状态筛选
    print_tasks(read_tasks(conn, "pending"), "待处理任务")

    # 聚合统计
    print("\n=== 按状态统计 ===")
    print(f"{'状态':<10} {'数量':<6} {'负责人数'}")
    print("-" * 30)
    for row in conn.execute(
        "SELECT status, COUNT(*) as cnt, COUNT(DISTINCT assignee) as people "
        "FROM tasks GROUP BY status ORDER BY cnt DESC"
    ):
        print(f"{row['status']:<10} {row['cnt']:<6} {row['people']}")

    # 删除
    delete_task(conn, new_id)
    print(f"\n✓ 删除任务#{new_id}")
    print(f"剩余任务数: {conn.execute('SELECT COUNT(*) FROM tasks').fetchone()[0]}")
    conn.close()


# ── 事务演示 ────────────────────────────────────────────────
def demo_transaction() -> None:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    setup_db(conn)

    print("\n=== 事务演示:批量状态迁移 ===")
    print("场景:将所有 'doing' 任务批量标记为 'done'(原子操作)\n")

    doing_before = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='doing'").fetchone()[0]
    print(f"迁移前 doing 数量: {doing_before}")

    try:
        with conn:  # with conn 自动管理事务
            conn.execute("UPDATE tasks SET status='done' WHERE status='doing'")
            # 模拟中途检查
            count = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='done'").fetchone()[0]
            print(f"事务中 done 数量: {count}")
            # 若此处抛异常,事务自动回滚
        print("✓ 事务提交成功")
    except Exception as e:
        print(f"✗ 事务回滚: {e}")

    doing_after = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='doing'").fetchone()[0]
    done_after = conn.execute("SELECT COUNT(*) FROM tasks WHERE status='done'").fetchone()[0]
    print(f"迁移后 doing 数量: {doing_after}")
    print(f"迁移后 done  数量: {done_after}")
    conn.close()


# ── SQL注入防护演示 ─────────────────────────────────────────
def demo_injection() -> None:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    setup_db(conn)

    malicious = "' OR '1'='1"
    print(f"\n=== SQL注入防护演示 ===")
    print(f"恶意输入: {malicious!r}\n")

    # 参数化查询:安全
    safe_result = conn.execute(
        "SELECT COUNT(*) as cnt FROM tasks WHERE assignee=?", (malicious,)
    ).fetchone()
    print(f"✅ 参数化查询结果: {safe_result['cnt']} 条(正确,无注入)")

    # 展示参数化查询的 SQL 计划
    print("\n参数化查询会将输入作为纯数据,不解析为 SQL 语法")
    print("即使输入包含 SQL 关键字,也只会被当作字符串值匹配")
    conn.close()


def main() -> None:
    parser = argparse.ArgumentParser(description="SQLite CRUD 演示")
    parser.add_argument(
        "--mode",
        choices=["crud", "transaction", "injection"],
        default="crud",
        help="crud=增删改查, transaction=事务, injection=注入防护",
    )
    args = parser.parse_args()

    if args.mode == "crud":
        demo_crud()
    elif args.mode == "transaction":
        demo_transaction()
    else:
        demo_injection()


if __name__ == "__main__":
    main()
$ python3 16-python-mysql-basic.py --mode crud

=== 初始任务列表 ===
ID   标题                     优先级      状态         负责人
------------------------------------------------------------
1    设计数据库 Schema          🔴高       进行中        alice
2    编写 API 接口文档           🟡中       待处理        bob
3    实现用户认证模块              🔴高       已完成        alice
4    前端登录页面                🟡中       进行中        carol
5    部署到测试环境               🟢低       待处理        bob
6    性能压测                  🔴高       待处理        alice
7    修复 #42 Bug            🔴高       已完成        carol
8    更新 CHANGELOG          🟢低       已完成        bob
8
✓ 新增任务 ID=9
✓ 更新任务#2状态 → done,影响 1
=== 待处理任务 ===
ID   标题                     优先级      状态         负责人
------------------------------------------------------------
5    部署到测试环境               🟢低       待处理        bob
6    性能压测                  🔴高       待处理        alice
2
=== 按状态统计 ===
状态         数量     负责人数
------------------------------
done       4      3
pending    2      2
doing      2      2

✓ 删除任务#9
剩余任务数: 8

$ python3 16-python-mysql-basic.py --mode transaction

=== 事务演示:批量状态迁移 ===
场景:将所有 'doing' 任务批量标记为 'done'(原子操作)

迁移前 doing 数量: 2
事务中 done 数量: 5
✓ 事务提交成功
迁移后 doing 数量: 0
迁移后 done  数量: 5

$ python3 16-python-mysql-basic.py --mode injection

=== SQL注入防护演示 ===
恶意输入: "' OR '1'='1"

✅ 参数化查询结果: 0 条(正确,无注入)

参数化查询会将输入作为纯数据,不解析为 SQL 语法
即使输入包含 SQL 关键字,也只会被当作字符串值匹配

小结

概念 一句话记忆
get_db 上下文管理器 打开→提交→关闭,出错自动回滚,连接不泄漏
CHECK 约束 数据库层面的门卫,非法值直接拒绝
? 占位符 参数永远是"数据",不会被解析成 SQL 语法
lastrowid 刚插入行的自增主键,插完立刻拿到编号
rowcount 写操作的回执,0 = WHERE 没有命中任何行
with conn: 事务快捷方式,正常退出提交,异常自动回滚
:memory: 内存数据库,进程结束自动消失,适合测试

⏱ NexDo Time(5 分钟)

挑战:给 tasks 表实现软删除——不真正删除行,而是加一个 deleted_at TEXT 字段记录删除时间,查询时自动过滤已删除记录。

具体要求:

  1. SCHEMA 里加 deleted_at TEXT DEFAULT NULL 字段
  2. delete_task 改为 soft_delete_task:执行 UPDATE tasks SET deleted_at=datetime('now','localtime') WHERE id=?
  3. 修改 read_tasks:在 WHERE 条件里加 AND deleted_at IS NULL,让已删除的任务不再出现在查询结果中
  4. 运行验证:软删除一条任务后,read_tasks 查不到它,但直接 SELECT * 还能看到那条记录(deleted_at 不为空)

Don’t wait for next time, do it in the next moment.