1
kafka
ストリームのデータを解析しようとしています。以下は私が現時点でやっていることです。print()で予期しない動作が発生しました
Import /* … */
Object MyObject {
Def main (args: Array[String]){
/*spark streaming context set up*/
val kafkaStream = KafkaUtils.createStream(streamingContext,zkQuorum,groupID,[per-topic number of Kafka partitions to consume])
kafkaStream.persist(/*Storage Level*/)
val field_1_Retrieved = kafkaStream.parsingFunctionToRetrieveField1().print
val field_2_Retrieved = kafkaStream.parsingFunctionToRetrieveField2().print
val field_3_Retrieved = kafkaStream.parsingFunctionToRetrieveField3().print
ssc.start()
ssc.awaitTermination()
}
}
しかし、ここではどのようなI出力は次のようになります。
-----------------------
Time xxxxxxxxxx ms
-----------------------
field_1_Retrieved
field_1_Retrieved
-----------------------
Time xxxxxxxxxy ms
-----------------------
field_2_Retrieved
field_2_Retrieved
-----------------------
Time xxxxxxxxxz ms
-----------------------
field_3_Retrieved
field_3_Retrieved
これは、ランダムな縫い目、と私は自分のコードに期待するものを確実ではありません。私は、この動作を引き起こすspark
またはkafka
の機能から行方不明です何
Time xxxxxxxxxx ms
-----------------------
field_1_Retrieved
field_2_Retrieved
field_3_Retrieved
-----------------------
Time xxxxxxxxxy ms
-----------------------
field_1_Retrieved
field_2_Retrieved
field_3_Retrieved
:それは次のようなものでしょうか?それとも間違っているのですか?
'parsingFunctionToRetrieveFieldX()'のコードは何ですか? – maasg