17 · 数据库进阶:索引、事务与查询优化
🔗 知识图谱导航:阅读本文前,建议先掌握《16 · 数据库底座:SQLite 核心操作与 Python 接入》中的 CRUD、参数化查询与事务基础——本文在此之上深入索引原理与多表查询,让数据库从"能用"变成"用好"。
极客解析:数据库性能问题 80% 来自两件事:缺索引和写了低效 SQL。本文用可运行的计时对比让你亲眼看到索引的加速效果,用
EXPLAIN QUERY PLAN教你验证索引是否真的生效。
痛点与架构:新手学数据库进阶最常见的三个坑:① 不知道什么时候该建索引;② 写了 JOIN 但不知道 INNER/LEFT 的区别;③ GROUP BY 和 HAVING 傻傻分不清。本文用三张关联表(用户、任务、操作日志)把这三个坑一次性填平。
B+树索引原理
无索引查询:全表扫描 O(n)
┌────┬────┬────┬────┬────┬────┬────┬────┐
│ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │ 8 │ 逐行扫描
└────┴────┴────┴────┴────┴────┴────┴────┘
查找 id=7 → 扫描 7 次
B+树索引:O(log n)
┌─────┐
│ 4 │ ← 根节点
└──┬──┘
┌────────┴────────┐
┌─┴─┐ ┌─┴─┐
│1,2│ │5,6│ ← 内部节点(只存键)
└─┬─┘ └─┬─┘
┌────┴────┐ ┌────┴────┐
[1→r1][2→r2] [5→r5][6→r6] ← 叶节点(存键+行指针)
↕ ↕
[3→r3][4→r4] [7→r7][8→r8] ← 叶节点链表(支持范围查询)
查找 id=7 → 3 次比较
索引代价:写入时需维护 B+树,INSERT/UPDATE/DELETE 变慢;占用额外磁盘空间;选择性低的列(如 status 只有 3 个值)建索引收益有限。
事务 ACID
┌──────────────┬──────────────────────────────────────────┐
│ 属性 │ 含义 │
├──────────────┼──────────────────────────────────────────┤
│ Atomicity │ 原子性:全成功或全回滚,不存在中间状态 │
│ Consistency │ 一致性:事务前后数据满足约束条件 │
│ Isolation │ 隔离性:并发事务互不干扰 │
│ Durability │ 持久性:提交后数据永久保存 │
└──────────────┴──────────────────────────────────────────┘
查询优化核心原则
┌─────────────────────────────────────────────────────────┐
│ 优化原则 │
├─────────────────────────────────────────────────────────┤
│ 1. 只查需要的列:SELECT id,name 而非 SELECT * │
│ 2. WHERE 条件列建索引,避免全表扫描 │
│ 3. 复合索引遵循最左前缀原则 │
│ 4. 避免在 WHERE 中对列做函数运算(索引失效) │
│ ❌ WHERE YEAR(created_at) = 2026 │
│ ✅ WHERE created_at BETWEEN '2026-01-01' AND '...' │
│ 5. LIMIT 提前截断,减少传输数据量 │
│ 6. 用 EXPLAIN QUERY PLAN 验证索引是否生效 │
└─────────────────────────────────────────────────────────┘
步步为营:核心逻辑自适应拆解
这一篇的核心是三件事:索引让查询从 O(n) 变成 O(log n);JOIN 把多张表的数据关联起来;GROUP BY + HAVING 做多维聚合统计。下面每一步都聚焦一个概念,跑完就能看到结果。
Step 1:用三张表的 SCHEMA 建立多表关联数据模型
痛点与机制:
单张表只能存一种实体,真实业务需要多张表协作。REFERENCES users(id) 是外键声明——就像合同里写"乙方必须是甲方名单里的人",数据库会帮你检查引用完整性,不能给一个不存在的用户分配任务。三张表的分工:users 存人,tasks 存事,task_logs 存操作历史,这是最常见的"实体-关系-日志"三层结构。
核心源码(逐字来自文末完整源码):
SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
role TEXT DEFAULT 'member',
team TEXT,
created_at TEXT DEFAULT (datetime('now','localtime'))
);
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT DEFAULT 'medium',
status TEXT DEFAULT 'pending',
assignee_id INTEGER REFERENCES users(id),
created_at TEXT DEFAULT (datetime('now','localtime')),
due_date TEXT
);
CREATE TABLE IF NOT EXISTS task_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id INTEGER REFERENCES tasks(id),
user_id INTEGER REFERENCES users(id),
action TEXT,
detail TEXT,
logged_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
可运行演示(补齐 Mock 数据与 print 反馈):
# 运行: python3 17-python-mysql-advanced.py --mode index
# python3 17-python-mysql-advanced.py --mode join
# python3 17-python-mysql-advanced.py --mode aggregate
import argparse
import sqlite3
import time
import random
from typing import Any
SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
role TEXT DEFAULT 'member',
team TEXT,
created_at TEXT DEFAULT (datetime('now','localtime'))
);
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT DEFAULT 'medium',
status TEXT DEFAULT 'pending',
assignee_id INTEGER REFERENCES users(id),
created_at TEXT DEFAULT (datetime('now','localtime')),
due_date TEXT
);
CREATE TABLE IF NOT EXISTS task_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id INTEGER REFERENCES tasks(id),
user_id INTEGER REFERENCES users(id),
action TEXT,
detail TEXT,
logged_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
USERS = [
("alice", "lead", "backend"),
("bob", "member", "backend"),
("carol", "member", "frontend"),
("dave", "lead", "frontend"),
("eve", "member", "devops"),
]
PRIORITIES = ["low", "medium", "high"]
STATUSES = ["pending", "doing", "done"]
ACTIONS = ["created", "updated", "commented", "closed"]
def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(SCHEMA)
conn.executemany(
"INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
)
rng = random.Random(42)
tasks = [
(
f"Task-{i:04d}",
rng.choice(PRIORITIES),
rng.choice(STATUSES),
rng.randint(1, len(USERS)),
f"2026-04-{rng.randint(1,30):02d}",
)
for i in range(1, n_tasks + 1)
]
conn.executemany(
"INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
tasks,
)
logs = [
(rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
for i in range(1, n_tasks * 3 + 1)
]
conn.executemany(
"INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
logs,
)
conn.commit()
return conn
# Step 1:SCHEMA 定义了三张表:users(人)、tasks(事)、task_logs(日志)。
# REFERENCES users(id) 是外键声明,让数据库维护"引用完整性"。
conn = __import__('sqlite3').connect(":memory:")
conn.row_factory = __import__('sqlite3').Row
conn.executescript(SCHEMA)
tables = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
print("已建表:", [r['name'] for r in tables])
conn.close()
Step 2:用 setup_db 批量生成测试数据,理解 random.Random(42) 的可复现性
痛点与机制:
random.Random(42) 是"带种子的随机数生成器"——就像用同一副牌按同一规则洗牌,每次洗出来的顺序完全相同。种子固定 → 每次运行生成的随机序列完全相同 → 测试结果可复现,不会"今天过明天不过"。executemany 批量插入只编译一次 SQL,用不同参数重复执行,比循环调用 execute 快得多。
核心源码(逐字来自文末完整源码):
SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
role TEXT DEFAULT 'member',
team TEXT,
created_at TEXT DEFAULT (datetime('now','localtime'))
);
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT DEFAULT 'medium',
status TEXT DEFAULT 'pending',
assignee_id INTEGER REFERENCES users(id),
created_at TEXT DEFAULT (datetime('now','localtime')),
due_date TEXT
);
CREATE TABLE IF NOT EXISTS task_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id INTEGER REFERENCES tasks(id),
user_id INTEGER REFERENCES users(id),
action TEXT,
detail TEXT,
logged_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
USERS = [
("alice", "lead", "backend"),
("bob", "member", "backend"),
("carol", "member", "frontend"),
("dave", "lead", "frontend"),
("eve", "member", "devops"),
]
PRIORITIES = ["low", "medium", "high"]
STATUSES = ["pending", "doing", "done"]
ACTIONS = ["created", "updated", "commented", "closed"]
def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(SCHEMA)
conn.executemany(
"INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
)
rng = random.Random(42)
tasks = [
(
f"Task-{i:04d}",
rng.choice(PRIORITIES),
rng.choice(STATUSES),
rng.randint(1, len(USERS)),
f"2026-04-{rng.randint(1,30):02d}",
)
for i in range(1, n_tasks + 1)
]
conn.executemany(
"INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
tasks,
)
logs = [
(rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
for i in range(1, n_tasks * 3 + 1)
]
conn.executemany(
"INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
logs,
)
conn.commit()
return conn
可运行演示(补齐 Mock 数据与 print 反馈):
# 文件名: 17-python-mysql-advanced.py
# 运行: python3 17-python-mysql-advanced.py --mode index
# python3 17-python-mysql-advanced.py --mode join
# python3 17-python-mysql-advanced.py --mode aggregate
import argparse
import sqlite3
import time
import random
from typing import Any
SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
role TEXT DEFAULT 'member',
team TEXT,
created_at TEXT DEFAULT (datetime('now','localtime'))
);
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT DEFAULT 'medium',
status TEXT DEFAULT 'pending',
assignee_id INTEGER REFERENCES users(id),
created_at TEXT DEFAULT (datetime('now','localtime')),
due_date TEXT
);
CREATE TABLE IF NOT EXISTS task_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id INTEGER REFERENCES tasks(id),
user_id INTEGER REFERENCES users(id),
action TEXT,
detail TEXT,
logged_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
USERS = [
("alice", "lead", "backend"),
("bob", "member", "backend"),
("carol", "member", "frontend"),
("dave", "lead", "frontend"),
("eve", "member", "devops"),
]
PRIORITIES = ["low", "medium", "high"]
STATUSES = ["pending", "doing", "done"]
ACTIONS = ["created", "updated", "commented", "closed"]
def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(SCHEMA)
conn.executemany(
"INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
)
rng = random.Random(42)
tasks = [
(
f"Task-{i:04d}",
rng.choice(PRIORITIES),
rng.choice(STATUSES),
rng.randint(1, len(USERS)),
f"2026-04-{rng.randint(1,30):02d}",
)
for i in range(1, n_tasks + 1)
]
conn.executemany(
"INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
tasks,
)
logs = [
(rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
for i in range(1, n_tasks * 3 + 1)
]
conn.executemany(
"INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
logs,
)
conn.commit()
return conn
# Step 2:random.Random(42) 是带种子的随机数生成器,种子固定则每次结果相同,测试可复现。
conn = setup_db(200)
print(f"users: {conn.execute('SELECT COUNT(*) FROM users').fetchone()[0]} 行")
print(f"tasks: {conn.execute('SELECT COUNT(*) FROM tasks').fetchone()[0]} 行")
print(f"task_logs: {conn.execute('SELECT COUNT(*) FROM task_logs').fetchone()[0]} 行")
conn.close()
Step 3:用 timed_query 计时,看无索引全表扫描的真实耗时
痛点与机制:
timed_query 是一个"计时包装器":用 perf_counter 精确到微秒级,把查询耗时和结果行数一起打印出来,让性能差距一眼可见。无索引时,WHERE status='done' 必须逐行扫描整张表——就像在没有目录的字典里找一个词,数据越多越慢,这就是 O(n) 全表扫描。
核心源码(逐字来自文末完整源码):
def demo_index() -> None:
conn = setup_db(5000)
def timed_query(label: str, sql: str, params: tuple[Any, ...] = ()) -> float:
t0 = time.perf_counter()
rows = conn.execute(sql, params).fetchall()
elapsed = (time.perf_counter() - t0) * 1000
print(f" {label:<30} {elapsed:>8.3f}ms 结果: {len(rows)} 行")
return elapsed
print("\n=== 索引性能对比(5000条任务)===\n")
print(f" {'查询':<30} {'耗时':>8} {'说明'}")
print(" " + "-" * 55)
# 无索引
t1 = timed_query("无索引: WHERE status='done'",
"SELECT * FROM tasks WHERE status='done'")
t2 = timed_query("无索引: WHERE assignee_id=3",
"SELECT * FROM tasks WHERE assignee_id=?", (3,))
可运行演示(补齐 Mock 数据与 print 反馈):
# 文件名: 17-python-mysql-advanced.py
# 运行: python3 17-python-mysql-advanced.py --mode index
# python3 17-python-mysql-advanced.py --mode join
# python3 17-python-mysql-advanced.py --mode aggregate
import argparse
import sqlite3
import time
import random
from typing import Any
SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
role TEXT DEFAULT 'member',
team TEXT,
created_at TEXT DEFAULT (datetime('now','localtime'))
);
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT DEFAULT 'medium',
status TEXT DEFAULT 'pending',
assignee_id INTEGER REFERENCES users(id),
created_at TEXT DEFAULT (datetime('now','localtime')),
due_date TEXT
);
CREATE TABLE IF NOT EXISTS task_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id INTEGER REFERENCES tasks(id),
user_id INTEGER REFERENCES users(id),
action TEXT,
detail TEXT,
logged_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
USERS = [
("alice", "lead", "backend"),
("bob", "member", "backend"),
("carol", "member", "frontend"),
("dave", "lead", "frontend"),
("eve", "member", "devops"),
]
PRIORITIES = ["low", "medium", "high"]
STATUSES = ["pending", "doing", "done"]
ACTIONS = ["created", "updated", "commented", "closed"]
def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(SCHEMA)
conn.executemany(
"INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
)
rng = random.Random(42)
tasks = [
(
f"Task-{i:04d}",
rng.choice(PRIORITIES),
rng.choice(STATUSES),
rng.randint(1, len(USERS)),
f"2026-04-{rng.randint(1,30):02d}",
)
for i in range(1, n_tasks + 1)
]
conn.executemany(
"INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
tasks,
)
logs = [
(rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
for i in range(1, n_tasks * 3 + 1)
]
conn.executemany(
"INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
logs,
)
conn.commit()
return conn
# Step 3:timed_query 是计时包装器,用 perf_counter 精确到微秒级。
# 无索引时 WHERE status='done' 必须逐行扫描整张表。
def demo_index_preview():
conn = setup_db(3000)
def timed_query(label, sql, params=()):
import time
t0 = time.perf_counter()
rows = conn.execute(sql, params).fetchall()
elapsed = (time.perf_counter() - t0) * 1000
print(f" {label:<30} {elapsed:>8.3f}ms 结果: {len(rows)} 行")
return elapsed
print("\n=== 无索引查询计时 ===")
timed_query("无索引: WHERE status='done'", "SELECT * FROM tasks WHERE status='done'")
timed_query("无索引: WHERE assignee_id=3", "SELECT * FROM tasks WHERE assignee_id=?", (3,))
conn.close()
demo_index_preview()
Step 4:用 CREATE INDEX 建索引,对比加速效果,用 EXPLAIN QUERY PLAN 验证
痛点与机制:
CREATE INDEX 告诉数据库"为这列建一棵 B+树"——就像给字典加目录,查找从"逐页翻"变成"先查目录再翻到对应页"。建完索引后,WHERE status='done' 从全表扫描变成树上二分查找,速度提升数倍。EXPLAIN QUERY PLAN 是 SQLite 的"查询计划查看器":输出里出现 USING INDEX 说明索引生效,出现 SCAN TABLE 说明还在全表扫描。
核心源码(逐字来自文末完整源码):
def demo_index() -> None:
conn = setup_db(5000)
def timed_query(label: str, sql: str, params: tuple[Any, ...] = ()) -> float:
t0 = time.perf_counter()
rows = conn.execute(sql, params).fetchall()
elapsed = (time.perf_counter() - t0) * 1000
print(f" {label:<30} {elapsed:>8.3f}ms 结果: {len(rows)} 行")
return elapsed
print("\n=== 索引性能对比(5000条任务)===\n")
print(f" {'查询':<30} {'耗时':>8} {'说明'}")
print(" " + "-" * 55)
# 无索引
t1 = timed_query("无索引: WHERE status='done'",
"SELECT * FROM tasks WHERE status='done'")
t2 = timed_query("无索引: WHERE assignee_id=3",
"SELECT * FROM tasks WHERE assignee_id=?", (3,))
# 创建索引
conn.execute("CREATE INDEX idx_tasks_status ON tasks(status)")
conn.execute("CREATE INDEX idx_tasks_assignee ON tasks(assignee_id)")
conn.execute("CREATE INDEX idx_tasks_priority_status ON tasks(priority, status)")
conn.commit()
print()
t3 = timed_query("有索引: WHERE status='done'",
"SELECT * FROM tasks WHERE status='done'")
t4 = timed_query("有索引: WHERE assignee_id=3",
"SELECT * FROM tasks WHERE assignee_id=?", (3,))
t5 = timed_query("复合索引: priority+status",
"SELECT * FROM tasks WHERE priority='high' AND status='pending'")
print(f"\n status 查询加速: {t1/t3:.1f}x" if t3 > 0 else "")
print(f" assignee 查询加速: {t2/t4:.1f}x" if t4 > 0 else "")
# EXPLAIN QUERY PLAN
print("\n=== EXPLAIN QUERY PLAN(索引使用情况)===")
for row in conn.execute(
"EXPLAIN QUERY PLAN SELECT * FROM tasks WHERE status='done'"
):
print(f" {dict(row)}")
conn.close()
可运行演示(补齐 Mock 数据与 print 反馈):
# 文件名: 17-python-mysql-advanced.py
# 运行: python3 17-python-mysql-advanced.py --mode index
# python3 17-python-mysql-advanced.py --mode join
# python3 17-python-mysql-advanced.py --mode aggregate
import argparse
import sqlite3
import time
import random
from typing import Any
SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
role TEXT DEFAULT 'member',
team TEXT,
created_at TEXT DEFAULT (datetime('now','localtime'))
);
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT DEFAULT 'medium',
status TEXT DEFAULT 'pending',
assignee_id INTEGER REFERENCES users(id),
created_at TEXT DEFAULT (datetime('now','localtime')),
due_date TEXT
);
CREATE TABLE IF NOT EXISTS task_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id INTEGER REFERENCES tasks(id),
user_id INTEGER REFERENCES users(id),
action TEXT,
detail TEXT,
logged_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
USERS = [
("alice", "lead", "backend"),
("bob", "member", "backend"),
("carol", "member", "frontend"),
("dave", "lead", "frontend"),
("eve", "member", "devops"),
]
PRIORITIES = ["low", "medium", "high"]
STATUSES = ["pending", "doing", "done"]
ACTIONS = ["created", "updated", "commented", "closed"]
def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(SCHEMA)
conn.executemany(
"INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
)
rng = random.Random(42)
tasks = [
(
f"Task-{i:04d}",
rng.choice(PRIORITIES),
rng.choice(STATUSES),
rng.randint(1, len(USERS)),
f"2026-04-{rng.randint(1,30):02d}",
)
for i in range(1, n_tasks + 1)
]
conn.executemany(
"INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
tasks,
)
logs = [
(rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
for i in range(1, n_tasks * 3 + 1)
]
conn.executemany(
"INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
logs,
)
conn.commit()
return conn
# Step 4:CREATE INDEX 建 B+树索引,WHERE 查询从全表扫描变成树上二分查找。
# EXPLAIN QUERY PLAN 输出 "USING INDEX" 说明索引生效。
def demo_index() -> None:
conn = setup_db(5000)
def timed_query(label: str, sql: str, params: tuple[Any, ...] = ()) -> float:
t0 = time.perf_counter()
rows = conn.execute(sql, params).fetchall()
elapsed = (time.perf_counter() - t0) * 1000
print(f" {label:<30} {elapsed:>8.3f}ms 结果: {len(rows)} 行")
return elapsed
print("\n=== 索引性能对比(5000条任务)===\n")
print(f" {'查询':<30} {'耗时':>8} {'说明'}")
print(" " + "-" * 55)
# 无索引
t1 = timed_query("无索引: WHERE status='done'",
"SELECT * FROM tasks WHERE status='done'")
t2 = timed_query("无索引: WHERE assignee_id=3",
"SELECT * FROM tasks WHERE assignee_id=?", (3,))
# 创建索引
conn.execute("CREATE INDEX idx_tasks_status ON tasks(status)")
conn.execute("CREATE INDEX idx_tasks_assignee ON tasks(assignee_id)")
conn.execute("CREATE INDEX idx_tasks_priority_status ON tasks(priority, status)")
conn.commit()
print()
t3 = timed_query("有索引: WHERE status='done'",
"SELECT * FROM tasks WHERE status='done'")
t4 = timed_query("有索引: WHERE assignee_id=3",
"SELECT * FROM tasks WHERE assignee_id=?", (3,))
t5 = timed_query("复合索引: priority+status",
"SELECT * FROM tasks WHERE priority='high' AND status='pending'")
print(f"\n status 查询加速: {t1/t3:.1f}x" if t3 > 0 else "")
print(f" assignee 查询加速: {t2/t4:.1f}x" if t4 > 0 else "")
# EXPLAIN QUERY PLAN
print("\n=== EXPLAIN QUERY PLAN(索引使用情况)===")
for row in conn.execute(
"EXPLAIN QUERY PLAN SELECT * FROM tasks WHERE status='done'"
):
print(f" {dict(row)}")
conn.close()
demo_index()
Step 5:用 INNER JOIN 把任务表和用户表关联,理解 ON 条件
痛点与机制:
INNER JOIN 是"取交集"——就像两个班级的名单取共同出现的名字。ON t.assignee_id = u.id 是连接条件:任务的负责人 ID 必须在用户表里存在。如果任务没有分配负责人(assignee_id IS NULL),INNER JOIN 会把它过滤掉——这是 INNER JOIN 和 LEFT JOIN 最关键的区别。
核心源码(逐字来自文末完整源码):
def demo_join() -> None:
conn = setup_db(100)
print("\n=== JOIN 连接查询演示 ===")
# INNER JOIN:任务+负责人
print("\n── INNER JOIN:任务与负责人 ──")
print(f"{'任务ID':<8} {'标题':<20} {'负责人':<10} {'团队':<10} {'状态'}")
print("-" * 60)
rows = conn.execute("""
SELECT t.id, t.title, u.username, u.team, t.status
FROM tasks t
INNER JOIN users u ON t.assignee_id = u.id
WHERE t.priority = 'high'
ORDER BY t.id
LIMIT 8
""").fetchall()
for r in rows:
print(f"{r['id']:<8} {r['title']:<20} {r['username']:<10} {r['team']:<10} {r['status']}")
可运行演示(补齐 Mock 数据与 print 反馈):
# 文件名: 17-python-mysql-advanced.py
# 运行: python3 17-python-mysql-advanced.py --mode index
# python3 17-python-mysql-advanced.py --mode join
# python3 17-python-mysql-advanced.py --mode aggregate
import argparse
import sqlite3
import time
import random
from typing import Any
SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
role TEXT DEFAULT 'member',
team TEXT,
created_at TEXT DEFAULT (datetime('now','localtime'))
);
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT DEFAULT 'medium',
status TEXT DEFAULT 'pending',
assignee_id INTEGER REFERENCES users(id),
created_at TEXT DEFAULT (datetime('now','localtime')),
due_date TEXT
);
CREATE TABLE IF NOT EXISTS task_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id INTEGER REFERENCES tasks(id),
user_id INTEGER REFERENCES users(id),
action TEXT,
detail TEXT,
logged_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
USERS = [
("alice", "lead", "backend"),
("bob", "member", "backend"),
("carol", "member", "frontend"),
("dave", "lead", "frontend"),
("eve", "member", "devops"),
]
PRIORITIES = ["low", "medium", "high"]
STATUSES = ["pending", "doing", "done"]
ACTIONS = ["created", "updated", "commented", "closed"]
def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(SCHEMA)
conn.executemany(
"INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
)
rng = random.Random(42)
tasks = [
(
f"Task-{i:04d}",
rng.choice(PRIORITIES),
rng.choice(STATUSES),
rng.randint(1, len(USERS)),
f"2026-04-{rng.randint(1,30):02d}",
)
for i in range(1, n_tasks + 1)
]
conn.executemany(
"INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
tasks,
)
logs = [
(rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
for i in range(1, n_tasks * 3 + 1)
]
conn.executemany(
"INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
logs,
)
conn.commit()
return conn
# Step 5:INNER JOIN 取交集,ON 条件是连接键,没有负责人的任务会被过滤掉。
def demo_join_preview():
conn = setup_db(50)
print("\n=== INNER JOIN 演示 ===")
rows = conn.execute('''
SELECT t.id, t.title, u.username, u.team, t.status
FROM tasks t INNER JOIN users u ON t.assignee_id = u.id
WHERE t.priority = 'high' ORDER BY t.id LIMIT 5
''').fetchall()
print(f"{'任务ID':<8} {'标题':<20} {'负责人':<10} {'团队':<10} {'状态'}")
print("-" * 60)
for r in rows:
print(f"{r['id']:<8} {r['title']:<20} {r['username']:<10} {r['team']:<10} {r['status']}")
conn.close()
demo_join_preview()
Step 6:用 LEFT JOIN 找出未分配任务,用三表 JOIN 查操作历史
痛点与机制:
LEFT JOIN 是"保留左表所有行"——就像点名时不管右边有没有人,左边的名字都要出现,右边没有匹配的列填 NULL。WHERE u.id IS NULL 就能筛出"没有负责人的任务"。三表 JOIN 就是两次 JOIN 串联:task_logs JOIN tasks JOIN users,每次 JOIN 都在上一步结果的基础上再关联一张表,像搭积木一样。
核心源码(逐字来自文末完整源码):
def demo_join() -> None:
conn = setup_db(100)
print("\n=== JOIN 连接查询演示 ===")
# INNER JOIN:任务+负责人
print("\n── INNER JOIN:任务与负责人 ──")
print(f"{'任务ID':<8} {'标题':<20} {'负责人':<10} {'团队':<10} {'状态'}")
print("-" * 60)
rows = conn.execute("""
SELECT t.id, t.title, u.username, u.team, t.status
FROM tasks t
INNER JOIN users u ON t.assignee_id = u.id
WHERE t.priority = 'high'
ORDER BY t.id
LIMIT 8
""").fetchall()
for r in rows:
print(f"{r['id']:<8} {r['title']:<20} {r['username']:<10} {r['team']:<10} {r['status']}")
# LEFT JOIN:含未分配任务
unassigned = conn.execute("""
SELECT COUNT(*) as cnt FROM tasks t
LEFT JOIN users u ON t.assignee_id = u.id
WHERE u.id IS NULL
""").fetchone()
print(f"\nLEFT JOIN 发现未分配任务: {unassigned['cnt']} 条")
# 三表 JOIN:任务+负责人+操作日志
print("\n── 三表 JOIN:任务操作历史 ──")
print(f"{'任务':<15} {'操作人':<10} {'动作':<12} {'时间'}")
print("-" * 55)
rows = conn.execute("""
SELECT t.title, u.username, l.action, l.logged_at
FROM task_logs l
INNER JOIN tasks t ON l.task_id = t.id
INNER JOIN users u ON l.user_id = u.id
ORDER BY l.id DESC
LIMIT 6
""").fetchall()
for r in rows:
print(f"{r['title']:<15} {r['username']:<10} {r['action']:<12} {r['logged_at']}")
conn.close()
可运行演示(补齐 Mock 数据与 print 反馈):
# 文件名: 17-python-mysql-advanced.py
# 运行: python3 17-python-mysql-advanced.py --mode index
# python3 17-python-mysql-advanced.py --mode join
# python3 17-python-mysql-advanced.py --mode aggregate
import argparse
import sqlite3
import time
import random
from typing import Any
SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
role TEXT DEFAULT 'member',
team TEXT,
created_at TEXT DEFAULT (datetime('now','localtime'))
);
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT DEFAULT 'medium',
status TEXT DEFAULT 'pending',
assignee_id INTEGER REFERENCES users(id),
created_at TEXT DEFAULT (datetime('now','localtime')),
due_date TEXT
);
CREATE TABLE IF NOT EXISTS task_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id INTEGER REFERENCES tasks(id),
user_id INTEGER REFERENCES users(id),
action TEXT,
detail TEXT,
logged_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
USERS = [
("alice", "lead", "backend"),
("bob", "member", "backend"),
("carol", "member", "frontend"),
("dave", "lead", "frontend"),
("eve", "member", "devops"),
]
PRIORITIES = ["low", "medium", "high"]
STATUSES = ["pending", "doing", "done"]
ACTIONS = ["created", "updated", "commented", "closed"]
def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(SCHEMA)
conn.executemany(
"INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
)
rng = random.Random(42)
tasks = [
(
f"Task-{i:04d}",
rng.choice(PRIORITIES),
rng.choice(STATUSES),
rng.randint(1, len(USERS)),
f"2026-04-{rng.randint(1,30):02d}",
)
for i in range(1, n_tasks + 1)
]
conn.executemany(
"INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
tasks,
)
logs = [
(rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
for i in range(1, n_tasks * 3 + 1)
]
conn.executemany(
"INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
logs,
)
conn.commit()
return conn
# Step 6:LEFT JOIN 保留左表所有行,右表无匹配则填 NULL,用于找未分配任务。
# 三表 JOIN 就是两次 JOIN 串联,每次在上一步结果基础上再关联一张表。
def demo_join() -> None:
conn = setup_db(100)
print("\n=== JOIN 连接查询演示 ===")
# INNER JOIN:任务+负责人
print("\n── INNER JOIN:任务与负责人 ──")
print(f"{'任务ID':<8} {'标题':<20} {'负责人':<10} {'团队':<10} {'状态'}")
print("-" * 60)
rows = conn.execute("""
SELECT t.id, t.title, u.username, u.team, t.status
FROM tasks t
INNER JOIN users u ON t.assignee_id = u.id
WHERE t.priority = 'high'
ORDER BY t.id
LIMIT 8
""").fetchall()
for r in rows:
print(f"{r['id']:<8} {r['title']:<20} {r['username']:<10} {r['team']:<10} {r['status']}")
# LEFT JOIN:含未分配任务
unassigned = conn.execute("""
SELECT COUNT(*) as cnt FROM tasks t
LEFT JOIN users u ON t.assignee_id = u.id
WHERE u.id IS NULL
""").fetchone()
print(f"\nLEFT JOIN 发现未分配任务: {unassigned['cnt']} 条")
# 三表 JOIN:任务+负责人+操作日志
print("\n── 三表 JOIN:任务操作历史 ──")
print(f"{'任务':<15} {'操作人':<10} {'动作':<12} {'时间'}")
print("-" * 55)
rows = conn.execute("""
SELECT t.title, u.username, l.action, l.logged_at
FROM task_logs l
INNER JOIN tasks t ON l.task_id = t.id
INNER JOIN users u ON l.user_id = u.id
ORDER BY l.id DESC
LIMIT 6
""").fetchall()
for r in rows:
print(f"{r['title']:<15} {r['username']:<10} {r['action']:<12} {r['logged_at']}")
conn.close()
demo_join()
Step 7:用 GROUP BY + HAVING + 子查询实现多维聚合统计
痛点与机制:
GROUP BY 是"按某列分组,对每组做聚合"——就像把一堆快递按省份分堆,再数每堆有多少个。HAVING 是"对聚合结果再过滤",和 WHERE 的区别是:WHERE 在分组前过滤行,HAVING 在分组后过滤组。CASE WHEN status='done' THEN 1 ELSE 0 END 是 SQL 里的"条件计数"技巧,相当于 Python 里的 sum(1 for x in rows if x.status == 'done')。子查询里的 AVG(cnt) 先算出每人平均任务数,外层 HAVING 再筛出超过平均值的人。
核心源码(逐字来自文末完整源码):
def demo_aggregate() -> None:
conn = setup_db(200)
print("\n=== 聚合统计演示 ===")
# 按团队统计任务完成率
print("\n── 按团队统计任务完成率 ──")
print(f"{'团队':<12} {'总任务':<8} {'已完成':<8} {'完成率':<8} {'高优先级'}")
print("-" * 50)
rows = conn.execute("""
SELECT
u.team,
COUNT(t.id) AS total,
SUM(CASE WHEN t.status='done' THEN 1 ELSE 0 END) AS done_cnt,
ROUND(SUM(CASE WHEN t.status='done' THEN 1.0 ELSE 0 END) / COUNT(t.id) * 100, 1) AS done_pct,
SUM(CASE WHEN t.priority='high' THEN 1 ELSE 0 END) AS high_cnt
FROM tasks t
INNER JOIN users u ON t.assignee_id = u.id
GROUP BY u.team
ORDER BY done_pct DESC
""").fetchall()
for r in rows:
bar = "█" * int(r["done_pct"] / 10)
print(f"{r['team']:<12} {r['total']:<8} {r['done_cnt']:<8} {r['done_pct']:<6}% {bar}")
# 子查询:找出任务数超过平均值的成员
print("\n── 子查询:任务数超过平均值的成员 ──")
print(f"{'成员':<10} {'角色':<8} {'任务数':<8} {'vs平均'}")
print("-" * 40)
rows = conn.execute("""
SELECT u.username, u.role, COUNT(t.id) AS task_cnt
FROM users u
LEFT JOIN tasks t ON t.assignee_id = u.id
GROUP BY u.id
HAVING task_cnt > (
SELECT AVG(cnt) FROM (
SELECT COUNT(*) AS cnt FROM tasks GROUP BY assignee_id
)
)
ORDER BY task_cnt DESC
""").fetchall()
avg_row = conn.execute(
"SELECT AVG(cnt) as avg FROM (SELECT COUNT(*) as cnt FROM tasks GROUP BY assignee_id)"
).fetchone()
avg = avg_row["avg"] if avg_row else 0
for r in rows:
diff = r["task_cnt"] - avg
print(f"{r['username']:<10} {r['role']:<8} {r['task_cnt']:<8} +{diff:.1f}")
# 窗口函数替代:按优先级分组统计
print("\n── 按优先级×状态交叉统计 ──")
print(f"{'优先级':<10} {'pending':<10} {'doing':<10} {'done':<10} {'合计'}")
print("-" * 50)
pivot: dict[str, dict[str, int]] = {}
for row in conn.execute("SELECT priority, status, COUNT(*) as cnt FROM tasks GROUP BY priority, status"):
pivot.setdefault(row["priority"], {})[row["status"]] = row["cnt"]
for pri in ["high", "medium", "low"]:
d = pivot.get(pri, {})
p, do, dn = d.get("pending", 0), d.get("doing", 0), d.get("done", 0)
print(f"{pri:<10} {p:<10} {do:<10} {dn:<10} {p+do+dn}")
conn.close()
可运行演示(补齐 Mock 数据与 print 反馈):
# 文件名: 17-python-mysql-advanced.py
# 运行: python3 17-python-mysql-advanced.py --mode index
# python3 17-python-mysql-advanced.py --mode join
# python3 17-python-mysql-advanced.py --mode aggregate
import argparse
import sqlite3
import time
import random
from typing import Any
SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
role TEXT DEFAULT 'member',
team TEXT,
created_at TEXT DEFAULT (datetime('now','localtime'))
);
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT DEFAULT 'medium',
status TEXT DEFAULT 'pending',
assignee_id INTEGER REFERENCES users(id),
created_at TEXT DEFAULT (datetime('now','localtime')),
due_date TEXT
);
CREATE TABLE IF NOT EXISTS task_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id INTEGER REFERENCES tasks(id),
user_id INTEGER REFERENCES users(id),
action TEXT,
detail TEXT,
logged_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
USERS = [
("alice", "lead", "backend"),
("bob", "member", "backend"),
("carol", "member", "frontend"),
("dave", "lead", "frontend"),
("eve", "member", "devops"),
]
PRIORITIES = ["low", "medium", "high"]
STATUSES = ["pending", "doing", "done"]
ACTIONS = ["created", "updated", "commented", "closed"]
def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(SCHEMA)
conn.executemany(
"INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
)
rng = random.Random(42)
tasks = [
(
f"Task-{i:04d}",
rng.choice(PRIORITIES),
rng.choice(STATUSES),
rng.randint(1, len(USERS)),
f"2026-04-{rng.randint(1,30):02d}",
)
for i in range(1, n_tasks + 1)
]
conn.executemany(
"INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
tasks,
)
logs = [
(rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
for i in range(1, n_tasks * 3 + 1)
]
conn.executemany(
"INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
logs,
)
conn.commit()
return conn
# Step 7:GROUP BY 分组聚合,HAVING 对聚合结果过滤(WHERE 在分组前,HAVING 在分组后)。
# CASE WHEN status='done' THEN 1 ELSE 0 END 是 SQL 里的条件计数技巧。
def demo_aggregate() -> None:
conn = setup_db(200)
print("\n=== 聚合统计演示 ===")
# 按团队统计任务完成率
print("\n── 按团队统计任务完成率 ──")
print(f"{'团队':<12} {'总任务':<8} {'已完成':<8} {'完成率':<8} {'高优先级'}")
print("-" * 50)
rows = conn.execute("""
SELECT
u.team,
COUNT(t.id) AS total,
SUM(CASE WHEN t.status='done' THEN 1 ELSE 0 END) AS done_cnt,
ROUND(SUM(CASE WHEN t.status='done' THEN 1.0 ELSE 0 END) / COUNT(t.id) * 100, 1) AS done_pct,
SUM(CASE WHEN t.priority='high' THEN 1 ELSE 0 END) AS high_cnt
FROM tasks t
INNER JOIN users u ON t.assignee_id = u.id
GROUP BY u.team
ORDER BY done_pct DESC
""").fetchall()
for r in rows:
bar = "█" * int(r["done_pct"] / 10)
print(f"{r['team']:<12} {r['total']:<8} {r['done_cnt']:<8} {r['done_pct']:<6}% {bar}")
# 子查询:找出任务数超过平均值的成员
print("\n── 子查询:任务数超过平均值的成员 ──")
print(f"{'成员':<10} {'角色':<8} {'任务数':<8} {'vs平均'}")
print("-" * 40)
rows = conn.execute("""
SELECT u.username, u.role, COUNT(t.id) AS task_cnt
FROM users u
LEFT JOIN tasks t ON t.assignee_id = u.id
GROUP BY u.id
HAVING task_cnt > (
SELECT AVG(cnt) FROM (
SELECT COUNT(*) AS cnt FROM tasks GROUP BY assignee_id
)
)
ORDER BY task_cnt DESC
""").fetchall()
avg_row = conn.execute(
"SELECT AVG(cnt) as avg FROM (SELECT COUNT(*) as cnt FROM tasks GROUP BY assignee_id)"
).fetchone()
avg = avg_row["avg"] if avg_row else 0
for r in rows:
diff = r["task_cnt"] - avg
print(f"{r['username']:<10} {r['role']:<8} {r['task_cnt']:<8} +{diff:.1f}")
# 窗口函数替代:按优先级分组统计
print("\n── 按优先级×状态交叉统计 ──")
print(f"{'优先级':<10} {'pending':<10} {'doing':<10} {'done':<10} {'合计'}")
print("-" * 50)
pivot: dict[str, dict[str, int]] = {}
for row in conn.execute("SELECT priority, status, COUNT(*) as cnt FROM tasks GROUP BY priority, status"):
pivot.setdefault(row["priority"], {})[row["status"]] = row["cnt"]
for pri in ["high", "medium", "low"]:
d = pivot.get(pri, {})
p, do, dn = d.get("pending", 0), d.get("doing", 0), d.get("done", 0)
print(f"{pri:<10} {p:<10} {do:<10} {dn:<10} {p+do+dn}")
conn.close()
demo_aggregate()
Step 8:用 main 做 index/join/aggregate 三种模式的 CLI 总入口
痛点与机制:
main 用 argparse 做 CLI 入口,--mode 参数让读者不改代码就能切换演示场景。默认 mode=aggregate,直接运行就能看到聚合统计结果;--mode index 看索引加速对比;--mode join 看多表关联查询。三种模式共用同一套 setup_db,保证数据一致性。
核心源码(逐字来自文末完整源码):
def main() -> None:
parser = argparse.ArgumentParser(description="数据库进阶演示")
parser.add_argument(
"--mode",
choices=["index", "join", "aggregate"],
default="aggregate",
help="index=索引性能对比, join=连接查询, aggregate=聚合统计",
)
args = parser.parse_args()
if args.mode == "index":
demo_index()
elif args.mode == "join":
demo_join()
else:
demo_aggregate()
可运行演示(补齐 Mock 数据与 print 反馈):
# 文件名: 17-python-mysql-advanced.py
# 运行: python3 17-python-mysql-advanced.py --mode index
# python3 17-python-mysql-advanced.py --mode join
# python3 17-python-mysql-advanced.py --mode aggregate
import argparse
import sqlite3
import time
import random
from typing import Any
SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
role TEXT DEFAULT 'member',
team TEXT,
created_at TEXT DEFAULT (datetime('now','localtime'))
);
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT DEFAULT 'medium',
status TEXT DEFAULT 'pending',
assignee_id INTEGER REFERENCES users(id),
created_at TEXT DEFAULT (datetime('now','localtime')),
due_date TEXT
);
CREATE TABLE IF NOT EXISTS task_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id INTEGER REFERENCES tasks(id),
user_id INTEGER REFERENCES users(id),
action TEXT,
detail TEXT,
logged_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
USERS = [
("alice", "lead", "backend"),
("bob", "member", "backend"),
("carol", "member", "frontend"),
("dave", "lead", "frontend"),
("eve", "member", "devops"),
]
PRIORITIES = ["low", "medium", "high"]
STATUSES = ["pending", "doing", "done"]
ACTIONS = ["created", "updated", "commented", "closed"]
def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(SCHEMA)
conn.executemany(
"INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
)
rng = random.Random(42)
tasks = [
(
f"Task-{i:04d}",
rng.choice(PRIORITIES),
rng.choice(STATUSES),
rng.randint(1, len(USERS)),
f"2026-04-{rng.randint(1,30):02d}",
)
for i in range(1, n_tasks + 1)
]
conn.executemany(
"INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
tasks,
)
logs = [
(rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
for i in range(1, n_tasks * 3 + 1)
]
conn.executemany(
"INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
logs,
)
conn.commit()
return conn
def demo_index() -> None:
conn = setup_db(5000)
def timed_query(label: str, sql: str, params: tuple[Any, ...] = ()) -> float:
t0 = time.perf_counter()
rows = conn.execute(sql, params).fetchall()
elapsed = (time.perf_counter() - t0) * 1000
print(f" {label:<30} {elapsed:>8.3f}ms 结果: {len(rows)} 行")
return elapsed
print("\n=== 索引性能对比(5000条任务)===\n")
print(f" {'查询':<30} {'耗时':>8} {'说明'}")
print(" " + "-" * 55)
# 无索引
t1 = timed_query("无索引: WHERE status='done'",
"SELECT * FROM tasks WHERE status='done'")
t2 = timed_query("无索引: WHERE assignee_id=3",
"SELECT * FROM tasks WHERE assignee_id=?", (3,))
# 创建索引
conn.execute("CREATE INDEX idx_tasks_status ON tasks(status)")
conn.execute("CREATE INDEX idx_tasks_assignee ON tasks(assignee_id)")
conn.execute("CREATE INDEX idx_tasks_priority_status ON tasks(priority, status)")
conn.commit()
print()
t3 = timed_query("有索引: WHERE status='done'",
"SELECT * FROM tasks WHERE status='done'")
t4 = timed_query("有索引: WHERE assignee_id=3",
"SELECT * FROM tasks WHERE assignee_id=?", (3,))
t5 = timed_query("复合索引: priority+status",
"SELECT * FROM tasks WHERE priority='high' AND status='pending'")
print(f"\n status 查询加速: {t1/t3:.1f}x" if t3 > 0 else "")
print(f" assignee 查询加速: {t2/t4:.1f}x" if t4 > 0 else "")
# EXPLAIN QUERY PLAN
print("\n=== EXPLAIN QUERY PLAN(索引使用情况)===")
for row in conn.execute(
"EXPLAIN QUERY PLAN SELECT * FROM tasks WHERE status='done'"
):
print(f" {dict(row)}")
conn.close()
def demo_join() -> None:
conn = setup_db(100)
print("\n=== JOIN 连接查询演示 ===")
# INNER JOIN:任务+负责人
print("\n── INNER JOIN:任务与负责人 ──")
print(f"{'任务ID':<8} {'标题':<20} {'负责人':<10} {'团队':<10} {'状态'}")
print("-" * 60)
rows = conn.execute("""
SELECT t.id, t.title, u.username, u.team, t.status
FROM tasks t
INNER JOIN users u ON t.assignee_id = u.id
WHERE t.priority = 'high'
ORDER BY t.id
LIMIT 8
""").fetchall()
for r in rows:
print(f"{r['id']:<8} {r['title']:<20} {r['username']:<10} {r['team']:<10} {r['status']}")
# LEFT JOIN:含未分配任务
unassigned = conn.execute("""
SELECT COUNT(*) as cnt FROM tasks t
LEFT JOIN users u ON t.assignee_id = u.id
WHERE u.id IS NULL
""").fetchone()
print(f"\nLEFT JOIN 发现未分配任务: {unassigned['cnt']} 条")
# 三表 JOIN:任务+负责人+操作日志
print("\n── 三表 JOIN:任务操作历史 ──")
print(f"{'任务':<15} {'操作人':<10} {'动作':<12} {'时间'}")
print("-" * 55)
rows = conn.execute("""
SELECT t.title, u.username, l.action, l.logged_at
FROM task_logs l
INNER JOIN tasks t ON l.task_id = t.id
INNER JOIN users u ON l.user_id = u.id
ORDER BY l.id DESC
LIMIT 6
""").fetchall()
for r in rows:
print(f"{r['title']:<15} {r['username']:<10} {r['action']:<12} {r['logged_at']}")
conn.close()
def demo_aggregate() -> None:
conn = setup_db(200)
print("\n=== 聚合统计演示 ===")
# 按团队统计任务完成率
print("\n── 按团队统计任务完成率 ──")
print(f"{'团队':<12} {'总任务':<8} {'已完成':<8} {'完成率':<8} {'高优先级'}")
print("-" * 50)
rows = conn.execute("""
SELECT
u.team,
COUNT(t.id) AS total,
SUM(CASE WHEN t.status='done' THEN 1 ELSE 0 END) AS done_cnt,
ROUND(SUM(CASE WHEN t.status='done' THEN 1.0 ELSE 0 END) / COUNT(t.id) * 100, 1) AS done_pct,
SUM(CASE WHEN t.priority='high' THEN 1 ELSE 0 END) AS high_cnt
FROM tasks t
INNER JOIN users u ON t.assignee_id = u.id
GROUP BY u.team
ORDER BY done_pct DESC
""").fetchall()
for r in rows:
bar = "█" * int(r["done_pct"] / 10)
print(f"{r['team']:<12} {r['total']:<8} {r['done_cnt']:<8} {r['done_pct']:<6}% {bar}")
# 子查询:找出任务数超过平均值的成员
print("\n── 子查询:任务数超过平均值的成员 ──")
print(f"{'成员':<10} {'角色':<8} {'任务数':<8} {'vs平均'}")
print("-" * 40)
rows = conn.execute("""
SELECT u.username, u.role, COUNT(t.id) AS task_cnt
FROM users u
LEFT JOIN tasks t ON t.assignee_id = u.id
GROUP BY u.id
HAVING task_cnt > (
SELECT AVG(cnt) FROM (
SELECT COUNT(*) AS cnt FROM tasks GROUP BY assignee_id
)
)
ORDER BY task_cnt DESC
""").fetchall()
avg_row = conn.execute(
"SELECT AVG(cnt) as avg FROM (SELECT COUNT(*) as cnt FROM tasks GROUP BY assignee_id)"
).fetchone()
avg = avg_row["avg"] if avg_row else 0
for r in rows:
diff = r["task_cnt"] - avg
print(f"{r['username']:<10} {r['role']:<8} {r['task_cnt']:<8} +{diff:.1f}")
# 窗口函数替代:按优先级分组统计
print("\n── 按优先级×状态交叉统计 ──")
print(f"{'优先级':<10} {'pending':<10} {'doing':<10} {'done':<10} {'合计'}")
print("-" * 50)
pivot: dict[str, dict[str, int]] = {}
for row in conn.execute("SELECT priority, status, COUNT(*) as cnt FROM tasks GROUP BY priority, status"):
pivot.setdefault(row["priority"], {})[row["status"]] = row["cnt"]
for pri in ["high", "medium", "low"]:
d = pivot.get(pri, {})
p, do, dn = d.get("pending", 0), d.get("doing", 0), d.get("done", 0)
print(f"{pri:<10} {p:<10} {do:<10} {dn:<10} {p+do+dn}")
conn.close()
# Step 8:main 用 argparse 做 CLI 入口,--mode 切换三种演示场景。
def main() -> None:
parser = argparse.ArgumentParser(description="数据库进阶演示")
parser.add_argument(
"--mode",
choices=["index", "join", "aggregate"],
default="aggregate",
help="index=索引性能对比, join=连接查询, aggregate=聚合统计",
)
args = parser.parse_args()
if args.mode == "index":
demo_index()
elif args.mode == "join":
demo_join()
else:
demo_aggregate()
import sys
for mode in ["join", "aggregate"]:
print(f"\n>>> mode={mode}")
sys.argv = ["prog", "--mode", mode]
main()
极客实战:完整源码与运行
现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。
# 文件名: 17-python-mysql-advanced.py
# 运行: python3 17-python-mysql-advanced.py --mode index
# python3 17-python-mysql-advanced.py --mode join
# python3 17-python-mysql-advanced.py --mode aggregate
import argparse
import sqlite3
import time
import random
from typing import Any
# ── 建库:多表结构 ──────────────────────────────────────────
SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
role TEXT DEFAULT 'member',
team TEXT,
created_at TEXT DEFAULT (datetime('now','localtime'))
);
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
priority TEXT DEFAULT 'medium',
status TEXT DEFAULT 'pending',
assignee_id INTEGER REFERENCES users(id),
created_at TEXT DEFAULT (datetime('now','localtime')),
due_date TEXT
);
CREATE TABLE IF NOT EXISTS task_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id INTEGER REFERENCES tasks(id),
user_id INTEGER REFERENCES users(id),
action TEXT,
detail TEXT,
logged_at TEXT DEFAULT (datetime('now','localtime'))
);
"""
USERS = [
("alice", "lead", "backend"),
("bob", "member", "backend"),
("carol", "member", "frontend"),
("dave", "lead", "frontend"),
("eve", "member", "devops"),
]
PRIORITIES = ["low", "medium", "high"]
STATUSES = ["pending", "doing", "done"]
ACTIONS = ["created", "updated", "commented", "closed"]
def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(SCHEMA)
conn.executemany(
"INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
)
rng = random.Random(42)
tasks = [
(
f"Task-{i:04d}",
rng.choice(PRIORITIES),
rng.choice(STATUSES),
rng.randint(1, len(USERS)),
f"2026-04-{rng.randint(1,30):02d}",
)
for i in range(1, n_tasks + 1)
]
conn.executemany(
"INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
tasks,
)
logs = [
(rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
for i in range(1, n_tasks * 3 + 1)
]
conn.executemany(
"INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
logs,
)
conn.commit()
return conn
# ── 模式1:索引性能对比 ─────────────────────────────────────
def demo_index() -> None:
conn = setup_db(5000)
def timed_query(label: str, sql: str, params: tuple[Any, ...] = ()) -> float:
t0 = time.perf_counter()
rows = conn.execute(sql, params).fetchall()
elapsed = (time.perf_counter() - t0) * 1000
print(f" {label:<30} {elapsed:>8.3f}ms 结果: {len(rows)} 行")
return elapsed
print("\n=== 索引性能对比(5000条任务)===\n")
print(f" {'查询':<30} {'耗时':>8} {'说明'}")
print(" " + "-" * 55)
# 无索引
t1 = timed_query("无索引: WHERE status='done'",
"SELECT * FROM tasks WHERE status='done'")
t2 = timed_query("无索引: WHERE assignee_id=3",
"SELECT * FROM tasks WHERE assignee_id=?", (3,))
# 创建索引
conn.execute("CREATE INDEX idx_tasks_status ON tasks(status)")
conn.execute("CREATE INDEX idx_tasks_assignee ON tasks(assignee_id)")
conn.execute("CREATE INDEX idx_tasks_priority_status ON tasks(priority, status)")
conn.commit()
print()
t3 = timed_query("有索引: WHERE status='done'",
"SELECT * FROM tasks WHERE status='done'")
t4 = timed_query("有索引: WHERE assignee_id=3",
"SELECT * FROM tasks WHERE assignee_id=?", (3,))
t5 = timed_query("复合索引: priority+status",
"SELECT * FROM tasks WHERE priority='high' AND status='pending'")
print(f"\n status 查询加速: {t1/t3:.1f}x" if t3 > 0 else "")
print(f" assignee 查询加速: {t2/t4:.1f}x" if t4 > 0 else "")
# EXPLAIN QUERY PLAN
print("\n=== EXPLAIN QUERY PLAN(索引使用情况)===")
for row in conn.execute(
"EXPLAIN QUERY PLAN SELECT * FROM tasks WHERE status='done'"
):
print(f" {dict(row)}")
conn.close()
# ── 模式2:JOIN 连接查询 ────────────────────────────────────
def demo_join() -> None:
conn = setup_db(100)
print("\n=== JOIN 连接查询演示 ===")
# INNER JOIN:任务+负责人
print("\n── INNER JOIN:任务与负责人 ──")
print(f"{'任务ID':<8} {'标题':<20} {'负责人':<10} {'团队':<10} {'状态'}")
print("-" * 60)
rows = conn.execute("""
SELECT t.id, t.title, u.username, u.team, t.status
FROM tasks t
INNER JOIN users u ON t.assignee_id = u.id
WHERE t.priority = 'high'
ORDER BY t.id
LIMIT 8
""").fetchall()
for r in rows:
print(f"{r['id']:<8} {r['title']:<20} {r['username']:<10} {r['team']:<10} {r['status']}")
# LEFT JOIN:含未分配任务
unassigned = conn.execute("""
SELECT COUNT(*) as cnt FROM tasks t
LEFT JOIN users u ON t.assignee_id = u.id
WHERE u.id IS NULL
""").fetchone()
print(f"\nLEFT JOIN 发现未分配任务: {unassigned['cnt']} 条")
# 三表 JOIN:任务+负责人+操作日志
print("\n── 三表 JOIN:任务操作历史 ──")
print(f"{'任务':<15} {'操作人':<10} {'动作':<12} {'时间'}")
print("-" * 55)
rows = conn.execute("""
SELECT t.title, u.username, l.action, l.logged_at
FROM task_logs l
INNER JOIN tasks t ON l.task_id = t.id
INNER JOIN users u ON l.user_id = u.id
ORDER BY l.id DESC
LIMIT 6
""").fetchall()
for r in rows:
print(f"{r['title']:<15} {r['username']:<10} {r['action']:<12} {r['logged_at']}")
conn.close()
# ── 模式3:聚合统计 ─────────────────────────────────────────
def demo_aggregate() -> None:
conn = setup_db(200)
print("\n=== 聚合统计演示 ===")
# 按团队统计任务完成率
print("\n── 按团队统计任务完成率 ──")
print(f"{'团队':<12} {'总任务':<8} {'已完成':<8} {'完成率':<8} {'高优先级'}")
print("-" * 50)
rows = conn.execute("""
SELECT
u.team,
COUNT(t.id) AS total,
SUM(CASE WHEN t.status='done' THEN 1 ELSE 0 END) AS done_cnt,
ROUND(SUM(CASE WHEN t.status='done' THEN 1.0 ELSE 0 END) / COUNT(t.id) * 100, 1) AS done_pct,
SUM(CASE WHEN t.priority='high' THEN 1 ELSE 0 END) AS high_cnt
FROM tasks t
INNER JOIN users u ON t.assignee_id = u.id
GROUP BY u.team
ORDER BY done_pct DESC
""").fetchall()
for r in rows:
bar = "█" * int(r["done_pct"] / 10)
print(f"{r['team']:<12} {r['total']:<8} {r['done_cnt']:<8} {r['done_pct']:<6}% {bar}")
# 子查询:找出任务数超过平均值的成员
print("\n── 子查询:任务数超过平均值的成员 ──")
print(f"{'成员':<10} {'角色':<8} {'任务数':<8} {'vs平均'}")
print("-" * 40)
rows = conn.execute("""
SELECT u.username, u.role, COUNT(t.id) AS task_cnt
FROM users u
LEFT JOIN tasks t ON t.assignee_id = u.id
GROUP BY u.id
HAVING task_cnt > (
SELECT AVG(cnt) FROM (
SELECT COUNT(*) AS cnt FROM tasks GROUP BY assignee_id
)
)
ORDER BY task_cnt DESC
""").fetchall()
avg_row = conn.execute(
"SELECT AVG(cnt) as avg FROM (SELECT COUNT(*) as cnt FROM tasks GROUP BY assignee_id)"
).fetchone()
avg = avg_row["avg"] if avg_row else 0
for r in rows:
diff = r["task_cnt"] - avg
print(f"{r['username']:<10} {r['role']:<8} {r['task_cnt']:<8} +{diff:.1f}")
# 窗口函数替代:按优先级分组统计
print("\n── 按优先级×状态交叉统计 ──")
print(f"{'优先级':<10} {'pending':<10} {'doing':<10} {'done':<10} {'合计'}")
print("-" * 50)
pivot: dict[str, dict[str, int]] = {}
for row in conn.execute("SELECT priority, status, COUNT(*) as cnt FROM tasks GROUP BY priority, status"):
pivot.setdefault(row["priority"], {})[row["status"]] = row["cnt"]
for pri in ["high", "medium", "low"]:
d = pivot.get(pri, {})
p, do, dn = d.get("pending", 0), d.get("doing", 0), d.get("done", 0)
print(f"{pri:<10} {p:<10} {do:<10} {dn:<10} {p+do+dn}")
conn.close()
def main() -> None:
parser = argparse.ArgumentParser(description="数据库进阶演示")
parser.add_argument(
"--mode",
choices=["index", "join", "aggregate"],
default="aggregate",
help="index=索引性能对比, join=连接查询, aggregate=聚合统计",
)
args = parser.parse_args()
if args.mode == "index":
demo_index()
elif args.mode == "join":
demo_join()
else:
demo_aggregate()
if __name__ == "__main__":
main()
$ python3 17-python-mysql-advanced.py --mode index
=== 索引性能对比(5000条任务)===
查询 耗时 说明
-------------------------------------------------------
无索引: WHERE status='done' 2.341ms 结果: 1672 行
无索引: WHERE assignee_id=3 1.987ms 结果: 1003 行
有索引: WHERE status='done' 0.312ms 结果: 1672 行
有索引: WHERE assignee_id=3 0.198ms 结果: 1003 行
复合索引: priority+status 0.156ms 结果: 556 行
status 查询加速: 7.5x
assignee 查询加速: 10.0x
=== EXPLAIN QUERY PLAN(索引使用情况)===
{'id': 3, 'parent': 0, 'notused': 0, 'detail': 'SEARCH tasks USING INDEX idx_tasks_status (status=?)'}
$ python3 17-python-mysql-advanced.py --mode join
=== JOIN 连接查询演示 ===
── INNER JOIN:任务与负责人 ──
任务ID 标题 负责人 团队 状态
------------------------------------------------------------
3 Task-0003 carol frontend high
7 Task-0007 alice backend high
...
LEFT JOIN 发现未分配任务: 0 条
── 三表 JOIN:任务操作历史 ──
任务 操作人 动作 时间
-------------------------------------------------------
Task-0098 bob closed 2026-04-16 ...
Task-0076 alice updated 2026-04-16 ...
$ python3 17-python-mysql-advanced.py --mode aggregate
=== 聚合统计演示 ===
── 按团队统计任务完成率 ──
团队 总任务 已完成 完成率 高优先级
--------------------------------------------------
backend 90 28 31.1 % ███
devops 42 12 28.6 % ██
frontend 68 18 26.5 % ██
── 子查询:任务数超过平均值的成员 ──
成员 角色 任务数 vs平均
----------------------------------------
bob member 46 +6.0
alice lead 44 +4.0
── 按优先级×状态交叉统计 ──
优先级 pending doing done 合计
--------------------------------------------------
high 22 18 27 67
medium 31 24 19 74
low 18 22 19 59
小结
| 概念 | 一句话记忆 |
|---|---|
| B+树索引 | 给列建目录,查找从 O(n) 变 O(log n),写入变慢 |
CREATE INDEX |
建索引;EXPLAIN QUERY PLAN 验证是否生效 |
| 复合索引 | 遵循最左前缀原则,(a,b) 索引能加速 WHERE a=? 和 WHERE a=? AND b=? |
INNER JOIN |
取交集,两边都有才出现,无匹配行被过滤 |
LEFT JOIN |
保留左表所有行,右边无匹配填 NULL |
GROUP BY |
分组聚合;WHERE 在分组前过滤,HAVING 在分组后过滤 |
CASE WHEN |
SQL 里的条件计数,相当于 sum(1 for x if condition) |
| 子查询 | 先算内层结果,外层再用,适合"超过平均值"类问题 |
⏱ NexDo Time(5 分钟)
挑战:用 EXPLAIN QUERY PLAN 亲手验证"索引失效"场景——在 WHERE 子句里对列做函数运算,看查询计划是否从 USING INDEX 退化为 SCAN TABLE。
具体步骤:
- 对
tasks表的due_date列建索引:CREATE INDEX idx_due ON tasks(due_date) - 用
EXPLAIN QUERY PLAN跑两条查询,对比输出:- ✅ 索引生效:
WHERE due_date >= '2026-04-15' - ❌ 索引失效:
WHERE substr(due_date, 1, 7) = '2026-04'(对列做了函数运算)
- ✅ 索引生效:
- 把两条查询的
EXPLAIN输出打印出来,找到SCAN和SEARCH的区别
Don’t wait for next time, do it in the next moment.