一周前,我看到了这样一条推文:

推文图片:@omaroubari_ 提问“有没有人尝试过使用Mastra和LangChain来进行代理协调?哪个更好?”

我当时刚刚发布了SupportMesh,这是一个基于Mastra构建的多租户AI支持平台,因此我对它的实际使用效果有着切身的了解。

我觉得.dowhile()循环、类型化的步骤结构,以及createWorkflow将协调逻辑集中在一起的设计很棒。不过我不喜欢token带来的开销:无论是否真的需要使用工具,每个代理步骤都会初始化Mastra的工具循环管理器,这样一来,在一个包含四个步骤的流程中,就会产生几秒钟的额外延迟,而且每次运行还会消耗数千个额外的token。

同时,我还在为另一个项目研究LangChain。它的设计方式与Mastra完全不同——LangChain不是使用类型化的步骤契约来构建工作流,而是通过有向图来实现功能,其中节点代表普通的异步函数,状态则由一个共享对象来管理。

LangChain承诺能够实现更高效的执行,并且能更精确地控制每次模型调用所使用的具体参数。鉴于我在使用Mastra时遇到的token开销问题,这些特点正是我想要深入了解的。

因此,我没有仅仅根据文档或主观感觉来选择其中一个工具,而是在两种技术上分别构建了相同的流程,并对所有数据进行了详细的测量。同样的五步研究与合成流程被重复进行了两次,每一环节都被详细记录下来:每一步消耗的token数量、延迟时间、在每个阶段发送给Claude的具体提示内容、Tavily搜索得到的原始结果,以及一个真正能给出不同评分的生产级评估系统。

之后,我还使用Convex和Next.js搭建了一个实时网页仪表板,这样大家就可以自己运行这些流程,并查看这两个框架在实现相同目标的过程中所做出的所有决策。

Mastra与LangChain的对比仪表板,同时展示了两种技术的执行结果:Mastra在25.2秒内使用了9,846个token,得分为9/10;而LangChain在19.8秒内使用了7,875个token,也得分为8/10。测试主题是“在生产环境中运行AI代理的实际成本是多少?”

目录

先决条件

要想自行完成这些操作并运行相关测试,你需要准备以下四样东西:

  • Node.js 22或更高版本:这些测试流程所使用的包依赖于现代TypeScript功能,因此需要较新的Node.js版本。

  • Anthropic的API密钥:你可以在console.anthropic.com获取该密钥。Claude Haiku 4.5的成本相当低廉,因此进行十几次测试所花费的费用仅为几美分而已。

  • Tavily的API密钥:你可以在tavily.com获取该密钥。免费账户每月可使用1,000次搜索功能,这完全足以满足重复进行测试的需求。

  • Convex账户:你可以在convex.dev注册账户。免费账户可以满足所有需求。

准备好这些工具后,本文末尾的设置指南会详细说明每样工具的具体使用方法。

我们使用的工具

在开始构建相关系统之前,了解我所使用的各种工具的功能及其被选中的原因会很有帮助。如果你已经熟悉这些工具,可以直接跳过这一部分。

Mastra是一个以TypeScript为基础的框架,专门用于开发基于人工智能的应用程序和智能体。它的核心理念是:你可以通过定义带有类型化的输入输出结构的各个步骤,将这些步骤串联成工作流程,而该框架会负责处理这些步骤之间的数据流转。Mastra对代码结构有着明确的要求,这些要求有时会成为开发中的优势,有时则会成为限制因素,具体取决于你所要构建的应用类型。

LangChain是目前最常被用于开发大语言模型应用程序的框架之一。它最初是用Python开发的,后来也推出了TypeScript版本。

在智能体协调方面,LangGraph是至关重要的工具。它是LangChain基于图结构的执行层:与使用类型化步骤契约的工作流程不同,LangGraph是通过有向图来组织各个操作。其中,节点代表异步函数,状态则是一个所有节点都会读取和写入的共享对象,而节点之间的逻辑关系则由边来控制。

Claude Haiku 4.5是驱动所有智能体运行的模型。这是Anthropic最快且最具成本效益的模型,因此非常适合用于本次测试。

Tavily是一个专为人工智能智能体设计的网页搜索API。与普通的搜索API不同,它返回的结构化结果中包含了相关性评分和内容片段,这些数据可以直接用于模型输入。免费账户的使用权限已经足够满足进行本次测试的需求,无需支付任何费用。

我选择使用Tavily,是因为它提供了简洁的TypeScript SDK,既可以在Mastra框架中使用,也可以直接与LangChain节点配合使用,而无需额外的适配层。此外,它的搜索结果质量稳定,因此两个测试流程都能获得相同质量的输入数据。

Convex是一种实时数据库,它使用了React的钩子函数useQuery,每当底层数据发生变化时,该钩子会自动重新渲染你的组件。无需进行轮询,也无需配置WebSocket或手动同步状态。当两个处理流程在执行过程中都在写入步骤数据时,页面就会自动更新。

Next.js则是用于构建仪表盘的Web框架。它包含了应用路由器、用于处理流程执行的API路由,以及适合在服务器端使用的组件。

为什么选择这种处理流程

简单的对比并不能让我了解到什么有用的信息,因为只有在实际使用这些框架时,它们之间的差异才会显现出来。

