文章

48 · 神经网络基础:从零手写前向传播与反向传播

#027 · 2026-04-16 · Python

🔗 知识图谱导航:阅读本文前,建议先回顾《31 · NumPy 高性能数组计算》里的矩阵乘法,以及《47 · 神经网络原理揭秘与本地 AI Agent 调度》里的神经网络直觉。本文会把这些概念落到纯 numpy 手写训练代码里。 NexDo Time · 2026-04-17 · 预计阅读 32 分钟

痛点与架构

框架能让你三行代码训练模型,但也容易把最关键的过程藏起来:前向传播到底怎么算?损失为什么能下降?反向传播怎么知道该改哪个权重?这一篇不直接调用深度学习框架,而是用 numpy 手写一个两层全连接神经网络。

你可以把整套训练流程想成“学生刷题”:前向传播是先答题,损失函数是老师批卷,反向传播是分析错因,梯度下降是订正错题。重复很多轮之后,模型就从错误里学到了参数。

数据 X -> W1/b1 -> ReLU -> W2/b2 -> Softmax -> 预测概率
                                      │
真实标签 one-hot -> CrossEntropy <----┘
                                      │
                         Backward -> 更新 W1/W2/b1/b2

步步为营:核心逻辑自适应拆解

这篇的代码比前一篇更接近训练框架底层,所以我们拆成 8 个小步骤。每个步骤先看文末完整源码里的真实片段,再运行一个补齐上下文的小演示。不要急着背公式,先把每一段输出看懂。

Step 1:用激活函数和交叉熵把“打分”变成“可学习的误差”

痛点与机制

神经网络先输出一堆原始分数,这些分数本身不好解释。ReLU 像门卫,负数不让进;Softmax 像把票数换算成百分比,让每一行概率加起来等于 1;交叉熵像老师批卷,模型越自信地选错,扣分越狠。先跑这一小段,你会看到“分数 -> 概率 -> 损失”的完整链路。

核心源码(逐字来自文末完整源码)


def relu(z: np.ndarray) -> np.ndarray:
    return np.maximum(0.0, z)

def relu_grad(z: np.ndarray) -> np.ndarray:
    """ReLU 的导数:z>0 时为1,否则为0。"""
    return (z > 0).astype(float)

def sigmoid(z: np.ndarray) -> np.ndarray:
    return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))

def softmax(z: np.ndarray) -> np.ndarray:
    e = np.exp(z - z.max(axis=1, keepdims=True))   # 数值稳定
    return e / e.sum(axis=1, keepdims=True)

def cross_entropy(y_pred: np.ndarray, y_true: np.ndarray) -> float:
    """多分类交叉熵损失(y_true 为 one-hot)。"""
    n = y_true.shape[0]
    return -np.sum(y_true * np.log(y_pred + 1e-9)) / n

可运行演示(补齐 Mock 数据与 print 反馈)

import numpy as np


def relu(z: np.ndarray) -> np.ndarray:
    return np.maximum(0.0, z)

def relu_grad(z: np.ndarray) -> np.ndarray:
    """ReLU 的导数:z>0 时为1,否则为0。"""
    return (z > 0).astype(float)

def sigmoid(z: np.ndarray) -> np.ndarray:
    return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))

def softmax(z: np.ndarray) -> np.ndarray:
    e = np.exp(z - z.max(axis=1, keepdims=True))   # 数值稳定
    return e / e.sum(axis=1, keepdims=True)

def cross_entropy(y_pred: np.ndarray, y_true: np.ndarray) -> float:
    """多分类交叉熵损失(y_true 为 one-hot)。"""
    n = y_true.shape[0]
    return -np.sum(y_true * np.log(y_pred + 1e-9)) / n

logits = np.array([[-1.0, 0.5, 2.0]])
labels = np.array([[0.0, 0.0, 1.0]])

print("原始打分 logits:", logits.tolist())
print("ReLU 后:", relu(logits).tolist())
print("Sigmoid 后:", np.round(sigmoid(logits), 4).tolist())
prob = softmax(logits)
print("Softmax 概率:", np.round(prob, 4).tolist(), "每行和=", round(float(prob.sum()), 4))
print("交叉熵损失:", round(cross_entropy(prob, labels), 4))

Step 2:用初始化和 forward 看懂参数矩阵怎么接线

痛点与机制

网络结构不是魔法,而是一组矩阵接线。W1 负责把 4 个输入特征接到 5 个隐藏神经元,W2 再把隐藏层接到 3 个类别。可以把每个权重矩阵想成配电箱,输入电流经过不同线路,最后在输出层形成各类别概率。

核心源码(逐字来自文末完整源码)

    def __init__(self, n_in: int, n_hidden: int, n_out: int,
                 lr: float = 0.01, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        # He 初始化(适合 ReLU)
        self.W1: np.ndarray = rng.randn(n_in, n_hidden) * np.sqrt(2.0 / n_in)
        self.b1: np.ndarray = np.zeros((1, n_hidden))
        self.W2: np.ndarray = rng.randn(n_hidden, n_out) * np.sqrt(2.0 / n_hidden)
        self.b2: np.ndarray = np.zeros((1, n_out))
        self.lr = lr
        # 缓存前向传播中间值(反向传播需要)
        self._cache: dict = {}

    def forward(self, X: np.ndarray) -> np.ndarray:
        Z1 = X @ self.W1 + self.b1          # (n, hidden)
        A1 = relu(Z1)                        # (n, hidden)
        Z2 = A1 @ self.W2 + self.b2         # (n, out)
        A2 = softmax(Z2)                     # (n, out)
        self._cache = {"X": X, "Z1": Z1, "A1": A1, "Z2": Z2, "A2": A2}
        return A2

可运行演示(补齐 Mock 数据与 print 反馈)

from typing import Optional

import numpy as np


def relu(z: np.ndarray) -> np.ndarray:
    return np.maximum(0.0, z)

def relu_grad(z: np.ndarray) -> np.ndarray:
    """ReLU 的导数:z>0 时为1,否则为0。"""
    return (z > 0).astype(float)

def sigmoid(z: np.ndarray) -> np.ndarray:
    return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))

def softmax(z: np.ndarray) -> np.ndarray:
    e = np.exp(z - z.max(axis=1, keepdims=True))   # 数值稳定
    return e / e.sum(axis=1, keepdims=True)

def cross_entropy(y_pred: np.ndarray, y_true: np.ndarray) -> float:
    """多分类交叉熵损失(y_true 为 one-hot)。"""
    n = y_true.shape[0]
    return -np.sum(y_true * np.log(y_pred + 1e-9)) / n

