2016-03-21 15 views
0

私は非常にシンプルな(n00b)質問をしていますが、どういうわけか私は立ち往生しています。私はwholeTextFilesで火花のファイルのセットを読み込み、RDD[LogEntry]を返そうとしています。LogEntryは単なるケースクラスです。私は有効なエントリのRDDで終わりたいと思うし、私は正規表現を使用して私のケースクラスのパラメータを抽出する必要があります。エントリが有効でないとき、私は抽出ロジックが失敗するのを望んでいませんが、ログにエントリを書き込むだけです。そのために私はLazyLoggingを使用します。パターンマッチングとRDD

object LogProcessors extends LazyLogging { 

    def extractLogs(sc: SparkContext, path: String, numPartitions: Int = 5): RDD[Option[CleaningLogEntry]] = { 

    val pattern = "<some pattern>".r 

    val logs = sc.wholeTextFiles(path, numPartitions) 
    val entries = logs.map(fileContent => { 
     val file = fileContent._1 
     val content = fileContent._2 
     content.split("\\r?\\n").map(line => line match { 
     case pattern(dt, ev, seq) => Some(LogEntry(<...>)) 
     case _ => logger.error(s"Cannot parse $file: $line"); None 
     }) 
    }) 

これは私にRDD[Array[Option[LogEntry]]]を与えます。 LogEntryのRDDで終わるすてきな方法はありますか?私は何とかそれを逃しています。

私は代わりにTryを使用することを考えていましたが、それがより良いかどうかはわかりません。

非常に感謝しています。

+0

をあなたはRDD [LongEntry] 'や'であることを最終 'RDD'の署名を探している' RDD [アレイ[ログ・エントリ]] 'または何か他の? –

答えて

2
  1. Arrayを取り除くために - 単にflatMapmapコマンドを置き換える - flatMapはタイプTの別々のレコードとして各レコードのタイプTraversable[T]の結果を扱います。

  2. Option - collectを取り除くには、成功したもののみ:entries.collect { case Some(entry) => entry }。 このcollect(p: PartialFunction)オーバーロード(これはmapと同等のものを実行し、filterを組み合わせたものです)は、すべてのデータをドライバに送信するcollect()とはまったく異なります。

要するに、これはのようになります。

def extractLogs(sc: SparkContext, path: String, numPartitions: Int = 5): RDD[CleaningLogEntry] = { 

    val pattern = "<some pattern>".r 

    val logs = sc.wholeTextFiles(path, numPartitions) 
    val entries = logs.flatMap(fileContent => { 
    val file = fileContent._1 
    val content = fileContent._2 
    content.split("\\r?\\n").map(line => line match { 
     case pattern(dt, ev, seq) => Some(LogEntry(<...>)) 
     case _ => logger.error(s"Cannot parse $file: $line"); None 
    }) 
    }) 

    entries.collect { case Some(entry) => entry } 
} 
+0

甘い、ありがとう!私はそれが単純だと分かっていましたが、どういうわけか私の脳は今日協力していません。 entries.collect()と(あなたのソリューションのように) 'entries.flatMap(e => e)'の違いはありますか? – Ian

+1

私は存在しないと思う - flatMapはOptionが暗黙的に0または1のレコードで 'Seq'に変換され、flatMapが存在すればこれらのレコードを取るだけなので、2つはまったく同じように機能するだろう。 –

関連する問題