Image title

Apache NiFi 现已在 1.10 中提供!

https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12316020&version=12344993

您现在可以使用 JDK 8 或 JDK 11!我在JDK 11中运行,它似乎有点快。一个巨大的功能是增加参数!您可以使用它们将参数传递给 Apache NiFi 无状态!

一些较小的处理器已经从主下载中移动。有关迁移提示,请参阅以下链接:

https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance

发行说明https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version1.10.0

示例源代码:https://github.com/tspannhw/stateless-examples

更多新功能:

  • 帕奎特阅读器/作家(参见:https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_7.html。
  • 普罗米乌斯报告任务。期待更多的普罗米有斯功能即将推出。
  • 实验加密内容存储库。人们以前要我这个
  • 参数!!替换变量/变量注册表的时间。参数在各方面都更好。
  • 用于生成和构建 NiFi 的斯瓦格 API 库的工具包模块。
  • 后Slack处理器。
  • 发布卡夫卡分区支持。
  • 地理丰富记录处理器。
  • 进程组中的远程输入端口。
  • 命令行诊断。
  • 罗克斯DB流文件存储库。
  • 放置大查询流式处理处理器。
  • nifi.analytics.预测.启用 – 打开背压预测。
  • ETL/ELT 的更多查找服务:数据库记录查找服务。
  • 库杜查找服务。
  • HBase_2_ListLookupService。

您可能还喜欢:对 Apache NiFi 数据流的温和介绍…和一些克洛朱雷。

无 国籍

首先,我们将直接从NiFi注册表在命令行中运行。然后,我们将运行从YARN!是的,您现在可以在巨大的 Cloudera CDH/HDP/CDP YARN 群集上运行 Apache NiFi 流!让我们利用你的数百个Hadoop节点。

无状态示例

让我们构建无状态流

首先要记住的是,我们希望任何可能更改的都是我们可以通过 JSON 文件传递的参数。即使对于下拉列表,设置参数也非常简单!您甚至会提示您从选择列表中选取参数。在参数可用之前,您需要将它们添加到参数列表中,并将该参数上下文分配给处理组。

处理器配置中的参数显示为[代理]
Configuring processor 配置处理器

连接到进程组、控制器服务、…

Updating parameter context

更新参数上下文

应用这些参数

Applying parameter context

应用参数上下文

参数(eter)现在是属性的选项

Parameter now an option for properties

参数现在是属性的选项

用于使用参数的弹出提示

Pop-up parameter hint

弹出参数提示

在参数上下文中编辑参数

Editing parameter context

编辑参数上下文

我们也可以在控制器服务中配置参数bp.blogspot.com/-h52kcDu6of8/XcHJYxpRBZI/AAAAAAAAYwo/lnRPGegY2SM29F3YFtYFkqOUK-LHU5oFQCLcBGAsYHQ/s1600/addingParametersToDropDowns.png”*Configure controller service

配置控制器服务

如此容易选择现有的。
Choosing existing parameter

选择现有参数

将它们用于任何可以更改或您不想硬编码的内容。

阿帕奇卡夫卡消费者下沉

这是一个简单的两步Apache NiFi流,从Kafka读取输出,并将输出发送到一个接收器,例如文件。

Consuming Kafka 2.0 messages

消耗卡夫卡 2.0 消息

让我们确保我们使用该参数上下文

Using parameter context

使用参数上下文

要构建 JSON 配置文件,您需要来自 Apache NiFi 注册表的存储桶 ID 和流 ID您可以在类似于http://tspann-mbp15-hw14277:18080的 URL 上浏览该注册表。

Stateless Kafka — Production

无国籍卡夫卡 – 生产

我的命令行运行程序

/用户/tspann/文档/nifi-1.10.0-SNAPSHOT/bin/nifi.sh无状态运行从注册表连续— 文件 /用户/tspann/文档/nifi-1.10.0-SNAPSHOT/logs/kafkauser.json

从注册表运行 [一次]连续* — 文件 <文件名称 >

这是使用文件从命令行运行的基本用例。流必须存在于引用 Apache NiFi 注册表中。

JSON 配置文件(卡夫卡消费者.json)

{

  "registryUrl": "http://tspann-mbp15-hw14277:18080",

  "bucketId": "140b30f0-5a47-4747-9021-19d4fde7f993",

  "flowId": "0540e1fd-c7ca-46fb-9296-e37632021945",

  "ssl": {

    "keystoreFile": "",

    "keystorePass": "",

    "keyPass": "",

    "keystoreType": "",

    "truststoreFile": "/Library/Java/JavaVirtualMachines/amazon-corretto-11.jdk/Contents/Home/lib/security/cacerts",

    "truststorePass": "changeit",

    "truststoreType": "JKS"

  },

  "parameters": {

    "broker" : "4.317.852.100:9092",

    "topic" : "iot",

    "group_id" : "nifi-stateless-kafka-consumer",

    "DestinationDirectory" : "/tmp/nifistateless/output2/",

    "output_dir": "/Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/logs/output"

  }

}

示例运行

12:25:38.725 [main] DEBUG org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0 - ConsumeKafka_2_0[id=e405df7f-87ca-305a-95a9-d25e3c5dbb56] Running ConsumeKafka_2_0.onTrigger with 0 FlowFiles

12:25:38.728 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Node 8 sent an incremental fetch response for session 1943199939 with 0 response partition(s), 10 implied partition(s)

12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-8 at offset 15 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)

