51 · RNN/LSTM:时序数据建模与序列预测
🔗 知识图谱导航:阅读本文前,建议先回顾《48 · 神经网络基础:从零手写前向传播与反向传播》里的矩阵运算和激活函数,再回顾《50 · CNN 卷积神经网络》里“按结构处理数据”的思想。本文会把注意力从图像空间转向时间序列。 NexDo Time · 2026-04-17 · 预计阅读 34 分钟
痛点与架构
普通神经网络处理每个样本时,通常把它当作独立个体。但时序数据不一样:今天的温度和昨天有关,当前一句话和前一句有关,股票此刻的波动也可能受前面走势影响。RNN 的核心价值,就是把“过去的信息”带到“现在的判断”里。
RNN 用隐藏状态 h_t 传递记忆,但长序列里早期信息容易衰减。LSTM 在 RNN 基础上增加了细胞状态 c_t 和三个关键门(遗忘门、输入门、输出门),再配合一个候选值,让模型学会选择性遗忘、选择性写入、选择性输出。
RNN: x_t + h_{t-1} -> h_t
LSTM: x_t + h_{t-1} + c_{t-1}
-> 遗忘门/输入门/候选值/输出门
-> c_t 和 h_t
步步为营:核心逻辑自适应拆解
RNN/LSTM 的概念比 CNN 更抽象,所以这一篇拆成 9 个小步骤。你先看激活函数和隐藏状态,再看门控机制,最后跑完整的时序预测。
Step 1:用 sigmoid/tanh 理解 RNN 门和隐藏状态的数值范围
痛点与机制:
RNN/LSTM 里的很多公式都离不开 sigmoid 和 tanh。sigmoid 像一个 0 到 1 的阀门,决定“开多少”;tanh 像带方向的音量旋钮,输出在 -1 到 1 之间,能表示正向或反向影响。先看这两个函数,后面的门控机制就不会像天书。
核心源码(逐字来自文末完整源码):
def sigmoid(x: np.ndarray) -> np.ndarray:
return 1.0 / (1.0 + np.exp(-np.clip(x, -500, 500)))
def tanh(x: np.ndarray) -> np.ndarray:
return np.tanh(x)
可运行演示(补齐 Mock 数据与 print 反馈):
import numpy as np
def sigmoid(x: np.ndarray) -> np.ndarray:
return 1.0 / (1.0 + np.exp(-np.clip(x, -500, 500)))
def tanh(x: np.ndarray) -> np.ndarray:
return np.tanh(x)
values = np.array([[-3.0, -1.0, 0.0, 1.0, 3.0]])
print("原始输入:", values.tolist())
print("sigmoid 输出:", np.round(sigmoid(values), 4).tolist())
print("tanh 输出:", np.round(tanh(values), 4).tolist())
print("直觉:sigmoid 像 0~1 的开关,tanh 像 -1~1 的方向盘。")
Step 2:用 SimpleRNN 把隐藏状态像接力棒一样往后传
痛点与机制:
普通 MLP 每次只看当前输入,RNN 会把上一步的隐藏状态也带进来。你可以把 h_t 想成接力棒:每个时间步拿到当前输入,也拿到前面传来的记忆,再生成新的记忆传给下一步。
核心源码(逐字来自文末完整源码):
class SimpleRNN:
"""
单层 RNN:h_t = tanh(W_x·x_t + W_h·h_{t-1} + b)
输入: (seq_len, input_size)
输出: (seq_len, hidden_size)
"""
def __init__(self, input_size: int, hidden_size: int, seed: int = 42) -> None:
rng = np.random.RandomState(seed)
scale = 0.1
self.W_x = rng.randn(input_size, hidden_size) * scale
self.W_h = rng.randn(hidden_size, hidden_size) * scale
self.b = np.zeros(hidden_size)
self.hidden_size = hidden_size
def forward(self, X: np.ndarray) -> tuple[np.ndarray, list[np.ndarray]]:
"""
X: (seq_len, input_size)
返回: (所有时间步隐藏状态, 隐藏状态列表)
"""
seq_len = X.shape[0]
h = np.zeros(self.hidden_size)
h_states: list[np.ndarray] = []
for t in range(seq_len):
h = tanh(X[t] @ self.W_x + h @ self.W_h + self.b)
h_states.append(h.copy())
return np.array(h_states), h_states
可运行演示(补齐 Mock 数据与 print 反馈):
import numpy as np
def sigmoid(x: np.ndarray) -> np.ndarray:
return 1.0 / (1.0 + np.exp(-np.clip(x, -500, 500)))
def tanh(x: np.ndarray) -> np.ndarray:
return np.tanh(x)
class SimpleRNN:
"""
单层 RNN:h_t = tanh(W_x·x_t + W_h·h_{t-1} + b)
输入: (seq_len, input_size)
输出: (seq_len, hidden_size)
"""
def __init__(self, input_size: int, hidden_size: int, seed: int = 42) -> None:
rng = np.random.RandomState(seed)
scale = 0.1
self.W_x = rng.randn(input_size, hidden_size) * scale
self.W_h = rng.randn(hidden_size, hidden_size) * scale
self.b = np.zeros(hidden_size)
self.hidden_size = hidden_size
def forward(self, X: np.ndarray) -> tuple[np.ndarray, list[np.ndarray]]:
"""
X: (seq_len, input_size)
返回: (所有时间步隐藏状态, 隐藏状态列表)
"""
seq_len = X.shape[0]
h = np.zeros(self.hidden_size)
h_states: list[np.ndarray] = []
for t in range(seq_len):
h = tanh(X[t] @ self.W_x + h @ self.W_h + self.b)
h_states.append(h.copy())
return np.array(h_states), h_states
seq = np.array([[0.1], [0.3], [0.5], [0.3], [0.1]])
rnn = SimpleRNN(input_size=1, hidden_size=3)
h_states, _ = rnn.forward(seq)
print("输入序列:", seq.flatten().tolist())
for t, h in enumerate(h_states):
print(f"t={t} 隐藏状态:", np.round(h, 4).tolist(), "范数=", round(float(np.linalg.norm(h)), 4))
print("直觉:h_t 像接力棒,把前面时间步的信息传给下一步。")
Step 3:用 LSTM 的四个门控制“记住、写入、输出”
痛点与机制:
LSTM 比 RNN 多了细胞状态和门控机制。遗忘门像橡皮擦,决定旧记忆擦掉多少;输入门像写入按钮,决定新信息写多少;候选值是准备写入的内容;输出门像展示窗口,决定这一步给外面看多少记忆。
核心源码(逐字来自文末完整源码):
class LSTM:
    """
    Single-layer LSTM (forward pass only) whose four gate blocks share one
    fused weight matrix.

    Column layout of ``W`` / ``b`` (each block is ``hidden_size`` = H wide):
        [0:H] forget gate f | [H:2H] input gate i | [2H:3H] candidate g | [3H:4H] output gate o

    Input:  (seq_len, input_size)
    Output: (seq_len, hidden_size)
    """

    def __init__(self, input_size: int, hidden_size: int, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        scale = 0.1
        concat_size = input_size + hidden_size
        # One fused matrix for all four blocks, so every step needs a single matmul.
        self.W = rng.randn(concat_size, 4 * hidden_size) * scale
        self.b = np.zeros(4 * hidden_size)
        # Forget-gate bias starts at 1 so the cell initially tends to keep its
        # memory. The slice must be the one forward() reads the forget gate
        # from, i.e. the FIRST block (columns 0:H), not the input-gate block.
        self.b[0:hidden_size] = 1.0
        self.hidden_size = hidden_size

    def forward(self, X: np.ndarray) -> tuple[np.ndarray, dict]:
        """
        Run the LSTM over one sequence, starting from zero hidden and cell state.

        X: (seq_len, input_size)
        Returns: (hidden states of every step stacked into one array,
                  {"gates": [one dict per step with the mean f / i / g / o activation]})
        """
        seq_len = X.shape[0]
        H = self.hidden_size
        h = np.zeros(H)
        c = np.zeros(H)
        h_states: list[np.ndarray] = []
        gate_log: list[dict] = []
        for t in range(seq_len):
            # Concatenate the current input with the previous hidden state.
            concat = np.concatenate([X[t], h])   # (input+hidden,)
            gates = concat @ self.W + self.b     # (4*hidden,)
            # Split the fused pre-activations into the four blocks.
            f = sigmoid(gates[0*H:1*H])   # forget gate
            i = sigmoid(gates[1*H:2*H])   # input gate
            g = tanh(gates[2*H:3*H])      # candidate values
            o = sigmoid(gates[3*H:4*H])   # output gate
            # Update the cell state, then expose a gated view of it as h.
            c = f * c + i * g
            h = o * tanh(c)
            h_states.append(h.copy())
            gate_log.append({"f": f.mean(), "i": i.mean(),
                             "g": g.mean(), "o": o.mean()})
        return np.array(h_states), {"gates": gate_log}
可运行演示(补齐 Mock 数据与 print 反馈):
import numpy as np
def sigmoid(x: np.ndarray) -> np.ndarray:
return 1.0 / (1.0 + np.exp(-np.clip(x, -500, 500)))
def tanh(x: np.ndarray) -> np.ndarray:
return np.tanh(x)
class LSTM:
    """
    Single-layer LSTM (forward pass only) whose four gate blocks share one
    fused weight matrix.

    Column layout of ``W`` / ``b`` (each block is ``hidden_size`` = H wide):
        [0:H] forget gate f | [H:2H] input gate i | [2H:3H] candidate g | [3H:4H] output gate o

    Input:  (seq_len, input_size)
    Output: (seq_len, hidden_size)
    """

    def __init__(self, input_size: int, hidden_size: int, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        scale = 0.1
        concat_size = input_size + hidden_size
        # One fused matrix for all four blocks, so every step needs a single matmul.
        self.W = rng.randn(concat_size, 4 * hidden_size) * scale
        self.b = np.zeros(4 * hidden_size)
        # Forget-gate bias starts at 1 so the cell initially tends to keep its
        # memory. The slice must be the one forward() reads the forget gate
        # from, i.e. the FIRST block (columns 0:H), not the input-gate block.
        self.b[0:hidden_size] = 1.0
        self.hidden_size = hidden_size

    def forward(self, X: np.ndarray) -> tuple[np.ndarray, dict]:
        """
        Run the LSTM over one sequence, starting from zero hidden and cell state.

        X: (seq_len, input_size)
        Returns: (hidden states of every step stacked into one array,
                  {"gates": [one dict per step with the mean f / i / g / o activation]})
        """
        seq_len = X.shape[0]
        H = self.hidden_size
        h = np.zeros(H)
        c = np.zeros(H)
        h_states: list[np.ndarray] = []
        gate_log: list[dict] = []
        for t in range(seq_len):
            # Concatenate the current input with the previous hidden state.
            concat = np.concatenate([X[t], h])   # (input+hidden,)
            gates = concat @ self.W + self.b     # (4*hidden,)
            # Split the fused pre-activations into the four blocks.
            f = sigmoid(gates[0*H:1*H])   # forget gate
            i = sigmoid(gates[1*H:2*H])   # input gate
            g = tanh(gates[2*H:3*H])      # candidate values
            o = sigmoid(gates[3*H:4*H])   # output gate
            # Update the cell state, then expose a gated view of it as h.
            c = f * c + i * g
            h = o * tanh(c)
            h_states.append(h.copy())
            gate_log.append({"f": f.mean(), "i": i.mean(),
                             "g": g.mean(), "o": o.mean()})
        return np.array(h_states), {"gates": gate_log}
seq = np.array([[0.1], [0.3], [0.6], [0.9], [0.4]])
lstm = LSTM(input_size=1, hidden_size=4)
h_states, info = lstm.forward(seq)
print("输入序列:", seq.flatten().tolist())
for t, gate in enumerate(info["gates"]):
print(f"t={t} f={gate['f']:.3f} i={gate['i']:.3f} g={gate['g']:+.3f} o={gate['o']:.3f} |h|={np.linalg.norm(h_states[t]):.4f}")
print("直觉:遗忘门像橡皮擦,输入门像写入按钮,输出门像展示窗口。")
Step 4:用 make_timeseries 造一条可预测的时间序列
痛点与机制:
真实股价或传感器数据不适合新手第一步就拿来练,因为噪声和业务因素太多。这里生成“正弦波 + 谐波 + 趋势 + 噪声”的模拟序列,像一条有规律但不死板的练习曲。
核心源码(逐字来自文末完整源码):
def make_timeseries(n: int = 500, seed: int = 42) -> np.ndarray:
"""生成带噪声的正弦+趋势时序数据(模拟股价/传感器信号)。"""
rng = np.random.RandomState(seed)
t = np.linspace(0, 4 * np.pi, n)
signal = (
np.sin(t) # 主周期
+ 0.5 * np.sin(3 * t) # 谐波
+ 0.1 * t / (4 * np.pi) # 趋势
+ rng.randn(n) * 0.1 # 噪声
)
return signal
可运行演示(补齐 Mock 数据与 print 反馈):
import numpy as np
def make_timeseries(n: int = 500, seed: int = 42) -> np.ndarray:
"""生成带噪声的正弦+趋势时序数据(模拟股价/传感器信号)。"""
rng = np.random.RandomState(seed)
t = np.linspace(0, 4 * np.pi, n)
signal = (
np.sin(t) # 主周期
+ 0.5 * np.sin(3 * t) # 谐波
+ 0.1 * t / (4 * np.pi) # 趋势
+ rng.randn(n) * 0.1 # 噪声
)
return signal
signal = make_timeseries(n=20, seed=7)
print("序列长度:", len(signal))
print("前 8 个点:", np.round(signal[:8], 4).tolist())
print("最小/最大:", round(float(signal.min()), 4), round(float(signal.max()), 4))
print("说明:这是正弦波 + 谐波 + 趋势 + 噪声,像简化版传感器数据。")
Step 5:用 make_sequences 把连续序列切成监督学习样本
痛点与机制:
模型不能直接吃一整条无限长的时间线,所以要切滑动窗口。比如用最近 10 个点预测第 11 个点,这就把时序问题变成了普通的 X/y 训练样本。
核心源码(逐字来自文末完整源码):
def make_sequences(data: np.ndarray, seq_len: int = 10
) -> tuple[np.ndarray, np.ndarray]:
"""将时序数据切分为 (X, y) 滑动窗口对。"""
X, y = [], []
for i in range(len(data) - seq_len):
X.append(data[i:i + seq_len])
y.append(data[i + seq_len])
return np.array(X), np.array(y)
可运行演示(补齐 Mock 数据与 print 反馈):
import numpy as np
def make_sequences(data: np.ndarray, seq_len: int = 10
) -> tuple[np.ndarray, np.ndarray]:
"""将时序数据切分为 (X, y) 滑动窗口对。"""
X, y = [], []
for i in range(len(data) - seq_len):
X.append(data[i:i + seq_len])
y.append(data[i + seq_len])
return np.array(X), np.array(y)
data = np.array([10, 11, 12, 13, 14, 15], dtype=float)
X, y = make_sequences(data, seq_len=3)
print("原始序列:", data.tolist())
for i, (window, target) in enumerate(zip(X, y)):
print(f"样本{i}: 输入窗口={window.tolist()} -> 预测下一个值={target}")
print("直觉:滑动窗口像用最近 3 天的数据预测第 4 天。")
Step 6:用 mode_rnn 打印每个时间步的隐藏状态
痛点与机制:
只看最终预测很难理解 RNN。mode_rnn() 把每个时间步的 4 维隐藏状态都打印出来,让你看到输入变化时,记忆向量也跟着变化。范数越大,说明当前隐藏状态的整体强度越明显。
核心源码(逐字来自文末完整源码):
def mode_rnn() -> None:
print("\n" + "="*60 + "\n 手写 RNN 前向传播演示\n" + "="*60)
rnn = SimpleRNN(input_size=1, hidden_size=4)
# 输入:5个时间步,每步1个特征
X = np.array([[0.1], [0.3], [0.5], [0.3], [0.1]])
h_states, _ = rnn.forward(X)
print(f"\n 输入序列: {X.flatten()}")
print(f" 隐藏状态 shape: {h_states.shape} (seq_len=5, hidden=4)\n")
print(f" {'时间步':<8} {'h_t (4维隐藏状态)':<40} {'‖h_t‖'}")
print(f" {'─'*60}")
for t, h in enumerate(h_states):
norm = np.linalg.norm(h)
vals = " ".join(f"{v:+.3f}" for v in h)
print(f" t={t} [{vals}] {norm:.4f}")
print(f"\n 💡 h_t 随输入变化,携带了历史信息(t=2时信号最强,h范数最大)")
可运行演示(补齐 Mock 数据与 print 反馈):
import numpy as np
def sigmoid(x: np.ndarray) -> np.ndarray:
return 1.0 / (1.0 + np.exp(-np.clip(x, -500, 500)))
def tanh(x: np.ndarray) -> np.ndarray:
return np.tanh(x)
class SimpleRNN:
"""
单层 RNN:h_t = tanh(W_x·x_t + W_h·h_{t-1} + b)
输入: (seq_len, input_size)
输出: (seq_len, hidden_size)
"""
def __init__(self, input_size: int, hidden_size: int, seed: int = 42) -> None:
rng = np.random.RandomState(seed)
scale = 0.1
self.W_x = rng.randn(input_size, hidden_size) * scale
self.W_h = rng.randn(hidden_size, hidden_size) * scale
self.b = np.zeros(hidden_size)
self.hidden_size = hidden_size
def forward(self, X: np.ndarray) -> tuple[np.ndarray, list[np.ndarray]]:
"""
X: (seq_len, input_size)
返回: (所有时间步隐藏状态, 隐藏状态列表)
"""
seq_len = X.shape[0]
h = np.zeros(self.hidden_size)
h_states: list[np.ndarray] = []
for t in range(seq_len):
h = tanh(X[t] @ self.W_x + h @ self.W_h + self.b)
h_states.append(h.copy())
return np.array(h_states), h_states
def mode_rnn() -> None:
print("\n" + "="*60 + "\n 手写 RNN 前向传播演示\n" + "="*60)
rnn = SimpleRNN(input_size=1, hidden_size=4)
# 输入:5个时间步,每步1个特征
X = np.array([[0.1], [0.3], [0.5], [0.3], [0.1]])
h_states, _ = rnn.forward(X)
print(f"\n 输入序列: {X.flatten()}")
print(f" 隐藏状态 shape: {h_states.shape} (seq_len=5, hidden=4)\n")
print(f" {'时间步':<8} {'h_t (4维隐藏状态)':<40} {'‖h_t‖'}")
print(f" {'─'*60}")
for t, h in enumerate(h_states):
norm = np.linalg.norm(h)
vals = " ".join(f"{v:+.3f}" for v in h)
print(f" t={t} [{vals}] {norm:.4f}")
print(f"\n 💡 h_t 随输入变化,携带了历史信息(t=2时信号最强,h范数最大)")
mode_rnn()
Step 7:用 mode_lstm 观察遗忘门、输入门和输出门
痛点与机制:
LSTM 的门控如果只看公式很抽象,打印均值后会直观很多。遗忘门接近 1 代表更愿意保留旧记忆,输入门接近 1 代表更愿意写入新信息。它像一个会选择性记笔记的学生,不是什么都死记硬背。
核心源码(逐字来自文末完整源码):
def mode_lstm() -> None:
print("\n" + "="*60 + "\n 手写 LSTM 门控机制演示\n" + "="*60)
lstm = LSTM(input_size=1, hidden_size=8)
# 模拟一段上升后下降的信号
X = np.array([[0.1], [0.3], [0.6], [0.9], [0.7], [0.4], [0.1]])
h_states, info = lstm.forward(X)
print(f"\n 输入序列: {X.flatten()}")
print(f"\n {'时间步':<8} {'遗忘门f':<10} {'输入门i':<10} {'候选g':<10} {'输出门o':<10} {'‖h_t‖'}")
print(f" {'─'*60}")
for t, gate in enumerate(info["gates"]):
norm = np.linalg.norm(h_states[t])
print(f" t={t} {gate['f']:.3f} {gate['i']:.3f} "
f"{gate['g']:+.3f} {gate['o']:.3f} {norm:.4f}")
print(f"\n 💡 遗忘门接近1=记住历史,接近0=遗忘历史")
print(f" 输入门接近1=写入新信息,接近0=忽略新输入")
可运行演示(补齐 Mock 数据与 print 反馈):
import numpy as np
def sigmoid(x: np.ndarray) -> np.ndarray:
return 1.0 / (1.0 + np.exp(-np.clip(x, -500, 500)))
def tanh(x: np.ndarray) -> np.ndarray:
return np.tanh(x)
class LSTM:
    """
    Single-layer LSTM (forward pass only) whose four gate blocks share one
    fused weight matrix.

    Column layout of ``W`` / ``b`` (each block is ``hidden_size`` = H wide):
        [0:H] forget gate f | [H:2H] input gate i | [2H:3H] candidate g | [3H:4H] output gate o

    Input:  (seq_len, input_size)
    Output: (seq_len, hidden_size)
    """

    def __init__(self, input_size: int, hidden_size: int, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        scale = 0.1
        concat_size = input_size + hidden_size
        # One fused matrix for all four blocks, so every step needs a single matmul.
        self.W = rng.randn(concat_size, 4 * hidden_size) * scale
        self.b = np.zeros(4 * hidden_size)
        # Forget-gate bias starts at 1 so the cell initially tends to keep its
        # memory. The slice must be the one forward() reads the forget gate
        # from, i.e. the FIRST block (columns 0:H), not the input-gate block.
        self.b[0:hidden_size] = 1.0
        self.hidden_size = hidden_size

    def forward(self, X: np.ndarray) -> tuple[np.ndarray, dict]:
        """
        Run the LSTM over one sequence, starting from zero hidden and cell state.

        X: (seq_len, input_size)
        Returns: (hidden states of every step stacked into one array,
                  {"gates": [one dict per step with the mean f / i / g / o activation]})
        """
        seq_len = X.shape[0]
        H = self.hidden_size
        h = np.zeros(H)
        c = np.zeros(H)
        h_states: list[np.ndarray] = []
        gate_log: list[dict] = []
        for t in range(seq_len):
            # Concatenate the current input with the previous hidden state.
            concat = np.concatenate([X[t], h])   # (input+hidden,)
            gates = concat @ self.W + self.b     # (4*hidden,)
            # Split the fused pre-activations into the four blocks.
            f = sigmoid(gates[0*H:1*H])   # forget gate
            i = sigmoid(gates[1*H:2*H])   # input gate
            g = tanh(gates[2*H:3*H])      # candidate values
            o = sigmoid(gates[3*H:4*H])   # output gate
            # Update the cell state, then expose a gated view of it as h.
            c = f * c + i * g
            h = o * tanh(c)
            h_states.append(h.copy())
            gate_log.append({"f": f.mean(), "i": i.mean(),
                             "g": g.mean(), "o": o.mean()})
        return np.array(h_states), {"gates": gate_log}
def mode_lstm() -> None:
print("\n" + "="*60 + "\n 手写 LSTM 门控机制演示\n" + "="*60)
lstm = LSTM(input_size=1, hidden_size=8)
# 模拟一段上升后下降的信号
X = np.array([[0.1], [0.3], [0.6], [0.9], [0.7], [0.4], [0.1]])
h_states, info = lstm.forward(X)
print(f"\n 输入序列: {X.flatten()}")
print(f"\n {'时间步':<8} {'遗忘门f':<10} {'输入门i':<10} {'候选g':<10} {'输出门o':<10} {'‖h_t‖'}")
print(f" {'─'*60}")
for t, gate in enumerate(info["gates"]):
norm = np.linalg.norm(h_states[t])
print(f" t={t} {gate['f']:.3f} {gate['i']:.3f} "
f"{gate['g']:+.3f} {gate['o']:.3f} {norm:.4f}")
print(f"\n 💡 遗忘门接近1=记住历史,接近0=遗忘历史")
print(f" 输入门接近1=写入新信息,接近0=忽略新输入")
mode_lstm()
Step 8:用 mode_predict 跑通 LSTM 特征 + Ridge 的时序预测
痛点与机制:
完整训练 LSTM 反向传播很复杂,这篇先用 LSTM 前向传播提取序列特征,再交给 Ridge 回归做预测。注意:这里的 LSTM 权重只是随机初始化、并没有经过训练,真正从数据里学习的只有 Ridge 回归这一层。可以把 LSTM 理解成“读序列的摘要员”,Ridge 是“根据摘要给出下一步数值的打分器”。
核心源码(逐字来自文末完整源码):
def mode_predict() -> None:
    """Forecast the next value of a synthetic series from fixed LSTM features.

    Pipeline: generate the series, min-max scale it (scaler fitted on the
    training span only), cut sliding windows, encode every window as the last
    hidden state of a randomly initialised LSTM (never trained here), fit a
    Ridge regressor on those features, then print MSE / MAE and an ASCII
    comparison of the first 20 test points.
    """
    print("\n" + "="*60 + "\n 时序预测实战(正弦+趋势信号)\n" + "="*60)
    signal = make_timeseries(n=400)
    SEQ_LEN = 15
    # Chronological 80/20 split counted in sliding windows (one window per
    # start index, len(signal) - SEQ_LEN windows in total).
    split = int((len(signal) - SEQ_LEN) * 0.8)
    # Training windows and targets touch raw points [0, split + SEQ_LEN).
    # Fit the scaler on that span only so test-span min/max cannot leak into
    # preprocessing; test values may therefore fall slightly outside [0, 1].
    scaler = MinMaxScaler()
    scaler.fit(signal[:split + SEQ_LEN].reshape(-1, 1))
    signal_scaled = scaler.transform(signal.reshape(-1, 1)).flatten()
    X, y = make_sequences(signal_scaled, seq_len=SEQ_LEN)
    X_tr, X_te = X[:split], X[split:]
    y_tr, y_te = y[:split], y[split:]
    # Predict from the LSTM's last hidden state with a linear layer on top.
    lstm = LSTM(input_size=1, hidden_size=16)

    def extract_features(seqs: np.ndarray) -> np.ndarray:
        """Encode each window as the LSTM hidden state at its final time step."""
        feats = []
        for seq in seqs:
            h_states, _ = lstm.forward(seq.reshape(-1, 1))
            feats.append(h_states[-1])
        return np.array(feats)

    print(f"\n 提取 LSTM 特征(训练集 {len(X_tr)} 序列)...")
    X_tr_feat = extract_features(X_tr)
    X_te_feat = extract_features(X_te)
    # Fit the linear read-out; imported locally, as in the original block.
    from sklearn.linear_model import Ridge
    reg = Ridge(alpha=0.1)
    reg.fit(X_tr_feat, y_tr)
    y_pred = reg.predict(X_te_feat)
    mse = mean_squared_error(y_te, y_pred)
    mae = np.mean(np.abs(y_te - y_pred))
    print(f" MSE: {mse:.6f} MAE: {mae:.6f}")
    # ASCII comparison of prediction vs. ground truth (first 20 test points).
    print(f"\n 预测 vs 真实(前20个测试点,归一化后)")
    print(f" {'步骤':<6} {'真实值':<10} {'预测值':<10} {'误差':<10} 对比")
    print(f" {'─'*55}")
    for i, (real, pred) in enumerate(zip(y_te[:20], y_pred[:20])):
        err = abs(real - pred)
        # A negative count yields an empty bar, so values below 0 are harmless.
        bar_r = "█" * int(real * 20)
        bar_p = "░" * int(pred * 20)
        print(f" {i:<6} {real:.4f} {pred:.4f} {err:.4f} {bar_r}{bar_p}")
可运行演示(补齐 Mock 数据与 print 反馈):
import numpy as np
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
def sigmoid(x: np.ndarray) -> np.ndarray:
return 1.0 / (1.0 + np.exp(-np.clip(x, -500, 500)))
def tanh(x: np.ndarray) -> np.ndarray:
return np.tanh(x)
class LSTM:
    """
    Single-layer LSTM (forward pass only) whose four gate blocks share one
    fused weight matrix.

    Column layout of ``W`` / ``b`` (each block is ``hidden_size`` = H wide):
        [0:H] forget gate f | [H:2H] input gate i | [2H:3H] candidate g | [3H:4H] output gate o

    Input:  (seq_len, input_size)
    Output: (seq_len, hidden_size)
    """

    def __init__(self, input_size: int, hidden_size: int, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        scale = 0.1
        concat_size = input_size + hidden_size
        # One fused matrix for all four blocks, so every step needs a single matmul.
        self.W = rng.randn(concat_size, 4 * hidden_size) * scale
        self.b = np.zeros(4 * hidden_size)
        # Forget-gate bias starts at 1 so the cell initially tends to keep its
        # memory. The slice must be the one forward() reads the forget gate
        # from, i.e. the FIRST block (columns 0:H), not the input-gate block.
        self.b[0:hidden_size] = 1.0
        self.hidden_size = hidden_size

    def forward(self, X: np.ndarray) -> tuple[np.ndarray, dict]:
        """
        Run the LSTM over one sequence, starting from zero hidden and cell state.

        X: (seq_len, input_size)
        Returns: (hidden states of every step stacked into one array,
                  {"gates": [one dict per step with the mean f / i / g / o activation]})
        """
        seq_len = X.shape[0]
        H = self.hidden_size
        h = np.zeros(H)
        c = np.zeros(H)
        h_states: list[np.ndarray] = []
        gate_log: list[dict] = []
        for t in range(seq_len):
            # Concatenate the current input with the previous hidden state.
            concat = np.concatenate([X[t], h])   # (input+hidden,)
            gates = concat @ self.W + self.b     # (4*hidden,)
            # Split the fused pre-activations into the four blocks.
            f = sigmoid(gates[0*H:1*H])   # forget gate
            i = sigmoid(gates[1*H:2*H])   # input gate
            g = tanh(gates[2*H:3*H])      # candidate values
            o = sigmoid(gates[3*H:4*H])   # output gate
            # Update the cell state, then expose a gated view of it as h.
            c = f * c + i * g
            h = o * tanh(c)
            h_states.append(h.copy())
            gate_log.append({"f": f.mean(), "i": i.mean(),
                             "g": g.mean(), "o": o.mean()})
        return np.array(h_states), {"gates": gate_log}
def make_timeseries(n: int = 500, seed: int = 42) -> np.ndarray:
"""生成带噪声的正弦+趋势时序数据(模拟股价/传感器信号)。"""
rng = np.random.RandomState(seed)
t = np.linspace(0, 4 * np.pi, n)
signal = (
np.sin(t) # 主周期
+ 0.5 * np.sin(3 * t) # 谐波
+ 0.1 * t / (4 * np.pi) # 趋势
+ rng.randn(n) * 0.1 # 噪声
)
return signal
def make_sequences(data: np.ndarray, seq_len: int = 10
) -> tuple[np.ndarray, np.ndarray]:
"""将时序数据切分为 (X, y) 滑动窗口对。"""
X, y = [], []
for i in range(len(data) - seq_len):
X.append(data[i:i + seq_len])
y.append(data[i + seq_len])
return np.array(X), np.array(y)
def mode_predict() -> None:
    """Forecast the next value of a synthetic series from fixed LSTM features.

    Pipeline: generate the series, min-max scale it (scaler fitted on the
    training span only), cut sliding windows, encode every window as the last
    hidden state of a randomly initialised LSTM (never trained here), fit a
    Ridge regressor on those features, then print MSE / MAE and an ASCII
    comparison of the first 20 test points.
    """
    print("\n" + "="*60 + "\n 时序预测实战(正弦+趋势信号)\n" + "="*60)
    signal = make_timeseries(n=400)
    SEQ_LEN = 15
    # Chronological 80/20 split counted in sliding windows (one window per
    # start index, len(signal) - SEQ_LEN windows in total).
    split = int((len(signal) - SEQ_LEN) * 0.8)
    # Training windows and targets touch raw points [0, split + SEQ_LEN).
    # Fit the scaler on that span only so test-span min/max cannot leak into
    # preprocessing; test values may therefore fall slightly outside [0, 1].
    scaler = MinMaxScaler()
    scaler.fit(signal[:split + SEQ_LEN].reshape(-1, 1))
    signal_scaled = scaler.transform(signal.reshape(-1, 1)).flatten()
    X, y = make_sequences(signal_scaled, seq_len=SEQ_LEN)
    X_tr, X_te = X[:split], X[split:]
    y_tr, y_te = y[:split], y[split:]
    # Predict from the LSTM's last hidden state with a linear layer on top.
    lstm = LSTM(input_size=1, hidden_size=16)

    def extract_features(seqs: np.ndarray) -> np.ndarray:
        """Encode each window as the LSTM hidden state at its final time step."""
        feats = []
        for seq in seqs:
            h_states, _ = lstm.forward(seq.reshape(-1, 1))
            feats.append(h_states[-1])
        return np.array(feats)

    print(f"\n 提取 LSTM 特征(训练集 {len(X_tr)} 序列)...")
    X_tr_feat = extract_features(X_tr)
    X_te_feat = extract_features(X_te)
    # Fit the linear read-out; imported locally, as in the original block.
    from sklearn.linear_model import Ridge
    reg = Ridge(alpha=0.1)
    reg.fit(X_tr_feat, y_tr)
    y_pred = reg.predict(X_te_feat)
    mse = mean_squared_error(y_te, y_pred)
    mae = np.mean(np.abs(y_te - y_pred))
    print(f" MSE: {mse:.6f} MAE: {mae:.6f}")
    # ASCII comparison of prediction vs. ground truth (first 20 test points).
    print(f"\n 预测 vs 真实(前20个测试点,归一化后)")
    print(f" {'步骤':<6} {'真实值':<10} {'预测值':<10} {'误差':<10} 对比")
    print(f" {'─'*55}")
    for i, (real, pred) in enumerate(zip(y_te[:20], y_pred[:20])):
        err = abs(real - pred)
        # A negative count yields an empty bar, so values below 0 are harmless.
        bar_r = "█" * int(real * 20)
        bar_p = "░" * int(pred * 20)
        print(f" {i:<6} {real:.4f} {pred:.4f} {err:.4f} {bar_r}{bar_p}")
mode_predict()
Step 9:用 main 把 rnn/lstm/predict/compare 做成命令行入口
痛点与机制:
命令行入口让读者不用改源码,只靠 --mode 切换实验。学习时先跑 rnn 和 lstm 看机制,再跑 predict 和 compare 看完整任务,这样节奏更稳。
核心源码(逐字来自文末完整源码):
def main() -> None:
parser = argparse.ArgumentParser(description="RNN/LSTM 时序建模从零实现")
parser.add_argument(
"--mode",
choices=["rnn", "lstm", "predict", "compare", "all"],
default="all",
)
args = parser.parse_args()
dispatch = {
"rnn": mode_rnn,
"lstm": mode_lstm,
"predict": mode_predict,
"compare": mode_compare,
"all": lambda: [mode_rnn(), mode_lstm(), mode_predict(), mode_compare()],
}
dispatch[args.mode]()
可运行演示(补齐 Mock 数据与 print 反馈):
import argparse
def main() -> None:
parser = argparse.ArgumentParser(description="RNN/LSTM 时序建模从零实现")
parser.add_argument(
"--mode",
choices=["rnn", "lstm", "predict", "compare", "all"],
default="all",
)
args = parser.parse_args()
dispatch = {
"rnn": mode_rnn,
"lstm": mode_lstm,
"predict": mode_predict,
"compare": mode_compare,
"all": lambda: [mode_rnn(), mode_lstm(), mode_predict(), mode_compare()],
}
dispatch[args.mode]()
def mode_rnn() -> None:
print("运行 RNN 前向传播")
def mode_lstm() -> None:
print("运行 LSTM 门控演示")
def mode_predict() -> None:
print("运行时序预测")
def mode_compare() -> None:
print("运行 RNN/LSTM/MLP 对比")
import sys
for mode in ["rnn", "lstm", "predict", "compare"]:
print(f"\n$ python 51-python-rnn-lstm.py --mode {mode}")
sys.argv = ["51-python-rnn-lstm.py", "--mode", mode]
main()
极客实战:完整源码与运行
现在,把上面的积木拼起来,将下面完整代码保存为 51-python-rnn-lstm.py。它会生成一条模拟时序数据,分别演示 RNN、LSTM、LSTM 特征预测和模型对比。
#!/usr/bin/env python3
"""
51-python-rnn-lstm.py — RNN/LSTM 时序建模从零实现
用法:
python3 51-python-rnn-lstm.py --mode rnn # 手写 RNN 前向传播
python3 51-python-rnn-lstm.py --mode lstm # 手写 LSTM 前向传播
python3 51-python-rnn-lstm.py --mode predict # 时序预测实战
python3 51-python-rnn-lstm.py --mode compare # RNN vs LSTM vs MLP 对比
python3 51-python-rnn-lstm.py --mode all # 全部(默认)
依赖 numpy + scikit-learn,直接运行。
"""
import argparse
import numpy as np
from sklearn.metrics import mean_squared_error
from sklearn.neural_network import MLPRegressor
from sklearn.preprocessing import MinMaxScaler
def sigmoid(x: np.ndarray) -> np.ndarray:
    """Element-wise logistic function; maps any real input into the 0..1 range.

    The input is clipped to [-500, 500] first so ``np.exp`` cannot overflow.
    """
    clipped = np.clip(x, -500, 500)
    return 1.0 / (np.exp(-clipped) + 1.0)


def tanh(x: np.ndarray) -> np.ndarray:
    """Element-wise hyperbolic tangent (thin wrapper over ``np.tanh``)."""
    squashed = np.tanh(x)
    return squashed
# ─── 手写 RNN ──────────────────────────────────────────────────────────────────
class SimpleRNN:
    """
    Minimal single-layer recurrent network (forward pass only).

    Recurrence: h_t = tanh(x_t · W_x + h_{t-1} · W_h + b), starting from h = 0.
    Input:  (seq_len, input_size)
    Output: (seq_len, hidden_size)
    """

    def __init__(self, input_size: int, hidden_size: int, seed: int = 42) -> None:
        # A private RandomState makes the weights reproducible for a given seed.
        generator = np.random.RandomState(seed)
        init_scale = 0.1
        # Draw order is part of the reproducibility contract:
        # input weights first, recurrent weights second.
        self.W_x = init_scale * generator.randn(input_size, hidden_size)
        self.W_h = init_scale * generator.randn(hidden_size, hidden_size)
        self.b = np.zeros(hidden_size)
        self.hidden_size = hidden_size

    def forward(self, X: np.ndarray) -> tuple[np.ndarray, list[np.ndarray]]:
        """
        Unroll the recurrence over one sequence.

        X: (seq_len, input_size)
        Returns: (hidden states stacked into one array, the same states as a list)
        """
        n_steps = X.shape[0]
        state = np.zeros(self.hidden_size)
        history: list[np.ndarray] = []
        for step in range(n_steps):
            pre_activation = X[step] @ self.W_x + state @ self.W_h + self.b
            state = tanh(pre_activation)
            history.append(state.copy())
        return np.array(history), history
# ─── 手写 LSTM ─────────────────────────────────────────────────────────────────
class LSTM:
    """
    Single-layer LSTM (forward pass only) whose four gate blocks share one
    fused weight matrix.

    Column layout of ``W`` / ``b`` (each block is ``hidden_size`` = H wide):
        [0:H] forget gate f | [H:2H] input gate i | [2H:3H] candidate g | [3H:4H] output gate o

    Input:  (seq_len, input_size)
    Output: (seq_len, hidden_size)
    """

    def __init__(self, input_size: int, hidden_size: int, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        scale = 0.1
        concat_size = input_size + hidden_size
        # One fused matrix for all four blocks, so every step needs a single matmul.
        self.W = rng.randn(concat_size, 4 * hidden_size) * scale
        self.b = np.zeros(4 * hidden_size)
        # Forget-gate bias starts at 1 so the cell initially tends to keep its
        # memory. The slice must be the one forward() reads the forget gate
        # from, i.e. the FIRST block (columns 0:H), not the input-gate block.
        self.b[0:hidden_size] = 1.0
        self.hidden_size = hidden_size

    def forward(self, X: np.ndarray) -> tuple[np.ndarray, dict]:
        """
        Run the LSTM over one sequence, starting from zero hidden and cell state.

        X: (seq_len, input_size)
        Returns: (hidden states of every step stacked into one array,
                  {"gates": [one dict per step with the mean f / i / g / o activation]})
        """
        seq_len = X.shape[0]
        H = self.hidden_size
        h = np.zeros(H)
        c = np.zeros(H)
        h_states: list[np.ndarray] = []
        gate_log: list[dict] = []
        for t in range(seq_len):
            # Concatenate the current input with the previous hidden state.
            concat = np.concatenate([X[t], h])   # (input+hidden,)
            gates = concat @ self.W + self.b     # (4*hidden,)
            # Split the fused pre-activations into the four blocks.
            f = sigmoid(gates[0*H:1*H])   # forget gate
            i = sigmoid(gates[1*H:2*H])   # input gate
            g = tanh(gates[2*H:3*H])      # candidate values
            o = sigmoid(gates[3*H:4*H])   # output gate
            # Update the cell state, then expose a gated view of it as h.
            c = f * c + i * g
            h = o * tanh(c)
            h_states.append(h.copy())
            gate_log.append({"f": f.mean(), "i": i.mean(),
                             "g": g.mean(), "o": o.mean()})
        return np.array(h_states), {"gates": gate_log}
# ─── 时序数据生成 ──────────────────────────────────────────────────────────────
def make_timeseries(n: int = 500, seed: int = 42) -> np.ndarray:
    """Build a noisy synthetic series: sine + third harmonic + linear trend + noise.

    The time axis runs from 0 to 4π (two periods of the main sine) sampled at
    ``n`` points; the same ``seed`` always reproduces the same series.
    """
    span = 4 * np.pi
    noise_source = np.random.RandomState(seed)
    t = np.linspace(0, span, n)
    fundamental = np.sin(t)                  # main cycle
    harmonic = 0.5 * np.sin(3 * t)           # faster, half-amplitude ripple
    trend = 0.1 * t / span                   # rises linearly from 0 to 0.1
    noise = noise_source.randn(n) * 0.1      # Gaussian noise, std 0.1
    return fundamental + harmonic + trend + noise
def make_sequences(data: np.ndarray, seq_len: int = 10
                   ) -> tuple[np.ndarray, np.ndarray]:
    """Slice a series into sliding-window (X, y) training pairs.

    Window ``k`` is ``data[k : k + seq_len]`` and its target is the value right
    after it, ``data[k + seq_len]``; ``len(data) - seq_len`` pairs are produced.
    """
    n_pairs = len(data) - seq_len
    windows = [data[start:start + seq_len] for start in range(n_pairs)]
    targets = [data[start + seq_len] for start in range(n_pairs)]
    return np.array(windows), np.array(targets)
# ─── 模式1:RNN 前向传播演示 ───────────────────────────────────────────────────
def mode_rnn() -> None:
    """Run the hand-written RNN on a 5-step toy sequence and print a table
    with the hidden state of every time step and its L2 norm."""
    rule = "=" * 60
    print("\n" + rule + "\n 手写 RNN 前向传播演示\n" + rule)

    # Five time steps, one feature each: the signal rises to 0.5 and falls back.
    inputs = np.array([[0.1], [0.3], [0.5], [0.3], [0.1]])
    network = SimpleRNN(input_size=1, hidden_size=4)
    hidden_seq, _ = network.forward(inputs)

    print(f"\n 输入序列: {inputs.flatten()}")
    print(f" 隐藏状态 shape: {hidden_seq.shape} (seq_len=5, hidden=4)\n")
    print(f" {'时间步':<8} {'h_t (4维隐藏状态)':<40} {'‖h_t‖'}")
    print(f" {'─'*60}")
    for step, state in enumerate(hidden_seq):
        components = " ".join(f"{value:+.3f}" for value in state)
        magnitude = np.linalg.norm(state)
        print(f" t={step} [{components}] {magnitude:.4f}")
    print("\n 💡 h_t 随输入变化,携带了历史信息(t=2时信号最强,h范数最大)")
# ─── 模式2:LSTM 门控演示 ──────────────────────────────────────────────────────
def mode_lstm() -> None:
    """Feed a rise-then-fall toy signal through the hand-written LSTM and print,
    for every time step, the mean activation of each gate block and ‖h_t‖."""
    rule = "=" * 60
    print("\n" + rule + "\n 手写 LSTM 门控机制演示\n" + rule)

    # Seven steps: the signal climbs to 0.9 and then decays back to 0.1.
    inputs = np.array([[0.1], [0.3], [0.6], [0.9], [0.7], [0.4], [0.1]])
    cell = LSTM(input_size=1, hidden_size=8)
    hidden_seq, details = cell.forward(inputs)

    print(f"\n 输入序列: {inputs.flatten()}")
    print(f"\n {'时间步':<8} {'遗忘门f':<10} {'输入门i':<10} {'候选g':<10} {'输出门o':<10} {'‖h_t‖'}")
    print(f" {'─'*60}")
    # forward() logs exactly one gate dict per hidden state, so zip pairs them up.
    for step, (means, state) in enumerate(zip(details["gates"], hidden_seq)):
        magnitude = np.linalg.norm(state)
        row = (f" t={step} {means['f']:.3f} {means['i']:.3f} "
               f"{means['g']:+.3f} {means['o']:.3f} {magnitude:.4f}")
        print(row)
    print("\n 💡 遗忘门接近1=记住历史,接近0=遗忘历史")
    print(" 输入门接近1=写入新信息,接近0=忽略新输入")
# ─── 模式3:时序预测实战 ───────────────────────────────────────────────────────
def mode_predict() -> None:
    """Forecast the next value of a synthetic series from fixed LSTM features.

    Pipeline: generate the series, min-max scale it (scaler fitted on the
    training span only), cut sliding windows, encode every window as the last
    hidden state of a randomly initialised LSTM (never trained here), fit a
    Ridge regressor on those features, then print MSE / MAE and an ASCII
    comparison of the first 20 test points.
    """
    print("\n" + "="*60 + "\n 时序预测实战(正弦+趋势信号)\n" + "="*60)
    signal = make_timeseries(n=400)
    SEQ_LEN = 15
    # Chronological 80/20 split counted in sliding windows (one window per
    # start index, len(signal) - SEQ_LEN windows in total).
    split = int((len(signal) - SEQ_LEN) * 0.8)
    # Training windows and targets touch raw points [0, split + SEQ_LEN).
    # Fit the scaler on that span only so test-span min/max cannot leak into
    # preprocessing; test values may therefore fall slightly outside [0, 1].
    scaler = MinMaxScaler()
    scaler.fit(signal[:split + SEQ_LEN].reshape(-1, 1))
    signal_scaled = scaler.transform(signal.reshape(-1, 1)).flatten()
    X, y = make_sequences(signal_scaled, seq_len=SEQ_LEN)
    X_tr, X_te = X[:split], X[split:]
    y_tr, y_te = y[:split], y[split:]
    # Predict from the LSTM's last hidden state with a linear layer on top.
    lstm = LSTM(input_size=1, hidden_size=16)

    def extract_features(seqs: np.ndarray) -> np.ndarray:
        """Encode each window as the LSTM hidden state at its final time step."""
        feats = []
        for seq in seqs:
            h_states, _ = lstm.forward(seq.reshape(-1, 1))
            feats.append(h_states[-1])
        return np.array(feats)

    print(f"\n 提取 LSTM 特征(训练集 {len(X_tr)} 序列)...")
    X_tr_feat = extract_features(X_tr)
    X_te_feat = extract_features(X_te)
    # Fit the linear read-out; imported locally, as in the original block.
    from sklearn.linear_model import Ridge
    reg = Ridge(alpha=0.1)
    reg.fit(X_tr_feat, y_tr)
    y_pred = reg.predict(X_te_feat)
    mse = mean_squared_error(y_te, y_pred)
    mae = np.mean(np.abs(y_te - y_pred))
    print(f" MSE: {mse:.6f} MAE: {mae:.6f}")
    # ASCII comparison of prediction vs. ground truth (first 20 test points).
    print(f"\n 预测 vs 真实(前20个测试点,归一化后)")
    print(f" {'步骤':<6} {'真实值':<10} {'预测值':<10} {'误差':<10} 对比")
    print(f" {'─'*55}")
    for i, (real, pred) in enumerate(zip(y_te[:20], y_pred[:20])):
        err = abs(real - pred)
        # A negative count yields an empty bar, so values below 0 are harmless.
        bar_r = "█" * int(real * 20)
        bar_p = "░" * int(pred * 20)
        print(f" {i:<6} {real:.4f} {pred:.4f} {err:.4f} {bar_r}{bar_p}")
# ─── 模式4:RNN vs LSTM vs MLP 对比 ───────────────────────────────────────────
def mode_compare() -> None:
    """Compare three next-step predictors on the same synthetic series.

    Models: an MLP trained on the flattened window, and Ridge regressors on
    the last hidden state of a randomly initialised SimpleRNN / LSTM (the
    recurrent weights are never trained here). Prints test MSE and wall-clock
    time for each model.
    """
    from sklearn.linear_model import Ridge
    import time

    print("\n" + "="*60 + "\n RNN vs LSTM vs MLP 时序预测对比\n" + "="*60)
    signal = make_timeseries(n=500)
    SEQ_LEN = 10
    # Chronological 80/20 split counted in sliding windows.
    split = int((len(signal) - SEQ_LEN) * 0.8)
    # Training windows and targets touch raw points [0, split + SEQ_LEN).
    # Fit the scaler on that span only so test-span min/max cannot leak into
    # preprocessing.
    scaler = MinMaxScaler()
    scaler.fit(signal[:split + SEQ_LEN].reshape(-1, 1))
    signal_scaled = scaler.transform(signal.reshape(-1, 1)).flatten()
    X, y = make_sequences(signal_scaled, seq_len=SEQ_LEN)
    X_tr, X_te = X[:split], X[split:]
    y_tr, y_te = y[:split], y[split:]

    def elapsed_ms(start: float) -> str:
        """Format the time since ``start`` as whole milliseconds."""
        return f"{(time.perf_counter()-start)*1000:.0f}ms"

    def last_hidden(model, windows: np.ndarray) -> np.ndarray:
        """Encode each window as the model's hidden state at the final step."""
        return np.array([model.forward(w.reshape(-1, 1))[0][-1] for w in windows])

    results = []

    # MLP on the flattened window.
    t0 = time.perf_counter()
    mlp = MLPRegressor(hidden_layer_sizes=(32, 16), max_iter=500, random_state=42)
    mlp.fit(X_tr, y_tr)
    mse_mlp = mean_squared_error(y_te, mlp.predict(X_te))
    results.append(["MLP (展平序列)", f"{SEQ_LEN}→32→16→1",
                    f"{mse_mlp:.6f}", elapsed_ms(t0)])

    # Recurrent encoders with a Ridge read-out. The timer starts before the
    # encoder is built, so construction and feature extraction are included.
    for label, encoder_cls in (("RNN + Ridge", SimpleRNN), ("LSTM + Ridge", LSTM)):
        t0 = time.perf_counter()
        encoder = encoder_cls(input_size=1, hidden_size=16)
        feats_tr = last_hidden(encoder, X_tr)
        feats_te = last_hidden(encoder, X_te)
        readout = Ridge(alpha=0.1)
        readout.fit(feats_tr, y_tr)
        mse = mean_squared_error(y_te, readout.predict(feats_te))
        results.append([label, "input→16→1", f"{mse:.6f}", elapsed_ms(t0)])

    print(f"\n {'模型':<20} {'结构':<15} {'测试MSE':<12} {'耗时'}")
    print(f" {'─'*55}")
    for row in results:
        print(f" {row[0]:<20} {row[1]:<15} {row[2]:<12} {row[3]}")
    print(f"\n 💡 LSTM 通过门控机制保留长期依赖,MSE 通常低于简单 RNN")
# ─── 入口 ─────────────────────────────────────────────────────────────────────
def main() -> None:
    """Command-line entry point: parse ``--mode`` and run the matching demo(s)."""

    def run_all() -> None:
        # Same order as the individual modes: rnn, lstm, predict, compare.
        for demo in (mode_rnn, mode_lstm, mode_predict, mode_compare):
            demo()

    cli = argparse.ArgumentParser(description="RNN/LSTM 时序建模从零实现")
    cli.add_argument(
        "--mode",
        choices=["rnn", "lstm", "predict", "compare", "all"],
        default="all",
    )
    options = cli.parse_args()
    handlers = {
        "rnn": mode_rnn,
        "lstm": mode_lstm,
        "predict": mode_predict,
        "compare": mode_compare,
        "all": run_all,
    }
    handlers[options.mode]()


if __name__ == "__main__":
    main()
$ python 51-python-rnn-lstm.py --mode rnn
============================================================
手写 RNN 前向传播演示
============================================================
输入序列: [0.1 0.3 0.5 0.3 0.1]
隐藏状态 shape: (5, 4) (seq_len=5, hidden=4)
时间步 h_t (4维隐藏状态) ‖h_t‖
────────────────────────────────────────────────────────────
t=0 [+0.005 -0.001 +0.006 +0.015] 0.0173
t=1 [+0.013 -0.005 +0.018 +0.044] 0.0492
t=2 [+0.021 -0.010 +0.028 +0.070] 0.0788
t=3 [+0.008 -0.008 +0.012 +0.036] 0.0400
t=4 [+0.002 -0.003 +0.003 +0.010] 0.0114
💡 h_t 随输入变化,携带了历史信息(t=2时信号最强,h范数最大)
$ python 51-python-rnn-lstm.py --mode lstm
============================================================
手写 LSTM 门控机制演示
============================================================
输入序列: [0.1 0.3 0.6 0.9 0.7 0.4 0.1]
时间步 遗忘门f 输入门i 候选g 输出门o ‖h_t‖
────────────────────────────────────────────────────────────
t=0 0.501 0.730 -0.004 0.500 0.0105
t=1 0.504 0.727 -0.012 0.499 0.0364
t=2 0.509 0.724 -0.024 0.498 0.0804
t=3 0.514 0.720 -0.035 0.497 0.1332
t=4 0.513 0.723 -0.027 0.498 0.1384
t=5 0.509 0.726 -0.016 0.498 0.1103
t=6 0.504 0.730 -0.004 0.499 0.0660
💡 遗忘门接近1=记住历史,接近0=遗忘历史
输入门接近1=写入新信息,接近0=忽略新输入
$ python 51-python-rnn-lstm.py --mode predict
============================================================
时序预测实战(正弦+趋势信号)
============================================================
提取 LSTM 特征(训练集 308 序列)...
MSE: 0.002597 MAE: 0.039393
预测 vs 真实(前20个测试点,归一化后)
步骤 真实值 预测值 误差 对比
───────────────────────────────────────────────────────
0 0.1585 0.0800 0.0784 ███░
1 0.0398 0.1176 0.0778 ░░
2 0.0366 0.0876 0.0510 ░
3 0.1364 0.0697 0.0667 ██░
4 0.1296 0.1023 0.0273 ██░░
5 0.1319 0.1173 0.0146 ██░░
6 0.1419 0.1267 0.0152 ██░░
7 0.1275 0.1365 0.0090 ██░░
8 0.1041 0.1359 0.0317 ██░░
9 0.1545 0.1257 0.0288 ███░░
10 0.1374 0.1417 0.0044 ██░░
11 0.2152 0.1433 0.0719 ████░░
12 0.1839 0.1775 0.0064 ███░░░
13 0.1699 0.1831 0.0132 ███░░░
14 0.2024 0.1806 0.0218 ████░░░
15 0.2435 0.1933 0.0502 ████░░░
16 0.2168 0.2179 0.0011 ████░░░░
17 0.2177 0.2204 0.0027 ████░░░░
18 0.2700 0.2224 0.0476 █████░░░░
19 0.2794 0.2459 0.0335 █████░░░░
小结与 NexDo Time ⚡
这一篇你已经看懂了序列建模的关键:RNN 用隐藏状态传递历史,LSTM 用门控机制管理长期记忆,滑动窗口把连续时间线切成可训练样本。先理解前向传播和状态流动,再谈训练和优化,学习坡度会平很多。
5 分钟微操挑战:把 mode_predict() 里的 SEQ_LEN = 15 改成 5 和 25,分别运行 --mode predict,观察 MSE 和 MAE 是否变化。思考:窗口太短和太长分别会带来什么问题?
Don’t wait for next time, do it in the next moment.