文章

36 · 量化数据处理:OHLCV 清洗与技术指标计算

#045 · 2026-04-17 · Python

🔗 知识图谱导航:阅读本文前,建议先掌握《32 · Pandas 实战》中的 rolling/resample 操作——本文把这些操作应用到量化金融场景,实现真实的技术指标计算。

运行环境pip install pandas numpy

极客解析:量化数据处理的三个层次:① 清洗(去重/填充/异常检测);② 指标计算(MA/EMA/RSI/MACD);③ 可视化(ASCII 行情图)。每一层都有量化特有的坑,本文逐一踩过。

四大技术指标速查

MA(移动平均)    rolling(n).mean(),等权重,滞后性强
EMA(指数均线)   ewm(span=n).mean(),近期权重大,反应更快
RSI(相对强弱)   14日涨跌幅比值,>70 超买,<30 超卖
MACD(异同均线)  EMA12 - EMA26,Signal = EMA9(MACD),Hist = MACD - Signal

步步为营:核心逻辑自适应拆解

这一篇按量化数据流水线拆成 7 个台阶:先生成带异常的 OHLCV 数据,再清洗、计算指标、输出报告、画终端行情图,最后用 CLI 调度整条流程。每个演示都补了 Mock 数据和 print() 反馈。

Step 1:用 generate_ohlcv 造一张带异常的行情表

痛点与机制

generate_ohlcv 是量化数据的造数工厂:O/H/L/C/V 分别代表开盘、最高、最低、收盘、成交量。它还故意塞入缺失收盘价、零成交量和高低价倒置,就像真实行情供应商偶尔会给你一张“脏表”。先造出问题,后面清洗才有训练价值。

核心源码(逐字来自文末完整源码)

def generate_ohlcv(n: int = 250) -> pd.DataFrame:
    """生成模拟 OHLCV 数据,含噪声和异常值"""
    np.random.seed(42)
    dates = pd.bdate_range("2025-01-02", periods=n)
    returns = np.random.randn(n) * 0.015 + 0.0003
    close = 100.0 * np.exp(np.cumsum(returns))

    daily_range = np.abs(np.random.randn(n)) * 1.5 + 0.5
    high = close * (1 + daily_range / 200)
    low  = close * (1 - daily_range / 200)
    open_ = low + np.random.rand(n) * (high - low)
    volume = np.random.randint(1_000_000, 8_000_000, n).astype(float)

    df = pd.DataFrame({
        "date": dates, "open": open_, "high": high,
        "low": low, "close": close, "volume": volume,
    })
    # 注入异常:缺失值、零价格、高低价倒置
    df.loc[np.random.choice(df.index, 8, replace=False), "close"] = np.nan
    df.loc[np.random.choice(df.index, 3, replace=False), "volume"] = 0
    bad_idx = np.random.choice(df.index, 2, replace=False)
    df.loc[bad_idx, ["high", "low"]] = df.loc[bad_idx, ["low", "high"]].values
    return df

可运行演示(补齐 Mock 数据与 print 反馈)

import numpy as np
import pandas as pd


def generate_ohlcv(n: int = 250) -> pd.DataFrame:
    """生成模拟 OHLCV 数据,含噪声和异常值"""
    np.random.seed(42)
    dates = pd.bdate_range("2025-01-02", periods=n)
    returns = np.random.randn(n) * 0.015 + 0.0003
    close = 100.0 * np.exp(np.cumsum(returns))

    daily_range = np.abs(np.random.randn(n)) * 1.5 + 0.5
    high = close * (1 + daily_range / 200)
    low  = close * (1 - daily_range / 200)
    open_ = low + np.random.rand(n) * (high - low)
    volume = np.random.randint(1_000_000, 8_000_000, n).astype(float)

    df = pd.DataFrame({
        "date": dates, "open": open_, "high": high,
        "low": low, "close": close, "volume": volume,
    })
    # 注入异常:缺失值、零价格、高低价倒置
    df.loc[np.random.choice(df.index, 8, replace=False), "close"] = np.nan
    df.loc[np.random.choice(df.index, 3, replace=False), "volume"] = 0
    bad_idx = np.random.choice(df.index, 2, replace=False)
    df.loc[bad_idx, ["high", "low"]] = df.loc[bad_idx, ["low", "high"]].values
    return df