我最终选择的这个处理流程包含五个步骤:

步骤
  ↓
1. 研究      (通过Tavily进行网络搜索,获取5个相关结果)
2. 分析      (提取5个关键发现、3个主题及1个核心论点)
3. 撰写报告   (起草一份约400字的结构化报告)
4> 评估质量  (对草稿进行评分,并提供具体反馈)
5> 修订      (如果评分低于7分,则进行修改;如果通过评估或经过3次迭代后,即可完成最终版本)

我选择这些步骤,是因为它们能够充分体现不同框架的特点。

研究环节需要使用具体的工具来执行相关操作,而Mastraka的Agent抽象层在此环节发挥了重要作用;分析环节要求输出结构化的JSON数据,这可以用来测试各种框架在输出格式方面的表现;撰写报告环节有严格的内容要求,这些要求主要是通过提示设计来实现的;评估质量环节则需要进行逻辑推理,并同时生成结构化的JSON数据,这一过程其实比听起来要复杂得多;最后,修订环节则体现了这两种框架之间最根本的差异:它们在处理条件循环时的方式不同。

综合来看,这些步骤涵盖了在实际开发中使用代理框架时可能会遇到的大部分场景:工具调用、结构化输出、多步骤流程协调、质量评估以及反馈机制。

项目结构

所有代码都存储在一个单一的monorepo中,而且通过npm工作区来管理这些代码。这意味着所有的包都在根目录下共享同一个node_modules文件夹,并且可以互相直接导入。

mastra-vs-langchain/
├── packages/
│   ├── mastra-pipeline/          # Mastra实现代码
│   ├── langchain-pipeline/       # LangChain/LangGraph实现代码
│   ├── web/                      # Next.js 16应用框架及仪表盘代码
│   └── shared/                   # 公共的TypeScript类型定义文件
├── convex/                       # 实时后端服务代码
└── package.json                  # 工作区根目录配置文件

在所有共享包中,最重要的部分就是PipelineCallbacks接口,这两个处理流程的实现都必须满足这个接口的要求。正是这个接口使得仪表盘能够接收到来自任意一个框架的实时事件信息:比如步骤开始、步骤完成、令牌计数结果,以及Tavily搜索的结果——而无需了解关于Mastraka或LangChain的具体细节。

// packages/shared/src/types.ts
export interface PipelineCallbacks {
  onPipelineStart: () => Promise;
  onPipelineComplete: (id: string, data: PipelineCompleteData) => Promise;
  onPipelineError: (id: string, error: string) => Promise;
  step: {
    onStepStart: (stepName: string, iteration: number, input: string) => Promise;
    onStepComplete: (stepId: string, data: StepCompleteData) => Promise;
    onStepError: (stepId: string, error: string) => Promise;
  };
}

所有的写入操作、实时日志记录以及代币数量的变化都会通过这个接口进行处理。未来如果想要在基准测试中添加新的功能框架,只需要实现这个接口并将其插入到API路由中即可,其他部分无需做任何修改。

构建Mastra管道

如果你之前没有使用过Mastra,那么它的核心工作原理是这样的:你需要为每个步骤定义明确的输入和输出结构,然后将这些步骤串联起来形成一条工作流程,而Mastra会负责管理这些步骤之间的数据流动。

该框架对数据结构有明确的要求,但这种结构能够确保整个管道中的数据类型安全性,并且使协调逻辑更加易于理解。

搜索工具

Mastra的工具是通过`createTool`函数创建的,这个函数会接收一个Zod格式的输入结构以及一个可以直接处理经过验证后的输入数据的`execute`函数:

// packages/mastra-pipeline/src/tools/search.ts
import { createTool } from "@mastra/core/tools";
import { z } from "zod";
import { tavily } from "@tavily/core";

const client = tavily({ apiKey: process.env.TAVILY_API_KEY! });

export let lastTavilyCapture: { query: string; results: any[] } = {
query: "",
results: [],
};

export function resetTavilyCapture() {
lastTavilyCapture = { query: "", results: [] };
}

