文章

17 · 数据库进阶:索引、事务与查询优化

#016 · 2026-04-16 · Python

🔗 知识图谱导航:阅读本文前,建议先掌握《16 · 数据库底座:SQLite 核心操作与 Python 接入》中的 CRUD、参数化查询与事务基础——本文在此之上深入索引原理与多表查询,让数据库从"能用"变成"用好"。

极客解析:数据库性能问题 80% 来自两件事:缺索引和写了低效 SQL。本文用可运行的计时对比让你亲眼看到索引的加速效果,用 EXPLAIN QUERY PLAN 教你验证索引是否真的生效。

痛点与架构:新手学数据库进阶最常见的三个坑:① 不知道什么时候该建索引;② 写了 JOIN 但不知道 INNER/LEFT 的区别;③ GROUP BY 和 HAVING 傻傻分不清。本文用三张关联表(用户、任务、操作日志)把这三个坑一次性填平。

B+树索引原理

无索引查询:全表扫描 O(n)
┌────┬────┬────┬────┬────┬────┬────┬────┐
│ 1  │ 2  │ 3  │ 4  │ 5  │ 6  │ 7  │ 8  │  逐行扫描
└────┴────┴────┴────┴────┴────┴────┴────┘
查找 id=7 → 扫描 7 次

B+树索引:O(log n)
              ┌─────┐
              │  4  │           ← 根节点
              └──┬──┘
        ┌────────┴────────┐
      ┌─┴─┐             ┌─┴─┐
      │1,2│             │5,6│   ← 内部节点(只存键)
      └─┬─┘             └─┬─┘
   ┌────┴────┐       ┌────┴────┐
 [1→r1][2→r2]       [5→r5][6→r6]  ← 叶节点(存键+行指针)
        ↕                  ↕
      [3→r3][4→r4]  [7→r7][8→r8]  ← 叶节点链表(支持范围查询)

查找 id=7 → 3 次比较

索引代价:写入时需维护 B+树,INSERT/UPDATE/DELETE 变慢;占用额外磁盘空间;选择性低的列(如 status 只有 3 个值)建索引收益有限。

事务 ACID

┌──────────────┬──────────────────────────────────────────┐
│ 属性         │ 含义                                     │
├──────────────┼──────────────────────────────────────────┤
│ Atomicity    │ 原子性:全成功或全回滚,不存在中间状态   │
│ Consistency  │ 一致性:事务前后数据满足约束条件         │
│ Isolation    │ 隔离性:并发事务互不干扰                 │
│ Durability   │ 持久性:提交后数据永久保存               │
└──────────────┴──────────────────────────────────────────┘

查询优化核心原则

┌─────────────────────────────────────────────────────────┐
│ 优化原则                                                │
├─────────────────────────────────────────────────────────┤
│ 1. 只查需要的列:SELECT id,name 而非 SELECT *           │
│ 2. WHERE 条件列建索引,避免全表扫描                     │
│ 3. 复合索引遵循最左前缀原则                             │
│ 4. 避免在 WHERE 中对列做函数运算(索引失效)            │
│    ❌ WHERE YEAR(created_at) = 2026                     │
│    ✅ WHERE created_at BETWEEN '2026-01-01' AND '...'   │
│ 5. LIMIT 提前截断,减少传输数据量                       │
│ 6. 用 EXPLAIN QUERY PLAN 验证索引是否生效               │
└─────────────────────────────────────────────────────────┘

步步为营:核心逻辑自适应拆解

这一篇的核心是三件事:索引让查询从 O(n) 变成 O(log n);JOIN 把多张表的数据关联起来;GROUP BY + HAVING 做多维聚合统计。下面每一步都聚焦一个概念,跑完就能看到结果。

Step 1:用三张表的 SCHEMA 建立多表关联数据模型

痛点与机制

单张表只能存一种实体,真实业务需要多张表协作。REFERENCES users(id) 是外键声明——就像合同里写"乙方必须是甲方名单里的人",数据库会帮你检查引用完整性,不能给一个不存在的用户分配任务。三张表的分工:users 存人,tasks 存事,task_logs 存操作历史,这是最常见的"实体-关系-日志"三层结构。

核心源码(逐字来自文末完整源码)

SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    username   TEXT    NOT NULL UNIQUE,
    role       TEXT    DEFAULT 'member',
    team       TEXT,
    created_at TEXT    DEFAULT (datetime('now','localtime'))
);

CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    DEFAULT 'medium',
    status      TEXT    DEFAULT 'pending',
    assignee_id INTEGER REFERENCES users(id),
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    due_date    TEXT
);

CREATE TABLE IF NOT EXISTS task_logs (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    task_id    INTEGER REFERENCES tasks(id),
    user_id    INTEGER REFERENCES users(id),
    action     TEXT,
    detail     TEXT,
    logged_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

可运行演示(补齐 Mock 数据与 print 反馈)






# 运行: python3 17-python-mysql-advanced.py --mode index
#       python3 17-python-mysql-advanced.py --mode join
#       python3 17-python-mysql-advanced.py --mode aggregate

import argparse
import sqlite3
import time
import random
from typing import Any

SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    username   TEXT    NOT NULL UNIQUE,
    role       TEXT    DEFAULT 'member',
    team       TEXT,
    created_at TEXT    DEFAULT (datetime('now','localtime'))
);

CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    DEFAULT 'medium',
    status      TEXT    DEFAULT 'pending',
    assignee_id INTEGER REFERENCES users(id),
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    due_date    TEXT
);

CREATE TABLE IF NOT EXISTS task_logs (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    task_id    INTEGER REFERENCES tasks(id),
    user_id    INTEGER REFERENCES users(id),
    action     TEXT,
    detail     TEXT,
    logged_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

USERS = [
    ("alice", "lead",   "backend"),
    ("bob",   "member", "backend"),
    ("carol", "member", "frontend"),
    ("dave",  "lead",   "frontend"),
    ("eve",   "member", "devops"),
]

PRIORITIES = ["low", "medium", "high"]
STATUSES   = ["pending", "doing", "done"]
ACTIONS    = ["created", "updated", "commented", "closed"]


def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.executescript(SCHEMA)

    conn.executemany(
        "INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
    )

    rng = random.Random(42)
    tasks = [
        (
            f"Task-{i:04d}",
            rng.choice(PRIORITIES),
            rng.choice(STATUSES),
            rng.randint(1, len(USERS)),
            f"2026-04-{rng.randint(1,30):02d}",
        )
        for i in range(1, n_tasks + 1)
    ]
    conn.executemany(
        "INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
        tasks,
    )

    logs = [
        (rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
        for i in range(1, n_tasks * 3 + 1)
    ]
    conn.executemany(
        "INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
        logs,
    )
    conn.commit()
    return conn

# Step 1:SCHEMA 定义了三张表:users(人)、tasks(事)、task_logs(日志)。
# REFERENCES users(id) 是外键声明,让数据库维护"引用完整性"。
conn = __import__('sqlite3').connect(":memory:")
conn.row_factory = __import__('sqlite3').Row
conn.executescript(SCHEMA)
tables = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
print("已建表:", [r['name'] for r in tables])
conn.close()

Step 2:用 setup_db 批量生成测试数据,理解 random.Random(42) 的可复现性

痛点与机制

random.Random(42) 是"带种子的随机数生成器"——就像用同一副牌按同一规则洗牌,每次洗出来的顺序完全相同。种子固定 → 每次运行生成的随机序列完全相同 → 测试结果可复现,不会"今天过明天不过"。executemany 批量插入只编译一次 SQL,用不同参数重复执行,比循环调用 execute 快得多。

核心源码(逐字来自文末完整源码)

SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    username   TEXT    NOT NULL UNIQUE,
    role       TEXT    DEFAULT 'member',
    team       TEXT,
    created_at TEXT    DEFAULT (datetime('now','localtime'))
);

CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    DEFAULT 'medium',
    status      TEXT    DEFAULT 'pending',
    assignee_id INTEGER REFERENCES users(id),
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    due_date    TEXT
);