12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-9 at offset 16 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)

12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-6 at offset 17 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)

12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-7 at offset 17 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)

12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-4 at offset 18 to node ip-10-0-1-244

内部:9092 (id: 8 机架: 空)

12:25:38.728 [main] DEBUG org.apache.kafka.client.client.内部 – [消费者客户端 Id_消费者-1,groupId_nifi-无状态-卡夫卡-消费者] 添加了READ_UNCOMMITTED在偏移 16 到节点时获取分区 iot-5 的请求ip-10-0-1-244.ec2.内部:9092 (id: 8 机架: 空)

12:25:38.728 [main] DEBUG org.apache.kafka.client.client.内部 – [消费者客户端 Id_消费者-1,groupId_nifi-无状态-卡夫卡-消费者] 添加了READ_UNCOMMITTED在偏移 17 到节点处获取分区 iot-2 的请求ip-10-0-1-244.ec2.内部:9092 (id: 8 机架: 空)

12:25:38.728 [main] DEBUG org.apache.kafka.client.client.内部 – [使用者客户端 Id_消费者-1,groupId_nifi-无状态-卡夫卡-消费者] 添加了READ_UNCOMMITTED在偏移 19 到节点处获取分区 iot-3 请求ip-10-0-1-244.ec2.内部:9092 (id: 8 机架: 空)

12:25:38.728 [main] DEBUG org.apache.kafka.client.client.内部 – [使用者客户端 Id_消费者-1,groupId_nifi-无状态-卡夫卡-消费者] 添加了READ_UNCOMMITTED在偏移 16 到节点时获取分区 iot-0 请求ip-10-0-1-244.ec2.内部:9092 (id: 8 机架: 空)

12:25:38.728 [main] DEBUG org.apache.kafka.client.client.内部 – [消费者客户端 Id_消费者-1,groupId_nifi-无状态-卡夫卡-消费者] 添加了READ_UNCOMMITTED在偏移 20 到节点时获取分区 iot-1 的请求ip-10-0-1-244.ec2.内部:9092 (id: 8 机架: 空)

12:25:38.728 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler – [使用者客户端 Id=使用者-1,groupId_nifi-无状态-卡夫卡-使用者] 为节点 8 构建了增量提取(会话 Id=1943199939,EPOCH=5)添加了 0 个分区,更改了 0 个分区,从 10 个分区中删除了 0 个分区

