を書き込みました。あなたの環境がこれを許可しておらず、あなたのshuffles settingsが妥当であることを確認した場合、例えば圧縮(デフォルトでは)は変更されていない場合、1つの解決策があります。 sum
で再集計してください。フィット思わ
- パーティションどのような方法でデータ(ファイルの数によって、ディレクトリで、日付によって、など)
- が別々のスパークアクションとして
col1
とcol2
でカウントを行います。
- 再グループ化して再集計します。
- ソート。
簡略化のため、col1
が整数であるとします。ここでは、処理を8つの別々のジョブに分割し、出力を再集計する方法を示します。 col1
が整数でない場合は、ハッシュするか、別の列を使用できます。
def splitTableName(i: Int) = s"tmp.gm.result.part-$i"
// Source data
val df = hiveCtx.sql("select col1, col2 from gm.final_orc")
// Number of splits
val splits = 8
// Materialize partial aggregations
val tables = for {
i <- 0 until splits
tableName = splitTableName(i)
// If col1 % splits will create very skewed data, hash it first, e.g.,
// hash(col1) % splits. hash() uses Murmur3.
_ = df.filter('col1 % splits === i)
// repartition only if you need to, e.g., massive partitions are causing OOM
// better to increase the number of splits and/or hash to un-skew skewed data
.groupBy('col1, 'col2).count
.write.saveAsTable(tableName)
} yield hiveCtx.table(tableName)
// Final aggregation
tables.reduce(_ union _)
.groupBy('col1, 'col2)
.agg(sum('count).as("count"))
.orderBy('count.desc)
.write.saveAsTable("gm.result")
// Cleanup temporary tables
(0 until splits).foreach { i =>
hiveCtx.sql(s"drop table ${splitTableName(i)}")
}
col1
とcol2
は、次のいずれかを検討する必要がありそう多様化および/または部分的に集約ストレージはディスクスペースの問題を引き起こしていることを非常に大きくしている場合は、次の分割の
数が少ないです一般に使用されるディスク容量は少なくなります。
並べ替えをcol1
にすると(Parquetランレングスエンコーディングのために)役立ちますが、実行が遅くなります。
独立した分割を作成する方法。具体的な値はcol1
でグループ化します。
非常にディスクスペースが不足している場合は、マルチステップ再集計を実装する必要があります。最も簡単な方法は、一度に1つの分割を生成し、実行中の集約を維持することです。実行速度ははるかに遅くなりますが、ディスク領域が大幅に少なくなります。
希望すると便利です。
出典
2017-11-13 08:43:14
Sim
私はグループのカウントを(col1、col2)で取得しようとしています。私は小さなデータセットでテストしました..私は完全なデータを置く間に期待される出力を与えています... –
シャッフルデータのために、私は質問のコードを簡略化しました。記号を使用して簡単な列を参照することができます。 'groupBy'は投影を実行するので、別々の' select'と 'count'が変換として存在する必要はありません。 – Sim
'repartition'の間や' groupBy'シャッフル中のどこでディスク容量が足りませんか?グループの時に – Sim