0
を考えると、次のファイル:スパークグループ化チューニング
- 従業員
- スキル
- レポート
- など
や他のファイルの各1対がありますN関係、例えば1人の従業員に複数のスキルが対応します。各ファイルは500MBから1.5GBの間にあり、合計で約10ファイルあります。各従業員の は、私は/すべてのファイル(スキル、レポートなど)からすべての情報を収集し、XML構造に書き込み集約したい:
<employees>
<employee>
<skills>
<skill>...</skill>
<skill>...</skill>
...
</skills>
<reports
<report>...</report>
<report>...</report>
...
</reports>
...
</employee>
...
</employees>
私は線に沿って何かをやっています:
val employeesRdd = employeesDf.map(r => (r.getAs[String]("employeeId"), r))
val skillsRdd = skillsDf.map(r => (r.getAs[String]("employeeId"), r)).groupByKey()
val reportsRdd = reportsDf.map(r => (r.getAs[String]("employeeId"), r)).groupByKey()
...
employeesRdd
.leftOuterJoin(skillsRdd)
.leftOuterJoin(reportsRdd)
...
.toLocalIterator
... // write <employee> nodes one by one
問題すべてのgroupByKey()操作は非常に遅く、多くの時間がかかることがあります。そして、あまりにも長い間実行した後、java.lang.OutOfMemoryErrorのために爆発します:GCオーバーヘッド限界を超えました。私はjvmに割り当てられた約20GBのローカルモードでSpark 1.5.1を使用しています。