2017-05-19 4 views
1

私はspark 2.1yarnクラスタに使用しています。 I私は(私はhttps://github.com/mongodb/mongo-hadoop/wiki/Spark-Usageを介して取得することdatabasesmongo異なるに対応しているが、私はそれが重要だとは思わない、念のためにそれを言及)他のRDD Sに基づいて完成したいデータが含まれているRDDを持ってデータに応じてRDDに基づいてRDDを完成

私の問題は、データにはdatabaseが含まれているため、データを完成するために使用するRDDがデータそのものに依存するということです。ここで私は何をすべきかを簡略化しexempleです:

/* 
* The RDD which needs information from databases 
*/ 
val RDDtoDevelop = sc.parallelize(Array(
    Map("dbName" -> "A", "id" -> "id1", "other data" -> "some data"), 
    Map("dbName" -> "C", "id" -> "id6", "other data" -> "some other data"), 
    Map("dbName" -> "A", "id" -> "id8", "other data" -> "some other other data"))) 
    .cache() 

/* 
* Artificial databases for the exemple. Actually, mongo-hadoop is used. https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage 
* This means that generate these RDDs COSTS so we don't want to generate all possible RDDs but only needed ones 
*/ 
val A = sc.parallelize(Array(
    Map("id" -> "id1", "data" -> "data1"), 
    Map("id" -> "id8", "data" -> "data8") 
    )) 

val B = sc.parallelize(Array(
    Map("id" -> "id1", "data" -> "data1bis"), 
    Map("id" -> "id5", "data" -> "data5") 
    )) 

val C = sc.parallelize(Array(
    Map("id" -> "id2", "data" -> "data2"), 
    Map("id" -> "id6", "data" -> "data6") 
    )) 

val generateRDDfromdbName = Map("A" -> A, "B" -> B, "C" -> C) 

と希望出力は次のとおりです。

Map(dbName -> A, id -> id8, other data -> some other other data, new data -> data8) 
Map(dbName -> A, id -> id1, other data -> some data, new data -> data1) 
Map(dbName -> C, id -> id6, other data -> some other data, new data -> data6) 

ネストされたRDDのは不可能なので、私が使用するための最良の方法を見つけるしたいと思います可能な限り私はSparkのパラレルライズのためにできます。私は2つのソリューションについて考えました。

最初に、必要なデータベースの内容を含むコレクションを作成し、RDDに変換してRDDのスケーラビリティを得てください(コレクションがdriver memoryに収まらない場合は、複数回実行できます)。最後にjoinfilterの内容をidにしてください。

第二には、すべての必要なdatabasesからRDD Sを取得dbnameidによってそれらをキー入力してからjoinを行うことです。

ソリューション1

// Get all needed DB 
val dbList = RDDtoDevelop.map(map => map("dbName")).distinct().collect() 

// Fill a list with key value pairs as (dbName,db content) 
var dbContents = List[(String,Array[Map[String,String]])]() 
dbList.foreach(dbName => dbContents = (dbName,generateRDDfromdbName(dbName).collect()) :: dbContents) 

// Generate a RDD from this list to benefit to advantages of RDD 
val RDDdbs = sc.parallelize(dbContents) 

// Key the initial RDD by dbName and join with the contents of dbs 
val joinedRDD = RDDtoDevelop.keyBy(map => map("dbName")).join(RDDdbs) 

// Check for matched ids between RDD data to develop and dbContents 
val result = joinedRDD.map({ case (s,(maptoDeveleop,content)) => maptoDeveleop + ("new data" -> content.find(mapContent => mapContent("id") == maptoDeveleop("id")).get("data"))}) 

ソリューション2

val dbList = RDDtoDevelop.map(map => map("dbName")).distinct().collect() 

// Create the list of the database RDDs keyed by (dbName, id) 
var dbRDDList = List[RDD[((String,String),Map[String,String])]]() 
dbList.foreach(dbName => dbRDDList = generateRDDfromdbName(dbName).keyBy(map => (dbName,map("id"))) :: dbRDDList) 

// Create a RDD containing all dbRDD 
val RDDdbs = sc.union(dbRDDList) 

// Join the initial RDD based on the key with the dbRDDs 
val joinedRDD = RDDtoDevelop.keyBy(map => (map("dbName"), map("id"))).join(RDDdbs) 

// Reformate the result 
val result = joinedRDD.map({ case ((dbName,id),(maptoDevelop,dbmap)) => maptoDevelop + ("new data" -> dbmap("data"))}) 

それらの両方が欲しかった出力を与える:ここで

scalaコードです。 dbidの一致がSparkというパラレル化を使用しているので、私の考えでは、2番目の方が良いと思われますが、私はそれについては分かりません。あなたは私に最高のものを選ぶか、より良いものを選ぶことを助けてくれますか、私に鉱山よりも良い解決策の手がかりを与えてください。

他のコメントもありがとうございます(これは私の最初の質問です)。事前による

おかげで、

マット

+0

? – stefanobaghino

+0

@stefanobaghino現時点ではまだ生産されていないが、いくつかのGbに対処することを目的としています。 – Matt

+0

データセットの実際のサイズ、クラスタ内のコンピューティングパワーと帯域幅によっては、必要なデータを収集してブロードキャストします。私は偶然にもかかわらずブロードキャストGBを持っていました(私たちは、ノードを接続する10Gbpsの回線を持つクラスタ上のブロードキャストでTBをフィルタリングすることを話しています)。また、Sparkが本当に必要なものかどうかを確認することをお勧めします。特に、データセット全体が10 GB未満の場合は、商品サーバーの把握が必要です。 – stefanobaghino

答えて

1
私はあなたが非常に簡単になりデータに適用したい dataframe秒、その後 joinsdistinctおよびその他の functionsにご RDD秒に変換することをお勧め


Dataframesが配布され、dataframe apisに加えて、sql queriesを使用することができる。詳細はSpark SQL, DataFrames and Datasets GuideIntroducing DataFrames in Apache Spark for Large Scale Data Science
にあります。また、コード実行が遅くなるようにforeachcollectの機能が必要になることはありません。

val RDDtoDevelop = sc.parallelize(Array(
    Map("dbName" -> "A", "id" -> "id1", "other data" -> "some data"), 
    Map("dbName" -> "C", "id" -> "id6", "other data" -> "some other data"), 
    Map("dbName" -> "A", "id" -> "id8", "other data" -> "some other other data"))) 
    .cache() 

の下dataFrame

val developColumns=RDDtoDevelop.take(1).flatMap(map=>map.keys) 

val developDF = RDDtoDevelop.map{value=> 
    val list=value.values.toList 
    (list(0),list(1),list(2)) 
}.toDF(developColumns:_*) 

そしてdataFrameに上記RDDを変換すると

dataframeにご A rddをCoverting
+------+---+---------------------+ 
|dbName|id |other data   | 
+------+---+---------------------+ 
|A  |id1|some data   | 
|C  |id6|some other data  | 
|A  |id8|some other other data| 
+------+---+---------------------+ 

以下のように見えるようデータフレームにRDDtoDevelopを変換する
例です以下の通りです。
のソースコードのための:

val A = sc.parallelize(Array(
    Map("id" -> "id1", "data" -> "data1"), 
    Map("id" -> "id8", "data" -> "data8") 
)) 
DataFrame

コード:

val aColumns=A.take(1).flatMap(map=>map.keys) 

val aDF = A.map{value => 
    val list=value.values.toList 
    (list(0),list(1)) 
}.toDF(aColumns:_*).withColumn("name", lit("A")) 

新しいカラムnamedevelopDFと終了時に正しいjoinを有することdatabase nameが付加されています。 DataFrame Aのための
出力:

+---+-----+----+ 
|id |data |name| 
+---+-----+----+ 
|id1|data1|A | 
|id8|data8|A | 
+---+-----+----+ 

あなたは同じような方法でBCを変換することができます。 B用
出典:C用

+---+--------+----+ 
|id |data |name| 
+---+--------+----+ 
|id1|data1bis|B | 
|id5|data5 |B | 
+---+--------+----+ 

ソース:C用

val C = sc.parallelize(Array(
    Map("id" -> "id2", "data" -> "data2"), 
    Map("id" -> "id6", "data" -> "data6") 
)) 

DATAFRAMEコードB用

val bColumns=B.take(1).flatMap(map=>map.keys) 

    val bDF = B.map{value => 
     val list=value.values.toList 
     (list(0),list(1)) 
    }.toDF(bColumns:_*).withColumn("name", lit("B")) 

出力:B用

val B = sc.parallelize(Array(
    Map("id" -> "id1", "data" -> "data1bis"), 
    Map("id" -> "id5", "data" -> "data5") 
)) 

DATAFRAME :

val cColumns=C.take(1).flatMap(map=>map.keys) 

val cDF = C.map{value => 
    val list=value.values.toList 
    (list(0),list(1)) 
}.toDF(cColumns:_*).withColumn("name", lit("C")) 

C用の出力:変換後

+---+-----+----+ 
|id |data |name| 
+---+-----+----+ 
|id2|data2|C | 
|id6|data6|C | 
+---+-----+----+ 

ABC

+---+--------+----+ 
|id |data |name| 
+---+--------+----+ 
|id1|data1 |A | 
|id8|data8 |A | 
|id1|data1bis|B | 
|id5|data5 |B | 
|id2|data2 |C | 
|id6|data6 |C | 
+---+--------+----+ 
だろう union

var unionDF = aDF.union(bDF).union(cDF) 

を使用してマージすることができます

次に、developDFunionDFの後にrenamingidの列がunionDFの場合は、droppingとなります。

unionDF = unionDF.withColumnRenamed("id", "id1") 
unionDF = developDF.join(unionDF, developDF("id") === unionDF("id1") && developDF("dbName") === unionDF("name"), "left").drop("id1", "name") 

最後に、我々はあなたがその後ニードフルを行うことができます

+------+---+---------------------+-----+ 
|dbName|id |other data   |data | 
+------+---+---------------------+-----+ 
|A  |id1|some data   |data1| 
|C  |id6|some other data  |data6| 
|A  |id8|some other other data|data8| 
+------+---+---------------------+-----+ 

を持っています。
注:​​3210関数は輸入以下で動作します

import org.apache.spark.sql.functions._ 
我々は個別に二つの段階のために、話をしているデータのどのサイズ
+0

本当にあなたの答えをありがとう、データフレームは面白そうです。しかし、それはNoSQLのデータ形式に適合していますか?私はこれがバイパス可能なフィールドをnullにすることができると思う。そして、これは最初に必要なデータベースの名前を取得し、それに対応するデータフレームを生成してそれらの結合を作る必要性を解決しません。間違っていますか?今のところ、2番目のソリューションをデータフレームに適応させようと思っています。これは良い方法が見つかるまで実際に改善して簡単にできます。 – Matt

+0

私は自分の答えを更新して、続く他の人たち。データフレームでSQLクエリを使用できます。 A、B、Cが同じストリーミング・ムンゴ・データベースのデータである場合、それらを1つのデータ・フレームに入れる方が良いでしょうが、異なるソースからのものであれば、ユニオンが必要です。 –

+0

あなたのコードの問題は、ソースを提供せずにデータベースのデータを結合することです。出力ではdata1bisが結合されていますが、そうではありません。しかし私はその考えを見る。あなたの時間と助けてくれてありがとう。 – Matt

関連する問題