2016-09-16 5 views
3

一部のデータ操作クエリでSpark SQLを評価しようとしています。SparkSQLとその使用方法の理解

table1: key, value1, value2 
table2: key, value3, value4 

create table table3 as 
select * from table1 join table2 on table1.key = table2.key 

私はTABLE1とtable2のRDDSを作成することができるはずのようですね(しかし、私は、ドキュメント内のそのの非常に明白な例を見ない):シナリオは、私はこの本に興味を持っています。 しかし、もっと大きな問題は、2つのテーブルのRDDをキーでうまく分割してSpark SQLに参加させると、パーティション分割を利用できるほどスマートなのでしょうか?また、その結合の結果として新しいRDDを作成すると、それもパーティション化されますか?言い換えれば、それは完全シャッフルフリーでしょうか? 私は実際にこれらの主題に関する文書や例へのポインタを感謝します。

+0

これはhttp://stackoverflow.com/questions/28850596/co-partitioned-joins-in-spark-sql?rq=1の欺瞞かもしれませんが、私は議論や例へのリンクもありがとうと思います... –

+0

Doあなたはhttp://を意味しますstackoverflow.com/q/30995699/1560062? – zero323

答えて

3

RDDsDatasetsの間の変換を意味する場合、両方の質問に対する回答は否定的です。

RDDパーティショニングはRDD[(T, U)]に対してのみ定義され、RDDDatasetに変換された後に失われます。既存のデータレイアウトに恩恵を受ける場合がありますが、joinはこれらのうちの1つではなく、特にRDDsDatasetsでは異なるハッシュ手法(標準hashCodeMurmurHash)を使用しています。もちろん、後者はカスタムパーティショナーRDDそれは本当にポイントではありません)。

DatasetRDDに変換すると、同様にパーティション化に関する情報が失われます。

Datasetパーティション分割を使用して、joinsを最適化することができます。例えば、テーブルは、事前パーティション化されている場合:keyに基づいて

val n: Int = ??? 

val df1 = Seq(
    ("key1", "val1", "val2"), ("key2", "val3", "val4") 
).toDF("key", "val1", "val2").repartition(n, $"key").cache 

val df2 = Seq(
    ("key1", "val5", "val6"), ("key2", "val7", "val8") 
).toDF("key", "val3", "val4").repartition(n, $"key").cache 

以降のjoinは、追加の交換を必要としません。

df2.explain 
// == Physical Plan == 
// InMemoryTableScan [key#201, val3#202, val4#203] 
// +- InMemoryRelation [key#201, val3#202, val4#203], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
//   +- Exchange hashpartitioning(key#201, 3) 
//    +- LocalTableScan [key#201, val3#202, val4#203] 
// 
df1.join(df3, Seq("key")).explain 
// == Physical Plan == 
// *Project [key#171, val1#172, val2#173, val5#232, val6#233] 
// +- *SortMergeJoin [key#171], [key#231], Inner 
// :- *Sort [key#171 ASC], false, 0 
// : +- *Filter isnotnull(key#171) 
// :  +- InMemoryTableScan [key#171, val1#172, val2#173], [isnotnull(key#171)] 
// :   +- InMemoryRelation [key#171, val1#172, val2#173], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
// :     +- Exchange hashpartitioning(key#171, 3) 
// :     +- LocalTableScan [key#171, val1#172, val2#173] 
// +- *Sort [key#231 ASC], false, 0 
//  +- *Filter isnotnull(key#231) 
//   +- InMemoryTableScan [key#231, val5#232, val6#233], [isnotnull(key#231)] 
//    +- InMemoryRelation [key#231, val5#232, val6#233], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
//      +- Exchange hashpartitioning(key#231, 3) 
//       +- LocalTableScan [key#231, val5#232, val6#233] 

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1 

df1.explain 
// == Physical Plan == 
// InMemoryTableScan [key#171, val1#172, val2#173] 
// +- InMemoryRelation [key#171, val1#172, val2#173], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
//   +- Exchange hashpartitioning(key#171, 3) 
//    +- LocalTableScan [key#171, val1#172, val2#173] 

は明らかに私たちは本当に、単一の参加にその恩恵を受けません。したがって、単一のテーブルが複数の joinsに使用されている場合にのみ意味があります。

また、我々は別のjoinを実行したい場合はそうjoinによって作成されたパーティションの恩恵を受けることができますスパーク:

val df3 = Seq(
    ("key1", "val9", "val10"), ("key2", "val11", "val12") 
).toDF("key", "val5", "val6") 

df1.join(df3, Seq("key")).join(df3, Seq("key")) 

我々は最初の操作によって作成された構造(ReusedExchangeに注意してください)の恩恵を受ける:

// == Physical Plan == 
// *Project [key#171, val1#172, val2#173, val5#682, val6#683, val5#712, val6#713] 
// +- *SortMergeJoin [key#171], [key#711], Inner 
// :- *Project [key#171, val1#172, val2#173, val5#682, val6#683] 
// : +- *SortMergeJoin [key#171], [key#681], Inner 
// :  :- *Sort [key#171 ASC], false, 0 
// :  : +- Exchange hashpartitioning(key#171, 200) 
// :  :  +- *Filter isnotnull(key#171) 
// :  :  +- InMemoryTableScan [key#171, val1#172, val2#173], [isnotnull(key#171)] 
// :  :    +- InMemoryRelation [key#171, val1#172, val2#173], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
// :  :     +- Exchange hashpartitioning(key#171, 3) 
// :  :      +- LocalTableScan [key#171, val1#172, val2#173] 
// :  +- *Sort [key#681 ASC], false, 0 
// :  +- Exchange hashpartitioning(key#681, 200) 
// :   +- *Project [_1#677 AS key#681, _2#678 AS val5#682, _3#679 AS val6#683] 
// :    +- *Filter isnotnull(_1#677) 
// :     +- LocalTableScan [_1#677, _2#678, _3#679] 
// +- *Sort [key#711 ASC], false, 0 
//  +- ReusedExchange [key#711, val5#712, val6#713], Exchange hashpartitioning(key#681, 200) 
+0

これはDataSetの例です。このマップはSparkSqlにマップされますか?同じ列に分割された2つのDFを結合してSpark SQLを使用して新しいDFを作成するとどうなりますか?結果のDFは分割されますか? –

+0

はい、SQLと 'DataFrame' APIの実行に違いはありません。 – zero323

+0

と私の理解では、Spark 2.0ではスマートにしかできません。 –