12:25:38.729 [主] DEBUG org.apache.kafka.client.client.内部 – [消费者客户端 Id=使用者-1,groupId_nifi-无状态-卡夫卡-消费者] 发送READ_UNCOMMITTED增量请求(toSend_(toForget_(),暗示=(it-8, iot-9, iot-6, iot-7, iot-4, iot-5, iot-2, iot-3, iot-0, iot-1)) 代理 ip-10-0-1-244.ec2.内部:9092 (ID: 8 机架: 空)

12:25:38.737 [主] DEBUG org.apache.nifi.nfi.处理器.kafka.pubsub.ConsumeKafka_2_0 – ConsumeKafka_2_0[id]e405df7f-87ca-305a-95a9-d25e3c5dbb56_ 运行 ConsumeKafka_2_0.onTrigger 与 0 FlowFiles

输出示例

cat output/247361879273711.statelessFlowFile

{"id":"20191105113853_350b493f-9308-4eb2-b71f-6bcdbaf5d6c1_Timer-Driven Process Thread-13","te":"0.5343","diskusage":"0.2647115097153814.3 MB","memory":57,"cpu":132.87,"host":"192.168.1.249/tspann-MBP15-HW14277","temperature":"72","macaddress":"dd73eadf-1ac1-4f76-aecb-14be86ce46ce","end":"48400221819907","systemtime":"11/05/2019 11:38:53"}

Command line output

输出示例

我们还可以运行一在此示例中,以发送一个卡夫卡消息。

发电机阿帕奇卡夫卡生产者

Generator to Producer flowJSON 配置文件(卡夫卡.json)

{

  "registryUrl": "http://tspann-mbp15-hw14277:18080",

  "bucketId": "140b30f0-5a47-4747-9021-19d4fde7f993",

  "flowId": "402814a2-fb7a-4b19-a641-9f4bb191ed67",

  "flowVersion": "1",

  "ssl": {

    "keystoreFile": "",

    "keystorePass": "",

    "keyPass": "",

    "keystoreType": "",

    "truststoreFile": "/Library/Java/JavaVirtualMachines/amazon-corretto-11.jdk/Contents/Home/lib/security/cacerts",

    "truststorePass": "changeit",

    "truststoreType": "JKS"

  },

  "parameters": {

    "broker" : "3.218.152.236:9092"

  }

}

输出示例

12:32:37.717 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 33304, SO_SNDBUF = 131768, SO_TIMEOUT = 0 to node 8 12:32:37.717 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node 8. Fetching API versions. 12:32:37.717 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node 8. 12:32:37.732 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Recorded API versions for node 8: (Produce(0): 0 to 7 [usable: 6], Fetch(1): 0 to 10 [usable: 8], ListOffsets(2): 0 to 5 [usable: 3], Metadata(3): 0 to 7 [usable: 6], LeaderAndIsr(4): 0 to 2 [usable: 1], StopReplica(5): 0 to 1 [usable: 0], UpdateMetadata(6): 0 to 5 [usable: 4], ControlledShutdown(7): 0 to 2 [usable: 1], OffsetCommit(8): 0 to 6 [usable: 4], OffsetFetch(9): 0 to 5 [usable: 4], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 4 [usable: 3], Heartbeat(12): 0 to 2 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 2 [usable: 2], DescribeGroups(15): 0 to 2 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 2], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 2 [usable: 1], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 1], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 0], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], UNKNOWN(43): 0) 12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.records-per-batch 12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.bytes 12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.compression-rate 12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.record-retries 12:32:37.740 [kafka-producer-network-thread | producer-1] DEBUG org

kafka.common.metrics.metrics – 添加了带有名称主题.iot.record-错误的传感器 12:32:37.745 [main] DEBUG org.apache.nifi.参数.expression语言感知参数参数分析器 – 对于输入 iot 找到 0 参数引用: * 12:32:37.745 [主 DEBUG]org.apache.nifi.parameter.ExpressionLanguageAwareParameterParser – For input iot found 0 Parameter references: [] Flow Succeeded 12:32:37.717 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector – [生产者客户端 Id_product-1_ 创建具有SO_RCVBUF = 33304、SO_SNDBUF = 131768、SO_TIMEOUT = 0 到节点 8 的套接字

12:32:37.717 [卡夫卡-生产者-网络线程 ] product org.apache.kafka.client.NetworkClient – [生产者客户端 Id_生产者-1] 完成与节点 8 的连接。正在获取 API 版本。

12:32:37.717 [kafka-生产者-网络线程 ] product org.apache.kafka.client.networkClient – [生产者客户端 Id_生产者-1] 启动 API 版本从节点 8 提取。

12:32:37.732 [kafka-生产者-网络线程] PRODUCT.apache.kafka.client.NetworkClient – [生产者客户端 Id_product-1] 节点 8 录制的 API 版本:(生产(0): 0 到 7 [可用: 6], 提取 (1): 0 到 10 [可用: 8], 列表偏移(2): 0 到5 [可用: 3] , 元数据 (3): 0 到 7 [可用: 6], 引线和Isr(4): 0 到 2 [可用: 1], 停止副本 (5): 0 到 1 [可用: 0], 更新元数据 (6): 0 到 5 [可用: 4], 受控关机 (7): 0 到 2 [可用: 1], 偏移提交(8): 0 到 6 [可用: 4]: 0 到 5 [可用: 4], 查找协调器 (10): 0 到 2 [可用: 2] , JoinGroup (11): 0 到 4 [可用: 3], 心跳 (12): 0 到 2 [可用: 2], 离开组 (13): 0 到 2 [可用: 2], 同步组 (14): 0 到 2 [可用: 2] ,描述组 (15): 0 到 2 [可用]: 2*, 列表组(16):0到2[可用:2],SaslHandshake(17):0到1[可用:1],ApiVersions(18):0到2[可用:2],创建主题(19):0到3[可用:3],删除主题(20):0到3[可用:2],删除记录(21):0到1[可用:1],InitProducerId(22)到: 1], 偏移为引线(23): 0 到 2 [可用: 1], AddpartitionsToTxn(24):0到1[可用:1],AddOffsetsToTxn(25):0到1[可用:1],EndTxn(26):0到1[可用:1],写入TxnMarkers(27):0[可用:0],TxnOffset提交(28):0到2[可用:1],描述Acls(29)): 0 到 1 [可用: 1], 创建 Acls (30): 0 到 1 [可用: 1], 删除 Acls (31): 0 到 1 [可用: 1], 描述 Configs (32): 0 到 2 [可用: 2], AlterConfigs (33): 0 到 1 [可用: 1], Alter 复制日志Dirs (34): 0 到 1 [可用: 1]: 1], Sasl身份验证 (36): 0 到 1 [可用: 0], 创建分区 (37): 0 到 1 [可用: 1], 创建委派令牌 (38): 0 到 1 [可用: 1], 续订委派令牌 (39): 0 到 1 [可用: 1], 过期委派令牌 (40): 0 到 1 [可用: 1],描述委派令牌(41): 0 到 1 [可用: 1], 删除组 (42): 0 到 1 [可用: 1], 未知 (43): 0)

