36 · 量化数据处理:OHLCV 清洗与技术指标计算
🔗 知识图谱导航:阅读本文前,建议先掌握《32 · Pandas 实战》中的
rolling/resample操作——本文把这些操作应用到量化金融场景,实现真实的技术指标计算。
运行环境:
pip install pandas numpy。
极客解析:量化数据处理的三个层次:① 清洗(去重/填充/异常检测);② 指标计算(MA/EMA/RSI/MACD);③ 可视化(ASCII 行情图)。每一层都有量化特有的坑,本文逐一踩过。
四大技术指标速查
MA(移动平均) rolling(n).mean(),等权重,滞后性强
EMA(指数均线) ewm(span=n).mean(),近期权重大,反应更快
RSI(相对强弱) 14日涨跌幅比值,>70 超买,<30 超卖
MACD(异同均线) EMA12 - EMA26,Signal = EMA9(MACD),Hist = MACD - Signal
步步为营:核心逻辑自适应拆解
这一篇按量化数据流水线拆成 7 个台阶:先生成带异常的 OHLCV 数据,再清洗、计算指标、输出报告、画终端行情图,最后用 CLI 调度整条流程。每个演示都补了 Mock 数据和 print() 反馈。
Step 1:用 generate_ohlcv 造一张带异常的行情表
痛点与机制:
generate_ohlcv 是量化数据的造数工厂:O/H/L/C/V 分别代表开盘、最高、最低、收盘、成交量。它还故意塞入缺失收盘价、零成交量和高低价倒置,就像真实行情供应商偶尔会给你一张“脏表”。先造出问题,后面清洗才有训练价值。
核心源码(逐字来自文末完整源码):
def generate_ohlcv(n: int = 250) -> pd.DataFrame:
    """Generate a reproducible synthetic OHLCV frame with injected defects.

    Closes follow a seeded log-return random walk scaled from 100; high and
    low straddle the close symmetrically and the open is drawn uniformly
    between low and high. Three kinds of dirty data are then injected so the
    cleaning step has something to do: missing closes, zero-volume days and
    rows whose high and low are swapped.

    Args:
        n: Number of business-day rows to generate.

    Returns:
        A frame with columns ``date``, ``open``, ``high``, ``low``, ``close``
        and ``volume``.
    """
    # Fixed seed so every call yields the same frame. NOTE: this reseeds
    # NumPy's global RNG as a side effect.
    np.random.seed(42)
    dates = pd.bdate_range("2025-01-02", periods=n)
    returns = np.random.randn(n) * 0.015 + 0.0003
    close = 100.0 * np.exp(np.cumsum(returns))
    # High-low spread as a percentage of the close; split evenly around it.
    daily_range = np.abs(np.random.randn(n)) * 1.5 + 0.5
    high = close * (1 + daily_range / 200)
    low = close * (1 - daily_range / 200)
    open_ = low + np.random.rand(n) * (high - low)
    volume = np.random.randint(1_000_000, 8_000_000, n).astype(float)
    df = pd.DataFrame({
        "date": dates, "open": open_, "high": high,
        "low": low, "close": close, "volume": volume,
    })
    # Inject defects: missing closes, zero volume, swapped high/low.
    # The counts are capped at n so that a small frame no longer makes
    # np.random.choice(..., replace=False) raise ValueError.
    nan_rows = np.random.choice(df.index, min(8, n), replace=False)
    df.loc[nan_rows, "close"] = np.nan
    zero_rows = np.random.choice(df.index, min(3, n), replace=False)
    df.loc[zero_rows, "volume"] = 0
    bad_idx = np.random.choice(df.index, min(2, n), replace=False)
    # .values makes the assignment positional, so the columns really swap.
    df.loc[bad_idx, ["high", "low"]] = df.loc[bad_idx, ["low", "high"]].values
    return df
可运行演示(补齐 Mock 数据与 print 反馈):
import numpy as np
import pandas as pd
def generate_ohlcv(n: int = 250) -> pd.DataFrame:
    """Generate a reproducible synthetic OHLCV frame with injected defects.

    Closes follow a seeded log-return random walk scaled from 100; high and
    low straddle the close symmetrically and the open is drawn uniformly
    between low and high. Three kinds of dirty data are then injected so the
    cleaning step has something to do: missing closes, zero-volume days and
    rows whose high and low are swapped.

    Args:
        n: Number of business-day rows to generate.

    Returns:
        A frame with columns ``date``, ``open``, ``high``, ``low``, ``close``
        and ``volume``.
    """
    # Fixed seed so every call yields the same frame. NOTE: this reseeds
    # NumPy's global RNG as a side effect.
    np.random.seed(42)
    dates = pd.bdate_range("2025-01-02", periods=n)
    returns = np.random.randn(n) * 0.015 + 0.0003
    close = 100.0 * np.exp(np.cumsum(returns))
    # High-low spread as a percentage of the close; split evenly around it.
    daily_range = np.abs(np.random.randn(n)) * 1.5 + 0.5
    high = close * (1 + daily_range / 200)
    low = close * (1 - daily_range / 200)
    open_ = low + np.random.rand(n) * (high - low)
    volume = np.random.randint(1_000_000, 8_000_000, n).astype(float)
    df = pd.DataFrame({
        "date": dates, "open": open_, "high": high,
        "low": low, "close": close, "volume": volume,
    })
    # Inject defects: missing closes, zero volume, swapped high/low.
    # The counts are capped at n so that a small frame no longer makes
    # np.random.choice(..., replace=False) raise ValueError.
    nan_rows = np.random.choice(df.index, min(8, n), replace=False)
    df.loc[nan_rows, "close"] = np.nan
    zero_rows = np.random.choice(df.index, min(3, n), replace=False)
    df.loc[zero_rows, "volume"] = 0
    bad_idx = np.random.choice(df.index, min(2, n), replace=False)
    # .values makes the assignment positional, so the columns really swap.
    df.loc[bad_idx, ["high", "low"]] = df.loc[bad_idx, ["low", "high"]].values
    return df
# Demo driver: build a 40-row sample and report how many defects it contains.
df = generate_ohlcv(n=40)
missing_close = df["close"].isna().sum()
zero_volume_days = int(df["volume"].eq(0).sum())
inverted_rows = int(df["high"].lt(df["low"]).sum())
preview = df.head(3).to_string(index=False)

