文章

56 · 收官项目:本地知识库 CLI 工具(全栈综合实战)

#028 · 2026-04-16 · Python

🔗 知识图谱导航:阅读本文前,建议先回顾《08 · 模块化构建:标准库与自定义包》《16 · 数据库底座:SQLite 核心操作》《53 · 类型系统进阶》《54 · 性能探针》和《55 · 综合项目:网络词典》。本文会把整个系列的 CLI、数据建模、SQLite、文件导出和工程分层收束成一个完整工具。 NexDo Time · 2026-04-17 · 预计阅读 38 分钟

运行环境:Python 3.12+ 标准库,零额外依赖,直接运行。

痛点与架构

学完 55 篇之后,最后需要一个真正能自己改、自己用、自己扩展的小工具。本地知识库 CLI 正好覆盖整个系列的关键能力:命令行参数、数据模型、SQLite 持久化、全文搜索、标签统计、JSON/Markdown 导出。

这个项目按三层设计:数据模型负责定义条目长什么样,数据层负责和 SQLite 打交道,业务层负责提供“添加、搜索、统计、导出”等用户动作。输出层再把结果整理成表格和报告。

argparse CLI
  -> KnowledgeBase 业务层
  -> KnowledgeDB 数据层
  -> SQLite 内存库
  -> 表格 / 统计图 / JSON / Markdown

步步为营:核心逻辑自适应拆解

收官项目代码密度高,所以拆成 9 个小步骤。你先跑通每层的小闭环,再看文末完整源码,整体就不会乱。

Step 1:用 Entry dataclass 统一知识条目的数据形状

痛点与机制

综合项目先要定义清楚“数据长什么样”。Entry 像知识库里的身份证,包含标题、内容、标签、ID 和创建时间。from_row() 负责把 SQLite 查出来的元组重新装回对象,避免业务层到处处理裸 tuple。

核心源码(逐字来自文末完整源码)