df = generate_ohlcv(n=40)
print("📦 OHLCV 数据已生成")
print("行列:", df.shape)
print("缺失 close:", df["close"].isna().sum(), "个")
print("零成交量:", int((df["volume"] == 0).sum()), "天")
print("高低价倒置:", int((df["high"] < df["low"]).sum()), "行")
print(df.head(3).to_string(index=False))

Step 2:用 clean_ohlcv 给 OHLCV 数据做安检

痛点与机制

clean_ohlcv 是行情数据的质检流水线:先按日期去重排序,再删掉高低价倒置和零价格,用前一天收盘价补停牌缺口,最后标记价格跳空。你可以把它想成交易所门口的安检,不合格的数据不能直接进策略模型。

核心源码(逐字来自文末完整源码)

def clean_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
    """数据清洗流程"""
    raw_len = len(df)
    df = df.drop_duplicates(subset="date").sort_values("date").reset_index(drop=True)
    # 删除高低价倒置
    df = df[df["high"] >= df["low"]].reset_index(drop=True)
    # 删除零价格
    df = df[(df["close"] > 0) & (df["open"] > 0)].reset_index(drop=True)
    # 前向填充缺失收盘价
    df["close"] = df["close"].ffill()
    # 标记停牌(成交量为0)
    df["suspended"] = df["volume"] == 0
    df.loc[df["suspended"], "volume"] = np.nan
    df["volume"] = df["volume"].ffill()
    # 标记价格跳空(涨跌幅 > 20%)
    df["pct_change"] = df["close"].pct_change()
    df["price_jump"] = df["pct_change"].abs() > 0.20
    return df, raw_len

可运行演示(补齐 Mock 数据与 print 反馈)

import numpy as np
import pandas as pd


def clean_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
    """数据清洗流程"""
    raw_len = len(df)
    df = df.drop_duplicates(subset="date").sort_values("date").reset_index(drop=True)
    # 删除高低价倒置
    df = df[df["high"] >= df["low"]].reset_index(drop=True)
    # 删除零价格
    df = df[(df["close"] > 0) & (df["open"] > 0)].reset_index(drop=True)
    # 前向填充缺失收盘价
    df["close"] = df["close"].ffill()
    # 标记停牌(成交量为0)
    df["suspended"] = df["volume"] == 0
    df.loc[df["suspended"], "volume"] = np.nan
    df["volume"] = df["volume"].ffill()
    # 标记价格跳空(涨跌幅 > 20%)
    df["pct_change"] = df["close"].pct_change()
    df["price_jump"] = df["pct_change"].abs() > 0.20
    return df, raw_len


raw = pd.DataFrame({
    "date": pd.to_datetime(["2025-01-02", "2025-01-03", "2025-01-03", "2025-01-06"]),
    "open": [100, 101, 101, 102],
    "high": [102, 99, 99, 104],  # 第二、三行 high < low,是坏数据。
    "low": [99, 103, 103, 100],
    "close": [101.0, np.nan, np.nan, 103.0],
    "volume": [1_000_000, 0, 0, 2_000_000],
})
clean, raw_len = clean_ohlcv(raw)
print(f"清洗前: {raw_len} 行")
print(f"清洗后: {len(clean)} 行")
print(clean[["date", "close", "volume", "suspended", "price_jump"]].to_string(index=False))

Step 3:用 MA/EMA/RSI/MACD 函数加工趋势信号

痛点与机制

技术指标就是把价格序列加工成更容易判断趋势的信号。MA 像普通平均体温,EMA 更看重最近几天,RSI 像市场体力条,MACD 像两条快慢均线的距离。每个函数都只负责一个指标,便于后面组合。

核心源码(逐字来自文末完整源码)

def calc_ma(series: pd.Series, n: int) -> pd.Series:
    return series.rolling(n).mean()


def calc_ema(series: pd.Series, n: int) -> pd.Series:
    return series.ewm(span=n, adjust=False).mean()


def calc_rsi(series: pd.Series, n: int = 14) -> pd.Series:
    delta = series.diff()
    gain = delta.clip(lower=0).rolling(n).mean()
    loss = (-delta.clip(upper=0)).rolling(n).mean()
    rs = gain / loss.replace(0, np.nan)
    return 100 - 100 / (1 + rs)


def calc_macd(series: pd.Series) -> tuple[pd.Series, pd.Series, pd.Series]:
    ema12 = calc_ema(series, 12)
    ema26 = calc_ema(series, 26)
    macd = ema12 - ema26
    signal = calc_ema(macd, 9)
    hist = macd - signal
    return macd, signal, hist