print("📦 OHLCV 数据已生成")
print("行列:", df.shape)
print("缺失 close:", missing_close, "个")
print("零成交量:", zero_volume_days, "天")
print("高低价倒置:", inverted_rows, "行")
print(preview)
Step 2:用 clean_ohlcv 给 OHLCV 数据做安检
痛点与机制:
clean_ohlcv 是行情数据的质检流水线:先按日期去重排序,再删掉高低价倒置和零价格,用前一天收盘价补停牌缺口,最后标记价格跳空。你可以把它想成交易所门口的安检,不合格的数据不能直接进策略模型。
核心源码(逐字来自文末完整源码):
def clean_ohlcv(df: pd.DataFrame) -> tuple[pd.DataFrame, int]:
    """Clean a raw OHLCV frame and flag suspensions and price jumps.

    Steps, in order: de-duplicate by ``date`` and sort; drop rows whose high
    is below their low; drop rows with a non-positive close or a
    non-positive/missing open; forward-fill missing closes; mark zero-volume
    rows as ``suspended`` and forward-fill their volume; flag closes that
    move more than 20% versus the previous row as ``price_jump``.

    The input frame is not modified.

    Args:
        df: Frame with ``date``, ``open``, ``high``, ``low``, ``close`` and
            ``volume`` columns.

    Returns:
        ``(cleaned, raw_len)`` where ``raw_len`` is the row count of the input
        before cleaning. ``cleaned`` gains the columns ``suspended``,
        ``pct_change`` and ``price_jump``.
    """
    raw_len = len(df)
    df = df.drop_duplicates(subset="date").sort_values("date").reset_index(drop=True)
    # Drop rows with inverted high/low.
    df = df[df["high"] >= df["low"]].reset_index(drop=True)
    # Drop invalid prices. A missing close must survive this filter: a plain
    # `close > 0` test is False for NaN and would delete the very rows the
    # forward fill below is meant to repair.
    close_ok = df["close"].isna() | (df["close"] > 0)
    df = df[close_ok & (df["open"] > 0)].reset_index(drop=True)
    # Forward-fill missing closes; a leading gap has no previous close to
    # copy from, so those rows are dropped.
    df["close"] = df["close"].ffill()
    df = df.dropna(subset=["close"]).reset_index(drop=True)
    # Mark suspended days (zero volume) and carry the previous volume forward.
    # Series.mask upcasts an integer column to float cleanly, avoiding the
    # incompatible-dtype assignment of NaN through .loc.
    df["suspended"] = df["volume"] == 0
    df["volume"] = df["volume"].mask(df["suspended"]).ffill()
    # Flag price jumps (absolute close-to-close change above 20%).
    df["pct_change"] = df["close"].pct_change()
    df["price_jump"] = df["pct_change"].abs() > 0.20
    return df, raw_len
可运行演示(补齐 Mock 数据与 print 反馈):
import numpy as np
import pandas as pd
def clean_ohlcv(df: pd.DataFrame) -> tuple[pd.DataFrame, int]:
    """Clean a raw OHLCV frame and flag suspensions and price jumps.

    Steps, in order: de-duplicate by ``date`` and sort; drop rows whose high
    is below their low; drop rows with a non-positive close or a
    non-positive/missing open; forward-fill missing closes; mark zero-volume
    rows as ``suspended`` and forward-fill their volume; flag closes that
    move more than 20% versus the previous row as ``price_jump``.

    The input frame is not modified.

    Args:
        df: Frame with ``date``, ``open``, ``high``, ``low``, ``close`` and
            ``volume`` columns.

    Returns:
        ``(cleaned, raw_len)`` where ``raw_len`` is the row count of the input
        before cleaning. ``cleaned`` gains the columns ``suspended``,
        ``pct_change`` and ``price_jump``.
    """
    raw_len = len(df)
    df = df.drop_duplicates(subset="date").sort_values("date").reset_index(drop=True)
    # Drop rows with inverted high/low.
    df = df[df["high"] >= df["low"]].reset_index(drop=True)
    # Drop invalid prices. A missing close must survive this filter: a plain
    # `close > 0` test is False for NaN and would delete the very rows the
    # forward fill below is meant to repair.
    close_ok = df["close"].isna() | (df["close"] > 0)
    df = df[close_ok & (df["open"] > 0)].reset_index(drop=True)
    # Forward-fill missing closes; a leading gap has no previous close to
    # copy from, so those rows are dropped.
    df["close"] = df["close"].ffill()
    df = df.dropna(subset=["close"]).reset_index(drop=True)
    # Mark suspended days (zero volume) and carry the previous volume forward.
    # Series.mask upcasts an integer column to float cleanly, avoiding the
    # incompatible-dtype assignment of NaN through .loc.
    df["suspended"] = df["volume"] == 0
    df["volume"] = df["volume"].mask(df["suspended"]).ffill()
    # Flag price jumps (absolute close-to-close change above 20%).
    df["pct_change"] = df["close"].pct_change()
    df["price_jump"] = df["pct_change"].abs() > 0.20
    return df, raw_len
# Mock input: 2025-01-03 appears twice, and on both of those rows high < low,
# the close is missing and the volume is zero.
trade_dates = pd.to_datetime(["2025-01-02", "2025-01-03", "2025-01-03", "2025-01-06"])
raw = pd.DataFrame({
    "date": trade_dates,
    "open": [100, 101, 101, 102],
    "high": [102, 99, 99, 104],
    "low": [99, 103, 103, 100],
    "close": [101.0, np.nan, np.nan, 103.0],
    "volume": [1_000_000, 0, 0, 2_000_000],
})

clean, raw_len = clean_ohlcv(raw)
report_columns = ["date", "close", "volume", "suspended", "price_jump"]
print(f"清洗前: {raw_len} 行")
print(f"清洗后: {len(clean)} 行")
print(clean[report_columns].to_string(index=False))
Step 3:用 MA/EMA/RSI/MACD 函数加工趋势信号
痛点与机制:
技术指标就是把价格序列加工成更容易判断趋势的信号。MA 像普通平均体温,EMA 更看重最近几天,RSI 像市场体力条,MACD 像两条快慢均线的距离。每个函数都只负责一个指标,便于后面组合。
核心源码(逐字来自文末完整源码):
def calc_ma(series: pd.Series, n: int) -> pd.Series:
    """Return the equal-weight moving average over a window of ``n`` rows.

    The first ``n - 1`` values are NaN because the window is not yet full.
    """
    return series.rolling(n).mean()


def calc_ema(series: pd.Series, n: int) -> pd.Series:
    """Return the exponential moving average with span ``n``.

    ``adjust=False`` selects the recursive form, seeded with the first value.
    """
    return series.ewm(span=n, adjust=False).mean()


def calc_rsi(series: pd.Series, n: int = 14) -> pd.Series:
    """Return the RSI built from simple rolling means of gains and losses.

    Args:
        series: Price series.
        n: Window length in rows.

    Returns:
        RSI values in ``[0, 100]``. A window with gains and no losses yields
        100; a window with no price change at all yields NaN (undefined).
    """
    delta = series.diff()
    gain = delta.clip(lower=0).rolling(n).mean()
    # abs() instead of negation keeps zero-loss entries at +0.0.
    loss = delta.clip(upper=0).abs().rolling(n).mean()
    # Deliberately no replace(0, NaN) on the losses: gain / 0 is +inf, which
    # maps to RSI 100 below, while 0 / 0 stays NaN for a flat window.
    rs = gain / loss
    return 100 - 100 / (1 + rs)


