文章

10 · 核心数据结构:栈、队列与树的实现

#010 · 2026-04-16 · Python

🔗 知识图谱导航:阅读本文前,建议先掌握/回顾 《9 · 文件系统与异常:构建健壮的 IO 流》 中的核心概念;本文会在这个基础上继续推进。 上一篇我们把数据安全地读写到磁盘;本篇回到内存,用 Python 从零搭建经典数据结构——这是所有 AI 管道调度逻辑的底层骨架。

极客解析:先把数据流、控制流和模块边界跑通,再谈抽象;每段代码都围绕一个可执行 CLI 闭环展开。

痛点与架构:单独记 API 或概念很容易学完就忘;本文先锁定真实痛点,再把它拆成“输入数据 → 核心机制 → 可运行输出”三段闭环。从零用 Python 实现顺序栈、链式队列、二叉树四种遍历、快速排序与二分查找,并用栈做括号匹配、用队列模拟任务调度。

1. 顺序栈(Stack)

from typing import Generic, TypeVar
T = TypeVar("T")

class Stack(Generic[T]):
    def __init__(self) -> None:
        self._data: list[T] = []

    def push(self, item: T) -> None:
        self._data.append(item)

    def pop(self) -> T:
        if self.is_empty():
            raise IndexError("栈为空")
        return self._data.pop()

    def peek(self) -> T:
        if self.is_empty():
            raise IndexError("栈为空")
        return self._data[-1]

    def is_empty(self) -> bool:
        return len(self._data) == 0

    def __len__(self) -> int:
        return len(self._data)

stack: Stack[str] = Stack()
stack.push("先放入")
stack.push("后放入")
print("当前栈顶:", stack.peek())
print("第一次弹出:", stack.pop())
print("第二次弹出:", stack.pop())
print("栈是否为空:", stack.is_empty())

栈是 LIFO(后进先出),天然适合括号匹配、函数调用栈、撤销操作。

2. 链式队列(Queue)

步步为营:核心逻辑自适应拆解

数据结构不要死背名字,要看它解决什么“排队规则”。栈解决“最后来的先处理”,队列解决“先来的先处理”,树解决“层级关系”,排序和查找解决“怎么更快找到东西”。下面每一步都用很小的数据跑一遍。

Step 1:用 Stack 理解后进先出,像一摞盘子最后放的先拿

痛点与机制

栈的规则只有一句话:后进先出。你可以把它想成叠盘子,最后放上去的盘子一定最先被拿走。push() 是放盘子,pop() 是拿盘子,peek() 是只看最上面但不拿走。

核心源码(逐字来自文末完整源码)

class Stack(Generic[T]):
    def __init__(self) -> None:
        self._data: list[T] = []
    def push(self, item: T) -> None: self._data.append(item)
    def pop(self) -> T:
        if not self._data: raise IndexError("栈为空")
        return self._data.pop()
    def peek(self) -> T:
        if not self._data: raise IndexError("栈为空")
        return self._data[-1]
    def is_empty(self) -> bool: return not self._data
    def __len__(self) -> int: return len(self._data)

可运行演示(补齐 Mock 数据与 print 反馈)

from typing import Generic, TypeVar
T = TypeVar("T")


class Stack(Generic[T]):
    def __init__(self) -> None:
        self._data: list[T] = []
    def push(self, item: T) -> None: self._data.append(item)
    def pop(self) -> T:
        if not self._data: raise IndexError("栈为空")
        return self._data.pop()
    def peek(self) -> T:
        if not self._data: raise IndexError("栈为空")
        return self._data[-1]
    def is_empty(self) -> bool: return not self._data
    def __len__(self) -> int: return len(self._data)

stack: Stack[str] = Stack()
for item in ["第一层", "第二层", "第三层"]:
    stack.push(item)
    print("入栈:", item, "当前长度:", len(stack))
print("栈顶元素:", stack.peek())
while not stack.is_empty():
    print("出栈:", stack.pop())

Step 2:用 Stack 做括号匹配,左括号入栈,右括号验票

痛点与机制

括号匹配是栈最经典的应用。遇到左括号就压栈,遇到右括号就弹出最近的左括号来比对。为什么必须用栈?因为括号是一层套一层,最近打开的一层必须最先关闭。

核心源码(逐字来自文末完整源码)