CREATE TABLE IF NOT EXISTS task_logs (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    task_id    INTEGER REFERENCES tasks(id),
    user_id    INTEGER REFERENCES users(id),
    action     TEXT,
    detail     TEXT,
    logged_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

USERS = [
    ("alice", "lead",   "backend"),
    ("bob",   "member", "backend"),
    ("carol", "member", "frontend"),
    ("dave",  "lead",   "frontend"),
    ("eve",   "member", "devops"),
]

PRIORITIES = ["low", "medium", "high"]
STATUSES   = ["pending", "doing", "done"]
ACTIONS    = ["created", "updated", "commented", "closed"]


def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.executescript(SCHEMA)

    conn.executemany(
        "INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
    )

    rng = random.Random(42)
    tasks = [
        (
            f"Task-{i:04d}",
            rng.choice(PRIORITIES),
            rng.choice(STATUSES),
            rng.randint(1, len(USERS)),
            f"2026-04-{rng.randint(1,30):02d}",
        )
        for i in range(1, n_tasks + 1)
    ]
    conn.executemany(
        "INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
        tasks,
    )

    logs = [
        (rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
        for i in range(1, n_tasks * 3 + 1)
    ]
    conn.executemany(
        "INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
        logs,
    )
    conn.commit()
    return conn

可运行演示(补齐 Mock 数据与 print 反馈)

# 文件名: 17-python-mysql-advanced.py




# 运行: python3 17-python-mysql-advanced.py --mode index
#       python3 17-python-mysql-advanced.py --mode join
#       python3 17-python-mysql-advanced.py --mode aggregate

import argparse
import sqlite3
import time
import random
from typing import Any

SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    username   TEXT    NOT NULL UNIQUE,
    role       TEXT    DEFAULT 'member',
    team       TEXT,
    created_at TEXT    DEFAULT (datetime('now','localtime'))
);

CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    DEFAULT 'medium',
    status      TEXT    DEFAULT 'pending',
    assignee_id INTEGER REFERENCES users(id),
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    due_date    TEXT
);

CREATE TABLE IF NOT EXISTS task_logs (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    task_id    INTEGER REFERENCES tasks(id),
    user_id    INTEGER REFERENCES users(id),
    action     TEXT,
    detail     TEXT,
    logged_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

USERS = [
    ("alice", "lead",   "backend"),
    ("bob",   "member", "backend"),
    ("carol", "member", "frontend"),
    ("dave",  "lead",   "frontend"),
    ("eve",   "member", "devops"),
]

PRIORITIES = ["low", "medium", "high"]
STATUSES   = ["pending", "doing", "done"]
ACTIONS    = ["created", "updated", "commented", "closed"]


def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.executescript(SCHEMA)

    conn.executemany(
        "INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
    )

    rng = random.Random(42)
    tasks = [
        (
            f"Task-{i:04d}",
            rng.choice(PRIORITIES),
            rng.choice(STATUSES),
            rng.randint(1, len(USERS)),
            f"2026-04-{rng.randint(1,30):02d}",
        )
        for i in range(1, n_tasks + 1)
    ]
    conn.executemany(
        "INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
        tasks,
    )

    logs = [
        (rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
        for i in range(1, n_tasks * 3 + 1)
    ]
    conn.executemany(
        "INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
        logs,
    )
    conn.commit()
    return conn

# Step 2:random.Random(42) 是带种子的随机数生成器,种子固定则每次结果相同,测试可复现。
conn = setup_db(200)
print(f"users: {conn.execute('SELECT COUNT(*) FROM users').fetchone()[0]} 行")
print(f"tasks: {conn.execute('SELECT COUNT(*) FROM tasks').fetchone()[0]} 行")
print(f"task_logs: {conn.execute('SELECT COUNT(*) FROM task_logs').fetchone()[0]} 行")
conn.close()

Step 3:用 timed_query 计时,看无索引全表扫描的真实耗时

痛点与机制

timed_query 是一个"计时包装器":用 perf_counter 精确到微秒级,把查询耗时和结果行数一起打印出来,让性能差距一眼可见。无索引时,WHERE status='done' 必须逐行扫描整张表——就像在没有目录的字典里找一个词,数据越多越慢,这就是 O(n) 全表扫描。

核心源码(逐字来自文末完整源码)

def demo_index() -> None:
    conn = setup_db(5000)

    def timed_query(label: str, sql: str, params: tuple[Any, ...] = ()) -> float:
        t0 = time.perf_counter()
        rows = conn.execute(sql, params).fetchall()
        elapsed = (time.perf_counter() - t0) * 1000
        print(f"  {label:<30} {elapsed:>8.3f}ms  结果: {len(rows)} 行")
        return elapsed

    print("\n=== 索引性能对比(5000条任务)===\n")
    print(f"  {'查询':<30} {'耗时':>8}  {'说明'}")
    print("  " + "-" * 55)

    # 无索引
    t1 = timed_query("无索引: WHERE status='done'",
                     "SELECT * FROM tasks WHERE status='done'")
    t2 = timed_query("无索引: WHERE assignee_id=3",
                     "SELECT * FROM tasks WHERE assignee_id=?", (3,))

可运行演示(补齐 Mock 数据与 print 反馈)

# 文件名: 17-python-mysql-advanced.py




# 运行: python3 17-python-mysql-advanced.py --mode index
#       python3 17-python-mysql-advanced.py --mode join
#       python3 17-python-mysql-advanced.py --mode aggregate

import argparse
import sqlite3
import time
import random
from typing import Any

SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    username   TEXT    NOT NULL UNIQUE,
    role       TEXT    DEFAULT 'member',
    team       TEXT,
    created_at TEXT    DEFAULT (datetime('now','localtime'))
);

CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    DEFAULT 'medium',
    status      TEXT    DEFAULT 'pending',
    assignee_id INTEGER REFERENCES users(id),
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    due_date    TEXT
);

CREATE TABLE IF NOT EXISTS task_logs (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    task_id    INTEGER REFERENCES tasks(id),
    user_id    INTEGER REFERENCES users(id),
    action     TEXT,
    detail     TEXT,
    logged_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

USERS = [
    ("alice", "lead",   "backend"),
    ("bob",   "member", "backend"),
    ("carol", "member", "frontend"),
    ("dave",  "lead",   "frontend"),
    ("eve",   "member", "devops"),
]

PRIORITIES = ["low", "medium", "high"]
STATUSES   = ["pending", "doing", "done"]
ACTIONS    = ["created", "updated", "commented", "closed"]


def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.executescript(SCHEMA)

    conn.executemany(
        "INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
    )

    rng = random.Random(42)
    tasks = [
        (
            f"Task-{i:04d}",
            rng.choice(PRIORITIES),
            rng.choice(STATUSES),
            rng.randint(1, len(USERS)),
            f"2026-04-{rng.randint(1,30):02d}",
        )
        for i in range(1, n_tasks + 1)
    ]
    conn.executemany(
        "INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
        tasks,
    )

    logs = [
        (rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
        for i in range(1, n_tasks * 3 + 1)
    ]
    conn.executemany(
        "INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
        logs,
    )
    conn.commit()
    return conn

# Step 3:timed_query 是计时包装器,用 perf_counter 精确到微秒级。
# 无索引时 WHERE status='done' 必须逐行扫描整张表。
def demo_index_preview():
    conn = setup_db(3000)
    def timed_query(label, sql, params=()):
        import time
        t0 = time.perf_counter()
        rows = conn.execute(sql, params).fetchall()
        elapsed = (time.perf_counter() - t0) * 1000
        print(f"  {label:<30} {elapsed:>8.3f}ms  结果: {len(rows)} 行")
        return elapsed
    print("\n=== 无索引查询计时 ===")
    timed_query("无索引: WHERE status='done'", "SELECT * FROM tasks WHERE status='done'")
    timed_query("无索引: WHERE assignee_id=3", "SELECT * FROM tasks WHERE assignee_id=?", (3,))
    conn.close()

demo_index_preview()

Step 4:用 CREATE INDEX 建索引,对比加速效果,用 EXPLAIN QUERY PLAN 验证

痛点与机制

CREATE INDEX 告诉数据库"为这列建一棵 B+树"——就像给字典加目录,查找从"逐页翻"变成"先查目录再翻到对应页"。建完索引后,WHERE status='done' 从全表扫描变成树上二分查找,速度提升数倍。EXPLAIN QUERY PLAN 是 SQLite 的"查询计划查看器":输出里出现 USING INDEX 说明索引生效,出现 SCAN TABLE 说明还在全表扫描。

核心源码(逐字来自文末完整源码)

def demo_index() -> None:
    conn = setup_db(5000)

    def timed_query(label: str, sql: str, params: tuple[Any, ...] = ()) -> float:
        t0 = time.perf_counter()
        rows = conn.execute(sql, params).fetchall()
        elapsed = (time.perf_counter() - t0) * 1000
        print(f"  {label:<30} {elapsed:>8.3f}ms  结果: {len(rows)} 行")
        return elapsed

    print("\n=== 索引性能对比(5000条任务)===\n")
    print(f"  {'查询':<30} {'耗时':>8}  {'说明'}")
    print("  " + "-" * 55)

    # 无索引
    t1 = timed_query("无索引: WHERE status='done'",
                     "SELECT * FROM tasks WHERE status='done'")
    t2 = timed_query("无索引: WHERE assignee_id=3",
                     "SELECT * FROM tasks WHERE assignee_id=?", (3,))

    # 创建索引
    conn.execute("CREATE INDEX idx_tasks_status ON tasks(status)")
    conn.execute("CREATE INDEX idx_tasks_assignee ON tasks(assignee_id)")
    conn.execute("CREATE INDEX idx_tasks_priority_status ON tasks(priority, status)")
    conn.commit()

    print()
    t3 = timed_query("有索引: WHERE status='done'",
                     "SELECT * FROM tasks WHERE status='done'")
    t4 = timed_query("有索引: WHERE assignee_id=3",
                     "SELECT * FROM tasks WHERE assignee_id=?", (3,))
    t5 = timed_query("复合索引: priority+status",
                     "SELECT * FROM tasks WHERE priority='high' AND status='pending'")

    print(f"\n  status 查询加速: {t1/t3:.1f}x" if t3 > 0 else "")
    print(f"  assignee 查询加速: {t2/t4:.1f}x" if t4 > 0 else "")

    # EXPLAIN QUERY PLAN
    print("\n=== EXPLAIN QUERY PLAN(索引使用情况)===")
    for row in conn.execute(
        "EXPLAIN QUERY PLAN SELECT * FROM tasks WHERE status='done'"
    ):
        print(f"  {dict(row)}")

    conn.close()

可运行演示(补齐 Mock 数据与 print 反馈)

# 文件名: 17-python-mysql-advanced.py




# 运行: python3 17-python-mysql-advanced.py --mode index
#       python3 17-python-mysql-advanced.py --mode join
#       python3 17-python-mysql-advanced.py --mode aggregate

import argparse
import sqlite3
import time
import random
from typing import Any

SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    username   TEXT    NOT NULL UNIQUE,
    role       TEXT    DEFAULT 'member',
    team       TEXT,
    created_at TEXT    DEFAULT (datetime('now','localtime'))
);

CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    DEFAULT 'medium',
    status      TEXT    DEFAULT 'pending',
    assignee_id INTEGER REFERENCES users(id),
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    due_date    TEXT
);

