2016-06-02 18 views
2

Sparkの新機能です。私は1.6.1で作業しています。 私は大きなファイルを持っているとしましょう、私はRDD [String]をtextFileに読み込んでいます。 次に、いくつかの関数で各行を検証したいと思います。 ファイルが膨大であるため、一定のエラーに達したときに処理を停止したいのですが、1000行としましょう。Apache Sparkで大量のテキストファイルを処理するのを中止します。

val rdd = sparkContext.textFile(fileName) rdd.map(line => myValidator.validate(line))

よう 何かがここで機能を検証します:

def validate(line:String) : (String, String) = { // 1st in Tuple for resulted line, 2nd ,say, for validation error. }

'検証' の内側に誤差を計算する方法?。実際には複数のノードで並列に実行されますか?ブロードキャスト?アキュムレータ?

答えて

2

構文解析の結果を成功と失敗に「分割」し、失敗時にtake(n)を呼び出し、成功したデータのみを使用してnの失敗があった場合のみ、この動作を達成できます。

これをより便利に行うには、validateの署名を変更して、成功と失敗を容易に区別できるタイプを返すことをおすすめします。 scala.util.Try

def validate(line:String) : Try[String] = { 
    // returns Success[String] on success, 
    // Failure (with details in the exception object) otherwise 
} 

そして、何かのように:

val maxFailures = 1000 
val rdd = sparkContext.textFile(fileName) 
val parsed: RDD[Try[String]] = rdd.map(line => myValidator.validate(line)).cache() 

val failures: Array[Throwable] = parsed.collect { case Failure(e) => e }.take(maxFailures) 

if (failures.size == maxFailures) { 
    // report failures... 
} else { 
    val success: RDD[String] = parsed.collect { case Success(s) => s } 
    // continue here... 
} 

なぜこの作品でしょうか?

  • 千の失敗少ないしがある場合は、データセット全体がtake(maxFailures)が呼び出されたときに、成功したデータは、1000の障害またはそれ以上がある場合は、解析が停止するキャッシュされ、
  • を使用できるようになります解析されますそこには、takeの操作でもう読み取りが不要になるため、
+0

うまくいく可能性があります。私が必要とするもののように見えます。ありがとう! – user677571

+0

DAGダイアグラムでファイルが2回読み取られる理由を教えてください。 – user677571

+0

あなたは正しいです - 2回現れますが、 'success' RDDを使う前に実行を中断しても(最初のジョブが実行された後で、2番目のジョブは実行されなかったことを意味します)、RDDがメモリに完全にキャッシュされている(UIの 'Storage'セクションで)。だから私はなぜこのステージをスキップしないのか分かりません。私は十分な大きさのファイルでそれを実行していませんでしたが、2番目の仕事ははるかに高速です - おそらく実際にはファイル全体をもう一度読むことはありません。 –

関連する問題