文章

20 · Django ORM:模型设计与数据迁移

#019 · 2026-04-16 · Python

🔗 知识图谱导航:阅读本文前,建议先掌握《19 · Django 引擎:MVC、路由与视图》中的路由分发机制,以及《16 · 数据库底座:SQLite 核心操作》中的 SQL 基础——本文把这两块拼在一起,用 ORM 让你用 Python 对象操作数据库,告别手写 SQL。

运行环境:Python 3.12+ 标准库,零额外依赖,直接运行。本文用纯 Python 复现 Django ORM 的核心机制,理解后再用真实 Django 会事半功倍。

极客解析:Django ORM 的"魔法"本质上是三件事:① 元类 ModelMeta 在类定义时自动收集字段;② Model.save() 根据主键是否存在自动切换 INSERT/UPDATE;③ QuerySet 是惰性查询构建器,链式调用不执行 SQL,只有 .all() 才真正查数据库。

ORM 映射原理

┌──────────────────────────────────────────────────────────────┐
│                        ORM 映射层                             │
├──────────────┬───────────────────┬───────────────────────────┤
│  Python 概念 │  ORM 映射         │  SQL 概念                 │
├──────────────┼───────────────────┼───────────────────────────┤
│  类          │  → 映射 →         │  表(Table)              │
│  类属性      │  → 映射 →         │  列(Column)             │
│  实例        │  → 映射 →         │  行(Row)                │
│  QuerySet    │  → 映射 →         │  SELECT 语句              │
│  .save()     │  → 映射 →         │  INSERT / UPDATE          │
│  .delete()   │  → 映射 →         │  DELETE                   │
│  ForeignKey  │  → 映射 →         │  外键约束                 │
└──────────────┴───────────────────┴───────────────────────────┘

常用字段类型速查

CharField(max_length)   短字符串
TextField               长文本
IntegerField            整数
BooleanField            True/False
DateTimeField           日期时间,auto_now_add=True 自动填充创建时间
ForeignKey(to, on_delete)  多对一关联,on_delete 必填
ManyToManyField(to)     多对多关联,自动创建中间表

迁移命令速查

python manage.py makemigrations   # 检测 models.py 变化,生成迁移文件
python manage.py migrate          # 执行迁移
python manage.py sqlmigrate tasks 0001  # 查看将要执行的 SQL
python manage.py showmigrations   # 查看迁移状态

步步为营:核心逻辑自适应拆解

这一篇的核心是 Django ORM 的三层结构:元类自动收集字段 → Model.save() 自动切换 INSERT/UPDATE → QuerySet 惰性链式查询。下面每一步都聚焦一个机制,零依赖可直接运行。

Step 1:用 ModelMeta 元类自动收集字段定义

痛点与机制

ModelMeta 是元类——它在"类被定义的那一刻"就介入,扫描所有 Field 类型的属性,收集到 _fields 字典,同时把类名小写作为表名存入 _table。这正是 Django Model 的核心魔法:你只需声明 title = CharField(max_length=200),框架自动知道这张表有一列叫 title,类型是 TEXT。元类就像"工厂的质检员"——每个产品(类)出厂前都会被检查一遍,把规格(字段)记录在案。

核心源码(逐字来自文末完整源码)

class ModelMeta(type):
    def __new__(mcs, name: str, bases: tuple, namespace: dict):
        fields: Dict[str, Field] = {}
        for k, v in list(namespace.items()):
            if isinstance(v, Field):
                fields[k] = v
        namespace["_fields"] = fields
        namespace["_table"] = name.lower()
        cls = super().__new__(mcs, name, bases, namespace)
        return cls

可运行演示(补齐 Mock 数据与 print 反馈)

from typing import Any, Dict

class Field:
    def __init__(self, col_type: str, primary_key: bool = False,
                 nullable: bool = True, default: Any = None):
        self.col_type = col_type
        self.primary_key = primary_key
        self.nullable = nullable
        self.default = default


class IntField(Field):
    def __init__(self, primary_key: bool = False, default: int = 0):
        super().__init__("INTEGER", primary_key=primary_key, default=default)


class CharField(Field):
    def __init__(self, max_length: int = 255, nullable: bool = True, default: str = ""):
        super().__init__("TEXT", nullable=nullable, default=default)


class ModelMeta(type):
    def __new__(mcs, name: str, bases: tuple, namespace: dict):
        fields: Dict[str, Field] = {}
        for k, v in list(namespace.items()):
            if isinstance(v, Field):
                fields[k] = v
        namespace["_fields"] = fields
        namespace["_table"] = name.lower()
        cls = super().__new__(mcs, name, bases, namespace)
        return cls


class Model(metaclass=ModelMeta):
    pass



class Project(Model):
    id = IntField(primary_key=True)
    name = CharField(nullable=False, default="")
    description = CharField(default="")


print("表名:", Project._table)
print("字段:", list(Project._fields.keys()))
print("id 是否主键:", Project._fields["id"].primary_key)
print("name 的数据库类型:", Project._fields["name"].col_type)

Step 2:用 Model.save() 实现自动 INSERT/UPDATE 切换

痛点与机制

