文章

5 · 文本清洗:字符串高阶操作与正则

#005 · 2026-04-16 · Python

🔗 知识图谱导航:阅读本文前,建议先掌握/回顾 《4 · 任务图谱基础:列表、字典与集合》 中的核心概念;本文会在这个基础上继续推进。 承上启下:上一篇我们用数据结构构建了任务图谱。现在处理最常见的原始数据形态——非结构化文本。日志解析、数据清洗、配置提取,都离不开字符串操作与正则。

极客解析:先把数据流、控制流和模块边界跑通,再谈抽象;每段代码都围绕一个可执行 CLI 闭环展开。


痛点与架构

原始日志文本(混乱格式)
    ↓ str 方法(strip/split/replace)
初步清洗
    ↓ re.compile + 命名分组
结构化提取
    ↓ f-string 格式化输出
可读报告
工具 适用场景 性能
str 方法 简单分割、替换、判断 最快
re 模块 复杂模式匹配、提取 中等
re.compile 同一模式多次使用 编译后更快

实战演练场

步步为营:核心逻辑自适应拆解

这一篇的目标很明确:把一堆看起来乱糟糟的日志文本,洗成 Python 能理解的结构化数据。不要急着吞完整脚本,我们先按“原料 -> 规则 -> 数据盒子 -> 单行解析 -> 批量处理 -> 展示统计”的顺序拆开。

Step 1:先准备一篮子脏日志,让脚本有东西可洗

痛点与机制

日志清洗的第一步不是写正则,而是先准备可重复的样本。你可以把 RAW_LOGS 想成一篮子待清洗的菜:有新鲜的正常日志,也故意混入空白行和坏行。这样脚本每次运行都能复现同样结果,新手不用先搭服务器、造文件、接数据库。

核心源码(逐字来自文末完整源码)

RAW_LOGS: list[str] = [
    "2026-04-16 10:01:23 INFO  [auth]     用户登录成功 user_id=1001 ip=192.168.1.10",
    "2026-04-16 10:02:11 ERROR [db]       数据库连接超时 host=mysql:3306 retry=1",
    "2026-04-16 10:03:45 INFO  [api]      GET /api/tasks 200 45ms",
    "2026-04-16 10:04:02 ERROR [db]       数据库连接超时 host=mysql:3306 retry=2",
    "2026-04-16 10:05:18 WARN  [memory]   内存使用率 85% threshold=80%",
    "2026-04-16 10:06:33 ERROR [file]     文件不存在 path=/data/config.json",
    "2026-04-16 10:07:01 INFO  [auth]     用户登录成功 user_id=1002 ip=10.0.0.5",
    "2026-04-16 10:08:44 ERROR [db]       连接池耗尽 pool_size=10 waiting=23",
    "2026-04-16 10:09:12 INFO  [api]      POST /api/tasks 201 120ms",
    "2026-04-16 10:10:55 INFO  [scheduler] 定时任务完成 task=backup duration=3.2s",
    "  ",                                    # 空白行
    "MALFORMED LINE WITHOUT TIMESTAMP",      # 格式错误行
]

可运行演示(补齐 Mock 数据与 print 反馈)


RAW_LOGS: list[str] = [
    "2026-04-16 10:01:23 INFO  [auth]     用户登录成功 user_id=1001 ip=192.168.1.10",
    "2026-04-16 10:02:11 ERROR [db]       数据库连接超时 host=mysql:3306 retry=1",
    "2026-04-16 10:03:45 INFO  [api]      GET /api/tasks 200 45ms",
    "2026-04-16 10:04:02 ERROR [db]       数据库连接超时 host=mysql:3306 retry=2",
    "2026-04-16 10:05:18 WARN  [memory]   内存使用率 85% threshold=80%",
    "2026-04-16 10:06:33 ERROR [file]     文件不存在 path=/data/config.json",
    "2026-04-16 10:07:01 INFO  [auth]     用户登录成功 user_id=1002 ip=10.0.0.5",
    "2026-04-16 10:08:44 ERROR [db]       连接池耗尽 pool_size=10 waiting=23",
    "2026-04-16 10:09:12 INFO  [api]      POST /api/tasks 201 120ms",
    "2026-04-16 10:10:55 INFO  [scheduler] 定时任务完成 task=backup duration=3.2s",
    "  ",                                    # 空白行
    "MALFORMED LINE WITHOUT TIMESTAMP",      # 格式错误行
]

