スパークチェックポイントとディスクに対する永続性の違いは何ですか?両方ともローカルディスクに格納されていますか?スパークチェックポイントとディスクに永続するものの違い
答えて
重要な違いはほとんどありませんが、根本的なものは血統で何が起こるかです。 Persist
/cache
は系統をそのまま維持し、checkpoint
系統は系統を壊す。
import org.apache.spark.storage.StorageLevel
val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _)
cache
/persist
:val indCache = rdd.mapValues(_ > 4) indCache.persist(StorageLevel.DISK_ONLY) indCache.toDebugString // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated] // | ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated] // +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated] // | ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated] indCache.count // 3 indCache.toDebugString // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated] // | CachedPartitions: 8; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 587.0 B // | ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated] // +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated] // | ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
checkpoint
:あなたが最初のケースリットルで見ることができるようにval indChk = rdd.mapValues(_ > 4) indChk.checkpoint // indChk.toDebugString // (8) MapPartitionsRDD[11] at mapValues at <console>:24 [] // | ShuffledRDD[3] at reduceByKey at <console>:21 [] // +-(8) MapPartitionsRDD[2] at map at <console>:21 [] // | ParallelCollectionRDD[1] at parallelize at <console>:21 [] indChk.count // 3 indChk.toDebugString // (8) MapPartitionsRDD[11] at mapValues at <console>:24 [] // | ReliableCheckpointRDD[12] at count at <console>:27 []
次の例を考えてみましょデータがキャッシュからフェッチされたとしても、そのデータは保持されます。これは、indCache
の一部のパーティションが失われた場合、データをゼロから再計算できることを意味します。 2番目のケースでは、チェックポイントの後に系統が完全に失われ、indChk
には、もはやそれを再構築するために必要な情報が保持されません。
checkpoint
は、cache
/persist
とは異なり、他のジョブとは別に計算されます。そのため、チェックポイントをマークしたRDDをキャッシュする必要があります。
このRDDをメモリに保存することを強く推奨します。そうしないと、ファイルに保存すると再計算が必要になります。
最後にcheckpointed
データは、永続的かつSparkContext
が破壊された後に除去されません。
SparkContext.setCheckpointDir
はRDD.checkpoint
で使用されていますが、非ローカルモードで実行している場合はDFS
パスが必要です。それ以外の場合は、ローカルファイルシステムにすることもできます。 localCheckpoint
およびpersist
は、ローカルファイルシステムを使用する必要があります。
注:
RDDのチェックポイントスパークストリーミングでchekpointingとは異なる概念です。前者は系統問題を扱うように設計されており、後者はストリーミングの信頼性と障害回復に関するものです。
the relevant part of the documentationにチェックを付けると、信頼性の高いシステムにデータを書き込むことができます。 HDFS。しかし、Apache Sparkにチェックポイント情報を書き込む場所を伝えるのはあなた次第です。
一方、永続化とは、ほとんどの場合、メモリ内にデータをキャッシュすることです。これは、this part of the documentationが明らかに示すとおりです。
Apache Sparkに与えたディレクトリによって異なります。
ストリーミングの永続性はまったく別の問題であり、実際にキャッシュには関係しません。 – zero323
私は、私が
がで
- の永続化やキャッシュを永続言うだろう、そのページ内のすべてのを要約することは非常に難しいですが、あなたは非常に詳細な回答here
を見つけることができると思いますStorageLevel.DISK_ONLYは、RDDの生成を計算し、そのRDDのその後の使用がラインを再計算する際のそのポイントを超えないような場所に記憶させる。
- persistが呼び出された後、Sparkはそれを呼び出さなくてもRDDの系統を覚えています。アプリケーションが終了した後
- 第二に、キャッシュがクリアされるか、ファイルが物理的にHDFSにチェックポイント
- チェックポイントの店舗RDDを
を破壊し、それを作成した系統を破壊しています。
- Sparkアプリケーションの終了後もチェックポイントファイルは削除されません。
- チェックポイントファイルは、RDDのチェックポイント
- 後続のジョブの実行やドライバプログラムで使用可能な動作は、第1のコンピューティングの実際の仕事をしているし、チェックポイントのディレクトリに書き込む前にキャッシュを呼び出しますので、二重計算になります。
Sparkのチェックポイント処理またはキャッシュ操作の詳細や内部については、この記事をお読みください。
は同じ出力を返すプログラムすなわちdf.rdd.toDebugString()の系統を破壊することなく、ディスクと一時メモリへのデータフレームを格納する(MEMORY_AND_DISK)を永続します。それはメモリ内に残ることを、データフレームをキャッシュすることは保証されませんので、
df = df.persist(StorageLevel.MEMORY_AND_DISK) calculation1(df) calculation2(df)
注:それは中間結果の再計算を避けるために再利用されようとしている、計算上の持続(*)を使用することをお勧めしますあなたが次回に呼び出すまで。メモリの使用状況によっては、キャッシュを破棄することができます。一方、checkpoint()は、リネージュを破棄し、データフレームを強制的にディスクに格納します。 cache()/ persist()の使用とは異なり、頻繁なチェックポインティングはプログラムを遅くする可能性があります。チェックポイントは、a)不安定な環境で作業して、障害からの高速回復を可能にする場合に使用することが推奨される。b)RDDの新しいエントリが前のエントリに依存する場合、すなわち、障害発生時に長い依存チェーンを再計算しない
これは非常に一般的な質問です。その周りにいくつかのコンテキストを追加する方が良いでしょう。あなたの質問に答えるために、それは任意の永続的なストレージエリアに保存することができます - ローカルのDIskまたはHDFSまたはNFSマウントされたスペースなど。 – Sumit
@Sumit - これは2つのSpark RDDメソッドの違いに関する非常に具体的な質問です。答えはzero323の答えが示すように、客観的かつ集中的な答えになることができます。 –