您已经配置好了Stripe,结账流程也可以正常运行了,顾客们也能够进行支付。但问题在于:在支付完成之后会发生什么呢?

大多数支付集成方案都在处理webhook时出现故障。您的服务器在尝试提供访问权限时会崩溃;当您试图发送确认邮件时,您的电子邮件服务可能会中断;而在进行数据库写操作时,系统也可能会超时。

Stripe会重新尝试执行整个webhook流程,但您的处理程序可能在服务器崩溃之前就已经发送了确认邮件。这样一来,顾客就会收到两封邮件,但却无法完成后续的操作。

本文将向您说明如何解决这些问题。您将学习到如何构建能够在系统出现故障时依然正常运行的webhook处理程序——具体方法就是将支付完成后的处理逻辑分解成多个可靠、可以独立重试的步骤。这种模式适用于任何需要多步处理的webhook场景,而不仅仅限于Stripe。

以下是您将学到的内容:

  • 为什么在真实环境中Stripe的webhook会默默地出现故障

  • 简单直接的webhook处理方式在现实环境中为何会失败

  • 正确的处理模式:接收webhook请求、进行验证,然后将其放入队列中等待后续处理

  • 如何构建一个每个步骤都带有检查点的可靠购买流程

  • 如何使用相同的模式来处理退款和被放弃的结账请求

  • 如何在当地环境中测试webhook处理程序

先决条件

为了顺利跟随本文的学习,您需要具备以下知识:

  • Node.js和TypeScript

  • 基本的Stripe集成技能(如结账流程、webhook配置等)

  • SQL数据库的使用知识(示例中使用了PostgreSQL及Drizzle ORM框架)

  • npm或任何Node.js包管理工具

您不需要事先具备关于Inngest或持久化执行机制的经验,因为本文会从零开始为您讲解这些内容。

需要安装的软件

如果您想要运行示例代码,请安装以下包:

npm install inngest stripe drizzle-orm @react-email/components resend

此外,您还需要Stripe CLI工具来进行本地webhook测试。在macOS系统中,可以通过Homebrew来安装它(命令为brew install stripe/stripe-cli/stripe);对于其他平台,只需按照Stripe官方文档中的说明进行安装即可。

目录结构

为什么Stripe Webhook会默默地失败

理想的情况其实很简单:顾客完成支付后,Stripe会向你的服务器发送checkout.session_completed事件,然后你的处理程序就会对其进行处理。在开发环境中,这种机制每次都能正常工作。

但在生产环境中情况就不同了:当支付成功后,你的Webhook处理程序通常需要执行多项操作——从在数据库中查找用户信息、记录购买记录,到发送确认邮件、通知管理员,以及授予用户使用产品的权限(可能是通过GitHub邀请或API密钥),最后还要安排后续的提醒邮件。这些操作总共涉及三到四个外部服务。

以下是可能导致你的Webhook处理程序出现故障的情况:

1. 你的服务器在处理过程中崩溃

数据库写入操作成功了,但确认邮件却未能发送出去。Stripe会重新尝试发送Webhook事件,此时你的处理程序也会再次被执行。

这样一来,数据库中就会出现重复的记录,或者因为违反唯一性约束而导致故障发生,从而使得重试失败。

2. 某个外部服务暂时无法正常使用

你的邮件发送服务返回了500错误代码;GitHub的API调用被限制了请求频率;数据分析服务也出现了超时现象。

这种情况下,Webhook处理程序会抛出异常,Stripe会重新尝试整个流程。不过那些已经成功完成的操作(比如数据库写入、第一封邮件的发送)也会被再次执行。

3. 处理程序超时

Stripe要求处理程序在大约20秒内返回2xx状态码。如果你的处理程序执行了过多的操作,Stripe就会将其视为失败并重新尝试。不过,在超时发生之前,你的处理程序可能已经完成了一部分工作。

4>部分操作成功但无法回滚

这是最糟糕的情况:前三个步骤都成功完成了,但第四步失败了。Stripe会重新尝试,前三个步骤又会再次被执行。

结果就是顾客会收到两封确认邮件,数据库中也会出现重复的记录,但由于根本问题仍未得到解决,第四步仍然会失败。

5>重试时出现的竞争条件

即使你的端没有出现故障,Stripe也可能会多次发送同一事件。网络故障、负载均衡器的超时,以及Stripe自身的重试机制,都会导致处理程序需要应对重复的事件。如果你的处理程序在每个步骤中都不能保证操作是幂等的,那么重复发送事件只会使问题更加复杂。

Stripe的重试机制设计得相当合理——它采用指数退避策略,在几天内会尝试多次重试。但是,Stripe会重新尝试整个Webhook发送流程

它无法判断你的处理程序是否已经完成了前三个步骤,因此只能选择重新尝试全部操作。这种区分工作其实是由你来完成的。

根本问题在于:你的Webhook处理程序在一次请求中执行了过多的操作。每一个外部调用都可能成为故障点,而且这些操作之间没有任何检查点机制。当其中一个步骤失败时,你就无法知道哪些步骤已经成功完成了。

天真的处理方式及其失败原因