save() 的逻辑就像"快递员送包裹":如果包裹有单号(主键不为 0),就去更新已有记录(UPDATE);如果没有单号,就新建一条记录(INSERT)并把数据库分配的 lastrowid 写回实例。create_table() 根据 _fields 字典动态生成 CREATE TABLE SQL——这就是 Django makemigrations 的简化版原理。

核心源码(逐字来自文末完整源码)

class Model(metaclass=ModelMeta):
    _fields: Dict[str, Field] = {}
    _table: str = ""

    def __init__(self, **kwargs):
        for name, f in self._fields.items():
            setattr(self, name, kwargs.get(name, f.default))

    @classmethod
    def create_table(cls) -> None:
        cols = []
        for name, f in cls._fields.items():
            col = f"{name} {f.col_type}"
            if f.primary_key:
                col += " PRIMARY KEY AUTOINCREMENT"
            elif not f.nullable:
                col += " NOT NULL"
            cols.append(col)
        sql = f"CREATE TABLE IF NOT EXISTS {cls._table} ({', '.join(cols)})"
        _conn.execute(sql)
        _conn.commit()

    def save(self) -> "Model":
        pk_field = next((n for n, f in self._fields.items() if f.primary_key), None)
        data = {n: getattr(self, n) for n in self._fields if n != pk_field}
        # 布尔值 → int
        data = {k: int(v) if isinstance(v, bool) else v for k, v in data.items()}

        if pk_field and getattr(self, pk_field):
            sets = ", ".join(f"{k}=?" for k in data)
            _conn.execute(
                f"UPDATE {self._table} SET {sets} WHERE {pk_field}=?",
                list(data.values()) + [getattr(self, pk_field)]
            )
        else:
            cols = ", ".join(data.keys())
            placeholders = ", ".join("?" * len(data))
            cur = _conn.execute(
                f"INSERT INTO {self._table} ({cols}) VALUES ({placeholders})",
                list(data.values())
            )
            if pk_field:
                setattr(self, pk_field, cur.lastrowid)
        _conn.commit()
        return self

    @classmethod
    def objects(cls: Type[T]) -> "QuerySet[T]":
        return QuerySet(cls)

可运行演示(补齐 Mock 数据与 print 反馈)

import sqlite3
from typing import Any, Dict, Type, TypeVar

T = TypeVar("T", bound="Model")
_conn = sqlite3.connect(":memory:")
_conn.row_factory = sqlite3.Row

class Field:
    def __init__(self, col_type: str, primary_key: bool = False,
                 nullable: bool = True, default: Any = None):
        self.col_type = col_type
        self.primary_key = primary_key
        self.nullable = nullable
        self.default = default

class IntField(Field):
    def __init__(self, primary_key: bool = False, default: int = 0):
        super().__init__("INTEGER", primary_key=primary_key, default=default)

class CharField(Field):
    def __init__(self, max_length: int = 255, nullable: bool = True, default: str = ""):
        super().__init__("TEXT", nullable=nullable, default=default)

class ModelMeta(type):
    def __new__(mcs, name: str, bases: tuple, namespace: dict):
        namespace["_fields"] = {k: v for k, v in namespace.items() if isinstance(v, Field)}
        namespace["_table"] = name.lower()
        return super().__new__(mcs, name, bases, namespace)

class Model(metaclass=ModelMeta):
    _fields: Dict[str, Field] = {}
    _table: str = ""

    def __init__(self, **kwargs):
        for name, f in self._fields.items():
            setattr(self, name, kwargs.get(name, f.default))

    @classmethod
    def create_table(cls) -> None:
        cols = []
        for name, f in cls._fields.items():
            col = f"{name} {f.col_type}"
            if f.primary_key:
                col += " PRIMARY KEY AUTOINCREMENT"
            elif not f.nullable:
                col += " NOT NULL"
            cols.append(col)
        sql = f"CREATE TABLE IF NOT EXISTS {cls._table} ({', '.join(cols)})"
        print("建表 SQL:", sql)
        _conn.execute(sql)
        _conn.commit()

    def save(self) -> "Model":
        pk_field = next((n for n, f in self._fields.items() if f.primary_key), None)
        data = {n: getattr(self, n) for n in self._fields if n != pk_field}
        if pk_field and getattr(self, pk_field):
            sets = ", ".join(f"{k}=?" for k in data)
            print("执行 UPDATE,主键:", getattr(self, pk_field))
            _conn.execute(f"UPDATE {self._table} SET {sets} WHERE {pk_field}=?", list(data.values()) + [getattr(self, pk_field)])
        else:
            cols = ", ".join(data.keys())
            placeholders = ", ".join("?" * len(data))
            print("执行 INSERT,数据:", data)
            cur = _conn.execute(f"INSERT INTO {self._table} ({cols}) VALUES ({placeholders})", list(data.values()))
            if pk_field:
                setattr(self, pk_field, cur.lastrowid)
        _conn.commit()
        return self

class Project(Model):
    id = IntField(primary_key=True)
    name = CharField(nullable=False, default="")
    description = CharField(default="")

Project.create_table()
project = Project(name="小白 ORM", description="第一次保存")
project.save()
print("保存后数据库分配 id:", project.id)
project.description = "第二次保存会更新同一行"
project.save()
row = _conn.execute("SELECT id, name, description FROM project").fetchone()
print("数据库当前行:", dict(row))

