大多数人工智能代理在处理快速任务时表现得非常出色。当你发送一条消息后,这些代理会调用一些工具,几秒钟内就能给你回复结果。当你需要让它们总结一份文档或进行网络搜索时,这种机制确实非常有效。

但是,当任务需要花费较长时间来完成时,情况就会不同了。比如“查看我过去三天的电子邮件,为紧急事项起草回复内容,而对于与工程相关的问题,则创建相应的工单”——这类任务显然无法在短时间内完成,可能会花费几分钟、几小时甚至更长的时间。而这仅仅是一个例子而已。

这种处理流程属于典型的全流程工作模式,一旦服务器出现故障或进程重新启动,之前所做的所有工作都会丢失。没有重试机制,也没有恢复功能,你只能从头开始重新操作。

本文所要探讨的正是这个问题。

在这篇文章中,你将构建一个能够在实际环境中稳定运行的后台代理系统。你可以发送任务指令后离开,剩下的工作就会由该系统自动完成。

在底层技术层面,这个系统会运行在Kubernetes平台上,并通过KEDA实现自动扩展功能:当没有任务需要处理时,相关组件会自动关闭;而一旦有新任务到来,系统会立即重新启动这些组件。为了确保系统的稳定性和故障恢复能力,我们会使用Temporal技术;而对于代理的功能实现及工具管理,则会使用Composio框架。

本文涵盖的内容

在本教程中,你将学习如何构建一个能够在Kubernetes平台上运行、并根据实际工作负载自动扩展的后台代理系统。具体来说,你将会学到以下内容:

  • 什么是代理循环机制,以及如何使用Claude和Composio来构建这样的代理循环

  • 如何利用Temporal技术让那些需要长时间运行的代理任务在发生故障时仍能继续执行

  • 如何设计一个将任务调度与实际执行过程分离的中间层系统

  • 如何使用Docker将代理组件和中间层系统容器化

  • 如何将整个系统部署到本地的Kubernetes集群中

  • 如何根据队列中的任务数量,利用KEDA实现组件的自动扩展

虽然这些内容涉及一些高级技术概念,但只要你认真跟随学习,一定能够掌握很多实用的知识。

目录

整体架构规划

在开始研究代码之前,了解各个组成部分是如何协同工作的会很有帮助。

该系统被划分为两个独立的层面:一个是负责处理用户交互的控制层(Next.js前端),另一个则是执行实际任务的执行层。这两者永远不会直接相互调用,这种设计是经过深思熟虑的。

代理架构

以下是整个流程的详细步骤:

任务调度过程

当用户提交一个任务目标时,系统会首先进行预检查,以确保该用户所需的所有Composio工具连接都已正常启用。如果连接正常,系统就会将任务交给Temporal处理,然后立即返回结果——用户无需等待任何响应。

注意:你不需要等待代理程序的回复,所有操作都在后台自动完成。这与普通的聊天应用不同:你只需启动任务,之后就可以忽略它了。

任务执行过程

Temporal会将任务放入队列中,然后由相应的工作进程来处理这些任务。这些工作进程会运行代理程序,利用大语言模型来分析任务目标,Composio工具则会负责执行具体的操作,最终结果会被发送回Temporal。前端系统会自动向Gateway请求状态更新,这样用户就可以随时了解任务的进展情况。

扩展性设计

KEDA会监控Temporal队列中的任务数量,并根据待处理任务的多少来动态调整工作进程的数量。当队列为空时,工作进程会减少到零;当有新的任务进入队列时,系统会自动增加工作进程的数量。这种设计非常灵活且高效!

之所以控制层从不直接接触代理程序的代码,原因很简单:某些任务的执行可能需要几分钟甚至几小时的时间,而我们并不希望这类操作发生在API层。将它们分开处理,可以确保控制层始终保持高效的运行速度,无论后台发生了什么。

此外,该应用程序还支持基于Linux CronJob机制的任务调度功能,完全不需要人工干预。因此,预检查环节就显得尤为重要——因为如果在任务调度阶段就发现错误,及时停止操作总比让任务因工具连接问题而默默失败要好得多。

