spark-submitジョブ(Scalaで書かれた.JAR)の中に、既存のMongoDBにアクセスし、dbで新しいコレクションを作成し、インデックスを追加し、データを書き込むエグゼクティブの1,000人以上に配布されているRDDからコレクションまで。Scala RDDをScalaのインデックス付き新しいMongoDBコレクションに追加
私はこのすべてを行うことができるライブラリを見つけることができません。今、私はRDDから書き込むためにmongo-spark-connectorを使用しています。そして、私はインデックスを作成するためにcasbahを使います。 (scaladocはこのためである?)
のmongoスパークコネクタからhttps://docs.mongodb.com/spark-connector/current/scala-api/
カスバ - プロセスは次のようになりますhttp://mongodb.github.io/casbah/3.1/scaladoc/#package
...
- RDD を作成
- RDDから新しいコレクションへの書き込み(mongo spark connectorを使用)
- は(使用カスバ)
を書いた後、コレクションのインデックスを作成することになり、このアプローチ速度物事アップ?どのようにそれを達成するためのアイデア?
- ここで私は私が今それについて移動する方法ですが、インデックス
- ビルドRDDを作成し、それを
を行うためのライブラリを使用して、このコレクションに
輸入
// casbah - used to create index after new collection is created
import com.mongodb.casbah.Imports.{MongoClient,MongoCollection,MongoClientURI}
// mongo-spark-connector used to write to Mongo from Spark cluster (and create new collection in process)
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.{WriteConfig,ReadConfig}
import org.bson.Document
接続情報
object MyConnect {
// mongodb connect
val host = "128.128.128.128"
val port = 12345
val db = "db"
val collection = "collection"
val user = "user"
val password = "password"
// casbah - to create index
val casbah_db_uri = MongoClientURI(
s"mongodb://${user}:${password}@${host}:${port}/${db}"
)
// mongodb spark connector - to write from RDD
val collection_uri = s"mongodb://${user}:${password}@${host}:${port}/${db}.${collection}"
val writeConfig: WriteConfig = WriteConfig(Map("uri"->collection_uri))
}
仕事を
object sparkSubmit {
def main(args: Array[String]): Unit = {
// dummy dataset - RDD[(id, cnt)]
val rdd_dummy: RDD[(String, Int)] = ???
// data as Mongo docs - as per mongo spark connector
val rdd_bson: RDD[Document] = {
rdd_dummy
.map(tup => s"""{"hex":"${tup._1}", "cnt":${tup._2}}""")
.map(str => Document.parse(str))
}
// save to mongo/create new collection in process - using mongo spark connector
MongoSpark.save(rdd_bson, MyConnect.writeConfig)
// create index on new collection - using casbah
val new_table: MongoCollection = MongoClient(MyConnect.casbah_db_uri)(MyConnect.db)(MyConnect.collection)
new_table.createIndex("hex")
}
}