def calc_macd(series: pd.Series) -> tuple[pd.Series, pd.Series, pd.Series]:
    """Return ``(macd, signal, hist)`` for the standard 12/26/9 MACD.

    ``macd`` is EMA12 minus EMA26, ``signal`` is the 9-span EMA of ``macd``
    and ``hist`` is their difference.
    """
    ema12 = calc_ema(series, 12)
    ema26 = calc_ema(series, 26)
    macd = ema12 - ema26
    signal = calc_ema(macd, 9)
    hist = macd - signal
    return macd, signal, hist
可运行演示(补齐 Mock 数据与 print 反馈):
import numpy as np
import pandas as pd
def calc_ma(series: pd.Series, n: int) -> pd.Series:
    """Return the equal-weight moving average over a window of ``n`` rows.

    The first ``n - 1`` values are NaN because the window is not yet full.
    """
    return series.rolling(n).mean()


def calc_ema(series: pd.Series, n: int) -> pd.Series:
    """Return the exponential moving average with span ``n``.

    ``adjust=False`` selects the recursive form, seeded with the first value.
    """
    return series.ewm(span=n, adjust=False).mean()


def calc_rsi(series: pd.Series, n: int = 14) -> pd.Series:
    """Return the RSI built from simple rolling means of gains and losses.

    Args:
        series: Price series.
        n: Window length in rows.

    Returns:
        RSI values in ``[0, 100]``. A window with gains and no losses yields
        100; a window with no price change at all yields NaN (undefined).
    """
    delta = series.diff()
    gain = delta.clip(lower=0).rolling(n).mean()
    # abs() instead of negation keeps zero-loss entries at +0.0.
    loss = delta.clip(upper=0).abs().rolling(n).mean()
    # Deliberately no replace(0, NaN) on the losses: gain / 0 is +inf, which
    # maps to RSI 100 below, while 0 / 0 stays NaN for a flat window.
    rs = gain / loss
    return 100 - 100 / (1 + rs)


def calc_macd(series: pd.Series) -> tuple[pd.Series, pd.Series, pd.Series]:
    """Return ``(macd, signal, hist)`` for the standard 12/26/9 MACD.

    ``macd`` is EMA12 minus EMA26, ``signal`` is the 9-span EMA of ``macd``
    and ``hist`` is their difference.
    """
    ema12 = calc_ema(series, 12)
    ema26 = calc_ema(series, 26)
    macd = ema12 - ema26
    signal = calc_ema(macd, 9)
    hist = macd - signal
    return macd, signal, hist
# Demo driver: 15 mock closes, just enough to give RSI(14) one full window.
close = pd.Series(
    [100, 101, 103, 102, 105, 108, 107, 110, 112, 111, 115, 116, 118, 117, 120],
    dtype=float,
)
macd, signal, hist = calc_macd(close)

latest_ma5 = round(float(calc_ma(close, 5).iloc[-1]), 2)
latest_ema5 = round(float(calc_ema(close, 5).iloc[-1]), 2)
latest_rsi14 = round(float(calc_rsi(close, 14).iloc[-1]), 2)
latest_macd = [round(float(line.iloc[-1]), 4) for line in (macd, signal, hist)]

print("MA5 最新:", latest_ma5)
print("EMA5 最新:", latest_ema5)
print("RSI14 最新:", latest_rsi14)
print("MACD/Signal/Hist 最新:", *latest_macd)
Step 4:用 mode_clean 输出清洗质量报告
痛点与机制:
mode_clean 把清洗结果变成报告,而不是静默处理。对新手来说,能看到“清洗前多少行、清洗后多少行、缺失还剩多少”非常关键;这就像洗完菜要看一眼篮子,确认泥沙真的冲掉了。
核心源码(逐字来自文末完整源码):
def mode_clean(df: pd.DataFrame) -> tuple[pd.DataFrame, int]:
    """Run ``clean_ohlcv`` and print a data-quality report.

    The report shows row counts before and after cleaning, the number of
    suspended days and price jumps, and per-column missing/min/max/mean
    statistics for the five OHLCV fields.

    Returns:
        The ``(cleaned_frame, raw_row_count)`` pair produced by ``clean_ohlcv``.
    """
    cleaned, n_raw = clean_ohlcv(df)
    n_suspended = cleaned["suspended"].sum()
    n_jumps = cleaned["price_jump"].sum()

    print(f"\n[{nexdo_time()}] 🧹 数据清洗报告")
    print(f" 原始行数: {n_raw} 清洗后: {len(cleaned)}")
    print(f" 停牌日: {n_suspended} 天")
    print(f" 价格跳空: {n_jumps} 次")
    print(f"\n {'字段':<10} {'缺失数':>8} {'最小值':>10} {'最大值':>10} {'均值':>10}")
    print(" " + "─" * 52)
    for field in ("open", "high", "low", "close", "volume"):
        column = cleaned[field]
        missing = column.isna().sum()
        print(f" {field:<10} {missing:>8} {column.min():>10.2f} {column.max():>10.2f} {column.mean():>10.2f}")
    return cleaned, n_raw
可运行演示(补齐 Mock 数据与 print 反馈):
import time
import numpy as np
import pandas as pd
def nexdo_time() -> str:
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    stamp_format = "%Y-%m-%d %H:%M:%S"
    now = time.localtime()
    return time.strftime(stamp_format, now)
def clean_ohlcv(df: pd.DataFrame) -> tuple[pd.DataFrame, int]:
    """Clean a raw OHLCV frame and flag suspensions and price jumps.

    Steps, in order: de-duplicate by ``date`` and sort; drop rows whose high
    is below their low; drop rows with a non-positive close or a
    non-positive/missing open; forward-fill missing closes; mark zero-volume
    rows as ``suspended`` and forward-fill their volume; flag closes that
    move more than 20% versus the previous row as ``price_jump``.

    The input frame is not modified.

    Returns:
        ``(cleaned, raw_len)`` where ``raw_len`` is the row count of the input
        before cleaning.
    """
    raw_len = len(df)
    df = df.drop_duplicates(subset="date").sort_values("date").reset_index(drop=True)
    df = df[df["high"] >= df["low"]].reset_index(drop=True)
    # A missing close must survive the price filter: a plain `close > 0` test
    # is False for NaN and would delete the rows the forward fill repairs.
    close_ok = df["close"].isna() | (df["close"] > 0)
    df = df[close_ok & (df["open"] > 0)].reset_index(drop=True)
    # A leading gap has no previous close to copy from, so it is dropped.
    df["close"] = df["close"].ffill()
    df = df.dropna(subset=["close"]).reset_index(drop=True)
    # Series.mask upcasts an integer volume column to float cleanly.
    df["suspended"] = df["volume"] == 0
    df["volume"] = df["volume"].mask(df["suspended"]).ffill()
    df["pct_change"] = df["close"].pct_change()
    df["price_jump"] = df["pct_change"].abs() > 0.20
    return df, raw_len
