20 · Django ORM:模型设计与数据迁移
🔗 知识图谱导航:阅读本文前,建议先掌握《19 · Django 引擎:MVC、路由与视图》中的路由分发机制,以及《16 · 数据库底座:SQLite 核心操作》中的 SQL 基础——本文把这两块拼在一起,用 ORM 让你用 Python 对象操作数据库,告别手写 SQL。
运行环境:Python 3.12+ 标准库,零额外依赖,直接运行。本文用纯 Python 复现 Django ORM 的核心机制,理解后再用真实 Django 会事半功倍。
极客解析:Django ORM 的"魔法"本质上是三件事:① 元类
ModelMeta在类定义时自动收集字段;②Model.save()根据主键是否存在自动切换 INSERT/UPDATE;③QuerySet是惰性查询构建器,链式调用不执行 SQL,只有.all()才真正查数据库。
ORM 映射原理
┌──────────────────────────────────────────────────────────────┐
│ ORM 映射层 │
├──────────────┬───────────────────┬───────────────────────────┤
│ Python 概念 │ ORM 映射 │ SQL 概念 │
├──────────────┼───────────────────┼───────────────────────────┤
│ 类 │ → 映射 → │ 表(Table) │
│ 类属性 │ → 映射 → │ 列(Column) │
│ 实例 │ → 映射 → │ 行(Row) │
│ QuerySet │ → 映射 → │ SELECT 语句 │
│ .save() │ → 映射 → │ INSERT / UPDATE │
│ .delete() │ → 映射 → │ DELETE │
│ ForeignKey │ → 映射 → │ 外键约束 │
└──────────────┴───────────────────┴───────────────────────────┘
常用字段类型速查
CharField(max_length) 短字符串
TextField 长文本
IntegerField 整数
BooleanField True/False
DateTimeField 日期时间,auto_now_add=True 自动填充创建时间
ForeignKey(to, on_delete) 多对一关联,on_delete 必填
ManyToManyField(to) 多对多关联,自动创建中间表
迁移命令速查
python manage.py makemigrations # 检测 models.py 变化,生成迁移文件
python manage.py migrate # 执行迁移
python manage.py sqlmigrate tasks 0001 # 查看将要执行的 SQL
python manage.py showmigrations # 查看迁移状态
步步为营:核心逻辑自适应拆解
这一篇的核心是 Django ORM 的三层结构:元类自动收集字段 → Model.save() 自动切换 INSERT/UPDATE → QuerySet 惰性链式查询。下面每一步都聚焦一个机制,零依赖可直接运行。
Step 1:用 ModelMeta 元类自动收集字段定义
痛点与机制:
ModelMeta 是元类——它在"类被定义的那一刻"就介入,扫描所有 Field 类型的属性,收集到 _fields 字典,同时把类名小写作为表名存入 _table。这正是 Django Model 的核心魔法:你只需声明 title = CharField(max_length=200),框架自动知道这张表有一列叫 title,类型是 TEXT。元类就像"工厂的质检员"——每个产品(类)出厂前都会被检查一遍,把规格(字段)记录在案。
核心源码(逐字来自文末完整源码):
class ModelMeta(type):
def __new__(mcs, name: str, bases: tuple, namespace: dict):
fields: Dict[str, Field] = {}
for k, v in list(namespace.items()):
if isinstance(v, Field):
fields[k] = v
namespace["_fields"] = fields
namespace["_table"] = name.lower()
cls = super().__new__(mcs, name, bases, namespace)
return cls
可运行演示(补齐 Mock 数据与 print 反馈):
from typing import Any, Dict
class Field:
def __init__(self, col_type: str, primary_key: bool = False,
nullable: bool = True, default: Any = None):
self.col_type = col_type
self.primary_key = primary_key
self.nullable = nullable
self.default = default
class IntField(Field):
def __init__(self, primary_key: bool = False, default: int = 0):
super().__init__("INTEGER", primary_key=primary_key, default=default)
class CharField(Field):
def __init__(self, max_length: int = 255, nullable: bool = True, default: str = ""):
super().__init__("TEXT", nullable=nullable, default=default)
class ModelMeta(type):
def __new__(mcs, name: str, bases: tuple, namespace: dict):
fields: Dict[str, Field] = {}
for k, v in list(namespace.items()):
if isinstance(v, Field):
fields[k] = v
namespace["_fields"] = fields
namespace["_table"] = name.lower()
cls = super().__new__(mcs, name, bases, namespace)
return cls
class Model(metaclass=ModelMeta):
pass
class Project(Model):
id = IntField(primary_key=True)
name = CharField(nullable=False, default="")
description = CharField(default="")
print("表名:", Project._table)
print("字段:", list(Project._fields.keys()))
print("id 是否主键:", Project._fields["id"].primary_key)
print("name 的数据库类型:", Project._fields["name"].col_type)
Step 2:用 Model.save() 实现自动 INSERT/UPDATE 切换
痛点与机制:
save() 的逻辑就像"快递员送包裹":如果包裹有单号(主键不为 0),就去更新已有记录(UPDATE);如果没有单号,就新建一条记录(INSERT)并把数据库分配的 lastrowid 写回实例。create_table() 根据 _fields 字典动态生成 CREATE TABLE SQL——这就是 Django makemigrations 的简化版原理。
核心源码(逐字来自文末完整源码):
class Model(metaclass=ModelMeta):
_fields: Dict[str, Field] = {}
_table: str = ""
def __init__(self, **kwargs):
for name, f in self._fields.items():
setattr(self, name, kwargs.get(name, f.default))
@classmethod
def create_table(cls) -> None:
cols = []
for name, f in cls._fields.items():
col = f"{name} {f.col_type}"
if f.primary_key:
col += " PRIMARY KEY AUTOINCREMENT"
elif not f.nullable:
col += " NOT NULL"
cols.append(col)
sql = f"CREATE TABLE IF NOT EXISTS {cls._table} ({', '.join(cols)})"
_conn.execute(sql)
_conn.commit()
def save(self) -> "Model":
pk_field = next((n for n, f in self._fields.items() if f.primary_key), None)
data = {n: getattr(self, n) for n in self._fields if n != pk_field}
# 布尔值 → int
data = {k: int(v) if isinstance(v, bool) else v for k, v in data.items()}
if pk_field and getattr(self, pk_field):
sets = ", ".join(f"{k}=?" for k in data)
_conn.execute(
f"UPDATE {self._table} SET {sets} WHERE {pk_field}=?",
list(data.values()) + [getattr(self, pk_field)]
)
else:
cols = ", ".join(data.keys())
placeholders = ", ".join("?" * len(data))
cur = _conn.execute(
f"INSERT INTO {self._table} ({cols}) VALUES ({placeholders})",
list(data.values())
)
if pk_field:
setattr(self, pk_field, cur.lastrowid)
_conn.commit()
return self
@classmethod
def objects(cls: Type[T]) -> "QuerySet[T]":
return QuerySet(cls)
可运行演示(补齐 Mock 数据与 print 反馈):
import sqlite3
from typing import Any, Dict, Type, TypeVar
T = TypeVar("T", bound="Model")
_conn = sqlite3.connect(":memory:")
_conn.row_factory = sqlite3.Row
class Field:
def __init__(self, col_type: str, primary_key: bool = False,
nullable: bool = True, default: Any = None):
self.col_type = col_type
self.primary_key = primary_key
self.nullable = nullable
self.default = default
class IntField(Field):
def __init__(self, primary_key: bool = False, default: int = 0):
super().__init__("INTEGER", primary_key=primary_key, default=default)
class CharField(Field):
def __init__(self, max_length: int = 255, nullable: bool = True, default: str = ""):
super().__init__("TEXT", nullable=nullable, default=default)
class ModelMeta(type):
def __new__(mcs, name: str, bases: tuple, namespace: dict):
namespace["_fields"] = {k: v for k, v in namespace.items() if isinstance(v, Field)}
namespace["_table"] = name.lower()
return super().__new__(mcs, name, bases, namespace)
class Model(metaclass=ModelMeta):
_fields: Dict[str, Field] = {}
_table: str = ""
def __init__(self, **kwargs):
for name, f in self._fields.items():
setattr(self, name, kwargs.get(name, f.default))
@classmethod
def create_table(cls) -> None:
cols = []
for name, f in cls._fields.items():
col = f"{name} {f.col_type}"
if f.primary_key:
col += " PRIMARY KEY AUTOINCREMENT"
elif not f.nullable:
col += " NOT NULL"
cols.append(col)
sql = f"CREATE TABLE IF NOT EXISTS {cls._table} ({', '.join(cols)})"
print("建表 SQL:", sql)
_conn.execute(sql)
_conn.commit()
def save(self) -> "Model":
pk_field = next((n for n, f in self._fields.items() if f.primary_key), None)
data = {n: getattr(self, n) for n in self._fields if n != pk_field}
if pk_field and getattr(self, pk_field):
sets = ", ".join(f"{k}=?" for k in data)
print("执行 UPDATE,主键:", getattr(self, pk_field))
_conn.execute(f"UPDATE {self._table} SET {sets} WHERE {pk_field}=?", list(data.values()) + [getattr(self, pk_field)])
else:
cols = ", ".join(data.keys())
placeholders = ", ".join("?" * len(data))
print("执行 INSERT,数据:", data)
cur = _conn.execute(f"INSERT INTO {self._table} ({cols}) VALUES ({placeholders})", list(data.values()))
if pk_field:
setattr(self, pk_field, cur.lastrowid)
_conn.commit()
return self
class Project(Model):
id = IntField(primary_key=True)
name = CharField(nullable=False, default="")
description = CharField(default="")
Project.create_table()
project = Project(name="小白 ORM", description="第一次保存")
project.save()
print("保存后数据库分配 id:", project.id)
project.description = "第二次保存会更新同一行"
project.save()
row = _conn.execute("SELECT id, name, description FROM project").fetchone()
print("数据库当前行:", dict(row))
Step 3:用 QuerySet 理解惰性查询——链式调用不执行 SQL
痛点与机制:
QuerySet 是"点餐单"而不是"菜":每次 filter()/order_by()/limit() 都只是在点餐单上加一行要求,返回一个新的 QuerySet,不碰数据库。只有调用 .all()/.first()/.count() 时,才把点餐单翻译成 SQL 发给数据库。这个"惰性求值"设计让你可以把查询条件分散在代码各处,最后一次性执行,避免了多次数据库往返。
核心源码(逐字来自文末完整源码):
class QuerySet:
def __init__(self, model: Type[Model]):
self._model = model
self._filters: List[Tuple[str, Any]] = []
self._order: Optional[str] = None
self._limit: Optional[int] = None
def filter(self, **kwargs) -> "QuerySet":
qs = QuerySet(self._model)
qs._filters = list(self._filters) + list(kwargs.items())
qs._order = self._order
qs._limit = self._limit
return qs
def order_by(self, col: str) -> "QuerySet":
qs = QuerySet(self._model)
qs._filters = list(self._filters)
qs._order = col
qs._limit = self._limit
return qs
def limit(self, n: int) -> "QuerySet":
qs = QuerySet(self._model)
qs._filters = list(self._filters)
qs._order = self._order
qs._limit = n
return qs
def _build_sql(self) -> Tuple[str, List]:
sql = f"SELECT * FROM {self._model._table}"
params: List = []
if self._filters:
clauses = []
for k, v in self._filters:
if k.endswith("__contains"):
col = k[:-10]
clauses.append(f"{col} LIKE ?")
params.append(f"%{v}%")
else:
clauses.append(f"{k}=?")
params.append(int(v) if isinstance(v, bool) else v)
sql += " WHERE " + " AND ".join(clauses)
if self._order:
desc = self._order.startswith("-")
col = self._order.lstrip("-")
sql += f" ORDER BY {col} {'DESC' if desc else 'ASC'}"
if self._limit:
sql += f" LIMIT {self._limit}"
return sql, params
def all(self) -> List[Model]:
sql, params = self._build_sql()
rows = _conn.execute(sql, params).fetchall()
return [self._model(**dict(r)) for r in rows]
def first(self) -> Optional[Model]:
results = self.limit(1).all()
return results[0] if results else None
def count(self) -> int:
sql, params = self._build_sql()
count_sql = f"SELECT COUNT(*) FROM ({sql})"
return _conn.execute(count_sql, params).fetchone()[0]
可运行演示(补齐 Mock 数据与 print 反馈):
from typing import Any, List, Optional, Tuple
class FakeModel:
_table = "task"
class QuerySet:
def __init__(self, model):
self._model = model
self._filters: List[Tuple[str, Any]] = []
self._order: Optional[str] = None
self._limit: Optional[int] = None
def filter(self, **kwargs) -> "QuerySet":
qs = QuerySet(self._model)
qs._filters = list(self._filters) + list(kwargs.items())
qs._order = self._order
qs._limit = self._limit
return qs
def order_by(self, col: str) -> "QuerySet":
qs = QuerySet(self._model)
qs._filters = list(self._filters)
qs._order = col
qs._limit = self._limit
return qs
def limit(self, n: int) -> "QuerySet":
qs = QuerySet(self._model)
qs._filters = list(self._filters)
qs._order = self._order
qs._limit = n
return qs
def _build_sql(self) -> Tuple[str, List]:
sql = f"SELECT * FROM {self._model._table}"
params: List = []
if self._filters:
clauses = []
for k, v in self._filters:
clauses.append(f"{k}=?")
params.append(int(v) if isinstance(v, bool) else v)
sql += " WHERE " + " AND ".join(clauses)
if self._order:
desc = self._order.startswith("-")
col = self._order.lstrip("-")
sql += f" ORDER BY {col} {'DESC' if desc else 'ASC'}"
if self._limit:
sql += f" LIMIT {self._limit}"
return sql, params
# 链式调用只是“攒条件”,像先写购物清单,还没去收银台。
qs1 = QuerySet(FakeModel)
qs2 = qs1.filter(done=False)
qs3 = qs2.order_by("-priority").limit(2)
print("原始 QuerySet 条件:", qs1._filters)
print("新 QuerySet 条件:", qs3._filters)
print("真正要执行的 SQL:", qs3._build_sql()[0])
print("SQL 参数:", qs3._build_sql()[1])
Step 4:用链式 filter/order_by/limit 组合查询条件
痛点与机制:
链式调用的关键是"每步返回新对象":filter() 不修改原 QuerySet,而是复制一份再加条件。这样 qs1 = Task.objects().filter(done=False) 和 qs2 = qs1.filter(priority=3) 是两个独立的查询,互不干扰。__contains 是 Django 的查询语法糖,对应 SQL 的 LIKE %value%;-priority 前缀的负号表示降序,对应 SQL 的 ORDER BY priority DESC。
核心源码(逐字来自文末完整源码):
def demo_query() -> None:
print("\n=== 链式查询演示 ===")
cases = [
("全部任务数量", Task.objects().count()),
("未完成任务数量", Task.objects().filter(done=False).count()),
("项目1的任务数量", Task.objects().filter(project_id=1).count()),
("优先级>=2的未完成", Task.objects().filter(done=False, priority=2).count()),
]
print(f"\n{'─'*45}")
print(f" {'查询描述':<25} {'结果'}")
print(f"{'─'*45}")
for desc, result in cases:
print(f" {desc:<25} {result}")
print(f"{'─'*45}")
可运行演示(补齐 Mock 数据与 print 反馈):
import sqlite3
from typing import Any, List, Optional, Tuple
_conn = sqlite3.connect(":memory:")
_conn.row_factory = sqlite3.Row
_conn.execute("CREATE TABLE task (id INTEGER PRIMARY KEY, title TEXT, done INTEGER, priority INTEGER)")
_conn.executemany("INSERT INTO task VALUES (?, ?, ?, ?)", [
(1, "写 ORM 章节", 0, 3),
(2, "写单元测试", 1, 2),
(3, "发布 v1.0", 0, 5),
(4, "修复页面样式", 0, 1),
])
class Task:
_table = "task"
class QuerySet:
def __init__(self, model):
self._model = model
self._filters: List[Tuple[str, Any]] = []
self._order: Optional[str] = None
self._limit: Optional[int] = None
def filter(self, **kwargs) -> "QuerySet":
qs = QuerySet(self._model)
qs._filters = list(self._filters) + list(kwargs.items())
qs._order = self._order
qs._limit = self._limit
return qs
def order_by(self, col: str) -> "QuerySet":
qs = QuerySet(self._model)
qs._filters = list(self._filters)
qs._order = col
qs._limit = self._limit
return qs
def limit(self, n: int) -> "QuerySet":
qs = QuerySet(self._model)
qs._filters = list(self._filters)
qs._order = self._order
qs._limit = n
return qs
def _build_sql(self) -> Tuple[str, List]:
sql = f"SELECT * FROM {self._model._table}"
params: List = []
if self._filters:
clauses = []
for k, v in self._filters:
if k.endswith("__contains"):
clauses.append(f"{k[:-10]} LIKE ?")
params.append(f"%{v}%")
else:
clauses.append(f"{k}=?")
params.append(int(v) if isinstance(v, bool) else v)
sql += " WHERE " + " AND ".join(clauses)
if self._order:
desc = self._order.startswith("-")
sql += f" ORDER BY {self._order.lstrip('-')} {'DESC' if desc else 'ASC'}"
if self._limit:
sql += f" LIMIT {self._limit}"
return sql, params
def all(self) -> List[sqlite3.Row]:
sql, params = self._build_sql()
print("执行 SQL:", sql)
print("参数:", params)
return _conn.execute(sql, params).fetchall()
rows = QuerySet(Task).filter(done=False).filter(title__contains="写").order_by("-priority").limit(2).all()
for row in rows:
print(f"#{row['id']} {row['title']} priority={row['priority']}")
Step 5:用 print_tasks 格式化展示 QuerySet 结果
痛点与机制:
print_tasks 把 QuerySet.all() 返回的模型实例列表格式化成对齐表格。t.done 是布尔值,但数据库里存的是 0/1 整数——sqlite3.Row 返回的是整数,所以 if t.done 等价于 if t.done != 0,Python 的真值判断自动处理了这个转换。f"{t.id:<4}" 的 :<4 是左对齐宽度4,让表格列整齐。
核心源码(逐字来自文末完整源码):
def print_tasks(title: str, tasks: List[Task]) -> None:
print(f"\n{'─'*60}")
print(f" {title}(共 {len(tasks)} 条)")
print(f"{'─'*60}")
print(f" {'ID':<4} {'标题':<20} {'完成':<6} {'优先级':<6} {'项目ID'}")
print(f" {'─'*4} {'─'*20} {'─'*6} {'─'*6} {'─'*6}")
for t in tasks:
done_icon = "✅" if t.done else "⬜"
print(f" {t.id:<4} {t.title:<20} {done_icon:<6} {t.priority:<6} {t.project_id}")
print(f"{'─'*60}")
可运行演示(补齐 Mock 数据与 print 反馈):
from dataclasses import dataclass
from typing import List
@dataclass
class Task:
id: int
title: str
done: bool
priority: int
def print_tasks(title: str, tasks: List[Task]) -> None:
print(f"\n{'─'*60}")
print(f" {title}")
print(f"{'─'*60}")
for t in tasks:
icon = "✅" if t.done else "⬜"
print(f" [{t.id:>2}] {icon} P{t.priority} {t.title}")
print(f"{'─'*60}")
# 格式化输出不是花活:新手能一眼分清 id、状态、优先级和标题。
tasks = [
Task(1, "写 ORM 章节", False, 3),
Task(2, "写单元测试", True, 2),
Task(3, "发布 v1.0", False, 5),
]
print_tasks("任务列表", tasks)
Step 6:用 demo_relations 模拟 ForeignKey 反向查询
痛点与机制:
ForeignKey 在数据库层面只是一个整数列(project_id),ORM 的"魔法"是在 Python 层面提供反向查询接口。Django 里 project.task_set.all() 底层就是 Task.objects.filter(project=project)。这里手动写 Task.objects().filter(project_id=proj.id) 演示了同样的逻辑,让你看清"反向查询"不是魔法,只是一次带外键过滤的普通查询。N+1 问题也在这里体现:每个项目都发一次查询,项目多了就会很慢——Django 的 select_related() 就是为了解决这个问题。
核心源码(逐字来自文末完整源码):
def demo_relations() -> None:
print("\n=== 关联查询演示(模拟 select_related) ===")
projects = Project.objects().all()
print(f"\n{'─'*60}")
print(f" {'项目':<15} {'任务数':<8} {'任务列表'}")
print(f"{'─'*60}")
for proj in projects:
tasks = Task.objects().filter(project_id=proj.id).all()
task_titles = ", ".join(t.title for t in tasks)
print(f" {proj.name:<15} {len(tasks):<8} {task_titles}")
print(f"{'─'*60}")
# 模拟 ForeignKey 反向查询
print("\n 反向查询:找出每个项目的未完成任务")
print(f"{'─'*60}")
for proj in projects:
pending = Task.objects().filter(project_id=proj.id, done=False).all()
print(f" [{proj.name}] 待办 {len(pending)} 项:{[t.title for t in pending]}")
print(f"{'─'*60}")
可运行演示(补齐 Mock 数据与 print 反馈):
from dataclasses import dataclass
from typing import List
@dataclass
class Project:
id: int
name: str
@dataclass
class Task:
id: int
project_id: int
title: str
done: bool
projects = [Project(1, "个人博客"), Project(2, "自动化脚本")]
tasks = [
Task(1, 1, "写 ORM 章节", False),
Task(2, 1, "发布 v1.0", False),
Task(3, 2, "写部署脚本", True),
]
# ForeignKey 本质上就是一列 project_id:任务用这个 id 指向所属项目。
for project in projects:
related_tasks: List[Task] = [task for task in tasks if task.project_id == project.id]
todo_titles = [task.title for task in related_tasks if not task.done]
print(f"项目 {project.name}: 总任务={len(related_tasks)}, 待办={todo_titles}")
Step 7:用 demo_query 展示链式统计查询
痛点与机制:
demo_query 把常见的统计场景列成表格:总数、按条件过滤的数量、多条件组合。count() 把 _build_sql() 生成的 SELECT 包一层 SELECT COUNT(*) FROM (...) 子查询,这是不改变原查询逻辑就能统计数量的标准技巧。Django 的 QuerySet.count() 底层也是类似的 SELECT COUNT(*) 优化,而不是先 SELECT * 再 len()。
核心源码(逐字来自文末完整源码):
def demo_query() -> None:
print("\n=== 链式查询演示 ===")
cases = [
("全部任务数量", Task.objects().count()),
("未完成任务数量", Task.objects().filter(done=False).count()),
("项目1的任务数量", Task.objects().filter(project_id=1).count()),
("优先级>=2的未完成", Task.objects().filter(done=False, priority=2).count()),
]
print(f"\n{'─'*45}")
print(f" {'查询描述':<25} {'结果'}")
print(f"{'─'*45}")
for desc, result in cases:
print(f" {desc:<25} {result}")
print(f"{'─'*45}")
可运行演示(补齐 Mock 数据与 print 反馈):
import sqlite3
_conn = sqlite3.connect(":memory:")
_conn.row_factory = sqlite3.Row
_conn.execute("CREATE TABLE task (id INTEGER PRIMARY KEY, title TEXT, done INTEGER, priority INTEGER)")
_conn.executemany("INSERT INTO task VALUES (?, ?, ?, ?)", [
(1, "写 ORM 章节", 0, 3),
(2, "写单元测试", 1, 2),
(3, "发布 v1.0", 0, 5),
(4, "写部署文档", 0, 4),
])
# 这一步模拟 demo_query:先过滤未完成,再按优先级倒序,最后取前 3 条。
sql = "SELECT * FROM task WHERE done=? ORDER BY priority DESC LIMIT 3"
params = [0]
rows = _conn.execute(sql, params).fetchall()
print("=== 链式查询演示 ===")
print("等价 SQL:", sql)
print("未完成任务数:", _conn.execute("SELECT COUNT(*) FROM task WHERE done=0").fetchone()[0])
for row in rows:
print(f"P{row['priority']} - {row['title']}")
Step 8:用 main 做 demo/query/relations 三种模式的 CLI 总入口
痛点与机制:
main 用 argparse 做 CLI 入口,三种模式对应三个观察角度:demo 看基础 CRUD 和过滤,query 看链式统计,relations 看外键关联查询。setup_db() 在 main 里调用而不是模块级,避免 import 时就执行数据库操作——这是 Python 模块设计的好习惯,也是 Django 的 AppConfig.ready() 的设计思路。
核心源码(逐字来自文末完整源码):
def main() -> None:
parser = argparse.ArgumentParser(description="Django ORM 模拟演示(sqlite3 内存库)")
parser.add_argument("--mode", choices=["demo", "query", "relations"], default="demo",
help="demo=基础查询, query=链式统计, relations=关联查询")
args = parser.parse_args()
setup_db()
if args.mode == "demo":
demo_basic()
elif args.mode == "query":
demo_query()
else:
demo_relations()
可运行演示(补齐 Mock 数据与 print 反馈):
import argparse
import sys
def demo_basic() -> None:
print("demo 模式: 建表、插入、更新、基础查询")
def demo_query() -> None:
print("query 模式: filter/order_by/limit/count 链式查询")
def demo_relations() -> None:
print("relations 模式: 用 project_id 模拟 ForeignKey 关联")
def main() -> None:
parser = argparse.ArgumentParser(description="Django ORM 核心机制演示(零依赖)")
parser.add_argument("--mode", choices=["demo", "query", "relations"], default="demo",
help="demo=基础CRUD, query=链式查询, relations=关联查询")
args = parser.parse_args()
if args.mode == "query":
demo_query()
elif args.mode == "relations":
demo_relations()
else:
demo_basic()
for mode in ["demo", "query", "relations"]:
sys.argv = ["prog", "--mode", mode]
print(f">>> python3 20-python-django-orm.py --mode {mode}")
main()
极客实战:完整源码与运行
现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。
#!/usr/bin/env python3
"""
20-django-orm.py
用 sqlite3 内存库实现简化版 ORM 映射层
演示模型定义、QuerySet 链式查询、关联查询
用法:
python3 20-django-orm.py --mode demo
python3 20-django-orm.py --mode query
python3 20-django-orm.py --mode relations
"""
import argparse
import sqlite3
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar
# ─── 简化版 ORM 核心 ──────────────────────────────────────────────────────────
T = TypeVar("T", bound="Model")
_conn = sqlite3.connect(":memory:")
_conn.row_factory = sqlite3.Row
class Field:
def __init__(self, col_type: str, primary_key: bool = False,
nullable: bool = True, default: Any = None):
self.col_type = col_type
self.primary_key = primary_key
self.nullable = nullable
self.default = default
class IntField(Field):
def __init__(self, primary_key: bool = False, default: int = 0):
super().__init__("INTEGER", primary_key=primary_key, default=default)
class CharField(Field):
def __init__(self, max_length: int = 255, nullable: bool = True, default: str = ""):
super().__init__(f"TEXT", nullable=nullable, default=default)
class BoolField(Field):
def __init__(self, default: bool = False):
super().__init__("INTEGER", default=int(default))
class ForeignKey(Field):
def __init__(self, to: str):
super().__init__("INTEGER", nullable=True)
self.to = to
class ModelMeta(type):
def __new__(mcs, name: str, bases: tuple, namespace: dict):
fields: Dict[str, Field] = {}
for k, v in list(namespace.items()):
if isinstance(v, Field):
fields[k] = v
namespace["_fields"] = fields
namespace["_table"] = name.lower()
cls = super().__new__(mcs, name, bases, namespace)
return cls
class Model(metaclass=ModelMeta):
_fields: Dict[str, Field] = {}
_table: str = ""
def __init__(self, **kwargs):
for name, f in self._fields.items():
setattr(self, name, kwargs.get(name, f.default))
@classmethod
def create_table(cls) -> None:
cols = []
for name, f in cls._fields.items():
col = f"{name} {f.col_type}"
if f.primary_key:
col += " PRIMARY KEY AUTOINCREMENT"
elif not f.nullable:
col += " NOT NULL"
cols.append(col)
sql = f"CREATE TABLE IF NOT EXISTS {cls._table} ({', '.join(cols)})"
_conn.execute(sql)
_conn.commit()
def save(self) -> "Model":
pk_field = next((n for n, f in self._fields.items() if f.primary_key), None)
data = {n: getattr(self, n) for n in self._fields if n != pk_field}
# 布尔值 → int
data = {k: int(v) if isinstance(v, bool) else v for k, v in data.items()}
if pk_field and getattr(self, pk_field):
sets = ", ".join(f"{k}=?" for k in data)
_conn.execute(
f"UPDATE {self._table} SET {sets} WHERE {pk_field}=?",
list(data.values()) + [getattr(self, pk_field)]
)
else:
cols = ", ".join(data.keys())
placeholders = ", ".join("?" * len(data))
cur = _conn.execute(
f"INSERT INTO {self._table} ({cols}) VALUES ({placeholders})",
list(data.values())
)
if pk_field:
setattr(self, pk_field, cur.lastrowid)
_conn.commit()
return self
@classmethod
def objects(cls: Type[T]) -> "QuerySet[T]":
return QuerySet(cls)
class QuerySet:
def __init__(self, model: Type[Model]):
self._model = model
self._filters: List[Tuple[str, Any]] = []
self._order: Optional[str] = None
self._limit: Optional[int] = None
def filter(self, **kwargs) -> "QuerySet":
qs = QuerySet(self._model)
qs._filters = list(self._filters) + list(kwargs.items())
qs._order = self._order
qs._limit = self._limit
return qs
def order_by(self, col: str) -> "QuerySet":
qs = QuerySet(self._model)
qs._filters = list(self._filters)
qs._order = col
qs._limit = self._limit
return qs
def limit(self, n: int) -> "QuerySet":
qs = QuerySet(self._model)
qs._filters = list(self._filters)
qs._order = self._order
qs._limit = n
return qs
def _build_sql(self) -> Tuple[str, List]:
sql = f"SELECT * FROM {self._model._table}"
params: List = []
if self._filters:
clauses = []
for k, v in self._filters:
if k.endswith("__contains"):
col = k[:-10]
clauses.append(f"{col} LIKE ?")
params.append(f"%{v}%")
else:
clauses.append(f"{k}=?")
params.append(int(v) if isinstance(v, bool) else v)
sql += " WHERE " + " AND ".join(clauses)
if self._order:
desc = self._order.startswith("-")
col = self._order.lstrip("-")
sql += f" ORDER BY {col} {'DESC' if desc else 'ASC'}"
if self._limit:
sql += f" LIMIT {self._limit}"
return sql, params
def all(self) -> List[Model]:
sql, params = self._build_sql()
rows = _conn.execute(sql, params).fetchall()
return [self._model(**dict(r)) for r in rows]
def first(self) -> Optional[Model]:
results = self.limit(1).all()
return results[0] if results else None
def count(self) -> int:
sql, params = self._build_sql()
count_sql = f"SELECT COUNT(*) FROM ({sql})"
return _conn.execute(count_sql, params).fetchone()[0]
# ─── 模型定义 ─────────────────────────────────────────────────────────────────
class Project(Model):
id = IntField(primary_key=True)
name = CharField(nullable=False, default="")
description = CharField(default="")
class Tag(Model):
id = IntField(primary_key=True)
name = CharField(nullable=False, default="")
class Task(Model):
id = IntField(primary_key=True)
title = CharField(nullable=False, default="")
done = BoolField(default=False)
priority = IntField(default=0)
project_id = ForeignKey("project") # 多对一
# ─── 初始化数据库 ─────────────────────────────────────────────────────────────
def setup_db() -> None:
Project.create_table()
Tag.create_table()
Task.create_table()
# 创建项目
p1 = Project(name="个人博客", description="技术写作项目").save()
p2 = Project(name="开源工具", description="命令行工具集").save()
# 创建任务
tasks_data = [
("写 ORM 章节", False, 3, p1.id),
("写 HTTP 章节", True, 2, p1.id),
("实现 CLI 工具", False, 1, p2.id),
("添加单元测试", False, 2, p2.id),
("发布 v1.0", False, 3, p1.id),
]
for title, done, priority, pid in tasks_data:
Task(title=title, done=done, priority=priority, project_id=pid).save()
# ─── 打印工具 ─────────────────────────────────────────────────────────────────
def print_tasks(title: str, tasks: List[Task]) -> None:
print(f"\n{'─'*60}")
print(f" {title}(共 {len(tasks)} 条)")
print(f"{'─'*60}")
print(f" {'ID':<4} {'标题':<20} {'完成':<6} {'优先级':<6} {'项目ID'}")
print(f" {'─'*4} {'─'*20} {'─'*6} {'─'*6} {'─'*6}")
for t in tasks:
done_icon = "✅" if t.done else "⬜"
print(f" {t.id:<4} {t.title:<20} {done_icon:<6} {t.priority:<6} {t.project_id}")
print(f"{'─'*60}")
# ─── 演示模式 ─────────────────────────────────────────────────────────────────
def demo_basic() -> None:
print("\n=== 基础 QuerySet 演示 ===")
# 全部任务
all_tasks = Task.objects().all()
print_tasks("全部任务", all_tasks)
# 过滤:未完成
pending = Task.objects().filter(done=False).all()
print_tasks("未完成任务", pending)
# 过滤:高优先级
high = Task.objects().filter(priority=3).all()
print_tasks("优先级=3 的任务", high)
# 模糊搜索
search = Task.objects().filter(title__contains="章节").all()
print_tasks("标题含'章节'", search)
# 排序
by_priority = Task.objects().order_by("-priority").all()
print_tasks("按优先级降序", by_priority)
# 统计
total = Task.objects().count()
done_count = Task.objects().filter(done=True).count()
print(f"\n 统计:总计 {total} 条,已完成 {done_count} 条,完成率 {done_count/total:.0%}")
def demo_relations() -> None:
print("\n=== 关联查询演示(模拟 select_related) ===")
projects = Project.objects().all()
print(f"\n{'─'*60}")
print(f" {'项目':<15} {'任务数':<8} {'任务列表'}")
print(f"{'─'*60}")
for proj in projects:
tasks = Task.objects().filter(project_id=proj.id).all()
task_titles = ", ".join(t.title for t in tasks)
print(f" {proj.name:<15} {len(tasks):<8} {task_titles}")
print(f"{'─'*60}")
# 模拟 ForeignKey 反向查询
print("\n 反向查询:找出每个项目的未完成任务")
print(f"{'─'*60}")
for proj in projects:
pending = Task.objects().filter(project_id=proj.id, done=False).all()
print(f" [{proj.name}] 待办 {len(pending)} 项:{[t.title for t in pending]}")
print(f"{'─'*60}")
def demo_query() -> None:
print("\n=== 链式查询演示 ===")
cases = [
("全部任务数量", Task.objects().count()),
("未完成任务数量", Task.objects().filter(done=False).count()),
("项目1的任务数量", Task.objects().filter(project_id=1).count()),
("优先级>=2的未完成", Task.objects().filter(done=False, priority=2).count()),
]
print(f"\n{'─'*45}")
print(f" {'查询描述':<25} {'结果'}")
print(f"{'─'*45}")
for desc, result in cases:
print(f" {desc:<25} {result}")
print(f"{'─'*45}")
# ─── 入口 ─────────────────────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(description="Django ORM 模拟演示(sqlite3 内存库)")
parser.add_argument("--mode", choices=["demo", "query", "relations"], default="demo",
help="demo=基础查询, query=链式统计, relations=关联查询")
args = parser.parse_args()
setup_db()
if args.mode == "demo":
demo_basic()
elif args.mode == "query":
demo_query()
else:
demo_relations()
if __name__ == "__main__":
main()
$ python3 20-python-django-orm.py --mode demo
=== 基础 QuerySet 演示 ===
────────────────────────────────────────────────────────────
全部任务(共 5 条)
────────────────────────────────────────────────────────────
ID 标题 完成 优先级 项目ID
──── ──────────────────── ────── ────── ──────
1 写 ORM 章节 ⬜ 3 1
2 写 HTTP 章节 ✅ 2 1
3 实现 CLI 工具 ⬜ 1 2
4 添加单元测试 ⬜ 2 2
5 发布 v1.0 ⬜ 3 1
────────────────────────────────────────────────────────────
统计:总计 5 条,已完成 1 条,完成率 20%
$ python3 20-python-django-orm.py --mode query
=== 链式查询演示 ===
─────────────────────────────────────────────────
查询描述 结果
─────────────────────────────────────────────────
全部任务数量 5
未完成任务数量 4
项目1的任务数量 3
优先级>=2的未完成 2
─────────────────────────────────────────────────
$ python3 20-python-django-orm.py --mode relations
=== 关联查询演示(模拟 select_related) ===
────────────────────────────────────────────────────────────
项目 任务数 任务列表
────────────────────────────────────────────────────────────
个人博客 3 写 ORM 章节, 写 HTTP 章节, 发布 v1.0
开源工具 2 实现 CLI 工具, 添加单元测试
────────────────────────────────────────────────────────────
反向查询:找出每个项目的未完成任务
────────────────────────────────────────────────────────────
[个人博客] 待办 2 项:['写 ORM 章节', '发布 v1.0']
[开源工具] 待办 2 项:['实现 CLI 工具', '添加单元测试']
────────────────────────────────────────────────────────────
小结
| 概念 | 一句话记忆 |
|---|---|
ModelMeta 元类 |
类定义时自动扫描 Field 属性,收集到 _fields,这是 Django Model 的核心魔法 |
Model.create_table() |
根据 _fields 生成 CREATE TABLE SQL,对应 makemigrations + migrate |
Model.save() |
主键为 0 → INSERT;主键非 0 → UPDATE,自动切换 |
QuerySet 惰性求值 |
filter/order_by/limit 只构建查询,.all()/.count() 才执行 SQL |
__contains |
对应 SQL LIKE %value%,Django 的查询语法糖 |
-priority |
前缀负号表示降序,对应 ORDER BY priority DESC |
| N+1 问题 | 循环里每次 filter(project_id=...) 都发一次查询,用 select_related() 解决 |
⏱ NexDo Time(5 分钟)
挑战:给 QuerySet 加一个 delete() 方法,支持批量删除满足条件的记录。
具体步骤:
- 在
QuerySet类里新增delete(self) -> int方法 - 复用
_build_sql()生成 WHERE 子句,把SELECT *替换成DELETE - 执行 SQL,返回
_conn.execute(...).rowcount(影响行数) - 验证:
Task.objects().filter(done=True).delete()应该删除已完成的任务,返回删除数量
Don’t wait for next time, do it in the next moment.