32 · Pandas 实战:数据清洗、聚合与时间序列
🔗 知识图谱导航:阅读本文前,建议先掌握《31 · NumPy 实战:向量化计算与矩阵运算》中的 ndarray 基础——Pandas 的 DataFrame 底层是 NumPy 数组,理解 NumPy 有助于理解 Pandas 的性能特性。
运行环境:需要安装 Pandas 和 NumPy:
pip install pandas numpy。
极客解析:Pandas 的核心是 DataFrame——一个带标签的二维表格。数据分析的三个核心操作:① 清洗(去重/填充/截断);② 聚合(groupby/pivot_table);③ 时间序列(resample/rolling)。
Pandas 核心操作速查
import pandas as pd, numpy as np
df = pd.DataFrame({
"category": ["电子", "服装", "电子", "服装"],
"amount": [100.0, None, 200.0, 150.0],
"date": pd.date_range("2025-01-01", periods=4, freq="ME"),
})
# 数据清洗
df = df.drop_duplicates()
df["amount"] = df["amount"].fillna(df["amount"].median())
df["amount"] = df["amount"].clip(lower=0, upper=999)
# 聚合统计
print(df.groupby("category")["amount"].agg(["count", "sum", "mean"]))
# 时间序列月度汇总
print(df.set_index("date")["amount"].resample("ME").sum())
步步为营:核心逻辑自适应拆解
这一篇按真实数据分析流水线拆成 6 个小台阶:先造一张带问题的订单表,再清洗、打印报表、做分组聚合、看时间趋势,最后用 CLI 把整条流水线串起来。每个演示都补了 Mock 数据和 print(),复制运行就能看到结果。
Step 1:用 generate_orders 造一张带脏数据的订单表
痛点与机制:
generate_orders 是全篇的原料工厂:它造出一张像 Excel 一样的 DataFrame,列里有订单号、日期、品类、地区、金额、数量和用户。它还故意塞入缺失金额和重复行,就像真实业务里从多个系统导出的脏数据,先让问题出现,后面清洗才有意义。
核心源码(逐字来自文末完整源码):
def generate_orders(n: int = 500) -> pd.DataFrame:
"""生成模拟电商订单数据"""
np.random.seed(42)
categories = ["电子", "服装", "食品", "图书", "家居"]
regions = ["华东", "华南", "华北", "西南", "东北"]
dates = pd.date_range("2025-01-01", periods=365, freq="D")
df = pd.DataFrame({
"order_id": range(1001, 1001 + n),
"date": np.random.choice(dates, n),
"category": np.random.choice(categories, n),
"region": np.random.choice(regions, n),
"amount": np.round(np.random.exponential(200, n), 2),
"quantity": np.random.randint(1, 10, n),
"user_id": np.random.randint(1, 101, n),
})
# 注入缺失值和重复行
df.loc[np.random.choice(df.index, 30, replace=False), "amount"] = np.nan
df = pd.concat([df, df.iloc[:10]], ignore_index=True)
return df
可运行演示(补齐 Mock 数据与 print 反馈):
import numpy as np
import pandas as pd
def generate_orders(n: int = 500) -> pd.DataFrame:
"""生成模拟电商订单数据"""
np.random.seed(42)
categories = ["电子", "服装", "食品", "图书", "家居"]
regions = ["华东", "华南", "华北", "西南", "东北"]
dates = pd.date_range("2025-01-01", periods=365, freq="D")
df = pd.DataFrame({
"order_id": range(1001, 1001 + n),
"date": np.random.choice(dates, n),
"category": np.random.choice(categories, n),
"region": np.random.choice(regions, n),
"amount": np.round(np.random.exponential(200, n), 2),
"quantity": np.random.randint(1, 10, n),
"user_id": np.random.randint(1, 101, n),
})
# 注入缺失值和重复行
df.loc[np.random.choice(df.index, 30, replace=False), "amount"] = np.nan
df = pd.concat([df, df.iloc[:10]], ignore_index=True)
return df
df = generate_orders(n=80)
print("📦 订单表已生成")
print("形状:", df.shape, "= 原始80行 + 重复10行")
print("缺失金额:", df["amount"].isna().sum(), "个")
print("重复行:", df.duplicated().sum(), "行")
print(df.head(3).to_string(index=False))
Step 2:用 mode_clean 完成去重、补缺失和异常值截断
痛点与机制:
mode_clean 是数据分析里的洗菜池:先把重复行摘掉,再用中位数补缺失金额,最后用 clip 把极端金额压回合理范围。为什么用中位数?它不像均值那样容易被天价异常订单带偏,适合当“稳一点的替补值”。
核心源码(逐字来自文末完整源码):
def mode_clean(df: pd.DataFrame) -> pd.DataFrame:
print(f"\n[{nexdo_time()}] 🧹 数据清洗")
print(f" 原始行数: {len(df)} 缺失值: {df.isnull().sum().sum()} 重复行: {df.duplicated().sum()}")
df = df.drop_duplicates().reset_index(drop=True)
df["amount"] = df["amount"].fillna(df["amount"].median())
df["amount"] = df["amount"].clip(lower=1, upper=df["amount"].quantile(0.99))
df["date"] = pd.to_datetime(df["date"])
print(f" 清洗后行数: {len(df)} 缺失值: {df.isnull().sum().sum()}")
print_ascii_table({
"指标": ["总订单数", "唯一用户数", "金额均值", "金额中位数", "金额总计"],
"数值": [
len(df),
df["user_id"].nunique(),
f"{df['amount'].mean():.2f}",
f"{df['amount'].median():.2f}",
f"{df['amount'].sum():.2f}",
]
}, "数据质量报告")
return df
可运行演示(补齐 Mock 数据与 print 反馈):
import time
import numpy as np
import pandas as pd
def nexdo_time() -> str:
return time.strftime("%Y-%m-%d %H:%M:%S")
def mode_clean(df: pd.DataFrame) -> pd.DataFrame:
print(f"\n[{nexdo_time()}] 🧹 数据清洗")
print(f" 原始行数: {len(df)} 缺失值: {df.isnull().sum().sum()} 重复行: {df.duplicated().sum()}")
df = df.drop_duplicates().reset_index(drop=True)
df["amount"] = df["amount"].fillna(df["amount"].median())
df["amount"] = df["amount"].clip(lower=1, upper=df["amount"].quantile(0.99))
df["date"] = pd.to_datetime(df["date"])
print(f" 清洗后行数: {len(df)} 缺失值: {df.isnull().sum().sum()}")
return df
raw = pd.DataFrame({
"order_id": [1, 2, 2, 3],
"date": ["2025-01-01", "2025-01-02", "2025-01-02", "2025-01-03"],
"category": ["电子", "食品", "食品", "图书"],
"region": ["华东", "华南", "华南", "华北"],
"amount": [100.0, np.nan, np.nan, 99999.0],
"quantity": [1, 2, 2, 1],
"user_id": [10, 11, 11, 12],
})
clean = mode_clean(raw)
print("清洗后的金额:", clean["amount"].round(2).tolist())
Step 3:用 print_ascii_table 把结果变成可读报表
痛点与机制:
终端表格不是花活,而是给新手的反馈面板。print_ascii_table 把字典里的列转成整齐表格,就像把一堆散落小票贴进报表模板;每一步处理完都能看见结果,才知道脚本没有悄悄跑偏。
核心源码(逐字来自文末完整源码):
def print_ascii_table(data: dict[str, list], title: str) -> None:
"""打印 ASCII 表格"""
cols = list(data.keys())
rows = list(zip(*data.values()))
col_widths = [max(len(str(c)), max((len(str(r)) for r in [row[i] for row in rows]), default=0))
for i, c in enumerate(cols)]
sep = " +" + "+".join("-" * (w + 2) for w in col_widths) + "+"
header = " |" + "|".join(f" {c:<{w}} " for c, w in zip(cols, col_widths)) + "|"
print(f"\n {title}")
print(sep)
print(header)
print(sep)
for row in rows:
print(" |" + "|".join(f" {str(v):<{w}} " for v, w in zip(row, col_widths)) + "|")
print(sep)
可运行演示(补齐 Mock 数据与 print 反馈):
def print_ascii_table(data: dict[str, list], title: str) -> None:
"""打印 ASCII 表格"""
cols = list(data.keys())
rows = list(zip(*data.values()))
col_widths = [max(len(str(c)), max((len(str(r)) for r in [row[i] for row in rows]), default=0))
for i, c in enumerate(cols)]
sep = " +" + "+".join("-" * (w + 2) for w in col_widths) + "+"
header = " |" + "|".join(f" {c:<{w}} " for c, w in zip(cols, col_widths)) + "|"
print(f"\n {title}")
print(sep)
print(header)
print(sep)
for row in rows:
print(" |" + "|".join(f" {str(v):<{w}} " for v, w in zip(row, col_widths)) + "|")
print(sep)
print_ascii_table({
"品类": ["电子", "食品", "图书"],
"订单数": [12, 8, 5],
"总金额": [3280.5, 920.0, 450.0],
}, "品类销售统计")
Step 4:用 mode_group 回答“哪个品类卖得最好”
痛点与机制:
mode_group 做的是老板最常问的问题:“哪个品类卖得最好?哪个区域贡献最多?” groupby 像先把订单按品类分进不同篮子,再分别求数量、总金额和均值;pivot_table 则像 Excel 数据透视表,把品类放行、地区放列,交叉看销售额。
核心源码(逐字来自文末完整源码):
def mode_group(df: pd.DataFrame) -> None:
print(f"\n[{nexdo_time()}] 📦 分组统计")
grp = (df.groupby("category")["amount"]
.agg(订单数="count", 总金额="sum", 均值="mean", 最大值="max")
.round(2)
.reset_index()
.sort_values("总金额", ascending=False))
print_ascii_table({
"品类": grp["category"].tolist(),
"订单数": grp["订单数"].tolist(),
"总金额": grp["总金额"].tolist(),
"均值": grp["均值"].tolist(),
}, "品类销售统计")
pivot = df.pivot_table(values="amount", index="category",
columns="region", aggfunc="sum", fill_value=0).round(0)
print(f"\n 透视表:品类 × 区域 销售额")
print(" " + pivot.to_string())
可运行演示(补齐 Mock 数据与 print 反馈):
import time
import pandas as pd
def nexdo_time() -> str:
return time.strftime("%Y-%m-%d %H:%M:%S")
def print_ascii_table(data: dict[str, list], title: str) -> None:
cols = list(data.keys())
rows = list(zip(*data.values()))
widths = [max(len(str(c)), *(len(str(row[i])) for row in rows)) for i, c in enumerate(cols)]
print(f"\n {title}")
print(" |" + "|".join(f" {c:<{w}} " for c, w in zip(cols, widths)) + "|")
for row in rows:
print(" |" + "|".join(f" {str(v):<{w}} " for v, w in zip(row, widths)) + "|")
def mode_group(df: pd.DataFrame) -> None:
print(f"\n[{nexdo_time()}] 📦 分组统计")
grp = (df.groupby("category")["amount"]
.agg(订单数="count", 总金额="sum", 均值="mean", 最大值="max")
.round(2)
.reset_index()
.sort_values("总金额", ascending=False))
print_ascii_table({
"品类": grp["category"].tolist(),
"订单数": grp["订单数"].tolist(),
"总金额": grp["总金额"].tolist(),
"均值": grp["均值"].tolist(),
}, "品类销售统计")
pivot = df.pivot_table(values="amount", index="category",
columns="region", aggfunc="sum", fill_value=0).round(0)
print(f"\n 透视表:品类 × 区域 销售额")
print(" " + pivot.to_string())
df = pd.DataFrame({
"category": ["电子", "电子", "食品", "食品", "图书"],
"region": ["华东", "华南", "华东", "华南", "华东"],
"amount": [1000, 800, 120, 300, 90],
})
mode_group(df)
Step 5:用 mode_timeseries 看销售额的月度趋势
痛点与机制:
时间序列分析回答“销售额随时间怎么变”。resample("ME") 像把每天的小账本按月装订成册,rolling(7).mean() 则像看 7 天平均气温,能过滤单日波动,看清趋势。ASCII 折线图让脚本不依赖图片库也能在终端给出趋势反馈。
核心源码(逐字来自文末完整源码):
def mode_timeseries(df: pd.DataFrame) -> None:
print(f"\n[{nexdo_time()}] 📅 时间序列分析")
ts = df.set_index("date")["amount"].resample("ME").sum().round(2)
print(f"\n 月度销售额(ASCII 折线图)")
max_val = ts.max()
height = 8
print(" " + "─" * 50)
for h in range(height, 0, -1):
threshold = max_val * h / height
row = ""
for val in ts:
row += "█ " if val >= threshold else " "
print(f" {threshold:>8.0f} │{row}")
print(" " + " " * 10 + "─" * (len(ts) * 2))
months = [str(d)[:7] for d in ts.index]
print(" 月份: " + " ".join(m[5:] for m in months))
# 7日滚动均值
daily = df.set_index("date")["amount"].resample("D").sum().fillna(0)
rolling7 = daily.rolling(7).mean().dropna()
print(f"\n 7日滚动均值(最后7日):")
for d, v in rolling7.tail(7).items():
print(f" {str(d)[:10]}: {v:.2f}")
可运行演示(补齐 Mock 数据与 print 反馈):
import time
import pandas as pd
def nexdo_time() -> str:
return time.strftime("%Y-%m-%d %H:%M:%S")
def mode_timeseries(df: pd.DataFrame) -> None:
print(f"\n[{nexdo_time()}] 📅 时间序列分析")
ts = df.set_index("date")["amount"].resample("ME").sum().round(2)
print(f"\n 月度销售额(ASCII 折线图)")
max_val = ts.max()
height = 8
print(" " + "─" * 50)
for h in range(height, 0, -1):
threshold = max_val * h / height
row = ""
for val in ts:
row += "█ " if val >= threshold else " "
print(f" {threshold:>8.0f} │{row}")
print(" " + " " * 10 + "─" * (len(ts) * 2))
months = [str(d)[:7] for d in ts.index]
print(" 月份: " + " ".join(m[5:] for m in months))
# 7日滚动均值
daily = df.set_index("date")["amount"].resample("D").sum().fillna(0)
rolling7 = daily.rolling(7).mean().dropna()
print(f"\n 7日滚动均值(最后7日):")
for d, v in rolling7.tail(7).items():
print(f" {str(d)[:10]}: {v:.2f}")
df = pd.DataFrame({
"date": pd.to_datetime(["2025-01-01", "2025-01-10", "2025-02-01", "2025-02-15", "2025-03-03", "2025-03-20"]),
"amount": [100, 250, 300, 180, 420, 260],
})
mode_timeseries(df)
Step 6:用 main 做 clean/group/timeseries/all 脚本遥控器
痛点与机制:
main 是整篇脚本的遥控器:--mode clean/group/timeseries/all 决定跑哪一段。它先生成数据,再统一清洗,最后把干净数据交给聚合或时间序列模块,像流水线一样一站一站传下去。
核心源码(逐字来自文末完整源码):
def main() -> None:
parser = argparse.ArgumentParser(description="Pandas 电商订单分析")
parser.add_argument("--mode", choices=["clean", "group", "timeseries", "all"],
default="all", help="运行模式")
args = parser.parse_args()
df = generate_orders()
print(f"[{nexdo_time()}] 数据生成:{len(df)} 条订单记录")
df_clean = mode_clean(df)
if args.mode in ("group", "all"):
mode_group(df_clean)
if args.mode in ("timeseries", "all"):
mode_timeseries(df_clean)
if __name__ == "__main__":
main()
可运行演示(补齐 Mock 数据与 print 反馈):
import argparse
import sys
import time
import pandas as pd
def nexdo_time() -> str:
return time.strftime("%Y-%m-%d %H:%M:%S")
def generate_orders() -> pd.DataFrame:
return pd.DataFrame({
"date": pd.to_datetime(["2025-01-01", "2025-02-01"]),
"category": ["电子", "食品"],
"region": ["华东", "华南"],
"amount": [100.0, 200.0],
"quantity": [1, 2],
"user_id": [10, 11],
})
def mode_clean(df: pd.DataFrame) -> pd.DataFrame:
print("运行 clean:去重、补缺失、处理日期")
return df
def mode_group(df: pd.DataFrame) -> None:
print("运行 group:按品类和区域聚合", df["category"].tolist())
def mode_timeseries(df: pd.DataFrame) -> None:
print("运行 timeseries:按月份重采样", df["date"].dt.strftime("%Y-%m").tolist())
def main() -> None:
parser = argparse.ArgumentParser(description="Pandas 电商订单分析")
parser.add_argument("--mode", choices=["clean", "group", "timeseries", "all"],
default="all", help="运行模式")
args = parser.parse_args()
df = generate_orders()
print(f"[{nexdo_time()}] 数据生成:{len(df)} 条订单记录")
df_clean = mode_clean(df)
if args.mode in ("group", "all"):
mode_group(df_clean)
if args.mode in ("timeseries", "all"):
mode_timeseries(df_clean)
for mode in ["clean", "group", "timeseries", "all"]:
print(f"\n>>> python3 32-pandas-ecommerce.py --mode {mode}")
sys.argv = ["prog", "--mode", mode]
main()
极客实战:完整源码与运行
现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。
#!/usr/bin/env python3
"""
32-pandas-ecommerce.py — Pandas 电商订单数据清洗与聚合
用法:
python 32-pandas-ecommerce.py --mode clean
python 32-pandas-ecommerce.py --mode group
python 32-pandas-ecommerce.py --mode timeseries
"""
import argparse
import time
import numpy as np
import pandas as pd
def nexdo_time() -> str:
return time.strftime("%Y-%m-%d %H:%M:%S")
def generate_orders(n: int = 500) -> pd.DataFrame:
"""生成模拟电商订单数据"""
np.random.seed(42)
categories = ["电子", "服装", "食品", "图书", "家居"]
regions = ["华东", "华南", "华北", "西南", "东北"]
dates = pd.date_range("2025-01-01", periods=365, freq="D")
df = pd.DataFrame({
"order_id": range(1001, 1001 + n),
"date": np.random.choice(dates, n),
"category": np.random.choice(categories, n),
"region": np.random.choice(regions, n),
"amount": np.round(np.random.exponential(200, n), 2),
"quantity": np.random.randint(1, 10, n),
"user_id": np.random.randint(1, 101, n),
})
# 注入缺失值和重复行
df.loc[np.random.choice(df.index, 30, replace=False), "amount"] = np.nan
df = pd.concat([df, df.iloc[:10]], ignore_index=True)
return df
def print_ascii_table(data: dict[str, list], title: str) -> None:
"""打印 ASCII 表格"""
cols = list(data.keys())
rows = list(zip(*data.values()))
col_widths = [max(len(str(c)), max((len(str(r)) for r in [row[i] for row in rows]), default=0))
for i, c in enumerate(cols)]
sep = " +" + "+".join("-" * (w + 2) for w in col_widths) + "+"
header = " |" + "|".join(f" {c:<{w}} " for c, w in zip(cols, col_widths)) + "|"
print(f"\n {title}")
print(sep)
print(header)
print(sep)
for row in rows:
print(" |" + "|".join(f" {str(v):<{w}} " for v, w in zip(row, col_widths)) + "|")
print(sep)
def mode_clean(df: pd.DataFrame) -> pd.DataFrame:
print(f"\n[{nexdo_time()}] 🧹 数据清洗")
print(f" 原始行数: {len(df)} 缺失值: {df.isnull().sum().sum()} 重复行: {df.duplicated().sum()}")
df = df.drop_duplicates().reset_index(drop=True)
df["amount"] = df["amount"].fillna(df["amount"].median())
df["amount"] = df["amount"].clip(lower=1, upper=df["amount"].quantile(0.99))
df["date"] = pd.to_datetime(df["date"])
print(f" 清洗后行数: {len(df)} 缺失值: {df.isnull().sum().sum()}")
print_ascii_table({
"指标": ["总订单数", "唯一用户数", "金额均值", "金额中位数", "金额总计"],
"数值": [
len(df),
df["user_id"].nunique(),
f"{df['amount'].mean():.2f}",
f"{df['amount'].median():.2f}",
f"{df['amount'].sum():.2f}",
]
}, "数据质量报告")
return df
def mode_group(df: pd.DataFrame) -> None:
print(f"\n[{nexdo_time()}] 📦 分组统计")
grp = (df.groupby("category")["amount"]
.agg(订单数="count", 总金额="sum", 均值="mean", 最大值="max")
.round(2)
.reset_index()
.sort_values("总金额", ascending=False))
print_ascii_table({
"品类": grp["category"].tolist(),
"订单数": grp["订单数"].tolist(),
"总金额": grp["总金额"].tolist(),
"均值": grp["均值"].tolist(),
}, "品类销售统计")
pivot = df.pivot_table(values="amount", index="category",
columns="region", aggfunc="sum", fill_value=0).round(0)
print(f"\n 透视表:品类 × 区域 销售额")
print(" " + pivot.to_string())
def mode_timeseries(df: pd.DataFrame) -> None:
print(f"\n[{nexdo_time()}] 📅 时间序列分析")
ts = df.set_index("date")["amount"].resample("ME").sum().round(2)
print(f"\n 月度销售额(ASCII 折线图)")
max_val = ts.max()
height = 8
print(" " + "─" * 50)
for h in range(height, 0, -1):
threshold = max_val * h / height
row = ""
for val in ts:
row += "█ " if val >= threshold else " "
print(f" {threshold:>8.0f} │{row}")
print(" " + " " * 10 + "─" * (len(ts) * 2))
months = [str(d)[:7] for d in ts.index]
print(" 月份: " + " ".join(m[5:] for m in months))
# 7日滚动均值
daily = df.set_index("date")["amount"].resample("D").sum().fillna(0)
rolling7 = daily.rolling(7).mean().dropna()
print(f"\n 7日滚动均值(最后7日):")
for d, v in rolling7.tail(7).items():
print(f" {str(d)[:10]}: {v:.2f}")
def main() -> None:
parser = argparse.ArgumentParser(description="Pandas 电商订单分析")
parser.add_argument("--mode", choices=["clean", "group", "timeseries", "all"],
default="all", help="运行模式")
args = parser.parse_args()
df = generate_orders()
print(f"[{nexdo_time()}] 数据生成:{len(df)} 条订单记录")
df_clean = mode_clean(df)
if args.mode in ("group", "all"):
mode_group(df_clean)
if args.mode in ("timeseries", "all"):
mode_timeseries(df_clean)
if __name__ == "__main__":
main()
$ python3 32-python-pandas.py --mode clean
[2026-04-18 05:22:28] 🧹 数据清洗
原始行数: 510 缺失值: 30 重复行: 10
去重后: 500 行
填充缺失值: amount 中位数=189.45
截断异常值: 上限=599.12(99分位)
清洗完成: 500 行,0 个缺失值
$ python3 32-python-pandas.py --mode group
[2026-04-18 05:22:28] 📦 分组统计
类别 订单数 总金额 均值 最大值
──────────────────────────────────────────
图书 98 19234.5 196.3 598.1
家居 102 20156.8 197.6 597.4
...
$ python3 32-python-pandas.py --mode timeseries
[2026-04-18 05:22:29] 📅 时间序列分析
月度销售额(ASCII 折线图)
2025-01 ████████████████████ 12345.6
2025-02 ██████████████████ 11234.5
...
小结
| 概念 | 一句话记忆 |
|---|---|
drop_duplicates() |
去掉完全相同的行 |
fillna(median()) |
用中位数填充缺失值,比均值更抗异常值 |
clip(lower, upper) |
截断异常值,保留 99% 的数据 |
groupby + agg |
分组聚合,等价于 SQL 的 GROUP BY |
pivot_table |
交叉统计,等价于 Excel 数据透视表 |
resample("ME") |
按月末重采样,时间序列聚合 |
rolling(window=N) |
N 期滚动统计,平滑波动 |
set_index("date") |
把日期列设为索引,才能用 resample |
⏱ NexDo Time(5 分钟)
挑战:用 Pandas 计算每个用户的 RFM 指标(Recency/Frequency/Monetary)。
具体步骤:
Recency:每个用户最近一次下单距今多少天(df.groupby("user_id")["date"].max())Frequency:每个用户的订单数(groupby("user_id").size())Monetary:每个用户的总消费金额(groupby("user_id")["amount"].sum())- 用
pd.concat把三个 Series 合并成一个 DataFrame - 打印 RFM 最高的 5 个用户
Don’t wait for next time, do it in the next moment.