CREATE TABLE IF NOT EXISTS task_logs (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    task_id    INTEGER REFERENCES tasks(id),
    user_id    INTEGER REFERENCES users(id),
    action     TEXT,
    detail     TEXT,
    logged_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

USERS = [
    ("alice", "lead",   "backend"),
    ("bob",   "member", "backend"),
    ("carol", "member", "frontend"),
    ("dave",  "lead",   "frontend"),
    ("eve",   "member", "devops"),
]

PRIORITIES = ["low", "medium", "high"]
STATUSES   = ["pending", "doing", "done"]
ACTIONS    = ["created", "updated", "commented", "closed"]


def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.executescript(SCHEMA)

    conn.executemany(
        "INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
    )

    rng = random.Random(42)
    tasks = [
        (
            f"Task-{i:04d}",
            rng.choice(PRIORITIES),
            rng.choice(STATUSES),
            rng.randint(1, len(USERS)),
            f"2026-04-{rng.randint(1,30):02d}",
        )
        for i in range(1, n_tasks + 1)
    ]
    conn.executemany(
        "INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
        tasks,
    )

    logs = [
        (rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
        for i in range(1, n_tasks * 3 + 1)
    ]
    conn.executemany(
        "INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
        logs,
    )
    conn.commit()
    return conn

# Step 4:CREATE INDEX 建 B+树索引,WHERE 查询从全表扫描变成树上二分查找。
# EXPLAIN QUERY PLAN 输出 "USING INDEX" 说明索引生效。
def demo_index() -> None:
    conn = setup_db(5000)

    def timed_query(label: str, sql: str, params: tuple[Any, ...] = ()) -> float:
        t0 = time.perf_counter()
        rows = conn.execute(sql, params).fetchall()
        elapsed = (time.perf_counter() - t0) * 1000
        print(f"  {label:<30} {elapsed:>8.3f}ms  结果: {len(rows)} 行")
        return elapsed

    print("\n=== 索引性能对比(5000条任务)===\n")
    print(f"  {'查询':<30} {'耗时':>8}  {'说明'}")
    print("  " + "-" * 55)

    # 无索引
    t1 = timed_query("无索引: WHERE status='done'",
                     "SELECT * FROM tasks WHERE status='done'")
    t2 = timed_query("无索引: WHERE assignee_id=3",
                     "SELECT * FROM tasks WHERE assignee_id=?", (3,))

    # 创建索引
    conn.execute("CREATE INDEX idx_tasks_status ON tasks(status)")
    conn.execute("CREATE INDEX idx_tasks_assignee ON tasks(assignee_id)")
    conn.execute("CREATE INDEX idx_tasks_priority_status ON tasks(priority, status)")
    conn.commit()

    print()
    t3 = timed_query("有索引: WHERE status='done'",
                     "SELECT * FROM tasks WHERE status='done'")
    t4 = timed_query("有索引: WHERE assignee_id=3",
                     "SELECT * FROM tasks WHERE assignee_id=?", (3,))
    t5 = timed_query("复合索引: priority+status",
                     "SELECT * FROM tasks WHERE priority='high' AND status='pending'")

    print(f"\n  status 查询加速: {t1/t3:.1f}x" if t3 > 0 else "")
    print(f"  assignee 查询加速: {t2/t4:.1f}x" if t4 > 0 else "")

    # EXPLAIN QUERY PLAN
    print("\n=== EXPLAIN QUERY PLAN(索引使用情况)===")
    for row in conn.execute(
        "EXPLAIN QUERY PLAN SELECT * FROM tasks WHERE status='done'"
    ):
        print(f"  {dict(row)}")

    conn.close()
demo_index()

Step 5:用 INNER JOIN 把任务表和用户表关联,理解 ON 条件

痛点与机制

INNER JOIN 是"取交集"——就像两个班级的名单取共同出现的名字。ON t.assignee_id = u.id 是连接条件:任务的负责人 ID 必须在用户表里存在。如果任务没有分配负责人(assignee_id IS NULL),INNER JOIN 会把它过滤掉——这是 INNER JOIN 和 LEFT JOIN 最关键的区别。

核心源码(逐字来自文末完整源码)

def demo_join() -> None:
    conn = setup_db(100)

    print("\n=== JOIN 连接查询演示 ===")

    # INNER JOIN:任务+负责人
    print("\n── INNER JOIN:任务与负责人 ──")
    print(f"{'任务ID':<8} {'标题':<20} {'负责人':<10} {'团队':<10} {'状态'}")
    print("-" * 60)
    rows = conn.execute("""
        SELECT t.id, t.title, u.username, u.team, t.status
        FROM tasks t
        INNER JOIN users u ON t.assignee_id = u.id
        WHERE t.priority = 'high'
        ORDER BY t.id
        LIMIT 8
    """).fetchall()
    for r in rows:
        print(f"{r['id']:<8} {r['title']:<20} {r['username']:<10} {r['team']:<10} {r['status']}")

可运行演示(补齐 Mock 数据与 print 反馈)

# 文件名: 17-python-mysql-advanced.py




# 运行: python3 17-python-mysql-advanced.py --mode index
#       python3 17-python-mysql-advanced.py --mode join
#       python3 17-python-mysql-advanced.py --mode aggregate

import argparse
import sqlite3
import time
import random
from typing import Any

SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    username   TEXT    NOT NULL UNIQUE,
    role       TEXT    DEFAULT 'member',
    team       TEXT,
    created_at TEXT    DEFAULT (datetime('now','localtime'))
);

CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    DEFAULT 'medium',
    status      TEXT    DEFAULT 'pending',
    assignee_id INTEGER REFERENCES users(id),
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    due_date    TEXT
);

