squirrel-on-deck

以前的帖子 –第 1 部分第 2 部分.

介绍

在这篇文章中,让我们开始探索Flink来解决一个现实世界的问题。zalando.com的这篇文章显示了他们如何使用 Flink 执行复杂事件关联。我将采取一个简化和实际的事件关联问题,并尝试使用Flink解决。

问题

许多 IoT 设备(例如传感器)将状态信息发送到基于云的中央 IoT 管理系统,而后者又将这些状态更改发布为事件流,以便进一步处理和分析。

在此示例中,我假设以下格式的两类状态更改事件

{
  "event_name" : "CONNECTED",
  "timestamp" : "yyyy-MM-dd'T'HH:mm:ss:SSS",
  "device_id" : "UNIQUE-DEVICE-ID"
}

{
  "event_name" : "DISCONNECTED",
  "timestamp" : "yyyy-MM-dd hh:mm:ss:milli",
  "device_id" : "UNIQUE-DEVICE-ID"
}

因此,我将使用 Flink 解决的问题是 – “对于每个设备,确定”已连接”事件的模式序列,然后是”已连接”事件。如果这种模式序列在 10 秒的时间间隔内发生两次以上,则将其记录为异常(警报条件)”。

注意:所有代码示例都在GitHub上提供,因此我将仅共享重要的代码块。

您也可以喜欢:ApacheFlink基本转换示例

假设

  • 事件源在单个事件到达时可能会发送,也可能不发送这些事件,并可能发送批量事件。
  • 事件不排序,但按为每个设备生成事件的顺序到达。
  • 无法从源重新发送事件(无重播选项)。

解决方案一

首先,我实现了一个 EventSource 以每秒生成 1000 个伪随机事件,这些事件包装为Tuple2 类 Tuple2<SensorEvent、Integer> 的实例。第一个参数是 SensorEvent 对象本身,第二个参数是匹配计数(初始化为 0)。 SensorEvent 是一个简单的 POJO,用于将事件表示为 Java 对象。

然后,这 EventSource 被输入到我的 StreamExecutionEnvironment ,如下所示:

StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<SensorEvent, Integer>> stream = senv.addSource(new EventSource());

接下来,我将构建如下数据管道:

stream
.keyBy( new KeySelector<Tuple2<SensorEvent,Integer>, String>()
 {
   @Override
   public String getKey( Tuple2<SensorEvent,Integer> value ) throws Exception
   {
     return value.f0.getDeviceId();
   }
 })

秒 ( 10 ) // 10 秒窗口
.减少(新的减少函数\lt;Tuple2<SensorEvent,整数>>()
{
@Override
公共 Tuple2<SensorEvent,整数>减少(元数2<SensorEvent,整数>前一部分,
元数2<SensorEvent,整数>曲线)
引发异常
{
查找模式
如果(前一个f0.是连接()&curr.f0.是断开连接())
{
找到的模式,当前事件的增量阈值
curr.setField(前v.f1 = 1,1 );
}
返回当前事件,该事件将在下一次计算中成为前事件
返回曲线;
})
.addSink(新沉功能<Tuple2<SensorEvent,整数>>()
{
@Override
公共 void 调用(图普尔2<SensorEvent,整数>结果)
{
如果 ( 结果.f1 > 阈值 )
{
system.out.println(”””””””””””””””,””,””,””,””;”
System.out.println(”设备 ID = ” = 结果.f0.deviceId = “总模式计数 = ” = 结果.f1);
system.out.println(”””””””””””””””,””,””,””,””;”
}
}
});


第 19 行到 25 行 – 一旦找到模式匹配,我将第二个参数递增为 curr 事件对象,并将其返回到 Flink,假定同一对象将成为下一个计算中的上一个事件对象。

运行此代码后,我没有看到任何打印到控制台(不起作用)。因此,我假设 Flink 保留当前事件对象并将其重用为下一个计算的前一个对象,结果结果为 false。

为了确认,我在 system.out 第 18 行之前添加了以下内容,以获取对象哈希码以及设备 ID。

System.out.println( "Device ID = " + prev.f0.getDeviceId() + " PREV = " + prev.hashCode() + " CURR = " + curr.hashCode() );

这是我得到的(服用 Device ID = 3 )。

Device ID = id-3 PREV = 949826765 CURR = -860377082
Device ID = id-3 PREV = -860377081 CURR = 1597592773
Device ID = id-3 RESULT = -29665316
Device ID = id-3 PREV = -1332999334 CURR = -1731677695
Device ID = id-3 PREV = -1731677694 CURR = -824321195
Device ID = id-3 PREV = -824321195 CURR = -1756705545
Device ID = id-3 RESULT = -1965323446

现在很清楚,我不能依靠内部 Flink 对象来保存信息并将其传递给流中的下一个事件。

出于好奇,我决定检查是否 reduce() 重复使用。我添加了另一个 system.out 作为 方法中的第一个语句 reduce() 。以下是我得到的:

Device ID = id-2 reduce() = 1183697316
Device ID = id-1 reduce() = 1320344024
Device ID = id-2 reduce() = 1183697316
Device ID = id-2 reduce() = 1183697316
Device ID = id-2 reduce() = 1183697316
Device ID = id-3 reduce() = 1732401171
Device ID = id-1 reduce() = 1320344024
Device ID = id-1 RESULT = -620487983
Device ID = id-2 RESULT = -766823468
Device ID = id-3 RESULT = 944562943
Device ID = id-1 reduce() = 1320344024
Device ID = id-3 reduce() = 1732401171
Device ID = id-2 reduce() = 1183697316

发现

  • 每个键控数据管道都获取其自己的实现副本 ReduceFunction
  • 此副本将在整个流周期中持久化,跨越时间窗口。
  • 所有键控计算都是独立的,不能通过 Flink 传递信息。

鉴于这些发现,我编辑了 reduce() 该方法,使之具有一个类变量,该变量将保存我的模式计数,如下所示:

// showing only new reduce method code snippet

f0 = curr.f0;

查找模式
如果(前一个f0.是连接()&curr.f0.是断开连接())
{
找到的模式,当前事件的增量阈值
tempResult.f0 = curr.f0;
tempResult.f1 = tempResult.f1 = 1;
}
返回临时结果;
}
})

这导致以下内容打印到控制台:

===============================
Device ID = id-1 Total pattern count = 3
===============================

它的工作,但有一个问题;在翻滚窗口周期结束并启动新模式计数后,此模式计数未初始化为 0。我最终使用相同的 tempResult 对象来存储匹配计数,并且它继续递增。这是一个假阳性,而不是我正在寻找的实际模式。

在所有这些“自己动手”之后,我决定与Flink核实它是否有数据流中这个常见用例的答案,我找到了CEP。

在下一篇文章中,我将分享我如何使用CEP解决这个问题。

进一步阅读

Comments are closed.