私は、次のPySparkコードを実行した:Google DataprocでSparkを実行すると、saveAsTextFileの使用中にローカルディスクまたはHDFSの代わりに外部ストレージ(GCS)に一時ファイルが保存されるのはなぜですか?
from pyspark import SparkContext
sc = SparkContext()
data = sc.textFile('gs://bucket-name/input_blob_path')
sorted_data = data.sortBy(lambda x: sort_criteria(x))
sorted_data.saveAsTextFile(
'gs://bucket-name/output_blob_path',
compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec"
)
ジョブが正常に完了しました。しかし、ジョブの実行中に、Sparkは次のパスgs://bucket-name/output_blob_path/_temporary/0/
に多くの一時的なブロブを作成しました。これらの一時的なブロブをすべて削除すると、ジョブ実行時間の半分が費やされ、CPU使用率はこの間に1%(膨大なリソースの無駄遣い)になりました。
GCPの代わりにローカルドライブ(またはHDFS)に一時ファイルを保存する方法はありますか?私はまだ最終結果(ソートされたデータセット)をGCPに残したいと思います。
10個のワーカーノードを持つDataproc Sparkクラスタ(VMタイプ16コア、60GM)を使用していました。入力データの容量は10TBでした。あなたが見る
ありがとうございます。私はBigQueryからGCSにエクスポートされたデータをソートするので、ファイルが多すぎることに少し驚いています。私の前提は、BiqQueryのエクスポート機能がすでにパーティション数(GCSにデータセットを保存するのに最適なファイル数)を最適化しているということでした。 – user2548047
適用されているRDD操作の種類によっては、変換後のパーティション数が入力パーティション数と異なる場合があります。また、この場合、FileInputFormatは、デフォルトで、入力ファイルをデフォルトでは入力ファイルの数これを '--properties spark.hadoop.fs.gs.block.size = 536870912'で調整すると、デフォルトの64MBではなく512MBに増やすことができます。 –
デフォルトでは、クラスタ展開時に調整することもできます。あなたのジョブが通常10TBの範囲にあれば、 'gcloud dataproc clusters my-cluster --properties core:fs.gs.block.size = 536870912'が合理的です。あなたの仕事が唯一の、例えば10GBだとすれば、それは高すぎるでしょう。たいていの場合、1000以上、50000未満のパーティションを目指すのが良いですが、小さな仕事でも64 MBよりも小さいブロックサイズにしたくないのが普通です。 –