CREATE TABLE IF NOT EXISTS task_logs (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    task_id    INTEGER REFERENCES tasks(id),
    user_id    INTEGER REFERENCES users(id),
    action     TEXT,
    detail     TEXT,
    logged_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

USERS = [
    ("alice", "lead",   "backend"),
    ("bob",   "member", "backend"),
    ("carol", "member", "frontend"),
    ("dave",  "lead",   "frontend"),
    ("eve",   "member", "devops"),
]

PRIORITIES = ["low", "medium", "high"]
STATUSES   = ["pending", "doing", "done"]
ACTIONS    = ["created", "updated", "commented", "closed"]


def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.executescript(SCHEMA)

    conn.executemany(
        "INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
    )

    rng = random.Random(42)
    tasks = [
        (
            f"Task-{i:04d}",
            rng.choice(PRIORITIES),
            rng.choice(STATUSES),
            rng.randint(1, len(USERS)),
            f"2026-04-{rng.randint(1,30):02d}",
        )
        for i in range(1, n_tasks + 1)
    ]
    conn.executemany(
        "INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
        tasks,
    )

    logs = [
        (rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
        for i in range(1, n_tasks * 3 + 1)
    ]
    conn.executemany(
        "INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
        logs,
    )
    conn.commit()
    return conn

# Step 5:INNER JOIN 取交集,ON 条件是连接键,没有负责人的任务会被过滤掉。
def demo_join_preview():
    conn = setup_db(50)
    print("\n=== INNER JOIN 演示 ===")
    rows = conn.execute('''
        SELECT t.id, t.title, u.username, u.team, t.status
        FROM tasks t INNER JOIN users u ON t.assignee_id = u.id
        WHERE t.priority = 'high' ORDER BY t.id LIMIT 5
    ''').fetchall()
    print(f"{'任务ID':<8} {'标题':<20} {'负责人':<10} {'团队':<10} {'状态'}")
    print("-" * 60)
    for r in rows:
        print(f"{r['id']:<8} {r['title']:<20} {r['username']:<10} {r['team']:<10} {r['status']}")
    conn.close()

demo_join_preview()

Step 6:用 LEFT JOIN 找出未分配任务,用三表 JOIN 查操作历史

痛点与机制

LEFT JOIN 是"保留左表所有行"——就像点名时不管右边有没有人,左边的名字都要出现,右边没有匹配的列填 NULLWHERE u.id IS NULL 就能筛出"没有负责人的任务"。三表 JOIN 就是两次 JOIN 串联:task_logs JOIN tasks JOIN users,每次 JOIN 都在上一步结果的基础上再关联一张表,像搭积木一样。

核心源码(逐字来自文末完整源码)

def demo_join() -> None:
    conn = setup_db(100)

    print("\n=== JOIN 连接查询演示 ===")

    # INNER JOIN:任务+负责人
    print("\n── INNER JOIN:任务与负责人 ──")
    print(f"{'任务ID':<8} {'标题':<20} {'负责人':<10} {'团队':<10} {'状态'}")
    print("-" * 60)
    rows = conn.execute("""
        SELECT t.id, t.title, u.username, u.team, t.status
        FROM tasks t
        INNER JOIN users u ON t.assignee_id = u.id
        WHERE t.priority = 'high'
        ORDER BY t.id
        LIMIT 8
    """).fetchall()
    for r in rows:
        print(f"{r['id']:<8} {r['title']:<20} {r['username']:<10} {r['team']:<10} {r['status']}")

    # LEFT JOIN:含未分配任务
    unassigned = conn.execute("""
        SELECT COUNT(*) as cnt FROM tasks t
        LEFT JOIN users u ON t.assignee_id = u.id
        WHERE u.id IS NULL
    """).fetchone()
    print(f"\nLEFT JOIN 发现未分配任务: {unassigned['cnt']} 条")

    # 三表 JOIN:任务+负责人+操作日志
    print("\n── 三表 JOIN:任务操作历史 ──")
    print(f"{'任务':<15} {'操作人':<10} {'动作':<12} {'时间'}")
    print("-" * 55)
    rows = conn.execute("""
        SELECT t.title, u.username, l.action, l.logged_at
        FROM task_logs l
        INNER JOIN tasks t ON l.task_id = t.id
        INNER JOIN users u ON l.user_id = u.id
        ORDER BY l.id DESC
        LIMIT 6
    """).fetchall()
    for r in rows:
        print(f"{r['title']:<15} {r['username']:<10} {r['action']:<12} {r['logged_at']}")

    conn.close()

可运行演示(补齐 Mock 数据与 print 反馈)

# 文件名: 17-python-mysql-advanced.py




# 运行: python3 17-python-mysql-advanced.py --mode index
#       python3 17-python-mysql-advanced.py --mode join
#       python3 17-python-mysql-advanced.py --mode aggregate

import argparse
import sqlite3
import time
import random
from typing import Any

SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    username   TEXT    NOT NULL UNIQUE,
    role       TEXT    DEFAULT 'member',
    team       TEXT,
    created_at TEXT    DEFAULT (datetime('now','localtime'))
);

CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    DEFAULT 'medium',
    status      TEXT    DEFAULT 'pending',
    assignee_id INTEGER REFERENCES users(id),
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    due_date    TEXT
);

