
28 · The Scrapy Framework: Pipeline, Middleware, and the Scheduling Engine

#034 · 2026-04-17 · Python

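🔗 Knowledge-graph navigation: before reading this article, it helps to know the crawler-framework basics from "25 · Web Crawlers: HTTP Fetching and Data Extraction". This article maps that hand-written framework onto Scrapy's architecture and reproduces Scrapy's core mechanisms in pure Python. With those understood, working with real Scrapy takes far less effort.

Runtime: Python 3.12+ standard library only, no extra dependencies, runs as-is.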

Geek's take: the core of Scrapy is a single pipeline: Request → Middleware → Fetch → Response → Spider.parse → Item → Pipeline → storage. Once this flow is clear, each Scrapy setting has an obvious place in it.

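Scrapy architecture mapping

Scrapy component     This article     Responsibility
─────────────────────────────────────────────────────
scrapy.Request       Request          Wraps URL + callback + meta
scrapy.Response      Response         Wraps response body + status code
scrapy.Item          Item             Wraps the scraped result dict
ItemPipeline         BasePipeline     Data-processing chain (cleaning/storage)
DownloaderMiddleware BaseMiddleware   Request/response interception (UA/proxy)
Crawler              MiniEngine       Scheduler (assembles and runs)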

Pipeline execution order

Item
  │
  ▼ CleanPipeline.process_item()   ← cleaning: strip whitespace, convert types
  │
  ▼ SQLitePipeline.process_item()  ← persistence: write to the database
  │
  ▼ stored

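Step by step: breaking down the core logic

The core of this article is Scrapy's three-layer structure: data models (Request/Response/Item) → processing chains (Pipeline/Middleware) → scheduling engine (MiniEngine). Each step below focuses on one mechanism and runs as-is with no dependencies.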

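Step 1: Define the three data models Request/Response/Item with @dataclass

Pain point and mechanism

Request/Response/Item are simplified versions of Scrapy's three main data models. Request wraps a URL and the name of a callback; its meta dict carries context (page number, category, and so on) from the request to the parsing step. Response wraps the response body and the status code. Item is a thin wrapper with a single data dict, so a Pipeline can handle scraped results of any shape in a uniform way.

Core source (verbatim from the full source at the end)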

@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)

@dataclass
class Response:
    url: str
    body: str
    status: int = 200

@dataclass
class Item:
    data: dict

Runnable demo (with mock data and print feedback filled in)

from dataclasses import dataclass, field

@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)

@dataclass
class Response:
    url: str
    body: str
    status: int = 200

@dataclass
class Item:
    data: dict

req = Request(url="https://mock-shop.example.com/products?page=1", meta={"page": 1})
resp = Response(url=req.url, body="[{'name': '键盘'}]", status=200)
item = Item(data={"name": "机械键盘", "price": "¥599"})
print("Request 负责描述要抓哪里:", req)
print("Response 负责承载下载结果:", resp.status, resp.url)
print("Item 负责承载结构化数据:", item.data)
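The way a callback name and meta travel from a request to the parsing step can be sketched as follows. This is a minimal illustration, not the article's engine: ShopSpider, dispatch, and the meta field on Response are assumptions added here (the article's Response has no meta field), and real Scrapy passes the callable itself rather than its name.

```python
from dataclasses import dataclass, field


@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)


@dataclass
class Response:
    url: str
    body: str
    status: int = 200
    meta: dict = field(default_factory=dict)  # assumption: not in the article's Response


class ShopSpider:  # hypothetical spider, for illustration only
    def parse(self, response: Response) -> str:
        return f"list page {response.meta['page']}"

    def parse_detail(self, response: Response) -> str:
        return f"detail page in category {response.meta['category']}"


def dispatch(spider: object, request: Request, body: str) -> str:
    """Build a Response that carries request.meta, then call the named callback."""
    response = Response(url=request.url, body=body, meta=request.meta)
    handler = getattr(spider, request.callback)  # look the method up by its name
    return handler(response)


spider = ShopSpider()
print(dispatch(spider, Request("https://mock-shop.example.com/products?page=1",
                               meta={"page": 1}), "[]"))
print(dispatch(spider, Request("https://mock-shop.example.com/p/1",
                               callback="parse_detail",
                               meta={"category": "keyboards"}), "{}"))
```

Storing the callback as a string keeps Request a plain data record; the price is that a misspelled name only fails at dispatch time.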

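Step 2: Build the data-processing chain with CleanPipeline + SQLitePipeline

Pain point and mechanism

CleanPipeline does the cleaning: it strips whitespace from both ends of the name, converts the price string to a float, and adds a crawl timestamp. SQLitePipeline does the persistence: open_spider creates the database connection when the spider starts, and process_item writes each record to SQLite. close_spider is the hook for closing the connection when the spider ends; the SQLitePipeline in this article keeps the inherited no-op, which lets report() still query the in-memory database after the run. Chaining pipelines lets each one do a single job, in line with the single-responsibility principle.

Core source (verbatim from the full source at the end)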

class CleanPipeline(BasePipeline):
    def process_item(self, item: Item) -> Item:
        item.data["name"] = item.data.get("name", "").strip()
        price = str(item.data.get("price", "0")).replace("¥", "")
        item.data["price"] = float(price)
        item.data["crawled_at"] = now_str()
        print(f"  [CleanPipeline] 清洗: {item.data['name']} → ¥{item.data['price']:.2f}")
        return item

class SQLitePipeline(BasePipeline):
    def open_spider(self, spider_name: str) -> None:
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute(
            "CREATE TABLE products(name TEXT, price REAL, url TEXT, crawled_at TEXT)"
        )
        print(f"  [SQLitePipeline] 数据库已就绪")

    def process_item(self, item: Item) -> Item:
        d = item.data
        self.conn.execute(
            "INSERT INTO products VALUES(?,?,?,?)",
            (d["name"], d["price"], d.get("url",""), d.get("crawled_at",""))
        )
        self.conn.commit()
        return item

    def report(self) -> list:
        return self.conn.execute(
            "SELECT name, price, crawled_at FROM products ORDER BY price DESC"
        ).fetchall()

Runnable demo (with mock data and print feedback filled in)

import sqlite3
from dataclasses import dataclass
from datetime import datetime
from zoneinfo import ZoneInfo

TZ = ZoneInfo("Asia/Shanghai")
def now_str() -> str:
    return datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S")

@dataclass
class Item:
    data: dict

class CleanPipeline:
    def process_item(self, item: Item) -> Item:
        # Cleaning is like quality inspection before warehousing: strip whitespace, convert the price, add a timestamp.
        item.data["name"] = item.data.get("name", "").strip()
        item.data["price"] = float(str(item.data.get("price", "0")).replace("¥", ""))
        item.data["crawled_at"] = now_str()
        print("清洗后:", item.data)
        return item

class SQLitePipeline:
    def open_spider(self, spider_name: str) -> None:
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute("CREATE TABLE products(name TEXT, price REAL, url TEXT, crawled_at TEXT)")
        print("数据库已就绪:", spider_name)
    def process_item(self, item: Item) -> Item:
        d = item.data
        self.conn.execute("INSERT INTO products VALUES(?,?,?,?)", (d["name"], d["price"], d.get("url", ""), d["crawled_at"]))
        self.conn.commit()
        return item

item = Item({"name": " 机械键盘 Pro X ", "price": "¥599.0", "url": "/p/1"})
clean = CleanPipeline(); db = SQLitePipeline(); db.open_spider("demo")
db.process_item(clean.process_item(item))
print("数据库行:", db.conn.execute("SELECT name, price FROM products").fetchall())
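A pipeline that actually releases its connection in close_spider might look like the sketch below. ClosingSQLitePipeline, its saved counter, and its summary attribute are hypothetical names introduced here, not part of the article's source. Because an in-memory SQLite database disappears when the connection closes, the sketch reads the results before closing.

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class Item:
    data: dict


class ClosingSQLitePipeline:  # hypothetical variant of SQLitePipeline
    def open_spider(self, spider_name: str) -> None:
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute("CREATE TABLE products(name TEXT, price REAL)")
        self.saved = 0
        self.summary: list[tuple] = []

    def process_item(self, item: Item) -> Item:
        self.conn.execute("INSERT INTO products VALUES(?,?)",
                          (item.data["name"], item.data["price"]))
        self.saved += 1
        return item

    def close_spider(self, spider_name: str) -> None:
        self.conn.commit()  # one commit for the whole run
        # Read results before closing: an in-memory database is gone after close().
        self.summary = self.conn.execute(
            "SELECT name, price FROM products ORDER BY price DESC").fetchall()
        self.conn.close()
        print(f"[{spider_name}] closed, saved {self.saved} rows")


db = ClosingSQLitePipeline()
db.open_spider("demo")
for raw in ({"name": "keyboard", "price": 599.0}, {"name": "monitor", "price": 2199.0}):
    db.process_item(Item(raw))
db.close_spider("demo")
print(db.summary)
```

With a file-backed database the summary query could run later on a fresh connection; the early read is only needed for ":memory:".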

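Step 3: Inject a User-Agent into requests with UAMiddleware

Pain point and mechanism

UAMiddleware injects a User-Agent into the meta dict of every Request. A real _fetch would read it from meta and place it in the HTTP request headers when sending; the mock _fetch in this article only looks the URL up in MOCK_PAGES. The Middleware design makes request handling pluggable: add a ProxyMiddleware for proxies or a CookieMiddleware for cookies, without touching the spider code.

Core source (verbatim from the full source at the end)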

class UAMiddleware(BaseMiddleware):
    def process_request(self, request: Request) -> Request:
        request.meta["User-Agent"] = "Mozilla/5.0 (Macintosh) Safari/605.1.15"
        print(f"  [UAMiddleware] 注入 UA → {request.url[:50]}")
        return request

Runnable demo (with mock data and print feedback filled in)

from dataclasses import dataclass, field

@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)

@dataclass
class Response:
    url: str
    body: str
    status: int = 200

class BaseMiddleware:
    def process_request(self, request: Request) -> Request: return request
    def process_response(self, response: Response) -> Response: return response

class UAMiddleware(BaseMiddleware):
    def process_request(self, request: Request) -> Request:
        request.meta["User-Agent"] = "Mozilla/5.0 (Macintosh) Safari/605.1.15"
        print(f"  [UAMiddleware] 注入 UA → {request.url[:50]}")
        return request

mw = UAMiddleware()
for i in range(2):
    req = mw.process_request(Request(url=f"https://example.com/page/{i}"))
    print("请求 meta:", req.meta)
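Turning the User-Agent stored in meta into a real HTTP header could be sketched with the standard library as below. build_http_request is a hypothetical helper, not part of the article's source, and the sketch only constructs the request object without sending anything. Real Scrapy keeps headers on the request itself rather than in meta.

```python
import urllib.request
from dataclasses import dataclass, field


@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)


def build_http_request(request: Request) -> urllib.request.Request:
    """Copy the User-Agent from meta (if any) into real HTTP headers."""
    headers = {}
    ua = request.meta.get("User-Agent")
    if ua:
        headers["User-Agent"] = ua
    return urllib.request.Request(request.url, headers=headers)


req = Request("https://example.com/page/0",
              meta={"User-Agent": "Mozilla/5.0 (Macintosh) Safari/605.1.15"})
http_req = build_http_request(req)
# urllib normalises header names with str.capitalize(), hence "User-agent".
print(http_req.header_items())
```

Passing http_req to urllib.request.urlopen would perform the actual download; that step is left out so the sketch stays offline.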

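Step 4: Assemble the Pipeline + Middleware execution chain with MiniEngine

Pain point and mechanism

MiniEngine is the scheduling engine: __init__ takes the pipelines and middlewares lists, and run calls open_spider → crawl loop → close_spider in that order. _fetch passes each request through every Middleware in turn, then passes the response back through them in reverse order; _process_item passes each Item through every Pipeline in turn. This "pass through in turn" pattern is the chain of responsibility: each handler cares only about its own logic and does not need to know which other handlers are on the chain.

Core source (verbatim from the full source at the end)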

class MiniEngine:
    def __init__(self, pipelines: list[BasePipeline], middlewares: list[BaseMiddleware]):
        self.pipelines = pipelines
        self.middlewares = middlewares

    def _fetch(self, request: Request) -> Response:
        for mw in self.middlewares:
            request = mw.process_request(request)
        body = str(MOCK_PAGES.get(request.url, []))
        resp = Response(url=request.url, body=body)
        for mw in reversed(self.middlewares):
            resp = mw.process_response(resp)
        return resp

    def _process_item(self, item: Item) -> Item:
        for pl in self.pipelines:
            item = pl.process_item(item)
        return item

    def run(self, start_urls: list[str]) -> None:
        for pl in self.pipelines:
            pl.open_spider("shop")
        queue = [Request(url=u) for u in start_urls]
        while queue:
            req = queue.pop(0)
            resp = self._fetch(req)
            import ast
            items_data = ast.literal_eval(resp.body)
            for d in items_data:
                self._process_item(Item(data=d.copy()))
        for pl in self.pipelines:
            pl.close_spider("shop")

Runnable demo (with mock data and print feedback filled in)

import ast
from dataclasses import dataclass, field

@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)
@dataclass
class Response:
    url: str
    body: str
    status: int = 200
@dataclass
class Item:
    data: dict

MOCK_PAGES = {"https://mock-shop.example.com/products?page=0": [{"name": "商品-1", "price": "¥199.0", "url": "/p/1"}]}

class PrintPipeline:
    def open_spider(self, name): print("open_spider:", name)
    def process_item(self, item: Item) -> Item:
        print("process_item:", item.data); return item
    def close_spider(self, name): print("close_spider:", name)
class PrintMiddleware:
    def process_request(self, request: Request) -> Request:
        print("process_request:", request.url); return request
    def process_response(self, response: Response) -> Response:
        print("process_response:", response.status); return response

class MiniEngine:
    def __init__(self, pipelines, middlewares):
        self.pipelines = pipelines; self.middlewares = middlewares
    def _fetch(self, request: Request) -> Response:
        for mw in self.middlewares: request = mw.process_request(request)
        resp = Response(request.url, str(MOCK_PAGES.get(request.url, [])))
        for mw in reversed(self.middlewares): resp = mw.process_response(resp)
        return resp
    def _process_item(self, item: Item) -> Item:
        for pl in self.pipelines: item = pl.process_item(item)
        return item
    def run(self, urls: list[str]) -> None:
        for pl in self.pipelines: pl.open_spider("shop")
        for url in urls:
            resp = self._fetch(Request(url))
            for data in ast.literal_eval(resp.body): self._process_item(Item(data.copy()))
        for pl in self.pipelines: pl.close_spider("shop")

MiniEngine([PrintPipeline()], [PrintMiddleware()]).run(list(MOCK_PAGES.keys()))
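The order in which a request and its response cross the middleware list can be made visible with a small trace. TraceMiddleware and the standalone fetch function below are hypothetical helpers written for this illustration; they mirror the forward loop and the reversed loop of _fetch but use plain strings instead of Request and Response.

```python
class TraceMiddleware:  # hypothetical middleware that only records call order
    def __init__(self, name: str, log: list[str]):
        self.name, self.log = name, log

    def process_request(self, request: str) -> str:
        self.log.append(f"{self.name}.process_request")
        return request

    def process_response(self, response: str) -> str:
        self.log.append(f"{self.name}.process_response")
        return response


def fetch(url: str, middlewares: list[TraceMiddleware], log: list[str]) -> str:
    request = url
    for mw in middlewares:            # first in the list runs first on the way in
        request = mw.process_request(request)
    log.append("download")
    response = f"body of {request}"
    for mw in reversed(middlewares):  # last in the list runs first on the way out
        response = mw.process_response(response)
    return response


log: list[str] = []
fetch("https://example.com/page/0",
      [TraceMiddleware("A", log), TraceMiddleware("B", log)], log)
print(" -> ".join(log))
```

The trace reads like the layers of an onion: A wraps B, and B wraps the download.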

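Step 5: Demonstrate chained Pipeline processing with mode_pipeline

Pain point and mechanism

mode_pipeline builds a CleanPipeline + SQLitePipeline chain, processes a batch of mock products, and then queries the database to verify the result. The demo walks through the life cycle of a Pipeline chain: open_spider → repeated process_item → close_spider. Scrapy's ITEM_PIPELINES setting plays the role of this list: each pipeline gets an integer, and pipelines with lower numbers run first.

Core source (verbatim from the full source at the end)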

def mode_pipeline(_: argparse.Namespace) -> None:
    print(f"=== Pipeline 链式处理演示  [{now_str()}] ===\n")
    clean = CleanPipeline()
    db = SQLitePipeline()
    db.open_spider("demo")
    items = [
        Item({"name": " 机械键盘 Pro X ", "price": "¥599.0", "url": "/p/1"}),
        Item({"name": "4K 显示器 27寸", "price": "¥2199.0", "url": "/p/2"}),
    ]
    for item in items:
        item = clean.process_item(item)
        db.process_item(item)
    rows = [[r[0], f"¥{r[1]:.2f}", r[2]] for r in db.report()]
    print("\n" + ascii_table(["商品名称", "价格", "抓取时间"], rows, title="Pipeline 处理结果"))

Runnable demo (with mock data and print feedback filled in)

import sqlite3
from dataclasses import dataclass
from datetime import datetime
from zoneinfo import ZoneInfo

TZ = ZoneInfo("Asia/Shanghai")
def now_str(): return datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S")
@dataclass
class Item: data: dict
class CleanPipeline:
    def process_item(self, item: Item) -> Item:
        item.data["name"] = item.data["name"].strip(); item.data["price"] = float(item.data["price"].replace("¥", "")); item.data["crawled_at"] = now_str(); return item
class SQLitePipeline:
    def open_spider(self, name): self.conn=sqlite3.connect(":memory:"); self.conn.execute("CREATE TABLE products(name TEXT, price REAL, crawled_at TEXT)")
    def process_item(self, item: Item): d=item.data; self.conn.execute("INSERT INTO products VALUES(?,?,?)", (d["name"], d["price"], d["crawled_at"])); self.conn.commit(); return item
    def report(self): return self.conn.execute("SELECT name, price FROM products ORDER BY price DESC").fetchall()

def mode_pipeline(_) -> None:
    clean=CleanPipeline(); db=SQLitePipeline(); db.open_spider("demo")
    for raw in [Item({"name":" 机械键盘 ","price":"¥599"}), Item({"name":"显示器","price":"¥2199"})]:
        db.process_item(clean.process_item(raw))
    for name, price in db.report(): print(f"{name}: ¥{price:.2f}")
mode_pipeline(None)
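How an order number per pipeline becomes an execution order can be sketched as follows. PIPELINE_ORDER, build_chain, and the three small pipeline classes are hypothetical and exist only for this illustration; real Scrapy maps import-path strings to integers in ITEM_PIPELINES, while this sketch maps pipeline objects directly to keep it self-contained.

```python
from dataclasses import dataclass


@dataclass
class Item:
    data: dict


class StripName:
    def process_item(self, item: Item) -> Item:
        item.data["name"] = item.data["name"].strip()
        return item


class ParsePrice:
    def process_item(self, item: Item) -> Item:
        item.data["price"] = float(item.data["price"].replace("¥", ""))
        return item


class Collect:
    def __init__(self) -> None:
        self.rows: list[dict] = []

    def process_item(self, item: Item) -> Item:
        self.rows.append(item.data)
        return item


collector = Collect()
# Hypothetical settings dict: pipeline object -> order number (lower runs first).
PIPELINE_ORDER = {collector: 800, ParsePrice(): 300, StripName(): 100}


def build_chain(order: dict) -> list:
    """Return the pipelines sorted by their order number, lowest first."""
    return [pl for pl, _ in sorted(order.items(), key=lambda kv: kv[1])]


chain = build_chain(PIPELINE_ORDER)
item = Item({"name": " keyboard ", "price": "¥599"})
for pl in chain:
    item = pl.process_item(item)
print([type(pl).__name__ for pl in chain], collector.rows)
```

Leaving gaps between the numbers (100, 300, 800) makes it easy to slot a new pipeline in later without renumbering the others.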

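Step 6: Demonstrate Middleware request interception with mode_middleware

Pain point and mechanism

mode_middleware shows what UAMiddleware does to requests: after passing through the Middleware, every request has a User-Agent in its meta. Scrapy's DOWNLOADER_MIDDLEWARES setting plays the role of this list: lower numbers sit closer to the Scrapy engine, higher numbers closer to the downloader.

Core source (verbatim from the full source at the end)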

def mode_middleware(_: argparse.Namespace) -> None:
    print(f"=== Middleware 拦截演示  [{now_str()}] ===\n")
    mw = UAMiddleware()
    reqs = [Request(url=f"https://example.com/page/{i}") for i in range(3)]
    for req in reqs:
        req = mw.process_request(req)
    print(f"\n所有请求已注入 UA: {reqs[0].meta['User-Agent']}")

Runnable demo (with mock data and print feedback filled in)

from dataclasses import dataclass, field

@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)
class UAMiddleware:
    def process_request(self, request: Request) -> Request:
        request.meta["User-Agent"] = "Mozilla/5.0 (Macintosh) Safari/605.1.15"
        print(f"[UAMiddleware] {request.url} 已加 UA")
        return request

def mode_middleware(_) -> None:
    mw=UAMiddleware()
    reqs=[Request(url=f"https://example.com/page/{i}") for i in range(3)]
    for req in reqs: mw.process_request(req)
    print("第一个请求 UA:", reqs[0].meta["User-Agent"])
mode_middleware(None)
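Stacking a second middleware next to UAMiddleware shows why the interception layer is pluggable. ProxyMiddleware, apply_all, the proxy meta key, and the placeholder proxy address are all assumptions made for this sketch; no request is sent and no proxy is contacted.

```python
from dataclasses import dataclass, field


@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)


class UAMiddleware:
    def process_request(self, request: Request) -> Request:
        request.meta["User-Agent"] = "Mozilla/5.0 (Macintosh) Safari/605.1.15"
        return request


class ProxyMiddleware:  # hypothetical; the proxy address is a placeholder
    def __init__(self, proxy_url: str):
        self.proxy_url = proxy_url

    def process_request(self, request: Request) -> Request:
        request.meta["proxy"] = self.proxy_url
        return request


def apply_all(request: Request, middlewares: list) -> Request:
    """Pass the request through every middleware, in list order."""
    for mw in middlewares:
        request = mw.process_request(request)
    return request


chain = [UAMiddleware(), ProxyMiddleware("http://127.0.0.1:8080")]
req = apply_all(Request("https://example.com/page/0"), chain)
print(req.meta)
```

Adding or removing an entry in chain changes the behaviour of every request, while the code that creates the requests stays untouched.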

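Step 7: Demonstrate the full crawl flow with mode_crawl

Pain point and mechanism

mode_crawl runs the full crawl flow with MiniEngine: it takes URLs from MOCK_PAGES, passes each request through the Middleware, parses the response, passes each Item through the Pipelines, and finally queries the results from the database. The demo shows Scrapy's complete data flow: URL → Request → Middleware → Fetch → Response → parse → Item → Pipeline → storage.

Core source (verbatim from the full source at the end)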

def mode_crawl(_: argparse.Namespace) -> None:
    print(f"=== 完整爬取流程模拟  [{now_str()}] ===\n")
    clean = CleanPipeline()
    db = SQLitePipeline()
    engine = MiniEngine(pipelines=[clean, db], middlewares=[UAMiddleware()])
    urls = list(MOCK_PAGES.keys())
    engine.run(urls)
    rows = [[r[0], f"¥{r[1]:.2f}", r[2]] for r in db.report()]
    print("\n" + ascii_table(["商品名称", "价格", "抓取时间"], rows, title="完整爬取结果"))

Runnable demo (with mock data and print feedback filled in)

from dataclasses import dataclass, field

@dataclass
class Request: url: str; callback: str = "parse"; meta: dict = field(default_factory=dict)
@dataclass
class Item: data: dict
MOCK_PAGES = {
    "page-0": [{"name":"商品-0","price":"¥199"}, {"name":"商品-1","price":"¥398"}],
    "page-1": [{"name":"商品-2","price":"¥597"}],
}

def mode_crawl(_) -> None:
    print("=== 完整爬取流程模拟 ===")
    total=0
    for url, rows in MOCK_PAGES.items():
        print("Request ->", url)
        print("Response ->", len(rows), "条原始数据")
        for row in rows:
            item=Item(row.copy()); total += 1
            print("Pipeline 保存 ->", item.data)
    print("总保存:", total, "条")
mode_crawl(None)
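In a real crawl the parse step usually produces follow-up requests as well as items, and the engine feeds those requests back into its queue. The sketch below illustrates that loop under stated assumptions: MOCK_SITE, parse, and crawl are hypothetical, and the site layout (a "next" key naming the following page) is invented for the example.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)


@dataclass
class Item:
    data: dict


# Hypothetical mock site: each page lists rows and optionally names the next page.
MOCK_SITE = {
    "page-0": {"rows": [{"name": "item-0"}, {"name": "item-1"}], "next": "page-1"},
    "page-1": {"rows": [{"name": "item-2"}], "next": None},
}


def parse(url: str, page: dict):
    """Yield one Item per row, then a Request for the next page if there is one."""
    for row in page["rows"]:
        yield Item({**row, "source": url})
    if page["next"]:
        yield Request(page["next"])


def crawl(start_url: str) -> list[Item]:
    queue, saved = deque([Request(start_url)]), []
    while queue:
        req = queue.popleft()
        for result in parse(req.url, MOCK_SITE[req.url]):
            if isinstance(result, Request):
                queue.append(result)   # follow-up request goes back to the queue
            else:
                saved.append(result)   # an Item would go to the pipelines here
    return saved


items = crawl("page-0")
print([i.data["name"] for i in items])
```

Only the first page is given as a start URL; the second one is discovered during parsing, which is what separates a crawl from a fixed list of downloads.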

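Step 8: Use main as the CLI entry point for the pipeline/middleware/crawl modes

Pain point and mechanism

main uses argparse as the CLI entry point. The three modes map to three levels of learning: pipeline shows the data-processing chain, middleware shows request interception, and crawl shows the full crawl flow. Dictionary dispatch instead of an if/elif chain is a common Python idiom for multi-way branching.

Core source (verbatim from the full source at the end)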

def main() -> None:
    p = argparse.ArgumentParser(description="Scrapy 设计思想演示")
    p.add_argument("--mode", choices=["pipeline", "middleware", "crawl"], default="crawl")
    args = p.parse_args()
    {"pipeline": mode_pipeline, "middleware": mode_middleware, "crawl": mode_crawl}[args.mode](args)

Runnable demo (with mock data and print feedback filled in)

import argparse
import sys

def mode_pipeline(args): print("pipeline 模式: 演示 CleanPipeline -> SQLitePipeline")
def mode_middleware(args): print("middleware 模式: 演示请求经过 UAMiddleware")
def mode_crawl(args): print("crawl 模式: 演示 Request -> Response -> Item -> Pipeline")

def main() -> None:
    parser=argparse.ArgumentParser(description="Scrapy 核心机制迷你复刻")
    parser.add_argument("--mode", choices=["pipeline", "middleware", "crawl"], default="crawl")
    args=parser.parse_args()
    if args.mode == "pipeline": mode_pipeline(args)
    elif args.mode == "middleware": mode_middleware(args)
    else: mode_crawl(args)
for mode in ["pipeline", "middleware", "crawl"]:
    sys.argv=["prog", "--mode", mode]
    print(f">>> python3 28-python-scrapy.py --mode {mode}")
    main()

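Geek in practice: full source and how to run it

Now put the building blocks together: paste the full code below into your editor and run it. Look at the whole loop first, then go back and tweak parameters section by section; that builds engineering intuition faster.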

#!/usr/bin/env python3
"""
27_scrapy_demo.py: Scrapy design ideas demo (no external dependencies)

Usage:
  python3 27_scrapy_demo.py --mode pipeline   # chained Pipeline processing
  python3 27_scrapy_demo.py --mode middleware  # Middleware interception
  python3 27_scrapy_demo.py --mode crawl       # full crawl flow simulation
"""

import argparse
import sqlite3
import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Iterator
from zoneinfo import ZoneInfo

TZ = ZoneInfo("Asia/Shanghai")

def now_str() -> str:
    return datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S")

def ascii_table(headers: list[str], rows: list[list[Any]], title: str = "") -> str:
    col_w = [len(h) for h in headers]
    for row in rows:
        for i, cell in enumerate(row):
            col_w[i] = max(col_w[i], len(str(cell)))
    sep = "+" + "+".join("-" * (w + 2) for w in col_w) + "+"
    fmt = "|" + "|".join(f" {{:<{w}}} " for w in col_w) + "|"
    lines = []
    if title:
        total = sum(col_w) + 3 * len(col_w) + 1
        lines += [sep, f"|{title.center(total - 2)}|"]
    lines += [sep, fmt.format(*headers), sep]
    for row in rows:
        lines.append(fmt.format(*[str(c) for c in row]))
    lines.append(sep)
    return "\n".join(lines)


@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)

@dataclass
class Response:
    url: str
    body: str
    status: int = 200

@dataclass
class Item:
    data: dict

# Pipeline interface
class BasePipeline:
    def open_spider(self, spider_name: str) -> None: pass
    def process_item(self, item: Item) -> Item: return item
    def close_spider(self, spider_name: str) -> None: pass

# Middleware interface
class BaseMiddleware:
    def process_request(self, request: Request) -> Request: return request
    def process_response(self, response: Response) -> Response: return response

# ── Concrete implementations ─────────────────────────────────
class CleanPipeline(BasePipeline):
    def process_item(self, item: Item) -> Item:
        item.data["name"] = item.data.get("name", "").strip()
        price = str(item.data.get("price", "0")).replace("¥", "")
        item.data["price"] = float(price)
        item.data["crawled_at"] = now_str()
        print(f"  [CleanPipeline] 清洗: {item.data['name']} → ¥{item.data['price']:.2f}")
        return item

class SQLitePipeline(BasePipeline):
    def open_spider(self, spider_name: str) -> None:
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute(
            "CREATE TABLE products(name TEXT, price REAL, url TEXT, crawled_at TEXT)"
        )
        print(f"  [SQLitePipeline] 数据库已就绪")

    def process_item(self, item: Item) -> Item:
        d = item.data
        self.conn.execute(
            "INSERT INTO products VALUES(?,?,?,?)",
            (d["name"], d["price"], d.get("url",""), d.get("crawled_at",""))
        )
        self.conn.commit()
        return item

    def report(self) -> list:
        return self.conn.execute(
            "SELECT name, price, crawled_at FROM products ORDER BY price DESC"
        ).fetchall()

class UAMiddleware(BaseMiddleware):
    def process_request(self, request: Request) -> Request:
        request.meta["User-Agent"] = "Mozilla/5.0 (Macintosh) Safari/605.1.15"
        print(f"  [UAMiddleware] 注入 UA → {request.url[:50]}")
        return request

# ── Simplified engine ────────────────────────────────────────
MOCK_PAGES = {
    f"https://mock-shop.example.com/products?page={i}": [
        {"name": f"商品-{i*3+j}", "price": f"¥{(i*3+j+1)*199.0}", "url": f"/p/{i*3+j}"}
        for j in range(3)
    ]
    for i in range(2)
}

class MiniEngine:
    def __init__(self, pipelines: list[BasePipeline], middlewares: list[BaseMiddleware]):
        self.pipelines = pipelines
        self.middlewares = middlewares

    def _fetch(self, request: Request) -> Response:
        for mw in self.middlewares:
            request = mw.process_request(request)
        body = str(MOCK_PAGES.get(request.url, []))
        resp = Response(url=request.url, body=body)
        for mw in reversed(self.middlewares):
            resp = mw.process_response(resp)
        return resp

    def _process_item(self, item: Item) -> Item:
        for pl in self.pipelines:
            item = pl.process_item(item)
        return item

    def run(self, start_urls: list[str]) -> None:
        for pl in self.pipelines:
            pl.open_spider("shop")
        queue = [Request(url=u) for u in start_urls]
        while queue:
            req = queue.pop(0)
            resp = self._fetch(req)
            import ast
            items_data = ast.literal_eval(resp.body)
            for d in items_data:
                self._process_item(Item(data=d.copy()))
        for pl in self.pipelines:
            pl.close_spider("shop")

# ── CLI ──────────────────────────────────────────────────────
def mode_pipeline(_: argparse.Namespace) -> None:
    print(f"=== Pipeline 链式处理演示  [{now_str()}] ===\n")
    clean = CleanPipeline()
    db = SQLitePipeline()
    db.open_spider("demo")
    items = [
        Item({"name": " 机械键盘 Pro X ", "price": "¥599.0", "url": "/p/1"}),
        Item({"name": "4K 显示器 27寸", "price": "¥2199.0", "url": "/p/2"}),
    ]
    for item in items:
        item = clean.process_item(item)
        db.process_item(item)
    rows = [[r[0], f"¥{r[1]:.2f}", r[2]] for r in db.report()]
    print("\n" + ascii_table(["商品名称", "价格", "抓取时间"], rows, title="Pipeline 处理结果"))

def mode_middleware(_: argparse.Namespace) -> None:
    print(f"=== Middleware 拦截演示  [{now_str()}] ===\n")
    mw = UAMiddleware()
    reqs = [Request(url=f"https://example.com/page/{i}") for i in range(3)]
    for req in reqs:
        req = mw.process_request(req)
    print(f"\n所有请求已注入 UA: {reqs[0].meta['User-Agent']}")

def mode_crawl(_: argparse.Namespace) -> None:
    print(f"=== 完整爬取流程模拟  [{now_str()}] ===\n")
    clean = CleanPipeline()
    db = SQLitePipeline()
    engine = MiniEngine(pipelines=[clean, db], middlewares=[UAMiddleware()])
    urls = list(MOCK_PAGES.keys())
    engine.run(urls)
    rows = [[r[0], f"¥{r[1]:.2f}", r[2]] for r in db.report()]
    print("\n" + ascii_table(["商品名称", "价格", "抓取时间"], rows, title="完整爬取结果"))

def main() -> None:
    p = argparse.ArgumentParser(description="Scrapy 设计思想演示")
    p.add_argument("--mode", choices=["pipeline", "middleware", "crawl"], default="crawl")
    args = p.parse_args()
    {"pipeline": mode_pipeline, "middleware": mode_middleware, "crawl": mode_crawl}[args.mode](args)

if __name__ == "__main__":
    main()

Sample run:

$ python3 28-python-scrapy.py --mode pipeline

=== Pipeline 链式处理演示  [2026-04-18 04:56:47] ===
  [SQLitePipeline] 数据库已就绪
  [CleanPipeline] 清洗: MacBook Pro M3 → ¥14999.00
  [CleanPipeline] 清洗: iPhone 15 Pro → ¥8999.00
  [CleanPipeline] 清洗: AirPods Pro → ¥1799.00
  [SQLitePipeline] 已关闭,共保存 3
$ python3 28-python-scrapy.py --mode crawl

=== 完整爬取流程模拟  [2026-04-18 04:56:47] ===
  [SQLitePipeline] 数据库已就绪
  [UAMiddleware] 注入 UA → https://mock.shop/page/1
  [CleanPipeline] 清洗: MacBook Pro M3 → ¥14999.00
  [CleanPipeline] 清洗: iPhone 15 Pro → ¥8999.00
  [UAMiddleware] 注入 UA → https://mock.shop/page/2
  [CleanPipeline] 清洗: AirPods Pro → ¥1799.00
  [SQLitePipeline] 已关闭,共保存 3

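Summary

Concept                  One-line reminder
─────────────────────────────────────────────────────
Request                  Wraps URL + callback + meta; carries context from request to parsing
Response                 Wraps response body + status code; handed to Spider.parse
Item                     Thin wrapper around a single data dict, so Pipelines treat all items alike
BasePipeline             Base class with three no-op hooks: open_spider/process_item/close_spider
BaseMiddleware           Base class with two pass-through hooks: process_request/process_response
MiniEngine               Scheduling engine; assembles Pipeline + Middleware and runs them in order
Chain of responsibility  Each handler cares only about its own logic, not about the others on the chain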

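⏱ NexDo Time (5 minutes)

Challenge: add a DuplicateFilterMiddleware to MiniEngine that filters out duplicate URLs.

Steps:

  1. Subclass BaseMiddleware and initialise self._seen: set[str] = set() in __init__
  2. In process_request, check whether request.url is in _seen: if it is, set request.meta["skip"] = True; if not, add it to _seen
  3. In MiniEngine._fetch, check request.meta.get("skip") and return an empty Response right away when it is True
  4. Verify: pass in the same URL twice; the second one should be skipped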
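One possible shape for the challenge is sketched below; try it yourself first. The standalone fetch function stands in for MiniEngine._fetch and is an assumption of this sketch, as is the choice of "[]" as the empty body, picked so that the ast.literal_eval call in run would still parse it.

```python
from dataclasses import dataclass, field


@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)


@dataclass
class Response:
    url: str
    body: str
    status: int = 200


class DuplicateFilterMiddleware:
    def __init__(self) -> None:
        self._seen: set[str] = set()

    def process_request(self, request: Request) -> Request:
        if request.url in self._seen:
            request.meta["skip"] = True   # step 2: flag the repeat
        else:
            self._seen.add(request.url)
        return request

    def process_response(self, response: Response) -> Response:
        return response


def fetch(request: Request, middlewares: list, pages: dict) -> Response:
    """Simplified stand-in for MiniEngine._fetch with the skip check from step 3."""
    for mw in middlewares:
        request = mw.process_request(request)
    if request.meta.get("skip"):
        return Response(url=request.url, body="[]")  # empty list, still parseable
    return Response(url=request.url, body=str(pages.get(request.url, [])))


pages = {"https://example.com/a": [{"name": "item-a"}]}
mws = [DuplicateFilterMiddleware()]
first = fetch(Request("https://example.com/a"), mws, pages)
second = fetch(Request("https://example.com/a"), mws, pages)
print(first.body, "|", second.body)
```

The filter keeps its state on the middleware instance, so the same instance has to be reused for the whole crawl.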

Don’t wait for next time, do it in the next moment.