私は、何度か呼び出されるメソッドを持っています。このメソッドは、次のようになります。私のRDDはいつペナルティを受けることができますか?
def separateGoodAndBad(myRDD: RDD[String]): RDD[String] = {
val newRDD = myRDD.map(......) //do stuff
newRDD.cache //newRDD has 2 actions performed on it
val badRDD = newRDD.filter(row => row.contains("bad"))
badRDD.count
val goodRDD = newRDD.filter(row => row.contains("good"))
goodRDD.count
newRDD.unpersist // I am unpersisting because this method gets called several times
goodRDD
}
私が言ったように、私はこの方法が複数回呼び出されると、私は別のキャッシュされたnewRDDs
の4つのコピーをしたくないのでnewRDD
をunpersistたいです。ここでのコードサンプルです:。
val firstRDD = separateGoodAndBad(originalRDD)
val firstRDDTransformed = doStuffToFirstRDD(firstRDD)
val secondRDD = separateGoodAndBad(firstRDDTransformed)
val secondRDDTransformed = doStuffToSecondRDD(secondRDD)
val thirdRDD = separateGoodAndBad(secondRDDTransformed)
val thirdRDDTransformed = doStuffToThirdRDD(thirdRDD)
しかし、secondRDD
とthirdRDD
は私がunpersist(separateGoodAndBad()
に上記参照を追加したことを非常に長く、今取っている彼らがnewRDD
を再計算する必要がされているようだ
ときにすることができます。それは再計算を取得していたことがないように、私はnewRDD
をunpersist?
あなたはタスクが(大きな時間がかかっている)がボトルネックであるかどうかを確認するために、スパークUI上でチェックしている:
次に、あなたはこのような関数呼び出し何か外にそれらをunpersistことができますか? –
@vatsalmevadaそれまでスパークアプリケーション全体を実行しなければならないので、時間がかかります。これらのカウントは私の最初のスパーク「アクション」です。私が混乱しているのは、すべてを再計算しなければならない理由です。 –