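52 · Recommender Systems: Collaborative Filtering and Matrix Factorization
🔗 Knowledge-graph navigation: before reading this article, review vector similarity in "35 · Linear Algebra and Matrix Operations" and the idea of gradient descent in "48 · Neural Network Basics: Hand-Writing Forward and Backward Propagation from Scratch". This article applies those foundations to recommender systems.
NexDo Time · 2026-04-17 · Estimated reading time: 32 minutes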
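Pain Points and Architecture
A recommender system solves a very practical problem: users have not watched every movie or rated every product, yet the platform still has to guess what each user is likely to enjoy next. Most cells in the rating matrix are 0, which means "unknown", not "disliked".
This article uses a built-in user-movie rating matrix to look at recommendation along three routes: UserCF finds similar users, ItemCF finds similar items, and matrix factorization compresses users and movies into latent vectors. All three answer the same question: how can known ratings be used to predict unknown ones?
User-movie rating matrix
-> UserCF: what do people with tastes like mine enjoy?
-> ItemCF: which movies resemble the ones I have watched?
-> MF: user latent vector · movie latent vector = predicted rating
-> RMSE/MAE: how far is the predicted rating from the true rating?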
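Because 0 marks an unrated cell rather than a score, every statistic has to be computed over the observed entries only. The sketch below uses user 0's row from the built-in matrix; the variable names are illustrative and do not appear in the article's script.

```python
import numpy as np

# User 0's row from the article's rating matrix; 0 means "not rated yet".
row = np.array([5, 4, 0, 0, 1, 0, 0, 2], dtype=float)

observed = row > 0                    # boolean mask of rated movies
naive_mean = row.mean()               # wrongly treats unknowns as zero-star ratings
observed_mean = row[observed].mean()  # averages only the ratings that exist
sparsity = 1.0 - observed.mean()      # fraction of unknown cells in this row

print(f"naive mean:    {naive_mean:.2f}")
print(f"observed mean: {observed_mean:.2f}")
print(f"unknown cells: {sparsity:.0%}")
```

The naive mean comes out at half the observed mean, which is why every routine in this article masks the zeros before doing arithmetic.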
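Step by Step: Breaking Down the Core Logic
The code of a recommender system is not hard, but the concepts are easy to confuse. The walkthrough below has 9 steps: similarity first, then the three recommenders, and finally evaluation and the command-line entry point.
Step 1: Use cosine and Pearson similarity to judge how alike two users are
Pain point and mechanism:
The first step of a recommender is "finding what is similar". Cosine similarity checks whether two users' rating vectors point in the same direction. Pearson correlation first subtracts each user's own mean rating, so that a habitually generous rater and a habitually strict rater are not misjudged as different. A 0 here means "not watched", so those entries are excluded from the comparison.
Core source (excerpted from the complete source at the end of this article):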
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity of two vectors, using only entries where both are non-zero."""
    mask = (a != 0) & (b != 0)
    if mask.sum() == 0:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    denom = np.linalg.norm(a_m) * np.linalg.norm(b_m)
    return float(np.dot(a_m, b_m) / denom) if denom > 0 else 0.0

def pearson_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Pearson correlation coefficient (accounts for per-user rating bias)."""
    mask = (a != 0) & (b != 0)
    if mask.sum() < 2:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    a_c = a_m - a_m.mean()
    b_c = b_m - b_m.mean()
    denom = np.linalg.norm(a_c) * np.linalg.norm(b_c)
    return float(np.dot(a_c, b_c) / denom) if denom > 0 else 0.0
Runnable demo (with mock data and print feedback added):
import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity of two vectors, using only entries where both are non-zero."""
    mask = (a != 0) & (b != 0)
    if mask.sum() == 0:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    denom = np.linalg.norm(a_m) * np.linalg.norm(b_m)
    return float(np.dot(a_m, b_m) / denom) if denom > 0 else 0.0

def pearson_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Pearson correlation coefficient (accounts for per-user rating bias)."""
    mask = (a != 0) & (b != 0)
    if mask.sum() < 2:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    a_c = a_m - a_m.mean()
    b_c = b_m - b_m.mean()
    denom = np.linalg.norm(a_c) * np.linalg.norm(b_c)
    return float(np.dot(a_c, b_c) / denom) if denom > 0 else 0.0

alice = np.array([5, 4, 0, 0, 1], dtype=float)
bob = np.array([4, 5, 0, 0, 2], dtype=float)
charlie = np.array([0, 0, 5, 4, 1], dtype=float)
print("Alice vs Bob cosine:", round(cosine_similarity(alice, bob), 4))
print("Alice vs Bob Pearson:", round(pearson_similarity(alice, bob), 4))
print("Alice vs Charlie cosine:", round(cosine_similarity(alice, charlie), 4))
print("Intuition: the more alike the shared ratings, the higher the similarity; with no shared ratings nothing can be concluded.")
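To see why subtracting each user's mean matters, the sketch below compares a generous rater with a strict rater who ranks the same movies identically, and with a third user who ranks them in reverse. The helper names `masked_cosine` and `masked_pearson` and all ratings are made up for illustration; the helpers mirror the two functions above.

```python
import numpy as np

def masked_cosine(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity over co-rated entries only (0 = unrated)."""
    mask = (a != 0) & (b != 0)
    if mask.sum() == 0:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    denom = np.linalg.norm(a_m) * np.linalg.norm(b_m)
    return float(a_m @ b_m / denom) if denom > 0 else 0.0

def masked_pearson(a: np.ndarray, b: np.ndarray) -> float:
    """Pearson correlation over co-rated entries only (0 = unrated)."""
    mask = (a != 0) & (b != 0)
    if mask.sum() < 2:
        return 0.0
    a_c = a[mask] - a[mask].mean()
    b_c = b[mask] - b[mask].mean()
    denom = np.linalg.norm(a_c) * np.linalg.norm(b_c)
    return float(a_c @ b_c / denom) if denom > 0 else 0.0

generous = np.array([5, 4, 3, 0], dtype=float)
strict = np.array([3, 2, 1, 4], dtype=float)    # same ranking, two points lower
contrary = np.array([3, 4, 5, 0], dtype=float)  # reverse ranking

for name, other in [("strict", strict), ("contrary", contrary)]:
    print(f"generous vs {name}: "
          f"cosine={masked_cosine(generous, other):.3f}, "
          f"pearson={masked_pearson(generous, other):.3f}")
```

Pearson separates the two cases cleanly (close to +1 and -1), while cosine on raw positive ratings stays above 0.9 in both. That is one reason UserCF in this article uses Pearson.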
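Step 2: Use UserCF to find similar users and borrow their tastes
Pain point and mechanism:
UserCF is like asking friends: "What do people with tastes like mine enjoy?" It precomputes the user-user similarity matrix, then finds the most similar users (positive similarity only) who rated the target movie, and builds a weighted prediction from how far each of them deviates from their own mean rating. When no such neighbor exists, it falls back to the movie's mean rating, or to 3.0 if nobody has rated the movie.
Core source (excerpted from the complete source at the end of this article):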
class UserCF:
    """User-based collaborative filtering."""
    def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
        self.R = ratings.copy()
        self.k = k
        n = ratings.shape[0]
        # Precompute the user-user similarity matrix
        self.sim = np.zeros((n, n))
        for i in range(n):
            for j in range(i + 1, n):
                s = pearson_similarity(ratings[i], ratings[j])
                self.sim[i, j] = self.sim[j, i] = s
    def predict(self, user_id: int, item_id: int) -> float:
        """Predict the rating user_id would give item_id."""
        # Find the K most similar users who have rated this item
        sims = self.sim[user_id].copy()
        sims[user_id] = -1  # exclude the user themselves
        # Only consider users who rated this item
        rated_mask = self.R[:, item_id] > 0
        sims[~rated_mask] = -1
        top_k = np.argsort(sims)[-self.k:][::-1]
        top_k = [u for u in top_k if sims[u] > 0]
        if not top_k:
            # Fallback: return the item's mean rating
            rated = self.R[:, item_id]
            return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0
        # Similarity-weighted average of mean-centered ratings
        user_mean = self.R[user_id][self.R[user_id] > 0].mean()
        num, denom = 0.0, 0.0
        for u in top_k:
            u_mean = self.R[u][self.R[u] > 0].mean()
            num += sims[u] * (self.R[u, item_id] - u_mean)
            denom += abs(sims[u])
        return float(np.clip(user_mean + (num / denom if denom > 0 else 0), 1, 5))
    def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
        """Recommend the top_n unseen movies for the user."""
        unseen = np.where(self.R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]
Runnable demo (with mock data and print feedback added):
import numpy as np
RATINGS = np.array([
[5, 4, 0, 0, 1, 0, 0, 2],
[0, 0, 4, 5, 0, 1, 0, 0],
[4, 0, 0, 3, 2, 0, 5, 0],
[0, 5, 4, 0, 0, 3, 0, 0],
[1, 0, 0, 4, 5, 0, 0, 3],
[5, 3, 0, 0, 1, 0, 4, 0],
[0, 0, 5, 4, 0, 2, 0, 0],
[4, 5, 0, 0, 2, 0, 3, 0],
[0, 0, 3, 5, 0, 4, 0, 1],
[3, 0, 0, 0, 4, 0, 5, 2],
], dtype=float)
MOVIES = ["复仇者联盟", "速度与激情", "星际穿越", "火星救援",
"变形金刚", "降临", "肖申克的救赎", "泰坦尼克号"]
USERS = [f"用户{i}" for i in range(10)]
N_USERS, N_ITEMS = RATINGS.shape
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
"""计算两个向量的余弦相似度(忽略0值)。"""
mask = (a != 0) & (b != 0)
if mask.sum() == 0:
return 0.0
a_m, b_m = a[mask], b[mask]
denom = np.linalg.norm(a_m) * np.linalg.norm(b_m)
return float(np.dot(a_m, b_m) / denom) if denom > 0 else 0.0
def pearson_similarity(a: np.ndarray, b: np.ndarray) -> float:
"""皮尔逊相关系数(考虑评分偏差)。"""
mask = (a != 0) & (b != 0)
if mask.sum() < 2:
return 0.0
a_m, b_m = a[mask], b[mask]
a_c = a_m - a_m.mean()
b_c = b_m - b_m.mean()
denom = np.linalg.norm(a_c) * np.linalg.norm(b_c)
return float(np.dot(a_c, b_c) / denom) if denom > 0 else 0.0
class UserCF:
"""基于用户的协同过滤。"""
def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
self.R = ratings.copy()
self.k = k
n = ratings.shape[0]
# 预计算用户相似度矩阵
self.sim = np.zeros((n, n))
for i in range(n):
for j in range(i + 1, n):
s = pearson_similarity(ratings[i], ratings[j])
self.sim[i, j] = self.sim[j, i] = s
def predict(self, user_id: int, item_id: int) -> float:
"""预测 user_id 对 item_id 的评分。"""
# 找 K 个最相似且评过该物品的用户
sims = self.sim[user_id].copy()
sims[user_id] = -1 # 排除自身
# 只考虑评过该物品的用户
rated_mask = self.R[:, item_id] > 0
sims[~rated_mask] = -1
top_k = np.argsort(sims)[-self.k:][::-1]
top_k = [u for u in top_k if sims[u] > 0]
if not top_k:
# 回退:返回该物品的平均分
rated = self.R[:, item_id]
return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0
# 加权平均
user_mean = self.R[user_id][self.R[user_id] > 0].mean()
num, denom = 0.0, 0.0
for u in top_k:
u_mean = self.R[u][self.R[u] > 0].mean()
num += sims[u] * (self.R[u, item_id] - u_mean)
denom += abs(sims[u])
return float(np.clip(user_mean + (num / denom if denom > 0 else 0), 1, 5))
def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
"""为用户推荐 top_n 部未看过的电影。"""
unseen = np.where(self.R[user_id] == 0)[0]
scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]
ucf = UserCF(RATINGS, k=3)
print("Similarity between user 0 and the first 5 users:", np.round(ucf.sim[0, :5], 3).tolist())
print("Predicted rating of user 0 for 《星际穿越》:", round(ucf.predict(0, 2), 3))
print("Recommendations for user 0:")
for movie, score in ucf.recommend(0, top_n=3):
    print(f" {movie} -> {score:.2f}")
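The weighted step inside `UserCF.predict` is easier to follow with concrete numbers. The sketch below replays the arithmetic by hand for two neighbors; every number is hypothetical and chosen for readability, not taken from the article's matrix.

```python
# Hypothetical inputs for a single prediction.
target_mean = 3.0   # the target user's own mean rating
neighbors = [
    # (similarity to target, neighbor's rating of the movie, neighbor's own mean)
    (0.8, 5.0, 4.0),   # rated the movie one point above their usual level
    (0.4, 2.0, 3.0),   # rated it one point below their usual level
]

# Each neighbor contributes their deviation from their own mean, weighted by similarity.
num = sum(sim * (rating - mean) for sim, rating, mean in neighbors)
denom = sum(abs(sim) for sim, _, _ in neighbors)
offset = num / denom if denom > 0 else 0.0

# Shift the target user's mean by the weighted deviation and clamp to the 1-5 scale.
prediction = min(max(target_mean + offset, 1.0), 5.0)
print(round(prediction, 4))   # 3.3333
```

The closer neighbor liked the movie more than usual, so the prediction lands above the target user's own mean even though the second neighbor pulled the other way.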
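Step 3: Use ItemCF to infer similar movies from what the user has watched
Pain point and mechanism:
ItemCF shifts the viewpoint from "finding people" to "finding items". If you like 《复仇者联盟》, the system looks for movies with a similar rating pattern and then combines them with the ratings you have already given to score the movies you have not seen. It works like the "people who bought this also liked" tag next to a supermarket shelf.
Core source (excerpted from the complete source at the end of this article):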
class ItemCF:
    """Item-based collaborative filtering."""
    def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
        self.R = ratings.copy()
        self.k = k
        n = ratings.shape[1]
        # Precompute the item-item similarity matrix (columns are items)
        self.sim = np.zeros((n, n))
        for i in range(n):
            for j in range(i + 1, n):
                s = cosine_similarity(ratings[:, i], ratings[:, j])
                self.sim[i, j] = self.sim[j, i] = s
    def predict(self, user_id: int, item_id: int) -> float:
        sims = self.sim[item_id].copy()
        sims[item_id] = -1  # exclude the item itself
        # Only consider items this user has rated
        rated_mask = self.R[user_id] > 0
        sims[~rated_mask] = -1
        top_k = np.argsort(sims)[-self.k:][::-1]
        top_k = [i for i in top_k if sims[i] > 0]
        if not top_k:
            # Fallback: return the item's mean rating
            rated = self.R[:, item_id]
            return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0
        # Similarity-weighted average of the user's own ratings
        num = sum(sims[i] * self.R[user_id, i] for i in top_k)
        denom = sum(abs(sims[i]) for i in top_k)
        return float(np.clip(num / denom if denom > 0 else 3.0, 1, 5))
    def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
        unseen = np.where(self.R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]
Runnable demo (with mock data and print feedback added):
import numpy as np
RATINGS = np.array([
[5, 4, 0, 0, 1, 0, 0, 2],
[0, 0, 4, 5, 0, 1, 0, 0],
[4, 0, 0, 3, 2, 0, 5, 0],
[0, 5, 4, 0, 0, 3, 0, 0],
[1, 0, 0, 4, 5, 0, 0, 3],
[5, 3, 0, 0, 1, 0, 4, 0],
[0, 0, 5, 4, 0, 2, 0, 0],
[4, 5, 0, 0, 2, 0, 3, 0],
[0, 0, 3, 5, 0, 4, 0, 1],
[3, 0, 0, 0, 4, 0, 5, 2],
], dtype=float)
MOVIES = ["复仇者联盟", "速度与激情", "星际穿越", "火星救援",
"变形金刚", "降临", "肖申克的救赎", "泰坦尼克号"]
USERS = [f"用户{i}" for i in range(10)]
N_USERS, N_ITEMS = RATINGS.shape
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
"""计算两个向量的余弦相似度(忽略0值)。"""
mask = (a != 0) & (b != 0)
if mask.sum() == 0:
return 0.0
a_m, b_m = a[mask], b[mask]
denom = np.linalg.norm(a_m) * np.linalg.norm(b_m)
return float(np.dot(a_m, b_m) / denom) if denom > 0 else 0.0
def pearson_similarity(a: np.ndarray, b: np.ndarray) -> float:
"""皮尔逊相关系数(考虑评分偏差)。"""
mask = (a != 0) & (b != 0)
if mask.sum() < 2:
return 0.0
a_m, b_m = a[mask], b[mask]
a_c = a_m - a_m.mean()
b_c = b_m - b_m.mean()
denom = np.linalg.norm(a_c) * np.linalg.norm(b_c)
return float(np.dot(a_c, b_c) / denom) if denom > 0 else 0.0
class ItemCF:
"""基于物品的协同过滤。"""
def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
self.R = ratings.copy()
self.k = k
n = ratings.shape[1]
self.sim = np.zeros((n, n))
for i in range(n):
for j in range(i + 1, n):
s = cosine_similarity(ratings[:, i], ratings[:, j])
self.sim[i, j] = self.sim[j, i] = s
def predict(self, user_id: int, item_id: int) -> float:
sims = self.sim[item_id].copy()
sims[item_id] = -1
rated_mask = self.R[user_id] > 0
sims[~rated_mask] = -1
top_k = np.argsort(sims)[-self.k:][::-1]
top_k = [i for i in top_k if sims[i] > 0]
if not top_k:
rated = self.R[:, item_id]
return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0
num = sum(sims[i] * self.R[user_id, i] for i in top_k)
denom = sum(abs(sims[i]) for i in top_k)
return float(np.clip(num / denom if denom > 0 else 3.0, 1, 5))
def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
unseen = np.where(self.R[user_id] == 0)[0]
scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]
icf = ItemCF(RATINGS, k=3)
print("Similarity between movie 0 and all movies:", np.round(icf.sim[0], 3).tolist())
print("Predicted rating of user 1 for 《复仇者联盟》:", round(icf.predict(1, 0), 3))
print("Recommendations for user 1:")
for movie, score in icf.recommend(1, top_n=3):
    print(f" {movie} -> {score:.2f}")
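Both `UserCF.predict` and `ItemCF.predict` pick neighbors with the same idiom: mark ineligible candidates with -1, take the last `k` entries of `np.argsort`, reverse them, and drop anything that is not positive. The sketch below isolates that idiom; `top_k_positive` is a hypothetical helper name, not a function from the article.

```python
import numpy as np

def top_k_positive(sims: np.ndarray, k: int) -> list[int]:
    """Return up to k indices with the highest similarity, keeping only sims > 0."""
    order = np.argsort(sims)[-k:][::-1]   # argsort is ascending, so take the tail and flip it
    return [int(i) for i in order if sims[i] > 0]

# -1 marks candidates that were masked out (self, or no rating for the target).
sims = np.array([0.2, -1.0, 0.9, 0.5, -1.0])
print(top_k_positive(sims, k=3))   # [2, 3, 0]

# Fewer than k eligible candidates: the masked entries are filtered away.
few = np.array([0.7, -1.0, -1.0, -1.0])
print(top_k_positive(few, k=3))    # [0]

# No eligible candidate at all: the empty list triggers the mean-rating fallback.
none = np.array([-1.0, -1.0, -1.0])
print(top_k_positive(none, k=3))   # []
```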
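Step 4: Use MatrixFactorization to learn latent vectors for users and movies
Pain point and mechanism:
Matrix factorization splits the large rating table into two small tables: the user latent vectors P and the movie latent vectors Q. Think of a latent vector as a set of invisible taste tags, such as weights for action, sci-fi, and drama. The better a user vector matches a movie vector, the higher the dot-product prediction.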
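One way to write down what the training loop optimizes, assuming the per-rating squared-error objective with L2 regularization that the update rule in `fit` corresponds to. The symbols η (for `lr`) and λ (for `reg`) are notation introduced here, not names from the script.

```latex
\hat{r}_{ui} = \mathbf{p}_u^{\top}\mathbf{q}_i, \qquad
e_{ui} = r_{ui} - \hat{r}_{ui}

L = \sum_{(u,i)\,:\,r_{ui}>0} \left[ \tfrac{1}{2}\, e_{ui}^{2}
    + \tfrac{\lambda}{2}\left( \lVert \mathbf{p}_u \rVert^{2} + \lVert \mathbf{q}_i \rVert^{2} \right) \right]

\mathbf{p}_u \leftarrow \mathbf{p}_u + \eta\,\bigl(e_{ui}\,\mathbf{q}_i - \lambda\,\mathbf{p}_u\bigr), \qquad
\mathbf{q}_i \leftarrow \mathbf{q}_i + \eta\,\bigl(e_{ui}\,\mathbf{p}_u - \lambda\,\mathbf{q}_i\bigr)
```

The update is a step against the gradient of one bracketed term: differentiating with respect to p_u gives -e_ui q_i + λ p_u, and the rule for q_i follows by symmetry.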
Core source (excerpted from the complete source at the end of this article):
class MatrixFactorization:
    """
    Matrix factorization trained by gradient descent:
    R ≈ P · Q^T, where P is (n_users, k) and Q is (n_items, k)
    """
    def __init__(self, n_users: int, n_items: int, k: int = 8,
                 lr: float = 0.01, reg: float = 0.02, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        self.P = rng.randn(n_users, k) * 0.1  # user latent vectors
        self.Q = rng.randn(n_items, k) * 0.1  # item latent vectors
        self.lr = lr
        self.reg = reg
    def fit(self, R: np.ndarray, epochs: int = 200) -> list[float]:
        """Train on the non-zero ratings only."""
        rows, cols = np.where(R > 0)
        loss_history: list[float] = []
        for epoch in range(epochs):
            # Visit the known ratings in a new random order each epoch
            idx = np.random.permutation(len(rows))
            total_loss = 0.0
            for i in idx:
                u, v = rows[i], cols[i]
                pred = self.P[u] @ self.Q[v]
                err = R[u, v] - pred
                # Gradient step (with L2 regularization)
                self.P[u] += self.lr * (err * self.Q[v] - self.reg * self.P[u])
                self.Q[v] += self.lr * (err * self.P[u] - self.reg * self.Q[v])
                total_loss += err ** 2
            loss_history.append(total_loss / len(rows))
        return loss_history
    def predict(self, user_id: int, item_id: int) -> float:
        return float(np.clip(self.P[user_id] @ self.Q[item_id], 1, 5))
    def recommend(self, user_id: int, R: np.ndarray,
                  top_n: int = 3) -> list[tuple[str, float]]:
        unseen = np.where(R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]
Runnable demo (with mock data and print feedback added):
import numpy as np
RATINGS = np.array([
[5, 4, 0, 0, 1, 0, 0, 2],
[0, 0, 4, 5, 0, 1, 0, 0],
[4, 0, 0, 3, 2, 0, 5, 0],
[0, 5, 4, 0, 0, 3, 0, 0],
[1, 0, 0, 4, 5, 0, 0, 3],
[5, 3, 0, 0, 1, 0, 4, 0],
[0, 0, 5, 4, 0, 2, 0, 0],
[4, 5, 0, 0, 2, 0, 3, 0],
[0, 0, 3, 5, 0, 4, 0, 1],
[3, 0, 0, 0, 4, 0, 5, 2],
], dtype=float)
MOVIES = ["复仇者联盟", "速度与激情", "星际穿越", "火星救援",
"变形金刚", "降临", "肖申克的救赎", "泰坦尼克号"]
USERS = [f"用户{i}" for i in range(10)]
N_USERS, N_ITEMS = RATINGS.shape
class MatrixFactorization:
"""
基于梯度下降的矩阵分解:
R ≈ P · Q^T,其中 P(n_users, k),Q(n_items, k)
"""
def __init__(self, n_users: int, n_items: int, k: int = 8,
lr: float = 0.01, reg: float = 0.02, seed: int = 42) -> None:
rng = np.random.RandomState(seed)
self.P = rng.randn(n_users, k) * 0.1 # 用户隐向量
self.Q = rng.randn(n_items, k) * 0.1 # 物品隐向量
self.lr = lr
self.reg = reg
def fit(self, R: np.ndarray, epochs: int = 200) -> list[float]:
"""在非零评分上训练。"""
rows, cols = np.where(R > 0)
loss_history: list[float] = []
for epoch in range(epochs):
# shuffle
idx = np.random.permutation(len(rows))
total_loss = 0.0
for i in idx:
u, v = rows[i], cols[i]
pred = self.P[u] @ self.Q[v]
err = R[u, v] - pred
# 梯度更新(含 L2 正则)
self.P[u] += self.lr * (err * self.Q[v] - self.reg * self.P[u])
self.Q[v] += self.lr * (err * self.P[u] - self.reg * self.Q[v])
total_loss += err ** 2
loss_history.append(total_loss / len(rows))
return loss_history
def predict(self, user_id: int, item_id: int) -> float:
return float(np.clip(self.P[user_id] @ self.Q[item_id], 1, 5))
def recommend(self, user_id: int, R: np.ndarray,
top_n: int = 3) -> list[tuple[str, float]]:
unseen = np.where(R[user_id] == 0)[0]
scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]
mf = MatrixFactorization(N_USERS, N_ITEMS, k=4, lr=0.01, reg=0.02)
history = mf.fit(RATINGS, epochs=20)
print("Loss, first 5 epochs:", [round(v, 4) for v in history[:5]])
print("Loss, last 5 epochs:", [round(v, 4) for v in history[-5:]])
print("Predicted rating of user 0 for 《星际穿越》:", round(mf.predict(0, 2), 3))
print("User latent matrix shape:", mf.P.shape, "item latent matrix shape:", mf.Q.shape)
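A single update on a single rating shows what each inner-loop iteration of `fit` does. The sketch below uses made-up two-dimensional vectors and the same `lr` and `reg` defaults. It computes both updates from the old values, whereas the article's `fit` updates `P[u]` first and then reuses the already-updated `P[u]` when updating `Q[v]`; with a small learning rate the two variants behave very similarly.

```python
import numpy as np

p = np.array([0.1, 0.2])   # user latent vector (illustrative values)
q = np.array([0.3, 0.1])   # movie latent vector (illustrative values)
r, lr, reg = 4.0, 0.01, 0.02

err_before = r - p @ q
# Both updates are computed from the old p and q, then applied together.
p_new = p + lr * (err_before * q - reg * p)
q_new = q + lr * (err_before * p - reg * q)
err_after = r - p_new @ q_new

print(f"error before: {err_before:.4f}")
print(f"error after:  {err_after:.4f}")
```

One step shrinks the error only slightly, which is why training repeats this over every known rating for many epochs.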
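Step 5: Use mode_usercf to print the user similarity matrix and the recommendations
Pain point and mechanism:
A single prediction does not show the overall picture, so mode_usercf() prints the similarity matrix for the first 5 users and then recommends movies for several users. The higher a value in the matrix, the closer the two users' tastes are on the movies they have both rated.
Core source (excerpted from the complete source at the end of this article):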
def mode_usercf() -> None:
    section("User-based collaborative filtering (UserCF)")
    ucf = UserCF(RATINGS, k=3)
    # Show the user similarity matrix (first 5 users)
    print("\n User similarity matrix (Pearson, first 5 users):")
    print(f" {'':8}" + "".join(f"{USERS[j]:>8}" for j in range(5)))
    for i in range(5):
        row = "".join(f"{ucf.sim[i,j]:8.3f}" for j in range(5))
        print(f" {USERS[i]:8}{row}")
    # Recommend for users 0, 1, and 2
    print("\n Recommendations:")
    for uid in [0, 1, 2]:
        recs = ucf.recommend(uid, top_n=3)
        seen = [MOVIES[i] for i in range(N_ITEMS) if RATINGS[uid, i] > 0]
        print(f"\n {USERS[uid]} (watched: {', '.join(seen[:3])}...)")
        for movie, score in recs:
            bar = "★" * int(score)
            print(f" Recommended: {movie:<12} predicted rating: {score:.2f} {bar}")
Runnable demo (with mock data and print feedback added):
import numpy as np
RATINGS = np.array([
[5, 4, 0, 0, 1, 0, 0, 2],
[0, 0, 4, 5, 0, 1, 0, 0],
[4, 0, 0, 3, 2, 0, 5, 0],
[0, 5, 4, 0, 0, 3, 0, 0],
[1, 0, 0, 4, 5, 0, 0, 3],
[5, 3, 0, 0, 1, 0, 4, 0],
[0, 0, 5, 4, 0, 2, 0, 0],
[4, 5, 0, 0, 2, 0, 3, 0],
[0, 0, 3, 5, 0, 4, 0, 1],
[3, 0, 0, 0, 4, 0, 5, 2],
], dtype=float)
MOVIES = ["复仇者联盟", "速度与激情", "星际穿越", "火星救援",
"变形金刚", "降临", "肖申克的救赎", "泰坦尼克号"]
USERS = [f"用户{i}" for i in range(10)]
N_USERS, N_ITEMS = RATINGS.shape
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
"""计算两个向量的余弦相似度(忽略0值)。"""
mask = (a != 0) & (b != 0)
if mask.sum() == 0:
return 0.0
a_m, b_m = a[mask], b[mask]
denom = np.linalg.norm(a_m) * np.linalg.norm(b_m)
return float(np.dot(a_m, b_m) / denom) if denom > 0 else 0.0
def pearson_similarity(a: np.ndarray, b: np.ndarray) -> float:
"""皮尔逊相关系数(考虑评分偏差)。"""
mask = (a != 0) & (b != 0)
if mask.sum() < 2:
return 0.0
a_m, b_m = a[mask], b[mask]
a_c = a_m - a_m.mean()
b_c = b_m - b_m.mean()
denom = np.linalg.norm(a_c) * np.linalg.norm(b_c)
return float(np.dot(a_c, b_c) / denom) if denom > 0 else 0.0
def section(title: str) -> None:
print(f"\n{'='*62}\n {title}\n{'='*62}")
class UserCF:
"""基于用户的协同过滤。"""
def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
self.R = ratings.copy()
self.k = k
n = ratings.shape[0]
# 预计算用户相似度矩阵
self.sim = np.zeros((n, n))
for i in range(n):
for j in range(i + 1, n):
s = pearson_similarity(ratings[i], ratings[j])
self.sim[i, j] = self.sim[j, i] = s
def predict(self, user_id: int, item_id: int) -> float:
"""预测 user_id 对 item_id 的评分。"""
# 找 K 个最相似且评过该物品的用户
sims = self.sim[user_id].copy()
sims[user_id] = -1 # 排除自身
# 只考虑评过该物品的用户
rated_mask = self.R[:, item_id] > 0
sims[~rated_mask] = -1
top_k = np.argsort(sims)[-self.k:][::-1]
top_k = [u for u in top_k if sims[u] > 0]
if not top_k:
# 回退:返回该物品的平均分
rated = self.R[:, item_id]
return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0
# 加权平均
user_mean = self.R[user_id][self.R[user_id] > 0].mean()
num, denom = 0.0, 0.0
for u in top_k:
u_mean = self.R[u][self.R[u] > 0].mean()
num += sims[u] * (self.R[u, item_id] - u_mean)
denom += abs(sims[u])
return float(np.clip(user_mean + (num / denom if denom > 0 else 0), 1, 5))
def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
"""为用户推荐 top_n 部未看过的电影。"""
unseen = np.where(self.R[user_id] == 0)[0]
scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]
def mode_usercf() -> None:
section("用户协同过滤(UserCF)")
ucf = UserCF(RATINGS, k=3)
# 显示用户相似度矩阵(前5个用户)
print("\n 用户相似度矩阵(皮尔逊,前5个用户):")
print(f" {'':8}" + "".join(f"{USERS[j]:>8}" for j in range(5)))
for i in range(5):
row = "".join(f"{ucf.sim[i,j]:8.3f}" for j in range(5))
print(f" {USERS[i]:8}{row}")
# Recommend for users 0, 1, and 2
print(f"\n 推荐结果:")
for uid in [0, 1, 2]:
recs = ucf.recommend(uid, top_n=3)
seen = [MOVIES[i] for i in range(N_ITEMS) if RATINGS[uid, i] > 0]
print(f"\n {USERS[uid]}(已看: {', '.join(seen[:3])}...)")
for movie, score in recs:
bar = "★" * int(score)
print(f" 推荐: {movie:<12} 预测评分: {score:.2f} {bar}")
mode_usercf()
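When reading the printed matrix, two properties of the construction are worth knowing: it is symmetric, and its diagonal is 0.000 rather than 1.000, because the double loop in `UserCF.__init__` only visits pairs with `j > i`. The sketch below reproduces that on a toy matrix; the helper names and the ratings are hypothetical.

```python
import numpy as np

def pearson_on_overlap(a: np.ndarray, b: np.ndarray) -> float:
    """Pearson correlation over entries both users rated (0 = unrated)."""
    mask = (a != 0) & (b != 0)
    if mask.sum() < 2:
        return 0.0
    a_c = a[mask] - a[mask].mean()
    b_c = b[mask] - b[mask].mean()
    denom = np.linalg.norm(a_c) * np.linalg.norm(b_c)
    return float(a_c @ b_c / denom) if denom > 0 else 0.0

def pairwise_similarity(ratings: np.ndarray) -> np.ndarray:
    n = ratings.shape[0]
    sim = np.zeros((n, n))
    for i in range(n):
        for j in range(i + 1, n):   # upper triangle only; the diagonal is never filled
            sim[i, j] = sim[j, i] = pearson_on_overlap(ratings[i], ratings[j])
    return sim

toy = np.array([
    [5, 4, 1, 0],   # users 0 and 1 rank the first three movies similarly
    [4, 5, 2, 0],
    [1, 2, 5, 4],   # user 2 ranks them the other way round
], dtype=float)

sim = pairwise_similarity(toy)
print(np.round(sim, 3))
```

A negative entry, such as the one between users 0 and 2 here, means opposite tastes; `predict` ignores such users because it keeps only neighbors with similarity above 0.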
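Step 6: Use mode_itemcf to inspect the similarity network between movies
Pain point and mechanism:
The item similarity matrix is like a relationship graph between movies: the more alike two movies are, the higher the score. The movies a user has already rated become "clues", and the system follows the similarity network from them to new movies the user may like.
Core source (excerpted from the complete source at the end of this article):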
def mode_itemcf() -> None:
    section("Item-based collaborative filtering (ItemCF)")
    icf = ItemCF(RATINGS, k=3)
    print("\n Item similarity matrix (cosine):")
    print(f" {'':14}" + "".join(f"{m[:4]:>6}" for m in MOVIES))
    for i, movie in enumerate(MOVIES):
        row = "".join(f"{icf.sim[i,j]:6.2f}" for j in range(N_ITEMS))
        print(f" {movie[:12]:<14}{row}")
    print("\n Recommendations (user 0 vs user 1):")
    for uid in [0, 1]:
        recs = icf.recommend(uid, top_n=3)
        print(f"\n {USERS[uid]}:")
        for movie, score in recs:
            print(f" {movie:<12} predicted rating: {score:.2f}")
Runnable demo (with mock data and print feedback added):
import numpy as np
RATINGS = np.array([
[5, 4, 0, 0, 1, 0, 0, 2],
[0, 0, 4, 5, 0, 1, 0, 0],
[4, 0, 0, 3, 2, 0, 5, 0],
[0, 5, 4, 0, 0, 3, 0, 0],
[1, 0, 0, 4, 5, 0, 0, 3],
[5, 3, 0, 0, 1, 0, 4, 0],
[0, 0, 5, 4, 0, 2, 0, 0],
[4, 5, 0, 0, 2, 0, 3, 0],
[0, 0, 3, 5, 0, 4, 0, 1],
[3, 0, 0, 0, 4, 0, 5, 2],
], dtype=float)
MOVIES = ["复仇者联盟", "速度与激情", "星际穿越", "火星救援",
"变形金刚", "降临", "肖申克的救赎", "泰坦尼克号"]
USERS = [f"用户{i}" for i in range(10)]
N_USERS, N_ITEMS = RATINGS.shape
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
"""计算两个向量的余弦相似度(忽略0值)。"""
mask = (a != 0) & (b != 0)
if mask.sum() == 0:
return 0.0
a_m, b_m = a[mask], b[mask]
denom = np.linalg.norm(a_m) * np.linalg.norm(b_m)
return float(np.dot(a_m, b_m) / denom) if denom > 0 else 0.0
def pearson_similarity(a: np.ndarray, b: np.ndarray) -> float:
"""皮尔逊相关系数(考虑评分偏差)。"""
mask = (a != 0) & (b != 0)
if mask.sum() < 2:
return 0.0
a_m, b_m = a[mask], b[mask]
a_c = a_m - a_m.mean()
b_c = b_m - b_m.mean()
denom = np.linalg.norm(a_c) * np.linalg.norm(b_c)
return float(np.dot(a_c, b_c) / denom) if denom > 0 else 0.0
def section(title: str) -> None:
print(f"\n{'='*62}\n {title}\n{'='*62}")
class ItemCF:
"""基于物品的协同过滤。"""
def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
self.R = ratings.copy()
self.k = k
n = ratings.shape[1]
self.sim = np.zeros((n, n))
for i in range(n):
for j in range(i + 1, n):
s = cosine_similarity(ratings[:, i], ratings[:, j])
self.sim[i, j] = self.sim[j, i] = s
def predict(self, user_id: int, item_id: int) -> float:
sims = self.sim[item_id].copy()
sims[item_id] = -1
rated_mask = self.R[user_id] > 0
sims[~rated_mask] = -1
top_k = np.argsort(sims)[-self.k:][::-1]
top_k = [i for i in top_k if sims[i] > 0]
if not top_k:
rated = self.R[:, item_id]
return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0
num = sum(sims[i] * self.R[user_id, i] for i in top_k)
denom = sum(abs(sims[i]) for i in top_k)
return float(np.clip(num / denom if denom > 0 else 3.0, 1, 5))
def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
unseen = np.where(self.R[user_id] == 0)[0]
scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]
def mode_itemcf() -> None:
section("物品协同过滤(ItemCF)")
icf = ItemCF(RATINGS, k=3)
print("\n 物品相似度矩阵(余弦):")
print(f" {'':14}" + "".join(f"{m[:4]:>6}" for m in MOVIES))
for i, movie in enumerate(MOVIES):
row = "".join(f"{icf.sim[i,j]:6.2f}" for j in range(N_ITEMS))
print(f" {movie[:12]:<14}{row}")
print(f"\n 推荐结果(用户0 vs 用户1):")
for uid in [0, 1]:
recs = icf.recommend(uid, top_n=3)
print(f"\n {USERS[uid]}:")
for movie, score in recs:
print(f" {movie:<12} 预测评分: {score:.2f}")
mode_itemcf()
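One caveat when reading the item similarity matrix: the masked cosine is computed only over users who rated both movies, and all ratings are positive, so the value is always above 0 whenever any overlap exists. With a single shared rater it is exactly 1.0, whatever the two ratings are. The sketch below shows this with hypothetical columns; `masked_cosine` mirrors the article's `cosine_similarity`.

```python
import numpy as np

def masked_cosine(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity over co-rated entries only (0 = unrated)."""
    mask = (a != 0) & (b != 0)
    if mask.sum() == 0:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    denom = np.linalg.norm(a_m) * np.linalg.norm(b_m)
    return float(a_m @ b_m / denom) if denom > 0 else 0.0

# Each array is one movie's column: one rating per user (hypothetical data).
movie_a = np.array([5, 0, 0, 4], dtype=float)
movie_b = np.array([1, 3, 0, 0], dtype=float)   # only the first user rated both a and b
movie_c = np.array([2, 0, 0, 5], dtype=float)   # two users rated both a and c

sim_ab = masked_cosine(movie_a, movie_b)
sim_ac = masked_cosine(movie_a, movie_c)
print(f"a vs b (1 shared rater):  {sim_ab:.3f}")
print(f"a vs c (2 shared raters): {sim_ac:.3f}")
```

The first pair scores a perfect 1.0 even though the one shared user gave the movies 5 and 1, so a high cell backed by very few shared raters deserves some skepticism.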
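Step 7: Use mode_mf to watch the matrix factorization loss go down
Pain point and mechanism:
Matrix factorization has to be trained. The loss curve is like the model's notebook of mistakes: every epoch adjusts the user and movie latent vectors on the known ratings so that the predictions move closer to the true ratings. A shorter bar means a smaller error on the known ratings.
Core source (excerpted from the complete source at the end of this article):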
def mode_mf() -> None:
    section("Matrix factorization (gradient-descent SVD, k=8 latent factors)")
    mf = MatrixFactorization(N_USERS, N_ITEMS, k=8, lr=0.01, reg=0.02)
    history = mf.fit(RATINGS, epochs=300)
    # Loss curve
    print("\n Training loss curve (every 60 epochs):")
    checkpoints = [0, 60, 120, 180, 240, 299]
    max_loss = history[0]
    W = 35  # bar width in characters
    for ep in checkpoints:
        loss = history[ep]
        bar = "█" * int(loss / max_loss * W)
        print(f" epoch {ep+1:>3} │{bar:<{W}}│ {loss:.4f}")
    print("\n Recommendations (users 0, 1, 2):")
    for uid in [0, 1, 2]:
        recs = mf.recommend(uid, RATINGS, top_n=3)
        print(f"\n {USERS[uid]}:")
        for movie, score in recs:
            bar = "★" * int(score)
            print(f" {movie:<12} predicted rating: {score:.2f} {bar}")
Runnable demo (with mock data and print feedback added):
import numpy as np
RATINGS = np.array([
[5, 4, 0, 0, 1, 0, 0, 2],
[0, 0, 4, 5, 0, 1, 0, 0],
[4, 0, 0, 3, 2, 0, 5, 0],
[0, 5, 4, 0, 0, 3, 0, 0],
[1, 0, 0, 4, 5, 0, 0, 3],
[5, 3, 0, 0, 1, 0, 4, 0],
[0, 0, 5, 4, 0, 2, 0, 0],
[4, 5, 0, 0, 2, 0, 3, 0],
[0, 0, 3, 5, 0, 4, 0, 1],
[3, 0, 0, 0, 4, 0, 5, 2],
], dtype=float)
MOVIES = ["复仇者联盟", "速度与激情", "星际穿越", "火星救援",
"变形金刚", "降临", "肖申克的救赎", "泰坦尼克号"]
USERS = [f"用户{i}" for i in range(10)]
N_USERS, N_ITEMS = RATINGS.shape
def section(title: str) -> None:
print(f"\n{'='*62}\n {title}\n{'='*62}")
class MatrixFactorization:
"""
基于梯度下降的矩阵分解:
R ≈ P · Q^T,其中 P(n_users, k),Q(n_items, k)
"""
def __init__(self, n_users: int, n_items: int, k: int = 8,
lr: float = 0.01, reg: float = 0.02, seed: int = 42) -> None:
rng = np.random.RandomState(seed)
self.P = rng.randn(n_users, k) * 0.1 # 用户隐向量
self.Q = rng.randn(n_items, k) * 0.1 # 物品隐向量
self.lr = lr
self.reg = reg
def fit(self, R: np.ndarray, epochs: int = 200) -> list[float]:
"""在非零评分上训练。"""
rows, cols = np.where(R > 0)
loss_history: list[float] = []
for epoch in range(epochs):
# shuffle
idx = np.random.permutation(len(rows))
total_loss = 0.0
for i in idx:
u, v = rows[i], cols[i]
pred = self.P[u] @ self.Q[v]
err = R[u, v] - pred
# 梯度更新(含 L2 正则)
self.P[u] += self.lr * (err * self.Q[v] - self.reg * self.P[u])
self.Q[v] += self.lr * (err * self.P[u] - self.reg * self.Q[v])
total_loss += err ** 2
loss_history.append(total_loss / len(rows))
return loss_history
def predict(self, user_id: int, item_id: int) -> float:
return float(np.clip(self.P[user_id] @ self.Q[item_id], 1, 5))
def recommend(self, user_id: int, R: np.ndarray,
top_n: int = 3) -> list[tuple[str, float]]:
unseen = np.where(R[user_id] == 0)[0]
scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]
def mode_mf() -> None:
section("矩阵分解(梯度下降 SVD,k=8 隐因子)")
mf = MatrixFactorization(N_USERS, N_ITEMS, k=8, lr=0.01, reg=0.02)
history = mf.fit(RATINGS, epochs=300)
# 损失曲线
print("\n 训练损失曲线(每60轮):")
checkpoints = [0, 60, 120, 180, 240, 299]
max_loss = history[0]
W = 35
for ep in checkpoints:
loss = history[ep]
bar = "█" * int(loss / max_loss * W)
print(f" epoch {ep+1:>3} │{bar:<{W}}│ {loss:.4f}")
print(f"\n 推荐结果(用户0、1、2):")
for uid in [0, 1, 2]:
recs = mf.recommend(uid, RATINGS, top_n=3)
print(f"\n {USERS[uid]}:")
for movie, score in recs:
bar = "★" * int(score)
print(f" {movie:<12} 预测评分: {score:.2f} {bar}")
mode_mf()
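The text bar chart in `mode_mf` scales each loss against the first epoch's loss and truncates with `int()`. The sketch below isolates that rendering step with made-up loss values (they are not real training output); `loss_bar` is a hypothetical helper name.

```python
def loss_bar(loss: float, max_loss: float, width: int) -> str:
    """Render a loss as a bar whose length is proportional to loss / max_loss."""
    return "█" * int(loss / max_loss * width)

losses = [8.0, 4.0, 1.0, 0.25]   # made-up values for illustration only
W = 8                            # bar width in characters

for epoch, loss in enumerate(losses, start=1):
    bar = loss_bar(loss, losses[0], W)
    # The bar is left-aligned and padded to W so the right border stays in one column.
    print(f"epoch {epoch:>3} │{bar:<{W}}│ {loss:.4f}")

bar_lengths = [len(loss_bar(loss, losses[0], W)) for loss in losses]
print(bar_lengths)   # [8, 4, 1, 0]
```

Because `int()` truncates, a loss below 1/W of the starting loss draws an empty bar, so the printed number is the reliable value once the bars get short.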
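Step 8: Use mode_eval to compare the three recommenders with RMSE/MAE
Pain point and mechanism:
A recommender cannot be judged only by "the list looks about right"; some of the known ratings have to be held out for testing. RMSE penalizes large errors more heavily, while MAE reads as the average number of points a prediction is off. The smaller both are, the closer the predicted ratings are to real user feedback.
Core source (excerpted from the complete source at the end of this article):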
def mode_eval() -> None:
    section("Evaluation metric comparison (hold-out validation)")
    # Hold out 10% of the known ratings as the test set
    rng = np.random.RandomState(42)
    known = list(zip(*np.where(RATINGS > 0)))
    rng.shuffle(known)
    test_size = max(1, len(known) // 10)
    test_set = known[:test_size]
    R_train = RATINGS.copy()
    for u, v in test_set:
        R_train[u, v] = 0  # hide the held-out rating from training
    # Train the three models
    ucf = UserCF(R_train, k=3)
    icf = ItemCF(R_train, k=3)
    mf = MatrixFactorization(N_USERS, N_ITEMS, k=8)
    mf.fit(R_train, epochs=200)
    def rmse(model, test: list) -> float:
        errors = [(RATINGS[u,v] - model.predict(u, v))**2 for u, v in test]
        return float(np.sqrt(np.mean(errors)))
    def mae(model, test: list) -> float:
        errors = [abs(RATINGS[u,v] - model.predict(u, v)) for u, v in test]
        return float(np.mean(errors))
    print(f"\n Test set size: {test_size} ratings\n")
    print(f" {'Model':<20} {'RMSE':<10} {'MAE':<10} Note")
    print(f" {'─'*55}")
    for name, model in [("UserCF (k=3)", ucf), ("ItemCF (k=3)", icf), ("MF (k=8)", mf)]:
        r = rmse(model, test_set)
        m = mae(model, test_set)
        print(f" {name:<20} {r:.4f} {m:.4f} {'lower is better'}")
    print("\n Cold-start analysis:")
    print(" ├── New user (no history): UserCF/ItemCF cannot recommend → use popularity lists or content-based recommendation")
    print(" ├── New item (no ratings): collaborative filtering cannot recommend → use item-attribute similarity")
    print(" └── Matrix factorization: must be retrained before a new user or item can be added")
Runnable demo (with mock data and print feedback added):
import numpy as np
RATINGS = np.array([
[5, 4, 0, 0, 1, 0, 0, 2],
[0, 0, 4, 5, 0, 1, 0, 0],
[4, 0, 0, 3, 2, 0, 5, 0],
[0, 5, 4, 0, 0, 3, 0, 0],
[1, 0, 0, 4, 5, 0, 0, 3],
[5, 3, 0, 0, 1, 0, 4, 0],
[0, 0, 5, 4, 0, 2, 0, 0],
[4, 5, 0, 0, 2, 0, 3, 0],
[0, 0, 3, 5, 0, 4, 0, 1],
[3, 0, 0, 0, 4, 0, 5, 2],
], dtype=float)
MOVIES = ["复仇者联盟", "速度与激情", "星际穿越", "火星救援",
"变形金刚", "降临", "肖申克的救赎", "泰坦尼克号"]
USERS = [f"用户{i}" for i in range(10)]
N_USERS, N_ITEMS = RATINGS.shape
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
"""计算两个向量的余弦相似度(忽略0值)。"""
mask = (a != 0) & (b != 0)
if mask.sum() == 0:
return 0.0
a_m, b_m = a[mask], b[mask]
denom = np.linalg.norm(a_m) * np.linalg.norm(b_m)
return float(np.dot(a_m, b_m) / denom) if denom > 0 else 0.0
def pearson_similarity(a: np.ndarray, b: np.ndarray) -> float:
"""皮尔逊相关系数(考虑评分偏差)。"""
mask = (a != 0) & (b != 0)
if mask.sum() < 2:
return 0.0
a_m, b_m = a[mask], b[mask]
a_c = a_m - a_m.mean()
b_c = b_m - b_m.mean()
denom = np.linalg.norm(a_c) * np.linalg.norm(b_c)
return float(np.dot(a_c, b_c) / denom) if denom > 0 else 0.0
def section(title: str) -> None:
print(f"\n{'='*62}\n {title}\n{'='*62}")
class UserCF:
"""基于用户的协同过滤。"""
def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
self.R = ratings.copy()
self.k = k
n = ratings.shape[0]
# 预计算用户相似度矩阵
self.sim = np.zeros((n, n))
for i in range(n):
for j in range(i + 1, n):
s = pearson_similarity(ratings[i], ratings[j])
self.sim[i, j] = self.sim[j, i] = s
def predict(self, user_id: int, item_id: int) -> float:
"""预测 user_id 对 item_id 的评分。"""
# 找 K 个最相似且评过该物品的用户
sims = self.sim[user_id].copy()
sims[user_id] = -1 # 排除自身
# 只考虑评过该物品的用户
rated_mask = self.R[:, item_id] > 0
sims[~rated_mask] = -1
top_k = np.argsort(sims)[-self.k:][::-1]
top_k = [u for u in top_k if sims[u] > 0]
if not top_k:
# 回退:返回该物品的平均分
rated = self.R[:, item_id]
return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0
# 加权平均
user_mean = self.R[user_id][self.R[user_id] > 0].mean()
num, denom = 0.0, 0.0
for u in top_k:
u_mean = self.R[u][self.R[u] > 0].mean()
num += sims[u] * (self.R[u, item_id] - u_mean)
denom += abs(sims[u])
return float(np.clip(user_mean + (num / denom if denom > 0 else 0), 1, 5))
def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
"""为用户推荐 top_n 部未看过的电影。"""
unseen = np.where(self.R[user_id] == 0)[0]
scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]
class ItemCF:
"""基于物品的协同过滤。"""
def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
self.R = ratings.copy()
self.k = k
n = ratings.shape[1]
self.sim = np.zeros((n, n))
for i in range(n):
for j in range(i + 1, n):
s = cosine_similarity(ratings[:, i], ratings[:, j])
self.sim[i, j] = self.sim[j, i] = s
def predict(self, user_id: int, item_id: int) -> float:
sims = self.sim[item_id].copy()
sims[item_id] = -1
rated_mask = self.R[user_id] > 0
sims[~rated_mask] = -1
top_k = np.argsort(sims)[-self.k:][::-1]
top_k = [i for i in top_k if sims[i] > 0]
if not top_k:
rated = self.R[:, item_id]
return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0
num = sum(sims[i] * self.R[user_id, i] for i in top_k)
denom = sum(abs(sims[i]) for i in top_k)
return float(np.clip(num / denom if denom > 0 else 3.0, 1, 5))
def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
unseen = np.where(self.R[user_id] == 0)[0]
scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]
class MatrixFactorization:
"""
基于梯度下降的矩阵分解:
R ≈ P · Q^T,其中 P(n_users, k),Q(n_items, k)
"""
def __init__(self, n_users: int, n_items: int, k: int = 8,
lr: float = 0.01, reg: float = 0.02, seed: int = 42) -> None:
rng = np.random.RandomState(seed)
self.P = rng.randn(n_users, k) * 0.1 # 用户隐向量
self.Q = rng.randn(n_items, k) * 0.1 # 物品隐向量
self.lr = lr
self.reg = reg
def fit(self, R: np.ndarray, epochs: int = 200) -> list[float]:
"""在非零评分上训练。"""
rows, cols = np.where(R > 0)
loss_history: list[float] = []
for epoch in range(epochs):
# shuffle
idx = np.random.permutation(len(rows))
total_loss = 0.0
for i in idx:
u, v = rows[i], cols[i]
pred = self.P[u] @ self.Q[v]
err = R[u, v] - pred
# 梯度更新(含 L2 正则)
self.P[u] += self.lr * (err * self.Q[v] - self.reg * self.P[u])
self.Q[v] += self.lr * (err * self.P[u] - self.reg * self.Q[v])
total_loss += err ** 2
loss_history.append(total_loss / len(rows))
return loss_history
def predict(self, user_id: int, item_id: int) -> float:
return float(np.clip(self.P[user_id] @ self.Q[item_id], 1, 5))
def recommend(self, user_id: int, R: np.ndarray,
top_n: int = 3) -> list[tuple[str, float]]:
unseen = np.where(R[user_id] == 0)[0]
scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]
def mode_eval() -> None:
section("Evaluation metric comparison (hold-out validation)")
# 留出10%已知评分作为测试集
rng = np.random.RandomState(42)
known = list(zip(*np.where(RATINGS > 0)))
rng.shuffle(known)
test_size = max(1, len(known) // 10)
test_set = known[:test_size]
R_train = RATINGS.copy()
for u, v in test_set:
R_train[u, v] = 0
# 训练三个模型
ucf = UserCF(R_train, k=3)
icf = ItemCF(R_train, k=3)
mf = MatrixFactorization(N_USERS, N_ITEMS, k=8)
mf.fit(R_train, epochs=200)
def rmse(model, test: list) -> float:
errors = [(RATINGS[u,v] - model.predict(u, v))**2 for u, v in test]
return float(np.sqrt(np.mean(errors)))
def mae(model, test: list) -> float:
errors = [abs(RATINGS[u,v] - model.predict(u, v)) for u, v in test]
return float(np.mean(errors))
print(f"\n 测试集大小: {test_size} 个评分\n")
print(f" {'模型':<20} {'RMSE':<10} {'MAE':<10} 说明")
print(f" {'─'*55}")
for name, model in [("UserCF (k=3)", ucf), ("ItemCF (k=3)", icf), ("矩阵分解 (k=8)", mf)]:
r = rmse(model, test_set)
m = mae(model, test_set)
print(f" {name:<20} {r:.4f} {m:.4f} {'越小越好'}")
print(f"\n 冷启动问题分析:")
print(f" ├── 新用户(无历史): UserCF/ItemCF 无法推荐 → 用热门榜/内容推荐")
print(f" ├── 新物品(无评分): 协同过滤无法推荐 → 用物品属性相似度")
print(f" └── 矩阵分解: 需要重新训练才能加入新用户/物品")
mode_eval()
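The claim that RMSE punishes large errors more than MAE can be checked on two made-up error lists with the same total absolute error. The function names below are illustrative; unlike the nested helpers in `mode_eval`, they take a plain array of errors rather than a model.

```python
import numpy as np

def rmse(errors: np.ndarray) -> float:
    """Root mean squared error of a list of prediction errors."""
    return float(np.sqrt(np.mean(np.square(errors))))

def mae(errors: np.ndarray) -> float:
    """Mean absolute error of a list of prediction errors."""
    return float(np.mean(np.abs(errors)))

even = np.array([1.0, 1.0, 1.0, 1.0])    # every prediction is off by one point
spiky = np.array([0.0, 0.0, 0.0, 4.0])   # three perfect predictions, one off by four

for name, errs in [("even", even), ("spiky", spiky)]:
    print(f"{name:<6} MAE={mae(errs):.2f}  RMSE={rmse(errs):.2f}")
```

Both lists have an MAE of 1.00, but the RMSE of the spiky list is twice as large. With a test set of only a few ratings, as in `mode_eval`, a single bad prediction can therefore dominate the RMSE column.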
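Step 9: Use main to expose usercf/itemcf/mf/eval as a command-line entry point
Pain point and mechanism:
The final script should be run like a tool, not by editing function calls in the source. --mode usercf shows user-based collaborative filtering, --mode itemcf shows item-based collaborative filtering, --mode mf shows matrix factorization, and --mode eval shows the evaluation metrics. Omitting --mode runs all four in sequence, because the default is all.
Core source (excerpted from the complete source at the end of this article):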
def main() -> None:
    parser = argparse.ArgumentParser(description="Recommender systems: collaborative filtering and matrix factorization")
    parser.add_argument(
        "--mode",
        choices=["usercf", "itemcf", "mf", "eval", "all"],
        default="all",
    )
    args = parser.parse_args()
    # Map each mode name to the function that runs it
    dispatch = {
        "usercf": mode_usercf,
        "itemcf": mode_itemcf,
        "mf": mode_mf,
        "eval": mode_eval,
        "all": lambda: [mode_usercf(), mode_itemcf(), mode_mf(), mode_eval()],
    }
    dispatch[args.mode]()
Runnable demo (with mock data and print feedback filled in):
import argparse
import sys


def main() -> None:
    parser = argparse.ArgumentParser(description="推荐系统:协同过滤与矩阵分解")
    parser.add_argument(
        "--mode",
        choices=["usercf", "itemcf", "mf", "eval", "all"],
        default="all",
    )
    args = parser.parse_args()
    dispatch = {
        "usercf": mode_usercf,
        "itemcf": mode_itemcf,
        "mf": mode_mf,
        "eval": mode_eval,
        "all": lambda: [mode_usercf(), mode_itemcf(), mode_mf(), mode_eval()],
    }
    dispatch[args.mode]()


# Stub handlers: they only print, so the dispatch can be observed in isolation.
def mode_usercf() -> None:
    print("运行用户协同过滤")


def mode_itemcf() -> None:
    print("运行物品协同过滤")


def mode_mf() -> None:
    print("运行矩阵分解")


def mode_eval() -> None:
    print("运行评估指标对比")


# Simulate four command-line invocations by overwriting sys.argv.
for mode in ["usercf", "itemcf", "mf", "eval"]:
    print(f"\n$ python 52-python-recommender.py --mode {mode}")
    sys.argv = ["52-python-recommender.py", "--mode", mode]
    main()
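The demo above drives main() by overwriting sys.argv. As an alternative, here is a minimal sketch that assumes you are free to give main an optional argv parameter; argparse's parse_args accepts an explicit argument list and only falls back to sys.argv[1:] when it receives None. The handler names run_usercf and run_eval and the HANDLERS table are hypothetical stand-ins, not part of the article's script.

```python
import argparse
from typing import Callable, Optional, Sequence


# Hypothetical stand-in handlers; the real script would call mode_usercf etc.
def run_usercf() -> str:
    return "usercf done"


def run_eval() -> str:
    return "eval done"


HANDLERS: dict[str, Callable[[], str]] = {
    "usercf": run_usercf,
    "eval": run_eval,
}


def main(argv: Optional[Sequence[str]] = None) -> str:
    """Parse argv (or sys.argv[1:] when argv is None) and run one handler."""
    parser = argparse.ArgumentParser(description="argv-injectable CLI sketch")
    parser.add_argument("--mode", choices=sorted(HANDLERS), default="eval")
    args = parser.parse_args(argv)
    return HANDLERS[args.mode]()


# No global state is touched: each call gets its own argument list.
print(main(["--mode", "usercf"]))
print(main([]))  # an empty list means "no flags", so the default mode runs
```

Passing the list explicitly keeps each call independent, which tends to make the entry point easier to exercise from a test or a notebook.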
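Geek practice: full source and run
Now put the building blocks above together and save the complete code below as 52-python-recommender.py. It ships with a built-in rating matrix of 10 users and 8 movies, so UserCF, ItemCF, matrix factorization, and the evaluation metrics all run without any external data.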
#!/usr/bin/env python3
"""
52-python-recommender.py: recommender systems with collaborative filtering and matrix factorization

Usage:
    python3 52-python-recommender.py --mode usercf   # user-based collaborative filtering
    python3 52-python-recommender.py --mode itemcf   # item-based collaborative filtering
    python3 52-python-recommender.py --mode mf       # matrix factorization (gradient descent)
    python3 52-python-recommender.py --mode eval     # evaluation metric comparison
    python3 52-python-recommender.py --mode all      # everything (default)

The only third-party dependency is numpy; no external data is needed.
"""
import argparse

import numpy as np

# 10 users, 8 movies, ratings 1-5, 0 means "not rated"
RATINGS = np.array([
    [5, 4, 0, 0, 1, 0, 0, 2],  # user 0: likes action
    [0, 0, 4, 5, 0, 1, 0, 0],  # user 1: likes sci-fi
    [4, 0, 0, 3, 2, 0, 5, 0],  # user 2: likes action + drama
    [0, 5, 4, 0, 0, 3, 0, 0],  # user 3: likes sci-fi + drama
    [1, 0, 0, 4, 5, 0, 0, 3],  # user 4: likes sci-fi
    [5, 3, 0, 0, 1, 0, 4, 0],  # user 5: likes action + drama
    [0, 0, 5, 4, 0, 2, 0, 0],  # user 6: likes sci-fi
    [4, 5, 0, 0, 2, 0, 3, 0],  # user 7: likes action
    [0, 0, 3, 5, 0, 4, 0, 1],  # user 8: likes sci-fi + drama
    [3, 0, 0, 0, 4, 0, 5, 2],  # user 9: likes action + drama
], dtype=float)
MOVIES = ["复仇者联盟", "速度与激情", "星际穿越", "火星救援",
          "变形金刚", "降临", "肖申克的救赎", "泰坦尼克号"]
USERS = [f"用户{i}" for i in range(10)]
N_USERS, N_ITEMS = RATINGS.shape

# ─── Utility functions ─────────────────────────────────────────────────────────
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity of two vectors, using only positions where both are non-zero."""
    mask = (a != 0) & (b != 0)
    if mask.sum() == 0:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    denom = np.linalg.norm(a_m) * np.linalg.norm(b_m)
    return float(np.dot(a_m, b_m) / denom) if denom > 0 else 0.0


def pearson_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Pearson correlation over co-rated positions (removes each rater's mean offset)."""
    mask = (a != 0) & (b != 0)
    if mask.sum() < 2:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    a_c = a_m - a_m.mean()
    b_c = b_m - b_m.mean()
    denom = np.linalg.norm(a_c) * np.linalg.norm(b_c)
    return float(np.dot(a_c, b_c) / denom) if denom > 0 else 0.0


def section(title: str) -> None:
    print(f"\n{'='*62}\n {title}\n{'='*62}")


# ─── User-based collaborative filtering ────────────────────────────────────────
class UserCF:
    """User-based collaborative filtering."""

    def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
        self.R = ratings.copy()
        self.k = k
        n = ratings.shape[0]
        # Precompute the user-user similarity matrix
        self.sim = np.zeros((n, n))
        for i in range(n):
            for j in range(i + 1, n):
                s = pearson_similarity(ratings[i], ratings[j])
                self.sim[i, j] = self.sim[j, i] = s

    def predict(self, user_id: int, item_id: int) -> float:
        """Predict the rating user_id would give item_id."""
        # Find the K most similar users who have rated this item
        sims = self.sim[user_id].copy()
        sims[user_id] = -1  # exclude the user themself
        # Only consider users who rated the item
        rated_mask = self.R[:, item_id] > 0
        sims[~rated_mask] = -1
        top_k = np.argsort(sims)[-self.k:][::-1]
        top_k = [u for u in top_k if sims[u] > 0]
        if not top_k:
            # Fallback: the item's mean rating (3.0 if nobody rated it)
            rated = self.R[:, item_id]
            return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0
        # Similarity-weighted average of the neighbors' mean-centered ratings
        user_mean = self.R[user_id][self.R[user_id] > 0].mean()
        num, denom = 0.0, 0.0
        for u in top_k:
            u_mean = self.R[u][self.R[u] > 0].mean()
            num += sims[u] * (self.R[u, item_id] - u_mean)
            denom += abs(sims[u])
        return float(np.clip(user_mean + (num / denom if denom > 0 else 0), 1, 5))

    def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
        """Recommend the top_n movies the user has not rated yet."""
        unseen = np.where(self.R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]


# ─── Item-based collaborative filtering ────────────────────────────────────────
class ItemCF:
    """Item-based collaborative filtering."""

    def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
        self.R = ratings.copy()
        self.k = k
        n = ratings.shape[1]
        # Precompute the item-item similarity matrix (columns are items)
        self.sim = np.zeros((n, n))
        for i in range(n):
            for j in range(i + 1, n):
                s = cosine_similarity(ratings[:, i], ratings[:, j])
                self.sim[i, j] = self.sim[j, i] = s

    def predict(self, user_id: int, item_id: int) -> float:
        sims = self.sim[item_id].copy()
        sims[item_id] = -1  # exclude the item itself
        # Only consider items this user has rated
        rated_mask = self.R[user_id] > 0
        sims[~rated_mask] = -1
        top_k = np.argsort(sims)[-self.k:][::-1]
        top_k = [i for i in top_k if sims[i] > 0]
        if not top_k:
            # Fallback: the item's mean rating (3.0 if nobody rated it)
            rated = self.R[:, item_id]
            return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0
        # Similarity-weighted average of the user's own ratings on similar items
        num = sum(sims[i] * self.R[user_id, i] for i in top_k)
        denom = sum(abs(sims[i]) for i in top_k)
        return float(np.clip(num / denom if denom > 0 else 3.0, 1, 5))

    def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
        unseen = np.where(self.R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]


# ─── Matrix factorization (gradient-descent SVD) ───────────────────────────────
class MatrixFactorization:
    """
    Matrix factorization trained with gradient descent:
    R ≈ P · Q^T, where P has shape (n_users, k) and Q has shape (n_items, k).
    """

    def __init__(self, n_users: int, n_items: int, k: int = 8,
                 lr: float = 0.01, reg: float = 0.02, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        self.P = rng.randn(n_users, k) * 0.1  # user latent vectors
        self.Q = rng.randn(n_items, k) * 0.1  # item latent vectors
        self.lr = lr
        self.reg = reg

    def fit(self, R: np.ndarray, epochs: int = 200) -> list[float]:
        """Train on the non-zero ratings only; returns the per-epoch mean squared error."""
        rows, cols = np.where(R > 0)
        loss_history: list[float] = []
        for epoch in range(epochs):
            # Shuffle the visiting order. This uses NumPy's global RNG, so `seed`
            # fixes only the initial P and Q; exact losses can differ between runs.
            idx = np.random.permutation(len(rows))
            total_loss = 0.0
            for i in idx:
                u, v = rows[i], cols[i]
                pred = self.P[u] @ self.Q[v]
                err = R[u, v] - pred
                # Gradient step with L2 regularization. Q[v] is updated using the
                # already-updated P[u] (a sequential, not simultaneous, update).
                self.P[u] += self.lr * (err * self.Q[v] - self.reg * self.P[u])
                self.Q[v] += self.lr * (err * self.P[u] - self.reg * self.Q[v])
                total_loss += err ** 2
            loss_history.append(total_loss / len(rows))
        return loss_history

    def predict(self, user_id: int, item_id: int) -> float:
        return float(np.clip(self.P[user_id] @ self.Q[item_id], 1, 5))

    def recommend(self, user_id: int, R: np.ndarray,
                  top_n: int = 3) -> list[tuple[str, float]]:
        unseen = np.where(R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]


# ─── Mode 1: user-based collaborative filtering ────────────────────────────────
def mode_usercf() -> None:
    section("用户协同过滤(UserCF)")
    ucf = UserCF(RATINGS, k=3)
    # Show the user similarity matrix (first 5 users)
    print("\n 用户相似度矩阵(皮尔逊,前5个用户):")
    print(f" {'':8}" + "".join(f"{USERS[j]:>8}" for j in range(5)))
    for i in range(5):
        row = "".join(f"{ucf.sim[i,j]:8.3f}" for j in range(5))
        print(f" {USERS[i]:8}{row}")
    # Recommend for users 0, 1, and 2
    print(f"\n 推荐结果:")
    for uid in [0, 1, 2]:
        recs = ucf.recommend(uid, top_n=3)
        seen = [MOVIES[i] for i in range(N_ITEMS) if RATINGS[uid, i] > 0]
        print(f"\n {USERS[uid]}(已看: {', '.join(seen[:3])}...)")
        for movie, score in recs:
            bar = "★" * int(score)  # one star per whole rating point
            print(f" 推荐: {movie:<12} 预测评分: {score:.2f} {bar}")


# ─── Mode 2: item-based collaborative filtering ────────────────────────────────
def mode_itemcf() -> None:
    section("物品协同过滤(ItemCF)")
    icf = ItemCF(RATINGS, k=3)
    # Show the full item similarity matrix
    print("\n 物品相似度矩阵(余弦):")
    print(f" {'':14}" + "".join(f"{m[:4]:>6}" for m in MOVIES))
    for i, movie in enumerate(MOVIES):
        row = "".join(f"{icf.sim[i,j]:6.2f}" for j in range(N_ITEMS))
        print(f" {movie[:12]:<14}{row}")
    print(f"\n 推荐结果(用户0 vs 用户1):")
    for uid in [0, 1]:
        recs = icf.recommend(uid, top_n=3)
        print(f"\n {USERS[uid]}:")
        for movie, score in recs:
            print(f" {movie:<12} 预测评分: {score:.2f}")


# ─── Mode 3: matrix factorization ──────────────────────────────────────────────
def mode_mf() -> None:
    section("矩阵分解(梯度下降 SVD,k=8 隐因子)")
    mf = MatrixFactorization(N_USERS, N_ITEMS, k=8, lr=0.01, reg=0.02)
    history = mf.fit(RATINGS, epochs=300)
    # Loss curve, drawn as bars scaled to the first epoch's loss
    print("\n 训练损失曲线(每60轮):")
    checkpoints = [0, 60, 120, 180, 240, 299]
    max_loss = history[0]
    W = 35  # bar width in characters
    for ep in checkpoints:
        loss = history[ep]
        bar = "█" * int(loss / max_loss * W)
        print(f" epoch {ep+1:>3} │{bar:<{W}}│ {loss:.4f}")
    print(f"\n 推荐结果(用户0、1、2):")
    for uid in [0, 1, 2]:
        recs = mf.recommend(uid, RATINGS, top_n=3)
        print(f"\n {USERS[uid]}:")
        for movie, score in recs:
            bar = "★" * int(score)
            print(f" {movie:<12} 预测评分: {score:.2f} {bar}")


# ─── Mode 4: evaluation metric comparison ──────────────────────────────────────
def mode_eval() -> None:
    section("评估指标对比(留一法验证)")
    # Note: despite the "留一法" (leave-one-out) wording in the title, this is a
    # single hold-out split: about 10% of the known ratings are hidden and
    # used as the test set.
    rng = np.random.RandomState(42)
    known = list(zip(*np.where(RATINGS > 0)))
    rng.shuffle(known)
    test_size = max(1, len(known) // 10)
    test_set = known[:test_size]
    R_train = RATINGS.copy()
    for u, v in test_set:
        R_train[u, v] = 0  # hide the test ratings from training

    # Train the three models on the training matrix
    ucf = UserCF(R_train, k=3)
    icf = ItemCF(R_train, k=3)
    mf = MatrixFactorization(N_USERS, N_ITEMS, k=8)
    mf.fit(R_train, epochs=200)

    def rmse(model, test: list) -> float:
        errors = [(RATINGS[u, v] - model.predict(u, v)) ** 2 for u, v in test]
        return float(np.sqrt(np.mean(errors)))

    def mae(model, test: list) -> float:
        errors = [abs(RATINGS[u, v] - model.predict(u, v)) for u, v in test]
        return float(np.mean(errors))

    print(f"\n 测试集大小: {test_size} 个评分\n")
    print(f" {'模型':<20} {'RMSE':<10} {'MAE':<10} 说明")
    print(f" {'─'*55}")
    for name, model in [("UserCF (k=3)", ucf), ("ItemCF (k=3)", icf), ("矩阵分解 (k=8)", mf)]:
        r = rmse(model, test_set)
        m = mae(model, test_set)
        print(f" {name:<20} {r:.4f} {m:.4f} {'越小越好'}")
    print(f"\n 冷启动问题分析:")
    print(f" ├── 新用户(无历史): UserCF/ItemCF 无法推荐 → 用热门榜/内容推荐")
    print(f" ├── 新物品(无评分): 协同过滤无法推荐 → 用物品属性相似度")
    print(f" └── 矩阵分解: 需要重新训练才能加入新用户/物品")


# ─── Entry point ───────────────────────────────────────────────────────────────
def main() -> None:
    parser = argparse.ArgumentParser(description="推荐系统:协同过滤与矩阵分解")
    parser.add_argument(
        "--mode",
        choices=["usercf", "itemcf", "mf", "eval", "all"],
        default="all",
    )
    args = parser.parse_args()
    dispatch = {
        "usercf": mode_usercf,
        "itemcf": mode_itemcf,
        "mf": mode_mf,
        "eval": mode_eval,
        "all": lambda: [mode_usercf(), mode_itemcf(), mode_mf(), mode_eval()],
    }
    dispatch[args.mode]()


if __name__ == "__main__":
    main()

$ python 52-python-recommender.py --mode usercf
==============================================================
用户协同过滤(UserCF)
==============================================================
用户相似度矩阵(皮尔逊,前5个用户):
用户0 用户1 用户2 用户3 用户4
用户0 0.000 0.000 1.000 0.000 -0.961
用户1 0.000 0.000 0.000 1.000 0.000
用户2 1.000 0.000 0.000 0.000 -0.961
用户3 0.000 1.000 0.000 0.000 0.000
用户4 -0.961 0.000 -0.961 0.000 0.000
推荐结果:
用户0(已看: 复仇者联盟, 速度与激情, 变形金刚...)
推荐: 星际穿越 预测评分: 4.00 ★★★★
推荐: 肖申克的救赎 预测评分: 3.64 ★★★
推荐: 火星救援 预测评分: 2.50 ★★
用户1(已看: 星际穿越, 火星救援, 降临...)
推荐: 速度与激情 预测评分: 4.33 ★★★★
推荐: 肖申克的救赎 预测评分: 4.25 ★★★★
推荐: 复仇者联盟 预测评分: 3.67 ★★★
用户2(已看: 复仇者联盟, 火星救援, 变形金刚...)
推荐: 速度与激情 预测评分: 4.21 ★★★★
推荐: 星际穿越 预测评分: 4.00 ★★★★
推荐: 降临 预测评分: 2.50 ★★
$ python 52-python-recommender.py --mode mf
==============================================================
矩阵分解(梯度下降 SVD,k=8 隐因子)
==============================================================
训练损失曲线(每60轮):
epoch 1 │███████████████████████████████████│ 13.4787
epoch 61 │ │ 0.2927
epoch 121 │ │ 0.0465
epoch 181 │ │ 0.0111
epoch 241 │ │ 0.0047
epoch 300 │ │ 0.0027
推荐结果(用户0、1、2):
用户0:
肖申克的救赎 预测评分: 3.32 ★★★
星际穿越 预测评分: 2.19 ★★
火星救援 预测评分: 1.51 ★
用户1:
变形金刚 预测评分: 3.95 ★★★
速度与激情 预测评分: 3.06 ★★★
肖申克的救赎 预测评分: 2.90 ★★
用户2:
速度与激情 预测评分: 3.96 ★★★
星际穿越 预测评分: 3.38 ★★★
泰坦尼克号 预测评分: 1.79 ★
$ python 52-python-recommender.py --mode eval
==============================================================
评估指标对比(留一法验证)
==============================================================
测试集大小: 3 个评分
模型 RMSE MAE 说明
───────────────────────────────────────────────────────
UserCF (k=3) 0.8197 0.7639 越小越好
ItemCF (k=3) 0.7907 0.7229 越小越好
矩阵分解 (k=8) 1.6157 1.5877 越小越好
冷启动问题分析:
├── 新用户(无历史): UserCF/ItemCF 无法推荐 → 用热门榜/内容推荐
├── 新物品(无评分): 协同过滤无法推荐 → 用物品属性相似度
└── 矩阵分解: 需要重新训练才能加入新用户/物品
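The cold-start notes printed above suggest falling back to a popularity chart when a new user has no history. Below is a minimal sketch of one way to build such a chart from a rating matrix where 0 means "not rated". The function name popular_items and the min_ratings threshold are assumptions made for illustration; they are not part of the article's script.

```python
import numpy as np


def popular_items(R, top_n=3, min_ratings=2):
    """Rank items by mean known rating; 0 entries are treated as 'not rated'.

    Items with fewer than min_ratings known ratings are skipped, and ties on
    the mean are broken in favor of the item with more ratings.
    """
    R = np.asarray(R, dtype=float)
    counts = (R > 0).sum(axis=0)          # how many users rated each item
    sums = R.sum(axis=0)                  # zeros add nothing to the sum
    means = np.divide(sums, counts,
                      out=np.zeros_like(sums, dtype=float),
                      where=counts > 0)   # avoid dividing by zero
    eligible = np.where(counts >= min_ratings)[0]
    order = sorted(eligible, key=lambda i: (-means[i], -counts[i]))
    return [(int(i), float(means[i])) for i in order[:top_n]]


# Toy matrix: 3 users x 3 items (illustrative data, not the article's RATINGS).
R_demo = np.array([
    [5, 0, 3],
    [4, 0, 0],
    [0, 2, 1],
], dtype=float)

for item, mean in popular_items(R_demo, top_n=2):
    print(f"item {item}: mean rating {mean:.2f}")
```

In the article's script, the returned indices could be mapped through MOVIES to show titles; the min_ratings filter is a design choice that keeps an item with a single enthusiastic rating from topping the chart.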
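Summary and NexDo Time ⚡
In this article you ran the three core recipes of a recommender system: UserCF borrows the taste of similar users, ItemCF borrows the relationships between similar items, and matrix factorization learns the latent preferences of users and movies. Evaluation uses RMSE/MAE to measure the gap between predicted and true ratings, and the cold-start problem is a reminder that when there is no behavior history, a recommender needs popularity charts, content features, or onboarding prompts to fill in. Keep in mind that the test set here holds only 3 ratings, so the RMSE/MAE ranking above is indicative at best.
5-minute micro-challenge: inside mode_eval, change the k in UserCF(R_train, k=3) and ItemCF(R_train, k=3) to 1 and then to 5, run --mode eval each time, and check whether RMSE/MAE improve. The UserCF(RATINGS, k=3) and ItemCF(RATINGS, k=3) calls in mode_usercf and mode_itemcf do not affect the evaluation, and the labels "UserCF (k=3)" and "ItemCF (k=3)" in the results table are hard-coded, so update them too.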
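To make the difference between the two metrics concrete, here is a small worked sketch. The helper rmse_mae and the toy numbers are illustrative assumptions, not taken from the article's data. Both prediction sets have the same total absolute error, but one concentrates it in a single large miss.

```python
import numpy as np


def rmse_mae(y_true, y_pred):
    """Return (RMSE, MAE) for two equal-length sequences of ratings."""
    err = np.asarray(y_true, dtype=float) - np.asarray(y_pred, dtype=float)
    rmse = float(np.sqrt(np.mean(err ** 2)))   # squares errors before averaging
    mae = float(np.mean(np.abs(err)))          # averages absolute errors
    return rmse, mae


truth = [5, 3, 4]
even_miss = [4, 2, 3]   # every prediction is off by 1
one_big_miss = [5, 3, 1]  # two exact hits, one prediction off by 3

print("even errors:   RMSE=%.4f MAE=%.4f" % rmse_mae(truth, even_miss))
print("one big error: RMSE=%.4f MAE=%.4f" % rmse_mae(truth, one_big_miss))
# even errors:   RMSE=1.0000 MAE=1.0000
# one big error: RMSE=1.7321 MAE=1.0000
```

MAE is identical in both cases, while RMSE rises for the single large miss because squaring weights big errors more heavily. That is one reason to report both numbers side by side.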
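Instead of editing k by hand for each run, the challenge can also be scripted. The sketch below assumes only that a model exposes predict(u, v), as the article's classes do. sweep_k and the toy FirstKItemMean model are hypothetical stand-ins so the block runs on its own; FirstKItemMean is not UserCF or ItemCF.

```python
import numpy as np


def sweep_k(make_model, ks, test_pairs, truth):
    """Return {k: (rmse, mae)}; make_model(k) must return an object with predict(u, v)."""
    results = {}
    for k in ks:
        model = make_model(k)
        errors = np.array([truth[u, v] - model.predict(u, v) for u, v in test_pairs])
        results[k] = (float(np.sqrt(np.mean(errors ** 2))),
                      float(np.mean(np.abs(errors))))
    return results


class FirstKItemMean:
    """Toy stand-in model: averages the first k known ratings of the item."""

    def __init__(self, R, k):
        self.R, self.k = R, k

    def predict(self, u, v):
        col = self.R[:, v]
        known = col[col > 0][: self.k]
        return float(known.mean()) if known.size else 3.0


# Illustrative data: 4 users x 3 items, 0 = not rated.
truth = np.array([[5, 4, 0], [4, 0, 2], [0, 5, 1], [5, 4, 0]], dtype=float)
test_pairs = [(0, 0), (2, 1)]
train = truth.copy()
for u, v in test_pairs:
    train[u, v] = 0  # hide the test ratings from the model

scores = sweep_k(lambda k: FirstKItemMean(train, k), [1, 2, 3], test_pairs, truth)
for k, (r, m) in scores.items():
    print(f"k={k}  RMSE={r:.4f}  MAE={m:.4f}")
```

With the article's script, the factory would be something like lambda k: UserCF(R_train, k=k), with test_set and RATINGS passed as test_pairs and truth.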
Don’t wait for next time, do it in the next moment.