Quarkus提供了几种不同的反应消息传递功能。开源项目云原生启动器在本文中介绍的示例应用程序中使用了其中一些功能。

有几个简单的关于”反应消息”主题的 Quarkus 指南。教程是伟大的,但学习新技术的最好方法是将它们用于简单的应用程序。这就是为什么我想出了一个简单的场景。

示例应用

该示例附带一个 Web 应用程序,该应用程序在简单的 Web 应用程序中显示指向包含作者信息的文章的链接。Web 应用程序调用 Web API 服务,该服务实现后端前端模式,并调用文章和作者的微服务。文章服务将数据存储在 Postgres 数据库中。消息通过卡夫卡在微服务之间发送。此图描述了高级体系结构:

clients and kubernetes

反应模型的一个好处是能够通过发送消息来更新 Web 应用程序,而不是拉取更新。这更高效,并改善了用户体验。

以下视频演示如何通过 REST API 创建文章。Web 应用程序接收通知并将新文章添加到页面。

下图将详细介绍本文中我将更详细地介绍的流程。

clients and kubernetes

  1. “提交”API 客户端调用”文章”服务的 REST 终结点来创建新文章。
  2. 创建文章后,将发送一条消息给 Kafka。
  3. “Web-API”服务已订阅 Kafka 消息,以便调用侦听器。
  4. 创建新文章时,事件将流式传输到 Web 应用

通过 Vert.X 事件总线发送内存内消息

“文章”和”Web-API”服务已在 Java 中与 Quarkus 一起实现。在这两种情况下,我都使用了一种干净的体系结构方法,其中微服务的代码被组织成三个包。这些包彼此相当独立,可以与其他实现交换。

  1. API:包含 REST 终结点并处理传入和传出消息。
  2. 业务:包含微服务和业务实体的业务逻辑。
  3. 数据:包含访问数据库或其他微服务的代码。

在 Postgres 数据库中存储了新文章后,将消息发送给 Kafka。这是由业务逻辑触发的,但实际代码驻留在 API 层中。这就是为什么业务层需要首先将消息发送到 API 层的原因。

Quarkus 提供了一种机制,使 bean 通过强制执行松散耦合来通过异步消息进行交互。查看 bean 之间的引导异步消息传递。此功能通过 Eclipse Vert.x 提供,该顶点随夸夸其分一起提供。

下面是将内存中的事件发送到 API 层的代码

Java

 

15
1
导入io顶点.车轴.核心事件总线事件总线;
2

3
...
4

5
@Inject
6

7
事件总线bus;
8

9

10

11
私人空文发送消息托卡夫卡Article文章) |
12

13
公共汽车发表("com.ibm.文章.apis.New文章创造倾听者"文章)。id;
14

15
}

在 API 层中,可以使用事件(请参阅代码):

Java

 

xxxxxxx
1
11
 
1
导入io夸库。顶点.消费事件;
2

3
...
4

5
公共空文发送消息到卡夫卡字符串文章 Id) |

8

9
...
10

11
}

Eclipse MicroProfile 支持内存中消息的另一种机制。我之所以没有使用它,在这种情况下,是我没有让它工作。对我来说,@Outgoing注释仅适用于由传入事件或平台触发的方法(例如@PostConstruct)。

就我而言,我必须从业务逻辑触发此功能。我不确定这是否是 MircoProfile 中缺少的功能、缺陷还是用户错误。我正想找出答案

本文档提到了使用 Vert.x 事件总线的另一个原因:"异步消息传递功能允许回复 MicroProfile 反应消息不支持的消息。但是,它仅限于单事件行为(无流)和本地消息。

通过卡夫卡 API 发送卡夫卡消息

接下来,"Web-API"服务的 API 层需要将消息发送到 Kafka。要在库伯内斯建立卡夫卡,请按照我上一篇文章的指示,从夸库斯访问阿帕奇卡夫卡

Eclipse MicroProfile 反应消息提供了相同的@Outgoing注释来执行此操作,但我无法使其正常工作,因为我必须手动触发此功能

作为解决方法,我改用卡夫卡 API。使用是相当直接的。遗憾的是,配置似乎不是从 MicroProfile 使用的同一"应用程序.属性"文件中读取的。相反,我不得不在代码中执行此操作:

Java

 

xxxxxxx
1
57
导入 io顶点.核心顶点;

2

3
导入io顶点.卡夫卡.客户端生产者.卡夫卡制作人;
4

5
...
6

7
@Inject
8

9
;
10

11
@ConfigProperty名称 = "kafka.bootstrap.server"

14

15
字符串卡夫卡博茨陷阱服务器;
16

17
 
18

19
卡夫卡制作人<字符串String>制作人;
20

21
...
22

23
无效initkaClient() |

26

27
映射<字符串String>配置= 新的哈希映射<>();
28

29
配置.("靴子.服务器"卡夫卡博茨陷阱服务器);
30

31
配置.("键.序列化器","org.apache.kafka.common.序列化.弦序列化");
32

33
("value.序列化器","org.apache.kafka.common.序列化.弦序列化");

34

35
制片人+卡夫卡制作人创建顶点配置);
36

37
}
38

39
...
40

41
@ConsumeEvent
42

43
尝试

46

47
卡夫卡制作人记录<String字符串>记录=卡夫卡制作人记录.创建("新文章创建"文章 Id);
48

49
生产者.写入记录完成->系统println("卡夫卡消息发送:新文章创建 - "=文章 Id);"
50

51
[捕获例外e) |
52

53
  }

56

57
}

通过微配置文件发送和接收消息

接下来,"Web-API"服务需要从卡夫卡接收此消息。通过注释@Incoming,可通过微配置文件反应消息轻松实现此部件。下面是代码

Java

 

21
导入组织日食微图 。反应性消息传入;

2

3
导入组织日食微图 。反应性消息传出;
4

5
导入io小里。反应性消息注释.广播;
6

7
...
8

9
@Outgoing"流新文章"

12

13
@Broadcast
14

15
公共字符串进程字符串文章 Id) |
16

17
系统println("卡夫卡收到消息:新文章创建 - "=文章Id);
18

19
返回文章 Id;
20

21
Apache Kafka 和反应消息的指南。从开发人员的经验来看,这非常简单。我特别喜欢同样的注释可用于 Kafka 通道以及内存中消息传递。

通过服务器发送事件将事件发送到 Web 应用程序

最后一步是将事件流式传输到 Web 应用程序。这是通过服务器发送事件完成的,并且 Quarkus 非常容易实现。流式处理终结点通过@Channel接收消息,并通过@Produces(MediaType.SERVER_SENT_EVENTS)和@SseElementType转发消息(请参阅代码):

Java

 

xxxxxxx
1
31
导入组织反应流发布者;

2

3
导入io小里。反应性消息注释.频道;
4

5
导入组织j博斯.易地注释.赛元素类型;
6

7
...
8

9
@Inject

12

13
@Channel("流新文章"发布商<字符串>新文章;
14

15
 
16

17
@GET
18

19
@Path"/服务器发送事件"
20

21
SERVER_SENT_EVENTS

22

23
@SseElementType"文本/纯"
24

25
公共发布商<字符串>() |
26

27
返回新文章;
28

29
}
30

31
代码)。或者,我也可以发送完整的文章信息在事件中。

Java

 

xxxxxxx
1
 
1
= 新的事件源.$store.状态终结点api= "服务器发送事件";
2

3
消息 = 函数事件) |

6

7
.阅读文章();
8

9
};

后续步骤

如果要了解有关反应编程和反应消息传递的更多内容,请自己试用代码

Comments are closed.