Periodic NPE in Kafka Streams processor context

Problem description:

Using kafka-streams 0.10.0.0, I periodically see a NullPointerException in StreamTask when forwarding a message. It affects between 10% and 50% of the invocations. The NPE occurs in this method:

public <K, V> void forward(K key, V value) {
    ProcessorNode thisNode = currNode;
    try {
        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
            currNode = childNode;
            childNode.process(key, value);
        }
    } finally {
        currNode = thisNode;
    }
}

It seems that in some cases thisNode (the local copy of the currNode field) is null. Any idea what might be causing this? The stack trace is below.

[ERROR] 2016-08-21 14:50:39.288 [StreamThread-1] StreamedMetricMeter - Forwarding failed
java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:336) ~[kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) ~[kafka-streams-0.10.0.0.jar:?]
    at com.heliosapm.streams.metrics.processors.AbstractStreamedMetricProcessor.forward(AbstractStreamedMetricProcessor.java:552) [classes/:?]
    at com.heliosapm.streams.metrics.processors.impl.StreamedMetricMeter.doProcess(StreamedMetricMeter.java:89) [classes/:?]
    at com.heliosapm.streams.metrics.processors.impl.StreamedMetricMeter.doProcess(StreamedMetricMeter.java:1) [classes/:?]
    at com.heliosapm.streams.metrics.processors.AbstractStreamedMetricProcessor.process(AbstractStreamedMetricProcessor.java:166) [classes/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) [kafka-streams-0.10.0.0.jar:?]

The problem was that my ProcessorSuppliers were returning the same Processor instance on every call to get(). The Kafka Streams engine calls get() each time it wants to create a processor, so what it treated as multiple processor instances was really one shared object, which I have no doubt created a multi-threaded dumpster fire. Note to the similarly unwary: ProcessorSupplier.get() should return a new Processor instance on each call.
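To make the difference concrete, here is a minimal sketch of the buggy and the fixed supplier. It does not use the real Kafka Streams classes: SimpleProcessor and SimpleProcessorSupplier are hypothetical, simplified stand-ins for Processor and ProcessorSupplier so the example runs without the kafka-streams jar, and MeterProcessor is an invented placeholder, not my actual StreamedMetricMeter.

```java
// Hypothetical stand-in for org.apache.kafka.streams.processor.Processor
// (the real interface also has init, punctuate and close).
interface SimpleProcessor<K, V> {
    void process(K key, V value);
}

// Hypothetical stand-in for org.apache.kafka.streams.processor.ProcessorSupplier.
interface SimpleProcessorSupplier<K, V> {
    SimpleProcessor<K, V> get();
}

// Invented example processor that carries per-instance state.
class MeterProcessor implements SimpleProcessor<String, Long> {
    private long total;

    @Override
    public void process(String key, Long value) {
        total += value;
    }

    long total() {
        return total;
    }
}

// Buggy: every call to get() hands out the same object.
class SharedInstanceSupplier implements SimpleProcessorSupplier<String, Long> {
    private final MeterProcessor shared = new MeterProcessor();

    @Override
    public SimpleProcessor<String, Long> get() {
        return shared;
    }
}

// Fixed: every call to get() creates a new processor.
class FreshInstanceSupplier implements SimpleProcessorSupplier<String, Long> {
    @Override
    public SimpleProcessor<String, Long> get() {
        return new MeterProcessor();
    }
}

class SupplierDemo {
    // True when two consecutive get() calls return different objects.
    static boolean returnsDistinctInstances(SimpleProcessorSupplier<?, ?> supplier) {
        return supplier.get() != supplier.get();
    }

    public static void main(String[] args) {
        System.out.println("shared supplier distinct: "
                + returnsDistinctInstances(new SharedInstanceSupplier()));
        System.out.println("fresh supplier distinct:  "
                + returnsDistinctInstances(new FreshInstanceSupplier()));
    }
}
```

A check like returnsDistinctInstances is cheap to put in a unit test for each supplier, and would have caught my mistake before it reached a running topology.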