私はいくつかのステップを実行しているいくつかのコードを持っています。プロセス全体にどれくらいの時間がかかります。しかし、個々の変換にどれくらいの時間がかかるかを計算したいと思っています。ステップの簡単な例を次に示します。遅延実行型のSparkでの変換をどのように行うのですか?
rdd1 = sc.textFile("my/filepath/*").map(lambda x: x.split(","))
rdd2 = sc.textFile("other/filepath/*").map(lambda x: x.split(","))
to_kv1 = rdd1.map(lambda x: (x[0], x[1])) # x[0] is key, x[1] is numeric
to_kv2 = rdd2.map(lambda x: (x[0], x[1])) # x[0] is key, x[1] is numeric
reduced1 = to_kv1.reduceByKey(lambda a, b: a+b)
reduced2 = to_kv1.reduceByKey(lambda a, b: a+b)
outer_joined = reduced1.fullOuterJoin(reduced2) # let's just assume there is key overlap
outer_joined.saveAsTextFile("my_output")
ここで、このコードの特定の部分をベンチマークするにはどうすればよいですか?終了までの実行に一定の時間がかかることがわかります(saveAsTextFile
が強制的に実行されます)が、reduceByKey
またはfullOuterJoin
の部分だけをベンチマークするにはどうすればよいですか?実行を強制するために各操作の後にcount()
を実行できることがわかっていますが、変換を実行するのにかかる時間とcount
を実行するのにかかる時間が追加されるため、操作を適切にベンチマークしません。
怠惰な実行スタイルを考慮して、スパーク変換をベンチマークする最良の方法は何ですか?
時間の測定方法は問いませんのでご注意ください。私はtime
モジュール、start = time.time()
などについて知っています。私は、情報をドライバに返す必要があるアクションを呼び出すまで、実行されないスパーク変換の遅延実行スタイルのベンチマーク方法を尋ねています。
「私は時間モジュールについて知っています、start = time.time()など」と書いてあります。これはどういう意味ですか?私は単一の変換/変換のセットを時間をとる方法を探しています。ありがとう! –
実行を強制するアクションで終了する一連の実行を時間切れにしたい場合は、 'start = time.time()'を先頭に、 'elapsed = time.time() - start'を''経過した ''が何を含んでいるかを見てください。 –