print(f"一共准备了 {len(RAW_LOGS)} 行原始日志")
print("第一行正常日志:", RAW_LOGS[0])
print("最后一行故意放坏数据:", RAW_LOGS[-1])

Step 2:用命名分组正则给日志贴标签

痛点与机制

这里的 (?P<timestamp>...) 是“带名字的捕获框”。普通正则只告诉你匹配到了什么,命名分组会顺手贴上字段名,就像快递面单上写清楚姓名、电话、地址。后面的 KV_PATTERN 专门从消息里捞 user_id=1001 这种键值对。

核心源码(逐字来自文末完整源码)

LOG_PATTERN = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+"
    r"(?P<level>INFO|WARN|ERROR|DEBUG)\s+"
    r"\[(?P<module>\w+)\]\s+"
    r"(?P<message>.+)"
)
KV_PATTERN = re.compile(r"(\w+)=([^\s]+)")

可运行演示(补齐 Mock 数据与 print 反馈)

import re

# Step 2:正则像一把带标签的尺子,timestamp/level/module/message 都会被量出来。
LOG_PATTERN = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+"
    r"(?P<level>INFO|WARN|ERROR|DEBUG)\s+"
    r"\[(?P<module>\w+)\]\s+"
    r"(?P<message>.+)"
)
KV_PATTERN = re.compile(r"(\w+)=([^\s]+)")

sample = "2026-04-16 10:02:11 ERROR [db] 数据库连接超时 host=mysql:3306 retry=1"
match = LOG_PATTERN.match(sample)
print("原始日志:", sample)
print("是否匹配成功:", bool(match))
if match:
    print("拆出来的字段:", match.groupdict())
    print("额外的 key=value:", dict(KV_PATTERN.findall(match.group("message"))))

Step 3:用 dataclass 做一个标准日志收纳盒

痛点与机制

LogEntry 的作用是把“散落的字符串”装成一个有固定格子的盒子。以后你想取模块就用 entry.module,想取 IP 就看 entry.kv_pairs,不用在一长串文本里反复切来切去。

核心源码(逐字来自文末完整源码)

@dataclass
class LogEntry:
    timestamp: str
    level: str
    module: str
    message: str
    kv_pairs: dict[str, str]   # 提取的 key=value 对

可运行演示(补齐 Mock 数据与 print 反馈)

from dataclasses import dataclass

# Step 3:dataclass 像一个标准收纳盒,字段名和字段类型都提前贴好标签。
@dataclass
class LogEntry:
    timestamp: str
    level: str
    module: str
    message: str
    kv_pairs: dict[str, str]   # 提取的 key=value 对

entry = LogEntry(
    timestamp="2026-04-16 10:01:23",
    level="INFO",
    module="auth",
    message="用户登录成功 user_id=1001 ip=192.168.1.10",
    kv_pairs={"user_id": "1001", "ip": "192.168.1.10"},
)
print("完整对象:", entry)
print("只取模块:", entry.module)
print("只取 IP:", entry.kv_pairs["ip"])

Step 4:把一行日志送进安检口,成功才变成对象

痛点与机制

parse_log_line() 是单行日志的“安检口”:空行直接放弃,格式对不上也返回 None,格式正确才会变成 LogEntry。这个设计能避免坏数据把后续统计代码拖崩。

核心源码(逐字来自文末完整源码)

def parse_log_line(line: str) -> LogEntry | None:
    """解析单行日志,失败返回 None。"""
    line = line.strip()
    if not line:
        return None
    m = LOG_PATTERN.match(line)
    if not m:
        return None
    kv_pairs = dict(KV_PATTERN.findall(m.group("message")))
    return LogEntry(
        timestamp=m.group("timestamp"),
        level=m.group("level"),
        module=m.group("module"),
        message=m.group("message"),
        kv_pairs=kv_pairs,
    )

可运行演示(补齐 Mock 数据与 print 反馈)

import re
from dataclasses import dataclass

@dataclass
class LogEntry:
    timestamp: str
    level: str
    module: str
    message: str
    kv_pairs: dict[str, str]   # 提取的 key=value 对

