2017-09-21 10 views
0

MongoDBからSparkデータフレームにデータをロードし、フィルタを適用し、データを処理し(RDDを使用して)、結果を書き戻すPySparkにコードを書きました。 MongoDBへ。測定時間スパーク操作(ロード、処理、書き込み)

# 1) Load the data 
df_initial = spark.read.format("com.mongodb.spark.sql").options().schema(schema).load() #df_initial is a Spark dataframe 
df_filtered = df_initial.filter(...) 

# 2) Process the data 
rdd_to_process = df_filtered.rdd 
processed_rdd = rdd_to_process.mapPartitions(lambda iterator: process_data(iterator)) 

# 3) Create a dataframe from the RDD 
df_final = spark.createDataFrame(processed_rdd, schema) 
df_to_write = df_final.select(...) 

# 4) Write the dataframe to MongoDB 
df_to_write.write.format("com.mongodb.spark.sql").mode("append").save() 

Iは、(データのロードRDD処理、データフレームを作成し、データを書き戻す)各部分にかかる時間を測定したいです。

私は各部分の間にタイマを入れようとしましたが、すべてのスパーク操作が怠惰であると理解していたので、すべてが最後の行で実行されます。

ボトルネックを特定できるように、各部品の使用時間を測定する方法はありますか?

おかげ

答えて

0

スパークを使用すると、データフレームのAPIを使用する場合は特に、いくつかの操作をインライン化することができます。そのため、「コードパーツ」の実行統計を取得することはできませんが、異なるステージについてのみ実行できます。

これらの情報を直接コンテキストから取得する簡単な方法はありませんが、REST APIは、使用する可能性のある多くの情報を示しています。時間は各段階で過ごし取得するにはたとえば次の手順を使用することができます。

import datetime 
import requests 
parse_datetime = lambda date: datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%fGMT") 
dates_interval = lambda dt1, dt2: parse_datetime(dt2) - parse_datetime(dt1) 

app_id = spark.sparkContext.applicationId 
data = requests.get(spark.sparkContext.uiWebUrl + "/api/v1/applications/" + app_id + "/stages").json() 

for stage in data: 
    stage_time = dates_interval(stage['submissionTime'], stage['completionTime']).total_seconds() 
    print("Stage {} took {}s (tasks: {})".format(stage['stageId'], stage_time, stage['numCompleteTasks'])) 

出力例は次のようになります。

Stage 4 took 0.067s (tasks: 1) 
Stage 3 took 0.53s (tasks: 1) 
Stage 2 took 1.592s (tasks: 595) 
Stage 1 took 0.363s (tasks: 1) 
Stage 0 took 2.367s (tasks: 595) 

しかし、それは業務を担当段階が何であるかを識別するためにあなたの仕事ですあなたは測定したい。

関連する問題