48 · 神经网络基础:从零手写前向传播与反向传播
🔗 知识图谱导航:阅读本文前,建议先回顾《31 · NumPy 高性能数组计算》里的矩阵乘法,以及《47 · 神经网络原理揭秘与本地 AI Agent 调度》里的神经网络直觉。本文会把这些概念落到纯 numpy 手写训练代码里。
NexDo Time · 2026-04-17 · 预计阅读 32 分钟
痛点与架构
框架能让你三行代码训练模型,但也容易把最关键的过程藏起来:前向传播到底怎么算?损失为什么能下降?反向传播怎么知道该改哪个权重?这一篇不直接调用深度学习框架,而是用 numpy 手写一个两层全连接神经网络。
你可以把整套训练流程想成“学生刷题”:前向传播是先答题,损失函数是老师批卷,反向传播是分析错因,梯度下降是订正错题。重复很多轮之后,模型就从错误里学到了参数。
数据 X -> W1/b1 -> ReLU -> W2/b2 -> Softmax -> 预测概率
                                                  │
真实标签 one-hot -> CrossEntropy <----------------┘
                         │
                     Backward -> 更新 W1/W2/b1/b2
步步为营:核心逻辑自适应拆解
这篇的代码比前一篇更接近训练框架底层,所以我们拆成 8 个小步骤。每个步骤先看文末完整源码里的真实片段,再运行一个补齐上下文的小演示。不要急着背公式,先把每一段输出看懂。
Step 1:用激活函数和交叉熵把“打分”变成“可学习的误差”
痛点与机制:
神经网络先输出一堆原始分数,这些分数本身不好解释。ReLU 像门卫,负数不让进;Softmax 像把票数换算成百分比,让每一行概率加起来等于 1;交叉熵像老师批卷,模型越自信地选错,扣分越狠。先跑这一小段,你会看到“分数 -> 概率 -> 损失”的完整链路。
核心源码(逐字来自文末完整源码):
def relu(z: np.ndarray) -> np.ndarray:
    """Apply the ReLU activation element-wise.

    Negative entries become 0.0; non-negative entries pass through unchanged.
    """
    return np.maximum(0.0, z)
def relu_grad(z: np.ndarray) -> np.ndarray:
    """Return the derivative of ReLU: 1.0 where z > 0, otherwise 0.0."""
    return (z > 0).astype(float)
def sigmoid(z: np.ndarray) -> np.ndarray:
    """Apply the logistic sigmoid 1 / (1 + exp(-z)) element-wise.

    The input is clipped to [-500, 500] before exponentiation so that
    np.exp cannot overflow for extreme values.
    """
    bounded = np.clip(z, -500, 500)
    return 1.0 / (1.0 + np.exp(-bounded))
def softmax(z: np.ndarray) -> np.ndarray:
    """Convert raw scores into probabilities along the last axis.

    Working on the last axis keeps the (n, classes) batch behaviour and also
    accepts a single 1-D score vector, which a hard-coded ``axis=1`` rejects.

    Args:
        z: Raw scores; the last axis enumerates the classes.

    Returns:
        An array of the same shape whose last axis sums to 1.
    """
    z = np.asarray(z)
    # Subtracting the per-row maximum leaves the result unchanged but keeps
    # exp() from overflowing on large scores.
    shifted = z - z.max(axis=-1, keepdims=True)
    e = np.exp(shifted)
    return e / e.sum(axis=-1, keepdims=True)
def cross_entropy(y_pred: np.ndarray, y_true: np.ndarray) -> float:
    """Mean multi-class cross-entropy loss for one-hot targets.

    Args:
        y_pred: Predicted probabilities, one row per sample.
        y_true: One-hot labels, multiplied element-wise with ``log(y_pred)``.

    Returns:
        The summed loss divided by the number of rows of ``y_true``, as a
        built-in ``float``.
    """
    n = y_true.shape[0]
    # 1e-9 keeps log() finite when a predicted probability is exactly 0.
    total = np.sum(y_true * np.log(y_pred + 1e-9))
    # Cast so callers really get the annotated built-in float rather than
    # np.float64, which shows up as "np.float64(...)" inside printed
    # containers on NumPy 2.
    return float(-total / n)
可运行演示(补齐 Mock 数据与 print 反馈):
import numpy as np
def relu(z: np.ndarray) -> np.ndarray:
return np.maximum(0.0, z)
def relu_grad(z: np.ndarray) -> np.ndarray:
"""ReLU 的导数:z>0 时为1,否则为0。"""
return (z > 0).astype(float)
def sigmoid(z: np.ndarray) -> np.ndarray:
return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))
def softmax(z: np.ndarray) -> np.ndarray:
e = np.exp(z - z.max(axis=1, keepdims=True)) # 数值稳定
return e / e.sum(axis=1, keepdims=True)
def cross_entropy(y_pred: np.ndarray, y_true: np.ndarray) -> float:
"""多分类交叉熵损失(y_true 为 one-hot)。"""
n = y_true.shape[0]
return -np.sum(y_true * np.log(y_pred + 1e-9)) / n
# Demo input: one sample with three raw class scores; the true class is index 2.
logits = np.array([[-1.0, 0.5, 2.0]])
labels = np.array([[0.0, 0.0, 1.0]])
print("原始打分 logits:", logits.tolist())
print("ReLU 后:", relu(logits).tolist())
print("Sigmoid 后:", np.round(sigmoid(logits), 4).tolist())
prob = softmax(logits)
# Sum along axis=1 so the printed value is a true per-row total; a plain
# prob.sum() only matches the label while the batch has a single row.
row_sums = np.round(prob.sum(axis=1), 4).tolist()
print("Softmax 概率:", np.round(prob, 4).tolist(), "每行和=", row_sums)
print("交叉熵损失:", round(cross_entropy(prob, labels), 4))
Step 2:用初始化和 forward 看懂参数矩阵怎么接线
痛点与机制:
网络结构不是魔法,而是一组矩阵接线。W1 负责把 4 个输入特征接到 5 个隐藏神经元,W2 再把隐藏层接到 3 个类别。可以把每个权重矩阵想成配电箱,输入电流经过不同线路,最后在输出层形成各类别概率。
核心源码(逐字来自文末完整源码):
def __init__(self, n_in: int, n_hidden: int, n_out: int,
             lr: float = 0.01, seed: int = 42) -> None:
    """Create the parameters of the two-layer network.

    Args:
        n_in: Number of input features.
        n_hidden: Number of hidden units.
        n_out: Number of output classes.
        lr: Learning rate, stored as ``self.lr``.
        seed: Seed of the RandomState that draws the initial weights.
    """
    rng = np.random.RandomState(seed)
    # He initialization (std = sqrt(2 / fan_in)), suited to ReLU.
    self.W1: np.ndarray = rng.randn(n_in, n_hidden) * np.sqrt(2.0 / n_in)
    self.b1: np.ndarray = np.zeros((1, n_hidden))
    self.W2: np.ndarray = rng.randn(n_hidden, n_out) * np.sqrt(2.0 / n_hidden)
    self.b2: np.ndarray = np.zeros((1, n_out))
    self.lr = lr
    # Intermediate values of the forward pass (needed by backpropagation).
    self._cache: dict = {}
def forward(self, X: np.ndarray) -> np.ndarray:
    """Run one forward pass and cache the intermediates.

    Args:
        X: Input batch of shape (n, n_in).

    Returns:
        Softmax class probabilities of shape (n, n_out).
    """
    Z1 = X @ self.W1 + self.b1    # (n, hidden) pre-activation
    A1 = relu(Z1)                 # (n, hidden)
    Z2 = A1 @ self.W2 + self.b2   # (n, out) raw class scores
    A2 = softmax(Z2)              # (n, out)
    # Stored on the instance so a later backward pass can reuse them.
    self._cache = {"X": X, "Z1": Z1, "A1": A1, "Z2": Z2, "A2": A2}
    return A2
