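28 · The Scrapy Framework: Pipelines, Middleware, and the Scheduling Engine
🔗 Knowledge-graph navigation: before reading this article, it helps to know the crawler-framework basics from "25 · Web Crawlers: HTTP Fetching and Data Extraction". This article maps that hand-written framework onto Scrapy's architecture and reproduces Scrapy's core mechanisms in pure Python, so that picking up real Scrapy afterwards takes far less effort.
Runtime: Python 3.12+ standard library only, no extra dependencies, runs as-is. (On Windows, zoneinfo may additionally need the tzdata package for the time-zone database.)
Geek's take: the core of Scrapy is a single pipeline: Request → Middleware → Fetch → Response → Spider.parse → Item → Pipeline → storage. Once that flow is clear, every Scrapy setting has an obvious place in it.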
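Scrapy architecture mapping
Scrapy component        This article     Responsibility
──────────────────────────────────────────────────────────────────────────
scrapy.Request          Request          Wraps URL + callback + meta
scrapy.Response         Response         Wraps response body + status code
scrapy.Item             Item             Wraps the scraped result dict
ItemPipeline            BasePipeline     Item processing chain (cleaning/storage)
DownloaderMiddleware    BaseMiddleware   Request/response interception (UA/proxy)
Crawler                 MiniEngine       Scheduler (assembles and runs everything)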
Pipeline execution order
Item
  │
  ▼  CleanPipeline.process_item()    ← cleaning: strip whitespace, convert types
  │
  ▼  SQLitePipeline.process_item()   ← persistence: write to the database
  │
  ▼  stored
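Step by Step: Breaking Down the Core Logic
The core of this article is Scrapy's three-layer structure: data models (Request/Response/Item) → processing chains (Pipeline/Middleware) → scheduling engine (MiniEngine). Each step below focuses on one mechanism and runs as-is with no dependencies.
Step 1: Define the three data models Request/Response/Item with @dataclass
Pain point and mechanism:
Request/Response/Item are simplified versions of Scrapy's three main data models. Request wraps a URL and the name of a callback; its meta dict carries context (such as page number or category) from the request to the parsing step. Response wraps the response body and status code. Item is a thin wrapper with a single data dict, so that pipelines can handle scraped results of any shape uniformly.
Core source (excerpted from the full listing at the end of this article):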
@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)

@dataclass
class Response:
    url: str
    body: str
    status: int = 200

@dataclass
class Item:
    data: dict
Runnable demo (with mock data and print feedback filled in):
from dataclasses import dataclass, field

@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)

@dataclass
class Response:
    url: str
    body: str
    status: int = 200

@dataclass
class Item:
    data: dict

req = Request(url="https://mock-shop.example.com/products?page=1", meta={"page": 1})
resp = Response(url=req.url, body="[{'name': 'keyboard'}]", status=200)
item = Item(data={"name": "Mechanical Keyboard", "price": "¥599"})
print("Request describes where to fetch:", req)
print("Response carries the download result:", resp.status, resp.url)
print("Item carries the structured data:", item.data)
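The models above store the callback as a name and the context in meta, but nothing in Step 1 consumes them yet. As a minimal sketch of how the two fields could work together, assume a hypothetical DemoSpider whose method names match the callback values and a hypothetical dispatch helper that looks the method up with getattr. (In Scrapy itself the callback is a callable rather than a string, and the context is read back through response.meta.)

```python
from dataclasses import dataclass, field


@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)


@dataclass
class Response:
    url: str
    body: str
    status: int = 200


class DemoSpider:
    """Hypothetical spider; its method names match Request.callback values."""

    def parse(self, response: Response, meta: dict) -> str:
        return f"list page {meta.get('page')} at {response.url}"

    def parse_detail(self, response: Response, meta: dict) -> str:
        return f"detail page in category {meta.get('category')}"


def dispatch(spider: DemoSpider, request: Request, response: Response) -> str:
    # Look the callback up by name; meta carries the request's context along.
    handler = getattr(spider, request.callback)
    return handler(response, request.meta)


spider = DemoSpider()
list_req = Request(url="https://mock-shop.example.com/products?page=1", meta={"page": 1})
detail_req = Request(
    url="https://mock-shop.example.com/p/1",
    callback="parse_detail",
    meta={"category": "keyboards"},
)

results = [
    dispatch(spider, req, Response(url=req.url, body="[]"))
    for req in (list_req, detail_req)
]
for line in results:
    print(line)
```

Storing a name instead of a function keeps Request a plain data object, at the cost of a getattr lookup that fails at dispatch time if the name is misspelled.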
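Step 2: Build the item processing chain with CleanPipeline + SQLitePipeline
Pain point and mechanism:
CleanPipeline cleans the data: it strips whitespace around the product name, converts the price string to a float, and adds a crawl timestamp. SQLitePipeline handles persistence: open_spider creates the database connection and table when the spider starts, and process_item writes each item to SQLite. close_spider is the hook where a pipeline would normally close the connection when the spider finishes; in this demo SQLitePipeline does not override it, so the in-memory database stays open and report() can still query it afterwards. The chain design lets each pipeline do exactly one thing, in line with the single-responsibility principle.
Core source (excerpted from the full listing at the end of this article):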
class CleanPipeline(BasePipeline):
    def process_item(self, item: Item) -> Item:
        item.data["name"] = item.data.get("name", "").strip()
        price = str(item.data.get("price", "0")).replace("¥", "")
        item.data["price"] = float(price)
        item.data["crawled_at"] = now_str()
        print(f"  [CleanPipeline] cleaned: {item.data['name']} → ¥{item.data['price']:.2f}")
        return item

class SQLitePipeline(BasePipeline):
    def open_spider(self, spider_name: str) -> None:
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute(
            "CREATE TABLE products(name TEXT, price REAL, url TEXT, crawled_at TEXT)"
        )
        print("  [SQLitePipeline] database ready")

    def process_item(self, item: Item) -> Item:
        d = item.data
        self.conn.execute(
            "INSERT INTO products VALUES(?,?,?,?)",
            (d["name"], d["price"], d.get("url", ""), d.get("crawled_at", ""))
        )
        self.conn.commit()
        return item

    def report(self) -> list:
        return self.conn.execute(
            "SELECT name, price, crawled_at FROM products ORDER BY price DESC"
        ).fetchall()
Runnable demo (with mock data and print feedback filled in):
import sqlite3
from dataclasses import dataclass
from datetime import datetime
from zoneinfo import ZoneInfo

TZ = ZoneInfo("Asia/Shanghai")

def now_str() -> str:
    return datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S")

@dataclass
class Item:
    data: dict

class CleanPipeline:
    def process_item(self, item: Item) -> Item:
        # Cleaning is like quality inspection before warehousing:
        # strip whitespace, convert the price type, add a timestamp.
        item.data["name"] = item.data.get("name", "").strip()
        item.data["price"] = float(str(item.data.get("price", "0")).replace("¥", ""))
        item.data["crawled_at"] = now_str()
        print("after cleaning:", item.data)
        return item

class SQLitePipeline:
    def open_spider(self, spider_name: str) -> None:
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute("CREATE TABLE products(name TEXT, price REAL, url TEXT, crawled_at TEXT)")
        print("database ready:", spider_name)

    def process_item(self, item: Item) -> Item:
        d = item.data
        self.conn.execute(
            "INSERT INTO products VALUES(?,?,?,?)",
            (d["name"], d["price"], d.get("url", ""), d["crawled_at"]),
        )
        self.conn.commit()
        return item

item = Item({"name": "  Mechanical Keyboard Pro X  ", "price": "¥599.0", "url": "/p/1"})
clean = CleanPipeline()
db = SQLitePipeline()
db.open_spider("demo")
db.process_item(clean.process_item(item))
print("database rows:", db.conn.execute("SELECT name, price FROM products").fetchall())
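SQLitePipeline as shown never overrides close_spider. As a minimal sketch of what closing at the end of a crawl could look like, here is a hypothetical ClosingSQLitePipeline (the class name, the saved counter, and the two-column table are illustration-only assumptions, not part of the article's source). It commits once at the end and then closes the connection.

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class Item:
    data: dict


class ClosingSQLitePipeline:
    """Hypothetical variant of SQLitePipeline that also implements close_spider."""

    def open_spider(self, spider_name: str) -> None:
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute("CREATE TABLE products(name TEXT, price REAL)")
        self.saved = 0

    def process_item(self, item: Item) -> Item:
        self.conn.execute(
            "INSERT INTO products VALUES(?,?)",
            (item.data["name"], item.data["price"]),
        )
        self.saved += 1
        return item

    def close_spider(self, spider_name: str) -> None:
        # Commit once at the end, then release the connection.
        self.conn.commit()
        self.conn.close()
        print(f"[{spider_name}] closed, saved {self.saved} items")


db = ClosingSQLitePipeline()
db.open_spider("demo")
for name, price in [("keyboard", 599.0), ("monitor", 2199.0)]:
    db.process_item(Item({"name": name, "price": price}))
db.close_spider("demo")

# After close(), any further use of the connection raises ProgrammingError.
try:
    db.conn.execute("SELECT 1")
    closed = False
except sqlite3.ProgrammingError:
    closed = True
print("connection closed:", closed)
```

With a ":memory:" database, closing the connection also discards the data, which is one reason a demo that queries results afterwards may prefer to leave the connection open; a file-backed database does not have that constraint.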
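Step 3: Inject a User-Agent into requests with UAMiddleware
Pain point and mechanism:
UAMiddleware writes a User-Agent into the meta dict of every Request. In this mock, _fetch never sends a real HTTP request (it only looks the URL up in MOCK_PAGES), so the value is merely stored; a real downloader would copy it into the HTTP request headers. (Scrapy itself keeps headers on request.headers rather than in meta.) The middleware design makes request handling pluggable: to add a proxy, add a ProxyMiddleware; to add cookies, add a CookieMiddleware; the spider code stays untouched.
Core source (excerpted from the full listing at the end of this article):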
class UAMiddleware(BaseMiddleware):
    def process_request(self, request: Request) -> Request:
        request.meta["User-Agent"] = "Mozilla/5.0 (Macintosh) Safari/605.1.15"
        print(f"  [UAMiddleware] injected UA → {request.url[:50]}")
        return request
Runnable demo (with mock data and print feedback filled in):
from dataclasses import dataclass, field

@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)

@dataclass
class Response:
    url: str
    body: str
    status: int = 200

class BaseMiddleware:
    def process_request(self, request: Request) -> Request:
        return request

    def process_response(self, response: Response) -> Response:
        return response

class UAMiddleware(BaseMiddleware):
    def process_request(self, request: Request) -> Request:
        request.meta["User-Agent"] = "Mozilla/5.0 (Macintosh) Safari/605.1.15"
        print(f"  [UAMiddleware] injected UA → {request.url[:50]}")
        return request

mw = UAMiddleware()
for i in range(2):
    req = mw.process_request(Request(url=f"https://example.com/page/{i}"))
    print("request meta:", req.meta)
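UAMiddleware only stores the User-Agent in meta; nothing in the mock sends it anywhere. As a sketch of that missing last hop, assume a hypothetical helper to_http_request that copies the value into a standard-library urllib.request.Request. It only builds the request object and performs no network I/O.

```python
import urllib.request
from dataclasses import dataclass, field


@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)


def to_http_request(request: Request) -> urllib.request.Request:
    """Hypothetical helper: copy the UA stored in meta into real HTTP headers."""
    headers = {}
    ua = request.meta.get("User-Agent")
    if ua:
        headers["User-Agent"] = ua
    # Only builds the request object; nothing is sent over the network here.
    return urllib.request.Request(request.url, headers=headers)


req = Request(url="https://example.com/page/0")
req.meta["User-Agent"] = "Mozilla/5.0 (Macintosh) Safari/605.1.15"
http_req = to_http_request(req)

# urllib normalizes header names with str.capitalize(), hence "User-agent".
print(http_req.full_url)
print(http_req.get_header("User-agent"))
```

Keeping this conversion in one place means the middlewares never need to know which HTTP client ends up sending the request.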
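Step 4: Assemble the Pipeline + Middleware execution chain with MiniEngine
Pain point and mechanism:
MiniEngine is the scheduling engine: __init__ takes the pipelines and middlewares lists, and run calls open_spider → crawl loop → close_spider in that order. _fetch passes the request through every middleware in turn and then passes the response back through them in reverse order; _process_item passes the Item through every pipeline in turn. This "pass through each in turn" pattern is the Chain of Responsibility pattern: each handler cares only about its own logic and does not need to know what else is on the chain.
Core source (excerpted from the full listing at the end of this article):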
class MiniEngine:
    def __init__(self, pipelines: list[BasePipeline], middlewares: list[BaseMiddleware]):
        self.pipelines = pipelines
        self.middlewares = middlewares

    def _fetch(self, request: Request) -> Response:
        for mw in self.middlewares:
            request = mw.process_request(request)
        body = str(MOCK_PAGES.get(request.url, []))
        resp = Response(url=request.url, body=body)
        for mw in reversed(self.middlewares):
            resp = mw.process_response(resp)
        return resp

    def _process_item(self, item: Item) -> Item:
        for pl in self.pipelines:
            item = pl.process_item(item)
        return item

    def run(self, start_urls: list[str]) -> None:
        import ast  # used only here, to parse the mock response body
        for pl in self.pipelines:
            pl.open_spider("shop")
        queue = [Request(url=u) for u in start_urls]
        while queue:
            req = queue.pop(0)
            resp = self._fetch(req)
            items_data = ast.literal_eval(resp.body)
            for d in items_data:
                self._process_item(Item(data=d.copy()))
        for pl in self.pipelines:
            pl.close_spider("shop")
Runnable demo (with mock data and print feedback filled in):
import ast
from dataclasses import dataclass, field

@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)

@dataclass
class Response:
    url: str
    body: str
    status: int = 200

@dataclass
class Item:
    data: dict

MOCK_PAGES = {
    "https://mock-shop.example.com/products?page=0": [
        {"name": "Product-1", "price": "¥199.0", "url": "/p/1"}
    ]
}

class PrintPipeline:
    def open_spider(self, name):
        print("open_spider:", name)

    def process_item(self, item: Item) -> Item:
        print("process_item:", item.data)
        return item

    def close_spider(self, name):
        print("close_spider:", name)

class PrintMiddleware:
    def process_request(self, request: Request) -> Request:
        print("process_request:", request.url)
        return request

    def process_response(self, response: Response) -> Response:
        print("process_response:", response.status)
        return response

class MiniEngine:
    def __init__(self, pipelines, middlewares):
        self.pipelines = pipelines
        self.middlewares = middlewares

    def _fetch(self, request: Request) -> Response:
        for mw in self.middlewares:
            request = mw.process_request(request)
        resp = Response(request.url, str(MOCK_PAGES.get(request.url, [])))
        for mw in reversed(self.middlewares):
            resp = mw.process_response(resp)
        return resp

    def _process_item(self, item: Item) -> Item:
        for pl in self.pipelines:
            item = pl.process_item(item)
        return item

    def run(self, urls: list[str]) -> None:
        for pl in self.pipelines:
            pl.open_spider("shop")
        for url in urls:
            resp = self._fetch(Request(url))
            for data in ast.literal_eval(resp.body):
                self._process_item(Item(data.copy()))
        for pl in self.pipelines:
            pl.close_spider("shop")

MiniEngine([PrintPipeline()], [PrintMiddleware()]).run(list(MOCK_PAGES.keys()))
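With a single middleware the reversed() call in _fetch is invisible. A small sketch with two middlewares makes the ordering observable; TagMiddleware, the calls log, and the standalone fetch function are hypothetical names introduced only for this illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Request:
    url: str
    meta: dict = field(default_factory=dict)


@dataclass
class Response:
    url: str
    body: str
    status: int = 200


calls: list[str] = []  # records the order in which hooks fire


class TagMiddleware:
    """Hypothetical middleware that only logs when its hooks are called."""

    def __init__(self, tag: str) -> None:
        self.tag = tag

    def process_request(self, request: Request) -> Request:
        calls.append(f"{self.tag}.request")
        return request

    def process_response(self, response: Response) -> Response:
        calls.append(f"{self.tag}.response")
        return response


def fetch(request: Request, middlewares: list[TagMiddleware]) -> Response:
    for mw in middlewares:  # list order on the way in
        request = mw.process_request(request)
    response = Response(url=request.url, body="[]")  # stand-in for the download
    for mw in reversed(middlewares):  # reverse order on the way out
        response = mw.process_response(response)
    return response


fetch(Request("https://example.com/"), [TagMiddleware("A"), TagMiddleware("B")])
print(calls)  # ['A.request', 'B.request', 'B.response', 'A.response']
```

The result is an onion: the first middleware in the list sees the request first and the response last, so whatever it sets up on the way in, it can also undo or inspect on the way out.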
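Step 5: Demonstrate the chained Pipeline flow with mode_pipeline
Pain point and mechanism:
mode_pipeline builds a CleanPipeline + SQLitePipeline chain, processes a batch of mock products, and finally queries the database to verify the result. The demo walks through the pipeline lifecycle: open_spider → repeated process_item calls (close_spider is not called here, so the in-memory database stays open for the final query). Scrapy's ITEM_PIPELINES setting plays the role of this list: it is a dict that maps each pipeline's class path to an integer, and items pass through the pipelines from the lowest value to the highest.
Core source (excerpted from the full listing at the end of this article):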
def mode_pipeline(_: argparse.Namespace) -> None:
    print(f"=== Pipeline chain demo [{now_str()}] ===\n")
    clean = CleanPipeline()
    db = SQLitePipeline()
    db.open_spider("demo")
    items = [
        Item({"name": "  Mechanical Keyboard Pro X  ", "price": "¥599.0", "url": "/p/1"}),
        Item({"name": "4K Monitor 27-inch", "price": "¥2199.0", "url": "/p/2"}),
    ]
    for item in items:
        item = clean.process_item(item)
        db.process_item(item)
    rows = [[r[0], f"¥{r[1]:.2f}", r[2]] for r in db.report()]
    print("\n" + ascii_table(["Product", "Price", "Crawled at"], rows, title="Pipeline results"))
Runnable demo (with mock data and print feedback filled in):
import sqlite3
from dataclasses import dataclass
from datetime import datetime
from zoneinfo import ZoneInfo

TZ = ZoneInfo("Asia/Shanghai")

def now_str() -> str:
    return datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S")

@dataclass
class Item:
    data: dict

class CleanPipeline:
    def process_item(self, item: Item) -> Item:
        item.data["name"] = item.data["name"].strip()
        item.data["price"] = float(item.data["price"].replace("¥", ""))
        item.data["crawled_at"] = now_str()
        return item

class SQLitePipeline:
    def open_spider(self, name: str) -> None:
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute("CREATE TABLE products(name TEXT, price REAL, crawled_at TEXT)")

    def process_item(self, item: Item) -> Item:
        d = item.data
        self.conn.execute("INSERT INTO products VALUES(?,?,?)", (d["name"], d["price"], d["crawled_at"]))
        self.conn.commit()
        return item

    def report(self) -> list:
        return self.conn.execute("SELECT name, price FROM products ORDER BY price DESC").fetchall()

def mode_pipeline(_) -> None:
    clean = CleanPipeline()
    db = SQLitePipeline()
    db.open_spider("demo")
    for raw in [Item({"name": "  Mechanical Keyboard  ", "price": "¥599"}), Item({"name": "Monitor", "price": "¥2199"})]:
        db.process_item(clean.process_item(raw))
    for name, price in db.report():
        print(f"{name}: ¥{price:.2f}")

mode_pipeline(None)
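The demos above fix the pipeline order by hand. To illustrate how a number per component can decide the order instead, here is a sketch under stated assumptions: PIPELINE_ORDER and build_chain are hypothetical names, the values 300 and 800 are arbitrary, and the registry maps classes directly rather than the dotted class paths Scrapy's ITEM_PIPELINES uses.

```python
from dataclasses import dataclass


@dataclass
class Item:
    data: dict


class StripPipeline:
    def process_item(self, item: Item) -> Item:
        item.data["name"] = item.data["name"].strip()
        return item


class UpperPipeline:
    def process_item(self, item: Item) -> Item:
        item.data["name"] = item.data["name"].upper()
        return item


# Hypothetical registry in the spirit of ITEM_PIPELINES: component -> order value.
# The numbers 300 and 800 are arbitrary illustration values.
PIPELINE_ORDER = {UpperPipeline: 800, StripPipeline: 300}


def build_chain(order: dict) -> list:
    # Lower value runs first, regardless of the dict's insertion order.
    return [cls() for cls, _ in sorted(order.items(), key=lambda kv: kv[1])]


chain = build_chain(PIPELINE_ORDER)
item = Item({"name": "  keyboard  "})
for pipeline in chain:
    item = pipeline.process_item(item)

print([type(p).__name__ for p in chain])  # ['StripPipeline', 'UpperPipeline']
print(repr(item.data["name"]))            # 'KEYBOARD'
```

Leaving gaps between the numbers makes it possible to slot a new pipeline in between two existing ones without renumbering anything.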
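Step 6: Demonstrate request interception with mode_middleware
Pain point and mechanism:
mode_middleware shows the effect of UAMiddleware on requests: after passing through the middleware, every request carries a User-Agent in its meta. Scrapy's DOWNLOADER_MIDDLEWARES setting plays the role of this list: it is a dict that maps class paths to integers, where lower values sit closer to the Scrapy engine and higher values sit closer to the downloader.
Core source (excerpted from the full listing at the end of this article):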
def mode_middleware(_: argparse.Namespace) -> None:
    print(f"=== Middleware interception demo [{now_str()}] ===\n")
    mw = UAMiddleware()
    reqs = [Request(url=f"https://example.com/page/{i}") for i in range(3)]
    for req in reqs:
        mw.process_request(req)  # mutates req.meta in place
    print(f"\nAll requests now carry UA: {reqs[0].meta['User-Agent']}")
Runnable demo (with mock data and print feedback filled in):
from dataclasses import dataclass, field

@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)

class UAMiddleware:
    def process_request(self, request: Request) -> Request:
        request.meta["User-Agent"] = "Mozilla/5.0 (Macintosh) Safari/605.1.15"
        print(f"[UAMiddleware] UA added to {request.url}")
        return request

def mode_middleware(_) -> None:
    mw = UAMiddleware()
    reqs = [Request(url=f"https://example.com/page/{i}") for i in range(3)]
    for req in reqs:
        mw.process_request(req)
    print("UA of the first request:", reqs[0].meta["User-Agent"])

mode_middleware(None)
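Step 7: Demonstrate the full crawl flow with mode_crawl
Pain point and mechanism:
mode_crawl runs the full crawl flow through MiniEngine: it takes the URLs from MOCK_PAGES, passes each request through the middleware, parses the response, passes each Item through the pipelines, and finally queries the results from the database. The demo shows Scrapy's complete data flow: URL → Request → Middleware → Fetch → Response → parse → Item → Pipeline → storage.
Core source (excerpted from the full listing at the end of this article):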
def mode_crawl(_: argparse.Namespace) -> None:
    print(f"=== Full crawl flow simulation [{now_str()}] ===\n")
    clean = CleanPipeline()
    db = SQLitePipeline()
    engine = MiniEngine(pipelines=[clean, db], middlewares=[UAMiddleware()])
    urls = list(MOCK_PAGES.keys())
    engine.run(urls)
    rows = [[r[0], f"¥{r[1]:.2f}", r[2]] for r in db.report()]
    print("\n" + ascii_table(["Product", "Price", "Crawled at"], rows, title="Full crawl results"))
Runnable demo (with mock data and print feedback filled in; a stripped-down walk through the flow without MiniEngine):
from dataclasses import dataclass, field

@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)

@dataclass
class Item:
    data: dict

MOCK_PAGES = {
    "page-0": [{"name": "Product-0", "price": "¥199"}, {"name": "Product-1", "price": "¥398"}],
    "page-1": [{"name": "Product-2", "price": "¥597"}],
}

def mode_crawl(_) -> None:
    print("=== Full crawl flow simulation ===")
    total = 0
    for url, rows in MOCK_PAGES.items():
        req = Request(url=url)
        print("Request ->", req.url)
        print("Response ->", len(rows), "raw records")
        for row in rows:
            item = Item(row.copy())
            total += 1
            print("Pipeline saved ->", item.data)
    print("total saved:", total)

mode_crawl(None)
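Step 8: Use main as the single CLI entry point for the pipeline/middleware/crawl modes
Pain point and mechanism:
main uses argparse as the CLI entry point. The three modes correspond to three learning levels: pipeline shows the item processing chain, middleware shows request interception, and crawl shows the full crawl flow. Dispatching through a dict instead of an if/elif chain is a common Python idiom for multi-way branching.
Core source (excerpted from the full listing at the end of this article):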
def main() -> None:
    p = argparse.ArgumentParser(description="Demo of Scrapy's design ideas")
    p.add_argument("--mode", choices=["pipeline", "middleware", "crawl"], default="crawl")
    args = p.parse_args()
    {"pipeline": mode_pipeline, "middleware": mode_middleware, "crawl": mode_crawl}[args.mode](args)
Runnable demo (with mock data and print feedback filled in):
import argparse
import sys

def mode_pipeline(args):
    print("pipeline mode: CleanPipeline -> SQLitePipeline demo")

def mode_middleware(args):
    print("middleware mode: requests pass through UAMiddleware")

def mode_crawl(args):
    print("crawl mode: Request -> Response -> Item -> Pipeline demo")

def main() -> None:
    parser = argparse.ArgumentParser(description="Mini replica of Scrapy's core mechanisms")
    parser.add_argument("--mode", choices=["pipeline", "middleware", "crawl"], default="crawl")
    args = parser.parse_args()
    # Dict dispatch: look the handler up by mode name instead of an if/elif chain.
    handlers = {"pipeline": mode_pipeline, "middleware": mode_middleware, "crawl": mode_crawl}
    handlers[args.mode](args)

# Simulate three command-line invocations by rewriting sys.argv.
for mode in ["pipeline", "middleware", "crawl"]:
    sys.argv = ["prog", "--mode", mode]
    print(f">>> python3 28-python-scrapy.py --mode {mode}")
    main()
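Geek in Action: Full Source and Running It
Now put the building blocks above together: paste the full code below into your editor and run it. Look at the whole loop first, then go back and tweak parameters section by section; that builds engineering intuition faster.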
#!/usr/bin/env python3
"""
28-python-scrapy.py: a demo of Scrapy's design ideas (no external dependencies)
Usage:
    python3 28-python-scrapy.py --mode pipeline    # chained Pipeline demo
    python3 28-python-scrapy.py --mode middleware  # Middleware interception demo
    python3 28-python-scrapy.py --mode crawl       # full crawl flow simulation
"""
import argparse
import sqlite3
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
from zoneinfo import ZoneInfo

TZ = ZoneInfo("Asia/Shanghai")

def now_str() -> str:
    return datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S")

def ascii_table(headers: list[str], rows: list[list[Any]], title: str = "") -> str:
    # Column width = widest cell (or header) in that column.
    col_w = [len(h) for h in headers]
    for row in rows:
        for i, cell in enumerate(row):
            col_w[i] = max(col_w[i], len(str(cell)))
    sep = "+" + "+".join("-" * (w + 2) for w in col_w) + "+"
    fmt = "|" + "|".join(f" {{:<{w}}} " for w in col_w) + "|"
    lines = []
    if title:
        total = sum(col_w) + 3 * len(col_w) + 1
        lines += [sep, f"|{title.center(total - 2)}|"]
    lines += [sep, fmt.format(*headers), sep]
    for row in rows:
        lines.append(fmt.format(*[str(c) for c in row]))
    lines.append(sep)
    return "\n".join(lines)

@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)

@dataclass
class Response:
    url: str
    body: str
    status: int = 200

@dataclass
class Item:
    data: dict

# Pipeline interface
class BasePipeline:
    def open_spider(self, spider_name: str) -> None:
        pass

    def process_item(self, item: Item) -> Item:
        return item

    def close_spider(self, spider_name: str) -> None:
        pass

# Middleware interface
class BaseMiddleware:
    def process_request(self, request: Request) -> Request:
        return request

    def process_response(self, response: Response) -> Response:
        return response

# ── Concrete implementations ─────────────────────────────────
class CleanPipeline(BasePipeline):
    def process_item(self, item: Item) -> Item:
        item.data["name"] = item.data.get("name", "").strip()
        price = str(item.data.get("price", "0")).replace("¥", "")
        item.data["price"] = float(price)
        item.data["crawled_at"] = now_str()
        print(f"  [CleanPipeline] cleaned: {item.data['name']} → ¥{item.data['price']:.2f}")
        return item

class SQLitePipeline(BasePipeline):
    def open_spider(self, spider_name: str) -> None:
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute(
            "CREATE TABLE products(name TEXT, price REAL, url TEXT, crawled_at TEXT)"
        )
        print("  [SQLitePipeline] database ready")

    def process_item(self, item: Item) -> Item:
        d = item.data
        self.conn.execute(
            "INSERT INTO products VALUES(?,?,?,?)",
            (d["name"], d["price"], d.get("url", ""), d.get("crawled_at", ""))
        )
        self.conn.commit()
        return item

    def report(self) -> list:
        return self.conn.execute(
            "SELECT name, price, crawled_at FROM products ORDER BY price DESC"
        ).fetchall()

class UAMiddleware(BaseMiddleware):
    def process_request(self, request: Request) -> Request:
        request.meta["User-Agent"] = "Mozilla/5.0 (Macintosh) Safari/605.1.15"
        print(f"  [UAMiddleware] injected UA → {request.url[:50]}")
        return request

# ── Simplified engine ────────────────────────────────────────
# Two mock pages with three products each, keyed by URL.
MOCK_PAGES = {
    f"https://mock-shop.example.com/products?page={i}": [
        {"name": f"Product-{i*3+j}", "price": f"¥{(i*3+j+1)*199.0}", "url": f"/p/{i*3+j}"}
        for j in range(3)
    ]
    for i in range(2)
}

class MiniEngine:
    def __init__(self, pipelines: list[BasePipeline], middlewares: list[BaseMiddleware]):
        self.pipelines = pipelines
        self.middlewares = middlewares

    def _fetch(self, request: Request) -> Response:
        for mw in self.middlewares:
            request = mw.process_request(request)
        body = str(MOCK_PAGES.get(request.url, []))
        resp = Response(url=request.url, body=body)
        for mw in reversed(self.middlewares):
            resp = mw.process_response(resp)
        return resp

    def _process_item(self, item: Item) -> Item:
        for pl in self.pipelines:
            item = pl.process_item(item)
        return item

    def run(self, start_urls: list[str]) -> None:
        import ast  # used only here, to parse the mock response body
        for pl in self.pipelines:
            pl.open_spider("shop")
        queue = [Request(url=u) for u in start_urls]
        while queue:
            req = queue.pop(0)
            resp = self._fetch(req)
            items_data = ast.literal_eval(resp.body)
            for d in items_data:
                self._process_item(Item(data=d.copy()))
        for pl in self.pipelines:
            pl.close_spider("shop")

# ── CLI ──────────────────────────────────────────────────────
def mode_pipeline(_: argparse.Namespace) -> None:
    print(f"=== Pipeline chain demo [{now_str()}] ===\n")
    clean = CleanPipeline()
    db = SQLitePipeline()
    db.open_spider("demo")
    items = [
        Item({"name": "  Mechanical Keyboard Pro X  ", "price": "¥599.0", "url": "/p/1"}),
        Item({"name": "4K Monitor 27-inch", "price": "¥2199.0", "url": "/p/2"}),
    ]
    for item in items:
        item = clean.process_item(item)
        db.process_item(item)
    rows = [[r[0], f"¥{r[1]:.2f}", r[2]] for r in db.report()]
    print("\n" + ascii_table(["Product", "Price", "Crawled at"], rows, title="Pipeline results"))

def mode_middleware(_: argparse.Namespace) -> None:
    print(f"=== Middleware interception demo [{now_str()}] ===\n")
    mw = UAMiddleware()
    reqs = [Request(url=f"https://example.com/page/{i}") for i in range(3)]
    for req in reqs:
        mw.process_request(req)  # mutates req.meta in place
    print(f"\nAll requests now carry UA: {reqs[0].meta['User-Agent']}")

def mode_crawl(_: argparse.Namespace) -> None:
    print(f"=== Full crawl flow simulation [{now_str()}] ===\n")
    clean = CleanPipeline()
    db = SQLitePipeline()
    engine = MiniEngine(pipelines=[clean, db], middlewares=[UAMiddleware()])
    urls = list(MOCK_PAGES.keys())
    engine.run(urls)
    rows = [[r[0], f"¥{r[1]:.2f}", r[2]] for r in db.report()]
    print("\n" + ascii_table(["Product", "Price", "Crawled at"], rows, title="Full crawl results"))

def main() -> None:
    p = argparse.ArgumentParser(description="Demo of Scrapy's design ideas")
    p.add_argument("--mode", choices=["pipeline", "middleware", "crawl"], default="crawl")
    args = p.parse_args()
    {"pipeline": mode_pipeline, "middleware": mode_middleware, "crawl": mode_crawl}[args.mode](args)

if __name__ == "__main__":
    main()
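Sample run. Note that this transcript does not match the listing above: the product names, the mock.shop URLs and the final "closed, saved 3 items" line do not occur in that code, which instead ends each mode by printing an ASCII table.
$ python3 28-python-scrapy.py --mode pipeline
=== Pipeline chain demo [2026-04-18 04:56:47] ===
  [SQLitePipeline] database ready
  [CleanPipeline] cleaned: MacBook Pro M3 → ¥14999.00
  [CleanPipeline] cleaned: iPhone 15 Pro → ¥8999.00
  [CleanPipeline] cleaned: AirPods Pro → ¥1799.00
  [SQLitePipeline] closed, saved 3 items
$ python3 28-python-scrapy.py --mode crawl
=== Full crawl flow simulation [2026-04-18 04:56:47] ===
  [SQLitePipeline] database ready
  [UAMiddleware] injected UA → https://mock.shop/page/1
  [CleanPipeline] cleaned: MacBook Pro M3 → ¥14999.00
  [CleanPipeline] cleaned: iPhone 15 Pro → ¥8999.00
  [UAMiddleware] injected UA → https://mock.shop/page/2
  [CleanPipeline] cleaned: AirPods Pro → ¥1799.00
  [SQLitePipeline] closed, saved 3 items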
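Summary
| Concept | One-line reminder |
|---|---|
| Request | Wraps URL + callback + meta; carries context from the request to the parsing step |
| Response | Wraps response body + status code; handed to Spider.parse |
| Item | Thin wrapper with only a data dict, so pipelines can process it uniformly |
| BasePipeline | Base class with three default hooks: open_spider/process_item/close_spider |
| BaseMiddleware | Base class with two pass-through hooks: process_request/process_response |
| MiniEngine | Scheduling engine; assembles pipelines + middlewares and runs them in order |
| Chain of Responsibility | Each handler cares only about its own logic and need not know what else is on the chain |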
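⏱ NexDo Time (5 minutes)
Challenge: add a DuplicateFilterMiddleware to MiniEngine that filters out duplicate URLs.
Steps:
- Subclass BaseMiddleware and initialize self._seen: set[str] = set() in __init__
- In process_request, check whether request.url is in _seen: if it is, set request.meta["skip"] = True; if not, add it to _seen
- In MiniEngine._fetch, check request.meta.get("skip") and return an empty Response right away if it is True
- Verify: pass in the same URL twice; the second one should be skipped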
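One possible shape for the challenge, offered as a sketch rather than the reference answer: the standalone fetch function below stands in for MiniEngine._fetch, and the mock body string is an illustration-only assumption. Try the exercise first if you would rather not see a solution.

```python
from dataclasses import dataclass, field


@dataclass
class Request:
    url: str
    callback: str = "parse"
    meta: dict = field(default_factory=dict)


@dataclass
class Response:
    url: str
    body: str
    status: int = 200


class BaseMiddleware:
    def process_request(self, request: Request) -> Request:
        return request

    def process_response(self, response: Response) -> Response:
        return response


class DuplicateFilterMiddleware(BaseMiddleware):
    def __init__(self) -> None:
        self._seen: set[str] = set()

    def process_request(self, request: Request) -> Request:
        if request.url in self._seen:
            request.meta["skip"] = True  # flag it; the fetch step decides what to do
        else:
            self._seen.add(request.url)
        return request


def fetch(request: Request, middlewares: list[BaseMiddleware]) -> Response:
    """Stand-in for MiniEngine._fetch with the skip check added."""
    for mw in middlewares:
        request = mw.process_request(request)
    if request.meta.get("skip"):
        return Response(url=request.url, body="[]")  # empty result for duplicates
    return Response(url=request.url, body="[{'name': 'demo'}]")


middlewares = [DuplicateFilterMiddleware()]
url = "https://mock-shop.example.com/products?page=0"
first = fetch(Request(url), middlewares)
second = fetch(Request(url), middlewares)
print("first:", first.body)
print("second:", second.body)
```

The empty body is the string "[]" so that an engine parsing it with ast.literal_eval gets an empty list and simply produces no items. The same middleware instance has to be reused across requests, since the seen-set lives on the instance.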
Don’t wait for next time, do it in the next moment.