可运行演示(补齐 Mock 数据与 print 反馈)

import numpy as np
import pandas as pd


def calc_ma(series: pd.Series, n: int) -> pd.Series:
    return series.rolling(n).mean()


def calc_ema(series: pd.Series, n: int) -> pd.Series:
    return series.ewm(span=n, adjust=False).mean()


def calc_rsi(series: pd.Series, n: int = 14) -> pd.Series:
    delta = series.diff()
    gain = delta.clip(lower=0).rolling(n).mean()
    loss = (-delta.clip(upper=0)).rolling(n).mean()
    rs = gain / loss.replace(0, np.nan)
    return 100 - 100 / (1 + rs)


def calc_macd(series: pd.Series) -> tuple[pd.Series, pd.Series, pd.Series]:
    ema12 = calc_ema(series, 12)
    ema26 = calc_ema(series, 26)
    macd = ema12 - ema26
    signal = calc_ema(macd, 9)
    hist = macd - signal
    return macd, signal, hist


close = pd.Series([100, 101, 103, 102, 105, 108, 107, 110, 112, 111, 115, 116, 118, 117, 120], dtype=float)
macd, signal, hist = calc_macd(close)
print("MA5 最新:", round(float(calc_ma(close, 5).iloc[-1]), 2))
print("EMA5 最新:", round(float(calc_ema(close, 5).iloc[-1]), 2))
print("RSI14 最新:", round(float(calc_rsi(close, 14).iloc[-1]), 2))
print("MACD/Signal/Hist 最新:", round(float(macd.iloc[-1]), 4), round(float(signal.iloc[-1]), 4), round(float(hist.iloc[-1]), 4))

Step 4:用 mode_clean 输出清洗质量报告

痛点与机制

mode_clean 把清洗结果变成报告,而不是静默处理。对新手来说,能看到“清洗前多少行、清洗后多少行、缺失还剩多少”非常关键;这就像洗完菜要看一眼篮子,确认泥沙真的冲掉了。

核心源码(逐字来自文末完整源码)

def mode_clean(df: pd.DataFrame) -> tuple[pd.DataFrame, int]:
    df_clean, raw_len = clean_ohlcv(df)
    print(f"\n[{nexdo_time()}] 🧹 数据清洗报告")
    print(f"  原始行数: {raw_len}  清洗后: {len(df_clean)}")
    print(f"  停牌日: {df_clean['suspended'].sum()} 天")
    print(f"  价格跳空: {df_clean['price_jump'].sum()} 次")
    print(f"\n  {'字段':<10} {'缺失数':>8} {'最小值':>10} {'最大值':>10} {'均值':>10}")
    print("  " + "─" * 52)
    for col in ["open", "high", "low", "close", "volume"]:
        s = df_clean[col]
        print(f"  {col:<10} {s.isna().sum():>8} {s.min():>10.2f} {s.max():>10.2f} {s.mean():>10.2f}")
    return df_clean, raw_len

可运行演示(补齐 Mock 数据与 print 反馈)

import time
import numpy as np
import pandas as pd


def nexdo_time() -> str:
    return time.strftime("%Y-%m-%d %H:%M:%S")


def clean_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
    raw_len = len(df)
    df = df.drop_duplicates(subset="date").sort_values("date").reset_index(drop=True)
    df = df[df["high"] >= df["low"]].reset_index(drop=True)
    df = df[(df["close"] > 0) & (df["open"] > 0)].reset_index(drop=True)
    df["close"] = df["close"].ffill()
    df["suspended"] = df["volume"] == 0
    df.loc[df["suspended"], "volume"] = np.nan
    df["volume"] = df["volume"].ffill()
    df["pct_change"] = df["close"].pct_change()
    df["price_jump"] = df["pct_change"].abs() > 0.20
    return df, raw_len


def mode_clean(df: pd.DataFrame) -> tuple[pd.DataFrame, int]:
    df_clean, raw_len = clean_ohlcv(df)
    print(f"\n[{nexdo_time()}] 🧹 数据清洗报告")
    print(f"  原始行数: {raw_len}  清洗后: {len(df_clean)}")
    print(f"  停牌日: {df_clean['suspended'].sum()} 天")
    print(f"  价格跳空: {df_clean['price_jump'].sum()} 次")
    print(f"\n  {'字段':<10} {'缺失数':>8} {'最小值':>10} {'最大值':>10} {'均值':>10}")
    print("  " + "─" * 52)
    for col in ["open", "high", "low", "close", "volume"]:
        s = df_clean[col]
        print(f"  {col:<10} {s.isna().sum():>8} {s.min():>10.2f} {s.max():>10.2f} {s.mean():>10.2f}")
    return df_clean, raw_len


