2016-05-28 2 views
5

Sparkの保存操作で書き込まれる行の数を知る方法があるのだろうかと思います。私は、それを書く前にRDDを数えるだけで十分だと知っていますが、それをしないと同じ情報を持つ方法があるかどうかを知りたいと思います。 スパーク:書いた行の数を取得するには?

は、 マルコ

+0

そのはhttp://stackoverflow.com/questions/28413423/count-number-of-rows-in-an-rdd –

+2

@amit_kumarの重複している可能性があり私は私がないと思う私は彼がそれを数え、データを2回渡すことなく保存したいと思うと思います。 – samthebest

答えて

3

を行うことができますあなたが本当に望むのであれば、カスタムリスナーを追加し、書いた行の数をから抽出することができます。非常に単純な例は、次のようになります。

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} 

var recordsWrittenCount = 0L 

sc.addSparkListener(new SparkListener() { 
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { 
    synchronized { 
     recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten 
    } 
    } 
}) 

sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar") 
recordsWrittenCount 
// Long = 10 

この部分は、内部使用を目的としています。

+1

ありがとうございますが、Spark 1.5.2を使用しても動作しません。その代わりに、以下を行う必要があります: 'recordsWrittenCount + = taskEnd.taskMetrics.outputMetrics.get.recordsWritten' – mgaido

+0

これは内部APIなので、安定する保証はありません。 – zero323

+0

synchブロックの代わりにatomic recordsWrittenCountを使用する方が良いでしょうか? –

4

(様々なコメントで明示的に作ったとして)受け入れ答えは、より密接OPは特定のニーズに合った、ありがとうございます、それにもかかわらず、この答えが過半数を合わせます。

最も効率的なアプローチは、アキュムレータを使用することです:http://spark.apache.org/docs/latest/programming-guide.html#accumulators

val accum = sc.accumulator(0L) 

data.map { x => 
    accum += 1 
    x 
} 
.saveAsTextFile(path) 

val count = accum.value 

あなたはその後、便利なポン引きでこれをラップすることができます:

implicit class PimpedStringRDD(rdd: RDD[String]) { 
    def saveAsTextFileAndCount(p: String): Long = { 
    val accum = rdd.sparkContext.accumulator(0L) 

    rdd.map { x => 
     accum += 1 
     x 
    } 
    .saveAsTextFile(p) 

    accum.value 
    } 
} 

ですから、

val count = data.saveAsTextFileAndCount(path) 
+2

私はこのようなアプローチを知っていますが、私は2つの主な理由からそれを避けたいと思います:変換で使用すると、何らかの失敗の場合に結果を信頼できないということです。とにかく(少しの)オーバーヘッドがあります。 Web UIに書かれた行の数が表示されているので、mapreduceにあるように、どこかにアクセス可能なカウンタがあるかどうか疑問に思っていました... – mgaido

+0

私は不思議に思っています内部のカウンタがない場合、どのようにWeb UIにこの情報を表示することができますか? – mgaido

+0

@ mark91ああ、まあ、あなたはUIコードをクローンし、私が推測することができます。ドキュメントを読んで、私が与えたコードはうまくいきます。 (スパークは、それが再起動されたタスクから保護すると言います)。あなたが保護したいのは、RDDが複数回変換されたときですが、rddに与えたコードはPimpsスコープの外ではアクセスできません。それは執筆の前に蓄積され、一度蓄積するだけです。 – samthebest

0

あなたは

taskEnd.taskInfo.accumulables 

を見ればあなたはそれが順番にListBufferAccumulableInfoを以下にバンドルされていることがわかります。あなたは明らかに出力行の数がlistBufferの7位にあるので、正しい方法は、行がカウントは我々が行が書かれて取得することができ

taskEnd.taskInfo.accumulables(6).value.get 

で書かれて取得する見ることができます

AccumulableInfo(1,Some(internal.metrics.executorDeserializeTime),Some(33),Some(33),true,true,None), 
AccumulableInfo(2,Some(internal.metrics.executorDeserializeCpuTime),Some(32067956),Some(32067956),true,true,None), AccumulableInfo(3,Some(internal.metrics.executorRunTime),Some(325),Some(325),true,true,None), 
AccumulableInfo(4,Some(internal.metrics.executorCpuTime),Some(320581946),Some(320581946),true,true,None), 
AccumulableInfo(5,Some(internal.metrics.resultSize),Some(1459),Some(1459),true,true,None), 
AccumulableInfo(7,Some(internal.metrics.resultSerializationTime),Some(1),Some(1),true,true,None), 
AccumulableInfo(0,Some(number of output rows),Some(3),Some(3),true,true,Some(sql) 

以下の方法により、(私はちょうどzero323の答え@に変更)

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} 

var recordsWrittenCount = 0L 

sc.addSparkListener(new SparkListener() { 
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { 
    synchronized { 
     recordsWrittenCount += taskEnd.taskInfo.accumulables(6).value.get.asInstanceOf[Long] 
    } 
    } 
}) 

sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar") 
recordsWrittenCount 
関連する問題