LOG_PATTERN = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+"
    r"(?P<level>INFO|WARN|ERROR|DEBUG)\s+"
    r"\[(?P<module>\w+)\]\s+"
    r"(?P<message>.+)"
)
KV_PATTERN = re.compile(r"(\w+)=([^\s]+)")

# Step 4:这个函数只负责一件事:把“一行文本”变成“一个 LogEntry”。
def parse_log_line(line: str) -> LogEntry | None:
    """解析单行日志,失败返回 None。"""
    line = line.strip()
    if not line:
        return None
    m = LOG_PATTERN.match(line)
    if not m:
        return None
    kv_pairs = dict(KV_PATTERN.findall(m.group("message")))
    return LogEntry(
        timestamp=m.group("timestamp"),
        level=m.group("level"),
        module=m.group("module"),
        message=m.group("message"),
        kv_pairs=kv_pairs,
    )

ok_line = "2026-04-16 10:01:23 INFO [auth] 用户登录成功 user_id=1001 ip=192.168.1.10"
bad_line = "MALFORMED LINE WITHOUT TIMESTAMP"
print("正常行解析:", parse_log_line(ok_line))
print("坏行解析:", parse_log_line(bad_line))

Step 5:批量解析时统计坏行,别让一条脏数据拖垮全局

痛点与机制

真实日志里一定有坏行。初学者常见错误是遇到一行异常就让程序崩掉;这里用 failed 计数,等于在工厂流水线上设置“不合格品箱”,坏件被记录,正常件继续往前走。

核心源码(逐字来自文末完整源码)

def parse_all(raw: list[str]) -> tuple[list[LogEntry], int]:
    """解析全部日志,返回(成功列表,失败数量)。"""
    entries, failed = [], 0
    for line in raw:
        entry = parse_log_line(line)
        if entry:
            entries.append(entry)
        elif line.strip():   # 非空但解析失败
            failed += 1
    return entries, failed

可运行演示(补齐 Mock 数据与 print 反馈)

import re
from dataclasses import dataclass

RAW_LOGS: list[str] = [
    "2026-04-16 10:01:23 INFO  [auth]     用户登录成功 user_id=1001 ip=192.168.1.10",
    "2026-04-16 10:02:11 ERROR [db]       数据库连接超时 host=mysql:3306 retry=1",
    "2026-04-16 10:03:45 INFO  [api]      GET /api/tasks 200 45ms",
    "2026-04-16 10:04:02 ERROR [db]       数据库连接超时 host=mysql:3306 retry=2",
    "2026-04-16 10:05:18 WARN  [memory]   内存使用率 85% threshold=80%",
    "2026-04-16 10:06:33 ERROR [file]     文件不存在 path=/data/config.json",
    "2026-04-16 10:07:01 INFO  [auth]     用户登录成功 user_id=1002 ip=10.0.0.5",
    "2026-04-16 10:08:44 ERROR [db]       连接池耗尽 pool_size=10 waiting=23",
    "2026-04-16 10:09:12 INFO  [api]      POST /api/tasks 201 120ms",
    "2026-04-16 10:10:55 INFO  [scheduler] 定时任务完成 task=backup duration=3.2s",
    "  ",                                    # 空白行
    "MALFORMED LINE WITHOUT TIMESTAMP",      # 格式错误行
]

@dataclass
class LogEntry:
    timestamp: str
    level: str
    module: str
    message: str
    kv_pairs: dict[str, str]   # 提取的 key=value 对

LOG_PATTERN = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+"
    r"(?P<level>INFO|WARN|ERROR|DEBUG)\s+"
    r"\[(?P<module>\w+)\]\s+"
    r"(?P<message>.+)"
)
KV_PATTERN = re.compile(r"(\w+)=([^\s]+)")

def parse_log_line(line: str) -> LogEntry | None:
    """解析单行日志,失败返回 None。"""
    line = line.strip()
    if not line:
        return None
    m = LOG_PATTERN.match(line)
    if not m:
        return None
    kv_pairs = dict(KV_PATTERN.findall(m.group("message")))
    return LogEntry(
        timestamp=m.group("timestamp"),
        level=m.group("level"),
        module=m.group("module"),
        message=m.group("message"),
        kv_pairs=kv_pairs,
    )

