在 Apache Flink 中动态添加模式而无需重新启动作业

问题描述:

我的用例是我想对同一个数据流应用不同的 CEP 模式.CEP 模式是动态的我希望将它们添加到 flink 而不必重新启动作业.虽然所有条件都可以通过实现 IterativeCondition 的自定义类来处理,但我的主要问题是时间条件只接受 TimeWindow;无法处理.有没有办法根据输入元素设置传递给 .within() 的值?

My use case is that I want to apply different CEP patterns to the same datastream. the CEP patterns come dynamically & i want them to be added to flink without having to restart the job. While all conditions can be handled via custom classes that implement IterativeCondition, my main problem is that the temporal condition accepts only TimeWindow; which cannot be handled. Is there some way that the value passed to .within() be set based on the input elements?

这里问了类似的问题:Flink 和动态模板识别

最佳答案:可以添加的是一个 co-flat map 操作符,它在一个输入通道上接收事件,在另一个输入通道上接收模式.对于每个新接收的模式,要么更新现有的 NFA(缺少此功能),要么编译一个新的 NFA. 在后一种情况下,我们会将传入事件应用于所有存储的 NFA."

Best Answer: "What one could add is a co-flat map operator which receives on one input channel the events and on the other input channel patterns. For each newly received pattern one either updates the existing NFA (this functionality is missing) or compiles a new one. In the latter case, one would apply incoming events to all stored NFAs."

我正在尝试实现这一点,但我面临一些困难.具体来说,关于在后一种情况下,人们会将传入的事件应用于所有存储的 NFA"

I am trying to implement this but I am facing some difficulty. Specifically, on the point of "In the latter case, one would apply incoming events to all stored NFAs"

原因是我使用以下方法将流应用于模式:PatternStream matchStream = CEP.pattern(tmatchStream, pattern);

Reason being that I apply stream to pattern using: PatternStream matchStream = CEP.pattern(tmatchStream, pattern);

但是流tmatchStream"不会在 co-flatMap 中定义.我在这里遗漏了什么???任何帮助将不胜感激.

But the stream "tmatchStream" would not be defined inside the co-flatMap. Am I missing something here??? Any help would be greatly appreciated.

不幸的是,链接问题的答案仍然有效.Flink CEP 暂时不支持动态模式.不过已经有一张 JIRA 票:FLINK-7129

Unfortunately the answer to the linked question is still valid. Flink CEP does not support dynamic patterns at that moment. There is already a JIRA ticket for that though: FLINK-7129

该功能的最早合理目标版本将是 1.6.0

The earliest reasonable target version for that feature will be 1.6.0