メモリが限られていることを考慮して、スパークは各ノードからRDDを自動的に削除すると感じました。私はこの時間を設定可能であることを知りたいですか?メモリからRDDはどれくらいの期間メモリに残っていますか?
注RDDを立ち退かせるためにするとき、火花が決めるんどのように:私は約rdd.cache()
メモリが限られていることを考慮して、スパークは各ノードからRDDを自動的に削除すると感じました。私はこの時間を設定可能であることを知りたいですか?メモリからRDDはどれくらいの期間メモリに残っていますか?
注RDDを立ち退かせるためにするとき、火花が決めるんどのように:私は約rdd.cache()
私はこの時間を設定することができますか? が
RDD
メモリ
@ Jacekが指摘しているように、「how」部分はContextCleaner
というオブジェクトの責任です。主に、あなたが詳細、this is what the cleaning method looks likeをしたい場合:あなたが動きを学びたいのであれば、私は一般的に"Mastering Apache Spark"呼ば@Jacek帳(This points to an explanation regarding ContextCleaner
)
GCの影響を測定する話ではない
GCチューニングの最初のステップは、頻度に関する統計を収集することですガベージコレクションが発生し、GCの所要時間がかかります。これは、-verbose:gc -XX:+ PrintGCDetails -XX:+ PrintGCTimeStampsをJavaオプションに追加することで実行できます。 (JavaオプションをSparkジョブに渡す方法については、設定ガイドを参照してください)Sparkジョブが次に実行されると、ガベージコレクションが発生するたびにワーカーのログにメッセージが表示されます。これらのログは、ドライバプログラムではなく、クラスタのワーカーノード(作業ディレクトリのstdoutファイル)に記録されます。
高度なGCチューニング
さらに調整ガベージコレクションのために、我々は最初のJVMでのメモリ管理に関するいくつかの基本的な情報を理解する必要があります。
Javaヒープスペースは老若男女二つの領域に分割されます。若い世代は短命のオブジェクトを保持することを意味し、古い世代はより長い生存期間を持つオブジェクトを対象としています。
若年世代はさらに3つの地域[Eden、Survivor1、Survivor2]に分かれています。
ガベージコレクション手順の簡単な説明:Edenがいっぱいになると、マイナーGCがEdenで実行され、EdenとSurvivor1から生きているオブジェクトがSurvivor2にコピーされます。サバイバー領域は入れ替えられます。オブジェクトが十分に古くなった場合、またはSurvivor2が一杯になっている場合は、Oldに移動します。最後にOldが満杯に近づくと、完全GCが呼び出されます。
を読んで、ブラウジングがより良いソースやスパークス示唆
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
// Synchronize here to avoid being interrupted on stop()
synchronized {
reference.foreach { ref =>
logDebug("Got cleaning task " + ref.task)
referenceBuffer.remove(ref)
ref.task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
case CleanCheckpoint(rddId) =>
doCleanCheckpoint(rddId)
}
}
}
} catch {
case ie: InterruptedException if stopped => // ignore
case e: Exception => logError("Error in cleaning thread", e)
}
}
}
、それはですどのように「ただ、他のオブジェクトのような」Yuval Itzchakov wrote、しかし...(が常にだ「ではなく」、それはないですか?)
スパークで、我々は他のブロックの中でシャッフル・ブロックを(持っているので、それがその明白ではありませんSparkによって管理されます)。それらはエグゼキュータで実行されているBlockManagersによって管理されます。彼らは何とか、ドライバ上のオブジェクトがメモリから追い出されると通知される必要があります。
ここではContextCleanerがステージになります。スパークアプリケーションのガベージコレクタで、長時間実行されるデータ重いスパークアプリケーションのメモリ要件を削減することを目的とした、シャッフル、RDD、ブロードキャスト、アキュムレータ、チェックポイント付きRDDのアプリケーション全体のクリーンアップを担当します。
ContextCleanerはドライバで動作します。 SparkContext
が起動したときに作成され、直ちに開始されます(デフォルトではspark.cleaner.referenceTracking
Sparkプロパティが有効になっています)。 SparkContext
が停止すると停止します。
jconsole
またはjstack
を使用して、Sparkアプリケーションのすべてのスレッドのダンプを実行すると、動作することがわかります。 ContextCleanerはデーモンSpark Context Cleanerスレッドを使用して、RDD、シャッフル、およびブロードキャストの状態を消去します。
また、org.apache.spark.ContextCleaner
ロガーのログレベルをINFO
またはDEBUG
にすることで、その作業を確認することもできます。ただ、conf/log4j.properties
に次の行を追加します
log4j.logger.org.apache.spark.ContextCleaner=DEBUG
を弾力性のある分散データ・セット紙によると -
私たちの労働者は JavaオブジェクトとしてメモリにキャッシュRDDパーティションをノード。ほとんどの操作がスキャンであるため、 のRDDレベルでLRU置換ポリシーを使用します(つまり、 RDDから他のパーティションをロードするために RDDからパーティションを削除しません)。この のすべてのユーザーアプリケーションでうまく動作するように、この単純なポリシーが見つかったので、 までです。さらに制御したいプログラマは、キャッシュへの引数として各RDDの保持優先度を に設定することもできます。
正しい。私たちが定義しているRDDは、最終的にやりたいと思う一連の作業です。それで 'rdd.map(_ * 2)'と言うことができます。この場合、sparkは '_ * 2'のマップを行うパーティションにタスクを送り、新しいデータセットを保存します。私はこの新しいマップされたデータセットがそのパーティションのメモリにあると仮定します。 rddがシリアル化され、すべてのパーティションに送信され、rddにそのデータセットが含まれていますか? –
'RDD'はエグゼキュータには一切送信されません。パーティションから構築されます。このような各パーティションは、処理のためにエグゼキュータに送られます。そのパーティションがもはや使用されなくなると(すなわち、[折れ線グラフ](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-lineage)の別の 'RDD'の親ではありません。 html))を収集することができます。チェックポイント設定は、線グラフを切り捨てる1つの方法です。 –