# Step 5:批量处理像流水线,坏件要计数,但不能让整条流水线停掉。
def parse_all(raw: list[str]) -> tuple[list[LogEntry], int]:
    """解析全部日志,返回(成功列表,失败数量)。"""
    entries, failed = [], 0
    for line in raw:
        entry = parse_log_line(line)
        if entry:
            entries.append(entry)
        elif line.strip():   # 非空但解析失败
            failed += 1
    return entries, failed

entries, failed = parse_all(RAW_LOGS)
print(f"成功解析: {len(entries)} 条")
print(f"格式错误: {failed} 条")
print("前三条结构化结果:")
for item in entries[:3]:
    print(f"- {item.timestamp} | {item.level} | {item.module} | {item.message}")

Step 6:用字符串方法和 f-string 把输出排整齐

痛点与机制

strip() 去掉首尾空白,split() 拆分,replace() 替换,f-string 负责把结果排版。把它们想成办公桌上的剪刀、胶水和尺子:很多简单文本清洗不需要正则,字符串方法就足够快、足够清楚。

核心源码(逐字来自文末完整源码)

def demo_str_methods() -> None:
    print("\n  ── 字符串方法速查 ────────────────────────")
    s = "  Hello, Python 3.12!  "

    ops: list[tuple[str, str]] = [
        ("strip()",              repr(s.strip())),
        ("lower()",              repr(s.strip().lower())),
        ("upper()",              repr(s.strip().upper())),
        ("replace('Python','Go')", repr(s.replace("Python", "Go"))),
        ("split(',')",           repr(s.strip().split(","))),
        ("startswith('  H')",    repr(s.startswith("  H"))),
        ("find('Python')",       repr(s.find("Python"))),
        ("f-string 对齐",        f"'{s.strip():^30}'"),
        ("f-string 千位符",      f"{1_234_567:,}"),
        ("f-string 精度",        f"{3.14159:.2f}"),
    ]
    for name, result in ops:
        print(f"  {name:<28} → {result}")

可运行演示(补齐 Mock 数据与 print 反馈)

# Step 6:字符串方法是文本处理的基础工具箱,先会这些,再上正则。
def demo_str_methods() -> None:
    print("\n  ── 字符串方法速查 ────────────────────────")
    s = "  Hello, Python 3.12!  "

    ops: list[tuple[str, str]] = [
        ("strip()",              repr(s.strip())),
        ("lower()",              repr(s.strip().lower())),
        ("upper()",              repr(s.strip().upper())),
        ("replace('Python','Go')", repr(s.replace("Python", "Go"))),
        ("split(',')",           repr(s.strip().split(","))),
        ("startswith('  H')",    repr(s.startswith("  H"))),
        ("find('Python')",       repr(s.find("Python"))),
        ("f-string 对齐",        f"'{s.strip():^30}'"),
        ("f-string 千位符",      f"{1_234_567:,}"),
        ("f-string 精度",        f"{3.14159:.2f}"),
    ]
    for name, result in ops:
        print(f"  {name:<28}{result}")

demo_str_methods()

Step 7:用正则完成 IP 提取、URL 拆解和隐私脱敏

痛点与机制

正则不是越复杂越好,而是专门解决“按模式找东西”的问题。IP 像门牌号,URL 像一张快递路线单,脱敏则像给敏感位置打马赛克。运行这一步,你能看到提取、拆字段、替换三种最常见用法。

核心源码(逐字来自文末完整源码)

def demo_regex() -> None:
    print("\n  ── 正则表达式演示 ────────────────────────")

    # 1. 提取 IP 地址
    text = "服务器 192.168.1.10 和 10.0.0.5 均已上线"
    ips = re.findall(r"\b\d{1,3}(?:\.\d{1,3}){3}\b", text)
    print(f"  提取IP: {ips}")

    # 2. 命名分组
    url = "https://api.example.com:8080/v1/tasks?page=2"
    url_pattern = re.compile(
        r"(?P<scheme>https?)://(?P<host>[^:/]+)(?::(?P<port>\d+))?"
        r"(?P<path>/[^?]*)(?:\?(?P<query>.+))?"
    )
    m = url_pattern.match(url)
    if m:
        for k, v in m.groupdict().items():
            print(f"  {k:<8} → {v}")

    # 3. 替换(脱敏)
    log_line = "用户 user_id=1001 ip=192.168.1.10 登录"
    desensitized = re.sub(r"\b\d{1,3}(?:\.\d{1,3}){3}\b", "***.***.***", log_line)
    print(f"\n  脱敏前: {log_line}")
    print(f"  脱敏后: {desensitized}")

