1
私はSpark Streamingジョブを実行するためにSpark 2.1とKafka 0.08.xxを使用しています。これはテキストフィルタリングジョブであり、処理中にテキストの大部分が除外されます。私は最初の方法を発見したSpark Streaming + KafkaでforeachRDDが遅いのはなぜですか?
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
val jsonMsg = messages.map(_._2)
val filteredMsg = jsonMsg.filter(x=>x.contains(TEXT1) && x.contains(TEXT2) && x.contains(TEXT3))
がforeachRDD機能
messages.foreachRDD { rdd =>
val record = rdd.map(_.2).filter(x => x.contains(TEXT1) &&
x.contains(TEXT2) &&
x.contains(TEXT3))}
を使用します。
がDirectStreamの出力に直接フィルタリングを行います。私は2つの異なる方法で実装しました2番目の方法よりもはるかに高速ですが、これが一般的なケースであるかどうかはわかりません。
方法1と方法2に違いはありますか?