在多个微服务之间构建可靠的工作流程是一项具有挑战性的任务。在单体应用中,数据库事务能够确保多项操作要么全部成功,要么全部失败。但一旦数据分散存储在不同的服务和数据库中,这种保障就不再存在了。
这时,Saga模式就派上了用场。Saga模式并不使用分布式事务,而是通过协调一系列本地事务来处理业务流程,并在出现问题时执行补偿操作。
在本文中,我们将使用NestJS、gRPC、PostgreSQL和Sequelize来构建一个基于Saga模式的解决方案。您将学习如何在不同服务之间协调工作流程、实现基于补偿的回滚机制、处理幂等性问题,以及如何在生产环境的微服务架构中跟踪工作流程的进展。
目录
先决条件
本文假设您已经熟悉一些后端开发概念。虽然不需要具备Saga模式的相关经验,但您应该对以下内容有所了解:
-
JavaScript、TypeScript、Node.js
-
NestJS的基础知识(控制器、服务、依赖注入)
-
PostgreSQL的基本概念
-
数据库事务
-
Docker(推荐用于本地开发)
-
微服务架构的基础知识
-
gRPC的基础知识(了解有帮助,但不是必需的)
如果您已经使用NestJS和PostgreSQL构建过一些后端服务,那么您就已经具备了阅读本指南所需的所有条件。
1. 引言
Saga是指在多个服务中执行的系列局部事务。每个步骤都会提交自己的数据库事务;如果后续的某个步骤失败,Saga会执行补偿性事务,以恢复之前已经完成的操作。
这种模式最初是由Hector Garcia-Molina和Kenneth Salem在1987年为处理长期存在的数据库事务而提出的。十年前,当企业开始将单体应用拆分为微服务时,人们重新发现了这一模式,并意识到:在服务边界处,数据库事务——这个后端开发人员最强大的工具——就无法正常发挥作用了。
本文将通过一个在Node.js环境中使用NestJS和gRPC实现的例子来说明Saga的作用。在这个例子中,有两个服务需要共同完成一项业务任务:
-
agency-service——负责存储机构的相关信息。 -
auth-service——负责管理组织、用户和角色信息。
如果其中任何一个服务出现故障,系统必须恢复到最初的状态,不能留下未完成的任务、丢失的数据或混乱的系统状态。
2. 问题示意图
以下是Saga旨在防止出现的错误情况:
步骤1:auth-service ✅ 创建了组织#42
步骤2:auth-service ✅ 创建了用户#99
步骤3:agency-service ❌ 失败(数据库故障、验证失败、网络问题……)
没有使用Saga时的结果:
组织#42和用户#99仍然存在。
但机构记录却不存在了。
用户虽然可以登录,但却没有任何可管理的资源。
支持团队会收到工单,工程师需要编写一次性的SQL语句来清理数据。
这种情况每周都会重复发生。
Saga的作用就是检测到步骤3失败,并明确地删除组织#42和用户#99,从而确保系统的状态保持一致——即使这些数据存储在不同的服务数据库中。
3. 为什么需要Saga
在单体应用中,我们可以将所有操作封装在一个数据库事务中,让数据库来保证操作的原子性:
await sequelize.transaction(async (tx) => {
await Organization.create({...}, { transaction: tx });
await User.create。......, { transaction: tx });
await Agency.create。......, { transaction: tx });
});
在微服务架构中,每个服务都有自己的数据库。因此无法将两个服务封装在同一个ACID事务中。各种传统的解决方案都存在问题:
| 方案 | 存在的问题 |
|---|---|
| 两阶段提交(2PC) | 会锁定跨不同服务的数据行,协调者成为单点故障,并且不具备扩展性。大多数现代数据库在通过HTTP/gRPC进行通信时并不支持这种机制。 |
| “只能寄希望于它能正常工作” | 当系统的一部分出现故障时,会导致一些用户数据或计费信息丢失。随着系统运行时间的延长,这类问题会越来越多。 |
| 手动编写清理脚本 | 这种方案暂时有效,但其中存在的漏洞可能会长期存在,新加入的工程师也可能不知道这些问题的存在。 |
| 无补偿机制的最终一致性 | 对于某些场景(如分析系统)来说这种方案还可以接受,但对于计费、身份管理或涉及金钱的交易而言,则完全不适用。 |
| Saga模式 | 每个服务都会在本地完成数据提交操作,而协调者负责管理整个工作流程,并在出现故障时执行相应的补偿操作。这种方案具有可审计性、可重启性,也是比较合理的解决方案。 |
Saga模式能够实现最终一致性,同时提供清晰、可审计的回滚路径——而且不需要使用分布式锁机制。
4. 协同执行与流程编排
实现Saga模式有两种方式:
协同执行
在协同执行模式下,各个服务会发出事件,其他服务则会接收这些事件并作出相应的反应。
auth-service → 发出“UserCreated”事件
agency-service → 接收到该事件后创建代理账户,并发出“AgencyCreated”事件
billing-service → 接收到该事件后创建订阅关系……
这种方案初期看起来很简单,但后期维护起来会比较困难。因为整个工作流程分散在多个代码库中,没有人能够完全掌控它。进行调试时需要通过日志来追踪各种事件的流转过程;而添加一个新的操作步骤则可能需要修改多个服务。
流程编排
在流程编排模式下,有一个服务充当“指挥者”的角色,它会按照一定的顺序调用其他服务。
orchestrator:
1. authClient.provisionAccount(...)
2. agencyRepo.create(...)
3. authClient.sendWelcomeEmail...
这种方案虽然存在一定的耦合性(因为协调者需要导入其他服务的接口),但整个工作流程都集中在一个文件中。因此新工程师上手使用这种方案只需要一小时的时间;而添加一个新的操作步骤也只需要提交一次代码变更请求即可。
除非有特别充分的理由,否则应该选择流程编排模式。本文以及参考实现的示例都是采用流程编排模式的。
5. 示例项目
我们的目标是在系统中创建一个代理账户。这个操作通常发生在新B2B客户注册的时候。
为了完成这个操作,需要两个服务协同工作,以确保最终得到预期的结果:
auth-service 必须执行以下操作:
-
一条
Organization记录(代表租户) -
一条
User记录(代表将进行登录操作的机构管理员) -
一条
UserRole记录,用于将该用户与AGENCY_ADMIN角色关联起来
agency-service 必须创建以下内容:
- 一条包含业务详情的
Agency记录,这些详情包括规模、注册号、网站地址、分支机构等信息,并且该记录要与上述用户/组织相关联
这些记录在同一个服务内部存在外键关联关系,但不同服务之间不存在这种关联关系——Postgres无法确保认证数据库中的用户信息与agency-service数据库中的authUserId相匹配,因此这一验证工作需要由应用程序来完成。
auth-service DB agency-service DB
───────────────── ─────────────────
organizations ◄────────┐
│ │
│ (1:1) │ 外键关联(无单独的外键字段)
▼ │ agencies
users ──────► user_roles ─ authUserId
└ authOrganizationId
如果第一步成功后第二步失败,那么最终会得到这样一个用户:该用户能够完成认证流程,但却没有任何所属的机构——这正是2.0版本中出现的那个问题。而“Saga模式”正是为了解决这个问题而设计的。
6. 架构设计
┌───────────────────────────────┐
│ API网关 │
└──────────────┬────────────────┘
│ HTTP请求 │
▼
┌──────────────────────────────────────────────────┐
│ agency-service │
│ ┌─────────────────────────────────────────┐ │
│ │ AgencyOnboardingOrchestrator(Saga模式) │ │
│ └───────────────┬─────────────────────────┘ │
│ │ 负责记录流程状态 │
│ ▼ │
│ agency_onboarding_sagas (Postgres数据库) │
└───────────────┬─────────────────┬────────────────┘
│ gRPC │ gRPC协议
provisionAgencyAccount compensateAgencyAccount
│ │
▼ ▼
┌──────────────────────────────────────────────────┐
│ auth-service │
│ AgencyProvisioningService (负责执行相关操作) │
│ │
│ organizations · users · user_roles │
│ agency_provision_records ← 用于保证操作幂等性的日志 │
└──────────────────────────────────────────────────┘
整个流程由三个组件来完成:
-
AgencyOnboardingOrchestrator组件,位于agency-service服务中——负责驱动整个工作流程的进行。 -
agency_onboarding_sagas表,也位于agency-service服务中——用于记录Saga模式执行过程中的各项细节。 -
AgencyProvisioningService组件,位于auth-service服务中——提供provisionAgencyAccount和compensateAgencyAccount这两种操作接口。该组件还依赖于自己的agency_provision_records表来保证操作的幂等性。
协调器从不直接访问认证数据库。这一边界是由gRPC来确保的。
7. 故事流程,逐步解析
这张序列图展示了整个入职流程的完整生命周期。当客户端发送请求创建新的机构时,工作流程便开始了。协调器首先会在其数据库中创建一条故事记录,并将其标记为STARTED状态,这样就能在任何业务操作发生之前,保留这条记录以记录整个工作流程的进展。
从高层次来看,协调器会先创建一条故事记录,然后请求认证服务来配置组织、用户和角色信息。一旦这些操作成功完成,协调器就会在自己的数据库中创建相应的机构记录。
如果所有步骤都成功完成,那么这个故事流程就会进入COMPLETED状态;但如果在已经配置好了认证资源之后,机构创建失败,协调器会触发一个补偿机制,指令认证服务撤销之前所做的所有配置操作。
关键在于:每个服务都会执行自己的本地事务,而协调器则负责整体协调业务流程,并确保在出现故障时系统能够恢复到一致的状态。
sequenceDiagram
autonumber
participant C as Client
participant AS as agency-service
AU: provisionAgencyAccount(sagaId, …)
AU->>>DB2: BEGIN TX
AU->>>DB2: create org + user + role + provision_record
AU->>>DB2: COMMIT
AU-->>AS: { userId, organizationId, roleId }
AS->>>DB1: UPDATE saga (AUTH_PROVISIONED)
AS->>>AS: create Agency row
alt Agency row OK
AS->>>DB1: UPDATE saga (AGENCY CREATED → COMPLETED)
AS->>>AU: sendAgencyWelcomeEmail (non-critical)
AS-->>C: 200 OK + sagaId
else Agency row fails
AS->>>DB1: UPDATE saga (COMPENSATING)
AS->>>AU: compensateAgencyAccount(sagaId)
AU->>>DB2: BEGIN TX
AU->>>DB2: delete role + token + user + org + record
AU->>>DB2: COMMIT
AS->>>DB1: UPDATE saga (COMPENSATED → FAILED)
AS-->>C: 5xx + error code
end
如果从头到尾仔细阅读这段描述,你就会理解整个入职流程的运作机制。这就是协调机制的价值所在——这种序列图本身就代表了系统的架构设计。
8. 状态机
在执行下一步操作之前,所有的状态转换都会被记录到agency_onboarding_sagas中。正是这一机制使得整个流程具有可观察性和可恢复性。
export enum AgencyOnboardingSagaStatus {
STARTED = 'STARTED', // 记录存在,但尚未产生任何副作用
AUTH_PROVISIONED = 'AUTH_PROVISIONED', // 认证相关操作已完成
AGENCY CREATED = 'AGENCY_CREATED', // 机构创建记录已生成
COMPLETED = 'COMPLETED', // 正常流程的最终状态
COMPENSATING = 'COMPENSATING', // 回滚操作正在进行中
COMPENSATED = 'COMPENSATED', // 回滚操作已完成
FAILED = 'FAILED', // 操作失败(无论是否进行了补偿)
}
为什么需要这么多状态呢?因为“这里出了什么问题?”这样的问题总会在凌晨两点被提出来。而那种只记录“成功”或“失败”结果的流程体系,对于进行故障分析来说根本毫无用处。
┌── 认证失败 ──────────► 失败 (无需采取补偿措施)
│
开始 ──► 认证完成 ──► 机构创建成功 ──► 完成 (正常流程)
│
机构创建失败 ───┘
▼
需要采取补偿措施
│
▼
补偿措施已执行 ──► 失败 (状态再次回归一致)
“无法回头的临界点”就是“认证完成”这个阶段。在这一点之前,我们可以迅速失败,因为没有任何操作是不可撤销的;但一旦过了这个阶段,所有的失败路径都必须经过补偿处理才能结束。
9. 实现调度器
调度器是唯一了解整个工作流程的组件。每个步骤都对应一个私有方法,而且每个步骤在返回结果之前都会保留自己的执行结果。
创建流程记录
// agency-onboarding.saga.repository.ts
async createSaga(payload: CreateAgencyOrchestrationInput) {
return this.sagaModel.create({
sagaId: randomUUID(), // 用于关联所有相关数据的唯一标识符
status: AgencyOnboardingSagaStatus.STARTED,
currentStep: 'STARTED',
payload, // 用于回放操作的完整输入数据
});
}
sagaId是一个一次性生成的UUID,会传递给所有的后续处理步骤。它是将调度器端的流程记录与参与者端的配置记录联系起来的唯一标识符。
主循环
// agency-onboarding.orchestrator.ts(为文章内容进行了简化)
async execute(input: CreateAgencyOrchestrationInput) {
const saga = await this.sagaRepository.createSaga(input); // 状态为“开始”
try {
// 第一步——认证服务的工作
const authStep = await this.provisionAuth(saga, input);
if (!authStep.ok) {
await this.markFailed(saga, authStepfailure); // 无需采取补偿措施
return authStep.failure;
}
// 第二步——机构服务的工作
let activeSaga = authStep.saga; // 状态为“认证完成”
try {
activeSaga = await this.createAgencyRow(activeSaga, input, authStep.authIds);
} catch (err) {
// 在这种复杂情况下,需要撤销认证服务之前所做的操作
await this.compensateAuth(activeSaga, 'SAGA_FAILED');
const failure = mapSagaFailure(err.message, 'SAGA_FAILED', 'CREATE_AGENCY');
await this.markFailed(activeSaga, failure);
return failure;
}
// 第三步——标记任务完成并执行非关键的后处理操作
activeSaga = await this.sagaRepository.updateSaga(activeSaga, {
status: AgencyOnboardingSagaStatus.COMPLETED,
});
await this.sendWelcomeEmail(input, activeSaga); // 尽力完成此操作
return mapSagaSuccess(activeSaga, await this.agencyModel.findByPk(activeSaga.agencyId!));
} catch (error) {
// 综合处理各种异常情况
await this.compensateAuth(saga, 'SAGA_FAILED');
const failure = mapSagaFailure(error.message, 'SAGA_FAILED', 'SAGA');
await this.markFailed(saga, failure);
return failure;
}
}
详细步骤说明
private async provisionAuth(saga: AgencyOnboardingSaga, input: ...) {
thislogger.log(`[${saga.sagaId}] PROVISION_AUTH`);
const auth = await firstValueFrom(
this.authClient.provisionAgencyAccount({
sagaId: saga.sagaId, // <-- 关联信息
organizationName: input.agencyName.trim(),
email: input.email.trim().toLowerCase(),
// …
}),
);
if (!auth.status || !auth.data) {
return { ok: false, failure: mapAuthProvisionFailure(auth) };
}
// 将后续可能需要的ID保存下来,以便日后进行补偿操作
const updated = await this.sagaRepository.updateSaga(saga, {
authOrganizationId: Number(auth.data.organizationId),
authUserId: Number(auth.data.userId),
authUserRoleId: Number(auth.data.user-roleId),
status: AgencyOnboardingSagaStatusAUTH_PROVISIONED,
});
return { ok: true, saga: updated, authIds: auth.data };
}
其中完成大部分工作的是updateSaga这个方法。它会将auth-service返回的那些外部ID保存到相应的记录中,这样一来,即使协调进程崩溃并重新启动,恢复操作也能读取这些信息,从而知道应该进行哪些补偿操作。
10. 实现参与者模块
参与者模块(auth-service)会将自身的所有操作封装在本地数据库事务中。在这个事务的范围内,所有的操作都遵循ACID原则;而saga>系统则负责处理跨服务之间的协调问题。
// agency-provisioning.service.ts (简化版本)
async provisionAgencyAccount(req: ProvisionAgencyAccountInput) {
// 1. 确保操作具有幂等性——如果该sagaId已经存在,则返回之前的结果。
const existing = await this.provisionRecordModel.findOne({
where: { sagaId: req.sagaId },
});
if (existing) {
return serviceSuccess('机构管理员已成功注册', {
userId: Number(existing.userId),
organizationId: Number_existing.organizationId),
userRoleId: Number(existing.roleId),
});
}
// 2. 在执行事务之前先进行域名验证(一旦发现错误就立即停止操作)。
if (await this.emailExists(req.email)) {
return serviceFailure('该邮箱地址已经存在', { code: 'EMAIL_EXISTS' });
}
if (await this.organization Exists(req.organizationName)) {
return serviceFailure('该机构名称已经存在', { code: 'ORGANIZATION EXISTS' });
}
// 3. 执行实际的操作——在auth-service的范围内,这些操作是原子的、可靠的。
return withSequelizeTransaction(this.sequelize, async (tx) => {
const org = await this.organizationModel.create({ ... }, { transaction: tx });
const user = await this.userModel.create({ ..., organizationId: org.id }, { transaction: tx });
await this.userRoleModel.create({ userId: user.id, roleId: agencyAdminRole.id }, { transaction: tx });
// 创建审计记录,以便日后进行补偿操作。
await this.provisionRecordModel.create(
{ sagaId: req.sagaId, organizationId: org.id, userId: user.id, roleId: agencyAdminRole.id },
{ transaction: tx },
);
return serviceSuccess('注册成功', {
userId: user.id, organizationId: org.id, userRoleId: agencyAdminRole.id,
});
});
}
有三点使得这种方法能够确保“ Saga的安全性”:
-
首先进行幂等性检查: 如果协调器重新尝试执行操作(例如网络故障或gRPC超时),第二次调用将不会产生任何实际效果,且会返回相同的ID。这样就能避免出现重复的用户记录。
-
在事务之外进行验证: 先进行简单的读取操作,再执行耗时的写入操作。
-
所有写操作都包含在同一个事务中: 如果任何插入操作失败,整个事务会自动回滚。协调器会收到表示操作失败的响应,从而知道没有任何数据被持久化保存。
agency_provision_records表是参与者系统中最为关键的部分。它既是实现幂等性的关键依据,也是进行补偿操作时所需的查询依据——该表的键值对与协调器所使用的sagaId完全一致。
11. 回滚与补偿操作
补偿操作其实就是另一个gRPC调用。协调器会发送sagaId以及它所记录的那些ID,然后参与者会在自己的数据库事务中,按照相反的依赖顺序删除所有被创建的数据。
在协调器端
private async compensateAuth(saga: AgencyOnboardingSaga, errorCode?: string) {
if (!saga.authUserId && !saga.authOrganizationId) {
// 没有进行任何配置操作,因此不需要进行补偿。
return;
}
// 在调用补偿函数之前,先将该Saga的状态标记为“正在补偿中”,这样即使补偿操作超时,数据也能保持一致性。
await this.sagaRepository.updateSaga(saga, {
status: AgencyOnboardingSagaStatus.COMPENSATING,
currentStep: 'COMPENSATING',
errorCode,
});
try {
const rollback = await firstValueFrom(this.authClient.compensateAgencyAccount({
sagaId: saga.sagaId,
organizationId: saga.authOrganizationId,
userId: saga.authUserId,
}));
if (!rollback.status) {
this logger.error(`[\({saga.sagaId}] 认证补偿操作失败:\){rollback.message}`);
}
} catch (err) {
thislogger.error(`[\({saga.sagaId}] 认证补偿RPC调用失败:\){err.message}`);
}
await this.sagaRepository.updateSaga(saga, {
status: AgencyOnboardingSagaStatus.COMPENSATED,
currentStep: 'COMPENSATED',
});
}
在参与者端
private async rollbackProvisionedAuth(req, sagaId: string, tx: Transaction) {
// 即使调用者忘记了所需的ID,也可以通过Saga日志来获取这些信息。
const record = await this.provisionRecordModel.findOne({
where: { sagaId }, transaction: tx,
});
const userId = req.userId ?? record?.userId;
const organizationId = req.organizationId ?? record?.organizationId;
if (userId) {
const user = await this.userModel.findByPk(userId, { transaction: tx, attributes: ['email'] });
await this.userRoleModel.destroy({ where: { userId }, transaction: tx });
if (user?.email) {
await this.passwordResetTokenModel.destroy({ where: { email: user.email }, transaction: tx });
}
await this.userModel.destroy({ where: { id: userId }, transaction: tx });
}
if (organizationId) {
await this.organizationModel_destroy({ where: { id: organizationId }, transaction: tx });
}
if (record) {
await record.destroy({ transaction: tx });
}
}
良好补偿机制的规则
-
颠倒创建顺序:先处理子节点(用户角色、令牌),再处理父节点(用户、组织)。这一规则同样适用于
DROP TABLE语句。 -
确保操作具有幂等性:如果多次接收到相同的
sagaId,系统必须能够安全地处理这些请求;如果相关记录已经不存在,执行destroy操作应该不会产生任何效果。 -
使用saga日志而不仅仅是请求数据:如果调用者忘记了某个ID或发送了不完整的请求数据,可以通过
sagaId来查找相关信息。这种设计能够提供更强大的防护能力。 -
将操作封装在本地事务中:回滚操作本身也必须具有原子性——如果回滚操作只完成了一部分,其结果会比不进行任何操作还要糟糕。
-
务必在协调者端完成整个流程:
即使RPC调用失败,也要将相关状态标记为
COMPENSATED。同时,还需要记录故障信息(通过日志、指标或警报系统)。如果有一行记录的状态一直停留在COMPENSATING阶段,那么这就会成为运行中的隐患。
如果补偿操作本身失败会怎样?
这是任何saga设计中最糟糕的情况。对此有三种合理的应对策略:
首先,可以采用指数退避算法进行重试。这种方法适用于那些暂时性的故障(例如网络问题或死锁情况)。
其次,可以将这些失败请求放入“需要人工处理”的队列中,并发出警报。
第三,可以提供手动回滚的接口。这个参考实现是通过RollbackAgencyOnboarding gRPC接口来实现这一功能的,这样操作人员就可以使用相同的sagaId来重新执行补偿操作。
在实际的生产环境中,应该结合这三种策略。具体采用哪种策略,需要根据你的业务风险来决定。
12. 跟踪、幂等性与可观测性
通过使用两个都以相同UUIDsagaId作为键的表格,就可以实现跨服务的全程追踪功能。
协调者端——agency_onboarding_sagas
| 列名 | 用途 |
|---|---|
sagaId (UUID,唯一标识) |
这个ID会传递给所有的RPC调用,是跨服务进行关联的键。 |
status |
状态机中的当前状态。 |
currentStep |
供仪表盘使用的易读标签(例如PROVISION_AUTH、CREATE_AGENCY等)。 |
payload (JSONB) |
输入数据的快照,用于回放、调试或支持工作。 |
authOrganizationId, authUserId, authUserRoleId |
进行补偿操作时所需的外部ID。 |
agencyId |
一旦相关记录存在,这个ID就会被设置出来。 |
errorCode, errorMessage |
在发生故障时会被填充这些信息。 |
createdAt, updatedAt |
记录整个操作流程的时间线。 |
COMPLETED状态下的记录大致如下所示:
{
"sagaId": "0a4f3e2c-7b11-4f8d-9a2c-90b6f5f5b8a1",
"status": "COMPLETED",
"currentStep": "COMPLETED",
"agencyId": 17,
"authOrganizationId": 42,
"authUserId": 99,
"authUserRoleId": 3,
"errorCode": null,
"errorMessage": null,
"payload": { "agencyName": "Acme Education", "email": "admin@acme.com", "...": "..." },
"createdAt": "2026-05-22T10:14:32.118Z",
"updatedAt": "2026-05-22T10:14:33.412Z"
}
参与者端 — agency_provision_records
| 列名 | 用途 |
|---|---|
sagaId(唯一标识符) |
用于确保操作的可重复性。该值与协调器端使用的sagaId相同。 |
userId, organizationId, roleId |
这些信息用于在需要进行补偿操作时确定需要删除哪些数据。 |
createdAt, updatedAt |
记录创建和更新的时间戳,用于审计用途。 |
免费可用的监控功能
由于每条日志记录前都会加上[${sagaId}]这个前缀,因此只需在两个服务上同时使用grep命令,就能获取完整的操作时间线:
[0a4f3e2c…] PROVISION_AUTH agency-service
[0a4f3e2c…] provisionAgencyAccount: ok auth-service
[0a4f3e2c…] CREATE_AGENCY agency-service
[0a4f3e2c…] Agency step failed: ... agency-service
[0a4f3e2c…] Auth compensation completed auth-service
在采用结构化日志记录方案的环境中(如Loki、Elasticsearch、Datadog),只需点击一下即可筛选出所需的日志信息。sagaId就是用于追踪整个操作流程的关键标识。
13. 测试Saga流程
Saga本质上就是一个状态机,因此测试用例也是有限且易于编制的。至少需要覆盖以下几种情况:
| 序号 | 测试场景 | 预期结果 |
|---|---|---|
| 1 | 正常流程 | COMPLETED,代理机构存在,用户也存在 |
| 2 | 认证步骤失败(例如电子邮件地址已存在) | FAILED,两侧的记录都应该为空 |
| 3 | 代理机构创建步骤失败 | COMPENSATED,认证相关的记录会被删除,代理机构也不会被创建 |
| 4 | 补偿操作超时 | COMPENSATING → 由系统自动进行恢复处理 |
| 5 | 调用者使用相同的sagaId重新尝试操作 |
第二次调用会返回第一次调用的结果,不会产生重复记录 |
| 6 | 发送欢迎邮件失败 | COMPLETED——非关键步骤失败并不会引发连锁反应 |
关于测试,这里有两点实用建议:
首先,应在协调器层面对gRPC客户端进行模拟测试,而不是在网络层进行测试。你需要确认的是compensateAgencyAccount这个方法确实是使用正确的sagaId被调用的,而不仅仅是确保数据包成功发送到了目标节点。
其次,在集成测试中使用真实的Postgres数据库(可以使用Testcontainers,或者Docker Compose中的postgres服务)。与模拟环境相比,使用真实数据库进行测试会更加可靠;而与模拟环境相比,使用真实数据库也更容易导致系统出现故障。
14. 何时不应使用Saga模式
在以下情况下,应避免使用Saga模式:
-
如果所有写操作都由同一个服务完成,则应使用常规的数据库事务机制,无需重新设计系统结构。
-
那么SELECT操作本身并不需要支持回滚功能。
-
例如已经发送了真实的电子邮件、已经从信用卡中扣款且退款渠道不可用,此时也不应使用Saga模式。
-
在单体应用中使用Saga模式属于过度设计。应等到服务边界明确划分之后再使用这一模式。
Saga模式会增加状态表、为每一步操作指定补偿机制,并要求通过sagaId来管理这些操作。然而,只有当使用Saga模式能避免数据不一致的问题时,这种额外的开销才是值得的。
15. 权衡与经验教训
在这种设计中,以下做法效果良好:
-
同步协调机制比异步协调机制更易于调试。新工程师只需阅读一份文档就能理解整个流程。
-
参与者的操作必须具备幂等性,协调者发出的重试请求也必须是安全的。这一点从项目一开始就必须得到重视,因为后期修改会带来很大的麻烦。
-
Saga模式使用的状态表取代了传统的人工管理方式,运维人员只需通过一个SQL查询就能了解“某个注册操作的具体情况”。在出现问题时,JSONB格式的数据格式也非常有用。
-
使用
sagaId作为追踪键,可以与OpenTelemetry、Datadog和Loki等工具很好地配合使用,无需额外配置基础设施。
在采用这种设计模式之前,需要了解以下注意事项:
-
如果补偿操作失败,将会导致最糟糕的结果。因此,必须从一开始就为
compensateAgencyAccount函数设置重试机制、处理无效请求的逻辑以及手动回滚的功能。 -
对于非关键步骤,应明确标注其是否可以失败,例如欢迎邮件的发送过程即使失败也不会影响机构的状态。切勿因为SMTP服务出现故障而错误地执行补偿操作。
-
Saga模式并不能替代本地事务机制。在每个服务内部,仍然应该使用真实的数据库事务来保证数据的一致性。Saga模式仅用于处理跨服务之间的协调问题。
-
虽然同步的gRPC调用简单易用,但会降低系统的可用性。如果
auth-service出现故障,机构的创建操作就会失败。因此,在需要更高可靠性的场景下,应该将gRPC调用替换为持久化的消息队列(如RabbitMQ或Kafka),并将每一步操作视为命令与响应的交互过程。 -
协调者服务本身也属于关键组件,因此必须确保其正常运行。应监控Saga模式的执行时间,当发现
COMPENSATING状态持续存在时及时发出警报,并部署多个协调者实例以确保系统的稳定性。
16. 结论
“Saga模式”并非什么神奇的东西。它其实只是经验丰富的工程师们一直以来手工所采用的方法的规范化体现:在本地执行操作,记录自己做了什么,并且知道如何撤销这些操作。
在Node.js和NestJS环境中,你只需要三个关键要素即可实现这一模式:
-
状态表,用于追踪整个操作流程。
-
协调器,负责驱动工作流程并更新状态信息。
-
参与者,提供
执行操作和撤销操作功能,这些操作都是幂等的,并且通过sagaId进行标识。
只要正确配置这三个要素,你的微服务就能像单体应用程序一样,实现“全有或全无”的操作效果——同时避免了分布式锁带来的管理难题。
起步时可以从简单的地方开始,运用协调机制,确保每个操作都是幂等的,在执行操作之前先进行持久化存储,并且始终清楚如何撤销这些操作。这就是整个Saga模式的精髓所在。