export const searchTool = createTool({
id: "web-search",
description: "在网络上搜索有关某个主题的信息",
inputSchema: z.object({ query: z.string() }),
execute: async ({ query }) => {
lastTavilyCapture = { query, results: [] };
const results = await client.search(query, {
maxResults: 5,
searchDepth: "basic",
});
lastTavilyCaptureresults = results.results;
return { results: results(results);
},
});

`lastTavilyCapture`这个模块级别的变量其实是一种为了解决实际开发中遇到的限制而采取的临时解决方案。Mastra的工具执行过程是在代理程序的内部工具循环中进行的,而这个内部循环位于工作流程步骤的下一层。

由于我需要将Tavily搜索的结果保存下来以便在仪表板上展示,让用户能够看到每次搜索的实际URL以及相关度评分,但如果通过代理程序的执行上下文来传递这些数据,就需要对Mastra的内部代码进行修改。而在模块级别进行数据捕获,并在每个搜索步骤开始时调用`resetTavilyCapture()`函数,虽然这种方式不够优雅,但确实非常可靠,而且能够有效防止之前搜索的结果影响到当前的搜索过程。

代理程序

Mastra流程中的每个步骤都是作为独立的代理程序实例来运行的。如果你刚开始使用Mastra,需要了解的一点是:这些代理程序不仅需要有一个名称,还需要有一个明确的标识符字段。如果你忽略了这个要求,TypeScript会抛出一个关于“缺少必备字段”的错误,但这个错误实际上并不能指明真正的问题所在:

// packages/mastra-pipeline/src/agents/researcher.ts
export const researcherAgent = new Agent({
  name: "Researcher",
  id: "researcher",           // v1.41版本起为必备字段——很容易被忽略
  instructions: `你是一名研究代理程序。当收到一个主题时,请使用网络搜索工具找到5个相关的结果。需要返回所有原始的搜索结果,包括标题、URL以及内容片段,并将它们格式化为字符串形式。`,
  model: anthropic("claude-haiku-4-5"),
  tools: { searchTool },
});

而“写作代理程序”则将其所有的内容要求直接写在指令中,而不是通过单独的验证环节来处理这些要求。这样,所有的约束条件都集中在一个显而易见的地方,当批评者指出草稿违反了哪些具体要求时,这种设计就显得非常有用:

// packages/mastra-pipeline/src/agents/writer.ts
export const writerAgent = new Agent({
  name: "Writer",
  id: "writer",
  instructions: `你是一名为技术类读者撰稿的研究分析师。

严格遵循的要求:
- 开篇句必须明确指出研究得出的具体结论。
- 绝不要以“X正变得越来越重要”这样的句子开头。
- 每一段文字都必须只阐述一个观点,并首先明确说明这个观点,然后用具体的证据来支持它。
- 必须提到具体的工具、框架、公司名称、数字以及日期。
- 结论部分必须提出明确的建议或预测,而不能重复引言中的内容。
- 最终稿的字数应在350到450字之间。

禁止使用的表达方式:
“值得注意的是……”、“值得强调的是……”、“各组织必须考虑……”、“总之……”、“展望未来……”、“发展迅速的环境……”;或者任何可以用替换主题后仍然成立的句子。
  model: anthropic("claude-haiku-4-5"),
});

为什么“写作步骤”和“批评步骤”要放在同一个流程中

在实现Mastra的过程中,我做了一个与大多数教程不同的架构决策,了解其中的原因是很重要的。

Mastra中的.dowhile()结构会重复执行某个步骤,直到满足某个条件为止。当只需要重复执行一个操作时,这种设计确实很简洁。但修改流程实际上需要两个步骤:首先是写作步骤,然后是批评步骤。你可以将这两个步骤合并成一个步骤,也可以构建一个嵌套的工作流程,让内部的工作流程同时包含这两个步骤。

然而,在这种情况下,嵌套工作流程只会增加复杂性,并不会带来任何实际的好处,因此“写作步骤”和“批评步骤”就被放在了writeCriticStep这个流程中。该流程首先会运行写作程序,然后立即对草稿进行批评分析,最后返回一个包含草稿内容及评分结果的组合输出。

const writeCriticStep = createStep({
id: "write-critic",
inputSchema: z.object({
topic: z.string(),
research: z.string(),
analysis: z.string(),
keyFindings: z.array(z.string()),
mainThemes: z.array(z.string),
centralArgument: z.string(),
draft: z.string().optional(), // 在第一次迭代后填充内容
score: z.number().optional(), // 在第一次迭代后填充内容
feedback: z.string().optional(), // 在第一次迭代后填充内容
iterations: z.number().optional(),
}),
outputSchema: z.object({
// ... 所有输入字段,以及draft、score、feedback、iterations这些字段
}),
execute: async ({ inputData }) => {
const iteration = (inputData.iterations ?? 0) + 1;

// 写作阶段
let writerPrompt = `主题:\({inputData.topic}\n\n研究内容:\n\{inputData.research}\n\n分析结果:\n${inputData.analysis}`;
if (inputData.feedback && inputData.draft) {
// 在进行修改时,作者会看到之前的草稿以及具体的反馈意见
writerPrompt += `\n\n之前的草稿:\n\({inputData.draft}\n\n反馈意见:\n\{inputData_feedback}\)n`;
}

const writeStepId = await callbacks.step.onStepStart("write", iteration, writerPrompt.slice(0, 500));
const writerResult = await writerAgent.generate(writerPrompt);
const draft = writerResult.text;
await callbacks.step.onStepComplete(writeStepId, { output: draft, /* 令牌数据 */ });

// 评论阶段:在写作阶段结束后立即开始,使用相同的草稿
const criticPrompt = `研究内容:\n\{inputData.research}\n\n分析结果:\n\{inputData.analysis}\n\n草稿:\n${draft}`;
const criticStepId = await callbacks.step.onStepStart("critic", iteration, draft.slice(0, 500));
const criticResult = await criticAgent.generate(criticPrompt);
const parsed = extractJson(criticResult.text);
const score = parsed?.score ?? 4;
const feedback = parsed?.feedback ?? "分数解析失败";
await callbacks.step.onStepComplete(criticStepId, { output: criticResult.text, criticScore: score });

return { ...inputData, draft, score, feedback, iterations: iteration };
},
});

.dowhile()条件用于判断是否需要再次循环。它将上一次writeCriticStep的执行结果作为inputData参数接收,因此可以直接获取评分结果:

const workflow = createWorkflow({
id: `research-pipeline-${Date.now()}`, // 添加时间戳可以避免同时运行时出现冲突
inputSchema: z.object({ topic: z.string() }),
})
.then(researchStep)
.then(analysisStep)
.dowhile(
writeCriticStep,
async ({ inputData }) => inputData.score < 7 && inputData.iterations < 3 ) .commit();

在工作流ID中添加Date.now()这个时间戳,是为了确保当两个任务同时运行时,系统能够为每个任务生成唯一的实例,从而避免冲突。

令牌捕获

在任何调用`agent.generate()`之后,使用数据都会保存在结果对象中。由于Mastraka不同版本的实现细节存在差异,因此检查所有可能的字段名称才是稳妥的做法:

const inputTokens =
  (result as any).usage?.promptTokens ??
  (result as any).usage?.inputTokens ??
  0;
const outputTokens =
  (result as any).usage?.completionTokens ??
  (result as any).usage?.outputTokens ??
  0;

构建LangChain管道

LangChain/LangGraph采用完全不同的思维模式来解决相同的问题。

虽然Mastraka提供了具有明确定义的步骤契约的工作流程,但LangGraph则使用有向图来组织各个步骤。图中的节点其实就是普通的异步函数,状态则是通过整个图传递的一个共享的可变对象,而执行顺序是由图中的边决定的,而不是由一系列`.then()`调用所决定的。

状态注解

在创建任何节点之前,你需要使用`Annotation.Root`来定义这个共享状态的结构。图中的每个节点都会读取和写入这个对象:

// packages/langchain-pipeline/src/graph/state.ts
export const PipelineState = Annotation Root({
  topic: Annotation,
  research: Annotation,
  analysis: Annotation,
  draft: Annotation,
  score: Annotation,
  feedback: Annotation,
  iterations: Annotation.number),
  finalReport: Annotation,
  criticDimensions: Annotation,
});

与Mastraka相比,LangGraph在数据流动方式上存在显著差异。在Mastraka中,每个步骤都会明确声明它接收什么以及返回什么,而框架会在TypeScript层面确保这些契约得到遵守。

而在LangGraph中,任何节点都可以读取或写入共享状态中的任意字段。这种结构是由图的结构决定的,而不是由类型系统决定的。因此,Mastraka能够在编译阶段发现数据流动相关的问题,而LangGraph则使得在不需要修改每个步骤的架构的情况下添加新字段变得更加容易。

工厂模式

LangGraph中的节点其实就是普通的异步函数,这种设计正是让它们保持简洁性的原因——没有框架带来的开销,也没有初始化过程,用户只需要编写自己的代码来调用相应的模型即可。

然而,我的需求是将回调函数以及一个用于存储令牌的共享变量应用到所有四个节点中,而普通的函数并没有内置机制来实现这一点。

为了解决这个问题,我使用工厂模式来创建这四个节点,使得这些节点能够访问共享状态:

// packages/langchain-pipeline/src/graph/nodes.ts
export function createNodes(
  callbacks: PipelineCallbacks,
  acc: { inputTokens: number; outputTokens: number }
) {
  const tavilyClient = tavily({ apiKey: process.env.TAVILY_API_KEY! });

  async function researchNode(state: PipelineStateType): Promise>> {
    const stepId = await callbacks.step.onStepStart("research", 1, state.topic);
    const results = await tavilyClient.search(state.topic, { maxResults: 5, searchDepth: "basic" });
    const research = results.results
      .map((r, i) => `[\({i + 1}] \){r.title}\nURL: \({r.url}\nContent: \){r.content}`)
      .join("\n\n");
    await callbacks.step.onStepComplete(stepId, {
      output: research,
      promptSent: state.topic,
      timeMs: elapsed,
      inputTokens: 0,      // research步骤使用的是Tavily服务,而不是大语言模型
      outputTokens: 0,
      model: "tavily-search",
      tavilyQuery: state.topic,
      tavilyResults: JSON.stringify(results.results),
    });
    return { research };
  }

  // analysisNode、writeNode、criticNode的实现方式与researchNode相同

  return { researchNode, analysisNode, writeNode, criticNode };
}

请注意,这个研究节点会返回0个令牌,因为它直接调用了Tavily,而没有使用任何大语言模型,这就是在基准测试数据中体现出来的一个关键差异。后续的每个节点都会将获得的令牌直接添加到共享的acc对象中:

const inputTokens = response.usage_metadata?.input_tokens ?? 0;
const outputTokens = response_usagemetadata?.outputtokens ?? 0;
acc.inputTokens += inputTokens;
acc.outputTokens += outputTokens;

LangChain的ChatAnthropic将相关数据存储在response.usage_metadata中,这种数据类型的定义非常清晰,因此无需进行类型转换。

图结构与节点命名冲突的问题

LangGraph有一个很容易被忽略的规定:节点名称不能与状态注解的键发生冲突。如果将一个节点命名为"research",就会在运行时出现错误,因为state.research已经作为一个状态通道存在了,而错误信息并没有解释为什么会出现这种问题。将节点名称改为"researcher""analyzer"就可以解决这个问题:

export const pipeline = new StateGraph(PipelineState)
  .addNode("researcher", researchNode)   // 不要使用"research",因为会与state.research发生冲突
  .addNode("analyzer", analysisNode)     // 同样,不要使用"analysis"
  .addNode("write", writeNode)
  .addNode("critic", criticNode)
  .addEdge(START, "researcher")
  .addEdge("researcher", "analyzer")
  .addEdge("analyzer", "write")
  .addEdge("write", "critic")
  .addConditionalEdges("critic", shouldRevise, {
    revise: "write",
    end: END,
  })
  .compile();

在LangGraph中,修订循环是通过条件边和路由函数来实现的:

function shouldRevise(state: PipelineStateType): string {
  if (state.score >= 7 || state.iterations >= 3) return "end";
  return "revise";
}

每次批评者节点被执行后,shouldRevise函数就会被调用,它会返回"revise"以使程序循环回到写入节点,或者返回"end"以退出图结构。这种机制在状态机中相当于Mastrana的.dowhile()函数:同样的条件逻辑是通过图结构的路由来实现的,而不是通过传统的循环结构。

重试处理机制

这两个框架在同时进行HTTPS请求时,都会遇到TLS会话复用失败的问题。这种错误的提示信息是:SSL routines:tls_get_more_records:decryption failed or bad record mac.使用带有线性退避策略的重试处理机制可以解决这个问题:

async function retryOnFetch(fn: () => Promise, retries = 3): Promise {
  for (let i = 0; i <= retries; i++) {
    try {
      return await fn();
    } catch (e: any) {
      const shouldRetry =
        e.message.includes("fetch") ||
        e.message === "fetch failed" ||
        e.message.includes("SSL") ||
        e.message.includes("ECONNRESET") ||
        e.message.includes("other side closed") ||
        ecause.code === "ECONNRESET";
      if (i < retries && shouldRetry) {
        await new Promise((r) => setTimeout(r, 1000 * (i + 1)));
        continue;
      }
      throw e;
    }
  }
  throw new Error("unreachable");
}

在LangChain节点中,每一个`llm.invoke()`调用都被封装在了这样的处理机制之中。在网页应用的API接口中,出于同样的原因,对于每一次Convex调用,也都存在相应的`retryMutation`封装层。

那个给所有内容打分7分的“批评者”

当这两套评估流程同时运行时,我测试了几个不同的主题。无论主题、所使用的框架或评估次数如何,最终得到的分数都是7分。

这种现象实际上是一种被广泛记录在案的错误机制,被称为“LLM作为评判者的偏见”。当你要求一个语言模型在没有给出结构化标准以及每个分数等级的明确界定时,为某项内容打分的话,它往往会给出7分——这个分数既足够表明内容的质量,又不会显得过于苛刻,同时也不需要任何具体的理由来支持。由于提示中没有任何因素迫使模型做出其他判断,因此它也没有动机去区分不同的情况。

我最初设计的那个“批评者”模型如下:

你是一名负责评判的编辑。请根据准确性、清晰度和深度为这份草稿打分,分数范围是1到10分。返回{分数,反馈}。

由于整个提示内容只有这一句话,因此显然这个模型会给所有内容都打7分。

真正的生产级评估应该是怎样的

我采用的解决方案源自G-Eval论文,DeepEval和RAGAS等工具也是基于这种评估方法设计的。关键在于,这样的评估机制需要三个要素共同发挥作用:评判者在给出分数之前必须进行逐步推理;被评估的各个维度必须是相互独立的;而且每个分数等级都必须有明确的含义说明,而不仅仅是“1分表示很差,10分表示完美”这么简单。

因此,我重新设计了这个“批评者”模型,使其包含六个必须依次完成的步骤,只有完成这些步骤后,才能最终得出一个评分:

  1. 事实性内容的审核:报告中的每一条事实性陈述都会被分类为“有依据的”“可合理推断的”“没有根据的”或“与事实相矛盾的”。

  2. 表述具体性的审核

    :所有泛化的表述以及所有被禁止使用的短语都会被明确标记出来。

  3. 观点深度的审核

    :会检查结论是否真的为内容增添了新的信息,而不仅仅是重复引言中的内容。

  4. 反事实逻辑的检验

    :评判者需要指出读者在阅读完这份报告后,会产生哪些仅从标题中是无法得出的具体想法。如果无法找到这样的想法,那么这个观点的得分就不能超过6分。

  5. 维度的评分

    :会为三个独立的维度分别打分,每个分数等级都有明确的界定标准。

  6. 最低分数限制规则

    :如果任何一个维度的得分低于4分,那么最终的总分就不能超过6分。

这一基本规则确实需要特别说明,因为它针对的是一种实际存在的缺陷:如果没有这条规则,那么一份虽然捏造了事实但来源信息较为准确的报告,只要其具体性和洞察力足够高,仍然可以获得及格分数。如果某一方面存在严重缺陷,这份报告就应该被判定为不合格,而不是通过一些调整就能使其通过评估。

以下是完整的评审标准说明,Mastra和LangChain是通过`nodes.ts`文件中的某个常量来共享这一标准的:

const CRITIC_INSTRUCTIONS = `你是一名资深研究编辑。你的任务是找出人工智能生成的报告中存在的具体缺陷。

步骤1:声明性内容审核
将每条声明分类为:[有根据的] [推论得出的] [无依据的] [捏造的内容]

步骤2:具体性审核
列出那些表述过于笼统、使用了禁用语句,或者没有任何可证伪内容的句子。禁用语句包括:“值得注意的是”、“各组织必须考虑”、“发展迅速”以及“展望未来时……”等。

步骤3:洞察力审核
结论部分是否提供了引言中没有的新信息?

步骤3.5:反事实检验
请指出读者在阅读这份报告后会产生但仅从标题中是无法得出的某个具体观点。如果你无法找到这样的观点,那么这份报告的洞察力得分就不能超过6分。

步骤4:对每个评估维度进行打分

来源真实性(占40%权重):
5-6分:声明内容准确,但这些信息来源于一般性的主题知识,并非具体的研究结果
7分:大部分声明都有明确的依据,至少引用了一个具体来源
8分:所有关键声明都有根据,且引用了两个或更多包含具体细节的来源
9-10分:每一条声明都对应着一个具体的来源,并且至少使用了一项统计数据

具体性(占30%权重):
5-6分:部分声明具有针对性,但段落之间的分析较为笼统
7分:大部分内容都是具体的,只有少量冗余表述
8分:每一个段落的内容都可以被证伪,且其中都提到了具体的实体名称
9-10分:如果更换主题,报告中没有任何句子仍然成立

洞察力(占30%权重):
5-6分:虽然有一些综合分析,但结论部分其实可以在阅读材料之前就写出来
7分:结论提出了基于现有证据得出的建议
8分:指出了读者之前没有考虑到的某种权衡因素
9-10分:一位资深工程师在阅读这份报告后,可能会重新考虑原来的设计决策

步骤5:基本规则
如果任何一个维度的得分低于4分,那么最终总分就不能超过6分。

步骤6:计算最终分数
finalScore = round((fidelity * 0.40) + (specificity * 0.30) + (insight * 0.30))

请仅以以下JSON格式进行回复:
{
"fidelity": <1-10>,
"fidelityReasoning": "<一句话>",
"specificity": <1-10>,
"specificityReasoning": "<一句话>",
"insight": <1-10>,
"insightReasoning": "<一句话>",
"score": <加权最终得分>,
"feedback": "<详细说明:请具体指出是哪一句内容导致某个维度的得分过低,然后明确说明需要做出哪些修改>"
};

从思维链输出中提取JSON数据

由于评审系统在生成JSON数据之前会先写出几段分析文字,因此`JSON.parse(result.text)`会因为响应内容不再符合纯JSON格式而抛出错误。在我发现并解决这个问题之前,每次解析失败时都会默默返回默认值`4`,这意味着对于每一个主题,程序都会执行全部三次评估流程。

该修复方案会从文本中反向查找最后一个有效的JSON对象,因为JSON代码块总是出现在文本的最后部分。

function extractJson(text: string): any {
try { return JSON.parse(text.trim()); } catch {}

const matches = text.match(/\{[\s\S]*\}/g);
if (matches) {
for (let i = matches.length - 1; i >= 0; i--) {
try { return JSON.parse(matches[i]); } catch {}
}
}

const fenced = text.match(/(?:json)?\s*([\s\S]*?)/);
if (fenced) {
try { return JSON.parse(fenced[1].trim()); } catch {}
}

return null;
}

我差点就发布出来的那个评估偏差问题

在重新构建评估系统之后,一切似乎都运转正常了:初稿的得分在4到6分之间,修订流程也会被触发,而且经过修改后的内容确实比之前的版本有所改进。

但在各个技术主题上,一个明显的规律出现了:Mastraka在所有主题上的得分始终在8到9分之间,而LangChain的得分则一直稳定在6到7分。

仔细分析评估系统实际给予哪些奖励后,问题就暴露了出来。“来源真实性”这一指标占据了最终得分的40%,而且该指标会奖励那些引用了Tavily研究中具体来源的报告。Mastraka的报告里经常会出现“根据Kore.ai的分析”这样的表述,而LangChain的报告也会提到类似的内容,但并没有指明具体的来源。

造成这种差异的原因在于信息在处理流程中的传递方式。Mastraka的智能体在处理文本时,会将Tavily研究中的所有内容(包括标题、URL和摘录)都保存在对话历史记录中,因此写作智能体在生成报告时可以直接使用这些原始资料。

而LangChain的写作模块则只接收到了从研究中提取的结构化JSON数据,即关键结论、主题和核心论点。当这些JSON数据被生成出来时,具体的来源信息已经被抽象掉了,因此写作系统虽然得出了结论,但却没有提供相应的引用依据。

尽管这两个处理流程都是按照各自框架的要求正确实现的,但我实际上给它们提供了不同的输入数据,而自己却没有意识到这一点。评估系统奖励的是那个拥有更多信息的框架,而不是那些生成了更高质量报告的框架;因此,在各个技术主题上出现的稳定得分差距其实反映了它们在信息处理能力上的本质差异。

为了解决这个问题,我们只需要对LangChain的写作模块进行一个简单的修改:在传递数据时,除了结构化JSON数据外,还要同时传递原始的Tavily研究结果。

async function writeNode(state: PipelineStateType): Promise>> {
const prompt = `你是一名为技术读者撰写报告的研究分析师。

研究部分(原始搜索结果——请注明具体来源):
${state.research}

分析部分:
${state.analysis}
\({state.feedback ? `\n之前草稿的评估意见:\n\){state_feedback}` : ""}

${WRITER_INSTRUCTIONS}

请仅返回报告正文。`

const response = await retryOnFetch(() => llm.invoke(prompt));
return { draft: response.content as string, iterations: (state.iterations ?? 0) + 1 };
}

由于两位作者使用的原始材料完全相同,现在的质量评分确实能够反映他们的写作水平。如果你的评估系统在多次测试中总是偏好其中某个选项,那么首先需要检查的是这两个选项是否使用了相同的输入数据。如果存在结构上的差异,就会导致始终如一的结果;而真正的质量差距则会因主题和草稿的质量不同而有所变化。

实时监控面板

在终端中运行相关流程固然可以用于自我比较,但这种方式并不适合其他人进行基准测试。理想的监控面板应该能让这两个流程并行运行,在执行过程中每一步都能被清晰地显示出来;同时,应该能够展开查看完整的提示信息和响应内容,还能提供相关性评分、标记数量等信息,并且拥有实时滚动的日志记录功能,所有数据都应能按类别进行保存和查询。

凸架构式

选择“Convex”正是因为它具备实时处理的能力:在React中使用的useQuery钩子能够监听数据库的查询操作,当底层数据发生变化时,系统会自动重新渲染页面,而用户端完全不需要进行任何轮询或WebSocket管理。

该架构以三种不同的粒度级别来存储每次测试的结果:

steps: defineTable({
  runId: v.id("runs"),
  pipelineResultId: v.id("pipelineResults"),
  framework: v.union(v.literal("mastra"), v.literal("langchain')),
  stepName: v.union(
    vliteral("research"), vLiteral("analysis"),
    v.literal("write"), v.literal("critic")
  ),
  iterationNumber: v.number(),
  status: v.union(v.literal("running"), v.literal("complete"), v.literal("error")),
  promptSent: v.optional(v.string()),
  output: v(optional(v.string)),
  timeMs: voptional(v.number),
  inputTokens: v.optional(v.number),
  outputTokens: v.optional(v.number),
  model: v.optional(v.string),
  tavilyQuery: v.optional(v.string),
  tavilyResults: v.optional(v.string),
  criticScore: v.optional(v.number),
  criticFeedback: v(optional(v.string)),
  criticDimensions: voptional(v.object({
    fidelity: v.number(),
    specificity: v.number(),
    insight: v.number(),
    fidelityReasoning: v.string(),
    specificityReasoning: v.string(),
    insightReasoning: v.string(),
  }),
}).index("by_pipeline_result", ["pipelineResultId"])

criticDimensions字段用于存储G-Eval评估的详细数据,因此监控面板可以利用这些数据用彩色条形图显示各个维度的评分结果,并附上相应的分析文本。

const activeTasks = new Map>&(); export async function POST(req: NextRequest) { const { topic, category } = await req.json(); // 同步创建相关记录(这些操作执行速度很快) const runId = await retryMutation(() => fetchMutation(api.runs.createRun, { topic, category, status: "running" }); ); const mastraResultId = await retryMutation(() => fetchMutation(api.pipelineResults.createPipelineResult, { runId, framework: "mastra", status: "running", iterations: 0, }); ); const langchainResultId = await retryMutation(() => fetchMutation(api.pipelineResults.createPipelineResult, { runId, framework: "langchain", status: "running", iterations: 0, }); ); // 不等待这些操作的结果,直接启动两个管道 const task = Promise.allSettled([ withRetry(() => runMastraPipeline(topic, buildCallbacks(runId, mastraResultId, "mastra"))), withRetry(() => runLangChainPipeline(topic, buildCallbacks(runId, langchainResultId, "langchain"))), }).then(async () => { await retryMutation(() => fetchMutation(api.runs.updateRunStatus, { runId, status: "complete" }); ); activeTasks.delete(runId as string); }); // 将这个任务存储在 Map 中,这样 Node.js就不会回收这个 Promise 对象 activeTasks.set(runId as string, task); return NextResponse.json({ runId }); // 立即返回响应 }

在 Vercel 上,这种模式仍然会失败,因为无服务器函数会在路由处理程序返回后立即终止执行,从而导致所有后台 promise 被取消。解决办法是使用 `@vercel/functions` 中提供的 `waitUntil` 函数,该函数会让 Vercel 保持执行上下文的活跃状态,直到对应的 promise 被解决:

import { waitUntil } from "@vercel/functions";

waitUntil(task);
return NextResponse.json({ runId });

订阅实时更新

在运行页面上,会有三个 Convex 查询同时执行:即实际的运行过程、管道处理的结果,以及每个管道结果对应的后续操作步骤。

这里的 `"skip"` 语句非常重要:它告诉 Convex 在没有收到有效的参数之前,不要执行相关查询,从而避免出现“竞争条件”现象——即在管道结果数据尚未生成时就提前启动了后续操作步骤的查询:

const mastraSteps = useQuery(
api.steps.getStepsForPipelineResult,
mastraResult ? { pipelineResultId: mastraResult._id } : "skip"
);

在重试后去除重复的步骤记录

当管道因 TLS 错误而失败并需要重新开始执行时,失败尝试中的步骤记录会与成功尝试的记录一起保存在 Convex 中。这样一来,用户界面就会同时显示这两组记录,从而导致研究卡片与其他步骤内容之间出现明显的间距。

为了解决这个问题,我们需要按照 `stepName + iterationNumber` 的格式对步骤记录进行分组,并保留每个组的最新版本:

const stepMap = new Map();
[...steps]
.sort((a, b) => (a._creationTime ?? 0) - (b._creationTime ?? 0))
.forEach((s) => {
const key = `\({s.stepName}-\){s.iterationNumber}`;
const existing = stepMap.get(key);
if (!existing) { stepMap.set(key, s); return; }
if (s.status === "complete") { stepMap.set(key, s); return; }
if (existing.status !== "complete") { stepMap.set(key, s); }
});

实时日志的自动滚动功能

在 Convex 中,日志记录会被作为数组添加到管道结果文档中,而当有新的日志条目出现时,页面会通过底部的一个空 div 使用 `ref` 实现自动滚动:

function LiveLogPanel({ logs }: { logs?: LogEntry[] }) {
const endRef = useRef〈HTMLDivElement〉(null);

useEffect(() => {
endRef.current?.scrollIntoView({ behavior: "smooth" });
}, [logs?.length]);

return (

{logs?.map((entry, i) => (



))}

);
}

效果的产生依赖于 logs?.length 这一数值,因此每当有新的日志记录从 Convex 系统传入时,滚动功能就会被触发。

数据实际显示的内容

速度: 在每次运行过程中,LangChain 的执行速度都要快 25% 到 45%。在处理较短的主题时,这一差距会缩小到 7 到 8 秒,但这种优势永远不会被逆转。

我认为造成这种差异的原因在于代码结构本身。Mastra 的 Agent 类会在每一步都初始化其工具循环管理器,即使实际上并没有使用任何工具。这意味着在模型真正开始运行之前,内部对话历史记录、工具配置以及重试机制就已经被建立起来了,而这些操作都会增加额外的开销。

在一个包含四步的处理流程中,每一步所消耗的 2 到 5 秒时间会累积起来。而 LangGraph 的节点本质上是异步函数,因此代码可以直接执行,在模型和用户代码之间不存在任何框架初始化的过程。

token 使用量: Mastra 所消耗的 token 数量是 LangChain 的 1.5 到 2.5 倍。其中,研究步骤所导致的差异最为明显,因为 LangChain 的研究节点会直接调用 Tavily,而不会使用任何大型语言模型。

在处理较为常见的主题时,Mastra 大约需要使用 6,200 个 token,而 LangChain 需要 3,900 个 token。这种差异会随着 Tavily 返回的内容量而变化,因为这些内容会在后续的处理步骤中被纳入 Mastra 的对话历史记录中。

质量:在解决了评估偏差问题之后,不同框架产生的评分结果会因主题的不同而有所差异。当 Tavily 提供的具体信息丰富且准确时,这两种框架都能生成高分报告;然而,在处理那些搜索结果较为模糊或涉及人物传记的主题时,两种框架都会遇到困难。

如果初稿的得分为 7 或 8 分,说明研究工作做得很好,作者也提出了具体而有力的观点;如果得分仅为 4 或 5 分,那就意味着研究结果不够充分,作者只能使用一些泛泛的表述,此时就需要进入修改循环,直到初稿得到改进或达到迭代次数上限为止。

权衡: Mastra 在框架层面处理了各种复杂的协调工作,因此用户无需亲自承担这些任务。用户只需要编写 .dowhile() 代码来控制流程,指定步骤的具体操作内容,而框架会负责管理对话历史记录和工具的执行过程。不过,这种做法的代价就是每一步都会产生额外的 token 使用量和延迟。

LangChain 则提供了图结构执行引擎,其余的所有工作都由用户自己来完成。用户需要更加明确地设计代码逻辑,但执行效率更高,并且能够精确控制每一条输入模型的 token。

亲自尝试一下

实时演示地址为 mastra-vs-langchain.vercel.app,而这项对比实验的完整源代码则可以在 github.com/sholajegede/mastra-vs-langchain 上找到。如果这个教程对你有帮助,请考虑给它打一颗星吧。

git clone https://github.com/sholajegede/mastra-vs-langchain.git
cd mastra-vs-langchain
npm install
cp .env.example .env
# 设置 ANTHROPIC_API_KEY 和 TAVILY_API_KEY
npx convex dev   # 在终端 1 中执行此命令
npm run web      # 在终端 2 中执行此命令

打开 `localhost:3000`,输入一个主题,选择一个分类,然后运行这两个测试。每个步骤都会被实时显示出来,所有的操作记录都会被详细记录下来,而历史记录页面则会保存按分类划分的所有之前的测试结果。

如果你想进一步扩展这个对比实验,可以加入 CrewAI、CopilotKit 或其他任何框架来进行测试。在 `packages/shared` 中的 `PipelineCallbacks` 接口就是你需要实现的唯一部分。

如果本教程对你有帮助,请随时分享给那些可能从中受益的人。我非常期待你的反馈。你可以在 X 上通过 @wani_shola 称呼我,或者在 LinkedIn 上与我联系。

Comments are closed.