可运行演示(补齐 Mock 数据与 print 反馈)

import re

# Step 7:正则适合“形状固定但内容变化”的文本,比如 IP、URL、手机号。
def demo_regex() -> None:
    print("\n  ── 正则表达式演示 ────────────────────────")

    # 1. 提取 IP 地址
    text = "服务器 192.168.1.10 和 10.0.0.5 均已上线"
    ips = re.findall(r"\b\d{1,3}(?:\.\d{1,3}){3}\b", text)
    print(f"  提取IP: {ips}")

    # 2. 命名分组
    url = "https://api.example.com:8080/v1/tasks?page=2"
    url_pattern = re.compile(
        r"(?P<scheme>https?)://(?P<host>[^:/]+)(?::(?P<port>\d+))?"
        r"(?P<path>/[^?]*)(?:\?(?P<query>.+))?"
    )
    m = url_pattern.match(url)
    if m:
        for k, v in m.groupdict().items():
            print(f"  {k:<8}{v}")

    # 3. 替换(脱敏)
    log_line = "用户 user_id=1001 ip=192.168.1.10 登录"
    desensitized = re.sub(r"\b\d{1,3}(?:\.\d{1,3}){3}\b", "***.***.***", log_line)
    print(f"\n  脱敏前: {log_line}")
    print(f"  脱敏后: {desensitized}")

demo_regex()

Step 8:把结构化日志变成表格和统计图条

痛点与机制

demo_extract() 负责列表展示,demo_stats() 负责汇总统计。一个给你看明细,一个给你看全局趋势。Counter 就像自动点票器,会把 INFO、WARN、ERROR 各出现几次数清楚,再用字符条做一个简单的终端版小图表。

核心源码(逐字来自文末完整源码)

def demo_extract(entries: list[LogEntry], keyword: str | None) -> None:
    filtered = entries
    if keyword:
        filtered = [e for e in entries if keyword.upper() in e.level]

    print(f"\n  ── 结构化日志({'全部' if not keyword else keyword})─────────")
    print(f"  {'时间':<20} {'级别':<6} {'模块':<12} {'消息'}")
    print(f"  {'─'*20} {'─'*6} {'─'*12} {'─'*30}")
    for e in filtered:
        level_icon = {"INFO": "ℹ️ ", "WARN": "⚠️ ", "ERROR": "🔴"}.get(e.level, "  ")
        print(f"  {e.timestamp:<20} {level_icon}{e.level:<4} [{e.module}]{'':<6} {e.message[:40]}")


def demo_stats(entries: list[LogEntry]) -> None:
    level_counts = Counter(e.level for e in entries)
    module_counts = Counter(e.module for e in entries)

    print("\n  ── 日志统计 ──────────────────────────────")
    print("  按级别:")
    for level, count in sorted(level_counts.items()):
        bar = "█" * count
        print(f"    {level:<6} {count:>3}  {bar}")

    print("\n  按模块:")
    for module, count in module_counts.most_common():
        print(f"    [{module}]{'':<8} {count} 条")

可运行演示(补齐 Mock 数据与 print 反馈)

import re
from collections import Counter
from dataclasses import dataclass