Step 3:用 QuerySet 理解惰性查询——链式调用不执行 SQL

痛点与机制

QuerySet 是"点餐单"而不是"菜":每次 filter()/order_by()/limit() 都只是在点餐单上加一行要求,返回一个新的 QuerySet,不碰数据库。只有调用 .all()/.first()/.count() 时,才把点餐单翻译成 SQL 发给数据库。这个"惰性求值"设计让你可以把查询条件分散在代码各处,最后一次性执行,避免了多次数据库往返。

核心源码(逐字来自文末完整源码)

class QuerySet:
    def __init__(self, model: Type[Model]):
        self._model = model
        self._filters: List[Tuple[str, Any]] = []
        self._order: Optional[str] = None
        self._limit: Optional[int] = None

    def filter(self, **kwargs) -> "QuerySet":
        qs = QuerySet(self._model)
        qs._filters = list(self._filters) + list(kwargs.items())
        qs._order = self._order
        qs._limit = self._limit
        return qs

    def order_by(self, col: str) -> "QuerySet":
        qs = QuerySet(self._model)
        qs._filters = list(self._filters)
        qs._order = col
        qs._limit = self._limit
        return qs

    def limit(self, n: int) -> "QuerySet":
        qs = QuerySet(self._model)
        qs._filters = list(self._filters)
        qs._order = self._order
        qs._limit = n
        return qs

    def _build_sql(self) -> Tuple[str, List]:
        sql = f"SELECT * FROM {self._model._table}"
        params: List = []
        if self._filters:
            clauses = []
            for k, v in self._filters:
                if k.endswith("__contains"):
                    col = k[:-10]
                    clauses.append(f"{col} LIKE ?")
                    params.append(f"%{v}%")
                else:
                    clauses.append(f"{k}=?")
                    params.append(int(v) if isinstance(v, bool) else v)
            sql += " WHERE " + " AND ".join(clauses)
        if self._order:
            desc = self._order.startswith("-")
            col = self._order.lstrip("-")
            sql += f" ORDER BY {col} {'DESC' if desc else 'ASC'}"
        if self._limit:
            sql += f" LIMIT {self._limit}"
        return sql, params

    def all(self) -> List[Model]:
        sql, params = self._build_sql()
        rows = _conn.execute(sql, params).fetchall()
        return [self._model(**dict(r)) for r in rows]

    def first(self) -> Optional[Model]:
        results = self.limit(1).all()
        return results[0] if results else None

    def count(self) -> int:
        sql, params = self._build_sql()
        count_sql = f"SELECT COUNT(*) FROM ({sql})"
        return _conn.execute(count_sql, params).fetchone()[0]

可运行演示(补齐 Mock 数据与 print 反馈)

from typing import Any, List, Optional, Tuple

class FakeModel:
    _table = "task"

class QuerySet:
    def __init__(self, model):
        self._model = model
        self._filters: List[Tuple[str, Any]] = []
        self._order: Optional[str] = None
        self._limit: Optional[int] = None

    def filter(self, **kwargs) -> "QuerySet":
        qs = QuerySet(self._model)
        qs._filters = list(self._filters) + list(kwargs.items())
        qs._order = self._order
        qs._limit = self._limit
        return qs

    def order_by(self, col: str) -> "QuerySet":
        qs = QuerySet(self._model)
        qs._filters = list(self._filters)
        qs._order = col
        qs._limit = self._limit
        return qs

    def limit(self, n: int) -> "QuerySet":
        qs = QuerySet(self._model)
        qs._filters = list(self._filters)
        qs._order = self._order
        qs._limit = n
        return qs

    def _build_sql(self) -> Tuple[str, List]:
        sql = f"SELECT * FROM {self._model._table}"
        params: List = []
        if self._filters:
            clauses = []
            for k, v in self._filters:
                clauses.append(f"{k}=?")
                params.append(int(v) if isinstance(v, bool) else v)
            sql += " WHERE " + " AND ".join(clauses)
        if self._order:
            desc = self._order.startswith("-")
            col = self._order.lstrip("-")
            sql += f" ORDER BY {col} {'DESC' if desc else 'ASC'}"
        if self._limit:
            sql += f" LIMIT {self._limit}"
        return sql, params


# 链式调用只是“攒条件”,像先写购物清单,还没去收银台。
qs1 = QuerySet(FakeModel)
qs2 = qs1.filter(done=False)
qs3 = qs2.order_by("-priority").limit(2)
print("原始 QuerySet 条件:", qs1._filters)
print("新 QuerySet 条件:", qs3._filters)
print("真正要执行的 SQL:", qs3._build_sql()[0])
print("SQL 参数:", qs3._build_sql()[1])

Step 4:用链式 filter/order_by/limit 组合查询条件

痛点与机制

链式调用的关键是"每步返回新对象":filter() 不修改原 QuerySet,而是复制一份再加条件。这样 qs1 = Task.objects().filter(done=False)qs2 = qs1.filter(priority=3) 是两个独立的查询,互不干扰。__contains 是 Django 的查询语法糖,对应 SQL 的 LIKE %value%-priority 前缀的负号表示降序,对应 SQL 的 ORDER BY priority DESC