可运行演示(补齐 Mock 数据与 print 反馈):
from typing import Optional
import numpy as np
def relu(z: np.ndarray) -> np.ndarray:
return np.maximum(0.0, z)
def relu_grad(z: np.ndarray) -> np.ndarray:
"""ReLU 的导数:z>0 时为1,否则为0。"""
return (z > 0).astype(float)
def sigmoid(z: np.ndarray) -> np.ndarray:
return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))
def softmax(z: np.ndarray) -> np.ndarray:
e = np.exp(z - z.max(axis=1, keepdims=True)) # 数值稳定
return e / e.sum(axis=1, keepdims=True)
def cross_entropy(y_pred: np.ndarray, y_true: np.ndarray) -> float:
"""多分类交叉熵损失(y_true 为 one-hot)。"""
n = y_true.shape[0]
return -np.sum(y_true * np.log(y_pred + 1e-9)) / n
class NeuralNetwork:
    """Reduced network with only initialization and the forward pass,
    used to inspect the parameter shapes first."""

    def __init__(self, n_in: int, n_hidden: int, n_out: int,
                 lr: float = 0.01, seed: int = 42) -> None:
        """Create the weights and biases.

        Args:
            n_in: Number of input features.
            n_hidden: Number of hidden units.
            n_out: Number of output classes.
            lr: Learning rate, stored as ``self.lr``.
            seed: Seed of the RandomState that draws the initial weights.
        """
        rng = np.random.RandomState(seed)
        # He initialization (std = sqrt(2 / fan_in)), suited to ReLU.
        self.W1: np.ndarray = rng.randn(n_in, n_hidden) * np.sqrt(2.0 / n_in)
        self.b1: np.ndarray = np.zeros((1, n_hidden))
        self.W2: np.ndarray = rng.randn(n_hidden, n_out) * np.sqrt(2.0 / n_hidden)
        self.b2: np.ndarray = np.zeros((1, n_out))
        self.lr = lr
        # Intermediate values of the forward pass (needed by backpropagation).
        self._cache: dict = {}

    def forward(self, X: np.ndarray) -> np.ndarray:
        """Return softmax probabilities of shape (n, n_out) for a batch X."""
        Z1 = X @ self.W1 + self.b1    # (n, hidden)
        A1 = relu(Z1)                 # (n, hidden)
        Z2 = A1 @ self.W2 + self.b2   # (n, out)
        A2 = softmax(Z2)              # (n, out)
        self._cache = {"X": X, "Z1": Z1, "A1": A1, "Z2": Z2, "A2": A2}
        return A2
# One sample with four features, pushed through a freshly initialized
# 4 -> 5 -> 3 network.
sample = np.array([[0.2, -0.4, 1.0, 0.7]])
net = NeuralNetwork(n_in=4, n_hidden=5, n_out=3, lr=0.01)
probabilities = net.forward(sample)
rounded_probabilities = np.round(probabilities, 4).tolist()
predicted_class = int(probabilities.argmax(axis=1)[0])
print("W1 形状:", net.W1.shape, "负责 4 个输入 -> 5 个隐藏神经元")
print("W2 形状:", net.W2.shape, "负责 5 个隐藏神经元 -> 3 个类别")
print("输出概率:", rounded_probabilities)
print("预测类别:", predicted_class)
Step 3:用 backward 把错误沿链式法则往回传
痛点与机制:
反向传播像复盘错题:先看最终答案错了多少,再倒推是输出层权重错得多,还是隐藏层特征提取错得多。dZ2 -> dW2 -> dA1 -> dZ1 -> dW1 就是这条追责链。运行演示会打印更新前后损失和梯度范数,帮助你确认参数真的被调整了。
核心源码(逐字来自文末完整源码):
def backward(self, y_true: np.ndarray) -> dict[str, float]:
    """Backpropagate, update the parameters and report gradient norms.

    Uses the values cached by the most recent forward pass.

    Args:
        y_true: One-hot labels for the cached batch.

    Returns:
        Norms of the two weight gradients under the keys ``"grad_W1"`` and
        ``"grad_W2"`` (for monitoring).
    """
    n = y_true.shape[0]
    X, Z1, A1, A2 = (self._cache[k] for k in ("X", "Z1", "A1", "A2"))

    # Output layer: softmax and cross-entropy differentiate jointly to
    # (A2 - y_true); dividing by n averages over the batch.
    dZ2 = (A2 - y_true) / n                 # (n, out)
    dW2 = A1.T @ dZ2                        # (hidden, out)
    db2 = dZ2.sum(axis=0, keepdims=True)

    # Hidden layer (chain rule). dA1 is computed with the W2 values from
    # before the update below.
    dA1 = dZ2 @ self.W2.T                   # (n, hidden)
    dZ1 = dA1 * relu_grad(Z1)               # (n, hidden)
    dW1 = X.T @ dZ1                         # (in, hidden)
    db1 = dZ1.sum(axis=0, keepdims=True)

    # Gradient-descent update.
    self.W2 -= self.lr * dW2
    self.b2 -= self.lr * db2
    self.W1 -= self.lr * dW1
    self.b1 -= self.lr * db1

    return {
        "grad_W1": float(np.linalg.norm(dW1)),
        "grad_W2": float(np.linalg.norm(dW2)),
    }
可运行演示(补齐 Mock 数据与 print 反馈):
from typing import Optional
import numpy as np
def relu(z: np.ndarray) -> np.ndarray:
return np.maximum(0.0, z)
def relu_grad(z: np.ndarray) -> np.ndarray:
"""ReLU 的导数:z>0 时为1,否则为0。"""
return (z > 0).astype(float)
def sigmoid(z: np.ndarray) -> np.ndarray:
return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))
def softmax(z: np.ndarray) -> np.ndarray:
e = np.exp(z - z.max(axis=1, keepdims=True)) # 数值稳定
return e / e.sum(axis=1, keepdims=True)
def cross_entropy(y_pred: np.ndarray, y_true: np.ndarray) -> float:
"""多分类交叉熵损失(y_true 为 one-hot)。"""
n = y_true.shape[0]
return -np.sum(y_true * np.log(y_pred + 1e-9)) / n
class NeuralNetwork:
"""
两层全连接神经网络:
输入层(n_in) → 隐藏层(n_hidden, ReLU) → 输出层(n_out, Softmax)
"""
def __init__(self, n_in: int, n_hidden: int, n_out: int,
lr: float = 0.01, seed: int = 42) -> None:
rng = np.random.RandomState(seed)
# He 初始化(适合 ReLU)
self.W1: np.ndarray = rng.randn(n_in, n_hidden) * np.sqrt(2.0 / n_in)
self.b1: np.ndarray = np.zeros((1, n_hidden))
self.W2: np.ndarray = rng.randn(n_hidden, n_out) * np.sqrt(2.0 / n_hidden)
self.b2: np.ndarray = np.zeros((1, n_out))
self.lr = lr
# 缓存前向传播中间值(反向传播需要)
self._cache: dict = {}
def forward(self, X: np.ndarray) -> np.ndarray:
Z1 = X @ self.W1 + self.b1 # (n, hidden)
A1 = relu(Z1) # (n, hidden)
Z2 = A1 @ self.W2 + self.b2 # (n, out)
A2 = softmax(Z2) # (n, out)
self._cache = {"X": X, "Z1": Z1, "A1": A1, "Z2": Z2, "A2": A2}
return A2
def backward(self, y_true: np.ndarray) -> dict[str, float]:
"""反向传播,更新参数,返回各层梯度范数(用于监控)。"""
n = y_true.shape[0]
X, Z1, A1, A2 = (self._cache[k] for k in ("X", "Z1", "A1", "A2"))
# 输出层梯度(softmax + cross-entropy 联合求导,结果简洁)
dZ2 = (A2 - y_true) / n # (n, out)
dW2 = A1.T @ dZ2 # (hidden, out)
db2 = dZ2.sum(axis=0, keepdims=True)
# 隐藏层梯度(链式法则)
dA1 = dZ2 @ self.W2.T # (n, hidden)
dZ1 = dA1 * relu_grad(Z1) # (n, hidden)
dW1 = X.T @ dZ1 # (in, hidden)
db1 = dZ1.sum(axis=0, keepdims=True)
# 梯度下降更新
self.W2 -= self.lr * dW2
self.b2 -= self.lr * db2
self.W1 -= self.lr * dW1
self.b1 -= self.lr * db1
return {
"grad_W1": float(np.linalg.norm(dW1)),
"grad_W2": float(np.linalg.norm(dW2)),
}
def predict(self, X: np.ndarray) -> np.ndarray:
return self.forward(X).argmax(axis=1)
def fit(self, X: np.ndarray, y_onehot: np.ndarray,
epochs: int = 200, batch_size: int = 64,
verbose: bool = True) -> list[float]:
"""Mini-batch 梯度下降训练。"""
n = X.shape[0]
loss_history: list[float] = []
for epoch in range(1, epochs + 1):
# shuffle
idx = np.random.permutation(n)
X_s, y_s = X[idx], y_onehot[idx]
epoch_loss = 0.0
for start in range(0, n, batch_size):
Xb = X_s[start:start + batch_size]
yb = y_s[start:start + batch_size]
pred = self.forward(Xb)
epoch_loss += cross_entropy(pred, yb) * len(Xb)
self.backward(yb)
epoch_loss /= n
loss_history.append(epoch_loss)
if verbose and epoch % 50 == 0:
print(f" epoch {epoch:>4} loss={epoch_loss:.4f}")
return loss_history
# Two samples with different target classes: enough to watch one update step.
features = np.array([[0.2, -0.4, 1.0, 0.7], [1.2, 0.3, -0.8, 0.5]])
targets = np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]])
net = NeuralNetwork(n_in=4, n_hidden=5, n_out=3, lr=0.05)
# forward() runs right before backward() because backward reads the values
# that forward cached.
loss_before = cross_entropy(net.forward(features), targets)
grad_norms = net.backward(targets)
loss_after = cross_entropy(net.forward(features), targets)
rounded_norms = {name: round(value, 4) for name, value in grad_norms.items()}
print("更新前损失:", round(loss_before, 4))
print("梯度范数:", rounded_norms)
print("更新后损失:", round(loss_after, 4))
print("直觉:反向传播像把错题原因一层层往回传,再微调每个旋钮。")
Step 4:用 fit 组织 Mini-batch 训练循环
痛点与机制:
单次反向传播只改一步,真正训练要重复很多轮。fit() 像健身计划:每轮先打乱样本,再分小批次训练,最后记录损失。Mini-batch 的好处是不用一次搬完整个数据集,也不会像单样本训练那样抖得太厉害。
核心源码(逐字来自文末完整源码):
def fit(self, X: np.ndarray, y_onehot: np.ndarray,
        epochs: int = 200, batch_size: int = 64,
        verbose: bool = True,
        rng: "np.random.RandomState | None" = None) -> list[float]:
    """Train with mini-batch gradient descent.

    Args:
        X: Training features, one row per sample.
        y_onehot: One-hot labels aligned row by row with ``X``.
        epochs: Number of passes over the data.
        batch_size: Number of samples per mini-batch.
        verbose: When true, print the loss every 50 epochs.
        rng: Optional generator used for the per-epoch shuffle. Pass a
            seeded ``np.random.RandomState`` for repeatable training; when
            omitted, NumPy's global generator is used.

    Returns:
        The sample-weighted mean loss of each epoch, as built-in floats.
    """
    n = X.shape[0]
    shuffler = np.random if rng is None else rng
    loss_history: list[float] = []
    for epoch in range(1, epochs + 1):
        # Shuffle so every epoch sees the samples in a new order.
        idx = shuffler.permutation(n)
        X_s, y_s = X[idx], y_onehot[idx]
        epoch_loss = 0.0
        for start in range(0, n, batch_size):
            Xb = X_s[start:start + batch_size]
            yb = y_s[start:start + batch_size]
            pred = self.forward(Xb)
            # Weight by the batch size: the last batch may be smaller.
            epoch_loss += cross_entropy(pred, yb) * len(Xb)
            self.backward(yb)
        epoch_loss /= n
        # Store a built-in float so the history prints cleanly on NumPy 2.
        loss_history.append(float(epoch_loss))
        if verbose and epoch % 50 == 0:
            print(f" epoch {epoch:>4} loss={epoch_loss:.4f}")
    return loss_history
