2016-03-27 1 views
0

mongo-hadoopコネクタを使用してデータをsparkに読み込もうとしています。 問題は、読み込みデータに関する制限を設定しようとしている場合、RDDにパーティションの数*制限があることです。MongoHadoop Sparkで使用されるコネクタは、パーティション数で結果を複製します。

mongodbConfig.set("mongo.job.input.format","com.mongodb.hadoop.MongoInputFormat"); 
mongodbConfig.set("mongo.input.uri", "mongodb://localhost:27017/test.restaurants"); 
mongodbConfig.set("mongo.input.limit","3"); 
JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
      mongodbConfig,   // Configuration 
      MongoInputFormat.class, // InputFormat: read from a live cluster. 
      Object.class,    // Key class 
      BSONObject.class   // Value class 
    ); 

    long count = documents.count(); 
    System.out.println("Collection Count: " + count); 
    System.out.println("Partitions: " + documents.partitions().size()); 

//9 elements in the RDD = limit * nrOfPartions = 3 * 3 
//3 partitions 

この動作は他の制限(私はいつも制限* 3になる)で再現可能です。

私はobjectIdでクエリを実行しようとすると同じような動作をします(同じオブジェクト*番号のパーティションを持つRDDが作成されます - 同じドキュメントの場合は3つの要素)。

また、mongoコレクションを作成するためのスクリプトを用意しておくと便利です。

答えて

1

これはバグではなく機能です。 mongo.input.limitは、MongoInputSplitのためにlimit parameterを設定するために使用されるため、パーティション単位でグローバルに適用されません。

一般に、フェッチされたレコードの数をグローバルに制限することはできません(または正確には実用的ではありません)。各分割は独立して処理され、典型的には、各分割から得られるレコード数についての先験的な知識はない。

関連する問題