Apache NiFi 是一个易于使用、功能强大、高度可用、以及可靠系统来处理和分发数据。它专为源系统和目标系统之间的数据流而设计,是一个简单强大的工具,用于处理来自各种源和目标的数据(查找更多GitHub 上)。 NiFi 有 3 个存储库:

  • FlowFile Repository:存储活动流程期间 FlowFile 的元数据
  • 内容存储库:保存 FlowFiles 的实际内容
  • Provenance Repository:存储每个处理器中FlowFiles的快照;这样,它概述了详细的数据流和每个处理器的变化,并允许深入发现事件链
  • NiFi注册表是NiFi的一个独立子项目,允许对NiFi进行版本控制。它允许保存 FlowFile 状态并在 NiFi 应用程序之间共享 FlowFiles。主要用于对Nifi编写的代码进行版本控制。

    一般设置和使用

    当数据从源流向目标时,FlowFile 的数据和元数据驻留在 FlowFile 和内容存储库中。 NiFi 将所有 FlowFile 内容存储在磁盘上,以确保重启后的恢复能力。它还提供背压,以防止数据消费者/源在目标无法跟上一段时间的情况下压垮系统。

    例如,ConsumeKafka 在 NiFi 中以 FlowFile 形式接收数据(通过 ConsumeKafka 处理器)。假设目标是另一个 Kafka 主题(或 Hive/SQL/Postgres表)在一般过滤器、丰富等之后。但是,如果目标不可用,或者任何代码无法按预期工作(即过滤器代码或丰富代码),流由于背压而停止,并且ConsumeKafka将不会运行。幸运的是,不会发生数据丢失,因为数据存在于内容存储库中,并且一旦问题解决,数据就会恢复流向目标。

    大多数应用程序用例在此设置中运行良好。然而,某些用例可能需要与传统 NiFi 提供的架构略有不同的架构。

    用例

    如果用户知道他们从中接收数据的数据源既持久又可重放,那么跳过存储数据可能会更有利(在 NiFi 中,如 FlowFile 中的内容存储库),而不是在重新启动后重播源中的数据。这种方法有多种优点。首先,数据可以存储在内存中而不是磁盘上,从而提供更好的性能和更快的加载时间。其次,它可以实现机器之间的无缝数据传输,没有任何丢失。

    这可以通过 NiFi EXECUTESTATELESS 处理器来实现。

    如何设置和运行

    1. 首先,准备您要设置的流程。例如:Consume Kafka 将数据作为 FlowFile 接收到内容存储库。应用程序代码运行(通用过滤器/丰富等)发布到另一个 Kafka/写入 Hive/SQL 表/Postgres 表等。
    2. 假设由于某些过滤/丰富而消耗大量磁盘/CPU 资源的代码可以转换为 EXECUTESTATELESS 进程,并且可以在内存中运行。
      流程如下所示:

    Consumekafka --> 执行无状态处理器 --> 发布 kafka/puthiveql/putdatabaserecord。

    3.当无状态进程失败并因此出现背压时,问题解决后可以重放数据。由于这是在内存中执行的,因此与传统 NiFi 运行相比速度更快。

    4.上述代码准备好后 (#2),将其保存在 processgroup 中。右键单击并检查NiFi注册表的代码以启动版本控制。启动版本控制

    5.现在完成代码的完整设置:拖动 consumekafka 并设置 Kafka 主题/SSL 配置/偏移等配置等属性(考虑上面的示例)。拖动执行无状态处理器并按照下面的步骤 7 进行配置。按照 #3 中所示的流程将其连接到 consumekafka 处理器和 publishkafka 处理器。拖动 publishKafka 并设置 Kafka 主题/SSL 配置/任何其他属性(例如压缩等)的配置。

    • 需要注意的重要一点:如果此代码使用任何机密(例如密钥库/信任库密码或数据库凭据),则应在执行executestatelessprocessgroup中配置它们> 进程将要运行。这也应该作为与如何在进程组内进行配置同名的变量从executestateless进程传递。

    6.下面的屏幕截图显示了 executestateless 处理器的配置:
    executestateless 处理器的配置< /p>

    • 数据流规范策略:使用 NiFi 注册表
    • 注册表网址:配置的 NiFi 注册表网址
    • 注册表存储桶:已检查代码的特定存储桶名称
    • 流程名称:已检查代码的流程名称
    • 输入端口consumekafka 连接的端口名称(考虑上面的示例) );进程组应该有一个输入端口 – 如果您有多个输入,请以逗号分隔名称
    • 故障端口:如果发生任何故障,实际代码应该存在故障端口,并且可以再次重新处理这些 FlowFile 。如果您有多个故障端口,请以逗号分隔名称。

    7.根据上面 #6 中提到的要点,在其末尾添加其他变量,如下所示,以获取任何机密。配置处理器

    • 内容存储策略:将其更改为“在堆上存储内容”。
      • 请注意:对处理器影响最大的配置选项之一是“内容存储策略”属性的配置。出于性能原因,处理器可以配置为将所有 FlowFiles 保存在内存中。这包括传入的 FlowFiles,以及中间和输出 FlowFiles。这可以显着提高性能,但也带来很大的风险。内容存储在 NiFi 的堆上。这与 NiFi 处理器和 NiFi 进程本身的所有其他 ExecuteStateless 流共享的堆相同。如果数据非常大,它会很快耗尽堆,导致 NiFi 出现内存不足错误。反过来,这些可能会导致性能不佳以及 NiFi 进程本身的不稳定。因此,不建议使用“Store Content on Heap”选项,除非已知所有 FlowFile 都很小(小于几 MB)。此外,为了帮助防止处理器收到意外大的 FlowFile 的情况,在堆上存储数据时必须配置“最大输入 FlowFile 大小”属性。或者,默认情况下,“内容存储策略”可以配置为将 FlowFile 内容存储在磁盘上。使用此选项时,所有 FlowFiles 的内容都存储在配置的工作目录中。但值得注意的是,这些数据并不意味着在重新启动后仍会保留。相反,这只是为无状态引擎提供了一种避免将所有内容加载到内存中的方法。重新启动后,数据将被删除,而不是允许 FlowFiles 从中断处恢复 (参考)。

    8.最终流程如下所示:

    最终流程

    结论

    Stateless NiFi 提供了与传统 NiFi 不同的运行时引擎。它是一个单线程运行时引擎,其中数据不会在重新启动后保留,但可以在多线程中运行。确保设置多个线程(根据如下所述的用例)。如上面第 7 步所述,应考虑性能影响。

    设计与无状态一起使用的流程时,重要的是要考虑流程可能希望如何接收其数据以及处理数据后可能希望对数据执行什么操作。不同的选项如下:

    1. 完全封装数据源和所有目的地的流程:例如,它可能有一个 ConsumeKafkaRecord 处理器,执行一些处理,然后发布通过 PublishKafkaRecord 到另一个主题。
    2. 构建一个从某些外部源获取数据的流程,可能会执行一些处理,但不定义数据的目的地。例如,该流程可能包含一个ConsumeKafkaRecord处理器并执行一些过滤和转换,但不会在任何地方发布数据。相反,它可以将数据传输到输出端口,然后 ExecuteStateless 可以使用该输出端口将该数据带入 NiFi 数据流。
    3. 数据流可能不定义从哪里接收输入,而只是使用输入端口,以便可以将任何数据流构建为源数据,然后将其传递到此数据流,负责准备和交付数据。
    4. 最后,数据流既不能定义数据的源也不能定义数据的目的地。相反,数据流将被构建为使用输入端口,它将执行一些过滤/路由/转换,并最终将其处理结果提供给输出端口。
      (参考)。

    传统的 NiFi 运行时引擎和无状态 NiFi 运行时引擎都有其优点和缺点。理想的情况是用户可以轻松选择数据流的哪些部分运行无状态,哪些部分在传统 NiFi 运行时引擎中运行。

    其他参考

    Comments are closed.