可运行演示(补齐 Mock 数据与 print 反馈):
from typing import Optional
import numpy as np
def relu(z: np.ndarray) -> np.ndarray:
return np.maximum(0.0, z)
def relu_grad(z: np.ndarray) -> np.ndarray:
"""ReLU 的导数:z>0 时为1,否则为0。"""
return (z > 0).astype(float)
def sigmoid(z: np.ndarray) -> np.ndarray:
return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))
def softmax(z: np.ndarray) -> np.ndarray:
e = np.exp(z - z.max(axis=1, keepdims=True)) # 数值稳定
return e / e.sum(axis=1, keepdims=True)
def cross_entropy(y_pred: np.ndarray, y_true: np.ndarray) -> float:
"""多分类交叉熵损失(y_true 为 one-hot)。"""
n = y_true.shape[0]
return -np.sum(y_true * np.log(y_pred + 1e-9)) / n
class NeuralNetwork:
    """
    Two-layer fully connected network:
        input (n_in) -> hidden (n_hidden, ReLU) -> output (n_out, Softmax)
    """

    def __init__(self, n_in: int, n_hidden: int, n_out: int,
                 lr: float = 0.01, seed: int = 42) -> None:
        """Create the parameters.

        Args:
            n_in: Number of input features.
            n_hidden: Number of hidden units.
            n_out: Number of output classes.
            lr: Learning rate used by ``backward``.
            seed: Seed of the RandomState that draws the initial weights and
                later shuffles the data in ``fit``.
        """
        rng = np.random.RandomState(seed)
        # He initialization (std = sqrt(2 / fan_in)), suited to ReLU.
        self.W1: np.ndarray = rng.randn(n_in, n_hidden) * np.sqrt(2.0 / n_in)
        self.b1: np.ndarray = np.zeros((1, n_hidden))
        self.W2: np.ndarray = rng.randn(n_hidden, n_out) * np.sqrt(2.0 / n_hidden)
        self.b2: np.ndarray = np.zeros((1, n_out))
        self.lr = lr
        # Intermediate values of the forward pass (needed by backpropagation).
        self._cache: dict = {}
        # Keep the seeded generator so fit() shuffles reproducibly instead of
        # depending on NumPy's global random state.
        self._rng = rng

    def forward(self, X: np.ndarray) -> np.ndarray:
        """Return softmax probabilities (n, n_out) and cache the intermediates."""
        Z1 = X @ self.W1 + self.b1    # (n, hidden)
        A1 = relu(Z1)                 # (n, hidden)
        Z2 = A1 @ self.W2 + self.b2   # (n, out)
        A2 = softmax(Z2)              # (n, out)
        self._cache = {"X": X, "Z1": Z1, "A1": A1, "Z2": Z2, "A2": A2}
        return A2

    def backward(self, y_true: np.ndarray) -> dict[str, float]:
        """Backpropagate, update the parameters, return gradient norms.

        Must follow a ``forward`` call on the batch that ``y_true`` labels,
        because it reads the values cached there.
        """
        n = y_true.shape[0]
        X, Z1, A1, A2 = (self._cache[k] for k in ("X", "Z1", "A1", "A2"))

        # Output layer: softmax and cross-entropy differentiate jointly to
        # (A2 - y_true); dividing by n averages over the batch.
        dZ2 = (A2 - y_true) / n                 # (n, out)
        dW2 = A1.T @ dZ2                        # (hidden, out)
        db2 = dZ2.sum(axis=0, keepdims=True)

        # Hidden layer (chain rule), using W2 from before the update.
        dA1 = dZ2 @ self.W2.T                   # (n, hidden)
        dZ1 = dA1 * relu_grad(Z1)               # (n, hidden)
        dW1 = X.T @ dZ1                         # (in, hidden)
        db1 = dZ1.sum(axis=0, keepdims=True)

        # Gradient-descent update.
        self.W2 -= self.lr * dW2
        self.b2 -= self.lr * db2
        self.W1 -= self.lr * dW1
        self.b1 -= self.lr * db1

        return {
            "grad_W1": float(np.linalg.norm(dW1)),
            "grad_W2": float(np.linalg.norm(dW2)),
        }

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the index of the most probable class for each row of X."""
        return self.forward(X).argmax(axis=1)

    def fit(self, X: np.ndarray, y_onehot: np.ndarray,
            epochs: int = 200, batch_size: int = 64,
            verbose: bool = True,
            rng: "np.random.RandomState | None" = None) -> list[float]:
        """Train with mini-batch gradient descent.

        Args:
            X: Training features, one row per sample.
            y_onehot: One-hot labels aligned row by row with ``X``.
            epochs: Number of passes over the data.
            batch_size: Number of samples per mini-batch.
            verbose: When true, print the loss every 50 epochs.
            rng: Optional generator for the per-epoch shuffle; defaults to
                the generator seeded in ``__init__``.

        Returns:
            The sample-weighted mean loss of each epoch, as built-in floats.
        """
        n = X.shape[0]
        shuffler = self._rng if rng is None else rng
        loss_history: list[float] = []
        for epoch in range(1, epochs + 1):
            # Shuffle so every epoch sees the samples in a new order.
            idx = shuffler.permutation(n)
            X_s, y_s = X[idx], y_onehot[idx]
            epoch_loss = 0.0
            for start in range(0, n, batch_size):
                Xb = X_s[start:start + batch_size]
                yb = y_s[start:start + batch_size]
                pred = self.forward(Xb)
                # Weight by the batch size: the last batch may be smaller.
                epoch_loss += cross_entropy(pred, yb) * len(Xb)
                self.backward(yb)
            epoch_loss /= n
            # Store a built-in float so the history prints cleanly on NumPy 2.
            loss_history.append(float(epoch_loss))
            if verbose and epoch % 50 == 0:
                print(f" epoch {epoch:>4} loss={epoch_loss:.4f}")
        return loss_history
# Small random dataset: 12 samples, 4 features, 3 classes.
rng = np.random.RandomState(7)
X = rng.randn(12, 4)
labels = rng.randint(0, 3, size=12)
# One-hot encode: row i gets a 1 in column labels[i].
y = np.zeros((12, 3))
y[np.arange(12), labels] = 1
model = NeuralNetwork(n_in=4, n_hidden=6, n_out=3, lr=0.05)
# fit() shuffles with NumPy's global generator, so seed it to make the
# printed loss history repeatable from run to run.
np.random.seed(7)
history = model.fit(X, y, epochs=6, batch_size=4, verbose=False)
# float() keeps the list readable on NumPy 2, where NumPy scalars inside a
# list print as "np.float64(...)".
print("损失历史:", [round(float(v), 4) for v in history])
print("预测类别:", model.predict(X[:5]).tolist())
print("真实类别:", labels[:5].tolist())
Step 5:用 make_data 生成标准化训练集和 one-hot 标签
痛点与机制:
训练网络之前,数据要先整理成模型爱吃的形状。StandardScaler 像把不同量纲的食材切成差不多大小,one-hot 编码像答题卡:第 2 类就把第 2 个格子涂黑。这样交叉熵才能准确比较预测概率和真实答案。
核心源码(逐字来自文末完整源码):
def make_data(n_classes: int = 3) -> tuple:
    """Build a standardized synthetic classification dataset.

    Generates 800 samples with 10 features, holds out 20% as a test set and
    standardizes both splits with statistics learned from the training split.

    Args:
        n_classes: Number of target classes.

    Returns:
        ``(X_tr, X_te, y_tr, y_te, y_tr_onehot, y_te_onehot)``.
    """
    X, y = make_classification(
        n_samples=800, n_features=10, n_informative=6,
        n_classes=n_classes, n_clusters_per_class=1, random_state=42,
    )
    X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)

    # Fit the scaler on the training rows only and reuse it for the test
    # rows; fitting on the full dataset would leak test-set statistics into
    # training.
    scaler = StandardScaler()
    X_tr = scaler.fit_transform(X_tr)
    X_te = scaler.transform(X_te)

    def onehot(labels: np.ndarray, n: int) -> np.ndarray:
        """Return a (len(labels), n) matrix with a single 1 per row."""
        oh = np.zeros((len(labels), n))
        oh[np.arange(len(labels)), labels] = 1
        return oh

    return X_tr, X_te, y_tr, y_te, onehot(y_tr, n_classes), onehot(y_te, n_classes)
可运行演示(补齐 Mock 数据与 print 反馈):
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
def make_data(n_classes: int = 3) -> tuple:
X, y = make_classification(
n_samples=800, n_features=10, n_informative=6,
n_classes=n_classes, n_clusters_per_class=1, random_state=42,
)
scaler = StandardScaler()
X = scaler.fit_transform(X)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)
# one-hot 编码
def onehot(labels: np.ndarray, n: int) -> np.ndarray:
oh = np.zeros((len(labels), n))
oh[np.arange(len(labels)), labels] = 1
return oh
return X_tr, X_te, y_tr, y_te, onehot(y_tr, n_classes), onehot(y_te, n_classes)
# Build the dataset once, then show its shapes and both label encodings.
train_X, test_X, train_y, test_y, train_onehot, test_onehot = make_data(n_classes=3)
first_labels = train_y[:8].tolist()
first_onehot_rows = train_onehot[:2].astype(int).tolist()
print("训练特征形状:", train_X.shape)
print("测试特征形状:", test_X.shape)
print("原始标签前 8 个:", first_labels)
print("one-hot 标签前 2 行:", first_onehot_rows)
print("说明:one-hot 就像考试答题卡,每个类别只有一个格子被涂黑。")
Step 6:用 mode_forward 逐层打印前向传播的形状
痛点与机制:
新手调神经网络最怕形状对不上。mode_forward() 故意只用 3 个样本,从 X、Z1、A1、Z2、A2 一路打印 shape,就像给流水线每一站装监控摄像头。只要形状能对上,后面的损失和反向传播才有基础。
核心源码(逐字来自文末完整源码):
def mode_forward() -> None:
    """Walk through a forward pass step by step and print every shape.

    Uses three random samples with four features and a 4 -> 5 -> 3 network,
    repeating by hand the same steps that ``NeuralNetwork.forward`` performs.
    """
    print("\n" + "="*60 + "\n 前向传播逐步演示(小型网络,3个样本)\n" + "="*60)
    np.random.seed(42)
    X = np.random.randn(3, 4)  # 3 samples, 4 features
    nn = NeuralNetwork(n_in=4, n_hidden=5, n_out=3, lr=0.01)
    print(f"\n 输入 X.shape = {X.shape}")
    Z1 = X @ nn.W1 + nn.b1
    print(f" Z1 = X·W1+b1 shape={Z1.shape} (线性变换)")
    A1 = relu(Z1)
    print(f" A1 = ReLU(Z1) shape={A1.shape} (激活,负值归零)")
    Z2 = A1 @ nn.W2 + nn.b2
    print(f" Z2 = A1·W2+b2 shape={Z2.shape} (输出层线性变换)")
    A2 = softmax(Z2)
    print(f" A2 = Softmax shape={A2.shape} (概率分布)")
    print(f"\n 预测概率(每行和为1):")
    # One line per sample: every class probability plus the arg-max class.
    for i, row in enumerate(A2):
        bar = " ".join(f"类{j}:{p:.3f}" for j, p in enumerate(row))
        print(f" 样本{i}: {bar} → 预测类{row.argmax()}")
可运行演示(补齐 Mock 数据与 print 反馈):
from typing import Optional
import numpy as np
def relu(z: np.ndarray) -> np.ndarray:
return np.maximum(0.0, z)
def relu_grad(z: np.ndarray) -> np.ndarray:
"""ReLU 的导数:z>0 时为1,否则为0。"""
return (z > 0).astype(float)
def sigmoid(z: np.ndarray) -> np.ndarray:
return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))
def softmax(z: np.ndarray) -> np.ndarray:
e = np.exp(z - z.max(axis=1, keepdims=True)) # 数值稳定
return e / e.sum(axis=1, keepdims=True)
def cross_entropy(y_pred: np.ndarray, y_true: np.ndarray) -> float:
"""多分类交叉熵损失(y_true 为 one-hot)。"""
n = y_true.shape[0]
return -np.sum(y_true * np.log(y_pred + 1e-9)) / n
class NeuralNetwork:
"""
两层全连接神经网络:
输入层(n_in) → 隐藏层(n_hidden, ReLU) → 输出层(n_out, Softmax)
"""
def __init__(self, n_in: int, n_hidden: int, n_out: int,
lr: float = 0.01, seed: int = 42) -> None:
rng = np.random.RandomState(seed)
# He 初始化(适合 ReLU)
self.W1: np.ndarray = rng.randn(n_in, n_hidden) * np.sqrt(2.0 / n_in)
self.b1: np.ndarray = np.zeros((1, n_hidden))
self.W2: np.ndarray = rng.randn(n_hidden, n_out) * np.sqrt(2.0 / n_hidden)
self.b2: np.ndarray = np.zeros((1, n_out))
self.lr = lr
# 缓存前向传播中间值(反向传播需要)
self._cache: dict = {}
def forward(self, X: np.ndarray) -> np.ndarray:
Z1 = X @ self.W1 + self.b1 # (n, hidden)
A1 = relu(Z1) # (n, hidden)
Z2 = A1 @ self.W2 + self.b2 # (n, out)
A2 = softmax(Z2) # (n, out)
self._cache = {"X": X, "Z1": Z1, "A1": A1, "Z2": Z2, "A2": A2}
return A2
def backward(self, y_true: np.ndarray) -> dict[str, float]:
"""反向传播,更新参数,返回各层梯度范数(用于监控)。"""
n = y_true.shape[0]
X, Z1, A1, A2 = (self._cache[k] for k in ("X", "Z1", "A1", "A2"))
# 输出层梯度(softmax + cross-entropy 联合求导,结果简洁)
dZ2 = (A2 - y_true) / n # (n, out)
dW2 = A1.T @ dZ2 # (hidden, out)
db2 = dZ2.sum(axis=0, keepdims=True)
# 隐藏层梯度(链式法则)
dA1 = dZ2 @ self.W2.T # (n, hidden)
dZ1 = dA1 * relu_grad(Z1) # (n, hidden)
dW1 = X.T @ dZ1 # (in, hidden)
db1 = dZ1.sum(axis=0, keepdims=True)
# 梯度下降更新
self.W2 -= self.lr * dW2
self.b2 -= self.lr * db2
self.W1 -= self.lr * dW1
self.b1 -= self.lr * db1
return {
"grad_W1": float(np.linalg.norm(dW1)),
"grad_W2": float(np.linalg.norm(dW2)),
}
def predict(self, X: np.ndarray) -> np.ndarray:
return self.forward(X).argmax(axis=1)
def fit(self, X: np.ndarray, y_onehot: np.ndarray,
epochs: int = 200, batch_size: int = 64,
verbose: bool = True) -> list[float]:
"""Mini-batch 梯度下降训练。"""
n = X.shape[0]
loss_history: list[float] = []
for epoch in range(1, epochs + 1):
# shuffle
idx = np.random.permutation(n)
X_s, y_s = X[idx], y_onehot[idx]
epoch_loss = 0.0
for start in range(0, n, batch_size):
Xb = X_s[start:start + batch_size]
yb = y_s[start:start + batch_size]
pred = self.forward(Xb)
epoch_loss += cross_entropy(pred, yb) * len(Xb)
self.backward(yb)
epoch_loss /= n
loss_history.append(epoch_loss)
if verbose and epoch % 50 == 0:
print(f" epoch {epoch:>4} loss={epoch_loss:.4f}")
return loss_history
def mode_forward() -> None:
print("\n" + "="*60 + "\n 前向传播逐步演示(小型网络,3个样本)\n" + "="*60)
np.random.seed(42)
X = np.random.randn(3, 4) # 3个样本,4个特征
nn = NeuralNetwork(n_in=4, n_hidden=5, n_out=3, lr=0.01)
print(f"\n 输入 X.shape = {X.shape}")
Z1 = X @ nn.W1 + nn.b1
print(f" Z1 = X·W1+b1 shape={Z1.shape} (线性变换)")
A1 = relu(Z1)
print(f" A1 = ReLU(Z1) shape={A1.shape} (激活,负值归零)")
Z2 = A1 @ nn.W2 + nn.b2
print(f" Z2 = A1·W2+b2 shape={Z2.shape} (输出层线性变换)")
A2 = softmax(Z2)
print(f" A2 = Softmax shape={A2.shape} (概率分布)")
print(f"\n 预测概率(每行和为1):")
for i, row in enumerate(A2):
bar = " ".join(f"类{j}:{p:.3f}" for j, p in enumerate(row))
print(f" 样本{i}: {bar} → 预测类{row.argmax()}")
mode_forward()
Step 7:用 mode_train 跑完整训练并观察损失曲线
痛点与机制:
训练不是看一次输出,而是看损失有没有持续下降。mode_train() 把数据生成、网络初始化、训练循环、ASCII 损失曲线和测试准确率串起来。曲线条越来越短,说明模型像学生刷题一样,错题越来越少。
核心源码(逐字来自文末完整源码):
def mode_train() -> None:
    """Train the hand-written network and report the results.

    Prints the training log, an ASCII bar chart of the loss at sampled
    epochs, and the accuracy on the held-out test split.
    """
    print("\n" + "="*60 + "\n 完整训练循环(Mini-batch 梯度下降)\n" + "="*60)
    X_tr, X_te, y_tr, y_te, y_tr_oh, _ = make_data(n_classes=3)
    nn = NeuralNetwork(n_in=10, n_hidden=32, n_out=3, lr=0.05)
    print(f"\n 网络结构: 10 → 32(ReLU) → 3(Softmax)")
    print(f" 训练集: {len(X_tr)} 样本 测试集: {len(X_te)} 样本\n")
    epochs = 200
    loss_history = nn.fit(X_tr, y_tr_oh, epochs=epochs, batch_size=64)

    # ASCII loss curve.
    print("\n 训练损失曲线(每50轮采样)")
    # loss_history[k] is the loss of epoch k + 1, so sample by real epoch
    # number and index with epoch - 1; this keeps each label on the value it
    # describes (epochs 1, 50, 100, 150, 200).
    sampled_epochs = [1] + list(range(50, epochs + 1, 50))
    sampled = [loss_history[e - 1] for e in sampled_epochs]
    max_loss = max(sampled)
    W = 40
    for epoch, loss in zip(sampled_epochs, sampled):
        # Guard the scaling so an all-zero loss cannot divide by zero.
        filled = int(loss / max_loss * W) if max_loss > 0 else 0
        bar = "█" * filled
        print(f" epoch {epoch:>3} │{bar:<{W}}│ {loss:.4f}")

    test_acc = accuracy_score(y_te, nn.predict(X_te))
    print(f"\n 最终测试准确率: {test_acc:.4f}")
可运行演示(补齐 Mock 数据与 print 反馈):
from typing import Optional
import numpy as np
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
def relu(z: np.ndarray) -> np.ndarray:
return np.maximum(0.0, z)
def relu_grad(z: np.ndarray) -> np.ndarray:
"""ReLU 的导数:z>0 时为1,否则为0。"""
return (z > 0).astype(float)
def sigmoid(z: np.ndarray) -> np.ndarray:
return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))
def softmax(z: np.ndarray) -> np.ndarray:
e = np.exp(z - z.max(axis=1, keepdims=True)) # 数值稳定
return e / e.sum(axis=1, keepdims=True)
def cross_entropy(y_pred: np.ndarray, y_true: np.ndarray) -> float:
"""多分类交叉熵损失(y_true 为 one-hot)。"""
n = y_true.shape[0]
return -np.sum(y_true * np.log(y_pred + 1e-9)) / n
class NeuralNetwork:
"""
两层全连接神经网络:
输入层(n_in) → 隐藏层(n_hidden, ReLU) → 输出层(n_out, Softmax)
"""
def __init__(self, n_in: int, n_hidden: int, n_out: int,
lr: float = 0.01, seed: int = 42) -> None:
rng = np.random.RandomState(seed)
# He 初始化(适合 ReLU)
self.W1: np.ndarray = rng.randn(n_in, n_hidden) * np.sqrt(2.0 / n_in)
self.b1: np.ndarray = np.zeros((1, n_hidden))
self.W2: np.ndarray = rng.randn(n_hidden, n_out) * np.sqrt(2.0 / n_hidden)
self.b2: np.ndarray = np.zeros((1, n_out))
self.lr = lr
# 缓存前向传播中间值(反向传播需要)
self._cache: dict = {}
def forward(self, X: np.ndarray) -> np.ndarray:
Z1 = X @ self.W1 + self.b1 # (n, hidden)
A1 = relu(Z1) # (n, hidden)
Z2 = A1 @ self.W2 + self.b2 # (n, out)
A2 = softmax(Z2) # (n, out)
self._cache = {"X": X, "Z1": Z1, "A1": A1, "Z2": Z2, "A2": A2}
return A2
def backward(self, y_true: np.ndarray) -> dict[str, float]:
"""反向传播,更新参数,返回各层梯度范数(用于监控)。"""
n = y_true.shape[0]
X, Z1, A1, A2 = (self._cache[k] for k in ("X", "Z1", "A1", "A2"))
# 输出层梯度(softmax + cross-entropy 联合求导,结果简洁)
dZ2 = (A2 - y_true) / n # (n, out)
dW2 = A1.T @ dZ2 # (hidden, out)
db2 = dZ2.sum(axis=0, keepdims=True)
# 隐藏层梯度(链式法则)
dA1 = dZ2 @ self.W2.T # (n, hidden)
dZ1 = dA1 * relu_grad(Z1) # (n, hidden)
dW1 = X.T @ dZ1 # (in, hidden)
db1 = dZ1.sum(axis=0, keepdims=True)
# 梯度下降更新
self.W2 -= self.lr * dW2
self.b2 -= self.lr * db2
self.W1 -= self.lr * dW1
self.b1 -= self.lr * db1
return {
"grad_W1": float(np.linalg.norm(dW1)),
"grad_W2": float(np.linalg.norm(dW2)),
}
def predict(self, X: np.ndarray) -> np.ndarray:
return self.forward(X).argmax(axis=1)
def fit(self, X: np.ndarray, y_onehot: np.ndarray,
epochs: int = 200, batch_size: int = 64,
verbose: bool = True) -> list[float]:
"""Mini-batch 梯度下降训练。"""
n = X.shape[0]
loss_history: list[float] = []
for epoch in range(1, epochs + 1):
# shuffle
idx = np.random.permutation(n)
X_s, y_s = X[idx], y_onehot[idx]
epoch_loss = 0.0
for start in range(0, n, batch_size):
Xb = X_s[start:start + batch_size]
yb = y_s[start:start + batch_size]
pred = self.forward(Xb)
epoch_loss += cross_entropy(pred, yb) * len(Xb)
self.backward(yb)
epoch_loss /= n
loss_history.append(epoch_loss)
if verbose and epoch % 50 == 0:
print(f" epoch {epoch:>4} loss={epoch_loss:.4f}")
return loss_history
def make_data(n_classes: int = 3) -> tuple:
X, y = make_classification(
n_samples=800, n_features=10, n_informative=6,
n_classes=n_classes, n_clusters_per_class=1, random_state=42,
)
scaler = StandardScaler()
X = scaler.fit_transform(X)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)
# one-hot 编码
def onehot(labels: np.ndarray, n: int) -> np.ndarray:
oh = np.zeros((len(labels), n))
oh[np.arange(len(labels)), labels] = 1
return oh
return X_tr, X_te, y_tr, y_te, onehot(y_tr, n_classes), onehot(y_te, n_classes)
def mode_train() -> None:
print("\n" + "="*60 + "\n 完整训练循环(Mini-batch 梯度下降)\n" + "="*60)
X_tr, X_te, y_tr, y_te, y_tr_oh, _ = make_data(n_classes=3)
nn = NeuralNetwork(n_in=10, n_hidden=32, n_out=3, lr=0.05)
print(f"\n 网络结构: 10 → 32(ReLU) → 3(Softmax)")
print(f" 训练集: {len(X_tr)} 样本 测试集: {len(X_te)} 样本\n")
loss_history = nn.fit(X_tr, y_tr_oh, epochs=200, batch_size=64)
# ASCII 损失曲线
print("\n 训练损失曲线(每50轮采样)")
sampled = loss_history[::50] + [loss_history[-1]]
max_loss = max(sampled)
W = 40
for i, loss in enumerate(sampled):
bar = "█" * int(loss / max_loss * W)
epoch = min(i * 50, 200)
print(f" epoch {epoch:>3} │{bar:<{W}}│ {loss:.4f}")
test_acc = accuracy_score(y_te, nn.predict(X_te))
print(f"\n 最终测试准确率: {test_acc:.4f}")
mode_train()
Step 8:用 mode_compare 对照 sklearn,验证手写实现是否靠谱
痛点与机制:
手写网络不是为了取代 sklearn,而是为了看懂 sklearn 背后在做什么。mode_compare() 用同一份数据分别训练 numpy 手写 NN 和 MLPClassifier,如果准确率接近,说明我们的前向传播、反向传播和训练循环基本可信。
核心源码(逐字来自文末完整源码):
def mode_compare() -> None:
    """Train the hand-written network and sklearn's MLPClassifier on the
    same data and print a table of test accuracy and training time."""
    print("\n" + "="*60 + "\n 手写 NN vs sklearn MLPClassifier\n" + "="*60)
    X_tr, X_te, y_tr, y_te, y_tr_oh, _ = make_data(n_classes=3)
    import time
    rows = []

    # Hand-written network (trained on one-hot labels).
    t0 = time.perf_counter()
    nn = NeuralNetwork(n_in=10, n_hidden=32, n_out=3, lr=0.05)
    nn.fit(X_tr, y_tr_oh, epochs=200, batch_size=64, verbose=False)
    t1 = time.perf_counter()
    acc1 = accuracy_score(y_te, nn.predict(X_te))
    rows.append(["手写 NN (numpy)", "10→32→3", f"{acc1:.4f}", f"{(t1-t0)*1000:.0f}ms"])

    # sklearn MLP (trained on the integer labels).
    t0 = time.perf_counter()
    mlp = MLPClassifier(hidden_layer_sizes=(32,), max_iter=200,
                        learning_rate_init=0.05, random_state=42)
    mlp.fit(X_tr, y_tr)
    t1 = time.perf_counter()
    acc2 = accuracy_score(y_te, mlp.predict(X_te))
    rows.append(["sklearn MLP", "10→32→3", f"{acc2:.4f}", f"{(t1-t0)*1000:.0f}ms"])

    # Result table: implementation, architecture, accuracy, elapsed time.
    print(f"\n{'─'*62}")
    print(f" {'实现':<20} {'结构':<12} {'测试准确率':<12} {'训练耗时'}")
    print(f"{'─'*62}")
    for row in rows:
        print(f" {row[0]:<20} {row[1]:<12} {row[2]:<12} {row[3]}")
    print(f"{'─'*62}")
    print("\n 💡 结果相近说明手写实现正确;sklearn 更快因为有 BLAS 优化")
可运行演示(补齐 Mock 数据与 print 反馈):
from typing import Optional
import numpy as np
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler
def relu(z: np.ndarray) -> np.ndarray:
return np.maximum(0.0, z)
def relu_grad(z: np.ndarray) -> np.ndarray:
"""ReLU 的导数:z>0 时为1,否则为0。"""
return (z > 0).astype(float)
def sigmoid(z: np.ndarray) -> np.ndarray:
return 1.0 / (1.0 + np.exp(-np.clip(z, -500, 500)))
def softmax(z: np.ndarray) -> np.ndarray:
e = np.exp(z - z.max(axis=1, keepdims=True)) # 数值稳定
return e / e.sum(axis=1, keepdims=True)
def cross_entropy(y_pred: np.ndarray, y_true: np.ndarray) -> float:
"""多分类交叉熵损失(y_true 为 one-hot)。"""
n = y_true.shape[0]
return -np.sum(y_true * np.log(y_pred + 1e-9)) / n
class NeuralNetwork:
"""
两层全连接神经网络:
输入层(n_in) → 隐藏层(n_hidden, ReLU) → 输出层(n_out, Softmax)
"""
def __init__(self, n_in: int, n_hidden: int, n_out: int,
lr: float = 0.01, seed: int = 42) -> None:
rng = np.random.RandomState(seed)
# He 初始化(适合 ReLU)
self.W1: np.ndarray = rng.randn(n_in, n_hidden) * np.sqrt(2.0 / n_in)
self.b1: np.ndarray = np.zeros((1, n_hidden))
self.W2: np.ndarray = rng.randn(n_hidden, n_out) * np.sqrt(2.0 / n_hidden)
self.b2: np.ndarray = np.zeros((1, n_out))
self.lr = lr
# 缓存前向传播中间值(反向传播需要)
self._cache: dict = {}
def forward(self, X: np.ndarray) -> np.ndarray:
Z1 = X @ self.W1 + self.b1 # (n, hidden)
A1 = relu(Z1) # (n, hidden)
Z2 = A1 @ self.W2 + self.b2 # (n, out)
A2 = softmax(Z2) # (n, out)
self._cache = {"X": X, "Z1": Z1, "A1": A1, "Z2": Z2, "A2": A2}
return A2
def backward(self, y_true: np.ndarray) -> dict[str, float]:
"""反向传播,更新参数,返回各层梯度范数(用于监控)。"""
n = y_true.shape[0]
X, Z1, A1, A2 = (self._cache[k] for k in ("X", "Z1", "A1", "A2"))
# 输出层梯度(softmax + cross-entropy 联合求导,结果简洁)
dZ2 = (A2 - y_true) / n # (n, out)
dW2 = A1.T @ dZ2 # (hidden, out)
db2 = dZ2.sum(axis=0, keepdims=True)
# 隐藏层梯度(链式法则)
dA1 = dZ2 @ self.W2.T # (n, hidden)
dZ1 = dA1 * relu_grad(Z1) # (n, hidden)
dW1 = X.T @ dZ1 # (in, hidden)
db1 = dZ1.sum(axis=0, keepdims=True)
# 梯度下降更新
self.W2 -= self.lr * dW2
self.b2 -= self.lr * db2
self.W1 -= self.lr * dW1
self.b1 -= self.lr * db1
return {
"grad_W1": float(np.linalg.norm(dW1)),
"grad_W2": float(np.linalg.norm(dW2)),
}
def predict(self, X: np.ndarray) -> np.ndarray:
return self.forward(X).argmax(axis=1)
def fit(self, X: np.ndarray, y_onehot: np.ndarray,
epochs: int = 200, batch_size: int = 64,
verbose: bool = True) -> list[float]:
"""Mini-batch 梯度下降训练。"""
n = X.shape[0]
loss_history: list[float] = []
for epoch in range(1, epochs + 1):
# shuffle
idx = np.random.permutation(n)
X_s, y_s = X[idx], y_onehot[idx]
epoch_loss = 0.0
for start in range(0, n, batch_size):
Xb = X_s[start:start + batch_size]
yb = y_s[start:start + batch_size]
pred = self.forward(Xb)
epoch_loss += cross_entropy(pred, yb) * len(Xb)
self.backward(yb)
epoch_loss /= n
loss_history.append(epoch_loss)
if verbose and epoch % 50 == 0:
print(f" epoch {epoch:>4} loss={epoch_loss:.4f}")
return loss_history
def make_data(n_classes: int = 3) -> tuple:
X, y = make_classification(
n_samples=800, n_features=10, n_informative=6,
n_classes=n_classes, n_clusters_per_class=1, random_state=42,
)
scaler = StandardScaler()
X = scaler.fit_transform(X)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)
# one-hot 编码
def onehot(labels: np.ndarray, n: int) -> np.ndarray:
oh = np.zeros((len(labels), n))
oh[np.arange(len(labels)), labels] = 1
return oh
return X_tr, X_te, y_tr, y_te, onehot(y_tr, n_classes), onehot(y_te, n_classes)
def mode_compare() -> None:
print("\n" + "="*60 + "\n 手写 NN vs sklearn MLPClassifier\n" + "="*60)
X_tr, X_te, y_tr, y_te, y_tr_oh, _ = make_data(n_classes=3)
import time
rows = []
# 手写 NN
t0 = time.perf_counter()
nn = NeuralNetwork(n_in=10, n_hidden=32, n_out=3, lr=0.05)
nn.fit(X_tr, y_tr_oh, epochs=200, batch_size=64, verbose=False)
t1 = time.perf_counter()
acc1 = accuracy_score(y_te, nn.predict(X_te))
rows.append(["手写 NN (numpy)", "10→32→3", f"{acc1:.4f}", f"{(t1-t0)*1000:.0f}ms"])
# sklearn MLP
t0 = time.perf_counter()
mlp = MLPClassifier(hidden_layer_sizes=(32,), max_iter=200,
learning_rate_init=0.05, random_state=42)
mlp.fit(X_tr, y_tr)
t1 = time.perf_counter()
acc2 = accuracy_score(y_te, mlp.predict(X_te))
rows.append(["sklearn MLP", "10→32→3", f"{acc2:.4f}", f"{(t1-t0)*1000:.0f}ms"])
print(f"\n{'─'*62}")
print(f" {'实现':<20} {'结构':<12} {'测试准确率':<12} {'训练耗时'}")
print(f"{'─'*62}")
for row in rows:
print(f" {row[0]:<20} {row[1]:<12} {row[2]:<12} {row[3]}")
print(f"{'─'*62}")
print("\n 💡 结果相近说明手写实现正确;sklearn 更快因为有 BLAS 优化")
mode_compare()
极客实战:完整源码与运行
现在,把上面的积木拼起来,将下面完整代码保存为 48-python-nn-from-scratch.py。它会生成一份内置分类数据集,完成前向传播、训练循环和 sklearn 对照实验。
#!/usr/bin/env python3
"""
48-python-nn-from-scratch.py — 纯 numpy 手写神经网络
用法:
python3 48-python-nn-from-scratch.py --mode forward # 前向传播演示
python3 48-python-nn-from-scratch.py --mode train # 完整训练循环
python3 48-python-nn-from-scratch.py --mode compare # 与 sklearn MLP 对比
python3 48-python-nn-from-scratch.py --mode all # 全部(默认)
依赖 numpy + scikit-learn,直接运行。
"""
import argparse
from typing import Optional
import numpy as np
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler
def relu(z: np.ndarray) -> np.ndarray:
    """Apply the ReLU activation element-wise.

    Negative entries become 0.0; non-negative entries pass through unchanged.
    """
    return np.maximum(0.0, z)
def relu_grad(z: np.ndarray) -> np.ndarray:
    """Return the derivative of ReLU: 1.0 where z > 0, otherwise 0.0."""
    return (z > 0).astype(float)
def sigmoid(z: np.ndarray) -> np.ndarray:
    """Apply the logistic sigmoid 1 / (1 + exp(-z)) element-wise.

    The input is clipped to [-500, 500] before exponentiation so that
    np.exp cannot overflow for extreme values.
    """
    bounded = np.clip(z, -500, 500)
    return 1.0 / (1.0 + np.exp(-bounded))
def softmax(z: np.ndarray) -> np.ndarray:
    """Convert raw scores into probabilities along the last axis.

    Working on the last axis keeps the (n, classes) batch behaviour and also
    accepts a single 1-D score vector, which a hard-coded ``axis=1`` rejects.

    Args:
        z: Raw scores; the last axis enumerates the classes.

    Returns:
        An array of the same shape whose last axis sums to 1.
    """
    z = np.asarray(z)
    # Subtracting the per-row maximum leaves the result unchanged but keeps
    # exp() from overflowing on large scores.
    shifted = z - z.max(axis=-1, keepdims=True)
    e = np.exp(shifted)
    return e / e.sum(axis=-1, keepdims=True)
def cross_entropy(y_pred: np.ndarray, y_true: np.ndarray) -> float:
    """Mean multi-class cross-entropy loss for one-hot targets.

    Args:
        y_pred: Predicted probabilities, one row per sample.
        y_true: One-hot labels, multiplied element-wise with ``log(y_pred)``.

    Returns:
        The summed loss divided by the number of rows of ``y_true``, as a
        built-in ``float``.
    """
    n = y_true.shape[0]
    # 1e-9 keeps log() finite when a predicted probability is exactly 0.
    total = np.sum(y_true * np.log(y_pred + 1e-9))
    # Cast so callers really get the annotated built-in float rather than
    # np.float64, which shows up as "np.float64(...)" inside printed
    # containers on NumPy 2.
    return float(-total / n)
# ─── 神经网络类 ────────────────────────────────────────────────────────────────
class NeuralNetwork:
    """
    Two-layer fully connected classifier:
    input (n_in) -> hidden (n_hidden, ReLU) -> output (n_out, Softmax).

    Parameters are updated in place by plain mini-batch gradient descent.
    """

    def __init__(self, n_in: int, n_hidden: int, n_out: int,
                 lr: float = 0.01, seed: int = 42) -> None:
        """Create the weights and the instance-private random generator.

        Args:
            n_in: number of input features.
            n_hidden: number of hidden units.
            n_out: number of output classes.
            lr: learning rate used by ``backward``.
            seed: seed for weight initialisation AND for the mini-batch
                shuffling done in ``fit``.
        """
        rng = np.random.RandomState(seed)
        # He initialisation (suited to ReLU)
        self.W1: np.ndarray = rng.randn(n_in, n_hidden) * np.sqrt(2.0 / n_in)
        self.b1: np.ndarray = np.zeros((1, n_hidden))
        self.W2: np.ndarray = rng.randn(n_hidden, n_out) * np.sqrt(2.0 / n_hidden)
        self.b2: np.ndarray = np.zeros((1, n_out))
        self.lr = lr
        # Intermediate values of the last forward pass (needed by backward)
        self._cache: dict = {}
        # Keep the seeded generator so that fit() shuffles reproducibly instead
        # of depending on the state of the global np.random generator.
        self._rng = rng

    def forward(self, X: np.ndarray) -> np.ndarray:
        """Run a forward pass, cache the intermediates, return class probabilities."""
        Z1 = X @ self.W1 + self.b1          # (n, hidden)
        A1 = relu(Z1)                       # (n, hidden)
        Z2 = A1 @ self.W2 + self.b2         # (n, out)
        A2 = softmax(Z2)                    # (n, out)
        self._cache = {"X": X, "Z1": Z1, "A1": A1, "Z2": Z2, "A2": A2}
        return A2

    def backward(self, y_true: np.ndarray) -> dict[str, float]:
        """Backpropagate from the cached forward pass and update the parameters.

        Must be called after ``forward`` on the batch that ``y_true`` belongs to.

        Args:
            y_true: one-hot labels of the batch used in the last forward pass.

        Returns:
            Norms of the weight gradients, keyed ``grad_W1`` / ``grad_W2``
            (useful for monitoring).
        """
        n = y_true.shape[0]
        X, Z1, A1, A2 = (self._cache[k] for k in ("X", "Z1", "A1", "A2"))

        # Output layer: the joint derivative of softmax + cross-entropy
        # collapses to (prediction - target), averaged over the batch.
        dZ2 = (A2 - y_true) / n             # (n, out)
        dW2 = A1.T @ dZ2                    # (hidden, out)
        db2 = dZ2.sum(axis=0, keepdims=True)

        # Hidden layer (chain rule). W2 is read here before it is updated below.
        dA1 = dZ2 @ self.W2.T               # (n, hidden)
        dZ1 = dA1 * relu_grad(Z1)           # (n, hidden)
        dW1 = X.T @ dZ1                     # (in, hidden)
        db1 = dZ1.sum(axis=0, keepdims=True)

        # Gradient-descent step
        self.W2 -= self.lr * dW2
        self.b2 -= self.lr * db2
        self.W1 -= self.lr * dW1
        self.b1 -= self.lr * db1

        return {
            "grad_W1": float(np.linalg.norm(dW1)),
            "grad_W2": float(np.linalg.norm(dW2)),
        }

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the index of the most probable class for every row of X."""
        return self.forward(X).argmax(axis=1)

    def fit(self, X: np.ndarray, y_onehot: np.ndarray,
            epochs: int = 200, batch_size: int = 64,
            verbose: bool = True) -> list[float]:
        """Train with mini-batch gradient descent.

        Args:
            X: training features, one sample per row.
            y_onehot: one-hot labels aligned with the rows of X.
            epochs: number of passes over the training data.
            batch_size: number of samples per mini-batch.
            verbose: when true, print the loss every 50 epochs.

        Returns:
            The sample-weighted mean loss of every epoch, in order. Each
            batch loss is measured before that batch's parameter update.
        """
        n = X.shape[0]
        loss_history: list[float] = []
        for epoch in range(1, epochs + 1):
            # Shuffle with the instance generator so a given seed always
            # yields the same batch order.
            idx = self._rng.permutation(n)
            X_s, y_s = X[idx], y_onehot[idx]
            epoch_loss = 0.0
            for start in range(0, n, batch_size):
                Xb = X_s[start:start + batch_size]
                yb = y_s[start:start + batch_size]
                pred = self.forward(Xb)
                # Weight by batch length so a short final batch is not over-counted.
                epoch_loss += cross_entropy(pred, yb) * len(Xb)
                self.backward(yb)
            epoch_loss /= n
            loss_history.append(epoch_loss)
            if verbose and epoch % 50 == 0:
                print(f" epoch {epoch:>4} loss={epoch_loss:.4f}")
        return loss_history
