我从事的系统即将进行重大的技术更新。目标是改善当前阻止摄入更多数据的障碍。补救措施要求产品接收来自外部来源的投入,处理它们,并向其目的地传播结果。

Apache Nifi 实现了基于流的编程 (FBP)范式;它由黑箱进程组成,这些进程通过预定义的连接交换数据(摘自维基百科)。

简而言之,Apache NiFi 是处理和分发数据的工具。其直观的 UI 支持路由定义、各种连接器(输入/输出)和许多内置处理器。所有这些功能结合在一起,成为适合我们用例的可选平台。

鉴于我们系统的未来需求,我们决定对尼菲进行彻底评估。起点是设置环境。

在这篇文章中,我将介绍如何使用 Docker 映像设置 Nifi 环境并运行一个简单的预定义模板;从头开始构建 Nifi 流将另一篇文章介绍。本文主要三个部分:

  • 回顾阿帕奇尼菲概念和构建基块
  • 设置尼菲流和尼菲注册表(基于 Docker 图像)
  • 加载并运行模板

准备?让我们从基础开始。

尼菲组件和概念

Nifi 基于以下层次结构:

  • 流程组
    处理器及其连接的集合。流程组是保存在版本控制(Nifi 注册表)中的最小单位。进程组可以具有允许连接进程组的输入和输出端口。这样,数据流可以由多个流程组组成。
  • 处理器
    (大部分)由连接器链接到另一个处理器的输入和输出的处理单元每个处理器都是一个黑盒,执行单个操作;例如,处理器可以更改 FlowFile 的内容或属性(见下文)