以上基本上就是我们应用程序的整体架构框架。简单来说:

  • Kubernetes (k8s):编排层

  • KEDA:自动扩展层

  • Temporal:持久性保障层

  • Composio:工具执行层

  • 你选择的大语言模型(在我们的案例中是Anthropic):推理层

项目设置指南

在开始开发之前,请确保你已经安装了以下软件:

  • Docker

  • k3d(用于运行本地的Kubernetes集群)

  • kubectl

  • Helm

  • Node.js以及Python 3.11及以上版本

您还需要AnthropicComposio的API密钥。

首先,克隆仓库:

git clone https://github.com/shricodev/kron-k8s-agent.git
cd kron-k8s-agent

接下来,创建集群、构建镜像并将它们加载到集群中:

# 创建本地集群
k3d cluster create agent --wait

# 构建两个镜像并将其导入集群
bash scripts/build-images.sh
bash scripts/load-images.sh

# 部署Temporal组件(创建temporary命名空间、Postgres数据库及相关服务器)
kubectl apply -f infra/k8s/temporal/temporal-dev.yaml

然后,创建命名空间以及所需的秘密文件。在应用程序部署之前,这些秘密文件必须是存在的,因为Pod会从中读取相应的配置信息:

# 创建agent命名空间
kubectl apply -f infra/k8s/00-namespace.yaml

# 创建包含所需密钥的secret文件
kubectl create secret generic agent-secrets -n agent \
 --from-literal=ANTHROPIC_API_KEY=sk-ant-... \
 --from-literal=COMPOSIO_API_KEY=ak_... \
 --from-literal=JWT_SECRET=$(openssl rand -hex 32)

完成上述步骤后,部署应用程序并配置自动扩展功能:

# 安装KEDA,并应用相应的扩展配置
helm repo add kedacore https://kedacore.github.io/charts
helm repo update
helm install keda kedacore/keda -n keda --create-namespace --wait

kubectl apply -f infra/k8s/40-keda-worker-scaledobject.yaml -f infra/k8s/41-gateway-hpa.yaml

最后,将gateway端口转发到本地主机,以便从您的机器上访问它:

# 将gateway端口转发到localhost:8000
kubectl -n agent port-forward svc/gateway 8000:8000

将前端应用程序的访问地址设置为http://localhost:8000,这样就可以开始执行任务了。

注意:对于这个项目来说,您不需要修改apps/worker/apps/gateway/目录下的.env文件。这些文件仅用于在您的机器上直接运行应用程序。

在集群中,Pod会从ConfigMap中获取配置信息,而您刚刚创建的秘密文件也会在运行时作为环境变量被加载到Pod中。

应用程序中的核心组件

这个项目规模非常大。如果从头开始逐一分析每一行代码,那么阅读这些代码将会花费数小时的时间。因此,我将重点介绍那些真正让系统正常运行的核心组件。

代理循环机制

代理循环机制是整个系统的核心。每当有任务被分配执行时,就会启动这个循环流程。

虽然实现过程可能比较复杂,但其基本原理其实很简单:给大语言模型设定一个目标,让它进行推理、调用相应的工具来处理任务,然后将结果反馈回来,重复这个过程直到任务完成为止。
async def run_agent(
    user_id: str,
    goal: str,
    toolkit_hint: str | None = None,
) -> dict:

需要三样东西:用户的ID(这样Composio就能知道应该使用哪些已连接的账户)、任务目标本身,以及一个可选的工具包提示。这个提示可以帮助确定应该加载哪些工具。如果任务明确是与Gmail相关的操作,那么传递“gmail”这个参数就可以避免加载用户所连接的所有工具。

在循环开始之前,系统会创建一个Composio会话,并为该用户获取所需的工具:

session = await create_session(user_id, toolkit_hint);
tools = await get_tools(session);

然后实际的循环才会开始运行:

for turn in range(1, settings.max_iterations + 1):
    response = await client.messages.create(
        model 설정.model,
        max_tokens 设置.max_tokens,
        system=SYSTEM_PROMPT,
        tools=tools,
        messages=messages,
    )

