2017-03-17 11 views
1

Mongo DBコレクションからドキュメントを読み込んで処理し、更新されたコレクションをMongo DBの新しいコレクションに書き戻したいと思います。MongoSpark Connectorを使用してMongo DBドキュメントをロードする

MongoSparkを使用して読み込むには、次のコードを使用しています。

SparkSession spark = SparkSession.builder() 
     .master("local") 
     .appName("MongoSparkConnectorIntro") 
     .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection1") 
     .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection2") 
     .getOrCreate(); 

    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext()); 

    JavaMongoRDD<Document> rdd = MongoSpark.load(jsc); 

ドキュメントがJavaMongoRDDのインスタンスにロードされると、私はそれらを処理したい(ドキュメントを選択し、[更新)して、最終的には別のコレクションにドキュメントを書き込みます。

'rdd'インスタンスで必要に応じて変換を処理/適用する正しい方法がわかりません。最後に、更新されたドキュメントをMongo DBターゲットコレクションに書き込みます。

MongoSpark.save(rdd); 

誰かが私がコレクションをターゲットに書き込む前に、モンゴDBコレクションからロードされたデータを処理するためにはMongoスパーク/スパークAPIを使用することができますどのように私を助けることができます。

私はこれにmongo-spark-connector_2.11とspark-core_2.11を使用しています。あなたはJavaRDD

// Loading and analyzing data from MongoDB 
JavaMongoRDD<Document> rdd = MongoSpark.load(jsc); 
System.out.println(rdd.count()); 
System.out.println(rdd.first().toJson()); 

保存RDDデータにはMongoDBから読みやすいMongoSpark#loadJavaSparkContextまたはSQLContextを渡すことができRDDSとして

答えて

1

のデータのロード

RDDデータをMongoDBに保存するには、型でなければなりませんBsonドキュメントに変換することができます。 Document(またはBsonDocument a DBObject)にデータを変換するためにmapステップを追加することができます。

JavaRDD<Document> documents = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map 
     (new Function<Integer, Document>() { 
    @Override 
    public Document call(final Integer i) throws Exception { 
     return Document.parse("{test: " + i + "}"); 
    } 
}); 

MongoSpark.save(documents); 
関連する問題