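42 · Unsupervised Learning: K-Means Clustering and Anomaly Detection
🔗 Knowledge graph navigation: before reading this article, it helps to revisit the standardization and evaluation ideas in "39 · scikit-learn in Practice" and the point from "41 · Classification Trees and Random Forests" that a model is judged by generalization and interpretability, not only by its training results. This article moves on to unsupervised learning: finding structure in data even when there are no labels.
Environment:
pip install numpy scikit-learn. All data in this article is generated locally with make_blobs(); there are no files to download and no external services involved.
Pain point and architecture: in practice, much data comes without human labels, such as user behavior, transaction records, and device metrics. Unsupervised learning sets aside the question of "what is the correct answer" and lets the algorithm discover groups, density, and outliers on its own. This article uses K-Means, DBSCAN, and Isolation Forest to build an exploration pipeline for unlabeled data.
Building intuition for unsupervised learning first
K-Means: choose K up front, then let each sample gather at its nearest centroid.
DBSCAN: no K to specify; clusters are found through density connectivity, and outliers are marked as noise.
Isolation Forest: does no grouping; it looks specifically for anomalies that are "easy to isolate".
Elbow method: helps judge which K is reasonable for K-Means.
Geek's take: supervised learning is like an exam with an answer key; unsupervised learning is like sorting a box of unlabeled parts. You first see which parts naturally pile together and which ones sit apart, then decide how the business should handle them.
Step by step: breaking down the core logic
This article is split into 7 steps: data generation, terminal visualization, K-Means, DBSCAN, anomaly detection, the elbow method, and CLI dispatch. Each demo runs on its own and prints visible results directly.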
Step 1: generate unlabeled clustering data with make_data
Pain point and mechanism:
make_data is the practice ground for unsupervised learning. K-Means and DBSCAN look only at X, the table of 2D coordinates, and never at y. y is kept here purely for teaching comparison: the teacher knows the answer key, but the students cannot see it while solving the problems.
Core source (verbatim from the complete source at the end):

def make_data(n_clusters: int = 4) -> Tuple[np.ndarray, np.ndarray]:
    X, y = make_blobs(n_samples=400, centers=n_clusters, cluster_std=0.8, random_state=42)
    return X, y

Runnable demo (with mock data and print feedback added):

from typing import Tuple

import numpy as np
from sklearn.datasets import make_blobs


def make_data(n_clusters: int = 4) -> Tuple[np.ndarray, np.ndarray]:
    X, y = make_blobs(n_samples=400, centers=n_clusters, cluster_std=0.8, random_state=42)
    return X, y


X, y = make_data(n_clusters=4)
print("Data shape:", X.shape)
print("The clustering algorithms see only X, never y")
print("True groups, for teaching comparison only:", dict(zip(*np.unique(y, return_counts=True))))
print("First sample:", np.round(X[0], 3).tolist())

Step 2: draw a cluster map in the terminal with ascii_scatter
Pain point and mechanism:
ascii_scatter squeezes 2D coordinates into a character grid, so even without matplotlib a beginner can see roughly how many clumps the points form. It is like shrinking a map onto the terminal: different letters stand for different clusters, and ! marks noise or anomalies.
Core source (verbatim from the complete source at the end):

def ascii_scatter(X: np.ndarray, labels: np.ndarray, title: str,
                  width: int = 60, height: int = 20) -> None:
    """ASCII scatter plot; each cluster gets its own character."""
    chars = "ABCDEFGHIJ*"
    x_min, x_max = X[:, 0].min(), X[:, 0].max()
    y_min, y_max = X[:, 1].min(), X[:, 1].max()
    x_range = x_max - x_min + 1e-9
    y_range = y_max - y_min + 1e-9
    grid = [["·"] * width for _ in range(height)]
    for (x, y), label in zip(X, labels):
        col = int((x - x_min) / x_range * (width - 1))
        row = int((y_max - y) / y_range * (height - 1))
        col = max(0, min(width - 1, col))
        row = max(0, min(height - 1, row))
        ch = chars[label % len(chars)] if label >= 0 else "!"
        grid[row][col] = ch
    print(f"\n {title}")
    print(f" {'─'*width}")
    for line in grid:
        print(f" {''.join(line)}")
    print(f" {'─'*width}")
    unique = sorted(set(labels))
    legend = " Legend: " + " ".join(
        f"{chars[l % len(chars)]}=cluster {l}" if l >= 0 else "!=noise"
        for l in unique
    )
    print(legend)

Runnable demo (with mock data and print feedback added):

import numpy as np


def ascii_scatter(X: np.ndarray, labels: np.ndarray, title: str,
                  width: int = 60, height: int = 20) -> None:
    """ASCII scatter plot; each cluster gets its own character."""
    chars = "ABCDEFGHIJ*"
    x_min, x_max = X[:, 0].min(), X[:, 0].max()
    y_min, y_max = X[:, 1].min(), X[:, 1].max()
    x_range = x_max - x_min + 1e-9
    y_range = y_max - y_min + 1e-9
    grid = [["·"] * width for _ in range(height)]
    for (x, y), label in zip(X, labels):
        col = int((x - x_min) / x_range * (width - 1))
        row = int((y_max - y) / y_range * (height - 1))
        col = max(0, min(width - 1, col))
        row = max(0, min(height - 1, row))
        ch = chars[label % len(chars)] if label >= 0 else "!"
        grid[row][col] = ch
    print(f"\n {title}")
    print(f" {'─'*width}")
    for line in grid:
        print(f" {''.join(line)}")
    print(f" {'─'*width}")
    unique = sorted(set(labels))
    legend = " Legend: " + " ".join(
        f"{chars[l % len(chars)]}=cluster {l}" if l >= 0 else "!=noise"
        for l in unique
    )
    print(legend)


X = np.array([
    [0.0, 0.0], [0.2, 0.1], [0.1, 0.3],
    [2.8, 3.0], [3.1, 2.9], [3.0, 3.2],
    [6.0, 0.2],
])
labels = np.array([0, 0, 0, 1, 1, 1, -1])
ascii_scatter(X, labels, "Two clusters + one noise point", width=36, height=10)

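To make the coordinate-to-grid arithmetic concrete, here is a small sketch that pulls the mapping out into a standalone helper and applies it to two points from the seven-point demo. The helper name to_cell and the bounds dictionary are illustrative assumptions, not part of the article's source; the formulas themselves are copied from ascii_scatter.

```python
def to_cell(x, y, x_min, x_max, y_min, y_max, width, height):
    """Map one (x, y) point to a (col, row) cell using ascii_scatter's formulas."""
    x_range = x_max - x_min + 1e-9   # the epsilon avoids division by zero
    y_range = y_max - y_min + 1e-9
    col = int((x - x_min) / x_range * (width - 1))
    row = int((y_max - y) / y_range * (height - 1))  # row 0 is the top line
    col = max(0, min(width - 1, col))
    row = max(0, min(height - 1, row))
    return col, row


# Bounds of the seven-point demo data, drawn on a 36 x 10 grid.
bounds = dict(x_min=0.0, x_max=6.0, y_min=0.0, y_max=3.2, width=36, height=10)
origin_cell = to_cell(0.0, 0.0, **bounds)   # bottom-left point of cluster 0
noise_cell = to_cell(6.0, 0.2, **bounds)    # the lone noise point
print(origin_cell, noise_cell)
```

This prints (0, 8) (34, 8). Because of the 1e-9 padding on the ranges, the largest x lands in column 34 and the smallest y in row 8, so the last column and the bottom row of the grid stay empty. That is harmless for a rough terminal map, but worth knowing when reading the plot.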
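Step 3: group by centroid distance with mode_kmeans
Pain point and mechanism:
K-Means works like "pick a few temporary meeting points, then have everyone line up at the nearest one". Once everyone has lined up, the meeting points are recomputed, and the process repeats. A higher silhouette coefficient (it ranges from -1 to 1) means clusters are tighter inside and farther apart; a lower inertia (the sum of squared distances from each sample to its centroid) means points sit closer to their own centroid.
Core source (verbatim from the complete source at the end):

def mode_kmeans(X: np.ndarray) -> None:
    print(f"[{nexdo_time()}] K-Means clustering")
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X)
    km = KMeans(n_clusters=4, init="k-means++", random_state=42, n_init=10)
    labels = km.fit_predict(X_s)
    sil = silhouette_score(X_s, labels)
    print(f" Silhouette: {sil:.4f} Inertia: {km.inertia_:.2f}")
    ascii_scatter(X_s, labels, "K-Means clustering result (K=4)")
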
Runnable demo (with mock data and print feedback added):

import time
from typing import Tuple

import numpy as np
from sklearn.datasets import make_blobs
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score


def nexdo_time() -> str:
    return time.strftime("%Y-%m-%d %H:%M:%S")


def make_data(n_clusters: int = 4) -> Tuple[np.ndarray, np.ndarray]:
    X, y = make_blobs(n_samples=400, centers=n_clusters, cluster_std=0.8, random_state=42)
    return X, y


def ascii_scatter(X: np.ndarray, labels: np.ndarray, title: str,
                  width: int = 60, height: int = 20) -> None:
    """ASCII scatter plot; each cluster gets its own character."""
    chars = "ABCDEFGHIJ*"
    x_min, x_max = X[:, 0].min(), X[:, 0].max()
    y_min, y_max = X[:, 1].min(), X[:, 1].max()
    x_range = x_max - x_min + 1e-9
    y_range = y_max - y_min + 1e-9
    grid = [["·"] * width for _ in range(height)]
    for (x, y), label in zip(X, labels):
        col = int((x - x_min) / x_range * (width - 1))
        row = int((y_max - y) / y_range * (height - 1))
        col = max(0, min(width - 1, col))
        row = max(0, min(height - 1, row))
        ch = chars[label % len(chars)] if label >= 0 else "!"
        grid[row][col] = ch
    print(f"\n {title}")
    print(f" {'─'*width}")
    for line in grid:
        print(f" {''.join(line)}")
    print(f" {'─'*width}")
    unique = sorted(set(labels))
    legend = " Legend: " + " ".join(
        f"{chars[l % len(chars)]}=cluster {l}" if l >= 0 else "!=noise"
        for l in unique
    )
    print(legend)


def mode_kmeans(X: np.ndarray) -> None:
    print(f"[{nexdo_time()}] K-Means clustering")
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X)
    km = KMeans(n_clusters=4, init="k-means++", random_state=42, n_init=10)
    labels = km.fit_predict(X_s)
    sil = silhouette_score(X_s, labels)
    print(f" Silhouette: {sil:.4f} Inertia: {km.inertia_:.2f}")
    ascii_scatter(X_s, labels, "K-Means clustering result (K=4)")


X, _ = make_data()
mode_kmeans(X)

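The "line up at the nearest meeting point, then move the meeting point" loop from this step can be written out in a few lines of NumPy. This is a minimal sketch for intuition only, not how scikit-learn's KMeans is implemented: the function name lloyd_kmeans, its plain random initialization (instead of k-means++), and the n_iter and seed parameters are all assumptions made for illustration.

```python
import numpy as np
from sklearn.datasets import make_blobs


def lloyd_kmeans(X, k, n_iter=50, seed=0):
    """Plain Lloyd iteration: assign to the nearest centroid, then recompute."""
    rng = np.random.default_rng(seed)
    # Assumption: start from k distinct samples chosen at random.
    centroids = X[rng.choice(len(X), size=k, replace=False)]
    inertia_history = []
    for _ in range(n_iter):
        # Squared distance from every sample to every centroid, shape (n, k).
        d2 = ((X[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2)
        labels = d2.argmin(axis=1)
        inertia_history.append(float(d2[np.arange(len(X)), labels].sum()))
        # Move each centroid to the mean of its members (keep it if empty).
        new_centroids = np.array([
            X[labels == j].mean(axis=0) if np.any(labels == j) else centroids[j]
            for j in range(k)
        ])
        if np.allclose(new_centroids, centroids):
            break
        centroids = new_centroids
    return labels, centroids, inertia_history


X, _ = make_blobs(n_samples=400, centers=4, cluster_std=0.8, random_state=42)
labels, centroids, inertia_history = lloyd_kmeans(X, k=4)
print("iterations:", len(inertia_history))
print("inertia per iteration:", np.round(inertia_history, 1).tolist())
```

The printed inertia never increases from one iteration to the next, which is why the loop is guaranteed to settle. With random initialization it can still settle on a poor local optimum, which is the problem that init="k-means++" and n_init=10 in mode_kmeans are meant to reduce.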
Step 4: find clusters and noise by density with mode_dbscan
Pain point and mechanism:
DBSCAN is like finding circles of friends: a person with enough friends nearby is a core member; core members that link up form a cluster; anyone far from everybody is labeled noise. It needs no K up front, but eps (the neighborhood radius) and min_samples (how many points, the point itself included, must fall within eps to make a core point) strongly affect the result.
Core source (verbatim from the complete source at the end):

def mode_dbscan(X: np.ndarray) -> None:
    print(f"[{nexdo_time()}] DBSCAN clustering")
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X)
    rows = []
    for eps, min_s in [(0.3, 5), (0.5, 5), (0.5, 10), (0.8, 5)]:
        db = DBSCAN(eps=eps, min_samples=min_s)
        labels = db.fit_predict(X_s)
        n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
        n_noise = (labels == -1).sum()
        sil = silhouette_score(X_s, labels) if n_clusters > 1 else -1
        rows.append([eps, min_s, n_clusters, n_noise, f"{sil:.4f}"])
    print_table(["eps", "min_samples", "clusters", "noise points", "silhouette"],
                rows, "DBSCAN parameter comparison")
    db_best = DBSCAN(eps=0.5, min_samples=5)
    labels_best = db_best.fit_predict(X_s)
    ascii_scatter(X_s, labels_best, "DBSCAN clustering result (eps=0.5, min_samples=5)")

Runnable demo (with mock data and print feedback added):

import time
from typing import Tuple

import numpy as np
from sklearn.datasets import make_blobs
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score


def nexdo_time() -> str:
    return time.strftime("%Y-%m-%d %H:%M:%S")


def print_table(headers: list, rows: list, title: str = "") -> None:
    if title:
        print(f"\n{'='*60}\n {title}\n{'='*60}")
    col_widths = [max(len(str(h)), max((len(str(r[i])) for r in rows), default=0))
                  for i, h in enumerate(headers)]
    print(f"┌{'┬'.join('─'*(w+2) for w in col_widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, col_widths))}│")
    print(f"├{'┼'.join('─'*(w+2) for w in col_widths)}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, col_widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in col_widths)}┘")


def make_data(n_clusters: int = 4) -> Tuple[np.ndarray, np.ndarray]:
    X, y = make_blobs(n_samples=400, centers=n_clusters, cluster_std=0.8, random_state=42)
    return X, y


def ascii_scatter(X: np.ndarray, labels: np.ndarray, title: str,
                  width: int = 60, height: int = 20) -> None:
    """ASCII scatter plot; each cluster gets its own character."""
    chars = "ABCDEFGHIJ*"
    x_min, x_max = X[:, 0].min(), X[:, 0].max()
    y_min, y_max = X[:, 1].min(), X[:, 1].max()
    x_range = x_max - x_min + 1e-9
    y_range = y_max - y_min + 1e-9
    grid = [["·"] * width for _ in range(height)]
    for (x, y), label in zip(X, labels):
        col = int((x - x_min) / x_range * (width - 1))
        row = int((y_max - y) / y_range * (height - 1))
        col = max(0, min(width - 1, col))
        row = max(0, min(height - 1, row))
        ch = chars[label % len(chars)] if label >= 0 else "!"
        grid[row][col] = ch
    print(f"\n {title}")
    print(f" {'─'*width}")
    for line in grid:
        print(f" {''.join(line)}")
    print(f" {'─'*width}")
    unique = sorted(set(labels))
    legend = " Legend: " + " ".join(
        f"{chars[l % len(chars)]}=cluster {l}" if l >= 0 else "!=noise"
        for l in unique
    )
    print(legend)


def mode_dbscan(X: np.ndarray) -> None:
    print(f"[{nexdo_time()}] DBSCAN clustering")
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X)
    rows = []
    for eps, min_s in [(0.3, 5), (0.5, 5), (0.5, 10), (0.8, 5)]:
        db = DBSCAN(eps=eps, min_samples=min_s)
        labels = db.fit_predict(X_s)
        n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
        n_noise = (labels == -1).sum()
        sil = silhouette_score(X_s, labels) if n_clusters > 1 else -1
        rows.append([eps, min_s, n_clusters, n_noise, f"{sil:.4f}"])
    print_table(["eps", "min_samples", "clusters", "noise points", "silhouette"],
                rows, "DBSCAN parameter comparison")
    db_best = DBSCAN(eps=0.5, min_samples=5)
    labels_best = db_best.fit_predict(X_s)
    ascii_scatter(X_s, labels_best, "DBSCAN clustering result (eps=0.5, min_samples=5)")


X, _ = make_data()
mode_dbscan(X)

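The "core member, linked-in member, outsider" picture from this step maps directly onto what a fitted DBSCAN exposes. The sketch below, assuming the same blob data and the (eps=0.3, min_samples=5) setting from the comparison loop, uses the estimator's core_sample_indices_ and labels_ attributes to split samples into core, border, and noise. The variable names are illustrative.

```python
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.datasets import make_blobs
from sklearn.preprocessing import StandardScaler

X, _ = make_blobs(n_samples=400, centers=4, cluster_std=0.8, random_state=42)
X_s = StandardScaler().fit_transform(X)

db = DBSCAN(eps=0.3, min_samples=5).fit(X_s)
labels = db.labels_

# Core points: at least min_samples points (itself included) within eps.
is_core = np.zeros(len(X_s), dtype=bool)
is_core[db.core_sample_indices_] = True
# Noise points: not reachable from any core point, labeled -1.
is_noise = labels == -1
# Border points: assigned to a cluster, but not dense enough to be core.
is_border = ~is_core & ~is_noise

print("core:", int(is_core.sum()),
      "border:", int(is_border.sum()),
      "noise:", int(is_noise.sum()))
```

Rerunning this with a larger eps or a smaller min_samples should turn border and noise points into core points, which is one way to see why these two parameters matter so much.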
Step 5: find isolated anomalies with mode_anomaly
Pain point and mechanism:
Isolation Forest is like cutting random paths through a forest: normal points are packed in the crowd and take many cuts to separate, while anomalies stand far away and are isolated in a few cuts. contamination is your estimate of the anomaly ratio; the larger you set it, the more readily the model flags points as anomalies.
Core source (verbatim from the complete source at the end):

def mode_anomaly(X: np.ndarray) -> None:
    print(f"[{nexdo_time()}] Anomaly detection comparison")
    # Inject 20 outliers (5% of the 400 blob samples)
    rng = np.random.RandomState(0)
    n_outliers = 20
    outliers = rng.uniform(low=-8, high=8, size=(n_outliers, 2))
    X_with_outliers = np.vstack([X, outliers])
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X_with_outliers)
    rows = []
    for name, clf in [
        ("Isolation Forest (contamination=0.05)", IsolationForest(contamination=0.05, random_state=42)),
        ("Isolation Forest (contamination=0.10)", IsolationForest(contamination=0.10, random_state=42)),
    ]:
        pred = clf.fit_predict(X_s)  # -1 = anomaly, 1 = normal
        n_detected = (pred == -1).sum()
        rows.append([name, n_detected, f"{n_detected/len(X_s)*100:.1f}%"])
    print_table(["Method", "Anomalies detected", "Anomaly ratio"], rows, "Anomaly detection results")
    # ASCII plot (Isolation Forest)
    iso = IsolationForest(contamination=0.05, random_state=42)
    labels_iso = iso.fit_predict(X_s)
    # Map 1/-1 to 0/1 for plotting
    plot_labels = np.where(labels_iso == 1, 0, 1)
    ascii_scatter(X_s, plot_labels, "Isolation Forest anomaly detection (A=normal, B=anomaly)")

Runnable demo (with mock data and print feedback added):

import time
from typing import Tuple

import numpy as np
from sklearn.datasets import make_blobs
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler


def nexdo_time() -> str:
    return time.strftime("%Y-%m-%d %H:%M:%S")


def print_table(headers: list, rows: list, title: str = "") -> None:
    if title:
        print(f"\n{'='*60}\n {title}\n{'='*60}")
    col_widths = [max(len(str(h)), max((len(str(r[i])) for r in rows), default=0))
                  for i, h in enumerate(headers)]
    print(f"┌{'┬'.join('─'*(w+2) for w in col_widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, col_widths))}│")
    print(f"├{'┼'.join('─'*(w+2) for w in col_widths)}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, col_widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in col_widths)}┘")


def make_data(n_clusters: int = 4) -> Tuple[np.ndarray, np.ndarray]:
    X, y = make_blobs(n_samples=400, centers=n_clusters, cluster_std=0.8, random_state=42)
    return X, y


def ascii_scatter(X: np.ndarray, labels: np.ndarray, title: str,
                  width: int = 60, height: int = 20) -> None:
    """ASCII scatter plot; each cluster gets its own character."""
    chars = "ABCDEFGHIJ*"
    x_min, x_max = X[:, 0].min(), X[:, 0].max()
    y_min, y_max = X[:, 1].min(), X[:, 1].max()
    x_range = x_max - x_min + 1e-9
    y_range = y_max - y_min + 1e-9
    grid = [["·"] * width for _ in range(height)]
    for (x, y), label in zip(X, labels):
        col = int((x - x_min) / x_range * (width - 1))
        row = int((y_max - y) / y_range * (height - 1))
        col = max(0, min(width - 1, col))
        row = max(0, min(height - 1, row))
        ch = chars[label % len(chars)] if label >= 0 else "!"
        grid[row][col] = ch
    print(f"\n {title}")
    print(f" {'─'*width}")
    for line in grid:
        print(f" {''.join(line)}")
    print(f" {'─'*width}")
    unique = sorted(set(labels))
    legend = " Legend: " + " ".join(
        f"{chars[l % len(chars)]}=cluster {l}" if l >= 0 else "!=noise"
        for l in unique
    )
    print(legend)


def mode_anomaly(X: np.ndarray) -> None:
    print(f"[{nexdo_time()}] Anomaly detection comparison")
    # Inject 20 outliers (5% of the 400 blob samples)
    rng = np.random.RandomState(0)
    n_outliers = 20
    outliers = rng.uniform(low=-8, high=8, size=(n_outliers, 2))
    X_with_outliers = np.vstack([X, outliers])
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X_with_outliers)
    rows = []
    for name, clf in [
        ("Isolation Forest (contamination=0.05)", IsolationForest(contamination=0.05, random_state=42)),
        ("Isolation Forest (contamination=0.10)", IsolationForest(contamination=0.10, random_state=42)),
    ]:
        pred = clf.fit_predict(X_s)  # -1 = anomaly, 1 = normal
        n_detected = (pred == -1).sum()
        rows.append([name, n_detected, f"{n_detected/len(X_s)*100:.1f}%"])
    print_table(["Method", "Anomalies detected", "Anomaly ratio"], rows, "Anomaly detection results")
    # ASCII plot (Isolation Forest)
    iso = IsolationForest(contamination=0.05, random_state=42)
    labels_iso = iso.fit_predict(X_s)
    # Map 1/-1 to 0/1 for plotting
    plot_labels = np.where(labels_iso == 1, 0, 1)
    ascii_scatter(X_s, plot_labels, "Isolation Forest anomaly detection (A=normal, B=anomaly)")


X, _ = make_data()
mode_anomaly(X)

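mode_anomaly only counts how many points get flagged. Since the 20 injected points are known, a small follow-up can check how many of the flagged points are actually injected ones, and look at the raw anomaly scores behind the -1/1 labels. The sketch below reuses the same data recipe; the names is_injected, flagged, and hits are illustrative assumptions, while score_samples and predict are standard IsolationForest methods.

```python
import numpy as np
from sklearn.datasets import make_blobs
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

X, _ = make_blobs(n_samples=400, centers=4, cluster_std=0.8, random_state=42)
rng = np.random.RandomState(0)
outliers = rng.uniform(low=-8, high=8, size=(20, 2))
X_all = np.vstack([X, outliers])
is_injected = np.arange(len(X_all)) >= len(X)   # the last 20 rows

X_s = StandardScaler().fit_transform(X_all)
iso = IsolationForest(contamination=0.05, random_state=42).fit(X_s)
pred = iso.predict(X_s)            # -1 = anomaly, 1 = normal
scores = iso.score_samples(X_s)    # lower = easier to isolate = more anomalous

flagged = pred == -1
hits = int((flagged & is_injected).sum())
print(f"flagged: {int(flagged.sum())}, of which injected: {hits}")
print(f"mean score (injected): {scores[is_injected].mean():.3f}")
print(f"mean score (blob points): {scores[~is_injected].mean():.3f}")
```

Not every injected point has to be flagged: the uniform draw over [-8, 8] can drop a point inside a blob, where it is indistinguishable from normal data. contamination only moves the score threshold; it does not change the scores themselves.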
Step 6: use mode_elbow to help choose K
Pain point and mechanism:
The elbow method asks whether increasing K further is still worth it. If inertia falls fast as K goes from 2 to 4 and slows down after 4, the turning point looks like an elbow. It is not absolute truth, but it keeps beginners from picking K by gut feeling.
Core source (verbatim from the complete source at the end):

def mode_elbow(X: np.ndarray) -> None:
    """Choose K with the elbow method."""
    print(f"[{nexdo_time()}] Elbow method: choosing the best K")
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X)
    inertias = []
    sils = []
    ks = range(2, 9)
    for k in ks:
        km = KMeans(n_clusters=k, random_state=42, n_init=10)
        labels = km.fit_predict(X_s)
        inertias.append(km.inertia_)
        sils.append(silhouette_score(X_s, labels))
    rows = [(k, f"{iner:.1f}", f"{sil:.4f}") for k, iner, sil in zip(ks, inertias, sils)]
    print_table(["K", "Inertia", "Silhouette"], rows, "Elbow method data")
    # ASCII line chart
    max_iner = max(inertias)
    print("\n Inertia curve (elbow method)")
    height = 8
    for h in range(height, 0, -1):
        line = f" {h*max_iner/height:8.0f} │"
        for iner in inertias:
            line += "●" if abs(iner - h * max_iner / height) < max_iner / height / 2 else " "
        print(line)
    print(f" └{'─'*len(inertias)*2}")
    print(f" K: {' '.join(str(k) for k in ks)}")

Runnable demo (with mock data and print feedback added):

import time
from typing import Tuple

import numpy as np
from sklearn.datasets import make_blobs
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score


def nexdo_time() -> str:
    return time.strftime("%Y-%m-%d %H:%M:%S")


def print_table(headers: list, rows: list, title: str = "") -> None:
    if title:
        print(f"\n{'='*60}\n {title}\n{'='*60}")
    col_widths = [max(len(str(h)), max((len(str(r[i])) for r in rows), default=0))
                  for i, h in enumerate(headers)]
    print(f"┌{'┬'.join('─'*(w+2) for w in col_widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, col_widths))}│")
    print(f"├{'┼'.join('─'*(w+2) for w in col_widths)}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, col_widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in col_widths)}┘")


def make_data(n_clusters: int = 4) -> Tuple[np.ndarray, np.ndarray]:
    X, y = make_blobs(n_samples=400, centers=n_clusters, cluster_std=0.8, random_state=42)
    return X, y


def mode_elbow(X: np.ndarray) -> None:
    """Choose K with the elbow method."""
    print(f"[{nexdo_time()}] Elbow method: choosing the best K")
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X)
    inertias = []
    sils = []
    ks = range(2, 9)
    for k in ks:
        km = KMeans(n_clusters=k, random_state=42, n_init=10)
        labels = km.fit_predict(X_s)
        inertias.append(km.inertia_)
        sils.append(silhouette_score(X_s, labels))
    rows = [(k, f"{iner:.1f}", f"{sil:.4f}") for k, iner, sil in zip(ks, inertias, sils)]
    print_table(["K", "Inertia", "Silhouette"], rows, "Elbow method data")
    # ASCII line chart
    max_iner = max(inertias)
    print("\n Inertia curve (elbow method)")
    height = 8
    for h in range(height, 0, -1):
        line = f" {h*max_iner/height:8.0f} │"
        for iner in inertias:
            line += "●" if abs(iner - h * max_iner / height) < max_iner / height / 2 else " "
        print(line)
    print(f" └{'─'*len(inertias)*2}")
    print(f" K: {' '.join(str(k) for k in ks)}")


X, _ = make_data()
mode_elbow(X)

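Reading an elbow off a chart is a judgment call. One way to make "is one more cluster still worth it" a number is to print the relative inertia drop at each K and, as a cross-check, pick the K with the highest silhouette coefficient. The sketch below is a possible companion to mode_elbow under the same data and KMeans settings; the names drops and best_k are illustrative assumptions.

```python
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler

X, _ = make_blobs(n_samples=400, centers=4, cluster_std=0.8, random_state=42)
X_s = StandardScaler().fit_transform(X)

ks = list(range(2, 9))
inertias, sils = [], []
for k in ks:
    km = KMeans(n_clusters=k, random_state=42, n_init=10).fit(X_s)
    inertias.append(km.inertia_)
    sils.append(silhouette_score(X_s, km.labels_))

# Relative inertia drop when going from K-1 to K clusters.
drops = {k: 1 - cur / prev for k, prev, cur in zip(ks[1:], inertias, inertias[1:])}
# Cross-check: the K whose clustering has the highest silhouette coefficient.
best_k = ks[sils.index(max(sils))]

for k, drop in drops.items():
    print(f"K={k}: inertia drop vs K={k - 1} = {drop:.1%}")
print("K with the highest silhouette:", best_k)
```

On this data the drop stays large up to K=4 and then collapses, and the silhouette coefficient also peaks at K=4, so the two signals agree. On messier data they can disagree, in which case neither should be treated as the final word.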
Step 7: dispatch the kmeans/dbscan/anomaly/elbow/all commands with main
Pain point and mechanism:
main is the command-line remote control. Beginners do not need to edit the source: switching --mode runs K-Means, DBSCAN, anomaly detection, or the elbow method on its own, and all runs the whole experiment in one go.
Core source (verbatim from the complete source at the end):

def main() -> None:
    parser = argparse.ArgumentParser(description="Clustering and anomaly detection demo")
    parser.add_argument("--mode", choices=["kmeans", "dbscan", "anomaly", "elbow", "all"],
                        default="all")
    args = parser.parse_args()
    X, _ = make_data()
    dispatch = {
        "kmeans": lambda: mode_kmeans(X),
        "dbscan": lambda: mode_dbscan(X),
        "anomaly": lambda: mode_anomaly(X),
        "elbow": lambda: mode_elbow(X),
        "all": lambda: [mode_elbow(X), mode_kmeans(X), mode_dbscan(X), mode_anomaly(X)],
    }
    dispatch[args.mode]()
    print(f"\n[{nexdo_time()}] Done")

Runnable demo (with mock data and print feedback added):

import argparse
import sys
from typing import Tuple

import numpy as np


# Stubs stand in for the real functions so the dispatch logic can be seen alone.
def make_data(n_clusters: int = 4) -> Tuple[np.ndarray, np.ndarray]:
    return np.zeros((4, 2)), np.array([0, 1, 2, 3])


def mode_kmeans(X: np.ndarray) -> None:
    print("kmeans: group samples by distance to centroids")


def mode_dbscan(X: np.ndarray) -> None:
    print("dbscan: find clusters by density connectivity and mark noise")


def mode_anomaly(X: np.ndarray) -> None:
    print("anomaly: find anomalies with Isolation Forest")


def mode_elbow(X: np.ndarray) -> None:
    print("elbow: use the inertia curve to help choose K")


def nexdo_time() -> str:
    return "2026-04-18 10:58:00"


def main() -> None:
    parser = argparse.ArgumentParser(description="Clustering and anomaly detection demo")
    parser.add_argument("--mode", choices=["kmeans", "dbscan", "anomaly", "elbow", "all"],
                        default="all")
    args = parser.parse_args()
    X, _ = make_data()
    dispatch = {
        "kmeans": lambda: mode_kmeans(X),
        "dbscan": lambda: mode_dbscan(X),
        "anomaly": lambda: mode_anomaly(X),
        "elbow": lambda: mode_elbow(X),
        "all": lambda: [mode_elbow(X), mode_kmeans(X), mode_dbscan(X), mode_anomaly(X)],
    }
    dispatch[args.mode]()
    print(f"\n[{nexdo_time()}] Done")


# Simulate each command line by overwriting sys.argv before calling main().
for mode in ["kmeans", "dbscan", "anomaly", "elbow", "all"]:
    print(f"\n$ python3 42-clustering.py --mode {mode}")
    sys.argv = ["prog", "--mode", mode]
    main()

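Geek in action: complete source and run
Now put the building blocks above together and drop the complete code below into your editor. A good order is to run --mode elbow first to judge K, then --mode kmeans to see the cluster plot, and finally --mode dbscan and --mode anomaly to compare density clustering with anomaly detection.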

#!/usr/bin/env python3
"""
42-clustering.py
K-Means / DBSCAN / Isolation Forest comparison + ASCII scatter plot
Usage:
    python 42-clustering.py --mode kmeans
    python 42-clustering.py --mode dbscan
    python 42-clustering.py --mode anomaly
    python 42-clustering.py --mode elbow
    python 42-clustering.py --mode all
"""
import argparse
import time
from typing import Tuple

import numpy as np
from sklearn.datasets import make_blobs
from sklearn.cluster import KMeans, DBSCAN
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score


def nexdo_time() -> str:
    return time.strftime("%Y-%m-%d %H:%M:%S")


def print_table(headers: list, rows: list, title: str = "") -> None:
    if title:
        print(f"\n{'='*60}\n {title}\n{'='*60}")
    col_widths = [max(len(str(h)), max((len(str(r[i])) for r in rows), default=0))
                  for i, h in enumerate(headers)]
    print(f"┌{'┬'.join('─'*(w+2) for w in col_widths)}┐")
    print(f"│{'│'.join(f' {str(h):<{w}} ' for h, w in zip(headers, col_widths))}│")
    print(f"├{'┼'.join('─'*(w+2) for w in col_widths)}┤")
    for row in rows:
        print(f"│{'│'.join(f' {str(v):<{w}} ' for v, w in zip(row, col_widths))}│")
    print(f"└{'┴'.join('─'*(w+2) for w in col_widths)}┘")


def make_data(n_clusters: int = 4) -> Tuple[np.ndarray, np.ndarray]:
    X, y = make_blobs(n_samples=400, centers=n_clusters, cluster_std=0.8, random_state=42)
    return X, y


def ascii_scatter(X: np.ndarray, labels: np.ndarray, title: str,
                  width: int = 60, height: int = 20) -> None:
    """ASCII scatter plot; each cluster gets its own character."""
    chars = "ABCDEFGHIJ*"
    x_min, x_max = X[:, 0].min(), X[:, 0].max()
    y_min, y_max = X[:, 1].min(), X[:, 1].max()
    x_range = x_max - x_min + 1e-9
    y_range = y_max - y_min + 1e-9
    grid = [["·"] * width for _ in range(height)]
    for (x, y), label in zip(X, labels):
        col = int((x - x_min) / x_range * (width - 1))
        row = int((y_max - y) / y_range * (height - 1))
        col = max(0, min(width - 1, col))
        row = max(0, min(height - 1, row))
        ch = chars[label % len(chars)] if label >= 0 else "!"
        grid[row][col] = ch
    print(f"\n {title}")
    print(f" {'─'*width}")
    for line in grid:
        print(f" {''.join(line)}")
    print(f" {'─'*width}")
    unique = sorted(set(labels))
    legend = " Legend: " + " ".join(
        f"{chars[l % len(chars)]}=cluster {l}" if l >= 0 else "!=noise"
        for l in unique
    )
    print(legend)


def mode_kmeans(X: np.ndarray) -> None:
    print(f"[{nexdo_time()}] K-Means clustering")
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X)
    km = KMeans(n_clusters=4, init="k-means++", random_state=42, n_init=10)
    labels = km.fit_predict(X_s)
    sil = silhouette_score(X_s, labels)
    print(f" Silhouette: {sil:.4f} Inertia: {km.inertia_:.2f}")
    ascii_scatter(X_s, labels, "K-Means clustering result (K=4)")


def mode_dbscan(X: np.ndarray) -> None:
    print(f"[{nexdo_time()}] DBSCAN clustering")
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X)
    rows = []
    for eps, min_s in [(0.3, 5), (0.5, 5), (0.5, 10), (0.8, 5)]:
        db = DBSCAN(eps=eps, min_samples=min_s)
        labels = db.fit_predict(X_s)
        n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
        n_noise = (labels == -1).sum()
        sil = silhouette_score(X_s, labels) if n_clusters > 1 else -1
        rows.append([eps, min_s, n_clusters, n_noise, f"{sil:.4f}"])
    print_table(["eps", "min_samples", "clusters", "noise points", "silhouette"],
                rows, "DBSCAN parameter comparison")
    db_best = DBSCAN(eps=0.5, min_samples=5)
    labels_best = db_best.fit_predict(X_s)
    ascii_scatter(X_s, labels_best, "DBSCAN clustering result (eps=0.5, min_samples=5)")


def mode_anomaly(X: np.ndarray) -> None:
    print(f"[{nexdo_time()}] Anomaly detection comparison")
    # Inject 20 outliers (5% of the 400 blob samples)
    rng = np.random.RandomState(0)
    n_outliers = 20
    outliers = rng.uniform(low=-8, high=8, size=(n_outliers, 2))
    X_with_outliers = np.vstack([X, outliers])
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X_with_outliers)
    rows = []
    for name, clf in [
        ("Isolation Forest (contamination=0.05)", IsolationForest(contamination=0.05, random_state=42)),
        ("Isolation Forest (contamination=0.10)", IsolationForest(contamination=0.10, random_state=42)),
    ]:
        pred = clf.fit_predict(X_s)  # -1 = anomaly, 1 = normal
        n_detected = (pred == -1).sum()
        rows.append([name, n_detected, f"{n_detected/len(X_s)*100:.1f}%"])
    print_table(["Method", "Anomalies detected", "Anomaly ratio"], rows, "Anomaly detection results")
    # ASCII plot (Isolation Forest)
    iso = IsolationForest(contamination=0.05, random_state=42)
    labels_iso = iso.fit_predict(X_s)
    # Map 1/-1 to 0/1 for plotting
    plot_labels = np.where(labels_iso == 1, 0, 1)
    ascii_scatter(X_s, plot_labels, "Isolation Forest anomaly detection (A=normal, B=anomaly)")


def mode_elbow(X: np.ndarray) -> None:
    """Choose K with the elbow method."""
    print(f"[{nexdo_time()}] Elbow method: choosing the best K")
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X)
    inertias = []
    sils = []
    ks = range(2, 9)
    for k in ks:
        km = KMeans(n_clusters=k, random_state=42, n_init=10)
        labels = km.fit_predict(X_s)
        inertias.append(km.inertia_)
        sils.append(silhouette_score(X_s, labels))
    rows = [(k, f"{iner:.1f}", f"{sil:.4f}") for k, iner, sil in zip(ks, inertias, sils)]
    print_table(["K", "Inertia", "Silhouette"], rows, "Elbow method data")
    # ASCII line chart
    max_iner = max(inertias)
    print("\n Inertia curve (elbow method)")
    height = 8
    for h in range(height, 0, -1):
        line = f" {h*max_iner/height:8.0f} │"
        for iner in inertias:
            line += "●" if abs(iner - h * max_iner / height) < max_iner / height / 2 else " "
        print(line)
    print(f" └{'─'*len(inertias)*2}")
    print(f" K: {' '.join(str(k) for k in ks)}")


def main() -> None:
    parser = argparse.ArgumentParser(description="Clustering and anomaly detection demo")
    parser.add_argument("--mode", choices=["kmeans", "dbscan", "anomaly", "elbow", "all"],
                        default="all")
    args = parser.parse_args()
    X, _ = make_data()
    dispatch = {
        "kmeans": lambda: mode_kmeans(X),
        "dbscan": lambda: mode_dbscan(X),
        "anomaly": lambda: mode_anomaly(X),
        "elbow": lambda: mode_elbow(X),
        "all": lambda: [mode_elbow(X), mode_kmeans(X), mode_dbscan(X), mode_anomaly(X)],
    }
    dispatch[args.mode]()
    print(f"\n[{nexdo_time()}] Done")


if __name__ == "__main__":
    main()

$ python3 42-clustering.py --mode elbow
[2026-04-18 10:55:45] Elbow method: choosing the best K
============================================================
 Elbow method data
============================================================
┌───┬─────────┬────────────┐
│ K │ Inertia │ Silhouette │
├───┼─────────┼────────────┤
│ 2 │ 417.2   │ 0.5702     │
│ 3 │ 89.7    │ 0.7638     │
│ 4 │ 15.0    │ 0.8403     │
│ 5 │ 13.4    │ 0.7040     │
│ 6 │ 11.9    │ 0.5770     │
│ 7 │ 10.5    │ 0.4511     │
│ 8 │ 9.3     │ 0.3408     │
└───┴─────────┴────────────┘
 Inertia curve (elbow method)
417 │●
365 │
313 │
261 │
209 │
156 │
$ python3 42-clustering.py --mode kmeans
[2026-04-18 10:55:46] K-Means clustering
 Silhouette: 0.8403 Inertia: 15.00
 K-Means clustering result (K=4)
────────────────────────────────────────────────────────────
··························DDDDD·····························
····B················D··DDDDDDDDDD··························
BBBBBBBBBB·BB··········DDDDDDDDDD···························
B··BBBBBBBB··BB··········DDDDDD·D···························
··BBBBBBBBBB················································
··········B···········································A·····
····························································
················································AAAAAAA·····
···············································A·AAAAAAAAAA·
············································A··AAAAAAAAAAAA·
······················································A···A·
····························································
····························································
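Summary
| Module | What to remember |
|---|---|
| make_data | Generates local 2D clustering data with no external dependencies |
| ascii_scatter | Draws clusters, noise, and anomalies as terminal character art |
| mode_kmeans | Groups samples by iterating on centroids; suited to roughly round clusters |
| mode_dbscan | Finds clusters through density connectivity; no K needed up front |
| mode_anomaly | Finds outlying samples with Isolation Forest |
| mode_elbow | Uses inertia and the silhouette coefficient to help choose K |
| main | Uses --mode to split each experiment into a command that runs on its own |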
⏱ NexDo Time (5 minutes)
Challenge: change cluster_std=0.8 in make_data() to 1.5, rerun --mode kmeans and --mode dbscan, and observe how the two algorithms behave once the clusters overlap more.
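If you would rather not edit the source back and forth, one possible scaffold for the experiment is sketched below. It is an assumption-laden starting point, not part of the article's script: the compare helper is hypothetical, and it brings in adjusted_rand_score from scikit-learn to compare each clustering against the y labels that make_blobs returns, which the article keeps only for teaching comparison.

```python
from sklearn.cluster import DBSCAN, KMeans
from sklearn.datasets import make_blobs
from sklearn.metrics import adjusted_rand_score
from sklearn.preprocessing import StandardScaler


def compare(cluster_std):
    """Run K-Means and DBSCAN on blobs of the given spread and summarize both."""
    X, y = make_blobs(n_samples=400, centers=4, cluster_std=cluster_std, random_state=42)
    X_s = StandardScaler().fit_transform(X)
    km_labels = KMeans(n_clusters=4, random_state=42, n_init=10).fit_predict(X_s)
    db_labels = DBSCAN(eps=0.5, min_samples=5).fit_predict(X_s)
    return {
        # Agreement with the held-back true groups (1.0 = identical grouping).
        "kmeans_ari": adjusted_rand_score(y, km_labels),
        "dbscan_ari": adjusted_rand_score(y, db_labels),
        "dbscan_clusters": len(set(db_labels)) - (1 if -1 in db_labels else 0),
        "dbscan_noise": int((db_labels == -1).sum()),
    }


results = {std: compare(std) for std in (0.8, 1.5)}
for std, summary in results.items():
    print(f"cluster_std={std}: {summary}")
```

Compare the two printed rows, then try a few other eps values for DBSCAN before drawing conclusions: a fixed eps=0.5 was chosen for the tighter data and may not be a fair setting for the wider one.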
Don’t wait for next time, do it in the next moment.