你醒来后打开笔记本电脑,发现浏览器中打开了27个标签页,收件箱里堆满了未读的新闻邮件,会议记录分散在三个不同的应用程序中。这种情况听起来是否很熟悉?

现在想象一下,如果你有一支由专业助手组成的团队,他们整夜都在为你工作:一个人负责阅读你输入的信息,一个人负责总结其中的关键要点,一个人负责判断哪些内容最为重要,还有一个人负责将所有这些信息整理成一份整洁的每日摘要,直接发送到你的收件箱里。

这本手册正是指导你构建这样一个系统的。你会创建一个多智能体AI系统,在这个系统中,四个基于Python的智能体会各自承担一项任务。你会使用Docker为每个智能体创建容器,这样整个系统就能在任何机器上可靠地运行。同时,你会通过Docker Compose将所有组件连接起来,从而能够通过一个命令就启动整个系统流程。

这本手册假设你已经熟悉阅读Python代码,但并不要求你之前一定使用过Docker。如果你从未编写过Dockerfile或运行过容器,也没有关系——在学习过程中我们会逐步讲解这些基础知识。

最终,你会拥有一个能够将各种数字信息转化为有条理的每日摘要的系统,并且你能充分理解其背后的工作原理,从而将其应用到自己的项目中去。

目录

  • 什么是多智能体系统(以及为什么需要构建它)?

  • 什么是Docker(以及它在这里为何如此重要)?

  • 如何规划系统架构

  • 先决条件与环境配置

  • 如何一步步构建每个智能体

  • 如何处理敏感信息与API密钥

    什么是多智能体系统(以及为什么要构建这样的系统?)

    传统脚本的工作原理

    传统的Python脚本会按照固定的流程运行。它读取输入数据,通过一系列预先编写好的步骤进行处理,然后生成输出结果。如果输入数据的格式发生哪怕是很小的变化,脚本往往就会出错。可以把这种机制想象成在铁轨上运行的火车——火车速度很快且效率很高,但它们只能沿着铁轨前进;如果轨道被堵塞了,火车就会停下来。

    人工智能智能体的不同之处

    人工智能智能体更像是公交车司机。它也有一个目标,但可以根据当前的情况来决定选择哪条路线。如果某条路被堵住了,它会另寻其他路径。

    通常情况下,这些智能体会遵循一种被称为“ReAct模式”的运行流程,即先进行推理,然后再采取行动。在每一步中,智能体会思考应该做什么、执行相应的操作、观察结果,然后判断是否已经达到了目标。如果没有达到目标,它会重新开始这个过程;如果成功达到了目标,就会结束当前的执行流程。

    在实际应用中,这意味着基于大语言模型的智能体能够比传统脚本更好地处理那些复杂且难以预测的输入数据。例如,如果一份新闻简报改变了格式,摘要生成模块仍然可以从中提取关键信息,因为它会根据内容来进行推理,而不是机械地解析固定的结构。

    为什么使用多个智能体而不仅仅是一个?

    你可能会想:为什么不干脆使用一个功能强大的智能体来完成所有任务呢?这种设计被称为“上帝模型”模式,但实际上它存在很多问题。当你要求一个大语言模型同时完成数据采集、内容摘要生成、优先级排序以及格式化等工作时,你会让它一下子面临太多需要处理的任务。由于大语言模型的上下文处理能力和注意力范围都是有限的,因此如果你给它们安排过多的任务,它们很可能会出错,或者产生不一致的结果。

    多智能体系统通过职责分离的原则来解决这个问题。每个智能体只负责一项特定的任务:数据采集模块负责读取和合并原始文件,不需要使用大语言模型;摘要生成模块会向大语言模型发送具体的指令,要求它仅对某段文本进行总结;优先级排序模块则会根据关键词来为各项任务分配顺序,同样不需要大语言模型的帮助;而格式化模块则负责生成Markdown格式的输出结果,这些步骤也都无需借助大语言模型来完成。

    这种设计有几个明显的优势:首先,每个智能体都更易于开发、测试和调试;其次,你可以随时更换摘要生成模块,使用性能更好的模型来替代它,而不会影响到其他部分的运行;最后,你还可以独立地对各个智能体进行扩展——比如,当输入数据量很大时,可以同时运行多个摘要生成模块来提高处理效率。

    什么是Docker(以及为什么它在多智能体系统中如此重要?)

    环境配置问题

    如果你曾经与他人共享过一个Python项目,却听到对方说“在我的机器上无法正常运行”,那么你就已经理解了Docker所能解决的问题。每一个Python项目都会依赖于特定版本的Python本身,以及诸如openairequestsbeautifulsoup4这样的第三方库。这些依赖关系都存储在操作系统的环境配置中。因此,当你安装了一个新的库或者升级了Python版本时,就很有可能会影响到那些依赖于旧版本的项目。

    虚拟环境确实能起到帮助作用,但它们只能隔离Python包,而无法隔离操作系统、系统库或你的代码可能需要的其他工具。此外,这些虚拟环境也不能保证其他人能够完全复现你的开发环境。对于多智能体系统来说,这个问题会更加严重——因为每个智能体可能都需要不同的依赖项,如果它们共享同一个环境,这些依赖项就很有可能发生冲突。

    Docker是如何解决这个问题的

    Docker会将你的代码、其依赖项以及一个最基本的操作系统打包成一个称为容器的独立单元。当你运行这个容器时,无论它是在你的笔记本电脑上、同事的电脑上还是云服务器上运行,它的表现都完全一致。可以把Docker容器想象成用于软件运输的集装箱——里面的内容被密封起来,从而与外部环境隔离开来。

    有几个关键的Docker概念需要了解:

    镜像——一种只读模板,其中包含了你的代码、依赖项以及最基本的操作系统。你可以通过Dockerfile来构建这个镜像,可以把Dockerfile看作是一份制作软件的“食谱”。

    容器——镜像运行后的实例。当你“运行”一个镜像时,Docker会据此创建一个容器。可以把容器想象成根据这份“食谱”制作出来的成品。

    Dockerfile——一份包含构建镜像所需指令的文本文件。它指定了基础操作系统、需要安装的软件包、要复制到容器中的代码,以及容器启动时需要执行的命令。

    ——一种在你的计算机与容器之间,或者在多个容器之间共享文件的方式。我们的智能体会使用共享卷来互相传递数据。

    Docker Compose——一个用于定义并同时运行多个容器的工具。你可以在一个YAML文件中描述所有容器,Docker Compose会负责处理这些容器的构建、网络配置以及启动顺序等问题。

    Docker层的工作原理

    Docker是通过逐层构建的方式来创建镜像的。Dockerfile中的每一条指令都会生成一层新的镜像层。Docker会缓存这些层,因此如果某层自上次构建以来没有发生变化,Docker就会直接使用缓存的版本而不会重新构建它。这就是为什么Dockerfile必须按照特定的顺序编写:基础操作系统层很少会改变,当requirements.txt文件中的依赖项列表发生变化时,对应的依赖项安装层才会被更新,而应用程序代码层则会在每次修改代码后都被重新构建。由于将依赖项的安装步骤放在复制代码之前,因此只有当实际需要的依赖项发生变化时,Docker才会重新执行pip install命令,这样重建过程的速度就会大大加快——通常只需要几秒钟而已。

    使用Docker与不使用Docker的区别

    需要明确的是,对于本教程来说,你并不一定非得使用Docker。你可以将这四个智能体都作为普通的Python脚本来运行。但是,如果不使用Docker,就会遇到以下问题:由于所有智能体共享同一个环境,依赖项之间很容易发生冲突;在进行扩展操作时必须手动管理各种流程;每在新的机器上运行这些程序时都需要重新进行所有的配置设置;进行测试时需要复杂的协调工作;而且当某个智能体需要Python 3.8版本而另一个智能体需要Python 3.10版本时,管理不同版本的Python会变得非常麻烦。而使用Docker的话,每个智能体都可以拥有自己独立的隔离环境,你只需要用一条命令docker compose up就能同时运行多个容器,而且每个容器都会独立地运行自己所指定的Python版本,从而大大简化了开发流程并提高了效率。

    对于个人项目来说,这两种方法都可以使用。但如果你想要分享这个系统、将其部署到服务器上,或者在云端运行它,那么Docker就能让你从“这里有一份包含15个设置步骤的README文件”转变为“只需运行docker compose up即可”。

    如何规划系统架构

    在编写任何代码之前,先弄清楚各个组件是如何协同工作的才是有意义的。整个系统由四个代理组成,它们按顺序排列成一条处理流程,而这一切都由Docker Compose来协调管理。数据会依次经过采集代理、摘要生成代理、优先级排序代理和格式化代理进行处理。每个代理都会从共享存储空间中读取数据、处理输入内容、生成结果,然后结束自己的运行。Docker Compose会确保各个容器按顺序执行:只有当前容器成功运行完成后,下一个容器才会开始启动。

    这是一种同步的处理流程:各个代理会依次逐个运行。这是实现和理解起来最简单的多代理架构模式。对于更复杂的系统,你可以用Redis或RabbitMQ这样的消息队列来替代共享存储空间,这样代理们就可以异步运行并响应各种事件。但对于这种用于每日摘要生成的场景来说,顺序执行的方式已经完全足够了。

    就各代理的功能而言:

    • 采集代理 — 从/data/input/目录中读取原始文件,并将它们合并到ingested.txt文件中。这个代理不需要使用大型语言模型。

    • 摘要生成代理 — 从ingested.txt中提取关键信息,生成summary.txt文件。这是唯一一个需要使用大型语言模型的代理。

    • 优先级排序代理

      — 根据各项内容的紧急程度为它们分配优先级,从而生成prioritized.txt文件。这个代理也不需要使用大型语言模型。

    • 格式化代理

      — 最终生成Markdown格式的报告文件daily_digest.md。这个代理同样不需要使用大型语言模型。

    需要注意的是,在这四个代理中,只有一个是真正使用了大型语言模型的。其余的代理都是用纯Python编写的。这种设计是经过深思熟虑的:只有当确实需要运用推理能力或语言理解功能时,才应该使用大型语言模型;其他所有环节都应该使用确定性的代码来实现,因为这样成本更低、运行速度更快,而且结果也更容易预测。

    先决条件与环境配置

    在开始开发之前,你需要安装以下工具:

    • Python 3.10或更高版本 — 用于编写各个代理程序的语言

    • Docker Desktop(引擎版本20.10及以上)— 容器运行环境

    • Docker Compose v2 — 多容器编排工具(随Docker Desktop一起提供)

    • Git 2.30或更高版本 — 版本控制工具

    • OpenAI Python SDK(版本openai >= 1.0)— 用于访问大型语言模型API

    • Redis或RabbitMQ(可选)— 异步消息队列工具

    • PostgreSQL(可选)— 持久化数据存储系统

    如何安装Python

    你可以在python.org网站下载Python。在Windows系统中,安装过程中请勾选“将Python添加到PATH环境变量中”这个选项;在macOS系统上,你可以使用Homebrew来安装Python:

    brew install python@3.12

    在 Linux(Ubuntu/Debian)系统中,可以使用包管理器来安装:

    sudo apt update && sudo apt install python3 python3-pip

    如何安装 Docker

    Docker Desktop 是在 Windows 和 macOS 上开始使用 Docker 的最简单方式。请从 docker.com 下载它,然后按照提示进行操作。在 Windows 上,Docker Desktop 需要 WSL2——安装程序会指导你如何启用它。在 Linux 上,可以直接安装 Docker Engine:

    # Ubuntu/Debian
    sudo apt update
    sudo apt install docker.io docker-compose-v2
    sudo usermod -aG docker $USER  # 这样你在执行 Docker 命令时就不需要使用 sudo 权限了
    

    安装完成后,退出系统再重新登录,这样组权限的变更才会生效。

    如何验证你的设置是否正确

    打开终端并运行以下命令。每个命令都应该能够无错误地显示相应的版本号:

    python --version        # 应该显示 3.10 或更高版本
    docker --version        # 应该显示 20.10 或更高版本
    docker compose version  # 应该显示 v2.x
    git --version           # 应该显示 2.30 或更高版本
    

    如果有任何命令无法正常执行,请返回到 해당工具的安装步骤进行检查。最常见的原因是该命令没有被添加到你的 PATH 环境变量中。

    如何设置项目结构

    每个代理程序都位于自己的目录中,其中包含它自己的代码、Dockerfile 以及依赖文件。这种隔离设计使得你可以独立地构建、测试和更新每个代理程序。请创建如下结构:

    multi-agent-digest/
    ├── agents/
    │   ├── ingestor/
    │   │   ├── app.py
    │   │   ├── Dockerfile
    │   │   └── requirements.txt
    │   ├── summarizer/
    │   │   ├── app.py
    │   │   ├── Dockerfile
    │   │   └── requirements.txt
    │   ├── prioritizer/
    │   │   ├── app.py
    │   │   ├── Dockerfile
    │   │   └── requirements.txt
    │   └── formatter/
    │       ├── app.py
    │       ├── Dockerfile
    │       └── requirements.txt
    ├── data/
    │   └── input/          # 原始数据文件存放于此
    ├── output/              # 最终处理结果保存于此
    ├── tests/               # 单元测试和集成测试代码
    ├── .env                 # API 密钥(git 会忽略此文件!)
    ├── .gitignore
    ├── docker-compose.yml
    └── README.md
    

    你可以通过终端快速创建这些文件夹:

    mkdir -p multi-agent-digest/agents/{ingestor,summarizer,prioritizer,formatter}
    mkdir -p multi-agent-digest/{data/input,output,tests}
    cd multi-agent-digest
    

    如何逐步构建每个代理程序

    每个代理程序都遵循相同的简单流程:从共享目录中读取输入文件,完成相应的处理任务,然后生成输出文件。这种一致性使得整个系统更易于理解和扩展。

    Ingestor 代理程序

    Ingestor 是整个处理流程的入口点。它的作用是从输入文件夹中读取所有文本文件,并将它们合并成一个文件,以便后续的 Summarizer 程序进行处理。这是最简单的代理程序——不需要任何外部库或 API 调用,只需要进行文件的读写操作即可。

    agents/ingestor/app.py

    import os
    import logging
    
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    )
    logger = logging.getLogger("ingestor")
    
    INPUT_DIR = "/data/input"
    OUTPUT_FILE = "/data/ingested.txt"
    
    def ingest():
        content = ""
        files_processed = 0
        for filename in sorted(os.listdir(INPUT_DIR)):
            filepath = os.path.join INPUT_DIR, filename)
            if os.path.isfile(filepath):
                try:
                    with open(filepath, "r", encoding="utf-8") as f:
                        content += f"\n--- {filename} ---\n"
                        content += f.read()
                        content += "\n"
                        filesprocessed += 1
                except Exception as e:
                    logger.error(f"无法读取文件 {filename}: {e}")
    
        if files_processed == 0:
            logger.warning("在 /data/input/ 中未找到任何输入文件")
    
        with open(OUTPUT_FILE, "w", encoding="utf-8") as out:
            out.write(content)
        logger.info(f>已处理了 {filesprocessed} 个文件,输出文件为 {OUTPUT_FILE}

    logging.basicConfig 这段代码用于设置结构化的日志记录格式。所有代理都使用相同的日志格式,因此当 Docker Compose 将它们一起运行时,就能得到清晰、一致的时间线记录。sorted(os.listdir()) 这个函数确保文件会按照字母顺序被处理——如果没有这个步骤,文件的处理顺序就会取决于文件系统,而且不同机器上的处理顺序也可能有所不同。try/except 代码块的作用是确保即使某个文件损坏了,也不会导致整个流程崩溃。如果根本找不到任何文件,代理也会生成一个空文件,而不会出现错误,这样后续的代理就能正常处理这些空文件。

    agents/ingestor/Dockerfile

    FROM python:3.10-slim
    WORKDIR /app
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt
    COPY app.py .
    CMD ["python", "app.py"]
    

    FROM python:3.10-slim 这一行代码指定了使用一个预装了 Python 的最小化 Linux 镜像。-slim 这个版本的大小约为 120 MB,而完整的镜像大小为 900 MB。WORKDIR /app 设置了容器内的工作目录。COPY requirements.txtRUN pip install 这些命令用于在构建阶段处理依赖关系,而不是在运行时处理。COPY app.py 最后才复制应用程序代码,因为这个文件的变化频率最高,而 Docker 会缓存之前的镜像层。CMD 指定了容器启动时要执行的命令。

    由于 Ingestor 仅使用标准库模块,因此它的 requirements.txt 文件可以为空:

    # 不需要任何外部依赖关系

    汇总代理

    汇总代理是整个流程中最复杂的代理。它会读取被处理的文本,并调用大型语言模型 API 来生成简洁的摘要。这是唯一一个需要进行网络请求的代理,因此它也可能会因为外部因素而出现故障:例如 API 可能会关闭,或者你会遇到速率限制,又或者你的访问密钥无效。

    agents/summarizer/app.py:

    import os
    import logging
    import time
    from openai import OpenAI, RateLimitError, APIError
    
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    )
    logger = logging.getLogger("summarizer")
    
    INPUT_FILE = "/data/ingested.txt"
    OUTPUT_FILE = "/data/summary.txt"
    
    client = OpenAI()  # 从环境变量中读取 OPENAI_API_KEY
    
    SYSTEM_PROMPT = (
        "你是一个乐于助人的助手,能够将长文本总结成关键的要点。每个要点都应是一句简洁的话,能够捕捉到核心信息。"
    )
    
    MAX_RETRIES = 3
    RETRY_DELAY = 5  # 秒
    
    def summarize(text, retries=MAX_REtries):
        """使用重试机制调用 LLM API,以避免遇到速率限制问题。"""
        for attempt in range(retries):
            try:
                response = client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "system", "content": SYSTEM_PROMPT},
                        {"role": "user", "content": text[:8000]}
                    ],
                    max_tokens=1000,
                    temperature=0.3,
                )
                return response.choices[0].message.content
            except RateLimitError:
                wait = RETRY_DELAY * (attempt + 1)
                logger.warning(f"遇到速率限制。将在 {wait} 秒后重新尝试...")
                time.sleep(wait)
            except APIError as e:
                logger.error(f"API 错误:{e}")
                raise
        raise RuntimeError("调用 LLM API 的重试次数已达到上限。")
    
    def main():
        with open INPUT_FILE, "r", encoding="utf-8") as f:
            raw_text = f.read()
    
        if not raw_text.strip():
            logger.warning("输入内容为空。将生成默认的总结信息。」
            summary = "没有内容可以总结。"
        else:
            try:
                summary = summarize(raw_text)
            except Exception as e:
                logger.error(f"总结失败:{e}")
                summary = f"总结失败:{e}"
    
        with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
            f.write(summary)
        logger.info(f"总结信息已写入 {OUTPUT_FILE} 文件。")
    
    if __name__ == "__main__":
        main()
    

    OpenAI() 客户端会自动从环境变量中读取 OPENAI_API_KEY —— 因此你无需在代码中显式地传递这个密钥,这样既更简洁,也更安全。text[:8000] 这一代码片段限制了发送给 API 的文本长度。发送较少的数据意味着响应速度会更快,成本也会更低。在实际生产环境中,你可能会采用更加智能的分块方式,比如根据句子或段落的边界来划分文本,而不是简单地计算原始字符的数量。

    温度参数设置为 0.3 可以使输出内容更加集中、更具确定性,这对于进行总结操作来说非常理想。重试机制专门用于处理 RateLimitError 类型的错误,并且每次重试的间隔时间会逐渐增加(分别为 5 秒、10 秒和 15 秒)——这种机制被称为指数级退避策略。其他类型的 API 错误则会立即引发异常,因为再次尝试这些错误并不会带来任何帮助。如果输入内容为空,或者 API 完全失败,该代理程序会生成默认的总结信息,而不会导致系统崩溃,这样后续的其他代理程序仍然可以正常运行。

    agents/summarizer/requirements.txt:

    openai>=1.0
    

    Dockerfile与Ingestor的Dockerfile完全相同。

    优先级排序代理

    该代理会获取由大语言模型生成的摘要,并根据关键词的紧急程度为每一行打分。这是一个基于规则的代理程序,无需调用大语言模型;它运行速度快、结果确定性强,而且完全免费。

    agents/prioritizer/app.py:

    import os
    import logging
    
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    )
    logger = logging.getLogger("prioritizer")
    
    INPUT_FILE = "/data/summary.txt"
    OUTPUT_FILE = "/data/prioritized.txt"
    
    PRIORITY_keywords = [
        "urgent", "today", "asap", "important",
        "deadline", "critical", "action required"
    ]
    
    def score_line(line):
        """计算某一行中包含多少个优先级关键词。”“
        lower = line.lower()
        return sum(1 for kw in PRIORITY_KEYWORDS if kw in lower)
    
    def prioritize():
        with open INPUT_FILE, "r", encoding="utf-8") as f:
            lines = [line.strip() for line in f if line.strip()]
    
        scored = [(line, score_line(line)) for line in lines]
        scored.sort(key=lambda x: x[1], reverse=True)
    
        with open(OUTPUT_FILE, "w", encoding="utf-8") as out:
            for line, score in scored:
                out.write(f"[{score}] {line}\n")
    
        logger.info(f"已对{len(scored)}条内容进行了优先级排序,结果保存至{OUTPUT_FILE}文件中")
    
    if __name__ == "__main__":
        prioritize()
    

    这个评分函数会计算每行中包含多少个优先级关键词。例如,一行如果包含“urgent deadline”,那么它的得分就是2;而如果一行完全不包含任何优先级关键词,那么它的得分就是0。所有被评分的行会按照得分降序排列,这样最紧急的任务就会排在最前面。每一行的开头都会加上其得分,格式如下:[2] Urgent: quarterly report due today。在更高级的系统中,可以用基于大语言模型的排序算法来替代这个简单的关键词匹配机制,但对于每日摘要的生成来说,这种简单的匹配方法已经足够使用了。

    由于这个代理程序没有任何依赖关系,因此Dockerfile中省略了配置依赖项的步骤:

    agents/prioritizer/Dockerfile:

    FROM python:3.10-slim
    WORKDIR /app
    COPY app.py .
    CMD ["python", "app.py"]
    

    格式化代理

    格式化代理是整个处理流程中的最后一个环节。它会读取那些已经过优先级排序的文本行,然后将它们转换成格式规范的Markdown文档,并保存到指定的输出目录中。

    agents/formatter/app.py:

    import os
    import logging
    from datetime import datetime
    
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    )
    logger = logging.getLogger("formatter")
    
    INPUT_FILE = "/data/prioritized.txt"
    OUTPUT_FILE = "/output/daily_digest.md"
    
    def format_to_markdown():
        with open INPUT_FILE, "r", encoding="utf-8") as f:
            lines = [line.strip() for line in f if line.strip()]
    
        today = datetime.now().strftime('%Y-%m-%d')
    
        with open(OUTPUT_FILE, "w", encoding="utf-8") as out:
            out.write("# 你的每日AI摘要\n\n")
            out.write(f"**日期:** {today}\n\n")
            out.write("## 核心要点\n\n")
            for line in lines:
                if '] ' in line:
                    score = line.split(']')[0][1:]
                    content = line.split('] ', 1)[1]
                    out.write(f"- **优先级:{score}**: {content}\n")
                else:
                    out.write(f "- {line}\n")
    
        logger.info(f"摘要文件已保存至{OUTPUT_FILE}路径中")
    
    if __name__ == "__main__":
        format_to_markdown()
    

    请注意,格式化工具会将数据写入/output目录,而不是/data目录。在Docker Compose中,/data目录是一个用于各代理程序之间进行通信的内部存储空间,而/output目录则映射到宿主机器上的一个文件夹中,您可以通过该文件夹获取最终的处理结果。使用`split(‘] ‘, 1)`并结合`maxsplit=1`这个参数,可以确保实际数据中的括号字符不会影响解析过程。

    Dockerfile的内容与Prioritizer版本的Dockerfile相同(不包含任何外部依赖项)。

    如何处理敏感信息及API密钥

    ⚠️ 警告:切勿将API密钥或敏感信息提交到版本控制系统中。如果OpenAI的API密钥泄露,可能会在您察觉之前导致数千美元的费用损失。

    在开发环境中使用.env文件

    在项目根目录下创建一个.env文件:

    # .env -- 请勿将此文件提交到版本控制系统中
    OPENAI_API_KEY=sk-your-key-here
    

    然后立即将其添加到.gitignore文件中:

    # .gitignore
    .env
    output/
    data/ingested.txt
    data/summary.txt
    data/prioritized.txt
    __pycache__/
    *.pyc
    

    Docker Compose在启动时会自动读取.env文件。在docker-compose.yml文件中,您可以通过`${OPENAI_API_KEY}`来引用这个变量,Docker Compose会在运行时替换为实际的密钥值。因此,这个密钥永远不会出现在Dockerfile、您的代码或版本控制历史记录中。

    在生产环境中使用Docker Secrets

    对于在Docker Swarm或Kubernetes上进行的生产环境部署,环境变量会显示在进程列表和检查命令中。而使用Docker Secrets则更加安全:

    # 创建秘密文件
    echo "sk-your-key-here" | docker secret create openai_key -
    
    # 在docker-compose.yml文件中引用该秘密文件(仅适用于Swarm模式)
    services:
      summarizer:
        secrets:
          - openai_key
    
    secrets:
      openai_key:
        external: true
    

    创建的秘密文件会以只读模式挂载到容器内的/run/secrets/openai_key目录中。您的代码会从这个文件中读取密钥值,而不是从环境变量中获取它。

    如何使用Docker Compose来协调所有组件

    当这四个代理程序都被构建完成后,Docker Compose会将它们全部连接起来。它会创建相应的容器,挂载共享的存储目录,传递环境变量,并确保各组件的执行顺序是正确的。

    docker-compose.yml文件内容如下:

    version: "3.9"
    
    services:
      ingestor:
        build: ./agents/ingestor
        container_name: agent_ingestor
        volumes:
          - ./data:/data
        restart: "no"
    
      summarizer:
        build: ./agents/summarizer
        container_name: agent_summarizer
        environment:
          - OPENAI_API_KEY=${OPENAI_API_KEY}
        depends_on:
          ingestor:
            condition: service_completed_successfully
        volumes:
          - ./data:/data
        deploy:
          resources:
            limits:
              memory: 512M
        restart: "no"
    
      prioritizer:
        build: ./agents/prioritizer
        container_name: agent_prioritizer
        depends_on:
          summarizer:
            condition: service_completed_successfully
        volumes:
          - ./data:/data
        restart: "no"
    
      formatter:
        build: ./agents/formatter
        container_name: agentFormatter
        depends_on:
          prioritizer:
            condition: service_completedsuccessfully
        volumes:
          - ./data:/data
          - ./output:/output
        restart: "no"
    

    depends_on中的condition: service_completed_successfully设置是确保管道按顺序执行的关键。这一设置在Compose v2中可用,它告诉Docker在启动下一个容器之前,必须等待前一个容器以零退出码结束运行。如果没有这个条件,depends_on只会等待容器启动,而不会等待其完成运行——这样一来,就可能会出现竞争条件,导致Summarizer试图读取Ingestor尚未写入的文件。

    卷挂载操作(./data:/data)会将你的本地数据文件夹映射到每个容器中。所有代理程序都会共享这个卷,从而能够互相传递文件。Formatter也会获得./output:/output这个挂载点,因此最终的处理结果会保存在宿主机上。Summarizer被设置为512M的内存限制,这样可以防止它占用过多系统资源。restart: "no"这一设置确保了Docker在代理程序运行完成后不会自动重启它们,因为这些其实都是批处理作业。

    如何运行这个管道

    docker compose up --build
    

    --build选项会告诉Compose在运行之前重新构建所有镜像。你会看到各个代理程序依次输出的日志信息:

    agent_ingestor    | 2025-01-20 07:00:01 [INFO] ingestor: 已接收3份文件
    agent_summarizer  | 2025-01-20 07:00:04 [INFO] summarizer: 已生成总结报告
    agent_prioritizer | 2025-01-20 07:00:05 [INFO] prioritizer: 已对8项内容进行了优先级排序
    agentformatter   | 2025-01-20 07:00:05 [INFO] formatter: 已生成最终处理结果
    

    当这四个容器全部完成运行后,打开output/daily_digest.md文件就能查看当天的汇总报告了。

    如何测试这个管道

    单元测试

    由于每个代理程序的核心逻辑都是用Python编写的简单函数,因此你可以在不使用Docker的情况下单独对这些函数进行测试。

    tests/test_prioritizer.py

    import sys
    sys.path.insert(0, 'agents/prioritizer')
    from app import score_line
    
    def test_urgent_keyword_scores_one():
        assert score_line("This is urgent") == 1
    
    def test_multiple_keywords_stack():
        assert score_line("Urgent and important deadline") == 3
    
    def test_no_keywords_scores_zero():
        assert score_line("Regular project update") == 0
    
    def test_scoring_is_case_insensitive():
        assert score_line("URGENT DEADLINE ASAP") == 3
    

    使用pytest来运行这些测试:

    pip install pytest
    python -m pytest tests/ -v
    

    为每个代理程序的核心功能编写测试用例,可以在构建任何Docker镜像之前就发现潜在的错误,这样相比在运行中的容器里进行调试,能节省大量时间。

    集成测试

    要端到端地测试整个管道流程,就需要创建一些已知的输入文件,并验证其输出结果是否符合预期:

    # 创建测试数据
    mkdir -p data/input
    echo "Urgent: quarterly report due today" > data/input/test.txt
    echo "Regular standup notes, no blockers" >> data/input/test.txt
    
    # 运行管道流程
    docker compose up --build
    
    # 验证输出文件是否存在且内容正确
    test -f output/daily_digest.md && echo "文件存在:通过" || echo "文件缺失:失败"
    grep -q "Priority" output/daily_digest.md && echo "内容检查:通过" || echo "内容检查:失败"
    

    如何添加日志记录与监控功能

    所有代理程序都会使用 Python 的 `logging` 模块,并采用统一的格式进行日志记录。当 Docker Compose 运行这四个容器时,它会将各个容器的日志按照容器名称前缀进行排序,从而让你能够清晰地看到整个处理流程的运行情况。

    对于生产环境而言,建议改用 JSON 格式的日志。这种格式更便于使用 ELK Stack、Grafana Loki 或 AWS CloudWatch 等日志聚合工具进行处理:

    import json
    import logging
    
    class JSONFormatterlogging.Formatter):
        def format(self, record):
            return json.dumps({
                "timestamp": self.formatTime(record),
                "level": record.levelname,
                "agent": record.name,
                "message": record.getMessage(),
            })
    

    要使用这种日志格式化器,只需将原来的 `basicConfig` 配置替换为新的处理程序即可:

    handler = logging.StreamHandler()
    handler.setFormatter(JSONFormatter())
    logger = logging.getLogger("summarizer")
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    

    需要重点关注的指标包括:每次运行时处理的文件数量、Summarizer 的响应延迟时间、用于成本核算的 LLM 令牌使用量、每个代理程序出现的错误次数及重试次数,以及是否成功生成了 `daily_digest.md` 文件。对于个人使用来说,可以在输出目录中同时生成 JSON 格式的指标文件和摘要文件;而对于团队或生产环境而言,建议添加 Prometheus 监控工具或直接将数据发送到监控平台。

    成本、速率限制与优雅降级机制

    只有 Summarizer 这个代理程序会调用付费 API。以下是相关的费用信息:

    模型名称

    输入成本

    输出成本

    每日运行成本

    gpt-4o-mini

    \(0.15 / 100万令牌

    \)0.60 / 100万令牌

    低于 \(0.01

    gpt-4o

    \)2.50 / 100万令牌

    \(10.00 / 100万令牌

    \)0.02 至 \(0.10

    本地模型(Ollama)

    免费使用(利用用户的硬件资源)

    免费

    \)0.00

    如果每天仅处理几千个令牌的输入数据,那么使用 gpt-4o-mini 每次运行的成本不足一美分,每年总计约为三美元。

    为避免意外的费用支出,可以在 OpenAI 的控制面板中设置每月的消费上限。同时,也可以设置每分钟的速率限制,以防因程序漏洞导致重复调用 API 从而造成不必要的费用消耗。

    除了 Summarizer 内置的重试机制外,还可以对 LLM 的响应结果进行缓存。这样,当相同的输入文本再次出现时,就可以直接使用之前生成的摘要结果,而无需再次调用 API。应选择成本最低且效果良好的模型来进行处理——对于摘要生成任务来说,gpt-4o-mini 通常能够与 gpt-4o 相媲美,但其成本仅为后者的几分之一。此外,尽可能地通过合并多个小请求来减少 API 调用的次数,从而降低费用。

    当API出现故障时,摘要生成工具会自动生成一条备用信息。这种处理方式属于最理想的“优雅降级”机制:流程依然可以继续运行,只不过得到的摘要信息可能会不够完整,但总比什么都没有要好。如果这些摘要信息对你的工作流程至关重要,那么你可以添加相应的警报机制——例如,可以扩展该工具的功能,使其在生成备用信息时向Slack发送通知。

    安全与隐私考量

    当你将包含个人信息的电子邮件、会议记录或私人通讯内容输入到大型语言模型中时,必须仔细考虑这些数据会被存储或用于什么目的。

    你发送给OpenAI或其他类似服务提供商的数据会离开你的设备,在他们的服务器上进行处理。截至2025年初,OpenAI的API默认并不会使用用户提交的数据来训练模型,但相关政策可能会发生变化。因此,请务必随时查看你所使用服务提供商当前的数据存储与使用规定。如果你的输入数据中包含姓名、电子邮件地址或电话号码等个人身份信息,建议在调用API之前先去除这些信息,或者使用本地的模型进行处理。

    在数据处理过程中会生成一些中间文件(如ingested.txtsummary.txtprioritized.txt),其中包含了经过处理后的原始数据。如果只是用于个人学习或调试目的,可以保留这些文件并手动删除;而对于自动化处理流程来说,应该添加清理步骤,在生成摘要信息后及时删除这些中间文件。如果你在欧盟地区开展相关业务,还需要遵守GDPR关于数据最小化、用户删除权以及数据处理记录保存等方面的规定。

    为了保护你的容器环境,建议使用像python:3.10-slim这样的基础镜像来减少被攻击的风险;在运行容器时,应使用非root用户账户,并在Dockerfile中添加USER指令;同时要定期(至少每月一次)更新基础镜像以获取安全补丁,并使用docker scout或Trivy等工具检查镜像是否存在安全漏洞。

    如何使用本地大型语言模型来保障隐私(Ollama)

    如果你希望将所有数据都保存在自己的设备上,避免向外部API发送任何信息,就可以用Ollama这个本地模型来替代OpenAI API。Ollama允许你在本地运行开源的大型语言模型,它负责处理模型权重的下载、内存管理以及提供API接口服务。

    以下是设置Ollama的步骤:

    # 安装Ollama(适用于macOS或Linux系统)
    curl -fsSL https://ollama.com/install.sh | sh
    
    # 下载一个模型(llama3是一个通用型不错的选择)
    ollama pull llama3
    
    # 确认模型已成功运行
    ollama list
    

    只需将摘要生成工具中原本用于调用OpenAI API的代码,替换为调用Ollama本地API的代码即可:

    import requests
    
    def summarize_locally(text):
        """在Docker容器内部调用Ollama本地实例来生成摘要。"""
        url = "http://host.docker.internal:11434/api/generate"
        payload = {
            "model": "llama3",
            "prompt": (
                "将以下文本总结成关键"
                f"项目符号列表:

    host.docker.internal这个主机名使得容器能够与宿主机器上运行的服务进行通信。Ollama是在你的宿主机器上运行的(而不是在容器内部),因此Summarizer就是通过这种方式来访问它的。

    注意: 在Linux系统中,host.docker.internal可能默认不会被解析出来。你需要在docker-compose.yml文件中,为summarizer服务添加以下配置:extra_hosts: ["host.docker.internal:host-gateway"]

    本地模型相比云服务来说运行速度较慢,但它们不需要额外的费用,数据完全保密,并且即使在没有互联网连接的情况下也能正常使用。

    示例输入数据及预期输出结果

    为了在不使用真实新闻稿的情况下测试整个处理流程,请创建以下样本输入文件:

    data/input/newsletter_ai.txt

    AI周刊精选 - 2025年1月
    OpenAI本周发布了一款新的推理模型。
    紧急通知:欧盟的新AI法规将于3月份生效。
    Google宣布对其Gemini系列模型进行了更新。
    一家初创企业获得了5000万美元的资金,用于开发基于AI的代码审查工具。

    data/input/meeting_notes.txt:

    # 你的每日AI资讯摘要
    
    **日期:** 2025-01-20
    
    ## 核心要点
    
    - **优先级3**:重要提示:第一季度报告的截止日期是本周五
    - **优先级2**:紧急通知:欧盟的新AI法规将于3月份生效
    - **优先级1**:需要采取的行动:查阅更新后的API文档
    - **优先级0**:OpenAI发布了一款新的推理模型
    - **优先级0**:当前的工作进度正常

    实际生成的摘要内容会根据你使用的LLM模型及配置设置有所不同,但整体的结构和优先级顺序应该是保持一致的。

    如何实现每日自动执行

    既然整个处理流程可以通过一个命令端到端地完成,那么你就可以安排它每天早上自动运行。

    在Linux或macOS系统中使用Cron定时器

    使用crontab -e命令打开你的定时任务列表,然后添加以下这条指令,让该流程每天早上7点自动执行:

    0 7 * * * cd /path/to/multi-agent-digest &;&& docker compose up --build >>> cron.log 2>&1

    其中> cron.log 2>&1这部分代码会将所有的输出结果(包括错误信息)重定向到一个日志文件中,这样你之后就可以查看这些记录了。请确保你的机器在预定时间能够正常启动,并且Docker Desktop也已开启。

    在Windows系统中使用任务计划程序

    打开任务计划程序,创建一个新的任务。在“操作”选项中,将执行程序设置为:

    wsl -e bash -c 'cd /mnt/c/path/to/multi-agent-digest &&& docker compose up --build'

    将触发器设置为每天早晨在你指定的时间自动执行。

    如何添加推送通知

    为了让摘要信息真正发挥作用,你需要将其直接发送到你的手中,而不是让它只是存在于某个文件夹里。以下有三种选择:

    电子邮件 — 你可以扩展脚本功能,利用Python的`smtplib`模块来发送摘要信息。对于Gmail、SendGrid或Amazon SES等服务来说,你需要提供相应的SMTP登录凭据。

    Slack — 你可以在自己的Slack工作空间中创建一个接收通知的Webhook,然后将摘要信息以消息的形式发送过来。这样做只需要大约10行代码即可实现。

    Notion或Obsidian — 你可以利用这些工具的API,在每天早晨自动创建新的页面或笔记,并将摘要内容添加其中。

    常见故障排除

    容器因内存不足而退出 — 如果处理大型文件或使用大语言模型时消耗了大量内存,可以尝试在`docker-compose.yml`文件中的`deploy > resources > limits > memory`配置项中增加内存限制。建议将值设置为`1G`。

    来自OpenAI的速率限制错误 — 重试机制会自动处理临时性的速率限制问题。请检查你的OpenAI控制面板,确认是否存在使用量上限的问题。

    depends_on选项不会等待相关任务完成 — 确保你使用了`condition: service_completed_successfully`这个条件,这个设置要求使用Docker Compose v2版本。

    /output目录的权限问题

    — 如果在主机上挂载该目录时出现权限错误,可以在主机上执行`chmod -R 777 ./output`命令来修改权限设置;或者直接在Dockerfile中添加`USER`指令来解决这个问题。

    OPENAI_API_KEY未找到

    — 可能是`.env`文件丢失了,或者它的位置不正确。请确保将`.env`文件放在与`docker-compose.yml`相同的文件夹中,然后使用`docker compose config`命令来检查配置是否正确。

    从容器内部无法访问Ollama服务

    — 在Linux系统中,`host.docker.internal`这个域名可能无法被解析。你可以在`docker-compose.yml`文件中的服务配置里添加`extra_hosts: ["host.docker_internal:host-gateway"]`这一行代码来解决这个问题。

    生产环境部署方案

    对于个人使用或开发来说,`docker compose up`这种部署方式非常适用。但当你准备将应用部署到服务器或云端时,还有其他几种选择可供参考。

    Docker Swarm

    Docker Swarm是相对于Docker Compose而言更为简单易用的解决方案。它允许你在多台机器上部署应用,而且几乎不需要对现有的Docker Compose配置文件进行任何修改:

    docker swarm init
    docker stack deploy -c docker-compose.yml morning-brief
    

    Kubernetes

    对于大规模的生产环境来说,Kubernetes能为你提供更强大的调度、扩展和容错能力。你可以使用Kubernetes的Jobs功能来执行一次性运行的批处理任务;同时为每个容器设置资源请求量和限制值,以便集群调度器能够高效地分配资源。API密钥应该存储在Kubernetes Secrets中,而CronJobs则可用于实现每日定时执行的任务——它们的工作原理与cron类似,但由Kubernetes集群来管理。

    云平台

    所有主要的云服务提供商都提供了可用于运行该流程的管理型容器服务:

    AWS — 提供ECS Fargate,支持基于定时任务的无服务器执行方案;或者使用EKS来管理Kubernetes集群。

    Azure — 可使用Azure Container Instances进行简单部署,或通过AKS来管理Kubernetes集群。

    GCP — 提供Cloud Run Jobs用于无服务器批量处理任务,或使用GKE来管理Kubernetes集群。

    总结与下一步计划

    通过本手册的学习,你从零开始构建了一个多智能体的AI系统。你创建了四个专门的Python智能体,使用Docker将它们容器化,通过Docker Compose对它们进行协调配置,并实现了秘密管理、结构化日志记录、重试机制以及优雅的故障处理方案。

    你所学到的这些核心设计模式——职责分离、容器化智能体的设计、共享存储空间的通信机制,以及针对外部API的安全编码实践——其应用范围远超这个具体案例。每当你需要一个可靠、模块化且可复现的AI工作流程时,这些模式都会为你提供坚实的基础。

    以下是一些可供进一步探索的方向:

    智能体协作框架 — 像CrewAI和LangGraph这样的工具可以帮助你构建能够相互分配任务、协商优先级并以更复杂的方式协作的智能体系统。

    本地化及微调模型 — 可以尝试使用Ollama或vLLM在本地运行模型,或者针对特定任务微调小型模型,从而以更低的成本获得更好的结果。

    事件驱动架构 — 可以用Redis或RabbitMQ替代共享存储空间,这样智能体就能实时响应各种事件,而无需按照固定的时间表执行操作。

    反馈循环机制 > 可以添加一个专门用于评估每日摘要质量的智能体,让它逐步调整摘要生成器的提示内容。正是这样的反馈机制使得生产环境中的智能体系统能够不断学习和改进。

Comments are closed.