def mode_clean(df: pd.DataFrame) -> tuple[pd.DataFrame, int]:
    """Run ``clean_ohlcv`` and print a data-quality report.

    The report shows row counts before and after cleaning, the number of
    suspended days and price jumps, and per-column missing/min/max/mean
    statistics for the five OHLCV fields.

    Returns:
        The ``(cleaned_frame, raw_row_count)`` pair produced by ``clean_ohlcv``.
    """
    cleaned, n_raw = clean_ohlcv(df)
    n_suspended = cleaned["suspended"].sum()
    n_jumps = cleaned["price_jump"].sum()

    print(f"\n[{nexdo_time()}] 🧹 数据清洗报告")
    print(f" 原始行数: {n_raw} 清洗后: {len(cleaned)}")
    print(f" 停牌日: {n_suspended} 天")
    print(f" 价格跳空: {n_jumps} 次")
    print(f"\n {'字段':<10} {'缺失数':>8} {'最小值':>10} {'最大值':>10} {'均值':>10}")
    print(" " + "─" * 52)
    for field in ("open", "high", "low", "close", "volume"):
        column = cleaned[field]
        missing = column.isna().sum()
        print(f" {field:<10} {missing:>8} {column.min():>10.2f} {column.max():>10.2f} {column.mean():>10.2f}")
    return cleaned, n_raw
# Mock input: five business days; the second row has a missing close and
# zero volume.
sample_dates = pd.bdate_range("2025-01-02", periods=5)
df = pd.DataFrame({
    "date": sample_dates,
    "open": [100, 101, 102, 103, 104],
    "high": [102, 103, 104, 105, 106],
    "low": [99, 100, 101, 102, 103],
    "close": [101, np.nan, 103, 104, 105],
    "volume": [1_000_000, 0, 2_000_000, 3_000_000, 4_000_000],
})

mode_clean(df)
Step 5:用 mode_indicators 生成完整技术指标面板
痛点与机制:
mode_indicators 把清洗后的收盘价一次性加上 MA、EMA、RSI、MACD 和布林带。它像给行情表新增一排仪表盘:价格只是原始速度,指标告诉你趋势、强弱和波动边界。
核心源码(逐字来自文末完整源码):
def mode_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """Add MA/EMA/RSI/MACD/Bollinger columns to ``df`` and print the latest values.

    The indicator columns are written into ``df`` itself, and that same frame
    is returned. The printed panel uses the last row on which every displayed
    indicator is available; if there is no such row (MA20 and the Bollinger
    bands need 20 closes) a notice is printed instead of raising.

    Args:
        df: Frame with at least ``date`` and ``close`` columns.

    Returns:
        ``df`` with the added indicator columns.
    """
    print(f"\n[{nexdo_time()}] 📐 技术指标计算")
    close = df["close"]
    df["ma5"] = calc_ma(close, 5)
    df["ma20"] = calc_ma(close, 20)
    df["ema12"] = calc_ema(close, 12)
    df["ema26"] = calc_ema(close, 26)
    df["rsi14"] = calc_rsi(close, 14)
    df["macd"], df["macd_signal"], df["macd_hist"] = calc_macd(close)
    # Bollinger bands: the middle band is the 20-row MA, +/- 2 standard deviations.
    df["bb_mid"] = df["ma20"]
    df["bb_std"] = close.rolling(20).std()
    df["bb_upper"] = df["bb_mid"] + 2 * df["bb_std"]
    df["bb_lower"] = df["bb_mid"] - 2 * df["bb_std"]

    # (label, column) pairs in display order.
    panel = [
        ("收盘价", "close"),
        ("MA5", "ma5"),
        ("MA20", "ma20"),
        ("EMA12", "ema12"),
        ("EMA26", "ema26"),
        ("RSI14", "rsi14"),
        ("MACD", "macd"),
        ("信号线", "macd_signal"),
        ("BB上轨", "bb_upper"),
        ("BB下轨", "bb_lower"),
    ]
    # Only the displayed columns decide which row is "latest"; a NaN in an
    # unrelated column must not hide the most recent row.
    complete = df.dropna(subset=[column for _, column in panel])
    if complete.empty:
        # .iloc[-1] on an empty frame would raise IndexError.
        print("\n 数据不足:没有指标齐全的行(MA20/布林带至少需要 20 个有效收盘价)")
        return df
    last = complete.iloc[-1]

    print(f"\n 最新指标({str(last['date'])[:10]}):")
    print(f" {'指标':<14} {'数值':>10}")
    print(" " + "─" * 28)
    for name, column in panel:
        print(f" {name:<14} {last[column]:>10.4f}")
    return df
可运行演示(补齐 Mock 数据与 print 反馈):
import time
import numpy as np
import pandas as pd
def nexdo_time() -> str:
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    stamp_format = "%Y-%m-%d %H:%M:%S"
    now = time.localtime()
    return time.strftime(stamp_format, now)
def calc_ma(series: pd.Series, n: int) -> pd.Series:
    """Return the equal-weight moving average over a window of ``n`` rows.

    The first ``n - 1`` values are NaN because the window is not yet full.
    """
    return series.rolling(n).mean()


def calc_ema(series: pd.Series, n: int) -> pd.Series:
    """Return the exponential moving average with span ``n``.

    ``adjust=False`` selects the recursive form, seeded with the first value.
    """
    return series.ewm(span=n, adjust=False).mean()


def calc_rsi(series: pd.Series, n: int = 14) -> pd.Series:
    """Return the RSI built from simple rolling means of gains and losses.

    Args:
        series: Price series.
        n: Window length in rows.

    Returns:
        RSI values in ``[0, 100]``. A window with gains and no losses yields
        100; a window with no price change at all yields NaN (undefined).
    """
    delta = series.diff()
    gain = delta.clip(lower=0).rolling(n).mean()
    # abs() instead of negation keeps zero-loss entries at +0.0.
    loss = delta.clip(upper=0).abs().rolling(n).mean()
    # Deliberately no replace(0, NaN) on the losses: gain / 0 is +inf, which
    # maps to RSI 100 below, while 0 / 0 stays NaN for a flat window.
    rs = gain / loss
    return 100 - 100 / (1 + rs)


def calc_macd(series: pd.Series) -> tuple[pd.Series, pd.Series, pd.Series]:
    """Return ``(macd, signal, hist)`` for the standard 12/26/9 MACD.

    ``macd`` is EMA12 minus EMA26, ``signal`` is the 9-span EMA of ``macd``
    and ``hist`` is their difference.
    """
    ema12 = calc_ema(series, 12)
    ema26 = calc_ema(series, 26)
    macd = ema12 - ema26
    signal = calc_ema(macd, 9)
    hist = macd - signal
    return macd, signal, hist
