文章

51 · RNN/LSTM:时序数据建模与序列预测

#035 · 2026-04-17 · Python

🔗 知识图谱导航:阅读本文前,建议先回顾《48 · 神经网络基础:从零手写前向传播与反向传播》里的矩阵运算和激活函数,再回顾《50 · CNN 卷积神经网络》里“按结构处理数据”的思想。本文会把注意力从图像空间转向时间序列。 NexDo Time · 2026-04-17 · 预计阅读 34 分钟

痛点与架构

普通神经网络处理每个样本时,通常把它当作独立个体。但时序数据不一样:今天的温度和昨天有关,当前一句话和前一句有关,股票此刻的波动也可能受前面走势影响。RNN 的核心价值,就是把“过去的信息”带到“现在的判断”里。

RNN 用隐藏状态 h_t 传递记忆,但长序列里早期信息容易衰减。LSTM 在 RNN 基础上增加了细胞状态和三个关键门,让模型学会选择性遗忘、选择性写入、选择性输出。

RNN:  x_t + h_{t-1} -> h_t
LSTM: x_t + h_{t-1} + c_{t-1}
        -> 遗忘门/输入门/候选值/输出门
        -> c_t 和 h_t

步步为营:核心逻辑自适应拆解

RNN/LSTM 的概念比 CNN 更抽象,所以这一篇拆成 9 个小步骤。你先看激活函数和隐藏状态,再看门控机制,最后跑完整的时序预测。

Step 1:用 sigmoid/tanh 理解 RNN 门和隐藏状态的数值范围

痛点与机制

RNN/LSTM 里的很多公式都离不开 sigmoid 和 tanh。sigmoid 像一个 0 到 1 的阀门,决定“开多少”;tanh 像带方向的音量旋钮,输出在 -1 到 1 之间,能表示正向或反向影响。先看这两个函数,后面的门控机制就不会像天书。

核心源码(逐字来自文末完整源码)

def sigmoid(x: np.ndarray) -> np.ndarray:
    """Element-wise logistic function 1 / (1 + e^-x).

    The input is clipped to [-500, 500] before exponentiation so that
    ``np.exp`` cannot overflow on extreme values.
    """
    bounded = np.clip(x, -500, 500)
    denominator = 1.0 + np.exp(-bounded)
    return 1.0 / denominator

def tanh(x: np.ndarray) -> np.ndarray:
    """Element-wise hyperbolic tangent; a thin wrapper around ``np.tanh``."""
    squashed = np.tanh(x)
    return squashed

可运行演示(补齐 Mock 数据与 print 反馈)

import numpy as np

def sigmoid(x: np.ndarray) -> np.ndarray:
    """Element-wise logistic function 1 / (1 + e^-x).

    The input is clipped to [-500, 500] before exponentiation so that
    ``np.exp`` cannot overflow on extreme values.
    """
    bounded = np.clip(x, -500, 500)
    denominator = 1.0 + np.exp(-bounded)
    return 1.0 / denominator

def tanh(x: np.ndarray) -> np.ndarray:
    """Element-wise hyperbolic tangent; a thin wrapper around ``np.tanh``."""
    squashed = np.tanh(x)
    return squashed

# Feed the same symmetric inputs to both activations so their output ranges
# can be compared side by side (results are rounded to 4 decimals for display).
values = np.array([[-3.0, -1.0, 0.0, 1.0, 3.0]])
print("原始输入:", values.tolist())
print("sigmoid 输出:", np.round(sigmoid(values), 4).tolist())
print("tanh 输出:", np.round(tanh(values), 4).tolist())
print("直觉:sigmoid 像 0~1 的开关,tanh 像 -1~1 的方向盘。")

Step 2:用 SimpleRNN 把隐藏状态像接力棒一样往后传

痛点与机制

普通 MLP 每次只看当前输入,RNN 会把上一步的隐藏状态也带进来。你可以把 h_t 想成接力棒:每个时间步拿到当前输入,也拿到前面传来的记忆,再生成新的记忆传给下一步。

核心源码(逐字来自文末完整源码)

class SimpleRNN:
    """
    Single-layer vanilla RNN: h_t = tanh(W_x·x_t + W_h·h_{t-1} + b)

    Input:  (seq_len, input_size)
    Output: (seq_len, hidden_size)
    """

    def __init__(self, input_size: int, hidden_size: int, seed: int = 42) -> None:
        # A seeded generator keeps the small random weights reproducible.
        generator = np.random.RandomState(seed)
        init_scale = 0.1
        self.hidden_size = hidden_size
        # W_x is drawn before W_h; the draw order fixes the actual values.
        self.W_x = init_scale * generator.randn(input_size, hidden_size)
        self.W_h = init_scale * generator.randn(hidden_size, hidden_size)
        self.b = np.zeros(hidden_size)

    def forward(self, X: np.ndarray) -> tuple[np.ndarray, list[np.ndarray]]:
        """
        Apply the recurrence to every time step, starting from a zero state.

        X: (seq_len, input_size)
        Returns: (hidden states stacked into one array, the same states as a list)
        """
        state = np.zeros(self.hidden_size)
        history: list[np.ndarray] = []

        for step in range(X.shape[0]):
            from_input = X[step] @ self.W_x
            from_memory = state @ self.W_h
            state = tanh(from_input + from_memory + self.b)
            # Store a copy so later steps cannot alias earlier entries.
            history.append(state.copy())

        return np.array(history), history

可运行演示(补齐 Mock 数据与 print 反馈)

import numpy as np

def sigmoid(x: np.ndarray) -> np.ndarray:
    """Element-wise logistic function 1 / (1 + e^-x).

    The input is clipped to [-500, 500] before exponentiation so that
    ``np.exp`` cannot overflow on extreme values.
    """
    bounded = np.clip(x, -500, 500)
    denominator = 1.0 + np.exp(-bounded)
    return 1.0 / denominator

def tanh(x: np.ndarray) -> np.ndarray:
    """Element-wise hyperbolic tangent; a thin wrapper around ``np.tanh``."""
    squashed = np.tanh(x)
    return squashed

class SimpleRNN:
    """
    Single-layer vanilla RNN: h_t = tanh(W_x·x_t + W_h·h_{t-1} + b)

    Input:  (seq_len, input_size)
    Output: (seq_len, hidden_size)
    """

    def __init__(self, input_size: int, hidden_size: int, seed: int = 42) -> None:
        # A seeded generator keeps the small random weights reproducible.
        generator = np.random.RandomState(seed)
        init_scale = 0.1
        self.hidden_size = hidden_size
        # W_x is drawn before W_h; the draw order fixes the actual values.
        self.W_x = init_scale * generator.randn(input_size, hidden_size)
        self.W_h = init_scale * generator.randn(hidden_size, hidden_size)
        self.b = np.zeros(hidden_size)

    def forward(self, X: np.ndarray) -> tuple[np.ndarray, list[np.ndarray]]:
        """
        Apply the recurrence to every time step, starting from a zero state.

        X: (seq_len, input_size)
        Returns: (hidden states stacked into one array, the same states as a list)
        """
        state = np.zeros(self.hidden_size)
        history: list[np.ndarray] = []

        for step in range(X.shape[0]):
            from_input = X[step] @ self.W_x
            from_memory = state @ self.W_h
            state = tanh(from_input + from_memory + self.b)
            # Store a copy so later steps cannot alias earlier entries.
            history.append(state.copy())

        return np.array(history), history

# Demo: push a 5-step, single-feature sequence through a 3-unit RNN and print
# each step's hidden state together with its Euclidean norm.
seq = np.array([[0.1], [0.3], [0.5], [0.3], [0.1]])
rnn = SimpleRNN(input_size=1, hidden_size=3)
h_states, _ = rnn.forward(seq)

print("输入序列:", seq.flatten().tolist())
for t, h in enumerate(h_states):
    print(f"t={t} 隐藏状态:", np.round(h, 4).tolist(), "范数=", round(float(np.linalg.norm(h)), 4))
