更改数据捕获 (CDC)是一种用于跟踪数据库表中的行级更改以响应创建、更新和删除操作的技术。不同的数据库使用不同的技术来公开这些更改数据事件 – 例如,PostgreSQL 中的逻辑解码、MySQL 二进制日志(binlog)等。这是一项强大的功能,但只有当有办法利用这些事件日志并使其可用于依赖于该信息的其他服务时,此功能才有用。

德贝齐姆就这样做了!它是一个分布式平台,基于不同数据库中可用的更改数据捕获功能。它提供了一组Kafka Connect 连接器,这些连接器利用数据库表中的行级更改(使用 CDC),并将其转换为事件流。这些事件流被发送到阿帕奇卡夫卡,这是一个可扩展的事件流平台 – 一个完美的适合!更改日志事件在 Kafka 中后,它们将可用于所有下游应用程序。

这与 Kafka Connect JDBC 连接器采用的”轮询”技术不同

图表 (从debezium.io) 很好地总结了它!

此博客是开始使用 Debezium 在 Azure 上设置基于 Azure 的更改数据捕获指南,Azure DB 用于 PostgreSQL 和 Azure 事件中心(对于 Kafka)。它将使用Debezium PostgreSQL 连接器将数据库修改从 PostgreSQL 流式传输到 Azure 事件中心中的 Kafka 主题

相关的配置文件在 GitHub 存储库中可用在 Azure 上设置 PostgreSQL 和 Kafka

本节将提供有关如何为 PostgreSQL 配置 Azure 事件中心和 Azure DB 的指针。所有你需要的是一个微软 Azure帐户 –继续前进, 并注册一个免费的!

邮政的 Azure DB

PostgreSQL 的 Azure DB 是基于开源 PostgreSQL 数据库引擎的社区版本的托管关系数据库服务,提供两种部署模式。

在编写本文时,它支持 PostgreSQL 版本11.6

您可以使用各种选项在 Azure 上设置 PostgreSQL,包括 Azure门户、Azure CLI、Azure PowerShell 、ARM 模板Azure CLI完成操作后,您可以使用您最喜爱的编程语言(如Java、.NET、Node.js、Python、Go 等Node.js)轻松连接到数据库Python.NETGomicrosoft.com/azure/postgresql/overview?WT.mc_id=dzone-blog-abhishgu#azure-database-for-postgresql—hyperscale-citus”rel=”无跟随”=超量量(Citus)是另一种部署模式,可用于”接近或已超过 100 GB 数据的工作负载”。

请确保您将以下 PostgreSQL 相关信息放在方便的位置,因为您将需要它们来配置后续部分中的 Debezium 连接器 – 数据库主机名(和端口)、用户名、密码

Azure 事件中心

Azure 事件中心是一个完全托管的数据流平台和事件引入服务。它还提供了一个支持阿帕奇 Kafka 协议 1.0 及更晚的 Kafka 终结点,并可与 Kafka 生态系统中的现有 Kafka 客户端应用程序和其他工具合作 Kafka Connect ,包括(在此博客中演示)。

您可以使用 Azure 门户、Azure CLI、PowerShell或 ARM 模板创建 Azure 事件中心命名空间和其他资源。 PowerShell为了确保启用 Kafka 功能,您只需要选择 或 Standard Dedicated 层(因为基本层不支持事件中心上的 Kafka)。

设置后,请确保您保持连接字符串方便,因为您将需要它来配置 Kafka 连接。可以使用 Azure门户或 Azure CLI进行此功能

安装卡夫卡

要运行 Kafka 连接, 我将使用本地 Kafka 安装只是为了方便apache.org/downloads”rel=”不跟随”-只需下载阿帕奇卡夫卡,解压缩其内容,你很好去!

下载 Debezium 连接器并启动 Kafka 连接

首先,克隆此 Git 存储库:

Java

 

 
1
git克隆https//github.com/abhirockzz/debezium-azure-postgres-cdc
2

cd debeziumazure后灰色cdc

下载 Debezium PostgreSQL 源连接器 JARs

1.2.0是撰写本文时的最新版本

Java

 

x
1
 
DEBEZIUM_CONNECTOR_VERSION=1.20

2

3
卷曲https//repo1.maven.org/maven2/io/debezium/debezium-连接器-postgres/$DEBEZIUM_CONNECTOR_VERSION]。最终/二十年代连接器-postgres-$DEBEZIUM_CONNECTOR_VERSION] 。Final - plugin. tar. gz - 输出 debezium - 连接器 - postgres. tar. gz
4