下面是一个典型的Webhook处理函数的示例。我在各种代码库、教程以及Stack Overflow的回答中看到了数百种这种模式的变体:

app.post("/api/payments/webhook", async (req, res) => {
  const event = stripe.webhooksconstructEvent(
    req.body,
    req.headers["stripe-signature"],
    process.env.STRIPE_WEBHOOK_SECRET
  );

  if (event.type === "checkout.session_completed") {
    const session = event.data.object;

    // 第一步:查找用户信息
    const user = await db.users.findOne({ id: session.metadata.userId });

    // 第二步:记录购买信息
    await db.purchases.insert({
      userId: user.id,
      stripeSessionId: session.id,
      amount: session.amount_total,
      status: "completed",
    });

    // 第三步:发送确认邮件
    await sendEmail({
      to: user.email,
      subject: "购买已确认!",
      template: "purchase-confirmation",
    });

    // 第四步:授予产品访问权限(发送GitHub仓库邀请)
    await addCollaborator(user.githubUsername);

    // 第五步:发送访问权限邮件
    await sendEmail({
      to: user.email,
      subject: "您的仓库访问权限已准备就绪!",
      template: "repo-access",
    });

    // 第六步:跟踪分析数据
    await analytics.track(user.id, "purchase_completed", {
      amount: session.amount_total,
    });
  }

  res.json({ received: true });
});

这个代码看起来很简洁,阅读起来也很顺滑。所有的教程都是按照这种方式来讲解的。

现在我们来分析一下当第四步失败时会发生什么。也许GitHub的API存在速率限制,导致addCollaborator调用出现错误,此时你的处理函数会向Stripe返回500状态码。

故障发生后的系统状态如下:

  • 用户信息在数据库中确实存在(第一步操作没有问题)。

  • 购买记录已经创建成功(第二步操作完成)。

  • 确认邮件也已经发送出去(第三步操作成功)。

  • 但GitHub的访问权限并未被授予(第四步操作失败)。

  • 因此访问权限邮件也没有被发送(第五步操作根本没有执行)。

  • 分析数据的相关操作也根本没有进行(第六步操作同样没有执行)。

Stripe会重新尝试触发Webhook,此时你的处理函数会从第一步开始再次执行:

  • 第一步:再次查找用户信息。这个步骤没有问题。

  • 第二步:尝试再次插入购买记录。如果stripeSessionId存在唯一性约束,那么这个操作就会失败;如果没有这种约束,就会产生重复的购买记录。

  • 第三步:再次发送确认邮件。顾客会收到第二封“购买已确认!”的邮件。

  • 第四步:再次尝试授予GitHub访问权限。这次可能会成功,也有可能还是失败。

  • 第五步到第六步:是否执行这些操作取决于第四步的结果。

你可以通过添加幂等性检查来修复这个问题,比如“如果购买记录已经存在,就跳过第二步”。但这样一来,你的处理函数中的每一步都会包含大量的条件判断逻辑。而且,由于没有自己的跟踪系统,你仍然无法检测“是否已经发送过这封邮件”这一情况,因此重复发送邮件的问题依然存在。

这种做法并不具备可扩展性。每增加一个新的步骤,就会产生另一种故障模式、另一项需要检查的幂等性条件,以及另一个边缘情况。

这种模式:从Webhook到事件处理,再到持久化函数

解决这个问题的方法是分离职责。你的Webhook处理器应该只负责一件事:验证传入的事件,并将其放入队列中等待处理。仅此而已。

所有实际的工作任务(如数据库写入操作、发送邮件、调用API以及进行数据分析)都应该由持久化后台函数来完成。在这些函数中,每一步都会被单独标记为检查点,如果步骤失败,系统会自动重试,并对整个执行过程进行跟踪记录。

具体的处理流程如下:

Stripe Webhook
    |
    v
Webhook端点(验证签名、提取事件信息、将其加入队列)
    |
    v
后台作业系统(接收事件信息)
    |
    v
持久化函数
    |-- 第1步:查询用户信息及购买记录(标记为检查点)
    |-- 第2步:进行数据分析(标记为检查点)
    |-- 第3步:发送确认邮件(标记为检查点)
    |-- 第4步:向管理员发送通知(标记为检查点)
    |-- 第5步:授予GitHub访问权限(标记为检查点)
    |-- 第6步:跟踪GitHub访问权限的使用情况(标记为检查点)
    |-- 第7步:更新购买记录(标记为检查点)
    |-- 第8步:发送仓库访问权限相关邮件(标记为检查点)
    |-- 第9步:安排后续操作流程(标记为检查点)

每一个被step.run()包装起来的步骤都构成了一个持久化的检查点。如果第5步失败了:

  • 第1步到第4步不会被重新执行,它们的执行结果会被缓存起来。

  • 第5步会独立地重试,且会拥有自己的重试次数计数器。

  • 一旦第5步成功执行,第6步到第9步就会继续执行。

这就是“持久化执行”的含义:函数的执行进度能够在发生故障时得到保留。系统会针对每个步骤进行重试,而不会重新执行整个函数;因此,不会出现重复发送邮件的情况,也不会出现重复写入数据库的数据或部分完成操作的情况。

