私はspark 2.1
をyarn
クラスタに使用しています。 I私は(私はhttps://github.com/mongodb/mongo-hadoop/wiki/Spark-Usageを介して取得することdatabases
mongo
異なるに対応しているが、私はそれが重要だとは思わない、念のためにそれを言及)他のRDD
Sに基づいて完成したいデータが含まれているRDD
を持ってデータに応じてRDDに基づいてRDDを完成
私の問題は、データにはdatabase
が含まれているため、データを完成するために使用するRDD
がデータそのものに依存するということです。ここで私は何をすべきかを簡略化しexempleです:
/*
* The RDD which needs information from databases
*/
val RDDtoDevelop = sc.parallelize(Array(
Map("dbName" -> "A", "id" -> "id1", "other data" -> "some data"),
Map("dbName" -> "C", "id" -> "id6", "other data" -> "some other data"),
Map("dbName" -> "A", "id" -> "id8", "other data" -> "some other other data")))
.cache()
/*
* Artificial databases for the exemple. Actually, mongo-hadoop is used. https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage
* This means that generate these RDDs COSTS so we don't want to generate all possible RDDs but only needed ones
*/
val A = sc.parallelize(Array(
Map("id" -> "id1", "data" -> "data1"),
Map("id" -> "id8", "data" -> "data8")
))
val B = sc.parallelize(Array(
Map("id" -> "id1", "data" -> "data1bis"),
Map("id" -> "id5", "data" -> "data5")
))
val C = sc.parallelize(Array(
Map("id" -> "id2", "data" -> "data2"),
Map("id" -> "id6", "data" -> "data6")
))
val generateRDDfromdbName = Map("A" -> A, "B" -> B, "C" -> C)
と希望出力は次のとおりです。
Map(dbName -> A, id -> id8, other data -> some other other data, new data -> data8)
Map(dbName -> A, id -> id1, other data -> some data, new data -> data1)
Map(dbName -> C, id -> id6, other data -> some other data, new data -> data6)
ネストされたRDD
のは不可能なので、私が使用するための最良の方法を見つけるしたいと思います可能な限り私はSpark
のパラレルライズのためにできます。私は2つのソリューションについて考えました。
最初に、必要なデータベースの内容を含むコレクションを作成し、RDD
に変換してRDD
のスケーラビリティを得てください(コレクションがdriver memory
に収まらない場合は、複数回実行できます)。最後にjoin
とfilter
の内容をid
にしてください。
第二には、すべての必要なdatabases
からRDD
Sを取得dbname
とid
によってそれらをキー入力してからjoin
を行うことです。
ソリューション1
// Get all needed DB
val dbList = RDDtoDevelop.map(map => map("dbName")).distinct().collect()
// Fill a list with key value pairs as (dbName,db content)
var dbContents = List[(String,Array[Map[String,String]])]()
dbList.foreach(dbName => dbContents = (dbName,generateRDDfromdbName(dbName).collect()) :: dbContents)
// Generate a RDD from this list to benefit to advantages of RDD
val RDDdbs = sc.parallelize(dbContents)
// Key the initial RDD by dbName and join with the contents of dbs
val joinedRDD = RDDtoDevelop.keyBy(map => map("dbName")).join(RDDdbs)
// Check for matched ids between RDD data to develop and dbContents
val result = joinedRDD.map({ case (s,(maptoDeveleop,content)) => maptoDeveleop + ("new data" -> content.find(mapContent => mapContent("id") == maptoDeveleop("id")).get("data"))})
ソリューション2
val dbList = RDDtoDevelop.map(map => map("dbName")).distinct().collect()
// Create the list of the database RDDs keyed by (dbName, id)
var dbRDDList = List[RDD[((String,String),Map[String,String])]]()
dbList.foreach(dbName => dbRDDList = generateRDDfromdbName(dbName).keyBy(map => (dbName,map("id"))) :: dbRDDList)
// Create a RDD containing all dbRDD
val RDDdbs = sc.union(dbRDDList)
// Join the initial RDD based on the key with the dbRDDs
val joinedRDD = RDDtoDevelop.keyBy(map => (map("dbName"), map("id"))).join(RDDdbs)
// Reformate the result
val result = joinedRDD.map({ case ((dbName,id),(maptoDevelop,dbmap)) => maptoDevelop + ("new data" -> dbmap("data"))})
それらの両方が欲しかった出力を与える:ここで
はscala
コードです。 db
とid
の一致がSpark
というパラレル化を使用しているので、私の考えでは、2番目の方が良いと思われますが、私はそれについては分かりません。あなたは私に最高のものを選ぶか、より良いものを選ぶことを助けてくれますか、私に鉱山よりも良い解決策の手がかりを与えてください。
他のコメントもありがとうございます(これは私の最初の質問です)。事前による
おかげで、
マット
? – stefanobaghino
@stefanobaghino現時点ではまだ生産されていないが、いくつかのGbに対処することを目的としています。 – Matt
データセットの実際のサイズ、クラスタ内のコンピューティングパワーと帯域幅によっては、必要なデータを収集してブロードキャストします。私は偶然にもかかわらずブロードキャストGBを持っていました(私たちは、ノードを接続する10Gbpsの回線を持つクラスタ上のブロードキャストでTBをフィルタリングすることを話しています)。また、Sparkが本当に必要なものかどうかを確認することをお勧めします。特に、データセット全体が10 GB未満の場合は、商品サーバーの把握が必要です。 – stefanobaghino