在基于事件驱动的系统中,当处理某个请求时,需要完成两件事情:一是将数据保存到数据库中,二是向消息代理发布一条事件,这样其他服务才能知道某些内容已经发生了变化。

这两项操作看起来很简单,但实际上它们隐藏着一个严重的可靠性问题。如果数据库写入操作成功了,但消息代理暂时无法被访问会怎么样?或者如果在执行这两个操作的某个环节中你的服务出现了故障,又会怎样?最终系统就会陷入一种不一致的状态:数据库中确实存储了新的数据,但系统的其他部分却根本不知道这一变化。

出箱模式就是为了解决这个问题而提出的一种成熟解决方案。在本教程中,你将了解到这种模式的原理、它为何能够有效解决问题,以及如何使用Go语言、PostgreSQL数据库和Google Cloud Pub/Sub来实现它。

**先决条件**
在阅读本教程之前,你需要具备以下基础知识:
– Go编程语言的基础知识
– SQL语法及PostgreSQL数据库的使用方法
– 数据库事务的概念
– 对基于事件驱动或分布式系统的基本了解(虽然有帮助,但并非必需)

**目录**

  1. 问题所在:两次操作缺乏原子性
  2. 出箱模式的工作原理
  3. 出箱表的结构设计
  4. 消息中继机制
  5. 使用Go语言和PostgreSQL的实现方式
  6. 为什么消息可能会被多次发送
  7. 替代方案:PostgreSQL的逻辑复制功能
  8. 总结

**问题所在:两次操作缺乏原子性**
要理解为什么需要设计出箱模式,就必须先了解分布式系统中的一个核心挑战:**不同系统之间的原子性保障**。

在关系型数据库中,事务这一机制允许我们将多个操作组合在一起,使得这些操作要么全部成功执行,要么全部失败。例如,如果你在同一事务中插入一条记录并更新另一条记录,那么可以确保这两项操作要么都完成,要么都不会被执行。

然而,当试图将这种原子性保障扩展到**两个不同的系统**时,问题就出现了——比如你的数据库和消息代理系统(如Kafka、RabbitMQ或Pub/Sub)。这些系统之间并没有共同的事务边界,因此无法保证这两个操作会以原子性的方式一起完成。<以下是一个典型的、在未采用“外出邮件箱模式”时会出现问题的事件驱动流程示例:

  1. 用户下了一份订单。

  2. 您的服务将这份订单保存到了数据库中 ✅

  3. 您的服务向消息代理发送了order.created事件,但代理当前处于关闭状态,因此该事件未能被成功发送 ❌

  4. 虽然订单存在于数据库中,但下游服务并未收到相关通知。

或者出现相反的情况:

  1. 您的服务首先发送了事件 ✅

  2. 您的服务尝试将订单保存到数据库中,但由于数据库超时,操作失败 ❌

  3. 下游服务接收到了关于一个实际上并不存在的订单的通知。

无论哪种情况,都会导致系统出现状态不一致的问题。而“Outbox模式”正是为了解决这个问题而设计的。

如果不使用“Outbox模式”,整个处理流程会如下所示:

不使用Outbox模式的流程图

Outbox模式的工作原理

“Outbox模式”通过将所有相关操作都放在数据库内部来确保操作的原子性:

  1. 将业务数据(例如新订单信息)保存到数据库中。

  2. 在同一数据库事务中,将事件信息写入专门用于存储待发送消息的“outbox表”中。

  3. 一个名为“Message Relay”的后台进程会定期检查“outbox表”,并将其中尚未被发送的消息发送给代理。

  4. 一旦代理确认收到了这些消息,该后台进程就会将这些消息标记为已处理。

因为步骤1和步骤2是在同一数据库事务中完成的,所以它们具有原子性——要么两者都成功完成,要么都失败。绝对不可能出现数据已经保存到数据库中,但相应的事件却未被发送的情况;也不可能出现为根本不存在的数据发送了事件的情况。

在您的主应用代码中,永远不会直接将消息发送给代理。相反,数据库被用作一个可靠的中转区。

使用Outbox模式的流程图

Outbox表的数据库结构

“outbox表”用于存储那些尚未被发送的消息,直到后台进程将它们取出并发送出去。以下是一个典型的PostgreSQL数据库表结构示例:

CREATE TABLE outbox (
    id          uuid PRIMARY KEY DEFAULT gen_random_uuid(),
    topic       varchar(255)  NOT NULL,
    message     jsonb         NOT NULL,
    state       varchar(50)   NOT NULL DEFAULT 'pending',
    created_at  timestamptz   NOT NULL DEFAULT now(),
    processed_at timestamptz
);

让我们来详细了解一下这个表中的每一列:

  • id:每条消息的唯一标识符。使用UUID可以方便地区分不同的消息。

  • topic:消息在代理系统中对应的主题或队列名称(例如orders.created)。

  • message:事件的具体内容,以JSON格式存储。这是消费者最终会接收到的数据。

  • state:用于标记消息是否已经发送。常见的状态有pending(等待发送)和processed(已成功发送)。

  • created_at:消息被插入数据库的时间。后台进程会根据这个时间顺序来处理消息。

  • processed_at:消息被代理成功发送的时间。

根据你的需求,你可能还需要添加一些额外的列:例如,可以添加一个retry_count列来记录中继尝试发送消息的次数,或者添加一个error列来记录失败的原因。

消息中继

消息中继是一个后台进程(通常是一个goroutine、一个sidecar服务,或者是一个独立的服务),它的作用是连接出箱表和消息代理。

它的职责包括:

  1. 定期查询出箱表中那些状态为‘pending’的消息。

  2. 将每条消息发布到代理中相应的主题上。

  3. 一旦代理确认消息已经送达,就将该记录的状态更新为‘processed’

  4. 能够优雅地处理失败情况:如果发送失败,就会将消息的状态保持为‘pending’,以便后续再次尝试发送。

这种设计能够确保消息至少被发送一次:即使中继进程崩溃并重新启动,消息也一定会被发送出去。不过,这样的设计也会导致某些消息可能被重复发送多次(下面会详细说明这一点),因此你的消费者程序需要能够处理这些重复的消息。

Go语言与PostgreSQL的实现

让我们来看一个具体的例子。假设你有一个订单服务,当有新的订单创建时,你需要完成以下操作:

  1. 将订单信息保存到PostgreSQL的orders表中。

  2. 向Google Cloud Pub/Sub发布一个order.created事件。

你会使用pgx这个库作为PostgreSQL的驱动程序。

订单服务

关键在于,订单信息的插入操作和出箱记录的插入操作必须在同一事务中完成。如果其中任何一步出现错误,那么这两项操作都会被回滚。

// orders/main.go

package main

import (
	"context"
	"encoding/json"
	"log"
	"os"

	"github.com/google/uuid"
	"github.com/jackc/pgx/v5"
	github.com/jackc/pgx/v5/pgxpool"
)

// Order代表我们系统中的客户订单。
type Order struct {
	ID       uuid.UUID `json:"id"`
	Product  string    `json:"product"`
	Quantity int       `json:"quantity"`
}

// OrderCreatedEvent是发送到消息代理的事件数据结构。
// 它只包含下游服务所需了解的信息。
type OrderCreatedEvent struct {
	OrderID uuid_uuid `json:"order_id"`
	Product string    `json:"product"`
}