CREATE TABLE IF NOT EXISTS task_logs (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    task_id    INTEGER REFERENCES tasks(id),
    user_id    INTEGER REFERENCES users(id),
    action     TEXT,
    detail     TEXT,
    logged_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

USERS = [
    ("alice", "lead",   "backend"),
    ("bob",   "member", "backend"),
    ("carol", "member", "frontend"),
    ("dave",  "lead",   "frontend"),
    ("eve",   "member", "devops"),
]

PRIORITIES = ["low", "medium", "high"]
STATUSES   = ["pending", "doing", "done"]
ACTIONS    = ["created", "updated", "commented", "closed"]


def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.executescript(SCHEMA)

    conn.executemany(
        "INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
    )

    rng = random.Random(42)
    tasks = [
        (
            f"Task-{i:04d}",
            rng.choice(PRIORITIES),
            rng.choice(STATUSES),
            rng.randint(1, len(USERS)),
            f"2026-04-{rng.randint(1,30):02d}",
        )
        for i in range(1, n_tasks + 1)
    ]
    conn.executemany(
        "INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
        tasks,
    )

    logs = [
        (rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
        for i in range(1, n_tasks * 3 + 1)
    ]
    conn.executemany(
        "INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
        logs,
    )
    conn.commit()
    return conn

# Step 6:LEFT JOIN 保留左表所有行,右表无匹配则填 NULL,用于找未分配任务。
# 三表 JOIN 就是两次 JOIN 串联,每次在上一步结果基础上再关联一张表。
def demo_join() -> None:
    conn = setup_db(100)

    print("\n=== JOIN 连接查询演示 ===")

    # INNER JOIN:任务+负责人
    print("\n── INNER JOIN:任务与负责人 ──")
    print(f"{'任务ID':<8} {'标题':<20} {'负责人':<10} {'团队':<10} {'状态'}")
    print("-" * 60)
    rows = conn.execute("""
        SELECT t.id, t.title, u.username, u.team, t.status
        FROM tasks t
        INNER JOIN users u ON t.assignee_id = u.id
        WHERE t.priority = 'high'
        ORDER BY t.id
        LIMIT 8
    """).fetchall()
    for r in rows:
        print(f"{r['id']:<8} {r['title']:<20} {r['username']:<10} {r['team']:<10} {r['status']}")

    # LEFT JOIN:含未分配任务
    unassigned = conn.execute("""
        SELECT COUNT(*) as cnt FROM tasks t
        LEFT JOIN users u ON t.assignee_id = u.id
        WHERE u.id IS NULL
    """).fetchone()
    print(f"\nLEFT JOIN 发现未分配任务: {unassigned['cnt']} 条")

    # 三表 JOIN:任务+负责人+操作日志
    print("\n── 三表 JOIN:任务操作历史 ──")
    print(f"{'任务':<15} {'操作人':<10} {'动作':<12} {'时间'}")
    print("-" * 55)
    rows = conn.execute("""
        SELECT t.title, u.username, l.action, l.logged_at
        FROM task_logs l
        INNER JOIN tasks t ON l.task_id = t.id
        INNER JOIN users u ON l.user_id = u.id
        ORDER BY l.id DESC
        LIMIT 6
    """).fetchall()
    for r in rows:
        print(f"{r['title']:<15} {r['username']:<10} {r['action']:<12} {r['logged_at']}")

    conn.close()
demo_join()

Step 7:用 GROUP BY + HAVING + 子查询实现多维聚合统计

痛点与机制

GROUP BY 是"按某列分组,对每组做聚合"——就像把一堆快递按省份分堆,再数每堆有多少个。HAVING 是"对聚合结果再过滤",和 WHERE 的区别是:WHERE 在分组前过滤行,HAVING 在分组后过滤组。CASE WHEN status='done' THEN 1 ELSE 0 END 是 SQL 里的"条件计数"技巧,相当于 Python 里的 sum(1 for x in rows if x.status == 'done')。子查询里的 AVG(cnt) 先算出每人平均任务数,外层 HAVING 再筛出超过平均值的人。

核心源码(逐字来自文末完整源码)

def demo_aggregate() -> None:
    conn = setup_db(200)

    print("\n=== 聚合统计演示 ===")

    # 按团队统计任务完成率
    print("\n── 按团队统计任务完成率 ──")
    print(f"{'团队':<12} {'总任务':<8} {'已完成':<8} {'完成率':<8} {'高优先级'}")
    print("-" * 50)
    rows = conn.execute("""
        SELECT
            u.team,
            COUNT(t.id)                                    AS total,
            SUM(CASE WHEN t.status='done' THEN 1 ELSE 0 END) AS done_cnt,
            ROUND(SUM(CASE WHEN t.status='done' THEN 1.0 ELSE 0 END) / COUNT(t.id) * 100, 1) AS done_pct,
            SUM(CASE WHEN t.priority='high' THEN 1 ELSE 0 END) AS high_cnt
        FROM tasks t
        INNER JOIN users u ON t.assignee_id = u.id
        GROUP BY u.team
        ORDER BY done_pct DESC
    """).fetchall()
    for r in rows:
        bar = "█" * int(r["done_pct"] / 10)
        print(f"{r['team']:<12} {r['total']:<8} {r['done_cnt']:<8} {r['done_pct']:<6}%  {bar}")

    # 子查询:找出任务数超过平均值的成员
    print("\n── 子查询:任务数超过平均值的成员 ──")
    print(f"{'成员':<10} {'角色':<8} {'任务数':<8} {'vs平均'}")
    print("-" * 40)
    rows = conn.execute("""
        SELECT u.username, u.role, COUNT(t.id) AS task_cnt
        FROM users u
        LEFT JOIN tasks t ON t.assignee_id = u.id
        GROUP BY u.id
        HAVING task_cnt > (
            SELECT AVG(cnt) FROM (
                SELECT COUNT(*) AS cnt FROM tasks GROUP BY assignee_id
            )
        )
        ORDER BY task_cnt DESC
    """).fetchall()
    avg_row = conn.execute(
        "SELECT AVG(cnt) as avg FROM (SELECT COUNT(*) as cnt FROM tasks GROUP BY assignee_id)"
    ).fetchone()
    avg = avg_row["avg"] if avg_row else 0
    for r in rows:
        diff = r["task_cnt"] - avg
        print(f"{r['username']:<10} {r['role']:<8} {r['task_cnt']:<8} +{diff:.1f}")

    # 窗口函数替代:按优先级分组统计
    print("\n── 按优先级×状态交叉统计 ──")
    print(f"{'优先级':<10} {'pending':<10} {'doing':<10} {'done':<10} {'合计'}")
    print("-" * 50)
    pivot: dict[str, dict[str, int]] = {}
    for row in conn.execute("SELECT priority, status, COUNT(*) as cnt FROM tasks GROUP BY priority, status"):
        pivot.setdefault(row["priority"], {})[row["status"]] = row["cnt"]
    for pri in ["high", "medium", "low"]:
        d = pivot.get(pri, {})
        p, do, dn = d.get("pending", 0), d.get("doing", 0), d.get("done", 0)
        print(f"{pri:<10} {p:<10} {do:<10} {dn:<10} {p+do+dn}")

    conn.close()

可运行演示(补齐 Mock 数据与 print 反馈)

# 文件名: 17-python-mysql-advanced.py




# 运行: python3 17-python-mysql-advanced.py --mode index
#       python3 17-python-mysql-advanced.py --mode join
#       python3 17-python-mysql-advanced.py --mode aggregate

import argparse
import sqlite3
import time
import random
from typing import Any

SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    username   TEXT    NOT NULL UNIQUE,
    role       TEXT    DEFAULT 'member',
    team       TEXT,
    created_at TEXT    DEFAULT (datetime('now','localtime'))
);

CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    DEFAULT 'medium',
    status      TEXT    DEFAULT 'pending',
    assignee_id INTEGER REFERENCES users(id),
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    due_date    TEXT
);

CREATE TABLE IF NOT EXISTS task_logs (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    task_id    INTEGER REFERENCES tasks(id),
    user_id    INTEGER REFERENCES users(id),
    action     TEXT,
    detail     TEXT,
    logged_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

USERS = [
    ("alice", "lead",   "backend"),
    ("bob",   "member", "backend"),
    ("carol", "member", "frontend"),
    ("dave",  "lead",   "frontend"),
    ("eve",   "member", "devops"),
]

PRIORITIES = ["low", "medium", "high"]
STATUSES   = ["pending", "doing", "done"]
ACTIONS    = ["created", "updated", "commented", "closed"]


def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.executescript(SCHEMA)

    conn.executemany(
        "INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
    )

    rng = random.Random(42)
    tasks = [
        (
            f"Task-{i:04d}",
            rng.choice(PRIORITIES),
            rng.choice(STATUSES),
            rng.randint(1, len(USERS)),
            f"2026-04-{rng.randint(1,30):02d}",
        )
        for i in range(1, n_tasks + 1)
    ]
    conn.executemany(
        "INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
        tasks,
    )

    logs = [
        (rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
        for i in range(1, n_tasks * 3 + 1)
    ]
    conn.executemany(
        "INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
        logs,
    )
    conn.commit()
    return conn

# Step 7:GROUP BY 分组聚合,HAVING 对聚合结果过滤(WHERE 在分组前,HAVING 在分组后)。
# CASE WHEN status='done' THEN 1 ELSE 0 END 是 SQL 里的条件计数技巧。
def demo_aggregate() -> None:
    conn = setup_db(200)

    print("\n=== 聚合统计演示 ===")

    # 按团队统计任务完成率
    print("\n── 按团队统计任务完成率 ──")
    print(f"{'团队':<12} {'总任务':<8} {'已完成':<8} {'完成率':<8} {'高优先级'}")
    print("-" * 50)
    rows = conn.execute("""
        SELECT
            u.team,
            COUNT(t.id)                                    AS total,
            SUM(CASE WHEN t.status='done' THEN 1 ELSE 0 END) AS done_cnt,
            ROUND(SUM(CASE WHEN t.status='done' THEN 1.0 ELSE 0 END) / COUNT(t.id) * 100, 1) AS done_pct,
            SUM(CASE WHEN t.priority='high' THEN 1 ELSE 0 END) AS high_cnt
        FROM tasks t
        INNER JOIN users u ON t.assignee_id = u.id
        GROUP BY u.team
        ORDER BY done_pct DESC
    """).fetchall()
    for r in rows:
        bar = "█" * int(r["done_pct"] / 10)
        print(f"{r['team']:<12} {r['total']:<8} {r['done_cnt']:<8} {r['done_pct']:<6}%  {bar}")

    # 子查询:找出任务数超过平均值的成员
    print("\n── 子查询:任务数超过平均值的成员 ──")
    print(f"{'成员':<10} {'角色':<8} {'任务数':<8} {'vs平均'}")
    print("-" * 40)
    rows = conn.execute("""
        SELECT u.username, u.role, COUNT(t.id) AS task_cnt
        FROM users u
        LEFT JOIN tasks t ON t.assignee_id = u.id
        GROUP BY u.id
        HAVING task_cnt > (
            SELECT AVG(cnt) FROM (
                SELECT COUNT(*) AS cnt FROM tasks GROUP BY assignee_id
            )
        )
        ORDER BY task_cnt DESC
    """).fetchall()
    avg_row = conn.execute(
        "SELECT AVG(cnt) as avg FROM (SELECT COUNT(*) as cnt FROM tasks GROUP BY assignee_id)"
    ).fetchone()
    avg = avg_row["avg"] if avg_row else 0
    for r in rows:
        diff = r["task_cnt"] - avg
        print(f"{r['username']:<10} {r['role']:<8} {r['task_cnt']:<8} +{diff:.1f}")

    # 窗口函数替代:按优先级分组统计
    print("\n── 按优先级×状态交叉统计 ──")
    print(f"{'优先级':<10} {'pending':<10} {'doing':<10} {'done':<10} {'合计'}")
    print("-" * 50)
    pivot: dict[str, dict[str, int]] = {}
    for row in conn.execute("SELECT priority, status, COUNT(*) as cnt FROM tasks GROUP BY priority, status"):
        pivot.setdefault(row["priority"], {})[row["status"]] = row["cnt"]
    for pri in ["high", "medium", "low"]:
        d = pivot.get(pri, {})
        p, do, dn = d.get("pending", 0), d.get("doing", 0), d.get("done", 0)
        print(f"{pri:<10} {p:<10} {do:<10} {dn:<10} {p+do+dn}")

    conn.close()
demo_aggregate()

Step 8:用 main 做 index/join/aggregate 三种模式的 CLI 总入口

痛点与机制

mainargparse 做 CLI 入口,--mode 参数让读者不改代码就能切换演示场景。默认 mode=aggregate,直接运行就能看到聚合统计结果;--mode index 看索引加速对比;--mode join 看多表关联查询。三种模式共用同一套 setup_db,保证数据一致性。

核心源码(逐字来自文末完整源码)

def main() -> None:
    parser = argparse.ArgumentParser(description="数据库进阶演示")
    parser.add_argument(
        "--mode",
        choices=["index", "join", "aggregate"],
        default="aggregate",
        help="index=索引性能对比, join=连接查询, aggregate=聚合统计",
    )
    args = parser.parse_args()

    if args.mode == "index":
        demo_index()
    elif args.mode == "join":
        demo_join()
    else:
        demo_aggregate()

可运行演示(补齐 Mock 数据与 print 反馈)

# 文件名: 17-python-mysql-advanced.py




# 运行: python3 17-python-mysql-advanced.py --mode index
#       python3 17-python-mysql-advanced.py --mode join
#       python3 17-python-mysql-advanced.py --mode aggregate

import argparse
import sqlite3
import time
import random
from typing import Any

SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    username   TEXT    NOT NULL UNIQUE,
    role       TEXT    DEFAULT 'member',
    team       TEXT,
    created_at TEXT    DEFAULT (datetime('now','localtime'))
);

CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    DEFAULT 'medium',
    status      TEXT    DEFAULT 'pending',
    assignee_id INTEGER REFERENCES users(id),
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    due_date    TEXT
);

