2017-10-31 15 views
0

spark-submitジョブ(Scalaで書かれた.JAR)の中に、既存のMongoDBにアクセスし、dbで新しいコレクションを作成し、インデックスを追加し、データを書き込むエグゼクティブの1,000人以上に配布されているRDDからコレクションまで。Scala RDDをScalaのインデックス付き新しいMongoDBコレクションに追加

私はこのすべてを行うことができるライブラリを見つけることができません。今、私はRDDから書き込むためにmongo-spark-connectorを使用しています。そして、私はインデックスを作成するためにcasbahを使います。 (scaladocはこのためである?)

のmongoスパークコネクタからhttps://docs.mongodb.com/spark-connector/current/scala-api/

カスバ - プロセスは次のようになりますhttp://mongodb.github.io/casbah/3.1/scaladoc/#package

...

  • RDD
  • を作成
  • RDDから新しいコレクションへの書き込み(mongo spark connectorを使用)
  • は(使用カスバ)

を書いた後、コレクションのインデックスを作成することになり、このアプローチ速度物事アップ?どのようにそれを達成するためのアイデア?

  • ここで私は私が今それについて移動する方法ですが、インデックス
  • ビルドRDDを作成し、それを

を行うためのライブラリを使用して、このコレクションに

  • を書く
  • 空のコレクションを作成します良い方法があると思われる。

    輸入

    // casbah - used to create index after new collection is created 
    import com.mongodb.casbah.Imports.{MongoClient,MongoCollection,MongoClientURI} 
    
    // mongo-spark-connector used to write to Mongo from Spark cluster (and create new collection in process) 
    import com.mongodb.spark.MongoSpark 
    import com.mongodb.spark.config.{WriteConfig,ReadConfig} 
    import org.bson.Document 
    

    接続情報

    object MyConnect { 
        // mongodb connect 
        val host  = "128.128.128.128" 
        val port  = 12345 
        val db   = "db" 
        val collection = "collection" 
        val user  = "user" 
        val password = "password" 
    
        // casbah - to create index 
        val casbah_db_uri = MongoClientURI(
        s"mongodb://${user}:${password}@${host}:${port}/${db}" 
    ) 
    
        // mongodb spark connector - to write from RDD 
        val collection_uri = s"mongodb://${user}:${password}@${host}:${port}/${db}.${collection}" 
        val writeConfig: WriteConfig = WriteConfig(Map("uri"->collection_uri)) 
    } 
    

    仕事を

    object sparkSubmit { 
    
        def main(args: Array[String]): Unit = { 
    
        // dummy dataset - RDD[(id, cnt)] 
        val rdd_dummy: RDD[(String, Int)] = ??? 
    
        // data as Mongo docs - as per mongo spark connector 
        val rdd_bson: RDD[Document] = { 
         rdd_dummy 
         .map(tup => s"""{"hex":"${tup._1}", "cnt":${tup._2}}""") 
         .map(str => Document.parse(str)) 
        } 
    
        // save to mongo/create new collection in process - using mongo spark connector 
        MongoSpark.save(rdd_bson, MyConnect.writeConfig) 
    
        // create index on new collection - using casbah 
        val new_table: MongoCollection = MongoClient(MyConnect.casbah_db_uri)(MyConnect.db)(MyConnect.collection) 
        new_table.createIndex("hex") 
        } 
    } 
    
  • 答えて

    1

    このアプローチは物事をスピードアップしますか?

    通常、すべてのデータベース(MongoDBを含む)では、索引作成操作にはコストがかかります。空のコレクションに索引を作成すると、(パー)挿入操作中に索引構築操作コストが発生します。すべての挿入後に索引を作成すると、索引作成コストが発生し、索引作成が完了するまでコレクションがロックされる可能性があります。

    ユースケースに応じてどちらかを選択できます。つまり、完了するとすぐにコレクションにアクセスする場合は、空のコレクションにインデックスを作成します。

    MongoDBには、フォアグラウンドとバックグラウンドという2つのインデックス構築操作タイプがあります。詳細については、MongoDB: Index Creationを参照してください。

    ここで、これはスカードですか?

    それにはscaladocはありませんが、しかし、javadocツールがあります:https://www.javadoc.io/doc/org.mongodb.spark/mongo-spark-connector_2.11/2.2.1

    のMongoDBスパークコネクタは下のMongoDBのJavaドライバーのjarファイルを使用しているためこれがあります。

    従来のScalaドライバ、Casbahを使用してインデックスを作成する代わりに、公式MongoDB Scala driverを使用するようにしてください。たとえば、Create An Indexです。

    collection.createIndex(ascending("i")) 
    
    関連する問題