2016-12-15 6 views
0

私は、次のPySparkコードを実行した:Google DataprocでSparkを実行すると、saveAsTextFileの使用中にローカルディスクまたはHDFSの代わりに外部ストレージ(GCS)に一時ファイルが保存されるのはなぜですか?

from pyspark import SparkContext 

sc = SparkContext() 

data = sc.textFile('gs://bucket-name/input_blob_path') 
sorted_data = data.sortBy(lambda x: sort_criteria(x)) 
sorted_data.saveAsTextFile(
    'gs://bucket-name/output_blob_path', 
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec" 
) 

ジョブが正常に完了しました。しかし、ジョブの実行中に、Sparkは次のパスgs://bucket-name/output_blob_path/_temporary/0/に多くの一時的なブロブを作成しました。これらの一時的なブロブをすべて削除すると、ジョブ実行時間の半分が費やされ、CPU使用率はこの間に1%(膨大なリソースの無駄遣い)になりました。

GCPの代わりにローカルドライブ(またはHDFS)に一時ファイルを保存する方法はありますか?私はまだ最終結果(ソートされたデータセット)をGCPに残したいと思います。

10個のワーカーノードを持つDataproc Sparkクラスタ(VMタイプ16コア、60GM)を使用していました。入力データの容量は10TBでした。あなたが見る

答えて

1

_temporaryファイルはおそらくボンネットの下で使用されているFileOutputCommitterの成果物です。重要なことに、これらの一時的なブロブは、厳密には「一時的な」データではなく、実際にはジョブ完了時に最終的な宛先に「名前が変更された」出力データのみでした。名前の変更によるこれらのファイルの「コミット」は、ソースと宛先の両方がGCS上にあるため、実際には高速です。この理由から、ワークフローのその部分をHDFS上に一時ファイルを置いてからGCSに「コミット」する方法はありません。なぜなら、コミットは出力データセット全体をHDFSからGCSに戻し直す必要があるからです。具体的には、基礎となるHadoop FileOutputFormatクラスは、そのようなイディオムをサポートしていません。

GCS自体は本当のファイルシステムではなく、「オブジェクトストア」、およびその能力を最大限にDataprocのみ模倣HDFS内のGCSコネクタです。結果として、ファイルのディレクトリフィルを削除すると、実際にiノードをリンク解除するだけの実際のファイルシステムではなく、実際にGFSが個々のオブジェクトを削除する必要があります。実際に

あなたがこれを打つしている場合、それはおそらくクリーンアップが一度に〜1000個のファイルのバッチで発生しないので、あなたの出力は、とにかくあまりにも多くのファイルに分割されていることを意味します。だから、何千もの出力ファイルは、通常は著しく遅くすべきではありません。あまりにも多くのファイルを持つと、それらのファイルの将来の作業が遅くなります。可能な限り出力ファイルの数を減らすのが一番簡単な方法です。たとえば、repartition()を使用してください。

from pyspark import SparkContext 

sc = SparkContext() 

data = sc.textFile('gs://bucket-name/input_blob_path') 
sorted_data = data.sortBy(lambda x: sort_criteria(x)) 
sorted_data.repartition(1000).saveAsTextFile(
    'gs://bucket-name/output_blob_path', 
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec" 
) 
+0

ありがとうございます。私はBigQueryからGCSにエクスポートされたデータをソートするので、ファイルが多すぎることに少し驚いています。私の前提は、BiqQueryのエクスポート機能がすでにパーティション数(GCSにデータセットを保存するのに最適なファイル数)を最適化しているということでした。 – user2548047

+0

適用されているRDD操作の種類によっては、変換後のパーティション数が入力パーティション数と異なる場合があります。また、この場合、FileInputFormatは、デフォルトで、入力ファイルをデフォルトでは入力ファイルの数これを '--properties spark.hadoop.fs.gs.block.size = 536870912'で調整すると、デフォルトの64MBではなく512MBに増やすことができます。 –

+0

デフォルトでは、クラスタ展開時に調整することもできます。あなたのジョブが通常10TBの範囲にあれば、 'gcloud dataproc clusters my-cluster --properties core:fs.gs.block.size = 536870912'が合理的です。あなたの仕事が唯一の、例えば10GBだとすれば、それは高すぎるでしょう。たいていの場合、1000以上、50000未満のパーティションを目指すのが良いですが、小さな仕事でも64 MBよりも小さいブロックサイズにしたくないのが普通です。 –

関連する問題