class NeuralNetwork:
    """只保留初始化和前向传播,先看参数形状。"""
    def __init__(self, n_in: int, n_hidden: int, n_out: int,
                 lr: float = 0.01, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        # He 初始化(适合 ReLU)
        self.W1: np.ndarray = rng.randn(n_in, n_hidden) * np.sqrt(2.0 / n_in)
        self.b1: np.ndarray = np.zeros((1, n_hidden))
        self.W2: np.ndarray = rng.randn(n_hidden, n_out) * np.sqrt(2.0 / n_hidden)
        self.b2: np.ndarray = np.zeros((1, n_out))
        self.lr = lr
        # 缓存前向传播中间值(反向传播需要)
        self._cache: dict = {}

    def forward(self, X: np.ndarray) -> np.ndarray:
        Z1 = X @ self.W1 + self.b1          # (n, hidden)
        A1 = relu(Z1)                        # (n, hidden)
        Z2 = A1 @ self.W2 + self.b2         # (n, out)
        A2 = softmax(Z2)                     # (n, out)
        self._cache = {"X": X, "Z1": Z1, "A1": A1, "Z2": Z2, "A2": A2}
        return A2

X = np.array([[0.2, -0.4, 1.0, 0.7]])
model = NeuralNetwork(n_in=4, n_hidden=5, n_out=3, lr=0.01)
prob = model.forward(X)

print("W1 形状:", model.W1.shape, "负责 4 个输入 -> 5 个隐藏神经元")
print("W2 形状:", model.W2.shape, "负责 5 个隐藏神经元 -> 3 个类别")
print("输出概率:", np.round(prob, 4).tolist())
print("预测类别:", int(prob.argmax(axis=1)[0]))

Step 3:用 backward 把错误沿链式法则往回传

痛点与机制

反向传播像复盘错题:先看最终答案错了多少,再倒推是输出层权重错得多,还是隐藏层特征提取错得多。dZ2 -> dW2 -> dA1 -> dZ1 -> dW1 就是这条追责链。运行演示会打印更新前后损失和梯度范数,帮助你确认参数真的被调整了。

核心源码(逐字来自文末完整源码)

    def backward(self, y_true: np.ndarray) -> dict[str, float]:
        """反向传播,更新参数,返回各层梯度范数(用于监控)。"""
        n = y_true.shape[0]
        X, Z1, A1, A2 = (self._cache[k] for k in ("X", "Z1", "A1", "A2"))

        # 输出层梯度(softmax + cross-entropy 联合求导,结果简洁)
        dZ2 = (A2 - y_true) / n             # (n, out)
        dW2 = A1.T @ dZ2                    # (hidden, out)
        db2 = dZ2.sum(axis=0, keepdims=True)

        # 隐藏层梯度(链式法则)
        dA1 = dZ2 @ self.W2.T               # (n, hidden)
        dZ1 = dA1 * relu_grad(Z1)           # (n, hidden)
        dW1 = X.T @ dZ1                     # (in, hidden)
        db1 = dZ1.sum(axis=0, keepdims=True)

        # 梯度下降更新
        self.W2 -= self.lr * dW2
        self.b2 -= self.lr * db2
        self.W1 -= self.lr * dW1
        self.b1 -= self.lr * db1

        return {
            "grad_W1": float(np.linalg.norm(dW1)),
            "grad_W2": float(np.linalg.norm(dW2)),
        }

可运行演示(补齐 Mock 数据与 print 反馈)

from typing import Optional

import numpy as np


def relu(z: np.ndarray) -> np.ndarray:
    return np.maximum(0.0, z)

def relu_grad(z: np.ndarray) -> np.ndarray:
    """ReLU 的导数:z>0 时为1,否则为0。"""
    return (z > 0).astype(float)

def sigmoid(z: np.ndarray) -> np.ndarray:
    return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))

def softmax(z: np.ndarray) -> np.ndarray:
    e = np.exp(z - z.max(axis=1, keepdims=True))   # 数值稳定
    return e / e.sum(axis=1, keepdims=True)

def cross_entropy(y_pred: np.ndarray, y_true: np.ndarray) -> float:
    """多分类交叉熵损失(y_true 为 one-hot)。"""
    n = y_true.shape[0]
    return -np.sum(y_true * np.log(y_pred + 1e-9)) / n

class NeuralNetwork:
    """
    两层全连接神经网络:
      输入层(n_in) → 隐藏层(n_hidden, ReLU) → 输出层(n_out, Softmax)
    """

    def __init__(self, n_in: int, n_hidden: int, n_out: int,
                 lr: float = 0.01, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        # He 初始化(适合 ReLU)
        self.W1: np.ndarray = rng.randn(n_in, n_hidden) * np.sqrt(2.0 / n_in)
        self.b1: np.ndarray = np.zeros((1, n_hidden))
        self.W2: np.ndarray = rng.randn(n_hidden, n_out) * np.sqrt(2.0 / n_hidden)
        self.b2: np.ndarray = np.zeros((1, n_out))
        self.lr = lr
        # 缓存前向传播中间值(反向传播需要)
        self._cache: dict = {}

    def forward(self, X: np.ndarray) -> np.ndarray:
        Z1 = X @ self.W1 + self.b1          # (n, hidden)
        A1 = relu(Z1)                        # (n, hidden)
        Z2 = A1 @ self.W2 + self.b2         # (n, out)
        A2 = softmax(Z2)                     # (n, out)
        self._cache = {"X": X, "Z1": Z1, "A1": A1, "Z2": Z2, "A2": A2}
        return A2

    def backward(self, y_true: np.ndarray) -> dict[str, float]:
        """反向传播,更新参数,返回各层梯度范数(用于监控)。"""
        n = y_true.shape[0]
        X, Z1, A1, A2 = (self._cache[k] for k in ("X", "Z1", "A1", "A2"))

        # 输出层梯度(softmax + cross-entropy 联合求导,结果简洁)
        dZ2 = (A2 - y_true) / n             # (n, out)
        dW2 = A1.T @ dZ2                    # (hidden, out)
        db2 = dZ2.sum(axis=0, keepdims=True)

        # 隐藏层梯度(链式法则)
        dA1 = dZ2 @ self.W2.T               # (n, hidden)
        dZ1 = dA1 * relu_grad(Z1)           # (n, hidden)
        dW1 = X.T @ dZ1                     # (in, hidden)
        db1 = dZ1.sum(axis=0, keepdims=True)

        # 梯度下降更新
        self.W2 -= self.lr * dW2
        self.b2 -= self.lr * db2
        self.W1 -= self.lr * dW1
        self.b1 -= self.lr * db1

        return {
            "grad_W1": float(np.linalg.norm(dW1)),
            "grad_W2": float(np.linalg.norm(dW2)),
        }

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.forward(X).argmax(axis=1)

    def fit(self, X: np.ndarray, y_onehot: np.ndarray,
            epochs: int = 200, batch_size: int = 64,
            verbose: bool = True) -> list[float]:
        """Mini-batch 梯度下降训练。"""
        n = X.shape[0]
        loss_history: list[float] = []
        for epoch in range(1, epochs + 1):
            # shuffle
            idx = np.random.permutation(n)
            X_s, y_s = X[idx], y_onehot[idx]
            epoch_loss = 0.0
            for start in range(0, n, batch_size):
                Xb = X_s[start:start + batch_size]
                yb = y_s[start:start + batch_size]
                pred = self.forward(Xb)
                epoch_loss += cross_entropy(pred, yb) * len(Xb)
                self.backward(yb)
            epoch_loss /= n
            loss_history.append(epoch_loss)
            if verbose and epoch % 50 == 0:
                print(f"    epoch {epoch:>4}  loss={epoch_loss:.4f}")
        return loss_history

X = np.array([[0.2, -0.4, 1.0, 0.7], [1.2, 0.3, -0.8, 0.5]])
y = np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]])
model = NeuralNetwork(n_in=4, n_hidden=5, n_out=3, lr=0.05)

before = cross_entropy(model.forward(X), y)
grads = model.backward(y)
after = cross_entropy(model.forward(X), y)

print("更新前损失:", round(before, 4))
print("梯度范数:", {k: round(v, 4) for k, v in grads.items()})
print("更新后损失:", round(after, 4))
print("直觉:反向传播像把错题原因一层层往回传,再微调每个旋钮。")

Step 4:用 fit 组织 Mini-batch 训练循环

痛点与机制

