关键要点

  • 通过将模型上下文协议(MCP)与代理间通信协议(A2A)相结合,可以构建出强大且具备互操作性的代理自动化系统。这些协议可用于利用代理来自动化机器学习运营流程。
  • A2A负责提供通信机制,而MCP则充当描述代理功能的通用语言。采用分层代理架构能够使系统更具扩展性——在代理时代,新增功能无需改变系统的核心通信逻辑即可实现。
  • 分层代理架构的强大之处在于其具备适应性和进化能力。对于那些正在应对人工智能复杂性的组织而言,这意味着他们应该从使用僵化、单一的整体系统转向采用敏捷的、以代理为驱动力的运营模式。
  • 本文中介绍的可重用模板采用了多代理系统设计方法,这种架构模式能够将编排逻辑与执行逻辑分离,这一原则对于提升系统的可扩展性至关重要。该模板为从简单的单体代理系统向协作式系统转型提供了明确的实现路径。
  • 分层A2A-MCP架构模式的应用范围并不仅限于机器学习运营领域。其核心理念适用于任何需要实现动态协作以及灵活调用各种功能的场景——这种架构有助于构建下一代智能系统,使人工智能代理能够从执行孤立任务转变为发挥协同作用,从而实现前所未有的自动化程度与适应性。

引言

随着软件行业进入代理时代,开发人员和架构师们面临着一个共同挑战:正如微服务的普及催生了REST和gRPC等标准化通信规范一样,专用人工智能代理的广泛应用也迫切需要一个完善的框架,以便这些代理能够高效地发现信息、进行沟通并协同工作。

本文提出了一种结合了两种新兴标准的架构模式:代理间通信协议(A2A)与模型上下文协议(MCP)。通过将这些协议分层组合,我们可以构建出强大、可扩展、易于扩展且具备互操作性的多代理系统,在这种系统中,新增功能无需改变系统的核心通信逻辑即可实现。

在本文中,我们首先会介绍这两种协议的核心概念,然后会将这种分层架构模式应用到一个机器学习运营案例中。在这个案例中,我们的目标是:在模型验证通过后将其部署到位,并通过相应的代码来实现这一目标。这些代码将展示一种将编排逻辑与执行逻辑分离的架构模式,这一原则对于提升系统的可扩展性至关重要。

在以智能代理为驱动的模式中,目标是用由专业AI代理组成的动态团队来取代僵化的流程。例如,在我们的MLOps应用场景中,负责部署模型的协调代理可能需要与验证代理和部署代理进行协作。这种架构面临两个根本性挑战:这些代理如何发现彼此并相互沟通?它们又如何获取完成任务所需的特定工具和数据?所提出的架构通过为每种协议分配不同的角色来解决这些问题。

A2A提供了通信机制,使协调代理能够无需硬编码连接就能找到合适的专家并指派任务。MCP则作为一种通用的语言,确保一旦代理接到任务,无论其底层实现方式如何,都能找到并使用所需的工具。

图1:我们MLOps应用场景中的A2A与MCP架构(来源:作者)

选择MLOps作为示例,是为了以此来说明从当前的静态流程向未来的动态、智能代理驱动型操作的演变过程。虽然现有的协调系统功能强大,但它们的僵化结构未来可能会成为发展的瓶颈。当业务逻辑发生变化时,往往需要重新编写代码并重新部署流程。而分层式的智能代理架构正是为这种演变而设计的。我们通过展示协调代理与验证代理、部署代理协同工作的案例,来突出这一关键优势:即能够通过组合不同的功能来适应新的需求,而不需要重写大量代码。从静态执行转向动态协调,正是我们希望通过AI代理的发展来实现的核心目标。

这里介绍的原则并不局限于MLOps领域,而是可以应用于任何领域。

我们工作流程中的分层协议

A2A:智能代理之间的通信机制

A2A的设计目的是让不同供应商提供的AI代理能够在不同的系统之间安全地进行通信。它解决了多智能代理环境中互操作性的问题。通过允许来自不同供应商的代理进行交互,A2A有助于实现模块化的工作流程,减少对特定供应商的依赖,并提升系统的可扩展性。可以把A2A看作是所有智能代理共用的“通用语言”。

