1

スパークプログラムの実行速度を遅くしたいが、怠惰のためにかなり難しい。さんがここで考慮に入れ、この(無意味な)コードを見てみましょう:スパークプログラムの実行速度を測定する方法

var graph = GraphLoader.edgeListFile(context, args(0)) 
val graph_degs = graph.outerJoinVertices(graph.degrees).triplets.cache 

/* I'd need to start the timer here */ 
val t1 = System.currentTimeMillis 
val edges = graph_degs.flatMap(trip => { /* do something*/ }) 
         .union(graph_degs) 

val count = edges.count 
val t2 = System.currentTimeMillis 
/* I'd need to stop the timer here */ 

println("It took " + t2-t1 + " to count " + count) 

事があり、何もval count = edges.count行の前に評価されなかっますので、変換が遅延しています。しかし、私の見解によると、t1は上記のコードが値を持たないにもかかわらず値を取得します... t1のコードは、コードの位置にかかわらずタイマーが開始した後に評価されます。それは問題です...

Spark Web UIでは、その特定のコード行の後に費やされる時間が必要なので、興味深いものは見つかりません。あなたは、変換のグループが本当に評価されるのを見るための簡単な解決策があると思いますか?

+0

可能な重複メソッドはScala?](http://stackoverflow.com/questions/9160001/how-to-profile-methods-in-scala) –

+0

この記事は重複していないようですが、Apache Sparkに固有のものです。測定ツールを提供し、具体的なプロファイリングの課題を提示します。ここで説明するように、評価は怠惰であり、therefo再評価されたコードブロックは、測定された操作を表していない可能性があります。 –

答えて

2

連続変換をするので(同じタスク内 - 意味、それらはシャッフルによって分離されていないと同じ作用の一部として実行される)単一の「ステップ」として実行される、スパークはないを行いますそれらを個別に測定する。そして、ドライバーコードから - どちらもできません。

// create accumulator 
val durationAccumulator = sc.longAccumulator("flatMapDuration") 

// "wrap" your "doSomething" operation with time measurement, and add to accumulator 
val edges = rdd.flatMap(trip => { 
    val t1 = System.currentTimeMillis 
    val result = doSomething(trip) 
    val t2 = System.currentTimeMillis 
    durationAccumulator.add(t2 - t1) 
    result 
}) 

// perform the action that would trigger evaluation 
val count = edges.count 

// now you can read the accumulated value 
println("It took " + durationAccumulator.value + " to flatMap " + count) 

あなたがのためにこれを繰り返すことができます:あなた行うことができますが、各レコードにあなた関数を適用した時間を測定し、それをすべて合計するアキュムレータを使用し、例えばある何

個々の変換。免責事項

は:

もちろん
  • 、これは、Sparkは周りのものをシャッフルして、実際のカウントをやって過ごした時間は含まれません - そのため、実際に、スパークUIはあなたの最高のリソースです。
  • アキュムレータはリトライなどの影響を受けやすいので、再試行されたタスクはアキュムレータを2回更新します。

スタイル注: あなたが任意の関数の周りに「ラップ」というmeasure機能を作成することで、このコードは、より再利用可能にし、与えられたアキュムレータを更新することができますプロファイルに[方法の

// write this once: 
def measure[T, R](action: T => R, acc: LongAccumulator): T => R = input => { 
    val t1 = System.currentTimeMillis 
    val result = action(input) 
    val t2 = System.currentTimeMillis 
    acc.add(t2 - t1) 
    result 
} 

// use it with any transformation: 
rdd.flatMap(measure(doSomething, durationAccumulator)) 
+0

それは良いアイデアですね!しかし、正確ではありません。あなたが「スパークが物事を周りに回して実際のカウントをしていた時間は含まない」と言ったからです。とにかく面白い。 – Matt

0

Spark Web UIはすべての単一のアクションを記録し、そのアクションのすべてのステージの時間も報告します。すべてのアクションが記録されます。ジョブではなく、ステージのタブを調べる必要があります。私はあなたがコードをコンパイルして提出する場合にしか使えないことを発見しました。 REPLでは役に立たないですが、これを万事使用していますか?

関連する問題