核心源码(逐字来自文末完整源码)

def demo_query() -> None:
    print("\n=== 链式查询演示 ===")
    cases = [
        ("全部任务数量",        Task.objects().count()),
        ("未完成任务数量",      Task.objects().filter(done=False).count()),
        ("项目1的任务数量",     Task.objects().filter(project_id=1).count()),
        ("优先级>=2的未完成",   Task.objects().filter(done=False, priority=2).count()),
    ]
    print(f"\n{'─'*45}")
    print(f"  {'查询描述':<25} {'结果'}")
    print(f"{'─'*45}")
    for desc, result in cases:
        print(f"  {desc:<25} {result}")
    print(f"{'─'*45}")

可运行演示(补齐 Mock 数据与 print 反馈)

import sqlite3
from typing import Any, List, Optional, Tuple

_conn = sqlite3.connect(":memory:")
_conn.row_factory = sqlite3.Row
_conn.execute("CREATE TABLE task (id INTEGER PRIMARY KEY, title TEXT, done INTEGER, priority INTEGER)")
_conn.executemany("INSERT INTO task VALUES (?, ?, ?, ?)", [
    (1, "写 ORM 章节", 0, 3),
    (2, "写单元测试", 1, 2),
    (3, "发布 v1.0", 0, 5),
    (4, "修复页面样式", 0, 1),
])

class Task:
    _table = "task"

class QuerySet:
    def __init__(self, model):
        self._model = model
        self._filters: List[Tuple[str, Any]] = []
        self._order: Optional[str] = None
        self._limit: Optional[int] = None

    def filter(self, **kwargs) -> "QuerySet":
        qs = QuerySet(self._model)
        qs._filters = list(self._filters) + list(kwargs.items())
        qs._order = self._order
        qs._limit = self._limit
        return qs

    def order_by(self, col: str) -> "QuerySet":
        qs = QuerySet(self._model)
        qs._filters = list(self._filters)
        qs._order = col
        qs._limit = self._limit
        return qs

    def limit(self, n: int) -> "QuerySet":
        qs = QuerySet(self._model)
        qs._filters = list(self._filters)
        qs._order = self._order
        qs._limit = n
        return qs

    def _build_sql(self) -> Tuple[str, List]:
        sql = f"SELECT * FROM {self._model._table}"
        params: List = []
        if self._filters:
            clauses = []
            for k, v in self._filters:
                if k.endswith("__contains"):
                    clauses.append(f"{k[:-10]} LIKE ?")
                    params.append(f"%{v}%")
                else:
                    clauses.append(f"{k}=?")
                    params.append(int(v) if isinstance(v, bool) else v)
            sql += " WHERE " + " AND ".join(clauses)
        if self._order:
            desc = self._order.startswith("-")
            sql += f" ORDER BY {self._order.lstrip('-')} {'DESC' if desc else 'ASC'}"
        if self._limit:
            sql += f" LIMIT {self._limit}"
        return sql, params

    def all(self) -> List[sqlite3.Row]:
        sql, params = self._build_sql()
        print("执行 SQL:", sql)
        print("参数:", params)
        return _conn.execute(sql, params).fetchall()


rows = QuerySet(Task).filter(done=False).filter(title__contains="写").order_by("-priority").limit(2).all()
for row in rows:
    print(f"#{row['id']} {row['title']} priority={row['priority']}")

Step 5:用 print_tasks 格式化展示 QuerySet 结果

痛点与机制

print_tasksQuerySet.all() 返回的模型实例列表格式化成对齐表格。t.done 是布尔值,但数据库里存的是 0/1 整数——sqlite3.Row 返回的是整数,所以 if t.done 等价于 if t.done != 0,Python 的真值判断自动处理了这个转换。f"{t.id:<4}":<4 是左对齐宽度4,让表格列整齐。

核心源码(逐字来自文末完整源码)

def print_tasks(title: str, tasks: List[Task]) -> None:
    print(f"\n{'─'*60}")
    print(f"  {title}(共 {len(tasks)} 条)")
    print(f"{'─'*60}")
    print(f"  {'ID':<4} {'标题':<20} {'完成':<6} {'优先级':<6} {'项目ID'}")
    print(f"  {'─'*4} {'─'*20} {'─'*6} {'─'*6} {'─'*6}")
    for t in tasks:
        done_icon = "✅" if t.done else "⬜"
        print(f"  {t.id:<4} {t.title:<20} {done_icon:<6} {t.priority:<6} {t.project_id}")
    print(f"{'─'*60}")

可运行演示(补齐 Mock 数据与 print 反馈)

from dataclasses import dataclass
from typing import List

@dataclass
class Task:
    id: int
    title: str
    done: bool
    priority: int


def print_tasks(title: str, tasks: List[Task]) -> None:
    print(f"\n{'─'*60}")
    print(f"  {title}")
    print(f"{'─'*60}")
    for t in tasks:
        icon = "✅" if t.done else "⬜"
        print(f"  [{t.id:>2}] {icon} P{t.priority} {t.title}")
    print(f"{'─'*60}")