RAW_LOGS: list[str] = [
    "2026-04-16 10:01:23 INFO  [auth]     用户登录成功 user_id=1001 ip=192.168.1.10",
    "2026-04-16 10:02:11 ERROR [db]       数据库连接超时 host=mysql:3306 retry=1",
    "2026-04-16 10:03:45 INFO  [api]      GET /api/tasks 200 45ms",
    "2026-04-16 10:04:02 ERROR [db]       数据库连接超时 host=mysql:3306 retry=2",
    "2026-04-16 10:05:18 WARN  [memory]   内存使用率 85% threshold=80%",
    "2026-04-16 10:06:33 ERROR [file]     文件不存在 path=/data/config.json",
    "2026-04-16 10:07:01 INFO  [auth]     用户登录成功 user_id=1002 ip=10.0.0.5",
    "2026-04-16 10:08:44 ERROR [db]       连接池耗尽 pool_size=10 waiting=23",
    "2026-04-16 10:09:12 INFO  [api]      POST /api/tasks 201 120ms",
    "2026-04-16 10:10:55 INFO  [scheduler] 定时任务完成 task=backup duration=3.2s",
    "  ",                                    # 空白行
    "MALFORMED LINE WITHOUT TIMESTAMP",      # 格式错误行
]

@dataclass
class LogEntry:
    timestamp: str
    level: str
    module: str
    message: str
    kv_pairs: dict[str, str]   # 提取的 key=value 对

LOG_PATTERN = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+"
    r"(?P<level>INFO|WARN|ERROR|DEBUG)\s+"
    r"\[(?P<module>\w+)\]\s+"
    r"(?P<message>.+)"
)
KV_PATTERN = re.compile(r"(\w+)=([^\s]+)")

def parse_log_line(line: str) -> LogEntry | None:
    """解析单行日志,失败返回 None。"""
    line = line.strip()
    if not line:
        return None
    m = LOG_PATTERN.match(line)
    if not m:
        return None
    kv_pairs = dict(KV_PATTERN.findall(m.group("message")))
    return LogEntry(
        timestamp=m.group("timestamp"),
        level=m.group("level"),
        module=m.group("module"),
        message=m.group("message"),
        kv_pairs=kv_pairs,
    )

def parse_all(raw: list[str]) -> tuple[list[LogEntry], int]:
    """解析全部日志,返回(成功列表,失败数量)。"""
    entries, failed = [], 0
    for line in raw:
        entry = parse_log_line(line)
        if entry:
            entries.append(entry)
        elif line.strip():   # 非空但解析失败
            failed += 1
    return entries, failed

# Step 8:最后把“能用的数据”变成“人能看懂的报告”。
def demo_extract(entries: list[LogEntry], keyword: str | None) -> None:
    filtered = entries
    if keyword:
        filtered = [e for e in entries if keyword.upper() in e.level]

    print(f"\n  ── 结构化日志({'全部' if not keyword else keyword})─────────")
    print(f"  {'时间':<20} {'级别':<6} {'模块':<12} {'消息'}")
    print(f"  {'─'*20} {'─'*6} {'─'*12} {'─'*30}")
    for e in filtered:
        level_icon = {"INFO": "ℹ️ ", "WARN": "⚠️ ", "ERROR": "🔴"}.get(e.level, "  ")
        print(f"  {e.timestamp:<20} {level_icon}{e.level:<4} [{e.module}]{'':<6} {e.message[:40]}")


def demo_stats(entries: list[LogEntry]) -> None:
    level_counts = Counter(e.level for e in entries)
    module_counts = Counter(e.module for e in entries)

    print("\n  ── 日志统计 ──────────────────────────────")
    print("  按级别:")
    for level, count in sorted(level_counts.items()):
        bar = "█" * count
        print(f"    {level:<6} {count:>3}  {bar}")

    print("\n  按模块:")
    for module, count in module_counts.most_common():
        print(f"    [{module}]{'':<8} {count} 条")

entries, failed = parse_all(RAW_LOGS)
print(f"解析完成: {len(entries)} 条成功,{failed} 条格式错误")
demo_extract(entries, keyword="ERROR")
demo_stats(entries)

极客实战:完整源码与运行

现在,把上面的积木拼起来,将以下完整代码放进你的编辑器,运行它。先看整体闭环,再回头逐段改参数,你会更容易建立工程直觉。

# log_cleaner.py
"""
日志清洗与结构化提取工具。
用法:
    python3 log_cleaner.py
    python3 log_cleaner.py --mode extract
    python3 log_cleaner.py --mode stats
    python3 log_cleaner.py --keyword ERROR
"""

import argparse
import re
from collections import Counter
from dataclasses import dataclass
from datetime import datetime