def check_brackets(expr: str) -> tuple[bool, str]:
    stack: Stack[tuple[str, int]] = Stack()
    pairs = {")": "(", "]": "[", "}": "{"}
    for i, ch in enumerate(expr):
        if ch in "([{": stack.push((ch, i))
        elif ch in ")]}":
            if stack.is_empty(): return False, f"位置{i}: '{ch}'无对应左括号"
            top, _ = stack.pop()
            if top != pairs[ch]: return False, f"位置{i}: '{ch}'与'{top}'不匹配"
    if not stack.is_empty():
        _, pos = stack.pop(); return False, f"位置{pos}: 左括号未闭合"
    return True, "合法"

可运行演示(补齐 Mock 数据与 print 反馈)

from typing import Generic, TypeVar
T = TypeVar("T")
class Stack(Generic[T]):
    def __init__(self) -> None:
        self._data: list[T] = []
    def push(self, item: T) -> None: self._data.append(item)
    def pop(self) -> T:
        if not self._data: raise IndexError("栈为空")
        return self._data.pop()
    def peek(self) -> T:
        if not self._data: raise IndexError("栈为空")
        return self._data[-1]
    def is_empty(self) -> bool: return not self._data
    def __len__(self) -> int: return len(self._data)

# Step 2:括号匹配像进出门禁,左括号登记,右括号必须找到对应记录。
def check_brackets(expr: str) -> tuple[bool, str]:
    stack: Stack[tuple[str, int]] = Stack()
    pairs = {")": "(", "]": "[", "}": "{"}
    for i, ch in enumerate(expr):
        if ch in "([{": stack.push((ch, i))
        elif ch in ")]}":
            if stack.is_empty(): return False, f"位置{i}: '{ch}'无对应左括号"
            top, _ = stack.pop()
            if top != pairs[ch]: return False, f"位置{i}: '{ch}'与'{top}'不匹配"
    if not stack.is_empty():
        _, pos = stack.pop(); return False, f"位置{pos}: 左括号未闭合"
    return True, "合法"

for expr in ["(AI[pipeline]{ok})", "((未闭合", "(错误]匹配)"]:
    ok, msg = check_brackets(expr)
    print(expr, "=>", ok, msg)

Step 3:用 LinkedQueue 理解先进先出,像排队办业务

痛点与机制

队列的规则是先进先出,像银行排队。链式队列用 _Node 节点一个接一个串起来,_head 指向队头,_tail 指向队尾。入队只接到尾巴,出队只从头部拿。

核心源码(逐字来自文末完整源码)

@dataclass
class _Node(Generic[T]):
    val: T
    nxt: Optional["_Node[T]"] = field(default=None, repr=False)

class LinkedQueue(Generic[T]):
    def __init__(self) -> None:
        self._head: Optional[_Node[T]] = None
        self._tail: Optional[_Node[T]] = None
        self._size = 0
    def enqueue(self, item: T) -> None:
        node: _Node[T] = _Node(item)
        if self._tail: self._tail.nxt = node
        else: self._head = node
        self._tail = node
        self._size += 1
    def dequeue(self) -> T:
        if not self._head: raise IndexError("队列为空")
        val = self._head.val
        self._head = self._head.nxt
        if not self._head: self._tail = None
        self._size -= 1
        return val
    def is_empty(self) -> bool: return self._size == 0
    def __len__(self) -> int: return self._size

可运行演示(补齐 Mock 数据与 print 反馈)

from dataclasses import dataclass, field
from typing import Generic, Optional, TypeVar
T = TypeVar("T")

# Step 3:队列像排队窗口,先来的人先办理,后来的人排到尾巴。
@dataclass
class _Node(Generic[T]):
    val: T
    nxt: Optional["_Node[T]"] = field(default=None, repr=False)

class LinkedQueue(Generic[T]):
    def __init__(self) -> None:
        self._head: Optional[_Node[T]] = None
        self._tail: Optional[_Node[T]] = None
        self._size = 0
    def enqueue(self, item: T) -> None:
        node: _Node[T] = _Node(item)
        if self._tail: self._tail.nxt = node
        else: self._head = node
        self._tail = node
        self._size += 1
    def dequeue(self) -> T:
        if not self._head: raise IndexError("队列为空")
        val = self._head.val
        self._head = self._head.nxt
        if not self._head: self._tail = None
        self._size -= 1
        return val
    def is_empty(self) -> bool: return self._size == 0
    def __len__(self) -> int: return self._size

q: LinkedQueue[str] = LinkedQueue()
for name in ["数据清洗", "embedding生成", "向量存储"]:
    q.enqueue(name)
    print("入队:", name, "队列长度:", len(q))
while not q.is_empty():
    print("出队:", q.dequeue())

Step 4:用 Task + run_scheduler 模拟任务调度队列

痛点与机制

真实系统里任务不会乱跑,通常要按优先级排队。run_scheduler() 先按 priority 排序,再放进队列逐个执行,并记录每个任务的开始和结束时间。