# 格式化输出不是花活:新手能一眼分清 id、状态、优先级和标题。
tasks = [
    Task(1, "写 ORM 章节", False, 3),
    Task(2, "写单元测试", True, 2),
    Task(3, "发布 v1.0", False, 5),
]
print_tasks("任务列表", tasks)

Step 6:用 demo_relations 模拟 ForeignKey 反向查询

痛点与机制

ForeignKey 在数据库层面只是一个整数列(project_id),ORM 的"魔法"是在 Python 层面提供反向查询接口。Django 里 project.task_set.all() 底层就是 Task.objects.filter(project=project)。这里手动写 Task.objects().filter(project_id=proj.id) 演示了同样的逻辑,让你看清"反向查询"不是魔法,只是一次带外键过滤的普通查询。N+1 问题也在这里体现:每个项目都发一次查询,项目多了就会很慢——Django 的 select_related() 就是为了解决这个问题。

核心源码(逐字来自文末完整源码)

def demo_relations() -> None:
    print("\n=== 关联查询演示(模拟 select_related) ===")

    projects = Project.objects().all()
    print(f"\n{'─'*60}")
    print(f"  {'项目':<15} {'任务数':<8} {'任务列表'}")
    print(f"{'─'*60}")
    for proj in projects:
        tasks = Task.objects().filter(project_id=proj.id).all()
        task_titles = ", ".join(t.title for t in tasks)
        print(f"  {proj.name:<15} {len(tasks):<8} {task_titles}")
    print(f"{'─'*60}")

    # 模拟 ForeignKey 反向查询
    print("\n  反向查询:找出每个项目的未完成任务")
    print(f"{'─'*60}")
    for proj in projects:
        pending = Task.objects().filter(project_id=proj.id, done=False).all()
        print(f"  [{proj.name}] 待办 {len(pending)} 项:{[t.title for t in pending]}")
    print(f"{'─'*60}")

可运行演示(补齐 Mock 数据与 print 反馈)

from dataclasses import dataclass
from typing import List

@dataclass
class Project:
    id: int
    name: str

@dataclass
class Task:
    id: int
    project_id: int
    title: str
    done: bool

projects = [Project(1, "个人博客"), Project(2, "自动化脚本")]
tasks = [
    Task(1, 1, "写 ORM 章节", False),
    Task(2, 1, "发布 v1.0", False),
    Task(3, 2, "写部署脚本", True),
]

# ForeignKey 本质上就是一列 project_id:任务用这个 id 指向所属项目。
for project in projects:
    related_tasks: List[Task] = [task for task in tasks if task.project_id == project.id]
    todo_titles = [task.title for task in related_tasks if not task.done]
    print(f"项目 {project.name}: 总任务={len(related_tasks)}, 待办={todo_titles}")

Step 7:用 demo_query 展示链式统计查询

痛点与机制

demo_query 把常见的统计场景列成表格:总数、按条件过滤的数量、多条件组合。count()_build_sql() 生成的 SELECT 包一层 SELECT COUNT(*) FROM (...) 子查询,这是不改变原查询逻辑就能统计数量的标准技巧。Django 的 QuerySet.count() 底层也是类似的 SELECT COUNT(*) 优化,而不是先 SELECT *len()

核心源码(逐字来自文末完整源码)

def demo_query() -> None:
    print("\n=== 链式查询演示 ===")
    cases = [
        ("全部任务数量",        Task.objects().count()),
        ("未完成任务数量",      Task.objects().filter(done=False).count()),
        ("项目1的任务数量",     Task.objects().filter(project_id=1).count()),
        ("优先级>=2的未完成",   Task.objects().filter(done=False, priority=2).count()),
    ]
    print(f"\n{'─'*45}")
    print(f"  {'查询描述':<25} {'结果'}")
    print(f"{'─'*45}")
    for desc, result in cases:
        print(f"  {desc:<25} {result}")
    print(f"{'─'*45}")

可运行演示(补齐 Mock 数据与 print 反馈)

import sqlite3

_conn = sqlite3.connect(":memory:")
_conn.row_factory = sqlite3.Row
_conn.execute("CREATE TABLE task (id INTEGER PRIMARY KEY, title TEXT, done INTEGER, priority INTEGER)")
_conn.executemany("INSERT INTO task VALUES (?, ?, ?, ?)", [
    (1, "写 ORM 章节", 0, 3),
    (2, "写单元测试", 1, 2),
    (3, "发布 v1.0", 0, 5),
    (4, "写部署文档", 0, 4),
])

# 这一步模拟 demo_query:先过滤未完成,再按优先级倒序,最后取前 3 条。
sql = "SELECT * FROM task WHERE done=? ORDER BY priority DESC LIMIT 3"
params = [0]
rows = _conn.execute(sql, params).fetchall()
print("=== 链式查询演示 ===")
print("等价 SQL:", sql)
print("未完成任务数:", _conn.execute("SELECT COUNT(*) FROM task WHERE done=0").fetchone()[0])
for row in rows:
    print(f"P{row['priority']} - {row['title']}")

Step 8:用 main 做 demo/query/relations 三种模式的 CLI 总入口

痛点与机制

