2017-03-22 17 views
0

既存のトピックのデータを別のストリームにストリームする単純なkストリームアプリケーションを構築しています。スレッド "StreamThread-1"のKafka-Stream例外java.lang.IllegalArgumentException:無効なタイムスタンプ-1

:私はでアプリケーションを実行すると、しかし

package com.mycompany.app; 

import org.apache.kafka.common.serialization.Serdes; 

import org.apache.kafka.streams.KafkaStreams; 
import org.apache.kafka.streams.StreamsConfig; 
import org.apache.kafka.streams.kstream.KStreamBuilder; 

import java.util.Properties; 


public class App { 

    public static void main(String[] args) throws Exception { 

     Properties props = new Properties(); 
     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application"); 
     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
     props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); 
     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); 
     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); 
     props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10); 

     KStreamBuilder builder = new KStreamBuilder(); 

     builder.stream("test").to("testout");; 

     KafkaStreams streams = new KafkaStreams(builder, props); 
     streams.start(); 

     Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { 
      @Override 
      public void run() { 
       streams.close(); 
      } 
     })); 
    } 
} 

私のプロデューサーがtestというトピックにstreaming meetup's open dataであると私はtestoutトピックここ

にそれを処理するコードです

java -cp target/my-app-1.0-SNAPSHOT.jar com.mycompany.app.App 

この例外が発生します:

Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1 
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60) 
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72) 
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338) 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) 
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64) 
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) 

私はずっと長い間探し続けてきましたが、なぜこのエラーが発生しているのかわかりません。

アイデア?

答えて

0

から:http://docs.confluent.io/3.1.0/streams/faq.html#invalid-timestamp-exception

このエラーは、カフカストリームアプリケーションのタイムスタンプ抽出は、レコードから有効なタイムスタンプを抽出するために失敗したことを意味します。通常、これはレコードの問題を指しています(レコードにタイムスタンプがまったく含まれていないなど)が、アプリケーションによって使用されるタイムスタンプ抽出プログラムの問題またはバグを示している可能性もあります。

レコードは有効なタイムスタンプが含まれていないとき:デフォルトのConsumerRecordTimestampExtractorを使用している場合

  • を、(埋め込まれたレコードのタイムスタンプは、カフカのメッセージで導入されてしまったあなたのレコードが埋め込まれたタイムスタンプを運ばない可能性が最も高いですKafka 0.10のフォーマット)。これは、たとえば、古いカフカプロデューサクライアント(バージョン0.9以前)またはサードパーティのプロデューサクライアントによって書き込まれたトピックを使用する場合に発生します。これが起こる別の状況は、カフカクラスタを0.9から0.10にアップグレードした後で、0.9で生成されたすべてのデータに0.10メッセージタイムスタンプが含まれていない場合です。
  • カスタムタイムスタンプエクストラクタを使用している場合は、エクストラクタが無効な(負の)タイムスタンプを正しく処理していることを確認してください。アプリケーションのセマンティクスに依存します。たとえば、有効なタイムスタンプを抽出できない場合(おそらく、データのタイムスタンプフィールドが欠落している可能性がある場合)、デフォルトまたは推定タイムスタンプを返すことができます。
  • WallclockTimestampExtractor経由で処理時セマンティクスに切り替えることもできます。そのようなフォールバックがこの状況に対する適切な対応であるかどうかは、ユースケースによって異なります。 しかし、最初のステップとして、そのような問題のあるレコードが最初にカフカに書かれた理由についての根本原因を特定して修正する必要があります。 2番目のステップでは、そのようなレコードを扱う際に(例えば、それらのレコードを処理する必要がある場合など)、回避策を適用することを検討することができます。もう1つの選択肢は、正しいタイムスタンプでレコードを再生成し、新しいカフカのトピックに書き込むことです。
関連する問題