2017-08-24 5 views
1

spongジョブでMongoDBからデータを取得する必要があります。私はからmongo-spark-connector_2.11のspark mongoコネクタを使いました。 コードの下に書かれ、同じクエリがMongoのコンソールに秒、次に以下を要したのに対しspark mongo接続に時間がかかる

def createReadConfig(topic: String): ReadConfig = { 
    val user =UserId 
    val pass = Password 
    val host = Host 
    val db = Database 
    val coll = Collection 
    val partitioner = MongoPaginateBySizePartitioner 
    ReadConfig(Map("uri" -> ("mongodb://" + user + ":" + pass + "@" + host + "/" + 
    db), "database" -> db, "collection" -> coll, "partitioner" -> partitioner)) 
} 


val collectionRDD= MongoSpark.load(sc,admissionConfig) 

collectionRDD.filter(doc=>doc.getObjectId("_id")==new ObjectId("objectId")).count 

をテストするためにスパークシェルで実行これは、結果を与えるより、次に20秒を要しました。

なぜこのようなことが起こり、どのように速度の差異を減らすことができますか?

答えて

1

なぜこのようなことが起きていますか、どのように速度の差異を減らすことができますか?

差がRDD.filter()スパーク労働者へのMongoDBからデータをロードした後、filter操作を行う実行する、です。あなたのネットワーク、データサイズ、MongoDBサーバー、スパークソーカーの種類によっては、mongo shellでクエリを実行するのに比べて時間がかかることがあります。

次の例のように、これを活用するためにスパーク用のMongoDBコネクタのwithPipeline機能を利用することができる:

val rdd = MongoSpark.load(sc) 

val aggregatedRDD = rdd.withPipeline(Seq(Document.parse("{ $match: { '_id' : 'some id' } }"))) 

上記のデータをフィルタリングし、スパークする書類を渡す前にMongoDBで集計を実行することになります。これにより、MongoDBサーバーからSparkワーカーへのデータ転送が削減され、データベースインデックスを利用できるようになります。

も参照してください。MongoDB Spark Connector: Filters and Aggregation