单次反向传播只改一步,真正训练要重复很多轮。fit() 像健身计划:每轮先打乱样本,再分小批次训练,最后记录损失。Mini-batch 的好处是不用一次搬完整个数据集,也不会像单样本训练那样抖得太厉害。

核心源码(逐字来自文末完整源码)

    def fit(self, X: np.ndarray, y_onehot: np.ndarray,
            epochs: int = 200, batch_size: int = 64,
            verbose: bool = True) -> list[float]:
        """Mini-batch 梯度下降训练。"""
        n = X.shape[0]
        loss_history: list[float] = []
        for epoch in range(1, epochs + 1):
            # shuffle
            idx = np.random.permutation(n)
            X_s, y_s = X[idx], y_onehot[idx]
            epoch_loss = 0.0
            for start in range(0, n, batch_size):
                Xb = X_s[start:start + batch_size]
                yb = y_s[start:start + batch_size]
                pred = self.forward(Xb)
                epoch_loss += cross_entropy(pred, yb) * len(Xb)
                self.backward(yb)
            epoch_loss /= n
            loss_history.append(epoch_loss)
            if verbose and epoch % 50 == 0:
                print(f"    epoch {epoch:>4}  loss={epoch_loss:.4f}")
        return loss_history

可运行演示(补齐 Mock 数据与 print 反馈)

from typing import Optional

import numpy as np


def relu(z: np.ndarray) -> np.ndarray:
    return np.maximum(0.0, z)

def relu_grad(z: np.ndarray) -> np.ndarray:
    """ReLU 的导数:z>0 时为1,否则为0。"""
    return (z > 0).astype(float)

def sigmoid(z: np.ndarray) -> np.ndarray:
    return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))

def softmax(z: np.ndarray) -> np.ndarray:
    e = np.exp(z - z.max(axis=1, keepdims=True))   # 数值稳定
    return e / e.sum(axis=1, keepdims=True)

def cross_entropy(y_pred: np.ndarray, y_true: np.ndarray) -> float:
    """多分类交叉熵损失(y_true 为 one-hot)。"""
    n = y_true.shape[0]
    return -np.sum(y_true * np.log(y_pred + 1e-9)) / n

class NeuralNetwork:
    """
    两层全连接神经网络:
      输入层(n_in) → 隐藏层(n_hidden, ReLU) → 输出层(n_out, Softmax)
    """

    def __init__(self, n_in: int, n_hidden: int, n_out: int,
                 lr: float = 0.01, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        # He 初始化(适合 ReLU)
        self.W1: np.ndarray = rng.randn(n_in, n_hidden) * np.sqrt(2.0 / n_in)
        self.b1: np.ndarray = np.zeros((1, n_hidden))
        self.W2: np.ndarray = rng.randn(n_hidden, n_out) * np.sqrt(2.0 / n_hidden)
        self.b2: np.ndarray = np.zeros((1, n_out))
        self.lr = lr
        # 缓存前向传播中间值(反向传播需要)
        self._cache: dict = {}

    def forward(self, X: np.ndarray) -> np.ndarray:
        Z1 = X @ self.W1 + self.b1          # (n, hidden)
        A1 = relu(Z1)                        # (n, hidden)
        Z2 = A1 @ self.W2 + self.b2         # (n, out)
        A2 = softmax(Z2)                     # (n, out)
        self._cache = {"X": X, "Z1": Z1, "A1": A1, "Z2": Z2, "A2": A2}
        return A2

    def backward(self, y_true: np.ndarray) -> dict[str, float]:
        """反向传播,更新参数,返回各层梯度范数(用于监控)。"""
        n = y_true.shape[0]
        X, Z1, A1, A2 = (self._cache[k] for k in ("X", "Z1", "A1", "A2"))

        # 输出层梯度(softmax + cross-entropy 联合求导,结果简洁)
        dZ2 = (A2 - y_true) / n             # (n, out)
        dW2 = A1.T @ dZ2                    # (hidden, out)
        db2 = dZ2.sum(axis=0, keepdims=True)

        # 隐藏层梯度(链式法则)
        dA1 = dZ2 @ self.W2.T               # (n, hidden)
        dZ1 = dA1 * relu_grad(Z1)           # (n, hidden)
        dW1 = X.T @ dZ1                     # (in, hidden)
        db1 = dZ1.sum(axis=0, keepdims=True)

        # 梯度下降更新
        self.W2 -= self.lr * dW2
        self.b2 -= self.lr * db2
        self.W1 -= self.lr * dW1
        self.b1 -= self.lr * db1

        return {
            "grad_W1": float(np.linalg.norm(dW1)),
            "grad_W2": float(np.linalg.norm(dW2)),
        }

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.forward(X).argmax(axis=1)

    def fit(self, X: np.ndarray, y_onehot: np.ndarray,
            epochs: int = 200, batch_size: int = 64,
            verbose: bool = True) -> list[float]:
        """Mini-batch 梯度下降训练。"""
        n = X.shape[0]
        loss_history: list[float] = []
        for epoch in range(1, epochs + 1):
            # shuffle
            idx = np.random.permutation(n)
            X_s, y_s = X[idx], y_onehot[idx]
            epoch_loss = 0.0
            for start in range(0, n, batch_size):
                Xb = X_s[start:start + batch_size]
                yb = y_s[start:start + batch_size]
                pred = self.forward(Xb)
                epoch_loss += cross_entropy(pred, yb) * len(Xb)
                self.backward(yb)
            epoch_loss /= n
            loss_history.append(epoch_loss)
            if verbose and epoch % 50 == 0:
                print(f"    epoch {epoch:>4}  loss={epoch_loss:.4f}")
        return loss_history

rng = np.random.RandomState(7)
X = rng.randn(12, 4)
labels = rng.randint(0, 3, size=12)
y = np.zeros((12, 3))
y[np.arange(12), labels] = 1
model = NeuralNetwork(n_in=4, n_hidden=6, n_out=3, lr=0.05)

history = model.fit(X, y, epochs=6, batch_size=4, verbose=False)
print("损失历史:", [round(v, 4) for v in history])
print("预测类别:", model.predict(X[:5]).tolist())
print("真实类别:", labels[:5].tolist())

Step 5:用 make_data 生成标准化训练集和 one-hot 标签

痛点与机制

训练网络之前,数据要先整理成模型爱吃的形状。StandardScaler 像把不同量纲的食材切成差不多大小,one-hot 编码像答题卡:第 2 类就把第 2 个格子涂黑。这样交叉熵才能准确比较预测概率和真实答案。

核心源码(逐字来自文末完整源码)

def make_data(n_classes: int = 3) -> tuple:
    X, y = make_classification(
        n_samples=800, n_features=10, n_informative=6,
        n_classes=n_classes, n_clusters_per_class=1, random_state=42,
    )
    scaler = StandardScaler()
    X = scaler.fit_transform(X)
    X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)
    # one-hot 编码
    def onehot(labels: np.ndarray, n: int) -> np.ndarray:
        oh = np.zeros((len(labels), n))
        oh[np.arange(len(labels)), labels] = 1
        return oh
    return X_tr, X_te, y_tr, y_te, onehot(y_tr, n_classes), onehot(y_te, n_classes)

可运行演示(补齐 Mock 数据与 print 反馈)

import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

def make_data(n_classes: int = 3) -> tuple:
    X, y = make_classification(
        n_samples=800, n_features=10, n_informative=6,
        n_classes=n_classes, n_clusters_per_class=1, random_state=42,
    )
    scaler = StandardScaler()
    X = scaler.fit_transform(X)
    X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)
    # one-hot 编码
    def onehot(labels: np.ndarray, n: int) -> np.ndarray:
        oh = np.zeros((len(labels), n))
        oh[np.arange(len(labels)), labels] = 1
        return oh
    return X_tr, X_te, y_tr, y_te, onehot(y_tr, n_classes), onehot(y_te, n_classes)

