2016-09-13 18 views
0

私はApache Spark(バージョン1.6)の新機能です。壁に当たったと感じます。私は今まで私を助けたものは何も見つかりませんでした。基本的には根本的に間違っていることをしていると思いますが、私が書いたコードの他の部分は正常に動作しているので、正確には何かを指摘することはできません。スパーク性能の問題(「基本的な間違い」の可能性があります)

私は自分の状況を説明するのにできるだけ具体的にしようとしますが、私は理解を深めて作業を簡素化します。私はまだそれを学んでいるので、Sparkのローカルモードを使用してこのコードを実行しています。また、私はDataFrames(RDDではない)を使用してきたことにも注意する価値があります。最後に、次のコードはPysparkを使用してPythonで書かれていることに注意してください。しかし、ScalaやJavaを使用して解決策を歓迎します。

私は、その構造は以下のように似ていると、一般的なJSONファイルを持っている:

{"events":[ 
    {"Person":"Alex","Shop":"Burger King","Timestamp":"100"}, 
    {"Person":"Alex","Shop":"McDonalds","Timestamp":"101"}, 
    {"Person":"Alex","Shop":"McDonalds","Timestamp":"104"}, 
    {"Person":"Nathan","Shop":"KFC","Timestamp":"100"}, 
    {"Person":"Nathan","Shop":"KFC","Timestamp":"120"}, 
    {"Person":"Nathan","Shop":"Burger King","Timestamp":"170"}]} 

私が行うために必要なもの、同じ店に同じ人による2回の訪問との間を通過したどのくらいの時間をカウントしています。出力は、この要件を満たす顧客の数と並んで、少なくとも1人の顧客が少なくとも5秒に1回それらを訪問した店のリストでなければなりません。上記の場合、出力は次のようになります。

{"Shop":"McDonalds","PeopleCount":1} 

私の考えでは、各ペア(人、ショップ)に同じ識別子を割り当て、そのペアが要件を満たしているかどうかを確認するために進みました。識別子は、ウィンドウ関数ROW_NUMBER()を使用して割り当てることができます。これは、hiveContextをSparkで使用する必要があります。

{"events":[ 
    {"Person":"Alex","Shop":"Burger King","Timestamp":"100","ID":1}, 
    {"Person":"Alex","Shop":"McDonalds","Timestamp":"101", "ID":2}, 
    {"Person":"Alex","Shop":"McDonalds","Timestamp":"104", "ID":2}, 
    {"Person":"Nathan","Shop":"KFC","Timestamp":"100","ID":3}, 
    {"Person":"Nathan","Shop":"KFC","Timestamp":"120","ID":3}, 
    {"Person":"Nathan","Shop":"Burger King","Timestamp":"170","ID":4}]} 

私が来る前に、各ペアのためのいくつかのステップを(自己の使用を必要とするこれらのいくつかはに参加する)実行する必要があるとして:これは、ファイルは、上記の識別子が割り当てられた後のようになります方法です結論として、私は一時テーブルを利用しました。

私が書いたコードは、このようなものである(もちろん、私は関連部分のみが含まれている - 「DF」は「データフレーム」の略):

t1_df = hiveContext.read.json(inputFileName) 
t1_df.registerTempTable("events") 
t2_df = hiveContext.sql("SELECT Person, Shop, ROW_NUMBER() OVER (order by Person asc, Shop asc) as ID FROM events group by Person, Shop HAVING count(*)>1") #if there are less than 2 entries for the same pair, then we can discard this pair 
t2_df.write.mode("overwrite").saveAsTable("orderedIDs") 
n_pairs = t2_df.count() #used to determine how many pairs I need to inspect 
i=1 
while i<=n_pairs: 
    #now I perform several operations, each one displaying this structure 
    #first operation... 
    query="SELECT ... FROM orderedIDs WHERE ID=%d" %i 
    t3_df = hiveContext.sql(query) 
    t3_df.write.mode("overwrite").saveAsTable("table1") 
    #...second operation... 
    query2="SELECT ... FROM table1 WHERE ..." 
    t4_df = hiveContext.sql(query2) 
    temp3_df.write.mode("overwrite").saveAsTable("table2") 
    #...and so on. Let us skip to the last operation in this loop, which consists of the "saving" of the shop if it met the requirements: 
    t8_df = hiveContext.sql("SELECT Shop from table7") 
    t8_df.write.mode("append").saveAsTable("goodShops") 
    i=i+1 

#then we only need to write the table to a proper file 
output_df = hiveContext.sql("SELECT Shop, count(*) as PeopleCount from goodShops group by Shop") 
output_df.write.json('output') 