// createOrderInTx函数可以原子性地完成新订单的插入操作及其出箱记录的创建。
// 这两个操作共享同一个事务,因此要么两者都成功执行,
// 要么都会被回滚——从而保证数据的一致性。
func createOrderInTx(ctx context.Context, tx pgx Tx, order Order) error {
	// 第一步:插入订单数据。
	_, err = tx.Exec(ctx,
		"INSERT INTO orders (id, product, quantity) VALUES (\(1, \)2, $3)",
	(order.ID, order.Product, order.Quantity),
	)
	if err != nil {
		return err
	}
	log.Printf("订单 %s 已成功插入数据库", order.ID)

	// 第二步:将事件数据序列化为字符串格式。
	event := OrderCreatedEvent{
		OrderID: order.ID,
		Product: order.Product,
	}
(msg, err := json.Marshal(event)
	if err != nil {
		return err
	}

	// 第三步:将事件记录写入出箱表中。
	// 这个操作并不会直接发送到Pub/Sub,而是会将事件放入队列中,等待后续的中继进程处理。
	_, err = tx.Exec(ctx,
		"INSERT INTO outbox (topic, message) VALUES (\(1, \)2)",
		"orders.created", msg,
	)
	if err != nil {
		return err
	}
	log.Printf("订单 %s 的出箱记录已成功插入数据库", order.ID)

	return nil
}

func main() {
(ctx := context.Background()

.pool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatalf("无法连接数据库:%v", err)
	}
	defer pool.Close()

	// 开始一个事务,这个事务会同时包含订单的插入操作和出箱记录的创建操作。
(tx, err := pool.Begin(ctx)
	if err != nil {
		log.Fatalf("无法开始事务:%v", err)
	}
	// 如果任何步骤失败,事务最终会被回滚。
	defer tx.Rollback(ctx)

	newOrder := Order{
		ID:       uuid.New(),
		Product:  "Super Widget",
		Quantity: 10,
	}

	if err := createOrderInTx(ctx, tx, newOrder); err != nil {
		log.Fatalf("创建订单失败:%v", err)
	}

	// 提交事务,使所有的操作永久化。
	if err := tx COMMIT(ctx); err != nil {
		log.Fatalf("提交事务失败:%v", err)
	}

	log.Println("订单已成功创建,并且出箱记录也已生成并排队等待处理。")
}

请注意,createOrderInTx接收的是一个pgx.Tx对象(即一个事务),而不是数据库连接。这种设计是有意为之的:它明确了调用者有责任管理事务的边界,从而确保了数据操作的原子性。

中继服务

中继服务作为一个独立的后台进程运行。它会定期检查待发送消息表,将消息发布到Pub/Sub系统,并将这些消息标记为已处理。

这里有一个非常重要的细节:SQL查询中使用了FOR UPDATE SKIP LOCKED语句。这一PostgreSQL特性使得可以同时运行多个中继实例,而它们不会互相干扰。当某个实例锁定某行数据进行处理时,其他实例会跳过该行,直接处理下一行数据。

// relay/main.go

package main

import (
	"context"
	"log"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/google/uuid"
	"github.com/jackc/pgx/v5/pgxpool"
)

// OutboxMessage结构体包含了我们从待发送消息表中需要的所有字段。
type OutboxMessage struct {
	ID      uuid.UUID
	Topic   string
	Message []byte
}

// processOutboxMessages函数会获取一条待处理的消息,将其发布到Pub/Sub系统,
// 然后将其标记为已处理——所有这些操作都在同一个数据库事务中完成。
func processOutboxMessages(ctx context.Context, pool *pgxpool Pool, pubsubClient *pubsub.Client) error {
(tx, err := pool.Begin(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx)

	// 查询下一条待处理的消息。
	// FOR UPDATE SKIP LOCKED确保了如果有多个中继实例在运行,
	// 它们不会同时尝试处理同一条消息。
(rows, err := tx.Query(ctx, `
		SELECT id, topic, message
		FROM outbox
	WHERE state = 'pending'
	ORDER BY created_at
		LIMIT 1
	_FOR UPDATE SKIP LOCKED
	`)
	if err != nil {
		return err
	}
	defer rows.Close()

	var msg OutboxMessage
	if rows.Next() {
		if err := rows.Scan(&msg.ID, &msg.Topic, &msg.Message); err != nil {
			return err
		}
	} else {
		// 没有待处理的消息——无需执行任何操作。
		return nil
	}

	log.Printf("正在将消息 %s 发布到主题 %s", msg.ID, msg_topic)

	// 将消息发布到Pub/Sub主题,并等待确认回复。
	result := pubsubClient Topic(msg.Topic).Publish(ctx, &pubsub.Message{
	>Data: msg.Message,
	})
	if _, err = result.Get(ctx); err != nil {
		// 发布失败。此时会返回错误信息,事务不会被提交,
		// 因此该消息的状态会保持为“待处理”。
		// 中继服务会在下一次检查时重新尝试发送这条消息。
		return err
	}

	// 由于消息已经成功发送给接收方,现在可以将其标记为已处理。
	_, err = tx.Exec(ctx,
		"UPDATE outbox SET state = 'processed', processed_at = now() WHERE id = $1",
	(msg.ID,
	)
	if err != nil {
		return err
	}
	log.Printf("消息 %s 已被标记为已处理", msg.ID)

	// 提交事务:此时状态更新就会永久生效。
	return tx COMMIT(ctx)
}

func main() {
	// 在生产环境中,应使用环境变量或配置文件来初始化数据库连接。
	// 这里只是为了说明目的而使用了占位符。
	var (
	.pool         *pgxpool.Pool
		pubsubClient *pubsub.Client
	)

	// 每秒检查一次待发送消息表。
	// 根据你的延迟要求调整检查间隔。
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		if err := processOutboxMessages(context.Background(), pool, pubsubClient); err != nil {
			log.Printf("处理待发送消息时出现错误:%v", err)
		}
	}
}

轮询间隔(在本例中为1秒)决定了事件从被写入待发送队列到被发布到代理服务器之间的最大延迟时间。对于大多数使用场景来说,1到5秒的延迟是完全可接受的。如果需要更低的延迟,可以缩短轮询间隔,或者考虑使用PostgreSQL的LISTEN/NOTIFY功能,在有新数据插入时立即触发中继进程。

为什么消息可能会被多次发送

你可能会疑惑:难道“待发送队列模式”不应该保证消息仅被发送一次吗?

实际上它并不能保证这一点,而是能确保消息至少被发送一次。以下是一个极端例子:

  1. 中继进程成功将消息发布到了Pub/Sub系统中。

  2. 但在它将待发送队列中的状态更新为“已处理”之前,中继进程突然崩溃了。

  3. 当中继进程重新启动后,它会发现该消息的状态仍然是“待发送”,于是又会再次将其发布出去。

这种情况虽然罕见,但确实有可能发生。处理这种问题的标准方法是确保你的消息消费者具有幂等性。这意味着它们可以安全地多次接收并处理同一条消息,而不会导致任何错误行为。

实现幂等性的常见方法包括:

  • 使用消息的id作为去重键,在处理之前先检查是否已经处理过该消息。

  • 使你的操作本身具有幂等性。例如,使用INSERT ... ON CONFLICT DO NOTHING而不是普通的INSERT语句。

另一种方案:PostgreSQL逻辑复制

上述提到的轮询方法虽然简单且效果良好,但存在两个缺点:首先会产生一定的延迟(最长可达一个轮询间隔的时间);其次,即使没有需要处理的数据,系统也会执行数据库查询操作。

对于那些对性能要求较高的高吞吐量系统来说,PostgreSQL提供了一种更高级的解决方案:通过写前日志(WAL)实现逻辑复制功能。

对PostgreSQL数据库所做的任何更改都会首先被写入WAL中——这种仅用于故障恢复和复制的日志是只能追加数据的。利用逻辑复制功能,你可以订阅特定表中的变更信息,并几乎实时地接收这些变化。

与传统的轮询机制不同,PostgreSQL会在有新数据插入待发送队列表时,立即主动通知中继进程。

这种方案能够降低延迟,同时更高效地利用系统资源,特别适合处理大量数据的场景。不过其缺点是实现难度稍高:你需要管理PostgreSQL中的复制槽,并正确处理WAL日志流。

在Go语言中,你可以使用pglogrepl库来与PostgreSQL的逻辑复制机制进行交互。

如需了解更多关于PostgreSQL中WAL机制及变更数据捕获功能的详细信息,请参阅官方的预写日志文档

包含WAL机制的示意图

结论

“出箱模式”解决了分布式系统中一个根本性的问题:如何以一种可靠且一致的方式完成数据库写入操作,并将消息发送给代理服务器?

这一方案的核心思想是将数据库作为业务数据与待发送消息的“权威存储源”。通过在同一事务中同时向数据库的“出箱表”写入数据,就可以获得数据库本身提供的原子性保障——因此完全不需要任何分布式事务协议。

以下是这些关键概念的简要总结:

  • 出箱表用于存储待发送的事件,它属于你的常规数据库结构的一部分。

  • 事务将业务数据写入操作与出箱表的写入操作封装在同一个事务中,从而确保这些操作的原子性。

  • 消息中继进程是一个后台程序,它负责从出箱表中读取数据并将其发送给代理服务器。

  • 至少一次交付机制要求消费者必须具备幂等性,这样才能确保消息不会被重复处理。

  • FOR UPDATE SKIP LOCKED这一语法使得多个中继进程可以安全地并行运行。

  • 逻辑复制是一种更高级的解决方案,它能够避免在高吞吐量系统中使用轮询机制。

虽然这种模式的原理很简单,但根据具体的应用规模和基础设施环境,实现方式可能会有所不同。本教程中介绍的轮询实现方法,对于大多数应用程序来说都是一个不错的起点。

资源链接

Comments are closed.