在构建Web应用程序时,并非所有的任务都必须在用户的请求范围内完成。
有些操作执行速度较慢,有些可能会失败,还有一些任务需要稍后才能处理。发送邮件、调整图片大小、处理Webhook事件、生成报告以及重新尝试调用第三方API等,都是典型的例子。
这些任务通常由后台作业系统来处理。
在本文中,我们将使用一个名为Swig的开源Go项目,作为实际案例,来了解基于PostgreSQL的作业队列是如何工作的。
通过学习本文的内容,你将掌握如何使用Go和PostgreSQL构建后台作业队列,并且会明白为什么PostgreSQL的功能比大多数开发者想象的要强大得多。
目录
先决条件
为了顺利学习本文内容,你需要具备以下条件:
-
对Go语言有基本的了解(包括结构体、接口和Goroutine等概念)
-
熟练掌握PostgreSQL数据库及SQL语法
-
已经安装了Go语言开发环境(版本1.21或更高版本)
-
能够访问本地的或远程的PostgreSQL实例
你将学到什么
-
了解如何在PostgreSQL中存储和表示作业任务
-
学习如何使用
FOR UPDATE SKIP LOCKED语句让多个并发工作进程安全地获取作业任务 -
掌握如何利用
LISTEN/NOTIFY机制高效地唤醒工作进程 -
了解如何通过建议锁机制在多个PostgreSQL实例之间选举领导者
-
理解Go语言中的接口、Goroutine、上下文以及事务机制在实际系统中的应用方式
什么是作业队列?
作业队列是一种用于存储待后续处理的任务的系统。
你的应用程序会将任务添加到队列中,然后由后台工作者从队列中取出这些任务并执行它们。
例如,当用户注册时,你的应用程序可能会立即创建该用户的账户,随后会添加如下这样的任务:
{
"kind": "send_welcome_email",
"payload": {
"to": "user@example.com",
"subject": "欢迎使用!"
}
}
之后,后台工作者会执行这个任务并发送邮件。这样一来,用户请求的处理速度就能得到保障。注册流程无需等待邮件发送完成即可返回响应。
一个优秀的作业队列通常需要解决以下几个关键问题:
-
任务数据存储在何处?
-
工作者如何找到待处理的任务?
-
如何防止多个工作者同时处理同一任务?
-
如何重试那些执行失败的任务?
-
如何安全地关闭后台工作者进程?
-
如何确保新创建的任务数据与应用程序中的现有数据保持一致?
Swig利用Go语言和PostgreSQL数据库解决了这些问题。
为什么选择PostgreSQL来构建作业队列?
许多作业队列系统会使用Redis、RabbitMQ、SQS或Kafka等工具。这些工具确实都非常有用,但很多应用程序已经在使用PostgreSQL了。如果你的应用本来就已经在使用PostgreSQL,那么为了运行后台任务而额外引入其他服务可能并不划算。
PostgreSQL为构建作业队列提供了许多非常有用的功能:
-
专门用于存储任务的数据库表
-
支持原子性数据写入的事务机制
-
能够保证并发处理安全性的行级锁机制
-
SKIP LOCKED指令,允许多个工作者同时处理不同的任务 -
LISTEN/NOTIFY机制,可在有新任务到达时通知工作者 -
用于选举领导者的辅助锁机制
-
支持JSONB格式的数据存储,便于携带复杂的任务数据
不过,在选择数据库时也需要权衡各种因素。使用PostgreSQL构建作业队列的目的,并不是要取代Kafka用于事件流处理,也不是要替代RabbitMQ用于复杂的路由逻辑。它的作用是让应用程序中的常见后台任务处理过程变得更简单、更可靠,同时也更易于维护,而无需额外增加基础设施成本。
Swig的架构设计
从整体结构来看,Swig由五个主要部分组成:
-
PostgreSQL数据库中的
swig_jobs表 - 负责处理任务的后台Go工作者进程
- 用于将任务名称与对应的工作者类型进行匹配的注册机制
- 同时支持
pgx和database/sql驱动程序的底层接口层 - 用于协调各项维护工作的领导节点机制
其基本工作流程如下:
- 你的应用程序调用
AddJob函数
Swig会将任务数据序列化为JSON格式
随后将相关记录插入到swig_jobs表中
PostgreSQL会发送通知,表明任务已成功创建
后台Go工作者会启动并尝试获取队列中的待处理任务
PostgreSQL的行级锁机制确保只有一个工作者能够访问该记录
工作者会执行相应的任务操作
最后,Swig会标记任务的完成状态或失败原因
难点在于并发处理、错误处理、连接生命周期的管理以及系统关闭机制。正是在这些方面,Go语言和PostgreSQL能够发挥各自的优势,共同解决问题。
如何在PostgreSQL中表示作业
Swig所使用的作业表格的简化版本如下所示:
CREATE TABLE swig_jobs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
kind TEXT NOT NULL,
queue TEXT NOT NULL,
payload JSONB NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
priority INTEGER NOT NULL DEFAULT 0,
attempts INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 3,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
scheduled_for TIMESTAMPTZ NOT NULL DEFAULT NOW(),
instance_id UUID,
worker_id UUID,
locked_at TIMESTAMPTZ,
last_error TEXT,
last_error_at TIMESTAMPTZ
);
每一行代表一个作业。其中重要的列包括:
-
kind:作业的类型,例如send_email -
payload:执行作业所需的JSON数据 -
status:作业是处于待处理状态、正在执行中、已经完成还是出现了错误 -
attempts:该作业已经被尝试执行的次数 -
scheduled_for:作业被安排在何时执行 -
locked_at:作业被哪个工作者取用
这个表格是存储作业状态的真实数据源。PostgreSQL的通知机制可以用来唤醒相应的工作者,但通知本身并不能作为持久化的队列来使用;真正起到持久化存储作用的,是swig_jobs表中的数据。
如何在Go语言中定义工作者
在Swig框架中,工作者是一种Go类型,它负责处理某种特定类型的作业。
以下是一个简单的电子邮件发送工作者的示例:
type EmailWorker struct {
To string `json:"to"`
Subject string `json:"subject"`
Body string `json:"body"`
}
func (w *EmailWorker) JobName() string {
return "send_email"
}
func (w *EmailWorker) Process(ctx context.Context) error {
fmt.Printf("正在向 %s 发送主题为 %s 的电子邮件\n", w.To, w.Subject)
return nil
}
这个示例中包含两个重要的方法:
-
JobName:用于告知Swig框架这个工作者负责处理哪种类型的作业 -
Process:实际执行作业的功能
结构体中的字段同时也充当了作业所需的参数。当向队列中添加一个EmailWorker》实例时,Swig会将其序列化为JSON格式并存储在PostgreSQL数据库中;之后,相应的工作者会从数据库中获取这些数据,将其反序列化回一个新的EmailWorker》实例,然后再调用Process方法来执行作业。
Go语言中的接口
Go语言中的接口用于描述对象的行为。对于Swig框架来说,它并不需要知道每个工作者的具体类型;它只需要确保这些工作者能够提供作业名称,并且能够执行相应的作业即可:
type Worker interface {
JobName() string
Process(context.Context) error
}
如果某个类型具备这些方法,那么它就无需进行显式声明即可满足该接口的要求。这就是为什么在 Go 语言中接口如此有用的原因之一——它们允许人们根据行为来设计程序,而不是依赖继承机制。
如何在不共享状态的情况下注册工作线程
Swig 提供了一个工作线程注册系统,该系统会将作业名称与相应的工作者类型关联起来:
registry := workers.NewWorkerRegistry()
registry.RegisterWorker(&EmailWorker{})
后来,当某条作业记录中包含 kind = 'send_email' 时,Swig 会查找已注册的工作者类型并启动它来执行相应的任务。
不过这里存在一个潜在的并发问题:如果注册系统存储的是指向 &EmailWorker{} 的指针,并将其重复用于多个作业,那么多个 goroutine 可能会同时尝试将 JSON 数据解码为同一个 Go 对象,从而导致数据混乱。
Swig 通过内部采用工厂模式来避免这种问题。在注册新作业时,系统会创建一个新的工作者实例,而不会重复使用现有的指针。这样一来,虽然公共 API 保持简单,但内部实现却能确保安全性。
如何添加作业
从用户的角度来看,添加作业的操作过程如下:
err := swigClient.AddJob(ctx, &EmailWorker{
To: "user@example.com",
Subject: "Welcome!",
Body: "Thanks for signing up.",
})
在 Swig 的内部,添加作业的具体流程大致如下:
argsJSON, err := json.Marshal(workerWithArgs)
if err != nil {
return err
}
_, err = db.ExecContext(ctx, `
INSERT INTO swig_jobs (kind, queue, payload, priority, scheduled_for, status)
VALUES (\(1, \)2, \(3, \)4, $5, 'pending')
`, jobName, queue, argsJSON, priority, runAt)
如何在事务中添加作业
使用 PostgreSQL 来处理作业的一个最大优势就是它支持事务机制,从而可以确保操作的原子性。
想象一下,当有用户注册时,你需要同时完成两个操作:将用户信息插入数据库,并发送欢迎邮件。如果这两个操作分开执行,就可能会出现状态不一致的情况。而使用事务的话,这两个操作要么都成功,要么都失败:
tx, err := pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
_, err = tx.Exec(ctx, `INSERT INTO users (email) VALUES ($1)`, email)
if err != nil {
return err
}
err = swigClient.AddJobWithTx(ctx, tx, &EmailWorker{
To: email,
Subject: "Welcome!",
Body: "Thanks for joining.",
})
if err != nil {
return err
}
return tx.commit(ctx)
如果事务被回滚,那么用户信息就不会被插入数据库,邮件也不会被发送。而当数据库和消息队列是分开的系统时,要保证这种一致性就变得非常困难了。
如何安全地管理多个工作线程
当多个工作者同时运行时,队列机制就会变得有趣起来。想象一下,有三个工作者都向 PostgreSQL 请求下一项待处理的任务——你肯定不希望这三个工作者处理同一项任务。
如果采用一种简单直接的方法,就会出现竞争条件:两个工作者可能会在选择到同一项任务之后,才去更新它。
PostgreSQL 的 FOR UPDATE SKIP LOCKED 机制
PostgreSQL 可以锁定在事务中选中的行。`FOR UPDATE` 表示“因为我打算更新这一行,所以要锁定它”;而 `SKIP LOCKED` 则表示“如果已经有其他工作者锁定了这一行,那就跳过它,去选择另一行”。
这种机制非常适合处理队列任务:
-
工作者 A 锁定了任务 1
-
工作者 B 跳过了任务 1,锁定了任务 2
-
工作者 C 跳过了任务 1 和任务 2,锁定了任务 3
在这种情况下,根本不需要中央协调器。Swig 使用的是原子性更新机制:
UPDATE swig_jobs
SET status = 'processing',
instance_id = $1,
worker_id = $2,
locked_at = NOW(),
attempts = attempts + 1
WHERE id = (
SELECT id
FROM swig_jobs
WHERE status = 'pending'
AND scheduled_for <= NOW()
ORDER BY priority DESC, created_at
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING id, kind, payload;
这条查询会找到一项待处理的任务,跳过那些已经被锁定的任务,将其标记为“正在处理中”,记录下是哪个工作者选择了这项任务,然后返回该任务的详细信息。所有这些操作都是原子性完成的——工作者们不需要先执行 `SELECT` 操作,然后再等待后面的 `UPDATE` 操作,因为这一切都会同时完成。
如何使用协程来实现并发处理
Swig 会以协程的形式启动各个工作者的运行线程:
for i := 0; i < maxWorkers; i++ {
go s.startWorker(ctx, queueType)
}
每个工作者都是独立运行的。PostgreSQL 负责分配任务给各个工作者;而 Go 语言则通过协程来实现并发处理,同时 PostgreSQL 也利用锁定机制来确保任务分配的公平性。
如何实现优雅的系统关闭
当一个服务需要关闭时,它应该等待所有工作者完成当前的工作后再进行关闭操作。Go 语言中的 `sync.WaitGroup` 可以帮助实现这一点:
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
processJobs()
}()
wg.Wait()
Swig 还使用了 `sync.Once` 来确保关闭操作是幂等的——即使多次调用 `Stop` 方法,也不会因为通道被重复关闭而导致程序异常。在生产环境中,系统的关闭流程往往与测试环境中的情况有所不同。
如何使用 LISTEN/NOTIFY 机制来唤醒工作者
如果工作者们不断地从数据库中查询任务信息,那么当队列为空时,它们就会浪费资源。PostgreSQL 提供了 `LISTEN/NOTIFY` 机制来解决这个问题。
一个连接可以通过监听某个通道来接收新的任务请求:
LISTEN swig_jobs;
另一个会话可以发送通知:
NOTIFY swig_jobs, '{"id":"job-id"}';
Swig会创建一个触发器,这样当有作业被插入数据库时,PostgreSQL就会发送通知。在没有任务需要处理时,工作进程会处于等待状态;而当有新的作业到来时,这些进程便会开始执行。
这里有一个关于PostgreSQL的重要细节:LISTEN命令是会话级别的。因此,工作进程必须在与执行LISTEN命令的同一数据库会话中等待通知。Swig通过为每个工作进程创建一个专用的监听器来处理这个问题;这个监听器会在该工作进程的整个生命周期内一直保持活跃状态。
这是一个常见的后端开发经验:像连接池这样的抽象机制确实非常有用,但某些数据库功能实际上依赖于特定连接的生命周期。
如何使用建议锁来选举领导者
有些队列维护任务应该一次只在一个实例上运行,这些任务包括重试失败的任务、恢复已暂停的任务以及清理旧的历史记录。
Swig就是利用PostgreSQL的建议锁来实现这一点的:
SELECT pg_try_advisory_lock($1);
如果返回的结果为true,那么该Swig实例就会成为领导者。建议锁也是会话级别的,因此Swig会为领导者的角色专门创建一个连接来使用建议锁。如果这个会话结束,PostgreSQL会释放这个锁,然后另一个实例就可以接替它的领导职责。这样的机制实现了简单的故障转移功能,而且不需要借助ZooKeeper或etcd这样的工具。
如何处理失败的任务
当某个工作进程返回错误信息时,Swig会记录下这个错误,并要么重新尝试执行该任务,要么将其标记为失败:
UPDATE swig_jobs
SET status = CASE
WHEN attempts >= max_attempts THEN 'failed'
ELSE 'pending'
END,
last_error = $2,
last_error_at = NOW()
WHERE id = $1;
关于交付语义的说明
人们很容易认为作业队列会恰好处理一次每个任务。但在分布式系统中,这种说法其实是危险的。
请考虑以下这种情况:
-
一个工作进程发送了一封电子邮件
-
但这个工作进程在完成标记之前就崩溃了
-
系统会重新尝试执行这项任务
-
那么这封电子邮件可能会被再次发送
-
将任务信息存储在数据库表中
-
使用`FOR UPDATE SKIP LOCKED`语句原子性地获取任务
-
通过专用的`LISTEN/NOTIFY`会话来唤醒工作线程
-
利用建议锁机制来协调各个组件的运行顺序
-
利用goroutine和context机制来管理工作线程的生命周期
更准确的描述是:Swig提供了原子的任务获取机制以及至少一次的处理保证。由于任务可以被重新尝试执行,因此工作进程必须是幂等的——多次执行相同的操作应该会得到相同的结果。
如何对数据库驱动程序进行抽象
Swig通过一个驱动接口同时支持pgx和database/sql》这两种数据库驱动程序:
type Driver interface {
Exec(ctx context.Context, sql string, args ...interface{}) error
Query(ctx context.Context, sql string, args ...interface{}) (Rows, error)
QueryRow(ctx context.Context, sql string, args ...interface{}) Row
WithTx(ctx context.Context, fn func(tx Transaction) error) error
NewListener(ctx context.Context, channel string) (Listener, error)
TryAdvisoryLock(ctx context.Context, lockID int64) (AdvisoryLock, bool, error)
}
核心队列代码仅依赖于具体的功能实现,而与特定的库无关。这种设计在Go语言中非常常见:首先明确核心组件所需的功能,然后为具体的依赖关系编写相应的适配代码,从而确保核心逻辑的独立性。
结论
对于并非所有系统而言,以PostgreSQL作为后端的队列解决方案并不一定是最佳选择。如果需要处理大量事件流,Kafka可能更为适合;而如果需要实现复杂的路由机制,RabbitMQ则会更加适用。
然而,对于许多使用Go语言开发的应用程序来说,PostgreSQL早已成为了一个成熟且实用的选择。Swig项目证明了:只需借助简单的Go API以及PostgreSQL的一些功能,就能构建出高效的后台处理系统。
通过事务确保应用程序的数据和任务状态的一致性
这种组合为后台处理任务提供了坚实的基础,同时也是了解Go语言与PostgreSQL如何在生产环境中协同工作的绝佳范例。您可以通过github.com/glamboyosa/swig查看该项目的完整源代码。