if response.stop_reason == "end_turn":
    return finish("completed", _extract_text(response.content))

if response.stop_reason == "tool_use":
    # 执行相应的工具,将结果添加到对话历史中,然后继续循环
    # ...

在每一轮循环中,Claude会查看任务目标以及之前的对话记录,然后决定接下来该执行什么操作。stop_reason字段可以说明当前发生了什么:

  • "end_turn":表示Claude已经完成了任务,正在返回最终答案。

  • "tool_use":表示Claude需要调用一个或多个工具。此时循环会通过Composio执行这些工具,将结果添加到对话历史中,然后继续下一轮循环。

如果某个工具的调用失败了,错误信息会被反馈回对话流程中,而不会导致整个程序崩溃:

except ComposioError as exc:
    tool_result_blocks = [
        {
            "type": "tool_result",
            "tool_use_id": block.id,
            "content": f"工具执行失败:{exc}",
            "is_error": True,
        }
    ] for block in tool_useBlocks

循环最多会运行max_iterations轮,默认值为20,这个数值在apps/worker/agent/config.py文件中定义。如果循环达到了最大次数却仍未完成任务,系统会返回maxiterations_reached状态码,而不会无限期地等待下去。

每次调用run_agent函数时,都会返回一个结构相同的字典,其中包含状态信息、任务总结以及所执行的各个步骤。这种统一的格式使得Temporal能够方便地存储和查看处理结果,我们接下来会详细讨论这一点。

利用Temporal让系统具有持久性

单独来看,这个代理循环存在一个严重的问题:如果工作进程在执行一个包含15个步骤的任务时发生崩溃,那么之前所有的进度都会丢失,用户将无法知道任务进行到了哪一步,因此必须从头开始重新执行。

工作流与活动

Temporal将代码分成了两个不同的部分:工作流和活动。

工作流定义了应该发生什么以及这些操作的执行顺序,但它本身并不会实际执行任何操作,也不会进行任何网络请求。正是这种设计使得Temporal能够在系统崩溃后安全地重新执行工作流,从而恢复之前的状态。

活动才是真正执行任务的地方。网络请求、大语言模型的调用、工具的执行——所有这些操作都发生在活动中。活动可能会失败,但可以独立地重新尝试,而不会影响工作流程的状态。

在这个项目中,`apps/worker/workflows.py`文件中的`AgentWorkflow`就是工作流程,而`apps/worker/activities.py`文件中的`run_agent_activity`则是包裹代理循环的活动。

工作流程

@workflow.defn(name="AgentWorkflow")
class AgentWorkflow:
    def init(self) -> None:
        self._status: str = "running"
        self._result: dict | None = None

当有任务被分配时,Temporal会启动这个工作流程。它会设置重试策略,并将所有的实际操作交给相应的活动来执行:

retry = RetryPolicy(
  (initial_interval = timedelta(seconds=2)),
  (backoff_coefficient = 2.0),
  (maximum_interval = timedeltaminutes=2)),
  (maximum_attempts = 5),
  (non_retryable_error_types = ["ValueError", "AuthenticationError"])
);

result = await workflow.execute_activity(
  run_agent_activity,
  (args = [user_id, goal, toolkit_hint]),
  (start_to_close_timeout = timedelta(minutes=30)),
  (retry_policy = retry),
);

将`start_to_close_timeout`设置为30分钟,并允许最多进行5次尝试,是因为代理任务确实可能需要这么长的时间来完成。根据你的工作需求,你可以调整这个时间限制。

查询工作流程的状态

Temporal的一个优点就是它提供了便捷的查询功能。工作流程能够直接暴露其当前状态和结果,而无需额外的数据库来记录这些信息:

@workflow.query
def status(self) -> str:
    return self._status

@workflow.query
def result(self) -> dict | None:
    return self._result

前端可以通过询问Temporal“工作流程X的状态是什么?”来实时获取答案。这就是前端轮询机制的工作原理。

活动

这个活动的实现非常简单,它只是包裹了`run_agent`函数,并记录下执行过程中发生的一切:

@activity.defn(name="run_agent_activity")
async def run_agent_activity(user_id: str, goal: str, toolkit_hint: str | None) -> dict:
    result = await run_agent(user_id=user_id, goal=goal, toolkit_hint=toolkit_hint)
    return result

任何涉及网络操作的步骤都发生在活动中,而不是工作流程中。这种分离机制正是Temporal能够高效运行的关键。

工作进程

工作进程负责注册所有任务,并开始轮询队列中的待处理任务:

worker = Worker(
            client,
            task_queue=temporal_settings.temporal_task_queue,
            workflows=[AgentWorkflow],
            activities=[run_agent_activity, notify_activity],
            max_concurrentactivities=5,
)

它与Temporal相连,负责注册工作流及相关活动,并监听任务队列。当有任务到达时,它会立即取出这些任务并执行它们。这个过程是在Kubernetes容器内部运行的,而后来KEDA也正是根据队列中的任务数量来调整其运行规模的。

代理网关

该网关是一个FastAPI应用程序,它位于用户与Temporal之间,负责处理任务的分配、状态查询以及取消操作。重要的是,它本身从不执行任何代理代码,它的唯一职责就是与Temporal进行通信并迅速返回结果。

任务分配

所有操作都是从apps/gateway/routes/tasks.py中的这个分配端点开始的:

  @router.post("/dispatch", response_model=DispatchResponse)
  async def dispatch(
      body: DispatchRequest,
      user_id: str = Depends(current_user_id),
  ) -> DispatchResponse:
      if body.toolkit:
          access = await check_toolkit_access(user_id, body/toolkit)
          if not access["allowed"]:
              raise HTTPException(
                  status_code=status.HTTP_409_CONFLICT,
                  detail={
                      "error": "toolkit_not_connected",
                      "connect_url": access["connect_url"],
                  },
              )

      workflow_id = f"agent-{user_id}-{uuid.uuid4().hex[:8]}"
      await client.start_workflow(
          WORKFLOW_NAME,
          args=[user_id, bodygoal, body.toolkit],
          id=workflow_id,
          task_queuesettings.temporal_task_queue,
          cron_schedule=body.schedule or "",
      )
      return DispatchResponse(workflow_id=workflow_id, status="dispatched")

请求中包含三个字段:任务目标、可选的工具包名称(这样可以避免浪费时间去确定工具包的具体名称),以及可选的定时调度信息。该端点会先进行预检查,然后将任务交给Temporal处理,并立即返回工作流ID。用户无需等待代理程序完成执行即可。

请注意cron_schedule这个字段。如果传递一个标准的cron表达式,那么这个任务就会被设置为定期执行的作业。Temporal会自动负责调度工作,因此不需要额外的基础设施。

预检查

预检查功能实现在apps/gateway/routes/preflight.py中。在任务被分配之前,这个机制会验证用户是否已经在Composio环境中连接了所需的工具包:

  async def check_toolkit_access(user_id: str, toolkit_hint: str | None) -> dict:
      if not toolkit_hint:
          return {"allowed": True}

      connected = await asyncio.to_thread(
          _has_active_account, composio, user_id, toolkit_hint
      )
      if connected:
          return {"allowed": True}

      connect_url = await asyncio.to_thread(
          _connect_link, composio, user_id, toolkit_hint
      )
      return {"allowed": False, "toolkit": toolkit_hint, "connect_url": connect_url}

如果连接缺失,网关会返回一个connect_url,这样用户就可以立即授权该应用程序。这对于定时任务来说尤为重要。

检查状态

一旦任务开始运行,前端就会向这个端点发送请求:

  @router.get({"workflow_id"}, response_model=TaskStatusResponse)
  async def get_task(workflow_id: str, ...) -> TaskStatusResponse:
      if not _owns(workflow_id, user_id):
          raise HTTPException(status_code=404, detail="任务未找到")

      handle = client.get_workflow_handle(workflow_id, run_id=run_id)
      agent_status = await handle.query("status")

      if desc.status == WorkflowExecutionStatus.COMPLETED:
          result = await handle.query("result")

      return TaskStatusResponse(...)