# ── Mock 日志数据(零外部依赖)────────────────────────────────
RAW_LOGS: list[str] = [
    "2026-04-16 10:01:23 INFO  [auth]     用户登录成功 user_id=1001 ip=192.168.1.10",
    "2026-04-16 10:02:11 ERROR [db]       数据库连接超时 host=mysql:3306 retry=1",
    "2026-04-16 10:03:45 INFO  [api]      GET /api/tasks 200 45ms",
    "2026-04-16 10:04:02 ERROR [db]       数据库连接超时 host=mysql:3306 retry=2",
    "2026-04-16 10:05:18 WARN  [memory]   内存使用率 85% threshold=80%",
    "2026-04-16 10:06:33 ERROR [file]     文件不存在 path=/data/config.json",
    "2026-04-16 10:07:01 INFO  [auth]     用户登录成功 user_id=1002 ip=10.0.0.5",
    "2026-04-16 10:08:44 ERROR [db]       连接池耗尽 pool_size=10 waiting=23",
    "2026-04-16 10:09:12 INFO  [api]      POST /api/tasks 201 120ms",
    "2026-04-16 10:10:55 INFO  [scheduler] 定时任务完成 task=backup duration=3.2s",
    "  ",                                    # 空白行
    "MALFORMED LINE WITHOUT TIMESTAMP",      # 格式错误行
]


# ── 数据类:结构化日志条目 ────────────────────────────────────
@dataclass
class LogEntry:
    timestamp: str
    level: str
    module: str
    message: str
    kv_pairs: dict[str, str]   # 提取的 key=value 对


# ── 编译正则(复用,性能更好)────────────────────────────────
LOG_PATTERN = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+"
    r"(?P<level>INFO|WARN|ERROR|DEBUG)\s+"
    r"\[(?P<module>\w+)\]\s+"
    r"(?P<message>.+)"
)
KV_PATTERN = re.compile(r"(\w+)=([^\s]+)")


def parse_log_line(line: str) -> LogEntry | None:
    """解析单行日志,失败返回 None。"""
    line = line.strip()
    if not line:
        return None
    m = LOG_PATTERN.match(line)
    if not m:
        return None
    kv_pairs = dict(KV_PATTERN.findall(m.group("message")))
    return LogEntry(
        timestamp=m.group("timestamp"),
        level=m.group("level"),
        module=m.group("module"),
        message=m.group("message"),
        kv_pairs=kv_pairs,
    )


def parse_all(raw: list[str]) -> tuple[list[LogEntry], int]:
    """解析全部日志,返回(成功列表,失败数量)。"""
    entries, failed = [], 0
    for line in raw:
        entry = parse_log_line(line)
        if entry:
            entries.append(entry)
        elif line.strip():   # 非空但解析失败
            failed += 1
    return entries, failed


# ── 字符串操作演示 ────────────────────────────────────────────
def demo_str_methods() -> None:
    print("\n  ── 字符串方法速查 ────────────────────────")
    s = "  Hello, Python 3.12!  "

    ops: list[tuple[str, str]] = [
        ("strip()",              repr(s.strip())),
        ("lower()",              repr(s.strip().lower())),
        ("upper()",              repr(s.strip().upper())),
        ("replace('Python','Go')", repr(s.replace("Python", "Go"))),
        ("split(',')",           repr(s.strip().split(","))),
        ("startswith('  H')",    repr(s.startswith("  H"))),
        ("find('Python')",       repr(s.find("Python"))),
        ("f-string 对齐",        f"'{s.strip():^30}'"),
        ("f-string 千位符",      f"{1_234_567:,}"),
        ("f-string 精度",        f"{3.14159:.2f}"),
    ]
    for name, result in ops:
        print(f"  {name:<28}{result}")