# ─── 数据准备 ──────────────────────────────────────────────────────────────────
def make_data(n_classes: int = 3) -> tuple:
    """Build the synthetic classification dataset used by the demos.

    Args:
        n_classes: number of target classes to generate.

    Returns:
        ``(X_tr, X_te, y_tr, y_te, y_tr_onehot, y_te_onehot)`` where the
        feature matrices are standardised and the last two entries are the
        one-hot encodings of the integer labels.
    """
    X, y = make_classification(
        n_samples=800, n_features=10, n_informative=6,
        n_classes=n_classes, n_clusters_per_class=1, random_state=42,
    )
    X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)

    # Fit the scaler on the training split only, then reuse its statistics for
    # the test split; fitting on the full dataset would leak test-set
    # information into training.
    scaler = StandardScaler()
    X_tr = scaler.fit_transform(X_tr)
    X_te = scaler.transform(X_te)

    # One-hot encoding
    def onehot(labels: np.ndarray, n: int) -> np.ndarray:
        oh = np.zeros((len(labels), n))
        oh[np.arange(len(labels)), labels] = 1
        return oh

    return X_tr, X_te, y_tr, y_te, onehot(y_tr, n_classes), onehot(y_te, n_classes)
# ─── 模式1:前向传播演示 ───────────────────────────────────────────────────────
def mode_forward() -> None:
    """Walk through one forward pass of a tiny network and print every stage."""
    rule = "="*60
    print("\n" + rule + "\n 前向传播逐步演示(小型网络,3个样本)\n" + rule)

    np.random.seed(42)
    X = np.random.randn(3, 4)  # 3 samples, 4 features
    net = NeuralNetwork(n_in=4, n_hidden=5, n_out=3, lr=0.01)

    # Compute the four stages by hand (rather than via net.forward) so each
    # intermediate shape can be reported.
    hidden_pre = X @ net.W1 + net.b1
    hidden_act = relu(hidden_pre)
    out_scores = hidden_act @ net.W2 + net.b2
    probs = softmax(out_scores)

    print(f"\n 输入 X.shape = {X.shape}")
    print(f" Z1 = X·W1+b1 shape={hidden_pre.shape} (线性变换)")
    print(f" A1 = ReLU(Z1) shape={hidden_act.shape} (激活,负值归零)")
    print(f" Z2 = A1·W2+b2 shape={out_scores.shape} (输出层线性变换)")
    print(f" A2 = Softmax shape={probs.shape} (概率分布)")

    print(f"\n 预测概率(每行和为1):")
    for sample_no, sample_probs in enumerate(probs):
        cells = [f"类{cls}:{prob:.3f}" for cls, prob in enumerate(sample_probs)]
        bar = " ".join(cells)
        print(f" 样本{sample_no}: {bar} → 预测类{sample_probs.argmax()}")
