
52 · Recommender Systems: Collaborative Filtering and Matrix Factorization

#037 · 2026-04-17 · Python

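🔗 Knowledge-graph navigation: before reading this article, review vector similarity in "35 · Linear Algebra and Matrix Operations" and the idea of gradient descent in "48 · Neural Network Basics: Hand-Writing Forward and Backward Propagation from Scratch". This article applies both to recommender systems.

NexDo Time · 2026-04-17 · Estimated reading time: 32 minutes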

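Pain Points and Architecture

A recommender system solves a very practical problem: users have not watched every movie or rated every product, yet the platform still has to guess what each user is likely to enjoy next. Most cells in the rating matrix are 0, which means "unknown", not "disliked".

This article uses a built-in user-movie rating matrix to look at recommendation along three routes: UserCF finds similar users, ItemCF finds similar items, and matrix factorization (MF) compresses users and movies into latent vectors. All three answer the same question: how to predict unknown ratings from known ones.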

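User-movie rating matrix
  -> UserCF: what do people with tastes like mine enjoy?
  -> ItemCF: which movies resemble the ones I have watched?
  -> MF: user latent vector · movie latent vector = predicted rating
  -> RMSE/MAE: how far the predicted ratings are from the true ones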

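Step by Step: Breaking Down the Core Logic

The code for a recommender system is not hard, but the concepts are easy to mix up. The walkthrough below has 9 steps: similarity first, then the three recommenders, and finally evaluation and the command-line entry point.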

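Step 1: Use cosine and Pearson similarity to judge how alike two users are

Pain point and mechanism

The first job of a recommender is to find what is similar. Cosine similarity checks whether two users' rating vectors point in the same direction. Pearson correlation first subtracts each user's mean over the co-rated items, so a habitually generous rater and a habitually strict one are not misjudged as different. A 0 here means "not watched" and is excluded from the comparison. Both functions compare only co-rated items, so with a single co-rated item the cosine is trivially 1.0, while the Pearson function returns 0.0 whenever fewer than two items overlap.

Core source (verbatim from the full source at the end of the article)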

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """计算两个向量的余弦相似度(忽略0值)。"""
    mask = (a != 0) & (b != 0)
    if mask.sum() == 0:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    denom = np.linalg.norm(a_m) * np.linalg.norm(b_m)
    return float(np.dot(a_m, b_m) / denom) if denom > 0 else 0.0


def pearson_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """皮尔逊相关系数(考虑评分偏差)。"""
    mask = (a != 0) & (b != 0)
    if mask.sum() < 2:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    a_c = a_m - a_m.mean()
    b_c = b_m - b_m.mean()
    denom = np.linalg.norm(a_c) * np.linalg.norm(b_c)
    return float(np.dot(a_c, b_c) / denom) if denom > 0 else 0.0

Runnable demo (with mock data and print feedback added)

import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """计算两个向量的余弦相似度(忽略0值)。"""
    mask = (a != 0) & (b != 0)
    if mask.sum() == 0:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    denom = np.linalg.norm(a_m) * np.linalg.norm(b_m)
    return float(np.dot(a_m, b_m) / denom) if denom > 0 else 0.0

def pearson_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """皮尔逊相关系数(考虑评分偏差)。"""
    mask = (a != 0) & (b != 0)
    if mask.sum() < 2:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    a_c = a_m - a_m.mean()
    b_c = b_m - b_m.mean()
    denom = np.linalg.norm(a_c) * np.linalg.norm(b_c)
    return float(np.dot(a_c, b_c) / denom) if denom > 0 else 0.0

alice = np.array([5, 4, 0, 0, 1], dtype=float)
bob = np.array([4, 5, 0, 0, 2], dtype=float)
charlie = np.array([0, 0, 5, 4, 1], dtype=float)

print("Alice vs Bob 余弦:", round(cosine_similarity(alice, bob), 4))
print("Alice vs Bob 皮尔逊:", round(pearson_similarity(alice, bob), 4))
print("Alice vs Charlie 余弦:", round(cosine_similarity(alice, charlie), 4))
print("直觉:共同评分越像,相似度越高;没有共同评分就无法判断。")
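The two properties described above can be checked on small vectors. The sketch below is illustrative only: `masked_cosine` and `masked_pearson` are hypothetical names that restate the article's two functions, and the `generous` and `strict` vectors are made-up data. It shows that Pearson is unaffected by a constant rating offset, and that a single co-rated item makes the masked cosine equal to 1.0 no matter how different the two users are elsewhere.

```python
import numpy as np

def masked_cosine(a, b):
    """Cosine similarity over co-rated (non-zero) positions only."""
    mask = (a != 0) & (b != 0)
    if mask.sum() == 0:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    denom = np.linalg.norm(a_m) * np.linalg.norm(b_m)
    return float(a_m @ b_m / denom) if denom > 0 else 0.0

def masked_pearson(a, b):
    """Pearson correlation over co-rated positions; needs at least 2 of them."""
    mask = (a != 0) & (b != 0)
    if mask.sum() < 2:
        return 0.0
    a_c = a[mask] - a[mask].mean()
    b_c = b[mask] - b[mask].mean()
    denom = np.linalg.norm(a_c) * np.linalg.norm(b_c)
    return float(a_c @ b_c / denom) if denom > 0 else 0.0

# Made-up data: the same taste, but one user rates 2 points lower throughout.
generous = np.array([5, 4, 5, 3], dtype=float)
strict = generous - 2
shift_cos = masked_cosine(generous, strict)
shift_pearson = masked_pearson(generous, strict)

# The article's Alice and Charlie share exactly one rated movie (the last one).
alice = np.array([5, 4, 0, 0, 1], dtype=float)
charlie = np.array([0, 0, 5, 4, 1], dtype=float)
single_cos = masked_cosine(alice, charlie)
single_pearson = masked_pearson(alice, charlie)

print("offset pair   cosine:", round(shift_cos, 4), "pearson:", round(shift_pearson, 4))
print("one overlap   cosine:", round(single_cos, 4), "pearson:", round(single_pearson, 4))
```

A similarity built on one shared rating carries very little evidence, so real systems often require a minimum overlap or shrink such scores. That is a design option rather than something the article's code does.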

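Step 2: Use UserCF to find similar users and borrow their taste

Pain point and mechanism

UserCF is like asking friends: "What have people with tastes like mine enjoyed lately?" It precomputes the user-user similarity matrix, then picks up to k positively similar users who have rated the target movie and makes a weighted prediction from how far each neighbor's rating deviates from that neighbor's own mean. When no such neighbor exists, it falls back to the movie's average rating, or to 3.0 if the movie has no ratings at all.

Core source (verbatim from the full source at the end of the article)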

class UserCF:
    """基于用户的协同过滤。"""

    def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
        self.R = ratings.copy()
        self.k = k
        n = ratings.shape[0]
        # 预计算用户相似度矩阵
        self.sim = np.zeros((n, n))
        for i in range(n):
            for j in range(i + 1, n):
                s = pearson_similarity(ratings[i], ratings[j])
                self.sim[i, j] = self.sim[j, i] = s

    def predict(self, user_id: int, item_id: int) -> float:
        """预测 user_id 对 item_id 的评分。"""
        # 找 K 个最相似且评过该物品的用户
        sims = self.sim[user_id].copy()
        sims[user_id] = -1   # 排除自身
        # 只考虑评过该物品的用户
        rated_mask = self.R[:, item_id] > 0
        sims[~rated_mask] = -1
        top_k = np.argsort(sims)[-self.k:][::-1]
        top_k = [u for u in top_k if sims[u] > 0]

        if not top_k:
            # 回退:返回该物品的平均分
            rated = self.R[:, item_id]
            return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0

        # 加权平均
        user_mean = self.R[user_id][self.R[user_id] > 0].mean()
        num, denom = 0.0, 0.0
        for u in top_k:
            u_mean = self.R[u][self.R[u] > 0].mean()
            num   += sims[u] * (self.R[u, item_id] - u_mean)
            denom += abs(sims[u])
        return float(np.clip(user_mean + (num / denom if denom > 0 else 0), 1, 5))

    def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
        """为用户推荐 top_n 部未看过的电影。"""
        unseen = np.where(self.R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]
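The neighbor selection in `predict` relies on a compact `np.argsort` idiom that is easy to misread. The snippet below walks through it on a made-up similarity row. The values are illustrative; -1.0 plays the same role as in the class above, marking the user itself or a user who has not rated the item.

```python
import numpy as np

# Made-up similarity row for one target user.
sims = np.array([0.2, -1.0, 0.9, 0.5, -0.3])
k = 4

ascending = np.argsort(sims)   # indices ordered from lowest to highest similarity
top_k = ascending[-k:][::-1]   # keep the last k, then reverse: highest first
neighbors = [int(u) for u in top_k if sims[u] > 0]  # drop non-positive similarities

print("top_k candidates:", top_k.tolist())
print("usable neighbors:", neighbors)
```

Because the positivity filter runs after the top-k cut, fewer than k neighbors may remain, and the fallback branch handles the case where none do.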

Runnable demo (with mock data and print feedback added)

import numpy as np

RATINGS = np.array([
    [5, 4, 0, 0, 1, 0, 0, 2],
    [0, 0, 4, 5, 0, 1, 0, 0],
    [4, 0, 0, 3, 2, 0, 5, 0],
    [0, 5, 4, 0, 0, 3, 0, 0],
    [1, 0, 0, 4, 5, 0, 0, 3],
    [5, 3, 0, 0, 1, 0, 4, 0],
    [0, 0, 5, 4, 0, 2, 0, 0],
    [4, 5, 0, 0, 2, 0, 3, 0],
    [0, 0, 3, 5, 0, 4, 0, 1],
    [3, 0, 0, 0, 4, 0, 5, 2],
], dtype=float)

MOVIES = ["复仇者联盟", "速度与激情", "星际穿越", "火星救援",
          "变形金刚", "降临", "肖申克的救赎", "泰坦尼克号"]
USERS  = [f"用户{i}" for i in range(10)]
N_USERS, N_ITEMS = RATINGS.shape

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """计算两个向量的余弦相似度(忽略0值)。"""
    mask = (a != 0) & (b != 0)
    if mask.sum() == 0:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    denom = np.linalg.norm(a_m) * np.linalg.norm(b_m)
    return float(np.dot(a_m, b_m) / denom) if denom > 0 else 0.0

def pearson_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """皮尔逊相关系数(考虑评分偏差)。"""
    mask = (a != 0) & (b != 0)
    if mask.sum() < 2:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    a_c = a_m - a_m.mean()
    b_c = b_m - b_m.mean()
    denom = np.linalg.norm(a_c) * np.linalg.norm(b_c)
    return float(np.dot(a_c, b_c) / denom) if denom > 0 else 0.0

class UserCF:
    """基于用户的协同过滤。"""

    def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
        self.R = ratings.copy()
        self.k = k
        n = ratings.shape[0]
        # 预计算用户相似度矩阵
        self.sim = np.zeros((n, n))
        for i in range(n):
            for j in range(i + 1, n):
                s = pearson_similarity(ratings[i], ratings[j])
                self.sim[i, j] = self.sim[j, i] = s

    def predict(self, user_id: int, item_id: int) -> float:
        """预测 user_id 对 item_id 的评分。"""
        # 找 K 个最相似且评过该物品的用户
        sims = self.sim[user_id].copy()
        sims[user_id] = -1   # 排除自身
        # 只考虑评过该物品的用户
        rated_mask = self.R[:, item_id] > 0
        sims[~rated_mask] = -1
        top_k = np.argsort(sims)[-self.k:][::-1]
        top_k = [u for u in top_k if sims[u] > 0]

        if not top_k:
            # 回退:返回该物品的平均分
            rated = self.R[:, item_id]
            return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0

        # 加权平均
        user_mean = self.R[user_id][self.R[user_id] > 0].mean()
        num, denom = 0.0, 0.0
        for u in top_k:
            u_mean = self.R[u][self.R[u] > 0].mean()
            num   += sims[u] * (self.R[u, item_id] - u_mean)
            denom += abs(sims[u])
        return float(np.clip(user_mean + (num / denom if denom > 0 else 0), 1, 5))

    def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
        """为用户推荐 top_n 部未看过的电影。"""
        unseen = np.where(self.R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]

ucf = UserCF(RATINGS, k=3)
print("用户0与前5个用户的相似度:", np.round(ucf.sim[0, :5], 3).tolist())
print("预测 用户0 对《星际穿越》的评分:", round(ucf.predict(0, 2), 3))
print("给用户0推荐:")
for movie, score in ucf.recommend(0, top_n=3):
    print(f"  {movie} -> {score:.2f}")
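To make the weighted step in `predict` concrete, here is a hand-sized version of the same arithmetic. All numbers are hypothetical and chosen only so the result is easy to verify by hand: the target user's mean is 3.0, one neighbor rated the movie one point above their own mean, and a second neighbor rated it one point below theirs.

```python
import numpy as np

target_mean = 3.0
# Hypothetical neighbors: (similarity, neighbor's rating of the item, neighbor's mean)
neighbors = [
    (0.8, 5.0, 4.0),   # rated the item 1 point above their own mean
    (0.4, 2.0, 3.0),   # rated the item 1 point below their own mean
]

num = sum(s * (r - m) for s, r, m in neighbors)    # 0.8 * 1 + 0.4 * (-1) = 0.4
denom = sum(abs(s) for s, _, _ in neighbors)       # 0.8 + 0.4 = 1.2
offset = num / denom if denom > 0 else 0.0         # about +0.33
prediction = float(np.clip(target_mean + offset, 1, 5))

print("predicted rating:", round(prediction, 3))
```

The more similar neighbor dominates, so the prediction ends up slightly above the target user's own mean rather than at a plain average of the two neighbor ratings.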

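Step 3: Use ItemCF to infer similar movies from what the user has watched

Pain point and mechanism

ItemCF shifts the viewpoint from "finding people" to "finding items". If you liked 《复仇者联盟》, the system looks for movies with a similar rating pattern, then scores each unwatched movie as a similarity-weighted average of your own ratings on the most similar movies you have watched. It is like the supermarket shelf tag "people who bought this also liked".

Core source (verbatim from the full source at the end of the article)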

class ItemCF:
    """基于物品的协同过滤。"""

    def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
        self.R = ratings.copy()
        self.k = k
        n = ratings.shape[1]
        self.sim = np.zeros((n, n))
        for i in range(n):
            for j in range(i + 1, n):
                s = cosine_similarity(ratings[:, i], ratings[:, j])
                self.sim[i, j] = self.sim[j, i] = s

    def predict(self, user_id: int, item_id: int) -> float:
        sims = self.sim[item_id].copy()
        sims[item_id] = -1
        rated_mask = self.R[user_id] > 0
        sims[~rated_mask] = -1
        top_k = np.argsort(sims)[-self.k:][::-1]
        top_k = [i for i in top_k if sims[i] > 0]

        if not top_k:
            rated = self.R[:, item_id]
            return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0

        num = sum(sims[i] * self.R[user_id, i] for i in top_k)
        denom = sum(abs(sims[i]) for i in top_k)
        return float(np.clip(num / denom if denom > 0 else 3.0, 1, 5))

    def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
        unseen = np.where(self.R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]

Runnable demo (with mock data and print feedback added)

import numpy as np

RATINGS = np.array([
    [5, 4, 0, 0, 1, 0, 0, 2],
    [0, 0, 4, 5, 0, 1, 0, 0],
    [4, 0, 0, 3, 2, 0, 5, 0],
    [0, 5, 4, 0, 0, 3, 0, 0],
    [1, 0, 0, 4, 5, 0, 0, 3],
    [5, 3, 0, 0, 1, 0, 4, 0],
    [0, 0, 5, 4, 0, 2, 0, 0],
    [4, 5, 0, 0, 2, 0, 3, 0],
    [0, 0, 3, 5, 0, 4, 0, 1],
    [3, 0, 0, 0, 4, 0, 5, 2],
], dtype=float)

MOVIES = ["复仇者联盟", "速度与激情", "星际穿越", "火星救援",
          "变形金刚", "降临", "肖申克的救赎", "泰坦尼克号"]
USERS  = [f"用户{i}" for i in range(10)]
N_USERS, N_ITEMS = RATINGS.shape

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """计算两个向量的余弦相似度(忽略0值)。"""
    mask = (a != 0) & (b != 0)
    if mask.sum() == 0:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    denom = np.linalg.norm(a_m) * np.linalg.norm(b_m)
    return float(np.dot(a_m, b_m) / denom) if denom > 0 else 0.0

def pearson_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """皮尔逊相关系数(考虑评分偏差)。"""
    mask = (a != 0) & (b != 0)
    if mask.sum() < 2:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    a_c = a_m - a_m.mean()
    b_c = b_m - b_m.mean()
    denom = np.linalg.norm(a_c) * np.linalg.norm(b_c)
    return float(np.dot(a_c, b_c) / denom) if denom > 0 else 0.0

class ItemCF:
    """基于物品的协同过滤。"""

    def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
        self.R = ratings.copy()
        self.k = k
        n = ratings.shape[1]
        self.sim = np.zeros((n, n))
        for i in range(n):
            for j in range(i + 1, n):
                s = cosine_similarity(ratings[:, i], ratings[:, j])
                self.sim[i, j] = self.sim[j, i] = s

    def predict(self, user_id: int, item_id: int) -> float:
        sims = self.sim[item_id].copy()
        sims[item_id] = -1
        rated_mask = self.R[user_id] > 0
        sims[~rated_mask] = -1
        top_k = np.argsort(sims)[-self.k:][::-1]
        top_k = [i for i in top_k if sims[i] > 0]

        if not top_k:
            rated = self.R[:, item_id]
            return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0

        num = sum(sims[i] * self.R[user_id, i] for i in top_k)
        denom = sum(abs(sims[i]) for i in top_k)
        return float(np.clip(num / denom if denom > 0 else 3.0, 1, 5))

    def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
        unseen = np.where(self.R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]

icf = ItemCF(RATINGS, k=3)
print("电影0与所有电影相似度:", np.round(icf.sim[0], 3).tolist())
print("预测 用户1 对《复仇者联盟》的评分:", round(icf.predict(1, 0), 3))
print("给用户1推荐:")
for movie, score in icf.recommend(1, top_n=3):
    print(f"  {movie} -> {score:.2f}")
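One property of the item similarity used above deserves a caveat: all ratings are positive, so the masked cosine between two movies is always positive, even when users who love one tend to dislike the other. A common alternative, which is not what the article's ItemCF does, is adjusted cosine, where each user's mean is subtracted before comparing item columns. The sketch below compares the two on movies 0 and 4 of the article's matrix. `adjusted_cosine` is a hypothetical helper written for this illustration.

```python
import numpy as np

RATINGS = np.array([
    [5, 4, 0, 0, 1, 0, 0, 2],
    [0, 0, 4, 5, 0, 1, 0, 0],
    [4, 0, 0, 3, 2, 0, 5, 0],
    [0, 5, 4, 0, 0, 3, 0, 0],
    [1, 0, 0, 4, 5, 0, 0, 3],
    [5, 3, 0, 0, 1, 0, 4, 0],
    [0, 0, 5, 4, 0, 2, 0, 0],
    [4, 5, 0, 0, 2, 0, 3, 0],
    [0, 0, 3, 5, 0, 4, 0, 1],
    [3, 0, 0, 0, 4, 0, 5, 2],
], dtype=float)

def masked_cosine(a, b):
    """Same logic as the article's cosine_similarity: co-rated entries only."""
    mask = (a != 0) & (b != 0)
    if mask.sum() == 0:
        return 0.0
    denom = np.linalg.norm(a[mask]) * np.linalg.norm(b[mask])
    return float(a[mask] @ b[mask] / denom) if denom > 0 else 0.0

def adjusted_cosine(R, i, j):
    """Hypothetical helper: cosine of item columns after removing user means."""
    rated = R > 0
    counts = np.maximum(rated.sum(axis=1), 1)   # avoid division by zero
    user_means = R.sum(axis=1) / counts         # zeros add nothing to the sum
    both = rated[:, i] & rated[:, j]            # users who rated both items
    if both.sum() == 0:
        return 0.0
    a = R[both, i] - user_means[both]
    b = R[both, j] - user_means[both]
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    return float(a @ b / denom) if denom > 0 else 0.0

raw = masked_cosine(RATINGS[:, 0], RATINGS[:, 4])
adj = adjusted_cosine(RATINGS, 0, 4)
print("masked cosine  :", round(raw, 3))
print("adjusted cosine:", round(adj, 3))
```

On this data the masked cosine is positive while the adjusted cosine is negative, which matches the visible pattern that high raters of movie 0 mostly give movie 4 low scores. Whether that matters depends on the dataset. The article's version keeps the code simpler.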

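Step 4: Use MatrixFactorization to learn latent vectors for users and movies

Pain point and mechanism

Matrix factorization splits the large rating table into two small ones: the user latent vectors P and the movie latent vectors Q. Think of a latent vector as a set of invisible taste tags, for example weights for action, sci-fi, or drama. The better a user vector matches a movie vector, the higher the dot-product prediction.

Core source (verbatim from the full source at the end of the article)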

class MatrixFactorization:
    """
    基于梯度下降的矩阵分解:
    R ≈ P · Q^T,其中 P(n_users, k),Q(n_items, k)
    """

    def __init__(self, n_users: int, n_items: int, k: int = 8,
                 lr: float = 0.01, reg: float = 0.02, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        self.P = rng.randn(n_users, k) * 0.1   # 用户隐向量
        self.Q = rng.randn(n_items, k) * 0.1   # 物品隐向量
        self.lr = lr
        self.reg = reg

    def fit(self, R: np.ndarray, epochs: int = 200) -> list[float]:
        """在非零评分上训练。"""
        rows, cols = np.where(R > 0)
        loss_history: list[float] = []

        for epoch in range(epochs):
            # shuffle
            idx = np.random.permutation(len(rows))
            total_loss = 0.0
            for i in idx:
                u, v = rows[i], cols[i]
                pred = self.P[u] @ self.Q[v]
                err = R[u, v] - pred
                # 梯度更新(含 L2 正则)
                self.P[u] += self.lr * (err * self.Q[v] - self.reg * self.P[u])
                self.Q[v] += self.lr * (err * self.P[u] - self.reg * self.Q[v])
                total_loss += err ** 2
            loss_history.append(total_loss / len(rows))

        return loss_history

    def predict(self, user_id: int, item_id: int) -> float:
        return float(np.clip(self.P[user_id] @ self.Q[item_id], 1, 5))

    def recommend(self, user_id: int, R: np.ndarray,
                  top_n: int = 3) -> list[tuple[str, float]]:
        unseen = np.where(R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]

Runnable demo (with mock data and print feedback added)

import numpy as np

RATINGS = np.array([
    [5, 4, 0, 0, 1, 0, 0, 2],
    [0, 0, 4, 5, 0, 1, 0, 0],
    [4, 0, 0, 3, 2, 0, 5, 0],
    [0, 5, 4, 0, 0, 3, 0, 0],
    [1, 0, 0, 4, 5, 0, 0, 3],
    [5, 3, 0, 0, 1, 0, 4, 0],
    [0, 0, 5, 4, 0, 2, 0, 0],
    [4, 5, 0, 0, 2, 0, 3, 0],
    [0, 0, 3, 5, 0, 4, 0, 1],
    [3, 0, 0, 0, 4, 0, 5, 2],
], dtype=float)

MOVIES = ["复仇者联盟", "速度与激情", "星际穿越", "火星救援",
          "变形金刚", "降临", "肖申克的救赎", "泰坦尼克号"]
USERS  = [f"用户{i}" for i in range(10)]
N_USERS, N_ITEMS = RATINGS.shape

class MatrixFactorization:
    """
    基于梯度下降的矩阵分解:
    R ≈ P · Q^T,其中 P(n_users, k),Q(n_items, k)
    """

    def __init__(self, n_users: int, n_items: int, k: int = 8,
                 lr: float = 0.01, reg: float = 0.02, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        self.P = rng.randn(n_users, k) * 0.1   # 用户隐向量
        self.Q = rng.randn(n_items, k) * 0.1   # 物品隐向量
        self.lr = lr
        self.reg = reg

    def fit(self, R: np.ndarray, epochs: int = 200) -> list[float]:
        """在非零评分上训练。"""
        rows, cols = np.where(R > 0)
        loss_history: list[float] = []

        for epoch in range(epochs):
            # shuffle
            idx = np.random.permutation(len(rows))
            total_loss = 0.0
            for i in idx:
                u, v = rows[i], cols[i]
                pred = self.P[u] @ self.Q[v]
                err = R[u, v] - pred
                # 梯度更新(含 L2 正则)
                self.P[u] += self.lr * (err * self.Q[v] - self.reg * self.P[u])
                self.Q[v] += self.lr * (err * self.P[u] - self.reg * self.Q[v])
                total_loss += err ** 2
            loss_history.append(total_loss / len(rows))

        return loss_history

    def predict(self, user_id: int, item_id: int) -> float:
        return float(np.clip(self.P[user_id] @ self.Q[item_id], 1, 5))

    def recommend(self, user_id: int, R: np.ndarray,
                  top_n: int = 3) -> list[tuple[str, float]]:
        unseen = np.where(R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]

mf = MatrixFactorization(N_USERS, N_ITEMS, k=4, lr=0.01, reg=0.02)
history = mf.fit(RATINGS, epochs=20)
print("前 5 轮损失:", [round(v, 4) for v in history[:5]])
print("最后 5 轮损失:", [round(v, 4) for v in history[-5:]])
print("预测 用户0 对《星际穿越》的评分:", round(mf.predict(0, 2), 3))
print("用户隐向量形状:", mf.P.shape, "物品隐向量形状:", mf.Q.shape)
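A single update is easier to follow than a whole training loop. The sketch below applies one regularized gradient step to one hypothetical rating, using the same default `lr` and `reg` as the class above. The vectors `p` and `q` and the rating `r` are made-up values. One difference is deliberate and worth flagging: this sketch computes both updates from the old vectors, whereas the article's `fit` updates `P[u]` in place and then uses the already-updated `P[u]` when updating `Q[v]`.

```python
import numpy as np

lr, reg = 0.01, 0.02           # same defaults as MatrixFactorization
p = np.array([0.1, 0.2])       # hypothetical user latent vector
q = np.array([0.3, 0.1])       # hypothetical movie latent vector
r = 4.0                        # hypothetical observed rating

err_before = r - p @ q         # 4.0 - 0.05 = 3.95

# Gradient step on the squared error with L2 regularization.
# Both updates read the OLD p and q (simultaneous update).
p_new = p + lr * (err_before * q - reg * p)
q_new = q + lr * (err_before * p - reg * q)

err_after = r - p_new @ q_new

print("error before:", round(float(err_before), 4))
print("error after :", round(float(err_after), 4))
```

The error shrinks only slightly after one step, which is why the demo needs many epochs over all known ratings before predictions become useful.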

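Step 5: Use mode_usercf to print the user similarity matrix and recommendations

Pain point and mechanism

A single prediction does not show the overall picture, so mode_usercf() prints the similarity matrix for the first 5 users and then recommends movies for several users. The higher a value in the matrix, the closer two users' tastes are on the movies both have rated.

Core source (verbatim from the full source at the end of the article)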

def mode_usercf() -> None:
    section("用户协同过滤(UserCF)")
    ucf = UserCF(RATINGS, k=3)

    # 显示用户相似度矩阵(前5个用户)
    print("\n  用户相似度矩阵(皮尔逊,前5个用户):")
    print(f"  {'':8}" + "".join(f"{USERS[j]:>8}" for j in range(5)))
    for i in range(5):
        row = "".join(f"{ucf.sim[i,j]:8.3f}" for j in range(5))
        print(f"  {USERS[i]:8}{row}")

    # 为用户0、1、2推荐
    print(f"\n  推荐结果:")
    for uid in [0, 1, 2]:
        recs = ucf.recommend(uid, top_n=3)
        seen = [MOVIES[i] for i in range(N_ITEMS) if RATINGS[uid, i] > 0]
        print(f"\n  {USERS[uid]}(已看: {', '.join(seen[:3])}...)")
        for movie, score in recs:
            bar = "★" * int(score)
            print(f"    推荐: {movie:<12} 预测评分: {score:.2f}  {bar}")

Runnable demo (with mock data and print feedback added)

import numpy as np

RATINGS = np.array([
    [5, 4, 0, 0, 1, 0, 0, 2],
    [0, 0, 4, 5, 0, 1, 0, 0],
    [4, 0, 0, 3, 2, 0, 5, 0],
    [0, 5, 4, 0, 0, 3, 0, 0],
    [1, 0, 0, 4, 5, 0, 0, 3],
    [5, 3, 0, 0, 1, 0, 4, 0],
    [0, 0, 5, 4, 0, 2, 0, 0],
    [4, 5, 0, 0, 2, 0, 3, 0],
    [0, 0, 3, 5, 0, 4, 0, 1],
    [3, 0, 0, 0, 4, 0, 5, 2],
], dtype=float)

MOVIES = ["复仇者联盟", "速度与激情", "星际穿越", "火星救援",
          "变形金刚", "降临", "肖申克的救赎", "泰坦尼克号"]
USERS  = [f"用户{i}" for i in range(10)]
N_USERS, N_ITEMS = RATINGS.shape

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """计算两个向量的余弦相似度(忽略0值)。"""
    mask = (a != 0) & (b != 0)
    if mask.sum() == 0:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    denom = np.linalg.norm(a_m) * np.linalg.norm(b_m)
    return float(np.dot(a_m, b_m) / denom) if denom > 0 else 0.0

def pearson_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """皮尔逊相关系数(考虑评分偏差)。"""
    mask = (a != 0) & (b != 0)
    if mask.sum() < 2:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    a_c = a_m - a_m.mean()
    b_c = b_m - b_m.mean()
    denom = np.linalg.norm(a_c) * np.linalg.norm(b_c)
    return float(np.dot(a_c, b_c) / denom) if denom > 0 else 0.0

def section(title: str) -> None:
    print(f"\n{'='*62}\n  {title}\n{'='*62}")

class UserCF:
    """基于用户的协同过滤。"""

    def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
        self.R = ratings.copy()
        self.k = k
        n = ratings.shape[0]
        # 预计算用户相似度矩阵
        self.sim = np.zeros((n, n))
        for i in range(n):
            for j in range(i + 1, n):
                s = pearson_similarity(ratings[i], ratings[j])
                self.sim[i, j] = self.sim[j, i] = s

    def predict(self, user_id: int, item_id: int) -> float:
        """预测 user_id 对 item_id 的评分。"""
        # 找 K 个最相似且评过该物品的用户
        sims = self.sim[user_id].copy()
        sims[user_id] = -1   # 排除自身
        # 只考虑评过该物品的用户
        rated_mask = self.R[:, item_id] > 0
        sims[~rated_mask] = -1
        top_k = np.argsort(sims)[-self.k:][::-1]
        top_k = [u for u in top_k if sims[u] > 0]

        if not top_k:
            # 回退:返回该物品的平均分
            rated = self.R[:, item_id]
            return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0

        # 加权平均
        user_mean = self.R[user_id][self.R[user_id] > 0].mean()
        num, denom = 0.0, 0.0
        for u in top_k:
            u_mean = self.R[u][self.R[u] > 0].mean()
            num   += sims[u] * (self.R[u, item_id] - u_mean)
            denom += abs(sims[u])
        return float(np.clip(user_mean + (num / denom if denom > 0 else 0), 1, 5))

    def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
        """为用户推荐 top_n 部未看过的电影。"""
        unseen = np.where(self.R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]

def mode_usercf() -> None:
    section("用户协同过滤(UserCF)")
    ucf = UserCF(RATINGS, k=3)

    # 显示用户相似度矩阵(前5个用户)
    print("\n  用户相似度矩阵(皮尔逊,前5个用户):")
    print(f"  {'':8}" + "".join(f"{USERS[j]:>8}" for j in range(5)))
    for i in range(5):
        row = "".join(f"{ucf.sim[i,j]:8.3f}" for j in range(5))
        print(f"  {USERS[i]:8}{row}")

    # 为用户0、1、2推荐
    print(f"\n  推荐结果:")
    for uid in [0, 1, 2]:
        recs = ucf.recommend(uid, top_n=3)
        seen = [MOVIES[i] for i in range(N_ITEMS) if RATINGS[uid, i] > 0]
        print(f"\n  {USERS[uid]}(已看: {', '.join(seen[:3])}...)")
        for movie, score in recs:
            bar = "★" * int(score)
            print(f"    推荐: {movie:<12} 预测评分: {score:.2f}  {bar}")

mode_usercf()

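Step 6: Use mode_itemcf to inspect the similarity network between movies

Pain point and mechanism

The item similarity matrix works like a relationship graph between movies: the more alike two movies are, the higher the score. Movies the user has already watched act as clues, and the system follows the similarity network to new movies the user may like.

Core source (verbatim from the full source at the end of the article)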

def mode_itemcf() -> None:
    section("物品协同过滤(ItemCF)")
    icf = ItemCF(RATINGS, k=3)

    print("\n  物品相似度矩阵(余弦):")
    print(f"  {'':14}" + "".join(f"{m[:4]:>6}" for m in MOVIES))
    for i, movie in enumerate(MOVIES):
        row = "".join(f"{icf.sim[i,j]:6.2f}" for j in range(N_ITEMS))
        print(f"  {movie[:12]:<14}{row}")

    print(f"\n  推荐结果(用户0 vs 用户1):")
    for uid in [0, 1]:
        recs = icf.recommend(uid, top_n=3)
        print(f"\n  {USERS[uid]}:")
        for movie, score in recs:
            print(f"    {movie:<12} 预测评分: {score:.2f}")

Runnable demo (with mock data and print feedback added)

import numpy as np

RATINGS = np.array([
    [5, 4, 0, 0, 1, 0, 0, 2],
    [0, 0, 4, 5, 0, 1, 0, 0],
    [4, 0, 0, 3, 2, 0, 5, 0],
    [0, 5, 4, 0, 0, 3, 0, 0],
    [1, 0, 0, 4, 5, 0, 0, 3],
    [5, 3, 0, 0, 1, 0, 4, 0],
    [0, 0, 5, 4, 0, 2, 0, 0],
    [4, 5, 0, 0, 2, 0, 3, 0],
    [0, 0, 3, 5, 0, 4, 0, 1],
    [3, 0, 0, 0, 4, 0, 5, 2],
], dtype=float)

MOVIES = ["复仇者联盟", "速度与激情", "星际穿越", "火星救援",
          "变形金刚", "降临", "肖申克的救赎", "泰坦尼克号"]
USERS  = [f"用户{i}" for i in range(10)]
N_USERS, N_ITEMS = RATINGS.shape

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """计算两个向量的余弦相似度(忽略0值)。"""
    mask = (a != 0) & (b != 0)
    if mask.sum() == 0:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    denom = np.linalg.norm(a_m) * np.linalg.norm(b_m)
    return float(np.dot(a_m, b_m) / denom) if denom > 0 else 0.0

def pearson_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """皮尔逊相关系数(考虑评分偏差)。"""
    mask = (a != 0) & (b != 0)
    if mask.sum() < 2:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    a_c = a_m - a_m.mean()
    b_c = b_m - b_m.mean()
    denom = np.linalg.norm(a_c) * np.linalg.norm(b_c)
    return float(np.dot(a_c, b_c) / denom) if denom > 0 else 0.0

def section(title: str) -> None:
    print(f"\n{'='*62}\n  {title}\n{'='*62}")

class ItemCF:
    """基于物品的协同过滤。"""

    def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
        self.R = ratings.copy()
        self.k = k
        n = ratings.shape[1]
        self.sim = np.zeros((n, n))
        for i in range(n):
            for j in range(i + 1, n):
                s = cosine_similarity(ratings[:, i], ratings[:, j])
                self.sim[i, j] = self.sim[j, i] = s

    def predict(self, user_id: int, item_id: int) -> float:
        sims = self.sim[item_id].copy()
        sims[item_id] = -1
        rated_mask = self.R[user_id] > 0
        sims[~rated_mask] = -1
        top_k = np.argsort(sims)[-self.k:][::-1]
        top_k = [i for i in top_k if sims[i] > 0]

        if not top_k:
            rated = self.R[:, item_id]
            return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0

        num = sum(sims[i] * self.R[user_id, i] for i in top_k)
        denom = sum(abs(sims[i]) for i in top_k)
        return float(np.clip(num / denom if denom > 0 else 3.0, 1, 5))

    def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
        unseen = np.where(self.R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]

def mode_itemcf() -> None:
    section("物品协同过滤(ItemCF)")
    icf = ItemCF(RATINGS, k=3)

    print("\n  物品相似度矩阵(余弦):")
    print(f"  {'':14}" + "".join(f"{m[:4]:>6}" for m in MOVIES))
    for i, movie in enumerate(MOVIES):
        row = "".join(f"{icf.sim[i,j]:6.2f}" for j in range(N_ITEMS))
        print(f"  {movie[:12]:<14}{row}")

    print(f"\n  推荐结果(用户0 vs 用户1):")
    for uid in [0, 1]:
        recs = icf.recommend(uid, top_n=3)
        print(f"\n  {USERS[uid]}:")
        for movie, score in recs:
            print(f"    {movie:<12} 预测评分: {score:.2f}")

mode_itemcf()

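Step 7: Use mode_mf to watch the matrix factorization loss decrease

Pain point and mechanism

Matrix factorization has to be trained. The loss curve is like the model's error notebook: each epoch adjusts the user and movie latent vectors on the known ratings so that predictions move closer to the true ratings. Shorter bars mean a smaller error on the known ratings, which is the training error and not the error on held-out ratings.

Core source (verbatim from the full source at the end of the article)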

def mode_mf() -> None:
    section("矩阵分解(梯度下降 SVD,k=8 隐因子)")
    mf = MatrixFactorization(N_USERS, N_ITEMS, k=8, lr=0.01, reg=0.02)
    history = mf.fit(RATINGS, epochs=300)

    # 损失曲线
    print("\n  训练损失曲线(每60轮):")
    checkpoints = [0, 60, 120, 180, 240, 299]
    max_loss = history[0]
    W = 35
    for ep in checkpoints:
        loss = history[ep]
        bar = "█" * int(loss / max_loss * W)
        print(f"  epoch {ep+1:>3} │{bar:<{W}}│ {loss:.4f}")

    print(f"\n  推荐结果(用户0、1、2):")
    for uid in [0, 1, 2]:
        recs = mf.recommend(uid, RATINGS, top_n=3)
        print(f"\n  {USERS[uid]}:")
        for movie, score in recs:
            bar = "★" * int(score)
            print(f"    {movie:<12} 预测评分: {score:.2f}  {bar}")

Runnable demo (with mock data and print feedback added)

import numpy as np

RATINGS = np.array([
    [5, 4, 0, 0, 1, 0, 0, 2],
    [0, 0, 4, 5, 0, 1, 0, 0],
    [4, 0, 0, 3, 2, 0, 5, 0],
    [0, 5, 4, 0, 0, 3, 0, 0],
    [1, 0, 0, 4, 5, 0, 0, 3],
    [5, 3, 0, 0, 1, 0, 4, 0],
    [0, 0, 5, 4, 0, 2, 0, 0],
    [4, 5, 0, 0, 2, 0, 3, 0],
    [0, 0, 3, 5, 0, 4, 0, 1],
    [3, 0, 0, 0, 4, 0, 5, 2],
], dtype=float)

MOVIES = ["复仇者联盟", "速度与激情", "星际穿越", "火星救援",
          "变形金刚", "降临", "肖申克的救赎", "泰坦尼克号"]
USERS  = [f"用户{i}" for i in range(10)]
N_USERS, N_ITEMS = RATINGS.shape

def section(title: str) -> None:
    print(f"\n{'='*62}\n  {title}\n{'='*62}")

class MatrixFactorization:
    """
    基于梯度下降的矩阵分解:
    R ≈ P · Q^T,其中 P(n_users, k),Q(n_items, k)
    """

    def __init__(self, n_users: int, n_items: int, k: int = 8,
                 lr: float = 0.01, reg: float = 0.02, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        self.P = rng.randn(n_users, k) * 0.1   # 用户隐向量
        self.Q = rng.randn(n_items, k) * 0.1   # 物品隐向量
        self.lr = lr
        self.reg = reg

    def fit(self, R: np.ndarray, epochs: int = 200) -> list[float]:
        """在非零评分上训练。"""
        rows, cols = np.where(R > 0)
        loss_history: list[float] = []

        for epoch in range(epochs):
            # shuffle
            idx = np.random.permutation(len(rows))
            total_loss = 0.0
            for i in idx:
                u, v = rows[i], cols[i]
                pred = self.P[u] @ self.Q[v]
                err = R[u, v] - pred
                # 梯度更新(含 L2 正则)
                self.P[u] += self.lr * (err * self.Q[v] - self.reg * self.P[u])
                self.Q[v] += self.lr * (err * self.P[u] - self.reg * self.Q[v])
                total_loss += err ** 2
            loss_history.append(total_loss / len(rows))

        return loss_history

    def predict(self, user_id: int, item_id: int) -> float:
        return float(np.clip(self.P[user_id] @ self.Q[item_id], 1, 5))

    def recommend(self, user_id: int, R: np.ndarray,
                  top_n: int = 3) -> list[tuple[str, float]]:
        unseen = np.where(R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]

def mode_mf() -> None:
    section("矩阵分解(梯度下降 SVD,k=8 隐因子)")
    mf = MatrixFactorization(N_USERS, N_ITEMS, k=8, lr=0.01, reg=0.02)
    history = mf.fit(RATINGS, epochs=300)

    # 损失曲线
    print("\n  训练损失曲线(每60轮):")
    checkpoints = [0, 60, 120, 180, 240, 299]
    max_loss = history[0]
    W = 35
    for ep in checkpoints:
        loss = history[ep]
        bar = "█" * int(loss / max_loss * W)
        print(f"  epoch {ep+1:>3} │{bar:<{W}}│ {loss:.4f}")

    print(f"\n  推荐结果(用户0、1、2):")
    for uid in [0, 1, 2]:
        recs = mf.recommend(uid, RATINGS, top_n=3)
        print(f"\n  {USERS[uid]}:")
        for movie, score in recs:
            bar = "★" * int(score)
            print(f"    {movie:<12} 预测评分: {score:.2f}  {bar}")

mode_mf()
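A note on reproducibility: in the class above, `seed` only feeds the `RandomState` that initializes P and Q, while each epoch is shuffled with `np.random.permutation`, which draws from NumPy's global random state. Unless that global state is seeded elsewhere, the printed loss values can differ slightly between runs. One possible way to pin the visiting order is sketched below. `epoch_orders` is a hypothetical helper, not part of the article's code.

```python
import numpy as np

def epoch_orders(n_samples: int, epochs: int, seed: int) -> list:
    """Hypothetical helper: one shuffled visiting order per epoch, from a local RNG."""
    rng = np.random.default_rng(seed)   # local generator, independent of global state
    return [rng.permutation(n_samples) for _ in range(epochs)]

run_a = epoch_orders(n_samples=6, epochs=3, seed=42)
run_b = epoch_orders(n_samples=6, epochs=3, seed=42)

# Same seed, same sequence of shuffles.
same = all(np.array_equal(a, b) for a, b in zip(run_a, run_b))
print("identical orders across runs:", same)
```

Inside `fit`, the same idea would amount to keeping the seeded generator on the instance and calling its `permutation` method instead of the module-level function.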

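Step 8: Use mode_eval to compare the three recommenders with RMSE and MAE

Pain point and mechanism

A recommender cannot be judged only by whether the list "looks about right"; some known ratings must be held out for testing. RMSE penalizes large errors more heavily, while MAE reads as the average number of rating points off. The smaller both are, the closer the predictions are to real user feedback.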
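The difference between the two metrics shows up as soon as one prediction is badly off. In the made-up example below, two predictions miss by half a point and one misses by three points.

```python
import numpy as np

# Made-up true ratings and predictions: two small misses, one large miss.
true = np.array([4.0, 3.0, 5.0])
pred = np.array([3.5, 3.5, 2.0])

errors = true - pred                            # [0.5, -0.5, 3.0]
mae = float(np.mean(np.abs(errors)))            # (0.5 + 0.5 + 3.0) / 3
rmse = float(np.sqrt(np.mean(errors ** 2)))     # sqrt((0.25 + 0.25 + 9.0) / 3)

print("MAE :", round(mae, 4))
print("RMSE:", round(rmse, 4))
```

MAE comes out at about 1.33 and RMSE at about 1.78: squaring lets the single three-point miss dominate the RMSE, which is what "penalizes large errors more heavily" means in practice.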

Core source (verbatim from the full source at the end of the article)

def mode_eval() -> None:
    section("评估指标对比(留出法验证)")

    # 留出10%已知评分作为测试集
    rng = np.random.RandomState(42)
    known = list(zip(*np.where(RATINGS > 0)))
    rng.shuffle(known)
    test_size = max(1, len(known) // 10)
    test_set = known[:test_size]

    R_train = RATINGS.copy()
    for u, v in test_set:
        R_train[u, v] = 0

    # 训练三个模型
    ucf = UserCF(R_train, k=3)
    icf = ItemCF(R_train, k=3)
    mf  = MatrixFactorization(N_USERS, N_ITEMS, k=8)
    mf.fit(R_train, epochs=200)

    def rmse(model, test: list) -> float:
        errors = [(RATINGS[u,v] - model.predict(u, v))**2 for u, v in test]
        return float(np.sqrt(np.mean(errors)))

    def mae(model, test: list) -> float:
        errors = [abs(RATINGS[u,v] - model.predict(u, v)) for u, v in test]
        return float(np.mean(errors))

    print(f"\n  测试集大小: {test_size} 个评分\n")
    print(f"  {'模型':<20} {'RMSE':<10} {'MAE':<10} 说明")
    print(f"  {'─'*55}")
    for name, model in [("UserCF (k=3)", ucf), ("ItemCF (k=3)", icf), ("矩阵分解 (k=8)", mf)]:
        r = rmse(model, test_set)
        m = mae(model, test_set)
        print(f"  {name:<20} {r:.4f}    {m:.4f}    {'越小越好'}")

    print(f"\n  冷启动问题分析:")
    print(f"  ├── 新用户(无历史): UserCF/ItemCF 无法推荐 → 用热门榜/内容推荐")
    print(f"  ├── 新物品(无评分): 协同过滤无法推荐 → 用物品属性相似度")
    print(f"  └── 矩阵分解: 需要重新训练才能加入新用户/物品")
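The cold-start notes printed above suggest falling back to a popularity list for a brand-new user. A minimal sketch of that fallback is shown below, under the assumption that "popular" means the highest mean rating among users who rated the movie. `popular_items` is a hypothetical helper, and a production system would usually also require a minimum number of ratings.

```python
import numpy as np

RATINGS = np.array([
    [5, 4, 0, 0, 1, 0, 0, 2],
    [0, 0, 4, 5, 0, 1, 0, 0],
    [4, 0, 0, 3, 2, 0, 5, 0],
    [0, 5, 4, 0, 0, 3, 0, 0],
    [1, 0, 0, 4, 5, 0, 0, 3],
    [5, 3, 0, 0, 1, 0, 4, 0],
    [0, 0, 5, 4, 0, 2, 0, 0],
    [4, 5, 0, 0, 2, 0, 3, 0],
    [0, 0, 3, 5, 0, 4, 0, 1],
    [3, 0, 0, 0, 4, 0, 5, 2],
], dtype=float)

MOVIES = ["复仇者联盟", "速度与激情", "星际穿越", "火星救援",
          "变形金刚", "降临", "肖申克的救赎", "泰坦尼克号"]

def popular_items(R: np.ndarray, top_n: int = 3) -> list:
    """Hypothetical fallback: rank items by mean rating over users who rated them."""
    rated = R > 0
    counts = rated.sum(axis=0)
    means = R.sum(axis=0) / np.maximum(counts, 1)   # unrated items get mean 0
    order = np.argsort(-means, kind="stable")       # highest mean first
    return [(int(i), float(means[i])) for i in order[:top_n]]

top = popular_items(RATINGS, top_n=2)
for item_id, mean_rating in top:
    print(f"{MOVIES[item_id]} -> {mean_rating:.2f}")
```

This list is the same for every user, so it is only a starting point until the new user has enough ratings for UserCF, ItemCF, or a retrained factorization to take over.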

Runnable demo (with mock data and print feedback added)

import numpy as np

RATINGS = np.array([
    [5, 4, 0, 0, 1, 0, 0, 2],
    [0, 0, 4, 5, 0, 1, 0, 0],
    [4, 0, 0, 3, 2, 0, 5, 0],
    [0, 5, 4, 0, 0, 3, 0, 0],
    [1, 0, 0, 4, 5, 0, 0, 3],
    [5, 3, 0, 0, 1, 0, 4, 0],
    [0, 0, 5, 4, 0, 2, 0, 0],
    [4, 5, 0, 0, 2, 0, 3, 0],
    [0, 0, 3, 5, 0, 4, 0, 1],
    [3, 0, 0, 0, 4, 0, 5, 2],
], dtype=float)

MOVIES = ["复仇者联盟", "速度与激情", "星际穿越", "火星救援",
          "变形金刚", "降临", "肖申克的救赎", "泰坦尼克号"]
USERS  = [f"用户{i}" for i in range(10)]
N_USERS, N_ITEMS = RATINGS.shape

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """计算两个向量的余弦相似度(忽略0值)。"""
    mask = (a != 0) & (b != 0)
    if mask.sum() == 0:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    denom = np.linalg.norm(a_m) * np.linalg.norm(b_m)
    return float(np.dot(a_m, b_m) / denom) if denom > 0 else 0.0

def pearson_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """皮尔逊相关系数(考虑评分偏差)。"""
    mask = (a != 0) & (b != 0)
    if mask.sum() < 2:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    a_c = a_m - a_m.mean()
    b_c = b_m - b_m.mean()
    denom = np.linalg.norm(a_c) * np.linalg.norm(b_c)
    return float(np.dot(a_c, b_c) / denom) if denom > 0 else 0.0

def section(title: str) -> None:
    print(f"\n{'='*62}\n  {title}\n{'='*62}")

class UserCF:
    """基于用户的协同过滤。"""

    def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
        self.R = ratings.copy()
        self.k = k
        n = ratings.shape[0]
        # 预计算用户相似度矩阵
        self.sim = np.zeros((n, n))
        for i in range(n):
            for j in range(i + 1, n):
                s = pearson_similarity(ratings[i], ratings[j])
                self.sim[i, j] = self.sim[j, i] = s

    def predict(self, user_id: int, item_id: int) -> float:
        """预测 user_id 对 item_id 的评分。"""
        # 找 K 个最相似且评过该物品的用户
        sims = self.sim[user_id].copy()
        sims[user_id] = -1   # 排除自身
        # 只考虑评过该物品的用户
        rated_mask = self.R[:, item_id] > 0
        sims[~rated_mask] = -1
        top_k = np.argsort(sims)[-self.k:][::-1]
        top_k = [u for u in top_k if sims[u] > 0]

        if not top_k:
            # 回退:返回该物品的平均分
            rated = self.R[:, item_id]
            return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0

        # 加权平均
        user_mean = self.R[user_id][self.R[user_id] > 0].mean()
        num, denom = 0.0, 0.0
        for u in top_k:
            u_mean = self.R[u][self.R[u] > 0].mean()
            num   += sims[u] * (self.R[u, item_id] - u_mean)
            denom += abs(sims[u])
        return float(np.clip(user_mean + (num / denom if denom > 0 else 0), 1, 5))

    def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
        """为用户推荐 top_n 部未看过的电影。"""
        unseen = np.where(self.R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]

class ItemCF:
    """基于物品的协同过滤。"""

    def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
        self.R = ratings.copy()
        self.k = k
        n = ratings.shape[1]
        self.sim = np.zeros((n, n))
        for i in range(n):
            for j in range(i + 1, n):
                s = cosine_similarity(ratings[:, i], ratings[:, j])
                self.sim[i, j] = self.sim[j, i] = s

    def predict(self, user_id: int, item_id: int) -> float:
        sims = self.sim[item_id].copy()
        sims[item_id] = -1
        rated_mask = self.R[user_id] > 0
        sims[~rated_mask] = -1
        top_k = np.argsort(sims)[-self.k:][::-1]
        top_k = [i for i in top_k if sims[i] > 0]

        if not top_k:
            rated = self.R[:, item_id]
            return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0

        num = sum(sims[i] * self.R[user_id, i] for i in top_k)
        denom = sum(abs(sims[i]) for i in top_k)
        return float(np.clip(num / denom if denom > 0 else 3.0, 1, 5))

    def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
        unseen = np.where(self.R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]

class MatrixFactorization:
    """
    基于梯度下降的矩阵分解:
    R ≈ P · Q^T,其中 P(n_users, k),Q(n_items, k)
    """

    def __init__(self, n_users: int, n_items: int, k: int = 8,
                 lr: float = 0.01, reg: float = 0.02, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        self.P = rng.randn(n_users, k) * 0.1   # 用户隐向量
        self.Q = rng.randn(n_items, k) * 0.1   # 物品隐向量
        self.lr = lr
        self.reg = reg

    def fit(self, R: np.ndarray, epochs: int = 200) -> list[float]:
        """在非零评分上训练。"""
        rows, cols = np.where(R > 0)
        loss_history: list[float] = []

        for epoch in range(epochs):
            # shuffle (global NumPy RNG; not controlled by the `seed` argument)
            idx = np.random.permutation(len(rows))
            total_loss = 0.0
            for i in idx:
                u, v = rows[i], cols[i]
                pred = self.P[u] @ self.Q[v]
                err = R[u, v] - pred
                # 梯度更新(含 L2 正则)
                self.P[u] += self.lr * (err * self.Q[v] - self.reg * self.P[u])
                self.Q[v] += self.lr * (err * self.P[u] - self.reg * self.Q[v])
                total_loss += err ** 2
            loss_history.append(total_loss / len(rows))

        return loss_history

    def predict(self, user_id: int, item_id: int) -> float:
        return float(np.clip(self.P[user_id] @ self.Q[item_id], 1, 5))

    def recommend(self, user_id: int, R: np.ndarray,
                  top_n: int = 3) -> list[tuple[str, float]]:
        unseen = np.where(R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]

def mode_eval() -> None:
    section("评估指标对比(留一法验证)")

    # 留出10%已知评分作为测试集
    rng = np.random.RandomState(42)
    known = list(zip(*np.where(RATINGS > 0)))
    rng.shuffle(known)
    test_size = max(1, len(known) // 10)
    test_set = known[:test_size]

    R_train = RATINGS.copy()
    for u, v in test_set:
        R_train[u, v] = 0

    # 训练三个模型
    ucf = UserCF(R_train, k=3)
    icf = ItemCF(R_train, k=3)
    mf  = MatrixFactorization(N_USERS, N_ITEMS, k=8)
    mf.fit(R_train, epochs=200)

    def rmse(model, test: list) -> float:
        errors = [(RATINGS[u,v] - model.predict(u, v))**2 for u, v in test]
        return float(np.sqrt(np.mean(errors)))

    def mae(model, test: list) -> float:
        errors = [abs(RATINGS[u,v] - model.predict(u, v)) for u, v in test]
        return float(np.mean(errors))

    print(f"\n  测试集大小: {test_size} 个评分\n")
    print(f"  {'模型':<20} {'RMSE':<10} {'MAE':<10} 说明")
    print(f"  {'─'*55}")
    for name, model in [("UserCF (k=3)", ucf), ("ItemCF (k=3)", icf), ("矩阵分解 (k=8)", mf)]:
        r = rmse(model, test_set)
        m = mae(model, test_set)
        print(f"  {name:<20} {r:.4f}    {m:.4f}    {'越小越好'}")

    print(f"\n  冷启动问题分析:")
    print(f"  ├── 新用户(无历史): UserCF/ItemCF 无法推荐 → 用热门榜/内容推荐")
    print(f"  ├── 新物品(无评分): 协同过滤无法推荐 → 用物品属性相似度")
    print(f"  └── 矩阵分解: 需要重新训练才能加入新用户/物品")

mode_eval()
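
The title printed by mode_eval mentions leave-one-out, while the code holds out a single 10% slice (3 of the 37 known ratings). As a point of comparison, here is a minimal sketch of a true leave-one-out loop. To keep the block self-contained it uses an item-mean predictor instead of the article's three models; `item_mean_predict` and `leave_one_out_rmse` are hypothetical names introduced only for this illustration.

```python
import numpy as np

# Same 10x8 matrix as in the article; 0 means "not rated".
RATINGS = np.array([
    [5, 4, 0, 0, 1, 0, 0, 2],
    [0, 0, 4, 5, 0, 1, 0, 0],
    [4, 0, 0, 3, 2, 0, 5, 0],
    [0, 5, 4, 0, 0, 3, 0, 0],
    [1, 0, 0, 4, 5, 0, 0, 3],
    [5, 3, 0, 0, 1, 0, 4, 0],
    [0, 0, 5, 4, 0, 2, 0, 0],
    [4, 5, 0, 0, 2, 0, 3, 0],
    [0, 0, 3, 5, 0, 4, 0, 1],
    [3, 0, 0, 0, 4, 0, 5, 2],
], dtype=float)


def item_mean_predict(R_train: np.ndarray, user_id: int, item_id: int) -> float:
    """Hypothetical baseline: mean of the item's known ratings, 3.0 if none."""
    col = R_train[:, item_id]
    known = col[col > 0]
    return float(known.mean()) if known.size else 3.0


def leave_one_out_rmse(R: np.ndarray, predict) -> tuple[float, int]:
    """Hide each known rating in turn, predict it, and pool the squared errors."""
    sq_errors = []
    for u, v in zip(*np.where(R > 0)):
        R_train = R.copy()
        R_train[u, v] = 0                      # hide exactly one rating
        pred = predict(R_train, u, v)
        sq_errors.append((R[u, v] - pred) ** 2)
    return float(np.sqrt(np.mean(sq_errors))), len(sq_errors)


rmse, n = leave_one_out_rmse(RATINGS, item_mean_predict)
print(f"leave-one-out RMSE over {n} ratings: {rmse:.4f}")
```

Every known rating is used as a test point once, so the estimate depends less on which few ratings happen to land in the hold-out slice. The cost is one model fit per rating, which is only affordable on a matrix this small.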

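Step 9: Use main to turn usercf/itemcf/mf/eval into a command-line entry point

Pain point and mechanism

The final script should run like a tool, so readers do not have to edit function calls in the source. --mode usercf shows user-based collaborative filtering, --mode itemcf shows item-based collaborative filtering, --mode mf shows matrix factorization, and --mode eval shows the evaluation metrics. Omitting --mode falls back to the default, all, which runs the four modes in that order.

Core source (verbatim from the full source at the end of the article)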

def main() -> None:
    parser = argparse.ArgumentParser(description="推荐系统:协同过滤与矩阵分解")
    parser.add_argument(
        "--mode",
        choices=["usercf", "itemcf", "mf", "eval", "all"],
        default="all",
    )
    args = parser.parse_args()
    dispatch = {
        "usercf": mode_usercf,
        "itemcf": mode_itemcf,
        "mf":     mode_mf,
        "eval":   mode_eval,
        "all":    lambda: [mode_usercf(), mode_itemcf(), mode_mf(), mode_eval()],
    }
    dispatch[args.mode]()

Runnable demo (with mock data and print feedback filled in)

import argparse

def main() -> None:
    parser = argparse.ArgumentParser(description="推荐系统:协同过滤与矩阵分解")
    parser.add_argument(
        "--mode",
        choices=["usercf", "itemcf", "mf", "eval", "all"],
        default="all",
    )
    args = parser.parse_args()
    dispatch = {
        "usercf": mode_usercf,
        "itemcf": mode_itemcf,
        "mf":     mode_mf,
        "eval":   mode_eval,
        "all":    lambda: [mode_usercf(), mode_itemcf(), mode_mf(), mode_eval()],
    }
    dispatch[args.mode]()

def mode_usercf() -> None:
    print("运行用户协同过滤")


def mode_itemcf() -> None:
    print("运行物品协同过滤")


def mode_mf() -> None:
    print("运行矩阵分解")


def mode_eval() -> None:
    print("运行评估指标对比")

import sys
for mode in ["usercf", "itemcf", "mf", "eval"]:
    print(f"\n$ python 52-python-recommender.py --mode {mode}")
    sys.argv = ["52-python-recommender.py", "--mode", mode]
    main()
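
The demo above simulates command lines by overwriting sys.argv before each call. `ArgumentParser.parse_args` also accepts an explicit list of arguments, so the same dispatch can be exercised without touching global state. The sketch below assumes hypothetical helpers `build_parser` and `run`, and it returns the mode names instead of calling the real mode_* functions so that it stays self-contained.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build the same --mode option as the article's main()."""
    parser = argparse.ArgumentParser(description="recommender demo")
    parser.add_argument(
        "--mode",
        choices=["usercf", "itemcf", "mf", "eval", "all"],
        default="all",
    )
    return parser


def run(argv: list[str]) -> list[str]:
    """Parse an explicit argv list and return the modes that would run, in order."""
    args = build_parser().parse_args(argv)
    order = ["usercf", "itemcf", "mf", "eval"]
    return order if args.mode == "all" else [args.mode]


# Two simulated command lines: an explicit mode and the default.
for argv in (["--mode", "mf"], []):
    print(argv, "->", run(argv))
```

Passing argv as a parameter makes the entry point easy to call from tests, and a real script can still call `run(sys.argv[1:])` at the bottom.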

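Hands-on: full source and run

Now put the building blocks together and save the complete code below as 52-python-recommender.py. It ships with a built-in rating matrix of 10 users and 8 movies, so UserCF, ItemCF, matrix factorization and the evaluation metrics all run without any external data.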

#!/usr/bin/env python3
"""
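52-python-recommender.py: recommender systems with collaborative filtering and matrix factorization

Usage:
  python3 52-python-recommender.py --mode usercf   # user-based collaborative filtering
  python3 52-python-recommender.py --mode itemcf   # item-based collaborative filtering
  python3 52-python-recommender.py --mode mf       # matrix factorization (gradient descent)
  python3 52-python-recommender.py --mode eval     # evaluation metric comparison
  python3 52-python-recommender.py --mode all      # everything (default)

The only third-party dependency is numpy; no external data is needed.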
"""

import argparse

import numpy as np



# 10个用户,8部电影,评分1-5,0表示未评分
RATINGS = np.array([
    [5, 4, 0, 0, 1, 0, 0, 2],   # 用户0:喜欢动作片
    [0, 0, 4, 5, 0, 1, 0, 0],   # 用户1:喜欢科幻片
    [4, 0, 0, 3, 2, 0, 5, 0],   # 用户2:喜欢动作+剧情
    [0, 5, 4, 0, 0, 3, 0, 0],   # 用户3:喜欢科幻+剧情
    [1, 0, 0, 4, 5, 0, 0, 3],   # 用户4:喜欢科幻
    [5, 3, 0, 0, 1, 0, 4, 0],   # 用户5:喜欢动作+剧情
    [0, 0, 5, 4, 0, 2, 0, 0],   # 用户6:喜欢科幻
    [4, 5, 0, 0, 2, 0, 3, 0],   # 用户7:喜欢动作
    [0, 0, 3, 5, 0, 4, 0, 1],   # 用户8:喜欢科幻+剧情
    [3, 0, 0, 0, 4, 0, 5, 2],   # 用户9:喜欢动作+剧情
], dtype=float)

MOVIES = ["复仇者联盟", "速度与激情", "星际穿越", "火星救援",
          "变形金刚", "降临", "肖申克的救赎", "泰坦尼克号"]
USERS  = [f"用户{i}" for i in range(10)]

N_USERS, N_ITEMS = RATINGS.shape

# ─── 工具函数 ──────────────────────────────────────────────────────────────────

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """计算两个向量的余弦相似度(忽略0值)。"""
    mask = (a != 0) & (b != 0)
    if mask.sum() == 0:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    denom = np.linalg.norm(a_m) * np.linalg.norm(b_m)
    return float(np.dot(a_m, b_m) / denom) if denom > 0 else 0.0


def pearson_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """皮尔逊相关系数(考虑评分偏差)。"""
    mask = (a != 0) & (b != 0)
    if mask.sum() < 2:
        return 0.0
    a_m, b_m = a[mask], b[mask]
    a_c = a_m - a_m.mean()
    b_c = b_m - b_m.mean()
    denom = np.linalg.norm(a_c) * np.linalg.norm(b_c)
    return float(np.dot(a_c, b_c) / denom) if denom > 0 else 0.0


def section(title: str) -> None:
    print(f"\n{'='*62}\n  {title}\n{'='*62}")

# ─── 用户协同过滤 ──────────────────────────────────────────────────────────────

class UserCF:
    """基于用户的协同过滤。"""

    def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
        self.R = ratings.copy()
        self.k = k
        n = ratings.shape[0]
        # 预计算用户相似度矩阵
        self.sim = np.zeros((n, n))
        for i in range(n):
            for j in range(i + 1, n):
                s = pearson_similarity(ratings[i], ratings[j])
                self.sim[i, j] = self.sim[j, i] = s

    def predict(self, user_id: int, item_id: int) -> float:
        """预测 user_id 对 item_id 的评分。"""
        # 找 K 个最相似且评过该物品的用户
        sims = self.sim[user_id].copy()
        sims[user_id] = -1   # 排除自身
        # 只考虑评过该物品的用户
        rated_mask = self.R[:, item_id] > 0
        sims[~rated_mask] = -1
        top_k = np.argsort(sims)[-self.k:][::-1]
        top_k = [u for u in top_k if sims[u] > 0]

        if not top_k:
            # 回退:返回该物品的平均分
            rated = self.R[:, item_id]
            return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0

        # 加权平均
        user_mean = self.R[user_id][self.R[user_id] > 0].mean()
        num, denom = 0.0, 0.0
        for u in top_k:
            u_mean = self.R[u][self.R[u] > 0].mean()
            num   += sims[u] * (self.R[u, item_id] - u_mean)
            denom += abs(sims[u])
        return float(np.clip(user_mean + (num / denom if denom > 0 else 0), 1, 5))

    def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
        """为用户推荐 top_n 部未看过的电影。"""
        unseen = np.where(self.R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]

# ─── 物品协同过滤 ──────────────────────────────────────────────────────────────

class ItemCF:
    """基于物品的协同过滤。"""

    def __init__(self, ratings: np.ndarray, k: int = 3) -> None:
        self.R = ratings.copy()
        self.k = k
        n = ratings.shape[1]
        self.sim = np.zeros((n, n))
        for i in range(n):
            for j in range(i + 1, n):
                s = cosine_similarity(ratings[:, i], ratings[:, j])
                self.sim[i, j] = self.sim[j, i] = s

    def predict(self, user_id: int, item_id: int) -> float:
        sims = self.sim[item_id].copy()
        sims[item_id] = -1
        rated_mask = self.R[user_id] > 0
        sims[~rated_mask] = -1
        top_k = np.argsort(sims)[-self.k:][::-1]
        top_k = [i for i in top_k if sims[i] > 0]

        if not top_k:
            rated = self.R[:, item_id]
            return float(rated[rated > 0].mean()) if (rated > 0).any() else 3.0

        num = sum(sims[i] * self.R[user_id, i] for i in top_k)
        denom = sum(abs(sims[i]) for i in top_k)
        return float(np.clip(num / denom if denom > 0 else 3.0, 1, 5))

    def recommend(self, user_id: int, top_n: int = 3) -> list[tuple[str, float]]:
        unseen = np.where(self.R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]

# ─── 矩阵分解(梯度下降 SVD)─────────────────────────────────────────────────

class MatrixFactorization:
    """
    基于梯度下降的矩阵分解:
    R ≈ P · Q^T,其中 P(n_users, k),Q(n_items, k)
    """

    def __init__(self, n_users: int, n_items: int, k: int = 8,
                 lr: float = 0.01, reg: float = 0.02, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        self.P = rng.randn(n_users, k) * 0.1   # 用户隐向量
        self.Q = rng.randn(n_items, k) * 0.1   # 物品隐向量
        self.lr = lr
        self.reg = reg

    def fit(self, R: np.ndarray, epochs: int = 200) -> list[float]:
        """在非零评分上训练。"""
        rows, cols = np.where(R > 0)
        loss_history: list[float] = []

        for epoch in range(epochs):
            # shuffle (global NumPy RNG; not controlled by the `seed` argument)
            idx = np.random.permutation(len(rows))
            total_loss = 0.0
            for i in idx:
                u, v = rows[i], cols[i]
                pred = self.P[u] @ self.Q[v]
                err = R[u, v] - pred
                # 梯度更新(含 L2 正则)
                self.P[u] += self.lr * (err * self.Q[v] - self.reg * self.P[u])
                self.Q[v] += self.lr * (err * self.P[u] - self.reg * self.Q[v])
                total_loss += err ** 2
            loss_history.append(total_loss / len(rows))

        return loss_history

    def predict(self, user_id: int, item_id: int) -> float:
        return float(np.clip(self.P[user_id] @ self.Q[item_id], 1, 5))

    def recommend(self, user_id: int, R: np.ndarray,
                  top_n: int = 3) -> list[tuple[str, float]]:
        unseen = np.where(R[user_id] == 0)[0]
        scores = [(MOVIES[i], self.predict(user_id, i)) for i in unseen]
        return sorted(scores, key=lambda x: x[1], reverse=True)[:top_n]

# ─── 模式1:用户协同过滤 ───────────────────────────────────────────────────────

def mode_usercf() -> None:
    section("用户协同过滤(UserCF)")
    ucf = UserCF(RATINGS, k=3)

    # 显示用户相似度矩阵(前5个用户)
    print("\n  用户相似度矩阵(皮尔逊,前5个用户):")
    print(f"  {'':8}" + "".join(f"{USERS[j]:>8}" for j in range(5)))
    for i in range(5):
        row = "".join(f"{ucf.sim[i,j]:8.3f}" for j in range(5))
        print(f"  {USERS[i]:8}{row}")

    # Recommend for users 0, 1 and 2
    print(f"\n  推荐结果:")
    for uid in [0, 1, 2]:
        recs = ucf.recommend(uid, top_n=3)
        seen = [MOVIES[i] for i in range(N_ITEMS) if RATINGS[uid, i] > 0]
        print(f"\n  {USERS[uid]}(已看: {', '.join(seen[:3])}...)")
        for movie, score in recs:
            bar = "★" * int(score)
            print(f"    推荐: {movie:<12} 预测评分: {score:.2f}  {bar}")

# ─── 模式2:物品协同过滤 ───────────────────────────────────────────────────────

def mode_itemcf() -> None:
    section("物品协同过滤(ItemCF)")
    icf = ItemCF(RATINGS, k=3)

    print("\n  物品相似度矩阵(余弦):")
    print(f"  {'':14}" + "".join(f"{m[:4]:>6}" for m in MOVIES))
    for i, movie in enumerate(MOVIES):
        row = "".join(f"{icf.sim[i,j]:6.2f}" for j in range(N_ITEMS))
        print(f"  {movie[:12]:<14}{row}")

    print(f"\n  推荐结果(用户0 vs 用户1):")
    for uid in [0, 1]:
        recs = icf.recommend(uid, top_n=3)
        print(f"\n  {USERS[uid]}:")
        for movie, score in recs:
            print(f"    {movie:<12} 预测评分: {score:.2f}")

# ─── 模式3:矩阵分解 ───────────────────────────────────────────────────────────

def mode_mf() -> None:
    section("矩阵分解(梯度下降 SVD,k=8 隐因子)")
    mf = MatrixFactorization(N_USERS, N_ITEMS, k=8, lr=0.01, reg=0.02)
    history = mf.fit(RATINGS, epochs=300)

    # 损失曲线
    print("\n  训练损失曲线(每60轮):")
    checkpoints = [0, 60, 120, 180, 240, 299]
    max_loss = history[0]
    W = 35
    for ep in checkpoints:
        loss = history[ep]
        bar = "█" * int(loss / max_loss * W)
        print(f"  epoch {ep+1:>3} │{bar:<{W}}│ {loss:.4f}")

    print(f"\n  推荐结果(用户0、1、2):")
    for uid in [0, 1, 2]:
        recs = mf.recommend(uid, RATINGS, top_n=3)
        print(f"\n  {USERS[uid]}:")
        for movie, score in recs:
            bar = "★" * int(score)
            print(f"    {movie:<12} 预测评分: {score:.2f}  {bar}")

# ─── 模式4:评估指标对比 ───────────────────────────────────────────────────────

def mode_eval() -> None:
    section("评估指标对比(留一法验证)")

    # 留出10%已知评分作为测试集
    rng = np.random.RandomState(42)
    known = list(zip(*np.where(RATINGS > 0)))
    rng.shuffle(known)
    test_size = max(1, len(known) // 10)
    test_set = known[:test_size]

    R_train = RATINGS.copy()
    for u, v in test_set:
        R_train[u, v] = 0

    # 训练三个模型
    ucf = UserCF(R_train, k=3)
    icf = ItemCF(R_train, k=3)
    mf  = MatrixFactorization(N_USERS, N_ITEMS, k=8)
    mf.fit(R_train, epochs=200)

    def rmse(model, test: list) -> float:
        errors = [(RATINGS[u,v] - model.predict(u, v))**2 for u, v in test]
        return float(np.sqrt(np.mean(errors)))

    def mae(model, test: list) -> float:
        errors = [abs(RATINGS[u,v] - model.predict(u, v)) for u, v in test]
        return float(np.mean(errors))

    print(f"\n  测试集大小: {test_size} 个评分\n")
    print(f"  {'模型':<20} {'RMSE':<10} {'MAE':<10} 说明")
    print(f"  {'─'*55}")
    for name, model in [("UserCF (k=3)", ucf), ("ItemCF (k=3)", icf), ("矩阵分解 (k=8)", mf)]:
        r = rmse(model, test_set)
        m = mae(model, test_set)
        print(f"  {name:<20} {r:.4f}    {m:.4f}    {'越小越好'}")

    print(f"\n  冷启动问题分析:")
    print(f"  ├── 新用户(无历史): UserCF/ItemCF 无法推荐 → 用热门榜/内容推荐")
    print(f"  ├── 新物品(无评分): 协同过滤无法推荐 → 用物品属性相似度")
    print(f"  └── 矩阵分解: 需要重新训练才能加入新用户/物品")

# ─── 入口 ─────────────────────────────────────────────────────────────────────

def main() -> None:
    parser = argparse.ArgumentParser(description="推荐系统:协同过滤与矩阵分解")
    parser.add_argument(
        "--mode",
        choices=["usercf", "itemcf", "mf", "eval", "all"],
        default="all",
    )
    args = parser.parse_args()
    dispatch = {
        "usercf": mode_usercf,
        "itemcf": mode_itemcf,
        "mf":     mode_mf,
        "eval":   mode_eval,
        "all":    lambda: [mode_usercf(), mode_itemcf(), mode_mf(), mode_eval()],
    }
    dispatch[args.mode]()


if __name__ == "__main__":
    main()

Sample output for the usercf, mf and eval modes:

$ python 52-python-recommender.py --mode usercf
==============================================================
  用户协同过滤(UserCF)
==============================================================

  用户相似度矩阵(皮尔逊,前5个用户):
               用户0     用户1     用户2     用户3     用户4
  用户0        0.000   0.000   1.000   0.000  -0.961
  用户1        0.000   0.000   0.000   1.000   0.000
  用户2        1.000   0.000   0.000   0.000  -0.961
  用户3        0.000   1.000   0.000   0.000   0.000
  用户4       -0.961   0.000  -0.961   0.000   0.000

  推荐结果:

  用户0(已看: 复仇者联盟, 速度与激情, 变形金刚...)
    推荐: 星际穿越         预测评分: 4.00  ★★★★
    推荐: 肖申克的救赎       预测评分: 3.64  ★★★
    推荐: 火星救援         预测评分: 2.50  ★★

  用户1(已看: 星际穿越, 火星救援, 降临...)
    推荐: 速度与激情        预测评分: 4.33  ★★★★
    推荐: 肖申克的救赎       预测评分: 4.25  ★★★★
    推荐: 复仇者联盟        预测评分: 3.67  ★★★

  用户2(已看: 复仇者联盟, 火星救援, 变形金刚...)
    推荐: 速度与激情        预测评分: 4.21  ★★★★
    推荐: 星际穿越         预测评分: 4.00  ★★★★
    推荐: 降临           预测评分: 2.50  ★★

$ python 52-python-recommender.py --mode mf
==============================================================
  矩阵分解(梯度下降 SVD,k=8 隐因子)
==============================================================

  训练损失曲线(每60轮):
  epoch   1 │███████████████████████████████████│ 13.4787
  epoch  61 │                                   │ 0.2927
  epoch 121 │                                   │ 0.0465
  epoch 181 │                                   │ 0.0111
  epoch 241 │                                   │ 0.0047
  epoch 300 │                                   │ 0.0027

  推荐结果(用户0、1、2):

  用户0:
    肖申克的救赎       预测评分: 3.32  ★★★
    星际穿越         预测评分: 2.19  ★★
    火星救援         预测评分: 1.51  ★

  用户1:
    变形金刚         预测评分: 3.95  ★★★
    速度与激情        预测评分: 3.06  ★★★
    肖申克的救赎       预测评分: 2.90  ★★

  用户2:
    速度与激情        预测评分: 3.96  ★★★
    星际穿越         预测评分: 3.38  ★★★
    泰坦尼克号        预测评分: 1.79  ★

$ python 52-python-recommender.py --mode eval
==============================================================
  评估指标对比(留一法验证)
==============================================================

  测试集大小: 3 个评分

  模型                   RMSE       MAE        说明
  ───────────────────────────────────────────────────────
  UserCF (k=3)         0.8197    0.7639    越小越好
  ItemCF (k=3)         0.7907    0.7229    越小越好
  矩阵分解 (k=8)           1.6157    1.5877    越小越好

  冷启动问题分析:
  ├── 新用户(无历史): UserCF/ItemCF 无法推荐 → 用热门榜/内容推荐
  ├── 新物品(无评分): 协同过滤无法推荐 → 用物品属性相似度
  └── 矩阵分解: 需要重新训练才能加入新用户/物品
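
In the run above matrix factorization scores worst, but the test set holds only 3 ratings, and `fit` shuffles with `np.random.permutation`, which uses the global random state and not the `seed` passed to the constructor. The numbers can therefore differ between runs. Below is a minimal sketch of a reproducible variant, not the article's implementation. The class name `SeededMF` and the toy matrix are assumptions made for illustration. It also updates P and Q from the same pre-update values, which is one common way to write the SGD step.

```python
import numpy as np


class SeededMF:
    """Hypothetical variant: one Generator drives both init and shuffling."""

    def __init__(self, n_users: int, n_items: int, k: int = 8,
                 lr: float = 0.01, reg: float = 0.02, seed: int = 42) -> None:
        self.rng = np.random.default_rng(seed)
        self.P = self.rng.normal(scale=0.1, size=(n_users, k))  # user factors
        self.Q = self.rng.normal(scale=0.1, size=(n_items, k))  # item factors
        self.lr, self.reg = lr, reg

    def fit(self, R: np.ndarray, epochs: int = 200) -> list[float]:
        """Train on non-zero ratings; return the mean squared error per epoch."""
        rows, cols = np.where(R > 0)
        history: list[float] = []
        for _ in range(epochs):
            total = 0.0
            for i in self.rng.permutation(len(rows)):
                u, v = rows[i], cols[i]
                err = R[u, v] - self.P[u] @ self.Q[v]
                p_old = self.P[u].copy()   # keep the pre-update user vector
                self.P[u] += self.lr * (err * self.Q[v] - self.reg * self.P[u])
                self.Q[v] += self.lr * (err * p_old - self.reg * self.Q[v])
                total += err ** 2
            history.append(total / len(rows))
        return history


# Toy matrix for illustration only (not the article's data); 0 means "not rated".
R = np.array([[5, 4, 0, 1],
              [4, 0, 0, 1],
              [1, 1, 0, 5],
              [0, 1, 5, 4]], dtype=float)

h1 = SeededMF(4, 4, k=2).fit(R, epochs=200)
h2 = SeededMF(4, 4, k=2).fit(R, epochs=200)
print("same loss curve for the same seed:", h1 == h2)
print(f"first epoch loss {h1[0]:.4f}, last epoch loss {h1[-1]:.4f}")
```

With a fixed seed, a change in RMSE or MAE can be traced to a hyperparameter change and not to a different shuffle order.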

Summary and NexDo Time ⚡

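In this article you ran the three core recipes of a recommender system: UserCF borrows the taste of similar users, ItemCF borrows the relationships between similar items, and matrix factorization learns latent preferences for users and movies. RMSE and MAE measure how far the predicted ratings are from the true ones. The cold-start problem is a reminder that, without any behavior history, a recommender needs popularity charts, content features or onboarding prompts to fill the gap.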
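
The popularity-chart fallback for new users can be sketched in a few lines. This is only an illustration under stated assumptions: `popular_items` and its `min_ratings` threshold are hypothetical and not part of the article's script, and items are ranked by the plain mean of their known ratings.

```python
import numpy as np

MOVIES = ["复仇者联盟", "速度与激情", "星际穿越", "火星救援",
          "变形金刚", "降临", "肖申克的救赎", "泰坦尼克号"]

# Same 10x8 matrix as in the article; 0 means "not rated".
RATINGS = np.array([
    [5, 4, 0, 0, 1, 0, 0, 2],
    [0, 0, 4, 5, 0, 1, 0, 0],
    [4, 0, 0, 3, 2, 0, 5, 0],
    [0, 5, 4, 0, 0, 3, 0, 0],
    [1, 0, 0, 4, 5, 0, 0, 3],
    [5, 3, 0, 0, 1, 0, 4, 0],
    [0, 0, 5, 4, 0, 2, 0, 0],
    [4, 5, 0, 0, 2, 0, 3, 0],
    [0, 0, 3, 5, 0, 4, 0, 1],
    [3, 0, 0, 0, 4, 0, 5, 2],
], dtype=float)


def popular_items(R: np.ndarray, top_n: int = 3,
                  min_ratings: int = 2) -> list[tuple[str, float]]:
    """Rank items by mean known rating, skipping items with too few ratings."""
    counts = (R > 0).sum(axis=0)
    sums = R.sum(axis=0)                      # zeros add nothing to the sum
    means = np.divide(sums, counts, out=np.zeros_like(sums), where=counts > 0)
    eligible = [i for i in range(R.shape[1]) if counts[i] >= min_ratings]
    ranked = sorted(eligible, key=lambda i: means[i], reverse=True)
    return [(MOVIES[i], float(means[i])) for i in ranked[:top_n]]


# A brand-new user has no row in the matrix, so fall back to the global chart.
for title, mean_score in popular_items(RATINGS):
    print(f"{title}: {mean_score:.2f}")
```

The `min_ratings` threshold keeps an item with a single 5-star rating from topping the chart. A production system would more likely use a damped or Bayesian average, which is beyond the scope of this sketch.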

5-minute micro challenge: in mode_eval, change k in UserCF(R_train, k=3) and ItemCF(R_train, k=3) to 1 and then to 5, run --mode eval each time, and check whether RMSE/MAE improve.

Don’t wait for next time, do it in the next moment.