JSON配列をJSON要素に変換するためのkafkaストリームコードを作成しようとしています...私はkafkaストリームを初めて使用しています。そこkstreamとktable .. と入力の私のストリームにおける次の形式JSON用のCreatin kafkaストリームAPI
[
{"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":0,"unit":""},
{"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":1,"unit":""}
]
[
{"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":2,"unit":""},
{"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":3,"unit":""}
]
にできるでしょうし、私の出力は形式でなければなりません
{"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":0,"unit":""}
{"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":1,"unit":""}
{"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":2,"unit":""}
{"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":3,"unit":""}
は、誰もがコードを書くことで私を助けることができます??
from kafka import KafkaConsumer
consumer = KafkaConsumer('topicName')
for message in consumer:
print(message)
はKafkaConsumerでbootstrap_serversパラメータを指定Pythonで
フラットマップを使ってJsonの配列をJsonオブジェクトに変換することはできますか? –
そして、KStreamsの私のはどうあるべきですか?私はそのために何を指定する必要があるのか?あなたがこれをするのを手伝ってくれてありがとうございました。私はこの時点で立ち往生しています... –
はい。 flatMap内で変換します。入力ストリームの場合、タイプは入力トピックで使用するタイプと一致する必要があります。出力タイプの場合、結果タイプとして書き込むタイプを選択できます。 –