2016-06-17 4 views
2

Spark(バージョン1.6.1)を使用して2つのjsonファイルを生成するためにjsonファイルを処理しています。入力ファイルのサイズは約30〜40G(100Mレコード)です。生成されたファイルの場合、大きなものは10G〜15G(30Mレコード)、小さいものは500M〜750M(1.5Mレコード)です。ソート後にデータフレームの並べ替えが行われない

それは単一のファイルに結果をマージするために「配分」を行った後、私は、データフレームのための「並べ替え」メソッドを呼び出した:両方の結果ファイルは、以下の問題に直面しています。次に、生成されたファイルを確認し、がレコードが順序付けられた間隔で見つかりましたが、ファイル全体がグローバルに注文されていません。例えばファイル内の最後のレコード(行番号1.9M)のキー(3列から構成)は "(ou7QDj48c、014,075)"ですが、ファイル(行番号375K)の中間レコードのキーは " pzwzh5vm8、003、023)」私はローカルに比較的小さな入力ソース(入力ファイル400K線)、そのような場合は全く発生しませんを使用してコードをテスト

pzwzh5vm8 003 023 
... 
ou7QDj48c 014 075 

私の具体的なコードを以下に示します。

big_json = big_json.sort($"col1", $"col2", $"col3", $"col4") 
big_json.repartition(1).write.mode("overwrite").json("filepath") 

誰もがアドバイスを与えることができますか?ありがとうございました。

(これも同様の問題が議論されていますが、これまでは良い解決策はありませんでした)この現象が本当に再パーティション化の結果である場合は、誰でもデータフレームを単一のjsonファイルソート順序を維持しながら、?ありがとう)

=========================== SOLUTION ====、RDDに変換しなし=========================

は本当に@eliasahと@pkrishna @manosからの助けを感謝しています。私はあなたのコメントを読んだ後に合体を使うことを考えましたが、そのパフォーマンスを調査した後、私はそのアイデアをあきらめました。

最終溶液は:データフレームをソートし、任意の配分又は合体することなく、JSONに書き込みます。すべての作業が完了したら、以下のhdfsコマンドを呼び出してください。

hdfs dfs -getmerge /hdfs/file/path/part* ./local.json 

このコマンドは私の想像よりはるかに優れています。それはあまりにも多くの時間もあまりにも多くのスペースを必要とせず、私に良い単一のファイルを与えます。私は巨大な結果ファイルにheadtailを使用しました。完全に注文されたようです。

+0

再パーティションしないでください。複数のファイルを作成し、適切な並べ替え順に並べる必要があります。 – marios

答えて

5

コメントに記載されているように、並べ替え操作の後でパーティションを再設定しています。

だから、これは何をすべきか配分である:それは、より多くのまたはより少ないパーティションのいずれかを作成し、それら全体にバランスをとるために、ランダムにRDDのデータをreshuffles。これにより、ネットワーク上のすべてのデータが常にシャッフルされます。

フードの下では、合体とシャッフルを使用してデータを再配布します。 [Reference]

したがって、あなたのデータはもはやソートされません!

1

パーティション数はパーティションがRDD内のパーティションの数を減少させるために1

に低減されることを意味する、あなたの例では1に設定されているので、スパーク(シャッフル=偽で)変換合体を提供注文を保存します。

eliasahとして、融合を使用してフードの下で再分割を述べました。それは、shuffle = trueで合体を呼び出します。したがって、shuffle = falseを指定して再分割の代わりに合体変換を使用することができます。

関連する問題