核心源码(逐字来自文末完整源码)

@dataclass
class Task:
    name: str; priority: int; duration_ms: int

def run_scheduler(tasks: list[Task]) -> list[dict]:
    q: LinkedQueue[Task] = LinkedQueue()
    for t in sorted(tasks, key=lambda x: x.priority): q.enqueue(t)
    results, tick = [], 0
    while not q.is_empty():
        task = q.dequeue()
        results.append({"task": task.name, "start": tick, "end": tick + task.duration_ms})
        tick += task.duration_ms
    return results

可运行演示(补齐 Mock 数据与 print 反馈)

from dataclasses import dataclass, field
from typing import Generic, Optional, TypeVar
T = TypeVar("T")
@dataclass
class _Node(Generic[T]):
    val: T
    nxt: Optional["_Node[T]"] = field(default=None, repr=False)

class LinkedQueue(Generic[T]):
    def __init__(self) -> None:
        self._head: Optional[_Node[T]] = None
        self._tail: Optional[_Node[T]] = None
        self._size = 0
    def enqueue(self, item: T) -> None:
        node: _Node[T] = _Node(item)
        if self._tail: self._tail.nxt = node
        else: self._head = node
        self._tail = node
        self._size += 1
    def dequeue(self) -> T:
        if not self._head: raise IndexError("队列为空")
        val = self._head.val
        self._head = self._head.nxt
        if not self._head: self._tail = None
        self._size -= 1
        return val
    def is_empty(self) -> bool: return self._size == 0
    def __len__(self) -> int: return self._size

# Step 4:调度器先按优先级排序,再把任务按队列顺序执行。
@dataclass
class Task:
    name: str; priority: int; duration_ms: int

def run_scheduler(tasks: list[Task]) -> list[dict]:
    q: LinkedQueue[Task] = LinkedQueue()
    for t in sorted(tasks, key=lambda x: x.priority): q.enqueue(t)
    results, tick = [], 0
    while not q.is_empty():
        task = q.dequeue()
        results.append({"task": task.name, "start": tick, "end": tick + task.duration_ms})
        tick += task.duration_ms
    return results

tasks = [
    Task("embedding生成", 2, 800),
    Task("数据清洗", 1, 300),
    Task("向量存储", 3, 200),
]
for row in run_scheduler(tasks):
    print(row)

Step 5:用层序列表 build_tree 搭出一棵二叉树

痛点与机制

二叉树可以先按层写成列表:第一个是根节点,后面依次是左右孩子。build_tree() 用队列保存“还没分配孩子的节点”,像施工队按清单一层一层安装。

核心源码(逐字来自文末完整源码)

@dataclass
class TreeNode:
    val: int
    left: Optional["TreeNode"] = None
    right: Optional["TreeNode"] = None

def build_tree(vals: list[int | None]) -> Optional[TreeNode]:
    if not vals or vals[0] is None: return None
    root = TreeNode(vals[0])
    q: deque[TreeNode] = deque([root])
    i = 1
    while q and i < len(vals):
        node = q.popleft()
        if i < len(vals) and vals[i] is not None:
            node.left = TreeNode(vals[i])  # type: ignore[arg-type]
            q.append(node.left)
        i += 1
        if i < len(vals) and vals[i] is not None:
            node.right = TreeNode(vals[i])  # type: ignore[arg-type]
            q.append(node.right)
        i += 1
    return root

可运行演示(补齐 Mock 数据与 print 反馈)

from collections import deque
from dataclasses import dataclass
from typing import Optional

# Step 5:列表像施工清单,build_tree 按层把节点一个个装到树上。
@dataclass
class TreeNode:
    val: int
    left: Optional["TreeNode"] = None
    right: Optional["TreeNode"] = None

def build_tree(vals: list[int | None]) -> Optional[TreeNode]:
    if not vals or vals[0] is None: return None
    root = TreeNode(vals[0])
    q: deque[TreeNode] = deque([root])
    i = 1
    while q and i < len(vals):
        node = q.popleft()
        if i < len(vals) and vals[i] is not None:
            node.left = TreeNode(vals[i])  # type: ignore[arg-type]
            q.append(node.left)
        i += 1
        if i < len(vals) and vals[i] is not None:
            node.right = TreeNode(vals[i])  # type: ignore[arg-type]
            q.append(node.right)
        i += 1
    return root

root = build_tree([1, 2, 3, 4, 5, None, 6])
print("根节点:", root.val if root else None)
print("左孩子:", root.left.val if root and root.left else None)
print("右孩子:", root.right.val if root and root.right else None)
print("左子树右孩子:", root.left.right.val if root and root.left and root.left.right else None)