我使用Inngest来实现这一目标。它是一个基于事件驱动的持久化执行平台,能够直接提供步骤级别的检查点功能。你只需要用step.run()块来定义函数,Inngest会负责处理重试逻辑、状态保存以及性能监控等工作。完全不需要使用Redis、工作进程或自定义的重试代码。

其他工具也可以实现类似的功能(比如Temporal),但Inngest对TypeScript的支持让我对其产生了兴趣。你只需要编写普通的异步函数,而step.run()这个包装器就是唯一需要添加的部分。

如何设置Webhook端点

你的Webhook端点应该尽可能简单。只需验证签名、提取事件数据,然后将其发送到后台作业系统,并立即返回200状态码即可。

以下是我实际生产代码中使用的Webhook端点示例:

import { constructWebhookEvent } from "@/lib/payments";
import { inngest } from 「@lib/jobs";

app.post("/api/payments/webhook", async ({ request, set }) => {
  const body = await request.text();
  const sig = request.headers.get("stripe-signature");

  if (!sig) {
    set.status = 400;
    return { error: "签名缺失" };
  }

  try {
    const event = await constructWebhookEvent(body, sig);
    console.log(`[Webhook] 收到类型为 ${event.type} 的事件`);
    
    if (event.type === "charge.refunded") {
      const charge = event.data.object;
      await inngest.send({
        name: "stripe/charge/refunded",
        data: {
          chargeId: charge.id,
          paymentIntentId: charge.payment(intent,
          amountRefunded: charge.amount_refunded,
          originalAmount: charge.amount,
          currency: charge(currency,
        },
      });
    }

    if (event.type === "checkout.session.expired") {
      const session = event.data.object;
      await inngest.send({
        name: "stripe/checkout.session.expired",
        data: {
          sessionId: session.id,
          customerEmail: session.customer_email,
        },
      });
    }

    return { received: true };
  } catch (error) {
    console.error("[Webhook] Stripe验证失败:", error);
    set.status = 400;
    return { error: "Webhook验证失败" };
  }
});

请注意,这个处理程序并不执行以下操作:它不会查询用户信息、写入数据库、发送电子邮件,也不会调用外部API。它只会验证Stripe生成的签名,提取相关数据,然后将处理结果发送给Inngest系统。整个处理过程在几毫秒内就能完成。

constructWebhookEvent函数负责执行Stripe的签名验证逻辑:

import Stripe from "stripe";

export async function constructWebhookEvent(
payload: string | Buffer,
signature: string
) {
const webhookSecret = process.env.STRIPE_WEBHOOK_SECRET;
if (!webhookSecret) {
throw new Error("STRIPE/Webhook SECRET未设置");
}
const client = new Stripe(process.env.STRIPE_SECRET_KEY);
return client.webhooks.constructEventAsync(payload, signature, webhookSecret);
}

有一个非常重要的细节:在向Stripe进行签名验证时,必须传递原始请求体(无论是以字符串形式还是缓冲区形式)。如果你的开发框架在你能获取到原始字符串之前就已经将其解析成了JSON格式,那么签名验证就会失败。这就是导致“Webhook签名验证失败”错误的最常见原因。

Inngest客户端的配置非常简单:

import { Inngest } from "inngest";

export const inngest = new Inngest({
id: "my-app",
});

具体来说,在购买流程中,会有一个不同的接口被用来发送事件数据(例如在客户完成Stripe结算流程后,前端会调用“claim”接口)。但处理原理是一样的:验证数据、将其加入队列、然后返回结果。

// 在通过Stripe确认支付状态后
await inngest.send({
name: "purchase/completed",
data: {
userId: session.user.id,
tier,
sessionId,
},
});

如何构建可靠的购买流程

这就是本文的核心内容。handlePurchaseCompleted函数通过9个独立的处理步骤来完成支付后的相关操作,每一个步骤都是真正的生产环境代码。

下面的示例使用了私有的GitHub仓库,因为这款产品就是通过这种方式进行数据存储和管理的。

你的产品在实际应用中,“授予访问权限”这个步骤可能会有所不同:可能是将用户升级为Pro会员、提供API调用额度、解锁课程内容,或是激活订阅服务。但无论具体是什么操作,处理流程的基本框架都是相同的。

包含9个步骤的可靠购买流程图,其中第5步失败后会重新尝试,而前4步则保持原状

如果第5步失败了(比如电子邮件服务暂时无法使用),Inngest系统只会重新尝试第5步,而前4步不会被重新执行。第6步到第9步则会等待第5步成功后再继续进行。

import { eq } from "drizzle-orm";
import { createElement } from "react";

import { inngest } from "@/lib/jobs/client";
import { trackServerEvent } from "@lib/analytics/server";
import { brand } from "@lib/brand";
import { db, purchases, users } from "@lib/db";
import {
sendEmail,
PurchaseConfirmationEmail,
AdminPurchaseNotificationEmail,
RepoAccessGrantedEmail,
} from "@lib/email";
import { addCollaborator } from "@lib/github";

export const handlePurchaseCompleted = inngest.createFunction(
{ id: "purchase-completed", triggers: [{ event: "purchase/completed" }] },
async ({ event, step }) => {
const { userId, tier, sessionId } = event.data;

// 第一步:查询用户信息及购买详情
const { user, purchase } = await step.run(
"lookup-user-and-purchase",
async () => {
const userResult = await db
.select({
id: users.id,
email: users.email,
name: users.name,
githubUsername: users.githubUsername,
})
.from(users)
.where(eq(users.id, userId))
.limit(1);

const foundUser = userResult[0];
if (!foundUser) {
throw new Error(`未找到用户:${userId}`);
}

const purchaseResult = await db
.select({
amount: purchases.amount,
currency: purchases(currency,
stripePaymentIntentId: purchases.stripePaymentIntentId,
})
.from(purchases)
.where(eq(purchases.stripeCheckoutSessionId, sessionId))
.limit(1);

const foundPurchase = purchaseResult[0];

return {
user: foundUser,
purchase: foundPurchase ?? {
amount: 0,
currency: "usd",
stripePaymentIntentId: null,
},
};
}
);

// 第二步:在分析系统中记录购买完成情况
await step.run("track-purchase-to-posthog", async () => {
await trackServerEvent(userId, "purchase_completed_server", {
tier,
amount_cents: purchase.amount,
currency: purchase(currency,
stripe_session_id: sessionId,
});
});

// 第三步:向客户发送购买确认邮件
await step.run("send-purchase-confirmation", async () => {
await sendEmail({
to: user.email,
subject: `您的购买已确认!`,
template: createElement(PurchaseConfirmationEmail, {
amount: purchase.amount,
currency: purchase(currency,
customerEmail: user.email,
}),
});
});

// 第四步:向管理员发送通知邮件
await step.run("send-admin-notification", async () => {
const adminEmail = process.env ADMIN_EMAIL;
if (!adminEmail) return;

await sendEmail({
to: adminEmail,
subject: `有新销售记录:${user.email}`),
template: createElement(AdminPurchaseNotificationEmail, {
amount: purchase.amount,
currency: purchase(currency,
customerEmail: user.email,
customerName: user.name,
stripeSessionId: purchase.stripePaymentIntentId ?? sessionId,
}),
});
});

// 如果用户没有GitHub用户名,则提前结束流程
if (!user.githubUsername) {
return { success: true, userId, tier, githubAccessGranted: false };
}

// 第五步:授予用户访问GitHub仓库的权限
const collaboratorResult = await step.run(
"add-github-collaborator",
async () => {
return addCollaborator(user.githubUsername!);
}
);

// 第六步:在分析系统中记录GitHub访问权限的授予情况
await step.run("track-github-access", async () => {
await trackServerEvent(userId, "github_access_granted", {
tier,
github_username: user.githubUsername,
invitation_status: collaboratorResult.status,
});
});

// 第七步:更新购买记录
await step.run("update-purchase-record", async () => {
await db
.update(purchases)
.set({
githubAccessGranted: true,
githubInvitationId: collaboratorResult.status,
.updated: new Date(),
})
.where(eq(purchases.stripeCheckoutSessionId, sessionId));
});

// 第八步:向用户发送关于GitHub仓库访问权限的邮件
await step.run("send-repo-access-email", async () => {
await sendEmail({
to: user.email,
subject: `您的GitHub仓库访问权限已准备好!`,
template: createElement(RepoAccessGrantedEmail, {
repoUrl: "https://github.com/your-org/your-repo",
}),
});
});

// 第九步:安排后续邮件发送计划
await step.run("schedule-follow-up", async () => {
const purchaseRecord = await db
.select({ id: purchases.id })
.from(purchases)
.where(eq(purchases.stripeCheckoutSessionId, sessionId))
.limit(1);

if (purchaseRecord[0]) {
await inngest.send({
name: "purchase/follow-up.scheduled",
data: {
userId,
purchaseId: purchaseRecord[0].id,
tier,
},
});
}
});

return { success: true, userId, tier, githubAccessGranted: true };
}
);

这段代码确实很长。让我来一步步解释其中的原因,以及为什么这些步骤需要被单独处理。

步骤1:查找用户信息并完成购买操作

const { user, purchase } = await step.run(
  "lookup-user-and-purchase",
  async () => {
    // ... 执行数据库查询 ...
    return { user: foundUser, purchase: foundPurchase };
  }
);

这个步骤会向数据库中查询用户的详细信息以及购买记录。如果数据库暂时无法访问,这个步骤会自动重试。

userpurchase这两个返回值会被Inngest系统缓存起来。后续的所有步骤都可以直接使用user.emailuser.githubUsername以及purchase.amount,而无需再次查询数据库。

如果这个步骤永久性地失败(例如用户根本不存在),系统会抛出错误,从而终止整个函数的执行。这是有意为之的——如果无法找到用户,继续执行后续操作也没有意义。

步骤2:跟踪分析数据

await step.run("track-purchase-to-posthog", async () => {
  await trackServerEvent(userId, "purchase_completed_server", {
    tier,
    amount_cents: purchase.amount,
  });
});

分析数据的跟踪被单独列为一个步骤,是因为分析服务本身也会遇到各种故障情况(比如速率限制、系统中断或网络延迟)。如果PostHog服务器出现故障,也不应该因此影响到确认邮件的发送。

步骤3:发送购买确认邮件

await step.run("send-purchase-confirmation", async () => {
  await sendEmail({
    to: user.email,
    subject: `您的购买已确认!`,
    template: createElement(PurchaseConfirmationEmail, {
      amount: purchase.amount,
      currency: purchase.currency,
      customerEmail: user.email,
    }),
  });
});

这封邮件是专门发送给客户的确认通知。它与管理员通知(步骤4)是分开处理的,因为这两项操作是相互独立的。即使管理员的通知邮件发送失败,客户仍然应该能够收到购买确认邮件。

sendEmail函数实际上会使用Resend服务来发送邮件。如果Resend返回错误代码500,这个步骤会自动重试。不过由于步骤2(数据跟踪)已经完成并且被标记为检查点,因此步骤2不会被重新执行。

步骤4:发送管理员通知

await step.run("send-admin-notification", async () => {
  const adminEmail = process.env ADMIN_EMAIL;
  if (!adminEmail) return;

  await sendEmail({
    to: adminEmail,
    subject: `有新优惠信息:${user.email}`,
    template: createElement(AdminPurchaseNotificationEmail, { /* ... */ }),
  });
});

管理员通知系统与面向客户的操作是完全独立的。将它们分开处理,意味着其中一个环节出现故障也不会影响到另一个环节的正常运行。

步骤5:授予GitHub访问权限

const collaboratorResult = await step.run(
  "add-github-collaborator",
  async () => {
    return addCollaborator(user.githubUsername!);
  }
);

这一步骤最有可能出现故障。GitHub的API存在速率限制,可能会超时,而且用户的GitHub用户名也可能无效。

由于将这一操作单独列为一个步骤,因此如果GitHub API出现故障,也不会导致确认邮件(步骤3)或管理员通知(步骤4)被重新发送。因为这些步骤已经在之前的环节中完成了相应的处理。

请注意,在这个步骤之前会进行提前判断:如果用户没有GitHub用户名,那么在步骤4执行完毕后,函数会立即返回结果。只有当用户确实拥有GitHub账户时,后续的步骤才会继续执行。

步骤6:跟踪GitHub访问权限的状态

await step.run("track-github-access", async () => {
  await trackServerEvent(userId, "github_access_granted", {
    tier,
    github_username: user.githubUsername,
    invitation_status: collaboratorResult.status,
  });
});

这一操作会使用步骤5中获得的collaboratorResult数据。由于step.run()会缓存函数返回的结果,因此即使步骤5和步骤6之间的执行过程被中断后又重新开始,collaboratorResult.status的值仍然可以正常使用。

步骤7:更新购买记录

await step.run("update-purchase-record", async () => {
  await db
    .update(purchases)
    .set({
      githubAccessGranted: true,
      githubInvitationId: collaboratorResult.status,
     .updated: new Date(),
    })
    .where(eq(purchases.stripeCheckoutSessionId, sessionId));
});

数据库的更新操作要等到GitHub访问权限被确认之后才能进行。只有当邀请成功后,才会将githubAccessGranted: true这个字段设置为true。

如果在授予访问权限之前就先更新了数据库记录,而后续的GitHub操作又失败了,那么数据库中就会显示访问权限已经被授予,但实际上并没有。

步骤8:发送仓库访问权限确认邮件

await step.run("send-repo-access-email", async () => {
  await sendEmail({
    to: user.email,
    subject: `您的仓库访问权限已准备就绪!`,
    template: createElement(RepoAccessGrantedEmail, {
      repoUrl: "https://github.com/your-org/your-repo",
    }),
  });
});

这封邮件只有在GitHub邀请被确认(步骤5)且数据库记录被更新(步骤7)之后才会发送。这样的顺序安排非常重要,因为如果在邀请尚未发送的情况下就通知客户“访问权限已准备就绪”,那是不对的。

步骤9:安排后续操作流程

await step.run("schedule-follow-up", async () => {
  const purchaseRecord = await db
    .select({ id: purchases.id })
    .from(purchases)
    .where(eq(purchases.stripeCheckoutSessionId, sessionId))
    .limit(1);

  if (purchaseRecord[0]) {
    await inngest.send({
      name: "purchase/follow-up.scheduled",
      data: {
        userId,
        purchaseId: purchaseRecord[0].id,
        tier,
      },
    });
  }
});

最后一步会触发一个独立的 Inngest 函数,该函数负责发送后续邮件(第7天发送入职指南邮件,第14天发送反馈请求邮件,第30天发送推荐信请求邮件)。这是一个基于事件驱动的流程:一个函数完成执行后会触发另一个函数。

后续处理函数会使用 step.sleep() 来控制发送邮件的间隔时间:

export const handlePurchaseFollowUp = inngest.createFunction(
  {
    id: "purchase-follow-up",
    triggers: [{ event: "purchase/follow-up.scheduled" }],
    cancelOn: [
      {
        event: "purchase/follow-up.cancelled",
        match: "data.purchaseId",
      },
    ],
  },
  async ({ event, step }) => {
    const { userId, purchaseId } = event.data;

    await step.sleep("wait-7-days", "7d");

    await step.run("send-day-7-email", async () => {
      // 检查用户是否符合接收邮件的条件(用户存在、未取消订阅、未收到退款)
      // 发送入职指南邮件
    });

    await step.sleep("wait-14-days", "7d");

    await step.run("send-day-14-email", async () => {
      // 发送反馈请求邮件
    });

    await step.sleep("wait-30-days", "16d");

    await step.run("send-day-30-email", async () => {
      // 发送推荐信请求邮件
    });
  }
);

请注意 cancelOn 这个选项。如果用户的购买订单被退款,系统可以发送 purchase/follow-up.cancelled 事件,这样整个后续处理流程就会停止。这样一来,那些要求退款的用户就不会收到任何多余的邮件。

为什么每个步骤都必须是独立的

规则很简单:任何需要调用外部服务或可能会独立出现故障的操作,都应该被单独列为一个步骤。

数据库查询属于一个步骤,因为数据库有时可能会暂时无法访问;发送邮件也是一个步骤,因为邮件服务商可能会返回 500 错误代码;调用 GitHub API 也同样属于一个步骤,因为这种请求可能会受到速率限制。

如果两个操作总是同时成功或同时失败(因为它们共同依赖于同一个外部服务),那么可以将它们放在同一个步骤中。但如果有疑问的话,最好还是将它们分开处理。这样做带来的开销微乎其微,而可靠性却会显著提高。

如何使用相同的流程来处理退款操作

退款的整个处理流程也遵循完全相同的步骤结构。这个函数与 handlePurchaseCompleted 位于同一个文件中,因此它们共享相同的导入依赖项(还包括来自 @/lib/githubremoveCollaborator 函数以及专门用于处理退款的邮件模板)。以下是 handleRefund 函数的代码实现:

export const handleRefund = inngest.createFunction(
  { id: "refund-processed", triggers: [{ event: "stripe/charge.refunded" }] },
  async ({ event, step }) => {
    const {
      chargeId,
      paymentIntentId,
      amountRefunded,
      originalAmount,
      currency,
    } = event.data;

    const isFullRefund = amountRefunded >= originalAmount;

    // 第一步:查找对应的购买记录和用户信息
    const { user, purchase } = await step.run(
      "lookup-purchase-by-payment-intent",
      async () => {
        const purchaseResult = await db
          .select({
            id: purchases.id,
            userId: purchases.userId,
            stripePaymentIntentId: purchases.stripePaymentIntentId,
            githubAccessGranted: purchases.githubAccessGranted,
          })
          .from(purchases)
          .where(eq(purchases.stripePaymentIntentId, paymentIntentId))
          .limit(1);

        const foundPurchase = purchaseResult[0];
        if (!foundPurchase) {
          return { user: null, purchase: null };
        }

        const userResult = await db
          .select({
            id: users.id,
            email: users.email,
            name: users.name,
            githubUsername: users.githubUsername,
          })
          .from(users)
          .where(eq(users.id, foundPurchase.userId))
          .limit(1);

        return { user: userResult[0] ?? null, purchase: foundPurchase };
      }
    );

    if (!purchase || !user) {
      return { success: false, reason: "未找到对应的购买记录或用户信息" };
    }

    let accessRevoked = false;

    // 第二步:如果退款金额为全额退款,那么撤销用户的GitHub访问权限
    if (isFullRefund && user.githubUsername && purchase.githubAccessGranted) {
      const revokeResult = await step.run(
        "revoke-github-access",
        async () => {
          return removeCollaborator(user.githubUsername!);
        }
      );
      accessRevoked = revokeResult.success;
    }

    // 第三步:更新购买记录的状态
    await step.run("update-purchase-status", async () => {
      if (isFullRefund) {
        await db
          .update(purchases)
          .set({
            status: "refunded",
            githubAccessGranted: false,
           .updated: new Date(),
          })
          .where(eq(purchases.id, purchase.id));
      } else {
        await db
          .update(purchases)
          .set({
            status: "partially_refunded",
            updated: new Date(),
          })
          .where(eq(purchases.id, purchase.id));
      }
    });

    // 第四步:在分析系统中记录退款操作
    await step.run("track-refund-event", async () => {
      await trackServerEvent(user.id, "refund_processed", {
        charge_id: chargeId,
        amount_cents: amountRefunded,
        original_amount_cents: originalAmount,
        currency,
        is_full_refund: isFullRefund,
        github_access_revoked: accessRevoked,
      });
    });

    // 第五步:通知客户退款结果
    await step.run("send-customer-notification", async () => {
      if (isFullRefund) {
        await sendEmail({
          to: user.email,
          subject: "您的退款已处理完成",
          template: createElement(AccessRevokedEmail, {
            customerEmail: user.email,
            refundAmount: amountRefunded,
            currency,
          }),
        });
      } else {
        await sendEmail(
          to: user.email,
          subject: "您的部分退款已处理完成",
          template: createElement(PartialRefundEmail, {
            customerEmail: user.email,
            refundAmount: amountRefunded,
            originalAmount,
            currency,
          }),
        );
      }
    });

    // 第六步:通知管理员退款结果
    await step.run("send-admin-notification", async () => {
      const adminEmail = process.env ADMIN_EMAIL;
      if (!adminEmail) return;

      await sendEmail(
        to: adminEmail,
        subject: `\({isFullRefund ? "全额退款" : "部分退款"}:\){user.email}`,
        template: createElement(AdminRefundNotificationEmail, {
          customerEmail: user.email,
          customerName: user.name,
          githubUsername: user.githubUsername,
          refundAmount: amountRefunded,
          originalAmount,
          currency,
          stripeChargeId: chargeId,
          accessRevoked,
          isPartialRefund: !isFullRefund,
        }),
      );
    });

    return { success: true, accessRevoked, isFullRefund, userId: user.id };
  }
);

在退款流程中,有三点值得特别注意。

  1. 部分退款与全额退款的区别:该系统通过简单的判断来区分这两种情况:amountRefunded >= originalAmount。如果是部分退款,客户仍然可以继续使用相关服务,但购买状态会变为partially_refunded;而如果是全额退款,用户的GitHub访问权限会被取消,状态会变为refunded

    这一机制对于维护数据库的准确性至关重要。后端系统(如数据看板、分析工具或支持界面)都需要获取准确的状态信息。

  2. 条件性步骤执行:“取消GitHub访问权限”这一操作只有在满足三个条件时才会执行:必须是全额退款、用户拥有GitHub账户,并且之前确实被授予了访问权限。inngest通过跳过那些不必要的步骤,使这一流程更加清晰易懂。

    相比在单一处理函数中使用复杂的if-else结构,这种设计显然更易于阅读。

  3. 为客户和管理员分别发送通知:如果退款是全额的还是部分的,系统会向客户发送不同的邮件。而管理员总会收到一份包含收费ID、客户GitHub用户名以及访问权限是否被取消等详细信息的通知。

之所以要分开这些步骤,是因为管理员通知失败不应影响客户通知的发送。客户的电子邮件通知具有更高的优先级。

如何恢复被放弃的购物车订单

在恢复被放弃的购物车订单时,step.sleep()方法发挥了重要作用。当Stripe的购物车会话过期后,系统需要发送一封提醒邮件,但此时不宜立即执行这一操作。

最好等待大约一小时,给客户足够的时间自行完成交易。

export const handleCheckoutExpired = inngest.createFunction(
  {
    id: "checkout-expired",
    triggers: [{ event: "stripe/checkout.session.expired" }],
  },
  async ({ event, step }) => {
    const { customerEmail, sessionId } = event.data;

    if (!customerEmail) {
      return { success: false, reason: "no_email" };
    }

    // 等待1小时后再发送提醒邮件
    await step.sleep("wait-before-recovery-email", "1h");

    // 发送恢复购物车订单的邮件
    await step.run("send-abandoned-cart-email", async () => {
      const checkoutUrl = `https://yoursite.com/pricing`;

      await sendEmail({
        to: customerEmail,
        subject: "您的购物车订单正在等待处理",
        template: createElement(AbandonedCartEmail, {
          customerEmail,
          checkoutUrl,
        }),
      });
    });

    // 记录这一操作
    await step.run("track-abandoned-cart", async () => {
      await trackServerEvent("anonymous", "abandoned_cart_email_sent", {
        customer_email: customerEmail,
        session_id: sessionId,
      });
    });

    return { success: true, customerEmail };
  }
);

step.sleep("wait-before-recovery-email", "1h")这一行代码是关键所在。它会让函数暂停运行一小时,而不会消耗任何计算资源。

Inngest会内部处理调度工作。一小时后,函数会重新开始执行并发送邮件。

如果没有这种可靠的执行机制,你就需要使用cron作业来查询数据库中过期的会话信息,或者利用Redis建立延迟任务队列,又或者是使用setTimeout函数,但这样的方法在服务器重启后就会失效。step.sleep()这种方式更为简单、易读,也更加可靠。

此外,该函数的顶部还设置了一个检测机制:如果Stripe没有记录该会话的顾客邮箱地址(即顾客在输入邮箱之前就完成了结账流程),函数会立即结束执行。如果没有可发送邮件的地址,安排恢复邮件的任务也就毫无意义了。

这种设计模式也可以应用于更复杂的恢复流程。例如,你可以再添加一个step.sleep()语句,如果三天后顾客仍未完成购买操作,就再发送一封跟进邮件;你还可以通过在step.run()中查询数据库来确认顾客是否已经完成了购买,如果是的话,就可以跳过发送邮件的步骤。

每增加一个处理步骤,就意味着需要多调用一次step.run()step.sleep()。这样的函数代码读起来就像是在描述你的业务逻辑,而不是杂乱无章的cron作业和数据库配置指令。

如何在本机测试Webhook处理程序

对于Stripe提供的Webhook服务来说,本机测试确实是一个比较麻烦的问题。你需要让Stripe将事件发送到你的本地机器上,同时还需要确保你的后台作业系统能够正常运行以处理这些事件。以下是具体的设置步骤。

如何在本机转发Stripe事件

请安装Stripe CLI,然后将Webhook事件转发到你的本地服务器上:

stripe listen --forward-to localhost:3000/api/payments/webhook

CLI会输出一个用于签名Webhook请求的密钥(以whsec_开头)。请将这个密钥设置为STRIPE_WEBHOOK_SECRET环境变量,以便在本地开发环境中使用。

你也可以直接触发测试事件:

stripe trigger checkout.session_completed
stripe trigger charge/refunded
stripe trigger checkout.session.expired

如何运行Inngest开发服务器

Inngest提供了一个本地开发服务器,它可以实时显示每个函数的执行情况、每一步的具体操作以及所有的重试尝试:

npx inngest-cli@latest dev -u http://localhost:3000/api/inngest

-u参数用于告知Inngest开发服务器你的应用程序运行在哪个地址上,这样它才能正确地找到并执行相应的函数。你可以在浏览器中访问http://localhost:8288来查看Inngest的控制面板。

如何观察函数的执行过程

Inngest开发控制面板真正体现了这种可靠的执行模式的优势。当你触发一个Stripe事件时,你可以看到以下内容:

  1. 该事件会出现在“事件”选项卡中。

  2. 相关函数会在“运行记录”选项卡中被触发并执行。

  3. 每个执行步骤都会被逐一显示,包括其输入数据、输出结果以及执行耗时。

  4. 如果某个步骤失败了,你会看到错误信息以及系统尝试重试的次数。

这种透明度是内嵌式Webhook处理方式所无法提供的。当有客户反馈“我已经付款了,但仍然没有收到相应服务”时,你可以通过Inngest开发控制面板查看具体是哪个步骤失败了,以及失败的原因。在生产环境中,这样的可观测性至关重要。

如何模拟故障

为了测试系统的重试机制,你可以故意让某个步骤失败。例如,在“add-github-collaborator”这个步骤中人为地抛出一个错误:

const collaboratorResult = await step.run(
  "add-github-collaborator",
  async () => {
    throw new Error("模拟的GitHub API故障");
  }
);

在Inngest开发控制面板中,你会看到以下情况:

  • 步骤1到步骤4都成功执行了,它们的结果也会被缓存下来。

  • 步骤5失败了,系统会根据重试策略再次尝试执行它。

  • 直到步骤5成功执行后,步骤6到步骤9才会继续被执行。

如果移除了之前抛出的错误,那么在下一次重试时,步骤5就会成功执行。此时步骤6到步骤9会按顺序继续执行,而步骤1到步骤4则不会被重新运行。这就是这种机制的实际运作效果。

结论

实现可靠的Stripe Webhook处理机制,关键在于遵循一个原则:**将数据接收与处理过程分开**。

你的Webhook端点只需要负责验证Stripe发送的签名信息,并将相关事件信息发送给后台作业系统即可。具体的处理工作则由那些具备持久性执行能力的函数来完成——每个执行步骤都会被单独记录下来,一旦失败就会自动重试。

这种机制能为你带来以下好处:

  • **避免重复操作**:已经成功执行的步骤不会被重新运行。

  • **确保状态的一致性**:如果某个步骤失败了,之前的步骤结果仍然会保留下来,失败步骤会独立地被重试。

  • **实现全面的可观测性**:你可以清楚地了解每次函数执行过程中到底是哪个步骤出了问题,以及出错的原因。

  • **具备延迟执行功能**:通过`step.sleep()`方法,可以无需使用cron作业来处理恢复邮件或后续操作。

  • **支持灵活的工作流程组合**:一个函数可以通过事件触发另一个函数的执行,从而形成一系列连续的操作流程。例如,在用户完成购买后,系统会自动启动为期30天的后续服务流程。

这种处理模式并不局限于Stripe。任何需要多步骤处理的Webhook场景都能从这种持久性执行机制中受益——无论是用于触发CI管道的GitHub Webhook,还是用于跟踪邮件送达情况的Resend Webhook,甚至是用于在不同服务之间同步数据的日历Webhook,都可以从中获得好处。

原则是一样的:进行验证、将相关任务加入队列,然后持久地执行这些处理流程。

我在Eden Stack项目中实际应用了这种模式。在该系统中,购买流程涵盖了从支付确认到授予GitHub仓库访问权限,以及发送多轮电子邮件通知等所有环节。这套由9个步骤组成的购买处理流程从未出现过任何步骤遗漏或邮件重复发送的情况。

如果你正在使用Stripe开发SaaS产品,建议先从这篇文章中介绍的Webhook端点模式开始入手。保持端点的简洁性,将复杂的处理逻辑分步进行持久化执行。这样,当有客户反馈“我已经付款了,但什么都没有发生”时,你就不会在凌晨3点还要去调试系统了。

如果你希望获得一套预先构建好的Stripe Webhook集成方案,该方案应包含购买流程、退款处理机制以及后续邮件发送功能,并且能够直接投入使用,Eden Stack正好满足了这些需求。它不仅包含了这篇文章中介绍的所有内容,还提供了30多种经过生产环境测试的实用方案。

Magnus Rodseth专注于开发基于人工智能的应用程序,他是Eden Stack的创建者。Eden Stack是一套专为生产环境设计的首选工具包,其中包含了30多种用于开发基于人工智能的SaaS产品的实用方案。

Comments are closed.