核心机制

  • 互操作性的关键要素
    在A2A环境中,每个智能体都会被分配一张“智能体卡片”,该卡片会描述其功能、支持的协议以及可处理的请求类型,这样其他智能体就能发现并与其交互,而无需暴露敏感信息。可以将这张卡片视为智能体的“属性”。随着智能体的发展,这张卡片也会相应更新,从而让外部世界能够识别出这些升级内容。
  • 通信机制
    在A2A系统中,消息是通过标准的Web技术进行交换的,所使用的格式包括JSONJSON-RPC等。这种设计简化了与现有Web基础设施的集成过程,这一点非常重要,因为智能体的出现不应干扰现有的通信技术。
  • 安全性与治理机制
    A2A技术受到了Linux基金会的支持,这一举措旨在促进中立、协作式的治理机制,并确保技术的长期可持续发展。

A2A为何如此重要:

  • 它能够将那些孤立的“一次性使用的大型语言模型工具”转化为具备协作能力、能够进行谈判并实现专业化的多智能体系统。
  • 它使得某种工作流程成为可能:其中一个智能体可以以平等的身份调用另一个智能体,而不仅仅是以API客户端的方式与其交互。
  • 它支持横向扩展智能体系统的功能:人们无需构建单一的巨型智能体,而是可以通过组织由多个小型、专业化的智能体组成的生态系统来提升系统性能。

MCP:领域特定语言

MCP是一种旨在规范人工智能系统与工具、服务及数据源之间连接方式的协议。人们常将其称为“人工智能集成领域的USB-C标准”,因为MCP提供了一种通用接口,使人工智能应用程序能够无需编写自定义代码就能接入外部数据源和工具。

核心机制

  • 互操作性的关键要素
    MCP服务器会暴露三种类型的实体:工具提供了智能体可以执行的操作,例如运行代码或调用API;资源包含了智能体可以查询或加载的结构化数据;提示信息则提供了用于引导智能体行为的预定义模板。这些基本元素都遵循标准定义,因此任何兼容MCP的客户端都能直接识别并使用它们,而无需进行任何定制化的集成工作。
  • 通信机制
    与A2A类似,MCP也试图重用现有的通信技术,比如HTTPSSE等。同时,它也采用了简单的客户端-服务器架构。
  • 安全性与治理机制
    虽然MCP能够实现强大的集成功能,但也会带来一些风险,比如提示信息被篡改、工具被恶意利用以及数据被未经授权地访问。尽管单独使用MCP可能并不理想,但它可以与其他工具(如MCPWatch)结合使用,从而有效提升系统的安全性。

为何MCP如此重要:

MCP使代理能够超越固定的功能限制,从而能够发现并使用网络中任何可用的工具或资源。这样一来,就无需重新构建代理即可添加新的功能。

MCP通过让代理将工具视为可被发现的服务,实现了工具之间的无缝集成。这种方式消除了自定义集成逻辑的必要性,也简化了新增功能、API或数据集的操作流程。

MLOps工作流程

为了展示我们的分层架构,我们将以一个非常常见的MLOps工作流程案例来进行演示——在这个案例中,我们会自动化机器学习模型的验证与部署过程。该系统由三个专门设计的代理共同协作来完成目标:

  • 编排代理
    它充当协调者的角色,将高层次的目标(例如“验证并部署最新模型”)转化为一系列具体的任务。通过A2A协议,它会为每项任务找到合适的专用代理,传递必要的上下文信息,并根据执行结果做出决策。
  • 验证代理
    这是一个专门负责模型验证的代理。它通过自身的A2A卡片展示其功能,比如性能测试或偏差分析等。在接到执行请求后,它会利用底层的MCP工具来完成相应的验证工作。这样一来,编排代理无需了解具体的实现细节,就能发起验证请求。
  • 部署代理
    这个代理专门负责部署经过验证的模型。与验证代理类似,它也会通过A2A卡片展示自己的功能,并寻找必要的MCP工具来完成部署任务。

工作流程的序列图:

图2:MLOps工作流程的序列图(来源:作者提供)

执行过程与流程机制

从查询到编排

整个流程始于MLOps工程师提交高层次的请求。`OrchestratorAgent`会在其流处理方法中接收这个请求,然后立即调用`internal _create_plan_from_query`方法。该方法会利用大语言模型进行推理分析,将复杂的请求分解为两个具体的子任务:一个是验证任务,另一个是部署任务。