FlowFile 对象是不可变的,但其内容和属性在处理过程中可能会更改。

  • 连接
    连接是将流文件路由在处理器之间的队列。路由逻辑基于与处理器结果相关的条件;连接与一个或多个结果类型关联。连接的条件是处理器之间的关系,可以是静态的,也可以是动态的。虽然静态关系是固定的(例如 = 成功、失败、匹配或不匹配),动态关系基于用户定义的 FlowFile 的属性;如果静态关系是固定的(例如 ,成功、失败、匹配或不匹配),但动态关系基于用户定义的 FlowFile 属性。本文的最后一节说明了使用RouteOnAttribute 处理器的此功能。
  • 港口
    流程组的入口和退出点。每个进程组可以有一个或多个输入或输出端口,由其名称区分。
  • 漏斗
    将多个连接中的数据合并到单个连接中。
  • 下面的 Nifi 流描述了这些组件:

    尼菲元素

    在查看 Nifi 数据流组件后,让我们看看如何设置环境。

    设置尼菲环境

    我选择使用在 Docker 容器上托管的 Nifi,而不是在我的计算机上安装 Nifi,主要原因如下:

    • 可移植性:Nifi 应用程序可以复制或移动到其他主机。
    • 在同一主机上具有多个 Nifi 应用程序的灵活性,每个主机具有不同的端口。
    • 低脚印:避免更换主机。
    • 能够在定义 Nifi 流的过程中拍摄快照时冻结环境。

    启动尼菲应用程序

    我使用Ubuntu,一个基于德比安的Linux,所以你可以找到命令来安装和配置Dock到本文。

    安装 Docker 后,从 下载最新的 Nifi 映像版本

    您可以使用默认设置创建和启动 Docker 容器:

    Java

     

     
    1
    $码头运行阿帕奇/尼菲

    但是,我想创建一个更复杂的容器,允许以下:

    • 命名我的容器 (我命名它nifi2
    • 控制端口,而不是默认的 8080 端口
    • 能够看到尼菲的负载和标准输出
    • 在我的主机(共享目录)和容器(ls-目标)之间创建共享卷

    下面的示例命令实现上述操作(容器的名称为nifi2)

    Java

     

    x
    1
     
    1
    $Docker运行-名称nifi2-p80918080current- i--vv=/文档/Nifi/共享-目录: /选择//nifi/nifi-当前/ls-目标apache/nifi
    接下来是设置一个版本控制工具 + Nifi 注册表。

    启动尼菲注册表

    Nifi 注册表是 Nifi 的一个独立子项目,允许对 Nifi 流进行版本控制。它允许保存流的状态,在不同的 Nifi 应用程序之间共享流,启用回滚和其他版本控制功能。

    尽管您可以备份包含所有进程组信息的 flowfile.xml.gz文件,但管理版本的正确方式是使用 Nifi 注册表;它的主要优点是保存封装在专用流.json文件中每个进程组的更改的简单性。

    Nifi 注册表映像在 Docker Hub 中可用;运行命令 docker pull 以安装它。

    下面的 Docker 命令定义并启动 Nifi 注册表容器。我选择不使用 -d DockerHub 中建议的(分离)选项,因此容器的进度和输出将在运行时可见。与 Nifi 容器不同,这次我选择保留默认端口,因为我不会使用多个 Nifi 注册表实例。

    Java

     

    x
    1
    1
    $docker运行-名称nifi-注册表-p1808018080apache/nifi-注册表

    将 Nifi 应用程序连接到版本控制

    通常,我们可以将 Nifi 应用程序连接到一个或多个注册表。这允许使用多个环境时的灵活性。

    要将 Nifi 应用程序 (http://localhost:8091/nifi) 连接到注册表,让我们访问 Nifi 设置 + 注册表客户端 = 。


    接下来,我们需要注册表地址。由于 Nifi 应用程序未在主机上运行,因此尝试使用地址访问 Nifi注册表http://localhost:18080/nifi-registry将不起作用。我们的 Nifi 应用程序从容器内运行,因此它需要外部主机的 IP。主机的IP可以通过检查Nifi的容器获得

    Java

     

    x
    1
     
    1
    $码头检查nifi2

    主机的 IP 是网关


    由于此命令的输出范围很广,以下命令将仅获取网关的 IP:

    Java

     

    x
    1
     
    1
    $docker检查nifi2---格式==. 网络设置. 网络. bridge. 网关\'

    之后,Nifi 注册表的定义非常简单。下图显示了连接到两个 Nifi 注册表实例的 Nifi 应用程序。

    最后,在至少设置一个注册表后,有一个新的菜单选项来保存版本控件中的进程组。

    创建存储桶是设置 Nifi 注册表的下一个也是最后一步。存储桶是进程组一个或多个版本的容器;尼菲注册表中应该至少有一个篮子来容纳尼菲流。

    让我们创建一个存储桶,并称之为流开发。在下面的图像中,我创建了两个存储桶,一个用于开发,另一个用于暂存:

    虽然我们的 Nifi 画布是空的,没有流程组可以保存,但基础是在那里。创建进程组后,它可以保存到注册表中。

    现在,我们准备继续前进。

    探索尼菲容器

    一旦 Nifi 容器运行,我们可以运行命令进入 docker exec 容器并浏览它;以下命令在容器中运行 bash(使用参数 -i 进行交互,使用 -t表示终端):

    Java

     

    x
    1
    1
    $Docker执行官 -i--tnifi2/bin/bash

    在容器的目录中导航,可以到达包含 nifi/conf flow.xml.gz 文件的目录。此文件包含包含所有进程组的所有 Nifi UI 画布信息;任何更改将自动保存。

    另一个重要文件是 conf/nifi.properties 。它保存尼菲的配置,包括流.xml.gz的位置。您可以在此链接中阅读有关配置文件的信息

    尼菲存储库

    Nifi 有三个主要存储库:

    1. 流文件存储库:在活动流期间存储流文件的元数据。
    2. 内容存储库:保存流文件的实际内容。
    3. 证明存储库:将流文件的快照存储在每个处理器中。因此,它概述了详细的数据流和每个处理器中的更改,并允许深入了解事件链。

    进入 Nifi 容器后,这些存储库显示:

    导入尼菲流

    因此,在设置并运行 Nifi 应用程序并将其连接到 Nifi 注册表后,您就可以开始运行数据流了。

    与其从头开始构建 Nifi 流,我们不要加载现有流。有两个选项可将流加载到 Nifi 中:

    • 将模板导入 Nifi 流 (XML 文件)
    • 将流定义 (JSON 文件) 导入 Nifi 注册表,然后从它创建一个进程组。

    第一个是直截了当的,而后者是有点复杂,但可以很容易地使用Nifi工具包,这一点不在本文中。

    你可以从GitHub 下载示例模板,然后将其上传到您的 Nifi 应用程序中:

    了解模板的流程

    此模板包括五个步骤,从生成包含随机文本的文件、重命名文件、提取属性以及根据提取的属性将结果路由到目录(详细信息在GitHub 存储库中描述)。

    处理结束后,文件将保存在容器本身的本地目录下。返回容器的 run 命令显示它包含共享卷定义 ls-target ()。此卷是处理文件的目标,因此可以从主机访问:

    Java

     

    x
    1
     
    1
    $Docker运行-名称nifi2-p80918080-i-v=/文档/Nifi/共享-文件夹/选择/nifi/nifi-current当前/ls-目标apache/nifi i
    若要纠正此问题,请向目录授予权限(chmod命令)。

    也可以从容器内访问此目录:

    打开流程组的变量并查看目标文件夹定义:

    潜入尼菲处理器

    您可以在画布上右键单击(不是在特定处理器上)一次启动所有处理器,然后选择”开始按钮。一段时间后,文件应到达目标文件夹。

    每个生成的文件都经过更改其名称和根据其内容进行路由的过程;我宁愿保持简单,避免更改文件的内容。然而,此模板体现了 Nifi 处理器的一些功能:

    • 基于内容提取属性:

    • 使用 Nifi 表达语言:

    • 基于属性的路由逻辑:


    该模板使用以下处理器:

    • 生成流文件:生成流文件,可用于测试目的。它允许控制每个流文件的频率、数量和大小。
    • 提取文本:从流文件的内容中提取文本。
    • 更新部落:更新流文件的属性。
    • RouteOnAttribute: 创建动态关系,并基于属性的值定义路由逻辑。
    • PutFile:将 FlowFile 的内容保存到目录中。在此示例中,如果目标目录不存在,则处理器将创建目标目录,并设置文件的权限

    在很短的时间内,将 FlowFile 传递到更新文件名的连接器将达到限制,并抑制文件流到流中的上一个处理器。

    这种行为是后压定义的展示;连接器受流文件数量或其整体大小限制。如果激活此机制,它会指出问题。这是开始调查问题错误的触发器。这可能是一个问题,如故障处理器,或指示基本负载增加,因此流量应调整或分布。

     

    背压示例

    将流程组保存到 Nifi 注册表中

    运行模板并更改一点后,您想保存它。还记得尼菲登记处吗?

    选择”开始版本控制“,将进程组保存到之前创建的存储桶:

    Nifi 指示进程组的最新状态是否在版本控制中捕获:绿色检查表示已保存在注册表中。如果存在未提交到注册表的更改,则用暗星号发出信号。

     

     


    关闭前的一些最佳实践

    • 建议使用进程组,因为这是保存在 Nifi 注册表中的最小单位。
    • 使用注释、标签和颜色记录和组织 Nifi 流。
    • Nifi 具有广泛的自动化功能(Nifi REST API、Nifi工具包),本文未介绍这些功能

    apache.org/docs/nifi-docs/html/toolkit-guide.html”rel=”无跟随”目标=”_blank”=Nifi工具包可以使用命令行工具自动执行流程;除了更高效地工作之外,这种自动化的好处对于建立 CI/CD 管道也非常重要。

    包装

    如果一直到这里,您已经获得了一个工作平台来实现您的应用程序;此模板只使用几个基本处理器,但它可以是一个跳板,继续构建一个更复杂的流,使用令人印象深刻的各种Nifi处理器。

    这篇文章只划破了冰山一角;有许多方面和功能需要涵盖,如数据来源、日志记录、变量和参数、Nifi 服务、使用漏斗聚合处理器的输出等。自动化支持工具也是一个巨大的领域。

    我将这些主题留给后续文章。

    继续建设!

    • Lior

    Comments are closed.