df = pd.DataFrame({
    "date": pd.bdate_range("2025-01-02", periods=5),
    "open": [100, 101, 102, 103, 104],
    "high": [102, 103, 104, 105, 106],
    "low": [99, 100, 101, 102, 103],
    "close": [101, np.nan, 103, 104, 105],
    "volume": [1_000_000, 0, 2_000_000, 3_000_000, 4_000_000],
})
mode_clean(df)

Step 5:用 mode_indicators 生成完整技术指标面板

痛点与机制

mode_indicators 把清洗后的收盘价一次性加上 MA、EMA、RSI、MACD 和布林带。它像给行情表新增一排仪表盘:价格只是原始速度,指标告诉你趋势、强弱和波动边界。

核心源码(逐字来自文末完整源码)

def mode_indicators(df: pd.DataFrame) -> pd.DataFrame:
    print(f"\n[{nexdo_time()}] 📐 技术指标计算")
    df["ma5"]  = calc_ma(df["close"], 5)
    df["ma20"] = calc_ma(df["close"], 20)
    df["ema12"] = calc_ema(df["close"], 12)
    df["ema26"] = calc_ema(df["close"], 26)
    df["rsi14"] = calc_rsi(df["close"], 14)
    df["macd"], df["macd_signal"], df["macd_hist"] = calc_macd(df["close"])
    df["bb_mid"] = calc_ma(df["close"], 20)
    df["bb_std"] = df["close"].rolling(20).std()
    df["bb_upper"] = df["bb_mid"] + 2 * df["bb_std"]
    df["bb_lower"] = df["bb_mid"] - 2 * df["bb_std"]

    last = df.dropna().iloc[-1]
    print(f"\n  最新指标({str(last['date'])[:10]}):")
    print(f"  {'指标':<14} {'数值':>10}")
    print("  " + "─" * 28)
    indicators = [
        ("收盘价", last["close"]),
        ("MA5",   last["ma5"]),
        ("MA20",  last["ma20"]),
        ("EMA12", last["ema12"]),
        ("EMA26", last["ema26"]),
        ("RSI14", last["rsi14"]),
        ("MACD",  last["macd"]),
        ("信号线", last["macd_signal"]),
        ("BB上轨", last["bb_upper"]),
        ("BB下轨", last["bb_lower"]),
    ]
    for name, val in indicators:
        print(f"  {name:<14} {val:>10.4f}")
    return df

可运行演示(补齐 Mock 数据与 print 反馈)

import time
import numpy as np
import pandas as pd


def nexdo_time() -> str:
    return time.strftime("%Y-%m-%d %H:%M:%S")

def calc_ma(series: pd.Series, n: int) -> pd.Series:
    return series.rolling(n).mean()

def calc_ema(series: pd.Series, n: int) -> pd.Series:
    return series.ewm(span=n, adjust=False).mean()

def calc_rsi(series: pd.Series, n: int = 14) -> pd.Series:
    delta = series.diff()
    gain = delta.clip(lower=0).rolling(n).mean()
    loss = (-delta.clip(upper=0)).rolling(n).mean()
    rs = gain / loss.replace(0, np.nan)
    return 100 - 100 / (1 + rs)

def calc_macd(series: pd.Series) -> tuple[pd.Series, pd.Series, pd.Series]:
    ema12 = calc_ema(series, 12)
    ema26 = calc_ema(series, 26)
    macd = ema12 - ema26
    signal = calc_ema(macd, 9)
    hist = macd - signal
    return macd, signal, hist


def mode_indicators(df: pd.DataFrame) -> pd.DataFrame:
    print(f"\n[{nexdo_time()}] 📐 技术指标计算")
    df["ma5"]  = calc_ma(df["close"], 5)
    df["ma20"] = calc_ma(df["close"], 20)
    df["ema12"] = calc_ema(df["close"], 12)
    df["ema26"] = calc_ema(df["close"], 26)
    df["rsi14"] = calc_rsi(df["close"], 14)
    df["macd"], df["macd_signal"], df["macd_hist"] = calc_macd(df["close"])
    df["bb_mid"] = calc_ma(df["close"], 20)
    df["bb_std"] = df["close"].rolling(20).std()
    df["bb_upper"] = df["bb_mid"] + 2 * df["bb_std"]
    df["bb_lower"] = df["bb_mid"] - 2 * df["bb_std"]
    last = df.dropna().iloc[-1]
    print("  最新日期:", str(last["date"])[:10])
    for name in ["close", "ma5", "ma20", "rsi14", "macd", "macd_signal"]:
        print(f"  {name:<12}: {last[name]:.4f}")
    return df