从编排到具体执行

编排代理会开始执行制定的计划。对于第一个任务,它会通过A2A机制找到`ValidationAgent`并传递相应的验证指令。`ValidationAgent`会在其流处理方法中接收这个子请求,然后调用`_create_tool_use_plan`方法。需要注意的是,它的计划重点并不在于分配任务,而在于如何使用各种工具来完成任务。它会通过MCP系统找到`fetch_model`和`validate_churn_model`这些工具,并制定出一系列具体的操作步骤来满足请求要求。这种操作方式应该被编码在初始化时定义的`prompt_personality`字符串中。

从工具到结果

ValidationAgent会按照其制定的工具使用计划执行操作,它会联系MCP服务器来完成任务,并将结果传输回Orchestrator。如果验证成功,Orchestrator就会进入下一阶段,调用DeploymentAgentDeploymentAgent也会遵循相同的流程:首先获取当前状态,然后进行部署操作,最终将结果反馈给用户。

代码架构解析

现在,我们将把这些理论应用到实践中来。在本次代码分析中,我们以一位MLOps工程师提出的示例查询为例来进行讲解:

“获取最新的客户流失预测模型,并将其提交给验证模块进行测试。如果该模型的绝对偏差小于或等于0.04,则批准其进行部署;如果当前的生产环境使用的是us-west-1区域,那么就将新模型部署到us-west-2区域;否则,仍保持原部署位置。”

为了构建一个能够执行此类命令的系统,我们首先需要确定其基本组成部分。我们会从设置MCP服务器开始——这个服务器充当了各代理程序与它们所需工具之间的桥梁。接下来,我们还会介绍A2A架构中的各个组件,以及这些组件之间如何通过标准协议进行连接。

关于实现细节的说明:代码中的许多功能都被有意设计为占位符。这是因为它们的具体实现方式会因所使用的工具或平台而有所不同(例如验证库、云服务提供商或部署工具的选择)。本文的重点在于展示这些组件之间的交互机制及其对应的架构模式。

MCP服务器

MCP服务器是我们系统中所有功能的中心枢纽。它为各代理程序提供了标准化且易于访问的接口,使它们能够使用所需的工具和资源来完成任务。这种设计有效地将代理程序与底层的应用逻辑分离开了。

对于我们的MLOps工作流程而言,MCP服务器提供了以下关键接口:

工具(代理程序可以调用的功能)

  • fetch_model:从模型注册系统中获取最新训练模型的元数据。
  • validate_churn_model:根据指定的要求对模型进行验证。
  • deploy_churn_model:将经过验证的模型部署到目标环境中。

资源(代理程序可以查询的结构化数据)

  • list_agent_cards:列出系统中所有可用的代理程序。
  • retrieve_agent_skills:获取特定代理程序的详细功能信息。

下面的Python代码演示了如何使用FastMCP库来定义这样的服务器及其接口。需要注意的是,每个函数内部的实现细节都被有意省略了,因为这些实现会因所使用的工具而有所不同。这里重点关注的是架构模式本身:即各种功能是如何被定义、命名,并通过标准化协议暴露出来的,从而使任何经过授权的代理程序都能使用这些功能。

#mcp_server.py
from mcp.server.fastmcp import FastMCP

