2016-10-25 17 views
0

を考えると、次のファイル:スパークグループ化チューニング

  • 従業員
  • スキル
  • レポート
  • など
従業員の間

や他のファイルの各1対がありますN関係、例えば1人の従業員に複数のスキルが対応します。各ファイルは500MBから1.5GBの間にあり、合計で約10ファイルあります。各従業員の は、私は/すべてのファイル(スキル、レポートなど)からすべての情報を収集し、XML構造に書き込み集約したい:

<employees> 
    <employee> 
    <skills> 
     <skill>...</skill> 
     <skill>...</skill> 
     ... 
    </skills> 
    <reports 
     <report>...</report> 
     <report>...</report> 
     ... 
    </reports> 
    ... 
    </employee> 
    ... 
</employees> 

私は線に沿って何かをやっています:

val employeesRdd = employeesDf.map(r => (r.getAs[String]("employeeId"), r)) 
val skillsRdd = skillsDf.map(r => (r.getAs[String]("employeeId"), r)).groupByKey() 
val reportsRdd = reportsDf.map(r => (r.getAs[String]("employeeId"), r)).groupByKey() 
... 

employeesRdd 
    .leftOuterJoin(skillsRdd) 
    .leftOuterJoin(reportsRdd) 
    ... 
    .toLocalIterator 
    ... // write <employee> nodes one by one 

問題すべてのgroupByKey()操作は非常に遅く、多くの時間がかかることがあります。そして、あまりにも長い間実行した後、java.lang.OutOfMemoryErrorのために爆発します:GCオーバーヘッド限界を超えました。私はjvmに割り当てられた約20GBのローカルモードでSpark 1.5.1を使用しています。

答えて

0

Spark DataFrameのパーティション分割が最適です。

関連付けは、関連情報のデータをnearyで保存するのに役立ちます。必要な情報に迅速にアクセスするプロセスを支援します。

offical docdoc

関連する問題