2016-03-28 13 views
3

私はいくつかのステップを実行しているいくつかのコードを持っています。プロセス全体にどれくらいの時間がかかります。しかし、個々の変換にどれくらいの時間がかかるかを計算したいと思っています。ステップの簡単な例を次に示します。遅延実行型のSparkでの変換をどのように行うのですか?

rdd1 = sc.textFile("my/filepath/*").map(lambda x: x.split(",")) 
rdd2 = sc.textFile("other/filepath/*").map(lambda x: x.split(",")) 
to_kv1 = rdd1.map(lambda x: (x[0], x[1])) # x[0] is key, x[1] is numeric 
to_kv2 = rdd2.map(lambda x: (x[0], x[1])) # x[0] is key, x[1] is numeric 
reduced1 = to_kv1.reduceByKey(lambda a, b: a+b) 
reduced2 = to_kv1.reduceByKey(lambda a, b: a+b) 
outer_joined = reduced1.fullOuterJoin(reduced2) # let's just assume there is key overlap 
outer_joined.saveAsTextFile("my_output") 

ここで、このコードの特定の部分をベンチマークするにはどうすればよいですか?終了までの実行に一定の時間がかかることがわかります(saveAsTextFileが強制的に実行されます)が、reduceByKeyまたはfullOuterJoinの部分だけをベンチマークするにはどうすればよいですか?実行を強制するために各操作の後にcount()を実行できることがわかっていますが、変換を実行するのにかかる時間とcountを実行するのにかかる時間が追加されるため、操作を適切にベンチマークしません。

怠惰な実行スタイルを考慮して、スパーク変換をベンチマークする最良の方法は何ですか?

時間の測定方法は問いませんのでご注意ください。私はtimeモジュール、start = time.time()などについて知っています。私は、情報をドライバに返す必要があるアクションを呼び出すまで、実行されないスパーク変換の遅延実行スタイルのベンチマーク方法を尋ねています。

+0

「私は時間モジュールについて知っています、start = time.time()など」と書いてあります。これはどういう意味ですか?私は単一の変換/変換のセットを時間をとる方法を探しています。ありがとう! –

+0

実行を強制するアクションで終了する一連の実行を時間切れにしたい場合は、 'start = time.time()'を先頭に、 'elapsed = time.time() - start'を''経過した ''が何を含んでいるかを見てください。 –

答えて

4

あなたの最善の策は、この情報を読むためにSpark UIを使用することです。あなたは、それはで行うことができるようタスクが本当に実行されたときに伝えるためにやや難しいだろうな変換の各内部タイミング機構を追加した場合、計算はそれほどでも配布され

  • :問題は二つあります1つのマシンではなく、別のマシンです。つまり、内部にログを追加して実行の最初のインスタンスを見つけ、最終的な実行を見つけることができます。ただし、次の点に注意してください。
  • 可能な限り変換がパイプライン化されています。したがって、Sparkは同時に複数の変換を効率化するため、その1つの動作を明示的にテストする必要があります。
+0

私はここで第二のポイントは非常に貴重だと思います。私はSparkが同時に複数の変換を実行することに気づいていませんでした。私の目標はより相対的です:どの作業が最も長くかかりますか?私がそれらを正確に(私の理想的な状況)時間を計ることができなくても、それぞれの後に「カウント」を実行すると、どちらが長い時間を取るかという相対的な考えが得られます。 –

関連する問題