X_tr, X_te, y_tr, y_te, y_tr_oh, y_te_oh = make_data(n_classes=3)
print("训练特征形状:", X_tr.shape)
print("测试特征形状:", X_te.shape)
print("原始标签前 8 个:", y_tr[:8].tolist())
print("one-hot 标签前 2 行:", y_tr_oh[:2].astype(int).tolist())
print("说明:one-hot 就像考试答题卡,每个类别只有一个格子被涂黑。")

Step 6:用 mode_forward 逐层打印前向传播的形状

痛点与机制

新手调神经网络最怕形状对不上。mode_forward() 故意只用 3 个样本,从 X、Z1、A1、Z2、A2 一路打印 shape,就像给流水线每一站装监控摄像头。只要形状能对上,后面的损失和反向传播才有基础。

核心源码(逐字来自文末完整源码)

def mode_forward() -> None:
    print("\n" + "="*60 + "\n  前向传播逐步演示(小型网络,3个样本)\n" + "="*60)
    np.random.seed(42)
    X = np.random.randn(3, 4)   # 3个样本,4个特征
    nn = NeuralNetwork(n_in=4, n_hidden=5, n_out=3, lr=0.01)

    print(f"\n  输入 X.shape = {X.shape}")
    Z1 = X @ nn.W1 + nn.b1
    print(f"  Z1 = X·W1+b1  shape={Z1.shape}  (线性变换)")
    A1 = relu(Z1)
    print(f"  A1 = ReLU(Z1) shape={A1.shape}  (激活,负值归零)")
    Z2 = A1 @ nn.W2 + nn.b2
    print(f"  Z2 = A1·W2+b2 shape={Z2.shape}  (输出层线性变换)")
    A2 = softmax(Z2)
    print(f"  A2 = Softmax  shape={A2.shape}  (概率分布)")
    print(f"\n  预测概率(每行和为1):")
    for i, row in enumerate(A2):
        bar = "  ".join(f"类{j}:{p:.3f}" for j, p in enumerate(row))
        print(f"    样本{i}: {bar}  → 预测类{row.argmax()}")

可运行演示(补齐 Mock 数据与 print 反馈)

from typing import Optional

import numpy as np


def relu(z: np.ndarray) -> np.ndarray:
    return np.maximum(0.0, z)

def relu_grad(z: np.ndarray) -> np.ndarray:
    """ReLU 的导数:z>0 时为1,否则为0。"""
    return (z > 0).astype(float)

def sigmoid(z: np.ndarray) -> np.ndarray:
    return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))

def softmax(z: np.ndarray) -> np.ndarray:
    e = np.exp(z - z.max(axis=1, keepdims=True))   # 数值稳定
    return e / e.sum(axis=1, keepdims=True)

def cross_entropy(y_pred: np.ndarray, y_true: np.ndarray) -> float:
    """多分类交叉熵损失(y_true 为 one-hot)。"""
    n = y_true.shape[0]
    return -np.sum(y_true * np.log(y_pred + 1e-9)) / n

class NeuralNetwork:
    """
    两层全连接神经网络:
      输入层(n_in) → 隐藏层(n_hidden, ReLU) → 输出层(n_out, Softmax)
    """

    def __init__(self, n_in: int, n_hidden: int, n_out: int,
                 lr: float = 0.01, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        # He 初始化(适合 ReLU)
        self.W1: np.ndarray = rng.randn(n_in, n_hidden) * np.sqrt(2.0 / n_in)
        self.b1: np.ndarray = np.zeros((1, n_hidden))
        self.W2: np.ndarray = rng.randn(n_hidden, n_out) * np.sqrt(2.0 / n_hidden)
        self.b2: np.ndarray = np.zeros((1, n_out))
        self.lr = lr
        # 缓存前向传播中间值(反向传播需要)
        self._cache: dict = {}

    def forward(self, X: np.ndarray) -> np.ndarray:
        Z1 = X @ self.W1 + self.b1          # (n, hidden)
        A1 = relu(Z1)                        # (n, hidden)
        Z2 = A1 @ self.W2 + self.b2         # (n, out)
        A2 = softmax(Z2)                     # (n, out)
        self._cache = {"X": X, "Z1": Z1, "A1": A1, "Z2": Z2, "A2": A2}
        return A2

    def backward(self, y_true: np.ndarray) -> dict[str, float]:
        """反向传播,更新参数,返回各层梯度范数(用于监控)。"""
        n = y_true.shape[0]
        X, Z1, A1, A2 = (self._cache[k] for k in ("X", "Z1", "A1", "A2"))

        # 输出层梯度(softmax + cross-entropy 联合求导,结果简洁)
        dZ2 = (A2 - y_true) / n             # (n, out)
        dW2 = A1.T @ dZ2                    # (hidden, out)
        db2 = dZ2.sum(axis=0, keepdims=True)

        # 隐藏层梯度(链式法则)
        dA1 = dZ2 @ self.W2.T               # (n, hidden)
        dZ1 = dA1 * relu_grad(Z1)           # (n, hidden)
        dW1 = X.T @ dZ1                     # (in, hidden)
        db1 = dZ1.sum(axis=0, keepdims=True)

        # 梯度下降更新
        self.W2 -= self.lr * dW2
        self.b2 -= self.lr * db2
        self.W1 -= self.lr * dW1
        self.b1 -= self.lr * db1

        return {
            "grad_W1": float(np.linalg.norm(dW1)),
            "grad_W2": float(np.linalg.norm(dW2)),
        }

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.forward(X).argmax(axis=1)

    def fit(self, X: np.ndarray, y_onehot: np.ndarray,
            epochs: int = 200, batch_size: int = 64,
            verbose: bool = True) -> list[float]:
        """Mini-batch 梯度下降训练。"""
        n = X.shape[0]
        loss_history: list[float] = []
        for epoch in range(1, epochs + 1):
            # shuffle
            idx = np.random.permutation(n)
            X_s, y_s = X[idx], y_onehot[idx]
            epoch_loss = 0.0
            for start in range(0, n, batch_size):
                Xb = X_s[start:start + batch_size]
                yb = y_s[start:start + batch_size]
                pred = self.forward(Xb)
                epoch_loss += cross_entropy(pred, yb) * len(Xb)
                self.backward(yb)
            epoch_loss /= n
            loss_history.append(epoch_loss)
            if verbose and epoch % 50 == 0:
                print(f"    epoch {epoch:>4}  loss={epoch_loss:.4f}")
        return loss_history

def mode_forward() -> None:
    print("\n" + "="*60 + "\n  前向传播逐步演示(小型网络,3个样本)\n" + "="*60)
    np.random.seed(42)
    X = np.random.randn(3, 4)   # 3个样本,4个特征
    nn = NeuralNetwork(n_in=4, n_hidden=5, n_out=3, lr=0.01)

    print(f"\n  输入 X.shape = {X.shape}")
    Z1 = X @ nn.W1 + nn.b1
    print(f"  Z1 = X·W1+b1  shape={Z1.shape}  (线性变换)")
    A1 = relu(Z1)
    print(f"  A1 = ReLU(Z1) shape={A1.shape}  (激活,负值归零)")
    Z2 = A1 @ nn.W2 + nn.b2
    print(f"  Z2 = A1·W2+b2 shape={Z2.shape}  (输出层线性变换)")
    A2 = softmax(Z2)
    print(f"  A2 = Softmax  shape={A2.shape}  (概率分布)")
    print(f"\n  预测概率(每行和为1):")
    for i, row in enumerate(A2):
        bar = "  ".join(f"类{j}:{p:.3f}" for j, p in enumerate(row))
        print(f"    样本{i}: {bar}  → 预测类{row.argmax()}")

mode_forward()

Step 7:用 mode_train 跑完整训练并观察损失曲线

痛点与机制

训练不是看一次输出,而是看损失有没有持续下降。mode_train() 把数据生成、网络初始化、训练循环、ASCII 损失曲线和测试准确率串起来。曲线条越来越短,说明模型像学生刷题一样,错题越来越少。

核心源码(逐字来自文末完整源码)

def mode_train() -> None:
    print("\n" + "="*60 + "\n  完整训练循环(Mini-batch 梯度下降)\n" + "="*60)
    X_tr, X_te, y_tr, y_te, y_tr_oh, _ = make_data(n_classes=3)

    nn = NeuralNetwork(n_in=10, n_hidden=32, n_out=3, lr=0.05)
    print(f"\n  网络结构: 10 → 32(ReLU) → 3(Softmax)")
    print(f"  训练集: {len(X_tr)} 样本  测试集: {len(X_te)} 样本\n")

    loss_history = nn.fit(X_tr, y_tr_oh, epochs=200, batch_size=64)

    # ASCII 损失曲线
    print("\n  训练损失曲线(每50轮采样)")
    sampled = loss_history[::50] + [loss_history[-1]]
    max_loss = max(sampled)
    W = 40
    for i, loss in enumerate(sampled):
        bar = "█" * int(loss / max_loss * W)
        epoch = min(i * 50, 200)
        print(f"  epoch {epoch:>3} │{bar:<{W}}│ {loss:.4f}")

    test_acc = accuracy_score(y_te, nn.predict(X_te))
    print(f"\n  最终测试准确率: {test_acc:.4f}")

可运行演示(补齐 Mock 数据与 print 反馈)

from typing import Optional

import numpy as np
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler


def relu(z: np.ndarray) -> np.ndarray:
    return np.maximum(0.0, z)

def relu_grad(z: np.ndarray) -> np.ndarray:
    """ReLU 的导数:z>0 时为1,否则为0。"""
    return (z > 0).astype(float)

def sigmoid(z: np.ndarray) -> np.ndarray:
    return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))

