2017-04-13 15 views
1

私はSpark Streamingジョブを実行するためにSpark 2.1とKafka 0.08.xxを使用しています。これはテキストフィルタリングジョブであり、処理中にテキストの大部分が除外されます。私は最初の方法を発見したSpark Streaming + KafkaでforeachRDDが遅いのはなぜですか?

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) 
val jsonMsg = messages.map(_._2) 
val filteredMsg = jsonMsg.filter(x=>x.contains(TEXT1) && x.contains(TEXT2) && x.contains(TEXT3)) 
  • がforeachRDD機能

    messages.foreachRDD { rdd => 
          val record = rdd.map(_.2).filter(x => x.contains(TEXT1) && 
                   x.contains(TEXT2) && 
                   x.contains(TEXT3))} 
    
  • を使用します。

    1. がDirectStreamの出力に直接フィルタリングを行います。私は2つの異なる方法で実装しました2番目の方法よりもはるかに高速ですが、これが一般的なケースであるかどうかはわかりません。

      方法1と方法2に違いはありますか?

    答えて

    1

    filterは変形です。変換は遅れて評価されます。つまり、foreachRDDなどのアクションを実行してデータなどを書き込むまで何もしません。

    したがって、実際には何も起こっていないため、これは何かを行うにはforeachRDDアクションを使用しています。

    関連する問題