statusresult这些数据直接来自Temporal的查询处理程序;系统中并没有单独的状态表,也不会在每个步骤执行后进行数据库写入操作。Temporal才是真正的数据来源

容器化应用程序

网关和worker被分别打包成两个独立的镜像。在运行时,它们之间不会共享任何资源——这正是我们所需要的,因为它们可以独立扩展,并且承担着不同的职责。

这两个Dockerfile都保存在/docker目录中,而且都是使用多阶段构建流程来创建的。

为什么采用多阶段构建?🤔

在构建阶段,会安装编译器和构建工具,用于编译Python包;而在运行阶段,只需要加载已经编译完成的依赖项以及应用程序代码即可。因此,没有必要将构建工具也包含在最终生成的镜像中。

网关镜像

FROM python:3.14-slim-bookworm AS builder

RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

COPY requirements.txt .
RUN pip install -r requirements.txt

FROM python:3.14-slim-bookworm AS runtime

ENV PYTHONUNBUFFERED=1 PATH="/opt/venv/bin:$PATH"

RUN useradd --create-home --uid 10001 app
WORKDIR /app

COPY --from=builder /opt/venv /opt/venv
COPY . .

USER app
EXPOSE 8000
CMD ["python", "main.py"]

在运行阶段,系统会从构建阶段复制venv环境,然后以非root用户身份启动FastAPI应用程序。与以往一样,我们始终以非root用户身份来运行程序(做一个负责任的开发者,遵守安全规范吧😺)。

worker镜像

worker的Dockerfile与网关的Dockerfile几乎相同,只是有一个小区别:

# 使用procps命令可以让Kubernetes的存活检查机制通过pgrep来检测进程是否仍在运行
RUN apt-get update \
 && apt-get install -y --no-install-recommends procps \
 && rm -rf /var/lib/apt/lists/*

这个差异在于它安装了procps工具,这样Kubernetes的存活检查机制就可以使用pgrep来确认进程是否仍然活跃。

构建与加载图像

scripts/build-images.sh中的构建脚本会生成这两种图像,其中每个应用程序目录都会被作为构建环境来使用:

docker build \
 -f "$ROOT/docker/Dockerfilegateway" \
    -t "agent-gateway:$TAG" \
 "$ROOT/apps/gateway"

docker build \
 -f "$ROOT/docker/Dockerfile.worker" \
    -t "agent-worker:$TAG" \
 "$ROOT/apps/worker"

这些Dockerfile文件保存在docker/目录下,但每个Dockerfile都是针对相应的应用程序目录进行构建的。实际上,COPY . .命令就是用来复制这些文件到对应的构建环境中的。

构建完成后,在这些图像能够在集群中运行之前,还需要执行另一个步骤。由于本地k3d集群无法访问Docker守护进程,因此本地构建的图像也无法被该集群使用。你必须手动将这些图像导入到集群中:

k3d image import "agent-gateway:dev" "agent-worker:dev" -c agent

scripts/load-images.sh脚本会帮你完成这个操作。一旦导入完成,集群就可以像平常一样拉取这些图像,然后你的Pod们就会启动起来。🎊

部署到Kubernetes集群

既然图像已经构建完成并被加载到了集群中,下一步就是应用相应的配置文件了。整个配置过程分为两个层次:第一层是核心应用程序的配置,包括namespaceconfigdeployments等配置文件;第二层则是自动扩展功能的配置,这部分内容将在下一节中介绍。

配置与密钥管理

那些不涉及敏感信息的配置数据会被保存在ConfigMap文件中,其路径为infra/k8s/01-configmap.yaml

data:
  MODEL: "claude-opus-4-8"
  MAX_TOKENS: "4096"
  MAX_ITERATIONS: "20"
  TEMPORAL_HOST: "temporal-frontend.temporal.svc.cluster.local:7233"
  TEMPORAL_TASK_QUEUE: "agent-tasks"
  GATEWAY_HOST: "0.0.0.0"
  GATEWAY_PORT: "8000"

这里的TEMPORAL_HOST地址就是从这里获取的。需要注意的是,这个地址使用了集群内部的DNS名称来指向temporal命名空间中的temporal-frontend服务。由于网关和worker节点都运行在集群内部,所以使用这样的地址是完全合适的。

API密钥则被保存在Kubernetes的Secret对象中,你需要手动创建这些密钥,并且永远不要将它们提交到Git仓库中。ConfigMapSecret文件都会通过envFrom指令被设置为每个部署任务的环境变量。

网关服务的部署配置

spec:
replicas: 2
containers:
- name: gateway
image: agent-gateway:dev
imagePullPolicy: IfNotPresent
command:
[
"python",
"-m",
"uvicorn",
"main:/app",
"--host",
"0.0.0.0",
--port,
"8000",
]
readinessProbe:
httpGet:
path: /health
port: 8000
resources:
requests:
cpu: 100m
memory: 256Mi
limits:
cpu: 500m
memory: 512Mi

有几点需要注意。imagePullPolicy: IfNotPresent这一设置告诉Kubernetes使用本地加载的镜像,而不是尝试从注册中心拉取镜像。启动命令绕过了`main.py`在本地直接运行时使用的`reload=True`选项。在Kubernetes向该Pod发送任何请求之前, readiness probe会先访问/health路径,因此只有当Pod真正启动运行后,网关才会开始接收请求。

此外,网关还配置了一个ClusterIP类型的Service,这样其他Pod以及端口转发功能才能访问到它:

apiVersion: v1
kind: Service
metadata:
  name: gateway
spec:
  type: ClusterIP
  ports:
    - port: 8000
      targetPort: 8000

工作节点部署

# 该工作节点部署仅用于与Temporal进行通信
spec:
  replicas: 1
  containers:
    - name: worker
      image: agent-worker:dev
      livenessProbe:
        exec:
          command: ["pgrep", "-f", "worker.py"]
        initialDelaySeconds: 15
        periodSeconds: 20
      resources:
        requests:
          cpu: 250m
          memory: 512Mi
        limits:
          cpu: "1"
          memory: 1Gi

这个工作节点并没有配置Service,因此它不会接收任何外部连接。它的作用仅仅是与Temporal进行通信并获取任务,所以不需要能够接收外部请求。这就是为什么在Dockerfile中会安装procps工具的原因。

此外,这个工作节点分配到的资源也比网关多。由于它是负责执行大语言模型调用和各种工具程序的,因此需要更多的资源。根据实际需求,你可以对它的资源配置进行限制。

整体部署流程

位于scripts/deploy.sh文件中的部署脚本会按照正确的顺序依次应用各层配置:

kubectl apply -f "$K8S/00-namespace.yaml"
kubectl apply -f "$K8S/01-configmap.yaml"
kubectl apply -f "$K8S/10-gateway-deployment.yaml"
kubectl apply -f "$K8S/20-worker-deployment.yaml"

这里的顺序非常重要:必须先创建命名空间,之后才能在其中创建其他资源;而ConfigMap也必须在相关Pod启动之前就配置完成。

使用KEDA实现自动扩展

Kubernetes会根据CPU或内存的使用情况来自动扩展Pod的数量。对于处理HTTP请求的网关来说,这种扩展方式确实适用,因为它的CPU消耗确实与接收到的请求数量成正比。但对于工作节点而言,这种方式并不合适。

当没有任务等待执行时,工作节点会处于完全空闲状态,不会浪费CPU资源。一旦有任务到达,它就会立即开始处理这些任务。实际上,我们应该根据任务队列的深度来决定是否需要扩展工作节点的数量——也就是说,应该根据有多少任务在等待被处理来调整资源配置。

这就是KEDA的作用:它会监控诸如队列长度、消息数量之类的外部指标,在接收到这些数据后就会相应地调整部署方案。

扩展工作节点

infra/k8s/40-keda-worker-scaledobject.yaml文件中定义的ScaledObject对象正是KEDA用来监控和调整资源配置的对象。

spec:
  scaleTargetRef:
    name: worker
  minReplicaCount: 0
  maxReplicaCount: 10
  cooldownPeriod: 120
  triggers:
    - type: temporal
      metadata:
        endpoint: temporal-frontend.temporal.svc.cluster.local:7233
        namespace: default
        taskQueue: agent-tasks
        queueTypes: "workflow,activity"
        targetQueueSize: "5"
        activationTargetQueueSize: "0"

让我们来了解一下其中一些重要的字段:

  • minReplicaCount:值为0意味着KEDA可以将其副本数量调整为零,而标准的HPA则无法做到这一点。当队列为空时,所有的工作节点都会被关闭;这样一来,在系统处于空闲状态时,就无需消耗任何资源。

  • activationTargetQueueSize:值为“0”表示只要队列中有一项任务,KEDA就会立即重新启动相应的部署资源。如果没有任务,就不会有工作节点被启动;而当有任务进入队列时,系统会立即开始分配资源。

  • targetQueueSize:值为“5”意味着对于每5个待处理的任务,KEDA应该启动1个工作节点;因此,如果队列中有10项任务,那么就会启动2个工作节点。

  • cooldownPeriod:值为120秒,这意味着在队列中的所有任务都被处理完毕之后,KEDA才会开始减少工作节点的数量。这个120秒的延迟机制可以确保系统的平稳运行。

  • queueTypes:值为“workflow,activity”表示KEDA会监控这两种类型的队列;如果没有设置这一参数,KEDA就无法全面了解系统中待处理的任务情况。

注意:使用Temporal scaler功能时,必须确保KEDA的版本在2.17或以上。因此,在使用Helm进行部署时,请确保使用的KEDA版本符合要求。

扩展网关的功能

网关部分是通过配置infra/k8s/41-gateway-hpa.yaml文件来使用基于CPU资源的HPA机制进行扩展的:

spec:
  minReplicas: 2
  maxReplicas: 6
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 60

在这里,选择CPU作为监控指标是合理的,因为网关的实际处理能力与接收到的HTTP请求数量成正比。配置至少2个工作节点可以确保API端在启动时不会出现延迟。

安装KEDA

在应用ScaledObject配置文件之前,需要先通过Helm来安装KEDA:

helm install keda kedacore/keda -n keda --create-namespace --wait
kubectl apply -f infra/k8s/40-keda-worker-scaledobject.yaml -f infra/k8s/41-gateway-hpa.yaml

完成这些配置后,系统就可以正常运行了。提交一个任务,观察工作节点是如何被启动的;当队列中的任务被处理完毕时,工作节点也会自动关闭——这就是KEDA自动扩展机制的全部功能。

就这样,你就得到了一个极其可靠、能够自动扩展的人工智能代理,你可以随时安排它来执行任务。这真是太棒了!😎

代理在行动中

下面是一个关于KEDA在Kubernetes集群中运行的快速演示:

结论

在实际环境中运行人工智能代理与开发这些代理是完全不同的两回事。我试图重点探讨这一差异,希望这些内容能为你提供关于如何考虑系统的耐用性及可扩展性的参考意见。同时,我也希望这些内容能帮助你构建或理解那些与传统的人工智能聊天应用程序不同的系统。

如果你们从事人工智能代理的开发工作或整体上的DevOps相关工作,那么“Temporal”与“KEDA”的组合确实是非常值得学习和深入了解的技术。„Temporal“有助于解决人工智能代理面临的最大问题——即系统的耐用性,而“KEDA”则能确保在凌晨2点等系统没有正在运行的时候,不会让闲置的资源继续消耗资源。这种扩展能力并非基于单纯的CPU性能进行扩展,而是根据实际发生的事件来动态调整资源的分配,这一点非常重要。

在这个基础上,还有很多进一步发展的空间。你可以将开发环境中使用的JWT替换为更合适的OIDC认证机制,或者通过“Composio”这个工具包来扩展该框架的功能,从而使其能够更好地支持你的各种工作流程。

基础已经搭建好了,后续的工作只需要在此基础上进行进一步的完善即可。

完整的源代码可以在这里找到:shricodev/kron-k8s-agent

Comments are closed.