最初にspark.sql.autoBroadcastJoinThreshold
とbroadcast
ヒントは別々のメカニズムです。 autoBroadcastJoinThreshold
が無効になっていても、broadcast
のヒントが優先されます。
:私たちは自動放送スパークが標準
SortMergeJoin
を使用します。無効にすると
df1.join(df2, Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 100, step=1, splits=Some(8))
:デフォルトの設定では:
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
String = 10485760
val df1 = spark.range(100)
val df2 = spark.range(100)
スパークはautoBroadcastJoinThreshold
と、自動的にデータ放送を使用します。 10
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df1.join(df2, Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *SortMergeJoin [id#0L], [id#3L], Inner
:- *Sort [id#0L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#0L, 200)
: +- *Range (0, 100, step=1, splits=Some(8))
+- *Sort [id#3L ASC NULLS FIRST], false, 0
+- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200)
しかしbroadcast
ヒントとBroadcastHashJoin
を使用するように強制することができます
df1.join(broadcast(df2), Seq("id")).explain
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 100, step=1, splits=Some(8))
SQLは、(ハイブに使用されているものと同様)は、独自のヒントの形式になっています。
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
spark.sql(
"SELECT /*+ MAPJOIN(df2) */ * FROM df1 JOIN df2 ON df1.id = df2.id"
).explain
== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight
:- *Range (0, 100, step=1, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 100, step=1, splits=8)
autoBroadcastJoinThreshold
については、Dataset
APIを使用する場合は該当しますが、明示的なbroadcast
ヒントを使用する場合は該当しません。
さらに、大きなオブジェクトをブロードキャストすると、パフォーマンスが向上する可能性は低く、実際にはパフォーマンスが低下し、安定性の問題が発生することがあります。ブロードキャストされたオブジェクトは、最初にドライバにフェッチし、次に各ワーカに送信し、最後にメモリにロードする必要があることに注意してください。