42 · Unsupervised Learning: K-Means Clustering and Anomaly Detection

#051 · 2026-04-17 · Python

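🔗 Knowledge-graph navigation: before reading this article, it helps to review the standardization and evaluation ideas in "39 · scikit-learn in Practice", and the point from "41 · Classification Trees and Random Forests" that a model is judged on generalization and interpretability, not only on training results. This article moves on to unsupervised learning: there are no labels, yet we still look for structure in the data.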

Environment: pip install numpy scikit-learn. All data in this article is generated locally by make_blobs(); no files need to be downloaded and no external service is required.

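Pain point and architecture: in practice much data has no human labels, for example user behavior, transaction records, and device metrics. Unsupervised learning sets aside the question "what is the correct answer?" and lets the algorithm discover groups, density, and outliers on its own. This article uses K-Means, DBSCAN, and an isolation forest to build an exploration pipeline for unlabeled data.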

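Build intuition for unsupervised learning first

K-Means: specify K up front, then let each sample gather at its nearest centroid.
DBSCAN: no K is specified; clusters are found by density connectivity, and outlying points are marked as noise.
Isolation forest: does no grouping; it looks specifically for anomalies that are "easy to isolate".
Elbow method: helps judge which value of K is reasonable for K-Means.

Geek's take: supervised learning is like an exam with an answer key; unsupervised learning is like sorting a box of unlabeled parts. You first see which parts naturally pile together and which sit apart, then decide how the business should handle them.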
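To make the three ideas concrete, here is a minimal sketch on a hand-made toy dataset. The 11 points and every parameter value below are assumptions chosen for illustration; they are not taken from the article's script. The sketch shows that K-Means assigns every point to some cluster, DBSCAN gives the far-away point the noise label -1, and an isolation forest flags that same point as an anomaly.

```python
import numpy as np
from sklearn.cluster import KMeans, DBSCAN
from sklearn.ensemble import IsolationForest

# Toy data (assumed for illustration): two tight groups plus one distant point.
X = np.array([
    [0.0, 0.0], [0.1, 0.2], [0.2, 0.1], [0.1, 0.0], [0.0, 0.1],
    [5.0, 5.0], [5.1, 5.2], [5.2, 5.1], [5.1, 5.0], [5.0, 5.1],
    [20.0, -20.0],
])

# K-Means: every sample receives one of the K cluster labels, outlier included.
kmeans_labels = KMeans(n_clusters=2, n_init=10, random_state=0).fit_predict(X)

# DBSCAN: dense groups become clusters; a point with no dense neighborhood gets -1.
dbscan_labels = DBSCAN(eps=0.5, min_samples=3).fit_predict(X)

# Isolation forest: no grouping at all, only 1 (normal) or -1 (anomaly).
iso_pred = IsolationForest(contamination=0.1, random_state=0).fit_predict(X)

print("K-Means labels:", kmeans_labels.tolist())
print("DBSCAN labels: ", dbscan_labels.tolist())
print("Isolation forest verdict for the distant point:", iso_pred[-1])
```

Note that K-Means has no notion of "noise": the distant point is forced into a cluster, which is one reason the article pairs it with DBSCAN and an isolation forest.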

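Step by step: breaking down the core logic

This article is split into 7 steps: data generation, terminal visualization, K-Means, DBSCAN, anomaly detection, the elbow method, and CLI dispatch. Each demo runs on its own and prints its results directly.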

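Step 1: generate unlabeled clustering data with make_data

Pain point and mechanism

make_data is the practice ground for unsupervised learning. K-Means and DBSCAN look only at X, the table of 2-D coordinates, and never at y. y is kept here purely for teaching comparison, just as a teacher knows the answer key while the students working the problem cannot see it.

Core source (verbatim from the full source at the end of the article)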

def make_data(n_clusters: int = 4) -> Tuple[np.ndarray, np.ndarray]:
    X, y = make_blobs(n_samples=400, centers=n_clusters, cluster_std=0.8, random_state=42)
    return X, y

Runnable demo (mock data and print feedback filled in)

from typing import Tuple
import numpy as np
from sklearn.datasets import make_blobs

def make_data(n_clusters: int = 4) -> Tuple[np.ndarray, np.ndarray]:
    X, y = make_blobs(n_samples=400, centers=n_clusters, cluster_std=0.8, random_state=42)
    return X, y

X, y = make_data(n_clusters=4)
print("数据形状:", X.shape)
print("聚类算法看到的只有 X,不看 y")
print("真实分组只用于教学对照:", dict(zip(*np.unique(y, return_counts=True))))
print("第一行样本:", np.round(X[0], 3).tolist())

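Step 2: draw the cluster map in the terminal with ascii_scatter

Pain point and mechanism

ascii_scatter squeezes 2-D coordinates into a character grid, so beginners can see roughly how many clumps the points form without matplotlib. It is like shrinking a map onto the terminal: different letters stand for different clusters, and ! marks any point with a negative label, such as DBSCAN noise.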
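The coordinate-to-grid mapping is easiest to see on a few concrete points. The sketch below pulls the two scaling formulas out of ascii_scatter into a hypothetical helper named to_cell (the helper name and its signature are assumptions; the arithmetic is the same as in the article's function) and applies it to the bounds of the 7-point demo in this step.

```python
def to_cell(x, y, x_min, x_max, y_min, y_max, width=36, height=10):
    """Map a data point to (row, col) the same way ascii_scatter does."""
    # The 1e-9 padding avoids division by zero when all points share a coordinate.
    x_range = x_max - x_min + 1e-9
    y_range = y_max - y_min + 1e-9
    col = int((x - x_min) / x_range * (width - 1))
    # Rows grow downward on screen, so the largest y must land on row 0.
    row = int((y_max - y) / y_range * (height - 1))
    return row, col

# Bounds of the demo data: x spans 0.0..6.0, y spans 0.0..3.2.
bounds = dict(x_min=0.0, x_max=6.0, y_min=0.0, y_max=3.2)

bottom_left = to_cell(0.0, 0.0, **bounds)
top_middle = to_cell(3.0, 3.2, **bounds)
far_right = to_cell(6.0, 0.2, **bounds)

print("(0.0, 0.0) ->", bottom_left)   # (8, 0)
print("(3.0, 3.2) ->", top_middle)    # (0, 17)
print("(6.0, 0.2) ->", far_right)     # (8, 34)
```

One side effect of the 1e-9 padding combined with int() truncation: the maximum x lands in column width - 2 and the minimum y in row height - 2, so the last column and the bottom row of the grid stay empty. This does not affect the shape of the picture.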

Core source (verbatim from the full source at the end of the article)

def ascii_scatter(X: np.ndarray, labels: np.ndarray, title: str,
                  width: int = 60, height: int = 20) -> None:
    """ASCII散点图,不同簇用不同字符"""
    chars = "ABCDEFGHIJ*"
    x_min, x_max = X[:, 0].min(), X[:, 0].max()
    y_min, y_max = X[:, 1].min(), X[:, 1].max()
    x_range = x_max - x_min + 1e-9
    y_range = y_max - y_min + 1e-9

    grid = [["·"] * width for _ in range(height)]
    for (x, y), label in zip(X, labels):
        col = int((x - x_min) / x_range * (width - 1))
        row = int((y_max - y) / y_range * (height - 1))
        col = max(0, min(width - 1, col))
        row = max(0, min(height - 1, row))
        ch = chars[label % len(chars)] if label >= 0 else "!"
        grid[row][col] = ch

    print(f"\n  {title}")
    print(f"  {'─'*width}")
    for line in grid:
        print(f"  {''.join(line)}")
    print(f"  {'─'*width}")
    unique = sorted(set(labels))
    legend = "  图例: " + "  ".join(
        f"{chars[l % len(chars)]}=簇{l}" if l >= 0 else "!=噪声"
        for l in unique
    )
    print(legend)

Runnable demo (mock data and print feedback filled in)

import numpy as np

def ascii_scatter(X: np.ndarray, labels: np.ndarray, title: str,
                  width: int = 60, height: int = 20) -> None:
    """ASCII散点图,不同簇用不同字符"""
    chars = "ABCDEFGHIJ*"
    x_min, x_max = X[:, 0].min(), X[:, 0].max()
    y_min, y_max = X[:, 1].min(), X[:, 1].max()
    x_range = x_max - x_min + 1e-9
    y_range = y_max - y_min + 1e-9

    grid = [["·"] * width for _ in range(height)]
    for (x, y), label in zip(X, labels):
        col = int((x - x_min) / x_range * (width - 1))
        row = int((y_max - y) / y_range * (height - 1))
        col = max(0, min(width - 1, col))
        row = max(0, min(height - 1, row))
        ch = chars[label % len(chars)] if label >= 0 else "!"
        grid[row][col] = ch

    print(f"\n  {title}")
    print(f"  {'─'*width}")
    for line in grid:
        print(f"  {''.join(line)}")
    print(f"  {'─'*width}")
    unique = sorted(set(labels))
    legend = "  图例: " + "  ".join(
        f"{chars[l % len(chars)]}=簇{l}" if l >= 0 else "!=噪声"
        for l in unique
    )
    print(legend)

X = np.array([
    [0.0, 0.0], [0.2, 0.1], [0.1, 0.3],
    [2.8, 3.0], [3.1, 2.9], [3.0, 3.2],
    [6.0, 0.2],
])
labels = np.array([0, 0, 0, 1, 1, 1, -1])
ascii_scatter(X, labels, "两簇 + 一个噪声点", width=36, height=10)

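Step 3: group by centroid distance with mode_kmeans

Pain point and mechanism

K-Means is like "pick a few temporary assembly points, then have everyone line up at the nearest one". After everyone has lined up, the assembly points are recomputed, and the process repeats. The silhouette coefficient ranges from -1 to 1; a larger value means clusters are tighter inside and farther from each other. Inertia is the sum of squared distances from each point to its own centroid; a smaller value means points sit closer to their centroid.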
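The "line up, then move the assembly point" loop can be written in a few lines of NumPy. This is a minimal sketch of one assign-and-update iteration, not scikit-learn's implementation: the helper name lloyd_step, the four toy points, and the starting centroids are all assumptions made for illustration.

```python
import numpy as np

def lloyd_step(X, centroids):
    """One K-Means iteration: assign each point, then recompute centroids."""
    centroids = np.asarray(centroids, dtype=float)
    # Distance from every point to every centroid, shape (n_samples, k).
    dists = np.linalg.norm(X[:, None, :] - centroids[None, :, :], axis=2)
    labels = dists.argmin(axis=1)
    new_centroids = centroids.copy()
    for j in range(len(centroids)):
        members = X[labels == j]
        if len(members) > 0:          # keep the old centroid if a cluster is empty
            new_centroids[j] = members.mean(axis=0)
    # Inertia: sum of squared distances from each point to its assigned centroid.
    inertia = float(((X - new_centroids[labels]) ** 2).sum())
    return labels, new_centroids, inertia

# Toy data (assumed): two pairs of points, with deliberately poor starting centroids.
X = np.array([[0.0, 0.0], [0.0, 2.0], [10.0, 0.0], [10.0, 2.0]])
centroids = np.array([[1.0, 1.0], [2.0, 1.0]])

for step in range(2):
    labels, centroids, inertia = lloyd_step(X, centroids)
    print(f"step {step + 1}: labels={labels.tolist()} "
          f"centroids={centroids.tolist()} inertia={inertia:.1f}")
```

After the first step the centroids move to (0, 1) and (10, 1); the second step changes nothing, which is the convergence condition. scikit-learn's KMeans adds k-means++ initialization and repeats the whole procedure n_init times, keeping the run with the lowest inertia.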

Core source (verbatim from the full source at the end of the article)

def mode_kmeans(X: np.ndarray) -> None:
    print(f"[{nexdo_time()}] K-Means 聚类")
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X)
    km = KMeans(n_clusters=4, init="k-means++", random_state=42, n_init=10)
    labels = km.fit_predict(X_s)
    sil = silhouette_score(X_s, labels)
    print(f"  轮廓系数: {sil:.4f}  惯性: {km.inertia_:.2f}")
    ascii_scatter(X_s, labels, "K-Means 聚类结果(K=4)")

Runnable demo (mock data and print feedback filled in)

import time
from typing import Tuple
import numpy as np
from sklearn.datasets import make_blobs
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score


def nexdo_time() -> str:
    return time.strftime("%Y-%m-%d %H:%M:%S")

def make_data(n_clusters: int = 4) -> Tuple[np.ndarray, np.ndarray]:
    X, y = make_blobs(n_samples=400, centers=n_clusters, cluster_std=0.8, random_state=42)
    return X, y

def ascii_scatter(X: np.ndarray, labels: np.ndarray, title: str,
                  width: int = 60, height: int = 20) -> None:
    """ASCII散点图,不同簇用不同字符"""
    chars = "ABCDEFGHIJ*"
    x_min, x_max = X[:, 0].min(), X[:, 0].max()
    y_min, y_max = X[:, 1].min(), X[:, 1].max()
    x_range = x_max - x_min + 1e-9
    y_range = y_max - y_min + 1e-9

    grid = [["·"] * width for _ in range(height)]
    for (x, y), label in zip(X, labels):
        col = int((x - x_min) / x_range * (width - 1))
        row = int((y_max - y) / y_range * (height - 1))
        col = max(0, min(width - 1, col))
        row = max(0, min(height - 1, row))
        ch = chars[label % len(chars)] if label >= 0 else "!"
        grid[row][col] = ch

    print(f"\n  {title}")
    print(f"  {'─'*width}")
    for line in grid:
        print(f"  {''.join(line)}")
    print(f"  {'─'*width}")
    unique = sorted(set(labels))
    legend = "  图例: " + "  ".join(
        f"{chars[l % len(chars)]}=簇{l}" if l >= 0 else "!=噪声"
        for l in unique
    )
    print(legend)

def mode_kmeans(X: np.ndarray) -> None:
    print(f"[{nexdo_time()}] K-Means 聚类")
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X)
    km = KMeans(n_clusters=4, init="k-means++", random_state=42, n_init=10)
    labels = km.fit_predict(X_s)
    sil = silhouette_score(X_s, labels)
    print(f"  轮廓系数: {sil:.4f}  惯性: {km.inertia_:.2f}")
    ascii_scatter(X_s, labels, "K-Means 聚类结果(K=4)")

X, _ = make_data()
mode_kmeans(X)

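Step 4: find clusters and noise by density with mode_dbscan

Pain point and mechanism

DBSCAN is like finding circles of friends: a person with enough friends nearby is a core member; core members that connect into one patch form a cluster; people far from everyone are marked as noise. It does not need K in advance, but eps (the neighborhood radius) and min_samples (how many points a neighborhood must contain) strongly affect the result.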
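The "enough friends nearby" test can be checked by hand. The sketch below counts, for each point, how many points lie within eps (the point itself included, which is how scikit-learn counts min_samples) and compares the result with DBSCAN's core_sample_indices_. The helper name core_point_mask is an assumption, and the 7 points are the same toy data used in the Step 2 demo.

```python
import numpy as np
from sklearn.cluster import DBSCAN

def core_point_mask(X, eps, min_samples):
    """True for every point that has at least min_samples points within eps."""
    dists = np.linalg.norm(X[:, None, :] - X[None, :, :], axis=2)
    neighbor_counts = (dists <= eps).sum(axis=1)   # each point counts itself
    return neighbor_counts >= min_samples

X = np.array([
    [0.0, 0.0], [0.2, 0.1], [0.1, 0.3],
    [2.8, 3.0], [3.1, 2.9], [3.0, 3.2],
    [6.0, 0.2],
])

core_eps_050 = core_point_mask(X, eps=0.5, min_samples=3)
core_eps_025 = core_point_mask(X, eps=0.25, min_samples=3)
sklearn_core = DBSCAN(eps=0.5, min_samples=3).fit(X).core_sample_indices_

print("core points at eps=0.50:", np.flatnonzero(core_eps_050).tolist())
print("core points at eps=0.25:", np.flatnonzero(core_eps_025).tolist())
print("sklearn core indices at eps=0.50:", sklearn_core.tolist())
```

Halving eps leaves only one core point in this toy data, so the second group stops being a cluster at all. That is the sensitivity the parameter table in mode_dbscan is designed to expose.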

Core source (verbatim from the full source at the end of the article)

def mode_dbscan(X: np.ndarray) -> None:
    print(f"[{nexdo_time()}] DBSCAN 聚类")
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X)
    rows = []
    for eps, min_s in [(0.3, 5), (0.5, 5), (0.5, 10), (0.8, 5)]:
        db = DBSCAN(eps=eps, min_samples=min_s)
        labels = db.fit_predict(X_s)
        n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
        n_noise = (labels == -1).sum()
        sil = silhouette_score(X_s, labels) if n_clusters > 1 else -1
        rows.append([eps, min_s, n_clusters, n_noise, f"{sil:.4f}"])
    print_table(["eps", "min_samples", "簇数", "噪声点", "轮廓系数"], rows, "DBSCAN 参数对比")

    db_best = DBSCAN(eps=0.5, min_samples=5)
    labels_best = db_best.fit_predict(X_s)
    ascii_scatter(X_s, labels_best, "DBSCAN 聚类结果(eps=0.5, min_samples=5)")

Runnable demo (mock data and print feedback filled in)

import time
from typing import Tuple
import numpy as np
from sklearn.datasets import make_blobs
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score


def nexdo_time() -> str:
    return time.strftime("%Y-%m-%d %H:%M:%S")

def print_table(headers: list, rows: list, title: str = "") -> None:
    if title:
        print(f"\n{'='*60}\n  {title}\n{'='*60}")
    col_widths = [max(len(str(h)), max((len(str(r[i])) for r in rows), default=0))
                  for i, h in enumerate(headers)]
    print(f"┌{'┬'.join('─'*(w+2) for w in col_widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, col_widths))}│")
    print(f"├{'┼'.join('─'*(w+2) for w in col_widths)}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, col_widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in col_widths)}┘")

def make_data(n_clusters: int = 4) -> Tuple[np.ndarray, np.ndarray]:
    X, y = make_blobs(n_samples=400, centers=n_clusters, cluster_std=0.8, random_state=42)
    return X, y

def ascii_scatter(X: np.ndarray, labels: np.ndarray, title: str,
                  width: int = 60, height: int = 20) -> None:
    """ASCII散点图,不同簇用不同字符"""
    chars = "ABCDEFGHIJ*"
    x_min, x_max = X[:, 0].min(), X[:, 0].max()
    y_min, y_max = X[:, 1].min(), X[:, 1].max()
    x_range = x_max - x_min + 1e-9
    y_range = y_max - y_min + 1e-9

    grid = [["·"] * width for _ in range(height)]
    for (x, y), label in zip(X, labels):
        col = int((x - x_min) / x_range * (width - 1))
        row = int((y_max - y) / y_range * (height - 1))
        col = max(0, min(width - 1, col))
        row = max(0, min(height - 1, row))
        ch = chars[label % len(chars)] if label >= 0 else "!"
        grid[row][col] = ch

    print(f"\n  {title}")
    print(f"  {'─'*width}")
    for line in grid:
        print(f"  {''.join(line)}")
    print(f"  {'─'*width}")
    unique = sorted(set(labels))
    legend = "  图例: " + "  ".join(
        f"{chars[l % len(chars)]}=簇{l}" if l >= 0 else "!=噪声"
        for l in unique
    )
    print(legend)

def mode_dbscan(X: np.ndarray) -> None:
    print(f"[{nexdo_time()}] DBSCAN 聚类")
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X)
    rows = []
    for eps, min_s in [(0.3, 5), (0.5, 5), (0.5, 10), (0.8, 5)]:
        db = DBSCAN(eps=eps, min_samples=min_s)
        labels = db.fit_predict(X_s)
        n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
        n_noise = (labels == -1).sum()
        sil = silhouette_score(X_s, labels) if n_clusters > 1 else -1
        rows.append([eps, min_s, n_clusters, n_noise, f"{sil:.4f}"])
    print_table(["eps", "min_samples", "簇数", "噪声点", "轮廓系数"], rows, "DBSCAN 参数对比")

    db_best = DBSCAN(eps=0.5, min_samples=5)
    labels_best = db_best.fit_predict(X_s)
    ascii_scatter(X_s, labels_best, "DBSCAN 聚类结果(eps=0.5, min_samples=5)")

X, _ = make_data()
mode_dbscan(X)

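Step 5: find isolated anomalies with mode_anomaly

Pain point and mechanism

An isolation forest is like cutting random paths through a forest: normal points are packed in a crowd and take many cuts to separate, while anomalies stand far away and are isolated after only a few cuts. contamination is your estimate of the proportion of anomalies; the larger it is, the more points the model labels as anomalous.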
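The "few cuts versus many cuts" intuition can be simulated directly. The following is a deliberately simplified 1-D sketch, not scikit-learn's IsolationForest (which also picks a random feature for every cut and averages path lengths over many trees). The helper name isolation_depth, the 21 data values, and the number of trials are all assumptions made for illustration.

```python
import numpy as np

def isolation_depth(values, target_idx, rng, max_depth=50):
    """Count random cuts until values[target_idx] is the only point left."""
    current = np.asarray(values, dtype=float)
    target = current[target_idx]
    depth = 0
    while len(current) > 1 and depth < max_depth:
        lo, hi = current.min(), current.max()
        if lo == hi:                      # only duplicates remain, cannot split
            break
        cut = rng.uniform(lo, hi)
        # Keep only the side of the cut that still contains the target.
        current = current[current < cut] if target < cut else current[current >= cut]
        depth += 1
    return depth

rng = np.random.default_rng(0)
# 20 crowded values in [0, 1] plus one distant value at 10 (index 20).
values = np.concatenate([np.linspace(0.0, 1.0, 20), [10.0]])

mean_outlier_depth = np.mean([isolation_depth(values, 20, rng) for _ in range(200)])
mean_inlier_depth = np.mean([isolation_depth(values, 10, rng) for _ in range(200)])

print(f"average cuts to isolate the distant point: {mean_outlier_depth:.2f}")
print(f"average cuts to isolate a crowded point:   {mean_inlier_depth:.2f}")
```

A short average path is what the real model turns into an anomaly score; contamination then only decides where on that score scale the cut-off between "normal" and "anomalous" is placed.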

Core source (verbatim from the full source at the end of the article)

def mode_anomaly(X: np.ndarray) -> None:
    print(f"[{nexdo_time()}] 异常检测对比")
    # 注入5%异常点
    rng = np.random.RandomState(0)
    n_outliers = 20
    outliers = rng.uniform(low=-8, high=8, size=(n_outliers, 2))
    X_with_outliers = np.vstack([X, outliers])
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X_with_outliers)

    rows = []
    for name, clf in [
        ("孤立森林(contamination=0.05)", IsolationForest(contamination=0.05, random_state=42)),
        ("孤立森林(contamination=0.10)", IsolationForest(contamination=0.10, random_state=42)),
    ]:
        pred = clf.fit_predict(X_s)  # -1=异常, 1=正常
        n_detected = (pred == -1).sum()
        rows.append([name, n_detected, f"{n_detected/len(X_s)*100:.1f}%"])
    print_table(["方法", "检测异常数", "异常比例"], rows, "异常检测结果")

    # ASCII展示(孤立森林)
    iso = IsolationForest(contamination=0.05, random_state=42)
    labels_iso = iso.fit_predict(X_s)
    # 将 1/-1 转为 0/1 用于绘图
    plot_labels = np.where(labels_iso == 1, 0, 1)
    ascii_scatter(X_s, plot_labels, "孤立森林异常检测(A=正常, B=异常)")

Runnable demo (mock data and print feedback filled in)

import time
from typing import Tuple
import numpy as np
from sklearn.datasets import make_blobs
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler


def nexdo_time() -> str:
    return time.strftime("%Y-%m-%d %H:%M:%S")

def print_table(headers: list, rows: list, title: str = "") -> None:
    if title:
        print(f"\n{'='*60}\n  {title}\n{'='*60}")
    col_widths = [max(len(str(h)), max((len(str(r[i])) for r in rows), default=0))
                  for i, h in enumerate(headers)]
    print(f"┌{'┬'.join('─'*(w+2) for w in col_widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, col_widths))}│")
    print(f"├{'┼'.join('─'*(w+2) for w in col_widths)}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, col_widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in col_widths)}┘")

def make_data(n_clusters: int = 4) -> Tuple[np.ndarray, np.ndarray]:
    X, y = make_blobs(n_samples=400, centers=n_clusters, cluster_std=0.8, random_state=42)
    return X, y

def ascii_scatter(X: np.ndarray, labels: np.ndarray, title: str,
                  width: int = 60, height: int = 20) -> None:
    """ASCII散点图,不同簇用不同字符"""
    chars = "ABCDEFGHIJ*"
    x_min, x_max = X[:, 0].min(), X[:, 0].max()
    y_min, y_max = X[:, 1].min(), X[:, 1].max()
    x_range = x_max - x_min + 1e-9
    y_range = y_max - y_min + 1e-9

    grid = [["·"] * width for _ in range(height)]
    for (x, y), label in zip(X, labels):
        col = int((x - x_min) / x_range * (width - 1))
        row = int((y_max - y) / y_range * (height - 1))
        col = max(0, min(width - 1, col))
        row = max(0, min(height - 1, row))
        ch = chars[label % len(chars)] if label >= 0 else "!"
        grid[row][col] = ch

    print(f"\n  {title}")
    print(f"  {'─'*width}")
    for line in grid:
        print(f"  {''.join(line)}")
    print(f"  {'─'*width}")
    unique = sorted(set(labels))
    legend = "  图例: " + "  ".join(
        f"{chars[l % len(chars)]}=簇{l}" if l >= 0 else "!=噪声"
        for l in unique
    )
    print(legend)

def mode_anomaly(X: np.ndarray) -> None:
    print(f"[{nexdo_time()}] 异常检测对比")
    # 注入5%异常点
    rng = np.random.RandomState(0)
    n_outliers = 20
    outliers = rng.uniform(low=-8, high=8, size=(n_outliers, 2))
    X_with_outliers = np.vstack([X, outliers])
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X_with_outliers)

    rows = []
    for name, clf in [
        ("孤立森林(contamination=0.05)", IsolationForest(contamination=0.05, random_state=42)),
        ("孤立森林(contamination=0.10)", IsolationForest(contamination=0.10, random_state=42)),
    ]:
        pred = clf.fit_predict(X_s)  # -1=异常, 1=正常
        n_detected = (pred == -1).sum()
        rows.append([name, n_detected, f"{n_detected/len(X_s)*100:.1f}%"])
    print_table(["方法", "检测异常数", "异常比例"], rows, "异常检测结果")

    # ASCII展示(孤立森林)
    iso = IsolationForest(contamination=0.05, random_state=42)
    labels_iso = iso.fit_predict(X_s)
    # 将 1/-1 转为 0/1 用于绘图
    plot_labels = np.where(labels_iso == 1, 0, 1)
    ascii_scatter(X_s, plot_labels, "孤立森林异常检测(A=正常, B=异常)")

X, _ = make_data()
mode_anomaly(X)

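Step 6: use mode_elbow to help choose K

Pain point and mechanism

The elbow method asks whether increasing K further is still worth it. If inertia falls quickly as K goes from 2 to 4 and only slowly after 4, the turning point looks like an elbow. It is not an absolute truth, but it keeps beginners from picking K by gut feeling.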
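"Is one more cluster still worth it?" can be turned into simple arithmetic. The sketch below uses the inertia values from the sample --mode elbow run shown near the end of this article and computes the relative drop at each step. The 20% cut-off is an assumption picked for illustration, not a rule from the article or from scikit-learn.

```python
# Inertia per K, copied from the article's sample elbow output.
ks = [2, 3, 4, 5, 6, 7, 8]
inertias = [417.2, 89.7, 15.0, 13.4, 11.9, 10.5, 9.3]

# Relative drop in inertia when moving from K to K + 1.
drops = [(prev - cur) / prev for prev, cur in zip(inertias, inertias[1:])]

for k, drop in zip(ks, drops):
    print(f"K={k} -> K={k + 1}: inertia falls by {drop * 100:.1f}%")

# Assumed rule of thumb: the elbow is the first K whose next step gains < 20%.
threshold = 0.20
elbow_k = next(k for k, drop in zip(ks, drops) if drop < threshold)
print("elbow at K =", elbow_k)
```

Going from K=3 to K=4 still removes about 83% of the inertia, while K=4 to K=5 removes only about 11%, so the elbow sits at K=4. The silhouette column of the same table peaks at K=4 as well, which is a useful cross-check.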

Core source (verbatim from the full source at the end of the article)

def mode_elbow(X: np.ndarray) -> None:
    """肘部法则选K"""
    print(f"[{nexdo_time()}] 肘部法则选择最优K")
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X)
    inertias = []
    sils = []
    ks = range(2, 9)
    for k in ks:
        km = KMeans(n_clusters=k, random_state=42, n_init=10)
        labels = km.fit_predict(X_s)
        inertias.append(km.inertia_)
        sils.append(silhouette_score(X_s, labels))

    rows = [(k, f"{iner:.1f}", f"{sil:.4f}") for k, iner, sil in zip(ks, inertias, sils)]
    print_table(["K", "惯性(Inertia)", "轮廓系数"], rows, "肘部法则数据")

    # ASCII折线图
    max_iner = max(inertias)
    print("\n  惯性曲线(肘部法则)")
    height = 8
    for h in range(height, 0, -1):
        line = f"  {h*max_iner/height:8.0f} │"
        for iner in inertias:
            line += "●" if abs(iner - h * max_iner / height) < max_iner / height / 2 else " "
        print(line)
    print(f"           └{'─'*len(inertias)*2}")
    print(f"            K值: {' '.join(str(k) for k in ks)}")

Runnable demo (mock data and print feedback filled in)

import time
from typing import Tuple
import numpy as np
from sklearn.datasets import make_blobs
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score


def nexdo_time() -> str:
    return time.strftime("%Y-%m-%d %H:%M:%S")

def print_table(headers: list, rows: list, title: str = "") -> None:
    if title:
        print(f"\n{'='*60}\n  {title}\n{'='*60}")
    col_widths = [max(len(str(h)), max((len(str(r[i])) for r in rows), default=0))
                  for i, h in enumerate(headers)]
    print(f"┌{'┬'.join('─'*(w+2) for w in col_widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, col_widths))}│")
    print(f"├{'┼'.join('─'*(w+2) for w in col_widths)}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, col_widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in col_widths)}┘")

def make_data(n_clusters: int = 4) -> Tuple[np.ndarray, np.ndarray]:
    X, y = make_blobs(n_samples=400, centers=n_clusters, cluster_std=0.8, random_state=42)
    return X, y

def mode_elbow(X: np.ndarray) -> None:
    """肘部法则选K"""
    print(f"[{nexdo_time()}] 肘部法则选择最优K")
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X)
    inertias = []
    sils = []
    ks = range(2, 9)
    for k in ks:
        km = KMeans(n_clusters=k, random_state=42, n_init=10)
        labels = km.fit_predict(X_s)
        inertias.append(km.inertia_)
        sils.append(silhouette_score(X_s, labels))

    rows = [(k, f"{iner:.1f}", f"{sil:.4f}") for k, iner, sil in zip(ks, inertias, sils)]
    print_table(["K", "惯性(Inertia)", "轮廓系数"], rows, "肘部法则数据")

    # ASCII折线图
    max_iner = max(inertias)
    print("\n  惯性曲线(肘部法则)")
    height = 8
    for h in range(height, 0, -1):
        line = f"  {h*max_iner/height:8.0f} │"
        for iner in inertias:
            line += "●" if abs(iner - h * max_iner / height) < max_iner / height / 2 else " "
        print(line)
    print(f"           └{'─'*len(inertias)*2}")
    print(f"            K值: {' '.join(str(k) for k in ks)}")

X, _ = make_data()
mode_elbow(X)

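Step 7: dispatch the kmeans/dbscan/anomaly/elbow/all commands with main

Pain point and mechanism

main is the command-line remote control. Beginners do not need to edit the source: changing --mode is enough to run K-Means, DBSCAN, anomaly detection, or the elbow method on its own, or to run the whole experiment in one go with all.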
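The dispatch idea can be tried without touching sys.argv, because argparse's parse_args accepts an explicit argument list. The sketch below is a stand-in, not the article's main: the function name run and the recording handlers are assumptions, while the --mode choices and the order used for all match the article's script.

```python
import argparse

def run(argv):
    """Parse argv like the article's main and return the modes that would run."""
    parser = argparse.ArgumentParser(description="clustering demo dispatcher (sketch)")
    parser.add_argument("--mode",
                        choices=["kmeans", "dbscan", "anomaly", "elbow", "all"],
                        default="all")
    args = parser.parse_args(argv)   # explicit list instead of reading sys.argv

    executed = []
    # Stand-in handlers: each records its name instead of running a model.
    handlers = {name: (lambda n=name: executed.append(n))
                for name in ["kmeans", "dbscan", "anomaly", "elbow"]}

    # "all" runs the four experiments in the same order as the article's script.
    order = ["elbow", "kmeans", "dbscan", "anomaly"] if args.mode == "all" else [args.mode]
    for name in order:
        handlers[name]()
    return executed

single = run(["--mode", "kmeans"])
everything = run([])                  # no flag given, so the default "all" applies

print("--mode kmeans ->", single)
print("(default)     ->", everything)
```

Passing a list to parse_args makes the dispatcher easy to unit-test; an unknown mode such as --mode foo makes argparse print a usage message and exit, so no handler ever runs with an invalid name.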

Core source (verbatim from the full source at the end of the article)

def main() -> None:
    parser = argparse.ArgumentParser(description="聚类与异常检测演示")
    parser.add_argument("--mode", choices=["kmeans", "dbscan", "anomaly", "elbow", "all"],
                        default="all")
    args = parser.parse_args()
    X, _ = make_data()
    dispatch = {
        "kmeans":  lambda: mode_kmeans(X),
        "dbscan":  lambda: mode_dbscan(X),
        "anomaly": lambda: mode_anomaly(X),
        "elbow":   lambda: mode_elbow(X),
        "all":     lambda: [mode_elbow(X), mode_kmeans(X), mode_dbscan(X), mode_anomaly(X)],
    }
    dispatch[args.mode]()
    print(f"\n[{nexdo_time()}] 完成")

Runnable demo (mock data and print feedback filled in)

import argparse
import sys
import numpy as np
from typing import Tuple


def make_data(n_clusters: int = 4) -> Tuple[np.ndarray, np.ndarray]:
    return np.zeros((4, 2)), np.array([0, 1, 2, 3])


def mode_kmeans(X: np.ndarray) -> None:
    print("kmeans:按质心距离把样本分组")


def mode_dbscan(X: np.ndarray) -> None:
    print("dbscan:按密度连通找簇,并标记噪声")


def mode_anomaly(X: np.ndarray) -> None:
    print("anomaly:用孤立森林找异常点")


def mode_elbow(X: np.ndarray) -> None:
    print("elbow:用惯性曲线辅助选择 K")


def nexdo_time() -> str:
    return "2026-04-18 10:58:00"

def main() -> None:
    parser = argparse.ArgumentParser(description="聚类与异常检测演示")
    parser.add_argument("--mode", choices=["kmeans", "dbscan", "anomaly", "elbow", "all"],
                        default="all")
    args = parser.parse_args()
    X, _ = make_data()
    dispatch = {
        "kmeans":  lambda: mode_kmeans(X),
        "dbscan":  lambda: mode_dbscan(X),
        "anomaly": lambda: mode_anomaly(X),
        "elbow":   lambda: mode_elbow(X),
        "all":     lambda: [mode_elbow(X), mode_kmeans(X), mode_dbscan(X), mode_anomaly(X)],
    }
    dispatch[args.mode]()
    print(f"\n[{nexdo_time()}] 完成")

for mode in ["kmeans", "dbscan", "anomaly", "elbow", "all"]:
    print(f"\n$ python3 42-clustering.py --mode {mode}")
    sys.argv = ["prog", "--mode", mode]
    main()

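Geek in practice: full source and how to run it

Now assemble the building blocks above and put the complete code below into your editor. A suggested order: run --mode elbow first to judge K, then --mode kmeans to see the cluster map, and finally --mode dbscan and --mode anomaly to compare density clustering with anomaly detection.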

#!/usr/bin/env python3
"""
42-clustering.py
K-Means / DBSCAN / 孤立森林对比 + ASCII散点图

用法:
  python 42-clustering.py --mode kmeans
  python 42-clustering.py --mode dbscan
  python 42-clustering.py --mode anomaly
  python 42-clustering.py --mode elbow
  python 42-clustering.py --mode all
"""

import argparse
import time
from typing import Tuple

import numpy as np
from sklearn.datasets import make_blobs
from sklearn.cluster import KMeans, DBSCAN
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score


def nexdo_time() -> str:
    return time.strftime("%Y-%m-%d %H:%M:%S")


def print_table(headers: list, rows: list, title: str = "") -> None:
    if title:
        print(f"\n{'='*60}\n  {title}\n{'='*60}")
    col_widths = [max(len(str(h)), max((len(str(r[i])) for r in rows), default=0))
                  for i, h in enumerate(headers)]
    print(f"┌{'┬'.join('─'*(w+2) for w in col_widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, col_widths))}│")
    print(f"├{'┼'.join('─'*(w+2) for w in col_widths)}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, col_widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in col_widths)}┘")


def make_data(n_clusters: int = 4) -> Tuple[np.ndarray, np.ndarray]:
    X, y = make_blobs(n_samples=400, centers=n_clusters, cluster_std=0.8, random_state=42)
    return X, y


def ascii_scatter(X: np.ndarray, labels: np.ndarray, title: str,
                  width: int = 60, height: int = 20) -> None:
    """ASCII散点图,不同簇用不同字符"""
    chars = "ABCDEFGHIJ*"
    x_min, x_max = X[:, 0].min(), X[:, 0].max()
    y_min, y_max = X[:, 1].min(), X[:, 1].max()
    x_range = x_max - x_min + 1e-9
    y_range = y_max - y_min + 1e-9

    grid = [["·"] * width for _ in range(height)]
    for (x, y), label in zip(X, labels):
        col = int((x - x_min) / x_range * (width - 1))
        row = int((y_max - y) / y_range * (height - 1))
        col = max(0, min(width - 1, col))
        row = max(0, min(height - 1, row))
        ch = chars[label % len(chars)] if label >= 0 else "!"
        grid[row][col] = ch

    print(f"\n  {title}")
    print(f"  {'─'*width}")
    for line in grid:
        print(f"  {''.join(line)}")
    print(f"  {'─'*width}")
    unique = sorted(set(labels))
    legend = "  图例: " + "  ".join(
        f"{chars[l % len(chars)]}=簇{l}" if l >= 0 else "!=噪声"
        for l in unique
    )
    print(legend)


def mode_kmeans(X: np.ndarray) -> None:
    print(f"[{nexdo_time()}] K-Means 聚类")
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X)
    km = KMeans(n_clusters=4, init="k-means++", random_state=42, n_init=10)
    labels = km.fit_predict(X_s)
    sil = silhouette_score(X_s, labels)
    print(f"  轮廓系数: {sil:.4f}  惯性: {km.inertia_:.2f}")
    ascii_scatter(X_s, labels, "K-Means 聚类结果(K=4)")


def mode_dbscan(X: np.ndarray) -> None:
    print(f"[{nexdo_time()}] DBSCAN 聚类")
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X)
    rows = []
    for eps, min_s in [(0.3, 5), (0.5, 5), (0.5, 10), (0.8, 5)]:
        db = DBSCAN(eps=eps, min_samples=min_s)
        labels = db.fit_predict(X_s)
        n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
        n_noise = (labels == -1).sum()
        sil = silhouette_score(X_s, labels) if n_clusters > 1 else -1
        rows.append([eps, min_s, n_clusters, n_noise, f"{sil:.4f}"])
    print_table(["eps", "min_samples", "簇数", "噪声点", "轮廓系数"], rows, "DBSCAN 参数对比")

    db_best = DBSCAN(eps=0.5, min_samples=5)
    labels_best = db_best.fit_predict(X_s)
    ascii_scatter(X_s, labels_best, "DBSCAN 聚类结果(eps=0.5, min_samples=5)")


def mode_anomaly(X: np.ndarray) -> None:
    print(f"[{nexdo_time()}] 异常检测对比")
    # 注入5%异常点
    rng = np.random.RandomState(0)
    n_outliers = 20
    outliers = rng.uniform(low=-8, high=8, size=(n_outliers, 2))
    X_with_outliers = np.vstack([X, outliers])
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X_with_outliers)

    rows = []
    for name, clf in [
        ("孤立森林(contamination=0.05)", IsolationForest(contamination=0.05, random_state=42)),
        ("孤立森林(contamination=0.10)", IsolationForest(contamination=0.10, random_state=42)),
    ]:
        pred = clf.fit_predict(X_s)  # -1=异常, 1=正常
        n_detected = (pred == -1).sum()
        rows.append([name, n_detected, f"{n_detected/len(X_s)*100:.1f}%"])
    print_table(["方法", "检测异常数", "异常比例"], rows, "异常检测结果")

    # ASCII展示(孤立森林)
    iso = IsolationForest(contamination=0.05, random_state=42)
    labels_iso = iso.fit_predict(X_s)
    # 将 1/-1 转为 0/1 用于绘图
    plot_labels = np.where(labels_iso == 1, 0, 1)
    ascii_scatter(X_s, plot_labels, "孤立森林异常检测(A=正常, B=异常)")


def mode_elbow(X: np.ndarray) -> None:
    """肘部法则选K"""
    print(f"[{nexdo_time()}] 肘部法则选择最优K")
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X)
    inertias = []
    sils = []
    ks = range(2, 9)
    for k in ks:
        km = KMeans(n_clusters=k, random_state=42, n_init=10)
        labels = km.fit_predict(X_s)
        inertias.append(km.inertia_)
        sils.append(silhouette_score(X_s, labels))

    rows = [(k, f"{iner:.1f}", f"{sil:.4f}") for k, iner, sil in zip(ks, inertias, sils)]
    print_table(["K", "惯性(Inertia)", "轮廓系数"], rows, "肘部法则数据")

    # ASCII折线图
    max_iner = max(inertias)
    print("\n  惯性曲线(肘部法则)")
    height = 8
    for h in range(height, 0, -1):
        line = f"  {h*max_iner/height:8.0f} │"
        for iner in inertias:
            line += "●" if abs(iner - h * max_iner / height) < max_iner / height / 2 else " "
        print(line)
    print(f"           └{'─'*len(inertias)*2}")
    print(f"            K值: {' '.join(str(k) for k in ks)}")


def main() -> None:
    parser = argparse.ArgumentParser(description="聚类与异常检测演示")
    parser.add_argument("--mode", choices=["kmeans", "dbscan", "anomaly", "elbow", "all"],
                        default="all")
    args = parser.parse_args()
    X, _ = make_data()
    dispatch = {
        "kmeans":  lambda: mode_kmeans(X),
        "dbscan":  lambda: mode_dbscan(X),
        "anomaly": lambda: mode_anomaly(X),
        "elbow":   lambda: mode_elbow(X),
        "all":     lambda: [mode_elbow(X), mode_kmeans(X), mode_dbscan(X), mode_anomaly(X)],
    }
    dispatch[args.mode]()
    print(f"\n[{nexdo_time()}] 完成")


if __name__ == "__main__":
    main()

Sample run output:

$ python3 42-clustering.py --mode elbow
[2026-04-18 10:55:45] 肘部法则选择最优K

============================================================
  肘部法则数据
============================================================
┌───┬─────────────┬────────┐
│ K │ 惯性(Inertia) │ 轮廓系数   │
├───┼─────────────┼────────┤
│ 2 │ 417.2       │ 0.5702 │
│ 3 │ 89.7        │ 0.7638 │
│ 4 │ 15.0        │ 0.8403 │
│ 5 │ 13.4        │ 0.7040 │
│ 6 │ 11.9        │ 0.5770 │
│ 7 │ 10.5        │ 0.4511 │
│ 8 │ 9.3         │ 0.3408 │
└───┴─────────────┴────────┘

  惯性曲线(肘部法则)
  [ASCII inertia chart not preserved in this copy]

$ python3 42-clustering.py --mode kmeans
[2026-04-18 10:55:46] K-Means 聚类
  轮廓系数: 0.8403  惯性: 15.00

  K-Means 聚类结果(K=4)
  ────────────────────────────────────────────────────────────
  ··························DDDDD·····························
  ····B················D··DDDDDDDDDD··························
  BBBBBBBBBB·BB··········DDDDDDDDDD···························
  B··BBBBBBBB··BB··········DDDDDD·D···························
  ··BBBBBBBBBB················································
  ··········B···········································A·····
  ····························································
  ················································AAAAAAA·····
  ···············································A·AAAAAAAAAA·
  ············································A··AAAAAAAAAAAA·
  ······················································A···A·
  ····························································
  ····························································

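Summary

Module | What to remember
make_data | Generates local 2-D clustering data, avoiding external dependencies
ascii_scatter | Shows clusters, noise, and anomalies as characters in the terminal
mode_kmeans | Groups by iterating centroids; suited to roughly round clusters
mode_dbscan | Finds clusters by density connectivity; K need not be fixed in advance
mode_anomaly | Uses an isolation forest to find outlying samples
mode_elbow | Uses inertia and the silhouette coefficient to help choose K
main --mode | Splits each experiment into a command that can be run on its own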

⏱ NexDo Time (5 minutes)

Challenge: change cluster_std=0.8 in make_data() to 1.5, rerun --mode kmeans and --mode dbscan, and observe how the two algorithms behave once the clusters overlap more.

Don’t wait for next time, do it in the next moment.