def serve(host, port, transport):
  """初始化并运行MCP服务器

  参数:
  host: 服务器要绑定的主机名或IP地址。
  port: 服务器要绑定的端口号。
  transport: MCP服务器使用的传输机制(例如‘stdio’、‘sse’)。
  """
  mcp = FastMCP("validation-deployment-mcp-server", host=host, port=port)

  @mcp.tool(
    name="fetch_model",
    description="用于获取最新训练好的用户流失模型的MCP工具。",
  )
  def fetch_model(model_version_metadata: dict) -> dict:
    """用于获取最新训练好的用户流失模型元数据的MCP工具。

    参数:
    model_version_metadata: 需要获取的模型数据。

    返回值:
    一个JSON对象,其中包含新模型的元数据信息,以及用于验证的其他元数据,例如测试数据集等。
  """
  pass

  @mcp.tool(
    name="validate_churn_model",
    description "用于验证用户流失模型的MCP工具。",
  )
  def validate_churn_model(validation_config: dict) -> dict:
    """根据validation_config来验证用户流失模型的MCP工具。

    参数:
    validation_config: 包含验证要求的配置信息。

    返回值:
    一个JSON对象,其中包含验证结果。
  """
  pass

  @mcp.tool(
    name="deploy_churn_model",
    description "用于部署用户流失模型的MCP工具。",
  )
  def deploy_churn_model(deployment_config: dict) -> dict:
    """根据deployment_config来部署用户流失模型的MCP工具。

    参数:
    deployment_config: 包含部署要求的配置信息。

    返回值:
    一个JSON对象,其中包含部署结果。
  """
  pass

  @mcp.resource("resource://list_agent_cards/list", mime_type="application/json")
  def list_agent_cards() -> dict:
    """从MCP资源端点获取所有已加载的代理卡片信息,并以JSON格式返回。

    此函数负责处理URI为‘resource://agent_cards/list’的MCP资源请求。

    返回值:
    一个包含所有可用代理卡片信息的JSON对象。
  """

  @mcp.resource(
    "resource://retrieve_agent_skills/{agent_name}", mime_type="application/json"
  )
  def retrieve_agent_skills(agent_name: str) -> dict:
    """以JSON格式获取指定代理卡片的详细信息。

    返回值:
    一个包含代理卡片详细信息的JSON对象。
  ")
  pass

  mcp.run(transport=transport)

def main(host, port, transport) -> None:
  serve(host, port, transport)

MCP客户端

为了让代理能够发现并使用MCP服务器所提供的功能,它需要一个客户端。这个客户端模块实际上是一个高级API,它隐藏了MCP协议中的复杂细节。它不会强迫每个代理自己去构建资源URL或管理连接状态,而是提供了诸如list_agents()list_tools()这样的简洁、可重复使用的接口。

下面的代码展示了一个简单的MCPClient类,该类是基于mcp.ClientSession构建的。它使用异步上下文管理器来处理与服务器连接的生命周期。需要注意的是,这里的连接细节被简化了,目的是突出API的设计思路,而非具体传输机制的实现细节。

from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Dict, List

from mcp import ClientSession
from mcp.types import ReadResourceResult, ListResourcesResult, ListToolsResult


class MCPClient:
    """一个用于与MLOps MCP服务器交互的高级客户端。"""

    def __init__(self, host: str, port: int, transport: str):
        """
        使用服务器的连接信息来初始化客户端。
        
        参数:
        host: MCP服务器的主机名或IP地址。
        port: MCP服务器的端口号。
        transport: 传输机制(例如‘http’、‘sse’)。
        """
        self._host = host
        self._port = port
        self._transport = transport

    @asynccontextmanager
    async def _get_session(self) -> AsyncGenerator[ClientSession, None]:
        """
        提供一个用于连接MCP服务器的会话对象。
        具体的实现方式取决于所选择的传输机制。
        ```
        # 在实际开发中,你会根据self._host、self._port等信息来初始化会话对象
        session: ClientSession = None  # 用于存储会话对象的占位符
        try:
            # 例如:通过http协议建立连接
            yield session
        finally:
            # 例如:调用session.close()来关闭会话
            pass

    async def listAgents(self) -> ReadResourceResult:
        """
        从MCP服务器获取所有可用的代理卡列表。
        ```
        async with self._get_session() as session:
        return await session.read_resource("resource://list_agent_cards/list")

    async def get_agent_skills(self, agent_name: str) -> ReadResourceResult:
        """
        从MCP服务器获取特定代理的技能信息。
        ```
        async with self._get_session() as session:
        uri = f"resource://retrieve_agent_skills/{agent_name}"
        return await session.read_resource(uri)

    async def list_resources(self) -> ListResourcesResult:
        """列出MCP服务器上所有的可用资源。"""
        async with self._get_session() as session:
        return await session.list_resources()

    async def list_tools(self) -> ListToolsResult:
        """列出MCP服务器上所有的可用工具。"""
        async with self._get_session() as session:
        return await session.list-tools()

代理执行辅助功能

<为了执行多步骤计划,代理需要一种结构化的方法来管理自身的任务。以下这些辅助类为这一目标提供了可重用的实现模式。其核心思想是将复杂的任务目标表示为一个“TaskList”,而这个“TaskList”本质上就是一个由多个单独的“Task对象”组成的序列。每个“Task”代表工作流程中的某个具体步骤,例如寻找合适的专家代理,或调用特定的工具。>

这种设计方式使得智能体的高层推理机制与低层的执行机制相互分离。

import json
from collections.abc import AsyncIterable
from a2a.client import A2AClient
from uuid import uuid4
import httpx
from a2a.types import (
    AgentCard,
    MessageSendParams,
    SendStreamingMessageRequest,
    SendStreamingMessageSuccessResponse,
    TaskArtifactUpdateEvent,
)
from mcp_client import MCPClient
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue


class Task:
    """表示任务列表中需要执行的单个任务。"""

    task_query: str

    def __init__(self, *args, **kwargs):
        pass

    async def find_agent_for_task(self, mcp_client, query) -> AgentCard | None:
        """从MCP系统中获取适合当前任务的智能体信息。"""

        result = await mcp_client.list_agents(query)
        chosen_agent = select_agent(query)
        agent_card_json = json.loads(chosen_agent.content[0].text)

        return AgentCard(**agent_card_json)

    async def execute_task(self):
        """使用分配的智能体,通过A2A流式通信机制来执行任务。"""

        agent_card = await self.find_agent_for_task(query=self.task_query)
        async with httpx.AsyncClient() as httpx_client:
            client = A2AClient(httpx_client, agent_card)  # 使用A2A客户端与智能体进行通信
            payload: dict[str, any] = {
                "message": {
                    "parts": [{"kind": "text", "text": self.task_query}],
                    # 根据智能体提供的信息,还可以包含其他元素。
                },
            }
            request = SendStreamingMessageRequest(
                id=str(uuid4()), params=MessageSendParams(**payload)
            )
            response_stream = client.send_message_streaming(request)
            async for chunk in response_stream:
                # 将任务执行结果保存起来
                if isinstance(chunk.root, SendStreamingMessageSuccessResponse) and isinstance(chunk.root.result, TaskArtifactUpdateEvent):
                    artifact = chunk.root.result.artifact
                    selfresults = artifact
                yield chunk


class TaskList:
    """表示需要执行的任务的拓扑结构。"""

    task_list: list[Task]  # 需要执行的任务列表。

    def __init__(self, *args, **kwargs):
        """
        将查询结果分解为任务列表以及这些任务应执行的顺序。智能体需要将这样的信息存储在task_list数组中。
        """

        pass

    async def execute_task_list(self) -> AsyncIterable[dict[str, any]]:
        """为智能体执行所有列出的任务。"""

        # .. ...
        for task in self.task_list:
            # ....
            task.execute_task()


class GenericAgentExecutor(AgentExecutor):
    """智能体所使用的执行引擎。"""

    def __init__(self, agent):
        self.agent = agent

    async def execute(self, context: RequestContext, event_queue: EventQueue):
        $$ pass

编排器代理

卡片

{
    "name": "编排器代理",
    "description": "用于调用MLOps工作流,该工作流会执行模型验证与部署操作。",
    "url": "http://localhost:8003/",
    "version": "1.0.0",
    "skills": [
        {
            "id": "orchestrate_the_flow",
            "name": "编排工作流",
            "description": "负责协调MLOps工作流的执行流程。",
            "tags": [
                "先验证模型,然后进行部署。"
            ],
            "examples": [
                "获取最新的流失预测模型,并将其提交到验证模块进行测试。如果该模型的绝对偏差小于或等于0.04,则批准其进行部署;如果当前的生产环境使用的是us-west-1区域,那么就将新模型部署到us-west-2区域;否则仍保持原部署区域不变。"
            ]
        }
    ]
}

代码模板

from typing import AsyncIterable, Any
from agenthelpers import TaskList
from mcp_client import MCPClient

class OrchestratorAgent:
    """
    通过将高层次的目标分解为一系列任务,来协调多步骤工作流程的执行。
    """

    def __init__(self, mcp_client: MCPClient, prompt_personality: str):
        """
        初始化编排器代理。

        参数:
            mcp_client: 用于与MCP服务器交互的客户端对象。
            prompt_personality: 指导代理规划过程的提示信息。
        """
        self._mcp_client = mcp_client
        self._prompt_personality = prompt_personality

    async def _create_plan_from_query(self, query: str) -> TaskList:
        """
        将自然语言查询转换为结构化的TaskList列表,以便后续执行。
        ```
        # 该方法模拟了代理的高层推理过程。在人格提示的引导下,代理会解析用户输入的查询,
        # 并确定需要执行的连续步骤。

        # 以我们的示例查询为例,它会识别出两个主要子任务:
        # 1. 需要满足特定条件的验证步骤。
        # 2. 结果取决于第一步的结果,因此部署步骤也会随之变化。

        # 接着,代理会创建一个TaskList对象,其中每个Task都包含了对应子任务的指令。这与专门执行某项任务的代理不同,
        # 因为后者制定的计划会包含具体的工具调用信息。
        #
        # 例如:
        # Task 1: “获取最新的流失预测模型……并批准其部署。”
        # Task 2: “将新模型部署到备用区域……”
        #

        # 最终,这个方法会返回一个TaskList对象,其中包含了这两个待执行的Task。
        pass

    async def stream(self, query: str) -> AsyncIterable[dict[str, Any]]:
        """
        通过制定计划并执行它来处理用户输入的查询。
        ```
        # 1. 制定计划
        # 首先,代理会调用其内部的规划方法,将自然语言查询转换为结构化的TaskList列表。
        plan = await self._create_plan_from_query(query)

        # 2. 执行计划
        # 接下来,代理会依次执行这些任务。对于每个任务,它会找到负责执行的专门代理(如验证代理或部署代理),
        # 并将相应的子查询传递给它们,最后获取执行结果。
        pass

