4 · 任务图谱基础:列表、字典与集合
🔗 知识图谱导航:阅读本文前,建议先掌握/回顾 《3 · 逻辑引擎:条件、循环与推导式》 中的核心概念;本文会在这个基础上继续推进。 承上启下:上一篇我们用推导式批量处理文本。现在系统化地掌握 Python 的四大数据结构——它们是构建任何复杂系统的基石。
极客解析:先把数据流、控制流和模块边界跑通,再谈抽象;每段代码都围绕一个可执行 CLI 闭环展开。
痛点与架构
| 结构 | 可变 | 有序 | 去重 | 典型场景 |
|---|---|---|---|---|
list |
✅ | ✅ | ❌ | 任务队列、日志序列 |
dict |
✅ | ✅(3.7+) | key唯一 | 配置映射、状态存储 |
set |
✅ | ❌ | ✅ | 去重、权限集合、集合运算 |
tuple |
❌ | ✅ | ❌ | 坐标、数据库行、函数返回值 |
实战演练场
用任务图谱调度器演示四种数据结构的协同工作:
步步为营:核心逻辑自适应拆解
导师提示:这一篇把 Python 四大内置结构放进同一个任务调度场景里讲。你可以把它当成一个小型项目:列表存任务,字典查关系,集合判断资源,队列排执行顺序。
Step 1:用 NamedTuple 定义一条任务记录
核心源码(逐字来自文末完整源码):
class TaskNode(NamedTuple):
id: int
name: str
priority: int # 1=高, 2=中, 3=低
required_tags: frozenset[str] # frozenset:不可变集合,可作 dict key
可运行演示(补齐 Mock 数据与 print 反馈):
from collections import defaultdict, deque
from typing import NamedTuple
class TaskNode(NamedTuple):
id: int
name: str
priority: int # 1=高, 2=中, 3=低
required_tags: frozenset[str] # frozenset:不可变集合,可作 dict key
task = TaskNode(1, "数据采集", 1, frozenset({"network", "io"}))
print("任务对象:", task)
print("任务名:", task.name)
print("需要资源:", sorted(task.required_tags))
大白话解析:NamedTuple 像一张固定列的登记表:每个任务都有 id、name、priority、required_tags。它比裸字典更稳,因为字段名固定、顺序固定,新手不容易写错 key。
Step 2:用列表保存多条任务 Mock 数据
核心源码(逐字来自文末完整源码):
TASK_NODES: list[TaskNode] = [
TaskNode(1, "数据采集", 1, frozenset({"network", "io"})),
TaskNode(2, "文本清洗", 2, frozenset({"cpu", "nlp"})),
TaskNode(3, "特征提取", 2, frozenset({"cpu", "ml"})),
TaskNode(4, "模型训练", 1, frozenset({"gpu", "ml"})),
TaskNode(5, "结果评估", 3, frozenset({"cpu"})),
TaskNode(6, "报告生成", 3, frozenset({"io"})),
]
可运行演示(补齐 Mock 数据与 print 反馈):
from collections import defaultdict, deque
from typing import NamedTuple
class TaskNode(NamedTuple):
id: int
name: str
priority: int # 1=高, 2=中, 3=低
required_tags: frozenset[str] # frozenset:不可变集合,可作 dict key
TASK_NODES: list[TaskNode] = [
TaskNode(1, "数据采集", 1, frozenset({"network", "io"})),
TaskNode(2, "文本清洗", 2, frozenset({"cpu", "nlp"})),
TaskNode(3, "特征提取", 2, frozenset({"cpu", "ml"})),
TaskNode(4, "模型训练", 1, frozenset({"gpu", "ml"})),
TaskNode(5, "结果评估", 3, frozenset({"cpu"})),
TaskNode(6, "报告生成", 3, frozenset({"io"})),
]
print(f"任务总数:{len(TASK_NODES)}")
for task in TASK_NODES[:3]:
print(f"[{task.id}] {task.name} / P{task.priority} / {sorted(task.required_tags)}")
大白话解析:TASK_NODES 是任务列表,像待办清单。列表负责保存“多条记录”,每条记录是一个 TaskNode。先有这批假数据,后面的排序、查找、调度才有东西可操作。
Step 3:用字典描述任务之间的依赖关系
核心源码(逐字来自文末完整源码):
DEPENDENCIES: dict[int, list[int]] = {
1: [],
2: [1],
3: [1, 2],
4: [3],
5: [4],
6: [4, 5],
}
可运行演示(补齐 Mock 数据与 print 反馈):
DEPENDENCIES: dict[int, list[int]] = {
1: [],
2: [1],
3: [1, 2],
4: [3],
5: [4],
6: [4, 5],
}
print("依赖关系表:")
for task_id, deps in DEPENDENCIES.items():
print(f"任务 {task_id} 依赖:{deps if deps else '无'}")
大白话解析:DEPENDENCIES 像施工顺序表:任务 2 必须等任务 1 完成,任务 6 必须等任务 4 和 5 完成。字典的 key 是任务编号,value 是它依赖的前置任务列表。
Step 4:用列表筛选、排序和队列出队
核心源码(逐字来自文末完整源码):
def demo_list_ops(tasks: list[TaskNode]) -> None:
print("\n ── 列表操作演示 ──────────────────────────")
# 切片
high_priority = [t for t in tasks if t.priority == 1]
print(f" 高优先级任务: {[t.name for t in high_priority]}")
# 排序(不修改原列表)
by_priority = sorted(tasks, key=lambda t: (t.priority, t.id))
print(f" 按优先级排序: {[t.name for t in by_priority]}")
# deque:高效双端队列(比 list.insert(0,...) 快 O(1))
task_queue: deque[TaskNode] = deque(maxlen=4)
for t in tasks[:4]:
task_queue.append(t)
next_task = task_queue.popleft()
print(f" 队列出队: {next_task.name},剩余: {len(task_queue)}")
可运行演示(补齐 Mock 数据与 print 反馈):
from collections import defaultdict, deque
from typing import NamedTuple
class TaskNode(NamedTuple):
id: int
name: str
priority: int # 1=高, 2=中, 3=低
required_tags: frozenset[str] # frozenset:不可变集合,可作 dict key
TASK_NODES: list[TaskNode] = [
TaskNode(1, "数据采集", 1, frozenset({"network", "io"})),
TaskNode(2, "文本清洗", 2, frozenset({"cpu", "nlp"})),
TaskNode(3, "特征提取", 2, frozenset({"cpu", "ml"})),
TaskNode(4, "模型训练", 1, frozenset({"gpu", "ml"})),
TaskNode(5, "结果评估", 3, frozenset({"cpu"})),
TaskNode(6, "报告生成", 3, frozenset({"io"})),
]
DEPENDENCIES: dict[int, list[int]] = {
1: [],
2: [1],
3: [1, 2],
4: [3],
5: [4],
6: [4, 5],
}
AVAILABLE_RESOURCES: set[str] = {"network", "io", "cpu", "nlp", "ml"}
def demo_list_ops(tasks: list[TaskNode]) -> None:
print("\n ── 列表操作演示 ──────────────────────────")
# 切片
high_priority = [t for t in tasks if t.priority == 1]
print(f" 高优先级任务: {[t.name for t in high_priority]}")
# 排序(不修改原列表)
by_priority = sorted(tasks, key=lambda t: (t.priority, t.id))
print(f" 按优先级排序: {[t.name for t in by_priority]}")
# deque:高效双端队列(比 list.insert(0,...) 快 O(1))
task_queue: deque[TaskNode] = deque(maxlen=4)
for t in tasks[:4]:
task_queue.append(t)
next_task = task_queue.popleft()
print(f" 队列出队: {next_task.name},剩余: {len(task_queue)}")
demo_list_ops(TASK_NODES)
大白话解析:列表适合处理“有顺序的一串东西”。这里先筛出高优先级任务,再用 sorted 排序,最后用 deque.popleft() 模拟队列出队。deque 像排队窗口,最早排队的任务先被叫号。
Step 5:用字典建立快速查找和标签映射
核心源码(逐字来自文末完整源码):
def demo_dict_ops(tasks: list[TaskNode]) -> None:
print("\n ── 字典操作演示 ──────────────────────────")
# 构建 id → task 的查找表
task_map: dict[int, TaskNode] = {t.id: t for t in tasks}
# defaultdict:避免 KeyError
tag_to_tasks: defaultdict[str, list[str]] = defaultdict(list)
for t in tasks:
for tag in t.required_tags:
tag_to_tasks[tag].append(t.name)
print(" 标签 → 任务映射:")
for tag, names in sorted(tag_to_tasks.items()):
print(f" {tag:<12} → {names}")
# 字典合并(Python 3.9+ 用 | 运算符)
extra_meta: dict[int, str] = {1: "已完成", 2: "进行中"}
status_map: dict[int, str] = {t.id: "待处理" for t in tasks}
merged = status_map | extra_meta # extra_meta 覆盖 status_map
print(f"\n 合并后状态: {merged}")
# .get() 安全访问
task = task_map.get(99)
print(f" 查找 id=99: {task}") # None,不报 KeyError
可运行演示(补齐 Mock 数据与 print 反馈):
from collections import defaultdict, deque
from typing import NamedTuple
class TaskNode(NamedTuple):
id: int
name: str
priority: int # 1=高, 2=中, 3=低
required_tags: frozenset[str] # frozenset:不可变集合,可作 dict key
TASK_NODES: list[TaskNode] = [
TaskNode(1, "数据采集", 1, frozenset({"network", "io"})),
TaskNode(2, "文本清洗", 2, frozenset({"cpu", "nlp"})),
TaskNode(3, "特征提取", 2, frozenset({"cpu", "ml"})),
TaskNode(4, "模型训练", 1, frozenset({"gpu", "ml"})),
TaskNode(5, "结果评估", 3, frozenset({"cpu"})),
TaskNode(6, "报告生成", 3, frozenset({"io"})),
]
DEPENDENCIES: dict[int, list[int]] = {
1: [],
2: [1],
3: [1, 2],
4: [3],
5: [4],
6: [4, 5],
}
AVAILABLE_RESOURCES: set[str] = {"network", "io", "cpu", "nlp", "ml"}
def demo_dict_ops(tasks: list[TaskNode]) -> None:
print("\n ── 字典操作演示 ──────────────────────────")
# 构建 id → task 的查找表
task_map: dict[int, TaskNode] = {t.id: t for t in tasks}
# defaultdict:避免 KeyError
tag_to_tasks: defaultdict[str, list[str]] = defaultdict(list)
for t in tasks:
for tag in t.required_tags:
tag_to_tasks[tag].append(t.name)
print(" 标签 → 任务映射:")
for tag, names in sorted(tag_to_tasks.items()):
print(f" {tag:<12} → {names}")
# 字典合并(Python 3.9+ 用 | 运算符)
extra_meta: dict[int, str] = {1: "已完成", 2: "进行中"}
status_map: dict[int, str] = {t.id: "待处理" for t in tasks}
merged = status_map | extra_meta # extra_meta 覆盖 status_map
print(f"\n 合并后状态: {merged}")
# .get() 安全访问
task = task_map.get(99)
print(f" 查找 id=99: {task}") # None,不报 KeyError
demo_dict_ops(TASK_NODES)
大白话解析:字典适合回答“给我某个 key,对应的值是什么”。task_map 是 id 到任务的索引,defaultdict(list) 像自动准备空篮子的分类箱,遇到新标签不用先手动创建列表。
Step 6:用集合判断资源是否满足任务要求
核心源码(逐字来自文末完整源码):
def demo_set_ops(tasks: list[TaskNode]) -> None:
print("\n ── 集合操作演示 ──────────────────────────")
# 所有任务需要的资源
required: set[str] = set()
for t in tasks:
required |= t.required_tags # 集合并集
missing = required - AVAILABLE_RESOURCES # 差集:缺少的资源
available_for_run = required & AVAILABLE_RESOURCES # 交集
print(f" 所有需求资源: {sorted(required)}")
print(f" 当前可用: {sorted(AVAILABLE_RESOURCES)}")
print(f" 缺少资源: {sorted(missing)}")
print(f" 可满足资源: {sorted(available_for_run)}")
# 判断哪些任务可以立即运行(资源全部满足)
runnable = [
t.name for t in tasks
if t.required_tags <= AVAILABLE_RESOURCES # 子集判断
]
print(f"\n 可立即运行的任务: {runnable}")
可运行演示(补齐 Mock 数据与 print 反馈):
from collections import defaultdict, deque
from typing import NamedTuple
class TaskNode(NamedTuple):
id: int
name: str
priority: int # 1=高, 2=中, 3=低
required_tags: frozenset[str] # frozenset:不可变集合,可作 dict key
TASK_NODES: list[TaskNode] = [
TaskNode(1, "数据采集", 1, frozenset({"network", "io"})),
TaskNode(2, "文本清洗", 2, frozenset({"cpu", "nlp"})),
TaskNode(3, "特征提取", 2, frozenset({"cpu", "ml"})),
TaskNode(4, "模型训练", 1, frozenset({"gpu", "ml"})),
TaskNode(5, "结果评估", 3, frozenset({"cpu"})),
TaskNode(6, "报告生成", 3, frozenset({"io"})),
]
DEPENDENCIES: dict[int, list[int]] = {
1: [],
2: [1],
3: [1, 2],
4: [3],
5: [4],
6: [4, 5],
}
AVAILABLE_RESOURCES: set[str] = {"network", "io", "cpu", "nlp", "ml"}
def demo_set_ops(tasks: list[TaskNode]) -> None:
print("\n ── 集合操作演示 ──────────────────────────")
# 所有任务需要的资源
required: set[str] = set()
for t in tasks:
required |= t.required_tags # 集合并集
missing = required - AVAILABLE_RESOURCES # 差集:缺少的资源
available_for_run = required & AVAILABLE_RESOURCES # 交集
print(f" 所有需求资源: {sorted(required)}")
print(f" 当前可用: {sorted(AVAILABLE_RESOURCES)}")
print(f" 缺少资源: {sorted(missing)}")
print(f" 可满足资源: {sorted(available_for_run)}")
# 判断哪些任务可以立即运行(资源全部满足)
runnable = [
t.name for t in tasks
if t.required_tags <= AVAILABLE_RESOURCES # 子集判断
]
print(f"\n 可立即运行的任务: {runnable}")
demo_set_ops(TASK_NODES)
大白话解析:集合适合做去重和关系判断。required - AVAILABLE_RESOURCES 是“还缺什么”,required & AVAILABLE_RESOURCES 是“现在能满足什么”,t.required_tags <= AVAILABLE_RESOURCES 是判断某个任务需要的资源是不是都齐了。
Step 7:用拓扑排序排出安全执行顺序
核心源码(逐字来自文末完整源码):
def topological_sort(deps: dict[int, list[int]]) -> list[int]:
"""
Kahn 算法拓扑排序:确定任务执行顺序。
使用:dict(入度统计)+ deque(BFS队列)+ list(结果)
"""
in_degree: dict[int, int] = {node: 0 for node in deps}
for node, prerequisites in deps.items():
for pre in prerequisites:
in_degree[node] += 1
queue: deque[int] = deque(
node for node, deg in in_degree.items() if deg == 0
)
order: list[int] = []
while queue:
node = queue.popleft()
order.append(node)
# 找出以 node 为前置的任务
for next_node, prerequisites in deps.items():
if node in prerequisites:
in_degree[next_node] -= 1
if in_degree[next_node] == 0:
queue.append(next_node)
return order
可运行演示(补齐 Mock 数据与 print 反馈):
from collections import defaultdict, deque
from typing import NamedTuple
class TaskNode(NamedTuple):
id: int
name: str
priority: int # 1=高, 2=中, 3=低
required_tags: frozenset[str] # frozenset:不可变集合,可作 dict key
TASK_NODES: list[TaskNode] = [
TaskNode(1, "数据采集", 1, frozenset({"network", "io"})),
TaskNode(2, "文本清洗", 2, frozenset({"cpu", "nlp"})),
TaskNode(3, "特征提取", 2, frozenset({"cpu", "ml"})),
TaskNode(4, "模型训练", 1, frozenset({"gpu", "ml"})),
TaskNode(5, "结果评估", 3, frozenset({"cpu"})),
TaskNode(6, "报告生成", 3, frozenset({"io"})),
]
DEPENDENCIES: dict[int, list[int]] = {
1: [],
2: [1],
3: [1, 2],
4: [3],
5: [4],
6: [4, 5],
}
AVAILABLE_RESOURCES: set[str] = {"network", "io", "cpu", "nlp", "ml"}
def topological_sort(deps: dict[int, list[int]]) -> list[int]:
"""
Kahn 算法拓扑排序:确定任务执行顺序。
使用:dict(入度统计)+ deque(BFS队列)+ list(结果)
"""
in_degree: dict[int, int] = {node: 0 for node in deps}
for node, prerequisites in deps.items():
for pre in prerequisites:
in_degree[node] += 1
queue: deque[int] = deque(
node for node, deg in in_degree.items() if deg == 0
)
order: list[int] = []
while queue:
node = queue.popleft()
order.append(node)
# 找出以 node 为前置的任务
for next_node, prerequisites in deps.items():
if node in prerequisites:
in_degree[next_node] -= 1
if in_degree[next_node] == 0:
queue.append(next_node)
return order
def demo_deps(tasks: list[TaskNode]) -> None:
task_map = {t.id: t for t in tasks}
order = topological_sort(DEPENDENCIES)
print("\n ── 任务依赖拓扑排序 ──────────────────────")
print(f" 执行顺序: {order}")
print()
for step, tid in enumerate(order, 1):
t = task_map[tid]
deps = DEPENDENCIES[tid]
dep_names = [task_map[d].name for d in deps]
dep_str = f"← 依赖 {dep_names}" if dep_names else "← 无依赖(起点)"
print(f" Step {step}: [{tid}] {t.name:<10} {dep_str}")
demo_deps(TASK_NODES)
大白话解析:拓扑排序像项目经理排工期:先做没有前置依赖的任务,再一步步释放后续任务。这里综合用了 dict 统计入度、deque 做队列、list 保存最终顺序,是本篇数据结构的合体实战。
极客实战:完整源码与运行
现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。
"""
任务图谱调度器 —— 演示列表/字典/集合/元组的工程级用法。
用法:
python3 task_graph.py
python3 task_graph.py --mode deps
python3 task_graph.py --mode ops
"""
import argparse
from collections import defaultdict, deque
from typing import NamedTuple
# ── NamedTuple:不可变的结构化数据(比 dict 更安全)──────────
class TaskNode(NamedTuple):
id: int
name: str
priority: int # 1=高, 2=中, 3=低
required_tags: frozenset[str] # frozenset:不可变集合,可作 dict key
# ── Mock 数据 ─────────────────────────────────────────────────
TASK_NODES: list[TaskNode] = [
TaskNode(1, "数据采集", 1, frozenset({"network", "io"})),
TaskNode(2, "文本清洗", 2, frozenset({"cpu", "nlp"})),
TaskNode(3, "特征提取", 2, frozenset({"cpu", "ml"})),
TaskNode(4, "模型训练", 1, frozenset({"gpu", "ml"})),
TaskNode(5, "结果评估", 3, frozenset({"cpu"})),
TaskNode(6, "报告生成", 3, frozenset({"io"})),
]
# 依赖关系:task_id → 依赖的 task_id 列表
DEPENDENCIES: dict[int, list[int]] = {
1: [],
2: [1],
3: [1, 2],
4: [3],
5: [4],
6: [4, 5],
}
# 可用资源标签集合
AVAILABLE_RESOURCES: set[str] = {"network", "io", "cpu", "nlp", "ml"}
# ── 列表操作 ──────────────────────────────────────────────────
def demo_list_ops(tasks: list[TaskNode]) -> None:
print("\n ── 列表操作演示 ──────────────────────────")
# 切片
high_priority = [t for t in tasks if t.priority == 1]
print(f" 高优先级任务: {[t.name for t in high_priority]}")
# 排序(不修改原列表)
by_priority = sorted(tasks, key=lambda t: (t.priority, t.id))
print(f" 按优先级排序: {[t.name for t in by_priority]}")
# deque:高效双端队列(比 list.insert(0,...) 快 O(1))
task_queue: deque[TaskNode] = deque(maxlen=4)
for t in tasks[:4]:
task_queue.append(t)
next_task = task_queue.popleft()
print(f" 队列出队: {next_task.name},剩余: {len(task_queue)}")
# ── 字典操作 ──────────────────────────────────────────────────
def demo_dict_ops(tasks: list[TaskNode]) -> None:
print("\n ── 字典操作演示 ──────────────────────────")
# 构建 id → task 的查找表
task_map: dict[int, TaskNode] = {t.id: t for t in tasks}
# defaultdict:避免 KeyError
tag_to_tasks: defaultdict[str, list[str]] = defaultdict(list)
for t in tasks:
for tag in t.required_tags:
tag_to_tasks[tag].append(t.name)
print(" 标签 → 任务映射:")
for tag, names in sorted(tag_to_tasks.items()):
print(f" {tag:<12} → {names}")
# 字典合并(Python 3.9+ 用 | 运算符)
extra_meta: dict[int, str] = {1: "已完成", 2: "进行中"}
status_map: dict[int, str] = {t.id: "待处理" for t in tasks}
merged = status_map | extra_meta # extra_meta 覆盖 status_map
print(f"\n 合并后状态: {merged}")
# .get() 安全访问
task = task_map.get(99)
print(f" 查找 id=99: {task}") # None,不报 KeyError
# ── 集合操作 ──────────────────────────────────────────────────
def demo_set_ops(tasks: list[TaskNode]) -> None:
print("\n ── 集合操作演示 ──────────────────────────")
# 所有任务需要的资源
required: set[str] = set()
for t in tasks:
required |= t.required_tags # 集合并集
missing = required - AVAILABLE_RESOURCES # 差集:缺少的资源
available_for_run = required & AVAILABLE_RESOURCES # 交集
print(f" 所有需求资源: {sorted(required)}")
print(f" 当前可用: {sorted(AVAILABLE_RESOURCES)}")
print(f" 缺少资源: {sorted(missing)}")
print(f" 可满足资源: {sorted(available_for_run)}")
# 判断哪些任务可以立即运行(资源全部满足)
runnable = [
t.name for t in tasks
if t.required_tags <= AVAILABLE_RESOURCES # 子集判断
]
print(f"\n 可立即运行的任务: {runnable}")
# ── 拓扑排序(综合运用所有数据结构)─────────────────────────
def topological_sort(deps: dict[int, list[int]]) -> list[int]:
"""
Kahn 算法拓扑排序:确定任务执行顺序。
使用:dict(入度统计)+ deque(BFS队列)+ list(结果)
"""
in_degree: dict[int, int] = {node: 0 for node in deps}
for node, prerequisites in deps.items():
for pre in prerequisites:
in_degree[node] += 1
queue: deque[int] = deque(
node for node, deg in in_degree.items() if deg == 0
)
order: list[int] = []
while queue:
node = queue.popleft()
order.append(node)
# 找出以 node 为前置的任务
for next_node, prerequisites in deps.items():
if node in prerequisites:
in_degree[next_node] -= 1
if in_degree[next_node] == 0:
queue.append(next_node)
return order
def demo_deps(tasks: list[TaskNode]) -> None:
task_map = {t.id: t for t in tasks}
order = topological_sort(DEPENDENCIES)
print("\n ── 任务依赖拓扑排序 ──────────────────────")
print(f" 执行顺序: {order}")
print()
for step, tid in enumerate(order, 1):
t = task_map[tid]
deps = DEPENDENCIES[tid]
dep_names = [task_map[d].name for d in deps]
dep_str = f"← 依赖 {dep_names}" if dep_names else "← 无依赖(起点)"
print(f" Step {step}: [{tid}] {t.name:<10} {dep_str}")
def main() -> None:
parser = argparse.ArgumentParser(description="任务图谱调度器")
parser.add_argument(
"--mode",
choices=["list", "dict", "set", "deps", "all"],
default="all",
)
args = parser.parse_args()
tasks = TASK_NODES
if args.mode in ("list", "all"):
demo_list_ops(tasks)
if args.mode in ("dict", "all"):
demo_dict_ops(tasks)
if args.mode in ("set", "all"):
demo_set_ops(tasks)
if args.mode in ("deps", "all"):
demo_deps(tasks)
if __name__ == "__main__":
main()
终端预期输出(--mode deps):
$ python3 task_graph.py --mode deps
── 任务依赖拓扑排序 ──────────────────────
执行顺序: [1, 2, 3, 4, 5, 6]
Step 1: [1] 数据采集 ← 无依赖(起点)
Step 2: [2] 文本清洗 ← 依赖 ['数据采集']
Step 3: [3] 特征提取 ← 依赖 ['数据采集', '文本清洗']
Step 4: [4] 模型训练 ← 依赖 ['特征提取']
Step 5: [5] 结果评估 ← 依赖 ['模型训练']
Step 6: [6] 报告生成 ← 依赖 ['模型训练', '结果评估']
避坑指南
| 坑 | 示例 | 正确做法 |
|---|---|---|
list 浅拷贝 |
b = a[:] 内部对象仍共享 |
深拷贝用 copy.deepcopy(a) |
dict 遍历时修改 |
for k in d: del d[k] → RuntimeError |
遍历 list(d.keys()) |
set 无序 |
不能用索引 s[0] |
需要有序去重用 dict.fromkeys(lst) |
tuple 单元素 |
t = (1) 是 int,不是 tuple |
t = (1,) 必须加逗号 |
NexDo Time ⚡
5 分钟极客微操:给调度器增加循环依赖检测——如果 DEPENDENCIES 中存在环(如 A→B→A),拓扑排序结果会比任务总数少,用这个特性写一个 detect_cycle() 函数并打印警告。
Don’t wait for next time, do it in the next moment.