0

ストリーミングデータiが異なるテーブルスパークストリーミングフィルタリング私はストリーミングデータをフィルタ処理しようとすると、id列の値に基づいています

にデータを保存したい私は2つのテーブル

  1. を持っていますtestTable_odd(ID、DATA1、DATA2)
  2. testTable_even(ID、DATA1)

id値が奇数である場合、値はその後もあるならば、私はtestTable_oddテーブルにレコードを保存すると、私はtestTable_evenにレコードを保存したい。

ここで難しい部分は私の2つのテーブルが異なる列を持っています。複数の方法を試しました。戻り値の型を持つScala関数とみなされました[obj1、obj2]ですが、成功することができませんでした。

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.sql.SaveMode 
import org.apache.spark.streaming.Seconds 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.kafka.KafkaUtils 
import com.datastax.spark.connector._ 

import kafka.serializer.StringDecoder 
import org.apache.spark.rdd.RDD 
import com.datastax.spark.connector.SomeColumns 
import java.util.Formatter.DateTime 

object StreamProcessor extends Serializable { 
    def main(args: Array[String]): Unit = { 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamProcessor") 
     .set("spark.cassandra.connection.host", "127.0.0.1") 

    val sc = new SparkContext(sparkConf) 

    val ssc = new StreamingContext(sc, Seconds(2)) 

    val sqlContext = new SQLContext(sc) 

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") 

    val topics = args.toSet 

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, topics) 


     stream 
    .map { 
    case (_, msg) => 
     val result = msgParseMaster(msg) 
     (result.id, result.data) 
    }.foreachRDD(rdd => if (!rdd.isEmpty)  rdd.saveToCassandra("testKS","testTable",SomeColumns("id","data"))) 

     } 
    } 

    ssc.start() 
    ssc.awaitTermination() 

    } 

    import org.json4s._ 
    import org.json4s.native.JsonMethods._ 
    case class wordCount(id: Long, data1: String, data2: String) extends serializable 
    implicit val formats = DefaultFormats 
    def msgParseMaster(msg: String): wordCount = { 
    val m = parse(msg).extract[wordCount] 
    return m 

    } 

} 

答えて

0

以下の手順を実行しました。 1)生のJSON Stringとケースクラス で詳細を抽出しました2)フィルタ条件の両方に必要な詳細を持つsuper JSONを作成しました 3)JSONをDataFrameに変換しました 4)selectとwhere節をonそのJSON 5)カサンドラに保存

1

フィルタ機能を2回使用したいと思っています。私はあなたが私が分類との間でその値に基づいてタプルを保存したものではニアライン191を見下ろす場合、私は二回フィルタ機能を使用したプロジェクトhereにこれに似た何かをしたときは、

val evenstream = stream.map { case (_, msg) => 
    val result = msgParseMaster(msg) 
    (result.id, result.data) 
}.filter{ k => 
    k._1 % 2 == 0 
} 

evenstream.foreachRDD{rdd=> 
    //Do something with even stream 
} 

val oddstream = stream.map { case (_, msg) => 
    val result = msgParseMaster(msg) 
    (result.id, result.data) 
}.filter{ k => 
    k._1 % 2 == 1 
} 

oddstream.foreachRDD{rdd=> 
    //Do something with odd stream 
} 

ような何かを行うことができます0と1なので、それをチェックしてみてください。

+0

ありがとうございます。私は一度のフィルタリングでそれを別の方法で解決しました。 – Suresh

+0

ちょっとあなたのソリューションを投稿して、正しい答えとしてマークすることができますか?そして私はそれもまた見たいと思っています:P –

+0

確かにそれは今日行います。プロセスをリストするだけです。私は以下の手順を実行しました。 1)JSON StringとCaseクラスの詳細を抽出2)フィルタ条件の両方に必要な詳細を持つスーパーJSONを作成3)JSONをDataFrameに変換4)JSONでselectとwhere句を実行5)キャサンドラに保存 – Suresh

関連する問題