多くのステップを実行し、最後にマイクロバッチのルックアップが行われているか、プリロードされたRDDに参加するとします。 12時間ごとにプリロードされたRDDをリフレッシュする必要があります。これどうやってするの。ストリーミング・コンテキストに関係しないことは、ストリーミングRDDのフォーム1とはどのようにして得られるのか、私の理解には再現されません。ストリーミングdstreamがいくつのパーティションにあるかにかかわらず、1つのコールだけを行う必要がありますsparkストリーミングn回のバッチの後にルックアップ非ストリームrddをリロードする方法
1
A
答えて
3
これは、リロードする必要があるときに外部RDDを再作成することで可能です。それは与えられた瞬間にアクティブであるRDD
参照を保持するために可変変数を定義する必要があります。 dstream.foreachRDD
内で、RDD参照をリフレッシュする必要がある瞬間を確認できます。
これは、それは次のようになります方法の例です:
val stream:DStream[Int] = ??? //let's say that we have some DStream of Ints
// Some external data as an RDD of (x,x)
def externalData():RDD[(Int,Int)] = sparkContext.textFile(dataFile)
.flatMap{line => try { Some((line.toInt, line.toInt)) } catch {case ex:Throwable => None}}
.cache()
// this mutable var will hold the reference to the external data RDD
var cache:RDD[(Int,Int)] = externalData()
// force materialization - useful for experimenting, not needed in reality
cache.count()
// a var to count iterations -- use to trigger the reload in this example
var tick = 1
// reload frequency
val ReloadFrequency = 5
stream.foreachRDD{ rdd =>
if (tick == 0) { // will reload the RDD every 5 iterations
// unpersist the previous RDD, otherwise it will linger in memory, taking up resources.
cache.unpersist(false)
// generate a new RDD
cache = externalData()
}
// join the DStream RDD with our reference data, do something with it...
val matches = rdd.keyBy(identity).join(cache).count()
updateData(dataFile, (matches + 1).toInt) // so I'm adding data to the static file in order to see when the new records become alive
tick = (tick + 1) % ReloadFrequency
}
streaming.start
このソリューションに来て前、私はRDDにpersist
フラグでプレーする可能性を研究し、それはように動作しませんでした期待される。 unpersist()
のように見えるのは、RDDが再び使用されたときにRDDの再マテリアライズを強制しません。
関連する問題
- 1. Sparkストリーミングを使用してrddをHbaseに保存すると、
- 2. Spark RDD [Array [MyObject]]をRDDに変換する方法[MyObject]
- 3. 直接ストリームを使用したKafka Sparkストリーミングでコンシューマグループを指定する方法
- 4. RDD(Spark)のアイテムをRDDの多くのアイテムに変換する方法は?
- 5. ストリームをストリーミングする方法は?
- 6. SparkのRDDへのマッピング方法
- 7. Spark DataFrameをJavaのPOJOのRDDに変換する方法
- 8. spark RDDのサブセットを効率的に削除する方法
- 9. RDD [Double]をScalaのVectorに変換する方法Spark
- 10. [n * 7,12]を[n、7,12]にバッチ処理する方法
- 11. Apache Sparkストリーミング - タイムアウトの長時間実行のバッチ
- 12. モデルをSparkバッチで作成し、Sparkストリーミングで使用できますか?
- 13. Spark 2.xの構造化ストリーミングで2つのストリームを結合する際の回避策
- 14. Apache Spark RDDのコレクションを1つのRDDに変換するJava
- 15. n個のペアのゼロでRDDを初期化する方法
- 16. 2つのストリーム間のSparkストリーミング共有状態
- 17. Sparkで既存のRDDにRDDを追加するには?
- 18. Spark(Scala)で2つのRDDを結合する方法は?
- 19. csvファイルにspark rddを保存する方法
- 20. Sparkで明示的にRDDを実現する方法
- 21. Spark DataFrame(RDD)のメタデータ
- 22. Javaストリーム - ストリームのn番目の値にカウンタiを追加する方法
- 23. フラッシュ後にデータをリロードする方法
- 24. elasticsearchをapacheに接続する方法sparkストリーミングまたはストーム?
- 25. SparkのケースクラスのRDDにデータフレームを戻す
- 26. 2つのSparkコンテキスト間でSpark RDDを共有するには?
- 27. スパーク。バッチに分割RDD
- 28. Apache Spark RDDワークフロー
- 29. トランザクションブロック| Spark SQL、rdd
- 30. RDD Aggregate in spark
助けがあれば分かります..... – subhankar
私は考えがありますが、まずそれをテストする必要があります。 – maasg