2016-09-01 9 views
0

スパンクストリーミングデータをmongodbにどのように格納できますか?私はMongoDBのではなく、Scalaの中のデータを保存したいスパークRDDをmongodbに保存

data.foreachRDD(
    new Function<JavaRDD<String>, Void>() { 
     Mongo mongo = new Mongo("localhost", 27017); 
     DB db = mongo.getDB("mongodb"); 

     DBCollection collection = db.getCollection("fb"); 

     public Void call(JavaRDD<String> data) throws Exception { 
      if(data!=null){ 
       List<String>result=data.collect();  
       for (String temp :result) { 
        System.out.println(temp); 
        DBObject dbObject = (DBObject)JSON.parse(temp.toString()); 
        collection.insert(dbObject); 
       } 
       System.out.println("Inserted Data Done"); 
      } else { 
       System.out.println("Got no data in this window"); 
      } 
      return null; 
     }  
    } 
); 

はJavaでこれはのように行われます。上記のコードはjavaにあります。

+0

scalaはjavaと同じコードを使用できます。私はこれが難しくないと信じています。 –

答えて

1
//remove if not needed 

import scala.collection.JavaConversions._ 

data.foreachRDD(new Function[JavaRDD[String], Void]() { 

     var mongo: Mongo = new Mongo("localhost", 27017) 

     var db: DB = mongo.getDB("mongodb") 

     var collection: DBCollection = db.getCollection("fb") 

     def call(data: JavaRDD[String]): Void = { 
     if (data != null) { 
      val result = data.collect() 
      for (temp <- result) { 
      println(temp) 
      val dbObject = JSON.parse(temp.toString).asInstanceOf[DBObject] 
      collection.insert(dbObject) 
      } 
      println("Inserted Data Done") 
     } else { 
      println("Got no data in this window") 
     } 
     null 
     } 
}) 
+0

上記のコードを試し、必要に応じてJavaConversionsをインポートしてください。また、私は外部java mongoライブラリがあると仮定しています –

関連する問題