46 · Model Tuning: GridSearchCV, Learning Curves, and Model Persistence

#025 · 2026-04-16 · Python

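🔗 Knowledge graph navigation: before reading this article, review cross-validation and Pipeline in 《39 · scikit-learn 实战》, and hyperparameters such as C and gamma in 《45 · SVM 支持向量机》. This article answers one question: once you have a model, how do you tune its parameters systematically?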

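Environment: pip install numpy scikit-learn joblib. This article generates mock classification data with sklearn and saves models to a tempfile temporary directory, so it depends on no external files.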

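Pain point and architecture: manual tuning is like turning knobs by feel. It is easy to lose track of what you tried, skip combinations, or peek at the test set. Engineering-grade tuning keeps the training, validation, and test data strictly apart: cross-validate inside the training set to pick parameters, touch the test set exactly once at the end, then save the final model for reuse.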

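Build intuition for model tuning first

GridSearchCV: exhaustively tries every combination; suited to a small number of parameters.
RandomizedSearchCV: randomly samples combinations; suited to many parameters.
Learning Curve: compares training and validation scores to diagnose underfitting or overfitting.
Model Persistence: saves the trained Pipeline to a file so later runs can load it and predict directly.

Geek's take: tuning is like testing recipes. Grid search tries every combination of salt, heat, and time; random search samples first to find the general direction; the learning curve is the customer feedback form; model persistence archives the final recipe.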

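Step by step: breaking down the core logic

This article is split into 7 steps: data splitting, the tuning map, grid search, random search, learning curves, model persistence, and CLI dispatch. Each section runs on its own and prints its results.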

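Step 1: Prepare the training set and the final test set with make_dataset

Pain point and mechanism

make_dataset splits the data into a training set and a test set. During tuning, compare candidates only through cross-validation inside the training set; the test set is the final exam and is touched once, at the end. Otherwise information from the test set leaks into model selection and the evaluation score is inflated.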
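
A minimal sketch of the "final exam" rule, assuming the same mock data as make_dataset. Two things here are additions for illustration and are not part of the article's script: the stratify=y argument, which keeps the class ratio nearly identical in both splits, and the LogisticRegression baseline used only to show where cross-validation happens.

```python
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score, train_test_split

X, y = make_classification(
    n_samples=1000, n_features=15, n_informative=8,
    n_redundant=3, random_state=42,
)
# stratify=y is an addition (not in the article's make_dataset).
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y,
)

# Every comparison made while tuning uses cross-validation on the training set only.
cv_scores = cross_val_score(
    LogisticRegression(max_iter=1000), X_train, y_train, cv=5,
)
print("train:", X_train.shape, "test:", X_test.shape)
print("CV accuracy on the training set:", round(cv_scores.mean(), 4))
# X_test / y_test stay untouched until the final model has been chosen.
```

Whether stratification matters depends on the data; on a roughly balanced mock set like this one the effect is small, but it costs nothing to enable.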

Core source (verbatim from the full source at the end)

def make_dataset() -> tuple:
    X, y = make_classification(
        n_samples=1000, n_features=15, n_informative=8,
        n_redundant=3, random_state=42,
    )
    return train_test_split(X, y, test_size=0.2, random_state=42)

Runnable demo (mock data and print feedback filled in)

import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

def make_dataset() -> tuple:
    X, y = make_classification(
        n_samples=1000, n_features=15, n_informative=8,
        n_redundant=3, random_state=42,
    )
    return train_test_split(X, y, test_size=0.2, random_state=42)

X_train, X_test, y_train, y_test = make_dataset()
print("训练集:", X_train.shape, y_train.shape)
print("测试集:", X_test.shape, y_test.shape)
print("类别分布:", dict(zip(*np.unique(y_train, return_counts=True))))
print("调参只在训练集里做,测试集最后只用一次")

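Step 2: Lay out the tuning tools as a map with print_table

Pain point and mechanism

There are many model-tuning tools, and beginners tend to mix them up. The table works as a route map: GridSearch suits exhaustive search over a small range, RandomSearch suits sampling over a large range, learning curves diagnose problems, and persistence saves the trained result.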
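
One caveat about the print_table helper used in this step: it pads cells with the `<{w}` format spec, which counts characters rather than terminal columns, so cells containing CJK text can push the right border out of line. The sketch below shows one possible fix using the standard library's unicodedata module. The helper names display_width and pad_cell are hypothetical, and the approach assumes a terminal that renders wide characters in exactly two columns.

```python
import unicodedata


def display_width(text: str) -> int:
    """Count terminal columns, treating wide and fullwidth characters as 2."""
    return sum(
        2 if unicodedata.east_asian_width(ch) in ("W", "F") else 1
        for ch in text
    )


def pad_cell(value: object, width: int) -> str:
    """Left-align a value to `width` terminal columns (never truncates)."""
    text = str(value)
    return text + " " * max(width - display_width(text), 0)


for label in ["GridSearch", "CV均值", "标准差"]:
    print(f"│ {pad_cell(label, 12)} │")
```

Swapping this padding into print_table would be a change to the article's script, so treat it as an optional refinement rather than part of the original workflow.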

Core source (verbatim from the full source at the end)

def print_table(headers: list[str], rows: list[list], widths: list[int]) -> None:
    sep = "┼".join("─" * (w + 2) for w in widths)
    print(f"┌{'┬'.join('─'*(w+2) for w in widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, widths))}│")
    print(f"├{sep}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in widths)}┘")

Runnable demo (mock data and print feedback filled in)

def print_table(headers: list[str], rows: list[list], widths: list[int]) -> None:
    sep = "┼".join("─" * (w + 2) for w in widths)
    print(f"┌{'┬'.join('─'*(w+2) for w in widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, widths))}│")
    print(f"├{sep}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in widths)}┘")

print_table(
    ["方法", "像什么", "适合场景"],
    [
        ["GridSearch", "逐格试菜谱", "参数组合少"],
        ["RandomSearch", "抽样试菜谱", "参数组合大"],
        ["LearningCurve", "体检报告", "诊断欠拟合/过拟合"],
        ["Persist", "模型存档", "训练后复用"],
    ],
    [12, 14, 18],
)

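Step 3: Exhaustively search a small set of parameter combinations with mode_grid

Pain point and mechanism

GridSearchCV tries every recipe combination one by one. It is slow but thorough, which suits a small parameter space. Note that it runs K-fold cross-validation inside the training set; the test set is evaluated only once, at the end.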
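
Before launching a grid search it helps to count how many fits it will cost. The sketch below uses sklearn's ParameterGrid to count candidates for the grid used in this step, and for a split variant that is an assumption of this example rather than the article's code: since SVC only uses gamma with the rbf, poly, and sigmoid kernels, a list of two sub-grids avoids varying gamma for the linear kernel.

```python
from sklearn.model_selection import ParameterGrid

full_grid = {
    "clf__kernel": ["linear", "rbf"],
    "clf__C": [0.1, 1, 10],
    "clf__gamma": ["scale", "auto"],
}
# A list of dicts is searched sub-grid by sub-grid, so gamma is only
# varied for the kernel that actually uses it.
split_grid = [
    {"clf__kernel": ["linear"], "clf__C": [0.1, 1, 10]},
    {"clf__kernel": ["rbf"], "clf__C": [0.1, 1, 10],
     "clf__gamma": ["scale", "auto"]},
]

n_full = len(ParameterGrid(full_grid))
n_split = len(ParameterGrid(split_grid))
print(f"full grid:  {n_full} candidates x 5 folds = {n_full * 5} fits")
print(f"split grid: {n_split} candidates x 5 folds = {n_split * 5} fits")
```

GridSearchCV accepts the same list-of-dicts form for param_grid. The saving is small here, but it grows quickly once a grid mixes parameters that only apply to some settings.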

Core source (verbatim from the full source at the end)

def mode_grid() -> None:
    section("GridSearchCV — 穷举超参数搜索")
    X_train, X_test, y_train, y_test = make_dataset()

    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("clf",    SVC(random_state=42)),
    ])
    param_grid = {
        "clf__kernel": ["linear", "rbf"],
        "clf__C":      [0.1, 1, 10],
        "clf__gamma":  ["scale", "auto"],
    }
    # 共 2×3×2=12 组,5折 = 60次 fit
    t0 = time.perf_counter()
    gs = GridSearchCV(pipeline, param_grid, cv=5, scoring="accuracy",
                      n_jobs=1, verbose=0)
    gs.fit(X_train, y_train)
    elapsed = time.perf_counter() - t0

    print(f"\n  搜索空间: {len(gs.cv_results_['params'])} 组参数  "
          f"× 5折 = {len(gs.cv_results_['params'])*5} 次 fit")
    print(f"  总耗时: {elapsed:.2f}s\n")

    # Top5 结果
    results = sorted(
        zip(gs.cv_results_["mean_test_score"],
            gs.cv_results_["std_test_score"],
            gs.cv_results_["params"]),
        key=lambda item: item[0],
        reverse=True,
    )[:5]
    rows = [[f"{s:.4f}", f"±{std:.4f}",
             f"kernel={p['clf__kernel']} C={p['clf__C']} gamma={p['clf__gamma']}"]
            for s, std, p in results]
    print_table(["CV均值", "标准差", "参数组合"], rows, [8, 8, 42])

    test_acc = accuracy_score(y_test, gs.predict(X_test))
    print(f"\n  最优参数: {gs.best_params_}")
    print(f"  CV最优分: {gs.best_score_:.4f}")
    print(f"  测试准确率: {test_acc:.4f}  ← 只用一次!")

Runnable demo (mock data and print feedback filled in)

import time
import numpy as np
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

def section(title: str) -> None:
    print(f"\n{'='*62}\n  {title}\n{'='*62}")

def print_table(headers: list[str], rows: list[list], widths: list[int]) -> None:
    sep = "┼".join("─" * (w + 2) for w in widths)
    print(f"┌{'┬'.join('─'*(w+2) for w in widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, widths))}│")
    print(f"├{sep}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in widths)}┘")


def make_dataset() -> tuple:
    X, y = make_classification(n_samples=240, n_features=10, n_informative=6, n_redundant=2, random_state=42)
    return train_test_split(X, y, test_size=0.2, random_state=42)

def mode_grid() -> None:
    section("GridSearchCV — 穷举超参数搜索")
    X_train, X_test, y_train, y_test = make_dataset()

    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("clf",    SVC(random_state=42)),
    ])
    param_grid = {
        "clf__kernel": ["linear", "rbf"],
        "clf__C":      [0.1, 1, 10],
        "clf__gamma":  ["scale", "auto"],
    }
    # 共 2×3×2=12 组,5折 = 60次 fit
    t0 = time.perf_counter()
    gs = GridSearchCV(pipeline, param_grid, cv=5, scoring="accuracy",
                      n_jobs=1, verbose=0)
    gs.fit(X_train, y_train)
    elapsed = time.perf_counter() - t0

    print(f"\n  搜索空间: {len(gs.cv_results_['params'])} 组参数  "
          f"× 5折 = {len(gs.cv_results_['params'])*5} 次 fit")
    print(f"  总耗时: {elapsed:.2f}s\n")

    # Top5 结果
    results = sorted(
        zip(gs.cv_results_["mean_test_score"],
            gs.cv_results_["std_test_score"],
            gs.cv_results_["params"]),
        key=lambda item: item[0],
        reverse=True,
    )[:5]
    rows = [[f"{s:.4f}", f"±{std:.4f}",
             f"kernel={p['clf__kernel']} C={p['clf__C']} gamma={p['clf__gamma']}"]
            for s, std, p in results]
    print_table(["CV均值", "标准差", "参数组合"], rows, [8, 8, 42])

    test_acc = accuracy_score(y_test, gs.predict(X_test))
    print(f"\n  最优参数: {gs.best_params_}")
    print(f"  CV最优分: {gs.best_score_:.4f}")
    print(f"  测试准确率: {test_acc:.4f}  ← 只用一次!")

mode_grid()

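Step 4: Randomly sample a large space with mode_random

Pain point and mechanism

RandomizedSearchCV samples dishes from a huge recipe book. Many hyperparameters turn out not to matter much, so random search often finds a near-optimal combination in far fewer trials, which makes it a good coarse filter when the parameter space is large.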
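
The script in this step samples from fixed lists. Random search can also draw from continuous distributions, which is where it tends to pay off most. The sketch below is an illustration under assumptions, not the article's code: it uses a hypothetical SVC search space with scipy.stats.loguniform and shows what ParameterSampler (the sampler behind RandomizedSearchCV) produces.

```python
from scipy.stats import loguniform
from sklearn.model_selection import ParameterSampler

# Hypothetical search space: ranges chosen for illustration only.
param_dist = {
    "clf__C": loguniform(1e-2, 1e2),      # sampled on a log scale
    "clf__gamma": loguniform(1e-4, 1e0),  # sampled on a log scale
    "clf__kernel": ["rbf"],               # lists are sampled uniformly
}

sampler = ParameterSampler(param_dist, n_iter=5, random_state=42)
samples = list(sampler)
for params in samples:
    # Round floats only for display; the sampled values keep full precision.
    print({k: (round(v, 4) if isinstance(v, float) else v)
           for k, v in params.items()})
```

The same dictionary can be passed to RandomizedSearchCV as its parameter distributions. A log scale is a common choice for parameters such as C and gamma whose useful values span several orders of magnitude.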

Core source (verbatim from the full source at the end)

def mode_random() -> None:
    section("RandomizedSearchCV — 大参数空间随机采样")
    X_train, X_test, y_train, y_test = make_dataset()

    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("clf",    RandomForestClassifier(random_state=42)),
    ])
    # 参数空间:4×5×4×3 = 240 组,只采样 30 组
    param_dist: dict[str, Any] = {
        "clf__n_estimators":      [50, 100, 200, 300],
        "clf__max_depth":         [3, 5, 8, 10, None],
        "clf__min_samples_split": [2, 5, 10, 20],
        "clf__max_features":      ["sqrt", "log2", 0.5],
    }
    t0 = time.perf_counter()
    rs = RandomizedSearchCV(pipeline, param_dist, n_iter=30, cv=5,
                            scoring="accuracy", n_jobs=1, random_state=42)
    rs.fit(X_train, y_train)
    elapsed = time.perf_counter() - t0

    print(f"\n  参数空间: 240 组  采样: 30 组 × 5折 = 150 次 fit")
    print(f"  总耗时: {elapsed:.2f}s(穷举需 ~{elapsed/30*240:.0f}s)\n")

    results = sorted(
        zip(rs.cv_results_["mean_test_score"],
            rs.cv_results_["params"]),
        key=lambda item: item[0],
        reverse=True,
    )[:3]
    rows = [[f"{s:.4f}",
             f"n={p['clf__n_estimators']} depth={p['clf__max_depth']} "
             f"split={p['clf__min_samples_split']}"]
            for s, p in results]
    print_table(["CV均值", "Top3 参数"], rows, [8, 50])

    test_acc = accuracy_score(y_test, rs.predict(X_test))
    print(f"\n  最优CV分: {rs.best_score_:.4f}  测试准确率: {test_acc:.4f}")

Runnable demo (mock data and print feedback filled in)

import time
from typing import Any
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import RandomizedSearchCV, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

def section(title: str) -> None:
    print(f"\n{'='*62}\n  {title}\n{'='*62}")

def print_table(headers: list[str], rows: list[list], widths: list[int]) -> None:
    sep = "┼".join("─" * (w + 2) for w in widths)
    print(f"┌{'┬'.join('─'*(w+2) for w in widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, widths))}│")
    print(f"├{sep}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in widths)}┘")

# 演示层用小数据,但保留随机搜索的工作方式。
def make_dataset() -> tuple:
    X, y = make_classification(n_samples=240, n_features=10, n_informative=6, n_redundant=2, random_state=42)
    return train_test_split(X, y, test_size=0.2, random_state=42)

def mode_random() -> None:
    section("RandomizedSearchCV — 大参数空间随机采样")
    X_train, X_test, y_train, y_test = make_dataset()

    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("clf",    RandomForestClassifier(random_state=42)),
    ])
    # 参数空间:4×5×4×3 = 240 组,只采样 30 组
    param_dist: dict[str, Any] = {
        "clf__n_estimators":      [50, 100, 200, 300],
        "clf__max_depth":         [3, 5, 8, 10, None],
        "clf__min_samples_split": [2, 5, 10, 20],
        "clf__max_features":      ["sqrt", "log2", 0.5],
    }
    t0 = time.perf_counter()
    rs = RandomizedSearchCV(pipeline, param_dist, n_iter=6, cv=3,
                            scoring="accuracy", n_jobs=1, random_state=42)
    rs.fit(X_train, y_train)
    elapsed = time.perf_counter() - t0

    print(f"\n  参数空间: 240 组  采样: 6 组 × 3折 = 18 次 fit")
    print(f"  总耗时: {elapsed:.2f}s(穷举需 ~{elapsed/6*240:.0f}s)\n")

    results = sorted(
        zip(rs.cv_results_["mean_test_score"],
            rs.cv_results_["params"]),
        key=lambda item: item[0],
        reverse=True,
    )[:3]
    rows = [[f"{s:.4f}",
             f"n={p['clf__n_estimators']} depth={p['clf__max_depth']} "
             f"split={p['clf__min_samples_split']}"]
            for s, p in results]
    print_table(["CV均值", "Top3 参数"], rows, [8, 50])

    test_acc = accuracy_score(y_test, rs.predict(X_test))
    print(f"\n  最优CV分: {rs.best_score_:.4f}  测试准确率: {test_acc:.4f}")

mode_random()

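Step 5: Diagnose underfitting and overfitting with mode_curve

Pain point and mechanism

A learning curve is the model's health report: a low training score with a low validation score usually means underfitting; a very high training score with a low validation score usually means overfitting; two curves that are both high and close together indicate a reasonably healthy model.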
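
The rule of thumb above can be written as a small function. This sketch mirrors the thresholds that mode_curve uses for its flag column (a gap above 0.1, a validation score below 0.75); the function name diagnose and the English labels are hypothetical, and the thresholds are heuristics for this mock data rather than universal cut-offs.

```python
def diagnose(train_score: float, val_score: float,
             max_gap: float = 0.1, min_val: float = 0.75) -> str:
    """Apply the rules in the same order as mode_curve: gap first, then validation score."""
    gap = train_score - val_score
    if gap > max_gap:
        return "overfitting"
    if val_score < min_val:
        return "underfitting"
    return "ok"


for tr, val in [(0.70, 0.68), (1.00, 0.82), (0.90, 0.86)]:
    print(f"train={tr:.2f} val={val:.2f} -> {diagnose(tr, val)}")
```

Because the gap is checked first, a model with a large gap and a low validation score is reported as overfitting. That ordering is a design choice worth revisiting when adapting the thresholds to real data.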

Core source (verbatim from the full source at the end)

def mode_curve() -> None:
    section("学习曲线 — 诊断过拟合 / 欠拟合")
    X_train, X_test, y_train, y_test = make_dataset()

    models = [
        ("欠拟合(depth=1)",  RandomForestClassifier(max_depth=1,  n_estimators=50, random_state=42)),
        ("适度拟合(depth=5)", RandomForestClassifier(max_depth=5,  n_estimators=50, random_state=42)),
        ("过拟合(depth=None)",RandomForestClassifier(max_depth=None,n_estimators=50,random_state=42)),
    ]
    train_sizes = np.linspace(0.1, 1.0, 8)

    for name, model in models:
        tr_sizes, tr_scores, val_scores = learning_curve(
            model, X_train, y_train,
            train_sizes=train_sizes, cv=5, scoring="accuracy", n_jobs=1,
        )
        tr_mean  = tr_scores.mean(axis=1)
        val_mean = val_scores.mean(axis=1)

        # ASCII 折线图
        W = 40
        print(f"\n  {name}")
        print(f"  {'样本数':<8} {'训练分':<8} {'验证分':<8} 差距")
        print(f"  {'─'*50}")
        for n, tr, val in zip(tr_sizes, tr_mean, val_mean):
            bar_tr  = "█" * int(tr  * W)
            bar_val = "█" * int(val * W)
            gap = tr - val
            flag = "⚠ 过拟合" if gap > 0.1 else ("⚠ 欠拟合" if val < 0.75 else "✓")
            print(f"  {int(n):<8} {tr:.3f}    {val:.3f}    {gap:+.3f} {flag}")

Runnable demo (mock data and print feedback filled in)

import time
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import learning_curve, train_test_split

def section(title: str) -> None:
    print(f"\n{'='*62}\n  {title}\n{'='*62}")

# 演示层缩短 cv 和模型规模,避免学习曲线等待太久。
def make_dataset() -> tuple:
    X, y = make_classification(n_samples=260, n_features=10, n_informative=6, n_redundant=2, random_state=42)
    return train_test_split(X, y, test_size=0.2, random_state=42)

def mode_curve() -> None:
    section("学习曲线 — 诊断过拟合 / 欠拟合")
    X_train, X_test, y_train, y_test = make_dataset()

    models = [
        ("欠拟合(depth=1)",  RandomForestClassifier(max_depth=1,  n_estimators=20, random_state=42)),
        ("适度拟合(depth=5)", RandomForestClassifier(max_depth=5,  n_estimators=20, random_state=42)),
        ("过拟合(depth=None)",RandomForestClassifier(max_depth=None,n_estimators=20,random_state=42)),
    ]
    train_sizes = np.linspace(0.1, 1.0, 8)

    for name, model in models:
        tr_sizes, tr_scores, val_scores = learning_curve(
            model, X_train, y_train,
            train_sizes=train_sizes, cv=3, scoring="accuracy", n_jobs=1,
        )
        tr_mean  = tr_scores.mean(axis=1)
        val_mean = val_scores.mean(axis=1)

        # ASCII 折线图
        W = 40
        print(f"\n  {name}")
        print(f"  {'样本数':<8} {'训练分':<8} {'验证分':<8} 差距")
        print(f"  {'─'*50}")
        for n, tr, val in zip(tr_sizes, tr_mean, val_mean):
            bar_tr  = "█" * int(tr  * W)
            bar_val = "█" * int(val * W)
            gap = tr - val
            flag = "⚠ 过拟合" if gap > 0.1 else ("⚠ 欠拟合" if val < 0.75 else "✓")
            print(f"  {int(n):<8} {tr:.3f}    {val:.3f}    {gap:+.3f} {flag}")

mode_curve()

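Step 6: Save and load the trained model with mode_persist

Pain point and mechanism

Model persistence packages the chef's skill into a recipe file. Train once, save, and the next run can load the model and predict. The demo uses a tempfile temporary directory, so readers do not have to prepare a path by hand and the project directory stays clean.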
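
A saved model is only useful if it can be loaded reliably later, and pickled estimators are sensitive to the library version that wrote them. The sketch below shows one possible convention, assumed for this example and not taken from the article's script: store the Pipeline together with the scikit-learn version in a single joblib file. The bundle keys and the LogisticRegression model are illustrative choices.

```python
import os
import tempfile

import joblib
import numpy as np
import sklearn
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = make_classification(n_samples=200, n_features=10, random_state=42)
pipeline = Pipeline([
    ("scaler", StandardScaler()),
    ("clf", LogisticRegression(max_iter=1000)),
]).fit(X, y)

# Hypothetical bundle layout: the key names are not a joblib convention.
bundle = {"model": pipeline, "sklearn_version": sklearn.__version__}

with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "model_bundle.joblib")
    joblib.dump(bundle, path)
    restored = joblib.load(path)

same_version = restored["sklearn_version"] == sklearn.__version__
same_predictions = np.array_equal(
    restored["model"].predict(X), pipeline.predict(X)
)
print("version matches:", same_version, "| predictions match:", same_predictions)
```

Both pickle and joblib can execute arbitrary code while loading, so only load model files from sources you trust.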

Core source (verbatim from the full source at the end)

def mode_persist() -> None:
    section("模型持久化 — joblib / pickle 对比")
    import pickle
    try:
        import joblib
        has_joblib = True
    except ImportError:
        has_joblib = False

    X_train, X_test, y_train, y_test = make_dataset()
    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("clf",    RandomForestClassifier(n_estimators=100, random_state=42)),
    ])
    pipeline.fit(X_train, y_train)
    baseline_acc = accuracy_score(y_test, pipeline.predict(X_test))
    print(f"\n  原始模型测试准确率: {baseline_acc:.4f}")

    with tempfile.TemporaryDirectory() as tmpdir:
        rows = []

        # pickle
        pkl_path = os.path.join(tmpdir, "model.pkl")
        t0 = time.perf_counter()
        with open(pkl_path, "wb") as f:
            pickle.dump(pipeline, f)
        save_t = time.perf_counter() - t0
        size_kb = os.path.getsize(pkl_path) / 1024

        t0 = time.perf_counter()
        with open(pkl_path, "rb") as f:
            loaded = pickle.load(f)
        load_t = time.perf_counter() - t0
        acc = accuracy_score(y_test, loaded.predict(X_test))
        rows.append(["pickle", f"{size_kb:.1f} KB",
                     f"{save_t*1000:.1f}ms", f"{load_t*1000:.1f}ms", f"{acc:.4f}"])

        # joblib
        if has_joblib:
            jbl_path = os.path.join(tmpdir, "model.joblib")
            t0 = time.perf_counter()
            joblib.dump(pipeline, jbl_path)
            save_t = time.perf_counter() - t0
            size_kb = os.path.getsize(jbl_path) / 1024

            t0 = time.perf_counter()
            loaded2 = joblib.load(jbl_path)
            load_t = time.perf_counter() - t0
            acc2 = accuracy_score(y_test, loaded2.predict(X_test))
            rows.append(["joblib", f"{size_kb:.1f} KB",
                         f"{save_t*1000:.1f}ms", f"{load_t*1000:.1f}ms", f"{acc2:.4f}"])

        print_table(["方式", "文件大小", "保存耗时", "加载耗时", "加载后准确率"],
                    rows, [8, 10, 10, 10, 12])
        print("\n  💡 joblib 对 numpy 数组做了内存映射优化,大模型推荐 joblib")

Runnable demo (mock data and print feedback filled in)

import os
import tempfile
import time
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

def section(title: str) -> None:
    print(f"\n{'='*62}\n  {title}\n{'='*62}")

def print_table(headers: list[str], rows: list[list], widths: list[int]) -> None:
    sep = "┼".join("─" * (w + 2) for w in widths)
    print(f"┌{'┬'.join('─'*(w+2) for w in widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, widths))}│")
    print(f"├{sep}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in widths)}┘")

# 演示层用小森林,重点验证保存/加载闭环。
def make_dataset() -> tuple:
    X, y = make_classification(n_samples=240, n_features=10, n_informative=6, n_redundant=2, random_state=42)
    return train_test_split(X, y, test_size=0.2, random_state=42)

def mode_persist() -> None:
    section("模型持久化 — joblib / pickle 对比")
    import pickle
    try:
        import joblib
        has_joblib = True
    except ImportError:
        has_joblib = False

    X_train, X_test, y_train, y_test = make_dataset()
    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("clf",    RandomForestClassifier(n_estimators=20, random_state=42)),
    ])
    pipeline.fit(X_train, y_train)
    baseline_acc = accuracy_score(y_test, pipeline.predict(X_test))
    print(f"\n  原始模型测试准确率: {baseline_acc:.4f}")

    with tempfile.TemporaryDirectory() as tmpdir:
        rows = []

        # pickle
        pkl_path = os.path.join(tmpdir, "model.pkl")
        t0 = time.perf_counter()
        with open(pkl_path, "wb") as f:
            pickle.dump(pipeline, f)
        save_t = time.perf_counter() - t0
        size_kb = os.path.getsize(pkl_path) / 1024

        t0 = time.perf_counter()
        with open(pkl_path, "rb") as f:
            loaded = pickle.load(f)
        load_t = time.perf_counter() - t0
        acc = accuracy_score(y_test, loaded.predict(X_test))
        rows.append(["pickle", f"{size_kb:.1f} KB",
                     f"{save_t*1000:.1f}ms", f"{load_t*1000:.1f}ms", f"{acc:.4f}"])

        # joblib
        if has_joblib:
            jbl_path = os.path.join(tmpdir, "model.joblib")
            t0 = time.perf_counter()
            joblib.dump(pipeline, jbl_path)
            save_t = time.perf_counter() - t0
            size_kb = os.path.getsize(jbl_path) / 1024

            t0 = time.perf_counter()
            loaded2 = joblib.load(jbl_path)
            load_t = time.perf_counter() - t0
            acc2 = accuracy_score(y_test, loaded2.predict(X_test))
            rows.append(["joblib", f"{size_kb:.1f} KB",
                         f"{save_t*1000:.1f}ms", f"{load_t*1000:.1f}ms", f"{acc2:.4f}"])

        print_table(["方式", "文件大小", "保存耗时", "加载耗时", "加载后准确率"],
                    rows, [8, 10, 10, 10, 12])
        print("\n  💡 joblib 对 numpy 数组做了内存映射优化,大模型推荐 joblib")

mode_persist()

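Step 7: Dispatch the grid/random/curve/persist/all commands with main

Pain point and mechanism

main is the script's remote control. Beginners do not need to edit the source; switching --mode runs grid search, random search, the learning curve, or model saving separately. The default, grid, is the fastest; all runs everything and takes longer.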
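
The demo for this step simulates command lines by overwriting sys.argv. An alternative, sketched here under the assumption that the dispatcher may be restructured, is to pass the argument list straight to parse_args, which makes it easy to exercise without touching global state. The names MODES, build_parser, and resolve_modes are hypothetical.

```python
import argparse
from typing import List, Optional

MODES = ["grid", "random", "curve", "persist"]


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="tuning workflow (sketch)")
    parser.add_argument("--mode", choices=MODES + ["all"], default="grid")
    return parser


def resolve_modes(argv: Optional[List[str]] = None) -> List[str]:
    """Return the mode names to run, in order, for the given argument list.

    Passing None makes argparse fall back to sys.argv, as in the original main.
    """
    args = build_parser().parse_args(argv)
    return list(MODES) if args.mode == "all" else [args.mode]


print(resolve_modes([]))
print(resolve_modes(["--mode", "all"]))
```

A real main could then loop over the returned names and call the matching mode function, which replaces the list-building lambda used for all with an explicit loop.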

Core source (verbatim from the full source at the end)

def main() -> None:
    parser = argparse.ArgumentParser(description="模型调优完整工作流")
    parser.add_argument(
        "--mode",
        choices=["grid", "random", "curve", "persist", "all"],
        default="grid",
        help="默认运行 grid 快速闭环;all 会完整跑四个模式,耗时更长",
    )
    args = parser.parse_args()
    dispatch = {
        "grid":    mode_grid,
        "random":  mode_random,
        "curve":   mode_curve,
        "persist": mode_persist,
        "all":     lambda: [mode_grid(), mode_random(), mode_curve(), mode_persist()],
    }
    dispatch[args.mode]()

Runnable demo (mock data and print feedback filled in)

import argparse
import sys


def mode_grid() -> None:
    print("grid:穷举少量参数组合,找最优配置")


def mode_random() -> None:
    print("random:在大参数空间里随机抽样,省时间")


def mode_curve() -> None:
    print("curve:看训练分和验证分,判断欠拟合/过拟合")


def mode_persist() -> None:
    print("persist:把训练好的模型保存下来,下次直接加载")

def main() -> None:
    parser = argparse.ArgumentParser(description="模型调优完整工作流")
    parser.add_argument(
        "--mode",
        choices=["grid", "random", "curve", "persist", "all"],
        default="grid",
        help="默认运行 grid 快速闭环;all 会完整跑四个模式,耗时更长",
    )
    args = parser.parse_args()
    dispatch = {
        "grid":    mode_grid,
        "random":  mode_random,
        "curve":   mode_curve,
        "persist": mode_persist,
        "all":     lambda: [mode_grid(), mode_random(), mode_curve(), mode_persist()],
    }
    dispatch[args.mode]()

for mode in ["grid", "random", "curve", "persist", "all"]:
    print(f"\n$ python3 46-python-model-tuning.py --mode {mode}")
    sys.argv = ["prog", "--mode", mode]
    main()

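Geek in practice: full source and how to run it

Now assemble the building blocks above: put the full code below into your editor. Start with --mode grid for a quick end-to-end run, then run random, curve, and persist as needed; all takes longer.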

#!/usr/bin/env python3
"""
46-python-model-tuning.py — 模型调优完整工作流

用法:
  python3 46-python-model-tuning.py --mode grid      # GridSearchCV
  python3 46-python-model-tuning.py --mode random    # RandomizedSearchCV
  python3 46-python-model-tuning.py --mode curve     # 学习曲线诊断
  python3 46-python-model-tuning.py --mode persist   # 模型持久化
  python3 46-python-model-tuning.py --mode all       # 全部演示

除 sklearn/numpy 外无额外依赖,直接运行。
"""

import argparse
import io
import os
import tempfile
import time
from typing import Any

import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import (
    GridSearchCV,
    RandomizedSearchCV,
    cross_val_score,
    learning_curve,
    train_test_split,
)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

# ─── 工具 ──────────────────────────────────────────────────────────────────────

def section(title: str) -> None:
    print(f"\n{'='*62}\n  {title}\n{'='*62}")


def print_table(headers: list[str], rows: list[list], widths: list[int]) -> None:
    sep = "┼".join("─" * (w + 2) for w in widths)
    print(f"┌{'┬'.join('─'*(w+2) for w in widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, widths))}│")
    print(f"├{sep}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in widths)}┘")


def make_dataset() -> tuple:
    X, y = make_classification(
        n_samples=1000, n_features=15, n_informative=8,
        n_redundant=3, random_state=42,
    )
    return train_test_split(X, y, test_size=0.2, random_state=42)

# ─── 模式1:GridSearchCV ───────────────────────────────────────────────────────

def mode_grid() -> None:
    section("GridSearchCV — 穷举超参数搜索")
    X_train, X_test, y_train, y_test = make_dataset()

    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("clf",    SVC(random_state=42)),
    ])
    param_grid = {
        "clf__kernel": ["linear", "rbf"],
        "clf__C":      [0.1, 1, 10],
        "clf__gamma":  ["scale", "auto"],
    }
    # 共 2×3×2=12 组,5折 = 60次 fit
    t0 = time.perf_counter()
    gs = GridSearchCV(pipeline, param_grid, cv=5, scoring="accuracy",
                      n_jobs=1, verbose=0)
    gs.fit(X_train, y_train)
    elapsed = time.perf_counter() - t0

    print(f"\n  搜索空间: {len(gs.cv_results_['params'])} 组参数  "
          f"× 5折 = {len(gs.cv_results_['params'])*5} 次 fit")
    print(f"  总耗时: {elapsed:.2f}s\n")

    # Top5 结果
    results = sorted(
        zip(gs.cv_results_["mean_test_score"],
            gs.cv_results_["std_test_score"],
            gs.cv_results_["params"]),
        key=lambda item: item[0],
        reverse=True,
    )[:5]
    rows = [[f"{s:.4f}", f"±{std:.4f}",
             f"kernel={p['clf__kernel']} C={p['clf__C']} gamma={p['clf__gamma']}"]
            for s, std, p in results]
    print_table(["CV均值", "标准差", "参数组合"], rows, [8, 8, 42])

    test_acc = accuracy_score(y_test, gs.predict(X_test))
    print(f"\n  最优参数: {gs.best_params_}")
    print(f"  CV最优分: {gs.best_score_:.4f}")
    print(f"  测试准确率: {test_acc:.4f}  ← 只用一次!")

# ─── 模式2:RandomizedSearchCV ────────────────────────────────────────────────

def mode_random() -> None:
    section("RandomizedSearchCV — 大参数空间随机采样")
    X_train, X_test, y_train, y_test = make_dataset()

    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("clf",    RandomForestClassifier(random_state=42)),
    ])
    # 参数空间:4×5×4×3 = 240 组,只采样 30 组
    param_dist: dict[str, Any] = {
        "clf__n_estimators":      [50, 100, 200, 300],
        "clf__max_depth":         [3, 5, 8, 10, None],
        "clf__min_samples_split": [2, 5, 10, 20],
        "clf__max_features":      ["sqrt", "log2", 0.5],
    }
    t0 = time.perf_counter()
    rs = RandomizedSearchCV(pipeline, param_dist, n_iter=30, cv=5,
                            scoring="accuracy", n_jobs=1, random_state=42)
    rs.fit(X_train, y_train)
    elapsed = time.perf_counter() - t0

    print(f"\n  参数空间: 240 组  采样: 30 组 × 5折 = 150 次 fit")
    print(f"  总耗时: {elapsed:.2f}s(穷举需 ~{elapsed/30*240:.0f}s)\n")

    results = sorted(
        zip(rs.cv_results_["mean_test_score"],
            rs.cv_results_["params"]),
        key=lambda item: item[0],
        reverse=True,
    )[:3]
    rows = [[f"{s:.4f}",
             f"n={p['clf__n_estimators']} depth={p['clf__max_depth']} "
             f"split={p['clf__min_samples_split']}"]
            for s, p in results]
    print_table(["CV均值", "Top3 参数"], rows, [8, 50])

    test_acc = accuracy_score(y_test, rs.predict(X_test))
    print(f"\n  最优CV分: {rs.best_score_:.4f}  测试准确率: {test_acc:.4f}")

# ─── 模式3:学习曲线诊断 ───────────────────────────────────────────────────────

def mode_curve() -> None:
    section("学习曲线 — 诊断过拟合 / 欠拟合")
    X_train, X_test, y_train, y_test = make_dataset()

    models = [
        ("欠拟合(depth=1)",  RandomForestClassifier(max_depth=1,  n_estimators=50, random_state=42)),
        ("适度拟合(depth=5)", RandomForestClassifier(max_depth=5,  n_estimators=50, random_state=42)),
        ("过拟合(depth=None)",RandomForestClassifier(max_depth=None,n_estimators=50,random_state=42)),
    ]
    train_sizes = np.linspace(0.1, 1.0, 8)

    for name, model in models:
        tr_sizes, tr_scores, val_scores = learning_curve(
            model, X_train, y_train,
            train_sizes=train_sizes, cv=5, scoring="accuracy", n_jobs=1,
        )
        tr_mean  = tr_scores.mean(axis=1)
        val_mean = val_scores.mean(axis=1)

        # ASCII 折线图
        W = 40
        print(f"\n  {name}")
        print(f"  {'样本数':<8} {'训练分':<8} {'验证分':<8} 差距")
        print(f"  {'─'*50}")
        for n, tr, val in zip(tr_sizes, tr_mean, val_mean):
            bar_tr  = "█" * int(tr  * W)
            bar_val = "█" * int(val * W)
            gap = tr - val
            flag = "⚠ 过拟合" if gap > 0.1 else ("⚠ 欠拟合" if val < 0.75 else "✓")
            print(f"  {int(n):<8} {tr:.3f}    {val:.3f}    {gap:+.3f} {flag}")

# ─── 模式4:模型持久化 ─────────────────────────────────────────────────────────

def mode_persist() -> None:
    section("模型持久化 — joblib / pickle 对比")
    import pickle
    try:
        import joblib
        has_joblib = True
    except ImportError:
        has_joblib = False

    X_train, X_test, y_train, y_test = make_dataset()
    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("clf",    RandomForestClassifier(n_estimators=100, random_state=42)),
    ])
    pipeline.fit(X_train, y_train)
    baseline_acc = accuracy_score(y_test, pipeline.predict(X_test))
    print(f"\n  原始模型测试准确率: {baseline_acc:.4f}")

    with tempfile.TemporaryDirectory() as tmpdir:
        rows = []

        # pickle
        pkl_path = os.path.join(tmpdir, "model.pkl")
        t0 = time.perf_counter()
        with open(pkl_path, "wb") as f:
            pickle.dump(pipeline, f)
        save_t = time.perf_counter() - t0
        size_kb = os.path.getsize(pkl_path) / 1024

        t0 = time.perf_counter()
        with open(pkl_path, "rb") as f:
            loaded = pickle.load(f)
        load_t = time.perf_counter() - t0
        acc = accuracy_score(y_test, loaded.predict(X_test))
        rows.append(["pickle", f"{size_kb:.1f} KB",
                     f"{save_t*1000:.1f}ms", f"{load_t*1000:.1f}ms", f"{acc:.4f}"])

        # joblib
        if has_joblib:
            jbl_path = os.path.join(tmpdir, "model.joblib")
            t0 = time.perf_counter()
            joblib.dump(pipeline, jbl_path)
            save_t = time.perf_counter() - t0
            size_kb = os.path.getsize(jbl_path) / 1024

            t0 = time.perf_counter()
            loaded2 = joblib.load(jbl_path)
            load_t = time.perf_counter() - t0
            acc2 = accuracy_score(y_test, loaded2.predict(X_test))
            rows.append(["joblib", f"{size_kb:.1f} KB",
                         f"{save_t*1000:.1f}ms", f"{load_t*1000:.1f}ms", f"{acc2:.4f}"])

        print_table(["方式", "文件大小", "保存耗时", "加载耗时", "加载后准确率"],
                    rows, [8, 10, 10, 10, 12])
        print("\n  💡 joblib 对 numpy 数组做了内存映射优化,大模型推荐 joblib")

# ─── 入口 ─────────────────────────────────────────────────────────────────────

def main() -> None:
    parser = argparse.ArgumentParser(description="模型调优完整工作流")
    parser.add_argument(
        "--mode",
        choices=["grid", "random", "curve", "persist", "all"],
        default="grid",
        help="默认运行 grid 快速闭环;all 会完整跑四个模式,耗时更长",
    )
    args = parser.parse_args()
    dispatch = {
        "grid":    mode_grid,
        "random":  mode_random,
        "curve":   mode_curve,
        "persist": mode_persist,
        "all":     lambda: [mode_grid(), mode_random(), mode_curve(), mode_persist()],
    }
    dispatch[args.mode]()


if __name__ == "__main__":
    main()

Sample output:

$ python3 46-python-model-tuning.py --mode grid

==============================================================
  GridSearchCV — 穷举超参数搜索
==============================================================

  搜索空间: 12 组参数  × 5折 = 60 次 fit
  总耗时: 0.64s

┌──────────┬──────────┬────────────────────────────────────────────┐
│ CV均值     │ 标准差      │ 参数组合                                       │
├──────────┼──────────┼────────────────────────────────────────────┤
│ 0.8875   │ ±0.0185  │ kernel=rbf C=10 gamma=scale                │
│ 0.8875   │ ±0.0185  │ kernel=rbf C=10 gamma=auto                 │
│ 0.8800   │ ±0.0165  │ kernel=rbf C=1 gamma=scale                 │
│ 0.8800   │ ±0.0165  │ kernel=rbf C=1 gamma=auto                  │
│ 0.8088   │ ±0.0116  │ kernel=rbf C=0.1 gamma=scale               │
└──────────┴──────────┴────────────────────────────────────────────┘

  最优参数: {'clf__C': 10, 'clf__gamma': 'scale', 'clf__kernel': 'rbf'}
  CV最优分: 0.8875
  测试准确率: 0.8800  ← 只用一次!

$ python3 46-python-model-tuning.py --mode persist

==============================================================
  模型持久化 — joblib / pickle 对比
==============================================================

  原始模型测试准确率: 0.8250
┌──────────┬────────────┬────────────┬────────────┬──────────────┐
│ 方式       │ 文件大小       │ 保存耗时       │ 加载耗时       │ 加载后准确率       │
├──────────┼────────────┼────────────┼────────────┼──────────────┤
│ pickle   │ 1578.6 KB  │ 1.5ms      │ 1.2ms      │ 0.8250       │
│ joblib   │ 1586.4 KB  │ 9.6ms      │ 6.4ms      │ 0.8250       │
└──────────┴────────────┴────────────┴────────────┴──────────────┘

  💡 joblib 对 numpy 数组做了内存映射优化,大模型推荐 joblib

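Summary

What to remember for each module:
make_dataset: split off a training set and a test set; the test set is used only once, at the end.
mode_grid: use exhaustive search for a small parameter space; the results are easy to interpret.
mode_random: sample randomly first in a large parameter space; it is much friendlier on run time.
mode_curve: use the gap between training and validation scores to diagnose model problems.
mode_persist: demonstrate the full save-and-load loop in a temporary directory.
main: run the tuning workflow one layer at a time with --mode.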

⏱ NexDo Time (5 minutes)

Challenge: in mode_grid(), change the clf__C list to [0.01, 0.1, 1, 10, 100], rerun --mode grid, and check whether the best parameters and the test accuracy change.

Don’t wait for next time, do it in the next moment.