df = pd.DataFrame({"date": pd.bdate_range("2025-01-02", periods=40), "close": np.linspace(100, 120, 40) + np.sin(np.arange(40))})
mode_indicators(df)

Step 6:用 mode_chart 在终端画 ASCII 行情图

痛点与机制

mode_chart 用 ASCII 在终端画行情图。它把价格、MA5、MA20 映射到字符高度,像用积木搭一张走势图;不需要图片窗口,也能在服务器里快速观察趋势。

核心源码(逐字来自文末完整源码)

def mode_chart(df: pd.DataFrame) -> None:
    print(f"\n[{nexdo_time()}] 📊 ASCII 行情图(最近60日)")
    seg = df.dropna(subset=["ma20"]).tail(60).reset_index(drop=True)
    close = seg["close"].values
    ma5   = seg["ma5"].values
    ma20  = seg["ma20"].values
    rsi   = seg["rsi14"].values
    macd_h = seg["macd_hist"].values

    lo, hi = close.min() * 0.99, close.max() * 1.01
    rows = 10
    print(f"\n  价格 + MA5(·) + MA20(-)")
    print("  " + "─" * 72)
    for r in range(rows, 0, -1):
        thr = lo + (hi - lo) * r / rows
        row = ""
        for i in range(len(close)):
            c_hit  = close[i] >= thr
            m5_hit = ma5[i] >= thr
            m20_hit = ma20[i] >= thr
            if c_hit and m5_hit:
                row += "●"
            elif c_hit:
                row += "│"
            elif m5_hit:
                row += "·"
            elif m20_hit:
                row += "-"
            else:
                row += " "
        print(f"  {thr:>8.2f} │{row}│")
    print("  " + "─" * 72)

    # RSI
    print(f"\n  RSI(14)  [超买>70 ─── 超卖<30]")
    print("  " + "─" * 72)
    for level in [80, 70, 50, 30, 20]:
        row = "".join("█" if v >= level else " " for v in rsi)
        marker = " ← 超买" if level == 70 else (" ← 超卖" if level == 30 else "")
        print(f"  {level:>3}      │{row}│{marker}")
    print("  " + "─" * 72)

    # MACD 柱状图
    print(f"\n  MACD 柱状图")
    max_h = max(abs(macd_h.max()), abs(macd_h.min()))
    print("  " + "─" * 72)
    for r in range(4, -5, -1):
        if r == 0:
            print("  " + " " * 10 + "─" * len(macd_h))
            continue
        thr = max_h * r / 4
        row = ""
        for v in macd_h:
            if r > 0:
                row += "▲" if v >= thr else " "
            else:
                row += "▼" if v <= thr else " "
        print(f"  {thr:>+9.4f} │{row}│")
    print("  " + "─" * 72)

可运行演示(补齐 Mock 数据与 print 反馈)

import time
import numpy as np
import pandas as pd


def nexdo_time() -> str:
    return time.strftime("%Y-%m-%d %H:%M:%S")


def mode_chart(df: pd.DataFrame) -> None:
    print(f"\n[{nexdo_time()}] 📊 ASCII 行情图(最近60日)")
    seg = df.dropna(subset=["ma20"]).tail(20).reset_index(drop=True)
    close = seg["close"].values
    ma5 = seg["ma5"].values
    ma20 = seg["ma20"].values
    lo, hi = close.min() * 0.99, close.max() * 1.01
    rows = 6
    print("\n  价格 + MA5(·) + MA20(-)")
    print("  " + "─" * 32)
    for r in range(rows, 0, -1):
        thr = lo + (hi - lo) * r / rows
        row = ""
        for i in range(len(close)):
            if close[i] >= thr and ma5[i] >= thr:
                row += "●"
            elif close[i] >= thr:
                row += "│"
            elif ma5[i] >= thr:
                row += "·"
            elif ma20[i] >= thr:
                row += "-"
            else:
                row += " "
        print(f"  {thr:>8.2f}{row}│")
    print("  " + "─" * 32)