def softmax(z: np.ndarray) -> np.ndarray:
    e = np.exp(z - z.max(axis=1, keepdims=True))   # 数值稳定
    return e / e.sum(axis=1, keepdims=True)

def cross_entropy(y_pred: np.ndarray, y_true: np.ndarray) -> float:
    """多分类交叉熵损失(y_true 为 one-hot)。"""
    n = y_true.shape[0]
    return -np.sum(y_true * np.log(y_pred + 1e-9)) / n

class NeuralNetwork:
    """
    两层全连接神经网络:
      输入层(n_in) → 隐藏层(n_hidden, ReLU) → 输出层(n_out, Softmax)
    """

    def __init__(self, n_in: int, n_hidden: int, n_out: int,
                 lr: float = 0.01, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        # He 初始化(适合 ReLU)
        self.W1: np.ndarray = rng.randn(n_in, n_hidden) * np.sqrt(2.0 / n_in)
        self.b1: np.ndarray = np.zeros((1, n_hidden))
        self.W2: np.ndarray = rng.randn(n_hidden, n_out) * np.sqrt(2.0 / n_hidden)
        self.b2: np.ndarray = np.zeros((1, n_out))
        self.lr = lr
        # 缓存前向传播中间值(反向传播需要)
        self._cache: dict = {}

    def forward(self, X: np.ndarray) -> np.ndarray:
        Z1 = X @ self.W1 + self.b1          # (n, hidden)
        A1 = relu(Z1)                        # (n, hidden)
        Z2 = A1 @ self.W2 + self.b2         # (n, out)
        A2 = softmax(Z2)                     # (n, out)
        self._cache = {"X": X, "Z1": Z1, "A1": A1, "Z2": Z2, "A2": A2}
        return A2

    def backward(self, y_true: np.ndarray) -> dict[str, float]:
        """反向传播,更新参数,返回各层梯度范数(用于监控)。"""
        n = y_true.shape[0]
        X, Z1, A1, A2 = (self._cache[k] for k in ("X", "Z1", "A1", "A2"))

        # 输出层梯度(softmax + cross-entropy 联合求导,结果简洁)
        dZ2 = (A2 - y_true) / n             # (n, out)
        dW2 = A1.T @ dZ2                    # (hidden, out)
        db2 = dZ2.sum(axis=0, keepdims=True)

        # 隐藏层梯度(链式法则)
        dA1 = dZ2 @ self.W2.T               # (n, hidden)
        dZ1 = dA1 * relu_grad(Z1)           # (n, hidden)
        dW1 = X.T @ dZ1                     # (in, hidden)
        db1 = dZ1.sum(axis=0, keepdims=True)

        # 梯度下降更新
        self.W2 -= self.lr * dW2
        self.b2 -= self.lr * db2
        self.W1 -= self.lr * dW1
        self.b1 -= self.lr * db1

        return {
            "grad_W1": float(np.linalg.norm(dW1)),
            "grad_W2": float(np.linalg.norm(dW2)),
        }

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.forward(X).argmax(axis=1)

    def fit(self, X: np.ndarray, y_onehot: np.ndarray,
            epochs: int = 200, batch_size: int = 64,
            verbose: bool = True) -> list[float]:
        """Mini-batch 梯度下降训练。"""
        n = X.shape[0]
        loss_history: list[float] = []
        for epoch in range(1, epochs + 1):
            # shuffle
            idx = np.random.permutation(n)
            X_s, y_s = X[idx], y_onehot[idx]
            epoch_loss = 0.0
            for start in range(0, n, batch_size):
                Xb = X_s[start:start + batch_size]
                yb = y_s[start:start + batch_size]
                pred = self.forward(Xb)
                epoch_loss += cross_entropy(pred, yb) * len(Xb)
                self.backward(yb)
            epoch_loss /= n
            loss_history.append(epoch_loss)
            if verbose and epoch % 50 == 0:
                print(f"    epoch {epoch:>4}  loss={epoch_loss:.4f}")
        return loss_history

def make_data(n_classes: int = 3) -> tuple:
    X, y = make_classification(
        n_samples=800, n_features=10, n_informative=6,
        n_classes=n_classes, n_clusters_per_class=1, random_state=42,
    )
    scaler = StandardScaler()
    X = scaler.fit_transform(X)
    X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)
    # one-hot 编码
    def onehot(labels: np.ndarray, n: int) -> np.ndarray:
        oh = np.zeros((len(labels), n))
        oh[np.arange(len(labels)), labels] = 1
        return oh
    return X_tr, X_te, y_tr, y_te, onehot(y_tr, n_classes), onehot(y_te, n_classes)