验证代理

卡片

{
    "name": "验证代理",
    "description": "用于帮助验证机器学习运营模型。",
    "url": "http://localhost:8004/",
    "version": "1.0.0",
    "skills": [
        {
            "id": "validate_the_model",
            "name": "验证模型",
            "description": "帮助验证机器学习运营模型。",
            "tags": [
                "根据用户要求验证模型。"
            ],
            "examples": [
                "获取最新的客户流失预测模型,并将其通过验证模块进行测试。如果该模型的绝对偏差小于或等于0.04,则批准其进行部署。"
            ]
        }
    ]
}

代码模板

from typing import AsyncIterable, Any
from mcp_client import MCPClient

class ValidationAgent:
    """
    一种专门用于通过利用MCP服务器提供的工具来验证机器学习模型的代理程序。"""

    def __init__(self, mcp_client: MCPClient, prompt_personality: str):
        """
        初始化验证代理程序。

        参数:
            mcp_client: 用于与MCP服务器交互的客户端对象。
            prompt_personality: 指导代理程序使用工具的指令集。
        """
        self._mcp_client = mcp_client
        self._prompt_personality = prompt_personality

    async def _create_tool_use_plan(self, query: str):
        """
        将自然语言查询转换为结构化的工具调用序列。
        """
        # 该方法模拟了代理程序的推理过程。

        # 1. 了解自身能力:首先,代理程序需要确定自己能够执行哪些操作。它会调用
        # self._mcp_client.list_tools()来获取MCP服务器上所有可用的工具列表,从而了解自己
        # 是否具备“fetch_model”或“validate_churn_model”等工具。
        # 这部分功能应该包含在prompt_personality中。

        # 2. 制定执行计划:根据可用的工具以及用户的具体查询要求,代理程序会制定相应的执行计划。例如,
        # 对于“...绝对偏差小于或等于0.04...”这样的查询,它会确定需要执行以下操作:
        #   a. 使用“fetch_model”工具获取模型的元数据。
        #   b. 创建一个包含偏差检测功能的“validation_config”对象,并从查询中提取出“0.04”这个阈值。
        #   c. 使用该配置调用“validate_churn_model”工具进行验证。
        #

        # 该方法返回的结果将是一个结构化对象,其中包含了预先配置好的工具调用序列,可以直接执行。
        pass

    async def stream(self, query: str) -> AsyncIterable[dict[str, Any]]:
        """
        通过制定执行计划并依次执行各步骤来处理验证请求。
        """
        # 1. 制定执行计划
        # 首先,代理程序会调用其内部规划方法,将自然语言查询转换为结构化的工具调用序列。
        plan = await self._create_tool_use_plan(query)

        # 2. 执行计划
        # 然后,代理程序会按照计划中指定的顺序依次执行各步骤。它会使用在规划阶段确定的参数,
        # 调用必要的MCP客户端方法(如fetch_model、validate_churn_model)来完成任务。
        # 每一步的执行结果都会被反馈给调度系统。
        pass

