2016-12-17 3 views
0

ここに私の単純化されたApache Spark Streamingコードがあります。これはKafkaストリーム経由で入力し、結合してプリントし、ファイルに保存します。しかし、今私はMongoDBに入ってくるデータの流れを保存したい。Scala SparkストリーミングデータをMongoDBに保存

val conf = new SparkConf().setMaster("local[*]") 
          .setAppName("StreamingDataToMongoDB") 
          .set("spark.streaming.concurrentJobs", "2") 

val sc = new SparkContext(conf) 
val sqlContext = new SQLContext(sc) 
val ssc = new StreamingContext(sc, Seconds(1)) 

val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") 
val topicName1 = List("KafkaSimple").toSet 
val topicName2 = List("SimpleKafka").toSet 

val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicName1) 
val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicName2) 

val lines1 = stream1.map(_._2) 
val lines2 = stream2.map(_._2) 

val allThelines = lines1.union(lines2) 
allThelines.print() 
allThelines.repartition(1).saveAsTextFiles("File", "AllTheLinesCombined") 

私はStratio Spark-MongoDBライブラリとその他のリソースを試しましたが、まだ成功していません。誰か助けてください、私はいくつかの役に立つ作業リソース/チュートリアルに進んでください。乾杯:)

+0

あなたはどんなエラーが表示されますか? –

答えて

0

あなたが直接あなたが各バッチ一つずつモンゴためRDDベースのAPIを使用して書き出すforeachRDDを使用することができますDStreamsでサポートされていない形式に書き出したい場合。

0
lines1.foreachRDD (rdd => { 
     rdd.foreach(data => 
     if (data != null) { 

      // Save data here 

     } else { 

      println("Got no data in this window") 

     } 
    ) 
    }) 

lines2と同じです。

+0

OPのスニペットはどこでも 'lines'を使いません。あなたはあなたの答えを更新していただけますか? – 2ps

関連する問題