def mode_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """Add MA/EMA/RSI/MACD/Bollinger columns to ``df`` and print the latest values.

    The indicator columns are written into ``df`` itself, and that same frame
    is returned. The printed summary uses the last row on which every
    displayed indicator is available; if there is no such row (MA20 needs 20
    closes) a notice is printed instead of raising.

    Args:
        df: Frame with at least ``date`` and ``close`` columns.

    Returns:
        ``df`` with the added indicator columns.
    """
    print(f"\n[{nexdo_time()}] 📐 技术指标计算")
    close = df["close"]
    df["ma5"] = calc_ma(close, 5)
    df["ma20"] = calc_ma(close, 20)
    df["ema12"] = calc_ema(close, 12)
    df["ema26"] = calc_ema(close, 26)
    df["rsi14"] = calc_rsi(close, 14)
    df["macd"], df["macd_signal"], df["macd_hist"] = calc_macd(close)
    # Bollinger bands: the middle band is the 20-row MA, +/- 2 standard deviations.
    df["bb_mid"] = df["ma20"]
    df["bb_std"] = close.rolling(20).std()
    df["bb_upper"] = df["bb_mid"] + 2 * df["bb_std"]
    df["bb_lower"] = df["bb_mid"] - 2 * df["bb_std"]

    shown = ["close", "ma5", "ma20", "rsi14", "macd", "macd_signal"]
    # Only the displayed columns decide which row is "latest".
    complete = df.dropna(subset=shown)
    if complete.empty:
        # .iloc[-1] on an empty frame would raise IndexError.
        print(" 数据不足:没有指标齐全的行(MA20 至少需要 20 个有效收盘价)")
        return df
    last = complete.iloc[-1]

    print(" 最新日期:", str(last["date"])[:10])
    for name in shown:
        print(f" {name:<12}: {last[name]:.4f}")
    return df
# Mock input: 40 business days of closes, a linear 100 -> 120 trend plus a
# sine wobble.
n_days = 40
trend = np.linspace(100, 120, n_days)
wobble = np.sin(np.arange(n_days))
df = pd.DataFrame({
    "date": pd.bdate_range("2025-01-02", periods=n_days),
    "close": trend + wobble,
})
mode_indicators(df)
Step 6:用 mode_chart 在终端画 ASCII 行情图
痛点与机制:
mode_chart 用 ASCII 在终端画行情图。它把价格、MA5、MA20 映射到字符高度,像用积木搭一张走势图;不需要图片窗口,也能在服务器里快速观察趋势。
核心源码(逐字来自文末完整源码):
def mode_chart(df: pd.DataFrame) -> None:
    """Print ASCII panels for price/MA5/MA20, RSI(14) and the MACD histogram.

    Uses the last 60 rows that have a valid ``ma20``. If no row qualifies, a
    notice is printed and nothing is drawn.

    Args:
        df: Frame with ``close``, ``ma5``, ``ma20``, ``rsi14`` and
            ``macd_hist`` columns.
    """
    print(f"\n[{nexdo_time()}] 📊 ASCII 行情图(最近60日)")
    seg = df.dropna(subset=["ma20"]).tail(60).reset_index(drop=True)
    if seg.empty:
        # min()/max() on an empty array would raise ValueError below.
        print(" 数据不足:MA20 尚无有效值,无法绘制行情图")
        return
    close = seg["close"].values
    ma5 = seg["ma5"].values
    ma20 = seg["ma20"].values
    rsi = seg["rsi14"].values
    macd_h = seg["macd_hist"].values
    rule = " " + "─" * 72

    # Price panel: one text row per threshold, scanned from top to bottom.
    # The vertical range is padded by 1% on each side.
    lo, hi = close.min() * 0.99, close.max() * 1.01
    rows = 10
    print("\n 价格 + MA5(·) + MA20(-)")
    print(rule)
    for r in range(rows, 0, -1):
        thr = lo + (hi - lo) * r / rows
        row = ""
        for c, m5, m20 in zip(close, ma5, ma20):
            c_hit = c >= thr
            m5_hit = m5 >= thr
            if c_hit and m5_hit:
                row += "●"
            elif c_hit:
                row += "│"
            elif m5_hit:
                row += "·"
            elif m20 >= thr:
                row += "-"
            else:
                row += " "
        print(f" {thr:>8.2f} │{row}│")
    print(rule)

    # RSI panel: a cell is filled when the RSI reaches that level (NaN never does).
    print("\n RSI(14) [超买>70 ─── 超卖<30]")
    print(rule)
    for level in [80, 70, 50, 30, 20]:
        row = "".join("█" if v >= level else " " for v in rsi)
        marker = " ← 超买" if level == 70 else (" ← 超卖" if level == 30 else "")
        print(f" {level:>3} │{row}│{marker}")
    print(rule)

    # MACD histogram: four rows above and four below a zero line.
    print("\n MACD 柱状图")
    max_h = max(abs(macd_h.max()), abs(macd_h.min()))
    if not max_h > 0:
        # A flat (all-zero) histogram would give every row a threshold of 0
        # and draw solid bars; any positive scale leaves the panel blank.
        max_h = 1.0
    print(rule)
    for r in range(4, -5, -1):
        if r == 0:
            print(" " + " " * 10 + "─" * len(macd_h))
            continue
        thr = max_h * r / 4
        if r > 0:
            row = "".join("▲" if v >= thr else " " for v in macd_h)
        else:
            row = "".join("▼" if v <= thr else " " for v in macd_h)
        print(f" {thr:>+9.4f} │{row}│")
    print(rule)
可运行演示(补齐 Mock 数据与 print 反馈):
import time
import numpy as np
import pandas as pd
def nexdo_time() -> str:
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    stamp_format = "%Y-%m-%d %H:%M:%S"
    now = time.localtime()
    return time.strftime(stamp_format, now)
def mode_chart(df: pd.DataFrame) -> None:
    """Print a compact ASCII panel of price, MA5 and MA20.

    Uses the last 20 rows that have a valid ``ma20``. If no row qualifies, a
    notice is printed and nothing is drawn.

    Args:
        df: Frame with ``close``, ``ma5`` and ``ma20`` columns.
    """
    # The title states the 20-row window actually taken by tail(20) below.
    print(f"\n[{nexdo_time()}] 📊 ASCII 行情图(最近20日)")
    seg = df.dropna(subset=["ma20"]).tail(20).reset_index(drop=True)
    if seg.empty:
        # min()/max() on an empty array would raise ValueError below.
        print(" 数据不足:MA20 尚无有效值,无法绘制行情图")
        return
    close = seg["close"].values
    ma5 = seg["ma5"].values
    ma20 = seg["ma20"].values
    rule = " " + "─" * 32

    # One text row per threshold, scanned from top to bottom; the vertical
    # range is padded by 1% on each side.
    lo, hi = close.min() * 0.99, close.max() * 1.01
    rows = 6
    print("\n 价格 + MA5(·) + MA20(-)")
    print(rule)
    for r in range(rows, 0, -1):
        thr = lo + (hi - lo) * r / rows
        row = ""
        for c, m5, m20 in zip(close, ma5, ma20):
            if c >= thr and m5 >= thr:
                row += "●"
            elif c >= thr:
                row += "│"
            elif m5 >= thr:
                row += "·"
            elif m20 >= thr:
                row += "-"
            else:
                row += " "
        print(f" {thr:>8.2f} │{row}│")
    print(rule)