# ─── 模式2:完整训练循环 ───────────────────────────────────────────────────────
def mode_train() -> None:
    """Train the hand-written network and print an ASCII loss curve plus test accuracy."""
    print("\n" + "="*60 + "\n 完整训练循环(Mini-batch 梯度下降)\n" + "="*60)
    X_tr, X_te, y_tr, y_te, y_tr_oh, _ = make_data(n_classes=3)
    nn = NeuralNetwork(n_in=10, n_hidden=32, n_out=3, lr=0.05)
    print(f"\n 网络结构: 10 → 32(ReLU) → 3(Softmax)")
    print(f" 训练集: {len(X_tr)} 样本 测试集: {len(X_te)} 样本\n")
    loss_history = nn.fit(X_tr, y_tr_oh, epochs=200, batch_size=64)

    # ASCII loss curve
    print("\n 训练损失曲线(每50轮采样)")
    # loss_history[k] holds the loss of epoch k + 1. Sample the first epoch,
    # every 50th epoch and the final one, and label each bar with its true
    # epoch number (slicing with [::50] would pick epochs 1/51/101/... while
    # labelling them 0/50/100/...).
    total_epochs = len(loss_history)
    sample_epochs = sorted({1, *range(50, total_epochs + 1, 50), total_epochs})
    sampled = [(epoch, loss_history[epoch - 1]) for epoch in sample_epochs]
    max_loss = max(loss for _, loss in sampled)
    W = 40  # bar width in characters
    for epoch, loss in sampled:
        bar = "█" * int(loss / max_loss * W)
        print(f" epoch {epoch:>3} │{bar:<{W}}│ {loss:.4f}")

    test_acc = accuracy_score(y_te, nn.predict(X_te))
    print(f"\n 最终测试准确率: {test_acc:.4f}")