mainargparse 做 CLI 入口,三种模式对应三个观察角度:demo 看基础 CRUD 和过滤,query 看链式统计,relations 看外键关联查询。setup_db()main 里调用而不是模块级,避免 import 时就执行数据库操作——这是 Python 模块设计的好习惯,也是 Django 的 AppConfig.ready() 的设计思路。

核心源码(逐字来自文末完整源码)

def main() -> None:
    parser = argparse.ArgumentParser(description="Django ORM 模拟演示(sqlite3 内存库)")
    parser.add_argument("--mode", choices=["demo", "query", "relations"], default="demo",
                        help="demo=基础查询, query=链式统计, relations=关联查询")
    args = parser.parse_args()

    setup_db()

    if args.mode == "demo":
        demo_basic()
    elif args.mode == "query":
        demo_query()
    else:
        demo_relations()

可运行演示(补齐 Mock 数据与 print 反馈)

import argparse
import sys


def demo_basic() -> None:
    print("demo 模式: 建表、插入、更新、基础查询")


def demo_query() -> None:
    print("query 模式: filter/order_by/limit/count 链式查询")


def demo_relations() -> None:
    print("relations 模式: 用 project_id 模拟 ForeignKey 关联")


def main() -> None:
    parser = argparse.ArgumentParser(description="Django ORM 核心机制演示(零依赖)")
    parser.add_argument("--mode", choices=["demo", "query", "relations"], default="demo",
                        help="demo=基础CRUD, query=链式查询, relations=关联查询")
    args = parser.parse_args()

    if args.mode == "query":
        demo_query()
    elif args.mode == "relations":
        demo_relations()
    else:
        demo_basic()


for mode in ["demo", "query", "relations"]:
    sys.argv = ["prog", "--mode", mode]
    print(f">>> python3 20-python-django-orm.py --mode {mode}")
    main()

极客实战:完整源码与运行

现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。

#!/usr/bin/env python3
"""
20-django-orm.py
用 sqlite3 内存库实现简化版 ORM 映射层
演示模型定义、QuerySet 链式查询、关联查询

用法:
  python3 20-django-orm.py --mode demo
  python3 20-django-orm.py --mode query
  python3 20-django-orm.py --mode relations
"""

import argparse
import sqlite3
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar

# ─── 简化版 ORM 核心 ──────────────────────────────────────────────────────────

T = TypeVar("T", bound="Model")

_conn = sqlite3.connect(":memory:")
_conn.row_factory = sqlite3.Row


class Field:
    def __init__(self, col_type: str, primary_key: bool = False,
                 nullable: bool = True, default: Any = None):
        self.col_type = col_type
        self.primary_key = primary_key
        self.nullable = nullable
        self.default = default


class IntField(Field):
    def __init__(self, primary_key: bool = False, default: int = 0):
        super().__init__("INTEGER", primary_key=primary_key, default=default)


class CharField(Field):
    def __init__(self, max_length: int = 255, nullable: bool = True, default: str = ""):
        super().__init__(f"TEXT", nullable=nullable, default=default)


class BoolField(Field):
    def __init__(self, default: bool = False):
        super().__init__("INTEGER", default=int(default))


class ForeignKey(Field):
    def __init__(self, to: str):
        super().__init__("INTEGER", nullable=True)
        self.to = to


class ModelMeta(type):
    def __new__(mcs, name: str, bases: tuple, namespace: dict):
        fields: Dict[str, Field] = {}
        for k, v in list(namespace.items()):
            if isinstance(v, Field):
                fields[k] = v
        namespace["_fields"] = fields
        namespace["_table"] = name.lower()
        cls = super().__new__(mcs, name, bases, namespace)
        return cls


class Model(metaclass=ModelMeta):
    _fields: Dict[str, Field] = {}
    _table: str = ""

    def __init__(self, **kwargs):
        for name, f in self._fields.items():
            setattr(self, name, kwargs.get(name, f.default))

    @classmethod
    def create_table(cls) -> None:
        cols = []
        for name, f in cls._fields.items():
            col = f"{name} {f.col_type}"
            if f.primary_key:
                col += " PRIMARY KEY AUTOINCREMENT"
            elif not f.nullable:
                col += " NOT NULL"
            cols.append(col)
        sql = f"CREATE TABLE IF NOT EXISTS {cls._table} ({', '.join(cols)})"
        _conn.execute(sql)
        _conn.commit()

    def save(self) -> "Model":
        pk_field = next((n for n, f in self._fields.items() if f.primary_key), None)
        data = {n: getattr(self, n) for n in self._fields if n != pk_field}
        # 布尔值 → int
        data = {k: int(v) if isinstance(v, bool) else v for k, v in data.items()}

        if pk_field and getattr(self, pk_field):
            sets = ", ".join(f"{k}=?" for k in data)
            _conn.execute(
                f"UPDATE {self._table} SET {sets} WHERE {pk_field}=?",
                list(data.values()) + [getattr(self, pk_field)]
            )
        else:
            cols = ", ".join(data.keys())
            placeholders = ", ".join("?" * len(data))
            cur = _conn.execute(
                f"INSERT INTO {self._table} ({cols}) VALUES ({placeholders})",
                list(data.values())
            )
            if pk_field:
                setattr(self, pk_field, cur.lastrowid)
        _conn.commit()
        return self

    @classmethod
    def objects(cls: Type[T]) -> "QuerySet[T]":
        return QuerySet(cls)