# Mock input: 45 closes (linear 100 -> 120 trend plus a sine wobble) with the
# two moving averages the chart needs.
base = np.linspace(100, 120, 45) + np.sin(np.arange(45))
df = pd.DataFrame({"close": base}).assign(
    ma5=lambda frame: frame["close"].rolling(5).mean(),
    ma20=lambda frame: frame["close"].rolling(20).mean(),
)
mode_chart(df)
Step 7:用 main 做 clean/indicators/chart/all 脚本遥控器
痛点与机制:
main 是脚本遥控器。用户通过 --mode clean/indicators/chart/all 切换功能,但脚本始终先生成数据、清洗数据、计算指标,再按需要画图,这保证了每个模式都基于同一条数据流水线。
核心源码(逐字来自文末完整源码):
def main() -> None:
    """Parse ``--mode`` and run the generate -> clean -> indicators pipeline.

    Data generation, cleaning and indicator calculation always run; the ASCII
    chart is drawn only for the ``chart`` and ``all`` modes.
    """
    cli = argparse.ArgumentParser(description="量化数据清洗与技术指标")
    cli.add_argument(
        "--mode",
        choices=["clean", "indicators", "chart", "all"],
        default="all",
        help="运行模式",
    )
    mode = cli.parse_args().mode

    raw = generate_ohlcv()
    print(f"[{nexdo_time()}] 数据生成:{len(raw)} 条 OHLCV 记录")
    cleaned, _ = mode_clean(raw)
    with_indicators = mode_indicators(cleaned)
    if mode not in ("chart", "all"):
        return
    mode_chart(with_indicators)


if __name__ == "__main__":
    main()
可运行演示(补齐 Mock 数据与 print 反馈):
import argparse
import sys
import pandas as pd
def generate_ohlcv() -> pd.DataFrame:
    """Mock data source: a three-row frame holding only a ``close`` column."""
    closes = [100, 101, 102]
    return pd.DataFrame({"close": closes})


def mode_clean(df: pd.DataFrame):
    """Mock cleaning step: announce itself, return the frame and its row count."""
    print("运行 clean:清洗 OHLCV 数据")
    row_count = len(df)
    return df, row_count


def mode_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """Mock indicator step: announce itself and hand the frame back unchanged."""
    print("运行 indicators:计算 MA/EMA/RSI/MACD")
    return df


def mode_chart(df: pd.DataFrame) -> None:
    """Mock chart step: only announce itself."""
    print("运行 chart:输出 ASCII 行情图")
    return None
def main() -> None:
    """Parse ``--mode`` and run the generate -> clean -> indicators pipeline.

    Data generation, cleaning and indicator calculation always run; the chart
    step runs only for the ``chart`` and ``all`` modes.
    """
    cli = argparse.ArgumentParser(description="量化数据清洗与技术指标")
    cli.add_argument(
        "--mode",
        choices=["clean", "indicators", "chart", "all"],
        default="all",
        help="运行模式",
    )
    mode = cli.parse_args().mode

    raw = generate_ohlcv()
    print(f"数据生成:{len(raw)} 条 OHLCV 记录")
    cleaned, _ = mode_clean(raw)
    with_indicators = mode_indicators(cleaned)
    if mode not in ("chart", "all"):
        return
    mode_chart(with_indicators)
# Simulate one command-line run per mode by rewriting sys.argv before each
# call, since main() reads its arguments from there.
for mode in ("clean", "indicators", "chart", "all"):
    simulated_argv = ["prog", "--mode", mode]
    print(f"\n>>> python3 36-quant-data.py --mode {mode}")
    sys.argv = simulated_argv
    main()
极客实战:完整源码与运行
现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。
#!/usr/bin/env python3
"""
36-quant-data.py — 股票数据清洗与技术指标计算
用法:
python 36-quant-data.py --mode clean
python 36-quant-data.py --mode indicators
python 36-quant-data.py --mode chart
"""
import argparse
import time
import numpy as np
import pandas as pd
def nexdo_time() -> str:
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    stamp_format = "%Y-%m-%d %H:%M:%S"
    now = time.localtime()
    return time.strftime(stamp_format, now)
def generate_ohlcv(n: int = 250) -> pd.DataFrame:
    """Generate a reproducible synthetic OHLCV frame with injected defects.

    Closes follow a seeded log-return random walk scaled from 100; high and
    low straddle the close symmetrically and the open is drawn uniformly
    between low and high. Three kinds of dirty data are then injected so the
    cleaning step has something to do: missing closes, zero-volume days and
    rows whose high and low are swapped.

    Args:
        n: Number of business-day rows to generate.

    Returns:
        A frame with columns ``date``, ``open``, ``high``, ``low``, ``close``
        and ``volume``.
    """
    # Fixed seed so every call yields the same frame. NOTE: this reseeds
    # NumPy's global RNG as a side effect.
    np.random.seed(42)
    dates = pd.bdate_range("2025-01-02", periods=n)
    returns = np.random.randn(n) * 0.015 + 0.0003
    close = 100.0 * np.exp(np.cumsum(returns))
    # High-low spread as a percentage of the close; split evenly around it.
    daily_range = np.abs(np.random.randn(n)) * 1.5 + 0.5
    high = close * (1 + daily_range / 200)
    low = close * (1 - daily_range / 200)
    open_ = low + np.random.rand(n) * (high - low)
    volume = np.random.randint(1_000_000, 8_000_000, n).astype(float)
    df = pd.DataFrame({
        "date": dates, "open": open_, "high": high,
        "low": low, "close": close, "volume": volume,
    })
    # Inject defects: missing closes, zero volume, swapped high/low.
    # The counts are capped at n so that a small frame no longer makes
    # np.random.choice(..., replace=False) raise ValueError.
    nan_rows = np.random.choice(df.index, min(8, n), replace=False)
    df.loc[nan_rows, "close"] = np.nan
    zero_rows = np.random.choice(df.index, min(3, n), replace=False)
    df.loc[zero_rows, "volume"] = 0
    bad_idx = np.random.choice(df.index, min(2, n), replace=False)
    # .values makes the assignment positional, so the columns really swap.
    df.loc[bad_idx, ["high", "low"]] = df.loc[bad_idx, ["low", "high"]].values
    return df
