2017-02-11 21 views
1

基本的に私は単一のSpark Streamingコンシューマ[Direct Approach]を使用して複数のカフカトピックからデータを消費しています。RDD数のDStreamをシングルRDDに変換する方法

val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2) 

バッチ間隔は30 Secondsです。

ここにいくつか質問があります。

  1. DStreamでforeachRDDを呼び出すと、DStreamにSingle RDDの代わりに複数のRDDが含まれますか?それぞれのトピックは別々のRDDを作成しますか?
  2. はいの場合は、すべてのRDDを単一のRDDに結合してから、データを処理します。それ、どうやったら出来るの?
  3. 処理時間がバッチ間隔を超える場合、DStreamには複数のRDDが含まれますか?

DStream RDDを以下の方法で単一のRDDに結合しようとしました。まず第一に私の理解は正しいのですか? DStreamが常に単一のRDDを返す場合、以下のコードは必要ありません。

サンプルコード:私はDSTREAMにforeachRDDを呼び出すとき

var dStreamRDDList = new ListBuffer[RDD[String]] 
dStream.foreachRDD(rdd => 
     { 
      dStreamRDDList += rdd 
     }) 
val joinedRDD = ssc.sparkContext.union(dStreamRDDList).cache() 

//THEN PROCESS USING joinedRDD 
//Convert joinedRDD to DF, then apply aggregate operations using DF API. 

答えて

1

はDSTREAMには、複数のRDDの代わりにシングルRDDのが含まれていますか?各トピックは別々のRDDを作成しますか?

いいえ複数のトピックがある場合でも、任意のバッチ間隔で1つのRDDを使用できます。

処理時間がバッチ間隔を超える場合、DStreamには複数のRDDが含まれますか?

いいえ、処理時間がバッチ間隔よりも長い場合は、トピックオフセットを読み上げるだけです。次のバッチの処理は、前のジョブが完了した後にのみ開始されます。サイドノートとして

、あなたが実際にforeachRDDを使用する必要があることを確認し、またはおそらくyou're misusing the DStream API(免責事項:私はその記事の著者だ)もし

+0

おかげで、私は...あなたのポストを読んで、戻ってくるだろう。 ) – Shankar

関連する問題