2016-05-04 7 views
1

これは私のjsonデータです。これをkafkaトピックに送信し、spark rdd、 を読み込み、cassandraに保存します。kafkaトピックからカッサンドラのテーブルに、スパークストリーミングを使用してjson全体を保存する方法

[{ 
"sensor": "swapSensor", 
"sendtime": "2016-09-15T11:05:01.000Z", 
"data": [{ 
"@context": "Context" 
     }] 
}] 

これは私がテーブルの列entireJsonに全体JSON(生)データをプッシュしたかった私のカサンドラのテーブル CREATE TABLE IF NOT EXISTS event(sensor text,sendTime text,count bigint,entireJson text, PRIMARY KEY ((sensor)));

です。

これは私のコードです。

object StreamingData { 

    var count = 1 


    def main(args: Array[String]) { 

    val Array(brokers, topics, cassandraHost) = Array("1.11.22.50:9092", "c", "localhost") 


    def createSparkContext(): StreamingContext = { 

     val conf = new SparkConf() 
     .setAppName("c Events Processing") 
     .setMaster("local[2]") 
     .set("spark.cassandra.connection.host", cassandraHost) 
     .set("spark.cassandra.connection.keep_alive_ms", "60000") // prevent cassandra connection from being closed after every write 

     val sc = new SparkContext(conf) 
     // Create the context 
     val ssc = new StreamingContext(sc, Seconds(8)) 
     val sqlContext = new SQLContext(sc); 

     // Kafka stream 
     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
     val topicsSet = topics.split(",").toSet 
     val cEvents = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2) 

     cEvents.foreachRDD { rdd => 
     count = count + 1 
     sqlContext.read.json(rdd).registerTempTable("eventTable") 

     val eventdf1 = sqlContext.sql("SELECT * FROM eventTable") 


     eventdf1.collect.foreach(println) 

     val eventdf = sqlContext.sql("SELECT sensor, sendtime,data.actor FROM eventTable") 
     eventdf.printSchema() 
     eventdf.map { 
      case (r) => (r.getString(0) + count, sendtime, count,eventdf1) 
     } 
      .saveToCassandra("c", "event", SomeColumns("sensor", "sendtime", "count","entireJson")) 

     } 


     ssc 

    } 


    } 
+1

あなたの質問がありますか? – tesnik03

答えて

0

私はこれを試してみて、私のCassandras Table Columnにrawdataを保存しています。

var rawdata = "" 
     for (item <- rdd.collect().toArray) { 
      System.out.println(item); 
      rawdata = item 
     } 
関連する問題