介绍

Salesforce 更改数据捕获功能支持基于 Salesforce 事件总线的事件驱动体系结构,支持近乎实时的数据复制。此事件总线可以作为数据创建者连接到 Kafka。 另一方面,雪花 – 与汇流合作 – 还为 Kafka 提供了自己的雪花连接器,这使得配置 Kafka 接收器(即数据使用者)变得容易,并存储在雪花表中的 JSON 或 Avro 数据,然后可以使用雪花半结构化 SQL 查询进行分析。这些组件一起为雪花中的 Salesforce 流式处理数据处理提供了理想的体系结构。

建筑

在我们的教程中,我们将演示如何将 Salesforce 订单发送到事件总线,并近乎实时地将它们推送到雪花表中。

流式数据堆栈如下所示:

配置

在 Salesforce 中,我们需要 为订单启用更改 数据捕获。这很简单,只需在”设置 ->更改数据捕获”菜单下选择标准订单对象:

完成后,Salesforce 将创建一个 OrderChangeevent 对象广告,该广告已准备好发布 chage 数据捕获事件。

汇流平台支持数据库更改的数据集成,用于分析、监控等。

要安装汇合自管理的 Kafka,我们需要 转到https://www.confluent.io/download/

下载平台后,我们需要设置环境变量和路径,然后我们可以从命令行启动平台:

 

1
导出CONFLUENT_HOME=< 路径到汇合 >
2
* 安装 Kafka 连接数据根源连接器用于演示目的 

4
$CONFLUENT_HOME/bin/汇合集线器安装 |
5
-- 无提示 汇合/kafka-连接数据原:最新
6
#This命令启动所有汇合平台组件;包括卡夫卡, 动物园守护者, 架构注册表, Http REST 代理卡夫卡, 卡夫卡连接, ksqldb 和控制中心。
7
开始

从控制中心 Web 界面,我们可以创建我们的 Salesforce 订单主题,在我们的案例中,它称为salesforce_orders:

然后,我们可以创建我们的连接器;首先用于销售队伍更改数据捕获,然后用于雪花:

Salesforce 更改数据捕获连接程序需要配置参数,如用户名、密码、连接的应用使用者密钥和使用者密钥。销售队伍实例 URL 和 Kafka 主题和汇合主题服务器:

雪花连接器配置如下所示:

它使用此处描述的密钥对身份验证 因此我们需要生成私有键和公钥,并相应地设置数据库RSA_PUBLIC_KEY属性。

生成销售队伍更改数据捕获事件

要创建 OrderChangeEvent,我们只需要导航到 Salesforce 订单,并使用填充了适当的帐户和合同值创建新订单。初始状态为草稿。

这将触发新的创建更改数据捕获事件。

下一步是添加订单产品:

一旦我们选择了产品和数量并保存了订单产品,将创建一个新的更改数据 Capire 事件,一个 UPDATE 事件,用于在订单标题上设置 TotalAmount 值。

最后,我们可以通过选择订单状态路径上的Activated值"闪电"组件来激活订单,然后单击"标记为当前状态"按钮:

这将生成另一个更新更改数据捕获事件,该事件将发布到 Salesforce 事件总线。从那里,Kafka 连接器将拾取它,并在主题上salesforce_orders记录

在雪花中查询更改数据捕获记录

雪花 Kafka 连接器设计为在 Kafka Connect 群集内运行,以读取 Kafka 主题中的数据,并将数据写入雪花表。从雪花的角度来看,Kafka 主题生成要插入雪花表中的行流。通常,每个 Kafka 消息都包含一行。

Kafka 主题可以映射到 Kafka 配置中的现有雪花表。如果未映射主题,则 Kafka 连接器将为每个主题创建一个新表,使用主题名称。(在我们的案例中salesforce_orders)

Kafka 连接器加载的每个雪花表都有一个由两列 VARIANT 组成的架构:

  • RECORD_CONTENT.这包含卡夫卡消息。

  • RECORD_METADATA.这包含有关消息的元数据,例如,从中读取消息的主题。

在我们的场景中,新的雪花表将存储在 SALESFORCE_DB salesforCEschema 的数据库中,其名称SALESFORCE_ORDERS。

数据将采用 JSON 格式,因此我们需要将 SQL 查询格式用于半结构化数据格式,如下所示:

Sql

 

x
1
1
选择Record_Metadata:创建时间,Record_Content:changeEventHeader.changeType.changeType
2
来自销售.salesforce_orders;

在"结果"部分中,我们将找到 3 个更改数据捕获事件(创建、使用 TotalAmount 更新、具有已激活状态的更新),如上所述。在更新的情况下,Salesforce 将仅更新更改的值,还将发送"上次修改时间"值。

原始 JSON 数据如下所示(它存储在一个RECORD_CONTENT wich 具有变体类型):

结论

在当今的商业世界中,越来越多的需求利用各种来源(包括 CRM)为客户提供近实时数据。Salesforce 更改数据捕获事件驱动机制以及 Kafka 和雪花为这些需求提供了出色的体系结构。它很容易设置,在康共和 Kafka 连接器的帮助下,无需编写任何自定义代码,所有操作都可以通过简单的基于 Web 的配置来实现。

流数据雪花参考体系结构

Comments are closed.