2017-10-25 13 views
0

コンテキスト:私たちのデータパイプラインでは、spark SQLを使用して、エンドユーザーからテキストファイル次にパラメータ化します。多くの小さなファイルを避けるためにspark SQLクエリの結果を統合する方法/空のファイルを避ける方法

状況

私たちのクエリは次のようになります。あなたはこの結果を見ると

INSERT OVERWRITE TABLE ... PARTITION (...) 

SELECT 
    stuff 
FROM 
    sometable 

問題ではなく、最大の大きさのファイルの束を作成するよりも、ありますデフォルトでは200個のパーティションが作成されるため、200個の小さなファイルが作成されます。 (一部のクエリでは、入力データとSELECTクエリに応じて、200億バークリットを読み込みます)。小さなファイルがたくさんあると、私たちのシステム管理者は不評になります。

しようとしました修正(それが動作しません)ドキュメントの

充実していますが、このような状況で指定したパーティションのすべてのデータが同じパーティションに行くことを確実にするために、DISTRIBUTE BYを使用すべきことを示唆しているので、

INSERT OVERWRITE TABLE ... PARTITION (...) 

SELECT 
    stuff 
FROM 
    sometable 
DISTRIBUTE BY 
    1 

このように動作しないのはなぜですか(spark 2.0とspark 2.2でテストされます)?すべてのデータを1つのレデューサーに正常に送信します。実際のデータはすべて1つの大きなファイルに格納されています。しかし、まだ200個のファイルが作成されており、うち199個は空です。 (私たちは、おそらくDISTRIBUTE BYたちのパーティション列べきであると認識してんだけど、これは最も単純な例を提供することで)作業を行う

修正し、それが可能である私たちのユースケース

に適合しません(pyspark構文で)これ​​3210またはpartitionを、使用して正しいことを行うために、これを取得する:

select = sqlContext.sql('''SELECT stuff FROM sometable''').coalesce(1) 
select.write.insertInto(target_table, overwrite=True) 

をしかし、私たちは完全な方法それを変更する必要があるとして、このようなことを行うにはしたくないユーザークエリを提供してください。私が起こることを(かなり複雑な)クエリ内のすべての計算を強制したくないとして、

conf.set("spark.sql.shuffle.partitions","1");

を私はこれを試していない:私はまた、我々が設定できることを見てきました

1つの減速機、ディスクへの最後の書き込みを行うもののみ。 (私はこのことを心配べきではない場合、私に知らせて!)

質問

  • のみスパークSQL構文を使用して、私はいくつかのファイルとして書き込まれたクエリを書くのですかどのようにできるだけ多くの空の/小さなファイルを作成しませんか?

おそらく関連:

+0

あなたの質問に答えていないが、我々は、同様の問題がありました。私たちは2つの仕事を使ってそれを解決しました。スパークの結果を計算し、それを「ステージング」の場所に書き出しました。次のジョブ、トリガーされたbashスクリプトは、ファイルをまとめて連結し、必要な場所に配信し、ステージング領域をクリーンアップしました。 bashスクリプトは非常に高速でした。あまり優雅ではありませんが、間違いなく働きました – Jeremy

答えて

1

だから、それは私が間違っていたところ、物事を単純化する私の試みであると思われる

(私たちは、おそらく私たちのパーティションBY列配布してくださいことを承知しているが、これは最も単純な例を提供することです)。人工の1(つまりDISTRIBUTE BY load_dateなど)ではなくI DISTRIBUTE BY実際の列の場合、空のファイルは作成されません。どうして?誰もが知っている...

は(これもmerge-multiple-small-files-in-to-few-larger-files-in-sparkスレッドでthis答えが一致する)

関連する問題