コンテキスト:私たちのデータパイプラインでは、spark SQLを使用して、エンドユーザーからテキストファイル次にパラメータ化します。多くの小さなファイルを避けるためにspark SQLクエリの結果を統合する方法/空のファイルを避ける方法
状況:
私たちのクエリは次のようになります。あなたはこの結果を見ると
INSERT OVERWRITE TABLE ... PARTITION (...)
SELECT
stuff
FROM
sometable
問題ではなく、最大の大きさのファイルの束を作成するよりも、ありますデフォルトでは200個のパーティションが作成されるため、200個の小さなファイルが作成されます。 (一部のクエリでは、入力データとSELECT
クエリに応じて、200億バークリットを読み込みます)。小さなファイルがたくさんあると、私たちのシステム管理者は不評になります。
しようとしました修正(それが動作しません)ドキュメントの
充実していますが、このような状況で指定したパーティションのすべてのデータが同じパーティションに行くことを確実にするために、DISTRIBUTE BY
を使用すべきことを示唆しているので、
INSERT OVERWRITE TABLE ... PARTITION (...)
SELECT
stuff
FROM
sometable
DISTRIBUTE BY
1
このように動作しないのはなぜですか(spark 2.0とspark 2.2でテストされます)?すべてのデータを1つのレデューサーに正常に送信します。実際のデータはすべて1つの大きなファイルに格納されています。しかし、まだ200個のファイルが作成されており、うち199個は空です。 (私たちは、おそらくDISTRIBUTE BY
たちのパーティション列べきであると認識してんだけど、これは最も単純な例を提供することで)作業を行う
修正し、それが可能である私たちのユースケース
に適合しません(pyspark
構文で)これ3210またはpartition
を、使用して正しいことを行うために、これを取得する:
select = sqlContext.sql('''SELECT stuff FROM sometable''').coalesce(1)
select.write.insertInto(target_table, overwrite=True)
をしかし、私たちは完全な方法それを変更する必要があるとして、このようなことを行うにはしたくないユーザークエリを提供してください。私が起こることを(かなり複雑な)クエリ内のすべての計算を強制したくないとして、
conf.set("spark.sql.shuffle.partitions","1");
を私はこれを試していない:私はまた、我々が設定できることを見てきました
1つの減速機、ディスクへの最後の書き込みを行うもののみ。 (私はこのことを心配べきではない場合、私に知らせて!)
質問:
- のみスパークSQL構文を使用して、私はいくつかのファイルとして書き込まれたクエリを書くのですかどのようにできるだけ多くの空の/小さなファイルを作成しませんか?
おそらく関連:
- merge-multiple-small-files-into-few-larger-files-in-spark(溶液はSparkSQLなければならない制限がなく、上記の通り、
DISTRIBUTE BY
が実際に動作しない)
spark coalesce doesn't work
- (私たちのためだけに問題はありません)
あなたの質問に答えていないが、我々は、同様の問題がありました。私たちは2つの仕事を使ってそれを解決しました。スパークの結果を計算し、それを「ステージング」の場所に書き出しました。次のジョブ、トリガーされたbashスクリプトは、ファイルをまとめて連結し、必要な場所に配信し、ステージング領域をクリーンアップしました。 bashスクリプトは非常に高速でした。あまり優雅ではありませんが、間違いなく働きました – Jeremy