def mode_train() -> None:
    print("\n" + "="*60 + "\n  完整训练循环(Mini-batch 梯度下降)\n" + "="*60)
    X_tr, X_te, y_tr, y_te, y_tr_oh, _ = make_data(n_classes=3)

    nn = NeuralNetwork(n_in=10, n_hidden=32, n_out=3, lr=0.05)
    print(f"\n  网络结构: 10 → 32(ReLU) → 3(Softmax)")
    print(f"  训练集: {len(X_tr)} 样本  测试集: {len(X_te)} 样本\n")

    loss_history = nn.fit(X_tr, y_tr_oh, epochs=200, batch_size=64)

    # ASCII 损失曲线
    print("\n  训练损失曲线(每50轮采样)")
    sampled = loss_history[::50] + [loss_history[-1]]
    max_loss = max(sampled)
    W = 40
    for i, loss in enumerate(sampled):
        bar = "█" * int(loss / max_loss * W)
        epoch = min(i * 50, 200)
        print(f"  epoch {epoch:>3}{bar:<{W}}{loss:.4f}")

    test_acc = accuracy_score(y_te, nn.predict(X_te))
    print(f"\n  最终测试准确率: {test_acc:.4f}")

mode_train()

Step 8:用 mode_compare 对照 sklearn,验证手写实现是否靠谱

痛点与机制

手写网络不是为了取代 sklearn,而是为了看懂 sklearn 背后在做什么。mode_compare() 用同一份数据分别训练 numpy 手写 NN 和 MLPClassifier,如果准确率接近,说明我们的前向传播、反向传播和训练循环基本可信。

核心源码(逐字来自文末完整源码)

def mode_compare() -> None:
    print("\n" + "="*60 + "\n  手写 NN vs sklearn MLPClassifier\n" + "="*60)
    X_tr, X_te, y_tr, y_te, y_tr_oh, _ = make_data(n_classes=3)

    import time
    rows = []

    # 手写 NN
    t0 = time.perf_counter()
    nn = NeuralNetwork(n_in=10, n_hidden=32, n_out=3, lr=0.05)
    nn.fit(X_tr, y_tr_oh, epochs=200, batch_size=64, verbose=False)
    t1 = time.perf_counter()
    acc1 = accuracy_score(y_te, nn.predict(X_te))
    rows.append(["手写 NN (numpy)", "10→32→3", f"{acc1:.4f}", f"{(t1-t0)*1000:.0f}ms"])

    # sklearn MLP
    t0 = time.perf_counter()
    mlp = MLPClassifier(hidden_layer_sizes=(32,), max_iter=200,
                        learning_rate_init=0.05, random_state=42)
    mlp.fit(X_tr, y_tr)
    t1 = time.perf_counter()
    acc2 = accuracy_score(y_te, mlp.predict(X_te))
    rows.append(["sklearn MLP", "10→32→3", f"{acc2:.4f}", f"{(t1-t0)*1000:.0f}ms"])

    print(f"\n{'─'*62}")
    print(f"  {'实现':<20} {'结构':<12} {'测试准确率':<12} {'训练耗时'}")
    print(f"{'─'*62}")
    for row in rows:
        print(f"  {row[0]:<20} {row[1]:<12} {row[2]:<12} {row[3]}")
    print(f"{'─'*62}")
    print("\n  💡 结果相近说明手写实现正确;sklearn 更快因为有 BLAS 优化")

可运行演示(补齐 Mock 数据与 print 反馈)

from typing import Optional

import numpy as np
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler


def relu(z: np.ndarray) -> np.ndarray:
    return np.maximum(0.0, z)

def relu_grad(z: np.ndarray) -> np.ndarray:
    """ReLU 的导数:z>0 时为1,否则为0。"""
    return (z > 0).astype(float)

def sigmoid(z: np.ndarray) -> np.ndarray:
    return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))

def softmax(z: np.ndarray) -> np.ndarray:
    e = np.exp(z - z.max(axis=1, keepdims=True))   # 数值稳定
    return e / e.sum(axis=1, keepdims=True)

def cross_entropy(y_pred: np.ndarray, y_true: np.ndarray) -> float:
    """多分类交叉熵损失(y_true 为 one-hot)。"""
    n = y_true.shape[0]
    return -np.sum(y_true * np.log(y_pred + 1e-9)) / n

class NeuralNetwork:
    """
    两层全连接神经网络:
      输入层(n_in) → 隐藏层(n_hidden, ReLU) → 输出层(n_out, Softmax)
    """

    def __init__(self, n_in: int, n_hidden: int, n_out: int,
                 lr: float = 0.01, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        # He 初始化(适合 ReLU)
        self.W1: np.ndarray = rng.randn(n_in, n_hidden) * np.sqrt(2.0 / n_in)
        self.b1: np.ndarray = np.zeros((1, n_hidden))
        self.W2: np.ndarray = rng.randn(n_hidden, n_out) * np.sqrt(2.0 / n_hidden)
        self.b2: np.ndarray = np.zeros((1, n_out))
        self.lr = lr
        # 缓存前向传播中间值(反向传播需要)
        self._cache: dict = {}

    def forward(self, X: np.ndarray) -> np.ndarray:
        Z1 = X @ self.W1 + self.b1          # (n, hidden)
        A1 = relu(Z1)                        # (n, hidden)
        Z2 = A1 @ self.W2 + self.b2         # (n, out)
        A2 = softmax(Z2)                     # (n, out)
        self._cache = {"X": X, "Z1": Z1, "A1": A1, "Z2": Z2, "A2": A2}
        return A2

    def backward(self, y_true: np.ndarray) -> dict[str, float]:
        """反向传播,更新参数,返回各层梯度范数(用于监控)。"""
        n = y_true.shape[0]
        X, Z1, A1, A2 = (self._cache[k] for k in ("X", "Z1", "A1", "A2"))

        # 输出层梯度(softmax + cross-entropy 联合求导,结果简洁)
        dZ2 = (A2 - y_true) / n             # (n, out)
        dW2 = A1.T @ dZ2                    # (hidden, out)
        db2 = dZ2.sum(axis=0, keepdims=True)

        # 隐藏层梯度(链式法则)
        dA1 = dZ2 @ self.W2.T               # (n, hidden)
        dZ1 = dA1 * relu_grad(Z1)           # (n, hidden)
        dW1 = X.T @ dZ1                     # (in, hidden)
        db1 = dZ1.sum(axis=0, keepdims=True)

        # 梯度下降更新
        self.W2 -= self.lr * dW2
        self.b2 -= self.lr * db2
        self.W1 -= self.lr * dW1
        self.b1 -= self.lr * db1

        return {
            "grad_W1": float(np.linalg.norm(dW1)),
            "grad_W2": float(np.linalg.norm(dW2)),
        }

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.forward(X).argmax(axis=1)

    def fit(self, X: np.ndarray, y_onehot: np.ndarray,
            epochs: int = 200, batch_size: int = 64,
            verbose: bool = True) -> list[float]:
        """Mini-batch 梯度下降训练。"""
        n = X.shape[0]
        loss_history: list[float] = []
        for epoch in range(1, epochs + 1):
            # shuffle
            idx = np.random.permutation(n)
            X_s, y_s = X[idx], y_onehot[idx]
            epoch_loss = 0.0
            for start in range(0, n, batch_size):
                Xb = X_s[start:start + batch_size]
                yb = y_s[start:start + batch_size]
                pred = self.forward(Xb)
                epoch_loss += cross_entropy(pred, yb) * len(Xb)
                self.backward(yb)
            epoch_loss /= n
            loss_history.append(epoch_loss)
            if verbose and epoch % 50 == 0:
                print(f"    epoch {epoch:>4}  loss={epoch_loss:.4f}")
        return loss_history

def make_data(n_classes: int = 3) -> tuple:
    X, y = make_classification(
        n_samples=800, n_features=10, n_informative=6,
        n_classes=n_classes, n_clusters_per_class=1, random_state=42,
    )
    scaler = StandardScaler()
    X = scaler.fit_transform(X)
    X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)
    # one-hot 编码
    def onehot(labels: np.ndarray, n: int) -> np.ndarray:
        oh = np.zeros((len(labels), n))
        oh[np.arange(len(labels)), labels] = 1
        return oh
    return X_tr, X_te, y_tr, y_te, onehot(y_tr, n_classes), onehot(y_te, n_classes)