# ─── 模式3:与 sklearn MLP 对比 ───────────────────────────────────────────────
def mode_compare() -> None:
    """Train the hand-written network and sklearn's MLPClassifier on the same
    data, then print a table with test accuracy and training time for both."""
    import time

    heading_rule = "="*60
    print("\n" + heading_rule + "\n 手写 NN vs sklearn MLPClassifier\n" + heading_rule)
    X_tr, X_te, y_tr, y_te, y_tr_oh, _ = make_data(n_classes=3)
    results = []

    # Hand-written network: only fit() is timed, prediction happens afterwards.
    started = time.perf_counter()
    net = NeuralNetwork(n_in=10, n_hidden=32, n_out=3, lr=0.05)
    net.fit(X_tr, y_tr_oh, epochs=200, batch_size=64, verbose=False)
    finished = time.perf_counter()
    own_acc = accuracy_score(y_te, net.predict(X_te))
    results.append(
        ("手写 NN (numpy)", "10→32→3", f"{own_acc:.4f}", f"{(finished-started)*1000:.0f}ms")
    )

    # sklearn MLP with the same hidden size and learning rate
    started = time.perf_counter()
    mlp = MLPClassifier(hidden_layer_sizes=(32,), max_iter=200,
                        learning_rate_init=0.05, random_state=42)
    mlp.fit(X_tr, y_tr)
    finished = time.perf_counter()
    sk_acc = accuracy_score(y_te, mlp.predict(X_te))
    results.append(
        ("sklearn MLP", "10→32→3", f"{sk_acc:.4f}", f"{(finished-started)*1000:.0f}ms")
    )

    table_rule = '─'*62
    print(f"\n{table_rule}")
    print(f" {'实现':<20} {'结构':<12} {'测试准确率':<12} {'训练耗时'}")
    print(f"{table_rule}")
    for impl, layout, acc_text, cost_text in results:
        print(f" {impl:<20} {layout:<12} {acc_text:<12} {cost_text}")
    print(f"{table_rule}")
    print("\n 💡 结果相近说明手写实现正确;sklearn 更快因为有 BLAS 优化")
