29 · Advanced Crawling: Deduplication, Distributed Crawling, and Anti-Scraping Countermeasures

#036 · 2026-04-17 · Python

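🔗 Knowledge graph navigation: before reading this article, it helps to know the crawler architecture covered in 《28 · Scrapy 框架:Pipeline、Middleware 与调度引擎》. Building on that, this article tackles two core problems of large-scale crawling: how to deduplicate efficiently (never crawl the same URL twice) and how to crawl in a distributed way (multiple Workers cooperating).

Runtime: Python 3.12+ standard library only, zero extra dependencies; every script runs as-is.

Geek's take: large-scale crawlers face two core challenges. ① Deduplication: a URL that has already been crawled is not crawled again. A hash fingerprint plus a SQLite primary key turns the duplicate check into a single indexed lookup (a B-tree search, so logarithmic rather than strictly O(1), but fast in practice). ② Distributed cooperation: multiple Workers pull tasks from one thread-safe queue, and the SQLite fingerprint table guarantees that a duplicate URL is never fetched a second time.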

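How URL deduplication works

Naive deduplication (set):
  seen = set()
  if url not in seen:
      seen.add(url)
      crawl(url)
  Drawback: every full URL string stays in memory; a million long URLs can reach hundreds of MB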

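Fingerprint deduplication (hash digest):
  fp = md5(canonicalized URL).hexdigest()  # fixed-length digest (32 hex characters), easy to store and compare
  if fp not in seen_fp:
      seen_fp.add(fp)
      crawl(url)
  Advantage: every entry has the same length, which suits indexing and comparison better than storing long URLs directly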
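
To make the size difference concrete, here is a minimal sketch comparing the per-entry footprint of full URL strings with raw MD5 digests. The URL list is made up for illustration, and storing digest() (16 raw bytes) instead of hexdigest() is an assumption of this sketch. sys.getsizeof counts only the stored objects, not the set's own hash table.

```python
import hashlib
import sys

# Hypothetical URLs, used only to compare per-entry sizes.
urls = [f"https://shop.example.com/products?page={i}&sort=price" for i in range(1000)]


def fingerprint_bytes(url: str) -> bytes:
    # digest() returns the raw 16-byte MD5 value; hexdigest() would be 32 characters.
    return hashlib.md5(url.encode()).digest()


seen_urls = set(urls)
seen_fps = {fingerprint_bytes(u) for u in urls}

# Sum the size of the stored objects (the set's internal table is not included).
url_payload = sum(sys.getsizeof(u) for u in seen_urls)
fp_payload = sum(sys.getsizeof(fp) for fp in seen_fps)

print(f"URL strings: {url_payload} bytes for {len(seen_urls)} entries")
print(f"MD5 digests: {fp_payload} bytes for {len(seen_fps)} entries")
```

The gap widens as URLs get longer, because the digest size stays constant.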

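Bloom filter (production):
  Multiple hash functions plus a bit array; much less memory, at the cost of a false-positive rate (an unseen URL may be reported as seen, but a seen URL is never missed)
  In production, a local fingerprint set or a persistent queue is common; distributed setups often pair it with a Redis Set or a Bloom filter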
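
The Bloom filter idea can be sketched in a few lines. This is a toy version under stated assumptions, not a production implementation: the class name BloomFilter, the bit-array size, the number of hashes, and the trick of salting SHA-256 with an index are all choices made for this illustration.

```python
import hashlib


class BloomFilter:
    """Tiny Bloom filter sketch: k hash positions over an m-bit array."""

    def __init__(self, m_bits: int = 1 << 16, k_hashes: int = 4):
        self.m = m_bits
        self.k = k_hashes
        self.bits = bytearray(m_bits // 8 + 1)

    def _positions(self, item: str):
        # Derive k positions by salting one hash function with the index i.
        for i in range(self.k):
            digest = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.m

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item: str) -> bool:
        # False means "definitely never added"; True means "probably added".
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))


bf = BloomFilter()
bf.add("https://example.com/item/7")
print("https://example.com/item/7" in bf)  # an added item is always reported as present
print("https://example.com/item/8" in bf)  # very likely False, though a false positive is possible
```

The memory cost is fixed by m_bits no matter how many URLs are added; what grows with the number of URLs is the false-positive rate.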

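Multi-Worker cooperation through a shared queue

Shared task queue:
  task_queue.put(seed_url)
  Each worker safely takes URLs from the Queue
  After fetching, newly discovered URLs go back into the Queue
  The SQLite fingerprint table handles deduplication
  Advantage: simple to implement, well suited to learning how multiple Workers cooperate
  Drawback: queue.Queue lives inside one process, so production systems need an external queue such as Redis or Kafka to share tasks across machines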

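Step by step: breaking down the core logic

This article centers on two advanced techniques: URL fingerprint deduplication (URL canonicalization + hash digest + SQLite primary key) and multi-Worker cooperative crawling over a shared task queue. Each step below focuses on one mechanism and runs as-is with zero dependencies.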

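Step 1: Generate URL fingerprints with MD5, the foundation of deduplication

Pain point and mechanism

url_fingerprint first canonicalizes the URL: it strips the #fragment, sorts the query parameters alphabetically, and then hashes the result with MD5 into a fixed-length digest. Think of it as issuing every URL an ID card: the raw URL may be written in different ways, but as long as it resolves to the same canonical address, the ID number is identical. MD5 is used here because it is simple and fast for teaching; it should not be used for security signatures. The canonicalization is deliberately minimal: differences such as host-name casing or a trailing slash still produce different fingerprints.

Core source (verbatim from the complete source at the end)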

def url_fingerprint(url: str) -> str:
    p = urlparse(url.strip())
    sorted_query = urlencode(sorted(parse_qsl(p.query)))
    canonical = p._replace(query=sorted_query, fragment="").geturl()
    return hashlib.md5(canonical.encode()).hexdigest()

Runnable demo (with mock data and print feedback)

import hashlib
from urllib.parse import parse_qsl, urlencode, urlparse


def url_fingerprint(url: str) -> str:
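    # strip() removes accidental leading/trailing spaces so one address cannot yield two fingerprints.
    p = urlparse(url.strip())
    # Query parameter order should not affect identity: ?b=2&a=1 and ?a=1&b=2 are the same page.
    sorted_query = urlencode(sorted(parse_qsl(p.query)))
    # The fragment is an in-page browser anchor that the server normally never sees; ignore it for dedup.
    canonical = p._replace(query=sorted_query, fragment="").geturl()
    # MD5 is not used for security here; it compresses a long URL into a fixed-length "ID number".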
    return hashlib.md5(canonical.encode()).hexdigest()


urls = [
    " https://example.com/item/7?b=2&a=1#comment ",
    "https://example.com/item/7?a=1&b=2",
    "https://example.com/item/8?a=1&b=2",
]

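print("URL fingerprint demo: the same canonical address always yields the same fingerprint")
for url in urls:
    print(f"{url.strip():55} -> {url_fingerprint(url)[:12]}")
print("Conclusion: the first two URLs are written differently but share a fingerprint, so they count as the same page.")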

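Step 2: Implement efficient URL deduplication with URLDeduplicator

Pain point and mechanism

The key operation in URLDeduplicator is mark_seen: it writes the URL fingerprint into a SQLite table whose primary key is the fingerprint. The primary key works like the ID-number column at a registry: each ID can be registered only once, so a second insert raises IntegrityError and the program knows the URL has been seen before. Because the check and the insert are one statement executed under a lock, two Workers cannot both claim the same URL. stats reports how many unique URLs the registry holds.

Core source (verbatim from the complete source at the end)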

    def mark_seen(self, url: str) -> bool:
        """返回 True 表示首次见到(已标记),False 表示重复"""
        fp = url_fingerprint(url)
        with self._lock:
            try:
                self._conn.execute(
                    "INSERT INTO fingerprints VALUES(?,?,?)",
                    (fp, url, now_str())
                )
                self._conn.commit()
                return True
            except sqlite3.IntegrityError:
                return False

    def stats(self) -> int:
        with self._lock:
            return self._conn.execute(
                "SELECT COUNT(*) FROM fingerprints"
            ).fetchone()[0]

Runnable demo (with mock data and print feedback)

import hashlib
import sqlite3
from datetime import datetime
from urllib.parse import parse_qsl, urlencode, urlparse


def url_fingerprint(url: str) -> str:
    p = urlparse(url.strip())
    sorted_query = urlencode(sorted(parse_qsl(p.query)))
    canonical = p._replace(query=sorted_query, fragment="").geturl()
    return hashlib.md5(canonical.encode()).hexdigest()



conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fingerprints(fp TEXT PRIMARY KEY, url TEXT, added_at TEXT)")

for url in ["https://a.com/1", "https://a.com/2", "https://a.com/1"]:
    fp = url_fingerprint(url)
    try:
        conn.execute("INSERT INTO fingerprints VALUES(?,?,?)", (fp, url, datetime.now().isoformat()))
        conn.commit()
        print(f"New: {url}")
    except sqlite3.IntegrityError:
        # A primary-key conflict means this fingerprint is already registered, so the crawler can skip it.
        print(f"Duplicate, skipped: {url}")

count = conn.execute("SELECT COUNT(*) FROM fingerprints").fetchone()[0]
print(f"Unique URLs: {count}")
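
An alternative to catching IntegrityError is to let SQLite skip the conflicting row itself. The sketch below assumes a simplified two-column table and placeholder fingerprint strings such as "fp-1"; it uses INSERT OR IGNORE and then reads Cursor.rowcount to learn whether a row was actually written.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified table for this sketch (the article's table also stores added_at).
conn.execute("CREATE TABLE fingerprints(fp TEXT PRIMARY KEY, url TEXT)")


def mark_seen(fp: str, url: str) -> bool:
    # OR IGNORE silently skips the row when the primary key already exists.
    cur = conn.execute("INSERT OR IGNORE INTO fingerprints VALUES(?,?)", (fp, url))
    conn.commit()
    # rowcount is 1 when a row was inserted and 0 when the insert was ignored.
    return cur.rowcount == 1


# "fp-1" is a placeholder fingerprint, not a real MD5 value.
first = mark_seen("fp-1", "https://a.com/1")
second = mark_seen("fp-1", "https://a.com/1")
print(first, second)
```

Both styles rely on the same primary-key constraint; which one reads better is largely a matter of taste.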

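Step 3: Take tasks from a Queue, the entry point of multi-Worker cooperation

Pain point and mechanism

WorkerNode does not claim URLs by hash modulo; it takes tasks from a shared Queue. Picture the Queue as a restaurant pickup counter: several couriers line up, and whoever frees up first takes the next order. queue.Empty, raised after the 1-second timeout, means no new orders have arrived for now, so the Worker clocks off. This is a simple stopping rule rather than a strict one: a Worker can give up while another Worker is still about to enqueue new URLs.

Core source (verbatim from the complete source at the end)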

            try:
                url = self.task_queue.get(timeout=1)
            except queue.Empty:
                break

Runnable demo (with mock data and print feedback)

import queue


task_queue: queue.Queue[str] = queue.Queue()
for url in ["https://example.com/a", "https://example.com/b"]:
    task_queue.put(url)

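print("Two Workers take turns pulling tasks from the same Queue:")
for worker_id in [1, 2, 1]:
    try:
        # The Queue is the pickup counter: whoever is free takes the next order.
        # The third get() finds the queue empty and raises queue.Empty after the 1-second timeout.
        url = task_queue.get(timeout=1)
    except queue.Empty:
        print(f"Worker {worker_id}: queue is empty, time to rest")
        break
    print(f"Worker {worker_id}: got task {url}")
    task_queue.task_done()

print("Remaining tasks:", task_queue.qsize())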
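
The demo calls task_done() but never shows what it is for. The following sketch, which is an illustration and not part of the article's source, pairs task_done() with Queue.join() and uses a sentinel value to stop the workers explicitly instead of relying on a timeout. The names STOP, handled, and worker are assumptions of this sketch.

```python
import queue
import threading

task_queue: queue.Queue = queue.Queue()
handled: list[str] = []
handled_lock = threading.Lock()
STOP = None  # sentinel (an assumption of this sketch) telling a worker to exit


def worker() -> None:
    while True:
        url = task_queue.get()
        if url is STOP:
            task_queue.task_done()
            break
        with handled_lock:
            handled.append(url)
        # task_done() tells the queue that this item is fully processed.
        task_queue.task_done()


threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()

for url in ["https://example.com/a", "https://example.com/b", "https://example.com/c"]:
    task_queue.put(url)

task_queue.join()         # blocks until every put() has a matching task_done()
for _ in threads:
    task_queue.put(STOP)  # one sentinel per worker
for t in threads:
    t.join()

print(sorted(handled))
```

Compared with get(timeout=1), this shutdown is deterministic: no worker exits while work is still pending, and nobody waits out a timeout at the end.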

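Step 4: Demonstrate multi-Worker cooperative crawling with mode_workers

Pain point and mechanism

mode_workers creates several WorkerNode instances, each running in its own thread. They receive URLs through the shared task_queue and use lock to guard concurrent writes to result_list. The Queue works like a ticket dispenser and is inherently safe for multiple threads to pull from; result_list is a results sheet that everyone writes to, so it needs the lock.

Core source (verbatim from the complete source at the end)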

def mode_workers(_: argparse.Namespace) -> None:
    print(f"=== 多工作节点协作演示  [{now_str()}] ===\n")

    dedup = URLDeduplicator()
    task_q: queue.Queue = queue.Queue()
    results: list[CrawlResult] = []
    lock = threading.Lock()

    # 初始种子 URL
    seed_urls = [f"https://shop.example.com/category/{i}" for i in range(4)]
    for url in seed_urls:
        if dedup.mark_seen(url):
            task_q.put(url)

    # 启动 3 个工作节点
    n_workers = 3
    workers = [WorkerNode(i + 1, task_q, results, dedup, lock, max_results=24) for i in range(n_workers)]
    threads = [threading.Thread(target=w.run, daemon=True) for w in workers]

    t0 = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    total_ms = (time.perf_counter() - t0) * 1000

    # 统计各节点
    from collections import Counter
    worker_counts = Counter(r.worker_id for r in results)
    stat_rows = [
        [f"Worker-{wid}", cnt, f"{sum(r.duration_ms for r in results if r.worker_id==wid):.0f}ms"]
        for wid, cnt in sorted(worker_counts.items())
    ]
    print(ascii_table(["节点", "处理URL数", "总耗时"], stat_rows, title="工作节点统计"))
    print(f"\n总计: {len(results)} 个URL,去重库 {dedup.stats()} 条,耗时 {total_ms:.0f}ms")

Runnable demo (with mock data and print feedback)

import hashlib
import queue
import sqlite3
from urllib.parse import parse_qsl, urlencode, urlparse


def url_fingerprint(url: str) -> str:
    p = urlparse(url.strip())
    sorted_query = urlencode(sorted(parse_qsl(p.query)))
    canonical = p._replace(query=sorted_query, fragment="").geturl()
    return hashlib.md5(canonical.encode()).hexdigest()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE seen(fp TEXT PRIMARY KEY, url TEXT)")
task_q: queue.Queue[str] = queue.Queue()
for url in ["https://example.com/p/1", "https://example.com/p/2", "https://example.com/p/2"]:
    task_q.put(url)

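print("Multi-Worker cooperation: take a URL from the Queue, then deduplicate via the fingerprint table")
# Single-threaded simulation: worker_id simply alternates between 1 and 2.
worker_id = 1
while not task_q.empty():
    url = task_q.get()
    fp = url_fingerprint(url)
    try:
        conn.execute("INSERT INTO seen VALUES(?,?)", (fp, url))
        print(f"Worker {worker_id}: fetching {url}")
    except sqlite3.IntegrityError:
        print(f"Worker {worker_id}: duplicate, skipped {url}")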
    worker_id = 2 if worker_id == 1 else 1
    task_q.task_done()

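Step 5: Show the effect of URL deduplication with mode_dedup

Pain point and mechanism

mode_dedup feeds a hand-written list of five URLs, several of them duplicates in disguise, through URLDeduplicator and uses ascii_table to show each URL next to its fingerprint and verdict. The demo makes the effect concrete: reordered parameters, an added fragment, and an exact repeat all collapse to one fingerprint, so only two of the five URLs actually need to be crawled.

Core source (verbatim from the complete source at the end)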

def mode_dedup(_: argparse.Namespace) -> None:
    print(f"=== URL 指纹去重演示  [{now_str()}] ===\n")
    dedup = URLDeduplicator()

    test_urls = [
        "https://shop.example.com/products?page=1&sort=price",
        "https://shop.example.com/products?sort=price&page=1",  # 参数顺序不同,同一页
        "https://shop.example.com/products?page=1&sort=price#top",  # fragment 不同
        "https://shop.example.com/products?page=2&sort=price",  # 真正不同
        "https://shop.example.com/products?page=1&sort=price",  # 完全重复
    ]

    rows = []
    for url in test_urls:
        fp = url_fingerprint(url)
        is_new = dedup.mark_seen(url)
        rows.append([url[:55] + ("…" if len(url) > 55 else ""), fp[:16] + "…", "✅ 新URL" if is_new else "❌ 重复"])

    print(ascii_table(["URL", "MD5指纹(前16位)", "结果"], rows, title="URL 去重结果"))
    print(f"\n去重库中共 {dedup.stats()} 条唯一 URL")

Runnable demo (with mock data and print feedback)

import argparse
import hashlib
import sqlite3
from urllib.parse import parse_qsl, urlencode, urlparse


def url_fingerprint(url: str) -> str:
    p = urlparse(url.strip())
    sorted_query = urlencode(sorted(parse_qsl(p.query)))
    canonical = p._replace(query=sorted_query, fragment="").geturl()
    return hashlib.md5(canonical.encode()).hexdigest()


def mode_dedup(_: argparse.Namespace) -> None:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE seen(fp TEXT PRIMARY KEY, url TEXT)")
    print("=== URL fingerprint deduplication demo ===")
    for url in ["https://example.com/a?b=2&a=1", "https://example.com/a?a=1&b=2", "https://example.com/b"]:
        fp = url_fingerprint(url)
        try:
            conn.execute("INSERT INTO seen VALUES(?,?)", (fp, url))
            print(f"New: {url}")
        except sqlite3.IntegrityError:
            print(f"Duplicate: {url}")
    total = conn.execute("SELECT COUNT(*) FROM seen").fetchone()[0]
    print(f"{total} unique URLs remain after deduplication")


mode_dedup(argparse.Namespace())

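Step 6: Demonstrate shared-queue crawling across Workers with mode_workers

Pain point and mechanism

mode_workers walks through the full flow of 3 Workers cooperatively crawling up to 24 URLs (the max_results cap): URL enqueued → Worker takes it → simulated request → new URLs parsed → deduplicated and enqueued → results aggregated. Every Worker pulls from the same thread-safe queue, so no hand-written lock juggling is needed to hand out tasks. At the end, ascii_table prints per-Worker statistics so you can see how evenly the load was spread.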

Core source: identical to the mode_workers listing in Step 4 and in the complete source at the end, so it is not repeated here.

Runnable demo (with mock data and print feedback)

import argparse
import queue


def mode_workers(_: argparse.Namespace) -> None:
    print("=== Multi-Worker shared queue demo ===\n")
    task_q: queue.Queue[str] = queue.Queue()
    for i in range(1, 7):
        task_q.put(f"https://example.com/item/{i}")

    worker_id = 1
    while not task_q.empty():
        url = task_q.get()
        # Rotating IDs simulate several threads competing for tasks; the real source uses Thread + Queue.
        print(f"Worker {worker_id} handles {url}")
        worker_id = 1 if worker_id == 3 else worker_id + 1
        task_q.task_done()


mode_workers(argparse.Namespace())
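
The demo above rotates worker IDs in a single thread. For comparison, here is a minimal sketch with real threads, assuming all URLs are queued up front so that an empty queue really means the work is done. The run function, the results list of tuples, and the 10 ms sleep are illustrative stand-ins, not the article's WorkerNode.

```python
import queue
import threading
import time
from collections import Counter

task_q: queue.Queue = queue.Queue()
for i in range(1, 7):
    task_q.put(f"https://example.com/item/{i}")

results: list[tuple[int, str]] = []
lock = threading.Lock()


def run(worker_id: int) -> None:
    while True:
        try:
            url = task_q.get_nowait()
        except queue.Empty:
            return  # everything was queued up front, so empty means done
        time.sleep(0.01)  # stand-in for a network request
        with lock:
            # The lock guards the shared list, mirroring result_list in the article.
            results.append((worker_id, url))
        task_q.task_done()


threads = [threading.Thread(target=run, args=(i,)) for i in (1, 2, 3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

per_worker = Counter(worker_id for worker_id, _ in results)
print("handled per worker:", dict(sorted(per_worker.items())))
print("total:", len(results))
```

Which worker handles which URL depends on thread scheduling, so the per-worker counts can differ between runs; only the total is fixed.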

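Step 7: Combine deduplication and distributed crawling in mode_full

Pain point and mechanism

mode_full calls mode_dedup and then mode_workers, showing the complete advanced-crawler flow: deduplicate first, then crawl in a distributed fashion. The two demos run back to back, each with its own in-memory URLDeduplicator, and together they show how the techniques complement each other: deduplication cuts wasted requests, and distribution raises crawl throughput.

Core source (verbatim from the complete source at the end)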

def mode_full(_: argparse.Namespace) -> None:
    print(f"=== 完整分布式爬取流程  [{now_str()}] ===\n")
    mode_dedup(None)
    print()
    mode_workers(None)

Runnable demo (with mock data and print feedback)

import argparse
import hashlib
import queue
import sqlite3
from urllib.parse import parse_qsl, urlencode, urlparse


def url_fingerprint(url: str) -> str:
    p = urlparse(url.strip())
    sorted_query = urlencode(sorted(parse_qsl(p.query)))
    canonical = p._replace(query=sorted_query, fragment="").geturl()
    return hashlib.md5(canonical.encode()).hexdigest()


def mode_full(_: argparse.Namespace) -> None:
    print("=== Full distributed crawl flow ===")
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE seen(fp TEXT PRIMARY KEY, url TEXT)")
    task_q: queue.Queue[str] = queue.Queue()
    for path in ["/a", "/b", "/a", "/c"]:
        task_q.put("https://example.com" + path)

    while not task_q.empty():
        url = task_q.get()
        fp = url_fingerprint(url)
        try:
            conn.execute("INSERT INTO seen VALUES(?,?)", (fp, url))
            print(f"Fetched: {url}")
        except sqlite3.IntegrityError:
            print(f"Skipped duplicate: {url}")
        task_q.task_done()
    print("Full loop: canonicalize -> deduplicate -> enqueue -> Worker fetch")


mode_full(argparse.Namespace())

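Step 8: Use main as the single CLI entry point for the dedup / workers / full modes

Pain point and mechanism

main uses argparse as the CLI entry point. The three modes map to three learning levels: dedup for the deduplication mechanism, workers for distributed cooperation, and full for the whole flow. Dispatching through a dictionary instead of an if/elif chain is a common Python idiom for multi-way branching.

Core source (verbatim from the complete source at the end)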

def main() -> None:
    p = argparse.ArgumentParser(description="分布式爬虫原理演示")
    p.add_argument("--mode", choices=["dedup", "workers", "full"], default="full")
    args = p.parse_args()
    {"dedup": mode_dedup, "workers": mode_workers, "full": mode_full}[args.mode](args)

Runnable demo (with mock data and print feedback)

import argparse


def mode_dedup(_: argparse.Namespace) -> None:
    print("Running dedup: URL deduplication only")


def mode_workers(_: argparse.Namespace) -> None:
    print("Running workers: multi-Worker shared queue cooperation only")


def mode_full(_: argparse.Namespace) -> None:
    print("Running full: deduplication plus queue cooperation together")


def main() -> None:
    p = argparse.ArgumentParser(description="分布式爬虫原理演示")
    p.add_argument("--mode", choices=["dedup", "workers", "full"], default="full")
    args = p.parse_args()
    {"dedup": mode_dedup, "workers": mode_workers, "full": mode_full}[args.mode](args)


for mode in ["dedup", "workers", "full"]:
    print(f">>> python3 29-python-spider-advanced.py --mode {mode}")
    {"dedup": mode_dedup, "workers": mode_workers, "full": mode_full}[mode](argparse.Namespace(mode=mode))

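Geek in practice: complete source and how to run it

Now put the building blocks together: paste the complete code below into your editor and run it. Look at the full loop first, then go back and tweak parameters section by section; that makes it easier to build engineering intuition.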

#!/usr/bin/env python3
"""
29-python-spider-advanced.py — 分布式爬虫原理演示(零外部依赖)

用法:
  python3 29-python-spider-advanced.py --mode dedup      # URL 指纹去重演示
  python3 29-python-spider-advanced.py --mode workers    # 多工作节点协作演示
  python3 29-python-spider-advanced.py --mode full       # 完整分布式流程
"""

import argparse
import hashlib
import queue
import random
import sqlite3
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlparse
from zoneinfo import ZoneInfo

TZ = ZoneInfo("Asia/Shanghai")

def now_str() -> str:
    return datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S")

def ascii_table(headers: list[str], rows: list[list[Any]], title: str = "") -> str:
    col_w = [len(h) for h in headers]
    for row in rows:
        for i, cell in enumerate(row):
            col_w[i] = max(col_w[i], len(str(cell)))
    sep = "+" + "+".join("-" * (w + 2) for w in col_w) + "+"
    fmt = "|" + "|".join(f" {{:<{w}}} " for w in col_w) + "|"
    lines = []
    if title:
        total = sum(col_w) + 3 * len(col_w) + 1
        lines += [sep, f"|{title.center(total - 2)}|"]
    lines += [sep, fmt.format(*headers), sep]
    for row in rows:
        lines.append(fmt.format(*[str(c) for c in row]))
    lines.append(sep)
    return "\n".join(lines)

# ── URL 指纹去重系统 ─────────────────────────────────────────
def url_fingerprint(url: str) -> str:
    p = urlparse(url.strip())
    sorted_query = urlencode(sorted(parse_qsl(p.query)))
    canonical = p._replace(query=sorted_query, fragment="").geturl()
    return hashlib.md5(canonical.encode()).hexdigest()

class URLDeduplicator:
    """基于 SQLite 的持久化 URL 去重器"""

    def __init__(self, db_path: str = ":memory:"):
        self._lock = threading.Lock()
        self._conn = sqlite3.connect(db_path, check_same_thread=False)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS fingerprints"
            "(fp TEXT PRIMARY KEY, url TEXT, added_at TEXT)"
        )
        self._conn.commit()

    def is_seen(self, url: str) -> bool:
        fp = url_fingerprint(url)
        with self._lock:
            row = self._conn.execute(
                "SELECT 1 FROM fingerprints WHERE fp=?", (fp,)
            ).fetchone()
            return row is not None

    def mark_seen(self, url: str) -> bool:
        """返回 True 表示首次见到(已标记),False 表示重复"""
        fp = url_fingerprint(url)
        with self._lock:
            try:
                self._conn.execute(
                    "INSERT INTO fingerprints VALUES(?,?,?)",
                    (fp, url, now_str())
                )
                self._conn.commit()
                return True
            except sqlite3.IntegrityError:
                return False

    def stats(self) -> int:
        with self._lock:
            return self._conn.execute(
                "SELECT COUNT(*) FROM fingerprints"
            ).fetchone()[0]

# ── 分布式工作节点 ───────────────────────────────────────────
@dataclass
class CrawlResult:
    worker_id: int
    url: str
    status: str
    duration_ms: float
    items: int

class WorkerNode:
    """模拟爬虫工作节点"""

    def __init__(
        self,
        worker_id: int,
        task_queue: queue.Queue,
        result_list: list,
        dedup: URLDeduplicator,
        lock: threading.Lock,
        max_results: int = 24,
    ):
        self.worker_id = worker_id
        self.task_queue = task_queue
        self.result_list = result_list
        self.dedup = dedup
        self.lock = lock
        self.max_results = max_results

    def run(self) -> None:
        while True:
            with self.lock:
                if len(self.result_list) >= self.max_results:
                    break

            try:
                url = self.task_queue.get(timeout=1)
            except queue.Empty:
                break

            t0 = time.perf_counter()

            # 模拟网络请求(随机延迟)
            time.sleep(random.uniform(0.05, 0.2))

            # 模拟解析到新 URL
            new_urls = [f"{url}/sub/{i}" for i in range(random.randint(0, 2))]
            added = 0
            for new_url in new_urls:
                with self.lock:
                    reached_limit = len(self.result_list) + self.task_queue.qsize() >= self.max_results
                if reached_limit:
                    break
                if self.dedup.mark_seen(new_url):
                    self.task_queue.put(new_url)
                    added += 1

            duration = (time.perf_counter() - t0) * 1000
            result = CrawlResult(
                worker_id=self.worker_id,
                url=url,
                status="OK",
                duration_ms=round(duration, 1),
                items=random.randint(3, 10),
            )
            with self.lock:
                self.result_list.append(result)

            self.task_queue.task_done()

# ── CLI 模式 ─────────────────────────────────────────────────
def mode_dedup(_: argparse.Namespace) -> None:
    print(f"=== URL 指纹去重演示  [{now_str()}] ===\n")
    dedup = URLDeduplicator()

    test_urls = [
        "https://shop.example.com/products?page=1&sort=price",
        "https://shop.example.com/products?sort=price&page=1",  # 参数顺序不同,同一页
        "https://shop.example.com/products?page=1&sort=price#top",  # fragment 不同
        "https://shop.example.com/products?page=2&sort=price",  # 真正不同
        "https://shop.example.com/products?page=1&sort=price",  # 完全重复
    ]

    rows = []
    for url in test_urls:
        fp = url_fingerprint(url)
        is_new = dedup.mark_seen(url)
        rows.append([url[:55] + ("…" if len(url) > 55 else ""), fp[:16] + "…", "✅ 新URL" if is_new else "❌ 重复"])

    print(ascii_table(["URL", "MD5指纹(前16位)", "结果"], rows, title="URL 去重结果"))
    print(f"\n去重库中共 {dedup.stats()} 条唯一 URL")

def mode_workers(_: argparse.Namespace) -> None:
    print(f"=== 多工作节点协作演示  [{now_str()}] ===\n")

    dedup = URLDeduplicator()
    task_q: queue.Queue = queue.Queue()
    results: list[CrawlResult] = []
    lock = threading.Lock()

    # 初始种子 URL
    seed_urls = [f"https://shop.example.com/category/{i}" for i in range(4)]
    for url in seed_urls:
        if dedup.mark_seen(url):
            task_q.put(url)

    # 启动 3 个工作节点
    n_workers = 3
    workers = [WorkerNode(i + 1, task_q, results, dedup, lock, max_results=24) for i in range(n_workers)]
    threads = [threading.Thread(target=w.run, daemon=True) for w in workers]

    t0 = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    total_ms = (time.perf_counter() - t0) * 1000

    # 统计各节点
    from collections import Counter
    worker_counts = Counter(r.worker_id for r in results)
    stat_rows = [
        [f"Worker-{wid}", cnt, f"{sum(r.duration_ms for r in results if r.worker_id==wid):.0f}ms"]
        for wid, cnt in sorted(worker_counts.items())
    ]
    print(ascii_table(["节点", "处理URL数", "总耗时"], stat_rows, title="工作节点统计"))
    print(f"\n总计: {len(results)} 个URL,去重库 {dedup.stats()} 条,耗时 {total_ms:.0f}ms")

def mode_full(_: argparse.Namespace) -> None:
    print(f"=== 完整分布式爬取流程  [{now_str()}] ===\n")
    mode_dedup(None)
    print()
    mode_workers(None)

def main() -> None:
    p = argparse.ArgumentParser(description="分布式爬虫原理演示")
    p.add_argument("--mode", choices=["dedup", "workers", "full"], default="full")
    args = p.parse_args()
    {"dedup": mode_dedup, "workers": mode_workers, "full": mode_full}[args.mode](args)

if __name__ == "__main__":
    main()

Sample runs (described rather than pasted, because timestamps and the random parts differ on every run):

$ python3 29-python-spider-advanced.py --mode dedup

Prints the header "=== URL 指纹去重演示  [<timestamp>] ===" followed by the "URL 去重结果" table with one row per test URL. Rows 1, 2, 3 and 5 share a single fingerprint: row 1 is marked "✅ 新URL" and the other three "❌ 重复". Row 4 (page=2) has a different fingerprint and is marked "✅ 新URL". The last line reads "去重库中共 2 条唯一 URL".

$ python3 29-python-spider-advanced.py --mode workers

Prints the header "=== 多工作节点协作演示  [<timestamp>] ===" followed by the "工作节点统计" table with the columns 节点, 处理URL数 and 总耗时, one row per Worker that handled at least one URL (Worker-1 to Worker-3). The counts vary from run to run: each page yields 0 to 2 random sub-URLs, so the crawl may drain early or stop near the max_results=24 cap. The closing line has the form "总计: <N> 个URL,去重库 <M> 条,耗时 <T>ms". Because every simulated request sleeps 50 to 200 ms, T is at least several hundred milliseconds.

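Summary

Concept | One-line takeaway
url_fingerprint | Canonicalize the URL, then hash it with MD5; the fixed-length fingerprint supports deduplication
URLDeduplicator | SQLite primary-key table of fingerprints, exposed through is_seen / mark_seen
Shared task queue | The Queue passes URLs safely between threads; an idle Worker takes the next one
WorkerNode | Runs in its own thread: takes a URL from the Queue, fetches it, then deduplicates and enqueues new URLs
threading.Lock | Guards concurrent writes to the shared result_list
queue.Queue | Thread-safe task queue with built-in support for multiple consumers
Bloom filter | Production replacement for a set: less memory, with a false-positive rate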

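⏱ NexDo Time (5 minutes)

Challenge: add an export_fingerprints(path) method to URLDeduplicator that dumps the fingerprints stored in SQLite to a text file, so the next run can restore them (resumable crawling).

Steps:

  1. Implement export_fingerprints(self, path: str) -> None: query the fingerprints table and write each fp to the file
  2. Implement import_fingerprints(self, path: str) -> None: read the fingerprints from the file and write them back into SQLite with INSERT OR IGNORE
  3. Verify: crawl 10 URLs, export the fingerprints, create a new URLDeduplicator and import them, then crawl the same 10 URLs again; all of them should be skipped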

Don’t wait for next time, do it in the next moment.