2017-05-04 9 views
1

私はテキストファイルを読み込むためにSparkを使用しています。各行は異なるケースクラスに属することができます。一度私はケースクラスで記述されたオブジェクトにラインを変換し、私はそれらをデータフレームに変換し、HDFS(寄木細工)に書き出します。私が持っている問題は、抽象型のRDDで終わることです。そして、toDF関数を適用するために、それを特定のcaseクラス型に制約する必要があります。タイプTのサブクラスにRDD [T]をフィルタリングする

次のようにこれまでのところ、私は私のログイベントを定義しました:

abstract class LogEvent 
final case class Web(datetime: String, ...) 
final case class OtherEvent(datetime: String ...) 

私はRDD [のLogEvent]作成するために、パターンマッチ機能に対して行をマッピングし、その後、私のテキストファイルに読んでいます:

def convertToCase(e: List[String]): LogEvent= e match { 
    case List(_, _, _, "WEB", _, _, _, _, _, _, _, _, _) => 
    Web(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10)) 
    case List(_, _, _, "OTHEREVENT", _, _, _, _, _, _, _, _) => 
    OtherEvent(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10)) 
} 

この時点で、私は与えられたケースクラスに拘束し、Spark dataframeに変換したいと思います。私は、集合{ウェブ、OtherEvent}であり得る型TのRDDにダウンRDD【のLogEvent]を減らしたい

val events = spark.read.textFile(...) 
    .map(_.split(',').toList) 
    .map(convertToCase) 

:のようなもの。これは私が苦労していることです。 caseクラスに制約を与える述語を持つフィルタを適用しても、LogEventから型を変更することはできません。これは、RDD [T]で呼び出される必要があるため、 'toDF()'を呼び出すことができないことを意味します。抽象クラスRDD [LogEvent]。

val webEvents = events.filter(someLogic).toDF() 

一般的なRDDを特定のケースクラスのRDDに減らす方法を探しています。私はisInstanceOfまたはasInstanceOfを使用しないで型の安全性を維持しながらこれを達成しようとしています。

これには簡単な解決策がありますか?あるいは私は間違った方法で問題に近づいていますか?

ありがとうございます。

val webEvents = events.collect{ 
    case w: Web => w 
}.toDF() 

collectmapfilterのミックスです::入力が一致した場合

答えて

3

あなたは(collect(): Array[T]ドライバに配列として結果を送信しますと混同しないcollect(f: PartialFunction[T, U]): RDD[U]メソッドを使用する必要がありますパターンマッチングで与えられたケースの1つで、部分関数によって与えられた値を出力します。さもなければ、それは単に入力を無視します(すなわちフィルターにかける)。

convertToCaseの場合は、定義したパターンマッチングが完全ではないため、実行時にエラーが発生する可能性があります(予期しないイベントまたは破損した行が発生した場合)。これを行うための正しい方法は

val convertToCase: PartialFunction[List[String], LogEvent] = { 
    case List(_, _, _, "WEB", _, _, _, _, _, _, _, _, _) => 
    Web(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10)) 
    case List(_, _, _, "OTHEREVENT", _, _, _, _, _, _, _, _) => 
    OtherEvent(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10)) 
} 

を定義するためにそしてcollect(convertToCase)map(convertToCase)を置き換えることであろう。

+0

ありがとうございます!私は収集を見つけたと私の問題を解決するだろうと思ったが、スパークと一緒に使用すると、これはドライバプログラムにすべてのイベントを返さないでしょうか?私は体内で指定すべきだったが、私はRDD実装のためにcollectを使うことができないと思う? – user3030878

+1

'collect()'と 'collect [U](pf:PartialFunction [T、U])'はRDD上の2つの非常に異なるメソッドです。メソッドに部分的な関数パラメータを指定している限り、安全です(そうでなければ、コンパイラは実際にドライバに収集すると仮定します)。 –

+0

パーフェクト - ありがとう – user3030878