34 · SciPy 实战:统计检验、信号处理与投资组合优化
🔗 知识图谱导航:阅读本文前,建议先掌握《31 · NumPy 实战》中的 ndarray 操作——SciPy 是 NumPy 的科学计算扩展,所有函数都接受 ndarray 输入。
运行环境:
pip install scipy numpy。
极客解析:SciPy 是"科学计算瑞士军刀":
stats做统计检验,signal做信号处理,optimize做数值优化。三个模块看似独立,但在量化金融里经常组合使用:用stats检验收益率分布,用signal过滤价格噪声,用optimize求最优投资组合权重。
SciPy 三大模块速查
scipy.stats 统计检验:normaltest/ttest_ind/pearsonr/kstest
scipy.signal 信号处理:butter/filtfilt/find_peaks/spectrogram
scipy.optimize 数值优化:minimize/curve_fit/linprog/differential_evolution
步步为营:核心逻辑自适应拆解
这一篇按 SciPy 的真实工作流拆成 6 个小台阶:先生成样本,再把统计检验结果翻译成人话,接着做完整统计报告、信号滤波、组合优化,最后用 CLI 把三条分析线串起来。每个演示都有 Mock 数据和 print() 反馈。
Step 1:用 generate_data 造两组收益率样本
痛点与机制:
generate_data 是统计检验的样本工厂:它造出两组股票日收益率,一组波动小,一组波动大。可以把它们想成两位选手的每日成绩单,后面 stats 要回答的就是:成绩分布正常吗?平均成绩真的不一样吗?
核心源码(逐字来自文末完整源码):
def generate_data(n: int = 252) -> tuple[np.ndarray, np.ndarray]:
"""生成两组模拟股票收益率"""
np.random.seed(42)
r1 = np.random.randn(n) * 0.015 + 0.0003 # 低波动
r2 = np.random.randn(n) * 0.025 + 0.0005 # 高波动
return r1, r2
可运行演示(补齐 Mock 数据与 print 反馈):
import numpy as np
def generate_data(n: int = 252) -> tuple[np.ndarray, np.ndarray]:
"""生成两组模拟股票收益率"""
np.random.seed(42)
r1 = np.random.randn(n) * 0.015 + 0.0003 # 低波动
r2 = np.random.randn(n) * 0.025 + 0.0005 # 高波动
return r1, r2
r1, r2 = generate_data(n=8)
print("📦 两组收益率样本已生成")
print("r1 低波动:", r1.round(4).tolist())
print("r2 高波动:", r2.round(4).tolist())
print(f"标准差对比: r1={r1.std():.4f}, r2={r2.std():.4f}")
Step 2:用 print_result 把 p 值翻译成结论
痛点与机制:
print_result 是统计报告的翻译官。统计检验输出的 stat 和 p 对新手很抽象,它把 p 值翻译成“是否拒绝原假设 H₀”。这就像体检报告把一串指标变成“需要复查/暂不需要复查”的结论。
核心源码(逐字来自文末完整源码):
def print_result(label: str, stat: float, pval: float, alpha: float = 0.05) -> None:
conclusion = "✅ 拒绝H₀" if pval < alpha else "❌ 不拒绝H₀"
print(f" {label:<22} stat={stat:>8.4f} p={pval:.4f} {conclusion}")
可运行演示(补齐 Mock 数据与 print 反馈):
def print_result(label: str, stat: float, pval: float, alpha: float = 0.05) -> None:
conclusion = "✅ 拒绝H₀" if pval < alpha else "❌ 不拒绝H₀"
print(f" {label:<22} stat={stat:>8.4f} p={pval:.4f} {conclusion}")
print("统计检验结果格式化:")
print_result("示例A: p很小", stat=3.21, pval=0.012)
print_result("示例B: p较大", stat=0.88, pval=0.420)
print("解释: p < 0.05 通常表示证据足够强,可以拒绝原假设 H₀。")
Step 3:用 mode_stats 生成完整统计检验报告
痛点与机制:
mode_stats 把正态性检验、t 检验、相关性和 KS 检验串成一张研究报告。p 值可以理解成“如果原假设是真的,看到当前结果有多意外”;p 越小,越说明当前结果不像偶然。量化研究里,不能只看收益高,还要看它是不是统计上站得住。
核心源码(逐字来自文末完整源码):
def mode_stats(r1: np.ndarray, r2: np.ndarray) -> None:
print(f"\n[{nexdo_time()}] 🔬 统计检验报告")
print(f" {'检验项目':<22} {'统计量':>12} {'p值':>8} 结论")
print(" " + "─" * 60)
stat, p = stats.normaltest(r1)
print_result("正态性检验(r1)", stat, p)
stat, p = stats.normaltest(r2)
print_result("正态性检验(r2)", stat, p)
stat, p = stats.ttest_ind(r1, r2)
print_result("独立t检验(均值差)", stat, p)
stat, p = stats.pearsonr(r1, r2)
print_result("Pearson相关性", stat, p)
stat, p = stats.spearmanr(r1, r2)
print_result("Spearman相关性", stat, p)
stat, p = stats.kstest(r1, "norm", args=(r1.mean(), r1.std()))
print_result("KS正态拟合(r1)", stat, p)
# 描述统计
print(f"\n 描述统计:")
for name, r in [("r1(低波动)", r1), ("r2(高波动)", r2)]:
desc = stats.describe(r)
print(f" {name}: 均值={desc.mean:+.4f} 方差={desc.variance:.6f} "
f"偏度={desc.skewness:.3f} 峰度={desc.kurtosis:.3f}")
可运行演示(补齐 Mock 数据与 print 反馈):
import time
import numpy as np
from scipy import stats
def nexdo_time() -> str:
return time.strftime("%Y-%m-%d %H:%M:%S")
def generate_data(n: int = 252) -> tuple[np.ndarray, np.ndarray]:
"""生成两组模拟股票收益率"""
np.random.seed(42)
r1 = np.random.randn(n) * 0.015 + 0.0003 # 低波动
r2 = np.random.randn(n) * 0.025 + 0.0005 # 高波动
return r1, r2
def print_result(label: str, stat: float, pval: float, alpha: float = 0.05) -> None:
conclusion = "✅ 拒绝H₀" if pval < alpha else "❌ 不拒绝H₀"
print(f" {label:<22} stat={stat:>8.4f} p={pval:.4f} {conclusion}")
def mode_stats(r1: np.ndarray, r2: np.ndarray) -> None:
print(f"\n[{nexdo_time()}] 🔬 统计检验报告")
print(f" {'检验项目':<22} {'统计量':>12} {'p值':>8} 结论")
print(" " + "─" * 60)
stat, p = stats.normaltest(r1)
print_result("正态性检验(r1)", stat, p)
stat, p = stats.normaltest(r2)
print_result("正态性检验(r2)", stat, p)
stat, p = stats.ttest_ind(r1, r2)
print_result("独立t检验(均值差)", stat, p)
stat, p = stats.pearsonr(r1, r2)
print_result("Pearson相关性", stat, p)
stat, p = stats.spearmanr(r1, r2)
print_result("Spearman相关性", stat, p)
stat, p = stats.kstest(r1, "norm", args=(r1.mean(), r1.std()))
print_result("KS正态拟合(r1)", stat, p)
# 描述统计
print(f"\n 描述统计:")
for name, r in [("r1(低波动)", r1), ("r2(高波动)", r2)]:
desc = stats.describe(r)
print(f" {name}: 均值={desc.mean:+.4f} 方差={desc.variance:.6f} "
f"偏度={desc.skewness:.3f} 峰度={desc.kurtosis:.3f}")
r1, r2 = generate_data(n=80)
mode_stats(r1, r2)
Step 4:用 mode_signal 给价格序列降噪并找周期
痛点与机制:
mode_signal 把价格序列当成带噪声的信号处理。Butterworth 低通滤波像给价格线戴上降噪耳机,保留慢变化趋势,削弱高频抖动;FFT 像拆音乐和弦,找出价格里最明显的周期成分。
核心源码(逐字来自文末完整源码):
def mode_signal(price: np.ndarray) -> None:
print(f"\n[{nexdo_time()}] 📡 信号处理分析")
# Butterworth 低通滤波
b, a = signal.butter(3, 0.1, btype="low")
filtered = signal.filtfilt(b, a, price)
residual = price - filtered
print(f" 原始价格: 均值={price.mean():.2f} 标准差={price.std():.2f}")
print(f" 滤波后: 均值={filtered.mean():.2f} 标准差={filtered.std():.2f}")
print(f" 残差: 均值={residual.mean():.4f} 标准差={residual.std():.2f}")
# FFT 频谱分析
n = len(price)
freqs = fft.fftfreq(n, d=1.0)
spectrum = np.abs(fft.fft(price - price.mean()))
pos_mask = freqs > 0
top_idx = np.argsort(spectrum[pos_mask])[-3:][::-1]
top_freqs = freqs[pos_mask][top_idx]
top_periods = 1.0 / top_freqs
print(f"\n FFT 主要周期成分(前3):")
print(f" {'频率':>10} {'周期(天)':>12} {'振幅':>10}")
print(" " + "─" * 36)
for f, T, amp in zip(top_freqs, top_periods, spectrum[pos_mask][top_idx]):
print(f" {f:>10.4f} {T:>12.1f} {amp:>10.1f}")
# 峰值检测
peaks, _ = signal.find_peaks(filtered, distance=20)
troughs, _ = signal.find_peaks(-filtered, distance=20)
print(f"\n 滤波后价格峰值: {len(peaks)} 个,谷值: {len(troughs)} 个")
# ASCII 滤波对比
print(f"\n 价格 vs 滤波(ASCII,最近40日)")
seg_raw = price[-40:]
seg_flt = filtered[-40:]
lo, hi = min(seg_raw.min(), seg_flt.min()), max(seg_raw.max(), seg_flt.max())
rows = 8
print(" " + "─" * 52)
for r in range(rows, 0, -1):
thr = lo + (hi - lo) * r / rows
raw_row = "".join("●" if v >= thr else " " for v in seg_raw)
flt_row = "".join("─" if v >= thr else " " for v in seg_flt)
merged = "".join(
"◆" if raw_row[i] != " " and flt_row[i] != " "
else raw_row[i] if raw_row[i] != " "
else flt_row[i]
for i in range(len(seg_raw))
)
print(f" {thr:>7.1f} │{merged}│")
print(" " + "─" * 52)
print(" ● 原始价格 ─ 滤波曲线 ◆ 重叠")
可运行演示(补齐 Mock 数据与 print 反馈):
import time
import numpy as np
from scipy import signal, fft
def nexdo_time() -> str:
return time.strftime("%Y-%m-%d %H:%M:%S")
def generate_price(n: int = 252) -> np.ndarray:
np.random.seed(42)
noise = np.random.randn(n) * 2
trend = np.linspace(0, 10, n)
cycle = 5 * np.sin(2 * np.pi * np.arange(n) / 30)
return 100 + trend + cycle + noise
def mode_signal(price: np.ndarray) -> None:
print(f"\n[{nexdo_time()}] 📡 信号处理分析")
# Butterworth 低通滤波
b, a = signal.butter(3, 0.1, btype="low")
filtered = signal.filtfilt(b, a, price)
residual = price - filtered
print(f" 原始价格: 均值={price.mean():.2f} 标准差={price.std():.2f}")
print(f" 滤波后: 均值={filtered.mean():.2f} 标准差={filtered.std():.2f}")
print(f" 残差: 均值={residual.mean():.4f} 标准差={residual.std():.2f}")
# FFT 频谱分析
n = len(price)
freqs = fft.fftfreq(n, d=1.0)
spectrum = np.abs(fft.fft(price - price.mean()))
pos_mask = freqs > 0
top_idx = np.argsort(spectrum[pos_mask])[-3:][::-1]
top_freqs = freqs[pos_mask][top_idx]
top_periods = 1.0 / top_freqs
print(f"\n FFT 主要周期成分(前3):")
print(f" {'频率':>10} {'周期(天)':>12} {'振幅':>10}")
print(" " + "─" * 36)
for f, T, amp in zip(top_freqs, top_periods, spectrum[pos_mask][top_idx]):
print(f" {f:>10.4f} {T:>12.1f} {amp:>10.1f}")
# 峰值检测
peaks, _ = signal.find_peaks(filtered, distance=20)
troughs, _ = signal.find_peaks(-filtered, distance=20)
print(f"\n 滤波后价格峰值: {len(peaks)} 个,谷值: {len(troughs)} 个")
# ASCII 滤波对比
print(f"\n 价格 vs 滤波(ASCII,最近40日)")
seg_raw = price[-40:]
seg_flt = filtered[-40:]
lo, hi = min(seg_raw.min(), seg_flt.min()), max(seg_raw.max(), seg_flt.max())
rows = 8
print(" " + "─" * 52)
for r in range(rows, 0, -1):
thr = lo + (hi - lo) * r / rows
raw_row = "".join("●" if v >= thr else " " for v in seg_raw)
flt_row = "".join("─" if v >= thr else " " for v in seg_flt)
merged = "".join(
"◆" if raw_row[i] != " " and flt_row[i] != " "
else raw_row[i] if raw_row[i] != " "
else flt_row[i]
for i in range(len(seg_raw))
)
print(f" {thr:>7.1f} │{merged}│")
print(" " + "─" * 52)
print(" ● 原始价格 ─ 滤波曲线 ◆ 重叠")
price = generate_price(n=120)
mode_signal(price)
Step 5:用 mode_optimize 求最大夏普组合权重
痛点与机制:
mode_optimize 用 optimize.minimize 求最大夏普比率。因为 SciPy 只会“最小化”,所以代码把夏普比率加了负号,变成最小化负夏普。约束 w.sum() == 1 像预算必须花完,边界 (0,1) 像每个资产不能做空、不能超过 100%。
核心源码(逐字来自文末完整源码):
def mode_optimize(r1: np.ndarray, r2: np.ndarray) -> None:
print(f"\n[{nexdo_time()}] ⚡ 投资组合优化(最大化夏普比率)")
np.random.seed(42)
n_assets = 5
returns_matrix = np.column_stack([
r1, r2,
np.random.randn(len(r1)) * 0.012 + 0.0002,
np.random.randn(len(r1)) * 0.020 + 0.0004,
np.random.randn(len(r1)) * 0.018 + 0.0003,
])
def neg_sharpe(w: np.ndarray) -> float:
port = returns_matrix @ w
return -(port.mean() / port.std() * np.sqrt(252))
x0 = np.ones(n_assets) / n_assets
constraints = {"type": "eq", "fun": lambda w: w.sum() - 1}
bounds = [(0.0, 1.0)] * n_assets
result = optimize.minimize(neg_sharpe, x0, method="SLSQP",
constraints=constraints, bounds=bounds)
w_opt = result.x
port_ret = returns_matrix @ w_opt
sharpe = port_ret.mean() / port_ret.std() * np.sqrt(252)
print(f" 优化{'成功' if result.success else '失败'} 最大夏普比率: {sharpe:.4f}")
print(f"\n {'资产':<8} {'权重':>8} {'权重条形'}")
print(" " + "─" * 40)
assets = ["资产A", "资产B", "资产C", "资产D", "资产E"]
for name, w in zip(assets, w_opt):
bar = "█" * int(w * 30)
print(f" {name:<8} {w:>7.2%} {bar}")
可运行演示(补齐 Mock 数据与 print 反馈):
import time
import numpy as np
from scipy import optimize
def nexdo_time() -> str:
return time.strftime("%Y-%m-%d %H:%M:%S")
def generate_data(n: int = 252) -> tuple[np.ndarray, np.ndarray]:
"""生成两组模拟股票收益率"""
np.random.seed(42)
r1 = np.random.randn(n) * 0.015 + 0.0003 # 低波动
r2 = np.random.randn(n) * 0.025 + 0.0005 # 高波动
return r1, r2
def mode_optimize(r1: np.ndarray, r2: np.ndarray) -> None:
print(f"\n[{nexdo_time()}] ⚡ 投资组合优化(最大化夏普比率)")
np.random.seed(42)
n_assets = 5
returns_matrix = np.column_stack([
r1, r2,
np.random.randn(len(r1)) * 0.012 + 0.0002,
np.random.randn(len(r1)) * 0.020 + 0.0004,
np.random.randn(len(r1)) * 0.018 + 0.0003,
])
def neg_sharpe(w: np.ndarray) -> float:
port = returns_matrix @ w
return -(port.mean() / port.std() * np.sqrt(252))
x0 = np.ones(n_assets) / n_assets
constraints = {"type": "eq", "fun": lambda w: w.sum() - 1}
bounds = [(0.0, 1.0)] * n_assets
result = optimize.minimize(neg_sharpe, x0, method="SLSQP",
constraints=constraints, bounds=bounds)
w_opt = result.x
port_ret = returns_matrix @ w_opt
sharpe = port_ret.mean() / port_ret.std() * np.sqrt(252)
print(f" 优化{'成功' if result.success else '失败'} 最大夏普比率: {sharpe:.4f}")
print(f"\n {'资产':<8} {'权重':>8} {'权重条形'}")
print(" " + "─" * 40)
assets = ["资产A", "资产B", "资产C", "资产D", "资产E"]
for name, w in zip(assets, w_opt):
bar = "█" * int(w * 30)
print(f" {name:<8} {w:>7.2%} {bar}")
r1, r2 = generate_data(n=120)
mode_optimize(r1, r2)
Step 6:用 main 做 stats/signal/optimize/all 脚本遥控器
痛点与机制:
main 是脚本遥控器:--mode stats/signal/optimize/all 决定跑哪条分析线。它先统一生成收益率和价格,再分发给对应模块,像实验室里同一批样本送去不同仪器检测,保证结果口径一致。
核心源码(逐字来自文末完整源码):
def main() -> None:
parser = argparse.ArgumentParser(description="SciPy 金融统计分析")
parser.add_argument("--mode", choices=["stats", "signal", "optimize", "all"],
default="all", help="运行模式")
args = parser.parse_args()
r1, r2 = generate_data()
price = generate_price()
print(f"[{nexdo_time()}] 数据生成:{len(r1)} 个交易日")
if args.mode in ("stats", "all"):
mode_stats(r1, r2)
if args.mode in ("signal", "all"):
mode_signal(price)
if args.mode in ("optimize", "all"):
mode_optimize(r1, r2)
if __name__ == "__main__":
main()
可运行演示(补齐 Mock 数据与 print 反馈):
import argparse
import sys
import time
import numpy as np
def nexdo_time() -> str:
return time.strftime("%Y-%m-%d %H:%M:%S")
def generate_data(n: int = 252) -> tuple[np.ndarray, np.ndarray]:
np.random.seed(42)
return np.random.randn(n) * 0.015 + 0.0003, np.random.randn(n) * 0.025 + 0.0005
def generate_price(n: int = 252) -> np.ndarray:
np.random.seed(42)
return 100 + np.linspace(0, 10, n) + np.random.randn(n)
def mode_stats(r1: np.ndarray, r2: np.ndarray) -> None:
print("运行 stats:统计检验", len(r1), "条收益率")
def mode_signal(price: np.ndarray) -> None:
print("运行 signal:信号处理", len(price), "个价格点")
def mode_optimize(r1: np.ndarray, r2: np.ndarray) -> None:
print("运行 optimize:组合优化", len(r1), "天样本")
def main() -> None:
parser = argparse.ArgumentParser(description="SciPy 金融统计分析")
parser.add_argument("--mode", choices=["stats", "signal", "optimize", "all"],
default="all", help="运行模式")
args = parser.parse_args()
r1, r2 = generate_data()
price = generate_price()
print(f"[{nexdo_time()}] 数据生成:{len(r1)} 个交易日")
if args.mode in ("stats", "all"):
mode_stats(r1, r2)
if args.mode in ("signal", "all"):
mode_signal(price)
if args.mode in ("optimize", "all"):
mode_optimize(r1, r2)
for mode in ["stats", "signal", "optimize", "all"]:
print(f"\n>>> python3 34-scipy-finance.py --mode {mode}")
sys.argv = ["prog", "--mode", mode]
main()
极客实战:完整源码与运行
现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。
#!/usr/bin/env python3
"""
34-scipy-finance.py — SciPy 金融数据统计与信号分析
用法:
python 34-scipy-finance.py --mode stats
python 34-scipy-finance.py --mode signal
python 34-scipy-finance.py --mode optimize
"""
import argparse
import time
import numpy as np
from scipy import stats, signal, fft, optimize
def nexdo_time() -> str:
return time.strftime("%Y-%m-%d %H:%M:%S")
def generate_data(n: int = 252) -> tuple[np.ndarray, np.ndarray]:
"""生成两组模拟股票收益率"""
np.random.seed(42)
r1 = np.random.randn(n) * 0.015 + 0.0003 # 低波动
r2 = np.random.randn(n) * 0.025 + 0.0005 # 高波动
return r1, r2
def generate_price(n: int = 252) -> np.ndarray:
np.random.seed(42)
noise = np.random.randn(n) * 2
trend = np.linspace(0, 10, n)
cycle = 5 * np.sin(2 * np.pi * np.arange(n) / 30)
return 100 + trend + cycle + noise
def print_result(label: str, stat: float, pval: float, alpha: float = 0.05) -> None:
conclusion = "✅ 拒绝H₀" if pval < alpha else "❌ 不拒绝H₀"
print(f" {label:<22} stat={stat:>8.4f} p={pval:.4f} {conclusion}")
def mode_stats(r1: np.ndarray, r2: np.ndarray) -> None:
print(f"\n[{nexdo_time()}] 🔬 统计检验报告")
print(f" {'检验项目':<22} {'统计量':>12} {'p值':>8} 结论")
print(" " + "─" * 60)
stat, p = stats.normaltest(r1)
print_result("正态性检验(r1)", stat, p)
stat, p = stats.normaltest(r2)
print_result("正态性检验(r2)", stat, p)
stat, p = stats.ttest_ind(r1, r2)
print_result("独立t检验(均值差)", stat, p)
stat, p = stats.pearsonr(r1, r2)
print_result("Pearson相关性", stat, p)
stat, p = stats.spearmanr(r1, r2)
print_result("Spearman相关性", stat, p)
stat, p = stats.kstest(r1, "norm", args=(r1.mean(), r1.std()))
print_result("KS正态拟合(r1)", stat, p)
# 描述统计
print(f"\n 描述统计:")
for name, r in [("r1(低波动)", r1), ("r2(高波动)", r2)]:
desc = stats.describe(r)
print(f" {name}: 均值={desc.mean:+.4f} 方差={desc.variance:.6f} "
f"偏度={desc.skewness:.3f} 峰度={desc.kurtosis:.3f}")
def mode_signal(price: np.ndarray) -> None:
print(f"\n[{nexdo_time()}] 📡 信号处理分析")
# Butterworth 低通滤波
b, a = signal.butter(3, 0.1, btype="low")
filtered = signal.filtfilt(b, a, price)
residual = price - filtered
print(f" 原始价格: 均值={price.mean():.2f} 标准差={price.std():.2f}")
print(f" 滤波后: 均值={filtered.mean():.2f} 标准差={filtered.std():.2f}")
print(f" 残差: 均值={residual.mean():.4f} 标准差={residual.std():.2f}")
# FFT 频谱分析
n = len(price)
freqs = fft.fftfreq(n, d=1.0)
spectrum = np.abs(fft.fft(price - price.mean()))
pos_mask = freqs > 0
top_idx = np.argsort(spectrum[pos_mask])[-3:][::-1]
top_freqs = freqs[pos_mask][top_idx]
top_periods = 1.0 / top_freqs
print(f"\n FFT 主要周期成分(前3):")
print(f" {'频率':>10} {'周期(天)':>12} {'振幅':>10}")
print(" " + "─" * 36)
for f, T, amp in zip(top_freqs, top_periods, spectrum[pos_mask][top_idx]):
print(f" {f:>10.4f} {T:>12.1f} {amp:>10.1f}")
# 峰值检测
peaks, _ = signal.find_peaks(filtered, distance=20)
troughs, _ = signal.find_peaks(-filtered, distance=20)
print(f"\n 滤波后价格峰值: {len(peaks)} 个,谷值: {len(troughs)} 个")
# ASCII 滤波对比
print(f"\n 价格 vs 滤波(ASCII,最近40日)")
seg_raw = price[-40:]
seg_flt = filtered[-40:]
lo, hi = min(seg_raw.min(), seg_flt.min()), max(seg_raw.max(), seg_flt.max())
rows = 8
print(" " + "─" * 52)
for r in range(rows, 0, -1):
thr = lo + (hi - lo) * r / rows
raw_row = "".join("●" if v >= thr else " " for v in seg_raw)
flt_row = "".join("─" if v >= thr else " " for v in seg_flt)
merged = "".join(
"◆" if raw_row[i] != " " and flt_row[i] != " "
else raw_row[i] if raw_row[i] != " "
else flt_row[i]
for i in range(len(seg_raw))
)
print(f" {thr:>7.1f} │{merged}│")
print(" " + "─" * 52)
print(" ● 原始价格 ─ 滤波曲线 ◆ 重叠")
def mode_optimize(r1: np.ndarray, r2: np.ndarray) -> None:
print(f"\n[{nexdo_time()}] ⚡ 投资组合优化(最大化夏普比率)")
np.random.seed(42)
n_assets = 5
returns_matrix = np.column_stack([
r1, r2,
np.random.randn(len(r1)) * 0.012 + 0.0002,
np.random.randn(len(r1)) * 0.020 + 0.0004,
np.random.randn(len(r1)) * 0.018 + 0.0003,
])
def neg_sharpe(w: np.ndarray) -> float:
port = returns_matrix @ w
return -(port.mean() / port.std() * np.sqrt(252))
x0 = np.ones(n_assets) / n_assets
constraints = {"type": "eq", "fun": lambda w: w.sum() - 1}
bounds = [(0.0, 1.0)] * n_assets
result = optimize.minimize(neg_sharpe, x0, method="SLSQP",
constraints=constraints, bounds=bounds)
w_opt = result.x
port_ret = returns_matrix @ w_opt
sharpe = port_ret.mean() / port_ret.std() * np.sqrt(252)
print(f" 优化{'成功' if result.success else '失败'} 最大夏普比率: {sharpe:.4f}")
print(f"\n {'资产':<8} {'权重':>8} {'权重条形'}")
print(" " + "─" * 40)
assets = ["资产A", "资产B", "资产C", "资产D", "资产E"]
for name, w in zip(assets, w_opt):
bar = "█" * int(w * 30)
print(f" {name:<8} {w:>7.2%} {bar}")
def main() -> None:
parser = argparse.ArgumentParser(description="SciPy 金融统计分析")
parser.add_argument("--mode", choices=["stats", "signal", "optimize", "all"],
default="all", help="运行模式")
args = parser.parse_args()
r1, r2 = generate_data()
price = generate_price()
print(f"[{nexdo_time()}] 数据生成:{len(r1)} 个交易日")
if args.mode in ("stats", "all"):
mode_stats(r1, r2)
if args.mode in ("signal", "all"):
mode_signal(price)
if args.mode in ("optimize", "all"):
mode_optimize(r1, r2)
if __name__ == "__main__":
main()
$ python3 34-python-scipy.py --mode stats
[2026-04-18 05:30:52] 🔬 统计检验报告
检验项目 统计量 p值 结论
────────────────────────────────────────────────────────────
正态性检验 (group_a) 6.9686 0.0307 ⚠ 偏离正态
正态性检验 (group_b) 2.1234 0.3456 ✅ 正态分布
独立样本 t 检验 -0.4869 0.6266 无显著差异
Pearson 相关系数 0.0234 0.7123 无显著相关
$ python3 34-python-scipy.py --mode optimize
[2026-04-18 05:30:53] ⚡ 投资组合优化(最大化夏普比率)
资产 权重 年化收益 年化波动
──────────────────────────────────────
Asset1 0.2341 8.23% 14.56%
Asset2 0.1823 6.78% 12.34%
...
最优组合夏普比率: 0.8234
小结
| 概念 | 一句话记忆 |
|---|---|
stats.normaltest |
正态性检验,p>0.05 不能拒绝正态假设 |
stats.ttest_ind |
独立样本 t 检验,p<0.05 两组均值有显著差异 |
stats.pearsonr |
Pearson 相关系数,-1 到 1,越接近 ±1 越相关 |
signal.butter |
设计 Butterworth 滤波器,N=阶数,Wn=截止频率 |
signal.filtfilt |
零相位滤波,先正向再反向,消除相位延迟 |
signal.find_peaks |
找局部极值,distance 控制最小间距 |
optimize.minimize |
数值优化,求目标函数的最小值 |
| 夏普比率 | mean(r) / std(r) * sqrt(252),越高越好 |
⏱ NexDo Time(5 分钟)
挑战:用 scipy.stats.pearsonr 计算两只股票收益率的相关系数,并用 scipy.stats.spearmanr 计算 Spearman 秩相关系数,对比两者的差异。
具体步骤:
- 用
generate_data()生成两组收益率 pearsonr(r1, r2)计算线性相关系数spearmanr(r1, r2)计算秩相关系数(对异常值更鲁棒)- 打印两个相关系数和 p 值,解释差异
Don’t wait for next time, do it in the next moment.