2017-12-01 7 views
0

私は、何度か呼び出されるメソッドを持っています。このメソッドは、次のようになります。私のRDDはいつペナルティを受けることができますか?

def separateGoodAndBad(myRDD: RDD[String]): RDD[String] = { 
    val newRDD = myRDD.map(......) //do stuff 
    newRDD.cache //newRDD has 2 actions performed on it 

    val badRDD = newRDD.filter(row => row.contains("bad")) 
    badRDD.count 

    val goodRDD = newRDD.filter(row => row.contains("good")) 
    goodRDD.count 

    newRDD.unpersist // I am unpersisting because this method gets called several times 

    goodRDD 
} 

私が言ったように、私はこの方法が複数回呼び出されると、私は別のキャッシュされたnewRDDsの4つのコピーをしたくないのでnewRDDをunpersistたいです。ここでのコードサンプルです:。

val firstRDD = separateGoodAndBad(originalRDD) 
val firstRDDTransformed = doStuffToFirstRDD(firstRDD) 

val secondRDD = separateGoodAndBad(firstRDDTransformed) 
val secondRDDTransformed = doStuffToSecondRDD(secondRDD) 

val thirdRDD = separateGoodAndBad(secondRDDTransformed) 
val thirdRDDTransformed = doStuffToThirdRDD(thirdRDD) 

しかし、secondRDDthirdRDDは私がunpersist(separateGoodAndBad()に上記参照を追加したことを非常に長く、今取っている彼らがnewRDDを再計算する必要がされているようだ

ときにすることができます。それは再計算を取得していたことがないように、私はnewRDDをunpersist?

+0

あなたはタスクが(大きな時間がかかっている)がボトルネックであるかどうかを確認するために、スパークUI上でチェックしている:

def separateGoodAndBad(myRDD: RDD[String]): RDD[String] = { val newRDD = myRDD.map(......) //do stuff newRDD.cache //newRDD has 2 actions performed on it val badRDD = newRDD.filter(row => row.contains("bad")) badRDD.count val goodRDD = newRDD.filter(row => row.contains("good")) goodRDD.cache // this will cache goodRDD to avoid recomputing in next call goodRDD.count newRDD.unpersist // I am unpersisting because this method gets called several times goodRDD } 

次に、あなたはこのような関数呼び出し何か外にそれらをunpersistことができますか? –

+0

@vatsalmevadaそれまでスパークアプリケーション全体を実行しなければならないので、時間がかかります。これらのカウントは私の最初のスパーク「アクション」です。私が混乱しているのは、すべてを再計算しなければならない理由です。 –

答えて

2

あなたはそれはあなたがgoodRDD.countを行う際に、一度計算され、あなたには、いくつかのAを実行すると、それが再び再計算されるようにもgoodRDDをキャッシュすることもできますその中のRDD内にdoStuffToFirstRDDメソッドを使用してください。

val firstRDD = separateGoodAndBad(originalRDD) 
val firstRDDTransformed = doStuffToFirstRDD(firstRDD) 

val secondRDD = separateGoodAndBad(firstRDDTransformed) 
firstRDD .unpersist //as your secondRDD will be cached by above `separateGoodAndBad` call 
val secondRDDTransformed = doStuffToSecondRDD(secondRDD) 

val thirdRDD = separateGoodAndBad(secondRDDTransformed) 
secondRDD.unpersist //as your thirdRDD will be cached by above `separateGoodAndBad` call 
val thirdRDDTransformed = doStuffToThirdRDD(thirdRDD) 
+0

優れた答え。あなたが質問に答えたとき、私は実際にこの正確な解決策をテストしていました! –

関連する問題