2016-11-30 3 views
1

Kafka Streams(バージョン0.10.0.1)とKafka Broker(0.10.0.1)を使用しています。メッセージキーに基づいてカウントを生成しようとしています。これはカフカにメッセージを送信しますKafka StreamsがcountByKeyの後に期待通りの結果を書いていない

1,{"value":10} 

鍵を持っている:私は上記のコマンドを実行すると

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-streams-topic --property parse.key=true --property key.separator=, 

は、私はこのようなキーと値を送ることができます:私は、次のコマンドを使用して、私のメッセージを生成し= 1、値= {"値":10}となります。

私の目標は、キーが1のメッセージ数を数えることです。上記のコマンドを考えると、カウントは次のようになります。ここ

1.は、私が使用していますコードです:私は(stringSerde、longSerde)counts.printを実行すると

public class StreamProcessor { 

    public static void main(String[] args) { 
     KStreamBuilder builder = new KStreamBuilder(); 

     final Serde<Long> longSerde = Serdes.Long(); 
     final Serde<String> stringSerde = Serdes.String(); 

     KStream<String, String> values = builder.stream(stringSerde, stringSerde, "kafka-streams-topic"); 

     KStream<String, Long> counts = values 
       .countByKey(stringSerde, "valueCounts") 
       .toStream(); 

     counts.print(stringSerde, longSerde); 
     counts.to(stringSerde, longSerde, "message-counts-topic"); 

     KafkaStreams streams = new KafkaStreams(builder, properties()); 

     streams.start(); 

     Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); 
    } 

    private static Properties properties() { 
     final Properties streamsConfiguration = new Properties(); 

     streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-poc"); 
     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
     streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); 
     streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 
     streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 

     return streamsConfiguration; 
    } 
} 

は私が手:

1 , 1 

私は鍵= 1を持っていて、その鍵を持っているメッセージは1つです。それは私が期待していることです。

ただし、次の行が実行されたとき:

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic message-counts-topic --property print.key=true --property key.separator=, --from-beginning 

counts.to(stringSerde, longSerde, "message-counts-topic"); 

メッセージ・カウント・話題というトピックがそれに送信されたメッセージを取得しますが、私は、このコマンドを使用してメッセージを読み取ろうとするとき

1 , 

1が鍵であり、何もありません:

は、私は次の出力を得ます値として表示されます。メッセージ1、1が見えます。しかし何らかの理由で、countメソッドの値が失われます。これは、printメソッドを呼び出すときに表示されます。

答えて

4

bin/kafka-console-consumer.shに異なる値のデシリアライザを指定する必要があります。次のコードを追加してください。

--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer 

デフォルトの文字列デシリアライザはlong値を正しく読み取れません。

+0

完全なコマンドは次のとおりです。 /bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic message-counts-topic --property print.key = true - プロパティkey.separator = 、--property value.deserializer = org.apache.kafka.common.serialization.LongDeserializer --from-beginning – crypto

関連する問題