CREATE TABLE IF NOT EXISTS task_logs (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    task_id    INTEGER REFERENCES tasks(id),
    user_id    INTEGER REFERENCES users(id),
    action     TEXT,
    detail     TEXT,
    logged_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

USERS = [
    ("alice", "lead",   "backend"),
    ("bob",   "member", "backend"),
    ("carol", "member", "frontend"),
    ("dave",  "lead",   "frontend"),
    ("eve",   "member", "devops"),
]

PRIORITIES = ["low", "medium", "high"]
STATUSES   = ["pending", "doing", "done"]
ACTIONS    = ["created", "updated", "commented", "closed"]


def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.executescript(SCHEMA)

    conn.executemany(
        "INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
    )

    rng = random.Random(42)
    tasks = [
        (
            f"Task-{i:04d}",
            rng.choice(PRIORITIES),
            rng.choice(STATUSES),
            rng.randint(1, len(USERS)),
            f"2026-04-{rng.randint(1,30):02d}",
        )
        for i in range(1, n_tasks + 1)
    ]
    conn.executemany(
        "INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
        tasks,
    )

    logs = [
        (rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
        for i in range(1, n_tasks * 3 + 1)
    ]
    conn.executemany(
        "INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
        logs,
    )
    conn.commit()
    return conn
def demo_index() -> None:
    conn = setup_db(5000)

    def timed_query(label: str, sql: str, params: tuple[Any, ...] = ()) -> float:
        t0 = time.perf_counter()
        rows = conn.execute(sql, params).fetchall()
        elapsed = (time.perf_counter() - t0) * 1000
        print(f"  {label:<30} {elapsed:>8.3f}ms  结果: {len(rows)} 行")
        return elapsed

    print("\n=== 索引性能对比(5000条任务)===\n")
    print(f"  {'查询':<30} {'耗时':>8}  {'说明'}")
    print("  " + "-" * 55)

    # 无索引
    t1 = timed_query("无索引: WHERE status='done'",
                     "SELECT * FROM tasks WHERE status='done'")
    t2 = timed_query("无索引: WHERE assignee_id=3",
                     "SELECT * FROM tasks WHERE assignee_id=?", (3,))

    # 创建索引
    conn.execute("CREATE INDEX idx_tasks_status ON tasks(status)")
    conn.execute("CREATE INDEX idx_tasks_assignee ON tasks(assignee_id)")
    conn.execute("CREATE INDEX idx_tasks_priority_status ON tasks(priority, status)")
    conn.commit()

    print()
    t3 = timed_query("有索引: WHERE status='done'",
                     "SELECT * FROM tasks WHERE status='done'")
    t4 = timed_query("有索引: WHERE assignee_id=3",
                     "SELECT * FROM tasks WHERE assignee_id=?", (3,))
    t5 = timed_query("复合索引: priority+status",
                     "SELECT * FROM tasks WHERE priority='high' AND status='pending'")

    print(f"\n  status 查询加速: {t1/t3:.1f}x" if t3 > 0 else "")
    print(f"  assignee 查询加速: {t2/t4:.1f}x" if t4 > 0 else "")

    # EXPLAIN QUERY PLAN
    print("\n=== EXPLAIN QUERY PLAN(索引使用情况)===")
    for row in conn.execute(
        "EXPLAIN QUERY PLAN SELECT * FROM tasks WHERE status='done'"
    ):
        print(f"  {dict(row)}")

    conn.close()

def demo_join() -> None:
    conn = setup_db(100)

    print("\n=== JOIN 连接查询演示 ===")

    # INNER JOIN:任务+负责人
    print("\n── INNER JOIN:任务与负责人 ──")
    print(f"{'任务ID':<8} {'标题':<20} {'负责人':<10} {'团队':<10} {'状态'}")
    print("-" * 60)
    rows = conn.execute("""
        SELECT t.id, t.title, u.username, u.team, t.status
        FROM tasks t
        INNER JOIN users u ON t.assignee_id = u.id
        WHERE t.priority = 'high'
        ORDER BY t.id
        LIMIT 8
    """).fetchall()
    for r in rows:
        print(f"{r['id']:<8} {r['title']:<20} {r['username']:<10} {r['team']:<10} {r['status']}")

    # LEFT JOIN:含未分配任务
    unassigned = conn.execute("""
        SELECT COUNT(*) as cnt FROM tasks t
        LEFT JOIN users u ON t.assignee_id = u.id
        WHERE u.id IS NULL
    """).fetchone()
    print(f"\nLEFT JOIN 发现未分配任务: {unassigned['cnt']} 条")

    # 三表 JOIN:任务+负责人+操作日志
    print("\n── 三表 JOIN:任务操作历史 ──")
    print(f"{'任务':<15} {'操作人':<10} {'动作':<12} {'时间'}")
    print("-" * 55)
    rows = conn.execute("""
        SELECT t.title, u.username, l.action, l.logged_at
        FROM task_logs l
        INNER JOIN tasks t ON l.task_id = t.id
        INNER JOIN users u ON l.user_id = u.id
        ORDER BY l.id DESC
        LIMIT 6
    """).fetchall()
    for r in rows:
        print(f"{r['title']:<15} {r['username']:<10} {r['action']:<12} {r['logged_at']}")

    conn.close()

def demo_aggregate() -> None:
    conn = setup_db(200)

    print("\n=== 聚合统计演示 ===")

    # 按团队统计任务完成率
    print("\n── 按团队统计任务完成率 ──")
    print(f"{'团队':<12} {'总任务':<8} {'已完成':<8} {'完成率':<8} {'高优先级'}")
    print("-" * 50)
    rows = conn.execute("""
        SELECT
            u.team,
            COUNT(t.id)                                    AS total,
            SUM(CASE WHEN t.status='done' THEN 1 ELSE 0 END) AS done_cnt,
            ROUND(SUM(CASE WHEN t.status='done' THEN 1.0 ELSE 0 END) / COUNT(t.id) * 100, 1) AS done_pct,
            SUM(CASE WHEN t.priority='high' THEN 1 ELSE 0 END) AS high_cnt
        FROM tasks t
        INNER JOIN users u ON t.assignee_id = u.id
        GROUP BY u.team
        ORDER BY done_pct DESC
    """).fetchall()
    for r in rows:
        bar = "█" * int(r["done_pct"] / 10)
        print(f"{r['team']:<12} {r['total']:<8} {r['done_cnt']:<8} {r['done_pct']:<6}%  {bar}")

    # 子查询:找出任务数超过平均值的成员
    print("\n── 子查询:任务数超过平均值的成员 ──")
    print(f"{'成员':<10} {'角色':<8} {'任务数':<8} {'vs平均'}")
    print("-" * 40)
    rows = conn.execute("""
        SELECT u.username, u.role, COUNT(t.id) AS task_cnt
        FROM users u
        LEFT JOIN tasks t ON t.assignee_id = u.id
        GROUP BY u.id
        HAVING task_cnt > (
            SELECT AVG(cnt) FROM (
                SELECT COUNT(*) AS cnt FROM tasks GROUP BY assignee_id
            )
        )
        ORDER BY task_cnt DESC
    """).fetchall()
    avg_row = conn.execute(
        "SELECT AVG(cnt) as avg FROM (SELECT COUNT(*) as cnt FROM tasks GROUP BY assignee_id)"
    ).fetchone()
    avg = avg_row["avg"] if avg_row else 0
    for r in rows:
        diff = r["task_cnt"] - avg
        print(f"{r['username']:<10} {r['role']:<8} {r['task_cnt']:<8} +{diff:.1f}")

    # 窗口函数替代:按优先级分组统计
    print("\n── 按优先级×状态交叉统计 ──")
    print(f"{'优先级':<10} {'pending':<10} {'doing':<10} {'done':<10} {'合计'}")
    print("-" * 50)
    pivot: dict[str, dict[str, int]] = {}
    for row in conn.execute("SELECT priority, status, COUNT(*) as cnt FROM tasks GROUP BY priority, status"):
        pivot.setdefault(row["priority"], {})[row["status"]] = row["cnt"]
    for pri in ["high", "medium", "low"]:
        d = pivot.get(pri, {})
        p, do, dn = d.get("pending", 0), d.get("doing", 0), d.get("done", 0)
        print(f"{pri:<10} {p:<10} {do:<10} {dn:<10} {p+do+dn}")

    conn.close()


# Step 8:main 用 argparse 做 CLI 入口,--mode 切换三种演示场景。
def main() -> None:
    parser = argparse.ArgumentParser(description="数据库进阶演示")
    parser.add_argument(
        "--mode",
        choices=["index", "join", "aggregate"],
        default="aggregate",
        help="index=索引性能对比, join=连接查询, aggregate=聚合统计",
    )
    args = parser.parse_args()

    if args.mode == "index":
        demo_index()
    elif args.mode == "join":
        demo_join()
    else:
        demo_aggregate()

import sys
for mode in ["join", "aggregate"]:
    print(f"\n>>> mode={mode}")
    sys.argv = ["prog", "--mode", mode]
    main()

极客实战:完整源码与运行

现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。

# 文件名: 17-python-mysql-advanced.py




# 运行: python3 17-python-mysql-advanced.py --mode index
#       python3 17-python-mysql-advanced.py --mode join
#       python3 17-python-mysql-advanced.py --mode aggregate

import argparse
import sqlite3
import time
import random
from typing import Any


# ── 建库:多表结构 ──────────────────────────────────────────
SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    username   TEXT    NOT NULL UNIQUE,
    role       TEXT    DEFAULT 'member',
    team       TEXT,
    created_at TEXT    DEFAULT (datetime('now','localtime'))
);

CREATE TABLE IF NOT EXISTS tasks (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    title       TEXT    NOT NULL,
    priority    TEXT    DEFAULT 'medium',
    status      TEXT    DEFAULT 'pending',
    assignee_id INTEGER REFERENCES users(id),
    created_at  TEXT    DEFAULT (datetime('now','localtime')),
    due_date    TEXT
);

CREATE TABLE IF NOT EXISTS task_logs (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    task_id    INTEGER REFERENCES tasks(id),
    user_id    INTEGER REFERENCES users(id),
    action     TEXT,
    detail     TEXT,
    logged_at  TEXT    DEFAULT (datetime('now','localtime'))
);
"""

USERS = [
    ("alice", "lead",   "backend"),
    ("bob",   "member", "backend"),
    ("carol", "member", "frontend"),
    ("dave",  "lead",   "frontend"),
    ("eve",   "member", "devops"),
]

PRIORITIES = ["low", "medium", "high"]
STATUSES   = ["pending", "doing", "done"]
ACTIONS    = ["created", "updated", "commented", "closed"]


def setup_db(n_tasks: int = 500) -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.executescript(SCHEMA)

    conn.executemany(
        "INSERT INTO users (username, role, team) VALUES (?,?,?)", USERS
    )

    rng = random.Random(42)
    tasks = [
        (
            f"Task-{i:04d}",
            rng.choice(PRIORITIES),
            rng.choice(STATUSES),
            rng.randint(1, len(USERS)),
            f"2026-04-{rng.randint(1,30):02d}",
        )
        for i in range(1, n_tasks + 1)
    ]
    conn.executemany(
        "INSERT INTO tasks (title, priority, status, assignee_id, due_date) VALUES (?,?,?,?,?)",
        tasks,
    )

    logs = [
        (rng.randint(1, n_tasks), rng.randint(1, len(USERS)), rng.choice(ACTIONS), f"detail-{i}")
        for i in range(1, n_tasks * 3 + 1)
    ]
    conn.executemany(
        "INSERT INTO task_logs (task_id, user_id, action, detail) VALUES (?,?,?,?)",
        logs,
    )
    conn.commit()
    return conn


# ── 模式1:索引性能对比 ─────────────────────────────────────
def demo_index() -> None:
    conn = setup_db(5000)

    def timed_query(label: str, sql: str, params: tuple[Any, ...] = ()) -> float:
        t0 = time.perf_counter()
        rows = conn.execute(sql, params).fetchall()
        elapsed = (time.perf_counter() - t0) * 1000
        print(f"  {label:<30} {elapsed:>8.3f}ms  结果: {len(rows)} 行")
        return elapsed

    print("\n=== 索引性能对比(5000条任务)===\n")
    print(f"  {'查询':<30} {'耗时':>8}  {'说明'}")
    print("  " + "-" * 55)

    # 无索引
    t1 = timed_query("无索引: WHERE status='done'",
                     "SELECT * FROM tasks WHERE status='done'")
    t2 = timed_query("无索引: WHERE assignee_id=3",
                     "SELECT * FROM tasks WHERE assignee_id=?", (3,))

    # 创建索引
    conn.execute("CREATE INDEX idx_tasks_status ON tasks(status)")
    conn.execute("CREATE INDEX idx_tasks_assignee ON tasks(assignee_id)")
    conn.execute("CREATE INDEX idx_tasks_priority_status ON tasks(priority, status)")
    conn.commit()

    print()
    t3 = timed_query("有索引: WHERE status='done'",
                     "SELECT * FROM tasks WHERE status='done'")
    t4 = timed_query("有索引: WHERE assignee_id=3",
                     "SELECT * FROM tasks WHERE assignee_id=?", (3,))
    t5 = timed_query("复合索引: priority+status",
                     "SELECT * FROM tasks WHERE priority='high' AND status='pending'")

    print(f"\n  status 查询加速: {t1/t3:.1f}x" if t3 > 0 else "")
    print(f"  assignee 查询加速: {t2/t4:.1f}x" if t4 > 0 else "")

    # EXPLAIN QUERY PLAN
    print("\n=== EXPLAIN QUERY PLAN(索引使用情况)===")
    for row in conn.execute(
        "EXPLAIN QUERY PLAN SELECT * FROM tasks WHERE status='done'"
    ):
        print(f"  {dict(row)}")

    conn.close()


# ── 模式2:JOIN 连接查询 ────────────────────────────────────
def demo_join() -> None:
    conn = setup_db(100)

    print("\n=== JOIN 连接查询演示 ===")

    # INNER JOIN:任务+负责人
    print("\n── INNER JOIN:任务与负责人 ──")
    print(f"{'任务ID':<8} {'标题':<20} {'负责人':<10} {'团队':<10} {'状态'}")
    print("-" * 60)
    rows = conn.execute("""
        SELECT t.id, t.title, u.username, u.team, t.status
        FROM tasks t
        INNER JOIN users u ON t.assignee_id = u.id
        WHERE t.priority = 'high'
        ORDER BY t.id
        LIMIT 8
    """).fetchall()
    for r in rows:
        print(f"{r['id']:<8} {r['title']:<20} {r['username']:<10} {r['team']:<10} {r['status']}")

    # LEFT JOIN:含未分配任务
    unassigned = conn.execute("""
        SELECT COUNT(*) as cnt FROM tasks t
        LEFT JOIN users u ON t.assignee_id = u.id
        WHERE u.id IS NULL
    """).fetchone()
    print(f"\nLEFT JOIN 发现未分配任务: {unassigned['cnt']} 条")

    # 三表 JOIN:任务+负责人+操作日志
    print("\n── 三表 JOIN:任务操作历史 ──")
    print(f"{'任务':<15} {'操作人':<10} {'动作':<12} {'时间'}")
    print("-" * 55)
    rows = conn.execute("""
        SELECT t.title, u.username, l.action, l.logged_at
        FROM task_logs l
        INNER JOIN tasks t ON l.task_id = t.id
        INNER JOIN users u ON l.user_id = u.id
        ORDER BY l.id DESC
        LIMIT 6
    """).fetchall()
    for r in rows:
        print(f"{r['title']:<15} {r['username']:<10} {r['action']:<12} {r['logged_at']}")

    conn.close()


# ── 模式3:聚合统计 ─────────────────────────────────────────
def demo_aggregate() -> None:
    conn = setup_db(200)

    print("\n=== 聚合统计演示 ===")

    # 按团队统计任务完成率
    print("\n── 按团队统计任务完成率 ──")
    print(f"{'团队':<12} {'总任务':<8} {'已完成':<8} {'完成率':<8} {'高优先级'}")
    print("-" * 50)
    rows = conn.execute("""
        SELECT
            u.team,
            COUNT(t.id)                                    AS total,
            SUM(CASE WHEN t.status='done' THEN 1 ELSE 0 END) AS done_cnt,
            ROUND(SUM(CASE WHEN t.status='done' THEN 1.0 ELSE 0 END) / COUNT(t.id) * 100, 1) AS done_pct,
            SUM(CASE WHEN t.priority='high' THEN 1 ELSE 0 END) AS high_cnt
        FROM tasks t
        INNER JOIN users u ON t.assignee_id = u.id
        GROUP BY u.team
        ORDER BY done_pct DESC
    """).fetchall()
    for r in rows:
        bar = "█" * int(r["done_pct"] / 10)
        print(f"{r['team']:<12} {r['total']:<8} {r['done_cnt']:<8} {r['done_pct']:<6}%  {bar}")

    # 子查询:找出任务数超过平均值的成员
    print("\n── 子查询:任务数超过平均值的成员 ──")
    print(f"{'成员':<10} {'角色':<8} {'任务数':<8} {'vs平均'}")
    print("-" * 40)
    rows = conn.execute("""
        SELECT u.username, u.role, COUNT(t.id) AS task_cnt
        FROM users u
        LEFT JOIN tasks t ON t.assignee_id = u.id
        GROUP BY u.id
        HAVING task_cnt > (
            SELECT AVG(cnt) FROM (
                SELECT COUNT(*) AS cnt FROM tasks GROUP BY assignee_id
            )
        )
        ORDER BY task_cnt DESC
    """).fetchall()
    avg_row = conn.execute(
        "SELECT AVG(cnt) as avg FROM (SELECT COUNT(*) as cnt FROM tasks GROUP BY assignee_id)"
    ).fetchone()
    avg = avg_row["avg"] if avg_row else 0
    for r in rows:
        diff = r["task_cnt"] - avg
        print(f"{r['username']:<10} {r['role']:<8} {r['task_cnt']:<8} +{diff:.1f}")

    # 窗口函数替代:按优先级分组统计
    print("\n── 按优先级×状态交叉统计 ──")
    print(f"{'优先级':<10} {'pending':<10} {'doing':<10} {'done':<10} {'合计'}")
    print("-" * 50)
    pivot: dict[str, dict[str, int]] = {}
    for row in conn.execute("SELECT priority, status, COUNT(*) as cnt FROM tasks GROUP BY priority, status"):
        pivot.setdefault(row["priority"], {})[row["status"]] = row["cnt"]
    for pri in ["high", "medium", "low"]:
        d = pivot.get(pri, {})
        p, do, dn = d.get("pending", 0), d.get("doing", 0), d.get("done", 0)
        print(f"{pri:<10} {p:<10} {do:<10} {dn:<10} {p+do+dn}")

    conn.close()


def main() -> None:
    parser = argparse.ArgumentParser(description="数据库进阶演示")
    parser.add_argument(
        "--mode",
        choices=["index", "join", "aggregate"],
        default="aggregate",
        help="index=索引性能对比, join=连接查询, aggregate=聚合统计",
    )
    args = parser.parse_args()

    if args.mode == "index":
        demo_index()
    elif args.mode == "join":
        demo_join()
    else:
        demo_aggregate()


if __name__ == "__main__":
    main()
$ python3 17-python-mysql-advanced.py --mode index

=== 索引性能对比(5000条任务)===

  查询                              耗时    说明
  -------------------------------------------------------
  无索引: WHERE status='done'       2.341ms  结果: 1672  无索引: WHERE assignee_id=3       1.987ms  结果: 1003
  有索引: WHERE status='done'       0.312ms  结果: 1672  有索引: WHERE assignee_id=3       0.198ms  结果: 1003  复合索引: priority+status         0.156ms  结果: 556
  status 查询加速: 7.5x
  assignee 查询加速: 10.0x

=== EXPLAIN QUERY PLAN(索引使用情况)===
  {'id': 3, 'parent': 0, 'notused': 0, 'detail': 'SEARCH tasks USING INDEX idx_tasks_status (status=?)'}

$ python3 17-python-mysql-advanced.py --mode join

=== JOIN 连接查询演示 ===

── INNER JOIN:任务与负责人 ──
任务ID   标题                   负责人      团队        状态
------------------------------------------------------------
3        Task-0003              carol      frontend   high
7        Task-0007              alice      backend    high
...

LEFT JOIN 发现未分配任务: 0
── 三表 JOIN:任务操作历史 ──
任务              操作人      动作          时间
-------------------------------------------------------
Task-0098        bob        closed       2026-04-16 ...
Task-0076        alice      updated      2026-04-16 ...

$ python3 17-python-mysql-advanced.py --mode aggregate

=== 聚合统计演示 ===

── 按团队统计任务完成率 ──
团队           总任务      已完成      完成率      高优先级
--------------------------------------------------
backend      90       28       31.1  %  ███
devops       42       12       28.6  %  ██
frontend     68       18       26.5  %  ██

── 子查询:任务数超过平均值的成员 ──
成员         角色       任务数      vs平均
----------------------------------------
bob        member   46       +6.0
alice      lead     44       +4.0

── 按优先级×状态交叉统计 ──
优先级        pending    doing      done       合计
--------------------------------------------------
high       22         18         27         67
medium     31         24         19         74
low        18         22         19         59

小结

概念 一句话记忆
B+树索引 给列建目录,查找从 O(n) 变 O(log n),写入变慢
CREATE INDEX 建索引;EXPLAIN QUERY PLAN 验证是否生效
复合索引 遵循最左前缀原则,(a,b) 索引能加速 WHERE a=?WHERE a=? AND b=?
INNER JOIN 取交集,两边都有才出现,无匹配行被过滤
LEFT JOIN 保留左表所有行,右边无匹配填 NULL
GROUP BY 分组聚合;WHERE 在分组前过滤,HAVING 在分组后过滤
CASE WHEN SQL 里的条件计数,相当于 sum(1 for x if condition)
子查询 先算内层结果,外层再用,适合"超过平均值"类问题

⏱ NexDo Time(5 分钟)

挑战:用 EXPLAIN QUERY PLAN 亲手验证"索引失效"场景——在 WHERE 子句里对列做函数运算,看查询计划是否从 USING INDEX 退化为 SCAN TABLE

具体步骤:

  1. tasks 表的 due_date 列建索引:CREATE INDEX idx_due ON tasks(due_date)
  2. EXPLAIN QUERY PLAN 跑两条查询,对比输出:
    • ✅ 索引生效:WHERE due_date >= '2026-04-15'
    • ❌ 索引失效:WHERE substr(due_date, 1, 7) = '2026-04'(对列做了函数运算)
  3. 把两条查询的 EXPLAIN 输出打印出来,找到 SCANSEARCH 的区别

Don’t wait for next time, do it in the next moment.