Sparkの保存操作で書き込まれる行の数を知る方法があるのだろうかと思います。私は、それを書く前にRDDを数えるだけで十分だと知っていますが、それをしないと同じ情報を持つ方法があるかどうかを知りたいと思います。 スパーク:書いた行の数を取得するには?
は、 マルコ答えて
を行うことができますあなたが本当に望むのであれば、カスタムリスナーを追加し、書いた行の数をから抽出することができます。非常に単純な例は、次のようになります。
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
var recordsWrittenCount = 0L
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
synchronized {
recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten
}
}
})
sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
recordsWrittenCount
// Long = 10
この部分は、内部使用を目的としています。
(様々なコメントで明示的に作ったとして)受け入れ答えは、より密接OPは特定のニーズに合った、ありがとうございます、それにもかかわらず、この答えが過半数を合わせます。
最も効率的なアプローチは、アキュムレータを使用することです:http://spark.apache.org/docs/latest/programming-guide.html#accumulators
val accum = sc.accumulator(0L)
data.map { x =>
accum += 1
x
}
.saveAsTextFile(path)
val count = accum.value
あなたはその後、便利なポン引きでこれをラップすることができます:
implicit class PimpedStringRDD(rdd: RDD[String]) {
def saveAsTextFileAndCount(p: String): Long = {
val accum = rdd.sparkContext.accumulator(0L)
rdd.map { x =>
accum += 1
x
}
.saveAsTextFile(p)
accum.value
}
}
ですから、
val count = data.saveAsTextFileAndCount(path)
私はこのようなアプローチを知っていますが、私は2つの主な理由からそれを避けたいと思います:変換で使用すると、何らかの失敗の場合に結果を信頼できないということです。とにかく(少しの)オーバーヘッドがあります。 Web UIに書かれた行の数が表示されているので、mapreduceにあるように、どこかにアクセス可能なカウンタがあるかどうか疑問に思っていました... – mgaido
私は不思議に思っています内部のカウンタがない場合、どのようにWeb UIにこの情報を表示することができますか? – mgaido
@ mark91ああ、まあ、あなたはUIコードをクローンし、私が推測することができます。ドキュメントを読んで、私が与えたコードはうまくいきます。 (スパークは、それが再起動されたタスクから保護すると言います)。あなたが保護したいのは、RDDが複数回変換されたときですが、rddに与えたコードはPimpsスコープの外ではアクセスできません。それは執筆の前に蓄積され、一度蓄積するだけです。 – samthebest
あなたは
taskEnd.taskInfo.accumulables
を見ればあなたはそれが順番にListBuffer
でAccumulableInfo
を以下にバンドルされていることがわかります。あなたは明らかに出力行の数がlistBufferの7位にあるので、正しい方法は、行がカウントは我々が行が書かれて取得することができ
taskEnd.taskInfo.accumulables(6).value.get
で書かれて取得する見ることができます
AccumulableInfo(1,Some(internal.metrics.executorDeserializeTime),Some(33),Some(33),true,true,None),
AccumulableInfo(2,Some(internal.metrics.executorDeserializeCpuTime),Some(32067956),Some(32067956),true,true,None), AccumulableInfo(3,Some(internal.metrics.executorRunTime),Some(325),Some(325),true,true,None),
AccumulableInfo(4,Some(internal.metrics.executorCpuTime),Some(320581946),Some(320581946),true,true,None),
AccumulableInfo(5,Some(internal.metrics.resultSize),Some(1459),Some(1459),true,true,None),
AccumulableInfo(7,Some(internal.metrics.resultSerializationTime),Some(1),Some(1),true,true,None),
AccumulableInfo(0,Some(number of output rows),Some(3),Some(3),true,true,Some(sql)
以下の方法により、(私はちょうどzero323の答え@に変更)
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
var recordsWrittenCount = 0L
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
synchronized {
recordsWrittenCount += taskEnd.taskInfo.accumulables(6).value.get.asInstanceOf[Long]
}
}
})
sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
recordsWrittenCount
- 1. スパーク:プログラムでクラスタコアの数を取得
- 2. スパークK平均クラスタメンバーを取得する
- 3. 与えられたクエリの行数を取得するには?
- 4. 削除された行数を取得するには?
- 5. 辞書に複数行のxmlデータを取得
- 6. 行列を得るための関数にforループを書く
- 7. ビューによって取得された行の数を取得する
- 8. スカラ - スパークDATAFRAMEは - 私はスパークを持っている変数
- 9. フィルタを満たす値のインデックスを取得するにはどうすればよいですか?スパークで
- 10. テキストフィールドの高さまたは行数を取得する
- 11. スカラ - DATAFRAMEでスパークは、行のために、とカラム名を取得し、私はDATAFRAME持っ
- 12. "C"書かれたフレームワークのヘッダーを取得するには?
- 13. 隣人の行を取得するためにSQLクエリを書く
- 14. 過去にない行の数を取得する方法
- 15. テーブルの行数を取得するC#
- 16. UItextViewの行数を取得する
- 17. データベーステーブルの行数を取得するjavascript
- 18. TextBoxのテキスト行数を取得する
- 19. 先行ゼロの数を取得する
- 20. テーブル内の行数を取得する
- 21. テキストファイルの行数を取得するR
- 22. HDFSに最初の5行を書くスパーク
- 23. 文書全体を取得したい
- 24. スパークがハイブに書き込まない
- 25. スパーク - スパーク・ジョブに割り当てられる実行者とコアの数
- 26. 私が書いた__global__関数からcuFunctionを取得するにはどうすればよいですか?
- 27. 複数行のCEditコントロールからテキストを取得するには?
- 28. 紺碧のデータベースから行数を取得するには?
- 29. matlabの行数を取得するには?
- 30. SQLの行数を取得するには
そのはhttp://stackoverflow.com/questions/28413423/count-number-of-rows-in-an-rdd –
@amit_kumarの重複している可能性があり私は私がないと思う私は彼がそれを数え、データを2回渡すことなく保存したいと思うと思います。 – samthebest