5
油 -xvzfdebezium-连接器-postgres.焦油.gz

现在,您应该会看到名为 的新文件夹 debezium-connector-postgres 。将连接器 JAR 文件复制到 Kafka 安装:

Java

 

x
1
 
1
导出KAFKA_HOME=kafka安装e的路径g/用户/foo/工作/kafka_212-2.3.0|
2

3

4

5
确认
6
ls - lrt $KAFKA_首页/图书馆 |格雷普 · 德贝齐姆
7
ls - lrt $KAFKA_首页/图书馆 |格雷普 · 普罗托布夫
8
ls - lrt $KAFKA_首页/图书馆 |格雷普 · 后格雷斯克尔

在启动 Kafka Connect 群集之前, connect.properties 编辑文件以包含以下属性的适当值: bootstrap.servers sasl.jaas.configproducer.sasl.jaas.config consumer.sasl.jaas.config 、、(只需替换占位符)

启动 Kafka 连接群集(我在模式下运行 distributed 它):

Java

 

x
1
 
1
导出KAFKA_HOME=kafka安装e的路径g/用户/foo/工作/kafka_212-2.3.0|
2

3
sh 连接属性

等待 Kafka 连接实例启动 - 您应该在 Azure 事件中心看到 Kafka 连接内部主题,例如。

配置 PostgreSQL

在安装连接器之前,我们需要:

  • 确保可从 Kafka 连接群集访问 PostgreSQL 实例
  • 确保 PostrgeSQL 复制设置设置为"逻辑"
  • 创建可用于尝试更改数据捕获功能的表

如果要使用 Azure DB 进行 PostgreSQL,请使用az postgres 服务器防火墙规则创建防火墙规则,创建命令以将 Kafka Connect 主机列入白名单。在我的情况下,它是一个本地的 Kafka Connect 群集,因此我只需导航到 Azure门户(我的PostrgreSQL 实例的连接安全部分),并选择添加当前客户端IP地址,以确保我的本地 IP 已添加到防火墙规则中,例如:

若要更改 PostgreSQL 的 Azure DB 复制模式,可以使用az postgres 服务器配置命令:

Java

 

x
1
 
1
azpostgresserverofnamegroupof服务器configuration配置集 -资源-<资源组> -name服务器名称<服务器名称 >--名称azure-namereplication_support--逻辑

更新配置后,您需要重新启动可以使用CLI(az postgres服务器重新启动)或门户执行的服务器。

数据库启动并运行后,创建表 - 我 psql 在此示例中使用了 CLI,但请随时使用任何其他工具。例如,要通过 SSL 连接到 Azure 上的 PostgreSQL 数据库(系统会提示您输入密码):

Java

 

x
1

1

2
psql-h<POSTGRESQL_INSTANCE_NAME>.后格雷斯数据库蔚蓝.com-p5432-U<POSTGRES_USER_NAME>-W-d<POSTGRES_DB_NAME>-设置=sslmode=要求
3

4
例子
5
psql-h我的-pgsql.后格雷斯

蔚蓝.com -p 5432 -U foo foo@my-pgsql -W -d格雷斯 -设置[sslmode]=需要

6

7
创建表
8
创建待办事项id串行描述VARCHAR30todo_statusVARCHAR10主键KEYid));

安装 Debezium PostgreSQL 源连接器

更新pg-source-connector下面是一个示例:

Java

 

x
1
14
 
1
{
2

3
"配置": |
4
"连接器. 类""io. debezium. 连接器. postgresql. PostgresConnector"
5
"数据库.主机名"<POSTGRES_INSTANCE_NAME>.postgres.数据库.azure.com"
6
"数据库.port""5432"
7
"数据库.用户""<DB_USER_NAME>"
8
"数据库.密码""<密码>"
9
"数据库.dbname""<DB_NAME例如 postgres>"
• "database.server.name":" <LOGICAL_NAMESPACE例如 todo-server>"

11
"plugin.name""瓦尔2json",
12
"表.白名单""<TABLE_NAMES例如公共.todos>"
13
  }
14
}

让我们浏览一下配置:

有关详细信息,请查看Debezium 文档

  • connector.class:连接器类的名称(这是一个静态值)
  • database.hostnamedatabase.port : PostgreSQL 实例以及端口的 IP 地址或主机名(例如 5432
  • database.userdatabase.password : PostgreSQL 实例的用户名和密码
  • database.dbname:数据库名称,例如postgres
  • database.server.name: 逻辑名称,用于标识和提供要监视的特定 PostgreSQL 数据库服务器/群集的命名空间。
  • table.whitelist: 逗号分隔的正则表达式列表,指定要监视哪些表以进行更改数据捕获
  • plugin

G。wal2json

在编写本文时,Debezium 支持以下插件 decoderbufs wal2json wal2json_rds :、、 wal2json_streamingwal2json_rds_streaming pgoutput 。我 wal2json 已在此示例中使用,Azure上也支持它

最后,安装连接器!

Java

 

x
1
 
1
卷曲-XPOST-H"内容类型:应用程序/json"-数据@pg@pg--连接器

989594px;">

Kafka Connect 现在将开始监视表 todos 以创建、更新和删除事件

更改数据捕获操作

插入记录:

Java

 

x
1
 
1
后格雷斯数据库蔚蓝.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> -设置=sslmode=要求

2

3
插入待办事项描述todo_statustodo_status"安装后灰色"'完成');
4
插入待办事项描述todo_statustodo_status'安装卡夫卡''完成');
5
插入待办事项描述todo_statustodo_status"设置源连接器"'待定');
kafkacat 使用,但您也可以使用此处列出的任何选项创建消费者应用

更新 metadata.broker.list sasl.password 和 属性, kafkacat.conf 以包括卡夫卡代理的详细信息。在不同的终端中,使用它读取 CDC 有效负载:

Java

 

x
1
 
1
康夫

2
出口经纪人\<卡夫卡经纪人>e.g表示事件中心-我的-事件中心-命名空间服务总线.窗口9093
3
导出主题\<server服务器配置><table表名>e.g待办事项-服务器.公共托多斯
4

5
卡夫卡卡特-b$BROKER-t$TOPIC-o开始
Java

 

x
1
26
 
1
{
["架构": [...],

3
"有效负载": |
4
"之前"为
5
"后": |
6
"id"1
7
"描述""安装 postgresql"
8
"todo_status""完成"
9
  },
["源":]

11
"版本""1.2.0. final"
12
"连接器""后灰色",
13
"名称""填充",
14
"ts_ms"1593018069944
15
"快照""最后",
16
"db""后灰色",
17

18
"表""待办事项",
19
"txId"602
20
"lsn"184579736
21
"xmin"
22
  },
23
"op""c"
24
"ts_ms"1593018069947
25

26
  }

事件及其及其( payload 为了简洁而 schema 省略)由 及其组成。在部分中,请注意创建操作 ( ) 的表示方式 - 表示这是一个新 payload "op": "c" "before": null INSERT ed 行, after 为行中的每列提供 source 值,提供从其中拾取此事件的 PostgreSQL 实例元数据等。

您也可以尝试与更新或删除操作相同,并反省 CDC 事件,例如

Java

 

x
1

1
更新待办事项集SETtodo_status="完成"其中描述description= "设置源连接器";

(可选)安装文件接收器连接器

作为奖励,您也可以使用文件接收器连接器快速测试此内容。它是在 Kafka 分发中开箱即用的 - 您只需安装连接器。只需替换文件中 topics file 的 和 file-sink-connector.json 属性

Java

 

x
1

1
{
2
"名称""cdc 文件接收器",
3
"配置": |
4
"连接器. 类""org. apache. kafka. connect. file. file. file 流水墨连接器"
5
"任务.最大""1",
6
"主题""<服务器名称>&<表名>例如 todos-server.public.todos"
7
"文件""<输入文件的完整路径,例如/用户/foo/工作/pg-cdc

9896px;>

8
  }
9
}

要创建连接器:

Java

 

X
1
 
1
json http//本地主机:8083/连接器

播放数据库记录并监视配置的输出接收器文件中的记录,例如

Java

 

x
1
 
1
尾巴-f/用户/foo/工作/pg-cdc

989594px;">

结论

如果你已经到了这一步,感谢您的阅读(这个相当长的教程)!

更改数据捕获是一种强大的技术,通过提供对数据库更改的近乎实时访问来帮助"解锁数据库"。这是一个"入门"指南,旨在帮助您快速启动和运行,进行实验并进一步迭代。希望你发现它有用!

Comments are closed.