Step 6:用四种遍历从不同路线走完整棵树

痛点与机制

树遍历就是“按什么路线看节点”。前序先看自己,中序先看左边再看自己,后序最后看自己,层序则像电梯一层一层扫过去。不同路线服务不同场景。

核心源码(逐字来自文末完整源码)

def preorder(r: Optional[TreeNode]) -> list[int]:
    return [] if not r else [r.val] + preorder(r.left) + preorder(r.right)
def inorder(r: Optional[TreeNode]) -> list[int]:
    return [] if not r else inorder(r.left) + [r.val] + inorder(r.right)
def postorder(r: Optional[TreeNode]) -> list[int]:
    return [] if not r else postorder(r.left) + postorder(r.right) + [r.val]
def levelorder(r: Optional[TreeNode]) -> list[list[int]]:
    if not r: return []
    res, q = [], deque([r])
    while q:
        lvl = []
        for _ in range(len(q)):
            n = q.popleft(); lvl.append(n.val)
            if n.left: q.append(n.left)
            if n.right: q.append(n.right)
        res.append(lvl)
    return res

可运行演示(补齐 Mock 数据与 print 反馈)

from collections import deque
from dataclasses import dataclass
from typing import Optional
@dataclass
class TreeNode:
    val: int
    left: Optional["TreeNode"] = None
    right: Optional["TreeNode"] = None

def build_tree(vals: list[int | None]) -> Optional[TreeNode]:
    if not vals or vals[0] is None: return None
    root = TreeNode(vals[0])
    q: deque[TreeNode] = deque([root])
    i = 1
    while q and i < len(vals):
        node = q.popleft()
        if i < len(vals) and vals[i] is not None:
            node.left = TreeNode(vals[i])  # type: ignore[arg-type]
            q.append(node.left)
        i += 1
        if i < len(vals) and vals[i] is not None:
            node.right = TreeNode(vals[i])  # type: ignore[arg-type]
            q.append(node.right)
        i += 1
    return root

# Step 6:遍历就是参观路线,前中后序和层序看的顺序不同。
def preorder(r: Optional[TreeNode]) -> list[int]:
    return [] if not r else [r.val] + preorder(r.left) + preorder(r.right)
def inorder(r: Optional[TreeNode]) -> list[int]:
    return [] if not r else inorder(r.left) + [r.val] + inorder(r.right)
def postorder(r: Optional[TreeNode]) -> list[int]:
    return [] if not r else postorder(r.left) + postorder(r.right) + [r.val]
def levelorder(r: Optional[TreeNode]) -> list[list[int]]:
    if not r: return []
    res, q = [], deque([r])
    while q:
        lvl = []
        for _ in range(len(q)):
            n = q.popleft(); lvl.append(n.val)
            if n.left: q.append(n.left)
            if n.right: q.append(n.right)
        res.append(lvl)
    return res

root = build_tree([1, 2, 3, 4, 5, None, 6])
print("前序:", preorder(root))
print("中序:", inorder(root))
print("后序:", postorder(root))
print("层序:", levelorder(root))

Step 7:用 quicksort 排序,再用 binary_search 快速定位

痛点与机制

排序和查找是算法入门的主线。quicksort() 选一个基准,把小的放左边、大的放右边,再递归处理;binary_search() 要求数组已排序,每次看中间值,一次排除一半。

核心源码(逐字来自文末完整源码)