base = np.linspace(100, 120, 45) + np.sin(np.arange(45))
df = pd.DataFrame({"close": base})
df["ma5"] = df["close"].rolling(5).mean()
df["ma20"] = df["close"].rolling(20).mean()
mode_chart(df)

Step 7:用 main 做 clean/indicators/chart/all 脚本遥控器

痛点与机制

main 是脚本遥控器。用户通过 --mode clean/indicators/chart/all 切换功能,但脚本始终先生成数据、清洗数据、计算指标,再按需要画图,这保证了每个模式都基于同一条数据流水线。

核心源码(逐字来自文末完整源码)

def main() -> None:
    parser = argparse.ArgumentParser(description="量化数据清洗与技术指标")
    parser.add_argument("--mode", choices=["clean", "indicators", "chart", "all"],
                        default="all", help="运行模式")
    args = parser.parse_args()

    df_raw = generate_ohlcv()
    print(f"[{nexdo_time()}] 数据生成:{len(df_raw)} 条 OHLCV 记录")

    df_clean, _ = mode_clean(df_raw)
    df_ind = mode_indicators(df_clean)
    if args.mode in ("chart", "all"):
        mode_chart(df_ind)


if __name__ == "__main__":
    main()

可运行演示(补齐 Mock 数据与 print 反馈)

import argparse
import sys
import pandas as pd


def generate_ohlcv() -> pd.DataFrame:
    return pd.DataFrame({"close": [100, 101, 102]})


def mode_clean(df: pd.DataFrame):
    print("运行 clean:清洗 OHLCV 数据")
    return df, len(df)


def mode_indicators(df: pd.DataFrame) -> pd.DataFrame:
    print("运行 indicators:计算 MA/EMA/RSI/MACD")
    return df


def mode_chart(df: pd.DataFrame) -> None:
    print("运行 chart:输出 ASCII 行情图")


def main() -> None:
    parser = argparse.ArgumentParser(description="量化数据清洗与技术指标")
    parser.add_argument("--mode", choices=["clean", "indicators", "chart", "all"],
                        default="all", help="运行模式")
    args = parser.parse_args()

    df_raw = generate_ohlcv()
    print(f"数据生成:{len(df_raw)} 条 OHLCV 记录")

    df_clean, _ = mode_clean(df_raw)
    df_ind = mode_indicators(df_clean)
    if args.mode in ("chart", "all"):
        mode_chart(df_ind)


for mode in ["clean", "indicators", "chart", "all"]:
    print(f"\n>>> python3 36-quant-data.py --mode {mode}")
    sys.argv = ["prog", "--mode", mode]
    main()

极客实战:完整源码与运行

现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。

#!/usr/bin/env python3
"""
36-quant-data.py  —  股票数据清洗与技术指标计算
用法:
  python 36-quant-data.py --mode clean
  python 36-quant-data.py --mode indicators
  python 36-quant-data.py --mode chart
"""
import argparse
import time
import numpy as np
import pandas as pd


def nexdo_time() -> str:
    return time.strftime("%Y-%m-%d %H:%M:%S")


def generate_ohlcv(n: int = 250) -> pd.DataFrame:
    """生成模拟 OHLCV 数据,含噪声和异常值"""
    np.random.seed(42)
    dates = pd.bdate_range("2025-01-02", periods=n)
    returns = np.random.randn(n) * 0.015 + 0.0003
    close = 100.0 * np.exp(np.cumsum(returns))

    daily_range = np.abs(np.random.randn(n)) * 1.5 + 0.5
    high = close * (1 + daily_range / 200)
    low  = close * (1 - daily_range / 200)
    open_ = low + np.random.rand(n) * (high - low)
    volume = np.random.randint(1_000_000, 8_000_000, n).astype(float)

    df = pd.DataFrame({
        "date": dates, "open": open_, "high": high,
        "low": low, "close": close, "volume": volume,
    })
    # 注入异常:缺失值、零价格、高低价倒置
    df.loc[np.random.choice(df.index, 8, replace=False), "close"] = np.nan
    df.loc[np.random.choice(df.index, 3, replace=False), "volume"] = 0
    bad_idx = np.random.choice(df.index, 2, replace=False)
    df.loc[bad_idx, ["high", "low"]] = df.loc[bad_idx, ["low", "high"]].values
    return df


