6

タスクまたはジョブの実行直後に、これらのメトリックをコンソール(SparkシェルまたはSparkサブミットジョブ)で収集する方法を教えてください。Spark UIから出力サイズやレコードなどのメトリックを取得する方法は?

私たちはMysqlからCassandraにデータをロードするためにSparkを使用していますが、これは非常に巨大です(例:〜200 GBおよび600M行)。タスクが完了したら、正確に処理を開始した行の数を確認したいのですか? Spark UIから数値を取得することはできますが、sparkシェルまたはspark-submitジョブからその数値(「Output Records Written」)をどのように取得できますか?

サンプルMysqlからCassandraにロードするコマンド。

val pt = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "payment_types").option("user", "hadoop").option("password", "...").load() 

pt.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map("table" -> "payment_types", "keyspace" -> "test")) 

上記のタスクのすべてのSpark UIメトリックを取得する必要があります。出力サイズと記録されたレコード。

助けてください。

ありがとうございました!

答えて

3

答えを見つけました。 SparkListenerを使用して統計情報を取得できます。

ジョブに入力または出力メトリックがない場合、stmtを指定することで安全に無視できるNone.get例外が発生することがあります。

sc.addSparkListener(new SparkListener() { 
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { 
    val metrics = taskEnd.taskMetrics 
    if(metrics.inputMetrics != None){ 
     inputRecords += metrics.inputMetrics.get.recordsRead} 
    if(metrics.outputMetrics != None){ 
     outputWritten += metrics.outputMetrics.get.recordsWritten } 
    } 
}) 

以下の例を見てください。

import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 
import com.datastax.spark.connector._ 
import org.apache.spark.sql._ 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} 

val conf = new SparkConf() 
.set("spark.cassandra.connection.host", "...") 
.set("spark.driver.allowMultipleContexts","true") 
.set("spark.master","spark://....:7077") 
.set("spark.driver.memory","1g") 
.set("spark.executor.memory","10g") 
.set("spark.shuffle.spill","true") 
.set("spark.shuffle.memoryFraction","0.2") 
.setAppName("CassandraTest") 
sc.stop 
val sc = new SparkContext(conf) 
val sqlcontext = new org.apache.spark.sql.SQLContext(sc) 

var outputWritten = 0L 

sc.addSparkListener(new SparkListener() { 
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { 
    val metrics = taskEnd.taskMetrics 
    if(metrics.inputMetrics != None){ 
     inputRecords += metrics.inputMetrics.get.recordsRead} 
    if(metrics.outputMetrics != None){ 
     outputWritten += metrics.outputMetrics.get.recordsWritten } 
    } 
}) 

val bp = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "bucks_payments").option("partitionColumn","id").option("lowerBound","1").option("upperBound","14596").option("numPartitions","10").option("fetchSize","100000").option("user", "hadoop").option("password", "...").load() 
bp.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map("table" -> "bucks_payments", "keyspace" -> "test")) 

println("outputWritten",outputWritten) 

結果:

scala> println("outputWritten",outputWritten) 
(outputWritten,16383) 
関連する問題