は今、ここに問題があります:出力は正しいものです。私はいくつかの入力を試してきましたが、その点でプログラムはうまくいきます。しかし、それは非常に遅いです:各ペアが持っているエントリにかかわらず、各ペアを分析するのに15-20秒程度かかります。たとえば、10ペアの場合は約3分、100の場合は30分などがあります。私は、比較的まともなハードウェアを持ついくつかのマシンでこのコードを実行しましたが、何も変わりませんでした。 私も使用したテーブルのいくつか(またはすべて)をキャッシングしようとしましたが、問題は依然として続きました(特定の状況ではさらに時間が必要でした)。具体的には、ループの最後の操作(「追加」を使用する操作)は完了するまでに数秒かかりますが(5から10)、最初の6秒は1-2秒しかかかりませんタスクの範囲ですが、より管理しやすくなります)。次の

私は問題が1つ(またはそれ以上)であるのかもしれないと考えている

:ループの

  1. 使用、並列処理のかもしれない問題を引き起こします。
  2. I/O
  3. への書き込みを強制的に「saveAsTable」方法の使用、 キャッシングの
  4. 悪いか、貧しい使用

これら3は、他の作品のように私の心に来て唯一のもの、あります私は基本的に単純なJOIN操作を実行し、registerTempTableメソッドを使用して一時的なテーブルを使用していたので、Sparkを使用して作成したソフトウェア(パフォーマンスの問題が発生していない)は、上記のテクニックを使用しません。理解、ループ)の代わりに使用することはできませんsaveAsTable方法。

私は可能な限り明確にしようとしましたが、詳細が必要な場合は追加情報を提供しています。

EDIT:zero323の回答によって問題を解決できました。確かに、LAG機能の使用は私が本当に必要なものでした。一方、私は "saveAsTable"メソッドの使用は避けるべきであることを学びました。特にループでは、呼び出されるたびにパフォーマンスが大幅に低下するためです。それが絶対に必要でない限り、私は今から使用しないでください。

+0

これはあなたが最初のデータをgroupbykeyを試してみて、あなたのreduceGroups可能性が行う必要があることすべてである場合。それはかなりストレートではありませんが、それは本当に速くすべきです。 – BBogdan

+0

少し拡大してもよろしいですか? –

答えて

1

同じ店が同じ訪問者から2回訪問してからの時間。出力は、この要件を満たす顧客の数と並んで、少なくとも1人の顧客が少なくとも5秒に1回それらを訪問した店のリストでなければなりません。集約と

方法についての簡単なlag

from pyspark.sql.window import Window 
from pyspark.sql.functions import col, lag, sum 

df = (sc 
    .parallelize([ 
     ("Alex", "Burger King", "100"), ("Alex", "McDonalds", "101"), 
     ("Alex", "McDonalds", "104"), ("Nathan", "KFC", "100"), 
     ("Nathan", "KFC", "120"), ("Nathan", "Burger King", "170") 
    ]).toDF(["Person", "Shop", "Timestamp"]) 
    .withColumn("Timestamp", col("timestamp").cast("long"))) 

w = (Window() 
    .partitionBy("Person", "Shop") 
    .orderBy("timestamp")) 

ind = ((
    # Difference between current and previous timestamp le 5 
    col("Timestamp") - lag("Timestamp", 1).over(w)) <= 5 
).cast("long") # Cast so we can sum 

(df 
    .withColumn("ind", ind) 
    .groupBy("Shop") 
    .agg(sum("ind").alias("events")) 
    .where(col("events") > 0) 
    .show()) 

## +---------+------+ 
## |  Shop|events| 
## +---------+------+ 
## |McDonalds|  1| 
## +---------+------+ 
+0

このアイデアは素晴らしく、私はそれを試してみたいと思っています。しかし、私はループで実行したいくつかの他の操作(私の質問で言及していない)がありました。この方法でまだ可能かどうかわかりません。 あなたのアプローチは、私が問題を想像した方法とはまったく異なりますので、必要な情報を提供します。それでも、私のプログラムがなぜとても遅いのですか? –

+0

まあ、言いにくいです.... 'ROW_NUMBER()OVER(人物順、店長)'悪い、繰り返しループ '' SELECT ... FROM ... WHERE ... 'が良くない特に、データが条件ベースのパーティション分割でキャッシュされていない場合には、重複しています。write.mode( "append")は、フォーマットやストレージによっては実際には効率が悪いことがあります...しかし、これはちょうど氷山の一角かもしれません: ) – zero323

関連する問題