我见过的每一个RAG系统——包括我在这个网站上编写了使用手册的那个系统——都存在同一个根本性问题。
这些系统无法进行学习。
你输入500份文档,然后提出一个问题。系统会检索出其中三个语义上最为相关的部分,并将它们传递给大语言模型进行处理。对于下一个查询,也会重复这个过程。
系统的知识储备实际上并没有任何增加——它仍然和第一天一样一无所知。它就像一个图书馆,从未编制过目录,也从未对自身的资料进行过交叉引用,更没有注意到其中有些资料的内容是相互矛盾的。
正是为了解决这个问题,我设计了“知识反思层”。每次有新的文档被输入系统后,该层会查找索引中已经存在的、与之语义相关的文档,并让大语言模型来分析这些新内容与现有知识的关联之处,以及还存在哪些认知上的空白。这样的分析结果会被存储下来,并在搜索结果中得到优先显示。
随着你不断添加更多文档,知识库会变得越来越“聪明”——而不仅仅是容量变大了而已。
本教程将详细教你如何构建这样一个系统。
目录
你将构建什么
通过本教程,你将搭建一个这样的系统:
-
每次有新文档被输入系统后,该系统会自动启动相应的处理流程。
-
它会从索引中查找与这些新文档语义上最为相关的文档。
-
然后会让Kimi K2.5模型生成一段三句话的总结,将新文档与现有的知识体系联系起来。
-
每隔三次文档输入,系统还会将这些分析结果整合成摘要形式进行保存。
-
一个Cloudflare账户——免费套餐即可使用
-
已安装Node.js v18+及Wrangler CLI(执行命令:
npm install -g wrangler) -
对TypeScript有基本了解
最后,这些分析结果会被以doc_type=reflection的标签存储下来,并在搜索结果中得到优先显示(排名会提高1.5倍)。
这样一来,在查询你的知识库时,系统既会展示原始的文档内容,也会呈现其在处理这些文档过程中生成的反思分析结果。
先决条件
您需要准备以下内容:
不需要使用任何外部API密钥,所有功能都在Cloudflare的基础设施上运行。
如何搭建基础系统
如果您已经根据我的freeCodeCamp手册构建好了RAG系统,可以直接跳过这一部分——您的系统已经可以用于后续的开发流程了。
如果您是从零开始搭建系统,通过阅读这一部分内容,大约15分钟后就能拥有一个可正常使用的基础系统。
搭建项目框架
npm create cloudflare@latest rag-reflection-system
cd rag-reflection-system
在创建项目时,请选择“Hello World示例”→“TypeScript”→“暂不进行部署”。
创建Vectorize索引和D1数据库
npx wrangler vectorize create rag-index --dimensions=384 --metric=cosine
npx wrangler d1 create rag-db
配置wrangler.toml文件
name = "rag-reflection-system"
main = "src/index.ts"
compatibility_date = "2026-01-01"
[[vectorize]]
binding = "VECTORIZE"
index_name = "rag-index"
[[d1_databases]]
binding = "DB"
database_name = "rag-db"
database_id = "YOUR_DB_ID"
[ai]
binding = "AI"
创建documents表
-- migrations/001_init.sql
CREATE TABLE IF NOT EXISTS documents (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
source TEXT,
date_created TEXT DEFAULT (datetime('now'))
);
npx wrangler d1 execute rag-db --remote --file=./migrations/001_init.sql
添加ingest和search端点
请将src/index.ts文件替换为以下代码,这样就能得到一个可正常运行的基础系统:
export interface Env {
VECTORIZE: VectorizeIndex;
DB: D1Database;
AI: Ai;
}
export default {
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
const url = new URL(request.url);
if (url.pathname === '/ingest' && amp; request.method === 'POST') {
const { id, content, source } = await request.json() as any;
const embResult = await env.AI.run('@cf/baai/bge-small-en-v1.5', {
text: [content.slice(0, 512)],
}) as any;
const vector = embResult.data[0];
await env.VECTORIZE.upsert([{
id,
values: vector,
metadata: { content: content.slice(0, 1000), source, doc_type: 'raw' },
}]);
await env.DB.prepare(
'INSERT OR REPLACE INTO documents (id, content, source) VALUES (?, ?, ?)'
).bind(id, content, source ?? '').run();
return Response.json({ success: true, id });
}
if (url.pathname === '/search' && amp; request.method === 'POST') {
const { query } = await request.json() as any;
const embResult = await env.AI.run('@cf/baai/bge-small-en-v1.5', {
text: [query],
}) as any;
const vector = embResult.data[0];
const results = await env.VECTORIZE.query(vector, {
topK: 5,
returnMetadata: 'all',
});
const context = results.matches
.map(m => m.metadata?.content as string)
.filter(Boolean)
.join('\n\n');
const answer = await env.AI.run('@cf/moonshotai/kimi-k2.5', {
messages: [
{ role: 'system', content: '答案仅基于提供的上下文生成。' },
{ role: 'user', content: `上下文:\n\({context}\n\n问题:\){query}` },
],
max_tokens: 256,
}) as any;
return Response.json({ answer: answer.response, sources: results.matches.map(m => m.id) });
}
return new Response('RAG系统正在运行中', { status: 200 });
},
};
部署与验证
npx wrangler deploy
进行测试:
# 添加文档
curl -X POST https://your-worker.workers.dev/ingest \
-H "Content-Type: application/json" \
-d '{"id": "doc-001", "content": "在需要实时更新的数据集中,使用游标分页比偏移量分页更有效,因为当在分页过程中有数据被插入或删除时,偏移量分页会变得不可靠。"}'
# 进行搜索
curl -X POST https://your-worker.workers.dev/search \
-H "Content-Type: application/json" \
-d '{"query": "我应该使用哪种分页方式呢?"}'
如果得到了明确的回答,那就说明基础系统运行正常。接下来的内容将会在这个基础上添加“反射层”功能。
为什么标准RAG存在内存问题
标准RAG检索机制是无状态的。每次查询都是从零开始进行的,系统不会保留之前的搜索结果,也不会整合从不同文档中获取的信息,更无法了解哪些问题尚未得到解答。
假设你已经收集了200份关于你产品的文档,其中12份提到了去年做出的定价决策。但没有一份文档能够完整地说明这一决策的来龙去脉——这些信息分散在季度报告、会议记录、内部Slack消息以及一些Notion页面中。
当有用户提问“我们为什么改变了定价结构?”时,标准RAG检索机制会找出3份内容最相似的文档。如果这三份文档确实包含了答案,那当然很好;但如果它们无法提供完整的答案——也就是说,真正需要的信息需要从这12份文档中综合起来才能得出——那么系统就无能为力了,它只能返回一些片段,让大型语言模型自行猜测。
“反射层”正是为了解决这个问题而设计的。当第12份关于定价的文档被添加进来时,系统会自动找到其他11份相关文档,整合它们之间的信息,并将这些整合结果存储起来,以便日后检索。这样,在有人提出问题之前,“我们为什么改变了定价结构”这个问题的答案就已经存在于索引中了。
这并不是说检索机制变得更智能了,而是索引技术得到了改进。
步骤1:更新数据库模式
“反射层”功能需要在你的D1文档表中添加两个新的字段。请执行以下迁移操作:
-- migrations/003_add_reflection_fields.sql
ALTER TABLE documents ADD COLUMN doc_type TEXT DEFAULT 'raw';
ALTER TABLE documents ADD COLUMN reflection_score REAL DEFAULT 0;
ALTER TABLE documents ADD COLUMN parent_reflection_id TEXT;
应用这些更改:
wrangler d1 execute mcp-knowledge-db --remote --file=./migrations/003_add_reflection_fields.sql
doc_type这个字段用于区分原始文档(raw)、针对单份文档生成的反射信息(reflection),以及整合了多份文档信息的汇总结果(summary)。你可以利用这个字段来过滤结果:只向需要简化查看内容的用户显示反射信息,而让那些希望获取原始文档片段的用户看不到这些信息。
步骤2:反射引擎
创建文件`src/engines/reflection.ts`。这是该模块的核心部分。
import { Env } from '../types/env';
import { resolveEmbeddingModel, resolveReflectionModel } from '../config/models';
const REFLECTION_BOOST = 1.5;
const CONSOLIDATION_THRESHOLD = 3; // 每当有N条新的反射结果产生时,就进行合并处理
export async function reflect(
newDocId: string,
newDocContent: string,
env: Env
): Promise<void>> {
// 1. 在索引中查找与新文档具有语义关联的现有文档
const embModel = resolveEmbeddingModel(env.EMBEDDING_MODEL);
const embResult = await env.AI.run(embModel.id as any, {
text: [newDocContent.slice(0, 512)],
});
const queryVector = (embResult as any).data?.[0];
if (!queryVector) return;
const related = await env.VECTORIZE.query(queryVector, {
topK: 5,
filter: { doc_type: { $eq: 'raw' } },
returnMetadata: 'all',
});
const relatedDocs = (related.matches ?? []).filter(
m => m.id !== newDocId && (m.score ?? 0) > 0.65
);
if (relatedDocs.length === 0) return; // 如果没有相关的文档,就跳过此步骤
// 2. 构建合成提示语
const relatedSummaries = relatedDocs
.slice(0, 3)
.map((m, i) => `文档 \({i + 1}: \){String(m.metadata?.content ?? '').slice(0, 300)}`)
.join('\n\n');
const prompt = `你正在综合知识库中多个文档的内容来生成新的信息。
新文档:
${newDocContent.slice(0, 600)}
相关的现有文档:
${relatedSummaries}
请写出以下三句话:
1. 新文档补充了哪些现有文档尚未涉及的内容?
2. 新文档是如何与现有文档相联系或扩展它们的?
3. 在所有这些文档中,还存在哪些未得到解答的问题?
请具体说明,并引用实际内容。不要进行总结,而是要生成新的、连贯的文本。`;
// 3. 调用反射模型
const reflModel = resolveReflectionModel(env.REFLECTION_MODEL);
const llmResp = await env.AI.run(reflModel.id as any, {
messages: [{ role: 'user', content: prompt }],
max_tokens: 180,
});
const reflectionText = (llmResp as any)?.response?.trim();
if (!reflectionText || reflectionText.length < 40) return;
// 4. 将生成的反射结果进行嵌入并存储
const reflEmbResult = await env.AI.run(embModel.id as any, {
text: [reflectionText],
});
const reflVector = (reflEmbResult as any).data?.[0];
if (!reflVector) return;
const reflectionId = `refl_\({newDocId}_\){Date.now()}`;
await env.VECTORIZE.upsert([
{
id: reflectionId,
values: reflVector,
metadata: {
content: reflectionText,
doc_type: 'reflection',
parent_id: newDocId,
reflection_score: REFLECTION_BOOST,
source_doc_ids: relatedDocs.map(m => m.id).join(','),
date_created: new Date().toISOString(),
},
},
];
await env.DB.prepare(
`INSERT INTO documents
(id, content, doc_type, reflection_score, parent_id, dateCreated)
VALUES (?, ?, 'reflection', ?, ?, ?)`
)
.bind(reflectionId, reflectionText, REFLECTION_BOOST, newDocId, new Date().toISOString())
.run();
// 5. 检查是否需要合并反射结果
const recentCount = await env.DB
.prepare(`SELECT COUNT(*) as cnt FROM documents WHERE doc_type = 'reflection' AND date_created > datetime('now', '-1 hour')`)
.first<{ cnt: number }>();
if ((recentCount?.cnt ?? 0) >= CONSOLIDATION_THRESHOLD) {
await consolidate(env);
}
}
这里有兩点值得注意。
首先,语义阈值(score > 0.65)非常重要。如果这个阈值过低,生成的内容就会与实际主题无关;而如果阈值过高,就很难发现各种内容之间的关联。对于bge-small模型来说,0.65这个阈值效果相当不错;而对于qwen3-0.6b(1024d版本)模型,将阈值提高到0.72会得到更好的结果,因为在这种模型下,得分会聚得更加明显。
提示语的结构也是经过精心设计的。这三句话各自承担着不同的功能:第一句介绍新内容,第二句说明这些新内容是如何与其他内容相互关联的,第三句则总结剩余的内容。这样的结构能够确保生成的内容对于信息检索来说具有实际意义。而如果使用自由形式的提示语来生成文本,虽然得到的可能是文笔优美的文章,但这些文章往往不适合用于信息检索。因此,这种结构才能确保最终生成的内容具备被检索的价值。
步骤3:整合
随着反思内容的不断积累,它们需要经过专门的整合处理——否则这些新增内容就会在更高的抽象层面上带来“噪音”,影响整个系统的性能。
请将以下代码添加到src/engines/reflection.ts文件中:
export async function consolidate(env: Env): Promise {
// 获取那些尚未被整合的最新反思内容
const recent = await env.DB
.prepare(
`SELECT id, content FROM documents
WHERE doc_type = 'reflection'
AND id NOT IN (
SELECT DISTINCT parent_id FROM documents
WHERE doc_type = 'summary' AND parent_id IS NOT NULL
)
ORDER BY date_created DESC
LIMIT 6`
)
.all<{ id: string; content: string }>>();
if (!recentresults || recent_results.length < CONSOLIDATION_THRESHOLD) return;
const reflectionTexts = recent.results.map((r, i) => `Reflection \({i + 1}: \){r.content}'.join('\n\n');
const prompt = `你正在将多条知识反思内容整合成一条压缩后的总结信息。
${reflectionTexts}
请用两到三句话来概括这些反思内容中最重要的共性或矛盾点。在添加了这些新内容之后,知识库现在理解到了什么新的东西?目前还存在哪些最关键、尚未得到解决的问题?
请表达得尽可能准确,不要写多余的前言。`
const reflModel = resolveReflectionModel(env.REFLECTION_MODEL);
const llmResp = await env.AI.run(reflModel.id as any, {
messages: [{ role: 'user', content: prompt }],
max_tokens: 320,
});
const summaryText = (llmResp as any)?.response?.trim();
if (!summaryText || summaryText.length < 40) return;
const embModel = resolveEmbeddingModel(env.EMBEDDING_MODEL);
const embResult = await env.AI.run(embModel.id as any, { text: [summaryText] });
const summaryVector = (embResult as any).data?.[0];
if (!summaryVector) return;
const summaryId = `summary_${Date.now()}`;
await env.VECTORIZE.upsert([
{
id: summaryId,
values: summaryVector,
metadata: {
content: summaryText,
doc_type: 'summary',
reflection_score: REFLECTION_BOOST * 1.2,
source_reflection_ids: recentresults.map(r => r.id).join(','),
date_created: new Date().toISOString(),
},
},
];
await env.DB.prepare(
`INSERT INTO documents (id, content, doc_type, reflection_score, date_created)
VALUES (?, ?, 'summary', ?, ?)`
)
.bind(summaryId, summaryText, REFLECTION_BOOST * 1.2, new Date().toISOString())
.run();
}
摘要在基础排名提升的基础上还会再获得1.2倍的额外加分。在搜索结果中,如果某个摘要综合了12份相关的文档内容,那么在涉及广泛概念的查询中,它的排名应该会高于任何单份文档。而在针对具体事实的查询中,原始文档片段会获得更高的分数。排序机制会自动完成这些工作。
步骤4:将其集成到您的数据摄入处理程序中
这种反射处理过程是在后台运行的,它不会阻塞数据摄入响应流程——否则每次数据摄入请求都会额外增加2到3秒的延迟。
在`src/handlers/ingest.ts`文件中,在存储完文档内容之后,请执行以下代码:
import { reflect } from '../engines/reflection';
// ... 其他现有的数据摄入处理逻辑 ...
// 在VECTORIZE.upsert()和数据库插入操作成功之后:
ctx.waitUntil(
reflect(documentId, content, env).catch(err => {
console.warn('[reflection] 处理失败,原因:', documentId, err.message);
})
);
return new Response(JSON.stringify({
success: true,
documentId,
chunks: chunkCount,
// ... 其余响应内容
}), { headers: { 'Content-Type': 'application/json' } });
`ctx.waitUntil()`是Cloudflare Workers提供的用于执行后台任务的函数。响应会立即返回,而反射处理会在之后再执行,因此数据摄入API的运行速度依然保持快速。
`.catch()`这个代码块非常重要。如果反射处理失败了,也不应该影响到整个数据摄入流程的正常进行。原始文档才是最可靠的信息来源,而反射处理得到的结果只是辅助性的信息——虽然有用,但并非关键环节。
步骤5:在搜索结果中增强反射内容的排名
在`src/engines/hybrid.ts`文件中,将反射内容排名提升的逻辑添加到排序算法中。在完成RRF融合处理之后、但在返回搜索结果之前,请执行以下代码:
// 对搜索结果应用排名提升机制
const boosted = results.map(r => ({
...r,
score: r.doc_type === 'reflection' || r.doc_type === 'summary'
? r.score * (r.reflection_score ?? 1.5)
: r.score,
});
return boosted.sort((a, b) => b.score - a.score);
这种排名提升是在数据融合处理之后进行的,而不是在融合之前就重新进行排序的。这样做的原因是:首先需要对所有搜索结果应用RRF融合算法,让反射内容在获得排名提升之前先根据其实际相关性来决定排序位置。那些仅凭排名提升机制就无法进入前20名的反射内容,不应该因此而出现在搜索结果中。
步骤6:按`doc_type`进行过滤
您的搜索接口应该允许用户通过`doc_type`参数来进行筛选,这样调用者就可以根据自己的需求来控制看到的搜索结果:
// 在搜索请求处理函数中:
const docTypeFilter = body.filters?.doc_type;
// 将这个过滤条件传递给Vectorize查询函数:
const vectorFilter: Record = {};
if (docTypeFilter) {
vectorFilter.doc_type = docTypeFilter;
}
这样调用者就可以选择三种搜索模式:
# 只显示反射内容和摘要
POST /search
{ "query": "pricing decisions", "filters": { "doc_type": { "$in": ["reflection", "summary"] } } }
# 只显示原始文档
POST /search
{ "query": "pricing decisions", "filters": { "doc_type": { "$eq": "raw" } } }
# 默认模式:显示所有类型的文档,其中反射内容的排名会得到提升
POST /search
{ "query": "pricing decisions" }
默认设置(不使用任何过滤)才是最实用的选择。让系统自动完成其工作即可;当你需要引用具体内容时,才应限制查询结果为原始数据;而当你希望获得综合性的分析结果时,则应将查询范围限定在经过处理的反射信息上。
构建完成后会发生哪些变化
当文档数量达到200份时,差异就会变得明显。那些之前只会返回零散片段的查询,现在会得到已经将这些片段整合成完整内容的分析结果;而针对广泛概念性的问题——“我们对于X了解多少?”——系统现在能够提供真正有用的总结信息,而不仅仅是内容最为相似的单个段落。
当文档数量达到2000份时,反射分析层就成为了整个系统中最有价值的部分:原始数据片段可以用来回答具体的事实性问题,而经过处理的反射结果与总结信息则能帮助解决那些无法通过任何单份文档来解答的概念性问题。系统由此获得了某些单一文档所无法提供的信息。
有一点需要注意:如果你的嵌入模型在语义聚类方面的表现不佳——比如使用旧的bge-small模型来处理包含多种类型文档的数据集——那么在检索相关文档时,系统很可能会返回一些关联性较弱的连接信息,从而生成内容肤浅的反射分析结果。0.65这个阈值可以过滤掉大部分这类问题,但如果你发现某些反射分析结果似乎与主题无关,那么首先应该检查你的嵌入模型是否正常工作。
部署
wrangler d1 execute mcp-knowledge-db --remote --file=./migrations/003_add_reflection_fields.sql
wrangler deploy
之后,再导入几份文档,观察系统会如何反应:
# 导入第一份文档
curl -X POST https://your-worker.workers.dev/ingest \
-H "Authorization: Bearer YOUR_KEY" \
-H "Content-Type: application/json" \
-d '{"id": "doc-001", "content": "你的文档内容在这里..."}'
# 几秒钟后,检查是否生成了反射分析结果
curl "https://your-worker.workers.dev/search" \
-H "Authorization: Bearer YOUR_KEY" \
-H "Content-Type: application/json" \
-d '{"query": "你的搜索主题", "filters": {"doc_type": {"$eq": "reflection"}}}'
只有当有相关的文档可供整合时,反射分析结果才会出现。因此,在期望看到这些结果之前,至少需要导入三份内容相似的文档。
接下来应该做什么
如上所述,当前的系统会在每次导入文档后自动执行反射分析操作。但在处理大量文档时,这种做法会带来性能上的负担——如果你要批量导入10,000份文档,你肯定不希望系统为每份文档都单独执行一次反射分析操作。
对于大规模的数据导入任务,可以采取以下策略:只有当某份文档的相似性搜索结果匹配度超过0.8时,才调用reflect()函数;或者等到批量导入完成后,再统一执行反射分析操作。完整代码库中的POST /ingest/batch接口就是用于实现这一功能的。
另外还有一件值得开发的功能:在用户界面中,应该用视觉上的区别来标识那些经过反射分析得出的结果。搜索结果如果是反射分析的结果,它的显示方式应该与原始数据片段有所不同。在代码库中提供的仪表板中,反射分析结果会用💡标志来标示,并附带“由N份文档综合生成”的说明文字。
完整源代码可访问 github.com/dannwaneri/vectorize-mcp-worker — 包含反射引擎、数据整合功能、批量数据导入机制、仪表盘界面以及OpenAPI规范。
该代码库采用TypeScript编写,通过一次简单的 `wrangler deploy` 命令即可完成部署;在每天处理10,000次查询的情况下,每月的运行成本约为1至5美元。
该系统采用标准的RAG检索机制,并具备自我学习的能力。