既存のトピックのデータを別のストリームにストリームする単純なkストリームアプリケーションを構築しています。スレッド "StreamThread-1"のKafka-Stream例外java.lang.IllegalArgumentException:無効なタイムスタンプ-1
:私はでアプリケーションを実行すると、しかしpackage com.mycompany.app;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
public class App {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
KStreamBuilder builder = new KStreamBuilder();
builder.stream("test").to("testout");;
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
streams.close();
}
}));
}
}
:
私のプロデューサーがtest
というトピックにstreaming meetup's open dataであると私はtestout
トピックここ
にそれを処理するコードです
java -cp target/my-app-1.0-SNAPSHOT.jar com.mycompany.app.App
この例外が発生します:
Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
私はずっと長い間探し続けてきましたが、なぜこのエラーが発生しているのかわかりません。
アイデア?