mongo-hadoopコネクタを使用してデータをsparkに読み込もうとしています。 問題は、読み込みデータに関する制限を設定しようとしている場合、RDDにパーティションの数*制限があることです。MongoHadoop Sparkで使用されるコネクタは、パーティション数で結果を複製します。
mongodbConfig.set("mongo.job.input.format","com.mongodb.hadoop.MongoInputFormat");
mongodbConfig.set("mongo.input.uri", "mongodb://localhost:27017/test.restaurants");
mongodbConfig.set("mongo.input.limit","3");
JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
mongodbConfig, // Configuration
MongoInputFormat.class, // InputFormat: read from a live cluster.
Object.class, // Key class
BSONObject.class // Value class
);
long count = documents.count();
System.out.println("Collection Count: " + count);
System.out.println("Partitions: " + documents.partitions().size());
//9 elements in the RDD = limit * nrOfPartions = 3 * 3
//3 partitions
この動作は他の制限(私はいつも制限* 3になる)で再現可能です。
私はobjectIdでクエリを実行しようとすると同じような動作をします(同じオブジェクト*番号のパーティションを持つRDDが作成されます - 同じドキュメントの場合は3つの要素)。
また、mongoコレクションを作成するためのスクリプトを用意しておくと便利です。