2016-05-01 20 views
1

kafkaストリームのデータを解析しようとしています。以下は私が現時点でやっていることです。print()で予期しない動作が発生しました

Import /* … */ 

Object MyObject { 
    Def main (args: Array[String]){ 
     /*spark streaming context set up*/ 

     val kafkaStream = KafkaUtils.createStream(streamingContext,zkQuorum,groupID,[per-topic number of Kafka partitions to consume]) 
     kafkaStream.persist(/*Storage Level*/) 

     val field_1_Retrieved = kafkaStream.parsingFunctionToRetrieveField1().print 
     val field_2_Retrieved = kafkaStream.parsingFunctionToRetrieveField2().print 
     val field_3_Retrieved = kafkaStream.parsingFunctionToRetrieveField3().print 

     ssc.start() 
     ssc.awaitTermination() 
    } 
} 

しかし、ここではどのようなI出力は次のようになります。

----------------------- 
Time xxxxxxxxxx ms 
----------------------- 
field_1_Retrieved 
field_1_Retrieved 
----------------------- 
Time xxxxxxxxxy ms 
----------------------- 
field_2_Retrieved 
field_2_Retrieved 
----------------------- 
Time xxxxxxxxxz ms 
----------------------- 
field_3_Retrieved 
field_3_Retrieved 

これは、ランダムな縫い目、と私は自分のコードに期待するものを確実ではありません。私は、この動作を引き起こすsparkまたはkafkaの機能から行方不明です何

Time xxxxxxxxxx ms 
----------------------- 
field_1_Retrieved 
field_2_Retrieved 
field_3_Retrieved 
----------------------- 
Time xxxxxxxxxy ms 
----------------------- 
field_1_Retrieved 
field_2_Retrieved 
field_3_Retrieved 

:それは次のようなものでしょうか?それとも間違っているのですか?

+0

'parsingFunctionToRetrieveFieldX()'のコードは何ですか? – maasg

答えて

1

これは正常な動作です。 print's implementation for a DStreamはタイムスタンプバナーで始まります。複数のDStreamでprintを呼び出すと、複数のバナーが表示されます。

バージョン2を達成するには、元のdstreamのデータを同じDStream(おそらくフラットマップを使用)内の3つの異なるバージョンに変換する必要があります。 parsingFunctionToRetrieveFieldxが提供されていないことを考えれば、それに関する追加の詳細を与えることはできません。

関連する問題