def clean_ohlcv(df: pd.DataFrame) -> tuple[pd.DataFrame, int]:
    """Clean a raw OHLCV frame and flag suspensions and price jumps.

    Steps, in order: de-duplicate by ``date`` and sort; drop rows whose high
    is below their low; drop rows with a non-positive close or a
    non-positive/missing open; forward-fill missing closes; mark zero-volume
    rows as ``suspended`` and forward-fill their volume; flag closes that
    move more than 20% versus the previous row as ``price_jump``.

    The input frame is not modified.

    Args:
        df: Frame with ``date``, ``open``, ``high``, ``low``, ``close`` and
            ``volume`` columns.

    Returns:
        ``(cleaned, raw_len)`` where ``raw_len`` is the row count of the input
        before cleaning. ``cleaned`` gains the columns ``suspended``,
        ``pct_change`` and ``price_jump``.
    """
    raw_len = len(df)
    df = df.drop_duplicates(subset="date").sort_values("date").reset_index(drop=True)
    # Drop rows with inverted high/low.
    df = df[df["high"] >= df["low"]].reset_index(drop=True)
    # Drop invalid prices. A missing close must survive this filter: a plain
    # `close > 0` test is False for NaN and would delete the very rows the
    # forward fill below is meant to repair.
    close_ok = df["close"].isna() | (df["close"] > 0)
    df = df[close_ok & (df["open"] > 0)].reset_index(drop=True)
    # Forward-fill missing closes; a leading gap has no previous close to
    # copy from, so those rows are dropped.
    df["close"] = df["close"].ffill()
    df = df.dropna(subset=["close"]).reset_index(drop=True)
    # Mark suspended days (zero volume) and carry the previous volume forward.
    # Series.mask upcasts an integer column to float cleanly, avoiding the
    # incompatible-dtype assignment of NaN through .loc.
    df["suspended"] = df["volume"] == 0
    df["volume"] = df["volume"].mask(df["suspended"]).ffill()
    # Flag price jumps (absolute close-to-close change above 20%).
    df["pct_change"] = df["close"].pct_change()
    df["price_jump"] = df["pct_change"].abs() > 0.20
    return df, raw_len
def calc_ma(series: pd.Series, n: int) -> pd.Series:
    """Return the equal-weight moving average over a window of ``n`` rows.

    The first ``n - 1`` values are NaN because the window is not yet full.
    """
    return series.rolling(n).mean()


def calc_ema(series: pd.Series, n: int) -> pd.Series:
    """Return the exponential moving average with span ``n``.

    ``adjust=False`` selects the recursive form, seeded with the first value.
    """
    return series.ewm(span=n, adjust=False).mean()


def calc_rsi(series: pd.Series, n: int = 14) -> pd.Series:
    """Return the RSI built from simple rolling means of gains and losses.

    Args:
        series: Price series.
        n: Window length in rows.

    Returns:
        RSI values in ``[0, 100]``. A window with gains and no losses yields
        100; a window with no price change at all yields NaN (undefined).
    """
    delta = series.diff()
    gain = delta.clip(lower=0).rolling(n).mean()
    # abs() instead of negation keeps zero-loss entries at +0.0.
    loss = delta.clip(upper=0).abs().rolling(n).mean()
    # Deliberately no replace(0, NaN) on the losses: gain / 0 is +inf, which
    # maps to RSI 100 below, while 0 / 0 stays NaN for a flat window.
    rs = gain / loss
    return 100 - 100 / (1 + rs)


def calc_macd(series: pd.Series) -> tuple[pd.Series, pd.Series, pd.Series]:
    """Return ``(macd, signal, hist)`` for the standard 12/26/9 MACD.

    ``macd`` is EMA12 minus EMA26, ``signal`` is the 9-span EMA of ``macd``
    and ``hist`` is their difference.
    """
    ema12 = calc_ema(series, 12)
    ema26 = calc_ema(series, 26)
    macd = ema12 - ema26
    signal = calc_ema(macd, 9)
    hist = macd - signal
    return macd, signal, hist
def mode_clean(df: pd.DataFrame) -> tuple[pd.DataFrame, int]:
    """Run ``clean_ohlcv`` and print a data-quality report.

    The report shows row counts before and after cleaning, the number of
    suspended days and price jumps, and per-column missing/min/max/mean
    statistics for the five OHLCV fields.

    Returns:
        The ``(cleaned_frame, raw_row_count)`` pair produced by ``clean_ohlcv``.
    """
    cleaned, n_raw = clean_ohlcv(df)
    n_suspended = cleaned["suspended"].sum()
    n_jumps = cleaned["price_jump"].sum()

    print(f"\n[{nexdo_time()}] 🧹 数据清洗报告")
    print(f" 原始行数: {n_raw} 清洗后: {len(cleaned)}")
    print(f" 停牌日: {n_suspended} 天")
    print(f" 价格跳空: {n_jumps} 次")
    print(f"\n {'字段':<10} {'缺失数':>8} {'最小值':>10} {'最大值':>10} {'均值':>10}")
    print(" " + "─" * 52)
    for field in ("open", "high", "low", "close", "volume"):
        column = cleaned[field]
        missing = column.isna().sum()
        print(f" {field:<10} {missing:>8} {column.min():>10.2f} {column.max():>10.2f} {column.mean():>10.2f}")
    return cleaned, n_raw
def mode_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """Add MA/EMA/RSI/MACD/Bollinger columns to ``df`` and print the latest values.

    The indicator columns are written into ``df`` itself, and that same frame
    is returned. The printed panel uses the last row on which every displayed
    indicator is available; if there is no such row (MA20 and the Bollinger
    bands need 20 closes) a notice is printed instead of raising.

    Args:
        df: Frame with at least ``date`` and ``close`` columns.

    Returns:
        ``df`` with the added indicator columns.
    """
    print(f"\n[{nexdo_time()}] 📐 技术指标计算")
    close = df["close"]
    df["ma5"] = calc_ma(close, 5)
    df["ma20"] = calc_ma(close, 20)
    df["ema12"] = calc_ema(close, 12)
    df["ema26"] = calc_ema(close, 26)
    df["rsi14"] = calc_rsi(close, 14)
    df["macd"], df["macd_signal"], df["macd_hist"] = calc_macd(close)
    # Bollinger bands: the middle band is the 20-row MA, +/- 2 standard deviations.
    df["bb_mid"] = df["ma20"]
    df["bb_std"] = close.rolling(20).std()
    df["bb_upper"] = df["bb_mid"] + 2 * df["bb_std"]
    df["bb_lower"] = df["bb_mid"] - 2 * df["bb_std"]

    # (label, column) pairs in display order.
    panel = [
        ("收盘价", "close"),
        ("MA5", "ma5"),
        ("MA20", "ma20"),
        ("EMA12", "ema12"),
        ("EMA26", "ema26"),
        ("RSI14", "rsi14"),
        ("MACD", "macd"),
        ("信号线", "macd_signal"),
        ("BB上轨", "bb_upper"),
        ("BB下轨", "bb_lower"),
    ]
    # Only the displayed columns decide which row is "latest"; a NaN in an
    # unrelated column must not hide the most recent row.
    complete = df.dropna(subset=[column for _, column in panel])
    if complete.empty:
        # .iloc[-1] on an empty frame would raise IndexError.
        print("\n 数据不足:没有指标齐全的行(MA20/布林带至少需要 20 个有效收盘价)")
        return df
    last = complete.iloc[-1]

    print(f"\n 最新指标({str(last['date'])[:10]}):")
    print(f" {'指标':<14} {'数值':>10}")
    print(" " + "─" * 28)
    for name, column in panel:
        print(f" {name:<14} {last[column]:>10.4f}")
    return df
