MongoDBからSparkデータフレームにデータをロードし、フィルタを適用し、データを処理し(RDDを使用して)、結果を書き戻すPySparkにコードを書きました。 MongoDBへ。測定時間スパーク操作(ロード、処理、書き込み)
# 1) Load the data
df_initial = spark.read.format("com.mongodb.spark.sql").options().schema(schema).load() #df_initial is a Spark dataframe
df_filtered = df_initial.filter(...)
# 2) Process the data
rdd_to_process = df_filtered.rdd
processed_rdd = rdd_to_process.mapPartitions(lambda iterator: process_data(iterator))
# 3) Create a dataframe from the RDD
df_final = spark.createDataFrame(processed_rdd, schema)
df_to_write = df_final.select(...)
# 4) Write the dataframe to MongoDB
df_to_write.write.format("com.mongodb.spark.sql").mode("append").save()
Iは、(データのロードRDD処理、データフレームを作成し、データを書き戻す)各部分にかかる時間を測定したいです。
私は各部分の間にタイマを入れようとしましたが、すべてのスパーク操作が怠惰であると理解していたので、すべてが最後の行で実行されます。
ボトルネックを特定できるように、各部品の使用時間を測定する方法はありますか?
おかげ