def mode_compare() -> None:
    print("\n" + "="*60 + "\n  手写 NN vs sklearn MLPClassifier\n" + "="*60)
    X_tr, X_te, y_tr, y_te, y_tr_oh, _ = make_data(n_classes=3)

    import time
    rows = []

    # 手写 NN
    t0 = time.perf_counter()
    nn = NeuralNetwork(n_in=10, n_hidden=32, n_out=3, lr=0.05)
    nn.fit(X_tr, y_tr_oh, epochs=200, batch_size=64, verbose=False)
    t1 = time.perf_counter()
    acc1 = accuracy_score(y_te, nn.predict(X_te))
    rows.append(["手写 NN (numpy)", "10→32→3", f"{acc1:.4f}", f"{(t1-t0)*1000:.0f}ms"])

    # sklearn MLP
    t0 = time.perf_counter()
    mlp = MLPClassifier(hidden_layer_sizes=(32,), max_iter=200,
                        learning_rate_init=0.05, random_state=42)
    mlp.fit(X_tr, y_tr)
    t1 = time.perf_counter()
    acc2 = accuracy_score(y_te, mlp.predict(X_te))
    rows.append(["sklearn MLP", "10→32→3", f"{acc2:.4f}", f"{(t1-t0)*1000:.0f}ms"])

    print(f"\n{'─'*62}")
    print(f"  {'实现':<20} {'结构':<12} {'测试准确率':<12} {'训练耗时'}")
    print(f"{'─'*62}")
    for row in rows:
        print(f"  {row[0]:<20} {row[1]:<12} {row[2]:<12} {row[3]}")
    print(f"{'─'*62}")
    print("\n  💡 结果相近说明手写实现正确;sklearn 更快因为有 BLAS 优化")

mode_compare()

极客实战:完整源码与运行

现在,把上面的积木拼起来,将下面完整代码保存为 48-python-nn-from-scratch.py。它会生成一份内置分类数据集,完成前向传播、训练循环和 sklearn 对照实验。

#!/usr/bin/env python3
"""
48-python-nn-from-scratch.py — 纯 numpy 手写神经网络

用法:
  python3 48-python-nn-from-scratch.py --mode forward   # 前向传播演示
  python3 48-python-nn-from-scratch.py --mode train     # 完整训练循环
  python3 48-python-nn-from-scratch.py --mode compare   # 与 sklearn MLP 对比
  python3 48-python-nn-from-scratch.py --mode all       # 全部(默认)

依赖 numpy + scikit-learn,直接运行。
"""

import argparse
from typing import Optional

import numpy as np
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler



def relu(z: np.ndarray) -> np.ndarray:
    return np.maximum(0.0, z)

def relu_grad(z: np.ndarray) -> np.ndarray:
    """ReLU 的导数:z>0 时为1,否则为0。"""
    return (z > 0).astype(float)

def sigmoid(z: np.ndarray) -> np.ndarray:
    return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))

def softmax(z: np.ndarray) -> np.ndarray:
    e = np.exp(z - z.max(axis=1, keepdims=True))   # 数值稳定
    return e / e.sum(axis=1, keepdims=True)

def cross_entropy(y_pred: np.ndarray, y_true: np.ndarray) -> float:
    """多分类交叉熵损失(y_true 为 one-hot)。"""
    n = y_true.shape[0]
    return -np.sum(y_true * np.log(y_pred + 1e-9)) / n

# ─── 神经网络类 ────────────────────────────────────────────────────────────────

class NeuralNetwork:
    """
    两层全连接神经网络:
      输入层(n_in) → 隐藏层(n_hidden, ReLU) → 输出层(n_out, Softmax)
    """

    def __init__(self, n_in: int, n_hidden: int, n_out: int,
                 lr: float = 0.01, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        # He 初始化(适合 ReLU)
        self.W1: np.ndarray = rng.randn(n_in, n_hidden) * np.sqrt(2.0 / n_in)
        self.b1: np.ndarray = np.zeros((1, n_hidden))
        self.W2: np.ndarray = rng.randn(n_hidden, n_out) * np.sqrt(2.0 / n_hidden)
        self.b2: np.ndarray = np.zeros((1, n_out))
        self.lr = lr
        # 缓存前向传播中间值(反向传播需要)
        self._cache: dict = {}

    def forward(self, X: np.ndarray) -> np.ndarray:
        Z1 = X @ self.W1 + self.b1          # (n, hidden)
        A1 = relu(Z1)                        # (n, hidden)
        Z2 = A1 @ self.W2 + self.b2         # (n, out)
        A2 = softmax(Z2)                     # (n, out)
        self._cache = {"X": X, "Z1": Z1, "A1": A1, "Z2": Z2, "A2": A2}
        return A2

    def backward(self, y_true: np.ndarray) -> dict[str, float]:
        """反向传播,更新参数,返回各层梯度范数(用于监控)。"""
        n = y_true.shape[0]
        X, Z1, A1, A2 = (self._cache[k] for k in ("X", "Z1", "A1", "A2"))

        # 输出层梯度(softmax + cross-entropy 联合求导,结果简洁)
        dZ2 = (A2 - y_true) / n             # (n, out)
        dW2 = A1.T @ dZ2                    # (hidden, out)
        db2 = dZ2.sum(axis=0, keepdims=True)

        # 隐藏层梯度(链式法则)
        dA1 = dZ2 @ self.W2.T               # (n, hidden)
        dZ1 = dA1 * relu_grad(Z1)           # (n, hidden)
        dW1 = X.T @ dZ1                     # (in, hidden)
        db1 = dZ1.sum(axis=0, keepdims=True)

        # 梯度下降更新
        self.W2 -= self.lr * dW2
        self.b2 -= self.lr * db2
        self.W1 -= self.lr * dW1
        self.b1 -= self.lr * db1

        return {
            "grad_W1": float(np.linalg.norm(dW1)),
            "grad_W2": float(np.linalg.norm(dW2)),
        }

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.forward(X).argmax(axis=1)

    def fit(self, X: np.ndarray, y_onehot: np.ndarray,
            epochs: int = 200, batch_size: int = 64,
            verbose: bool = True) -> list[float]:
        """Mini-batch 梯度下降训练。"""
        n = X.shape[0]
        loss_history: list[float] = []
        for epoch in range(1, epochs + 1):
            # shuffle
            idx = np.random.permutation(n)
            X_s, y_s = X[idx], y_onehot[idx]
            epoch_loss = 0.0
            for start in range(0, n, batch_size):
                Xb = X_s[start:start + batch_size]
                yb = y_s[start:start + batch_size]
                pred = self.forward(Xb)
                epoch_loss += cross_entropy(pred, yb) * len(Xb)
                self.backward(yb)
            epoch_loss /= n
            loss_history.append(epoch_loss)
            if verbose and epoch % 50 == 0:
                print(f"    epoch {epoch:>4}  loss={epoch_loss:.4f}")
        return loss_history

# ─── 数据准备 ──────────────────────────────────────────────────────────────────

def make_data(n_classes: int = 3) -> tuple:
    X, y = make_classification(
        n_samples=800, n_features=10, n_informative=6,
        n_classes=n_classes, n_clusters_per_class=1, random_state=42,
    )
    scaler = StandardScaler()
    X = scaler.fit_transform(X)
    X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)
    # one-hot 编码
    def onehot(labels: np.ndarray, n: int) -> np.ndarray:
        oh = np.zeros((len(labels), n))
        oh[np.arange(len(labels)), labels] = 1
        return oh
    return X_tr, X_te, y_tr, y_te, onehot(y_tr, n_classes), onehot(y_te, n_classes)

