0
 val topics= "test" 
     val zkQuorum="localhost:2181" 
     val group="test-consumer-group"  
     val sparkConf = new org.apache.spark.SparkConf() 
      .setAppName("XXXXX") 
      .setMaster("local[*]") 
      .set("cassandra.connection.host", "127.0.0.1") 
      .set("cassandra.connection.port", "9042") 

     val ssc = new StreamingContext(sparkConf, Seconds(2)) 
     ssc.checkpoint("checkpoint") 
     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap 

     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) 

私はDSTREAMでJSONデータを取得しています。この上記のプログラムでは、このDstreamデータ(json)をcassandraに保存する方法は?

[{"id":100,"firstName":"Beulah","lastName":"Fleming","gender":"female","ethnicity":"SpEd","height":167,"address":27,"createdDate":1494489672243,"lastUpdatedDate":1494489672244,"isDeleted":0},{"id":101,"firstName":"Traci","lastName":"Summers","gender":"female","ethnicity":"Frp","height":181,"address":544,"createdDate":1494510639611,"lastUpdatedDate":1494510639611,"isDeleted":0}] 

ようDSTREAM(JSON)を取得しています。 このDstreamデータを処理してCassandraまたは弾性検索にどのように処理しますか?次に、DStream(json形式)からデータを取得し、Cassandraに保存する方法は?より多くの情報チェックドキュメントについて

答えて

0
あなたは、 com.datastax.spark.connector._をインポートし、適切なケースクラス

case class Record(id: String, firstName: String, ...) 
val colums = SomeColums("id", "first_name", ...) 
val mapped = lines.map(whateverDataYouHave => fuctionThatReutrnsARecordObject) 

にストリームの要素に変換し、暗黙の機能を使用して、それを保存する必要があり

saveToCassandra

mapped.saveToCassandra(KEYSPACE_NAME, TABLE_NAME, columns) 

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md

関連する問題