@dataclass
class Entry:
    title: str
    content: str
    tags: list[str] = field(default_factory=list)
    id: int = 0
    created_at: str = field(default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M"))

    @property
    def tag_str(self) -> str:
        return ", ".join(self.tags)

    @classmethod
    def from_row(cls, row: tuple) -> "Entry":
        id_, title, content, tags_json, created_at = row
        return cls(
            id=id_,
            title=title,
            content=content,
            tags=json.loads(tags_json) if tags_json else [],
            created_at=created_at,
        )

可运行演示(补齐 Mock 数据与 print 反馈)

import argparse
import json
import re
import sqlite3
import tempfile
import time
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional

@dataclass
class Entry:
    title: str
    content: str
    tags: list[str] = field(default_factory=list)
    id: int = 0
    created_at: str = field(default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M"))

    @property
    def tag_str(self) -> str:
        return ", ".join(self.tags)

    @classmethod
    def from_row(cls, row: tuple) -> "Entry":
        id_, title, content, tags_json, created_at = row
        return cls(
            id=id_,
            title=title,
            content=content,
            tags=json.loads(tags_json) if tags_json else [],
            created_at=created_at,
        )

entry = Entry(title="Python 类型提示", content="Protocol 和 Generic 能提升维护性", tags=["python", "typing"])
print("标题:", entry.title)
print("标签字符串:", entry.tag_str)
row = (7, "SQLite 索引", "给 title 建索引能加速搜索", '["sqlite", "性能"]', "2026-04-17 20:00")
print("从数据库行还原:", Entry.from_row(row))

Step 2:用 KnowledgeDB 把 SQLite 细节封装在数据层

痛点与机制

数据层像仓库管理员:负责建表、插入、搜索、分页、统计标签。业务代码不用知道 SQL 怎么写,只需要调用 add/search/list_all。这能降低项目耦合,后续换数据库也更容易。

核心源码(逐字来自文末完整源码)

class KnowledgeDB:
    def __init__(self, db_path: str = ":memory:") -> None:
        self._conn = sqlite3.connect(db_path)
        self._init()

    def _init(self) -> None:
        self._conn.execute("""
            CREATE TABLE IF NOT EXISTS entries (
                id         INTEGER PRIMARY KEY AUTOINCREMENT,
                title      TEXT NOT NULL,
                content    TEXT NOT NULL,
                tags       TEXT DEFAULT '[]',
                created_at TEXT DEFAULT (datetime('now','localtime'))
            )
        """)
        self._conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_title ON entries(title)"
        )
        self._conn.commit()

    def add(self, entry: Entry) -> int:
        cur = self._conn.execute(
            "INSERT INTO entries (title, content, tags) VALUES (?, ?, ?)",
            (entry.title, entry.content, json.dumps(entry.tags, ensure_ascii=False)),
        )
        self._conn.commit()
        return cur.lastrowid or 0

    def search(self, keyword: str) -> list[Entry]:
        rows = self._conn.execute(
            "SELECT id, title, content, tags, created_at FROM entries "
            "WHERE title LIKE ? OR content LIKE ? OR tags LIKE ?",
            (f"%{keyword}%",) * 3,
        ).fetchall()
        return [Entry.from_row(r) for r in rows]

    def list_all(self, limit: int = 20, offset: int = 0) -> list[Entry]:
        rows = self._conn.execute(
            "SELECT id, title, content, tags, created_at FROM entries "
            "ORDER BY id DESC LIMIT ? OFFSET ?",
            (limit, offset),
        ).fetchall()
        return [Entry.from_row(r) for r in rows]

    def count(self) -> int:
        return self._conn.execute("SELECT COUNT(*) FROM entries").fetchone()[0]

    def all_tags(self) -> list[str]:
        rows = self._conn.execute("SELECT tags FROM entries").fetchall()
        tags = []
        for (tags_json,) in rows:
            tags.extend(json.loads(tags_json) if tags_json else [])
        return tags

    def close(self) -> None:
        self._conn.close()

可运行演示(补齐 Mock 数据与 print 反馈)

import argparse
import json
import re
import sqlite3
import tempfile
import time
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional

@dataclass
class Entry:
    title: str
    content: str
    tags: list[str] = field(default_factory=list)
    id: int = 0
    created_at: str = field(default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M"))

    @property
    def tag_str(self) -> str:
        return ", ".join(self.tags)

    @classmethod
    def from_row(cls, row: tuple) -> "Entry":
        id_, title, content, tags_json, created_at = row
        return cls(
            id=id_,
            title=title,
            content=content,
            tags=json.loads(tags_json) if tags_json else [],
            created_at=created_at,
        )

class KnowledgeDB:
    def __init__(self, db_path: str = ":memory:") -> None:
        self._conn = sqlite3.connect(db_path)
        self._init()

    def _init(self) -> None:
        self._conn.execute("""
            CREATE TABLE IF NOT EXISTS entries (
                id         INTEGER PRIMARY KEY AUTOINCREMENT,
                title      TEXT NOT NULL,
                content    TEXT NOT NULL,
                tags       TEXT DEFAULT '[]',
                created_at TEXT DEFAULT (datetime('now','localtime'))
            )
        """)
        self._conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_title ON entries(title)"
        )
        self._conn.commit()

    def add(self, entry: Entry) -> int:
        cur = self._conn.execute(
            "INSERT INTO entries (title, content, tags) VALUES (?, ?, ?)",
            (entry.title, entry.content, json.dumps(entry.tags, ensure_ascii=False)),
        )
        self._conn.commit()
        return cur.lastrowid or 0

    def search(self, keyword: str) -> list[Entry]:
        rows = self._conn.execute(
            "SELECT id, title, content, tags, created_at FROM entries "
            "WHERE title LIKE ? OR content LIKE ? OR tags LIKE ?",
            (f"%{keyword}%",) * 3,
        ).fetchall()
        return [Entry.from_row(r) for r in rows]

    def list_all(self, limit: int = 20, offset: int = 0) -> list[Entry]:
        rows = self._conn.execute(
            "SELECT id, title, content, tags, created_at FROM entries "
            "ORDER BY id DESC LIMIT ? OFFSET ?",
            (limit, offset),
        ).fetchall()
        return [Entry.from_row(r) for r in rows]

    def count(self) -> int:
        return self._conn.execute("SELECT COUNT(*) FROM entries").fetchone()[0]

    def all_tags(self) -> list[str]:
        rows = self._conn.execute("SELECT tags FROM entries").fetchall()
        tags = []
        for (tags_json,) in rows:
            tags.extend(json.loads(tags_json) if tags_json else [])
        return tags

    def close(self) -> None:
        self._conn.close()

db = KnowledgeDB()
eid = db.add(Entry(title="Redis 缓存策略", content="Cache-Aside 先查缓存,未命中再查数据库", tags=["redis", "缓存"]))
print("新增 ID:", eid)
print("搜索 redis:", [e.title for e in db.search("redis")])
print("总数:", db.count())
print("全部标签:", db.all_tags())
db.close()

Step 3:用 KnowledgeBase 建立面向业务的操作入口

痛点与机制

业务层像门店前台,把“添加知识”“搜索知识”“导出报告”“查看统计”整理成更贴近用户动作的方法。它内部调用 KnowledgeDB,但对外暴露的是更清晰的业务语言。

核心源码(逐字来自文末完整源码)

class KnowledgeBase:
    def __init__(self, db_path: str = ":memory:") -> None:
        self._db = KnowledgeDB(db_path)

    def add_entry(self, title: str, content: str, tags: list[str]) -> int:
        entry = Entry(title=title, content=content, tags=tags)
        eid = self._db.add(entry)
        return eid

    def search(self, keyword: str) -> list[Entry]:
        return self._db.search(keyword)

    def list_entries(self, limit: int = 10) -> list[Entry]:
        return self._db.list_all(limit=limit)

    def export_json(self, path: Path) -> int:
        entries = self._db.list_all(limit=10000)
        data = [
            {"id": e.id, "title": e.title, "content": e.content,
             "tags": e.tags, "created_at": e.created_at}
            for e in entries
        ]
        path.write_text(json.dumps(data, ensure_ascii=False, indent=2))
        return len(data)

    def export_markdown(self, path: Path) -> int:
        entries = self._db.list_all(limit=10000)
        lines = ["# 知识库导出\n", f"导出时间:{datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n"]
        for e in entries:
            lines.append(f"## {e.title}\n\n")
            lines.append(f"**标签**:{e.tag_str or '无'}  \n")
            lines.append(f"**时间**:{e.created_at}\n\n")
            lines.append(f"{e.content}\n\n---\n\n")
        path.write_text("".join(lines), encoding="utf-8")
        return len(entries)

    def stats(self) -> dict:
        total = self._db.count()
        all_tags = self._db.all_tags()
        tag_counts = Counter(all_tags)
        return {"total": total, "tag_counts": dict(tag_counts.most_common(10))}

可运行演示(补齐 Mock 数据与 print 反馈)

import argparse
import json
import re
import sqlite3
import tempfile
import time
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional

@dataclass
class Entry:
    title: str
    content: str
    tags: list[str] = field(default_factory=list)
    id: int = 0
    created_at: str = field(default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M"))

    @property
    def tag_str(self) -> str:
        return ", ".join(self.tags)

    @classmethod
    def from_row(cls, row: tuple) -> "Entry":
        id_, title, content, tags_json, created_at = row
        return cls(
            id=id_,
            title=title,
            content=content,
            tags=json.loads(tags_json) if tags_json else [],
            created_at=created_at,
        )

class KnowledgeDB:
    def __init__(self, db_path: str = ":memory:") -> None:
        self._conn = sqlite3.connect(db_path)
        self._init()

    def _init(self) -> None:
        self._conn.execute("""
            CREATE TABLE IF NOT EXISTS entries (
                id         INTEGER PRIMARY KEY AUTOINCREMENT,
                title      TEXT NOT NULL,
                content    TEXT NOT NULL,
                tags       TEXT DEFAULT '[]',
                created_at TEXT DEFAULT (datetime('now','localtime'))
            )
        """)
        self._conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_title ON entries(title)"
        )
        self._conn.commit()

    def add(self, entry: Entry) -> int:
        cur = self._conn.execute(
            "INSERT INTO entries (title, content, tags) VALUES (?, ?, ?)",
            (entry.title, entry.content, json.dumps(entry.tags, ensure_ascii=False)),
        )
        self._conn.commit()
        return cur.lastrowid or 0

    def search(self, keyword: str) -> list[Entry]:
        rows = self._conn.execute(
            "SELECT id, title, content, tags, created_at FROM entries "
            "WHERE title LIKE ? OR content LIKE ? OR tags LIKE ?",
            (f"%{keyword}%",) * 3,
        ).fetchall()
        return [Entry.from_row(r) for r in rows]

    def list_all(self, limit: int = 20, offset: int = 0) -> list[Entry]:
        rows = self._conn.execute(
            "SELECT id, title, content, tags, created_at FROM entries "
            "ORDER BY id DESC LIMIT ? OFFSET ?",
            (limit, offset),
        ).fetchall()
        return [Entry.from_row(r) for r in rows]

    def count(self) -> int:
        return self._conn.execute("SELECT COUNT(*) FROM entries").fetchone()[0]

    def all_tags(self) -> list[str]:
        rows = self._conn.execute("SELECT tags FROM entries").fetchall()
        tags = []
        for (tags_json,) in rows:
            tags.extend(json.loads(tags_json) if tags_json else [])
        return tags

    def close(self) -> None:
        self._conn.close()

class KnowledgeBase:
    def __init__(self, db_path: str = ":memory:") -> None:
        self._db = KnowledgeDB(db_path)

    def add_entry(self, title: str, content: str, tags: list[str]) -> int:
        entry = Entry(title=title, content=content, tags=tags)
        eid = self._db.add(entry)
        return eid

    def search(self, keyword: str) -> list[Entry]:
        return self._db.search(keyword)

    def list_entries(self, limit: int = 10) -> list[Entry]:
        return self._db.list_all(limit=limit)

    def export_json(self, path: Path) -> int:
        entries = self._db.list_all(limit=10000)
        data = [
            {"id": e.id, "title": e.title, "content": e.content,
             "tags": e.tags, "created_at": e.created_at}
            for e in entries
        ]
        path.write_text(json.dumps(data, ensure_ascii=False, indent=2))
        return len(data)

    def export_markdown(self, path: Path) -> int:
        entries = self._db.list_all(limit=10000)
        lines = ["# 知识库导出\n", f"导出时间:{datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n"]
        for e in entries:
            lines.append(f"## {e.title}\n\n")
            lines.append(f"**标签**:{e.tag_str or '无'}  \n")
            lines.append(f"**时间**:{e.created_at}\n\n")
            lines.append(f"{e.content}\n\n---\n\n")
        path.write_text("".join(lines), encoding="utf-8")
        return len(entries)

    def stats(self) -> dict:
        total = self._db.count()
        all_tags = self._db.all_tags()
        tag_counts = Counter(all_tags)
        return {"total": total, "tag_counts": dict(tag_counts.most_common(10))}

kb = KnowledgeBase()
kb.add_entry("Docker 部署", "docker-compose 管理多服务", ["docker", "部署"])
kb.add_entry("Nginx 反向代理", "proxy_pass 转发请求", ["nginx", "部署"])
print("搜索 部署:", [e.title for e in kb.search("部署")])
print("列表:", [e.title for e in kb.list_entries()])
print("统计:", kb.stats())

Step 4:用 print_entries 把搜索结果排成可读表格

痛点与机制

CLI 工具不只是能跑,还要让人看得懂。print_entries() 把条目按 ID、标题、标签、时间排成表格,并在搜索时高亮关键词。它像把杂乱记录整理成清单。

核心源码(逐字来自文末完整源码)

def print_entries(entries: list[Entry], keyword: str = "") -> None:
    if not entries:
        print("  (无结果)")
        return

    print(f"\n  {'ID':<4} {'标题':<25} {'标签':<20} {'时间'}")
    print(f"  {'─'*4} {'─'*25} {'─'*20} {'─'*16}")
    for e in entries:
        title = e.title[:23] + ".." if len(e.title) > 25 else e.title
        tags = e.tag_str[:18] + ".." if len(e.tag_str) > 20 else e.tag_str
        # 高亮关键词
        if keyword and keyword.lower() in title.lower():
            title = f"*{title}*"
        print(f"  {e.id:<4} {title:<25} {tags:<20} {e.created_at[:16]}")

可运行演示(补齐 Mock 数据与 print 反馈)

import argparse
import json
import re
import sqlite3
import tempfile
import time
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional

@dataclass
class Entry:
    title: str
    content: str
    tags: list[str] = field(default_factory=list)
    id: int = 0
    created_at: str = field(default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M"))

    @property
    def tag_str(self) -> str:
        return ", ".join(self.tags)

    @classmethod
    def from_row(cls, row: tuple) -> "Entry":
        id_, title, content, tags_json, created_at = row
        return cls(
            id=id_,
            title=title,
            content=content,
            tags=json.loads(tags_json) if tags_json else [],
            created_at=created_at,
        )

def print_entries(entries: list[Entry], keyword: str = "") -> None:
    if not entries:
        print("  (无结果)")
        return

    print(f"\n  {'ID':<4} {'标题':<25} {'标签':<20} {'时间'}")
    print(f"  {'─'*4} {'─'*25} {'─'*20} {'─'*16}")
    for e in entries:
        title = e.title[:23] + ".." if len(e.title) > 25 else e.title
        tags = e.tag_str[:18] + ".." if len(e.tag_str) > 20 else e.tag_str
        # 高亮关键词
        if keyword and keyword.lower() in title.lower():
            title = f"*{title}*"
        print(f"  {e.id:<4} {title:<25} {tags:<20} {e.created_at[:16]}")

entries = [
    Entry(id=1, title="Python 类型提示最佳实践", content="...", tags=["python", "类型系统"], created_at="2026-04-17 10:00"),
    Entry(id=2, title="Docker 容器化部署", content="...", tags=["docker", "部署"], created_at="2026-04-17 11:00"),
]
print_entries(entries, keyword="python")
print_entries([], keyword="none")

Step 5:用 print_stats 把标签分布画成终端条形图

痛点与机制

统计结果如果只是一串数字,新手很难快速判断重点。条形图把标签热度变成可视长度,哪个主题最多一眼就能看出来。

核心源码(逐字来自文末完整源码)

def print_stats(stats: dict) -> None:
    print(f"\n  ── 知识库统计 ────────────────────────────")
    print(f"  总条目数: {stats['total']}")
    if stats["tag_counts"]:
        print(f"\n  标签分布 Top 10:")
        max_count = max(stats["tag_counts"].values())
        for tag, count in stats["tag_counts"].items():
            bar = "█" * int(count / max_count * 20)
            print(f"    {tag:<15} {count:>3}  {bar}")

可运行演示(补齐 Mock 数据与 print 反馈)

import argparse
import json
import re
import sqlite3
import tempfile
import time
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional

def print_stats(stats: dict) -> None:
    print(f"\n  ── 知识库统计 ────────────────────────────")
    print(f"  总条目数: {stats['total']}")
    if stats["tag_counts"]:
        print(f"\n  标签分布 Top 10:")
        max_count = max(stats["tag_counts"].values())
        for tag, count in stats["tag_counts"].items():
            bar = "█" * int(count / max_count * 20)
            print(f"    {tag:<15} {count:>3}  {bar}")

stats = {"total": 5, "tag_counts": {"python": 3, "部署": 2, "sqlite": 1}}
print_stats(stats)

Step 6:用 MOCK_ENTRIES 跑通添加、搜索和统计的小闭环

痛点与机制

综合项目必须有内置数据,否则读者第一步就卡在“我该准备什么文件”。MOCK_ENTRIES 像随书练习数据,让添加、搜索、统计都能直接跑起来。

核心源码(逐字来自文末完整源码)

MOCK_ENTRIES = [
    ("Python 类型提示最佳实践",
     "使用 Protocol 定义接口,TypeVar 实现泛型函数,Generic[T] 构建泛型类。Python 3.12+ 支持 class Stack[T] 语法。",
     ["python", "类型系统", "进阶"]),
    ("asyncio 并发模式",
     "async/await 语法实现协程,asyncio.gather 并发执行,asyncio.Queue 实现生产者消费者模式。",
     ["python", "并发", "asyncio"]),
    ("SQLite 性能优化",
     "使用 WAL 模式提升写入性能,为高频查询字段建立索引,批量插入用 executemany,避免频繁 commit。",
     ["数据库", "sqlite", "性能"]),
    ("Docker 容器化部署",
     "Dockerfile 多阶段构建减小镜像体积,docker-compose 管理多服务,健康检查确保服务可用性。",
     ["运维", "docker", "部署"]),
    ("Redis 缓存策略",
     "Cache-Aside 模式:先查缓存,未命中再查数据库并写入缓存。设置合理 TTL 避免缓存雪崩。",
     ["redis", "缓存", "架构"]),
    ("Nginx 反向代理配置",
     "upstream 配置负载均衡,proxy_pass 转发请求,gzip 压缩响应,limit_req 限流防刷。",
     ["运维", "nginx", "部署"]),
    ("Git 工作流规范",
     "feature 分支开发,PR/MR 代码审查,squash merge 保持主干整洁,tag 标记版本发布。",
     ["工具", "git", "协作"]),
    ("机器学习特征工程",
     "标准化用 StandardScaler,归一化用 MinMaxScaler,类别编码用 LabelEncoder/OneHotEncoder,缺失值用 SimpleImputer。",
     ["机器学习", "sklearn", "数据处理"]),
]

可运行演示(补齐 Mock 数据与 print 反馈)

import argparse
import json
import re
import sqlite3
import tempfile
import time
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional

@dataclass
class Entry:
    title: str
    content: str
    tags: list[str] = field(default_factory=list)
    id: int = 0
    created_at: str = field(default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M"))

    @property
    def tag_str(self) -> str:
        return ", ".join(self.tags)

    @classmethod
    def from_row(cls, row: tuple) -> "Entry":
        id_, title, content, tags_json, created_at = row
        return cls(
            id=id_,
            title=title,
            content=content,
            tags=json.loads(tags_json) if tags_json else [],
            created_at=created_at,
        )

class KnowledgeDB:
    def __init__(self, db_path: str = ":memory:") -> None:
        self._conn = sqlite3.connect(db_path)
        self._init()

    def _init(self) -> None:
        self._conn.execute("""
            CREATE TABLE IF NOT EXISTS entries (
                id         INTEGER PRIMARY KEY AUTOINCREMENT,
                title      TEXT NOT NULL,
                content    TEXT NOT NULL,
                tags       TEXT DEFAULT '[]',
                created_at TEXT DEFAULT (datetime('now','localtime'))
            )
        """)
        self._conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_title ON entries(title)"
        )
        self._conn.commit()

    def add(self, entry: Entry) -> int:
        cur = self._conn.execute(
            "INSERT INTO entries (title, content, tags) VALUES (?, ?, ?)",
            (entry.title, entry.content, json.dumps(entry.tags, ensure_ascii=False)),
        )
        self._conn.commit()
        return cur.lastrowid or 0

    def search(self, keyword: str) -> list[Entry]:
        rows = self._conn.execute(
            "SELECT id, title, content, tags, created_at FROM entries "
            "WHERE title LIKE ? OR content LIKE ? OR tags LIKE ?",
            (f"%{keyword}%",) * 3,
        ).fetchall()
        return [Entry.from_row(r) for r in rows]

    def list_all(self, limit: int = 20, offset: int = 0) -> list[Entry]:
        rows = self._conn.execute(
            "SELECT id, title, content, tags, created_at FROM entries "
            "ORDER BY id DESC LIMIT ? OFFSET ?",
            (limit, offset),
        ).fetchall()
        return [Entry.from_row(r) for r in rows]

    def count(self) -> int:
        return self._conn.execute("SELECT COUNT(*) FROM entries").fetchone()[0]

    def all_tags(self) -> list[str]:
        rows = self._conn.execute("SELECT tags FROM entries").fetchall()
        tags = []
        for (tags_json,) in rows:
            tags.extend(json.loads(tags_json) if tags_json else [])
        return tags

    def close(self) -> None:
        self._conn.close()

class KnowledgeBase:
    def __init__(self, db_path: str = ":memory:") -> None:
        self._db = KnowledgeDB(db_path)

    def add_entry(self, title: str, content: str, tags: list[str]) -> int:
        entry = Entry(title=title, content=content, tags=tags)
        eid = self._db.add(entry)
        return eid

    def search(self, keyword: str) -> list[Entry]:
        return self._db.search(keyword)

    def list_entries(self, limit: int = 10) -> list[Entry]:
        return self._db.list_all(limit=limit)

    def export_json(self, path: Path) -> int:
        entries = self._db.list_all(limit=10000)
        data = [
            {"id": e.id, "title": e.title, "content": e.content,
             "tags": e.tags, "created_at": e.created_at}
            for e in entries
        ]
        path.write_text(json.dumps(data, ensure_ascii=False, indent=2))
        return len(data)

    def export_markdown(self, path: Path) -> int:
        entries = self._db.list_all(limit=10000)
        lines = ["# 知识库导出\n", f"导出时间:{datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n"]
        for e in entries:
            lines.append(f"## {e.title}\n\n")
            lines.append(f"**标签**:{e.tag_str or '无'}  \n")
            lines.append(f"**时间**:{e.created_at}\n\n")
            lines.append(f"{e.content}\n\n---\n\n")
        path.write_text("".join(lines), encoding="utf-8")
        return len(entries)

    def stats(self) -> dict:
        total = self._db.count()
        all_tags = self._db.all_tags()
        tag_counts = Counter(all_tags)
        return {"total": total, "tag_counts": dict(tag_counts.most_common(10))}

MOCK_ENTRIES = [
    ("Python 类型提示最佳实践",
     "使用 Protocol 定义接口,TypeVar 实现泛型函数,Generic[T] 构建泛型类。Python 3.12+ 支持 class Stack[T] 语法。",
     ["python", "类型系统", "进阶"]),
    ("asyncio 并发模式",
     "async/await 语法实现协程,asyncio.gather 并发执行,asyncio.Queue 实现生产者消费者模式。",
     ["python", "并发", "asyncio"]),
    ("SQLite 性能优化",
     "使用 WAL 模式提升写入性能,为高频查询字段建立索引,批量插入用 executemany,避免频繁 commit。",
     ["数据库", "sqlite", "性能"]),
    ("Docker 容器化部署",
     "Dockerfile 多阶段构建减小镜像体积,docker-compose 管理多服务,健康检查确保服务可用性。",
     ["运维", "docker", "部署"]),
    ("Redis 缓存策略",
     "Cache-Aside 模式:先查缓存,未命中再查数据库并写入缓存。设置合理 TTL 避免缓存雪崩。",
     ["redis", "缓存", "架构"]),
    ("Nginx 反向代理配置",
     "upstream 配置负载均衡,proxy_pass 转发请求,gzip 压缩响应,limit_req 限流防刷。",
     ["运维", "nginx", "部署"]),
    ("Git 工作流规范",
     "feature 分支开发,PR/MR 代码审查,squash merge 保持主干整洁,tag 标记版本发布。",
     ["工具", "git", "协作"]),
    ("机器学习特征工程",
     "标准化用 StandardScaler,归一化用 MinMaxScaler,类别编码用 LabelEncoder/OneHotEncoder,缺失值用 SimpleImputer。",
     ["机器学习", "sklearn", "数据处理"]),
]

kb = KnowledgeBase()
for title, content, tags in MOCK_ENTRIES[:3]:
    print("新增:", title, "-> ID", kb.add_entry(title, content, tags))
print("搜索 python:", [e.title for e in kb.search("python")])
print("统计:", kb.stats())

Step 7:用 export_json/export_markdown 做到文件导出闭环

痛点与机制

知识库不能只存在内存里,导出能力让它变成真正工具。JSON 适合给程序继续处理,Markdown 适合给人阅读和归档。这里用 tempfile,演示结束后不会污染用户目录。

核心源码(逐字来自文末完整源码)

class KnowledgeBase:
    def __init__(self, db_path: str = ":memory:") -> None:
        self._db = KnowledgeDB(db_path)

    def add_entry(self, title: str, content: str, tags: list[str]) -> int:
        entry = Entry(title=title, content=content, tags=tags)
        eid = self._db.add(entry)
        return eid

    def search(self, keyword: str) -> list[Entry]:
        return self._db.search(keyword)

    def list_entries(self, limit: int = 10) -> list[Entry]:
        return self._db.list_all(limit=limit)

    def export_json(self, path: Path) -> int:
        entries = self._db.list_all(limit=10000)
        data = [
            {"id": e.id, "title": e.title, "content": e.content,
             "tags": e.tags, "created_at": e.created_at}
            for e in entries
        ]
        path.write_text(json.dumps(data, ensure_ascii=False, indent=2))
        return len(data)

    def export_markdown(self, path: Path) -> int:
        entries = self._db.list_all(limit=10000)
        lines = ["# 知识库导出\n", f"导出时间:{datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n"]
        for e in entries:
            lines.append(f"## {e.title}\n\n")
            lines.append(f"**标签**:{e.tag_str or '无'}  \n")
            lines.append(f"**时间**:{e.created_at}\n\n")
            lines.append(f"{e.content}\n\n---\n\n")
        path.write_text("".join(lines), encoding="utf-8")
        return len(entries)

    def stats(self) -> dict:
        total = self._db.count()
        all_tags = self._db.all_tags()
        tag_counts = Counter(all_tags)
        return {"total": total, "tag_counts": dict(tag_counts.most_common(10))}

可运行演示(补齐 Mock 数据与 print 反馈)

import argparse
import json
import re
import sqlite3
import tempfile
import time
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional

@dataclass
class Entry:
    title: str
    content: str
    tags: list[str] = field(default_factory=list)
    id: int = 0
    created_at: str = field(default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M"))

    @property
    def tag_str(self) -> str:
        return ", ".join(self.tags)

    @classmethod
    def from_row(cls, row: tuple) -> "Entry":
        id_, title, content, tags_json, created_at = row
        return cls(
            id=id_,
            title=title,
            content=content,
            tags=json.loads(tags_json) if tags_json else [],
            created_at=created_at,
        )

class KnowledgeDB:
    def __init__(self, db_path: str = ":memory:") -> None:
        self._conn = sqlite3.connect(db_path)
        self._init()

    def _init(self) -> None:
        self._conn.execute("""
            CREATE TABLE IF NOT EXISTS entries (
                id         INTEGER PRIMARY KEY AUTOINCREMENT,
                title      TEXT NOT NULL,
                content    TEXT NOT NULL,
                tags       TEXT DEFAULT '[]',
                created_at TEXT DEFAULT (datetime('now','localtime'))
            )
        """)
        self._conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_title ON entries(title)"
        )
        self._conn.commit()

    def add(self, entry: Entry) -> int:
        cur = self._conn.execute(
            "INSERT INTO entries (title, content, tags) VALUES (?, ?, ?)",
            (entry.title, entry.content, json.dumps(entry.tags, ensure_ascii=False)),
        )
        self._conn.commit()
        return cur.lastrowid or 0

    def search(self, keyword: str) -> list[Entry]:
        rows = self._conn.execute(
            "SELECT id, title, content, tags, created_at FROM entries "
            "WHERE title LIKE ? OR content LIKE ? OR tags LIKE ?",
            (f"%{keyword}%",) * 3,
        ).fetchall()
        return [Entry.from_row(r) for r in rows]

    def list_all(self, limit: int = 20, offset: int = 0) -> list[Entry]:
        rows = self._conn.execute(
            "SELECT id, title, content, tags, created_at FROM entries "
            "ORDER BY id DESC LIMIT ? OFFSET ?",
            (limit, offset),
        ).fetchall()
        return [Entry.from_row(r) for r in rows]

    def count(self) -> int:
        return self._conn.execute("SELECT COUNT(*) FROM entries").fetchone()[0]

    def all_tags(self) -> list[str]:
        rows = self._conn.execute("SELECT tags FROM entries").fetchall()
        tags = []
        for (tags_json,) in rows:
            tags.extend(json.loads(tags_json) if tags_json else [])
        return tags

    def close(self) -> None:
        self._conn.close()

class KnowledgeBase:
    def __init__(self, db_path: str = ":memory:") -> None:
        self._db = KnowledgeDB(db_path)

    def add_entry(self, title: str, content: str, tags: list[str]) -> int:
        entry = Entry(title=title, content=content, tags=tags)
        eid = self._db.add(entry)
        return eid

    def search(self, keyword: str) -> list[Entry]:
        return self._db.search(keyword)

    def list_entries(self, limit: int = 10) -> list[Entry]:
        return self._db.list_all(limit=limit)

    def export_json(self, path: Path) -> int:
        entries = self._db.list_all(limit=10000)
        data = [
            {"id": e.id, "title": e.title, "content": e.content,
             "tags": e.tags, "created_at": e.created_at}
            for e in entries
        ]
        path.write_text(json.dumps(data, ensure_ascii=False, indent=2))
        return len(data)

    def export_markdown(self, path: Path) -> int:
        entries = self._db.list_all(limit=10000)
        lines = ["# 知识库导出\n", f"导出时间:{datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n"]
        for e in entries:
            lines.append(f"## {e.title}\n\n")
            lines.append(f"**标签**:{e.tag_str or '无'}  \n")
            lines.append(f"**时间**:{e.created_at}\n\n")
            lines.append(f"{e.content}\n\n---\n\n")
        path.write_text("".join(lines), encoding="utf-8")
        return len(entries)

    def stats(self) -> dict:
        total = self._db.count()
        all_tags = self._db.all_tags()
        tag_counts = Counter(all_tags)
        return {"total": total, "tag_counts": dict(tag_counts.most_common(10))}

kb = KnowledgeBase()
kb.add_entry("Git 工作流规范", "feature 分支开发,PR 代码审查", ["git", "协作"])
with tempfile.TemporaryDirectory() as tmp:
    json_path = Path(tmp) / "kb.json"
    md_path = Path(tmp) / "kb.md"
    print("JSON 导出条数:", kb.export_json(json_path), "大小:", json_path.stat().st_size)
    print("Markdown 导出条数:", kb.export_markdown(md_path), "大小:", md_path.stat().st_size)
    print("JSON 预览:", json_path.read_text(encoding="utf-8")[:120])

Step 8:用 demo_all 串起添加、搜索、列表、统计、导出

痛点与机制

收官项目最重要的是端到端闭环。demo_all() 像一条验收脚本:批量添加数据,搜索关键词,列出条目,打印统计,再导出报告。跑完它,就说明核心功能都连起来了。

核心源码(逐字来自文末完整源码)

def demo_all(kb: KnowledgeBase) -> None:
    """完整演示所有功能。"""
    print("\n  ── 1. 批量添加知识条目 ───────────────────")
    for title, content, tags in MOCK_ENTRIES:
        eid = kb.add_entry(title, content, tags)
        print(f"  [#{eid:02d}] {title[:40]}")

    print("\n  ── 2. 全文搜索 ───────────────────────────")
    for kw in ["python", "部署", "缓存"]:
        results = kb.search(kw)
        print(f"\n  搜索 '{kw}' → {len(results)} 条结果:")
        print_entries(results, keyword=kw)

    print("\n  ── 3. 列出所有条目 ───────────────────────")
    entries = kb.list_entries(limit=5)
    print_entries(entries)

    print("\n  ── 4. 统计分析 ───────────────────────────")
    print_stats(kb.stats())

    print("\n  ── 5. 导出报告 ───────────────────────────")
    with tempfile.TemporaryDirectory() as tmp:
        json_path = Path(tmp) / "kb_export.json"
        md_path   = Path(tmp) / "kb_export.md"
        n_json = kb.export_json(json_path)
        n_md   = kb.export_markdown(md_path)
        print(f"  JSON 导出: {n_json} 条 → {json_path.name} ({json_path.stat().st_size} bytes)")
        print(f"  MD   导出: {n_md} 条 → {md_path.name}   ({md_path.stat().st_size} bytes)")
        # 预览 JSON 前3行
        lines = json_path.read_text().split("\n")[:4]
        print(f"\n  JSON 预览:\n    " + "\n    ".join(lines))

可运行演示(补齐 Mock 数据与 print 反馈)

import argparse
import json
import re
import sqlite3
import tempfile
import time
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional

@dataclass
class Entry:
    title: str
    content: str
    tags: list[str] = field(default_factory=list)
    id: int = 0
    created_at: str = field(default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M"))

    @property
    def tag_str(self) -> str:
        return ", ".join(self.tags)

    @classmethod
    def from_row(cls, row: tuple) -> "Entry":
        id_, title, content, tags_json, created_at = row
        return cls(
            id=id_,
            title=title,
            content=content,
            tags=json.loads(tags_json) if tags_json else [],
            created_at=created_at,
        )

class KnowledgeDB:
    def __init__(self, db_path: str = ":memory:") -> None:
        self._conn = sqlite3.connect(db_path)
        self._init()

    def _init(self) -> None:
        self._conn.execute("""
            CREATE TABLE IF NOT EXISTS entries (
                id         INTEGER PRIMARY KEY AUTOINCREMENT,
                title      TEXT NOT NULL,
                content    TEXT NOT NULL,
                tags       TEXT DEFAULT '[]',
                created_at TEXT DEFAULT (datetime('now','localtime'))
            )
        """)
        self._conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_title ON entries(title)"
        )
        self._conn.commit()

    def add(self, entry: Entry) -> int:
        cur = self._conn.execute(
            "INSERT INTO entries (title, content, tags) VALUES (?, ?, ?)",
            (entry.title, entry.content, json.dumps(entry.tags, ensure_ascii=False)),
        )
        self._conn.commit()
        return cur.lastrowid or 0

    def search(self, keyword: str) -> list[Entry]:
        rows = self._conn.execute(
            "SELECT id, title, content, tags, created_at FROM entries "
            "WHERE title LIKE ? OR content LIKE ? OR tags LIKE ?",
            (f"%{keyword}%",) * 3,
        ).fetchall()
        return [Entry.from_row(r) for r in rows]

    def list_all(self, limit: int = 20, offset: int = 0) -> list[Entry]:
        rows = self._conn.execute(
            "SELECT id, title, content, tags, created_at FROM entries "
            "ORDER BY id DESC LIMIT ? OFFSET ?",
            (limit, offset),
        ).fetchall()
        return [Entry.from_row(r) for r in rows]

    def count(self) -> int:
        return self._conn.execute("SELECT COUNT(*) FROM entries").fetchone()[0]

    def all_tags(self) -> list[str]:
        rows = self._conn.execute("SELECT tags FROM entries").fetchall()
        tags = []
        for (tags_json,) in rows:
            tags.extend(json.loads(tags_json) if tags_json else [])
        return tags

    def close(self) -> None:
        self._conn.close()

class KnowledgeBase:
    def __init__(self, db_path: str = ":memory:") -> None:
        self._db = KnowledgeDB(db_path)

    def add_entry(self, title: str, content: str, tags: list[str]) -> int:
        entry = Entry(title=title, content=content, tags=tags)
        eid = self._db.add(entry)
        return eid

    def search(self, keyword: str) -> list[Entry]:
        return self._db.search(keyword)

    def list_entries(self, limit: int = 10) -> list[Entry]:
        return self._db.list_all(limit=limit)

    def export_json(self, path: Path) -> int:
        entries = self._db.list_all(limit=10000)
        data = [
            {"id": e.id, "title": e.title, "content": e.content,
             "tags": e.tags, "created_at": e.created_at}
            for e in entries
        ]
        path.write_text(json.dumps(data, ensure_ascii=False, indent=2))
        return len(data)

    def export_markdown(self, path: Path) -> int:
        entries = self._db.list_all(limit=10000)
        lines = ["# 知识库导出\n", f"导出时间:{datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n"]
        for e in entries:
            lines.append(f"## {e.title}\n\n")
            lines.append(f"**标签**:{e.tag_str or '无'}  \n")
            lines.append(f"**时间**:{e.created_at}\n\n")
            lines.append(f"{e.content}\n\n---\n\n")
        path.write_text("".join(lines), encoding="utf-8")
        return len(entries)

    def stats(self) -> dict:
        total = self._db.count()
        all_tags = self._db.all_tags()
        tag_counts = Counter(all_tags)
        return {"total": total, "tag_counts": dict(tag_counts.most_common(10))}

def print_entries(entries: list[Entry], keyword: str = "") -> None:
    if not entries:
        print("  (无结果)")
        return

    print(f"\n  {'ID':<4} {'标题':<25} {'标签':<20} {'时间'}")
    print(f"  {'─'*4} {'─'*25} {'─'*20} {'─'*16}")
    for e in entries:
        title = e.title[:23] + ".." if len(e.title) > 25 else e.title
        tags = e.tag_str[:18] + ".." if len(e.tag_str) > 20 else e.tag_str
        # 高亮关键词
        if keyword and keyword.lower() in title.lower():
            title = f"*{title}*"
        print(f"  {e.id:<4} {title:<25} {tags:<20} {e.created_at[:16]}")

def print_stats(stats: dict) -> None:
    print(f"\n  ── 知识库统计 ────────────────────────────")
    print(f"  总条目数: {stats['total']}")
    if stats["tag_counts"]:
        print(f"\n  标签分布 Top 10:")
        max_count = max(stats["tag_counts"].values())
        for tag, count in stats["tag_counts"].items():
            bar = "█" * int(count / max_count * 20)
            print(f"    {tag:<15} {count:>3}  {bar}")

MOCK_ENTRIES = [
    ("Python 类型提示最佳实践",
     "使用 Protocol 定义接口,TypeVar 实现泛型函数,Generic[T] 构建泛型类。Python 3.12+ 支持 class Stack[T] 语法。",
     ["python", "类型系统", "进阶"]),
    ("asyncio 并发模式",
     "async/await 语法实现协程,asyncio.gather 并发执行,asyncio.Queue 实现生产者消费者模式。",
     ["python", "并发", "asyncio"]),
    ("SQLite 性能优化",
     "使用 WAL 模式提升写入性能,为高频查询字段建立索引,批量插入用 executemany,避免频繁 commit。",
     ["数据库", "sqlite", "性能"]),
    ("Docker 容器化部署",
     "Dockerfile 多阶段构建减小镜像体积,docker-compose 管理多服务,健康检查确保服务可用性。",
     ["运维", "docker", "部署"]),
    ("Redis 缓存策略",
     "Cache-Aside 模式:先查缓存,未命中再查数据库并写入缓存。设置合理 TTL 避免缓存雪崩。",
     ["redis", "缓存", "架构"]),
    ("Nginx 反向代理配置",
     "upstream 配置负载均衡,proxy_pass 转发请求,gzip 压缩响应,limit_req 限流防刷。",
     ["运维", "nginx", "部署"]),
    ("Git 工作流规范",
     "feature 分支开发,PR/MR 代码审查,squash merge 保持主干整洁,tag 标记版本发布。",
     ["工具", "git", "协作"]),
    ("机器学习特征工程",
     "标准化用 StandardScaler,归一化用 MinMaxScaler,类别编码用 LabelEncoder/OneHotEncoder,缺失值用 SimpleImputer。",
     ["机器学习", "sklearn", "数据处理"]),
]

def demo_all(kb: KnowledgeBase) -> None:
    """完整演示所有功能。"""
    print("\n  ── 1. 批量添加知识条目 ───────────────────")
    for title, content, tags in MOCK_ENTRIES:
        eid = kb.add_entry(title, content, tags)
        print(f"  [#{eid:02d}] {title[:40]}")

    print("\n  ── 2. 全文搜索 ───────────────────────────")
    for kw in ["python", "部署", "缓存"]:
        results = kb.search(kw)
        print(f"\n  搜索 '{kw}' → {len(results)} 条结果:")
        print_entries(results, keyword=kw)

    print("\n  ── 3. 列出所有条目 ───────────────────────")
    entries = kb.list_entries(limit=5)
    print_entries(entries)

    print("\n  ── 4. 统计分析 ───────────────────────────")
    print_stats(kb.stats())

    print("\n  ── 5. 导出报告 ───────────────────────────")
    with tempfile.TemporaryDirectory() as tmp:
        json_path = Path(tmp) / "kb_export.json"
        md_path   = Path(tmp) / "kb_export.md"
        n_json = kb.export_json(json_path)
        n_md   = kb.export_markdown(md_path)
        print(f"  JSON 导出: {n_json} 条 → {json_path.name} ({json_path.stat().st_size} bytes)")
        print(f"  MD   导出: {n_md} 条 → {md_path.name}   ({md_path.stat().st_size} bytes)")
        # 预览 JSON 前3行
        lines = json_path.read_text().split("\n")[:4]
        print(f"\n  JSON 预览:\n    " + "\n    ".join(lines))

kb = KnowledgeBase()
demo_all(kb)

Step 9:用 main 做 demo/add/search/list/stats/export 命令入口

痛点与机制

最终工具要交给普通用户使用,入口必须稳定。main()argparse 把六种模式收拢到一个脚本里,用户通过参数切换动作,不需要改源码。

核心源码(逐字来自文末完整源码)

def main() -> None:
    parser = argparse.ArgumentParser(
        description="本地知识库 CLI 工具",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="示例:\n  python3 knowledge_base.py --mode demo\n  python3 knowledge_base.py --mode stats",
    )
    parser.add_argument(
        "--mode",
        choices=["demo", "add", "search", "list", "stats", "export"],
        default="demo",
    )
    parser.add_argument("--keyword", help="搜索关键词(--mode search 时使用)")
    args = parser.parse_args()

    kb = KnowledgeBase()  # 内存数据库,零副作用

    # 预填充数据(非 demo 模式也需要数据)
    if args.mode != "demo":
        for title, content, tags in MOCK_ENTRIES:
            kb.add_entry(title, content, tags)

    if args.mode == "demo":
        demo_all(kb)
    elif args.mode == "search":
        kw = args.keyword or "python"
        results = kb.search(kw)
        print(f"\n  搜索 '{kw}' → {len(results)} 条结果:")
        print_entries(results, keyword=kw)
    elif args.mode == "list":
        entries = kb.list_entries()
        print_entries(entries)
    elif args.mode == "stats":
        print_stats(kb.stats())
    elif args.mode == "export":
        with tempfile.TemporaryDirectory() as tmp:
            p = Path(tmp) / "export.json"
            n = kb.export_json(p)
            print(f"  导出 {n} 条到 {p}")
            print(p.read_text()[:200])

可运行演示(补齐 Mock 数据与 print 反馈)

import argparse
import json
import re
import sqlite3
import tempfile
import time
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional

def main() -> None:
    parser = argparse.ArgumentParser(
        description="本地知识库 CLI 工具",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="示例:\n  python3 knowledge_base.py --mode demo\n  python3 knowledge_base.py --mode stats",
    )
    parser.add_argument(
        "--mode",
        choices=["demo", "add", "search", "list", "stats", "export"],
        default="demo",
    )
    parser.add_argument("--keyword", help="搜索关键词(--mode search 时使用)")
    args = parser.parse_args()

    kb = KnowledgeBase()  # 内存数据库,零副作用

    # 预填充数据(非 demo 模式也需要数据)
    if args.mode != "demo":
        for title, content, tags in MOCK_ENTRIES:
            kb.add_entry(title, content, tags)

    if args.mode == "demo":
        demo_all(kb)
    elif args.mode == "search":
        kw = args.keyword or "python"
        results = kb.search(kw)
        print(f"\n  搜索 '{kw}' → {len(results)} 条结果:")
        print_entries(results, keyword=kw)
    elif args.mode == "list":
        entries = kb.list_entries()
        print_entries(entries)
    elif args.mode == "stats":
        print_stats(kb.stats())
    elif args.mode == "export":
        with tempfile.TemporaryDirectory() as tmp:
            p = Path(tmp) / "export.json"
            n = kb.export_json(p)
            print(f"  导出 {n} 条到 {p}")
            print(p.read_text()[:200])

class KnowledgeBase:
    def __init__(self) -> None:
        self.items = []
    def add_entry(self, title: str, content: str, tags: list[str]) -> int:
        self.items.append((title, content, tags)); return len(self.items)
    def search(self, keyword: str):
        return []
    def list_entries(self):
        return []
    def stats(self):
        return {"total": len(self.items), "tag_counts": {}}
    def export_json(self, path: Path) -> int:
        path.write_text("[]", encoding="utf-8"); return 0

def demo_all(kb: KnowledgeBase) -> None:
    print("运行 demo")

def print_entries(entries, keyword: str = "") -> None:
    print("打印条目", len(entries))

def print_stats(stats: dict) -> None:
    print("打印统计", stats)

MOCK_ENTRIES = [("Python", "content", ["python"])]

import sys
for mode in ["demo", "search", "list", "stats", "export"]:
    print(f"\n$ python knowledge_base.py --mode {mode}")
    sys.argv = ["knowledge_base.py", "--mode", mode]
    if mode == "search":
        sys.argv += ["--keyword", "python"]
    main()

极客实战:完整源码与运行

现在,把上面的积木拼起来,将下面完整代码保存为 knowledge_base.py。默认 --mode demo 会在内存数据库里跑完整流程,不会污染你的本地文件;导出模式使用临时目录完成演示。


"""
本地知识库 CLI 工具 —— 全栈综合实战。
用法:
    python3 knowledge_base.py --mode demo    # 完整演示
    python3 knowledge_base.py --mode add     # 添加条目
    python3 knowledge_base.py --mode search  # 搜索
    python3 knowledge_base.py --mode stats   # 统计
    python3 knowledge_base.py --mode export  # 导出
"""

import argparse
import json
import re
import sqlite3
import tempfile
import time
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional


# ── 数据模型 ──────────────────────────────────────────────────
@dataclass
class Entry:
    title: str
    content: str
    tags: list[str] = field(default_factory=list)
    id: int = 0
    created_at: str = field(default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M"))

    @property
    def tag_str(self) -> str:
        return ", ".join(self.tags)

    @classmethod
    def from_row(cls, row: tuple) -> "Entry":
        id_, title, content, tags_json, created_at = row
        return cls(
            id=id_,
            title=title,
            content=content,
            tags=json.loads(tags_json) if tags_json else [],
            created_at=created_at,
        )


# ── 数据库层 ──────────────────────────────────────────────────
class KnowledgeDB:
    def __init__(self, db_path: str = ":memory:") -> None:
        self._conn = sqlite3.connect(db_path)
        self._init()

    def _init(self) -> None:
        self._conn.execute("""
            CREATE TABLE IF NOT EXISTS entries (
                id         INTEGER PRIMARY KEY AUTOINCREMENT,
                title      TEXT NOT NULL,
                content    TEXT NOT NULL,
                tags       TEXT DEFAULT '[]',
                created_at TEXT DEFAULT (datetime('now','localtime'))
            )
        """)
        self._conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_title ON entries(title)"
        )
        self._conn.commit()

    def add(self, entry: Entry) -> int:
        cur = self._conn.execute(
            "INSERT INTO entries (title, content, tags) VALUES (?, ?, ?)",
            (entry.title, entry.content, json.dumps(entry.tags, ensure_ascii=False)),
        )
        self._conn.commit()
        return cur.lastrowid or 0

    def search(self, keyword: str) -> list[Entry]:
        rows = self._conn.execute(
            "SELECT id, title, content, tags, created_at FROM entries "
            "WHERE title LIKE ? OR content LIKE ? OR tags LIKE ?",
            (f"%{keyword}%",) * 3,
        ).fetchall()
        return [Entry.from_row(r) for r in rows]

    def list_all(self, limit: int = 20, offset: int = 0) -> list[Entry]:
        rows = self._conn.execute(
            "SELECT id, title, content, tags, created_at FROM entries "
            "ORDER BY id DESC LIMIT ? OFFSET ?",
            (limit, offset),
        ).fetchall()
        return [Entry.from_row(r) for r in rows]

    def count(self) -> int:
        return self._conn.execute("SELECT COUNT(*) FROM entries").fetchone()[0]

    def all_tags(self) -> list[str]:
        rows = self._conn.execute("SELECT tags FROM entries").fetchall()
        tags = []
        for (tags_json,) in rows:
            tags.extend(json.loads(tags_json) if tags_json else [])
        return tags

    def close(self) -> None:
        self._conn.close()


# ── 业务层 ────────────────────────────────────────────────────
class KnowledgeBase:
    def __init__(self, db_path: str = ":memory:") -> None:
        self._db = KnowledgeDB(db_path)

    def add_entry(self, title: str, content: str, tags: list[str]) -> int:
        entry = Entry(title=title, content=content, tags=tags)
        eid = self._db.add(entry)
        return eid

    def search(self, keyword: str) -> list[Entry]:
        return self._db.search(keyword)

    def list_entries(self, limit: int = 10) -> list[Entry]:
        return self._db.list_all(limit=limit)

    def export_json(self, path: Path) -> int:
        entries = self._db.list_all(limit=10000)
        data = [
            {"id": e.id, "title": e.title, "content": e.content,
             "tags": e.tags, "created_at": e.created_at}
            for e in entries
        ]
        path.write_text(json.dumps(data, ensure_ascii=False, indent=2))
        return len(data)

    def export_markdown(self, path: Path) -> int:
        entries = self._db.list_all(limit=10000)
        lines = ["# 知识库导出\n", f"导出时间:{datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n"]
        for e in entries:
            lines.append(f"## {e.title}\n\n")
            lines.append(f"**标签**:{e.tag_str or '无'}  \n")
            lines.append(f"**时间**:{e.created_at}\n\n")
            lines.append(f"{e.content}\n\n---\n\n")
        path.write_text("".join(lines), encoding="utf-8")
        return len(entries)

    def stats(self) -> dict:
        total = self._db.count()
        all_tags = self._db.all_tags()
        tag_counts = Counter(all_tags)
        return {"total": total, "tag_counts": dict(tag_counts.most_common(10))}


# ── 输出格式化 ────────────────────────────────────────────────
def print_entries(entries: list[Entry], keyword: str = "") -> None:
    if not entries:
        print("  (无结果)")
        return

    print(f"\n  {'ID':<4} {'标题':<25} {'标签':<20} {'时间'}")
    print(f"  {'─'*4} {'─'*25} {'─'*20} {'─'*16}")
    for e in entries:
        title = e.title[:23] + ".." if len(e.title) > 25 else e.title
        tags = e.tag_str[:18] + ".." if len(e.tag_str) > 20 else e.tag_str
        # 高亮关键词
        if keyword and keyword.lower() in title.lower():
            title = f"*{title}*"
        print(f"  {e.id:<4} {title:<25} {tags:<20} {e.created_at[:16]}")


def print_stats(stats: dict) -> None:
    print(f"\n  ── 知识库统计 ────────────────────────────")
    print(f"  总条目数: {stats['total']}")
    if stats["tag_counts"]:
        print(f"\n  标签分布 Top 10:")
        max_count = max(stats["tag_counts"].values())
        for tag, count in stats["tag_counts"].items():
            bar = "█" * int(count / max_count * 20)
            print(f"    {tag:<15} {count:>3}  {bar}")


# ── Mock 数据 ─────────────────────────────────────────────────
MOCK_ENTRIES = [
    ("Python 类型提示最佳实践",
     "使用 Protocol 定义接口,TypeVar 实现泛型函数,Generic[T] 构建泛型类。Python 3.12+ 支持 class Stack[T] 语法。",
     ["python", "类型系统", "进阶"]),
    ("asyncio 并发模式",
     "async/await 语法实现协程,asyncio.gather 并发执行,asyncio.Queue 实现生产者消费者模式。",
     ["python", "并发", "asyncio"]),
    ("SQLite 性能优化",
     "使用 WAL 模式提升写入性能,为高频查询字段建立索引,批量插入用 executemany,避免频繁 commit。",
     ["数据库", "sqlite", "性能"]),
    ("Docker 容器化部署",
     "Dockerfile 多阶段构建减小镜像体积,docker-compose 管理多服务,健康检查确保服务可用性。",
     ["运维", "docker", "部署"]),
    ("Redis 缓存策略",
     "Cache-Aside 模式:先查缓存,未命中再查数据库并写入缓存。设置合理 TTL 避免缓存雪崩。",
     ["redis", "缓存", "架构"]),
    ("Nginx 反向代理配置",
     "upstream 配置负载均衡,proxy_pass 转发请求,gzip 压缩响应,limit_req 限流防刷。",
     ["运维", "nginx", "部署"]),
    ("Git 工作流规范",
     "feature 分支开发,PR/MR 代码审查,squash merge 保持主干整洁,tag 标记版本发布。",
     ["工具", "git", "协作"]),
    ("机器学习特征工程",
     "标准化用 StandardScaler,归一化用 MinMaxScaler,类别编码用 LabelEncoder/OneHotEncoder,缺失值用 SimpleImputer。",
     ["机器学习", "sklearn", "数据处理"]),
]


# ── 主程序 ────────────────────────────────────────────────────
def demo_all(kb: KnowledgeBase) -> None:
    """完整演示所有功能。"""
    print("\n  ── 1. 批量添加知识条目 ───────────────────")
    for title, content, tags in MOCK_ENTRIES:
        eid = kb.add_entry(title, content, tags)
        print(f"  [#{eid:02d}] {title[:40]}")

    print("\n  ── 2. 全文搜索 ───────────────────────────")
    for kw in ["python", "部署", "缓存"]:
        results = kb.search(kw)
        print(f"\n  搜索 '{kw}' → {len(results)} 条结果:")
        print_entries(results, keyword=kw)

    print("\n  ── 3. 列出所有条目 ───────────────────────")
    entries = kb.list_entries(limit=5)
    print_entries(entries)

    print("\n  ── 4. 统计分析 ───────────────────────────")
    print_stats(kb.stats())

    print("\n  ── 5. 导出报告 ───────────────────────────")
    with tempfile.TemporaryDirectory() as tmp:
        json_path = Path(tmp) / "kb_export.json"
        md_path   = Path(tmp) / "kb_export.md"
        n_json = kb.export_json(json_path)
        n_md   = kb.export_markdown(md_path)
        print(f"  JSON 导出: {n_json} 条 → {json_path.name} ({json_path.stat().st_size} bytes)")
        print(f"  MD   导出: {n_md} 条 → {md_path.name}   ({md_path.stat().st_size} bytes)")
        # 预览 JSON 前3行
        lines = json_path.read_text().split("\n")[:4]
        print(f"\n  JSON 预览:\n    " + "\n    ".join(lines))


def main() -> None:
    parser = argparse.ArgumentParser(
        description="本地知识库 CLI 工具",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="示例:\n  python3 knowledge_base.py --mode demo\n  python3 knowledge_base.py --mode stats",
    )
    parser.add_argument(
        "--mode",
        choices=["demo", "add", "search", "list", "stats", "export"],
        default="demo",
    )
    parser.add_argument("--keyword", help="搜索关键词(--mode search 时使用)")
    args = parser.parse_args()

    kb = KnowledgeBase()  # 内存数据库,零副作用

    # 预填充数据(非 demo 模式也需要数据)
    if args.mode != "demo":
        for title, content, tags in MOCK_ENTRIES:
            kb.add_entry(title, content, tags)

    if args.mode == "demo":
        demo_all(kb)
    elif args.mode == "search":
        kw = args.keyword or "python"
        results = kb.search(kw)
        print(f"\n  搜索 '{kw}' → {len(results)} 条结果:")
        print_entries(results, keyword=kw)
    elif args.mode == "list":
        entries = kb.list_entries()
        print_entries(entries)
    elif args.mode == "stats":
        print_stats(kb.stats())
    elif args.mode == "export":
        with tempfile.TemporaryDirectory() as tmp:
            p = Path(tmp) / "export.json"
            n = kb.export_json(p)
            print(f"  导出 {n} 条到 {p}")
            print(p.read_text()[:200])


if __name__ == "__main__":
    main()
$ python knowledge_base.py --mode demo
── 1. 批量添加知识条目 ───────────────────
  [#01] Python 类型提示最佳实践
  [#02] asyncio 并发模式
  [#03] SQLite 性能优化
  [#04] Docker 容器化部署
  [#05] Redis 缓存策略
  [#06] Nginx 反向代理配置
  [#07] Git 工作流规范
  [#08] 机器学习特征工程

  ── 2. 全文搜索 ───────────────────────────

  搜索 'python'2 条结果:

  ID   标题                        标签                   时间
  ──── ───────────────────────── ──────────────────── ────────────────
  1    *Python 类型提示最佳实践*         python, 类型系统, 进阶     2026-04-18 15:33
  2    asyncio 并发模式              python, 并发, asyncio  2026-04-18 15:33

  搜索 '部署'2 条结果:

  ID   标题                        标签                   时间
  ──── ───────────────────────── ──────────────────── ────────────────
  4    *Docker 容器化部署*            运维, docker, 部署       2026-04-18 15:33
  6    Nginx 反向代理配置              运维, nginx, 部署        2026-04-18 15:33

  搜索 '缓存'1 条结果:

  ID   标题                        标签                   时间
  ──── ───────────────────────── ──────────────────── ────────────────
  5    *Redis 缓存策略*              redis, 缓存, 架构        2026-04-18 15:33

  ── 3. 列出所有条目 ───────────────────────

  ID   标题                        标签                   时间
  ──── ───────────────────────── ──────────────────── ────────────────
  8    机器学习特征工程                  机器学习, sklearn, 数据处理  2026-04-18 15:33
  7    Git 工作流规范                 工具, git, 协作          2026-04-18 15:33
  6    Nginx 反向代理配置              运维, nginx, 部署        2026-04-18 15:33
  5    Redis 缓存策略                redis, 缓存, 架构        2026-04-18 15:33
  4    Docker 容器化部署              运维, docker, 部署       2026-04-18 15:33


$ python knowledge_base.py --mode stats
── 知识库统计 ────────────────────────────
  总条目数: 8

  标签分布 Top 10:
    python            2  ████████████████████
    运维                2  ████████████████████
    部署                2  ████████████████████
    类型系统              1  ██████████
    进阶                1  ██████████
    并发                1  ██████████
    asyncio           1  ██████████
    数据库               1  ██████████
    sqlite            1  ██████████
    性能                1  ██████████

$ python knowledge_base.py --mode search --keyword python
搜索 'python'2 条结果:

  ID   标题                        标签                   时间
  ──── ───────────────────────── ──────────────────── ────────────────
  1    *Python 类型提示最佳实践*         python, 类型系统, 进阶     2026-04-18 15:33
  2    asyncio 并发模式              python, 并发, asyncio  2026-04-18 15:33

$ python knowledge_base.py --mode export
导出 8 条到 /var/folders/8x/17r6wk3947s1dgn0_jj071dr0000gn/T/tmpmi9jp593/export.json
[
  {
    "id": 8,
    "title": "机器学习特征工程",
    "content": "标准化用 StandardScaler,归一化用 MinMaxScaler,类别编码用 LabelEncoder/OneHotEncoder,缺失值用 SimpleImputer。",
    "tags": [
      "机器学习",
      "sklearn",

小结与 NexDo Time ⚡

这一篇把整个 Python 教程系列收束成一个真实 CLI 工具:用 dataclass 定义数据,用 SQLite 保存数据,用业务层隔离 SQL,用 argparse 暴露命令,用 JSON/Markdown 完成导出。你现在看到的不是零散语法,而是一套能继续扩展的工程骨架。

5 分钟微操挑战:给 CLI 增加一个 --mode tag,通过 --keyword python 列出包含指定标签的所有条目。提示:可以先在 KnowledgeDB 里复用 tags LIKE ? 查询。

Don’t wait for next time, do it in the next moment.