构建一个能够回答问题或执行搜索任务的人工智能代理,其实已经是一个解决完毕的问题。只要阅读一些教程并花费几个小时的时间,你就能够实现这一目标。
然而,大多数教程都忽略了后续的工程实现环节——也就是那些能够让多代理系统具备足够可靠性以便在实际环境中运行的部分。
当某个进程发生崩溃时,该如何恢复系统的状态?如何让各个代理能够标准化地使用各种工具,而无需为每一次集成都编写专门的适配程序?如何协调那些使用不同框架构建的代理?又如何判断代理的输出质量是否正在下降?
这些都是与基础设施相关的问题,而这本书通过提供可以在你自己机器上运行的代码来解答这些问题。不需要云账户、API密钥,也不会产生任何持续性的费用。
你将学习四种能够在协议层面上解决这些问题的技术:
-
LangGraph——用于实现具有状态保存功能的代理协调机制;
-
MCP(模型上下文协议)——用于实现工具的标准化集成;
-
A2A(代理间通信协议)——用于实现不同框架之间的代理协调;
-
Ollama——用于进行本地的大型语言模型推理。
为了使每一个概念都能得到具体的体现,你将会从头到尾构建一个完整的系统:这个系统包括一个“学习加速器”,它能够制定学习计划、根据你的笔记解释相关主题、运行测验,并根据测试结果进行调整。这个案例实际上就是一种教学工具,而其架构本身才是真正需要研究的重点。
这种基于开放协议进行协调的架构模式,在今天的实际应用中已经被广泛采用。例如,在销售支持领域,这些代理可以帮助新员工熟悉工作流程并调整培训内容;在合规培训中,它们可以确保员工通过规定的课程获得认证;在客户支持环节,它们能够帮助建立知识库并跟踪问题的处理进度;而在工程技术领域,它们也能引导新员工了解代码库的结构。
虽然应用场景可能会发生变化,但这种基础设施架构模式本身却是不变的。
📦 获取完整代码
这本手册所包含的所有可运行代码都存储在GitHub上。你可以克隆这个仓库并按照步骤进行操作,或者将其作为参考实现来学习相关内容。
目录
简介
你将构建的内容
你将要构建的这个系统包含四个智能体,这些智能体由LangGraph进行协调;还有两台MCP服务器,为这些智能体提供访问外部工具的权限;另外还有两个A2A服务,能够实现跨框架的智能体协作;Langfuse负责记录所有的执行轨迹,而DeepEval则会自动执行质量检查。
下面是这个系统从开始到结束的整体架构图:

图1. 完整的系统架构。LangGraph负责协调四个智能体,每个智能体都通过MCP访问相应的工具,进度辅导系统会通过A2A机制将任务委托给外部智能体,其中包括运行在完全不同框架中的CrewAI智能体;Ollama会在本地执行所有的推理操作,而Langfuse则会记录下所有的执行轨迹。
你会逐步构建这个系统的每一层。当整个系统建成时,你不仅会了解如何将这些技术结合起来使用,还会明白为什么每一种技术都是必要的,以及它们能够预防哪些生产环境中的问题。
技术栈
| 技术名称 | 版本号 | 作用 |
|---|---|---|
| LangGraph | 1.1.0 | 用于管理多智能体的状态驱动型图形化编排系统 |
| MCP | 1.26.0 | 标准化的人工智能体与工具之间的通信协议 |
| A2A SDK | 0.3.25 | 支持跨框架的智能体间通信协议 |
| Ollama | 最新版本 | 用于本地执行LLM推理任务,无需使用API密钥 |
| CrewAI | 1.13.0 | 通过A2A协议实现跨框架的互操作性 |
| Langfuse | 4.0.1 | 分布式追踪与监控工具 |
| DeepEval | 3.9.1 | 用于评估LLM模型的性能的工具 |
先决条件
你需要具备以下基础知识:
-
Python 3.11或更高版本:了解类型提示、数据类以及async/await的基本用法
-
基本的LLM概念:熟悉提示语、自动补全功能以及工具的使用方法
-
命令行操作:能够创建虚拟环境并运行脚本
您不需要具备使用LangGraph、MCP、A2A或任何代理框架的先验经验。本手册是从基本原理出发进行讲解的。
硬件要求
| 配置要求 | 内存 | 显存 | 模型版本 | 备注 |
|---|---|---|---|---|
| 最低要求 | 16 GB | 8 GB | qwen2.5:7b |
系统能正常运行,但功能可能受限 |
| 推荐配置 | 32 GB | 24 GB | qwen2.5-coder:32b |
工具调用性能最佳 |
| 仅针对CPU的情况 | 32 GB | 无需额外显存 | qwen2.5:7b |
系统能运行,但速度会比推荐配置慢5到10倍 |
💡 为什么模型规模对代理系统来说如此重要
代理系统是通过生成结构化的JSON参数来调用工具的。如果模型无法正确生成这些参数,或者生成的参数格式不正确,那么工具调用就会失败——系统会进入无限循环,但不会出现任何明显的错误提示。
参数规模小于70亿的模型经常会出现JSON格式错误。而参数规模在70亿到90亿之间的模型,才能够在实际应用中实现可靠的工具调用功能。
第1章:何时使用多个代理系统
在开始编写任何代码之前,您应该先回答这样一个问题:您面临的问题真的需要使用多个代理系统来解决吗?
这个问题的答案非常重要,因为增加代理系统的数量会带来相应的成本。更多的代理系统意味着更多的组件、更多的潜在故障点、更复杂的共享状态管理机制,以及更加繁琐的调试工作。因此,如果使用合适的工具,单个代理系统往往会是更简单、更高效、也更可靠的解决方案。
所以问题不应该是“我是否应该使用多个代理系统?”,因为多个代理系统并不一定就比单一代理系统更好。真正需要考虑的是:“我的问题是否具有需要通过多个代理系统来协作的特点?”
1.1 当单个代理系统是最佳选择时
当一个问题只需要一个代理系统来完成,且这个任务适合在单一的上下文中处理时,使用单个代理系统通常是最佳方案。
例如,一个负责研究某个主题并对其进行总结的代理系统;一个负责审核代码请求并发表评论的代理系统;一个负责回答客户通过知识库提出的问题の代理系统;以及一个负责从文档中提取结构化数据的代理系统——这些任务都只需要一个代理系统来完成。
在这种情况下,增加第二个代理系统并不会使问题得到简化。相反,它只会增加协调机制、共享状态管理的复杂性,以及调试难度,而并不会带来任何架构上的优势。因此,单个代理系统完全能够胜任这些任务。只要为它提供合适的工具,它就能顺利地完成工作。
对于单个代理系统来说,其工作流程非常简单:
用户输入 → 代理系统(配备相应工具) → 输出结果
代理可以循环调用各种工具(搜索、读取、写入、验证),但只要拥有适当的工具访问权限,单个大语言模型就能完成整个任务。对于大多数人工智能自动化应用来说,这既是正确的起点,往往也是合适的终点。
1.2 多个代理的真实适用条件
当某个问题真正存在“不同的专业化需求”时,使用多个代理才是合理的:如果各个子任务的工具、大语言模型的调用方式、对计算资源的需求或故障机制存在显著差异,将它们整合到一个代理中反而会引发更多问题。
以下是那些需要通过多代理协作来解决问题的具体情形:
不同子任务需要使用不同的工具
如果工作流程中的某个环节需要访问文件系统,另一个环节需要写入数据库,而还有第三个环节需要调用外部API,那么将这些功能分配给不同的代理就非常合理。
每个代理仅使用它所需要的工具,这样一来,单独测试或分析每个代理也会变得更加容易。
不同的大语言模型需要不同的调用方式
有些任务只需要一次结构化的输出请求,且“温度参数”应为0;而有些任务则需要通过多轮工具调用来完成,直到大语言模型获得足够的上下文信息后才能终止这些调用。
如果将这两种调用方式混合在一个代理中,就会导致该代理功能过于复杂,而且根据不同的执行路径,可能会出现不同的故障情况。
不同的任务对计算资源的需求各不相同
需要结构化输出的任务适合使用较低的温度设置,以确保结果的一致性;而需要创造性分析的任务则适合使用稍高的温度设置,以激发更多的创新思路;评分任务同样适合使用较低的温度设置,以保证评估结果的客观性。
如果这三个任务共用一个代理且使用相同的温度设置,那么在各方面都难免会做出妥协。
故障隔离的需求
如果某个子任务的失败不会影响其他任务的正常运行,那么就需要为这些子任务划分界限。例如,负责制定教学计划的代理即使遇到评分系统暂时故障,也能继续正常工作;但如果它们属于同一个流程且具有相同的故障机制,那么评分系统的错误就会导致整个教学计划也无法顺利执行。
独立部署的需求
如果系统的不同部分需要在不同的规模下运行、独立进行更新,或者由不同的团队使用不同的开发框架来构建,那么将这些部分分配给不同的代理就非常必要。A2A协议(第8章)为这种需求提供了具体的实现方案。
跨框架协作
如果想要用CrewAI代理来完成某项任务,而用LangGraph代理来完成另一项任务,因为不同的开发框架具有各自的优势,那么就需要一种让它们能够相互通信的协议。而A2A协议正是实现这种协作的关键工具。
单独来看,这些条件中并没有哪一条绝对要求必须使用多个代理;但结合起来看,它们确实为多代理架构提供了有力的支持。
1.3 你正在付出的代价
在决定采用多智能体架构之前,首先需要明确自己为此所要付出什么代价。
共享状态的复杂性:所有智能体都会读取和写入同一个共享状态对象。如果两个智能体对同一字段进行写操作,就需要制定相应的合并策略;如果其中一个智能体输入了错误的数据,后续的所有智能体都会接收到错误的输入信息。
状态的定义实际上就成了一份所有智能体都必须遵守的“契约”,而对该契约的任何修改都要求更新所有智能体的运行状态。
更复杂的调试过程:单个智能体的故障只会在一个堆栈跟踪中体现出来;而在多智能体系统中,故障可能源于三步之前产生的错误结果,这些错误结果被保存在共享状态中,然后传递给另一个智能体,最终导致当前的故障。这种因果关系会跨越不同的智能体。
延迟问题的加剧:每个智能体至少会进行一次大语言模型的调用。在一个由四个智能体组成的系统中,每次会话至少需要进行四次大语言模型调用;如果智能体在循环中使用某些工具,调用次数还会更多。由于每次调用大语言模型需要2到5秒的时间,这些延迟会迅速累积起来。
更多的基础设施需求:多智能体系统需要依赖状态持久化、可观测性、评估机制以及人工监督等功能,而这些功能的建立都需要花费一定的时间。单个智能体往往可以无需这些功能就能正常运行,但实际应用中的多智能体系统却绝对离不开它们。
在选择采用多智能体架构时,你应该清楚地认识到这些代价,并且能够说明哪些具体的好处足以证明这种架构的合理性。
1.4 为什么这个系统使用四个智能体
“学习加速器”系统确实使用了四个智能体。下面是对每个智能体功能的客观技术说明——之所以要分开设计这些智能体,并不是因为多智能体架构本身更优越,而是因为这四项功能之间存在明显的差异,如果将其中任意两项合并在一起,新的智能体的性能反而会下降。
| 智能体 | 功能 | 为何需要单独设计 |
|---|---|---|
| 课程规划器 | 根据学习目标生成结构化的学习路线图 | 仅需要一次大语言模型调用,参数为temperature=0.1、format="json"。不使用任何辅助工具。运行速度快、结果确定性强,但遇到错误输入时会立即失败。如果将其他智能体的功能混合到其中,会破坏输出结果的结构性。 |
| 讲解器 | 通过MCP读取学习资料,并向学生解释相关内容 | 需要多次调用大语言模型,参数为temperature=0.3。运行过程具有不确定性:大语言模型会自行判断何时已经获得了足够的上下文信息。其执行机制与课程规划器完全不同。 |
| 试题生成器 | 首先创造性地生成试题,然后再对答案进行分析性评估 | 需要两次独立的大语言模型调用,且每次调用的参数不同。该功能具有交互性,会暂停等待用户输入;同时也可以作为独立的问答服务使用(详见第8章)。如果与其他智能体合并使用,就无法实现这些功能。 |
| 进度指导者 | 综合分析学习结果,更新学习主题的状态,并决定是继续学习下一个主题还是结束当前的学习流程 | 它是系统中唯一需要进行跨智能体交互的组件(会与CrewAI Study Buddy进行通信)。该智能体负责读取和写入MCP内存,并负责做出决定,以确定学习流程是继续进行还是结束。 |
仅从功能角度来看,课程规划工具与解释工具就有必要被分开:前者能够直接生成结构化的JSON格式输出,而无需借助任何外部工具;后者则需要通过多轮调用其他工具来完成工作。如果将这两个工具整合到一个代理系统中,那么这个系统就会时而以循环方式调用工具,时而又不进行这样的操作,并且根据不同的运行“状态”会返回不同类型的输出结果。这样的系统根本无法具备强大的功能,实际上它只是两个工具伪装成了一个。
测验生成工具具有两种不同的工作模式:在0.4的设置下它会生成创造性问题,在0.1的设置下则会进行分析性评分;同时,它也需要作为独立的问答式服务来运行,因此它也有必要被划分成一个独立的模块。
进度辅导工具扮演的是协调者的角色——它需要整合所有其他工具提供的信息,并决定各项任务应该按照什么样的顺序来执行。这种职责显然不适合与其他任何工具共享。
在分析自己的系统架构时,可以参考这个原则:如果无法解释为什么两项任务必须由同一个工具来完成,那么它们很可能就不应该被合并在一起。
在生产环境中也是如此。例如,一个合规培训平台会包含以下几个模块:课程规划模块(负责制定认证学习路径)、内容传递模块(从指定的服务器中获取相关培训材料)、评估模块(用于检测学习者的理解能力并记录测试结果),以及证书颁发模块(负责判断学习者是否达到合格标准并颁发证书)。这些模块各自使用不同的工具,遇到故障时也会表现出不同的反应,而且更新频率也各不相同。将它们分开来设置,并不是出于某种架构上的考虑,而是因为每项任务的实际需求决定了必须这样设计。
1.5 构建项目环境
既然已经了解了这些架构原理,那么我们就开始构建这个系统吧。
安装Ollama并下载适合您硬件的模型
Ollama会在本地以OpenAI兼容的方式运行大语言模型,其访问地址为localhost:11434。
对于macOS和Linux系统:
curl -fsSL https://ollama.com/install.sh | sh
对于Windows系统:请从ollama.com下载安装程序并运行它。
根据您的硬件配置来下载相应的模型:
# 如果您的电脑拥有8GB的VRAM,可以下载以下模型:
ollama pull qwen2.5:7b
# 如果您的电脑拥有24GB的VRAM,建议下载这个更强大的模型:
ollama pull qwen2.5-coder:32b
# 下载完成后,可以运行以下命令来测试模型是否正常工作:
ollama run qwen2.5:7b "用一句话打个招呼。"
您应该会看到模型的响应结果。请让Ollama作为后台服务持续运行,因为在多次请求之间它需要保持活跃状态。
克隆代码仓库
git clone https://github.com/sandeepmb/freecodecamp-multi-agent-ai-system
cd freecodecamp-multi-agent-ai-system
配置虚拟环境
python -m venv .venv
source .venv/bin/activate # Windows系统:使用*.venv\Scripts\activate
pip install -r requirements.txt
文件requirements.txt中列出了所有所需的依赖库及其已测试过的版本信息。
# requirements.txt
langgraph==1.1.0
langgraph-checkpoint-sqlite==3.0.3
langchain-core==1.0.0
langchain-ollama==1.0.0
mcp==1.26.0
a2a-sdk==0.3.25
crewai==1.13.0
langfuse==4.0.1
deepeval==3.9.1
litellm==1.82.4
openai==2.8.0
httpx==0.28.1
fastapi==0.115.0
uvicorn==0.34.0
streamlit==1.43.2
pydantic==2.11.9
python-dotenv==1.1.1
tenacity==8.5.0
pytest==8.3.0
pytest-asyncio==0.25.0
⚠️ 请不要升级依赖库的版本。 此配置栈中的代理框架,尤其是LangGraph、langchain-core以及A2A SDK,在次要版本之间存在可能导致功能异常的变化。所指定的固定版本是经过联合测试的,因此对其中任何一个版本执行`pip install --upgrade`操作都可能引发导入错误或导致程序行为异常。
配置你的开发环境
cp .env.example .env
打开`.env`文件,设置相应的模型参数:
# .env: 请根据实际情况进行配置
OLLAMA_MODEL=qwen2.5:7b
OLLAMA_BASE_URL=http://localhost:11434
# 存储相关设置
CHECKPOINT_DB=data/checkpoints.db
NOTES_PATH=study_materials/sample_notes
# A2A服务相关配置(用于第8章)
QUIZ_SERVICE_URL=http://localhost:9001
STUDY_BUDDY_URL=http://localhost:9002
USE_A2A_QUIZ=true
USE_STUDY_BUDDY=true
# Langfuse相关设置:目前可保留为空,具体配置见第6章
LANGFUSE_PUBLIC_KEY=
LANGFUSE_SECRET KEY=
LANGFUSE_HOST=http://localhost:3000
验证配置是否正确
python main.py --help
执行上述命令后,应该能够看到正常的`argparse`帮助信息,而不会出现任何错误。如果出现导入错误,请确认虚拟环境已经激活。
📌 配置完成说明: Ollama已成功运行,所有依赖库也已安装完毕,开发环境也配置完成了。项目的整体结构如下所示:
freecodecamp-multi-agent-ai-system/
├── src/
│ ├── agents/ # LangGraph代理节点
│ ├── graph/ # 状态定义与工作流程相关代码
│ ├── mcp_servers/ # MCP工具服务器
│ ├── a2a_services/ # A2A协议服务及相关客户端代码
│ ├── crewai_agent/ # 通过A2A机制提供的CrewAI代理
│ └── observability/ # Langfuse相关配置文件
├── tests/ # 单元测试与评估代码
├── study_materials/
│ └── sample_notes/ # Explainer程序会读取的Markdown格式文档
├── docs/
├── data/ # SQLite数据库文件(运行时生成)
├── main.py
├── Makefile
├── docker-compose.yml # 用于构建本地开发环境的配置文件
├── requirements.txt
└── .env.example
`src/`目录下的所有文件都遵循Python标准的文件夹结构。《pyproject.toml》文件将`src/`目录添加到了Python的路径中,因此测试代码可以直接编写`from graph.state import AgentState`这样的语句,而无需进行复杂的路径配置。
在下一章中,你将开始构建这个系统的首个核心部分:负责协调四个代理节点运行的LangGraph框架。首先,你需要定义所有代理节点都能读取和写入的共享状态数据。
第2章:使用LangGraph进行状态管理型协同工作
LangGraph将多智能体工作流程建模为有向图。图中的节点代表Python函数,即你的智能体代码;边则用于定义这些节点之间的通信路径。每个节点都会读取和写入一个共享的状态对象。在每个节点执行完毕后,LangGraph会将该状态信息保存到SQLite数据库中。
正是这一机制使得LangGraph成为一种实用的生产工具,而不仅仅是一个便捷的封装层。如果用for循环来实现多智能体协作,那么一旦程序崩溃,所有之前的操作都会丢失;但LangGraph却能够保证数据的完整性。即使系统发生崩溃,检查点信息也会被保留下来,下次使用相同的会话ID调用graph.invoke()时,程序会从上次停止的地方继续执行。
本章将介绍构建这种协作框架的基础内容:所有智能体共用的共享状态机制、第一个可正常运行的智能体节点,以及将这些组件连接在一起的图结构。
2.1 共享状态
图中的每个节点都会以dict的形式接收完整的状态信息,然后仅返回自己所修改的那些键对应的更新内容。LangGraph会将这些更新合并到完整的状态数据中,并在调用下一个节点之前保存检查点。
在src/graph/state.py文件中定义的状态结构包括四个用于存储结构化数据的数据类,同时还定义了LangGraph所管理的AgentState类型字典:
# src/graph/state.py
from __future__ import annotations
import json
from dataclasses import dataclass, field, asdict
from typing import Annotated, TypedDict
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
@dataclass
class Topic:
"""研究路线图中的某个具体主题。"""
title: str
description: str
estimated_minutes: int
prerequisites: list[str] = field(default_factory=list)
# 状态:pending → in_progress → completed | needs_review
status: str = "pending"
def to_dict(self) -> dict:
return asdict(self)
@classmethod
def from_dict(cls, data: dict) -> "Topic":
return cls(
title=data["title"],
description=data["description"],
estimated_minutes=data["estimated_minutes"],
prerequisites=data.get("prerequisites", []),
status=data.get("status", "pending"),
)
@dataclass
class StudyRoadmap:
"""课程规划工具生成的研究整体计划。"""
goal: str
total_weeks: int
topics: list[Topic]
weekly_hours: int = 5
def is_complete(self) -> bool:
return all(t.status in ("completed", "needs_review") for t in self.topics)
@dataclass
class QuizResult:
"""针对某个特定主题进行的测验的最终结果。"""
topic: str
questions: list
score: float # 分数范围为0.0到1.0
weak_areas: list[str]
timestamp: str = ""
def passed(self) -> bool:
return self.score >= 0.5
class AgentState(TypedDict):
"""
Learning Accelerator框架中使用的共享状态数据。
当某个节点返回{"approved": True}这样的更新信息时,LangGraph会将其合并到现有的状态数据中,而不会替换整个字典。只有那些被修改的键才会被更新。
唯一的特殊情况是`messages`字段:它使用`add_messages`函数来添加新内容,而不是直接替换原有的列表。"""
messages: Annotated[list[BaseMessage], add_messages]
session_id: str
goal: str
roadmap: StudyRoadmap | None
approved: bool
current_topic_index: int
quiz_results: list[QuizResult]
weak_areas: list[str]
study_materials_path: str
error: str | None
有几点设计决策值得了解。
为什么选择TypedDict而不是普通的类? LangGraph需要与字典兼容的对象。TypedDict能够提供类型安全性(集成开发环境会检测键拼写错误),同时仍然保持与字典的兼容性。对于这种特定的使用场景来说,它确实是合适的工具。
为什么要把add_messages方法放在messages字段上呢?
AgentState中的其他字段都遵循“最后写入的内容优先”的规则。如果两个节点同时向roadmap字段写入数据,那么后写入的数据会覆盖之前的内容。但对话信息应该是可以累积的。add_messages方法告诉LangGraph将新消息添加到列表末尾,而不是替换原有的列表内容。这样就能确保所有代理调用过程中都能保留完整的对话历史记录。
为什么为Topic、StudyRoadmap和QuizResult使用数据类?
因为代理程序需要读取和更新结构化数据,而避免因键名拼写错误导致问题。topic.title这种写法如果字段不存在会立即抛出AttributeError异常,而topic["titl"]则会默默地返回None。对于多个代理程序都会访问的结构化数据来说,使用数据类比普通的字典更加安全。
src/graph/state.py文件中还包含了三个辅助函数,这些函数可以帮助代理节点安全地读取状态信息:
# src/graph/state.py (续)
def initial_state(
goal: str,
session_id: str,
study_materials_path: str = "study_materials/sample_notes",
) -> dict:
"""为新的学习会话创建初始状态。"""
return {
"messages": [],
"session_id": session_id,
"goal": goal,
"roadmap": None,
"approved": False,
"current_topic_index": 0,
"quiz_results": [],
"weak_areas": [],
"study_materials_path": study_materials_path,
"error": None,
}
def get_current_topic(state: dict) -> Topic | None:
"""获取当前正在学习的主题,如果已经完成学习,则返回None。"""
roadmap = state.get("roadmap")
if roadmap is None:
return None
idx = state.get("current_topic_index", 0)
if idx >= len(roadmap.topics):
return None
return roadmap.topics[idx]
def session_is_complete(state: dict) ->> bool:
"""当所有主题都已完成学习时返回True。"""
roadmap = state.get("roadmap")
if roadmap is None:
return True
idx = state.get("current_topic_index", 0)
return idx >= len(roadmap.topics)
总是通过initial_state()函数来创建新的学习会话。千万不要手动构建字典对象,这样就能确保每个字段都有有效的默认值,也不会有任何必需的键被遗漏。
2.2 课程规划器:第一个代理节点
课程规划器是系统中最简单的代理节点:只需调用一次大语言模型,接收一个JSON格式的响应,然后得到一个数据类对象作为输出结果。整个过程不需要使用任何工具或循环结构。它展示了所有代理节点都会遵循的模式:从状态信息中读取数据,调用大语言模型,解析返回的结果,最后返回更新后的部分状态信息。
# src/agents/curriculum_planner.py
import json
import os
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_ollama import ChatOllama
from graph.state import StudyRoadmap, Topic
MODEL_NAME = os.getenv("OLLAMA_MODEL", "qwen2.5:7b")
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
PLANNER_SYSTEM_PROMPT = """你是一位经验丰富的课程设计专家。你的任务是在收到学习目标后,制定一份结构化的学习计划。
请仅返回有效的JSON格式数据,不要包含任何散文内容、Markdown代码格式标记或解释性文字。
该JSON数据必须严格符合以下结构:
{
"goal": "必须与提供的原始学习目标完全一致",
"total_weeks": <1到12之间的整数>,
"weekly_hours": <3到10之间的整数>,
"topics": [
{
"title": "主题名称(3-6个词)",
"description": "用一句话清晰说明这个主题的内容",
"estimated_minutes": <30到120分钟之间的整数>,
"prerequisites": ["如果需要,列出前置主题的名称;否则列表为空"],
"status": "待处理"
}
]
}
规则:
- 主题的顺序应从基础内容到高级内容依次排列
- 前置主题的名称必须与实际列出的名称完全一致
- 主题数量应为4到6个
- 所有主题的状态都必须为“待处理”
}
关于这里的模型设置,有两点需要注意。首先,temperature=0.1。这个值设定得非常低,因为结构化的JSON输出需要保持一致性;如果温度值过高,会导致输出结果出现变化,从而影响JSON解析的准确性。
其次,format="json"。这是Ollama模型所支持的JSON格式要求,属于推理层面的强制规定。无论系统提示是什么,该模型都无法生成非JSON格式的输出结果。这种限制比仅仅在提示中要求模型输出JSON更为严格。
def build_planner_llm() -> ChatOllama:
return ChatOllama(
model=MODEL_NAME,
base_url=OLLAMA_BASE_URL,
temperature=0.1,
format="json",
)
解析函数被有意地与核心处理函数分开设计。这样,即使不调用大型语言模型,也可以独立地对解析功能进行测试。在tests/test_curriculum_planner.py文件中,所有的11个单元测试都是直接调用parse_roadmap_json()函数的:
def parse_roadmap_json(json_string: str) -> StudyRoadmap:
"""将大型语言模型生成的JSON格式输出解析为StudyRoadmap数据结构。"""
try:
data = json.loads(json_string)
except json.JSONDecodeError as e:
raise ValueError(
f"大型语言模型返回了无效的JSON数据。\n"
f"错误信息:{e}\n"
f"原始输出内容(前300个字符):{json_string[:300]}"
)
required-fields = ["goal", "total_weeks", "topics"]
for field in required_fields:
if field not in data:
raise ValueError(f"大型语言模型生成的JSON数据中缺少字段 '{field}'")
if not isinstance(data["topics"], list) or len(data["topics"]) == 0:
raise ValueError("大型语言模型生成的JSON数据中的'topics'字段必须是一个非空列表")
topics = []
for i, t in enumerate(data["topics":
for field in ["title", "description", "estimated_minutes"]:
if field not in t:
raise ValueError(f"主题{i}中缺少字段 '{field}'")
topics.append(Topic(
title=t["title"],
description=t["description"],
estimated_minutes=int(t["estimated_minutes"]),
prerequisites=t.get("prerequisites", []),
status=t.get("status", "pending"),
)
return StudyRoadmap(
goal=data["goal"],
total_weeks=int(data["total_weeks]),
weekly_hours=int(data.get("weekly_hours", 5)),
topics=topics,
)
这个节点函数本身遵循该系统中所有代理所使用的相同模式:
def curriculum_planner_node(state: dict) -> dict:
"""
LangGraph节点:课程规划器
输入参数:state["goal"]
输出参数:state["roadmap"], state["messages"], state["error"]
"""
goal = state.get("goal", "").strip()
if not goal:
return {"error": "未提供学习目标。"}
print(f"\n[课程规划器] 为目标『{goal}』制定学习计划")
llm = build_planner_llm()
messages = [
SystemMessage(content=PLANNER_SYSTEM_PROMPT),
HumanMessage(content=f"为目标{goal}制定学习计划"),
]
print(f"[课程规划器] 调用{MODEL_NAME}...")
response = llm.invoke(messages)
try:
roadmap = parse_roadmap_json(response.content)
except ValueError as e:
print(f"[课程规划器] 解析错误:{e}")
return {
"error": str(e),
"messages": messages + [response],
}
print(f"[课程规划器] 已创建{len(roadmap.topics)}个学习主题")
# 仅返回这个节点修改过的键值对
return {
"roadmap": roadmap,
"messages": messages + [response],
"error": None,
}
注意返回的值是:{"roadmap": roadmap, "messages": ..., "error": None}。这里只返回了这个节点修改过的三个键值对,而不是整个状态对象。LangGraph会将这些修改合并到现有的状态中,其他字段保持不变。
2.3 图结构定义
这个图结构表示的是各个组件之间的连接关系,而非具体的逻辑处理流程。所有的业务逻辑都存在于各个代理模块中。src/graph/workflow.py文件仅描述了哪些节点存在、它们是如何连接的,以及路由函数会做出哪些决策:
# src/graph/workflow.py
import os
import sqlite3
from pathlib import Path
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import END, START, StateGraph
from agents.curriculum_planner import curriculum_planner_node
from agents.explainer import explainer_node
from agents.human_approval import human_approval_node
from agents.progress_coach import progress_coach_node
from agents.quiz_generator import quiz_generator_node
from graph.state import AgentState, session_is_complete
def route_after Approval(state: dict) -> str:
if state.get("approved", False):
return "explainer"
return "curriculum_planner"
def route_afterCoach(state: dict) -> str:
if session_is_complete(state):
return "end"
return "explainer")
def build_graph(
db_path: str = "data/checkpoints.db",
interrupt_before: list | None = None,
):
Path("data").mkdir(exist_ok=True)
if db_path == "data/checkpoints.db":
db_path = os.getenv("CHECKPOINT_DB", db_path)
builder = StateGraph(AgentState)
# 注册所有的节点
builder.add_node("curriculum_planner", curriculum_planner_node)
builder.add_node("human_approval", human_approval_node)
builder.add_node("explainer", explainer_node)
builder.add_node("quiz_generator", quiz_generator_node)
builder.add_node("progress_coach", progress_coach_node)
# 静态边
builder.add_edge(START, "curriculum_planner")
builder.add_edge("curriculum_planner", "human_approval")
builder.add_edge("explainer", "quiz_generator")
builder.add_edge("quiz_generator", "progress_coach")
# 条件边
builder.add_conditional_edges(
"human_approval",
route_afterApproval,
{"explainer": "explainer", "curriculum_planner": "curriculum_planner"},
)
builder.add_conditional_edges(
"progress_coach",
route_afterCoach,
{"explainer": "explainer", "end": END},
)
# 重要提示:直接创建连接,不要使用上下文管理器。SqliteSaver.from_conn_string()会返回一个上下文管理器,但如果使用`with SqliteSaver.from_conn_string(...) as checkpointer:`,那么当`with`块执行完毕后连接就会被关闭。但由于图对象的存在时间比build_graph()函数的长,因此连接必须保持开放状态,以确保整个程序的正常运行。所以应该直接创建连接。
conn = sqlite3.connect(db_path, check_same_thread=False)
checkpointer = SqliteSaver(conn)
return builder.compile(
checkpointer=checkpointer,
interrupt_before=interrupt_before or [],
)
graph = build_graph()
💡 SqliteSaver连接模式
必须设置check_same_thread=False这个标志。SQLite的默认行为会阻止在一个线程中创建的连接在另一个线程中被使用。
LangGraph在内部会使用不同的线程来执行节点函数以及进行检查点写入操作。如果没有这个标志,运行时会出现ProgrammingError: 在一个线程中创建的SQLite对象只能在该线程中使用这样的错误。不过在这个场景下使用这个标志是安全的,因为LangGraph会对检查点写入操作进行序列化处理,因此不会发生并发写操作导致的竞争问题。
路由功能是完全用Python实现的,没有调用任何大型语言模型。这些功能会从状态数据中读取信息,并返回一个字符串,而这个字符串决定了下一个应该执行哪个节点。控制流的逻辑应该保留在Python代码中,而不是大型语言模型中。如果让大型语言模型来决定路由路径,就会导致图的结构变得不可预测,从而使得对其逻辑的分析和测试变得非常困难。
interrupt_before参数的默认值是一个空列表。终端界面会在human_approval_node内部调用interrupt()函数来暂停程序的执行,等待用户进行确认操作,这一点你可以在第5章中看到,因此不需要在编译时设置中断机制。
Streamlit用户界面(第9章)会传递参数interrupt_before=["quiz_generator"],这样就可以在测验节点执行之前停止整个程序的运行,因此图中的线程永远不会调用input()函数。同一个图构建工具可以支持这两种模式。
下面是这个完整图的结构示意图:

图2. 完整的LangGraph结构图。实线箭头表示固定路径,虚线箭头表示条件路径。路由功能会在运行时决定应该执行哪条路径。
2.4 运行并验证
当Curriculum Planner节点和相应的图结构准备就绪后,你就可以进行第一次端到端的测试了:
python main.py "Learn Python closures and decorators from scratch"
你应该会看到如下输出:
============================================================
学习加速器
会话ID:a3f1b2c4
目标:从零开始学习Python闭包和装饰器
============================================================
[Curriculum Planner] 为“从零开始学习Python闭包...”制定学习计划...
[Curriculum Planner] 调用qwen2.5:7b...
[Curriculum Planner> 创建了5个学习主题
建议的学习计划
============================================================
目标:从零开始学习Python闭包和装饰器
学习时长:2周,每周5小时
1. Python函数基础复习(45分钟)
复习函数的定义、参数、返回值以及作用域相关知识
2. 作用域与LEGB规则(60分钟)
了解Python如何处理嵌套作用域中的变量名
3. 闭包原理讲解(75分钟)(需要先掌握作用域和LEGB规则)
...
图表在这里暂停了。在human_approval_node内部调用的interrupt()函数会导致图表停止运行,系统会保存一个检查点,然后将控制权交还给调用者。你的终端正在等待输入指令——输入yes可继续执行,输入no则重新开始。
📌 检查点说明:你现在拥有一个能够保持状态持久性的可用图表系统。顶部显示的会话ID被保存在data/checkpoints.db文件中。如果你现在终止进程,然后运行python main.py --resume a3f1b2c4,程序会从审批提示环节继续执行。检查点功能已经可以正常使用了。
现在请运行单元测试来验证解析逻辑的正确性:
pytest tests/test_state.py tests/test_curriculum_planner.py -v
预期结果:共35个测试用例,全部通过,且不需要使用Ollama模型。这些测试主要针对parse_roadmap_json()函数、状态数据类以及辅助功能进行验证,但不涉及实际的LLM调用过程。
在企业应用中,类似的图表结构也被广泛采用:销售支持系统会使用同样的架构。课程规划工具会为新销售人员生成学习路径,经理会在培训开始前审核该路径,之后员工就会按照计划学习产品相关知识。每个学习主题结束后,系统都会保存一个检查点。如果员工在午餐后返回继续学习,系统会从上次暂停的地方继续执行。
在下一章中,你将添加“模型上下文协议”,从而使所有代理程序都能使用标准化的工具接口;随后还会构建“解释器”这个代理程序——它会在循环中调用各种工具,并不断迭代,直到收集到足够的背景信息来生成详细的解释内容。
第3章:利用MCP实现标准化工具访问
“解释器”代理程序在提供解释之前,需要先读取学习笔记;而“进度辅导工具”则负责存储和检索会话数据。虽然这两种代理程序都可以直接调用Python函数,但这样做会导致它们都与文件系统结构、存储方案以及函数实现方式紧密绑定。
“模型上下文协议”通过明确的分离机制解决了这个问题:代理程序只描述自己需要什么功能,而工具服务器则负责具体如何实现这些功能。即使存储后端发生变更,也不会影响任何代理程序的代码。只要构建一次工具服务器,任何兼容MCP协议的代理程序(无论是LangGraph、CrewAI、Claude Desktop还是其他类型)都可以使用它。
3.1 MCP的三种基本功能
MCP允许服务器提供以下三种类型的功能:
-
工具是代理程序可以带着参数调用的可执行函数。例如
read_study_file(filename)就属于这种工具。代理程序可以控制这些工具的调用时机和参数设置,而具体的实现细节则由服务器负责处理。 -
资源是结构化的数据,代理程序可以通过URI来访问这些数据。
notes://index就是一个资源示例。可以把这些资源看作是只读的HTTP GET接口,服务器决定哪些数据可以被提供,代理程序则根据需要获取这些数据。 -
提示模板是服务器预先准备好的可重复使用的提示文本,代理程序可以通过名称来请求使用这些模板。虽然这个系统中并不经常使用提示模板,但在某些情况下,工具服务器可能希望自己负责设计特定领域的提示内容。
关键区别在于:工具用于执行特定操作,而资源则用于存储数据。如果某个代理需要执行某种操作,那么它就属于工具;如果它需要读取结构化的数据,那么这个数据就属于资源。
💡 MCP作为一种稳定的契约
可以把MCP看作是代理与工具之间的稳定契约。解释器代理知道某个工具的名称是read_study_file,并且该工具会接受一个filename参数。至于这个工具的具体实现方式——是从磁盘读取数据、从S3存储桶中获取信息,还是查询数据库——这些细节对代理来说是不可见的。
这就是MCP的价值所在:你可以更换其实现方式,而无需修改任何代理代码。
3.2 构建文件系统MCP服务器
文件系统服务器使代理能够访问你的学习笔记。它提供了三种工具和一种资源。
# src/mcp_servers/filesystem_server.py
import os
from pathlib import Path
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("Filesystem Server")
# 通过环境变量配置路径
NOTES_BASE = Path(os.getenv("NOTES_PATH", "study_materials/sample_notes"))
@mcp.tool()
def list_study_files() -> list[str]:
"""
列出所有可用的学习笔记文件。
返回相对于笔记目录的文件名列表。
例如:['closures.md', 'decorators.md', 'python_basics.md']
在尝试读取特定文件之前,请先调用此函数来确定有哪些材料可用。
"""
if not NOTES_BASE.exists():
return []
return sorted([
str(f(relative_to(NOTES_BASE))
for f in NOTES_base.rglob("*.md")
])
@mcp.tool()
def read_study_file(filename: str) -> str:
"""
读取学习笔记文件的全部内容。
参数:
filename:要读取的文件名,该名称应与list_study_files()返回的值相同。
例如:'closures.md'
返回文件的完整文本内容;如果文件不存在,则返回错误信息。
此函数不会抛出异常,而是以字符串形式返回错误信息,以便代理能够妥善处理这些错误。
"""
file_path = NOTES_BASE / filename
# 安全措施:防止路径遍历攻击。
# 如果没有这个措施,代理可能会尝试调用read_study_file://"..env"),
# 从而泄露你的API密钥。我们会解析路径并确认请求的文件确实位于笔记目录内。
try:
resolved = file_path.resolve()
resolved-relative_to(NOTES_BASE.resolve())
except ValueError:
return (
f"错误:尝试对'{filename}'进行路径遍历被阻止。"
"只有位于笔记目录内的文件才能被访问。"
)
if not file_path.exists():
available = list_study_files()
return f"错误:未找到文件'{filename}'。可用的文件有:{available}"
if file_path.suffix != ".md":
return f"错误:只能访问扩展名为.md的文件,而文件 '{file_path.suffix}' 的格式不符合要求。"
try:
return file_path.read_text(encoding="utf-8")
except (PermissionError, OSError) as e:
return f"读取文件'{filename}'时出现错误:{e}"
@mcp.tool()
def search_notes(query: str) -> list[dict]:
"""
在所有学习笔记中搜索包含指定关键词或短语的文件。
参数:
query:要搜索的字符串。搜索时不区分大小写。
返回一个匹配结果列表,每个结果包含以下键:'file', 'line_number', 'line'。
为避免信息量过大,最多返回20个结果。
"""
if not NOTES_BASE.exists():
return []
results = []
query_lower = query.lower()
for file_path in sorted(NOTES_base.rglob("*.md")):
rel_path = str(file_path.relative_to(NOTES_BASE))
try:
lines = file_path.read_text(encoding="utf-8").splitlines()
except (UnicodeDecodeError, PermissionError, OSError):
continue
for line_num, line in enumerate(lines, 1):
if query_lower in line.lower():
results.append({
"file": rel_path,
"line_number": line_num,
"line": line.strip(),
})
if len(results) >= 20:
return results
return results
@mcp.resource("notes://index")
def get_notes_index() -> str:
"""
资源:包含所有可用学习材料的索引信息及它们的文件大小。
URI:notes://index
"""
files = list_study_files()
if not files:
return "# 学习材料索引\n\n未找到任何学习材料。"
lines = ["# 学习材料索引\n"]
for filename in files:
file_path = NOTES_BASE / filename
try:
size_kb = file_path.stat().st_size / 1024
lines.append(f"- **{filename}** ({size_kb:.1f} KB)")
except OSError:
lines.append(f"- **{filename}** (文件大小未知)")
lines.append(f"\n总共有 {len(files)} 份学习材料")
return "\n".join(lines)
if __name__ == "__main__":
print(f"[Filesystem MCP] 正在启动服务器")
print(f "[Filesystem MCP] 文件存储路径:{NOTES_BASE.resolve()}")
mcp.run()
@mcp.tool() 和 @mcp.resource() 构成了整个集成接口。FastMCP 会读取函数名称(该名称会成为工具的名称)、文档字符串(LLM 会根据这些描述来决定是否使用该工具),以及类型注解(它们会决定参数的结构)。这就是服务器与任何连接到它的客户端之间的完整约定。
文档字符串非常重要。调用这些工具的 LLM 会通过阅读文档字符串来决定何时使用某个工具以及应该使用哪些参数。如果文档字符串描述得不够清晰(比如只是简单说明“读取文件”),就可能导致错误的工具选择。而这个服务器中的文档字符串能够明确告诉代理程序何时应该调用每个工具,以及参数应该采用什么样的格式。
3.3 构建内存 MCP 服务器
内存服务器为代理程序提供了会话级别的键值存储功能。解释器会记录自己已经解释了哪些主题,而进度指导系统在决定下一步该做什么之前,会读取这些记录。
# src/mcp_servers/memory_server.py
from datetime import datetime, timezone
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("Memory Server")
# 在进程内部使用的存储结构:{session_id: {key: {"value": str, "updated_at": str}}}
# 在生产环境中,应使用 Redis 或 PostgreSQL 替代这种存储方式。
# MCP 的接口本身是不变的,只有这个存储结构会发生变化。
_store: dict[str, dict] = {}
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
@mcp.tool()
def memory_set(session_id: str, key: str, value: str) -> str:
"""
将值存储在会话内存中。
所有的值都应该是字符串形式。对于复杂的数据,可以使用 JSON 来表示:
example: memory_set(session_id, 'quiz_scores', json.dumps([0.8, 0.6]))
参数说明:
session_id: 这些数据仅适用于当前的学习会话。
key: 描述性名称,例如 'explained_topics' 或 'last_quiz_score'。
value: 字符串形式的值。对于列表或字典类型的数据,也需要使用 JSON 来表示。
"""
if session_id not in _store:
_store[session_id] = {}
_store[session_id][key] = {"value": value, "updated_at": _now_iso()}
return f"已为会话 '{session_id}' 存储了键 '{key}',其值为 {value}"
@mcp.tool()
def memory_get(session_id: str, key: str) -> str:
"""
从会话内存中检索值。
如果指定的键不存在,就会返回字符串 "null"。
返回 "null" 是为了让 LLM 能够正确处理这种情况,而不会引发类型错误。
"""
session = _store.get(session_id,{})
entry = session.get(key)
if entry is None:
return "null"
else:
return entry["value"]
@mcp.tool()
def memory_list_keys(session_id: str) -> list[str]:
"""列出某个会话中存储的所有键。如果没有任何数据,就会返回一个空列表。"""
return list(_store.get(session_id,}).keys')
@mcp.tool()
def memory_delete(session_id: str, key: str) -> str:
"""从会话内存中删除指定的键。"""
session = _store.get(session_id,{})
if key in session:
del session[key]
return f"已从会话 '{session_id}' 中删除了键 '{key}'"
else:
return f"在会话 '{session_id}' 中未找到键 '{key}'"
@mcp.resource("notes://session/{session_id}")
def get_session_summary(session_id: str) -> str:
"""某个会话中存储的所有数据的完整摘要。访问地址:notes://session/{session_id}"""
session = _store.get(session_id,{})
if not session:
return f"# 会话内存:{session_id}\n\n目前还没有存储任何数据。"
lines = [f"# 会话内存:{session_id}\n"]
for key, entry in sorted(session.items()):
lines.append(f"## {key}")
lines.append(f"- 值:{entry['value']}\n")
return "\n".join(lines)
if __name__ == "__main__":
print("[Memory MCP] 正在启动服务器")
mcp.run()
_store字典的设计初衷就是保持简单性。整个内存服务器完全可以被Redis后端替代,而代理代码本身并不会因此发生任何变化,只有memory_set和memory_get的实现方式会需要调整。这就是协议边界所带来的价值所在。
memory_get函数返回字符串"null"而非Python中的None,这一设计也是经过深思熟虑的。当ToolMessage中包含None时,某些模型版本会无法正确处理这种情况。而返回"null"可以让大语言模型理解“该键目前还不存在”,从而避免出现类型处理相关的异常情况。
3.4 代理如何使用MCP工具:工具调用流程
解释器代理是第2章(状态管理)与第3章(MCP机制)中所有内容汇聚的地方。它也是系统中第一个会多次调用大语言模型的代理——每次调用对应一种工具,直到大语言模型认为已经获得了足够的信息来生成解释结果为止。
src/agents/explainer.py文件中,MCP服务器相关的功能被直接作为Python函数导入,并使用了LangChain的@tool装饰器进行封装:
# src/agents/explainer.py (setup section)
import json, os
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from langchain_core.tools import tool
from langchain_ollama import ChatOllama
from graph.state import get_current_topic
from mcp_servers.filesystem_server import list_study_files, read_study_file, search_notes
from mcp Servers.memory_server import memory_get, memory_set
MODEL_NAME = os.getenv("OLLAMA_MODEL", "qwen2.5:7b")
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
@tool
def tool_list_files() -> list[str]:
"""
列出notes目录中所有可用的学习笔记文件。
返回的文件名格式为['closures.md', 'decorators.md']等。
在读取任何文件之前,先调用此函数来确定有哪些材料可用。"""
return list_study_files')
@tool
def tool_read_file(filename: str) -> str:
"""
读取某份学习笔记文件的完整内容。
参数:
filename: 由tool_list_files()返回的文件名。
返回值:
文件的全部文本内容;如果文件不存在,则返回错误信息。
"""
return read_study_file(filename)
@tool
def tool_search_notes(query: str) -> str:
"""
在所有学习笔记中搜索指定的关键词或短语。
参数:
query: 需要搜索的词或短语(不区分大小写)。例如:'nonlocal', 'closure'
返回值:
一个JSON字符串,其中包含匹配的结果及其所在的文件位置。
```
results = search_notes(query)
if not results:
return "未找到匹配项。"
return json.dumps(results, indent=2)
@tool
def tool_memory_get(session_id: str, key: str) -> str:
"""
从会话内存中检索指定键对应的值。
参数:
session_id: 当前的会话ID。
key: 需要查询的键名。
返回值:
如果找到了相应的值,则返回该值;否则返回'null'。
"""
return memory_get(session_id, key)
@tool
def tool_memory_set(session_id: str, key: str, value: str) -> str:
"""
将指定的值存储到会话内存中,供后续的代理程序读取。
参数:
session_id: 当前的会话ID。
key: 键名。
value: 要存储的值。对于复杂数据,建议使用JSON格式进行编码。
```
return memory_set(session_id, key, value)
EXPLAINER_TOOLS = [
tool_list_files, tool_read_file, tool_search_notes,
tool_memory_get, tool_memory_set,
]
TOOL_MAP = {t.name: t for t in EXPLAINER_TOOLS}
⚠️ 直接导入与子进程传输方式的区别
在本教程中,MCP工具被作为Python函数导入,并使用@tool标签进行封装。这种方式使所有操作都在一个进程中完成,因此开发起来更加简单,也不会产生任何与子进程相关的开销,同时也便于进行测试。
而在生产环境中,MCP服务器会作为独立的进程运行,它们通过标准输入/输出或HTTP进行通信。此时,你可以使用langchain-mcp-adapters中的MultiServerMCPClient来进行连接。在这两种模式下,代理代码几乎是完全相同的——唯一不同的是工具封装的方式而已。
解释器的系统提示不仅会告诉大型语言模型有哪些工具可用,还会告诉它应该如何按顺序使用这些工具:
EXPLAINER_SYSTEM_PROMPT = """你是一位专家导师,正在向学生讲解各种主题。
你的解释必须基于学生实际学习到的材料。在开始讲解之前,请使用可用的工具来查找并阅读相关的笔记。
操作步骤如下:
1. 调用tool_list_files()函数,查看有哪些学习材料可用。
2. 调用tool_search_notes(topic)函数,找出哪些文件与当前主题相关。
3. 调用tool_read_file(filename)函数,读取最相关的文件内容。
4. 查看之前的学习内容:调用tool_memory_get(session_id, 'explained_topics')函数。
5> 根据笔记中的内容来撰写你的解释。
解释格式建议如下:
- 用一个现实生活中的类比来开头(1-2句话)。
- 明确地阐述核心概念(2-3句话)。
- 举出学生笔记中的具体代码示例。
- 最后提醒读者注意一些常见的错误或需要注意的地方。
在写完解释之后,需要将解释内容保存下来:
tool_memory_set(session_id, 'explained_topics', <逗号分隔的主题标题列表>)
"""
explainer_node中的工具调用循环是整个系统中最核心的部分,因此值得我们仔细了解:
# src/agents/explainer.py (节点函数)
def execute_tool_call-tool_call: dict) -> str:
"""执行工具调用并将结果以字符串形式返回。此过程不会引发任何异常。"""
name = tool_call["name"]
args = tool_call["args"]
if name not in TOOL_MAP:
return f"错误:未知的工具‘{name}’。可用的工具有:{list(TOOLMAP.keys())}"
try:
result = TOOL_MAP[name].invoke(args)
if isinstance(result, (list, dict)):
return json.dumps(result)
return str(result)
except Exception as e:
return f"执行工具‘{name}’({args})时出现错误:{type(e).__name__}: {e}"
def explainer_node(state: dict) -> dict:
"""
LangGraph节点:解释器代理
输入参数:
state["roadmap"], state["current_topic_index"], state["session_id"]
输出参数:
state["messages"], state["error"]
"""
topic = get_current_topic(state)
if topic is None:
return {"error": "未找到当前主题。"}
session_id = state.get("session_id", "unknown")
print(f"\n[解释器] 当前主题:'{topic.title}'")
llm = ChatOllama(
model=MODEL_NAME,
base_url=OLLAMA_BASE_URL,
temperature=0.3,
).bind_tools(EXPLAINER_TOOLS)
messages = [
SystemMessage(content=EXPLAINER_SYSTEM_PROMPT),
HumanMessage(content=(
f"请帮我解释这个主题:'{topic.title}'\n"
f"相关背景信息:{topic.description}\n"
f"用于调用记忆功能的会话ID:{session_id}"
),
]
max_iterations = 8
final_response = None
for iteration in range(max_iterations):
print(f"[解释器] 第{iteration + 1}次大型语言模型调用...")
response = llm.invoke(messages)
messages.append(response)
if not response.tool_calls:
final_response = response
print(f"[解释器] 经过{iteration + 1}次调用后,解释完成!")
break
print(f"[解释器] 当前需要执行{len(response/toolcalls)}次工具调用...")
for tool_call in response-tool_calls:
print(f" → {tool_call['name']}({tool_call['args']})")
result = execute_tool_call(tool_call)
log_result = result[:100] + "..." if len(result) > 100 else result
print(f" ← {log_result}")
# 确保tool_call_id与大型语言模型为该请求分配的ID相匹配,
# 这样才能将结果正确地关联到对应的请求。
messages.append(ToolMessage(
content=result,
tool_call_id=tool_call["id"],
))
if final_response is None:
return {
"messages": messages,
"error": f"解释器已经完成了最大次数的调用(共{max_iterations}次)。",
}
print(f"[解释器] 解释结果:共{len(final_response.content)}个字符")
return {"messages": messages, "error": None}
让我们来看看在一次执行过程中会发生什么:
LLM第一次调用: LLM接收到了系统提示以及用户要求对“闭包概念进行解释”的请求。它随后调用了以下工具函数:tool_list_files()和tool_search_notes("closure"),但目前还没有生成任何文本解释。
工具函数的执行结果: tool_list_files()返回了文件列表["closures.md", "decorators.md", "python_basics.md"];tool_search_notes("closure")从文件closures.md中找到了相关内容。这两个结果都会以ToolMessage对象的形式被添加到消息列表中,同时会附带对应的tool_call_id。
LLM第二次调用: 现在LLM已经获得了文件列表和搜索结果,于是它调用了tool_read_file("closures.md")。
工具函数的执行结果: 文件closures.md的全部内容都被作为ToolMessage对象返回给了LLM。
LLM第三次调用: LLM读取了这些内容后,调用了tool_memory_set(session_id, "explained_topics", "Closures Explained"),用于记录这个主题已经被讲解过了。
LLM第四次调用: 由于相关信息已经存储在系统中,LLM最终生成了解释文本。在此之后不再有其他的工具函数被调用,整个流程也就结束了。需要注意的是,这份解释是基于用户提供的笔记内容生成的,而不是基于模型的训练数据。
tool_call_id这一字段在tool_call_id=tool_call["id"]这条代码中起到了关键作用。当LLM调用某个工具函数时,它会为该函数分配一个唯一的ID;而ToolMessage对象中也必须包含这个ID,这样LLM才能将执行结果与相应的请求关联起来。如果没有这个ID,整个处理流程就会出错,模型也会产生错误的输出。
max_iterations = 8这一限制实际上起到了“安全阀”的作用。如果模型出现故障并不断循环调用工具函数,那么系统会一直运行下去,直到用户手动终止它。对于任何合理的解释任务来说,8次迭代就已经足够了。如果模型超过了这个限制,就会触发错误机制,此时你可以调整系统提示或更换更强大的模型。
3.5 运行解释器
按照提示批准路线图后,就可以观察工具调用流程的实际运行情况了:
python main.py
批准成功后,系统会显示如下输出:
[Explainer] 主题:'Python函数复习'
[Explainer] LLM第1次调用/共8次...
→ tool_list_files{}
← ["closures.md", "decorators.md", "python_basics.md"]
[Explainer] LLM第2次调用/共8次...
→ tool_search_notes({'query': 'functions'})
← [{"file": "python_basics.md", "line_number": 12, "line": "## 函数"}]
[Explainer] LMM第3次调用/共8次...
→ tool_read_file({'filename': 'python_basics.md'])
← # Python基础\n\n## 变量与数据类型…
[Explainer] LLM第4次调用/共8次...
→ tool_memory_set({'session_id': 'a3f1b2c4', 'key': 'explained_topics', ...})
← 已为会话'a3f1b2c4'存储了'explained_topics'
[Explainer] LLM第5次调用/共8次...
[Explainer] 5次调用完成后完成解释
[Explainer> 解释内容:487个字符
每一个箭头(→)都代表LLM发起的一次工具调用;而每一个反向箭头(←)则代表返回给LLM的结果。循环在第五次LLM调用时终止,因为那次响应中包含了最终的解释内容,之后不再有进一步的工具请求。
📌 检查点:运行MCP服务器测试,以验证这些工具是否能够独立于LLM正常工作:
pytest tests/test_mcp_servers.py -v
预期结果:共36项测试,全部通过,且无需使用Ollama。这些测试直接以Python函数的形式调用相应的工具功能,既不涉及子进程,也没有额外的协议开销。由于这些工具功能本身就是普通的Python代码,因此它们在两种模式下都能正常工作(无论是通过直接的Python导入方式,还是通过MCP协议)。
在企业应用中,如果采用这种架构设计,一个合规培训系统中的MCP服务器应该用来提供监管法规内容库,而不是学习笔记。用户可以按主题查询这些内容,阅读相关要求,并根据实际的监管文本生成认证评估结果,而不是依赖模型对法规内容的解读。这才是这种架构的核心价值所在。
在下一章中,你将添加测验生成器和进度辅导系统,设置条件路由机制,使整个系统能够自动遍历所有主题,并最终运行这个包含四个组件的完整系统。
第4章:构建四组件系统
前三章奠定了系统的基础:定义了共享状态、构建了一个会在每个节点处进行检查的图结构、设置了两个MCP服务器,以及开发了利用这些服务器将解释内容与实际学习笔记相结合的解释器代理。目前你已经拥有一个能够读取文件并解释相关主题的LLM。
这一章将完成整个系统的构建。你将添加测验生成器和进度辅导系统,设置条件路由机制,使系统能够自动遍历所有主题,并最终运行一个完整的端到端测试流程。
4.1 测验生成器:LLM作为评分者
测验生成器是这个系统中在架构设计上最为有趣的一个组件,因为它使用了两次目的不同、配置也有所差异的LLM调用。
temperature=0.4这样的设置使得问题生成环节能够产生多样且不重复的问题,从而覆盖多个主题;而format="json"则确保了输出结果的结构化。
temperature=0.1在评分环节中起到了关键作用,这种设置使得评分过程更加客观、一致——对同一份答案进行两次评分应该会得到相同的分数。如果使用与问题生成相同的配置来进行评分,那么创造性因素就会干扰到客观评估的结果。
这种设计模式确实值得重视:当某个工作流程包含需求截然不同的子任务时,为这些子任务分别配置不同的LLM调用方式,往往能取得比试图同时处理所有任务的单一调用方式更好的效果。
# src/agents/quiz_generator.py
import json
import os
from datetime import datetime, timezone
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_ollama import ChatOllama
from graph.state import QuizQuestion, QuizResult, get_current_topic
MODEL_NAME = os.getenv("OLLAMA_MODEL", "qwen2.5:7b")
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
GENERATION_PROMPT = """你是一名为学习编程的学生设计测验题的人。
给定一个主题和相关的解释内容,你需要生成{n}道测验题,这些题目能够真正测试学生的理解能力,而不仅仅是他们能否复述记忆中的内容。
好的测验题应该要求学生:
- 将某个概念应用到新的情境中
- 解释某件事情为什么能起到作用,而不仅仅是要说明它具体做了什么
- 识别出一些边缘情况或常见的错误
- 对相关的概念进行比较
返回的格式必须是有效的JSON数据,不能包含任何散文或Markdown格式的内容:
{{
"questions": [
{{
"question": "一个清晰、具体的问题,问题以?结尾",
"expected_answer": "模型给出的答案应该由1到3句话组成",
"difficulty": "简单|中等|困难"
}}
]
}}
规则:
- 必须包含至少一道关于常见错误或容易混淆点的题目
* expected_answer应该简洁但完整
* 避免使用是/否类型的题目,而应该要求学生进行解释或演示"""
GRADING_PROMPT = """你是一位公正的老师,正在给学生的答案打分。
问题:{question}
模型给出的答案:{expected_answer}
学生的回答:{student_answer}
请诚实地给学生的答案打分。对于部分正确的答案,也可以给予适当的分数:
- 如果基本正确但存在一些小错误:0.7-0.9分
- 如果概念正确但不够精确:0.5-0.7分
- 如果只有部分内容是正确的:0.3-0.5分
- 如果完全错误:0.0-0.2分
返回的格式必须是有效的JSON数据,不能包含任何散文或Markdown格式的内容:
{{
"correct": true,
"score": 0.85,
"feedback": "请给出一条具体的反馈意见",
"missing_concept": "如果学生的答案正确,则此字段为空"
}}
"""
def generate_questions(topic: str, explanation: str, n: int = 3) -> list[dict]:
"""根据解释器的输出生成n道测验题。」
llm = ChatOllama(
model=MODEL_NAME,
base_url=OLLAMA_BASE_URL,
temperature=0.4,
format="json",
)
prompt = GENERATION_PROMPT.format(n=n)
try:
response = llm.invoke([
SystemMessage(content=prompt),
HumanMessage(content=f"主题:{topic}\n\n解释:\n{explanation}"),
])
data = json.loads(response.content)
questions = data.get("questions", [])
if questions and isinstance(questions, list):
return questions
except Exception as e:
print(f"[测验生成器] 在生成问题时,LLM调用失败:{e}")
# 备用方案:生成一道通用题目
return [{
"question": f"请用自己的话解释{topic}这个概念的含义,以及它为什么重要。",
"expected_answer": "一个能够体现学生概念理解能力的清晰解释。",
"difficulty": "中等",
}]
def grade_answer(question: str, expected: str, student_answer: str) -> dict:
"""利用LLM来评判学生的答案。」
llm = ChatOllama(
model=MODEL_NAME,
base_url=OLLAMA_BASE_URL,
temperature=0.1, # 为了保证评分的一致性,温度值设置得较低
format="json",
)
prompt = GRADING_PROMPT.format(
question=question,
expected_answer=expected,
student_answer=student_answer,
)
try:
response = llm.invoke([HumanMessage(content=prompt)])
return json.loads(response.content)
except Exception as e:
print(f"[测验生成器] 在评分时,LLM调用失败:{e}")
return {
"correct": False,
"score": 0.5,
"feedback": "无法自动评分,请手动审核。",
"missing_concept": "",
}
run_quiz函数负责协调交互式终端会话的运行。它首先调用generate_questions函数生成问题列表,然后通过input()函数将每个问题呈现给用户,收到用户的回答后立即对其进行评分,并最终生成QuizResult对象:
def run_quiz(topic: str, explanation: str) -> QuizResult:
"""在终端中运行交互式测验会话。"""
print(f"\n{'='*60}")
print(f"测验主题:{topic}")
print(f"{'='*60}")
print("请用你自己的话回答每个问题,按Enter键提交。\n")
questions_data = generate_questions(topic, explanation, n=3)
graded_questions = []
total_score = 0.0
weak_areas = []
for i, q_data in enumerate(questions_data, 1):
question_text = q_data["question"]
expected = q_data["expected_answer"]
difficulty = q_data.get("difficulty", "medium")
print(f"第{i}题(难度:{difficulty}):{question_text}")
user_answer = input("你的答案:").strip()
if not user_answer:
user_answer = "(未提供答案)"
print("评分中...")
grade = grade_answer/question_text, expected, user_answer)
score = float(grade.get("score", 0.0))
correct = bool(grade.get("correct", False))
feedback = grade.get("feedback", "")
missing = grade.get("missing_concept", "")
total_score += score
status = "✓" if correct else "✗"
print(f"{status} 分数:{score:.0%}。反馈信息:{feedback}\n")
if missing:
weak_areas.append(missing)
graded_questions.append(QuizQuestion(
question=question_text,
expected_answer=expected,
user_answer=user_answer,
correct=correct,
feedback=feedback,
score=score,
))
avg_score = total_score / len(questions_data) if questions_data else 0.0
correct_count = sum(1 for q in graded_questions if q.correct)
print(f"{'='*60}")
print(f>测验结束!总分数:{avg_score:.0%},正确答案数量为{correct_count}/{len(graded_questions)}")
if weak_areas:
print(f>需要复习的内容:{', '.join(set(weak_areas))}")
print(f"{'='*60}\n")
return QuizResult(
topic=topic,
questions=graded_questions,
score=avg_score,
weak_areas=list(set(weakAreas)),
timestamp.datetime.now(timezone.utc).isoformat(),
)
LangGraph节点会从消息历史记录中提取解释器的输出结果,然后调用run_quiz函数来运行测验。之后,它会将测验结果以及需要复习的内容保存到状态对象中:
def quiz_generator_node(state: dict) -> dict:
"""
LangGraph节点:测验生成器
读取数据:state["roadmap"], state["current_topic_index"], state["messages"]
写入数据:state["quiz_results"], state["weak_areas"], state["error"]
"""
topic = get_current_topic(state)
if topic is None:
return {"error": "当前没有主题,需要先运行课程规划器"}
# 从消息历史记录中提取解释器的最终回复。
# 解释器的输出应该是消息历史记录中的最后一条不包含“tool_calls”字段的AIMessage。
# 包含“tool_calls”字段的消息虽然也有内容,但也会被标记为需要复习的内容。
from langchain_core.messages import AIMessage
messages = state.get("messages", [])
explanation = ""
for msg in reversed(messages):
if isinstance(msg, AIMessage) and msg.content and not getattr(msg, "tool_calls", None):
explanation = msg.content
break
if not explanation:
print("[测验生成器] 警告:未找到解释内容,将生成通用测验")
explanation = f"主题:{topic.title}. {topic.description}"
print(f"\n[测验生成器] 正在为‘{topic.title}’生成测验")
quiz_result = run_quiz(topic.title, explanation)
existing_results = state.get("quiz_results", [])
all_weak_areas = list(set(
state.get("weak_areas", []) + quiz_result.weak_areas
))
return {
"quiz_results": existing_results + [quiz_result],
"weak_areas": all_weak_areas,
"error": None,
# 为了在会话中断或恢复后保持状态数据,显式地将当前状态传递下去
"roadmap": state.get("roadmap"),
"current_topic_index": state.get("current_topic_index", 0),
"session_id": state.get("session_id", ""),
}
💡 为什么测验结果会累积而非被替换
“进度辅导系统”需要当前的测验结果,而会议总结则需要所有的测验结果。因此,该系统会将新的测验结果添加到现有的列表中(即existing_results + [quiz_result]),而不是直接替换原有的列表。
薄弱环节分析功能也遵循相同的处理方式:它会使用set(existing + new)这种机制来去除重复项,从而生成一份包含学生在整个学习过程中遇到的所有薄弱环节的清单。
4.2 进度辅导系统:结果的综合分析与后续安排
“进度辅导系统”会依次执行三项任务:评估测验结果、向学生提供反馈,以及决定下一步该做什么。其中,确定后续学习方向这一决策是其最重要的职责。
# src/agents/progress_coach.py
import json
import os
from datetime import datetime, timezone
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_ollama import ChatOllama
from graph.state import QuizResult, StudyRoadmap, get_latest_quiz_result
from mcp_servers.memory_server import memory_set
MODEL_NAME = os.getenv("OLLAMA_MODEL", "qwen2.5:7b")
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
PASS_THRESHOLD = 0.5
COACHING/prompt = """你是一位鼓励学生学习的辅导老师,正在查看他们的测验结果。
请根据以下信息,给出一条简短而富有鼓励性的反馈:
- 学生学习的主题
- 他们的得分(0.0表示0%,1.0表示100%)
- 他们发现的任何薄弱环节
请仅返回有效的JSON格式内容:
{{
"summary": "2-3句鼓励性的语句",
"encouragement": "一条关于下一步学习的激励语句"
}}
请具体说明,提到所学的主题以及任何薄弱环节的名称。
千万不要让学生感到沮丧。低分意味着“需要更多练习”,而不是“你失败了”。”
"""
get_coaching_message函数会使用temperature=0.4和format="json"的参数来调用大型语言模型。只有设置适当的“温度值”,才能让反馈语句显得更加亲切、富有鼓励性。如果将temperature设置为0.1,虽然反馈内容在技术上是正确的,但会显得过于生硬、缺乏人情味:
def get_coaching_message(topic: str, score: float, weak_areas: list[str]) -> dict:
"""向大型语言模型请求一条个性化的辅导建议。”
llm = ChatOllama(
model=MODEL_NAME,
base_url=OLLAMA_BASE_URL,
temperature=0.4,
format="json",
)
context = {
"topic": topic,
"score_percent": f"{score:.0%}",
"weak_areas": weak_areas if weak_areas else ["未发现薄弱环节"],
}
try:
response = llm.invoke([
SystemMessage(content=COACHING_PROMPT),
HumanMessage(content=json.dumps(context)),
])
return json.loads(response.content)
except Exception as e:
print(f"[进度辅导系统] 调用大型语言模型失败:{e}")
return {
"summary": f"你在{topic}这个主题上得了{score:.0%}分。继续加油吧!",
"encouragement": "每个学习主题都是在前一个主题的基础上进行的。”
}
这个节点函数将所有内容整合到一起。它会读取最新的测验结果,更新路线图中的主题状态,将进度信息保存到MCP内存中,打印反馈信息,并推进主题索引的更新:
def progress_coach_node(state: dict) -> dict:
"""
LangGraph节点:进度教练
读取的数据:state["quiz_results"], state["roadmap"],
state["current_topic_index"], state["session_id"]
写入的数据:state["roadmap"], state["current_topic_index"],
state["messages"], state["error"]
"""
latest = get_latest_quiz_result(state)
if latest is None:
return {"error": "没有测验结果。必须先运行测验生成器"}
roadmap = state.get("roadmap")
if roadmap is None:
return {"error": "未找到路线图"}
idx = state.get("current_topic_index", 0)
session_id = state.get("session_id", "unknown")
score = latest.score
print(f"\n[进度教练] 主题:'{latest.topic}'")
print(f"[进度教练] 分数:{score:.0%}")
if latest.weak_areas:
print(f"[进度教练] 弱项:{', '.join(latest.weak_areas)}")
# 从大语言模型中获取辅导信息
coaching = get_coaching_message(latest.topic, score, latest.weak_areas)
# 更新路线图中的主题状态
topics = roadmap.get("topics", []) if isinstance(roadmap, dict) else roadmap.topics
if idx < len(topics):
topic = topics[idx]
new_status = "completed" if score >= PASS_THRESHOLD else "needs_review"
if isinstance(topic, dict):
topic["status"] = new_status
else:
topic.status = new_status
# 推进主题索引的更新
next_idx = idx + 1
all_done = next_idx >= len(topics)
# 将进度信息保存到MCP内存中
memory_set(session_id, f"progress_topic_{idx}", json.dumps({
"topic": latest.topic,
"score": score,
"weak_areas": latest.weak_areas,
"timestamp": datetime.now(timezone.utc).isoformat(),
}))
# 打印辅导反馈信息
print(f"\n{'─'*60}")
print(f"辅导建议:{coaching['summary']}")
print(f"{coaching['encouragement']}")
if all_done:
results = state.get("quiz_results", [])
avg = sum(r.score for r in results) / max(len(results), 1)
print(f"\n会话完成!平均分:{avg:.0%}")
else:
next_topic = topics[next_idx]
next_title = next_topic.get("title") if isinstance(next_topic, dict) else next_topic.title
print(f"\n下一个主题:'{next_title}'")
print(f"{'─'*60}\n")
return {
"roadmap": roadmap,
"current_topic_index": next_idx,
"messages": [AIMessage(content=coaching["summary"])],
"error": None,
}
这个函数中有两点需要特别注意。
为什么要在推进索引之前更新主题状态? 因为状态的变化(从"pending"变为"completed"或"needs_review")必须发生在topics[idx]这个位置上,而不是topics[next_idx]。索引是在更新了当前主题的状态之后才进行递增的。如果顺序搞反了,就会导致错误的主题被标记为已完成或需要审核。这是一个很容易被忽略的细微错误,因为从表面上看,会话的进行过程并没有出现问题。
为什么要将数据写入MCP内存中? “进度教练”会通过`memory_set`函数来保存每个学习主题的学习成果。这种设计在实际应用中非常有用:如果课程在发生崩溃或暂停后重新开始,内存服务器就会记录下学生已经学习了哪些内容以及他们的学习表现如何。在讲解后续的主题时,“解释器”可以通过`tool_memory_get`函数查看这些历史记录,并根据学生在哪些知识点上遇到了困难来调整讲解的重点。
4.3 构建完整图结构
当所有四个代理节点都被定义好后,`workflow.py`文件会将它们连接成一个完整的图结构。这个连接过程所对应的代码文件是系统中最短的:文件内容不足50行,主要由`add_node`、`add_edge`和`add_conditional_edges`这些函数组成。
# src/graph/workflow.py
import os
import sqlite3
from pathlib import Path
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import END, START, StateGraph
from agents.curriculum_planner import curriculum_planner_node
from agents.explainer import explainer_node
from agents.human_approval import human_approval_node
from agents.progress_coach import progress_coach_node
from agents.quiz_generator import quiz_generator_node
from graph.state import AgentState, session_is_complete
def route_afterApproval(state: dict) -> str:
if state.get("approved", False):
return "explainer"
return "curriculum_planner"
def route_afterCoach(state: dict) -> str:
if session_is_complete(state):
return "end"
return "explainer")
def build_graph(
db_path: str = "data/checkpoints.db",
interrupt_before: list | None = None,
):
"""
构建并编译学习加速器图结构。
参数:
db_path: SQLite检查点数据库的路径。
interrupt_before: 一个可选的节点名称列表,表示在处理这些节点之前需要暂停程序。
这个参数会被Streamlit用户界面用来拦截quiz_generator节点的执行。
"""
Path("data").mkdir(exist_ok=True)
if db_path == "data/checkpoints.db":
db_path = os.getenv("CHECKPOINT_DB", db_path)
builder = StateGraph(AgentState)
builder.add_node("curriculum_planner", curriculum_planner_node)
builder.add_node("human_approval", human_approval_node)
builder.add_node("explainer", explainer_node)
builder.add_node("quiz_generator", quiz_generator_node)
builder.add_node("progress_coach", progress_coach_node)
builder.add_edge(START, "curriculum_planner")
builder.add_edge("curriculum_planner", "human_approval")
builder.add_edge("explainer", quiz_generator_node)
builder.add_edge("quiz_generator", progress_coach_node)
builder.add_conditional_edges(
"human_approval",
route_afterApproval,
{"explainer": "explainer", "curriculum_planner": "curriculum_planner"},
)
builder.add_conditional_edges(
"progress_coach",
route_afterCoach,
{"explainer": "explainer", "end": END},
)
# 注意:必须直接创建数据库连接,不要使用上下文管理器。
# 由于LangGraph的不同线程会同时执行节点函数和检查点写入操作,因此必须确保连接在整个程序运行期间保持开放状态。
conn = sqlite3.connect(db_path, check_same_thread=False)
checkpointer = SqliteSaver(conn)
return builder.compile(
checkpointer=checkpointer,
interrupt_before=interrupt_before or [],
)
graph = build_graph()
interrupt_before这个参数值得我们仔细研究。终端界面(main.py)在human_approval_node内部使用了interrupt()函数来暂停程序,以等待用户的批准。因此,并不需要使用interrupt_before。
Streamlit用户界面(第9章的内容)需要另一种类型的暂停机制:它必须在quiz_generator_node开始运行之前停止执行,这样在图谱处理线程中就不会调用input()函数。在streamlit_app.py文件中,build_graph(interrupt_before=["quiz_generator"])这条代码会生成一个专门用于用户界面的图谱实例。
终端界面使用的图谱和Streamlit用户界面使用的图谱其实是用同一个构建工具生成的,只不过暂停的时机不同而已。
所有的路由处理函数都是纯Python代码,并没有调用任何大型语言模型。route_after_approval会读取state["approved"]这个值——这个布尔值是由humanapproval_node节点写入的。route_after_coach则会调用session_is_complete(state)函数,用来判断当前学习的主题是否已经超出了原定的学习计划。所有的控制流程都是基于确定的Python代码实现的,并不是由大型语言模型生成的随机结果。
4.4 完整的执行流程
当你运行python main.py "Learn Python closures"并在批准提示框中输入yes时,程序会按照以下顺序执行:
START
↓
curriculum_planner_node
读取:state["goal"]
写入:state["roadmap"], state["messages"]
↓
human_approval_node
使用interrupt()函数暂停执行,等待用户输入。
用户输入“yes”
写入:state["approved"] = True,并将后续的所有状态信息向前推进
↓ 调用route_afterapproval → “explainer”节点
explainer_node (主题0)
读取:state["roadmap"], state["current_topic_index"]
调用:tool_list_files, tool_search_notes, tool_read_file
写入:state["messages"]
↓
quiz_generator_node (主题0)
读取:state["messages"](提取解释内容)
调用:run_quiz() → 出3道题,生成3个评分结果
写入:state["quiz_results"], state["weak_areas"]
↓
progress_coach_node (主题0)
读取:state["quiz_results"], state["roadmap"]
写入:state["roadmap"](更新主题0的状态)
state["current_topic_index"] = 1
state["messages"](显示辅导信息)
↓ 调用route_after_coach → “explainer”节点(还有其他主题需要学习)
explainer_node (主题1)
...
↓
[循环继续执行,直到current_topic_index大于或等于roadmap.topics的长度]
↓ 调用route_after_coach → “end”
END
LangGraph会在每个节点执行完毕后检查状态。如果程序在quiz_generator_node和progress_coach_node之间崩溃,那么下次使用相同的会话ID调用graph.invoke(None, config=config)时,程序会从progress_coach_node处继续执行。因为测验结果已经保存在状态变量中了。
4.5 运行完整的系统
当四个节点都注册完成之后,可以运行以下命令:
rm -f data/checkpoints.db
python main.py "Learn Python closures and decorators from scratch"
你会看到规划器、审批提示,然后是整个执行流程:
[课程规划器] 为“学习Python闭包”制定学习计划...
[课程规划器] 计划已生成:5个主题,共4周
1. Python函数(60分钟)
2. 作用域与命名空间(45分钟)
3> 内层函数(60分钟)
4> 创建闭包(75分钟)
5> 装饰器基础(60分钟)
[人工审批] 正在等待审核学习计划...
> 同意
[人工审批] 学习计划已通过,开始学习环节。
[讲解器] 主题:“Python函数”
[讲解器] 第1次调用大语言模型...
→ tool_list_files({})
← ["closures.md", "decorators.md", "python_basics.md"]
[讲解器] 第2次调用大语言模型...
→ tool_read_file({'filename': 'python_basics.md'})
← # Python基础内容...
[讲解器] 经过4次调用大语言模型后完成解释
[讲解器] 解释内容共1938个字符
[测验生成器] 正在为“Python函数”生成测验题
============================================================
测验:Python函数
================================================------------
问题1 [中等难度]:……与……之间的区别是什么?
你的答案:函数是一等对象…
评分结果…
✓ 分数:80%。对一等函数的解释很到位。
……
[进度辅导器] 主题:“Python函数”
[进度辅导器] 分数:73%
────────────────────────────────────────────────────────────
辅导提示:你对Python函数的理解非常扎实,尤其是在……方面……
在继续学习闭包相关内容时,请继续巩固这一基础!
下一个主题:“作用域与命名空间”
────────────────────────────────────────────────────────────
[讲解器] 主题:“作用域与命名空间”
…
这个执行流程是自动进行的。当progress_coach_node将current_topic_index设置为1时,route_after_coach会返回“explainer”,然后图结构会使用更新后的索引来调用explainer_node。在main.py中并没有外部循环,图结构的拓扑设计本身就负责实现迭代功能。
📌 检查点: 运行完整的测试套件:
pytest tests/ -v
预期结果:会收集到184个测试用例,自动排除那些不需要使用Ollama的单元测试。这些单元测试涵盖了测验模块和辅导模块的功能:
pytest tests/test_quiz_and_coach.py -v
这些测试会模拟大语言模型的调用过程,并验证相关状态是否正确:比如quiz_results>是否能够被正确累积,current_topic_index>是否会在每次调用后递增,以及路由函数是否能够返回正确的结果。
在下一章中,你将深入了解从第2章开始就已经在后台运行的两项重要功能:能够在系统崩溃后依然保持数据持久性的状态保存机制,以及能够在用户作出响应时暂停整个学习流程并重新启动的人工监督机制。
第5章:状态持久性与人工监督
从第2章开始,两个问题就已经在幕后被悄悄解决了:该系统能够在发生故障时继续运行;同时它也可以在执行过程中暂停,等待人类操作员的决策。这一章明确阐述了这两个功能。理解这些机制,才是区分演示版本与正式生产环境中的系统的关键所在。
5.1 检点机制的实际作用
每当一个LangGraph节点的执行完成时,该框架会将完整的AgentState数据序列化后保存到SQLite数据库中,并以thread_id作为键进行存储。这个线程ID其实就是你在调用run_session函数时所指定的会话ID。
数据库的结构非常简单:
data/checkpoints.db
└── checkpoints table
thread_id = "a3f1b2c4" ← 你的会话ID
checkpoint blob ← 每个节点执行完成后序列化得到的AgentState数据
每个会话中都会生成多个检查点,这些检查点是按照节点执行的顺序依次创建的。LangGraph系统总是会加载最新的那个检查点。当你调用graph.invoke(None, config={"configurable": {"thread_id": "a3f1b2c4"}})时,LangGraph会读取与该线程ID对应的最新检查点数据,并从该检查点开始继续执行后续操作。
在src/observability/langfuse_setup.py文件中定义的get_langfuse_config函数用于生成包含线程ID的配置字典:
def get_langfuse_config(session_id: str) -> dict:
"""
生成用于运行图处理的配置字典,其中会包含作为检查点标识的线程ID。
这个配置字典会在每次调用graph.invoke()时被传递给该函数,无论是初次调用还是之后的恢复执行操作。LangGraph会利用这个线程ID来查找并加载正确的检查点数据。
"""
config = {
"configurable": {
"thread_id": session_id,
}
}
# 如果启用了Langfuse功能,还会在这里添加回调函数(详见第6章)
handler = get_langfuse_handler(session_id)
if handler:
config["callbacks"] = [handler]
return config
这个配置对象是连接同一个会话中所有graph.invoke调用操作的纽带,它确保所有的操作都能参考到相同的检查点历史数据。
💡 SqliteSaver的连接模式
SqliteSaver可以通过两种方式初始化。使用上下文管理器的方式(with SqliteSaver.from_conn_string(...) as checkpointer)会在with块执行结束后自动关闭数据库连接。但由于graph = build_graph()是一个在整个程序运行期间都存在的模块级变量,因此使用这种初始化方式会导致build_graph()返回后立即关闭数据库连接,后续的graph.invoke调用都会因为尝试写入已关闭的数据库而失败。
正确的使用方法是先执行conn = sqlite3.connect(db_path, check_same_thread=False),然后再创建checkpointer = SqliteSaver(conn)对象。这样,数据库连接就会在整个程序运行期间保持打开状态。
必须设置check_same_thread=False这个参数。SQLite的默认设置是不允许在一个线程中创建的数据库连接被另一个线程使用的。LangGraph在内部会使用不同的线程来执行节点函数和写入检查点数据,如果不设置这个参数,程序运行时就会出现ProgrammingError: SQLite对象只能在创建它的线程中使用这样的错误。
5.2 人工审批节点:中断与恢复执行
人工审批节点会使用 `interrupt()` 来暂停图的执行过程。LangGraph正是通过这种方式实现人工干预机制的:当该节点被触发时,图的执行会立即停止,当前状态会被保存为检查点,然后控制权会返回给调用者。当调用者再次执行 `graph.invoke(Command(resume=value), config=config)` 时,图的执行会从上次中断的地方继续进行,`decision` 的值会设置为 `value`。
# src/agents/human_approval.py
from langgraph.types import interrupt
from graph.state import StudyRoadmap
def human_approval_node(state: dict) -> dict:
"""
LangGraph节点:人工审批
输入参数:state["roadmap"]
输出参数:state["approved"]——若批准则为True,否则为False;
此外还会返回状态中的其他所有关键信息(详见下方说明)。
当 approved为False时,系统会通过条件边将执行流程重定向回Curriculum Planner节点,以便生成新的学习计划;
当 approved为True时,图的执行会继续进入Explainer节点。
"""
roadmap = state.get("roadmap")
if roadmap is None:
return {"approved": True}
print(f"\n[人工审批] 正在暂停执行,以等待学习计划的审核...")
# interrupt()函数会在这里暂停执行的进程。
# 被传递给interrupt()的字典包含了所需的信息,调用者可以根据这些信息来向用户展示相应的内容。
# 当调用者执行Command(resume=value)时,图的执行会继续进行。
decision = interrupt({
"type": "roadmap_approval",
"roadmap": roadmap,
"prompt": (
"这个学习计划看起来可行吗?\n"
"输入‘yes’开始学习\n"
"输入‘no’则生成新的计划"
),
})
approved = str(decision).lower().strip() in ("yes", "y", "ok", "approve")
if approved:
print(f"[人工审批] 学习计划已获批准,现在开始学习。")
else:
print(f "[人工审批] 学习计划未被批准,正在重新生成中...")
# 在LangGraph 1.1.0版本中,调用Command(resume=...)之后,下一个节点只会接收到当前节点返回的关键信息,而不会收到中断前的全部状态数据。
# 显式返回所有状态信息可以确保下游的代理节点(如explainer、quiz_generator、progress_coach等)能够获取到完整的学习计划信息。
return {
"approved": approved,
"roadmap": roadmap,
"goal": state.get("goal", ""),
"session_id": state.get("session_id", ""),
"current_topic_index": state.get("current_topic_index", 0),
"quiz_results": state.get("quiz_results", []),
"weak_areas": state.get("weak_areas",`),
"study_materials_path": state.get("study_materials_path",
"study_materials/sample_notes"),
"error": None,
}
在这个函数底部关于LangGraph 1.1.0的评论记录了一种在实际使用中会遇到的现象:在执行Command(resume=...)之后,下一个节点的状态仅包含被中断的节点明确返回的信息。如果该节点只返回了{"approved": True},那么解释器节点接收到的状态中将不包含roadmap、session_id或current_topic_index这些字段,因此会立即抛出错误。
这并不是你的代码中的错误,而是LangGraph 1.1.0在任务被中断或恢复后处理状态时的一种已知行为。解决方法就是让节点明确地返回所有必要的状态信息。
💡 interrupt()与interrupt_before的区别
LangGraph提供了两种暂停图计算流程的方式。builder.compile()中的interrupt_before=["node_name"]会在指定节点之前暂停整个计算流程,这种设置是在编译时确定的。而interrupt()则是在某个节点的执行过程中被调用,用于在该节点执行到中途时暂停计算流程,并且可以携带额外的数据(即一个字典,调用者可以通过这个字典来决定向用户展示哪些信息)。
在human_approval_node中使用interrupt()是因为审批环节需要将roadmap对象传递给后续的处理节点。如果使用interrupt_before,虽然可以在节点开始执行之前暂停计算流程,但由于roadmap是在该节点的前置节点curriculum_planner_node中生成的,因此这种方式会导致数据传递顺序混乱。而使用interrupt()则可以确保节点能够按照正确的顺序接收roadmap对象、生成审批所需的数据,然后再暂停计算流程。
Streamlit UI使用build_graph(interrupt_before=["quiz_generator"])则是出于另一个原因:它需要在quiz_generator_node开始执行之前停止整个图计算流程,这样就可以确保在图计算线程中不会调用input()函数。这两种方法对于它们各自适用的场景来说都是正确的。
5.3 在main.py中处理中断事件
调用graph.invoke的方法时,程序需要能够处理图计算流程被暂停的情况。LangGraph会通过在结果字典中添加"__interrupt__"这一键来表示计算流程已被暂停。而中断时传递的数据(也就是你传递给interrupt()的参数)则存储在result["__interrupt__"][0].value中。
# main.py: 中断/恢复循环
from langgraph.types import Command
result = graph.invoke(state, config=config)
while "__interrupt__" in result:
interrupt_payload = result["__interrupt__"][0].value
roadmap = interrupt_payload.get("roadmap")
# 向用户展示学习计划
if roadmap:
print(f"\n{'='*60}")
print("建议的学习计划")
print(f"{'='*60}")
print(f"目标:{roadmapgoal}")
print(f"时长:{roadmap.total_weeks}周,每周{roadmap.weekly_hours}小时\n")
for i, topic in enumerate(roadmap.topics, 1):
prereqs = (f" (所需条件:{', '.join(topic.prerequisites)}")
if topic.prerequisites else "")
print(f" {i}. {topic.title}(预计耗时{topic.estimated_minutes}分钟){prereqs}")
print(f" {topic.description}")
print(f"\n{interrupt_payload.get('prompt', '继续吗?')}")
user_input = input("> ").strip()
# 根据用户的决定恢复计算流程
# Command(resume=value)用于将用户输入的信息传递给被中断的节点
result = graph.invoke(Command(resume=user_input), config=config)
while循环用于处理这样一种情况:如果用户拒绝当前的路线规划方案,规划器会重新进行计算,进而触发另一次中断。如果用户输入“no”,程序会再次执行curriculum_planner_node模块,生成新的路线规划方案,然后再次调用interrupt()函数,循环会显示新的规划方案。用户可以不断拒绝这些方案,直到自己满意为止。只有当程序完整运行完毕且没有再次发生中断时,这个循环才会结束。
这种程序结构确实值得仔细理解:
graph.invoke(initial_state, config)
→ 执行流程:curriculum_planner → human_approval (此时会触发interrupt()函数)
→ 返回结果:{"__interrupt__": [...]} ← 调用者会从这里读取新的路线规划方案
main.py会显示当前的路线规划方案,并收集用户输入的“yes”或“no”答案
graph.invoke(Command(resume="yes"), config)
→ 继续执行流程:human_approval (决策结果为“yes”,approved变量被设置为True)
→ 接下来会依次执行explainer、quiz_generator、progress_coach等模块 → 最后结束程序
→ 返回最终状态信息 ← 此时“__interrupt__”键的值将为None
在两次调用graph.invoke函数时,包含thread_id键的config字典内容是完全相同的。正是这个机制让LangGraph能够从上次中断的地方继续执行程序,而不是从头开始重新计算。
5.4 恢复崩溃的会话
处理用户确认操作的同一机制也被用于恢复程序因异常而崩溃的情况。如果程序在explainer_node和quiz_generator_node之间意外终止,SQLite数据库中的检查点会保存上一次成功执行的节点的状态。重新启动程序并使用相同的thread_id继续执行,程序就会从上次中断的地方继续运行。
main.py文件中使用的--resume参数就是实现这一功能的:
# main.py
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Learning Accelerator")
parser.add_argument("goal", nargs="?", default="Learn Python closures and decorators from scratch")
parser.add_argument("--resume", metavar="SESSION_ID",
help="Resume an existing session by ID")
args = parser.parse_args()
if args.resume:
run_session(goal="", session_id=args.resume)
else:
run_sessiongoal=args.goal)
在run_session函数中,恢复会话与重新开始会话的区别仅在于这一行代码:
# 对于新开始的会话:需要提供初始状态
state = initial_state(goal, session_id)
# 对于恢复会话:将state设置为None,LangGraph会从检查点中读取数据
state = None if is.resume else initial_stategoal, session_id)
result = graph.invoke(state, config=config)
当state的值为None时,LangGraph会从config字典中获取与当前thread_id对应的最新检查点数据,并从上次成功执行的节点继续执行程序。只需记住原始会话开始时使用的会话ID即可。
# 原始会话信息:会话ID:a3f1b2c4
# 进程在会话进行到中途时终止
python main.py --resume a3f1b2c4
============================================================
学习加速器
会话ID:a3f1b2c4
正在恢复现有会话...
============================================================
[讲解器] 主题:'创建闭包'
...
程序会从下一个未完成的节点继续执行。已经完成的内容(包括解释、测验结果和辅导信息)会保持原有的状态,只有剩余的部分会重新被执行。
5.5 你需要了解的序列化细节
当LangGraph从SQLite中加载检查点数据时,它会将存储的状态反序列化为Python对象。对于基本类型(如字符串、整数、字符串列表),这一过程是透明的。但对于你自定义的数据类(Topic, StudyRoadmap, QuizResult),LangGraph会使用其内部的msgpack序列化器,因此这些数据类可能会以普通字典的形式返回,而不是数据类的实例。
这就是为什么在state.py中,get_current_topic、session_is_complete和get_latest_quiz_result这三个函数都能处理这两种形式的数据:
def get_current_topic(state: dict) -> Topic | None:
roadmap = state.get("roadmap")
if roadmap is None:
return None
# 在反序列化检查点数据后,roadmap可能是一个字典
if isinstance(roadmap, dict):
topics_raw = roadmap.get("topics", [])
else:
topics_raw = roadmap.topics
idx = state.get("current_topic_index", 0)
if idx >= len(topics_raw):
return None
t = topics_raw[idx]
# 反序列化后,单个主题也可能是一个字典
if isinstance(t, dict):
return Topic.from_dict(t)
return t
这也是为什么Topic, StudyRoadmap和QuizResult这三个类都包含了from_dict这个类方法。这样做并非为了提供便利,而是为了确保恢复会话的功能能够正常工作。
在任何需要对自定义对象进行检查点处理的生产系统中,都应该遵循这一原则。如果你的状态数据中包含数据类或Pydantic模型,那么就需要为所有的状态访问方法添加处理逻辑,以便既能处理原始形式的数据,也能处理反序列化后的数据。不要假设数据的类型一定会与你当初输入时的类型相同,而应该在使用数据的时候进行验证。
5.6 测试会话持久性
运行一个会话,在中途终止它,然后验证恢复会话的功能是否正常:
rm -f data/checkpoints.db
python main.py "Learn Python closures"
当路线图显示出来后,输入yes,等待直到看到“[讲解器] 在N次LLM调用后完成”这一提示,然后按Ctrl+C终止进程。注意一开始打印出的会话ID。
现在尝试恢复会话:
python main.py --resume
会话应该从“测验生成器”那里继续进行。解释部分已经准备好了,因此系统会直接进入第一个主题的题目。
📌 检查点: 运行检查点测试:
pytest tests/test_checkpointing.py -v
预期结果:20个测试全部通过。这些测试用于验证检查点的功能:即在中途被中断的会话能够继续进行,并且最终会得到预期的结果;同时还要确保字典与数据类的反序列化操作能够正确执行。
在企业环境中,某些销售支持平台也会使用类似的检查点机制来进行经理审批流程。
当课程规划系统为新员工制定培训计划时,系统会暂停当前操作,并向经理发送通知。经理可以在网页仪表板上查看该计划,然后进行批准或修改,最后提交。这个提交操作实际上就是执行以下HTTP POST请求:graph.invoke(Command(resume=decision), config=config)。LangGraph的代码与终端版本是完全相同的,唯一不同的是通知机制和数据收集方式。
在下一章中,你将学习如何增加系统的可观测性:Langfuse能够记录下所有的代理调用、大语言模型的调用过程以及各种工具的执行情况,这些信息都会被整理成结构化的追踪日志,供你查询和可视化分析。
第6章:使用Langfuse实现系统可观测性
对于那些虽然没有出现错误但却产生了错误结果的多代理系统来说,进行调试要比那些会崩溃的系统更加困难。传统的基础设施监控指标(如CPU使用率、内存占用量、请求延迟、错误率等)只能说明系统本身运行正常,但无法揭示代理们为何会做出错误的判断。因此,我们需要一种不同的可观测性机制:这种机制不仅能记录某个操作是否被执行,还能清楚地显示模型做出了什么决策以及为什么如此决策。
Langfuse正好满足了这一需求。它能够记录下每一次大语言模型的调用、每一个工具的执行过程,以及每一步操作中的所有详细信息,并将这些数据按照会话进行分组整理。当出现问题时,你只需要查看对应会话的追踪日志,就能清楚地了解每个代理接收了什么信息、执行了哪些操作、最终得到了什么结果。
在本章中,我们只需通过一个简单的集成点,就将Langfuse添加到系统中,并确保系统在是否配置了Langfuse的情况下都能正常运行。
6.1 使用Docker在本地运行Langfuse
在本教程中,Langfuse是作为独立服务在本地运行的。所有的追踪数据都会保存在你的机器上,既不需要API密钥,也不会有任何数据离开你的网络环境。仓库中的docker-compose.yml文件用于启动整个Langfuse系统:
# docker-compose.yml services: langfuse-server: image: langfuse/langfuse:3 depends_on: postgres: condition: service_healthy ports: - "3000:3000" environment: DATABASE_URL: postgresql://postgres:postgres@postgres:5432/langfuse NEXTAUTH_URL: http://localhost:3000 NEXTAUTH_SECRET: local-dev-secret-change-in-production SALT: local-dev-salt-change-in-production ENCRYPTION_KEY: "0000000000000000000000000000000启动相关服务:
docker compose up -d等待大约20秒,直到Postgres完成初始化。然后打开http://localhost:3000,创建一个账户(本地账户,无需进行邮箱验证),并创建一个名为
learning-accelerator的项目。Langfuse会将在设置 → API密钥中显示你的API密钥。请将公钥和私钥都复制到你的
.env文件中:LANGFUSE_PUBLIC_KEY=pk-lf-... LANGFUSE_SECRET_KEY=sk-lf-... LANGFUSE_HOST=http://localhost:30006.2 可观测性模块
所有与可观测性相关的代码都位于
src/observability/langfuse_setup.py文件中。项目中的其他文件均保持不变。代理节点不会导入这个模块中的任何代码,也不会调用Langfuse提供的任何函数,更不知道可观测性功能是否正在运行。这种架构正是实现可观测性的正确方式。如果你在代理函数的内部添加日志记录语句,那么就会使代理逻辑与可观测性框架产生耦合。如果用其他工具替换Langfuse,就需要修改所有的代理节点代码。而回调模式能够确保这种耦合不会影响到你的业务逻辑。
这个模块包含了四个函数,这些函数之间存在单向依赖关系,每个函数都是在前一个函数的基础上实现的:
# src/observability/langfuse_setup.py import os def _langfuse_configured() -> bool: """ 检查环境中是否配置了Langfuse所需的凭据。 如果缺少任意一个密钥或它们的值为空,函数将返回False。在这种情况下,系统会继续正常运行,而不会出现错误。 """ public_key = os.getenv("LANGFUSE_PUBLIC_KEY", "").strip() secret_key = os.getenv("LANGFUSE_SECRET_KEY", "").strip() return bool(public_key and secret_key)
_langfuse_configured()这个函数被其他所有函数所使用。如果没有配置所需的凭据,系统仍然可以正常运行,但这只是一种“优雅降级”的处理方式——可观测性功能是一种可选的增强功能,并非必不可少的组件。def get_langfuse_handler(session_id: str, user_id: str = "local"): """ 为当前会话创建一个Langfuse回调处理器;如果没有配置,则返回None。 这个处理器是由Langfuse提供的LangChain CallbackHandler。当它被添加到graph.invoke()方法中时,它会自动拦截所有的LLM调用、工具调用以及链式操作。因此,无需修改代理节点的代码即可实现这一功能。 """ if not _langfuse_configured(): return None try: from langfuse.langchain import CallbackHandler return CallbackHandler( public_key=os.getenv("LANGFUSE_PUBLIC_KEY"), secret_key=os.getenv("LANGFUSE_SECRET_KEY"), host(os.getenv("LANGFUSE_HOST", "http://localhost:3000"), session_id=session_id, user_id=user_id, tags=["learning-accelerator", "local-inference"], metadata={ "model": os.getenv("OLLAMA_MODEL", "qwen2.5:7b"), "framework": "langgraph", }, ) except ImportError: print("[Observability] 未安装Langfuse。请运行:pip install langfuse") return None except Exception as e: print(f"[Observability] 创建处理器失败:{e}") return None
session_id被传递给CallbackHandler后,Langfuse UI会将同一学习会话中的所有追踪数据汇总到一起。该会话中所有的LLM调用、工具使用记录以及节点执行结果都会显示在同一个会话视图下。你可以通过这个视图完整地追踪从目标输入到最终测验结果的整个推理过程。
tags列表在Langfuse中可以作为可过滤的标签来使用。如果你同时运行了多个项目,"learning-accelerator"这个标签可以帮助你只查看属于当前系统的追踪数据。def get_langfuse_config( session_id: str, user_id: str = "local", extra_config: dict | None = None, ) -> dict: """ 为某个会话生成完整的LangGraph运行配置。 将检查点所使用的thread_id与Langfuse的回调处理程序关联起来。 这是main.py中唯一会调用的函数。每个功能对应一个配置字典,所有必要的设置都通过这个函数完成。 返回一个可以直接作为`config`参数传递给graph.invoke()的字典。 """ config = { "configurable": {"thread_id": session_id}, } if extra_config: config.update(extra_config) handler = get_langfuse_handler(session_id, user_id) if handler: config["callbacks"] = [handler] print(f"[Observability] 正在追踪会话 {session_id} → " f"{os.getenv('LANGFUSE_HOST', 'http://localhost:3000')}") else: print(f "[Observability] Langfuse未配置,因此不会进行追踪。") return config
get_langfuse_config将两个不同的信息合并到了一个字典中:LangGraph用于创建检查点的thread_id,以及LangChain用于处理可观测性事件的callbacks列表。
graph.invoke(state, config=config)会将完整的配置参数传递给LangGraph,而LangGraph会将configurable键对应的部分发送给检查点系统,将callbacks键对应的部分发送给回调处理系统。这两个系统之间不会互相干扰。def flush_langfuse() -> None: """ 在进程退出之前,清除所有未处理的追踪数据。 Langfuse会在后台线程中发送追踪数据。如果不执行这个操作,进程退出时最近几秒内的追踪数据可能会丢失。 应在main.py中的所有graph.invoke()调用之后执行这个函数。
flush这个操作在实际使用中非常重要。Langfuse会批量处理追踪数据并异步发送它们,而像python main.py这样的短时运行进程可能会在数据被发送之前就结束。因此,flush()会一直等待队列中的数据全部被处理完毕才会退出。6.3 单一集成点
main.py中,上述所有功能都只在两个地方被集成进去:# main.py from observability.langfuse_setup import get_langfuse_config, flush_langfuse def run_session(goal: str, session_id: str | None = None) -> None: ... # 通过一次函数调用即可完成以下操作:{"configurable": {"thread_id": session_id}} # 这次函数调用会返回相同的字典;如果Langfuse已被配置,那么还会返回一些回调函数。 config = get_langfuse_config(session_id) result = graph.invoke(state, config=config) while "__interrupt__" in result: ... result = graph.invoke(Command(resume=user_input), config/config) print_session_summary(result) # 在退出程序之前,需要清除Langfuse中保存的数据。 flush_langfuse()这就是真正的完全集成。无需在代理代码中添加任何导入语句,也无需在代码库中散布Langfuse相关的调用代码,更不需要在Node函数中加入条件判断语句。回调处理程序会在LangChain框架层面拦截所有的调用请求,因此你的代理代码根本不会发生任何变化。
💡 回调系统会自动捕获哪些信息
CallbackHandler会与LangChain的回调协议进行连接。每当任何一个兼容LangChain的对象(无论是ChatOllama、某个工具还是图谱中的节点)开始或结束执行时,都会触发回调事件。Langfuse的处理程序会捕获这些事件,并将它们记录为跟踪信息。对于这个系统来说,这意味着:在所有五个代理中进行的每一次
llm.invoke()调用、在解释器工具的调用循环中执行的每一条TOOL_MAP[name].invoke(args)命令、每个节点的启动和结束时间,以及每一步中的完整消息内容,都会被自动捕获下来——而这一切都不需要对代理代码进行任何修改。6.4 在Langfuse用户界面中可以看到什么
按照以下配置运行Langfuse程序:
python main.py "Learn Python closures"打开http://localhost:3000,然后进入Traces页面。你会看到自己会话的跟踪信息。展开这些信息后,你会看到如下内容:
Session: a3f1b2c4 ├── curriculum_planner_node 245ms │ └── ChatOllama.invoke 238ms │ input: "Create a study roadmap for..." │ output: {"goal": "Learn Python closures", "topics": [...]]} │ ├── human_approval_node (interrupted, user input collected) │ ├── explainer_node 4,821ms │ ├── ChatOllama.invoke 312ms → tool_list_files() │ ├── tool_list_files 2ms ← ["closures.md", ...] │ └── ChatOllama.invoke 287ms → tool_read_file("closures.md") │ ├── tool_read_file 1ms ← "# Python Closures\n..." │ ├── ChatOllama.invoke 1,204ms → (no tool calls. final explanation) │ └── tool_memory_set 1ms │ ├── quiz_generator_node 8,342ms │ ├── ChatOllama.invoke 1,890ms (question generation) │ ├── ChatOllama.invoke 892ms (grading Q1) │ ├── ChatOllama.invoke 874ms (grading Q2) │ └── ChatOllama.invoke 891ms (grading Q3) │ └── progress_coach_node 1,102ms └── ChatOllama.invoke 1,088ms这条跟踪信息能立即告诉你三件事情,而这些是任何基础设施指标都无法揭示的。
各代理的延迟情况。在四次LLM调用中,测验生成器总共花费了8秒钟的时间。如果你需要优化延迟性能,那么重点应该放在评分相关的调用上——其中三次调用每次耗时约900毫秒,这些调用是有可能被并行处理的。
工具调用的顺序。解释器程序按照正确的顺序依次调用了
tool_list_files、tool_read_file,然后把结果写入内存。如果调用顺序错误,你在这里就能立刻发现这个问题,而无需查看任何代码。每一步中LLM的输入和输出内容。如果课程规划器生成的路线图存在格式错误,你可以在跟踪信息中看到LLM输出的原始数据;如果评分系统给出了错误的分数,你也能清楚地了解到它接收到了什么输入,以及最终返回了什么结果。
6.5 平稳降级
该系统的设计使得其在是否配置了 Langfuse 的情况下都能正常运行。如果您没有设置环境变量,
_langfuse_configured()将返回 False,而get_langfuse_config会返回仅包含thread_id的最小配置:# 未配置 Langfuse config = get_langfuse_config("a3f1b2c4") # 返回结果:{"configurable": {"thread_id": "a3f1b2c4"}} # 配置了 Langfuse config = get_langfuse_config("a3f1b2c4") # 返回结果:{"configurable": {"thread_id": "a3f1b2c4"}, # "callbacks": []} 代理节点接收到的配置信息中并不包含这两种版本中的任何一种;它们只收到
state数据。实际上,这些配置信息是由 LangGraph 和 LangChain 等基础设施使用的,而非您的业务逻辑所需要。这种设计才是适合生产环境的方案:可观测性相关组件应该在出现故障时能够安静地退出运行,并实现平稳的降级;您的追踪后端出现故障也不应该导致整个应用程序崩溃。
6.6 运行可观测性测试
pytest tests/test_observability.py -v预期结果:16项测试全部通过,且不需要使用 Langfuse 服务器。这些测试会模拟对
_langfuse_configured函数的调用,并验证以下内容:
-
get_langfuse_config在返回的配置对象中,总会包含thread_id这一项 -
当未配置 Langfuse 时,
callbacks这项键根本不会出现 -
如果缺少认证凭据,调用
flush_langfuse也不会产生任何实际效果 -
当遇到
ImportError时,get_langfuse_handler会返回None,而不会引发异常
这些测试中的任何一项都不需要 Langfuse 服务器处于运行状态;它们只是用来验证模块在配置了 Langfuse 或未配置 Langfuse 的情况下是否都能正常工作。
在某些受监管的行业里,企业级多代理系统使用可观测性技术,既是为了满足合规性要求,也是为了辅助调试工作。Langfuse 提供的追踪记录能够详细记录每一次大语言模型的调用过程(包括输入数据、输出结果、时间戳以及会话 ID),这些记录可以用于接受监管机构的审查;同样,那些帮助您排查错误答案的追踪信息,也能向审核人员清晰地展示模型接收了哪些输入信息,以及最终产生了什么输出结果。
在下一章中,您将添加自动化的质量评估机制:通过运行 DeepEval 这一工具,可以检测解释器的输出是否与您的预期一致,同时也能验证测验生成器提出的问题是否与主题相关。
第7章:使用 DeepEval 评估代理质量
可观测性技术能帮助您了解发生了什么;而评估机制则能告诉您这些事情是否有实际意义、是否达到了预期的效果。
一个多代理系统即使在整个运行过程中没有出现任何错误,仍然有可能生成出包含错误信息的解释结果、提出与问题无关的测试题目,或者将错误的答案判定为正确答案。
这些缺陷在基础设施指标中是无法被检测出来的,大多数单元测试也无法发现它们。唯一可靠的检测方法就是使用另一个大型语言模型来评估第一个模型的输出结果。
本章介绍了如何利用DeepEval以及自定义的OllamaJudge类来进行自动化质量评估。所有的评估过程都在本地完成,既不需要云服务API密钥,也不会产生任何额外的评估费用。
7.1 以大型语言模型作为评估工具
“以大型语言模型作为评估工具”这种模式指的是利用一个大型语言模型来评价另一个模型的输出结果。当解释器生成了解释内容后,评估模型会阅读这些解释以及相关的原始资料,并回答一个结构化问题:“这个解释中的每一条观点都有相应的原始资料支持吗?”
这种评估方法并非完美无缺,评估模型也可能会犯错。但对于这里所关注的这类定性评估来说(比如解释是否准确、提出的问题是否相关、评分是否公平),经过精心设计的基于大型语言模型的评估系统其表现通常会优于基于规则的经验判断方法,并且在大规模应用时也比人工审核更为实用。
DeepEval提供了这套评估框架,它负责处理评估提示的生成、评分标准的设定以及各项指标的汇总工作;而你需要提供测试用例,必要时还可以提供一个自定义模型。
7.2 OllamaJudge类
DeepEval默认使用OpenAI作为评估工具。如果你希望将评估过程保持在本地进行,就可以继承DeepEvalBaseLLM类,并将其与你的Ollama实例连接起来:
# tests/test_eval.py
import os
from deepeval.models import DeepEvalBaseLLM
from langchain_ollama import ChatOllama
class OllamaJudge(DeepEvalBaseLLM):
"""
一个使用本地Ollama模型的自定义评估工具。
DeepEval通过DeepEvalBaseLLM接口支持自定义模型。我们通过封装ChatOllama来实现同步和异步生成功能。
为了保证评估结果的一致性,评估模型会在温度设置为0.0的情况下运行。同样的测试如果被执行两次,应该会得到相同的评分。
"""
def __init__(self):
self.model_name = os.getenv("OLLAMA_MODEL", "qwen2.5:7b")
self.base_url = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
def load_model(self):
return ChatOllama(
model=self.model_name,
base_url=self.base_url,
temperature=0.0, # 评估时使用确定性的温度设置
)
def generate(self, prompt: str) -> str:
return self.load_model().invoke(prompt).content
async def a_generate(self, prompt: str) -> str:
return self.generate.prompt)
def get_model_name(self) -> str:
return f"ollama/{self.model_name}"
def get_judge_model():
"""返回一个OllamaJudge对象;如果deepeval没有安装,则返回None。"""
try:
return OllamaJudge()
except ImportError:
return None
将评估模型的温度设置为0.0是一个经过慎重考虑的决定。这样做的目的是为了确保评估结果的稳定性:同样的测试如果被执行两次,应该会得到相同的评分。如果温度设置得较高,就会产生随机性,从而使得很难判断评分的变化是反映了真正的质量变化,还是仅仅是由于抽样误差导致的。
7.3 两级测试策略
该测试套件采用了两种不同执行方式的测试机制。
单元测试运行速度很快,不需要使用Ollama模型,并且会在每次代码修改后自动执行。这些测试用于验证代码的结构是否正确:例如,《generate_questions》函数是否能够返回包含正确键值的字典列表?《grade_answer》函数是否总是会返回包含`correct`、`score`和`feedback`字段的字典?《get_coaching_message》函数是否始终会返回`summary`和`encouragement`字段?
评估测试运行速度较慢(每次测试需要30到120秒),且必须在使用Ollama模型的情况下才能执行。这类测试用于验证代码的质量:解释器的输出是否与预期一致?评分系统的分数是否能够真实反映答案的质量?
这种分离机制在两个地方得到了体现。首先,在`pyproject.toml`文件中添加了`addopts = "-m 'not eval'"`这一配置,这样在执行`pytest tests/`命令时,评估测试就会被默认跳过:
[tool.pytest.ini_options]
pythonpath = ["src"]
testpaths = ["tests"]
asyncio_mode = "auto"
addopts = "-m 'not eval'"
markers = [
"unit: fast tests, no external dependencies",
"eval: slow evaluation tests requiring Ollama (LLM-as-judge)",
]
其次,所有的评估测试类和函数都使用了`@pytest.mark.eval`标记:
@pytest.mark(eval
class TestExplainerQuality:
...
如果需要手动执行评估测试,可以使用以下命令:
pytest tests/test_eval.py -m eval -v -s
其中`-s`选项用于禁用输出记录功能,这样就可以实时查看模型的评分结果及推理过程。
7.4 `conftest.py`中的共享测试 fixture
文件`tests/conftest.py`中包含了所有测试文件共用的测试fixture:
# tests/conftest.py
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
def pytestconfigure(config):
"""添加自定义标记,以避免pytest对未知标记发出警告."""
config.addinivalue_line(
"markers",
"eval: 用于标识需要使用Ollama模型的测试"
)
config.addinivalue_line(
"markers",
"unit: 用于标识快速运行且不依赖外部库的测试"
)
@pytest.fixture
def sample_roadmap():
"""一个简单的StudyRoadmap对象,用于单元测试。"""
from graph.state import StudyRoadmap, Topic
return StudyRoadmap(
goal="学习Python闭包",
total_weeks=2,
topics=[
Topic(
title="闭包的概念",
description="了解闭包是如何捕获外部作用域中的变量的",
estimated_minutes=60,
),
Topic(
title "实际应用闭包",
description="将闭包应用于实际问题,例如工厂模式、记忆化技术",
estimated_minutes=45,
prerequisites=["Closures Explained"],
),
],
)
@pytest.fixture
def sample_state(sample_roadmap):
"""一个简单的AgentState对象,用于单元测试。"""
from graph.state import initial_state
state = initial_state("学习Python闭包", "test-session-001")
state["roadmap"] = sample_roadmap
state["current_topic_index"] = 0
return state
@pytest.fixture
def closures_note_content():
"""
文件closures.md的内容,用于在验证测试中作为参考资料。如果该文件不存在,则会使用内嵌的摘要信息。
"""
notes_path = (
Path(__file__).parent.parent
/ "study_materials/sample_notes/closures.md"
)
if notes_path.exists():
return notes_path.read_text(encoding="utf-8")
return (
"闭包是一种嵌套函数,即使外部函数已经返回,它仍然能够记住外部作用域中的变量。"
)
closures_note_content这个测试用例用于检测解释结果的准确性。DeepEval的FaithfulnessMetric会要求评估者根据这些说明内容来验证解释中提到的每一个观点。如果解释器编造了说明中并不存在的事实,那么这一评估机制就能发现这一点。
7.5 解释器的质量测试
对这些解释器的评估主要考察两个方面:首先,输出内容是否与原始说明保持一致;其次,这些解释是否与用户提出的问题相关。
# tests/test_eval.py
def run_explainer(topic_title: str, topic_description: str, session_id: str) -> str:
"""运行解释器并返回其最终生成的解释文本。"""
from graph.state import StudyRoadmap, Topic, initial_state
from agents.explainer import explainer_node
from langchain_core.messages import AIMessage
state = initial_state(f"学习 {topic_title}", session_id)
state["roadmap"] = StudyRoadmap(
goal=f"学习 {topic_title}",
total_weeks=1,
topics=[Topic(topic_title, topic_description, 60)],
)
state["current_topic_index"] = 0
result = explainer_node(state)
# 提取最终输出结果:即最后一个不包含任何工具调用的AIMessage
for msg in reversed(result.get("messages", [])):
if (isinstance(msg, AIMessage) and msg.content and not getattr(msg, "tool_calls", None)):
return msg.content
return ""
@pytest.mark.eval
class TestExplainerQuality:
FAITHFULNESS_THRESHOLD = 0.6
RELEVANCY_threshold = 0.6
@pytest.fixture(autouse=True)
def setup(self, closures_note_content):
"""运行解释器一次,然后在该类中的所有测试中重复使用其输出结果。"""
self.retrieval_context = [closures_note_content]
self.explanation = run_explainer(
topic_title="闭包原理讲解",
topic_description="了解闭包是如何捕获外部作用域中的变量的",
session_id="eval-test-001",
)
if not self.explanation:
pytest.skip("解释器未返回任何输出结果。请确认Ollama正在运行中。")
def test_explanation_is.faithful_to_notes(self):
"""
解释内容不应包含说明原文中没有的信息。
FaithfulnessMetric会检查:解释中的每一个观点是否都有相应的依据支持?
分数过低说明解释器在编造内容。"""
from deepeval.test_case import LLMTestCase
from deepeval.metrics import FaithfulnessMetric
judge = get_judge_model()
if judge is None:
pytest.skip("无法初始化评估模型")
test_case = LLMTestCase(
input="解释Python中的闭包机制",
actual_output=self.explanation,
retrieval_context=self.retrieval_context,
)
metric = FaithfulnessMetric(
model=judge,
threshold=self.FAITHFULNESS_THRESHOLD,
include_reason=True,
)
metric.measure(test_case)
print(f"\n[准确性] 分数:{metric.score:.3f}")
if hasattr(metric, "reason"):
print(f"[准确性] 原因:{metric.reason}")
assert metric.score >= self.FAITHFULNESS_THRESHOLD, (
f"准确性分数 {metric.score:.3f} 低于 {self.FAITHFULNESS_threshold}。\n"
f"这意味着解释内容中可能包含了不真实的信息。\n"
f"原因:{getattr(metric, 'reason', 'not available')}"
)
def test_explanation_is_relevant_to_topic(self):
"""解释内容应该与用户提出的问题相关。」
from deepeval.test_case import LLMTestCase
from deepeval.metrics import AnswerRelevancyMetric
judge = get_judge_model()
if judge is None:
pytest.skip("无法初始化评估模型")
test_case = LLMTestCase(
input="解释Python中的闭包机制",
actual_output=self.explanation,
)
metric = AnswerRelevancy Metric(
model=judge,
threshold=self.RELEVANCY_THRESHOLD,
)
metric.measure(test_case)
print(f"\n[相关性] 分数:{metric.score:.3f}")
assert metric.score >= self.RELEVANCY_threshold, (
f"相关性分数 {metric.score:.3f} 低于 {self.RELEVANCY_THRESHOLD}。\n"
f"这意味着解释内容可能偏离了主题。」
)
TestExplainerQuality中的autouse=True设置会使得解释器仅运行一次,然后在其后的所有测试中重复使用该次的输出结果。这样一来,当同一个解释结果可以用于评估两个不同的指标时,就无需为每个测试分别调用解释器了。
7.6 评分质量测试
这些测试旨在验证评分系统的得分结果是否与实际答案的质量相符。它们并不需要使用DeepEval指标,而是直接调用grade_answer函数,并检查得分是否处于预期的范围内:
@pytest.mark.eval
class TestGradingQuality:
def test_correct_answer_scores_high(self):
"""一个明显正确的答案应该获得大于或等于0.65的分数。」
from agents.quiz_generator import grade_answer
result = grade_answer(
question="Python闭包需要满足哪三个条件?」,
expected=(
"闭包需要满足以下条件:1) 必须是一个嵌套的内部函数;"
"2) 这个内部函数必须引用外部作用域中的变量;"
"3) 外部函数必须返回这个内部函数。"
),
student_answer=[
"需要一个使用外部函数作用域中变量的嵌套函数,并且外部函数必须返回这个内部函数。"
],
)
print(f"\n[GradeQuality] 正确答案的得分:{result.get('score', 0):.2f}")
assert result.get("score", 0) >= 0.65, (
f"正确答案的得分太低了:{result['score']:.2f}\n"
f"反馈信息:{result.get('feedback', '')}"
)
def test_wrong_answer_scores_low(self):
"""一个明显错误的答案应该获得小于或等于0.35的分数。」
from agents.quiz_generator import grade_answer
result = grade_answer(
question="什么是Python闭包?”,
expected)(
"闭包是一种嵌套函数,它在外部函数返回后仍然能够访问外部作用域中的变量。"
),
student_answer=[
"闭包是一个类,它‘捕获’了自己属性的值,并阻止外部代码访问这些属性。”
],
)
print(f"\n[GradeQuality] 错误答案的得分:{result.get('score', 0):.2f}")
assert result.get("score", 0) <= 0.35, (
f"错误答案的得分太高了:{result['score']:.2f}\n"
f"可能是评分系统过于宽松了。"
)
def test_partial_answer_scores_middle(self):
"""一个部分正确的答案应该获得0.3到0.75之间的分数。」
from agents.quiz_generator import grade_answer
result = grade_answer(
question="闭包中的延迟绑定机制是什么?如何修复它?”,
expected)(
"延迟绑定意味着闭包会在被调用时才去查找变量的值,而不是在定义时就确定这些值的。修复方法:使用默认参数值,例如将`lambda i=i: i`改为`lambda: i`。"
),
student_answer=[
"延迟绑定意味着闭包在被调用时会使用变量当时的值,而不是定义时的值。" # 知道问题所在,但不知道如何解决
],
)
score = result.get("score", 0)
print(f"\n[GradeQuality] 部分正确的答案的得分:{score:.2f}")
assert 0.3 <= score <= 0.75, (
f"部分正确的答案应该获得0.3到0.75之间的分数,实际得分为{score:.2f}。"
)
这三项测试共同为你的评估系统提供了可靠性保障:评分系统会给予正确答案相应的分数,对错误答案进行扣分,并会根据实际情况给予适当的部分分数。如果在模型更新或提示内容修改后,其中任何一项测试结果不达标,你就能立刻知道评分系统的判断标准发生了哪些变化。
7.7 辅导质量测试
这项辅导质量测试使用了DeepEval提供的GEval评估指标。这个工具允许你用简单的英语来制定自己的评估标准:
@pytest.mark.eval
class TestProgressCoachQuality:
COACHING_QUALITY_THRESHOLD = 0.6
def test_coaching_message_is_encouraging_and_specific(self):
"""
辅导信息应当充满鼓励性、针对具体问题,并且具有可操作性。
GEval允许你用简单的英语来定义评估标准。
评分系统会根据这些标准对输出结果进行打分,分数范围为0.0到1.0。
"""
from deepeval.test_case import LLMTestCase, LLMTestCaseParams
from deepeval.metrics import GEval
from agents.progress_coach import get_coaching_message
judge = get_judge_model()
if judge is None:
pytest.skip("无法初始化评分模型")
coaching = get_coaching_message(
topic="Python闭包",
score=0.67,
weak_areas=["late binding", "nonlocal keyword"],
)
coaching_text = (
f"总结:{coaching.get('summary', '')}\n"
f"鼓励性话语:{coaching.get('encouragement', '')}"
)
test_case = LLMTestCase(
input=(
"为一名在Python闭包这部分内容得了67分、且对late binding和非local概念理解有困难的学生生成辅导信息"
),
actual_output=coaching_text,
)
metric = GEval(
name="CoachingQuality",
criteria=[
"判断这条辅导信息是否满足以下要求:"
"1) 鼓励性语言恰当,且不会对分数进行虚假夸大;"
"2) 与讨论的主题及学生的薄弱环节紧密相关;"
"3) 具有可操作性,能为学生指明下一步应该做什么;"
"4) 表达简洁,总长度在2到4句话之间。"
"如果辅导信息内容空泛、含糊不清或带有居高临下的态度,那么它的质量就较差。"
],
evaluation_params=[LLMTestCaseParams.ACTUAL_OUTPUT],
model=judge,
threshold=self.COACHING_QUALITY_THRESHOLD,
)
metric.measure(test_case)
print(f"\n[CoachingQuality] 评分:{metric.score:.3f}")
assert metric.score >= self.COACHINGQUALITY_threshold, (
f"辅导质量为{metric.score:.3f},未达到最低标准。\n"
f"具体内容如下:\n{coaching_text}"
)
GEval是DeepEval提供的最灵活的评估工具。你可以用简单的英语来描述“优秀”的标准,评分系统会根据这些标准对输出结果进行评价。当你有一些难以用公式表达但容易用文字描述的定性要求时,就可以使用这个工具。
7.8 运行评估套件
单元测试(快速,无需使用Ollama):
pytest tests/ -v
# 共184个测试项,评估相关测试会被自动排除在外
评估测试(耗时较长,需要使用Ollama):
pytest tests/test_eval.py -m eval -v -s
测试结果会显示如下内容:
[TestExplainerQuality] 正在为“闭包”主题运行解释器测试…
[TestExplainerQuality] 解释内容的长度为:1,847个字符
[Faithfulness] 分数:0.782(阈值:0.600)
[Faithfulness] 原因:所有主要评估内容都源自“closer.md”这份文档。
测试通过
[Relevancy] 分数:0.841
测试通过
[GradeQuality] 正确答案的得分:0.82
测试通过
[GradeQuality] 错误答案的得分:0.15
测试通过
[GradeQuality] 部分正确的答案得分:0.55
测试通过
[CoachingQuality] 分数:0.731
测试通过
💡 保守地设置阈值
本地7B模型的“忠实度”和“相关性”指标得分通常在0.6到0.8之间;而云模型对应的得分一般在0.8到0.95之间。这些测试中设定的阈值为0.6:这个数值既足够低,以确保本地模型能够可靠地通过测试;同时又足够高,以便及时发现模型性能的显著下降。
如果你升级到了更强大的模型,并且希望设置更严格的质量检测标准,就可以提高这些阈值。但如果某个模型虽然能产生看似正确的结果,但该模型在测试中却一直未能通过评估,那么就应该降低阈值,并详细记录其原因。
在实际生产环境中,正是通过这样的评估套件来管理模型更新带来的各种问题的。当你从一个模型版本切换到另一个版本时,在部署之前务必先运行这些评估测试。
如果“忠实度”指标的得分低于预设阈值,说明模型变更可能会带来错误结果的风险,此时应立即恢复原来的模型配置;而如果评分系统对正确答案的评分过低,那么这种阈值设定的偏差也会影响学生的学习体验。评估测试其实就是用于检测大语言模型行为变化的工具,就如同单元测试是用来检查代码逻辑是否正确的工具一样。
在下一章中,你将添加A2A协议层。届时,“测验生成器”将会成为一个独立的服务,任何代理或框架都可以调用它;同时,“CrewAI代理”也会加入这个系统——当学生需要额外帮助时,“进度教练”会将其任务委托给这个代理来处理。
第8章:使用A2A协议实现跨框架协作
目前系统中的所有代理都是LangGraph调用的Python函数。这种设计对于大多数生产环境来说确实是很合适的,因为将所有功能集中在同一个框架中能够提高开发效率。
但是,在某些实际应用场景中,我们可能需要采用不同的架构:比如使用不同的框架来构建代理程序,由不同的团队负责维护这些代理,让它们能够独立部署,并且任何支持HTTP协议的系统都可以调用它们。
“代理间通信(A2A)协议”正是实现了这种需求。A2A是一种开放标准(基于JSON-RPC 2.0和HTTP技术),它能够让任何代理都以统一的方式说明自己能执行哪些操作,并接受来自其他系统的任务请求,而无论这些请求方使用的是哪种框架。
一个LangGraph代理和一个CrewAI代理,即使彼此之前从未听说过对方,也可以通过A2A机制进行协作,其方式就如同两个REST服务通过HTTP进行交互一样。
本章为系统添加了两项A2A服务:一个是作为独立服务提供的测验生成器,另一个是当学生需要不同的解释角度时,Progress Coach会调用的CrewAI学习助手。
8.1 A2A的工作原理
在编写任何代码之前,有三个概念是需要了解的。
代理信息卡片是一份以JSON格式存储的文件,其路径为/.well-known/agent-card.json。这份文件描述了该代理能够执行哪些操作:它的名称、功能、具备的技能,以及如何向它发送任务。
任何A2A客户端在发起请求之前都会首先获取这份信息卡片,以确定该代理是否能够处理自己的请求。代理信息卡片实际上就是该代理的公共API规范,类似于REST服务的OpenAPI文档。
任务提交是通过一个统一的接口来完成的:POST /tasks/send。请求数据采用JSON-RPC 2.0格式,其中包含角色信息(例如"user")以及任务内容(通常是一个包含JSON数据的TextPart)。代理接收到这些信息后会处理任务,并以相同的格式返回结果。
框架无关性是A2A机制的重要特点。所有的HTTP通信和协议细节都由A2A服务器来处理,而你的代理代码只需实现一个AgentExecutor子类,其中包含一个execute()方法,用于接收解析后的请求并生成响应即可。无论使用的是LangGraph、CrewAI还是其他任何框架来构建这个执行器,其在协议层面上都完全不可见。调用者看到的只是一种基于HTTP的接口。
调用方(任何框架)
↓ 获取 /.well-known/agent-card.json ← 了解代理的功能
↓ 提交任务至 /tasks/send ← 采用JSON-RPC 2.0格式
↑ 接收到包含结果的响应
A2A服务器(Starlette + uvicorn)
↓ 调用 AgentExecutor.execute()
你的代理逻辑(LangGraph / CrewAI / 其他任何框架)
8.2 作为A2A服务的测验生成器
src/a2a_services/quiz_service.py文件将generate_questions和grade_answer这两个函数(第4章中已经使用过这些函数)封装成了一个A2A服务。这两个函数的实现本身并没有发生任何变化。
代理信息卡片的内容如下:
# src/a2a_services/quiz_service.py
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
QUIZ_SKILL = AgentSkill(
id="generate_and_grade_quiz",
name="生成并评分测验题",
description=[
"根据给定的主题和可选的解释文本,生成用于测试概念理解能力的测验题目;"
"如果提供了答案,还会对这些答案进行评分,并指出薄弱环节。"
],
tags=["quiz", "assessment", "education", "grading"],
examples=[
"生成关于Python闭包的测验题,"
"对装饰器相关的答案进行评分。"
],
)
QUIZ_AGENT_CARD = AgentCard(
name="测验生成服务",
description={
"利用大语言模型作为评分工具来生成和批改测验题目。"
"该服务与任何支持A2A协议的代理都能配合使用。"
},
url="http://localhost:9001/",
version="1.0.0",
defaultInputModes=["text"],
defaultOutput Modes=["text"],
capabilities=AgentCapabilities(streaming=False),
skills=[QUIZ_SKILL],
)
A2A框架会自动通过GET /.well-known/agent-card.json路径提供代理卡片信息,因此您无需为它编写专门的处理程序。
AgentExecutor负责实现实际的测验逻辑。它会接收经过解析的A2A请求,调用generate_questions函数来生成问题,必要时还会调用grade_answer函数对用户提供的答案进行评分,最终生成测试结果并发送出去:
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.types import Message, TextPart
from agents quizzes_generator import generate_questions, grade_answer
class QuizAgentExecutor(AgentExecutor):
"""
负责处理传入的A2A测验任务。
请求格式(以JSON形式存储在TextPart中):
{
"topic": "Python闭包",
"explanation": "闭包是指...", (可选)
"answers": ["答案1", ...] (仅针对需要填写答案的题目,如省略则不显示)
}
"""
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
# 解析请求内容
request_text = ""
for part in context.current_request.params.message.parts:
if isinstance(part, TextPart):
request_text += part.text
try:
request_data = json.loads(request_text)
except json.JSONDecodeError:
request_data = {"topic": request_text}
topic = request_data.get("topic", "通用知识")
explanation = request_data.get("explanation", "")
provided_answers = request_data.get("answers", "")
# 生成问题(在线程池中同步执行)
questions_data = await asyncio.to_thread(
generate_questions, topic, explanation, 3
)
if not providedAnswers:
# 如果没有提供答案,只返回问题内容。
result = {
"status": "questions_ready",
"topic": topic,
"questions": questions_data,
}
else:
# 对用户提供的答案进行评分
graded = []
total = 0.0
weak_areas = []
for q_data, answer in zip(questions_data, provided_answers):
grade = await asyncio.to_thread(
grade_answer,
q_data["question"],
q_data["expected_answer"],
answer,
)
score = float(grade.get("score", 0.0))
total += score
if grade.get("missing_concept"):
weak_areas.append(grade["missing_concept"])
graded.append({
"question": q_data["question"],
"answer": answer,
"score": score,
"correct": bool(grade.get("correct", False)),
"feedback": grade.get("feedback", ""),
})
result = {
"status": "graded",
"topic": topic,
"score": total / len(questions_data) if questions_data else 0.0,
"questions": questions_data,
"graded_questions": graded,
"weak_areas": list(set(weak_areas)),
}
# 发送结果。A2A框架会将这个结果传回调用方。
await event_queue.enqueue_event(
Message(
role="agent",
parts=[TextPart(text=json.dumps(result, indent=2))],
)
)
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
pass
asyncio.to_thread函数用于将同步执行的generate_questions和grade_answer方法封装在异步线程中执行。A2A执行器本身是异步的,它会在事件循环中运行。如果直接调用阻塞函数,就会导致事件循环被冻结,其他所有任务也会被阻塞。to_thread会将阻塞函数放在线程池中执行,并在等待结果时不会干扰事件循环的正常运行。
启动服务器:
from a2a.server.apps import A2AStarletteApplication
from a2a.server.requesthandlers import DefaultRequestHandler
from a2a.servertasks import InMemoryTaskStore
def create_quiz_server():
handler = DefaultRequestHandler(
agentexecutor=QuizAgentExecutor(),
task_store=InMemoryTaskStore(),
)
app = A2AStarletteApplication(
agent_card=QUIZ_AGENT_CARD,
http_handler=handler,
)
return app.build()
if __name__ == "__main__":
uvicorn.run(create_quiz_server(), host="0.0.0.0", port=9001, log_level="warning")
python src/a2a_services/quiz_service.py
# [Quiz A2A Service] 正在运行于 http://localhost:9001
# [Quiz A2A Service] 代理卡信息:http://localhost:9001/.well-known/agent-card.json
验证服务器是否已启动:
curl http://localhost:9001/.well-known/agent-card.json
{
"name": "Quiz Generator Service",
"description": "生成并批改测验题目...",
"url": "http://localhost:9001/",
"skills": [
{
"id": "generate_and_grade_quiz",
"name": "生成并批改测验"
}
]
}
8.3 A2A客户端
src/a2a_services/a2a_client.py将HTTP协议相关细节与代理代码分离。该客户端不会自行构造JSON-RPC数据包,而是直接调用delegate_quiz_task方法并将处理结果以字典形式返回。
# src/a2a_services/a2a_client.py
import httpx
import json
import uuid
QUIZ_SERVICE_URL = os.getenv("QUIZ_SERVICE_URL", "http://localhost:9001")
STUDY_BUDDY_URL = os.getenv("STUDY_BUDDY_URL", "http://localhost:9002")
DEFAULT_TIMEOUT = 120.0
def discover_agent(base_url: str) -> dict:
"""获取代理卡信息以了解其功能。如果无法连接,则返回空字典。"""
card_url = f"{base_url.rstrip('/')}/.well-known/agent-card.json"
try:
response = httpx.get(card_url, timeout=5.0)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"[A2A Client] 无法访问 {card_url}: {e}")
return {}
def send_task(
base_url: str,
message_text: str,
task_id: str | None = None,
timeout: float = DEFAULT_TIMEOUT,
) -> dict:
"""
通过JSON-RPC 2.0协议将任务提交给A2A代理。
A2A服务要求使用特定的JSON-RPC数据包格式,调用者无需了解该数据包的详细结构,只需传递文本信息即可。
如果需要确保任务的唯一性,可以指定task_id;否则系统会自动生成UUID作为标识符。
"""
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "tasks/send",
"params": {
"id": task_id or str(uuid.uuid4()),
"message": {
"role": "user",
"parts": [{"type": "text", "text": message_text}],
},
},
}
url = f"{base_url.rstrip('/')}/tasks/send"
try:
response = httpx.post(url, json=payload, timeout=timeout)
response.raise_for_status()
data = response.json()
# 从响应数据中提取结果信息:
result = data.get("result",{})
artifacts = result.get("artifacts", [])
if artifacts:
for part in artifacts[0].get("parts", []):
if part.get("type") == "text":
try:
return json.loads(part["text"])
except json.JSONDecodeError:
return {"text": part["text"]}
# 如果无法获取结果信息,检查状态码并返回相应的错误信息:
status = result.get("status", "")
for part in status.get("message", {}).get("parts", []):
if part.get("type") == "text":
try:
return json.loads(part["text"])
except json.JSONDecodeError:
return {"text": part["text"]}
return result
except httpx.TimeoutException:
return {"error": f"服务超时:{timeout}s"}
except httpx.ConnectError:
return {"error": f"无法连接至 {url}"}
except Exception as e:
return {"error": f"A2A任务执行失败:{e}"}
def delegate_quiz_task(
topic: str,
explanation: str,
answers: list[str] | None = None,
quiz_service_url: str = QUIZ_SERVICE_URL,
) -> dict:
"""高级辅助函数:将测验任务提交给A2A服务。"""
payload = json.dumps({
"topic": topic,
"explanation": explanation,
"answers": answers or [],
})
return send_task(quiz_service_url, payload)
def is_quiz_service_available(quiz_service_url: str = QUIZ_SERVICE_URL) -> bool:
"""快速检查:A2A服务是否可用?"""
return bool(discover_agent(quiz_service_url))
discover_agent这一功能用于检查服务的可用性。它会尝试在/.well-known/agent-card.json路径下获取Agent Card信息,超时时间为5秒。如果获取成功,说明该服务处于可使用状态,能够接收任务。Progress Coach在将任务分派给其他组件之前会先执行这一检查;如果查询结果为空,即返回{},那么Progress Coach就会转而使用本地生成的辅助内容来帮助用户学习,而不会尝试提交完整的任务。
8.4 CrewAI学习助手
CrewAI学习助手体现了A2A架构的核心价值:一个LangGraph代理通过一种双方都不了解的协议来调用另一个CrewAI代理。
src/crewai_agent/study_buddy.py文件用于创建一个CrewAI代理,将其封装在A2A框架中的AgentExecutor中,并通过端口9002提供服务。LangGraph Progress Coach并不会导入CrewAI的相关模块,CrewAI代理也不会导入LangGraph的模块,它们之间仅通过HTTP进行通信。
CrewAI端的具体实现如下:
# src/crewai_agent/study_buddy.py
from crewai import Agent, Crew, LLM, Process, Task
from crewai.tools import BaseTool
MODEL_NAME = os.getenv("OLLAMA_MODEL", "qwen2.5:7b")
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
class TopicAnalyserTool(BaseTool):
"""
在生成响应之前,对学习内容进行结构化分析。
在实际应用中,这个工具可能会查询知识图谱或课程数据库;
在本教程中,它则根据输入信息生成结构化的指导建议。
"""
name: str = "topic_analyser"
description: str = (
"分析学习主题及薄弱环节,从而生成有针对性的关键概念列表。"
)
args_schema: type = TopicAnalyserInput
def _run(self, topic: str, weak_areas: list[str] | None = None) -> str:
areas = weak_areas or []
return json.dumps({
"topic": topic,
"focus_areas": areas or [f"核心概念:{topic}"],
"suggested_approach": f"先从基础知识开始学习,然后重点掌握:{', '.join(areas)}.",
"study_tip": (
"试着用自己的话把这个概念解释出来。"
"如果你能简单明了地解释它,那就说明你真正理解了这个概念。"
),
})
def build_study_buddy_crew(topic: str, explanation: str, weak_areas: list[str]) -> Crew:
"""根据具体的学习需求创建一个CrewAI辅助团队。"""
llm = LLM(model=f"ollama/{MODEL_NAME}", base_url=OLLAMA_BASE_URL)
agent = Agent(
role="学习助手",
goal=(
"提供清晰、富有鼓励性的补充解释,帮助学生从新的角度理解复杂概念。"
),
backstory=[
"我是一位经验丰富的导师,专门寻找能够帮助学生理解难点内容的替代性解释和方法。"
],
llm=llm,
tools=[TopicAnalyserTool"],
verbose=False,
allow_delegation=False,
)
weak_text = (
f"学生遇到的困难包括:{', '.join(weak_areas]}"
if weak_areas else "未发现具体的薄弱环节。"
)
task = Task(
description=(
"有学生正在学习'{topic}',他们已经收到了如下解释:
A2A封装层将CrewAI团队与A2A协议连接起来。这个封装类就是StudyBuddyExecutor,它的结构与QuizAgentExecutor相同,只不过它会调用crew.kickoff()方法而不是测验相关的函数:
class StudyBuddyExecutor(AgentExecutor):
"""
将A2A协议与CrewAI的执行机制连接起来。
LangGraph系统并不知道这是CrewAI在处理请求;
CrewAI团队本身也不清楚自己实际上是在响应A2A请求。
"""
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
# 解析请求内容
request_text = ""
for part in context.current_request.params.message.parts:
if isinstance(part, TextPart):
request_text += part.text
try:
request_data = json.loads(request_text)
except json.JSONDecodeError:
request_data = {"topic": request_text}
topic = request_data.get("topic", "General Topic")
explanation = request_data.get("explanation", "")
weak_areas = request_data.get("weak_areas", "")
# CrewAI的kickoff()方法是同步执行的,因此需要通过线程池来运行,
# 这样才能避免阻塞异步事件循环。
try:
crew = build_study_buddy_crew(topic, explanation, weak_areas)
crew_result = await asyncio.to_thread(crew.kickoff)
result_text = crew_result.raw if hasattr(crew_result, "raw") else str(crew_result)
result = {
"source": "crewai_study_buddy",
"topic": topic,
"weak_areas": weak_areas,
"assistance": result_text,
"status": "complete",
}
except Exception as e:
result = {
"source": "crewai_study_buddy",
"topic": topic,
"assistance": f"无法为'{topic}'生成辅助信息。",
"status": "error",
"error": str(e),
}
await event_queue.enqueue_event(
Message(
role="agent",
parts=[TextPart(text=json.dumps(result, indent=2))],
)
)
asyncio.to_thread(crew.kickoff)这条代码是关键所在。CrewAI的kickoff()方法是同步执行的,而且会阻塞当前线程;根据所使用的模型及任务的复杂程度,这个过程可能会持续30到60秒。
如果直接在async函数中调用这个方法,那么在整个执行过程中,A2A服务器将会被冻结,从而无法接收其他请求。asyncio.to_thread会使用Python默认的线程池来运行这个方法,这样在crew.kickoff()执行期间,事件循环仍然可以继续处理其他请求。
8.5 进度辅导回退模式
Progress Coach模块提供了两个辅助工具,用于与A2A服务进行交互。这两个工具都会首先尝试使用外部服务;如果外部服务出现故障,它们会自动切换到本地的默认处理方式。
“学习助手”辅助功能与progress_coach_node相连,每当某个主题的得分低于及格标准时,该辅助功能就会自动运行。
“测验委托辅助功能”为那些希望通过A2A服务来处理评分任务、而非在代码中直接执行评分操作的读者提供了现成的解决方案。为了简化操作流程,默认情况下,测验生成功能会保持在本地进行。
这两种辅助功能都采用了相同的故障隔离机制:首先会检测代理节点的状态,然后对实际的任务调用设置时间限制,从而确保用户不会受到任何外部故障的影响。
# src/agents/progress_coach.py
QUIZ_SERVICE_URL = "http://localhost:9001"
def try_a2a_quiz_delegation(topic, explanation, answers) -> dict | None:
"""
尝试将测验评分任务委托给A2A测验服务。
如果操作失败,返回None;否则返回评分结果。
注意:变量USE_A2A_QUIZ是在函数调用时读取的,而不是在模块加载时读取的。
如果在导入模块时读取环境变量,可能会导致测试结果出现异常。
在模块导入时确定的环境变量值会在整个程序运行期间保持不变。
use_a2a = os.getenv("USE_A2A_QUIZ", "true").lower() == "true"
if not use_a2a:
return None
try:
from a2a_services.a2a_client import delegate_quiz_task, is_quiz_service_available
if not is_quiz_service_available(QUIZ_SERVICE_URL):
print(f"[Progress Coach] A2A测验服务不可用,将使用本地评分功能。)")
return None
print(f "[Progress Coach] 正在将测验任务委托给A2A服务:{QUIZ_SERVICE_URL}")
result = delegate_quiz_task(topic=topic, explanation=explanation, answers=answers)
if "error" in result:
print(f"[Progress Coach] A2A服务调用失败,错误原因是:{result['error']}")
return None
return result
except Exception as e:
print(f"[Progress Coach] A2A服务调用出现异常,错误原因是:{e}")
return None
def try_study_buddy_assistance(topic, explanation, weak_areas) -> str | None:
"""
向CrewAI学习助手请求额外帮助。
如果该辅助功能不可用,返回空字符串;否则返回提供的帮助内容。
注意:变量USE_STUDY_BUDDY是在函数调用时读取的,而不是在模块加载时读取的。
$$
use_study_buddy = os.getenv("USE_STUDY_BUDDY", "true").lower() == "true"
if not use_study_buddy:
return None
try:
from a2a_services.a2a_client import request_study_assistance, is_study_buddy_available
if not is_study_buddy_available(study_buddy_url):
return None
result = request_study_assistance(
topic=topic,
explanation=explanation,
weak_areas=weak_areas,
study_buddy_url=study_buddy_url,
)
if result.get("status") == "error" or "error" in result:
return None
return result.get("assistance", "")
except Exception as e:
return None
关于在函数调用时读取环境变量的这一设计,值得大家牢记。如果在模块加载时就读取环境变量(例如在文件开头处写USE_A2A = os.getenv("USE_A2A_QUIZ", "true") == "true"),那么所获取的环境变量值就会是模块首次被导入时所处的状态。那些在调用函数之前就修改了环境变量的测试,将无法检测到这种变化,因为此时模块已经完成了初始化操作。只有在函数内部读取环境变量,才能确保每次调用函数时都能获得当前的环境变量值。
8.6 运行完整的三终端设置
当所有服务都准备就绪后,整个系统会使用三个终端。
终端1: 主要的学习辅助工具:
source .venv/bin/activate
python main.py "Learn Python closures"
终端2: A2A测验生成服务:
source .venv/bin/activate
python src/a2a_services/quiz_service.py
终端3: CrewAI学习助手:
source .venv/bin/activate
python src/crewai_agent/study_buddy.py
或者使用Make命令来运行这些服务:
make services # 终端2和终端3在后台运行
make run # 终端1运行
当“进度教练”程序同时启动这两个服务时,你会看到如下输出:
[Progress Coach] 分数:35%
[Progress Coach] 已将测验任务委托给A2A服务:http://localhost:9001
[Quiz A2A] 任务接收成功:主题为“Python函数”,提供的答案数量为3个
[Quiz A2A> 任务完成:状态为“已评分”
[Progress Coach> A2A测验完成,得分:35%
[Progress Coach> 正在向CrewAI学习助手请求学习帮助…
[Study Buddy A2A] 请求内容:主题为“Python函数”,薄弱环节包括“一等函数”
[Study Buddy A2A] 任务完成(共输入了287个字符)
────────────────────────────────────────────────────────────
教练提示:你在“Python函数”这部分取得了35分的成绩,这是一个非常好的基础……
📚 学习助手建议:
可以把函数看作是具有特殊功能的变量。就像你可以将一个数字传递给另一个函数一样,你也可以将一个函数传递给另一个函数……
────────────────────────────────────────────────────────────
如果其中任何一个服务没有运行,“进度教练”程序会自动切换到本地模式继续执行:
[A2A Client] 无法访问http://localhost:9001/.well-known/agent-card.json:连接被拒绝
[Progress Coach] A2A测验服务不可用,将使用本地资源。
学习过程会继续进行,学生根本不会看到这些错误信息。
📌 提示: 可以运行A2A测试来验证系统功能:
pytest tests/test_a2a.py tests/test_crewai_interop.py -v
预期结果:共44个测试项,全部通过。这些测试会模拟HTTP请求过程,验证`delegate_quiz_task`函数是否能生成正确的JSON-RPC数据格式,`discover_agent`函数在遇到连接错误时能否正常处理,以及`build_study_buddy_crew`函数能否生成配置正确的学习助手组件。运行这些测试并不需要任何正在运行的服务。
A2A技术使得代理系统能够在组织层面实现模块化设计。例如,一个团队开发的合规培训平台可以调用另一个团队开发的认证验证服务,而两个团队都不必了解对方的实现细节。A2A协议就是这种跨团队协作的“契约”机制,双方都会严格遵守它,其余的具体实现细节则属于各自团队的内部事务。
在最后一章中,你将看到整个系统从开始到结束的运行过程,了解如何对其进行扩展,并了解多智能体生态系统未来的发展方向。
第9章:完整的系统与未来发展方向
所有组件都已经搭建完成:四个LangGraph智能体通过共享状态进行协作,两台MCP服务器提供工具访问功能,两个A2A服务以独立进程的形式运行,Langfuse用于记录决策过程中的相关数据,DeepEval负责执行质量检测,而Streamlit用户界面则使得无需使用终端即可使用整个系统。
这一章实际上就是使用指南——说明了各个组件是如何协同工作的,如何运行这个系统,如何对其进行扩展,以及这些设计模式在Learning Accelerator之外的其他场景中也能得到应用。
9.1 main.py:入口点
main.py文件的长度不到140行,它主要完成四项任务:加载配置信息、处理命令行参数、通过中断/恢复机制来运行图模型,以及打印会话总结信息。
其他所有相关功能(如智能体的管理、工具的使用、系统可观测性的实现以及数据的持久化存储)都是由main.py导入的其他模块来处理的。
# main.py
import sys
import os
import uuid
from pathlib import Path
# 在任何项目导入之前,将src/目录添加到Python路径中
sys.path.insert(0, str(Path(__file__).parent / "src"))
from dotenv import load_dotenv
load_dotenv()
from graphworkflow import graph
from graph.state import initial_state
from observability.langfuse_setup import get_langfuse_config, flush_langfuse
def run_session(goal: str, session_id: str | None = None) -> None:
"""使用Langfuse进行交互式学习会话。"""
isresume = session_id is not None
if not session_id:
session_id = str(uuid.uuid4())[:8]
# get_langfuse_config()用于生成完整的运行配置:
# - 用于SQLite数据库检查点的thread_id
# - 如果设置了LANGFUSE_PUBLIC_KEY,还会包含Langfuse的回调处理函数
config = get_langfuse_config(session_id)
print(f"\n{'='*60}")
print(f"Learning Accelerator")
print(f"会话ID:{session_id}")
if isresume:
print(f"正在恢复之前的会话...")
else:
print(f"学习目标:{goal}")
print(f"{'='*60}")
# 如果是新的会话,则使用初始状态;如果是恢复会话,则使用之前的状态。LangGraph会从检查点中加载数据。
state = None if isresume else initial_state(goal, session_id)
result = graph.invoke(state, config=config)
# 中断/恢复机制
from langgraph.types import Command
while "__interrupt__" in result:
interrupt_payload = result["__interrupt__"][0].value
roadmap = interrupt_payload.get("roadmap")
if roadmap:
# 显示学习路线图(此处显示的是简化版本,完整内容请查看代码仓库)
print_roadmap(roadmap)
print(f"\n{interrupt_payload.get('prompt', '继续吗?')}")
user_input = input("> ").strip()
result = graph.invoke(Command(resume=user_input), config=config)
if result.get("error"):
print(f"\n[错误] {result['error']}")
return
print_session_summary(result)
flush_langfuse() # 确保在退出程序之前所有数据都被发送完毕
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Learning Accelerator")
parser.add_argument("goal", nargs="?",
default="从零开始学习Python闭包和装饰器")
parser.add_argument("--resume", metavar="SESSION_ID",
help="通过会话ID恢复之前的会话")
args = parser.parse_args()
if args.resume:
run_session(goal"", session_id=args.resume)
else:
run_sessiongoal=args.goal)
关于这个文件,有三点需要注意。
该图表是以模块级单例的形式被导入的。 当执行 `from graphworkflow import graph` 时,会在导入时刻调用一次 `build_graph()` 函数。编译后的图表会在整个程序运行过程中保持不变:使用的依旧是同一个 SqliteSaver 连接,注册的节点也是一样的。
这是有意为之的。无论是一次性的 `graph.invoke` 调用,还是因中断而重新启动的调用,它们都会使用同一个已编译好的图表以及相同的检查点机制。
对于程序中断后的恢复操作,处理代码仅有一行: `state = None if is_resume else initial_state(...)`。将 `None` 作为参数传递给 LangGraph,它会从 `config` 文件中加载与当前 `thread_id` 对应的最新检查点数据。这就是调用者端实现程序恢复的全部逻辑。
`while` 循环同时处理用户对计划的批准和拒绝操作。 如果用户输入 “no”,程序会重新返回到 `curriculum_planner` 阶段,该阶段会生成新的学习计划,进而触发另一次中断。循环会不断显示新的学习计划,直到用户最终批准某个计划为止。
9.2 三端启动模式
整个系统需要同时运行三个进程。`Makefile` 中提供了相应的命令来启动这些进程:
make setup # 仅首次执行:创建虚拟环境并安装依赖项
make langfuse # 可选:启动自托管的 Langfuse 服务
make services # 在后台同时启动两个 A2A 服务
make run # 启动主应用程序
services` 这一命令的具体执行内容如下:
services: stop
@echo "正在启动 A2A 服务..."
$(PYTHON) src/a2a_services/quiz_service.py &
@sleep 1
$(PYTHON) src/crewai_agent/study_buddy.py &
@sleep 1
@echo ""
@echo "服务已启动:"
@echo " Quiz: http://localhost:9001"
@echo " Study Buddy: http://localhost:9002"
请确认所有服务都能正常访问:
curl http://localhost:9001/.well-known/agent-card.json
curl http://localhost:9002/.well-known/agent-card.json
curl http://localhost:3000 # Langfuse 用户界面
9.3 从开始到结束的完整学习流程
当 Ollama 运行正常、A2A 服务也已启动,并且 Langfuse 配置完成之后,可以执行以下命令:
make services
make run
接下来是目标设定、用户审批以及学习主题的循环处理过程:
============================================================
学习加速器
会话 ID:8660e1d6
学习目标:从零开始学习 Python 的闭包和装饰器
============================================================
[监控信息] 正在追踪会话 8660e1d6 → http://localhost:3000
[课程规划系统] 正在为“学习 Python 的闭包...”制定学习计划...
[课程规划系统] 调用了 qwen2.5:7b...
[课程规划系统] 制定了以下学习计划:共 5 个主题,耗时 4 周
1. Python 函数:60 分钟
2. 作用域与命名空间(需要先学习 Python 函数):45 分钟
3> 内层函数(需要先学习作用域与命名空间):60 分钟
4> 创建闭包(需要先学习内层函数):75 分钟
5> 装饰器基础(需要先学习创建闭包):60 分钟
[人工审批] 正在等待用户审核学习计划...
============================================================
建议的学习计划
============================================================
学习目标:从零开始学习 Python 的闭包和装饰器
学习时长:4 周,每周学习 5 小时
1. Python 函数(60 分钟)
了解函数在 Python 中是如何被视为一等公民的...
...
这个学习计划合适吗?
输入 “yes” 开始学习
输入 “no” 重新制定计划
> yes
[人工审批] 学习计划已通过,现在开始学习。
[讲解系统] 当前学习主题:“Python 函数”
[讲解系统] 第一次调用大型语言模型...
→ 返回结果:tool_list_files{}
← 包含文件 “closures.md”, “decorators.md”, “python_basics.md”
[讲解系统] 第二次调用大型语言模型...
→ 读取文件 “python_basics.md”
← 内容涉及 Python 基础知识...
[讲解系统] 经过 4 次调用大型语言模型后,学习任务完成
[测验生成系统] 正在为“Python 函数”主题生成测验题目
[进度辅导系统] 将测验任务分配给 A2A 服务:http://localhost:9001
[A2A 测验系统] 已收到任务:主题为 “Python 函数”,提供的答案数量为 3
[A2A 测验系统] 任务已完成,状态为:已评分
[进度辅导系统] 测验得分:67%
[进度辅导系统] 正在请求 CrewAI 学习助手提供帮助...
[学习助手 A2A 系统] 任务已完成(用户输入了 287 个字符)
────────────────────────────────────────────────────────────
辅导老师:你的 Python 函数基础知识非常扎实……
📚 学习助手建议:
可以把函数想象成具有特殊功能的变量……
下一个学习主题是:“作用域与命名空间”
────────────────────────────────────────────────────────────
这次测试会调用系统中的所有组件:LangGraph调度机制、SQLite检查点功能、人工干预中断处理流程、MCP工具的调用过程、对Quiz服务以及CrewAI Study Buddy的A2A委托机制,还有Langfuse跟踪功能。测试结束后会生成总结报告,而跟踪数据也会在几秒钟内显示在Langfuse界面中。
9.4 Streamlit用户界面
对于开发工作来说,终端界面已经足够使用了。但对于日常使用,或者向那些不会使用终端的人展示系统功能而言,系统还是需要一个Web用户界面。
project根目录下的`streamlit_app.py`文件提供了这样的Web界面。其中的设计原理值得了解:实际上,src/文件夹中的LangGraph代码并没有发生任何变化,无论是main.py还是Web应用程序,使用的都是同一套图结构。唯一不同的是输入/输出机制——在Web应用中,`input()`和`print()`函数被替换成了Streamlit提供的组件,而中断/恢复流程则通过按钮点击来实现,同时st.session_state变量用于在不同运行次数之间保持上下文信息。
每当用户与系统进行交互时,Streamlit都会重新执行整个Python脚本。那些需要在多次运行中保持不变的数据都会存储在st.session_state这个字典中。LangGraph会话ID、运行配置、学习计划、当前主题索引以及测验进度等信息都保存在这里。
该应用程序被设计成一个状态机,包含五个界面页面(目标设定、学习计划确认、知识点讲解、测验环节、完成总结),而st.session_state.screen变量决定了每次重新运行时应该显示哪个页面。
其中一个技术细节是:quiz_generator_node节点会调用run_quiz()函数,而这个函数会使用input()从终端获取用户输入的答案。但如果在Streamlit框架内直接调用这个函数,浏览器就会冻结。为了解决这个问题,我们编写了一个专门针对Web界面的图结构,并使用了interrupt_before=["quiz_generator"]这条规则:
# streamlit_app.py (关键代码片段)
from graphworkflow import build_graph
from graph.state import initial_state, StudyRoadmap, QuizResult
from agents.quiz_generator import generate_questions, grade_answer
# 专为Web界面设计的图结构:在quiz_generator节点之前暂停执行,这样Web界面就可以处理用户输入操作,
# 而不会导致graph结构中的input()函数被调用。
ui_graph = build_graph(
db_path="data/checkpoints_ui.db",
interrupt_before=["quiz_generator"],
)
Web界面直接通过调用generate_questions和grade_answer函数来处理测验环节。一旦测验完成,应用程序会使用graph.update_state()将QuizResult结果重新写入检查点数据中,就好像quiz_generator_node已经正常执行完毕一样,之后再继续执行后续流程。
def advance_after_quiz(quiz_result: QuizResult):
"""在用户界面处理的测验完成后,将结果插入检查点并继续执行后续流程。"""
config = st.session_state.graph_config
# 通知LangGraph系统,测验结果已经生成完毕
ui_graph.update_state(
config,
{
"quiz_results": existing + [quiz_result],
"weak_areas": all_weak,
"roadmap": st.session_state.roadmap,
"current_topic_index": st.session_state.current_topic_index,
},
as_node="quiz_generator",
)
# 继续执行后续流程。首先运行progress_coach模块,然后根据用户选择继续讲解下一个主题或结束整个流程。
# 由于使用了interrupt_before=["quiz_generator"]规则,如果存在下一个需要处理的主题,
# 图结构会在再次执行quiz_generator节点之前暂停。
result = ui_graph.invoke(None, config=config)
需要记住的模式是:graph.update_state(config, values, as_node=...)这种用法允许调用者修改检查点状态,就好像某个特定节点产生了这些值一样。这就是将外部代码运行产生的结果重新纳入图的结构中的一种方法。
运行程序如下:
make streamlit
# 或者:streamlit run streamlit_app.py

图3. Streamlit网页界面。使用相同的LangGraph代码、相同的MCP服务器和相同的A2A服务,但输入/输出方式有所不同。
浏览器会打开地址http://localhost:8501。这个系统提供了网页用户界面,目标输入部分变成了一个表单,路线图审批功能则通过两个按钮来实现,解释内容以格式化的Markdown形式呈现。测验题目会逐一出现,并配有答案输入框;在进入下一个主题之前,教练的反馈信息会显示在一个信息框中。
当学习会话结束时,总结页面会显示每个主题的得分以及用于重新开始会话的会话ID。
💡 Streamlit的session_state模式
Streamlit会在每次用户交互时重新执行整个脚本。那些需要在多次运行中保持不变的数据会被保存在st.session_state这个字典中,Streamlit会在不同的运行之间保留这个字典。LangGraph的session_id和graph_config也会被保存在其中,当前显示的页面内容、路线图信息、当前的题目索引、已评分的答案以及所有已经完成的QuizResult对象也会被保存在这里。
从本质上讲,这个应用程序就是一个状态机,其中st.session_state.screen决定了应该展示哪些内容,而状态机的转换则是通过按钮点击来触发的。
这种以协议为先的设计方式带来的好处在于:该系统既可以拥有终端用户界面,也可以拥有网页用户界面;而且未来还可以添加React前端、Slack机器人或iOS应用程序,而位于src/目录中的LangGraph代码却不会因此受到任何影响。
9.5 最终的项目结构
当所有开发工作完成后,仓库的文件结构将会是如下所示:
freecodecamp-multi-agent-ai-system/
├── src/
│ ├── agents/
│ │ ├── curriculum_planner.py # 用于生成JSON路线图
│ │ ├── explainer.py # 负责调用MCP工具的循环模块
│ │ ├── quiz_generator.py # 用于生成测验题目并进行评分的模块
│ │ ├── progress_coach.py | 负责综合分析工作并实现A2A任务分配
│ │ └── human_approval.py | 用于中断当前操作或重新启动程序
│ ├── graph/
│ │ ├── state.py | 用于存储Agent的状态信息及相关的4个数据类
│ │ └── workflow.py | 定义状态图的相关逻辑
│ ├── mcp_servers/
│ │ ├── filesystem_server.py | 提供列表查看、读取文件、搜索文件等功能
│ │ └── memory_server.py | 提供获取数据、设置数据、删除数据、列出文件清单等功能
│ ├── a2a_services/
│ │ ├── quiz_service.py | 在端口9001上运行的测验代理模块
│ │ └── a2a_client.py | JSON-RPC客户端及发现机制相关代码
│ ├── crewai_agent/
│ │ └── study_buddy.py | 在端口9002上运行的CrewAI代理模块
│ └── observability/
│ └── langfuse_setup.py | 用于处理回调请求及配置相关参数
├── tests/ | 包含182个单元测试用例及12个评估测试用例
├── study_materials/sample_notes/ | 解释器相关的源代码内容
├── docs/ | 包含ARCHITECTURE.md和MODEL_selection.md文件
│ ├── data/ | 存储在运行时生成的SQLite检查点数据
├── main.py | 程序的终端入口点
├── streamlit_app.py | 程序的Web用户界面入口点
├── Makefile | 用于生成单个命令对应的构建文件
│ ├── docker-compose.yml | 用于配置自托管的Langfuse环境
│ ├── requirements.txt | 列出所需安装的软件版本
└── pyproject.toml | 包含pythonpath配置及pytest相关设置
9.6 扩展系统功能
该架构支持在多个方向上进行扩展,而且无需修改任何现有的代码。
添加新的代理模块。**》在src/agents/your_agent.py文件中编写相应的节点函数,然后通过builder.add_node("your_agent", your_agent_node)将其注册到workflow.py中。接下来需要添加连接该新代理与其他现有代理的边。由于其他代理并不知道新代理的存在,因此它们的工作方式不会发生任何变化。
更换推理后端。**》所有代理当前都使用指向OLLAMA_BASE_URL的ChatOllama作为推理后端。如果将这个URL改为LiteLLM网关(该网关在前端使用Ollama的API,在后台连接到OpenAI、Anthropic或其他提供商),那么所有四个代理都会自动切换到新的后端,而无需进行任何代码修改。因为API接口是所有代理之间进行通信的规范。
添加新的MCP工具。**》可以在filesystem_server.py或memory_server.py文件中添加@mcp.tool()函数,在explainer.py文件中添加相应的@tool包装器,并将其包含在EXPLAINER_TOOLS列表中。这样,代理程序在接收到用户输入时,就会知道何时使用新的工具来进行处理,无需进行其他任何修改。
添加一个新的A2A服务。按照`quiz_service.py`的模式,在`a2a_services/`目录下创建一个新的模块:该模块应包含代理类、执行器的子类以及uvicorn服务器。在`a2a_client.py`文件中编写客户端函数,任何需要使用该服务的代理都可以调用这个客户端函数。该服务是一个独立的进程,可以独立于主应用程序进行部署、扩展或重启。
将系统状态迁移至 PostgreSQL。在`workflow.py`文件中,将`SqliteSaver`替换为`PostgresSaver`,并设置连接字符串为你的 Postgres 实例的地址。其他配置无需更改。LangGraph 的检查点接口与后端实现无关。
为 A2A 服务添加认证机制。需要用支持认证功能的中间件来包装`create_quiz_server()`函数创建的 Starlette 应用程序。A2A 协议本身就支持这一功能:Agent Card 可以指定认证方案,而客户端则可以在任务数据中传递凭证信息。在不受信任的网络环境中进行生产环境部署时,必须执行这一步操作。
这些扩展措施分别针对架构中的不同层次进行优化,没有一项需要修改底层代码。
📌 检查步骤:首先运行完整的测试套件,确保所有功能都能正常工作:
make services
pytest tests/ -v
# 共 184 个测试用例,默认会跳过评估类测试
接着使用 Ollama 运行评估类测试:
pytest tests/test_eval.py -m eval -s -v
# 共 12 个评估测试,用于检测质量、准确性及评分机制的稳定性
最后手动测试整个系统功能:
make run
# 按提示完成测试流程
# 可通过 Langfuse 用户界面查看测试结果
如果这三个验证步骤都能通过,说明系统已经搭建完成。
9.7 五种按实施难度排序的扩展方案
你现在已经拥有一个可以正常运行的四代理系统了。这才是最困难的部分,之后的改进都是逐步进行的。下面列出的每一步都是自然的后续发展方向,并不需要重新编写代码。
1. 在一小时内将推理后端替换为托管式网关
系统中的所有代理都会使用指向`OLLAMA_BASE_URL`的`ChatOllama`服务。只需将这个地址更改为 LiteLLM 网关的地址即可。LiteLLM 在前端会使用 Ollama 的 API,而在后端则会根据需求路由到 OpenAI、Anthropic、Together 或其他提供商的服务。所有四个代理只需修改一个环境变量,就能切换到新的后端。
对于备用路由机制,也可以采用同样的方法:配置 LiteLLM 使其优先尝试使用 GPT-4,如果失败则转而使用 Claude,如果两者都无法使用,则使用本地模型。代理代码本身根本不需要知道这些背后的切换逻辑。
2. 在几小时内为 A2A 服务添加认证机制
Agent Card 可以指定认证方案。在生产环境中的 A2A 服务中,应该要求用户提供承载令牌或 mTLS 证书。需要用兼容 FastAPI 的认证中间件来包装`create_quiz_server()`函数创建的 Starlette 应用程序,同时更新`a2a_client.py`文件,使客户端能够在任务数据中传递凭证信息。这样,这些服务就可以在不受信任的网络环境中安全地使用了。
A2A 协议本身就支持这种认证机制。承载令牌会像其他 REST 服务一样,被放在 HTTP 请求的`Authorization`头部中传送。
3. 将SQLite的检查点机制迁移到PostgreSQL(包括测试在内,耗时半天)。
在`workflow.py`文件中,将`SqliteSaver`替换为`PostgresSaver`。请将连接字符串设置为你的Postgres实例地址。LangGraph的检查点接口与后端系统无关。
对于多实例部署来说,这一点非常重要。SQLite适用于单进程环境,而PostgreSQL允许你在同一检查点存储基础上运行多个`main.py`实例(或Streamlit应用程序),这样在实例重启后,用户会话信息仍然可以保留,并且任何实例都能继续使用这些会话数据。
4. 添加实时响应功能(耗时一两天完成开发)。
LangGraph支持使用`graph.astream()`函数来实现从代理节点发送的令牌级实时数据流。更新Streamlit用户界面,使其能够接收这些数据流,并在数据生成的同时立即显示解释结果。这样一来,用户只需等待500毫秒就能看到输出结果,而无需等待3到4秒钟才能得到完整的响应。
其中,“解释器”组件受益最大。它能够生成长度为1,500到2,500个字符的解释文本,因此用户感受到的延迟缩短效果非常显著。
5. 开发适用于移动设备的用户界面(耗时一周完成开发)。
请用React或Next.js框架构建一个新的前端界面,该界面会调用围绕图结构设计的FastAPI接口。新的前端界面会提供与REST接口相同的五步操作流程:目标输入、路线图审批、解释说明、测验环节以及最终结果展示。`src/`目录中的LangGraph代码无需做任何修改,测验题的收集和评分方式也与现有的Streamlit应用程序保持一致。API接口规范如下:
POST /api/sessions → 创建会话,返回session_id及路线图信息
POST /api/sessions/:id/approval → 请求体中包含{"approved": true/false}字段
GET /api/sessions/:id/current → 获取当前主题、解释说明及相关问题
POST /api/sessions/:id/answer → 提交测验答案,获取评分结果
GET /api/sessions/:id/summary → 在完成所有操作后获取最终总结信息
如果“学习加速器”项目真正成为一款商业产品,那么就会采用这种架构。图结构在后台运行,前端则是一个轻量级的客户端。附录C中列出的生产环境优化措施同样适用于这种架构。
9.8 生产环境优化
目前这个系统还处于测试阶段。它在本地环境中运行时能够优雅地处理各种错误,并且能够正确演示所有的功能。但它还不足以满足企业规模下数千名用户同时使用的需求。
为了达到这一目标,需要按照各项修改工作量从大到小的顺序进行相应的调整。
按请求限制访问频率。需要在编排层为每个代理节点设置令牌使用额度,并将这些额度作为硬性限制来执行。一个由4个代理组成的系统,每个代理会使用5种工具,那么对于每个用户请求来说,总共需要执行20次以上的大型语言模型调用。在大规模应用环境中,成本问题会成为比架构设计更重要的考虑因素。LiteLLM网关使得这一问题的处理变得非常简单:它能够追踪每次会话中的消耗情况,并且可以设置使用上限来控制成本。检查点的迁移安全性。请为你的`AgentState`架构版本进行管理。当你部署系统的新版本时,那些基于旧架构创建的检查点会在尝试使用新代码进行反序列化时出现错误。如果某些字段被添加或删除,这些工作流程就会在运行过程中失败。
应将检查点的格式视为一种公共API:新字段应作为可选选项提供,并设置默认值;对于被删除的字段,在正式删除之前应先在多个发布周期内继续使用它们;同时,务必将架构迁移测试纳入你的部署流程中。
冷启动处理机制。那些包含模型权重以及大量依赖关系的代理容器,在进行冷启动时可能需要30到60秒的时间。在生产环境中,用户不可能等待这么长时间让容器完成初始化过程。因此,要么维护一组处于活跃状态的容器(虽然这会带来成本上的考虑),要么设计一些备用方案,以便在冷启动导致延迟的情况下使用更简单、响应速度更快的代理来替代它们。没有第三种选择——千万不要忽视冷启动可能带来的问题。
大规模环境下的可观测性。在开发环境中,本地版的Langfuse已经能够满足需求;但在生产环境中,你需要使用受管理的Langfuse,或者类似的分布式追踪系统,这类系统才能处理每天数以百万计的追踪请求。
决策级追踪功能才是你真正需要的。仅仅依靠基础设施指标,是无法了解多代理推理链中究竟发生了什么问题的。即使请求延迟在可接受范围内,但如果模型产生的结果是不正确的,那仍然意味着存在问题。
持续集成环境中的评估机制。第7章中提到的DeepEval测试应该被纳入你的部署流程中。每当有新的模型、提示或代理配置发生变化时,都应该触发一次全面的评估。如果评估结果显示性能下降到了预设的阈值以下,那么该变更就应该被阻止。这种评估机制就是用来检测大语言模型行为是否出现退化的工具,它是防止系统质量逐渐下降的重要保障。
内容安全性的把控。代理产生的输出在到达用户或生产系统之前,必须经过内容过滤器的处理。虽然Explainer的功能是基于你的笔记来运行的,但大语言模型仍然有可能生成一些虚假的内容或违反政策规定的信息。
<在任何涉及重要应用场景的生产环境中,如果错误的输出可能会带来严重后果,那么在输出数据到达数据库或用户端之前,设置架构验证机制和内容过滤器是必不可少的。
<附录C中列出了完整的强化安全检查清单。
9.9 2026年生态系统的发展方向
<在规划下一个项目时,有几种趋势正在重塑多代理系统的构建方式,这些趋势都值得你关注。
协议标准化进程
在2025年,MCP和A2A这两种技术标准都发布了1.0版本。谷歌、Anthropic、Salesforce、SAP以及数十家其他厂商也都加入了这一标准化进程。当前,代理系统的开发正沿着与Web服务发展过程中REST协议所经历的相同路径前进:初期阶段各种标准五花八门,随后会逐渐形成几个明确的行业标准,其他所有技术都会朝着这些标准靠拢。
<这对你的工作意味着什么?现在就将你的工具接入机制标准化到MCP上,同时将代理协调功能标准化到A2A上,是一个风险较低的明智选择。三年后,这些协议仍然会具有重要的实际意义。而具体的框架选择则可能会随时间发生变化。
以本地计算为主的基础设施
本地推理与云端推理在质量上的差距正在逐渐缩小。一年前,使用本地70亿参数的模型来运行多智能体系统还只是一种演示手段,并非实际生产工具;而如今,拥有70亿到320亿参数的Qwen 2.5已经能够可靠地用于生产环境中的各种工作流程。
本地推理在隐私保护、成本控制以及延迟方面具有显著优势。某些行业确实无法将数据传输给外部API;那些在本地环境中表现良好的架构,同样也适用于通过管理型网关进行数据传输的场景。而那些专为特定云服务提供商的功能而设计的架构,则往往更难以进行迁移。
更长的上下文范围,更精简的智能体结构
当前,智能体能够处理的上下文信息量正在不断增加。现在已经有不少商业模型能够处理超过100万个标记单元的信息。这种趋势使得多智能体系统的必要性逐渐降低:如果一个智能体就能够完整地理解整个对话内容并进行推理,那么为什么还要划分任务让多个智能体分别处理呢?
答案已经发生了变化。多智能体系统的重要性不再在于上下文窗口的管理,而在于各智能体的专业化分工、故障隔离能力以及独立部署的可行性。
关于这些原因的具体讨论,请参见第1章。随着单智能体系统的功能不断完善,“是否需要使用多智能体系统来解决某个问题”这个标准也在逐渐提高;如今,许多正在开发多智能体系统的团队,其实都可以通过使用单一智能体和更先进的工具来达到相同的效果。
本手册中介绍的各种设计模式依然具有适用性,关键在于何时选择使用它们而已。
9.10 这些模式的应用场景
“学习加速器”这一工具本身其实是一种教学载体,而这些设计模式才是真正能够实现知识迁移的关键。如今,许多生产系统都在采用这种架构。
1. 销售支持流程的优化
“课程规划智能体”会为新入职的销售人员制定培训路径;“内容讲解智能体”则会通过MCP平台,根据内部知识库的内容来解释产品特性;“评估测试智能体”用于检测学习者的理解程度;“进度跟踪智能体”则负责记录学员在多个产品领域的认证情况。在培训开始之前,管理者会通过人工审核环节来确认这些培训计划的合理性。
2. 合规性培训
针对HIPAA、SOX、GDPR等法规要求,专门设计了相应的“课程规划智能体”;“内容讲解智能体”会基于实际的监管文本(而非模型的训练数据)通过MCP服务器进行内容推送;“评估测试智能体”则设有更严格的评分标准,并能生成可供监管机构查阅的审计记录。在培训计划正式实施之前,还必须经过人工审核这一法律合规性检查环节。
3. 客户服务流程
“问题接收智能体”会负责对客户提交的工单进行分类;“资料查询智能体”则会通过MCP平台查阅知识库中的相关内容;“回复撰写智能体”负责生成相应的回复文本;“审核验证智能体”会在发送回复之前检查其是否符合公司政策要求。此外,A2A交互功能使得Salesforce智能体可以调用ServiceNow智能体,进而再由后者调用自定义的LangGraph智能体——这样就可以实现跨系统之间的无缝协作,而无需进行复杂的集成开发工作。
4. 工程技术人员的培训流程
代码库代理会为新员工引导他们了解代码仓库的结构;工具使用代理则会解释开发环境的相关知识;而代码审查代理则可以解答有关编码规范的问题。所有这些功能都是基于实际的代码库和文档来实现的,而这些文档都存储在通过MCP服务器指向的内部代码仓库中。
这些功能的共同点在于:它们都体现了第一章中提到的架构设计原则——不同的子任务需要使用不同的工具,不同的大型语言模型也会采用不同的调用方式;这种专业化设计可以避免某个代理承担过多的功能,同时也有助于实现故障隔离。
选择多代理架构并非出于追求新奇性,而是因为问题的本质确实适合采用这种架构。
9.11 接下来该做什么
以下是一些建议,这些建议从最简单的改进到最复杂的扩展项目都有涉及。
-
开发自己的MCP工具:将文件系统服务器指向你自己的笔记目录,然后编写一个MCP服务器,让它能够查询你常用的知识存储平台,比如Notion、Confluence或你们团队的文档网站。工具调用机制本身是相同的,只有服务器的实现部分需要进行调整。
-
调整学习课程的内容:“学习加速器”系统原本是基于编程主题设计的,你可以修改《curriculum_planner.py》文件中的提示内容,使其适用于你的特定领域,比如医学教育、语言学习或法律培训。不过图结构本身保持不变。
-
开发一个辅助分析代理:添加第六个代理,让它定期运行(但不在主流程图中),并汇总各次学习过程中的数据。这个代理会读取检查点数据库、Langfuse的追踪记录以及MCP内存中的信息,然后生成每周的学习进度报告。这是一个非常实用的扩展功能,因为它可以在不修改现有代码的情况下测试系统的各个部分。
-
编写自己的使用手册:巩固这些设计原则的最佳方式就是通过教学来传播它们。为某个特定的问题构建一个新的多代理系统,并记录下你在开发过程中学到的经验。那些基础设施层面的设计模式——比如用于工具管理的MCP、用于代理协作的A2A、用于整体调度的LangGraph、用于确保系统稳定性的检查点机制,以及用于评估的基于大型语言模型的评判系统——其实适用于任何多代理系统,只是具体的代理和工具会有所不同而已。
结论
你最初编写这份手册时提出了一个问题:你的问题是否真的需要多个代理来处理?正是这个问题促使我们在后续的设计过程中始终保持客观和严谨的态度。
“学习加速器”系统中的每一个代理的存在,都是因为它们所处理的任务确实与其他代理不同——它们使用不同的工具,采用不同的大型语言模型调用方式,其运行状态也各不相同,而且出现故障的原因也会不一样。
我们选择多代理架构,并非是为了追求新颖性,而是因为问题的本质要求我们必须这样做。
在这个决策之后,每一个技术层的设计都遵循了同样的原则和规范。
-
LangGraph提供了状态管理功能以及检查点机制,因为生产环境中的系统在发生故障时绝对不能丢失当前的状态。
-
MCP实现了工具使用的标准化,因为代理不应该依赖于特定的实现方式来工作。
-
A2A使得跨框架的协调成为可能,因为在现实环境中,基础设施往往需要跨越多个不同的技术框架来构建。
-
Langfuse能够记录决策层面的操作过程,因为仅凭基础设施指标是无法判断某个代理是否正确地完成了它的任务。
-
DeepEval通过质量检测机制来评估大型语言模型的输出结果,因为唯一可靠的评价方法就是让另一个大型语言模型根据明确的评判标准来进行评估。
-
Streamlit用户界面证明了LangGraph代码与具体的I/O操作方式无关,它的功能可以不受这些因素的影响而正常运行。
-
同一个图结构既可以用于终端会话,也可以用于Web应用程序。
所有这些设计背后的工程原理都是值得我们继续遵循的:在一个设计良好的多智能体系统中,每一条边界都应当是一种协议,而不是某种耦合机制。
智能体通过TypedDict契约与状态系统进行交互;它们通过MCP与工具进行通信;智能体之间则通过A2A协议进行协作;而它们与观测系统的连接则是通过LangChain回调来实现的。
这些边界中的每一条都可以被替换、修改或扩展,而不会影响到其他部分。正是这种设计理念使得该系统具备了生产级应用的特性。关键不在于你使用了哪些具体的框架,而在于你是否能够用清晰的接口将这些框架组织起来。
无论你接下来要构建什么,都请牢记这一原则。模型会发生变化,框架也会更新,但智能体时代特有的开发工具和技术的发展速度,远远超过了任何手册所能跟上的程度。因此,良好的架构设计才能经得起时间的考验。
本手册的完整代码可以在github.com/sandeepmb/freecodecamp-multi-agent-ai-system找到。你可以克隆这个代码,运行它,对其进行扩展或修改。如果你能基于这些设计模式创造出一些有价值的东西,我真的很希望听到你的消息。
现在,就开始动手去构建吧。
附录A:框架对比
本手册中介绍了各种框架以及它们适用的场景。这个表格反映了2026年初时的技术生态状况。具体功能可能会发生变化,但各框架的适用场景通常会保持稳定。
| 框架名称 | 其核心功能 | 适用场景 | 不适用场景 |
|---|---|---|---|
| LangGraph | 一种具有检查点机制、条件路由功能以及原生HITL支持的状态驱动型智能体框架 | 适用于需要状态持久化和确定性路由的生产级多智能体系统 | 不适用于没有状态的简单单智能体任务 |
| CrewAI | 一种基于角色的多智能体框架,支持声明式的团队组织和任务分配 | 适用于需要快速原型设计角色协作场景的应用 | 当面临复杂的分支逻辑或自定义控制流程时,CrewAI的抽象层可能会成为瓶颈 |
| AutoGen | 微软开发的对话式多智能体框架,支持群组聊天模式 | 适用于研究和探索性工作,尤其是那些需要通过对话来驱动多智能体交互的场景 | 不适用于需要严格控制流程和显式状态管理的生产级系统 |
| LlamaIndex | 一种以RAG技术为核心的多智能体框架,具备强大的数据采集与检索能力 | 适用于那些主要面临非结构化数据检索挑战的应用场景 | 纯智能体协调机制,在实际应用中通常需要结合LangGraph等高级框架来使用 |
| LangChain | 一个为大型语言模型应用程序提供基础功能的工具包,也是LangGraph所依赖的技术基础 | 用于智能体内部的底层构建模块,如提示生成器、输出解析器等 | 直接用于实现多智能体系统的协调与控制功能,也可以与LangGraph结合使用 |
| MCP(协议) | 模型上下文协议,是一种标准化的智能体与工具之间的交互接口 | 适用于任何需要让工具实现具备可互换性并在不同框架间复用的场景 | 对于那些仅用于内部开发的工具来说,Python函数通常就已经足够使用了 |
| A2A(协议) | 智能体之间的直接通信协议,支持通过HTTP在不同框架间的协作 | 适用于跨团队或跨框架的智能体协调工作,尤其是那些需要独立部署智能体的场景 | 对于那些耦合度较高的智能体系统来说,直接使用A2A进行函数调用会更加简洁高效 |
在选择协调框架时,可以参考以下经验法则:LangGraph的优势特性(如检查点机制、中断/恢复功能以及明确的状态契约)在生产环境中显得尤为重要;当基于角色的模型结构与你的应用领域高度契合时,CrewAI会是一个不错的选择;而AutoGen的群聊模式则更适用于研究性和探索性工作,而非严格的生产控制流程。
切勿让对框架的偏好凌驾于问题的本质之上——如果你的问题涉及图结构处理,就使用LangGraph;如果问题与对话相关,那就选择AutoGen。
需要注意的是,MCP和A2A并不是与这些框架竞争的工具,它们实际上属于底层集成层。你可以用LangGraph构建智能体,将其作为A2A服务进行部署,并利用MCP提供的工具来辅助开发。无论你最终选择了哪种协调框架,这三种技术都是可以混合使用的。
附录B:模型选择指南
该系统中的所有智能体在本地推理时均使用Ollama模型。所选模型的质量直接决定了工具调用的可靠性——参数量低于70亿的模型往往会产生格式错误的JSON数据,导致工具调用失败。
按VRAM容量推荐的模型
| VRAM容量 | 模型名称 | 下载命令 | 最适合的场景 |
|---|---|---|---|
| 8 GB | qwen2.5:7b |
ollama pull qwen2.5:7b |
通用场景,工具调用稳定可靠 |
| 8 GB | qwen3:8b |
ollama pull qwen3:8b |
推理能力更强,适合相同VRAM容量的设备 |
| 24 GB | qwen2.5-coder:32b |
ollama pull qwen2.5-coder:32b |
该级别中工具调用效果最佳 |
| 24 GB | qwen3:32b |
ollama pull qwen3:32b |
整体性能最优的模型 |
| 仅支持CPU运行 | qwen2.5:7b (Q4_K_M) |
ollama pull qwen2.5:7b |
可以使用,但速度会慢5到10倍 |
在macOS系统中,苹果自家的统一内存技术使得CPU和GPU能够共享内存。配备16GB统一内存的Mac机器,实际上会有约8GB的内存被分配给模型使用。具体容量可以通过“Apple菜单”→“关于本机”→“芯片信息”来查看。
对于生产环境中的智能体应用而言,最低要求的模型参数量为70亿。参数量低于70亿的模型虽然也能处理聊天任务,但会生成大量格式错误的JSON数据,从而导致工具调用失败。
Ollama中设置的`format="json"`选项有助于确保生成的JSON数据是有效的。不过,模型仍然需要生成具有实际意义的JSON数据而不仅仅是可解析的字符串,因此参数量至少需要达到70亿才能满足这一要求。
本系统中使用的温度设置
这些温度设置是预先配置在每个智能体中的。对于那些会生成结构化JSON数据的智能体来说,绝对不能将`temperature > 0.5`这个设置启用,否则解析过程将会变得不可靠。
# 结构化输出:课程规划工具、测验生成工具及评分系统
ChatOllama(temperature=0.1, format="json")
# 工具调用循环:解释器功能
ChatOllama(temperature=0.3)
# 创意生成模块:测验题目生成工具、学习进度辅导工具
ChatOllama(temperature=0.4, format="json")
# 确定性评估系统:DeepEval OllamaJudge
ChatOllama(temperature=0.0)
为什么不同的温度设置很重要: 如果一个代理仅使用一个固定的温度设置,那么它处理的每一项任务都会受到影响。对于结构化JSON规划来说,0.1这个温度设置才能保证结果的一致性;在生成创造性问题时,0.4这个温度设置有助于提高问题的多样性;而在进行评分时,0.1这个温度设置则能确保评分的公平性。
如果让同一个代理使用temperature=0.25这个设置来完成这三项任务,那么规划过程会出现解析错误,而问题生成机制也会产生重复性的问题。因此,将这些任务分配给具有不同温度设置的多个代理,正是该系统中采用多代理架构的核心原因之一。
模型切换
只需修改.env文件中的OLLAMA_MODEL设置即可,无需对代码进行任何更改。
# .env
OLLAMA MODEL=qwen2.5-coder:32b
OLLAMA_BASE_URL=http://localhost:11434
如果还没有下载该模型,请执行以下命令进行下载:
ollama pull qwen2.5-coder:32b
在下一次运行时,这四个代理会自动使用新的模型。
按模型划分的评估测试阈值
tests/test_eval.py文件中设定的阈值是针对7B规模的模型制定的,其值为0.6。规模更大的模型通常会获得更高的分数。如果你升级了系统并希望设置更严格的质量检测标准,可以适当提高这些阈值:
| 模型等级 | 准确性 | 相关性 | 问题质量 | 备注 |
|---|---|---|---|---|
| 7-8B规模模型 | 0.65-0.80 | 0.70-0.85 | 0.65-0.80 | 默认阈值为0.6 |
| 32B规模模型 | 0.80-0.90 | 0.85-0.95 | 0.80-0.90 | 可将阈值提高至0.75 |
| GPT-4 / Claude | 0.85-0.98 | 0.90-0.98 | 0.85-0.95 | 可将阈值提高至0.85 |
建议将阈值设定在平均得分的90%左右。如果阈值设置得过低,测试结果可能会不准确;而如果阈值设置得过高,则可能会错过一些系统退化现象。
附录C:生产环境优化检查清单
当前版本的系统还处于入门级阶段。在大规模部署之前,请先完成这份检查清单中的所有项目。清单中的每一项都对应着在实际生产环境中可能出现的故障情况。
协调机制与系统状态管理
-
[ ] 将用于存储检查点的数据库从SQLite更换为PostgreSQL。SQLite适用于单进程环境,而多实例部署则必须使用Postgres。
-
[ ] 为你的
AgentState数据结构设置版本标识。可以添加新的字段,并为这些字段设置默认值;对于不再需要的字段,应在某个发布周期之后再将其删除。 -
[ ] 将数据库结构迁移测试纳入部署流程中。在系统进行滚动更新时,相关工作流程必须能够正常运行。
-
[ ] 为每个代理调用设置明确的超时时间限制。请确保这一超时限制能被所有下游服务正确接收并执行。
-
[ ] 在每次对外部服务进行调用时都添加断路器保护机制(例如针对LLM API、A2A服务或MCP服务器)。如果重试次数过多,将会加剧生产环境中的压力。
推理与成本
-
[ ] 通过带有速率限制、模型回退机制以及会话级成本跟踪功能的推理网关(如LiteLLM等工具)来处理请求。
-
[ ] 在编排层强制实施针对每个代理的令牌使用预算限制。这是硬性规定,而非建议性措施。
-
[ ] 为每次调用工具的循环设置
max_iterations上限。例如,Explainer工具的这一限值为8;需确保所有代理都遵循相同的限制规则。 -
[ ] 监控会话级别的成本消耗,一旦某次会话的成本超出预算,立即发出警报。否则,出现异常行为的代理可能会无限循环下去而无法停止运行。
可观测性
-
[ ] 将Langfuse迁移到受管理的环境或高可用性的自托管环境中。本地部署的Langfuse无法满足生产环境中的大规模追踪需求。
-
[ ] 使用结构化标签(如用户ID、功能开关、模型版本等)来捕获会话级别的追踪数据,以便后续进行筛选和对比分析。
-
[ ] 设置警报机制,用于监测错误率骤升、令牌成本异常以及延迟恶化等情况。
-
[ ] 在生产环境中对追踪数据进行抽样分析。100%的全面采样会带来较高的成本负担;通常来说,10%到20%的抽样比例,并同时完整记录所有错误信息,就已经足够了。
-
[ ] 定期将追踪数据导出到数据仓库中,以便进行长期分析或满足监管要求。
评估与质量保障
-
[ ] 在每次部署时,都在持续集成环境中运行评估套件。对于未通过质量检测的部署请求,必须阻止其继续进行后续流程。
-
[ ] 建立一个包含已知正确输入及预期输出结果的回归测试集,并在每次模型更新前执行这些测试。
-
[ ] 随时间推移持续跟踪各项质量指标。与突然出现的性能下降相比,逐渐发生的质量变化更难被及时发现。
[ ] 对于那些高风险决策,需要通过人工审核的方式来评估输出结果。并非所有输出内容都需要人工审核,但必须抽取具有统计意义的部分样本进行审查。
安全性
-
[ ] 为点对点服务添加认证机制。根据具体环境需求,可以选择使用承载令牌、MTLS或OAuth等方式来进行身份验证。
-
[ ] 对MCP工具的实现过程进行审计检查,重点关注路径遍历、注入攻击以及权限提升等风险行为。该系统中的
read_study_file函数就存在这样的安全漏洞。 -
[ ] 对输入到大型语言模型中的数据进行处理,确保其安全性。模型能够看到的任何内容都可能影响其行为,因此必须防止来自外部内容的间接指令注入。
-
[ ] 在将结构化输出结果应用于生产系统之前,必须对其进行验证。验证内容包括数据格式是否正确、是否符合相关规则以及是否存在安全隐患等。
-
[ ] 为所有会导致生产环境中的实际操作的决定记录不可篡改的审计日志。这对于那些受到监管要求的行业来说是必不可少的。
-
[ ] 为高风险操作设置人工干预机制。低风险操作可以自动化处理,而高风险操作则必须由人工来进行决策。
-
[ ] 定期更换API密钥、数据库连接凭证以及服务令牌等敏感信息。
可靠性与故障模式
-
[ ] 为每一个外部依赖项都设计“备用路径”。在该系统中,Progress Coach采用的A2A备用机制就是这样的模型:首先尝试使用该服务;如果出现故障,则自动切换到备用方案,而不会向用户显示任何错误信息。
-
[ ] 需要妥善处理代理容器的“冷启动”问题。应采用“热池”技术或可接受的备用方案,绝不能让用户等待超过60秒才能让容器完成初始化过程。
-
[ ] 需要在代理的输出结果中加入内容过滤机制。即使输入数据是可靠的,也可能会出现异常结果。
-
[ ] 需要为每一项服务都设置健康检查机制。A2A Agent Cards可以作为这些健康检查的接口,任何客户端都可以通过它们来验证服务的可用性。
-
[ ] 必须明确测试系统的“优雅降级”功能。应逐一关闭各项服务,然后验证主应用程序是否仍能保持正常响应。
治理机制
-
[ ] 需要详细记录每个代理的任务职责,包括它使用哪些工具、读取或写入哪些数据、以及可能出现的故障类型。
-
[ ] 需要维护一个与git提交版本关联的提示语版本注册表。这样在出现问题时,就能迅速确定当时在生产环境中使用的究竟是哪个版本的提示语。
-
[ ] 在对模型进行升级之前,必须经过审核和批准。更换模型版本可能会导致输出结果发生变化,从而影响下游系统的正常运行。
-
[ ] 需要为代码和模型的变更制定回滚流程。一旦发现错误的部署方案,应该能在几分钟内完成回滚操作,而绝不能花费数小时才能解决问题。
这份清单并不全面,但它涵盖了在多代理系统的生产环境中实际可能出现的各种故障模式。在首次公开发布系统之前,请仔细研究这份清单;随着系统的不断发展,每季度都应重新审视这些内容。
