5 · 文本清洗:字符串高阶操作与正则
🔗 知识图谱导航:阅读本文前,建议先掌握/回顾 《4 · 任务图谱基础:列表、字典与集合》 中的核心概念;本文会在这个基础上继续推进。 承上启下:上一篇我们用数据结构构建了任务图谱。现在处理最常见的原始数据形态——非结构化文本。日志解析、数据清洗、配置提取,都离不开字符串操作与正则。
极客解析:先把数据流、控制流和模块边界跑通,再谈抽象;每段代码都围绕一个可执行 CLI 闭环展开。
痛点与架构
原始日志文本(混乱格式)
↓ str 方法(strip/split/replace)
初步清洗
↓ re.compile + 命名分组
结构化提取
↓ f-string 格式化输出
可读报告
| 工具 | 适用场景 | 性能 |
|---|---|---|
str 方法 |
简单分割、替换、判断 | 最快 |
re 模块 |
复杂模式匹配、提取 | 中等 |
re.compile |
同一模式多次使用 | 编译后更快 |
实战演练场
步步为营:核心逻辑自适应拆解
这一篇的目标很明确:把一堆看起来乱糟糟的日志文本,洗成 Python 能理解的结构化数据。不要急着吞完整脚本,我们先按“原料 -> 规则 -> 数据盒子 -> 单行解析 -> 批量处理 -> 展示统计”的顺序拆开。
Step 1:先准备一篮子脏日志,让脚本有东西可洗
痛点与机制:
日志清洗的第一步不是写正则,而是先准备可重复的样本。你可以把 RAW_LOGS 想成一篮子待清洗的菜:有新鲜的正常日志,也故意混入空白行和坏行。这样脚本每次运行都能复现同样结果,新手不用先搭服务器、造文件、接数据库。
核心源码(逐字来自文末完整源码):
RAW_LOGS: list[str] = [
"2026-04-16 10:01:23 INFO [auth] 用户登录成功 user_id=1001 ip=192.168.1.10",
"2026-04-16 10:02:11 ERROR [db] 数据库连接超时 host=mysql:3306 retry=1",
"2026-04-16 10:03:45 INFO [api] GET /api/tasks 200 45ms",
"2026-04-16 10:04:02 ERROR [db] 数据库连接超时 host=mysql:3306 retry=2",
"2026-04-16 10:05:18 WARN [memory] 内存使用率 85% threshold=80%",
"2026-04-16 10:06:33 ERROR [file] 文件不存在 path=/data/config.json",
"2026-04-16 10:07:01 INFO [auth] 用户登录成功 user_id=1002 ip=10.0.0.5",
"2026-04-16 10:08:44 ERROR [db] 连接池耗尽 pool_size=10 waiting=23",
"2026-04-16 10:09:12 INFO [api] POST /api/tasks 201 120ms",
"2026-04-16 10:10:55 INFO [scheduler] 定时任务完成 task=backup duration=3.2s",
" ", # 空白行
"MALFORMED LINE WITHOUT TIMESTAMP", # 格式错误行
]
可运行演示(补齐 Mock 数据与 print 反馈):
RAW_LOGS: list[str] = [
"2026-04-16 10:01:23 INFO [auth] 用户登录成功 user_id=1001 ip=192.168.1.10",
"2026-04-16 10:02:11 ERROR [db] 数据库连接超时 host=mysql:3306 retry=1",
"2026-04-16 10:03:45 INFO [api] GET /api/tasks 200 45ms",
"2026-04-16 10:04:02 ERROR [db] 数据库连接超时 host=mysql:3306 retry=2",
"2026-04-16 10:05:18 WARN [memory] 内存使用率 85% threshold=80%",
"2026-04-16 10:06:33 ERROR [file] 文件不存在 path=/data/config.json",
"2026-04-16 10:07:01 INFO [auth] 用户登录成功 user_id=1002 ip=10.0.0.5",
"2026-04-16 10:08:44 ERROR [db] 连接池耗尽 pool_size=10 waiting=23",
"2026-04-16 10:09:12 INFO [api] POST /api/tasks 201 120ms",
"2026-04-16 10:10:55 INFO [scheduler] 定时任务完成 task=backup duration=3.2s",
" ", # 空白行
"MALFORMED LINE WITHOUT TIMESTAMP", # 格式错误行
]
print(f"一共准备了 {len(RAW_LOGS)} 行原始日志")
print("第一行正常日志:", RAW_LOGS[0])
print("最后一行故意放坏数据:", RAW_LOGS[-1])
Step 2:用命名分组正则给日志贴标签
痛点与机制:
这里的 (?P<timestamp>...) 是“带名字的捕获框”。普通正则只告诉你匹配到了什么,命名分组会顺手贴上字段名,就像快递面单上写清楚姓名、电话、地址。后面的 KV_PATTERN 专门从消息里捞 user_id=1001 这种键值对。
核心源码(逐字来自文末完整源码):
LOG_PATTERN = re.compile(
r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+"
r"(?P<level>INFO|WARN|ERROR|DEBUG)\s+"
r"\[(?P<module>\w+)\]\s+"
r"(?P<message>.+)"
)
KV_PATTERN = re.compile(r"(\w+)=([^\s]+)")
可运行演示(补齐 Mock 数据与 print 反馈):
import re
# Step 2:正则像一把带标签的尺子,timestamp/level/module/message 都会被量出来。
LOG_PATTERN = re.compile(
r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+"
r"(?P<level>INFO|WARN|ERROR|DEBUG)\s+"
r"\[(?P<module>\w+)\]\s+"
r"(?P<message>.+)"
)
KV_PATTERN = re.compile(r"(\w+)=([^\s]+)")
sample = "2026-04-16 10:02:11 ERROR [db] 数据库连接超时 host=mysql:3306 retry=1"
match = LOG_PATTERN.match(sample)
print("原始日志:", sample)
print("是否匹配成功:", bool(match))
if match:
print("拆出来的字段:", match.groupdict())
print("额外的 key=value:", dict(KV_PATTERN.findall(match.group("message"))))
Step 3:用 dataclass 做一个标准日志收纳盒
痛点与机制:
LogEntry 的作用是把“散落的字符串”装成一个有固定格子的盒子。以后你想取模块就用 entry.module,想取 IP 就看 entry.kv_pairs,不用在一长串文本里反复切来切去。
核心源码(逐字来自文末完整源码):
@dataclass
class LogEntry:
timestamp: str
level: str
module: str
message: str
kv_pairs: dict[str, str] # 提取的 key=value 对
可运行演示(补齐 Mock 数据与 print 反馈):
from dataclasses import dataclass
# Step 3:dataclass 像一个标准收纳盒,字段名和字段类型都提前贴好标签。
@dataclass
class LogEntry:
timestamp: str
level: str
module: str
message: str
kv_pairs: dict[str, str] # 提取的 key=value 对
entry = LogEntry(
timestamp="2026-04-16 10:01:23",
level="INFO",
module="auth",
message="用户登录成功 user_id=1001 ip=192.168.1.10",
kv_pairs={"user_id": "1001", "ip": "192.168.1.10"},
)
print("完整对象:", entry)
print("只取模块:", entry.module)
print("只取 IP:", entry.kv_pairs["ip"])
Step 4:把一行日志送进安检口,成功才变成对象
痛点与机制:
parse_log_line() 是单行日志的“安检口”:空行直接放弃,格式对不上也返回 None,格式正确才会变成 LogEntry。这个设计能避免坏数据把后续统计代码拖崩。
核心源码(逐字来自文末完整源码):
def parse_log_line(line: str) -> LogEntry | None:
"""解析单行日志,失败返回 None。"""
line = line.strip()
if not line:
return None
m = LOG_PATTERN.match(line)
if not m:
return None
kv_pairs = dict(KV_PATTERN.findall(m.group("message")))
return LogEntry(
timestamp=m.group("timestamp"),
level=m.group("level"),
module=m.group("module"),
message=m.group("message"),
kv_pairs=kv_pairs,
)
可运行演示(补齐 Mock 数据与 print 反馈):
import re
from dataclasses import dataclass
@dataclass
class LogEntry:
timestamp: str
level: str
module: str
message: str
kv_pairs: dict[str, str] # 提取的 key=value 对
LOG_PATTERN = re.compile(
r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+"
r"(?P<level>INFO|WARN|ERROR|DEBUG)\s+"
r"\[(?P<module>\w+)\]\s+"
r"(?P<message>.+)"
)
KV_PATTERN = re.compile(r"(\w+)=([^\s]+)")
# Step 4:这个函数只负责一件事:把“一行文本”变成“一个 LogEntry”。
def parse_log_line(line: str) -> LogEntry | None:
"""解析单行日志,失败返回 None。"""
line = line.strip()
if not line:
return None
m = LOG_PATTERN.match(line)
if not m:
return None
kv_pairs = dict(KV_PATTERN.findall(m.group("message")))
return LogEntry(
timestamp=m.group("timestamp"),
level=m.group("level"),
module=m.group("module"),
message=m.group("message"),
kv_pairs=kv_pairs,
)
ok_line = "2026-04-16 10:01:23 INFO [auth] 用户登录成功 user_id=1001 ip=192.168.1.10"
bad_line = "MALFORMED LINE WITHOUT TIMESTAMP"
print("正常行解析:", parse_log_line(ok_line))
print("坏行解析:", parse_log_line(bad_line))
Step 5:批量解析时统计坏行,别让一条脏数据拖垮全局
痛点与机制:
真实日志里一定有坏行。初学者常见错误是遇到一行异常就让程序崩掉;这里用 failed 计数,等于在工厂流水线上设置“不合格品箱”,坏件被记录,正常件继续往前走。
核心源码(逐字来自文末完整源码):
def parse_all(raw: list[str]) -> tuple[list[LogEntry], int]:
"""解析全部日志,返回(成功列表,失败数量)。"""
entries, failed = [], 0
for line in raw:
entry = parse_log_line(line)
if entry:
entries.append(entry)
elif line.strip(): # 非空但解析失败
failed += 1
return entries, failed
可运行演示(补齐 Mock 数据与 print 反馈):
import re
from dataclasses import dataclass
RAW_LOGS: list[str] = [
"2026-04-16 10:01:23 INFO [auth] 用户登录成功 user_id=1001 ip=192.168.1.10",
"2026-04-16 10:02:11 ERROR [db] 数据库连接超时 host=mysql:3306 retry=1",
"2026-04-16 10:03:45 INFO [api] GET /api/tasks 200 45ms",
"2026-04-16 10:04:02 ERROR [db] 数据库连接超时 host=mysql:3306 retry=2",
"2026-04-16 10:05:18 WARN [memory] 内存使用率 85% threshold=80%",
"2026-04-16 10:06:33 ERROR [file] 文件不存在 path=/data/config.json",
"2026-04-16 10:07:01 INFO [auth] 用户登录成功 user_id=1002 ip=10.0.0.5",
"2026-04-16 10:08:44 ERROR [db] 连接池耗尽 pool_size=10 waiting=23",
"2026-04-16 10:09:12 INFO [api] POST /api/tasks 201 120ms",
"2026-04-16 10:10:55 INFO [scheduler] 定时任务完成 task=backup duration=3.2s",
" ", # 空白行
"MALFORMED LINE WITHOUT TIMESTAMP", # 格式错误行
]
@dataclass
class LogEntry:
timestamp: str
level: str
module: str
message: str
kv_pairs: dict[str, str] # 提取的 key=value 对
LOG_PATTERN = re.compile(
r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+"
r"(?P<level>INFO|WARN|ERROR|DEBUG)\s+"
r"\[(?P<module>\w+)\]\s+"
r"(?P<message>.+)"
)
KV_PATTERN = re.compile(r"(\w+)=([^\s]+)")
def parse_log_line(line: str) -> LogEntry | None:
"""解析单行日志,失败返回 None。"""
line = line.strip()
if not line:
return None
m = LOG_PATTERN.match(line)
if not m:
return None
kv_pairs = dict(KV_PATTERN.findall(m.group("message")))
return LogEntry(
timestamp=m.group("timestamp"),
level=m.group("level"),
module=m.group("module"),
message=m.group("message"),
kv_pairs=kv_pairs,
)
# Step 5:批量处理像流水线,坏件要计数,但不能让整条流水线停掉。
def parse_all(raw: list[str]) -> tuple[list[LogEntry], int]:
"""解析全部日志,返回(成功列表,失败数量)。"""
entries, failed = [], 0
for line in raw:
entry = parse_log_line(line)
if entry:
entries.append(entry)
elif line.strip(): # 非空但解析失败
failed += 1
return entries, failed
entries, failed = parse_all(RAW_LOGS)
print(f"成功解析: {len(entries)} 条")
print(f"格式错误: {failed} 条")
print("前三条结构化结果:")
for item in entries[:3]:
print(f"- {item.timestamp} | {item.level} | {item.module} | {item.message}")
Step 6:用字符串方法和 f-string 把输出排整齐
痛点与机制:
strip() 去掉首尾空白,split() 拆分,replace() 替换,f-string 负责把结果排版。把它们想成办公桌上的剪刀、胶水和尺子:很多简单文本清洗不需要正则,字符串方法就足够快、足够清楚。
核心源码(逐字来自文末完整源码):
def demo_str_methods() -> None:
print("\n ── 字符串方法速查 ────────────────────────")
s = " Hello, Python 3.12! "
ops: list[tuple[str, str]] = [
("strip()", repr(s.strip())),
("lower()", repr(s.strip().lower())),
("upper()", repr(s.strip().upper())),
("replace('Python','Go')", repr(s.replace("Python", "Go"))),
("split(',')", repr(s.strip().split(","))),
("startswith(' H')", repr(s.startswith(" H"))),
("find('Python')", repr(s.find("Python"))),
("f-string 对齐", f"'{s.strip():^30}'"),
("f-string 千位符", f"{1_234_567:,}"),
("f-string 精度", f"{3.14159:.2f}"),
]
for name, result in ops:
print(f" {name:<28} → {result}")
可运行演示(补齐 Mock 数据与 print 反馈):
# Step 6:字符串方法是文本处理的基础工具箱,先会这些,再上正则。
def demo_str_methods() -> None:
print("\n ── 字符串方法速查 ────────────────────────")
s = " Hello, Python 3.12! "
ops: list[tuple[str, str]] = [
("strip()", repr(s.strip())),
("lower()", repr(s.strip().lower())),
("upper()", repr(s.strip().upper())),
("replace('Python','Go')", repr(s.replace("Python", "Go"))),
("split(',')", repr(s.strip().split(","))),
("startswith(' H')", repr(s.startswith(" H"))),
("find('Python')", repr(s.find("Python"))),
("f-string 对齐", f"'{s.strip():^30}'"),
("f-string 千位符", f"{1_234_567:,}"),
("f-string 精度", f"{3.14159:.2f}"),
]
for name, result in ops:
print(f" {name:<28} → {result}")
demo_str_methods()
Step 7:用正则完成 IP 提取、URL 拆解和隐私脱敏
痛点与机制:
正则不是越复杂越好,而是专门解决“按模式找东西”的问题。IP 像门牌号,URL 像一张快递路线单,脱敏则像给敏感位置打马赛克。运行这一步,你能看到提取、拆字段、替换三种最常见用法。
核心源码(逐字来自文末完整源码):
def demo_regex() -> None:
print("\n ── 正则表达式演示 ────────────────────────")
# 1. 提取 IP 地址
text = "服务器 192.168.1.10 和 10.0.0.5 均已上线"
ips = re.findall(r"\b\d{1,3}(?:\.\d{1,3}){3}\b", text)
print(f" 提取IP: {ips}")
# 2. 命名分组
url = "https://api.example.com:8080/v1/tasks?page=2"
url_pattern = re.compile(
r"(?P<scheme>https?)://(?P<host>[^:/]+)(?::(?P<port>\d+))?"
r"(?P<path>/[^?]*)(?:\?(?P<query>.+))?"
)
m = url_pattern.match(url)
if m:
for k, v in m.groupdict().items():
print(f" {k:<8} → {v}")
# 3. 替换(脱敏)
log_line = "用户 user_id=1001 ip=192.168.1.10 登录"
desensitized = re.sub(r"\b\d{1,3}(?:\.\d{1,3}){3}\b", "***.***.***", log_line)
print(f"\n 脱敏前: {log_line}")
print(f" 脱敏后: {desensitized}")
可运行演示(补齐 Mock 数据与 print 反馈):
import re
# Step 7:正则适合“形状固定但内容变化”的文本,比如 IP、URL、手机号。
def demo_regex() -> None:
print("\n ── 正则表达式演示 ────────────────────────")
# 1. 提取 IP 地址
text = "服务器 192.168.1.10 和 10.0.0.5 均已上线"
ips = re.findall(r"\b\d{1,3}(?:\.\d{1,3}){3}\b", text)
print(f" 提取IP: {ips}")
# 2. 命名分组
url = "https://api.example.com:8080/v1/tasks?page=2"
url_pattern = re.compile(
r"(?P<scheme>https?)://(?P<host>[^:/]+)(?::(?P<port>\d+))?"
r"(?P<path>/[^?]*)(?:\?(?P<query>.+))?"
)
m = url_pattern.match(url)
if m:
for k, v in m.groupdict().items():
print(f" {k:<8} → {v}")
# 3. 替换(脱敏)
log_line = "用户 user_id=1001 ip=192.168.1.10 登录"
desensitized = re.sub(r"\b\d{1,3}(?:\.\d{1,3}){3}\b", "***.***.***", log_line)
print(f"\n 脱敏前: {log_line}")
print(f" 脱敏后: {desensitized}")
demo_regex()
Step 8:把结构化日志变成表格和统计图条
痛点与机制:
demo_extract() 负责列表展示,demo_stats() 负责汇总统计。一个给你看明细,一个给你看全局趋势。Counter 就像自动点票器,会把 INFO、WARN、ERROR 各出现几次数清楚,再用字符条做一个简单的终端版小图表。
核心源码(逐字来自文末完整源码):
def demo_extract(entries: list[LogEntry], keyword: str | None) -> None:
filtered = entries
if keyword:
filtered = [e for e in entries if keyword.upper() in e.level]
print(f"\n ── 结构化日志({'全部' if not keyword else keyword})─────────")
print(f" {'时间':<20} {'级别':<6} {'模块':<12} {'消息'}")
print(f" {'─'*20} {'─'*6} {'─'*12} {'─'*30}")
for e in filtered:
level_icon = {"INFO": "ℹ️ ", "WARN": "⚠️ ", "ERROR": "🔴"}.get(e.level, " ")
print(f" {e.timestamp:<20} {level_icon}{e.level:<4} [{e.module}]{'':<6} {e.message[:40]}")
def demo_stats(entries: list[LogEntry]) -> None:
level_counts = Counter(e.level for e in entries)
module_counts = Counter(e.module for e in entries)
print("\n ── 日志统计 ──────────────────────────────")
print(" 按级别:")
for level, count in sorted(level_counts.items()):
bar = "█" * count
print(f" {level:<6} {count:>3} {bar}")
print("\n 按模块:")
for module, count in module_counts.most_common():
print(f" [{module}]{'':<8} {count} 条")
可运行演示(补齐 Mock 数据与 print 反馈):
import re
from collections import Counter
from dataclasses import dataclass
RAW_LOGS: list[str] = [
"2026-04-16 10:01:23 INFO [auth] 用户登录成功 user_id=1001 ip=192.168.1.10",
"2026-04-16 10:02:11 ERROR [db] 数据库连接超时 host=mysql:3306 retry=1",
"2026-04-16 10:03:45 INFO [api] GET /api/tasks 200 45ms",
"2026-04-16 10:04:02 ERROR [db] 数据库连接超时 host=mysql:3306 retry=2",
"2026-04-16 10:05:18 WARN [memory] 内存使用率 85% threshold=80%",
"2026-04-16 10:06:33 ERROR [file] 文件不存在 path=/data/config.json",
"2026-04-16 10:07:01 INFO [auth] 用户登录成功 user_id=1002 ip=10.0.0.5",
"2026-04-16 10:08:44 ERROR [db] 连接池耗尽 pool_size=10 waiting=23",
"2026-04-16 10:09:12 INFO [api] POST /api/tasks 201 120ms",
"2026-04-16 10:10:55 INFO [scheduler] 定时任务完成 task=backup duration=3.2s",
" ", # 空白行
"MALFORMED LINE WITHOUT TIMESTAMP", # 格式错误行
]
@dataclass
class LogEntry:
timestamp: str
level: str
module: str
message: str
kv_pairs: dict[str, str] # 提取的 key=value 对
LOG_PATTERN = re.compile(
r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+"
r"(?P<level>INFO|WARN|ERROR|DEBUG)\s+"
r"\[(?P<module>\w+)\]\s+"
r"(?P<message>.+)"
)
KV_PATTERN = re.compile(r"(\w+)=([^\s]+)")
def parse_log_line(line: str) -> LogEntry | None:
"""解析单行日志,失败返回 None。"""
line = line.strip()
if not line:
return None
m = LOG_PATTERN.match(line)
if not m:
return None
kv_pairs = dict(KV_PATTERN.findall(m.group("message")))
return LogEntry(
timestamp=m.group("timestamp"),
level=m.group("level"),
module=m.group("module"),
message=m.group("message"),
kv_pairs=kv_pairs,
)
def parse_all(raw: list[str]) -> tuple[list[LogEntry], int]:
"""解析全部日志,返回(成功列表,失败数量)。"""
entries, failed = [], 0
for line in raw:
entry = parse_log_line(line)
if entry:
entries.append(entry)
elif line.strip(): # 非空但解析失败
failed += 1
return entries, failed
# Step 8:最后把“能用的数据”变成“人能看懂的报告”。
def demo_extract(entries: list[LogEntry], keyword: str | None) -> None:
filtered = entries
if keyword:
filtered = [e for e in entries if keyword.upper() in e.level]
print(f"\n ── 结构化日志({'全部' if not keyword else keyword})─────────")
print(f" {'时间':<20} {'级别':<6} {'模块':<12} {'消息'}")
print(f" {'─'*20} {'─'*6} {'─'*12} {'─'*30}")
for e in filtered:
level_icon = {"INFO": "ℹ️ ", "WARN": "⚠️ ", "ERROR": "🔴"}.get(e.level, " ")
print(f" {e.timestamp:<20} {level_icon}{e.level:<4} [{e.module}]{'':<6} {e.message[:40]}")
def demo_stats(entries: list[LogEntry]) -> None:
level_counts = Counter(e.level for e in entries)
module_counts = Counter(e.module for e in entries)
print("\n ── 日志统计 ──────────────────────────────")
print(" 按级别:")
for level, count in sorted(level_counts.items()):
bar = "█" * count
print(f" {level:<6} {count:>3} {bar}")
print("\n 按模块:")
for module, count in module_counts.most_common():
print(f" [{module}]{'':<8} {count} 条")
entries, failed = parse_all(RAW_LOGS)
print(f"解析完成: {len(entries)} 条成功,{failed} 条格式错误")
demo_extract(entries, keyword="ERROR")
demo_stats(entries)
极客实战:完整源码与运行
现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。
# log_cleaner.py
"""
日志清洗与结构化提取工具。
用法:
python3 log_cleaner.py
python3 log_cleaner.py --mode extract
python3 log_cleaner.py --mode stats
python3 log_cleaner.py --keyword ERROR
"""
import argparse
import re
from collections import Counter
from dataclasses import dataclass
from datetime import datetime
# ── Mock 日志数据(零外部依赖)────────────────────────────────
RAW_LOGS: list[str] = [
"2026-04-16 10:01:23 INFO [auth] 用户登录成功 user_id=1001 ip=192.168.1.10",
"2026-04-16 10:02:11 ERROR [db] 数据库连接超时 host=mysql:3306 retry=1",
"2026-04-16 10:03:45 INFO [api] GET /api/tasks 200 45ms",
"2026-04-16 10:04:02 ERROR [db] 数据库连接超时 host=mysql:3306 retry=2",
"2026-04-16 10:05:18 WARN [memory] 内存使用率 85% threshold=80%",
"2026-04-16 10:06:33 ERROR [file] 文件不存在 path=/data/config.json",
"2026-04-16 10:07:01 INFO [auth] 用户登录成功 user_id=1002 ip=10.0.0.5",
"2026-04-16 10:08:44 ERROR [db] 连接池耗尽 pool_size=10 waiting=23",
"2026-04-16 10:09:12 INFO [api] POST /api/tasks 201 120ms",
"2026-04-16 10:10:55 INFO [scheduler] 定时任务完成 task=backup duration=3.2s",
" ", # 空白行
"MALFORMED LINE WITHOUT TIMESTAMP", # 格式错误行
]
# ── 数据类:结构化日志条目 ────────────────────────────────────
@dataclass
class LogEntry:
timestamp: str
level: str
module: str
message: str
kv_pairs: dict[str, str] # 提取的 key=value 对
# ── 编译正则(复用,性能更好)────────────────────────────────
LOG_PATTERN = re.compile(
r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+"
r"(?P<level>INFO|WARN|ERROR|DEBUG)\s+"
r"\[(?P<module>\w+)\]\s+"
r"(?P<message>.+)"
)
KV_PATTERN = re.compile(r"(\w+)=([^\s]+)")
def parse_log_line(line: str) -> LogEntry | None:
"""解析单行日志,失败返回 None。"""
line = line.strip()
if not line:
return None
m = LOG_PATTERN.match(line)
if not m:
return None
kv_pairs = dict(KV_PATTERN.findall(m.group("message")))
return LogEntry(
timestamp=m.group("timestamp"),
level=m.group("level"),
module=m.group("module"),
message=m.group("message"),
kv_pairs=kv_pairs,
)
def parse_all(raw: list[str]) -> tuple[list[LogEntry], int]:
"""解析全部日志,返回(成功列表,失败数量)。"""
entries, failed = [], 0
for line in raw:
entry = parse_log_line(line)
if entry:
entries.append(entry)
elif line.strip(): # 非空但解析失败
failed += 1
return entries, failed
# ── 字符串操作演示 ────────────────────────────────────────────
def demo_str_methods() -> None:
print("\n ── 字符串方法速查 ────────────────────────")
s = " Hello, Python 3.12! "
ops: list[tuple[str, str]] = [
("strip()", repr(s.strip())),
("lower()", repr(s.strip().lower())),
("upper()", repr(s.strip().upper())),
("replace('Python','Go')", repr(s.replace("Python", "Go"))),
("split(',')", repr(s.strip().split(","))),
("startswith(' H')", repr(s.startswith(" H"))),
("find('Python')", repr(s.find("Python"))),
("f-string 对齐", f"'{s.strip():^30}'"),
("f-string 千位符", f"{1_234_567:,}"),
("f-string 精度", f"{3.14159:.2f}"),
]
for name, result in ops:
print(f" {name:<28} → {result}")
# ── 正则演示 ──────────────────────────────────────────────────
def demo_regex() -> None:
print("\n ── 正则表达式演示 ────────────────────────")
# 1. 提取 IP 地址
text = "服务器 192.168.1.10 和 10.0.0.5 均已上线"
ips = re.findall(r"\b\d{1,3}(?:\.\d{1,3}){3}\b", text)
print(f" 提取IP: {ips}")
# 2. 命名分组
url = "https://api.example.com:8080/v1/tasks?page=2"
url_pattern = re.compile(
r"(?P<scheme>https?)://(?P<host>[^:/]+)(?::(?P<port>\d+))?"
r"(?P<path>/[^?]*)(?:\?(?P<query>.+))?"
)
m = url_pattern.match(url)
if m:
for k, v in m.groupdict().items():
print(f" {k:<8} → {v}")
# 3. 替换(脱敏)
log_line = "用户 user_id=1001 ip=192.168.1.10 登录"
desensitized = re.sub(r"\b\d{1,3}(?:\.\d{1,3}){3}\b", "***.***.***", log_line)
print(f"\n 脱敏前: {log_line}")
print(f" 脱敏后: {desensitized}")
# ── 日志分析 ──────────────────────────────────────────────────
def demo_extract(entries: list[LogEntry], keyword: str | None) -> None:
filtered = entries
if keyword:
filtered = [e for e in entries if keyword.upper() in e.level]
print(f"\n ── 结构化日志({'全部' if not keyword else keyword})─────────")
print(f" {'时间':<20} {'级别':<6} {'模块':<12} {'消息'}")
print(f" {'─'*20} {'─'*6} {'─'*12} {'─'*30}")
for e in filtered:
level_icon = {"INFO": "ℹ️ ", "WARN": "⚠️ ", "ERROR": "🔴"}.get(e.level, " ")
print(f" {e.timestamp:<20} {level_icon}{e.level:<4} [{e.module}]{'':<6} {e.message[:40]}")
def demo_stats(entries: list[LogEntry]) -> None:
level_counts = Counter(e.level for e in entries)
module_counts = Counter(e.module for e in entries)
print("\n ── 日志统计 ──────────────────────────────")
print(" 按级别:")
for level, count in sorted(level_counts.items()):
bar = "█" * count
print(f" {level:<6} {count:>3} {bar}")
print("\n 按模块:")
for module, count in module_counts.most_common():
print(f" [{module}]{'':<8} {count} 条")
def main() -> None:
parser = argparse.ArgumentParser(description="日志清洗与结构化提取工具")
parser.add_argument("--mode", choices=["str", "regex", "extract", "stats", "all"], default="all")
parser.add_argument("--keyword", help="按级别关键词过滤(如 ERROR)")
args = parser.parse_args()
entries, failed = parse_all(RAW_LOGS)
print(f"\n 解析完成: {len(entries)} 条成功,{failed} 条格式错误")
if args.mode in ("str", "all"):
demo_str_methods()
if args.mode in ("regex", "all"):
demo_regex()
if args.mode in ("extract", "all"):
demo_extract(entries, args.keyword)
if args.mode in ("stats", "all"):
demo_stats(entries)
if __name__ == "__main__":
main()
终端预期输出(--keyword ERROR):
$ python3 log_cleaner.py --mode extract --keyword ERROR
解析完成: 10 条成功,1 条格式错误
── 结构化日志(ERROR)─────────
时间 级别 模块 消息
──────────────────── ────── ──────────── ──────────────────────────────
2026-04-16 10:02:11 🔴ERROR [db] 数据库连接超时 host=mysql:3306
2026-04-16 10:04:02 🔴ERROR [db] 数据库连接超时 host=mysql:3306
2026-04-16 10:06:33 🔴ERROR [file] 文件不存在 path=/data/config.json
2026-04-16 10:08:44 🔴ERROR [db] 连接池耗尽 pool_size=10 waiting=23
正则速查表
| 元字符 | 含义 | 示例 |
|---|---|---|
\d |
数字 | \d{4} 匹配4位数字 |
\w |
字母/数字/下划线 | \w+ 匹配单词 |
\s |
空白字符 | \s+ 匹配多个空格 |
(?P<name>...) |
命名分组 | m.group("name") |
(?:...) |
非捕获分组 | 分组但不捕获 |
\b |
单词边界 | \bpython\b |
.*? |
非贪婪匹配 | 尽量少匹配 |
NexDo Time ⚡
5 分钟极客微操:给 log_cleaner.py 增加 --output FILE 参数,将结构化日志以 CSV 格式写入文件(用 csv.DictWriter),字段为 timestamp,level,module,message。
Don’t wait for next time, do it in the next moment.