部署代理

卡片

{
  "name": "部署代理",
  "description": "帮助部署经过验证的机器学习模型。",
  "url": "http://localhost:8005/",
  "version": "1.0.0",
  "skills": [
    {
      "id": "deploy_the_model",
      "name": "部署模型",
      "description": "帮助部署机器学习模型",
      "tags": [
        "根据用户需求部署模型"
      ],
      "examples": [
        "将新模型部署到备用区域:如果当前的生产环境模型运行在us-west-1地区,那么就将这个版本部署到us-west-2地区;否则,就继续将其部署在us-west-1地区。"
      ]
    }
  ]
}

代码模板

from typing import AsyncIterable, Any
from mcp_client import MCPClient

class DeploymentAgent:
  """
  一种专门用于部署经过验证的机器学习模型的代理程序,
  它会从MCP服务器中查找并使用相应的工具来完成部署任务。
  """

  def __init__(self, mcp_client: MCPClient, prompt_personality: str):
    """
    初始化部署代理程序。

    参数:
    mcp_client:用于与MCP服务器交互的客户端对象。
    prompt_personality:用于指导代理程序使用工具的指令集。
    """
    self._mcp_client = mcp_client
    self._prompt_personality = prompt_personality

  async def _create_tool_use_plan(self, query: str):
    """
    将自然语言查询转换为结构化的工具调用序列。
    """

    # 此方法模拟了代理程序的推理过程。

    # 1. 识别可用工具:代理程序会通过调用self._mcp_client.list_tools()来确认哪些工具是可用的,
    # 例如'fetch_model'和'deploy_churn_model'。这些工具的选择也会受到prompt_personality指令的影响。
    }

    # 2. 制定部署计划:根据查询内容,代理程序会制定相应的部署方案。
    # 例如:“将新模型部署到备用区域……”。其推理过程如下:
    # a. 首先需要确定“当前”部署区域。
    # b. 使用'fetch_model'工具获取当前生产环境的模型元数据。
    # c. 从这些元数据中提取出当前的部署区域。
    # d. 接着确定备用区域。
    # e. 最终的部署计划就是依次执行两个工具调用:先使用fetch_model获取当前状态,
    # 然后使用deploy_churn_model进行部署操作。

    # 此方法的返回值是一个结构化对象,其中包含了可供执行的工具调用序列。
    """

  async def stream(self, query: str) -> AsyncIterable[dict[str, Any]]:
    """
    处理部署请求:首先制定部署计划,然后执行该计划。
    """

    # 1. 制定部署计划
    plan = await self._create_tool_use_plan(query)

    # 2. 执行部署计划
    for step in plan:
        # 按照正确的顺序调用必要的MCP客户端方法,并使用在规划阶段确定的参数。
        result = await self.mcp_client.step(step)
        yield result
    }

用于启动所有代理程序的脚本

import json
import httpx
from pathlib import Path
from basic_helper.promp_personalities import prompts
from orchestrator_agent import OrchestratorAgent
from validation_agent import ValidationAgent
from deployment_agent import DeploymentAgent
from a2a.types import AgentCard
import uvicorn
from a2a.server.apps import A2AStarletteApplication
from a2a.server.requesthandlers import DefaultRequestHandler
from a2a.servertasks import (
    BasePushNotificationSender,
    InMemoryPushNotificationConfigStore,
    InMemoryTaskStore,
)

mcp_client = MCPClient(host="localhost", port=8000, transport="http") # 示例客户端

def get_agent(agent_card: AgentCard):
    """根据提供的AgentCard对象获取相应的代理程序。"""
    try:
        if agent_card.name == "Orchestrator Agent":
            # 这是Orchestrator Agent
            return OrchestratorAgent(mcp_client, prompts.orchestrator_agent)
        if agent_card.name == "Validation Agent":
            # 这是Validation Agent
            return ValidationAgent(mcp_client, promptsvalidation_agent)
        if agent_card.name == "Deployment Agent":
            # 这是Deployment Agent
            return DeploymentAgent(mcp_client, prompts.deployment_agent)
    except Exception as e:
        raise e

def main(host, port, agent_card_path):
    """启动代理程序服务器。"""

    with Path.open(agent_card) as file:
        data = json.load(file)
        agent_card = AgentCard(**data)

    client = httpx.AsyncClient()
    push_notification_config_store = InMemoryPushNotificationConfigStore()
    pushnotification_sender = BasePushNotificationSender(
        client, config_store=push_notification_config_store
    )

    request_handler = DefaultRequestHandler(
        agentexecutor=GenericAgentExecutor(agent=get_agent(agent_card)),
        task_store=InMemoryTaskStore(),
        push_config_store=push_notification_config_store,
        push.sender=pushnotification_sender,
    )

    server = A2AStarletteApplication(
        agent_card=agent_card, http_handler=request_handler
    )

    uvicorn.run(server.build(), host=host, port=port)

if __name__ == "__main__":
    main()

将这两种协议分层设计的架构优势

这种将协调功能与具体执行功能明确分离的设计方式带来了显著的架构优势:

  • 动态发现与高韧性
    协调器并不需要预先了解各个专用代理程序的详细信息。新的代理程序(例如报告生成代理或监控代理)可以随时添加到系统中,协调器无需修改任何代码就能自动识别并使用这些新代理。
  • 可组合性
    各个专用代理程序本身并不是不可分割的单一实体。它们通过从MCP服务器中获取相应的工具来构建自身的功能。例如,只需部署一个新的验证工具,ValidationAgent就可以动态地发现并使用这个新工具。
  • 意图与执行的清晰分离
    协调器负责表达高层次的业务目标,而各个专用代理程序则负责处理具体的执行细节。这种解耦设计使得整个系统更易于理解、维护和扩展。
  • 适应性与演化能力
    通过将通用协调器与一系列可动态发现的专用工具及代理程序相结合,我们可以构建出一种能够灵活应对新出现的需求或复杂指令的系统。

通过将通信与发现协议(A2A)叠加在能力协议(MCP)之上,我们成功架起了从僵化、程序化的自动化向真正以目标为导向、由人工智能驱动的运营模式之间的桥梁。

结论

随着“智能体时代”的到来,软件开发迎来了新的发展范式,因此开发出强大、可扩展且具有互操作性的智能体系统变得至关重要。在本文中,我们提出了一种架构模式,该模式利用智能体间通信协议(A2A)与模型上下文协议(MCP)来应对这一挑战。

通过对MLOps工作流程的深入分析,我们证明了这种分层设计方式能够有效地将编排逻辑与执行逻辑分离开来——而这正是构建可扩展系统所必需的原则。A2A为智能体之间的动态协作提供了必要的通信框架,而MCP则充当了智能体发现和利用各种工具与资源的通用接口。这种架构使得新功能的集成变得十分便捷,且无需改变系统的核心通信机制。

这种分层设计的智能体架构之所以如此强大,就在于它具备适应性与进化能力。对于那些正在应对人工智能带来的复杂挑战的组织而言,这意味着他们需要摆脱僵化的、单一的系统结构,转向灵活的、由智能体驱动的运营模式。这一架构为开发能够快速整合新模型、新工具以及新业务需求的AI生态系统提供了坚实的基础。开发者们也因此获得了构建更加健壮、更易于维护的开发流程所需的强大工具。这种设计模式并不局限于MLOps领域;在任何那些需要实现动态协作及灵活调用各种功能的场景中,它的原则都具有重要意义。通过采用A2A与MCP,我们能够让智能体从执行孤立的任务转变为协同工作,从而在“智能体时代”实现前所未有的自动化程度与适应性。

本文所介绍的这种架构模式为多智能体系统的设计提供了一条可行的路径。它为人们提供了一种有意识的结构框架,帮助大家摆脱简单、单一的智能体设计模式,朝着构建协作型系统迈进。

对于那些有兴趣尝试这些概念并据此开发相关工具的读者来说,GitHub上的官方A2A样本代码库提供了使用这两种协议编写的可运行示例,是入门学习的绝佳资源。

Comments are closed.