class QuerySet:
    def __init__(self, model: Type[Model]):
        self._model = model
        self._filters: List[Tuple[str, Any]] = []
        self._order: Optional[str] = None
        self._limit: Optional[int] = None

    def filter(self, **kwargs) -> "QuerySet":
        qs = QuerySet(self._model)
        qs._filters = list(self._filters) + list(kwargs.items())
        qs._order = self._order
        qs._limit = self._limit
        return qs

    def order_by(self, col: str) -> "QuerySet":
        qs = QuerySet(self._model)
        qs._filters = list(self._filters)
        qs._order = col
        qs._limit = self._limit
        return qs

    def limit(self, n: int) -> "QuerySet":
        qs = QuerySet(self._model)
        qs._filters = list(self._filters)
        qs._order = self._order
        qs._limit = n
        return qs

    def _build_sql(self) -> Tuple[str, List]:
        sql = f"SELECT * FROM {self._model._table}"
        params: List = []
        if self._filters:
            clauses = []
            for k, v in self._filters:
                if k.endswith("__contains"):
                    col = k[:-10]
                    clauses.append(f"{col} LIKE ?")
                    params.append(f"%{v}%")
                else:
                    clauses.append(f"{k}=?")
                    params.append(int(v) if isinstance(v, bool) else v)
            sql += " WHERE " + " AND ".join(clauses)
        if self._order:
            desc = self._order.startswith("-")
            col = self._order.lstrip("-")
            sql += f" ORDER BY {col} {'DESC' if desc else 'ASC'}"
        if self._limit:
            sql += f" LIMIT {self._limit}"
        return sql, params

    def all(self) -> List[Model]:
        sql, params = self._build_sql()
        rows = _conn.execute(sql, params).fetchall()
        return [self._model(**dict(r)) for r in rows]

    def first(self) -> Optional[Model]:
        results = self.limit(1).all()
        return results[0] if results else None

    def count(self) -> int:
        sql, params = self._build_sql()
        count_sql = f"SELECT COUNT(*) FROM ({sql})"
        return _conn.execute(count_sql, params).fetchone()[0]


# ─── 模型定义 ─────────────────────────────────────────────────────────────────

class Project(Model):
    id = IntField(primary_key=True)
    name = CharField(nullable=False, default="")
    description = CharField(default="")


class Tag(Model):
    id = IntField(primary_key=True)
    name = CharField(nullable=False, default="")


class Task(Model):
    id = IntField(primary_key=True)
    title = CharField(nullable=False, default="")
    done = BoolField(default=False)
    priority = IntField(default=0)
    project_id = ForeignKey("project")  # 多对一


# ─── 初始化数据库 ─────────────────────────────────────────────────────────────

def setup_db() -> None:
    Project.create_table()
    Tag.create_table()
    Task.create_table()

    # 创建项目
    p1 = Project(name="个人博客", description="技术写作项目").save()
    p2 = Project(name="开源工具", description="命令行工具集").save()

    # 创建任务
    tasks_data = [
        ("写 ORM 章节", False, 3, p1.id),
        ("写 HTTP 章节", True,  2, p1.id),
        ("实现 CLI 工具", False, 1, p2.id),
        ("添加单元测试", False, 2, p2.id),
        ("发布 v1.0",   False, 3, p1.id),
    ]
    for title, done, priority, pid in tasks_data:
        Task(title=title, done=done, priority=priority, project_id=pid).save()


# ─── 打印工具 ─────────────────────────────────────────────────────────────────

def print_tasks(title: str, tasks: List[Task]) -> None:
    print(f"\n{'─'*60}")
    print(f"  {title}(共 {len(tasks)} 条)")
    print(f"{'─'*60}")
    print(f"  {'ID':<4} {'标题':<20} {'完成':<6} {'优先级':<6} {'项目ID'}")
    print(f"  {'─'*4} {'─'*20} {'─'*6} {'─'*6} {'─'*6}")
    for t in tasks:
        done_icon = "✅" if t.done else "⬜"
        print(f"  {t.id:<4} {t.title:<20} {done_icon:<6} {t.priority:<6} {t.project_id}")
    print(f"{'─'*60}")


# ─── 演示模式 ─────────────────────────────────────────────────────────────────

def demo_basic() -> None:
    print("\n=== 基础 QuerySet 演示 ===")

    # 全部任务
    all_tasks = Task.objects().all()
    print_tasks("全部任务", all_tasks)

    # 过滤:未完成
    pending = Task.objects().filter(done=False).all()
    print_tasks("未完成任务", pending)

    # 过滤:高优先级
    high = Task.objects().filter(priority=3).all()
    print_tasks("优先级=3 的任务", high)

    # 模糊搜索
    search = Task.objects().filter(title__contains="章节").all()
    print_tasks("标题含'章节'", search)

    # 排序
    by_priority = Task.objects().order_by("-priority").all()
    print_tasks("按优先级降序", by_priority)

    # 统计
    total = Task.objects().count()
    done_count = Task.objects().filter(done=True).count()
    print(f"\n  统计:总计 {total} 条,已完成 {done_count} 条,完成率 {done_count/total:.0%}")


