基本的に私は単一のSpark Streamingコンシューマ[Direct Approach]を使用して複数のカフカトピックからデータを消費しています。RDD数のDStreamをシングルRDDに変換する方法
val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2)
バッチ間隔は30 Seconds
です。
ここにいくつか質問があります。
- DStreamでforeachRDDを呼び出すと、DStreamにSingle RDDの代わりに複数のRDDが含まれますか?それぞれのトピックは別々のRDDを作成しますか?
- はいの場合は、すべてのRDDを単一のRDDに結合してから、データを処理します。それ、どうやったら出来るの?
- 処理時間がバッチ間隔を超える場合、DStreamには複数のRDDが含まれますか?
DStream RDDを以下の方法で単一のRDDに結合しようとしました。まず第一に私の理解は正しいのですか? DStreamが常に単一のRDDを返す場合、以下のコードは必要ありません。
サンプルコード:私はDSTREAMにforeachRDDを呼び出すとき
var dStreamRDDList = new ListBuffer[RDD[String]]
dStream.foreachRDD(rdd =>
{
dStreamRDDList += rdd
})
val joinedRDD = ssc.sparkContext.union(dStreamRDDList).cache()
//THEN PROCESS USING joinedRDD
//Convert joinedRDD to DF, then apply aggregate operations using DF API.
おかげで、私は...あなたのポストを読んで、戻ってくるだろう。 ) – Shankar