KSQL 是 Apache Kafka 的 SQL 流式处理引擎。它提供了一个易于使用但功能强大的交互式 SQL 接口,用于 Kafka 上的流处理,而无需使用 Java 或 Python 等编程语言编写代码。KSQL 具有可扩展性、弹性性和容错性。它支持各种流式处理操作,包括数据筛选、转换、聚合、联接、窗口化和会话化。

什么是流式处理?

在流处理中,数据会随着新数据可用于分析而持续处理。数据作为无界流按顺序处理,并且可能由”侦听”分析系统作为键值对中的记录进行拉入。

Stream of data

以下是 KSQL 处理的几个关键功能:

  1. 每个记录流处理,具有毫秒延迟。
  2. 数据筛选。
  3. 数据转换和转换。
  4. 通过联接进行数据扩充。
  5. 具有标量函数的数据操作。
  6. 具有有状态处理、聚合和窗口操作的数据分析。

客户端应用程序可以使用 Kafka 流 API 对 Kafka 主题数据进行流处理,在 Kafka 流 API 下面是 Kafka 生产者和使用者。

KSQL 查询执行流处理,这是 Kafka 流 API 的抽象,它可以使用结构化的流数据,如 Avro、JSON、DELIMITED

现在,让我们来看看如何在 KSQL 中查询:

  1. 启动您的汇流。
  2. <confluent-home>/bin/ksql的帮助下打开 KSQL CLI。
  3. STREAM pageviews_original Kafka 主题页面视图创建 ,指定 value_format DELIMITED 。描述新的 STREAM 。请注意,KSQL 创建了名为 “的其他列, ROWTIME 这些列对应于 Kafka 消息时间戳,以及 ROWKEY ,这些列对应于 Kafka 消息键:
    • ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar,
      pageid varchar) WITH (kafka_topic='pageviews',value_format='DELIMITED');

  1. users_original Kafka 主题创建一个表 users ,指定 value_format JSON 的 。描述新表:
    • ksql> CREATE TABLE users_original (registertime bigint, gender varchar,
      regionid varchar, userid varchar) WITH (kafka_topic='users',value_format=
      'JSON');
  2. 使用:
    • SHOW STREAMS;
    • SHOW TABLES;

KStream vs K 表

流是结构化数据序列另一方面,表表示基于来自流的事件的当前情况,这些事件是可变的。

  1. 使用 CREATE STREAM 关键字在语句之前创建持久查询 SELECT
    • ksql> CREATE STREAM pageviews_female AS SELECT users_original.userid AS
      userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN
      users_original ON pageviews_original.userid = users_original.userid
      WHERE gender = 'FEMALE';

  1. 将 KSQL 写入输出主题:
    • CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enrich
      ed_r8_r9', value_format='DELIMITED') AS SELECT * FROM pageviews_female WHERE
      regionid LIKE '%_8' OR regionid LIKE '%_9'
Comments are closed.