def demo_relations() -> None:
    print("\n=== 关联查询演示(模拟 select_related) ===")

    projects = Project.objects().all()
    print(f"\n{'─'*60}")
    print(f"  {'项目':<15} {'任务数':<8} {'任务列表'}")
    print(f"{'─'*60}")
    for proj in projects:
        tasks = Task.objects().filter(project_id=proj.id).all()
        task_titles = ", ".join(t.title for t in tasks)
        print(f"  {proj.name:<15} {len(tasks):<8} {task_titles}")
    print(f"{'─'*60}")

    # 模拟 ForeignKey 反向查询
    print("\n  反向查询:找出每个项目的未完成任务")
    print(f"{'─'*60}")
    for proj in projects:
        pending = Task.objects().filter(project_id=proj.id, done=False).all()
        print(f"  [{proj.name}] 待办 {len(pending)} 项:{[t.title for t in pending]}")
    print(f"{'─'*60}")


def demo_query() -> None:
    print("\n=== 链式查询演示 ===")
    cases = [
        ("全部任务数量",        Task.objects().count()),
        ("未完成任务数量",      Task.objects().filter(done=False).count()),
        ("项目1的任务数量",     Task.objects().filter(project_id=1).count()),
        ("优先级>=2的未完成",   Task.objects().filter(done=False, priority=2).count()),
    ]
    print(f"\n{'─'*45}")
    print(f"  {'查询描述':<25} {'结果'}")
    print(f"{'─'*45}")
    for desc, result in cases:
        print(f"  {desc:<25} {result}")
    print(f"{'─'*45}")


# ─── 入口 ─────────────────────────────────────────────────────────────────────

def main() -> None:
    parser = argparse.ArgumentParser(description="Django ORM 模拟演示(sqlite3 内存库)")
    parser.add_argument("--mode", choices=["demo", "query", "relations"], default="demo",
                        help="demo=基础查询, query=链式统计, relations=关联查询")
    args = parser.parse_args()

    setup_db()

    if args.mode == "demo":
        demo_basic()
    elif args.mode == "query":
        demo_query()
    else:
        demo_relations()


if __name__ == "__main__":
    main()
$ python3 20-python-django-orm.py --mode demo

=== 基础 QuerySet 演示 ===

────────────────────────────────────────────────────────────
  全部任务(共 5 条)
────────────────────────────────────────────────────────────
  ID   标题                   完成     优先级    项目ID
  ──── ──────────────────── ────── ────── ──────
  1    写 ORM 章节             ⬜      3      1
  2    写 HTTP 章节            ✅      2      1
  3    实现 CLI 工具            ⬜      1      2
  4    添加单元测试               ⬜      2      2
  5    发布 v1.0              ⬜      3      1
────────────────────────────────────────────────────────────

  统计:总计 5 条,已完成 1 条,完成率 20%

$ python3 20-python-django-orm.py --mode query

=== 链式查询演示 ===

─────────────────────────────────────────────────
  查询描述                    结果
─────────────────────────────────────────────────
  全部任务数量                  5
  未完成任务数量                 4
  项目1的任务数量                3
  优先级>=2的未完成              2
─────────────────────────────────────────────────

$ python3 20-python-django-orm.py --mode relations

=== 关联查询演示(模拟 select_related) ===

────────────────────────────────────────────────────────────
  项目              任务数     任务列表
────────────────────────────────────────────────────────────
  个人博客            3        写 ORM 章节, 写 HTTP 章节, 发布 v1.0
  开源工具            2        实现 CLI 工具, 添加单元测试
────────────────────────────────────────────────────────────

  反向查询:找出每个项目的未完成任务
────────────────────────────────────────────────────────────
  [个人博客] 待办 2 项:['写 ORM 章节', '发布 v1.0']
  [开源工具] 待办 2 项:['实现 CLI 工具', '添加单元测试']
────────────────────────────────────────────────────────────

小结

概念 一句话记忆
ModelMeta 元类 类定义时自动扫描 Field 属性,收集到 _fields,这是 Django Model 的核心魔法
Model.create_table() 根据 _fields 生成 CREATE TABLE SQL,对应 makemigrations + migrate
Model.save() 主键为 0 → INSERT;主键非 0 → UPDATE,自动切换
QuerySet 惰性求值 filter/order_by/limit 只构建查询,.all()/.count() 才执行 SQL
__contains 对应 SQL LIKE %value%,Django 的查询语法糖
-priority 前缀负号表示降序,对应 ORDER BY priority DESC
N+1 问题 循环里每次 filter(project_id=...) 都发一次查询,用 select_related() 解决

⏱ NexDo Time(5 分钟)

挑战:给 QuerySet 加一个 delete() 方法,支持批量删除满足条件的记录。

具体步骤:

  1. QuerySet 类里新增 delete(self) -> int 方法
  2. 复用 _build_sql() 生成 WHERE 子句,把 SELECT * 替换成 DELETE
  3. 执行 SQL,返回 _conn.execute(...).rowcount(影响行数)
  4. 验证:Task.objects().filter(done=True).delete() 应该删除已完成的任务,返回删除数量

Don’t wait for next time, do it in the next moment.