2016-11-08 7 views
5

I持ってスパークジョブの(寄木細工で)次の入力データ:スパークデータフレームは、低速域での参加

Person (millions of rows) 
+---------+----------+---------------+---------------+ 
| name | location |  start  |  end  | 
+---------+----------+---------------+---------------+ 
| Person1 |  1230 | 1478630000001 | 1478630000010 | 
| Person2 |  1230 | 1478630000002 | 1478630000012 | 
| Person2 |  1230 | 1478630000013 | 1478630000020 | 
| Person3 |  3450 | 1478630000001 | 1478630000015 | 
+---------+----------+---------------+---------------+ 


Event (millions of rows) 
+----------+----------+---------------+ 
| event | location | start_time | 
+----------+----------+---------------+ 
| Biking |  1230 | 1478630000005 | 
| Skating |  1230 | 1478630000014 | 
| Baseball |  3450 | 1478630000015 | 
+----------+----------+---------------+ 

と私は次の期待される成果にそれを変換する必要があります。

[{ 
    "name" : "Biking", 
    "persons" : ["Person1", "Person2"] 
}, 
{ 
    "name" : "Skating", 
    "persons" : ["Person2"] 
}, 
{ 
    "name" : "Baseball", 
    "persons" : ["Person3"] 
}] 

単語:結果は、このイベントに参加した人物のリストを含む各イベントのリストです。

Person.start < Event.start_time 
&& Person.end > Event.start_time 
&& Person.location == Event.location 

場合

人が参加者としてカウント私は別のアプローチを試してみましたが、実際に動作するように思われる一方のみが2つのデータフレームを結合して、グループ/イベントによって、それらを集約する です。 しかし、結合は非常に遅く、複数のCPUコアにうまく分散しません。

参加のための現在のコード:

final DataFrame fullFrame = persons.as("persons") 
    .join(events.as("events"), col("persons.location").equalTo(col("events.location")) 
       .and(col("events.start_time").geq(col("persons.start"))) 
       .and(col("events.start_time").leq(col("persons.end"))), "inner"); 

//count to have an action 
fullFrame.count(); 

これは違いを作る場合、私は、スパークスタンドアロンおよびJavaを使用しています。

Spark 1.6.2でこの問題を解決する方法は誰にも分かりますか?

答えて

1

範囲結合は、次のフィルタステップでクロスプロダクトとして実行されます。潜在的により良い解決策は、ブロードキャストの可能性があるより小さなeventsテーブルにしてpersonsテーブルをマップすることです。マップ内で、結合条件をチェックし、それぞれの結果を生成します。

+0

実際に「ブロードキャスト結合」を使用すると、これが大幅に改善されました。イベントテーブルを複数の小さなチャンクに分割してメモリに収め、1つずつ結合する必要がありました。 –

関連する問題