私はテキストファイルを読み込むためにSparkを使用しています。各行は異なるケースクラスに属することができます。一度私はケースクラスで記述されたオブジェクトにラインを変換し、私はそれらをデータフレームに変換し、HDFS(寄木細工)に書き出します。私が持っている問題は、抽象型のRDDで終わることです。そして、toDF関数を適用するために、それを特定のcaseクラス型に制約する必要があります。タイプTのサブクラスにRDD [T]をフィルタリングする
次のようにこれまでのところ、私は私のログイベントを定義しました:
abstract class LogEvent
final case class Web(datetime: String, ...)
final case class OtherEvent(datetime: String ...)
私はRDD [のLogEvent]作成するために、パターンマッチ機能に対して行をマッピングし、その後、私のテキストファイルに読んでいます:
をdef convertToCase(e: List[String]): LogEvent= e match {
case List(_, _, _, "WEB", _, _, _, _, _, _, _, _, _) =>
Web(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10))
case List(_, _, _, "OTHEREVENT", _, _, _, _, _, _, _, _) =>
OtherEvent(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10))
}
この時点で、私は与えられたケースクラスに拘束し、Spark dataframeに変換したいと思います。私は、集合{ウェブ、OtherEvent}であり得る型TのRDDにダウンRDD【のLogEvent]を減らしたい
val events = spark.read.textFile(...)
.map(_.split(',').toList)
.map(convertToCase)
:のようなもの。これは私が苦労していることです。 caseクラスに制約を与える述語を持つフィルタを適用しても、LogEventから型を変更することはできません。これは、RDD [T]で呼び出される必要があるため、 'toDF()'を呼び出すことができないことを意味します。抽象クラスRDD [LogEvent]。
val webEvents = events.filter(someLogic).toDF()
一般的なRDDを特定のケースクラスのRDDに減らす方法を探しています。私はisInstanceOfまたはasInstanceOfを使用しないで型の安全性を維持しながらこれを達成しようとしています。
これには簡単な解決策がありますか?あるいは私は間違った方法で問題に近づいていますか?
ありがとうございます。
val webEvents = events.collect{
case w: Web => w
}.toDF()
collect
がmap
とfilter
のミックスです::入力が一致した場合
ありがとうございます!私は収集を見つけたと私の問題を解決するだろうと思ったが、スパークと一緒に使用すると、これはドライバプログラムにすべてのイベントを返さないでしょうか?私は体内で指定すべきだったが、私はRDD実装のためにcollectを使うことができないと思う? – user3030878
'collect()'と 'collect [U](pf:PartialFunction [T、U])'はRDD上の2つの非常に異なるメソッドです。メソッドに部分的な関数パラメータを指定している限り、安全です(そうでなければ、コンパイラは実際にドライバに収集すると仮定します)。 –
パーフェクト - ありがとう – user3030878