2016-04-02 4 views
5

私は2015年からの米国国内線の定時のパフォーマンス記録を分析しています。私はテール番号でグループ化し、データベースに各テールナンバーのすべてのフライトのソートされたリストをアプリケーションに取り込む必要があります。私はこれを達成するための2つのオプションのうちのどれが最良のものか確信していません。PySparkでソートされたreduceを行う最も効率的な方法は何ですか?

# Load the parquet file 
on_time_dataframe = sqlContext.read.parquet('../data/on_time_performance.parquet') 

# Filter down to the fields we need to identify and link to a flight 
flights = on_time_dataframe.rdd.map(lambda x: 
    (x.Carrier, x.FlightDate, x.FlightNum, x.Origin, x.Dest, x.TailNum) 
) 

削減でこれを行う

# Do same in a map step, more efficient or does pySpark know how to optimize the above? 
flights_per_airplane = flights\ 
    .map(lambda nameTuple: (nameTuple[5], [nameTuple]))\ 
    .reduceByKey(lambda a, b: a + b)\ 
    .map(lambda tuple: 
    (
     tuple[0], sorted(tuple[1], key=lambda x: (x[1],x[2],x[3],x[4]))) 
    ) 

...私は...ソート削減に

# Group flights by tail number, sorted by date, then flight number, then 
origin/dest 
flights_per_airplane = flights\ 
    .map(lambda nameTuple: (nameTuple[5], [nameTuple]))\ 
    .reduceByKey(lambda a, b: sorted(a + b, key=lambda x: (x[1],x[2],x[3],x[4]))) 

をこれを達成することができますまたは私は、その後のマップジョブでそれを達成することができます実際には効率が悪いようですが、実際にはどちらも非常に遅いです。 sorted()はPySparkのドキュメントでこれを行う方法のように見えるので、PySparkがこのコーシャーを内部的に作れないのだろうか?他の何らかの理由で、どのオプションが最も効率的か、最良の選択ですか?

私のコードはここに要旨でもある:https://gist.github.com/rjurney/af27f70c76dc6c6ae05c465271331ade

あなたがデータについて興味あれば、それはここでは、交通統計局からである:http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time

答えて

3

は、残念ながら、両方の方法は、あなたの前に間違っていますソートを開始することもできますし、スパークでこれを行う効果的で簡単な方法はありません。それでも、最初のものは他のものよりもかなり悪いです。

なぜ両方の方法が間違っていますか?それはちょうど別のgroupByKeyなので、それは単に高価な操作です。あなたは物事を改善しようとすることができますいくつかの方法があります(特にマップ側の削減を避けるために)が、一日の終わりには、完全なシャッフルの価格を支払う必要がありますし、すべての騒ぎ。

まだ、2番目の方法はアルゴリズムにより優れています*。最初の試みのように仕分けされた構造を完全に保持したい場合は、専用のツール(aggregateByKey with bisect.insort)を使用することをお勧めしますが、実際には何も得られません。

グループ化された出力が厳しい要件の場合は、keyBy,groupByKeyと並べ替えが最適です。あなたがTimsortための最良のシナリオを想定した場合でも最初のアプローチは、N回O(N)中です*

(flights 
    .keyBy(lambda x: x[5]) 
    .groupByKey() 
    .mapValues(lambda vs: sorted(vs, key=lambda x: x[1:5]))) 

:それは第二の溶液よりもパフォーマンスは改善されませんが、間違いなく読みやすさを向上します第2のものは、最悪の場合のシナリオではO(Nlog N)である。

+0

私はこの質問に真っ直ぐな答えを期待していなかったと言いたいと思っていました。あなたが与えたのは本当に素晴らしいです! – rjurney

関連する問題