2017-11-07 2 views
1

私はハイブで100GBのサイズのテーブルを持っています。私はグループ化し、機能をカウントし、ハイブテーブルとして結果を保存しようとしています。私のハードディスクには600 GBのスペースがあり、ジョブが70%に達するまでにディスクスペースがすべて占有されています。Sparkアグリゲーションのシャッフルディスクの使用量を減らす

ので、私の仕事は、失敗した...どのように私はシャッフルデータを最小限に抑えることができ、通常は非常に簡単で安価なオプションですより多くのディスクを追加し、クラウドベースの実行環境で

hiveCtx.sql("select * from gm.final_orc") 
    .repartition(300) 
    .groupBy('col1, 'col2).count 
    .orderBy('count desc) 
    .write.saveAsTable("gm.result") 

spark_memory

+0

私はグループのカウントを(col1、col2)で取得しようとしています。私は小さなデータセットでテストしました..私は完全なデータを置く間に期待される出力を与えています... –

+0

シャッフルデータのために、私は質問のコードを簡略化しました。記号を使用して簡単な列を参照することができます。 'groupBy'は投影を実行するので、別々の' select'と 'count'が変換として存在する必要はありません。 – Sim

+0

'repartition'の間や' groupBy'シャッフル中のどこでディスク容量が足りませんか?グループの時に – Sim

答えて

0

を書き込みました。あなたの環境がこれを許可しておらず、あなたのshuffles settingsが妥当であることを確認した場合、例えば圧縮(デフォルトでは)は変更されていない場合、1つの解決策があります。 sumで再集計してください。フィット思わ

  1. パーティションどのような方法でデータ(ファイルの数によって、ディレクトリで、日付によって、など)
  2. が別々のスパークアクションとしてcol1col2でカウントを行います。
  3. 再グループ化して再集計します。
  4. ソート。

簡略化のため、col1が整数であるとします。ここでは、処理を8つの別々のジョブに分割し、出力を再集計する方法を示します。 col1が整数でない場合は、ハッシュするか、別の列を使用できます。

def splitTableName(i: Int) = s"tmp.gm.result.part-$i" 

// Source data 
val df = hiveCtx.sql("select col1, col2 from gm.final_orc") 

// Number of splits 
val splits = 8 

// Materialize partial aggregations 
val tables = for { 
    i <- 0 until splits 
    tableName = splitTableName(i) 
    // If col1 % splits will create very skewed data, hash it first, e.g., 
    // hash(col1) % splits. hash() uses Murmur3. 
    _ = df.filter('col1 % splits === i) 
    // repartition only if you need to, e.g., massive partitions are causing OOM 
    // better to increase the number of splits and/or hash to un-skew skewed data 
    .groupBy('col1, 'col2).count 
    .write.saveAsTable(tableName) 
} yield hiveCtx.table(tableName) 

// Final aggregation 
tables.reduce(_ union _) 
    .groupBy('col1, 'col2) 
    .agg(sum('count).as("count")) 
    .orderBy('count.desc) 
    .write.saveAsTable("gm.result") 

// Cleanup temporary tables 
(0 until splits).foreach { i => 
    hiveCtx.sql(s"drop table ${splitTableName(i)}") 
} 

col1col2は、次のいずれかを検討する必要がありそう多様化および/または部分的に集約ストレージはディスクスペースの問題を引き起こしていることを非常に大きくしている場合は、次の分割の

  • 数が少ないです一般に使用されるディスク容量は少なくなります。

  • 並べ替えをcol1にすると(Parquetランレングスエンコーディングのために)役立ちますが、実行が遅くなります。

  • 独立した分割を作成する方法。具体的な値はcol1でグループ化します。

非常にディスクスペースが不足している場合は、マルチステップ再集計を実装する必要があります。最も簡単な方法は、一度に1つの分割を生成し、実行中の集約を維持することです。実行速度ははるかに遅くなりますが、ディスク領域が大幅に少なくなります。

希望すると便利です。

+0

ありがとうSim..In私の場合、col1とcol2の両方が整数です。私はこのコードを試して結果を更新します。 –

+0

.agg(sum( 'count).as( "count"))..... .....ここでは、ライブラリから "sum"関数をインポートしました –

+0

上記のクエリを無視します...私はsql関数を完全にインポートしました.. –

関連する問題