Image title

以前的帖子 –第 1 部分第 2部分 |第 3 部分.

介绍

在本文中,我将使用 Flink 的CEP API 解决本系列第三部分中的事件关联问题。完成他们令人敬畏的 CEP 文档后,下面是我所做的。

注意:此示例中的所有代码在GitHub上都可用。

StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<SensorEvent> inputStream = senv
                                      .addSource( new EventSourceCEP() )
                                      .keyBy( (KeySelector<SensorEvent, String>) SensorEvent::getDeviceId);

在上面的代码块中,我创建了一个 EventSourceCEP 从 复制的类, EventSource 但它只发出一个事件对象,而不是任何元组。我们已使用此输入流 deviceId 作为密钥进行了分区。

SkipPastLastStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();

你也可能喜欢:阿帕奇Flinkvs.阿帕奇火花

在这里,我选择一个名为 的跳过策略 SKIP_PAST_LAST_EVENT ,根据 Flink,”放弃比赛开始后但结束之前开始的每一个部分匹配。这允许我只获得匹配的序列,而不是任何部分匹配。

Pattern<SensorEvent, ?> pattern = 
    Pattern
    .<SensorEvent>begin("start", skipStrategy)
    .where( new SimpleCondition<SensorEvent>() 
                {
                @Override
                public boolean filter(SensorEvent event) 
                {
                 return event.isConnected();
                }
                })
               .next("end")
               .where( new SimpleCondition<SensorEvent>() 
                {
                    @Override
                    public boolean filter(SensorEvent event) 
                    {
                      return event.isDisconnected();
                    }
               })
     .within( Time.seconds( 5 ) );

在这里,我定义一个模式。这是 CEP 的重要组成部分,我们开始使用 begin() 和 唯一名称定义模式 where() ,然后提供必须满足事件才能被视为匹配的条件。我定义了两种这样的模式 – 一个用于连接的事件,第二个用于断开连接的事件其他是 followedBy()对于放松,和 followedByAny()对于非确定性的放松毗连。

我还告诉 Flink 保持五秒钟,直到匹配模式完成,以便被视为有效。(我的持续时间为 10 秒,阈值为 2 秒,因此我创建了 5 秒的延迟。

PatternStream<SensorEvent> patternStream = CEP.pattern(inputStream, pattern);

patternStream.process(new PatternProcessFunction<SensorEvent, String>()
           {
            int count = 0;

            @Override
            public void processMatch(Map<String, List<SensorEvent>> match,
                                     Context ctx,
                                     Collector<String> out) throws Exception 
            {
                count++;

                if(count > THRESHOLD)
                {
                    String message = "Pattern found for " + match.get( "start" ).get( 0 ).deviceId;
                    out.collect(message);
                    count = 0;
                }
            }
        }).print();

一旦我有了模式,我需要将其传递给 CEP 以获取 PatternStream 的实例。process()对于我使用计数器的每个成功匹配都调用该方法,以确保 Collector::out() 仅在模式匹配超过阈值时才调用该方法。

我没有找到一种方法来告诉Flink,我需要一个完整的模式序列匹配n次。您可以使用 向各个模式提供此信息 times() ,但不能向整个序列提供此信息。我希望在即将推出的版本中,这个方便的功能也得到解决。

最后,打印到控制台。在现实世界中,这将推送到数据库,警报管理系统将从该数据库向用户显示它。

进一步阅读

Comments are closed.