def quicksort(arr: list[int]) -> list[int]:
    if len(arr) <= 1: return arr
    p = arr[len(arr)//2]
    return quicksort([x for x in arr if x < p]) + [x for x in arr if x == p] + quicksort([x for x in arr if x > p])

def binary_search(arr: list[int], target: int) -> int:
    lo, hi = 0, len(arr)-1
    while lo <= hi:
        mid = (lo+hi)//2
        if arr[mid] == target: return mid
        elif arr[mid] < target: lo = mid+1
        else: hi = mid-1
    return -1

可运行演示(补齐 Mock 数据与 print 反馈)

# Step 7:快速排序先分区再递归,二分查找每次砍掉一半范围。
def quicksort(arr: list[int]) -> list[int]:
    if len(arr) <= 1: return arr
    p = arr[len(arr)//2]
    return quicksort([x for x in arr if x < p]) + [x for x in arr if x == p] + quicksort([x for x in arr if x > p])

def binary_search(arr: list[int], target: int) -> int:
    lo, hi = 0, len(arr)-1
    while lo <= hi:
        mid = (lo+hi)//2
        if arr[mid] == target: return mid
        elif arr[mid] < target: lo = mid+1
        else: hi = mid-1
    return -1

arr = [42, 7, 13, 99, 5, 13, 28]
sorted_arr = quicksort(arr)
print("原始数组:", arr)
print("排序结果:", sorted_arr)
for target in [13, 50]:
    print(f"查找 {target}:", binary_search(sorted_arr, target))

Step 8:用 demo_* 和 main 做命令行演示入口

痛点与机制

完整脚本不应该让用户改代码切功能,而是用 argparse 做命令行遥控器。--mode stack 看括号匹配,--mode queue 看任务调度,--mode tree 看遍历,--mode sort 看排序查找。

核心源码(逐字来自文末完整源码)

def demo_stack() -> None:
    cases = ["(AI[pipeline]{ok})", "((未闭合", "(错误]匹配)", "def f(x: list[int]) -> None: pass"]
    print(f"\n{'表达式':<40} {'结果':<6} 说明")
    print("─" * 70)
    for expr in cases:
        ok, msg = check_brackets(expr)
        mark = "✓" if ok else "✗"
        print(f"{expr:<40} {mark:<6} {msg}")

def demo_queue() -> None:
    tasks = [
        Task("数据清洗", 1, 300), Task("embedding生成", 2, 800),
        Task("向量存储", 3, 200), Task("语义检索", 2, 150),
    ]
    results = run_scheduler(tasks)
    print(f"\n{'任务':<16} {'开始(ms)':>10} {'结束(ms)':>10} {'耗时(ms)':>10}")
    print("─" * 50)
    for r in results:
        print(f"{r['task']:<16} {r['start']:>10} {r['end']:>10} {r['end']-r['start']:>10}")

def demo_tree() -> None:
    #       1
    #      / \
    #     2   3
    #    / \   \
    #   4   5   6
    root = build_tree([1, 2, 3, 4, 5, None, 6])
    print(f"\n前序: {preorder(root)}")
    print(f"中序: {inorder(root)}")
    print(f"后序: {postorder(root)}")
    print(f"层序: {levelorder(root)}")

def demo_sort() -> None:
    import random
    arr = random.sample(range(1, 100), 15)
    print(f"\n原始: {arr}")
    sorted_arr = quicksort(arr)
    print(f"排序: {sorted_arr}")
    target = sorted_arr[7]
    idx = binary_search(sorted_arr, target)
    print(f"二分查找 {target}: 下标 {idx}")

def main() -> None:
    parser = argparse.ArgumentParser(description="数据结构演示")
    parser.add_argument("--mode", choices=["stack","queue","tree","sort"], default="stack")
    args = parser.parse_args()
    {"stack": demo_stack, "queue": demo_queue, "tree": demo_tree, "sort": demo_sort}[args.mode]()

可运行演示(补齐 Mock 数据与 print 反馈)

import argparse
from collections import deque
from dataclasses import dataclass, field
from typing import Generic, Optional, TypeVar
T = TypeVar("T")
class Stack(Generic[T]):
    def __init__(self) -> None:
        self._data: list[T] = []
    def push(self, item: T) -> None: self._data.append(item)
    def pop(self) -> T:
        if not self._data: raise IndexError("栈为空")
        return self._data.pop()
    def peek(self) -> T:
        if not self._data: raise IndexError("栈为空")
        return self._data[-1]
    def is_empty(self) -> bool: return not self._data
    def __len__(self) -> int: return len(self._data)

@dataclass
class _Node(Generic[T]):
    val: T
    nxt: Optional["_Node[T]"] = field(default=None, repr=False)

class LinkedQueue(Generic[T]):
    def __init__(self) -> None:
        self._head: Optional[_Node[T]] = None
        self._tail: Optional[_Node[T]] = None
        self._size = 0
    def enqueue(self, item: T) -> None:
        node: _Node[T] = _Node(item)
        if self._tail: self._tail.nxt = node
        else: self._head = node
        self._tail = node
        self._size += 1
    def dequeue(self) -> T:
        if not self._head: raise IndexError("队列为空")
        val = self._head.val
        self._head = self._head.nxt
        if not self._head: self._tail = None
        self._size -= 1
        return val
    def is_empty(self) -> bool: return self._size == 0
    def __len__(self) -> int: return self._size

@dataclass
class TreeNode:
    val: int
    left: Optional["TreeNode"] = None
    right: Optional["TreeNode"] = None

def build_tree(vals: list[int | None]) -> Optional[TreeNode]:
    if not vals or vals[0] is None: return None
    root = TreeNode(vals[0])
    q: deque[TreeNode] = deque([root])
    i = 1
    while q and i < len(vals):
        node = q.popleft()
        if i < len(vals) and vals[i] is not None:
            node.left = TreeNode(vals[i])  # type: ignore[arg-type]
            q.append(node.left)
        i += 1
        if i < len(vals) and vals[i] is not None:
            node.right = TreeNode(vals[i])  # type: ignore[arg-type]
            q.append(node.right)
        i += 1
    return root

def preorder(r: Optional[TreeNode]) -> list[int]:
    return [] if not r else [r.val] + preorder(r.left) + preorder(r.right)
def inorder(r: Optional[TreeNode]) -> list[int]:
    return [] if not r else inorder(r.left) + [r.val] + inorder(r.right)
def postorder(r: Optional[TreeNode]) -> list[int]:
    return [] if not r else postorder(r.left) + postorder(r.right) + [r.val]
def levelorder(r: Optional[TreeNode]) -> list[list[int]]:
    if not r: return []
    res, q = [], deque([r])
    while q:
        lvl = []
        for _ in range(len(q)):
            n = q.popleft(); lvl.append(n.val)
            if n.left: q.append(n.left)
            if n.right: q.append(n.right)
        res.append(lvl)
    return res

def quicksort(arr: list[int]) -> list[int]:
    if len(arr) <= 1: return arr
    p = arr[len(arr)//2]
    return quicksort([x for x in arr if x < p]) + [x for x in arr if x == p] + quicksort([x for x in arr if x > p])

def binary_search(arr: list[int], target: int) -> int:
    lo, hi = 0, len(arr)-1
    while lo <= hi:
        mid = (lo+hi)//2
        if arr[mid] == target: return mid
        elif arr[mid] < target: lo = mid+1
        else: hi = mid-1
    return -1

def check_brackets(expr: str) -> tuple[bool, str]:
    stack: Stack[tuple[str, int]] = Stack()
    pairs = {")": "(", "]": "[", "}": "{"}
    for i, ch in enumerate(expr):
        if ch in "([{": stack.push((ch, i))
        elif ch in ")]}":
            if stack.is_empty(): return False, f"位置{i}: '{ch}'无对应左括号"
            top, _ = stack.pop()
            if top != pairs[ch]: return False, f"位置{i}: '{ch}'与'{top}'不匹配"
    if not stack.is_empty():
        _, pos = stack.pop(); return False, f"位置{pos}: 左括号未闭合"
    return True, "合法"

@dataclass
class Task:
    name: str; priority: int; duration_ms: int

def run_scheduler(tasks: list[Task]) -> list[dict]:
    q: LinkedQueue[Task] = LinkedQueue()
    for t in sorted(tasks, key=lambda x: x.priority): q.enqueue(t)
    results, tick = [], 0
    while not q.is_empty():
        task = q.dequeue()
        results.append({"task": task.name, "start": tick, "end": tick + task.duration_ms})
        tick += task.duration_ms
    return results

# Step 8:demo_* 是展台,main 是遥控器,--mode 决定打开哪个展台。
def demo_stack() -> None:
    cases = ["(AI[pipeline]{ok})", "((未闭合", "(错误]匹配)", "def f(x: list[int]) -> None: pass"]
    print(f"\n{'表达式':<40} {'结果':<6} 说明")
    print("─" * 70)
    for expr in cases:
        ok, msg = check_brackets(expr)
        mark = "✓" if ok else "✗"
        print(f"{expr:<40} {mark:<6} {msg}")

def demo_queue() -> None:
    tasks = [
        Task("数据清洗", 1, 300), Task("embedding生成", 2, 800),
        Task("向量存储", 3, 200), Task("语义检索", 2, 150),
    ]
    results = run_scheduler(tasks)
    print(f"\n{'任务':<16} {'开始(ms)':>10} {'结束(ms)':>10} {'耗时(ms)':>10}")
    print("─" * 50)
    for r in results:
        print(f"{r['task']:<16} {r['start']:>10} {r['end']:>10} {r['end']-r['start']:>10}")

def demo_tree() -> None:
    #       1
    #      / \
    #     2   3
    #    / \   \
    #   4   5   6
    root = build_tree([1, 2, 3, 4, 5, None, 6])
    print(f"\n前序: {preorder(root)}")
    print(f"中序: {inorder(root)}")
    print(f"后序: {postorder(root)}")
    print(f"层序: {levelorder(root)}")

def demo_sort() -> None:
    import random
    arr = random.sample(range(1, 100), 15)
    print(f"\n原始: {arr}")
    sorted_arr = quicksort(arr)
    print(f"排序: {sorted_arr}")
    target = sorted_arr[7]
    idx = binary_search(sorted_arr, target)
    print(f"二分查找 {target}: 下标 {idx}")

def main() -> None:
    parser = argparse.ArgumentParser(description="数据结构演示")
    parser.add_argument("--mode", choices=["stack","queue","tree","sort"], default="stack")
    args = parser.parse_args()
    {"stack": demo_stack, "queue": demo_queue, "tree": demo_tree, "sort": demo_sort}[args.mode]()

import sys
for mode in ["stack", "queue", "tree", "sort"]:
    print(f"\n>>> mode={mode}")
    sys.argv = ["prog", "--mode", mode]
    main()

极客实战:完整源码与运行

现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。

#!/usr/bin/env python3
"""
核心数据结构演示
用法:
  python3 10-python-algorithms.py --mode stack    # 括号匹配
  python3 10-python-algorithms.py --mode queue    # 任务调度
  python3 10-python-algorithms.py --mode tree     # 二叉树遍历
  python3 10-python-algorithms.py --mode sort     # 排序与查找
"""
import argparse
from collections import deque
from dataclasses import dataclass, field
from typing import Generic, Optional, TypeVar

T = TypeVar("T")

# ── Stack ─────────────────────────────────────────────────────




class Stack(Generic[T]):
    def __init__(self) -> None:
        self._data: list[T] = []
    def push(self, item: T) -> None: self._data.append(item)
    def pop(self) -> T:
        if not self._data: raise IndexError("栈为空")
        return self._data.pop()
    def peek(self) -> T:
        if not self._data: raise IndexError("栈为空")
        return self._data[-1]
    def is_empty(self) -> bool: return not self._data
    def __len__(self) -> int: return len(self._data)

# ── LinkedQueue ───────────────────────────────────────────────
@dataclass
class _Node(Generic[T]):
    val: T
    nxt: Optional["_Node[T]"] = field(default=None, repr=False)

class LinkedQueue(Generic[T]):
    def __init__(self) -> None:
        self._head: Optional[_Node[T]] = None
        self._tail: Optional[_Node[T]] = None
        self._size = 0
    def enqueue(self, item: T) -> None:
        node: _Node[T] = _Node(item)
        if self._tail: self._tail.nxt = node
        else: self._head = node
        self._tail = node
        self._size += 1
    def dequeue(self) -> T:
        if not self._head: raise IndexError("队列为空")
        val = self._head.val
        self._head = self._head.nxt
        if not self._head: self._tail = None
        self._size -= 1
        return val
    def is_empty(self) -> bool: return self._size == 0
    def __len__(self) -> int: return self._size

# ── TreeNode ──────────────────────────────────────────────────
@dataclass
class TreeNode:
    val: int
    left: Optional["TreeNode"] = None
    right: Optional["TreeNode"] = None

def build_tree(vals: list[int | None]) -> Optional[TreeNode]:
    if not vals or vals[0] is None: return None
    root = TreeNode(vals[0])
    q: deque[TreeNode] = deque([root])
    i = 1
    while q and i < len(vals):
        node = q.popleft()
        if i < len(vals) and vals[i] is not None:
            node.left = TreeNode(vals[i])  # type: ignore[arg-type]
            q.append(node.left)
        i += 1
        if i < len(vals) and vals[i] is not None:
            node.right = TreeNode(vals[i])  # type: ignore[arg-type]
            q.append(node.right)
        i += 1
    return root

def preorder(r: Optional[TreeNode]) -> list[int]:
    return [] if not r else [r.val] + preorder(r.left) + preorder(r.right)
def inorder(r: Optional[TreeNode]) -> list[int]:
    return [] if not r else inorder(r.left) + [r.val] + inorder(r.right)
def postorder(r: Optional[TreeNode]) -> list[int]:
    return [] if not r else postorder(r.left) + postorder(r.right) + [r.val]
def levelorder(r: Optional[TreeNode]) -> list[list[int]]:
    if not r: return []
    res, q = [], deque([r])
    while q:
        lvl = []
        for _ in range(len(q)):
            n = q.popleft(); lvl.append(n.val)
            if n.left: q.append(n.left)
            if n.right: q.append(n.right)
        res.append(lvl)
    return res

def quicksort(arr: list[int]) -> list[int]:
    if len(arr) <= 1: return arr
    p = arr[len(arr)//2]
    return quicksort([x for x in arr if x < p]) + [x for x in arr if x == p] + quicksort([x for x in arr if x > p])

def binary_search(arr: list[int], target: int) -> int:
    lo, hi = 0, len(arr)-1
    while lo <= hi:
        mid = (lo+hi)//2
        if arr[mid] == target: return mid
        elif arr[mid] < target: lo = mid+1
        else: hi = mid-1
    return -1

def check_brackets(expr: str) -> tuple[bool, str]:
    stack: Stack[tuple[str, int]] = Stack()
    pairs = {")": "(", "]": "[", "}": "{"}
    for i, ch in enumerate(expr):
        if ch in "([{": stack.push((ch, i))
        elif ch in ")]}":
            if stack.is_empty(): return False, f"位置{i}: '{ch}'无对应左括号"
            top, _ = stack.pop()
            if top != pairs[ch]: return False, f"位置{i}: '{ch}'与'{top}'不匹配"
    if not stack.is_empty():
        _, pos = stack.pop(); return False, f"位置{pos}: 左括号未闭合"
    return True, "合法"

@dataclass
class Task:
    name: str; priority: int; duration_ms: int

def run_scheduler(tasks: list[Task]) -> list[dict]:
    q: LinkedQueue[Task] = LinkedQueue()
    for t in sorted(tasks, key=lambda x: x.priority): q.enqueue(t)
    results, tick = [], 0
    while not q.is_empty():
        task = q.dequeue()
        results.append({"task": task.name, "start": tick, "end": tick + task.duration_ms})
        tick += task.duration_ms
    return results

# ── 演示 ──────────────────────────────────────────────────────
def demo_stack() -> None:
    cases = ["(AI[pipeline]{ok})", "((未闭合", "(错误]匹配)", "def f(x: list[int]) -> None: pass"]
    print(f"\n{'表达式':<40} {'结果':<6} 说明")
    print("─" * 70)
    for expr in cases:
        ok, msg = check_brackets(expr)
        mark = "✓" if ok else "✗"
        print(f"{expr:<40} {mark:<6} {msg}")

def demo_queue() -> None:
    tasks = [
        Task("数据清洗", 1, 300), Task("embedding生成", 2, 800),
        Task("向量存储", 3, 200), Task("语义检索", 2, 150),
    ]
    results = run_scheduler(tasks)
    print(f"\n{'任务':<16} {'开始(ms)':>10} {'结束(ms)':>10} {'耗时(ms)':>10}")
    print("─" * 50)
    for r in results:
        print(f"{r['task']:<16} {r['start']:>10} {r['end']:>10} {r['end']-r['start']:>10}")

def demo_tree() -> None:
    #       1
    #      / \
    #     2   3
    #    / \   \
    #   4   5   6
    root = build_tree([1, 2, 3, 4, 5, None, 6])
    print(f"\n前序: {preorder(root)}")
    print(f"中序: {inorder(root)}")
    print(f"后序: {postorder(root)}")
    print(f"层序: {levelorder(root)}")

def demo_sort() -> None:
    import random
    arr = random.sample(range(1, 100), 15)
    print(f"\n原始: {arr}")
    sorted_arr = quicksort(arr)
    print(f"排序: {sorted_arr}")
    target = sorted_arr[7]
    idx = binary_search(sorted_arr, target)
    print(f"二分查找 {target}: 下标 {idx}")

def main() -> None:
    parser = argparse.ArgumentParser(description="数据结构演示")
    parser.add_argument("--mode", choices=["stack","queue","tree","sort"], default="stack")
    args = parser.parse_args()
    {"stack": demo_stack, "queue": demo_queue, "tree": demo_tree, "sort": demo_sort}[args.mode]()

if __name__ == "__main__":
    main()

运行后你会看到类似输出:

$ python3 10-python-algorithms.py
表达式                                      结果     说明
──────────────────────────────────────────────────────────────────────
(AI[pipeline]{ok})                       ✓      合法
((未闭合                                    ✗      位置1: 左括号未闭合
(错误]匹配)                                  ✗      位置3: ']''('不匹配
def f(x: list[int]) -> None: pass        ✓      合法

复杂度速查

结构 插入 删除 查找
顺序栈 O(1) O(1) O(n)
链式队列 O(1) O(1) O(n)
二叉树(平衡) O(log n) O(log n) O(log n)
快速排序 O(n log n) 均摊
二分查找 O(log n)


终端预期输出:

$ python3 demo.py --mode all
  ✅ 演示完成,详见上方代码注释

⏱ NexDo Time · 5 分钟微操

  1. Stack 加一个 __repr__ 方法,输出 Stack([1, 2, 3] top→)
  2. 修改 run_scheduler,让相同优先级的任务按名称字母序排列(稳定调度)。
  3. demo_tree 中打印一棵 ASCII 树形图(只需支持三层,用空格缩进表示层级)。

Don’t wait for next time, do it in the next moment.