2017-08-02 8 views
1

私は(スカラ座で)このようなコードを使用してRDD上の各mapPartition操作の実行時間をログに記録しようとしていますが:ApacheのスパークmapPartition奇妙な行動(遅延評価?)

rdd.mapPartitions{partition => 
    val startTime = Calendar.getInstance().getTimeInMillis 
    result = partition.map{element => 
     [...] 
    } 
    val endTime = Calendar.getInstance().getTimeInMillis 
    logger.info("Partition time "+(startTime-endTime)+ "ms") 
    result 
} 

問題はそれということです私はいつも2ミリ秒のような時間を得るので、マップ操作を実行し始める前に、すぐに "パーティション時間"を記録します。

私はSpark Web UIを見て気づきましたが、ログファイルでは、タスクが開始された直後に実行時間に関する行が表示されます。

誰かが私の理由を説明できますか? mapPartitions内でコードを線形に実行するか、間違っていますか? mapPartitionsの内部

おかげ

よろしくルカ

+0

変換は遅延評価されます。 – philantrovert

+0

ありがとう! endTimeの前に "result.size"を入れることを解決しました。 私は、デフォルトでは、スカラ操作であるmapPartitions内のマップが怠惰ではないと考えました。 – Gaglia88

+0

@philantrovertいいえ、これは理由ではありません、map内のマップPartitionsはスパーク変換ではありません。これは純粋なスカラ関連です –

答えて

3

partitionsIterator[Row]であり、そしてIteratorは(イテレータが消費され、すなわちとき)Scalaで遅延評価されます。これはスパークの怠け者の証言には何もありません!

partitions.sizeを呼び出すと、マッピングの評価がトリガーされますが、イテレータは1回しか使用できないため、消費されます。たとえば、あなたが何ができるか

val it = Iterator(1,2,3) 
it.size // 3 
it.isEmpty // true 

は非怠惰なコレクション型にIteratorを変換することです:

rdd.mapPartitions{partition => 
    val startTime = Calendar.getInstance().getTimeInMillis 
    result = partition.map{element => 
     [...] 
    }.toVector // now the statements are evaluated 
    val endTime = Calendar.getInstance().getTimeInMillis 
    logger.info("Partition time "+(startTime-endTime)+ "ms") 
    result.toIterator 
} 

EDIT:あなたの代わりにCalendarを使用してのSystem.currentTimeMillis()(あるいはSystem.nanoTime())を使用することができます。