Mongo DBコレクションからドキュメントを読み込んで処理し、更新されたコレクションをMongo DBの新しいコレクションに書き戻したいと思います。MongoSpark Connectorを使用してMongo DBドキュメントをロードする
MongoSparkを使用して読み込むには、次のコードを使用しています。
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection1")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection2")
.getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);
ドキュメントがJavaMongoRDDのインスタンスにロードされると、私はそれらを処理したい(ドキュメントを選択し、[更新)して、最終的には別のコレクションにドキュメントを書き込みます。
'rdd'インスタンスで必要に応じて変換を処理/適用する正しい方法がわかりません。最後に、更新されたドキュメントをMongo DBターゲットコレクションに書き込みます。
MongoSpark.save(rdd);
誰かが私がコレクションをターゲットに書き込む前に、モンゴDBコレクションからロードされたデータを処理するためにはMongoスパーク/スパークAPIを使用することができますどのように私を助けることができます。
私はこれにmongo-spark-connector_2.11とspark-core_2.11を使用しています。あなたはJavaRDD
// Loading and analyzing data from MongoDB
JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);
System.out.println(rdd.count());
System.out.println(rdd.first().toJson());
保存RDDデータにはMongoDBから読みやすいMongoSpark#load
にJavaSparkContext
またはSQLContext
を渡すことができRDDSとして