
4 · Task Graph Basics: Lists, Dictionaries, and Sets

#004 · 2026-04-16 · Python

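🔗 Knowledge-graph navigation: before reading this article, learn or review the core concepts in "3 · The Logic Engine: Conditionals, Loops, and Comprehensions"; this article builds directly on them.

Bridging note: in the previous article we used comprehensions to batch-process text. Here we work systematically through Python's four core data structures (list, dict, set, and tuple), the building blocks of any larger system.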

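Geek's take: get the data flow, control flow, and module boundaries working end to end before reaching for abstraction; every snippet here is built around one runnable CLI loop.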


Pain Points and Architecture

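Structure | Mutable | Ordered   | De-duplicates | Typical use
list      | ✅      | ✅        | ❌            | task queues, log sequences
dict      | ✅      | ✅ (3.7+) | unique keys   | config mappings, state storage
set       | ✅      | ❌        | ✅            | de-duplication, permission sets, set operations
tuple     | ❌      | ✅        | ❌            | coordinates, database rows, function return values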
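The properties in the table can be checked directly in an interpreter. The following is a minimal sketch; the variable names and sample values are invented for illustration and do not come from the scheduler.

```python
# list: mutable, ordered, keeps duplicates
tags_list = ["cpu", "io", "cpu"]
tags_list.append("gpu")

# dict: mutable, insertion-ordered (Python 3.7+), keys are unique
config = {"retries": 3, "timeout": 10}
config["retries"] = 5          # assigning to an existing key overwrites it

# set: mutable, unordered, duplicates collapse
tags_set = set(tags_list)

# tuple: immutable, ordered, keeps duplicates
point = (3, 4)
try:
    point[0] = 9               # tuples reject item assignment
except TypeError as exc:
    tuple_error = type(exc).__name__

print(tags_list)               # ['cpu', 'io', 'cpu', 'gpu']
print(list(config))            # ['retries', 'timeout']
print(sorted(tags_set))        # ['cpu', 'gpu', 'io']
print(tuple_error)             # TypeError
```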

Hands-On Playground

The task graph scheduler demonstrates the four data structures working together.

Step by Step: An Incremental Breakdown of the Core Logic

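Mentor's tip: this article puts Python's four built-in structures into a single task-scheduling scenario. Treat it as a small project: a list stores the tasks, a dict looks up relationships, a set checks resources, and a queue (a deque) orders execution.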

Step 1: Define a task record with NamedTuple

Core source (verbatim from the complete source at the end)

class TaskNode(NamedTuple):
    id: int
    name: str
    priority: int          # 1=高, 2=中, 3=低
    required_tags: frozenset[str]   # frozenset:不可变集合,可作 dict key

Runnable demo (with mock data and print feedback filled in)

from collections import defaultdict, deque
from typing import NamedTuple


class TaskNode(NamedTuple):
    id: int
    name: str
    priority: int          # 1=高, 2=中, 3=低
    required_tags: frozenset[str]   # frozenset:不可变集合,可作 dict key

task = TaskNode(1, "数据采集", 1, frozenset({"network", "io"}))
print("任务对象:", task)
print("任务名:", task.name)
print("需要资源:", sorted(task.required_tags))

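Plain-language explanation: a NamedTuple is like a registration form with fixed columns: every task has id, name, priority, and required_tags. It is more robust than a bare dict because the field names and their order are fixed, so a mistyped field name fails immediately with an AttributeError instead of silently creating a new key.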
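To make the "fixed form" idea concrete, here is a small sketch of how a NamedTuple record behaves when you try to change it. It reuses the TaskNode definition from this step; the variable names `assignment_blocked` and `demoted` are introduced only for this example.

```python
from typing import NamedTuple


class TaskNode(NamedTuple):
    id: int
    name: str
    priority: int
    required_tags: frozenset[str]


task = TaskNode(1, "数据采集", 1, frozenset({"network", "io"}))

# Fields are read-only: assignment raises AttributeError.
try:
    task.priority = 2
except AttributeError:
    assignment_blocked = True
else:
    assignment_blocked = False

# _replace returns a new record and leaves the original untouched.
demoted = task._replace(priority=3)

# A NamedTuple is still a tuple, so unpacking and indexing both work.
task_id, name, priority, tags = task

print(assignment_blocked)               # True
print(task.priority, demoted.priority)  # 1 3
print(task[1] == name)                  # True
```

Because updates always produce a new record, a TaskNode can be shared between functions without worrying that one of them edits it in place.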

Step 2: Store multiple mock task records in a list

Core source (verbatim from the complete source at the end)

TASK_NODES: list[TaskNode] = [
    TaskNode(1, "数据采集",   1, frozenset({"network", "io"})),
    TaskNode(2, "文本清洗",   2, frozenset({"cpu", "nlp"})),
    TaskNode(3, "特征提取",   2, frozenset({"cpu", "ml"})),
    TaskNode(4, "模型训练",   1, frozenset({"gpu", "ml"})),
    TaskNode(5, "结果评估",   3, frozenset({"cpu"})),
    TaskNode(6, "报告生成",   3, frozenset({"io"})),
]

Runnable demo (with mock data and print feedback filled in)

from collections import defaultdict, deque
from typing import NamedTuple


class TaskNode(NamedTuple):
    id: int
    name: str
    priority: int          # 1=高, 2=中, 3=低
    required_tags: frozenset[str]   # frozenset:不可变集合,可作 dict key

TASK_NODES: list[TaskNode] = [
    TaskNode(1, "数据采集",   1, frozenset({"network", "io"})),
    TaskNode(2, "文本清洗",   2, frozenset({"cpu", "nlp"})),
    TaskNode(3, "特征提取",   2, frozenset({"cpu", "ml"})),
    TaskNode(4, "模型训练",   1, frozenset({"gpu", "ml"})),
    TaskNode(5, "结果评估",   3, frozenset({"cpu"})),
    TaskNode(6, "报告生成",   3, frozenset({"io"})),
]

print(f"任务总数:{len(TASK_NODES)}")
for task in TASK_NODES[:3]:
    print(f"[{task.id}] {task.name} / P{task.priority} / {sorted(task.required_tags)}")

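Plain-language explanation: TASK_NODES is the task list, like a to-do list. The list holds the collection of records, and each record is a TaskNode. With this mock data in place, the later sorting, lookup, and scheduling steps have something to operate on.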

Step 3: Describe dependencies between tasks with a dict

Core source (verbatim from the complete source at the end)

DEPENDENCIES: dict[int, list[int]] = {
    1: [],
    2: [1],
    3: [1, 2],
    4: [3],
    5: [4],
    6: [4, 5],
}

Runnable demo (with mock data and print feedback filled in)

DEPENDENCIES: dict[int, list[int]] = {
    1: [],
    2: [1],
    3: [1, 2],
    4: [3],
    5: [4],
    6: [4, 5],
}

print("依赖关系表:")
for task_id, deps in DEPENDENCIES.items():
    print(f"任务 {task_id} 依赖:{deps if deps else '无'}")

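Plain-language explanation: DEPENDENCIES is like a construction schedule: task 2 must wait for task 1, and task 6 must wait for tasks 4 and 5. Each key is a task id, and each value is the list of prerequisite tasks it depends on.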
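The dict stores "task → prerequisites", but a scheduler often needs the opposite question answered: once a task finishes, which tasks were waiting on it? A sketch of inverting the mapping is shown here; the names `dependents` and `roots` are assumptions made for this example and are not part of the original script.

```python
from collections import defaultdict

DEPENDENCIES: dict[int, list[int]] = {
    1: [],
    2: [1],
    3: [1, 2],
    4: [3],
    5: [4],
    6: [4, 5],
}

# Invert "task -> prerequisites" into "task -> tasks that wait on it".
dependents: defaultdict[int, list[int]] = defaultdict(list)
for task_id, prerequisites in DEPENDENCIES.items():
    for pre in prerequisites:
        dependents[pre].append(task_id)

# Tasks with an empty prerequisite list can start right away.
roots = [task_id for task_id, pres in DEPENDENCIES.items() if not pres]

print(dict(dependents))  # {1: [2, 3], 2: [3], 3: [4], 4: [5, 6], 5: [6]}
print(roots)             # [1]
```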

Step 4: Filter and sort a list, then dequeue with deque

Core source (verbatim from the complete source at the end)

def demo_list_ops(tasks: list[TaskNode]) -> None:
    print("\n  ── 列表操作演示 ──────────────────────────")

    # Filter with a list comprehension (this is filtering, not slicing)
    high_priority = [t for t in tasks if t.priority == 1]
    print(f"  高优先级任务: {[t.name for t in high_priority]}")

    # Sort without modifying the original list
    by_priority = sorted(tasks, key=lambda t: (t.priority, t.id))
    print(f"  按优先级排序: {[t.name for t in by_priority]}")

    # deque: double-ended queue; popleft() is O(1), whereas list.pop(0) is O(n)
    task_queue: deque[TaskNode] = deque(maxlen=4)
    for t in tasks[:4]:
        task_queue.append(t)
    next_task = task_queue.popleft()
    print(f"  队列出队: {next_task.name},剩余: {len(task_queue)}")

Runnable demo (with mock data and print feedback filled in)

from collections import defaultdict, deque
from typing import NamedTuple


class TaskNode(NamedTuple):
    id: int
    name: str
    priority: int          # 1=高, 2=中, 3=低
    required_tags: frozenset[str]   # frozenset:不可变集合,可作 dict key

TASK_NODES: list[TaskNode] = [
    TaskNode(1, "数据采集",   1, frozenset({"network", "io"})),
    TaskNode(2, "文本清洗",   2, frozenset({"cpu", "nlp"})),
    TaskNode(3, "特征提取",   2, frozenset({"cpu", "ml"})),
    TaskNode(4, "模型训练",   1, frozenset({"gpu", "ml"})),
    TaskNode(5, "结果评估",   3, frozenset({"cpu"})),
    TaskNode(6, "报告生成",   3, frozenset({"io"})),
]

DEPENDENCIES: dict[int, list[int]] = {
    1: [],
    2: [1],
    3: [1, 2],
    4: [3],
    5: [4],
    6: [4, 5],
}

AVAILABLE_RESOURCES: set[str] = {"network", "io", "cpu", "nlp", "ml"}

def demo_list_ops(tasks: list[TaskNode]) -> None:
    print("\n  ── 列表操作演示 ──────────────────────────")

    # Filter with a list comprehension (this is filtering, not slicing)
    high_priority = [t for t in tasks if t.priority == 1]
    print(f"  高优先级任务: {[t.name for t in high_priority]}")

    # Sort without modifying the original list
    by_priority = sorted(tasks, key=lambda t: (t.priority, t.id))
    print(f"  按优先级排序: {[t.name for t in by_priority]}")

    # deque: double-ended queue; popleft() is O(1), whereas list.pop(0) is O(n)
    task_queue: deque[TaskNode] = deque(maxlen=4)
    for t in tasks[:4]:
        task_queue.append(t)
    next_task = task_queue.popleft()
    print(f"  队列出队: {next_task.name},剩余: {len(task_queue)}")

demo_list_ops(TASK_NODES)

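Plain-language explanation: a list suits "an ordered run of items". Here we first filter out the high-priority tasks, then sort the full list with sorted (which returns a new list), and finally use deque.popleft() to simulate taking the next task off a queue. A deque is like a service window with a line: the task that queued first is called first.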
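One detail of the demo deserves a closer look: the queue is created with maxlen=4 and receives exactly four tasks, so nothing is lost. With more items than maxlen, a deque silently discards from the opposite end. The sketch below illustrates that behavior with plain integers; the names `recent` and `queue` are made up for this example.

```python
from collections import deque

# maxlen=3 keeps only the three most recent items.
recent: deque[int] = deque(maxlen=3)
for task_id in [1, 2, 3, 4, 5]:
    recent.append(task_id)   # once full, each append evicts the leftmost item

print(list(recent))          # [3, 4, 5]

# An unbounded deque behaves like a plain FIFO queue.
queue: deque[int] = deque([1, 2, 3, 4, 5])
first = queue.popleft()      # removes and returns the oldest item
print(first, list(queue))    # 1 [2, 3, 4, 5]
```

A bounded deque is a good fit for "last N log lines" style buffers. For a work queue where every task must run, leaving maxlen unset is probably the safer default.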

Step 5: Build fast lookups and tag mappings with dicts

Core source (verbatim from the complete source at the end)

def demo_dict_ops(tasks: list[TaskNode]) -> None:
    print("\n  ── 字典操作演示 ──────────────────────────")

    # 构建 id → task 的查找表
    task_map: dict[int, TaskNode] = {t.id: t for t in tasks}

    # defaultdict:避免 KeyError
    tag_to_tasks: defaultdict[str, list[str]] = defaultdict(list)
    for t in tasks:
        for tag in t.required_tags:
            tag_to_tasks[tag].append(t.name)

    print("  标签 → 任务映射:")
    for tag, names in sorted(tag_to_tasks.items()):
        print(f"    {tag:<12}{names}")

    # 字典合并(Python 3.9+ 用 | 运算符)
    extra_meta: dict[int, str] = {1: "已完成", 2: "进行中"}
    status_map: dict[int, str] = {t.id: "待处理" for t in tasks}
    merged = status_map | extra_meta   # extra_meta 覆盖 status_map
    print(f"\n  合并后状态: {merged}")

    # .get() 安全访问
    task = task_map.get(99)
    print(f"  查找 id=99: {task}")   # None,不报 KeyError

Runnable demo (with mock data and print feedback filled in)

from collections import defaultdict, deque
from typing import NamedTuple


class TaskNode(NamedTuple):
    id: int
    name: str
    priority: int          # 1=高, 2=中, 3=低
    required_tags: frozenset[str]   # frozenset:不可变集合,可作 dict key

TASK_NODES: list[TaskNode] = [
    TaskNode(1, "数据采集",   1, frozenset({"network", "io"})),
    TaskNode(2, "文本清洗",   2, frozenset({"cpu", "nlp"})),
    TaskNode(3, "特征提取",   2, frozenset({"cpu", "ml"})),
    TaskNode(4, "模型训练",   1, frozenset({"gpu", "ml"})),
    TaskNode(5, "结果评估",   3, frozenset({"cpu"})),
    TaskNode(6, "报告生成",   3, frozenset({"io"})),
]

DEPENDENCIES: dict[int, list[int]] = {
    1: [],
    2: [1],
    3: [1, 2],
    4: [3],
    5: [4],
    6: [4, 5],
}

AVAILABLE_RESOURCES: set[str] = {"network", "io", "cpu", "nlp", "ml"}

def demo_dict_ops(tasks: list[TaskNode]) -> None:
    print("\n  ── 字典操作演示 ──────────────────────────")

    # 构建 id → task 的查找表
    task_map: dict[int, TaskNode] = {t.id: t for t in tasks}

    # defaultdict:避免 KeyError
    tag_to_tasks: defaultdict[str, list[str]] = defaultdict(list)
    for t in tasks:
        for tag in t.required_tags:
            tag_to_tasks[tag].append(t.name)

    print("  标签 → 任务映射:")
    for tag, names in sorted(tag_to_tasks.items()):
        print(f"    {tag:<12}{names}")

    # 字典合并(Python 3.9+ 用 | 运算符)
    extra_meta: dict[int, str] = {1: "已完成", 2: "进行中"}
    status_map: dict[int, str] = {t.id: "待处理" for t in tasks}
    merged = status_map | extra_meta   # extra_meta 覆盖 status_map
    print(f"\n  合并后状态: {merged}")

    # .get() 安全访问
    task = task_map.get(99)
    print(f"  查找 id=99: {task}")   # None,不报 KeyError

demo_dict_ops(TASK_NODES)

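Plain-language explanation: a dict answers the question "given a key, what is its value?". task_map is an index from id to task, and defaultdict(list) is like a sorting bin that hands you an empty basket on demand: when a new tag shows up, you do not have to create its list first.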
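The "empty basket on demand" convenience has a side effect worth knowing: merely indexing a missing key on a defaultdict inserts it. The following sketch contrasts indexing with .get() and the `in` operator; the tags "gpu" and "tpu" and the variable names are chosen only for illustration.

```python
from collections import defaultdict

tag_to_tasks: defaultdict[str, list[str]] = defaultdict(list)
tag_to_tasks["cpu"].append("文本清洗")

# Indexing a missing key inserts it with an empty list as a side effect.
_ = tag_to_tasks["gpu"]
keys_after_index = sorted(tag_to_tasks)

# .get() and `in` only look; they never insert.
lookup = tag_to_tasks.get("tpu")
has_tpu = "tpu" in tag_to_tasks

print(keys_after_index)   # ['cpu', 'gpu']
print(lookup, has_tpu)    # None False
```

When you only want to check whether a tag exists, prefer `in` or .get() so the mapping does not fill up with empty entries.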

Step 6: Use sets to check whether resources satisfy a task's requirements

Core source (verbatim from the complete source at the end)

def demo_set_ops(tasks: list[TaskNode]) -> None:
    print("\n  ── 集合操作演示 ──────────────────────────")

    # 所有任务需要的资源
    required: set[str] = set()
    for t in tasks:
        required |= t.required_tags   # 集合并集

    missing = required - AVAILABLE_RESOURCES   # 差集:缺少的资源
    available_for_run = required & AVAILABLE_RESOURCES  # 交集

    print(f"  所有需求资源: {sorted(required)}")
    print(f"  当前可用:     {sorted(AVAILABLE_RESOURCES)}")
    print(f"  缺少资源:     {sorted(missing)}")
    print(f"  可满足资源:   {sorted(available_for_run)}")

    # 判断哪些任务可以立即运行(资源全部满足)
    runnable = [
        t.name for t in tasks
        if t.required_tags <= AVAILABLE_RESOURCES   # 子集判断
    ]
    print(f"\n  可立即运行的任务: {runnable}")

Runnable demo (with mock data and print feedback filled in)

from collections import defaultdict, deque
from typing import NamedTuple


class TaskNode(NamedTuple):
    id: int
    name: str
    priority: int          # 1=高, 2=中, 3=低
    required_tags: frozenset[str]   # frozenset:不可变集合,可作 dict key

TASK_NODES: list[TaskNode] = [
    TaskNode(1, "数据采集",   1, frozenset({"network", "io"})),
    TaskNode(2, "文本清洗",   2, frozenset({"cpu", "nlp"})),
    TaskNode(3, "特征提取",   2, frozenset({"cpu", "ml"})),
    TaskNode(4, "模型训练",   1, frozenset({"gpu", "ml"})),
    TaskNode(5, "结果评估",   3, frozenset({"cpu"})),
    TaskNode(6, "报告生成",   3, frozenset({"io"})),
]

DEPENDENCIES: dict[int, list[int]] = {
    1: [],
    2: [1],
    3: [1, 2],
    4: [3],
    5: [4],
    6: [4, 5],
}

AVAILABLE_RESOURCES: set[str] = {"network", "io", "cpu", "nlp", "ml"}

def demo_set_ops(tasks: list[TaskNode]) -> None:
    print("\n  ── 集合操作演示 ──────────────────────────")

    # 所有任务需要的资源
    required: set[str] = set()
    for t in tasks:
        required |= t.required_tags   # 集合并集

    missing = required - AVAILABLE_RESOURCES   # 差集:缺少的资源
    available_for_run = required & AVAILABLE_RESOURCES  # 交集

    print(f"  所有需求资源: {sorted(required)}")
    print(f"  当前可用:     {sorted(AVAILABLE_RESOURCES)}")
    print(f"  缺少资源:     {sorted(missing)}")
    print(f"  可满足资源:   {sorted(available_for_run)}")

    # 判断哪些任务可以立即运行(资源全部满足)
    runnable = [
        t.name for t in tasks
        if t.required_tags <= AVAILABLE_RESOURCES   # 子集判断
    ]
    print(f"\n  可立即运行的任务: {runnable}")

demo_set_ops(TASK_NODES)

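Plain-language explanation: sets suit de-duplication and relationship checks. required - AVAILABLE_RESOURCES is "what is still missing", required & AVAILABLE_RESOURCES is "what can be satisfied now", and t.required_tags <= AVAILABLE_RESOURCES tests whether every resource a task needs is available. With the mock data, the only missing resource is gpu, so 模型训练 is the one task that cannot run immediately.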
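The TaskNode comment notes that a frozenset can serve as a dict key. Here is a hedged sketch of what that enables, together with a per-task version of the "what is missing" check. The task names in `requirements` and the variables `by_requirement` and `blocked` are hypothetical and exist only for this example.

```python
from collections import defaultdict

AVAILABLE_RESOURCES = {"network", "io", "cpu", "nlp", "ml"}

# (task name, required tags) pairs; the names are made up for this sketch.
requirements = [
    ("train", frozenset({"gpu", "ml"})),
    ("evaluate", frozenset({"cpu"})),
    ("summarize", frozenset({"cpu"})),
]

# frozenset is hashable, so it can group tasks by identical requirements.
by_requirement: defaultdict[frozenset, list[str]] = defaultdict(list)
for name, tags in requirements:
    by_requirement[tags].append(name)

# For each blocked task, the set difference names exactly what is missing.
blocked = {
    name: sorted(tags - AVAILABLE_RESOURCES)
    for name, tags in requirements
    if not tags <= AVAILABLE_RESOURCES
}

print(by_requirement[frozenset({"cpu"})])  # ['evaluate', 'summarize']
print(blocked)                             # {'train': ['gpu']}
```

A regular set would raise TypeError as a dict key because it is mutable and therefore unhashable.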

Step 7: Produce a safe execution order with topological sort

Core source (verbatim from the complete source at the end)

def topological_sort(deps: dict[int, list[int]]) -> list[int]:
    """
    Kahn 算法拓扑排序:确定任务执行顺序。
    使用:dict(入度统计)+ deque(BFS队列)+ list(结果)
    """
    in_degree: dict[int, int] = {node: 0 for node in deps}
    for node, prerequisites in deps.items():
        for pre in prerequisites:
            in_degree[node] += 1

    queue: deque[int] = deque(
        node for node, deg in in_degree.items() if deg == 0
    )
    order: list[int] = []

    while queue:
        node = queue.popleft()
        order.append(node)
        # 找出以 node 为前置的任务
        for next_node, prerequisites in deps.items():
            if node in prerequisites:
                in_degree[next_node] -= 1
                if in_degree[next_node] == 0:
                    queue.append(next_node)

    return order

Runnable demo (with mock data and print feedback filled in)

from collections import defaultdict, deque
from typing import NamedTuple


class TaskNode(NamedTuple):
    id: int
    name: str
    priority: int          # 1=高, 2=中, 3=低
    required_tags: frozenset[str]   # frozenset:不可变集合,可作 dict key

TASK_NODES: list[TaskNode] = [
    TaskNode(1, "数据采集",   1, frozenset({"network", "io"})),
    TaskNode(2, "文本清洗",   2, frozenset({"cpu", "nlp"})),
    TaskNode(3, "特征提取",   2, frozenset({"cpu", "ml"})),
    TaskNode(4, "模型训练",   1, frozenset({"gpu", "ml"})),
    TaskNode(5, "结果评估",   3, frozenset({"cpu"})),
    TaskNode(6, "报告生成",   3, frozenset({"io"})),
]

DEPENDENCIES: dict[int, list[int]] = {
    1: [],
    2: [1],
    3: [1, 2],
    4: [3],
    5: [4],
    6: [4, 5],
}

AVAILABLE_RESOURCES: set[str] = {"network", "io", "cpu", "nlp", "ml"}

def topological_sort(deps: dict[int, list[int]]) -> list[int]:
    """
    Kahn 算法拓扑排序:确定任务执行顺序。
    使用:dict(入度统计)+ deque(BFS队列)+ list(结果)
    """
    in_degree: dict[int, int] = {node: 0 for node in deps}
    for node, prerequisites in deps.items():
        for pre in prerequisites:
            in_degree[node] += 1

    queue: deque[int] = deque(
        node for node, deg in in_degree.items() if deg == 0
    )
    order: list[int] = []

    while queue:
        node = queue.popleft()
        order.append(node)
        # 找出以 node 为前置的任务
        for next_node, prerequisites in deps.items():
            if node in prerequisites:
                in_degree[next_node] -= 1
                if in_degree[next_node] == 0:
                    queue.append(next_node)

    return order

def demo_deps(tasks: list[TaskNode]) -> None:
    task_map = {t.id: t for t in tasks}
    order = topological_sort(DEPENDENCIES)

    print("\n  ── 任务依赖拓扑排序 ──────────────────────")
    print(f"  执行顺序: {order}")
    print()
    for step, tid in enumerate(order, 1):
        t = task_map[tid]
        deps = DEPENDENCIES[tid]
        dep_names = [task_map[d].name for d in deps]
        dep_str = f"← 依赖 {dep_names}" if dep_names else "← 无依赖(起点)"
        print(f"  Step {step}: [{tid}] {t.name:<10} {dep_str}")

demo_deps(TASK_NODES)

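Plain-language explanation: topological sort is like a project manager laying out a schedule: do the tasks with no prerequisites first, then release the downstream tasks step by step. It combines a dict for in-degree counts, a deque for the queue, and a list for the final order, bringing this article's data structures together in one exercise.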
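As a cross-check for the hand-written version, the standard library ships a topological sorter in the graphlib module (Python 3.9+). This is a swapped-in alternative, not the article's own implementation. The sketch assumes the same "node → prerequisites" shape as DEPENDENCIES, which is the input format TopologicalSorter expects.

```python
from graphlib import CycleError, TopologicalSorter

DEPENDENCIES = {1: [], 2: [1], 3: [1, 2], 4: [3], 5: [4], 6: [4, 5]}

# TopologicalSorter takes {node: predecessors}, the same shape as DEPENDENCIES.
order = list(TopologicalSorter(DEPENDENCIES).static_order())
print(order)  # [1, 2, 3, 4, 5, 6]

# A cycle raises CycleError instead of returning a shortened result.
try:
    list(TopologicalSorter({1: [2], 2: [1]}).static_order())
except CycleError:
    cycle_found = True
else:
    cycle_found = False

print(cycle_found)  # True
```

This particular graph has only one valid order, so both implementations must agree. Graphs with independent branches can legitimately produce different but equally valid orders.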

Geek Practice: Complete Source and Run

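Now put the building blocks together: paste the complete code below into your editor and run it. Look at the whole loop first, then go back and tweak parameters section by section; that makes it easier to build engineering intuition.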


"""
Task graph scheduler: demonstrates practical use of lists, dicts, sets, and tuples.
Usage:
    python3 task_graph.py
    python3 task_graph.py --mode deps
    python3 task_graph.py --mode list
"""

import argparse
from collections import defaultdict, deque
from typing import NamedTuple


# ── NamedTuple:不可变的结构化数据(比 dict 更安全)──────────
class TaskNode(NamedTuple):
    id: int
    name: str
    priority: int          # 1=高, 2=中, 3=低
    required_tags: frozenset[str]   # frozenset:不可变集合,可作 dict key


# ── Mock 数据 ─────────────────────────────────────────────────
TASK_NODES: list[TaskNode] = [
    TaskNode(1, "数据采集",   1, frozenset({"network", "io"})),
    TaskNode(2, "文本清洗",   2, frozenset({"cpu", "nlp"})),
    TaskNode(3, "特征提取",   2, frozenset({"cpu", "ml"})),
    TaskNode(4, "模型训练",   1, frozenset({"gpu", "ml"})),
    TaskNode(5, "结果评估",   3, frozenset({"cpu"})),
    TaskNode(6, "报告生成",   3, frozenset({"io"})),
]

# 依赖关系:task_id → 依赖的 task_id 列表
DEPENDENCIES: dict[int, list[int]] = {
    1: [],
    2: [1],
    3: [1, 2],
    4: [3],
    5: [4],
    6: [4, 5],
}

# 可用资源标签集合
AVAILABLE_RESOURCES: set[str] = {"network", "io", "cpu", "nlp", "ml"}


# ── 列表操作 ──────────────────────────────────────────────────
def demo_list_ops(tasks: list[TaskNode]) -> None:
    print("\n  ── 列表操作演示 ──────────────────────────")

    # Filter with a list comprehension (this is filtering, not slicing)
    high_priority = [t for t in tasks if t.priority == 1]
    print(f"  高优先级任务: {[t.name for t in high_priority]}")

    # Sort without modifying the original list
    by_priority = sorted(tasks, key=lambda t: (t.priority, t.id))
    print(f"  按优先级排序: {[t.name for t in by_priority]}")

    # deque: double-ended queue; popleft() is O(1), whereas list.pop(0) is O(n)
    task_queue: deque[TaskNode] = deque(maxlen=4)
    for t in tasks[:4]:
        task_queue.append(t)
    next_task = task_queue.popleft()
    print(f"  队列出队: {next_task.name},剩余: {len(task_queue)}")


# ── 字典操作 ──────────────────────────────────────────────────
def demo_dict_ops(tasks: list[TaskNode]) -> None:
    print("\n  ── 字典操作演示 ──────────────────────────")

    # 构建 id → task 的查找表
    task_map: dict[int, TaskNode] = {t.id: t for t in tasks}

    # defaultdict:避免 KeyError
    tag_to_tasks: defaultdict[str, list[str]] = defaultdict(list)
    for t in tasks:
        for tag in t.required_tags:
            tag_to_tasks[tag].append(t.name)

    print("  标签 → 任务映射:")
    for tag, names in sorted(tag_to_tasks.items()):
        print(f"    {tag:<12}{names}")

    # 字典合并(Python 3.9+ 用 | 运算符)
    extra_meta: dict[int, str] = {1: "已完成", 2: "进行中"}
    status_map: dict[int, str] = {t.id: "待处理" for t in tasks}
    merged = status_map | extra_meta   # extra_meta 覆盖 status_map
    print(f"\n  合并后状态: {merged}")

    # .get() 安全访问
    task = task_map.get(99)
    print(f"  查找 id=99: {task}")   # None,不报 KeyError


# ── 集合操作 ──────────────────────────────────────────────────
def demo_set_ops(tasks: list[TaskNode]) -> None:
    print("\n  ── 集合操作演示 ──────────────────────────")

    # 所有任务需要的资源
    required: set[str] = set()
    for t in tasks:
        required |= t.required_tags   # 集合并集

    missing = required - AVAILABLE_RESOURCES   # 差集:缺少的资源
    available_for_run = required & AVAILABLE_RESOURCES  # 交集

    print(f"  所有需求资源: {sorted(required)}")
    print(f"  当前可用:     {sorted(AVAILABLE_RESOURCES)}")
    print(f"  缺少资源:     {sorted(missing)}")
    print(f"  可满足资源:   {sorted(available_for_run)}")

    # 判断哪些任务可以立即运行(资源全部满足)
    runnable = [
        t.name for t in tasks
        if t.required_tags <= AVAILABLE_RESOURCES   # 子集判断
    ]
    print(f"\n  可立即运行的任务: {runnable}")


# ── 拓扑排序(综合运用所有数据结构)─────────────────────────
def topological_sort(deps: dict[int, list[int]]) -> list[int]:
    """
    Kahn 算法拓扑排序:确定任务执行顺序。
    使用:dict(入度统计)+ deque(BFS队列)+ list(结果)
    """
    in_degree: dict[int, int] = {node: 0 for node in deps}
    for node, prerequisites in deps.items():
        for pre in prerequisites:
            in_degree[node] += 1

    queue: deque[int] = deque(
        node for node, deg in in_degree.items() if deg == 0
    )
    order: list[int] = []

    while queue:
        node = queue.popleft()
        order.append(node)
        # 找出以 node 为前置的任务
        for next_node, prerequisites in deps.items():
            if node in prerequisites:
                in_degree[next_node] -= 1
                if in_degree[next_node] == 0:
                    queue.append(next_node)

    return order


def demo_deps(tasks: list[TaskNode]) -> None:
    task_map = {t.id: t for t in tasks}
    order = topological_sort(DEPENDENCIES)

    print("\n  ── 任务依赖拓扑排序 ──────────────────────")
    print(f"  执行顺序: {order}")
    print()
    for step, tid in enumerate(order, 1):
        t = task_map[tid]
        deps = DEPENDENCIES[tid]
        dep_names = [task_map[d].name for d in deps]
        dep_str = f"← 依赖 {dep_names}" if dep_names else "← 无依赖(起点)"
        print(f"  Step {step}: [{tid}] {t.name:<10} {dep_str}")


def main() -> None:
    parser = argparse.ArgumentParser(description="任务图谱调度器")
    parser.add_argument(
        "--mode",
        choices=["list", "dict", "set", "deps", "all"],
        default="all",
    )
    args = parser.parse_args()

    tasks = TASK_NODES

    if args.mode in ("list", "all"):
        demo_list_ops(tasks)
    if args.mode in ("dict", "all"):
        demo_dict_ops(tasks)
    if args.mode in ("set", "all"):
        demo_set_ops(tasks)
    if args.mode in ("deps", "all"):
        demo_deps(tasks)


if __name__ == "__main__":
    main()

Expected terminal output (--mode deps):

$ python3 task_graph.py --mode deps

  ── 任务依赖拓扑排序 ──────────────────────
  执行顺序: [1, 2, 3, 4, 5, 6]

  Step 1: [1] 数据采集       ← 无依赖(起点)
  Step 2: [2] 文本清洗       ← 依赖 ['数据采集']
  Step 3: [3] 特征提取       ← 依赖 ['数据采集', '文本清洗']
  Step 4: [4] 模型训练       ← 依赖 ['特征提取']
  Step 5: [5] 结果评估       ← 依赖 ['模型训练']
  Step 6: [6] 报告生成       ← 依赖 ['模型训练', '结果评估']

Pitfall Guide

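Pitfall                        | Example                                 | Correct approach
list shallow copy              | b = a[:] still shares the inner objects | use copy.deepcopy(a) for a deep copy
dict modified during iteration | for k in d: del d[k] → RuntimeError     | iterate over list(d.keys())
set is unordered               | indexing with s[0] is not supported     | for order-preserving de-duplication use dict.fromkeys(lst)
tuple with one element         | t = (1) is an int, not a tuple          | t = (1,) needs the trailing comma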
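Each row of the table can be reproduced in a few lines. The sketch below walks through all four pitfalls with throwaway values; every variable name is invented for the demonstration.

```python
import copy

# 1) Shallow copy: the outer list is new, the inner lists are shared.
a = [[1, 2], [3]]
shallow = a[:]
deep = copy.deepcopy(a)
a[0].append(99)
print(shallow[0], deep[0])   # [1, 2, 99] [1, 2]

# 2) Deleting while iterating over the dict itself raises RuntimeError.
risky = {"a": 1, "b": 2}
raised = False
try:
    for key in risky:
        del risky[key]
except RuntimeError:
    raised = True
print(raised)                # True

#    Iterating over a snapshot of the keys is safe.
d = {"a": 1, "b": 2, "c": 3}
for key in list(d):
    if d[key] % 2 == 1:
        del d[key]
print(d)                     # {'b': 2}

# 3) Order-preserving de-duplication via dict keys.
unique = list(dict.fromkeys(["io", "cpu", "io", "ml", "cpu"]))
print(unique)                # ['io', 'cpu', 'ml']

# 4) Parentheses alone do not make a tuple; the comma does.
not_a_tuple = (1)
one_tuple = (1,)
print(type(not_a_tuple).__name__, type(one_tuple).__name__)  # int tuple
```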

NexDo Time ⚡

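5-minute geek micro-exercise: add circular-dependency detection to the scheduler. If DEPENDENCIES contains a cycle (such as A→B→A), the topological sort result will contain fewer tasks than the total; use that property to write a detect_cycle() function that prints a warning.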
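If you want something to compare your attempt against, here is one possible sketch rather than a reference solution. It assumes every prerequisite id also appears as a key in the mapping, and it chooses to return the set of unscheduled tasks instead of a boolean; both choices are assumptions of this example. The topological_sort here is a compact restatement of the Kahn's algorithm approach from Step 7 so that the block runs on its own.

```python
from collections import deque


def topological_sort(deps: dict[int, list[int]]) -> list[int]:
    """Kahn's algorithm; nodes on a cycle never reach in-degree 0."""
    in_degree = {node: len(set(pres)) for node, pres in deps.items()}
    queue = deque(node for node, deg in in_degree.items() if deg == 0)
    order: list[int] = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for next_node, pres in deps.items():
            if node in pres:
                in_degree[next_node] -= 1
                if in_degree[next_node] == 0:
                    queue.append(next_node)
    return order


def detect_cycle(deps: dict[int, list[int]]) -> set[int]:
    """Return tasks that could not be scheduled (on or behind a cycle)."""
    return set(deps) - set(topological_sort(deps))


stuck = detect_cycle({1: [2], 2: [1], 3: []})
if stuck:
    print(f"Warning: circular dependency involving tasks {sorted(stuck)}")
```

Note that the returned set also includes tasks that merely depend on a cycle, since they can never be released either.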

Don’t wait for next time, do it in the next moment.