
Exception when running the Kafka Streams fat jar

I am trying out the word count example to learn Kafka Streams; the code used is below. I built a fat jar from the project, started producing messages to the topic word-count-input1, and am fetching the output from word-count-output1. But when I run the fat jar I see this exception: org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record.

Properties properties = new Properties(); 

    properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"word-count-example"); 
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092"); 
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); 
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); 

    KStreamBuilder builder = new KStreamBuilder(); 

    // 1. Stream from kafka 

    KStream<String,String> wordcountInput = builder.stream("word-count-input1"); 

    // 2. map values to lower case 

    KTable<String, Long> wordcount = wordcountInput.mapValues(value -> value.toLowerCase()) 

            // 3. split by space on values 
            .flatMapValues(value -> Arrays.asList(value.split(" "))) 

            // 4. Select a new key so that the word itself becomes the key

            .selectKey((ignoredKey,words) -> words) 

            // 5. Group it by key 

            .groupByKey() 

            // 6. count occurrences, naming the state store "counts"

            .count("counts"); 

    // Explicitly specify the output Serdes, which are String (key) and Long (value) in our case
    wordcount.to(Serdes.String(),Serdes.Long(),"word-count-output1"); 

    KafkaStreams streams = new KafkaStreams(builder, properties); 
    streams.start(); 
    System.out.println("Topology is " + streams.toString()); 
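
The question does not show how the input messages were produced. For reference, a minimal producer sketch along these lines (the class name and the sample sentence are illustrative; the console producer with its default StringSerializer behaves the same way) puts plain String values onto word-count-input1:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class WordCountInputProducer {          // illustrative class name
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The record value is a UTF-8 encoded sentence, not an 8-byte long.
            producer.send(new ProducerRecord<>("word-count-input1", "kafka streams word count example"));
            producer.flush();
        }
    }
}

Each record value here is just the raw bytes of a String, which is why an 8-byte long cannot be read back from it.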

The exception:

INFO stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread:1040) 
INFO Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer:972) 
INFO stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Removing all active tasks [0_0, 1_0, 0_1, 1_1, 0_2, 1_2] (org.apache.kafka.streams.processor.internals.StreamThread:1407) 
INFO stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Removing all standby tasks [] (org.apache.kafka.streams.processor.internals.StreamThread:1421) 
INFO stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Stream thread shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread:1072) 
WARN stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Unexpected state transition from RUNNING to DEAD. (org.apache.kafka.streams.processor.internals.StreamThread:978) 
Exception in thread "word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=word-count-input1, partition=0, offset=0 
     at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46) 
     at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84) 
     at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117) 
     at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464) 
     at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650) 
     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556) 
     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) 
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8 
INFO stream-client [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a] State transition from RUNNING to PENDING_SHUTDOWN. (org.apache.kafka.streams.KafkaStreams:229) 
INFO stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Informed thread to shut down (org.apache.kafka.streams.processor.internals.StreamThread:900) 
WARN stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Unexpected state transition from DEAD to PENDING_SHUTDOWN. (org.apache.kafka.streams.processor.internals.StreamThread:978) 
INFO stream-client [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a] Stopped Kafka Streams process. (org.apache.kafka.streams.KafkaStreams:514) 
INFO stream-client [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a] State transition from PENDING_SHUTDOWN to NOT_RUNNING. (org.apache.kafka.streams.KafkaStreams:229) 

The setup is ZooKeeper and three brokers running on a Linux VM. Can anyone suggest what might be wrong?
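
As a side note on fetching the output: word-count-output1 carries String keys and Long values (because of the wordcount.to(Serdes.String(), Serdes.Long(), ...) call above), so a plain consumer needs matching deserializers. A minimal sketch, with an illustrative class name, group id, and poll timeout:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class WordCountOutputReader {           // illustrative class name
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "word-count-output-reader"); // illustrative group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Keys are the words (String), values are the counts (Long).
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());

        try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("word-count-output1"));
            ConsumerRecords<String, Long> records = consumer.poll(5000L); // old-style poll(long), as in the 0.11 API
            for (ConsumerRecord<String, Long> record : records) {
                System.out.println(record.key() + " -> " + record.value());
            }
        }
    }
}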

Which version of Kafka is it, please? –

Answers

Changing your KTable to the following should fix it:

KTable<String, Long> wordcount = source
       .flatMapValues(new ValueMapper<String, Iterable<String>>() {
        @Override
        public Iterable<String> apply(String value) {
         return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
        }
       })
       .groupBy(new KeyValueMapper<String, String, String>() {
        @Override
        public String apply(String key, String value) {
         return value;
        }
       })
       .count("Counts");

You did specify LongDeserializer for the value, and thus the actual error is:

Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

So, yes, your value is not an 8-byte long. I assume your values are actually strings in the input topic, so you need to specify the correct deserializer that matches your data.
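
Putting the two answers together, a minimal sketch of the corrected setup, based on the question's own code and the same 0.11-style API it uses: the default value serde becomes String so it matches the input topic, and the Long serde is passed explicitly only where the counts are written out.

Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-example");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Defaults now match the input topic: String keys and String values.
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> wordcountInput = builder.stream("word-count-input1");

KTable<String, Long> wordcount = wordcountInput
        .mapValues(value -> value.toLowerCase())
        .flatMapValues(value -> Arrays.asList(value.split(" ")))
        .selectKey((ignoredKey, word) -> word)
        .groupByKey()
        .count("counts");

// Only the output needs the Long serde, because the KTable values are counts.
wordcount.to(Serdes.String(), Serdes.Long(), "word-count-output1");

KafkaStreams streams = new KafkaStreams(builder, properties);
streams.start();

On newer clients the same idea applies with StreamsBuilder, Produced.with(Serdes.String(), Serdes.Long()) and Materialized, but the serde reasoning is unchanged.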