def clean_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
    """数据清洗流程"""
    raw_len = len(df)
    df = df.drop_duplicates(subset="date").sort_values("date").reset_index(drop=True)
    # 删除高低价倒置
    df = df[df["high"] >= df["low"]].reset_index(drop=True)
    # 删除零价格
    df = df[(df["close"] > 0) & (df["open"] > 0)].reset_index(drop=True)
    # 前向填充缺失收盘价
    df["close"] = df["close"].ffill()
    # 标记停牌(成交量为0)
    df["suspended"] = df["volume"] == 0
    df.loc[df["suspended"], "volume"] = np.nan
    df["volume"] = df["volume"].ffill()
    # 标记价格跳空(涨跌幅 > 20%)
    df["pct_change"] = df["close"].pct_change()
    df["price_jump"] = df["pct_change"].abs() > 0.20
    return df, raw_len


def calc_ma(series: pd.Series, n: int) -> pd.Series:
    return series.rolling(n).mean()


def calc_ema(series: pd.Series, n: int) -> pd.Series:
    return series.ewm(span=n, adjust=False).mean()


def calc_rsi(series: pd.Series, n: int = 14) -> pd.Series:
    delta = series.diff()
    gain = delta.clip(lower=0).rolling(n).mean()
    loss = (-delta.clip(upper=0)).rolling(n).mean()
    rs = gain / loss.replace(0, np.nan)
    return 100 - 100 / (1 + rs)


def calc_macd(series: pd.Series) -> tuple[pd.Series, pd.Series, pd.Series]:
    ema12 = calc_ema(series, 12)
    ema26 = calc_ema(series, 26)
    macd = ema12 - ema26
    signal = calc_ema(macd, 9)
    hist = macd - signal
    return macd, signal, hist


def mode_clean(df: pd.DataFrame) -> tuple[pd.DataFrame, int]:
    df_clean, raw_len = clean_ohlcv(df)
    print(f"\n[{nexdo_time()}] 🧹 数据清洗报告")
    print(f"  原始行数: {raw_len}  清洗后: {len(df_clean)}")
    print(f"  停牌日: {df_clean['suspended'].sum()} 天")
    print(f"  价格跳空: {df_clean['price_jump'].sum()} 次")
    print(f"\n  {'字段':<10} {'缺失数':>8} {'最小值':>10} {'最大值':>10} {'均值':>10}")
    print("  " + "─" * 52)
    for col in ["open", "high", "low", "close", "volume"]:
        s = df_clean[col]
        print(f"  {col:<10} {s.isna().sum():>8} {s.min():>10.2f} {s.max():>10.2f} {s.mean():>10.2f}")
    return df_clean, raw_len


def mode_indicators(df: pd.DataFrame) -> pd.DataFrame:
    print(f"\n[{nexdo_time()}] 📐 技术指标计算")
    df["ma5"]  = calc_ma(df["close"], 5)
    df["ma20"] = calc_ma(df["close"], 20)
    df["ema12"] = calc_ema(df["close"], 12)
    df["ema26"] = calc_ema(df["close"], 26)
    df["rsi14"] = calc_rsi(df["close"], 14)
    df["macd"], df["macd_signal"], df["macd_hist"] = calc_macd(df["close"])
    df["bb_mid"] = calc_ma(df["close"], 20)
    df["bb_std"] = df["close"].rolling(20).std()
    df["bb_upper"] = df["bb_mid"] + 2 * df["bb_std"]
    df["bb_lower"] = df["bb_mid"] - 2 * df["bb_std"]

    last = df.dropna().iloc[-1]
    print(f"\n  最新指标({str(last['date'])[:10]}):")
    print(f"  {'指标':<14} {'数值':>10}")
    print("  " + "─" * 28)
    indicators = [
        ("收盘价", last["close"]),
        ("MA5",   last["ma5"]),
        ("MA20",  last["ma20"]),
        ("EMA12", last["ema12"]),
        ("EMA26", last["ema26"]),
        ("RSI14", last["rsi14"]),
        ("MACD",  last["macd"]),
        ("信号线", last["macd_signal"]),
        ("BB上轨", last["bb_upper"]),
        ("BB下轨", last["bb_lower"]),
    ]
    for name, val in indicators:
        print(f"  {name:<14} {val:>10.4f}")
    return df