12:32:37.739 [卡夫卡-生产者-网络线程 ] PRODUCT org.apache.kafka.common.metrics. – 添加了带有名称主题.iot.记录的传感器。”

12:32:37.739 [卡夫卡-生产者-网络线程 ] PRODUCT org.apache.kafka.common.metrics. – 添加了带有名称主题.iot.bytes 的传感器

12:32:37.739 [卡夫卡-生产者-网络线程 ] PRODUCT org.apache.kafka.common.metrics. – 添加了带有名称主题.iot.压缩率的传感器

12:32:37.739 [卡夫卡-生产者-网络线程 ] PRODUCT org.apache.kafka.common.metrics. – 添加了带有名称主题.iot.record-retries 的传感器

12:32:37.740 [卡夫卡-生产者-网络线程 ] PRODUCT org.apache.kafka.common.metrics. – 添加了带有名称主题.iot.记录错误的传感器

12:32:37.745 [main] DEBUG org.apache.nifi.参数.表达式语言感知参数参数 – 对于输入 iot 找到 0 参数引用: |

12:32:37.745 [main] DEBUG org.apache.nifi.参数.表达式语言感知参数参数 – 对于输入 iot 找到 0 参数引用: |

流成功

其他运行时选项:

RunYARNServiceFromRegistry        <YARN RM URL> <Docker Image Name> <Service Name> <# of Containers> --file <File Name>

RunOpenwhiskActionServer          <Port>

引用:

  • 关于 NiFi 1 的真棒文章

com/@abdelkrim.hadjidj/apache-nifi-1-10 系列简化错误处理-7de86f130acd”\https://medium.com/@abdelkrim.hadjidj/apache-nifi-1-10 系列简化错误处理-7de86f130acd

  • https://www.datainmotion.dev/2019/08/find-cacerts-from-java-jre-lib-security.html
  • https://github.com/apache/nifi/tree/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless
  • https://nifi.apache.org/docs/nifi-docs/html/user-guide.html
  • 添加到 API 的参数:https://nifi.apache.org/docs/nifi-docs/rest-api/index.html
  • http://bit.ly/cdf-platform
  • https://www.mtnfog.com/blog/apache-nifi-phi-processing
  • https://www.slideshare.net/BryanBende/apache-nifi-sdlc-improvements
  • https://nifi.apache.org/registry
  • 在进程组内添加 S2S 端口

    Adding input port

    添加输入端口

    帕奎特读者

    Adding controller service

    添加控制器服务

    帕奎特记录集

    Adding controller service

    添加控制器服务

    示例欧普特

    Example outputExample output

    Example output

    Example output

    进一步阅读

    Comments are closed.