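46 · Model Tuning: GridSearchCV, Learning Curves, and Model Persistence
🔗 Knowledge-graph navigation: before reading this article, it helps to review cross-validation and Pipeline in "39 · scikit-learn in Practice", and the C and gamma hyperparameters in "45 · SVM Support Vector Machines". This article tackles one question: once you have a model, how do you tune its parameters systematically?
Runtime environment:
pip install numpy scikit-learn joblib. This article generates mock classification data with sklearn and saves models to a tempfile temporary directory, so it does not depend on any external files.
Pain points and architecture: manual tuning is like turning knobs by feel. It is easy to lose track of what you tried, miss combinations, and peek at the test set. Engineered tuning keeps the training, validation, and test data clearly apart: cross-validate inside the training set to pick parameters, use the test set only once at the end, then save the final model for reuse.
Build intuition for model tuning first
GridSearchCV: try every combination when there are few parameters.
RandomizedSearchCV: sample randomly when there are many parameters.
Learning Curve: compare training and validation scores to spot underfitting or overfitting.
Model Persistence: save the trained Pipeline to a file, then load it later and predict directly.
Geek's take: tuning is like testing recipes. Grid search tries every combination of salt, heat, and cooking time; random search samples first to find the general direction; the learning curve is the customer feedback form; model persistence archives the final recipe.
Step by step: breaking down the core logic
This article is split into 7 steps: data splitting, the tuning map, grid search, random search, learning curves, model persistence, and CLI dispatch. Each section runs on its own and prints its results.
Step 1: Prepare the training set and the final test set with make_dataset
Pain point and mechanism:
make_dataset splits the data into a training set and a test set. During tuning you may only compare candidates through cross-validation inside the training set. The test set is like a final exam: you touch it once, at the end. Otherwise the answers leak into your model choices and the evaluation score comes out inflated.
Core source (taken verbatim from the full source at the end):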
def make_dataset() -> tuple:
    X, y = make_classification(
        n_samples=1000, n_features=15, n_informative=8,
        n_redundant=3, random_state=42,
    )
    return train_test_split(X, y, test_size=0.2, random_state=42)
Runnable demo (mock data and print feedback filled in):
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split


def make_dataset() -> tuple:
    X, y = make_classification(
        n_samples=1000, n_features=15, n_informative=8,
        n_redundant=3, random_state=42,
    )
    return train_test_split(X, y, test_size=0.2, random_state=42)


X_train, X_test, y_train, y_test = make_dataset()
print("训练集:", X_train.shape, y_train.shape)
print("测试集:", X_test.shape, y_test.shape)
print("类别分布:", dict(zip(*np.unique(y_train, return_counts=True))))
print("调参只在训练集里做,测试集最后只用一次")
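One thing the split above leaves to chance is the class balance of each part. As a minimal sketch, and an assumption on our side rather than something the article's make_dataset does, passing stratify=y to train_test_split keeps the class proportions roughly equal in the training and test parts. The helper name make_stratified_dataset is hypothetical.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split


def make_stratified_dataset() -> tuple:
    """Hypothetical variant of make_dataset that preserves class ratios."""
    X, y = make_classification(
        n_samples=1000, n_features=15, n_informative=8,
        n_redundant=3, random_state=42,
    )
    # stratify=y asks train_test_split to keep the label proportions
    # of y in both the training part and the test part.
    return train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)


X_train, X_test, y_train, y_test = make_stratified_dataset()
train_ratio = float(np.mean(y_train))  # share of class 1 in the training set
test_ratio = float(np.mean(y_test))    # share of class 1 in the test set
print(f"class-1 share: train={train_ratio:.3f} test={test_ratio:.3f}")
```

On balanced mock data the difference is small either way; stratification matters more when one class is rare.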
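Step 2: Lay out the tuning tools as a map with print_table
Pain point and mechanism:
There are many model-tuning tools, and beginners tend to mix them up. The table works like a route map: GridSearch suits exhaustive search over a small range, RandomSearch suits sampling over a large range, learning curves diagnose problems, and persistence saves the trained result.
Core source (taken verbatim from the full source at the end):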
def print_table(headers: list[str], rows: list[list], widths: list[int]) -> None:
    sep = "┼".join("─" * (w + 2) for w in widths)
    print(f"┌{'┬'.join('─'*(w+2) for w in widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, widths))}│")
    print(f"├{sep}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in widths)}┘")
Runnable demo (mock data and print feedback filled in):
def print_table(headers: list[str], rows: list[list], widths: list[int]) -> None:
    sep = "┼".join("─" * (w + 2) for w in widths)
    print(f"┌{'┬'.join('─'*(w+2) for w in widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, widths))}│")
    print(f"├{sep}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in widths)}┘")


print_table(
    ["方法", "像什么", "适合场景"],
    [
        ["GridSearch", "逐格试菜谱", "参数组合少"],
        ["RandomSearch", "抽样试菜谱", "参数组合大"],
        ["LearningCurve", "体检报告", "诊断欠拟合/过拟合"],
        ["Persist", "模型存档", "训练后复用"],
    ],
    [12, 14, 18],
)
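Step 3: Exhaustively search a small set of parameter combinations with mode_grid
Pain point and mechanism:
GridSearchCV is like trying every recipe combination one by one. It is slow but dependable, and it suits a parameter space that is not large. Note that it runs K-fold cross-validation inside the training set; the test set is evaluated only once, at the end.
Core source (taken verbatim from the full source at the end):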
def mode_grid() -> None:
    section("GridSearchCV — 穷举超参数搜索")
    X_train, X_test, y_train, y_test = make_dataset()
    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("clf", SVC(random_state=42)),
    ])
    param_grid = {
        "clf__kernel": ["linear", "rbf"],
        "clf__C": [0.1, 1, 10],
        "clf__gamma": ["scale", "auto"],
    }
    # 共 2×3×2=12 组,5折 = 60次 fit
    t0 = time.perf_counter()
    gs = GridSearchCV(pipeline, param_grid, cv=5, scoring="accuracy",
                      n_jobs=1, verbose=0)
    gs.fit(X_train, y_train)
    elapsed = time.perf_counter() - t0
    print(f"\n 搜索空间: {len(gs.cv_results_['params'])} 组参数 "
          f"× 5折 = {len(gs.cv_results_['params'])*5} 次 fit")
    print(f" 总耗时: {elapsed:.2f}s\n")
    # Top5 结果
    results = sorted(
        zip(gs.cv_results_["mean_test_score"],
            gs.cv_results_["std_test_score"],
            gs.cv_results_["params"]),
        key=lambda item: item[0],
        reverse=True,
    )[:5]
    rows = [[f"{s:.4f}", f"±{std:.4f}",
             f"kernel={p['clf__kernel']} C={p['clf__C']} gamma={p['clf__gamma']}"]
            for s, std, p in results]
    print_table(["CV均值", "标准差", "参数组合"], rows, [8, 8, 42])
    test_acc = accuracy_score(y_test, gs.predict(X_test))
    print(f"\n 最优参数: {gs.best_params_}")
    print(f" CV最优分: {gs.best_score_:.4f}")
    print(f" 测试准确率: {test_acc:.4f} ← 只用一次!")
Runnable demo (mock data and print feedback filled in):
import time
import numpy as np
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC


def section(title: str) -> None:
    print(f"\n{'='*62}\n {title}\n{'='*62}")


def print_table(headers: list[str], rows: list[list], widths: list[int]) -> None:
    sep = "┼".join("─" * (w + 2) for w in widths)
    print(f"┌{'┬'.join('─'*(w+2) for w in widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, widths))}│")
    print(f"├{sep}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in widths)}┘")


def make_dataset() -> tuple:
    X, y = make_classification(n_samples=240, n_features=10, n_informative=6, n_redundant=2, random_state=42)
    return train_test_split(X, y, test_size=0.2, random_state=42)


def mode_grid() -> None:
    section("GridSearchCV — 穷举超参数搜索")
    X_train, X_test, y_train, y_test = make_dataset()
    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("clf", SVC(random_state=42)),
    ])
    param_grid = {
        "clf__kernel": ["linear", "rbf"],
        "clf__C": [0.1, 1, 10],
        "clf__gamma": ["scale", "auto"],
    }
    # 共 2×3×2=12 组,5折 = 60次 fit
    t0 = time.perf_counter()
    gs = GridSearchCV(pipeline, param_grid, cv=5, scoring="accuracy",
                      n_jobs=1, verbose=0)
    gs.fit(X_train, y_train)
    elapsed = time.perf_counter() - t0
    print(f"\n 搜索空间: {len(gs.cv_results_['params'])} 组参数 "
          f"× 5折 = {len(gs.cv_results_['params'])*5} 次 fit")
    print(f" 总耗时: {elapsed:.2f}s\n")
    # Top5 结果
    results = sorted(
        zip(gs.cv_results_["mean_test_score"],
            gs.cv_results_["std_test_score"],
            gs.cv_results_["params"]),
        key=lambda item: item[0],
        reverse=True,
    )[:5]
    rows = [[f"{s:.4f}", f"±{std:.4f}",
             f"kernel={p['clf__kernel']} C={p['clf__C']} gamma={p['clf__gamma']}"]
            for s, std, p in results]
    print_table(["CV均值", "标准差", "参数组合"], rows, [8, 8, 42])
    test_acc = accuracy_score(y_test, gs.predict(X_test))
    print(f"\n 最优参数: {gs.best_params_}")
    print(f" CV最优分: {gs.best_score_:.4f}")
    print(f" 测试准确率: {test_acc:.4f} ← 只用一次!")


mode_grid()
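A side note on the size of that grid: SVC only uses gamma for the rbf, poly, and sigmoid kernels, so with kernel="linear" the two gamma values give the same model. The sketch below counts the combinations with ParameterGrid and shows one possible alternative, a list of dicts with one sub-grid per kernel. The names flat_grid and split_grid are hypothetical; the article's own code uses the flat dictionary.

```python
from sklearn.model_selection import ParameterGrid

# The flat grid from mode_grid: every kernel is paired with every gamma.
flat_grid = {
    "clf__kernel": ["linear", "rbf"],
    "clf__C": [0.1, 1, 10],
    "clf__gamma": ["scale", "auto"],
}

# Hypothetical alternative: one sub-grid per kernel, so gamma is only
# varied where it actually changes the model.
split_grid = [
    {"clf__kernel": ["linear"], "clf__C": [0.1, 1, 10]},
    {"clf__kernel": ["rbf"], "clf__C": [0.1, 1, 10], "clf__gamma": ["scale", "auto"]},
]

cv = 5
n_flat = len(ParameterGrid(flat_grid))    # 2 * 3 * 2 = 12
n_split = len(ParameterGrid(split_grid))  # 3 + 3 * 2 = 9
print(f"flat grid:  {n_flat} candidates, {n_flat * cv} CV fits")
print(f"split grid: {n_split} candidates, {n_split * cv} CV fits")
```

GridSearchCV accepts the list-of-dicts form directly as param_grid, so the rest of mode_grid would not need to change.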
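Step 4: Sample randomly from a large space with mode_random
Pain point and mechanism:
RandomizedSearchCV is like sampling dishes from a huge cookbook. Many hyperparameters turn out not to matter much, so random search can often find a near-optimal combination with fewer trials. It suits a first coarse pass when the parameter space is large.
Core source (taken verbatim from the full source at the end):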
def mode_random() -> None:
    section("RandomizedSearchCV — 大参数空间随机采样")
    X_train, X_test, y_train, y_test = make_dataset()
    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("clf", RandomForestClassifier(random_state=42)),
    ])
    # 参数空间:4×5×4×3 = 240 组,只采样 30 组
    param_dist: dict[str, Any] = {
        "clf__n_estimators": [50, 100, 200, 300],
        "clf__max_depth": [3, 5, 8, 10, None],
        "clf__min_samples_split": [2, 5, 10, 20],
        "clf__max_features": ["sqrt", "log2", 0.5],
    }
    t0 = time.perf_counter()
    rs = RandomizedSearchCV(pipeline, param_dist, n_iter=30, cv=5,
                            scoring="accuracy", n_jobs=1, random_state=42)
    rs.fit(X_train, y_train)
    elapsed = time.perf_counter() - t0
    print(f"\n 参数空间: 240 组 采样: 30 组 × 5折 = 150 次 fit")
    print(f" 总耗时: {elapsed:.2f}s(穷举需 ~{elapsed/30*240:.0f}s)\n")
    results = sorted(
        zip(rs.cv_results_["mean_test_score"],
            rs.cv_results_["params"]),
        key=lambda item: item[0],
        reverse=True,
    )[:3]
    rows = [[f"{s:.4f}",
             f"n={p['clf__n_estimators']} depth={p['clf__max_depth']} "
             f"split={p['clf__min_samples_split']}"]
            for s, p in results]
    print_table(["CV均值", "Top3 参数"], rows, [8, 50])
    test_acc = accuracy_score(y_test, rs.predict(X_test))
    print(f"\n 最优CV分: {rs.best_score_:.4f} 测试准确率: {test_acc:.4f}")
Runnable demo (mock data and print feedback filled in):
import time
from typing import Any
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import RandomizedSearchCV, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


def section(title: str) -> None:
    print(f"\n{'='*62}\n {title}\n{'='*62}")


def print_table(headers: list[str], rows: list[list], widths: list[int]) -> None:
    sep = "┼".join("─" * (w + 2) for w in widths)
    print(f"┌{'┬'.join('─'*(w+2) for w in widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, widths))}│")
    print(f"├{sep}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in widths)}┘")


# 演示层用小数据,但保留随机搜索的工作方式。
def make_dataset() -> tuple:
    X, y = make_classification(n_samples=240, n_features=10, n_informative=6, n_redundant=2, random_state=42)
    return train_test_split(X, y, test_size=0.2, random_state=42)


def mode_random() -> None:
    section("RandomizedSearchCV — 大参数空间随机采样")
    X_train, X_test, y_train, y_test = make_dataset()
    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("clf", RandomForestClassifier(random_state=42)),
    ])
    # 参数空间:4×5×4×3 = 240 组,演示层只采样 6 组
    param_dist: dict[str, Any] = {
        "clf__n_estimators": [50, 100, 200, 300],
        "clf__max_depth": [3, 5, 8, 10, None],
        "clf__min_samples_split": [2, 5, 10, 20],
        "clf__max_features": ["sqrt", "log2", 0.5],
    }
    t0 = time.perf_counter()
    rs = RandomizedSearchCV(pipeline, param_dist, n_iter=6, cv=3,
                            scoring="accuracy", n_jobs=1, random_state=42)
    rs.fit(X_train, y_train)
    elapsed = time.perf_counter() - t0
    print(f"\n 参数空间: 240 组 采样: 6 组 × 3折 = 18 次 fit")
    print(f" 总耗时: {elapsed:.2f}s(穷举需 ~{elapsed/6*240:.0f}s)\n")
    results = sorted(
        zip(rs.cv_results_["mean_test_score"],
            rs.cv_results_["params"]),
        key=lambda item: item[0],
        reverse=True,
    )[:3]
    rows = [[f"{s:.4f}",
             f"n={p['clf__n_estimators']} depth={p['clf__max_depth']} "
             f"split={p['clf__min_samples_split']}"]
            for s, p in results]
    print_table(["CV均值", "Top3 参数"], rows, [8, 50])
    test_acc = accuracy_score(y_test, rs.predict(X_test))
    print(f"\n 最优CV分: {rs.best_score_:.4f} 测试准确率: {test_acc:.4f}")


mode_random()
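The search above samples from fixed lists. Random search can also draw from distributions, which lets it try values a list would never contain. The following is a rough sketch under our own assumptions: the ranges are illustrative rather than taken from the article, and ParameterSampler is used only to preview what RandomizedSearchCV would draw, without fitting any model.

```python
from scipy.stats import randint, uniform
from sklearn.model_selection import ParameterSampler

# Illustrative ranges (assumptions, not values from the article).
param_dist = {
    "clf__n_estimators": randint(50, 301),     # integers from 50 to 300
    "clf__max_depth": [3, 5, 8, 10, None],     # lists are sampled uniformly
    "clf__min_samples_split": randint(2, 21),  # integers from 2 to 20
    "clf__max_features": uniform(0.1, 0.9),    # floats in [0.1, 1.0]
}

# Preview six draws; the same dict can be passed to RandomizedSearchCV.
samples = list(ParameterSampler(param_dist, n_iter=6, random_state=42))
for sample in samples:
    print(sample)
```

Note that scipy's randint excludes its upper bound and uniform(loc, scale) covers loc to loc + scale, which is why the arguments look off by one at first glance.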
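Step 5: Diagnose underfitting and overfitting with mode_curve
Pain point and mechanism:
A learning curve is like a health check report for the model. A low training score with a low validation score usually means underfitting; a very high training score with a low validation score usually means overfitting; only when both curves are high and close together is the model reasonably healthy.
Core source (taken verbatim from the full source at the end):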
def mode_curve() -> None:
    section("学习曲线 — 诊断过拟合 / 欠拟合")
    X_train, X_test, y_train, y_test = make_dataset()
    models = [
        ("欠拟合(depth=1)", RandomForestClassifier(max_depth=1, n_estimators=50, random_state=42)),
        ("适度拟合(depth=5)", RandomForestClassifier(max_depth=5, n_estimators=50, random_state=42)),
        ("过拟合(depth=None)",RandomForestClassifier(max_depth=None,n_estimators=50,random_state=42)),
    ]
    train_sizes = np.linspace(0.1, 1.0, 8)
    for name, model in models:
        tr_sizes, tr_scores, val_scores = learning_curve(
            model, X_train, y_train,
            train_sizes=train_sizes, cv=5, scoring="accuracy", n_jobs=1,
        )
        tr_mean = tr_scores.mean(axis=1)
        val_mean = val_scores.mean(axis=1)
        # ASCII 折线图
        W = 40
        print(f"\n {name}")
        print(f" {'样本数':<8} {'训练分':<8} {'验证分':<8} 差距")
        print(f" {'─'*50}")
        for n, tr, val in zip(tr_sizes, tr_mean, val_mean):
            bar_tr = "█" * int(tr * W)
            bar_val = "█" * int(val * W)
            gap = tr - val
            flag = "⚠ 过拟合" if gap > 0.1 else ("⚠ 欠拟合" if val < 0.75 else "✓")
            print(f" {int(n):<8} {tr:.3f} {val:.3f} {gap:+.3f} {flag}")
Runnable demo (mock data and print feedback filled in):
import time
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import learning_curve, train_test_split


def section(title: str) -> None:
    print(f"\n{'='*62}\n {title}\n{'='*62}")


# 演示层缩短 cv 和模型规模,避免学习曲线等待太久。
def make_dataset() -> tuple:
    X, y = make_classification(n_samples=260, n_features=10, n_informative=6, n_redundant=2, random_state=42)
    return train_test_split(X, y, test_size=0.2, random_state=42)


def mode_curve() -> None:
    section("学习曲线 — 诊断过拟合 / 欠拟合")
    X_train, X_test, y_train, y_test = make_dataset()
    models = [
        ("欠拟合(depth=1)", RandomForestClassifier(max_depth=1, n_estimators=20, random_state=42)),
        ("适度拟合(depth=5)", RandomForestClassifier(max_depth=5, n_estimators=20, random_state=42)),
        ("过拟合(depth=None)",RandomForestClassifier(max_depth=None,n_estimators=20,random_state=42)),
    ]
    train_sizes = np.linspace(0.1, 1.0, 8)
    for name, model in models:
        tr_sizes, tr_scores, val_scores = learning_curve(
            model, X_train, y_train,
            train_sizes=train_sizes, cv=3, scoring="accuracy", n_jobs=1,
        )
        tr_mean = tr_scores.mean(axis=1)
        val_mean = val_scores.mean(axis=1)
        # ASCII 折线图
        W = 40
        print(f"\n {name}")
        print(f" {'样本数':<8} {'训练分':<8} {'验证分':<8} 差距")
        print(f" {'─'*50}")
        for n, tr, val in zip(tr_sizes, tr_mean, val_mean):
            bar_tr = "█" * int(tr * W)
            bar_val = "█" * int(val * W)
            gap = tr - val
            flag = "⚠ 过拟合" if gap > 0.1 else ("⚠ 欠拟合" if val < 0.75 else "✓")
            print(f" {int(n):<8} {tr:.3f} {val:.3f} {gap:+.3f} {flag}")


mode_curve()
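The flag rule inside mode_curve is easier to reason about when pulled out on its own. The sketch below restates it as a small function, using the same thresholds as the source (a gap above 0.1 counts as overfitting, a validation score below 0.75 as underfitting). It also prints the bars that the source computes as bar_tr and bar_val but never displays. The function names diagnose and bar are hypothetical, and the thresholds are rules of thumb from this article, not universal cut-offs.

```python
def diagnose(train_score: float, val_score: float,
             gap_limit: float = 0.1, val_floor: float = 0.75) -> str:
    """Label one point on a learning curve using the article's thresholds."""
    gap = train_score - val_score
    if gap > gap_limit:        # training far ahead of validation
        return "overfit"
    if val_score < val_floor:  # both scores low, small gap
        return "underfit"
    return "ok"


def bar(score: float, width: int = 40) -> str:
    """Render a score between 0 and 1 as a text bar of at most `width` blocks."""
    return "█" * int(score * width)


# Made-up scores, chosen only to exercise the three branches.
points = [(1.00, 0.80), (0.70, 0.68), (0.90, 0.86)]
labels = [diagnose(tr, val) for tr, val in points]
for (tr, val), label in zip(points, labels):
    print(f"train {tr:.2f} {bar(tr)}")
    print(f"val   {val:.2f} {bar(val)}  -> {label}")
```

Checking the gap first mirrors the order in the source: a model with a large gap is reported as overfitting even if its validation score is also below 0.75.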
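Step 6: Save and load the trained model with mode_persist
Pain point and mechanism:
Model persistence is like packaging a chef's skill into a recipe file. Train once, save, and next time just load the file and predict. The demo uses a tempfile temporary directory, so readers do not have to prepare a path by hand and the project directory stays clean.
Core source (taken verbatim from the full source at the end):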
def mode_persist() -> None:
    section("模型持久化 — joblib / pickle 对比")
    import pickle
    try:
        import joblib
        has_joblib = True
    except ImportError:
        has_joblib = False
    X_train, X_test, y_train, y_test = make_dataset()
    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("clf", RandomForestClassifier(n_estimators=100, random_state=42)),
    ])
    pipeline.fit(X_train, y_train)
    baseline_acc = accuracy_score(y_test, pipeline.predict(X_test))
    print(f"\n 原始模型测试准确率: {baseline_acc:.4f}")
    with tempfile.TemporaryDirectory() as tmpdir:
        rows = []
        # pickle
        pkl_path = os.path.join(tmpdir, "model.pkl")
        t0 = time.perf_counter()
        with open(pkl_path, "wb") as f:
            pickle.dump(pipeline, f)
        save_t = time.perf_counter() - t0
        size_kb = os.path.getsize(pkl_path) / 1024
        t0 = time.perf_counter()
        with open(pkl_path, "rb") as f:
            loaded = pickle.load(f)
        load_t = time.perf_counter() - t0
        acc = accuracy_score(y_test, loaded.predict(X_test))
        rows.append(["pickle", f"{size_kb:.1f} KB",
                     f"{save_t*1000:.1f}ms", f"{load_t*1000:.1f}ms", f"{acc:.4f}"])
        # joblib
        if has_joblib:
            jbl_path = os.path.join(tmpdir, "model.joblib")
            t0 = time.perf_counter()
            joblib.dump(pipeline, jbl_path)
            save_t = time.perf_counter() - t0
            size_kb = os.path.getsize(jbl_path) / 1024
            t0 = time.perf_counter()
            loaded2 = joblib.load(jbl_path)
            load_t = time.perf_counter() - t0
            acc2 = accuracy_score(y_test, loaded2.predict(X_test))
            rows.append(["joblib", f"{size_kb:.1f} KB",
                         f"{save_t*1000:.1f}ms", f"{load_t*1000:.1f}ms", f"{acc2:.4f}"])
        print_table(["方式", "文件大小", "保存耗时", "加载耗时", "加载后准确率"],
                    rows, [8, 10, 10, 10, 12])
    print("\n 💡 joblib 对 numpy 数组做了内存映射优化,大模型推荐 joblib")
Runnable demo (mock data and print feedback filled in):
import os
import tempfile
import time
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


def section(title: str) -> None:
    print(f"\n{'='*62}\n {title}\n{'='*62}")


def print_table(headers: list[str], rows: list[list], widths: list[int]) -> None:
    sep = "┼".join("─" * (w + 2) for w in widths)
    print(f"┌{'┬'.join('─'*(w+2) for w in widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, widths))}│")
    print(f"├{sep}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in widths)}┘")


# 演示层用小森林,重点验证保存/加载闭环。
def make_dataset() -> tuple:
    X, y = make_classification(n_samples=240, n_features=10, n_informative=6, n_redundant=2, random_state=42)
    return train_test_split(X, y, test_size=0.2, random_state=42)


def mode_persist() -> None:
    section("模型持久化 — joblib / pickle 对比")
    import pickle
    try:
        import joblib
        has_joblib = True
    except ImportError:
        has_joblib = False
    X_train, X_test, y_train, y_test = make_dataset()
    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("clf", RandomForestClassifier(n_estimators=20, random_state=42)),
    ])
    pipeline.fit(X_train, y_train)
    baseline_acc = accuracy_score(y_test, pipeline.predict(X_test))
    print(f"\n 原始模型测试准确率: {baseline_acc:.4f}")
    with tempfile.TemporaryDirectory() as tmpdir:
        rows = []
        # pickle
        pkl_path = os.path.join(tmpdir, "model.pkl")
        t0 = time.perf_counter()
        with open(pkl_path, "wb") as f:
            pickle.dump(pipeline, f)
        save_t = time.perf_counter() - t0
        size_kb = os.path.getsize(pkl_path) / 1024
        t0 = time.perf_counter()
        with open(pkl_path, "rb") as f:
            loaded = pickle.load(f)
        load_t = time.perf_counter() - t0
        acc = accuracy_score(y_test, loaded.predict(X_test))
        rows.append(["pickle", f"{size_kb:.1f} KB",
                     f"{save_t*1000:.1f}ms", f"{load_t*1000:.1f}ms", f"{acc:.4f}"])
        # joblib
        if has_joblib:
            jbl_path = os.path.join(tmpdir, "model.joblib")
            t0 = time.perf_counter()
            joblib.dump(pipeline, jbl_path)
            save_t = time.perf_counter() - t0
            size_kb = os.path.getsize(jbl_path) / 1024
            t0 = time.perf_counter()
            loaded2 = joblib.load(jbl_path)
            load_t = time.perf_counter() - t0
            acc2 = accuracy_score(y_test, loaded2.predict(X_test))
            rows.append(["joblib", f"{size_kb:.1f} KB",
                         f"{save_t*1000:.1f}ms", f"{load_t*1000:.1f}ms", f"{acc2:.4f}"])
        print_table(["方式", "文件大小", "保存耗时", "加载耗时", "加载后准确率"],
                    rows, [8, 10, 10, 10, 12])
    print("\n 💡 joblib 对 numpy 数组做了内存映射优化,大模型推荐 joblib")


mode_persist()
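A saved model is only useful if it still loads and predicts the same way later. As a minimal sketch, assuming you are free to choose what goes into the file, the example below stores the fitted Pipeline together with the scikit-learn version that produced it, then checks both after loading. The bundle layout and the key names model and sklearn_version are our own invention, not a scikit-learn or joblib convention.

```python
import os
import tempfile

import joblib
import numpy as np
import sklearn
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = make_classification(n_samples=200, n_features=10, random_state=42)
pipeline = Pipeline([
    ("scaler", StandardScaler()),
    ("clf", LogisticRegression(max_iter=1000)),
])
pipeline.fit(X, y)
expected = pipeline.predict(X)

with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "bundle.joblib")
    # Hypothetical bundle layout: the model plus the version that trained it.
    bundle = {"model": pipeline, "sklearn_version": sklearn.__version__}
    joblib.dump(bundle, path)
    restored = joblib.load(path)

same_version = restored["sklearn_version"] == sklearn.__version__
same_predictions = bool(np.array_equal(restored["model"].predict(X), expected))
print(f"version matches: {same_version}, predictions match: {same_predictions}")
```

Two cautions apply to both pickle and joblib files: loading one can execute arbitrary code, so only load files you trust, and a model saved under one scikit-learn version is not guaranteed to load or behave identically under another.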
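Step 7: Dispatch the grid/random/curve/persist/all commands with main
Pain point and mechanism:
main is the script's remote control. Beginners do not need to edit the source: switching --mode runs grid search, random search, learning curves, or model saving separately. The default, grid, is the quicker one; all runs everything but takes longer.
Core source (taken verbatim from the full source at the end):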
def main() -> None:
    parser = argparse.ArgumentParser(description="模型调优完整工作流")
    parser.add_argument(
        "--mode",
        choices=["grid", "random", "curve", "persist", "all"],
        default="grid",
        help="默认运行 grid 快速闭环;all 会完整跑四个模式,耗时更长",
    )
    args = parser.parse_args()
    dispatch = {
        "grid": mode_grid,
        "random": mode_random,
        "curve": mode_curve,
        "persist": mode_persist,
        "all": lambda: [mode_grid(), mode_random(), mode_curve(), mode_persist()],
    }
    dispatch[args.mode]()
Runnable demo (mock data and print feedback filled in):
import argparse
import sys


def mode_grid() -> None:
    print("grid:穷举少量参数组合,找最优配置")


def mode_random() -> None:
    print("random:在大参数空间里随机抽样,省时间")


def mode_curve() -> None:
    print("curve:看训练分和验证分,判断欠拟合/过拟合")


def mode_persist() -> None:
    print("persist:把训练好的模型保存下来,下次直接加载")


def main() -> None:
    parser = argparse.ArgumentParser(description="模型调优完整工作流")
    parser.add_argument(
        "--mode",
        choices=["grid", "random", "curve", "persist", "all"],
        default="grid",
        help="默认运行 grid 快速闭环;all 会完整跑四个模式,耗时更长",
    )
    args = parser.parse_args()
    dispatch = {
        "grid": mode_grid,
        "random": mode_random,
        "curve": mode_curve,
        "persist": mode_persist,
        "all": lambda: [mode_grid(), mode_random(), mode_curve(), mode_persist()],
    }
    dispatch[args.mode]()


for mode in ["grid", "random", "curve", "persist", "all"]:
    print(f"\n$ python3 46-python-model-tuning.py --mode {mode}")
    sys.argv = ["prog", "--mode", mode]
    main()
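The demo above drives main by overwriting sys.argv. One possible alternative, sketched here with stub modes, is to let main accept an argument list and pass it to parse_args, which argparse supports directly. The MODES table and the return values are hypothetical additions for illustration; the article's main takes no arguments and returns nothing.

```python
import argparse
from typing import Callable, Optional, Sequence


# Stub modes: each prints its name and returns it so the run order is visible.
def mode_grid() -> str:
    print("grid")
    return "grid"


def mode_random() -> str:
    print("random")
    return "random"


def mode_curve() -> str:
    print("curve")
    return "curve"


def mode_persist() -> str:
    print("persist")
    return "persist"


# Hypothetical dispatch table; insertion order defines the order used by "all".
MODES: dict[str, Callable[[], str]] = {
    "grid": mode_grid,
    "random": mode_random,
    "curve": mode_curve,
    "persist": mode_persist,
}


def main(argv: Optional[Sequence[str]] = None) -> list[str]:
    parser = argparse.ArgumentParser(description="tuning workflow (sketch)")
    parser.add_argument("--mode", choices=[*MODES, "all"], default="grid")
    # argv=None makes argparse fall back to sys.argv[1:], as in the original.
    args = parser.parse_args(argv)
    selected = list(MODES) if args.mode == "all" else [args.mode]
    return [MODES[name]() for name in selected]


ran_all = main(["--mode", "all"])
ran_default = main([])
```

Passing the arguments explicitly keeps the global sys.argv untouched, which tends to make a CLI entry point easier to call from tests or notebooks.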
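Geek in practice: full source and how to run it
Now put the building blocks together and paste the full code below into your editor. Run --mode grid first for a quick end-to-end pass, then random, curve, and persist as needed; all takes longer.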
#!/usr/bin/env python3
"""
46-python-model-tuning.py — 模型调优完整工作流

用法:
    python3 46-python-model-tuning.py --mode grid     # GridSearchCV
    python3 46-python-model-tuning.py --mode random   # RandomizedSearchCV
    python3 46-python-model-tuning.py --mode curve    # 学习曲线诊断
    python3 46-python-model-tuning.py --mode persist  # 模型持久化
    python3 46-python-model-tuning.py --mode all      # 全部演示

除 sklearn/numpy 外无额外依赖,直接运行。
"""
import argparse
import io
import os
import tempfile
import time
from typing import Any

import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import (
    GridSearchCV,
    RandomizedSearchCV,
    cross_val_score,
    learning_curve,
    train_test_split,
)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
# ─── 工具 ──────────────────────────────────────────────────────────────────────
def section(title: str) -> None:
    print(f"\n{'='*62}\n {title}\n{'='*62}")


def print_table(headers: list[str], rows: list[list], widths: list[int]) -> None:
    sep = "┼".join("─" * (w + 2) for w in widths)
    print(f"┌{'┬'.join('─'*(w+2) for w in widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, widths))}│")
    print(f"├{sep}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in widths)}┘")


def make_dataset() -> tuple:
    X, y = make_classification(
        n_samples=1000, n_features=15, n_informative=8,
        n_redundant=3, random_state=42,
    )
    return train_test_split(X, y, test_size=0.2, random_state=42)
# ─── 模式1:GridSearchCV ───────────────────────────────────────────────────────
def mode_grid() -> None:
    section("GridSearchCV — 穷举超参数搜索")
    X_train, X_test, y_train, y_test = make_dataset()
    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("clf", SVC(random_state=42)),
    ])
    param_grid = {
        "clf__kernel": ["linear", "rbf"],
        "clf__C": [0.1, 1, 10],
        "clf__gamma": ["scale", "auto"],
    }
    # 共 2×3×2=12 组,5折 = 60次 fit
    t0 = time.perf_counter()
    gs = GridSearchCV(pipeline, param_grid, cv=5, scoring="accuracy",
                      n_jobs=1, verbose=0)
    gs.fit(X_train, y_train)
    elapsed = time.perf_counter() - t0
    print(f"\n 搜索空间: {len(gs.cv_results_['params'])} 组参数 "
          f"× 5折 = {len(gs.cv_results_['params'])*5} 次 fit")
    print(f" 总耗时: {elapsed:.2f}s\n")
    # Top5 结果
    results = sorted(
        zip(gs.cv_results_["mean_test_score"],
            gs.cv_results_["std_test_score"],
            gs.cv_results_["params"]),
        key=lambda item: item[0],
        reverse=True,
    )[:5]
    rows = [[f"{s:.4f}", f"±{std:.4f}",
             f"kernel={p['clf__kernel']} C={p['clf__C']} gamma={p['clf__gamma']}"]
            for s, std, p in results]
    print_table(["CV均值", "标准差", "参数组合"], rows, [8, 8, 42])
    test_acc = accuracy_score(y_test, gs.predict(X_test))
    print(f"\n 最优参数: {gs.best_params_}")
    print(f" CV最优分: {gs.best_score_:.4f}")
    print(f" 测试准确率: {test_acc:.4f} ← 只用一次!")
# ─── 模式2:RandomizedSearchCV ────────────────────────────────────────────────
def mode_random() -> None:
    section("RandomizedSearchCV — 大参数空间随机采样")
    X_train, X_test, y_train, y_test = make_dataset()
    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("clf", RandomForestClassifier(random_state=42)),
    ])
    # 参数空间:4×5×4×3 = 240 组,只采样 30 组
    param_dist: dict[str, Any] = {
        "clf__n_estimators": [50, 100, 200, 300],
        "clf__max_depth": [3, 5, 8, 10, None],
        "clf__min_samples_split": [2, 5, 10, 20],
        "clf__max_features": ["sqrt", "log2", 0.5],
    }
    t0 = time.perf_counter()
    rs = RandomizedSearchCV(pipeline, param_dist, n_iter=30, cv=5,
                            scoring="accuracy", n_jobs=1, random_state=42)
    rs.fit(X_train, y_train)
    elapsed = time.perf_counter() - t0
    print(f"\n 参数空间: 240 组 采样: 30 组 × 5折 = 150 次 fit")
    print(f" 总耗时: {elapsed:.2f}s(穷举需 ~{elapsed/30*240:.0f}s)\n")
    results = sorted(
        zip(rs.cv_results_["mean_test_score"],
            rs.cv_results_["params"]),
        key=lambda item: item[0],
        reverse=True,
    )[:3]
    rows = [[f"{s:.4f}",
             f"n={p['clf__n_estimators']} depth={p['clf__max_depth']} "
             f"split={p['clf__min_samples_split']}"]
            for s, p in results]
    print_table(["CV均值", "Top3 参数"], rows, [8, 50])
    test_acc = accuracy_score(y_test, rs.predict(X_test))
    print(f"\n 最优CV分: {rs.best_score_:.4f} 测试准确率: {test_acc:.4f}")
# ─── 模式3:学习曲线诊断 ───────────────────────────────────────────────────────
def mode_curve() -> None:
    section("学习曲线 — 诊断过拟合 / 欠拟合")
    X_train, X_test, y_train, y_test = make_dataset()
    models = [
        ("欠拟合(depth=1)", RandomForestClassifier(max_depth=1, n_estimators=50, random_state=42)),
        ("适度拟合(depth=5)", RandomForestClassifier(max_depth=5, n_estimators=50, random_state=42)),
        ("过拟合(depth=None)",RandomForestClassifier(max_depth=None,n_estimators=50,random_state=42)),
    ]
    train_sizes = np.linspace(0.1, 1.0, 8)
    for name, model in models:
        tr_sizes, tr_scores, val_scores = learning_curve(
            model, X_train, y_train,
            train_sizes=train_sizes, cv=5, scoring="accuracy", n_jobs=1,
        )
        tr_mean = tr_scores.mean(axis=1)
        val_mean = val_scores.mean(axis=1)
        # ASCII 折线图
        W = 40
        print(f"\n {name}")
        print(f" {'样本数':<8} {'训练分':<8} {'验证分':<8} 差距")
        print(f" {'─'*50}")
        for n, tr, val in zip(tr_sizes, tr_mean, val_mean):
            bar_tr = "█" * int(tr * W)
            bar_val = "█" * int(val * W)
            gap = tr - val
            flag = "⚠ 过拟合" if gap > 0.1 else ("⚠ 欠拟合" if val < 0.75 else "✓")
            print(f" {int(n):<8} {tr:.3f} {val:.3f} {gap:+.3f} {flag}")
# ─── 模式4:模型持久化 ─────────────────────────────────────────────────────────
def mode_persist() -> None:
    section("模型持久化 — joblib / pickle 对比")
    import pickle
    try:
        import joblib
        has_joblib = True
    except ImportError:
        has_joblib = False
    X_train, X_test, y_train, y_test = make_dataset()
    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("clf", RandomForestClassifier(n_estimators=100, random_state=42)),
    ])
    pipeline.fit(X_train, y_train)
    baseline_acc = accuracy_score(y_test, pipeline.predict(X_test))
    print(f"\n 原始模型测试准确率: {baseline_acc:.4f}")
    with tempfile.TemporaryDirectory() as tmpdir:
        rows = []
        # pickle
        pkl_path = os.path.join(tmpdir, "model.pkl")
        t0 = time.perf_counter()
        with open(pkl_path, "wb") as f:
            pickle.dump(pipeline, f)
        save_t = time.perf_counter() - t0
        size_kb = os.path.getsize(pkl_path) / 1024
        t0 = time.perf_counter()
        with open(pkl_path, "rb") as f:
            loaded = pickle.load(f)
        load_t = time.perf_counter() - t0
        acc = accuracy_score(y_test, loaded.predict(X_test))
        rows.append(["pickle", f"{size_kb:.1f} KB",
                     f"{save_t*1000:.1f}ms", f"{load_t*1000:.1f}ms", f"{acc:.4f}"])
        # joblib
        if has_joblib:
            jbl_path = os.path.join(tmpdir, "model.joblib")
            t0 = time.perf_counter()
            joblib.dump(pipeline, jbl_path)
            save_t = time.perf_counter() - t0
            size_kb = os.path.getsize(jbl_path) / 1024
            t0 = time.perf_counter()
            loaded2 = joblib.load(jbl_path)
            load_t = time.perf_counter() - t0
            acc2 = accuracy_score(y_test, loaded2.predict(X_test))
            rows.append(["joblib", f"{size_kb:.1f} KB",
                         f"{save_t*1000:.1f}ms", f"{load_t*1000:.1f}ms", f"{acc2:.4f}"])
        print_table(["方式", "文件大小", "保存耗时", "加载耗时", "加载后准确率"],
                    rows, [8, 10, 10, 10, 12])
    print("\n 💡 joblib 对 numpy 数组做了内存映射优化,大模型推荐 joblib")
# ─── 入口 ─────────────────────────────────────────────────────────────────────
def main() -> None:
    parser = argparse.ArgumentParser(description="模型调优完整工作流")
    parser.add_argument(
        "--mode",
        choices=["grid", "random", "curve", "persist", "all"],
        default="grid",
        help="默认运行 grid 快速闭环;all 会完整跑四个模式,耗时更长",
    )
    args = parser.parse_args()
    dispatch = {
        "grid": mode_grid,
        "random": mode_random,
        "curve": mode_curve,
        "persist": mode_persist,
        "all": lambda: [mode_grid(), mode_random(), mode_curve(), mode_persist()],
    }
    dispatch[args.mode]()


if __name__ == "__main__":
    main()
$ python3 46-python-model-tuning.py --mode grid
==============================================================
GridSearchCV — 穷举超参数搜索
==============================================================
搜索空间: 12 组参数 × 5折 = 60 次 fit
总耗时: 0.64s
┌──────────┬──────────┬────────────────────────────────────────────┐
│ CV均值 │ 标准差 │ 参数组合 │
├──────────┼──────────┼────────────────────────────────────────────┤
│ 0.8875 │ ±0.0185 │ kernel=rbf C=10 gamma=scale │
│ 0.8875 │ ±0.0185 │ kernel=rbf C=10 gamma=auto │
│ 0.8800 │ ±0.0165 │ kernel=rbf C=1 gamma=scale │
│ 0.8800 │ ±0.0165 │ kernel=rbf C=1 gamma=auto │
│ 0.8088 │ ±0.0116 │ kernel=rbf C=0.1 gamma=scale │
└──────────┴──────────┴────────────────────────────────────────────┘
最优参数: {'clf__C': 10, 'clf__gamma': 'scale', 'clf__kernel': 'rbf'}
CV最优分: 0.8875
测试准确率: 0.8800 ← 只用一次!
$ python3 46-python-model-tuning.py --mode persist
==============================================================
模型持久化 — joblib / pickle 对比
==============================================================
原始模型测试准确率: 0.8250
┌──────────┬────────────┬────────────┬────────────┬──────────────┐
│ 方式 │ 文件大小 │ 保存耗时 │ 加载耗时 │ 加载后准确率 │
├──────────┼────────────┼────────────┼────────────┼──────────────┤
│ pickle │ 1578.6 KB │ 1.5ms │ 1.2ms │ 0.8250 │
│ joblib │ 1586.4 KB │ 9.6ms │ 6.4ms │ 0.8250 │
└──────────┴────────────┴────────────┴────────────┴──────────────┘
💡 joblib 对 numpy 数组做了内存映射优化,大模型推荐 joblib
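Summary
| Module | What to remember |
|---|---|
| make_dataset | Split off a training set and a test set; use the test set only once, at the end |
| mode_grid | Use exhaustive search for a small parameter space; the results are easy to interpret |
| mode_random | Sample randomly first for a large parameter space; it is faster |
| mode_curve | Diagnose model problems from the gap between training and validation scores |
| mode_persist | Demonstrate the save-and-load loop for a model using a temporary directory |
| main | Run the tuning workflow one layer at a time with --mode |
⏱ NexDo Time (5 minutes)
Challenge: change clf__C in mode_grid() to [0.01, 0.1, 1, 10, 100], rerun --mode grid, and see whether the best parameters and the test accuracy change.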
Don’t wait for next time, do it in the next moment.