def mode_chart(df: pd.DataFrame) -> None:
    print(f"\n[{nexdo_time()}] 📊 ASCII 行情图(最近60日)")
    seg = df.dropna(subset=["ma20"]).tail(60).reset_index(drop=True)
    close = seg["close"].values
    ma5   = seg["ma5"].values
    ma20  = seg["ma20"].values
    rsi   = seg["rsi14"].values
    macd_h = seg["macd_hist"].values

    lo, hi = close.min() * 0.99, close.max() * 1.01
    rows = 10
    print(f"\n  价格 + MA5(·) + MA20(-)")
    print("  " + "─" * 72)
    for r in range(rows, 0, -1):
        thr = lo + (hi - lo) * r / rows
        row = ""
        for i in range(len(close)):
            c_hit  = close[i] >= thr
            m5_hit = ma5[i] >= thr
            m20_hit = ma20[i] >= thr
            if c_hit and m5_hit:
                row += "●"
            elif c_hit:
                row += "│"
            elif m5_hit:
                row += "·"
            elif m20_hit:
                row += "-"
            else:
                row += " "
        print(f"  {thr:>8.2f}{row}│")
    print("  " + "─" * 72)

    # RSI
    print(f"\n  RSI(14)  [超买>70 ─── 超卖<30]")
    print("  " + "─" * 72)
    for level in [80, 70, 50, 30, 20]:
        row = "".join("█" if v >= level else " " for v in rsi)
        marker = " ← 超买" if level == 70 else (" ← 超卖" if level == 30 else "")
        print(f"  {level:>3}{row}{marker}")
    print("  " + "─" * 72)

    # MACD 柱状图
    print(f"\n  MACD 柱状图")
    max_h = max(abs(macd_h.max()), abs(macd_h.min()))
    print("  " + "─" * 72)
    for r in range(4, -5, -1):
        if r == 0:
            print("  " + " " * 10 + "─" * len(macd_h))
            continue
        thr = max_h * r / 4
        row = ""
        for v in macd_h:
            if r > 0:
                row += "▲" if v >= thr else " "
            else:
                row += "▼" if v <= thr else " "
        print(f"  {thr:>+9.4f}{row}│")
    print("  " + "─" * 72)


def main() -> None:
    parser = argparse.ArgumentParser(description="量化数据清洗与技术指标")
    parser.add_argument("--mode", choices=["clean", "indicators", "chart", "all"],
                        default="all", help="运行模式")
    args = parser.parse_args()

    df_raw = generate_ohlcv()
    print(f"[{nexdo_time()}] 数据生成:{len(df_raw)} 条 OHLCV 记录")

    df_clean, _ = mode_clean(df_raw)
    df_ind = mode_indicators(df_clean)
    if args.mode in ("chart", "all"):
        mode_chart(df_ind)


if __name__ == "__main__":
    main()
$ python3 36-python-quant-data.py --mode clean

[2026-04-18 05:39:31] 🧹 数据清洗报告
  原始行数: 252  清洗后: 244
  停牌日: 8  价格跳空: 3  高低价倒置: 2 行(已删除)

$ python3 36-python-quant-data.py --mode indicators

[2026-04-18 05:39:31] 📐 技术指标计算
  MA5=87.80  MA20=86.45  信号: 金叉 ✅
  EMA12=87.92  EMA26=86.78
  RSI14=58.3  状态: 中性
  MACD=1.14  Signal=0.89  Hist=0.25  信号: 多头 ✅

小结

概念 一句话记忆
OHLCV Open/High/Low/Close/Volume,量化数据标准格式
pd.bdate_range 生成工作日序列,自动跳过周末
ffill 前向填充,时间序列缺失值的标准处理
rolling(n).mean() 等权重移动平均,滞后性强
ewm(span=n).mean() 指数加权均线,近期权重大,反应快
RSI > 70 超买信号,可能回调
RSI < 30 超卖信号,可能反弹
MACD 金叉 MACD 线上穿信号线,多头信号
MACD 死叉 MACD 线下穿信号线,空头信号

⏱ NexDo Time(5 分钟)

挑战:实现布林带(Bollinger Bands)指标。

具体步骤:

  1. middle = calc_ma(close, 20)(中轨 = 20日均线)
  2. std = close.rolling(20).std()(20日标准差)
  3. upper = middle + 2 * std(上轨 = 中轨 + 2σ)
  4. lower = middle - 2 * std(下轨 = 中轨 - 2σ)
  5. 打印最新的上轨/中轨/下轨值,以及当前价格在布林带中的位置((close - lower) / (upper - lower),0 表示在下轨,1 表示在上轨)

Don’t wait for next time, do it in the next moment.