# ─── 模式1:前向传播演示 ───────────────────────────────────────────────────────

def mode_forward() -> None:
    print("\n" + "="*60 + "\n  前向传播逐步演示(小型网络,3个样本)\n" + "="*60)
    np.random.seed(42)
    X = np.random.randn(3, 4)   # 3个样本,4个特征
    nn = NeuralNetwork(n_in=4, n_hidden=5, n_out=3, lr=0.01)

    print(f"\n  输入 X.shape = {X.shape}")
    Z1 = X @ nn.W1 + nn.b1
    print(f"  Z1 = X·W1+b1  shape={Z1.shape}  (线性变换)")
    A1 = relu(Z1)
    print(f"  A1 = ReLU(Z1) shape={A1.shape}  (激活,负值归零)")
    Z2 = A1 @ nn.W2 + nn.b2
    print(f"  Z2 = A1·W2+b2 shape={Z2.shape}  (输出层线性变换)")
    A2 = softmax(Z2)
    print(f"  A2 = Softmax  shape={A2.shape}  (概率分布)")
    print(f"\n  预测概率(每行和为1):")
    for i, row in enumerate(A2):
        bar = "  ".join(f"类{j}:{p:.3f}" for j, p in enumerate(row))
        print(f"    样本{i}: {bar}  → 预测类{row.argmax()}")

# ─── 模式2:完整训练循环 ───────────────────────────────────────────────────────

def mode_train() -> None:
    print("\n" + "="*60 + "\n  完整训练循环(Mini-batch 梯度下降)\n" + "="*60)
    X_tr, X_te, y_tr, y_te, y_tr_oh, _ = make_data(n_classes=3)

    nn = NeuralNetwork(n_in=10, n_hidden=32, n_out=3, lr=0.05)
    print(f"\n  网络结构: 10 → 32(ReLU) → 3(Softmax)")
    print(f"  训练集: {len(X_tr)} 样本  测试集: {len(X_te)} 样本\n")

    loss_history = nn.fit(X_tr, y_tr_oh, epochs=200, batch_size=64)

    # ASCII 损失曲线
    print("\n  训练损失曲线(每50轮采样)")
    sampled = loss_history[::50] + [loss_history[-1]]
    max_loss = max(sampled)
    W = 40
    for i, loss in enumerate(sampled):
        bar = "█" * int(loss / max_loss * W)
        epoch = min(i * 50, 200)
        print(f"  epoch {epoch:>3}{bar:<{W}}{loss:.4f}")

    test_acc = accuracy_score(y_te, nn.predict(X_te))
    print(f"\n  最终测试准确率: {test_acc:.4f}")

# ─── 模式3:与 sklearn MLP 对比 ───────────────────────────────────────────────

def mode_compare() -> None:
    print("\n" + "="*60 + "\n  手写 NN vs sklearn MLPClassifier\n" + "="*60)
    X_tr, X_te, y_tr, y_te, y_tr_oh, _ = make_data(n_classes=3)

    import time
    rows = []

    # 手写 NN
    t0 = time.perf_counter()
    nn = NeuralNetwork(n_in=10, n_hidden=32, n_out=3, lr=0.05)
    nn.fit(X_tr, y_tr_oh, epochs=200, batch_size=64, verbose=False)
    t1 = time.perf_counter()
    acc1 = accuracy_score(y_te, nn.predict(X_te))
    rows.append(["手写 NN (numpy)", "10→32→3", f"{acc1:.4f}", f"{(t1-t0)*1000:.0f}ms"])

    # sklearn MLP
    t0 = time.perf_counter()
    mlp = MLPClassifier(hidden_layer_sizes=(32,), max_iter=200,
                        learning_rate_init=0.05, random_state=42)
    mlp.fit(X_tr, y_tr)
    t1 = time.perf_counter()
    acc2 = accuracy_score(y_te, mlp.predict(X_te))
    rows.append(["sklearn MLP", "10→32→3", f"{acc2:.4f}", f"{(t1-t0)*1000:.0f}ms"])

    print(f"\n{'─'*62}")
    print(f"  {'实现':<20} {'结构':<12} {'测试准确率':<12} {'训练耗时'}")
    print(f"{'─'*62}")
    for row in rows:
        print(f"  {row[0]:<20} {row[1]:<12} {row[2]:<12} {row[3]}")
    print(f"{'─'*62}")
    print("\n  💡 结果相近说明手写实现正确;sklearn 更快因为有 BLAS 优化")

# ─── 入口 ─────────────────────────────────────────────────────────────────────

def main() -> None:
    parser = argparse.ArgumentParser(description="纯 numpy 手写神经网络")
    parser.add_argument(
        "--mode",
        choices=["forward", "train", "compare", "all"],
        default="all",
    )
    args = parser.parse_args()
    dispatch = {
        "forward": mode_forward,
        "train":   mode_train,
        "compare": mode_compare,
        "all":     lambda: [mode_forward(), mode_train(), mode_compare()],
    }
    dispatch[args.mode]()


if __name__ == "__main__":
    main()
$ python 48-python-nn-from-scratch.py --mode forward
============================================================
  前向传播逐步演示(小型网络,3个样本)
============================================================

  输入 X.shape = (3, 4)
  Z1 = X·W1+b1  shape=(3, 5)  (线性变换)
  A1 = ReLU(Z1) shape=(3, 5)  (激活,负值归零)
  Z2 = A1·W2+b2 shape=(3, 3)  (输出层线性变换)
  A2 = Softmax  shape=(3, 3)  (概率分布)

  预测概率(每行和为1):
    样本0: 类0:0.249  类1:0.445  类2:0.307  → 预测类1
    样本1: 类0:0.303  类1:0.370  类2:0.326  → 预测类1
    样本2: 类0:0.105  类1:0.073  类2:0.822  → 预测类2

$ python 48-python-nn-from-scratch.py --mode train
============================================================
  完整训练循环(Mini-batch 梯度下降)
============================================================

  网络结构: 10 → 32(ReLU) → 3(Softmax)
  训练集: 640 样本  测试集: 160 样本

    epoch   50  loss=0.2480
    epoch  100  loss=0.1928
    epoch  150  loss=0.1629
    epoch  200  loss=0.1419

  训练损失曲线(每50轮采样)
  epoch   0 │████████████████████████████████████████│ 1.1359
  epoch  50 │████████                                │ 0.2465
  epoch 100 │██████                                  │ 0.1922
  epoch 150 │█████                                   │ 0.1626
  epoch 200 │████                                    │ 0.1419

  最终测试准确率: 0.9313

小结与 NexDo Time ⚡

这一篇你已经从零跑通了神经网络训练的核心闭环:激活函数负责制造非线性,Softmax 把输出变成概率,交叉熵负责衡量错误,反向传播把错误沿着网络往回传,Mini-batch 梯度下降负责一点点更新参数。

5 分钟微操挑战:把 NeuralNetwork(n_in=10, n_hidden=32, n_out=3, lr=0.05) 里的 n_hidden 分别改成 81664,运行 --mode train,观察损失曲线和测试准确率有什么变化。

Don’t wait for next time, do it in the next moment.