# ── 正则演示 ──────────────────────────────────────────────────
def demo_regex() -> None:
    print("\n  ── 正则表达式演示 ────────────────────────")

    # 1. 提取 IP 地址
    text = "服务器 192.168.1.10 和 10.0.0.5 均已上线"
    ips = re.findall(r"\b\d{1,3}(?:\.\d{1,3}){3}\b", text)
    print(f"  提取IP: {ips}")

    # 2. 命名分组
    url = "https://api.example.com:8080/v1/tasks?page=2"
    url_pattern = re.compile(
        r"(?P<scheme>https?)://(?P<host>[^:/]+)(?::(?P<port>\d+))?"
        r"(?P<path>/[^?]*)(?:\?(?P<query>.+))?"
    )
    m = url_pattern.match(url)
    if m:
        for k, v in m.groupdict().items():
            print(f"  {k:<8}{v}")

    # 3. 替换(脱敏)
    log_line = "用户 user_id=1001 ip=192.168.1.10 登录"
    desensitized = re.sub(r"\b\d{1,3}(?:\.\d{1,3}){3}\b", "***.***.***", log_line)
    print(f"\n  脱敏前: {log_line}")
    print(f"  脱敏后: {desensitized}")


# ── 日志分析 ──────────────────────────────────────────────────
def demo_extract(entries: list[LogEntry], keyword: str | None) -> None:
    filtered = entries
    if keyword:
        filtered = [e for e in entries if keyword.upper() in e.level]

    print(f"\n  ── 结构化日志({'全部' if not keyword else keyword})─────────")
    print(f"  {'时间':<20} {'级别':<6} {'模块':<12} {'消息'}")
    print(f"  {'─'*20} {'─'*6} {'─'*12} {'─'*30}")
    for e in filtered:
        level_icon = {"INFO": "ℹ️ ", "WARN": "⚠️ ", "ERROR": "🔴"}.get(e.level, "  ")
        print(f"  {e.timestamp:<20} {level_icon}{e.level:<4} [{e.module}]{'':<6} {e.message[:40]}")


def demo_stats(entries: list[LogEntry]) -> None:
    level_counts = Counter(e.level for e in entries)
    module_counts = Counter(e.module for e in entries)

    print("\n  ── 日志统计 ──────────────────────────────")
    print("  按级别:")
    for level, count in sorted(level_counts.items()):
        bar = "█" * count
        print(f"    {level:<6} {count:>3}  {bar}")

    print("\n  按模块:")
    for module, count in module_counts.most_common():
        print(f"    [{module}]{'':<8} {count} 条")


def main() -> None:
    parser = argparse.ArgumentParser(description="日志清洗与结构化提取工具")
    parser.add_argument("--mode", choices=["str", "regex", "extract", "stats", "all"], default="all")
    parser.add_argument("--keyword", help="按级别关键词过滤(如 ERROR)")
    args = parser.parse_args()

    entries, failed = parse_all(RAW_LOGS)
    print(f"\n  解析完成: {len(entries)} 条成功,{failed} 条格式错误")

    if args.mode in ("str", "all"):
        demo_str_methods()
    if args.mode in ("regex", "all"):
        demo_regex()
    if args.mode in ("extract", "all"):
        demo_extract(entries, args.keyword)
    if args.mode in ("stats", "all"):
        demo_stats(entries)


if __name__ == "__main__":
    main()

终端预期输出(--keyword ERROR):

$ python3 log_cleaner.py --mode extract --keyword ERROR

  解析完成: 10 条成功,1 条格式错误

  ── 结构化日志(ERROR)─────────
  时间                 级别   模块         消息
  ──────────────────── ────── ──────────── ──────────────────────────────
  2026-04-16 10:02:11  🔴ERROR [db]        数据库连接超时 host=mysql:3306
  2026-04-16 10:04:02  🔴ERROR [db]        数据库连接超时 host=mysql:3306
  2026-04-16 10:06:33  🔴ERROR [file]      文件不存在 path=/data/config.json
  2026-04-16 10:08:44  🔴ERROR [db]        连接池耗尽 pool_size=10 waiting=23

正则速查表

元字符 含义 示例
\d 数字 \d{4} 匹配4位数字
\w 字母/数字/下划线 \w+ 匹配单词
\s 空白字符 \s+ 匹配多个空格
(?P<name>...) 命名分组 m.group("name")
(?:...) 非捕获分组 分组但不捕获
\b 单词边界 \bpython\b
.*? 非贪婪匹配 尽量少匹配

NexDo Time ⚡

5 分钟极客微操:给 log_cleaner.py 增加 --output FILE 参数,将结构化日志以 CSV 格式写入文件(用 csv.DictWriter),字段为 timestamp,level,module,message

Don’t wait for next time, do it in the next moment.