# ─── 入口 ─────────────────────────────────────────────────────────────────────
def main() -> None:
    """Parse ``--mode`` from the command line and run the selected demo(s)."""
    parser = argparse.ArgumentParser(description="纯 numpy 手写神经网络")
    parser.add_argument(
        "--mode",
        choices=["forward", "train", "compare", "all"],
        default="all",
    )
    args = parser.parse_args()

    def run_everything() -> None:
        # Same order as the individual modes: forward, train, compare.
        for demo in (mode_forward, mode_train, mode_compare):
            demo()

    handlers = {
        "forward": mode_forward,
        "train": mode_train,
        "compare": mode_compare,
        "all": run_everything,
    }
    chosen = handlers[args.mode]
    chosen()


# Run the CLI only when this file is executed as a script.
if __name__ == "__main__":
    main()
$ python 48-python-nn-from-scratch.py --mode forward
============================================================
前向传播逐步演示(小型网络,3个样本)
============================================================
输入 X.shape = (3, 4)
Z1 = X·W1+b1 shape=(3, 5) (线性变换)
A1 = ReLU(Z1) shape=(3, 5) (激活,负值归零)
Z2 = A1·W2+b2 shape=(3, 3) (输出层线性变换)
A2 = Softmax shape=(3, 3) (概率分布)
预测概率(每行和为1):
样本0: 类0:0.249 类1:0.445 类2:0.307 → 预测类1
样本1: 类0:0.303 类1:0.370 类2:0.326 → 预测类1
样本2: 类0:0.105 类1:0.073 类2:0.822 → 预测类2
$ python 48-python-nn-from-scratch.py --mode train
============================================================
完整训练循环(Mini-batch 梯度下降)
============================================================
网络结构: 10 → 32(ReLU) → 3(Softmax)
训练集: 640 样本 测试集: 160 样本
epoch 50 loss=0.2480
epoch 100 loss=0.1928
epoch 150 loss=0.1629
epoch 200 loss=0.1419
训练损失曲线(每50轮采样)
epoch 0 │████████████████████████████████████████│ 1.1359
epoch 50 │████████ │ 0.2465
epoch 100 │██████ │ 0.1922
epoch 150 │█████ │ 0.1626
epoch 200 │████ │ 0.1419
最终测试准确率: 0.9313
小结与 NexDo Time ⚡
这一篇你已经从零跑通了神经网络训练的核心闭环:激活函数负责制造非线性,Softmax 把输出变成概率,交叉熵负责衡量错误,反向传播把错误沿着网络往回传,Mini-batch 梯度下降负责一点点更新参数。
5 分钟微操挑战:把 NeuralNetwork(n_in=10, n_hidden=32, n_out=3, lr=0.05) 里的 n_hidden 分别改成 8、16、64,运行 --mode train,观察损失曲线和测试准确率有什么变化。
Don’t wait for next time, do it in the next moment.