def mode_chart(df: pd.DataFrame) -> None:
    """Print ASCII panels for price/MA5/MA20, RSI(14) and the MACD histogram.

    Uses the last 60 rows that have a valid ``ma20``. If no row qualifies, a
    notice is printed and nothing is drawn.

    Args:
        df: Frame with ``close``, ``ma5``, ``ma20``, ``rsi14`` and
            ``macd_hist`` columns.
    """
    print(f"\n[{nexdo_time()}] 📊 ASCII 行情图(最近60日)")
    seg = df.dropna(subset=["ma20"]).tail(60).reset_index(drop=True)
    if seg.empty:
        # min()/max() on an empty array would raise ValueError below.
        print(" 数据不足:MA20 尚无有效值,无法绘制行情图")
        return
    close = seg["close"].values
    ma5 = seg["ma5"].values
    ma20 = seg["ma20"].values
    rsi = seg["rsi14"].values
    macd_h = seg["macd_hist"].values
    rule = " " + "─" * 72

    # Price panel: one text row per threshold, scanned from top to bottom.
    # The vertical range is padded by 1% on each side.
    lo, hi = close.min() * 0.99, close.max() * 1.01
    rows = 10
    print("\n 价格 + MA5(·) + MA20(-)")
    print(rule)
    for r in range(rows, 0, -1):
        thr = lo + (hi - lo) * r / rows
        row = ""
        for c, m5, m20 in zip(close, ma5, ma20):
            c_hit = c >= thr
            m5_hit = m5 >= thr
            if c_hit and m5_hit:
                row += "●"
            elif c_hit:
                row += "│"
            elif m5_hit:
                row += "·"
            elif m20 >= thr:
                row += "-"
            else:
                row += " "
        print(f" {thr:>8.2f} │{row}│")
    print(rule)

    # RSI panel: a cell is filled when the RSI reaches that level (NaN never does).
    print("\n RSI(14) [超买>70 ─── 超卖<30]")
    print(rule)
    for level in [80, 70, 50, 30, 20]:
        row = "".join("█" if v >= level else " " for v in rsi)
        marker = " ← 超买" if level == 70 else (" ← 超卖" if level == 30 else "")
        print(f" {level:>3} │{row}│{marker}")
    print(rule)

    # MACD histogram: four rows above and four below a zero line.
    print("\n MACD 柱状图")
    max_h = max(abs(macd_h.max()), abs(macd_h.min()))
    if not max_h > 0:
        # A flat (all-zero) histogram would give every row a threshold of 0
        # and draw solid bars; any positive scale leaves the panel blank.
        max_h = 1.0
    print(rule)
    for r in range(4, -5, -1):
        if r == 0:
            print(" " + " " * 10 + "─" * len(macd_h))
            continue
        thr = max_h * r / 4
        if r > 0:
            row = "".join("▲" if v >= thr else " " for v in macd_h)
        else:
            row = "".join("▼" if v <= thr else " " for v in macd_h)
        print(f" {thr:>+9.4f} │{row}│")
    print(rule)
def main() -> None:
    """Parse ``--mode`` and run the generate -> clean -> indicators pipeline.

    Data generation, cleaning and indicator calculation always run; the ASCII
    chart is drawn only for the ``chart`` and ``all`` modes.
    """
    cli = argparse.ArgumentParser(description="量化数据清洗与技术指标")
    cli.add_argument(
        "--mode",
        choices=["clean", "indicators", "chart", "all"],
        default="all",
        help="运行模式",
    )
    mode = cli.parse_args().mode

    raw = generate_ohlcv()
    print(f"[{nexdo_time()}] 数据生成:{len(raw)} 条 OHLCV 记录")
    cleaned, _ = mode_clean(raw)
    with_indicators = mode_indicators(cleaned)
    if mode not in ("chart", "all"):
        return
    mode_chart(with_indicators)


if __name__ == "__main__":
    main()
$ python3 36-python-quant-data.py --mode clean
[2026-04-18 05:39:31] 🧹 数据清洗报告
原始行数: 252 清洗后: 244
停牌日: 8 天
价格跳空: 3 次
高低价倒置: 2 行(已删除)
$ python3 36-python-quant-data.py --mode indicators
[2026-04-18 05:39:31] 📐 技术指标计算
MA5=87.80 MA20=86.45 信号: 金叉 ✅
EMA12=87.92 EMA26=86.78
RSI14=58.3 状态: 中性
MACD=1.14 Signal=0.89 Hist=0.25 信号: 多头 ✅
小结
| 概念 | 一句话记忆 |
|---|---|
| OHLCV | Open/High/Low/Close/Volume,量化数据标准格式 |
| `pd.bdate_range` | 生成工作日序列,自动跳过周末 |
| `ffill` | 前向填充,时间序列缺失值的标准处理 |
| `rolling(n).mean()` | 等权重移动平均,滞后性强 |
| `ewm(span=n).mean()` | 指数加权均线,近期权重大,反应快 |
| RSI > 70 | 超买信号,可能回调 |
| RSI < 30 | 超卖信号,可能反弹 |
| MACD 金叉 | MACD 线上穿信号线,多头信号 |
| MACD 死叉 | MACD 线下穿信号线,空头信号 |
⏱ NexDo Time(5 分钟)
挑战:实现布林带(Bollinger Bands)指标。
具体步骤:
1. `middle = calc_ma(close, 20)`(中轨 = 20日均线)
2. `std = close.rolling(20).std()`(20日标准差)
3. `upper = middle + 2 * std`(上轨 = 中轨 + 2σ)
4. `lower = middle - 2 * std`(下轨 = 中轨 - 2σ)
5. 打印最新的上轨/中轨/下轨值,以及当前价格在布林带中的位置(`(close - lower) / (upper - lower)`,0 表示在下轨,1 表示在上轨)
Don’t wait for next time, do it in the next moment.