2017-11-27 2 views
0

JSON配列をJSON要素に変換するためのkafkaストリームコードを作成しようとしています...私はkafkaストリームを初めて使用しています。そこkstreamとktable .. と入力の私のストリームにおける次の形式JSON用のCreatin kafkaストリームAPI

[ 
{"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":0,"unit":""}, 
{"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":1,"unit":""} 
] 

[ 
{"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":2,"unit":""}, 
{"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":3,"unit":""} 
] 

にできるでしょうし、私の出力は形式でなければなりません

{"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":0,"unit":""} 
{"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":1,"unit":""} 
{"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":2,"unit":""} 
{"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":3,"unit":""} 

は、誰もがコードを書くことで私を助けることができます??

from kafka import KafkaConsumer 
consumer = KafkaConsumer('topicName') 
for message in consumer: 
    print(message) 

はKafkaConsumerでbootstrap_serversパラメータを指定Pythonで

答えて

0

あなたはカフカストリームを使用する場合は、flatMap()を使用することができます。

// using new 1.0 API 
StreamsBuilder builder = new StreamsBuilder(); 
builer.stream("topic").flatMap(...).to("output-topic"); 

のようなものは、より多くの詳細については、例やドキュメントをチェックアウト:

+0

フラットマップを使ってJsonの配列をJsonオブジェクトに変換することはできますか? –

+0

そして、KStreamsの私のはどうあるべきですか?私はそのために何を指定する必要があるのか​​?あなたがこれをするのを手伝ってくれてありがとうございました。私はこの時点で立ち往生しています... –

+0

はい。 flatMap内で変換します。入力ストリームの場合、タイプは入力トピックで使用するタイプと一致する必要があります。出力タイプの場合、結果タイプとして書き込むタイプを選択できます。 –

-1

...。 Java用

は、本当に良いcloudkarafkaを見て:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
consumer.subscribe(Arrays.asList(topic)); 
while (true) { 
    ConsumerRecords<String, String> records = consumer.poll(100); 
    for (ConsumerRecord<String, String> record : records) 
     System.out.printf("msg = %s\n", record.value()); 
    } 
} 
+0

私はjava..ifにuはuが私を助けることができる知っている必要がありますか? –

+0

私の見解では、Scalaの方が優れています;) – CarloV

+0

質問はKafka Streams APIに関するもので、カフカの一般消費者ではありません。 –

関連する問題