2016-08-21 4 views
1

kafka-streams 0.10.0.0を使用して、メッセージを転送するときにStreamTaskでヌルポインタ例外が定期的に発生しています。呼び出しの10%から50%の間で変化します。 NPEは、この方法で行われます。カフカストリームプロセッサコンテキストでの定期NPE

public <K, V> void forward(K key, V value) { 
    ProcessorNode thisNode = currNode; 
    try { 
     for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) { 
      currNode = childNode; 
      childNode.process(key, value); 
     } 
    } finally { 
     currNode = thisNode; 
    } 
} 

いくつかのケースでは、thisNodeフィールドがヌルであると思われます。これを引き起こしているのは何ですか?スタックトレースは以下の通りです。

[ERROR] 2016-08-21 14:50:39.288 [StreamThread-1] StreamedMetricMeter - Forwarding failed 
java.lang.NullPointerException 
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:336) ~[kafka-streams-0.10.0.0.jar:?] 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) ~[kafka-streams-0.10.0.0.jar:?] 
    at com.heliosapm.streams.metrics.processors.AbstractStreamedMetricProcessor.forward(AbstractStreamedMetricProcessor.java:552) [classes/:?] 
    at com.heliosapm.streams.metrics.processors.impl.StreamedMetricMeter.doProcess(StreamedMetricMeter.java:89) [classes/:?] 
    at com.heliosapm.streams.metrics.processors.impl.StreamedMetricMeter.doProcess(StreamedMetricMeter.java:1) [classes/:?] 
    at com.heliosapm.streams.metrics.processors.AbstractStreamedMetricProcessor.process(AbstractStreamedMetricProcessor.java:166) [classes/:?] 
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68) [kafka-streams-0.10.0.0.jar:?] 
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338) [kafka-streams-0.10.0.0.jar:?] 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) [kafka-streams-0.10.0.0.jar:?] 
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64) [kafka-streams-0.10.0.0.jar:?] 
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174) [kafka-streams-0.10.0.0.jar:?] 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320) [kafka-streams-0.10.0.0.jar:?] 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) [kafka-streams-0.10.0.0.jar:?] 
+1

トポロジコードを共有できますか? '0.10.0.1'を試しましたか? –

+0

それを考え出した。答えを参照してください。そのプログラマーのエラーはとても大変でした。私が気付くと、私は0.10.0.1でテストしませんでした。 – Nicholas

+0

ところで、この問題は '0.10.2.1'でも発生します。 あなたの答えは命を救いました、ありがとう! – Esk

答えて

3

問題は私のProcessorSupplier sがgetへのすべての呼び出しのためにプロセッサの同じインスタンスを返したということでした。次に、Kafka Streamsエンジンは複数のプロセッサインスタンスを作成しようとしていましたが、疑いなくマルチスレッドダンプスター火災が発生しました。同じように注意しないでください.... ProcessorSupplier.get()は、各呼び出しでプロセッサの新しいインスタンスを返す必要があります。

+0

あなた自身の答えを受け入れるかもしれません:) –

+0

お電話。ありがとう! – Nicholas