print("直觉:h_t 像接力棒,把前面时间步的信息传给下一步。")

Step 3:用 LSTM 的四个门控制“记住、写入、输出”

痛点与机制

LSTM 比 RNN 多了细胞状态和门控机制。遗忘门像橡皮擦,决定旧记忆擦掉多少;输入门像写入按钮,决定新信息写多少;候选值是准备写入的内容;输出门像展示窗口,决定这一步给外面看多少记忆。

核心源码(逐字来自文末完整源码)

class LSTM:
    """
    Single-layer LSTM (forward pass only): three gates plus a candidate value.

    Input:  (seq_len, input_size)
    Output: (seq_len, hidden_size)

    The fused parameters are laid out as four blocks of width ``hidden_size``
    in the order [forget | input | candidate | output]; ``forward`` slices the
    pre-activations in exactly that order.
    """

    def __init__(self, input_size: int, hidden_size: int, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        scale = 0.1
        concat_size = input_size + hidden_size

        # One fused weight matrix for all four blocks: a single matmul per step.
        self.W = rng.randn(concat_size, 4 * hidden_size) * scale
        self.b = np.zeros(4 * hidden_size)
        # Forget-gate bias starts at 1 so the cell initially tends to keep its
        # memory. The forget gate is block 0 (the slice forward() reads as f),
        # not block 1, which is the input gate.
        self.b[0:hidden_size] = 1.0
        self.hidden_size = hidden_size

    def forward(self, X: np.ndarray) -> tuple[np.ndarray, dict]:
        """
        Run the LSTM over a sequence, starting from zero hidden and cell state.

        X: (seq_len, input_size)
        Returns: (hidden states of all time steps,
                  {"gates": per-step dicts holding the mean of f, i, g and o})
        """
        seq_len = X.shape[0]
        H = self.hidden_size
        h = np.zeros(H)
        c = np.zeros(H)
        h_states: list[np.ndarray] = []
        gate_log: list[dict] = []

        for t in range(seq_len):
            # Concatenate the current input with the previous hidden state.
            concat = np.concatenate([X[t], h])   # (input+hidden,)
            gates = concat @ self.W + self.b      # (4*hidden,)

            # Slice the fused pre-activations into the four blocks.
            f = sigmoid(gates[0*H:1*H])           # forget gate
            i = sigmoid(gates[1*H:2*H])           # input gate
            g = tanh(gates[2*H:3*H])              # candidate values
            o = sigmoid(gates[3*H:4*H])           # output gate

            # Update the cell state, then expose a gated view of it as h.
            c = f * c + i * g
            h = o * tanh(c)

            h_states.append(h.copy())
            gate_log.append({"f": f.mean(), "i": i.mean(),
                             "g": g.mean(), "o": o.mean()})

        return np.array(h_states), {"gates": gate_log}

可运行演示(补齐 Mock 数据与 print 反馈)

import numpy as np

def sigmoid(x: np.ndarray) -> np.ndarray:
    """Element-wise logistic function 1 / (1 + e^-x).

    The input is clipped to [-500, 500] before exponentiation so that
    ``np.exp`` cannot overflow on extreme values.
    """
    bounded = np.clip(x, -500, 500)
    denominator = 1.0 + np.exp(-bounded)
    return 1.0 / denominator

def tanh(x: np.ndarray) -> np.ndarray:
    """Element-wise hyperbolic tangent; a thin wrapper around ``np.tanh``."""
    squashed = np.tanh(x)
    return squashed

class LSTM:
    """
    Single-layer LSTM (forward pass only): three gates plus a candidate value.

    Input:  (seq_len, input_size)
    Output: (seq_len, hidden_size)

    The fused parameters are laid out as four blocks of width ``hidden_size``
    in the order [forget | input | candidate | output]; ``forward`` slices the
    pre-activations in exactly that order.
    """

    def __init__(self, input_size: int, hidden_size: int, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        scale = 0.1
        concat_size = input_size + hidden_size

        # One fused weight matrix for all four blocks: a single matmul per step.
        self.W = rng.randn(concat_size, 4 * hidden_size) * scale
        self.b = np.zeros(4 * hidden_size)
        # Forget-gate bias starts at 1 so the cell initially tends to keep its
        # memory. The forget gate is block 0 (the slice forward() reads as f),
        # not block 1, which is the input gate.
        self.b[0:hidden_size] = 1.0
        self.hidden_size = hidden_size

    def forward(self, X: np.ndarray) -> tuple[np.ndarray, dict]:
        """
        Run the LSTM over a sequence, starting from zero hidden and cell state.

        X: (seq_len, input_size)
        Returns: (hidden states of all time steps,
                  {"gates": per-step dicts holding the mean of f, i, g and o})
        """
        seq_len = X.shape[0]
        H = self.hidden_size
        h = np.zeros(H)
        c = np.zeros(H)
        h_states: list[np.ndarray] = []
        gate_log: list[dict] = []

        for t in range(seq_len):
            # Concatenate the current input with the previous hidden state.
            concat = np.concatenate([X[t], h])   # (input+hidden,)
            gates = concat @ self.W + self.b      # (4*hidden,)

            # Slice the fused pre-activations into the four blocks.
            f = sigmoid(gates[0*H:1*H])           # forget gate
            i = sigmoid(gates[1*H:2*H])           # input gate
            g = tanh(gates[2*H:3*H])              # candidate values
            o = sigmoid(gates[3*H:4*H])           # output gate

            # Update the cell state, then expose a gated view of it as h.
            c = f * c + i * g
            h = o * tanh(c)

            h_states.append(h.copy())
            gate_log.append({"f": f.mean(), "i": i.mean(),
                             "g": g.mean(), "o": o.mean()})

        return np.array(h_states), {"gates": gate_log}

# Demo: run a 5-step sequence through a 4-unit LSTM and print, per time step,
# the mean activation of each gate and the norm of the hidden state.
seq = np.array([[0.1], [0.3], [0.6], [0.9], [0.4]])
lstm = LSTM(input_size=1, hidden_size=4)
h_states, info = lstm.forward(seq)

print("输入序列:", seq.flatten().tolist())
for t, gate in enumerate(info["gates"]):
    print(f"t={t} f={gate['f']:.3f} i={gate['i']:.3f} g={gate['g']:+.3f} o={gate['o']:.3f} |h|={np.linalg.norm(h_states[t]):.4f}")
print("直觉:遗忘门像橡皮擦,输入门像写入按钮,输出门像展示窗口。")

Step 4:用 make_timeseries 造一条可预测的时间序列

痛点与机制

真实股价或传感器数据不适合新手第一步就拿来练,因为噪声和业务因素太多。这里生成“正弦波 + 谐波 + 趋势 + 噪声”的模拟序列,像一条有规律但不死板的练习曲。

核心源码(逐字来自文末完整源码)

def make_timeseries(n: int = 500, seed: int = 42) -> np.ndarray:
    """Build a synthetic series of ``n`` points over [0, 4*pi].

    The series is the sum of a sine wave, a weaker third harmonic, a linear
    trend rising from 0 to 0.1, and seeded Gaussian noise with std 0.1.
    """
    span = 4 * np.pi
    grid = np.linspace(0, span, n)
    base_wave = np.sin(grid)                            # main period
    harmonic = 0.5 * np.sin(3 * grid)                   # third harmonic
    trend = 0.1 * grid / span                           # slow upward drift
    noise = np.random.RandomState(seed).randn(n) * 0.1  # reproducible noise
    return base_wave + harmonic + trend + noise

可运行演示(补齐 Mock 数据与 print 反馈)

import numpy as np

def make_timeseries(n: int = 500, seed: int = 42) -> np.ndarray:
    """Build a synthetic series of ``n`` points over [0, 4*pi].

    The series is the sum of a sine wave, a weaker third harmonic, a linear
    trend rising from 0 to 0.1, and seeded Gaussian noise with std 0.1.
    """
    span = 4 * np.pi
    grid = np.linspace(0, span, n)
    base_wave = np.sin(grid)                            # main period
    harmonic = 0.5 * np.sin(3 * grid)                   # third harmonic
    trend = 0.1 * grid / span                           # slow upward drift
    noise = np.random.RandomState(seed).randn(n) * 0.1  # reproducible noise
    return base_wave + harmonic + trend + noise

# Demo: generate a short 20-point series (seed 7) and print its length,
# its first eight values and its value range.
signal = make_timeseries(n=20, seed=7)
print("序列长度:", len(signal))
print("前 8 个点:", np.round(signal[:8], 4).tolist())
print("最小/最大:", round(float(signal.min()), 4), round(float(signal.max()), 4))
print("说明:这是正弦波 + 谐波 + 趋势 + 噪声,像简化版传感器数据。")

Step 5:用 make_sequences 把连续序列切成监督学习样本

痛点与机制

模型不能直接吃一整条无限长的时间线,所以要切滑动窗口。比如用最近 10 个点预测第 11 个点,这就把时序问题变成了普通的 X/y 训练样本。

核心源码(逐字来自文末完整源码)

def make_sequences(data: np.ndarray, seq_len: int = 10
                   ) -> tuple[np.ndarray, np.ndarray]:
    """Cut a series into sliding-window (X, y) pairs.

    Window k holds ``data[k:k + seq_len]`` and its target is the value right
    after it, ``data[k + seq_len]``.
    """
    starts = range(len(data) - seq_len)
    windows = [data[start:start + seq_len] for start in starts]
    targets = [data[start + seq_len] for start in starts]
    return np.array(windows), np.array(targets)

可运行演示(补齐 Mock 数据与 print 反馈)

import numpy as np

def make_sequences(data: np.ndarray, seq_len: int = 10
                   ) -> tuple[np.ndarray, np.ndarray]:
    """Cut a series into sliding-window (X, y) pairs.

    Window k holds ``data[k:k + seq_len]`` and its target is the value right
    after it, ``data[k + seq_len]``.
    """
    starts = range(len(data) - seq_len)
    windows = [data[start:start + seq_len] for start in starts]
    targets = [data[start + seq_len] for start in starts]
    return np.array(windows), np.array(targets)

# Demo: a 6-point series with seq_len=3 yields three (window -> next value)
# training pairs, printed one per line.
data = np.array([10, 11, 12, 13, 14, 15], dtype=float)
X, y = make_sequences(data, seq_len=3)
print("原始序列:", data.tolist())
for i, (window, target) in enumerate(zip(X, y)):
    print(f"样本{i}: 输入窗口={window.tolist()} -> 预测下一个值={target}")
print("直觉:滑动窗口像用最近 3 天的数据预测第 4 天。")

Step 6:用 mode_rnn 打印每个时间步的隐藏状态

痛点与机制

只看最终预测很难理解 RNN。mode_rnn() 把每个时间步的 4 维隐藏状态都打印出来,让你看到输入变化时,记忆向量也跟着变化。范数越大,说明当前隐藏状态的整体强度越明显。

核心源码(逐字来自文末完整源码)

def mode_rnn() -> None:
    """Print the hidden state of the hand-written RNN at every time step."""
    rule = "=" * 60
    print("\n" + rule + "\n  手写 RNN 前向传播演示\n" + rule)

    # Five time steps with one feature each: the signal rises, then falls.
    inputs = np.array([[0.1], [0.3], [0.5], [0.3], [0.1]])
    network = SimpleRNN(input_size=1, hidden_size=4)
    states, _ = network.forward(inputs)

    print(f"\n  输入序列: {inputs.flatten()}")
    print(f"  隐藏状态 shape: {states.shape}  (seq_len=5, hidden=4)\n")
    print(f"  {'时间步':<8} {'h_t (4维隐藏状态)':<40} {'‖h_t‖'}")
    print(f"  {'─'*60}")
    for step, state in enumerate(states):
        formatted = "  ".join(f"{component:+.3f}" for component in state)
        magnitude = np.linalg.norm(state)
        print(f"  t={step}      [{formatted}]  {magnitude:.4f}")

    print("\n  💡 h_t 随输入变化,携带了历史信息(t=2时信号最强,h范数最大)")

可运行演示(补齐 Mock 数据与 print 反馈)

import numpy as np

def sigmoid(x: np.ndarray) -> np.ndarray:
    """Element-wise logistic function 1 / (1 + e^-x).

    The input is clipped to [-500, 500] before exponentiation so that
    ``np.exp`` cannot overflow on extreme values.
    """
    bounded = np.clip(x, -500, 500)
    denominator = 1.0 + np.exp(-bounded)
    return 1.0 / denominator

def tanh(x: np.ndarray) -> np.ndarray:
    """Element-wise hyperbolic tangent; a thin wrapper around ``np.tanh``."""
    squashed = np.tanh(x)
    return squashed

class SimpleRNN:
    """
    Single-layer vanilla RNN: h_t = tanh(W_x·x_t + W_h·h_{t-1} + b)

    Input:  (seq_len, input_size)
    Output: (seq_len, hidden_size)
    """

    def __init__(self, input_size: int, hidden_size: int, seed: int = 42) -> None:
        # A seeded generator keeps the small random weights reproducible.
        generator = np.random.RandomState(seed)
        init_scale = 0.1
        self.hidden_size = hidden_size
        # W_x is drawn before W_h; the draw order fixes the actual values.
        self.W_x = init_scale * generator.randn(input_size, hidden_size)
        self.W_h = init_scale * generator.randn(hidden_size, hidden_size)
        self.b = np.zeros(hidden_size)

    def forward(self, X: np.ndarray) -> tuple[np.ndarray, list[np.ndarray]]:
        """
        Apply the recurrence to every time step, starting from a zero state.

        X: (seq_len, input_size)
        Returns: (hidden states stacked into one array, the same states as a list)
        """
        state = np.zeros(self.hidden_size)
        history: list[np.ndarray] = []

        for step in range(X.shape[0]):
            from_input = X[step] @ self.W_x
            from_memory = state @ self.W_h
            state = tanh(from_input + from_memory + self.b)
            # Store a copy so later steps cannot alias earlier entries.
            history.append(state.copy())

        return np.array(history), history

def mode_rnn() -> None:
    """Print the hidden state of the hand-written RNN at every time step."""
    rule = "=" * 60
    print("\n" + rule + "\n  手写 RNN 前向传播演示\n" + rule)

    # Five time steps with one feature each: the signal rises, then falls.
    inputs = np.array([[0.1], [0.3], [0.5], [0.3], [0.1]])
    network = SimpleRNN(input_size=1, hidden_size=4)
    states, _ = network.forward(inputs)

    print(f"\n  输入序列: {inputs.flatten()}")
    print(f"  隐藏状态 shape: {states.shape}  (seq_len=5, hidden=4)\n")
    print(f"  {'时间步':<8} {'h_t (4维隐藏状态)':<40} {'‖h_t‖'}")
    print(f"  {'─'*60}")
    for step, state in enumerate(states):
        formatted = "  ".join(f"{component:+.3f}" for component in state)
        magnitude = np.linalg.norm(state)
        print(f"  t={step}      [{formatted}]  {magnitude:.4f}")

    print("\n  💡 h_t 随输入变化,携带了历史信息(t=2时信号最强,h范数最大)")

# Entry point of this snippet: run the RNN forward-pass demo defined above.
mode_rnn()

Step 7:用 mode_lstm 观察遗忘门、输入门和输出门

痛点与机制

LSTM 的门控如果只看公式很抽象,打印均值后会直观很多。遗忘门接近 1 代表更愿意保留旧记忆,输入门接近 1 代表更愿意写入新信息。它像一个会选择性记笔记的学生,不是什么都死记硬背。

核心源码(逐字来自文末完整源码)

def mode_lstm() -> None:
    """Print the per-step mean gate activations of the hand-written LSTM."""
    rule = "=" * 60
    print("\n" + rule + "\n  手写 LSTM 门控机制演示\n" + rule)

    # A signal that climbs to a peak and then falls back.
    inputs = np.array([[0.1], [0.3], [0.6], [0.9], [0.7], [0.4], [0.1]])
    cell = LSTM(input_size=1, hidden_size=8)
    states, details = cell.forward(inputs)

    print(f"\n  输入序列: {inputs.flatten()}")
    print(f"\n  {'时间步':<8} {'遗忘门f':<10} {'输入门i':<10} {'候选g':<10} {'输出门o':<10} {'‖h_t‖'}")
    print(f"  {'─'*60}")
    for step, (means, state) in enumerate(zip(details["gates"], states)):
        magnitude = np.linalg.norm(state)
        print(f"  t={step}      {means['f']:.3f}     {means['i']:.3f}     "
              f"{means['g']:+.3f}     {means['o']:.3f}     {magnitude:.4f}")

    print("\n  💡 遗忘门接近1=记住历史,接近0=遗忘历史")
    print("     输入门接近1=写入新信息,接近0=忽略新输入")

可运行演示(补齐 Mock 数据与 print 反馈)

import numpy as np

def sigmoid(x: np.ndarray) -> np.ndarray:
    """Element-wise logistic function 1 / (1 + e^-x).

    The input is clipped to [-500, 500] before exponentiation so that
    ``np.exp`` cannot overflow on extreme values.
    """
    bounded = np.clip(x, -500, 500)
    denominator = 1.0 + np.exp(-bounded)
    return 1.0 / denominator

def tanh(x: np.ndarray) -> np.ndarray:
    """Element-wise hyperbolic tangent; a thin wrapper around ``np.tanh``."""
    squashed = np.tanh(x)
    return squashed

class LSTM:
    """
    Single-layer LSTM (forward pass only): three gates plus a candidate value.

    Input:  (seq_len, input_size)
    Output: (seq_len, hidden_size)

    The fused parameters are laid out as four blocks of width ``hidden_size``
    in the order [forget | input | candidate | output]; ``forward`` slices the
    pre-activations in exactly that order.
    """

    def __init__(self, input_size: int, hidden_size: int, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        scale = 0.1
        concat_size = input_size + hidden_size

        # One fused weight matrix for all four blocks: a single matmul per step.
        self.W = rng.randn(concat_size, 4 * hidden_size) * scale
        self.b = np.zeros(4 * hidden_size)
        # Forget-gate bias starts at 1 so the cell initially tends to keep its
        # memory. The forget gate is block 0 (the slice forward() reads as f),
        # not block 1, which is the input gate.
        self.b[0:hidden_size] = 1.0
        self.hidden_size = hidden_size

    def forward(self, X: np.ndarray) -> tuple[np.ndarray, dict]:
        """
        Run the LSTM over a sequence, starting from zero hidden and cell state.

        X: (seq_len, input_size)
        Returns: (hidden states of all time steps,
                  {"gates": per-step dicts holding the mean of f, i, g and o})
        """
        seq_len = X.shape[0]
        H = self.hidden_size
        h = np.zeros(H)
        c = np.zeros(H)
        h_states: list[np.ndarray] = []
        gate_log: list[dict] = []

        for t in range(seq_len):
            # Concatenate the current input with the previous hidden state.
            concat = np.concatenate([X[t], h])   # (input+hidden,)
            gates = concat @ self.W + self.b      # (4*hidden,)

            # Slice the fused pre-activations into the four blocks.
            f = sigmoid(gates[0*H:1*H])           # forget gate
            i = sigmoid(gates[1*H:2*H])           # input gate
            g = tanh(gates[2*H:3*H])              # candidate values
            o = sigmoid(gates[3*H:4*H])           # output gate

            # Update the cell state, then expose a gated view of it as h.
            c = f * c + i * g
            h = o * tanh(c)

            h_states.append(h.copy())
            gate_log.append({"f": f.mean(), "i": i.mean(),
                             "g": g.mean(), "o": o.mean()})

        return np.array(h_states), {"gates": gate_log}

def mode_lstm() -> None:
    """Print the per-step mean gate activations of the hand-written LSTM."""
    rule = "=" * 60
    print("\n" + rule + "\n  手写 LSTM 门控机制演示\n" + rule)

    # A signal that climbs to a peak and then falls back.
    inputs = np.array([[0.1], [0.3], [0.6], [0.9], [0.7], [0.4], [0.1]])
    cell = LSTM(input_size=1, hidden_size=8)
    states, details = cell.forward(inputs)

    print(f"\n  输入序列: {inputs.flatten()}")
    print(f"\n  {'时间步':<8} {'遗忘门f':<10} {'输入门i':<10} {'候选g':<10} {'输出门o':<10} {'‖h_t‖'}")
    print(f"  {'─'*60}")
    for step, (means, state) in enumerate(zip(details["gates"], states)):
        magnitude = np.linalg.norm(state)
        print(f"  t={step}      {means['f']:.3f}     {means['i']:.3f}     "
              f"{means['g']:+.3f}     {means['o']:.3f}     {magnitude:.4f}")

    print("\n  💡 遗忘门接近1=记住历史,接近0=遗忘历史")
    print("     输入门接近1=写入新信息,接近0=忽略新输入")

# Entry point of this snippet: run the LSTM gate demo defined above.
mode_lstm()

Step 8:用 mode_predict 跑通 LSTM 特征 + Ridge 的时序预测

痛点与机制

完整训练 LSTM 反向传播很复杂,这篇先用 LSTM 前向传播提取序列特征,再交给 Ridge 回归做预测。可以把 LSTM 理解成“读序列的摘要员”,Ridge 是“根据摘要给出下一步数值的打分器”。

核心源码(逐字来自文末完整源码)

def mode_predict() -> None:
    """Forecast the next value of a synthetic series from LSTM features.

    Pipeline: generate the signal, min-max scale it, cut it into sliding
    windows, run every window through an LSTM whose weights are never fitted
    here, and fit a Ridge regressor on the last hidden state. Prints the test
    MSE/MAE and an ASCII comparison of the first 20 test points.
    """
    print("\n" + "="*60 + "\n  时序预测实战(正弦+趋势信号)\n" + "="*60)

    signal = make_timeseries(n=400)

    SEQ_LEN = 15
    # Chronological 80/20 split over the sliding windows (no shuffling).
    split = int((len(signal) - SEQ_LEN) * 0.8)

    # Fit the scaler only on the points that training windows and targets can
    # see (indices < split + SEQ_LEN). Fitting on the whole series would leak
    # the test segment's min/max into training.
    scaler = MinMaxScaler()
    scaler.fit(signal[:split + SEQ_LEN].reshape(-1, 1))
    signal_scaled = scaler.transform(signal.reshape(-1, 1)).flatten()

    X, y = make_sequences(signal_scaled, seq_len=SEQ_LEN)
    X_tr, X_te = X[:split], X[split:]
    y_tr, y_te = y[:split], y[split:]

    # The LSTM acts as a fixed feature extractor; only the linear read-out
    # below is fitted.
    lstm = LSTM(input_size=1, hidden_size=16)

    def extract_features(seqs: np.ndarray) -> np.ndarray:
        """Map each window to the LSTM hidden state at its final time step."""
        feats = []
        for seq in seqs:
            X_seq = seq.reshape(-1, 1)
            h_states, _ = lstm.forward(X_seq)
            feats.append(h_states[-1])
        return np.array(feats)

    print(f"\n  提取 LSTM 特征(训练集 {len(X_tr)} 序列)...")
    X_tr_feat = extract_features(X_tr)
    X_te_feat = extract_features(X_te)

    # Ridge regression as the linear read-out on top of the LSTM features.
    from sklearn.linear_model import Ridge
    reg = Ridge(alpha=0.1)
    reg.fit(X_tr_feat, y_tr)
    y_pred = reg.predict(X_te_feat)

    mse = mean_squared_error(y_te, y_pred)
    mae = np.mean(np.abs(y_te - y_pred))

    print(f"  MSE: {mse:.6f}  MAE: {mae:.6f}")

    # ASCII bars for the first 20 test points (values are in scaled units;
    # a non-positive value simply yields an empty bar).
    print(f"\n  预测 vs 真实(前20个测试点,归一化后)")
    print(f"  {'步骤':<6} {'真实值':<10} {'预测值':<10} {'误差':<10} 对比")
    print(f"  {'─'*55}")
    for i in range(min(20, len(y_te))):
        real, pred = y_te[i], y_pred[i]
        err = abs(real - pred)
        bar_r = "█" * int(real * 20)
        bar_p = "░" * int(pred * 20)
        print(f"  {i:<6} {real:.4f}    {pred:.4f}    {err:.4f}    {bar_r}{bar_p}")

可运行演示(补齐 Mock 数据与 print 反馈)

import numpy as np
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler

def sigmoid(x: np.ndarray) -> np.ndarray:
    """Element-wise logistic function 1 / (1 + e^-x).

    The input is clipped to [-500, 500] before exponentiation so that
    ``np.exp`` cannot overflow on extreme values.
    """
    bounded = np.clip(x, -500, 500)
    denominator = 1.0 + np.exp(-bounded)
    return 1.0 / denominator

def tanh(x: np.ndarray) -> np.ndarray:
    """Element-wise hyperbolic tangent; a thin wrapper around ``np.tanh``."""
    squashed = np.tanh(x)
    return squashed

class LSTM:
    """
    Single-layer LSTM (forward pass only): three gates plus a candidate value.

    Input:  (seq_len, input_size)
    Output: (seq_len, hidden_size)

    The fused parameters are laid out as four blocks of width ``hidden_size``
    in the order [forget | input | candidate | output]; ``forward`` slices the
    pre-activations in exactly that order.
    """

    def __init__(self, input_size: int, hidden_size: int, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        scale = 0.1
        concat_size = input_size + hidden_size

        # One fused weight matrix for all four blocks: a single matmul per step.
        self.W = rng.randn(concat_size, 4 * hidden_size) * scale
        self.b = np.zeros(4 * hidden_size)
        # Forget-gate bias starts at 1 so the cell initially tends to keep its
        # memory. The forget gate is block 0 (the slice forward() reads as f),
        # not block 1, which is the input gate.
        self.b[0:hidden_size] = 1.0
        self.hidden_size = hidden_size

    def forward(self, X: np.ndarray) -> tuple[np.ndarray, dict]:
        """
        Run the LSTM over a sequence, starting from zero hidden and cell state.

        X: (seq_len, input_size)
        Returns: (hidden states of all time steps,
                  {"gates": per-step dicts holding the mean of f, i, g and o})
        """
        seq_len = X.shape[0]
        H = self.hidden_size
        h = np.zeros(H)
        c = np.zeros(H)
        h_states: list[np.ndarray] = []
        gate_log: list[dict] = []

        for t in range(seq_len):
            # Concatenate the current input with the previous hidden state.
            concat = np.concatenate([X[t], h])   # (input+hidden,)
            gates = concat @ self.W + self.b      # (4*hidden,)

            # Slice the fused pre-activations into the four blocks.
            f = sigmoid(gates[0*H:1*H])           # forget gate
            i = sigmoid(gates[1*H:2*H])           # input gate
            g = tanh(gates[2*H:3*H])              # candidate values
            o = sigmoid(gates[3*H:4*H])           # output gate

            # Update the cell state, then expose a gated view of it as h.
            c = f * c + i * g
            h = o * tanh(c)

            h_states.append(h.copy())
            gate_log.append({"f": f.mean(), "i": i.mean(),
                             "g": g.mean(), "o": o.mean()})

        return np.array(h_states), {"gates": gate_log}

def make_timeseries(n: int = 500, seed: int = 42) -> np.ndarray:
    """Build a synthetic series of ``n`` points over [0, 4*pi].

    The series is the sum of a sine wave, a weaker third harmonic, a linear
    trend rising from 0 to 0.1, and seeded Gaussian noise with std 0.1.
    """
    span = 4 * np.pi
    grid = np.linspace(0, span, n)
    base_wave = np.sin(grid)                            # main period
    harmonic = 0.5 * np.sin(3 * grid)                   # third harmonic
    trend = 0.1 * grid / span                           # slow upward drift
    noise = np.random.RandomState(seed).randn(n) * 0.1  # reproducible noise
    return base_wave + harmonic + trend + noise

def make_sequences(data: np.ndarray, seq_len: int = 10
                   ) -> tuple[np.ndarray, np.ndarray]:
    """Cut a series into sliding-window (X, y) pairs.

    Window k holds ``data[k:k + seq_len]`` and its target is the value right
    after it, ``data[k + seq_len]``.
    """
    starts = range(len(data) - seq_len)
    windows = [data[start:start + seq_len] for start in starts]
    targets = [data[start + seq_len] for start in starts]
    return np.array(windows), np.array(targets)

def mode_predict() -> None:
    """Forecast the next value of a synthetic series from LSTM features.

    Pipeline: generate the signal, min-max scale it, cut it into sliding
    windows, run every window through an LSTM whose weights are never fitted
    here, and fit a Ridge regressor on the last hidden state. Prints the test
    MSE/MAE and an ASCII comparison of the first 20 test points.
    """
    print("\n" + "="*60 + "\n  时序预测实战(正弦+趋势信号)\n" + "="*60)

    signal = make_timeseries(n=400)

    SEQ_LEN = 15
    # Chronological 80/20 split over the sliding windows (no shuffling).
    split = int((len(signal) - SEQ_LEN) * 0.8)

    # Fit the scaler only on the points that training windows and targets can
    # see (indices < split + SEQ_LEN). Fitting on the whole series would leak
    # the test segment's min/max into training.
    scaler = MinMaxScaler()
    scaler.fit(signal[:split + SEQ_LEN].reshape(-1, 1))
    signal_scaled = scaler.transform(signal.reshape(-1, 1)).flatten()

    X, y = make_sequences(signal_scaled, seq_len=SEQ_LEN)
    X_tr, X_te = X[:split], X[split:]
    y_tr, y_te = y[:split], y[split:]

    # The LSTM acts as a fixed feature extractor; only the linear read-out
    # below is fitted.
    lstm = LSTM(input_size=1, hidden_size=16)

    def extract_features(seqs: np.ndarray) -> np.ndarray:
        """Map each window to the LSTM hidden state at its final time step."""
        feats = []
        for seq in seqs:
            X_seq = seq.reshape(-1, 1)
            h_states, _ = lstm.forward(X_seq)
            feats.append(h_states[-1])
        return np.array(feats)

    print(f"\n  提取 LSTM 特征(训练集 {len(X_tr)} 序列)...")
    X_tr_feat = extract_features(X_tr)
    X_te_feat = extract_features(X_te)

    # Ridge regression as the linear read-out on top of the LSTM features.
    from sklearn.linear_model import Ridge
    reg = Ridge(alpha=0.1)
    reg.fit(X_tr_feat, y_tr)
    y_pred = reg.predict(X_te_feat)

    mse = mean_squared_error(y_te, y_pred)
    mae = np.mean(np.abs(y_te - y_pred))

    print(f"  MSE: {mse:.6f}  MAE: {mae:.6f}")

    # ASCII bars for the first 20 test points (values are in scaled units;
    # a non-positive value simply yields an empty bar).
    print(f"\n  预测 vs 真实(前20个测试点,归一化后)")
    print(f"  {'步骤':<6} {'真实值':<10} {'预测值':<10} {'误差':<10} 对比")
    print(f"  {'─'*55}")
    for i in range(min(20, len(y_te))):
        real, pred = y_te[i], y_pred[i]
        err = abs(real - pred)
        bar_r = "█" * int(real * 20)
        bar_p = "░" * int(pred * 20)
        print(f"  {i:<6} {real:.4f}    {pred:.4f}    {err:.4f}    {bar_r}{bar_p}")

# Entry point of this snippet: run the forecasting demo defined above.
mode_predict()

Step 9:用 main 把 rnn/lstm/predict/compare 做成命令行入口

痛点与机制

命令行入口让读者不用改源码,只靠 --mode 切换实验。学习时先跑 rnn、lstm 看机制,再跑 predict、compare 看完整任务,这样节奏更稳。

核心源码(逐字来自文末完整源码)

def main() -> None:
    """Parse ``--mode`` and run the matching demo; ``all`` runs every demo in order."""

    def run_everything() -> None:
        # Same order as the individual modes: rnn, lstm, predict, compare.
        mode_rnn()
        mode_lstm()
        mode_predict()
        mode_compare()

    cli = argparse.ArgumentParser(description="RNN/LSTM 时序建模从零实现")
    cli.add_argument(
        "--mode",
        default="all",
        choices=["rnn", "lstm", "predict", "compare", "all"],
    )
    chosen = cli.parse_args().mode

    handlers = {
        "rnn": mode_rnn,
        "lstm": mode_lstm,
        "predict": mode_predict,
        "compare": mode_compare,
        "all": run_everything,
    }
    handlers[chosen]()

可运行演示(补齐 Mock 数据与 print 反馈)

import argparse

def main() -> None:
    """Parse ``--mode`` and run the matching demo; ``all`` runs every demo in order."""

    def run_everything() -> None:
        # Same order as the individual modes: rnn, lstm, predict, compare.
        mode_rnn()
        mode_lstm()
        mode_predict()
        mode_compare()

    cli = argparse.ArgumentParser(description="RNN/LSTM 时序建模从零实现")
    cli.add_argument(
        "--mode",
        default="all",
        choices=["rnn", "lstm", "predict", "compare", "all"],
    )
    chosen = cli.parse_args().mode

    handlers = {
        "rnn": mode_rnn,
        "lstm": mode_lstm,
        "predict": mode_predict,
        "compare": mode_compare,
        "all": run_everything,
    }
    handlers[chosen]()

def mode_rnn() -> None:
    """Stand-in for the RNN demo: only reports that it was invoked."""
    banner = "运行 RNN 前向传播"
    print(banner)


def mode_lstm() -> None:
    """Stand-in for the LSTM gate demo: only reports that it was invoked."""
    banner = "运行 LSTM 门控演示"
    print(banner)


def mode_predict() -> None:
    """Stand-in for the forecasting demo: only reports that it was invoked."""
    banner = "运行时序预测"
    print(banner)


def mode_compare() -> None:
    """Stand-in for the model comparison: only reports that it was invoked."""
    banner = "运行 RNN/LSTM/MLP 对比"
    print(banner)

# Simulate four command-line invocations: argparse reads sys.argv, so it is
# rewritten before each call to main() to select the mode.
import sys
for mode in ["rnn", "lstm", "predict", "compare"]:
    print(f"\n$ python 51-python-rnn-lstm.py --mode {mode}")
    sys.argv = ["51-python-rnn-lstm.py", "--mode", mode]
    main()

极客实战:完整源码与运行

现在,把上面的积木拼起来,将下面完整代码保存为 51-python-rnn-lstm.py。它会生成一条模拟时序数据,分别演示 RNN、LSTM、LSTM 特征预测和模型对比。

#!/usr/bin/env python3
"""
51-python-rnn-lstm.py — RNN/LSTM 时序建模从零实现

用法:
  python3 51-python-rnn-lstm.py --mode rnn      # 手写 RNN 前向传播
  python3 51-python-rnn-lstm.py --mode lstm     # 手写 LSTM 前向传播
  python3 51-python-rnn-lstm.py --mode predict  # 时序预测实战
  python3 51-python-rnn-lstm.py --mode compare  # RNN vs LSTM vs MLP 对比
  python3 51-python-rnn-lstm.py --mode all      # 全部(默认)

依赖 numpy + scikit-learn,直接运行。
"""

import argparse

import numpy as np
from sklearn.metrics import mean_squared_error
from sklearn.neural_network import MLPRegressor
from sklearn.preprocessing import MinMaxScaler



def sigmoid(x: np.ndarray) -> np.ndarray:
    """Element-wise logistic function 1 / (1 + e^-x).

    The input is clipped to [-500, 500] before exponentiation so that
    ``np.exp`` cannot overflow on extreme values.
    """
    bounded = np.clip(x, -500, 500)
    denominator = 1.0 + np.exp(-bounded)
    return 1.0 / denominator

def tanh(x: np.ndarray) -> np.ndarray:
    """Element-wise hyperbolic tangent; a thin wrapper around ``np.tanh``."""
    squashed = np.tanh(x)
    return squashed

# ─── 手写 RNN ──────────────────────────────────────────────────────────────────

class SimpleRNN:
    """
    Single-layer vanilla RNN: h_t = tanh(W_x·x_t + W_h·h_{t-1} + b)

    Input:  (seq_len, input_size)
    Output: (seq_len, hidden_size)
    """

    def __init__(self, input_size: int, hidden_size: int, seed: int = 42) -> None:
        # A seeded generator keeps the small random weights reproducible.
        generator = np.random.RandomState(seed)
        init_scale = 0.1
        self.hidden_size = hidden_size
        # W_x is drawn before W_h; the draw order fixes the actual values.
        self.W_x = init_scale * generator.randn(input_size, hidden_size)
        self.W_h = init_scale * generator.randn(hidden_size, hidden_size)
        self.b = np.zeros(hidden_size)

    def forward(self, X: np.ndarray) -> tuple[np.ndarray, list[np.ndarray]]:
        """
        Apply the recurrence to every time step, starting from a zero state.

        X: (seq_len, input_size)
        Returns: (hidden states stacked into one array, the same states as a list)
        """
        state = np.zeros(self.hidden_size)
        history: list[np.ndarray] = []

        for step in range(X.shape[0]):
            from_input = X[step] @ self.W_x
            from_memory = state @ self.W_h
            state = tanh(from_input + from_memory + self.b)
            # Store a copy so later steps cannot alias earlier entries.
            history.append(state.copy())

        return np.array(history), history

# ─── 手写 LSTM ─────────────────────────────────────────────────────────────────

class LSTM:
    """
    Single-layer LSTM (forward pass only): three gates plus a candidate value.

    Input:  (seq_len, input_size)
    Output: (seq_len, hidden_size)

    The fused parameters are laid out as four blocks of width ``hidden_size``
    in the order [forget | input | candidate | output]; ``forward`` slices the
    pre-activations in exactly that order.
    """

    def __init__(self, input_size: int, hidden_size: int, seed: int = 42) -> None:
        rng = np.random.RandomState(seed)
        scale = 0.1
        concat_size = input_size + hidden_size

        # One fused weight matrix for all four blocks: a single matmul per step.
        self.W = rng.randn(concat_size, 4 * hidden_size) * scale
        self.b = np.zeros(4 * hidden_size)
        # Forget-gate bias starts at 1 so the cell initially tends to keep its
        # memory. The forget gate is block 0 (the slice forward() reads as f),
        # not block 1, which is the input gate.
        self.b[0:hidden_size] = 1.0
        self.hidden_size = hidden_size

    def forward(self, X: np.ndarray) -> tuple[np.ndarray, dict]:
        """
        Run the LSTM over a sequence, starting from zero hidden and cell state.

        X: (seq_len, input_size)
        Returns: (hidden states of all time steps,
                  {"gates": per-step dicts holding the mean of f, i, g and o})
        """
        seq_len = X.shape[0]
        H = self.hidden_size
        h = np.zeros(H)
        c = np.zeros(H)
        h_states: list[np.ndarray] = []
        gate_log: list[dict] = []

        for t in range(seq_len):
            # Concatenate the current input with the previous hidden state.
            concat = np.concatenate([X[t], h])   # (input+hidden,)
            gates = concat @ self.W + self.b      # (4*hidden,)

            # Slice the fused pre-activations into the four blocks.
            f = sigmoid(gates[0*H:1*H])           # forget gate
            i = sigmoid(gates[1*H:2*H])           # input gate
            g = tanh(gates[2*H:3*H])              # candidate values
            o = sigmoid(gates[3*H:4*H])           # output gate

            # Update the cell state, then expose a gated view of it as h.
            c = f * c + i * g
            h = o * tanh(c)

            h_states.append(h.copy())
            gate_log.append({"f": f.mean(), "i": i.mean(),
                             "g": g.mean(), "o": o.mean()})

        return np.array(h_states), {"gates": gate_log}

# ─── 时序数据生成 ──────────────────────────────────────────────────────────────

def make_timeseries(n: int = 500, seed: int = 42) -> np.ndarray:
    """Build a synthetic series of ``n`` points over [0, 4*pi].

    The series is the sum of a sine wave, a weaker third harmonic, a linear
    trend rising from 0 to 0.1, and seeded Gaussian noise with std 0.1.
    """
    span = 4 * np.pi
    grid = np.linspace(0, span, n)
    base_wave = np.sin(grid)                            # main period
    harmonic = 0.5 * np.sin(3 * grid)                   # third harmonic
    trend = 0.1 * grid / span                           # slow upward drift
    noise = np.random.RandomState(seed).randn(n) * 0.1  # reproducible noise
    return base_wave + harmonic + trend + noise


def make_sequences(data: np.ndarray, seq_len: int = 10
                   ) -> tuple[np.ndarray, np.ndarray]:
    """Cut a series into sliding-window (X, y) pairs.

    Window k holds ``data[k:k + seq_len]`` and its target is the value right
    after it, ``data[k + seq_len]``.
    """
    starts = range(len(data) - seq_len)
    windows = [data[start:start + seq_len] for start in starts]
    targets = [data[start + seq_len] for start in starts]
    return np.array(windows), np.array(targets)

# ─── 模式1:RNN 前向传播演示 ───────────────────────────────────────────────────

def mode_rnn() -> None:
    """Print the hidden state of the hand-written RNN at every time step."""
    rule = "=" * 60
    print("\n" + rule + "\n  手写 RNN 前向传播演示\n" + rule)

    # Five time steps with one feature each: the signal rises, then falls.
    inputs = np.array([[0.1], [0.3], [0.5], [0.3], [0.1]])
    network = SimpleRNN(input_size=1, hidden_size=4)
    states, _ = network.forward(inputs)

    print(f"\n  输入序列: {inputs.flatten()}")
    print(f"  隐藏状态 shape: {states.shape}  (seq_len=5, hidden=4)\n")
    print(f"  {'时间步':<8} {'h_t (4维隐藏状态)':<40} {'‖h_t‖'}")
    print(f"  {'─'*60}")
    for step, state in enumerate(states):
        formatted = "  ".join(f"{component:+.3f}" for component in state)
        magnitude = np.linalg.norm(state)
        print(f"  t={step}      [{formatted}]  {magnitude:.4f}")

    print("\n  💡 h_t 随输入变化,携带了历史信息(t=2时信号最强,h范数最大)")

# ─── 模式2:LSTM 门控演示 ──────────────────────────────────────────────────────

def mode_lstm() -> None:
    """Print the per-step mean gate activations of the hand-written LSTM."""
    rule = "=" * 60
    print("\n" + rule + "\n  手写 LSTM 门控机制演示\n" + rule)

    # A signal that climbs to a peak and then falls back.
    inputs = np.array([[0.1], [0.3], [0.6], [0.9], [0.7], [0.4], [0.1]])
    cell = LSTM(input_size=1, hidden_size=8)
    states, details = cell.forward(inputs)

    print(f"\n  输入序列: {inputs.flatten()}")
    print(f"\n  {'时间步':<8} {'遗忘门f':<10} {'输入门i':<10} {'候选g':<10} {'输出门o':<10} {'‖h_t‖'}")
    print(f"  {'─'*60}")
    for step, (means, state) in enumerate(zip(details["gates"], states)):
        magnitude = np.linalg.norm(state)
        print(f"  t={step}      {means['f']:.3f}     {means['i']:.3f}     "
              f"{means['g']:+.3f}     {means['o']:.3f}     {magnitude:.4f}")

    print("\n  💡 遗忘门接近1=记住历史,接近0=遗忘历史")
    print("     输入门接近1=写入新信息,接近0=忽略新输入")

# ─── 模式3:时序预测实战 ───────────────────────────────────────────────────────

def mode_predict() -> None:
    """Forecast the next value of a synthetic series from LSTM features.

    Pipeline: generate the signal, min-max scale it, cut it into sliding
    windows, run every window through an LSTM whose weights are never fitted
    here, and fit a Ridge regressor on the last hidden state. Prints the test
    MSE/MAE and an ASCII comparison of the first 20 test points.
    """
    print("\n" + "="*60 + "\n  时序预测实战(正弦+趋势信号)\n" + "="*60)

    signal = make_timeseries(n=400)

    SEQ_LEN = 15
    # Chronological 80/20 split over the sliding windows (no shuffling).
    split = int((len(signal) - SEQ_LEN) * 0.8)

    # Fit the scaler only on the points that training windows and targets can
    # see (indices < split + SEQ_LEN). Fitting on the whole series would leak
    # the test segment's min/max into training.
    scaler = MinMaxScaler()
    scaler.fit(signal[:split + SEQ_LEN].reshape(-1, 1))
    signal_scaled = scaler.transform(signal.reshape(-1, 1)).flatten()

    X, y = make_sequences(signal_scaled, seq_len=SEQ_LEN)
    X_tr, X_te = X[:split], X[split:]
    y_tr, y_te = y[:split], y[split:]

    # The LSTM acts as a fixed feature extractor; only the linear read-out
    # below is fitted.
    lstm = LSTM(input_size=1, hidden_size=16)

    def extract_features(seqs: np.ndarray) -> np.ndarray:
        """Map each window to the LSTM hidden state at its final time step."""
        feats = []
        for seq in seqs:
            X_seq = seq.reshape(-1, 1)
            h_states, _ = lstm.forward(X_seq)
            feats.append(h_states[-1])
        return np.array(feats)

    print(f"\n  提取 LSTM 特征(训练集 {len(X_tr)} 序列)...")
    X_tr_feat = extract_features(X_tr)
    X_te_feat = extract_features(X_te)

    # Ridge regression as the linear read-out on top of the LSTM features.
    from sklearn.linear_model import Ridge
    reg = Ridge(alpha=0.1)
    reg.fit(X_tr_feat, y_tr)
    y_pred = reg.predict(X_te_feat)

    mse = mean_squared_error(y_te, y_pred)
    mae = np.mean(np.abs(y_te - y_pred))

    print(f"  MSE: {mse:.6f}  MAE: {mae:.6f}")

    # ASCII bars for the first 20 test points (values are in scaled units;
    # a non-positive value simply yields an empty bar).
    print(f"\n  预测 vs 真实(前20个测试点,归一化后)")
    print(f"  {'步骤':<6} {'真实值':<10} {'预测值':<10} {'误差':<10} 对比")
    print(f"  {'─'*55}")
    for i in range(min(20, len(y_te))):
        real, pred = y_te[i], y_pred[i]
        err = abs(real - pred)
        bar_r = "█" * int(real * 20)
        bar_p = "░" * int(pred * 20)
        print(f"  {i:<6} {real:.4f}    {pred:.4f}    {err:.4f}    {bar_r}{bar_p}")

# ─── 模式4:RNN vs LSTM vs MLP 对比 ───────────────────────────────────────────

def mode_compare() -> None:
    """Compare an MLP, RNN + Ridge and LSTM + Ridge on the same forecast task.

    All three models see the same min-max scaled series from
    ``make_timeseries``, windowed by ``make_sequences`` and split 80/20.
    The MLP is fitted directly on the flattened windows; the recurrent models
    are only run forward, and the hidden state at the last time step of each
    window is fed to a Ridge regressor. Prints one table row per model with
    its test MSE and wall-clock time.
    """
    rule = "=" * 60
    print("\n" + rule + "\n  RNN vs LSTM vs MLP 时序预测对比\n" + rule)

    raw = make_timeseries(n=500)
    scaled = MinMaxScaler().fit_transform(raw.reshape(-1, 1)).flatten()

    SEQ_LEN = 10
    X, y = make_sequences(scaled, seq_len=SEQ_LEN)
    cut = int(len(X) * 0.8)
    X_tr, y_tr = X[:cut], y[:cut]
    X_te, y_te = X[cut:], y[cut:]

    from sklearn.linear_model import Ridge
    import time

    def elapsed_ms(since: float) -> str:
        """Format the time elapsed since ``since`` as whole milliseconds."""
        return f"{(time.perf_counter() - since) * 1000:.0f}ms"

    def last_hidden(model, windows) -> np.ndarray:
        """Run ``model`` over each window and keep its final hidden state."""
        return np.array(
            [model.forward(w.reshape(-1, 1))[0][-1] for w in windows]
        )

    results = []

    # Baseline: MLP on the flattened window (no notion of time order).
    t0 = time.perf_counter()
    mlp = MLPRegressor(hidden_layer_sizes=(32, 16), max_iter=500, random_state=42)
    mlp.fit(X_tr, y_tr)
    mse_mlp = mean_squared_error(y_te, mlp.predict(X_te))
    results.append(
        ["MLP (展平序列)", f"{SEQ_LEN}→32→16→1", f"{mse_mlp:.6f}", elapsed_ms(t0)]
    )

    # Recurrent encoders share one pipeline: encode -> Ridge -> test MSE.
    for label, encoder_cls in (("RNN + Ridge", SimpleRNN), ("LSTM + Ridge", LSTM)):
        t0 = time.perf_counter()
        encoder = encoder_cls(input_size=1, hidden_size=16)
        feats_tr = last_hidden(encoder, X_tr)
        feats_te = last_hidden(encoder, X_te)
        readout = Ridge(alpha=0.1)
        readout.fit(feats_tr, y_tr)
        mse = mean_squared_error(y_te, readout.predict(feats_te))
        results.append([label, f"input→16→1", f"{mse:.6f}", elapsed_ms(t0)])

    print(f"\n  {'模型':<20} {'结构':<15} {'测试MSE':<12} {'耗时'}")
    print(f"  {'─'*55}")
    for name, arch, mse_txt, took in results:
        print(f"  {name:<20} {arch:<15} {mse_txt:<12} {took}")
    print(f"\n  💡 LSTM 通过门控机制保留长期依赖,MSE 通常低于简单 RNN")

# ─── 入口 ─────────────────────────────────────────────────────────────────────

def main() -> None:
    """Parse ``--mode`` from the command line and run the selected demo(s).

    ``--mode`` accepts ``rnn``, ``lstm``, ``predict``, ``compare`` or ``all``
    (the default). ``all`` runs the four demos in that order.
    """
    # Single source of truth: the argparse choices below are derived from this
    # table, so the parser can never accept a mode that has no handler.
    single_modes = {
        "rnn":     mode_rnn,
        "lstm":    mode_lstm,
        "predict": mode_predict,
        "compare": mode_compare,
    }
    dispatch = {name: (handler,) for name, handler in single_modes.items()}
    dispatch["all"] = tuple(single_modes.values())

    parser = argparse.ArgumentParser(description="RNN/LSTM 时序建模从零实现")
    parser.add_argument(
        "--mode",
        choices=list(dispatch),
        default="all",
    )
    args = parser.parse_args()

    # A plain loop instead of a list built only for its side effects.
    for run_demo in dispatch[args.mode]:
        run_demo()


# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
$ python 51-python-rnn-lstm.py --mode rnn
============================================================
  手写 RNN 前向传播演示
============================================================

  输入序列: [0.1 0.3 0.5 0.3 0.1]
  隐藏状态 shape: (5, 4)  (seq_len=5, hidden=4)

  时间步      h_t (4维隐藏状态)                             ‖h_t‖
  ────────────────────────────────────────────────────────────
  t=0      [+0.005  -0.001  +0.006  +0.015]  0.0173
  t=1      [+0.013  -0.005  +0.018  +0.044]  0.0492
  t=2      [+0.021  -0.010  +0.028  +0.070]  0.0788
  t=3      [+0.008  -0.008  +0.012  +0.036]  0.0400
  t=4      [+0.002  -0.003  +0.003  +0.010]  0.0114

  💡 h_t 随输入变化,携带了历史信息(t=2时信号最强,h范数最大)

$ python 51-python-rnn-lstm.py --mode lstm
============================================================
  手写 LSTM 门控机制演示
============================================================

  输入序列: [0.1 0.3 0.6 0.9 0.7 0.4 0.1]

  时间步      遗忘门f       输入门i       候选g        输出门o       ‖h_t‖
  ────────────────────────────────────────────────────────────
  t=0      0.501     0.730     -0.004     0.500     0.0105
  t=1      0.504     0.727     -0.012     0.499     0.0364
  t=2      0.509     0.724     -0.024     0.498     0.0804
  t=3      0.514     0.720     -0.035     0.497     0.1332
  t=4      0.513     0.723     -0.027     0.498     0.1384
  t=5      0.509     0.726     -0.016     0.498     0.1103
  t=6      0.504     0.730     -0.004     0.499     0.0660

  💡 遗忘门接近1=记住历史,接近0=遗忘历史
     输入门接近1=写入新信息,接近0=忽略新输入

$ python 51-python-rnn-lstm.py --mode predict
============================================================
  时序预测实战(正弦+趋势信号)
============================================================

  提取 LSTM 特征(训练集 308 序列)...
  MSE: 0.002597  MAE: 0.039393

  预测 vs 真实(前20个测试点,归一化后)
  步骤     真实值        预测值        误差         对比
  ───────────────────────────────────────────────────────
  0      0.1585    0.0800    0.0784    ███░
  1      0.0398    0.1176    0.0778    ░░
  2      0.0366    0.0876    0.0510    ░
  3      0.1364    0.0697    0.0667    ██░
  4      0.1296    0.1023    0.0273    ██░░
  5      0.1319    0.1173    0.0146    ██░░
  6      0.1419    0.1267    0.0152    ██░░
  7      0.1275    0.1365    0.0090    ██░░
  8      0.1041    0.1359    0.0317    ██░░
  9      0.1545    0.1257    0.0288    ███░░
  10     0.1374    0.1417    0.0044    ██░░
  11     0.2152    0.1433    0.0719    ████░░
  12     0.1839    0.1775    0.0064    ███░░░
  13     0.1699    0.1831    0.0132    ███░░░
  14     0.2024    0.1806    0.0218    ████░░░
  15     0.2435    0.1933    0.0502    ████░░░
  16     0.2168    0.2179    0.0011    ████░░░░
  17     0.2177    0.2204    0.0027    ████░░░░
  18     0.2700    0.2224    0.0476    █████░░░░
  19     0.2794    0.2459    0.0335    █████░░░░

小结与 NexDo Time ⚡

这一篇你已经看懂了序列建模的关键:RNN 用隐藏状态传递历史,LSTM 用门控机制管理长期记忆,滑动窗口把连续时间线切成可训练样本。先理解前向传播和状态流动,再谈训练和优化,学习坡度会平很多。

5 分钟微操挑战:把 mode_predict() 里的 SEQ_LEN = 15 分别改成 5 和 25,各运行一次 --mode predict,观察 MSE 和 MAE 是否变化。思考:窗口太短和太长分别会带来什么问题?

Don’t wait for next time, do it in the next moment.