2017-05-15 11 views
4

spark.sql.autoBroadcastJoinThresholdプロパティは、結合スキームがSpark SQLを使用するのではなく、データセットAPI結合を使用している場合でも、すべてのワーカー・ノードで(結合を作成しながら)小さなテーブルをブロードキャストするのに役立つかどうかを知りたいと思います。Datasetの結合演算子を使用する結合のspark.sql.autoBroadcastJoinThresholdは機能しますか?

私の大きなテーブルが250回のギグと小さい場合は20回のギグで、私はこの設定を設定する必要があります:= 21 spark.sql.autoBroadcastJoinThreshold(多分)ギグすべてのワーカーノードに全テーブル/ Datasetを送信するためには?

  • データセットのAPIが

    val result = rawBigger.as("b").join(
        broadcast(smaller).as("s"), 
        rawBigger(FieldNames.CAMPAIGN_ID) === smaller(FieldNames.CAMPAIGN_ID), 
        "left_outer" 
    ) 
    
  • SQL

    select * 
    from rawBigger_table b, smaller_table s 
    where b.campign_id = s.campaign_id; 
    
に参加

答えて

9

最初にspark.sql.autoBroadcastJoinThresholdbroadcastヒントは別々のメカニズムです。 autoBroadcastJoinThresholdが無効になっていても、broadcastのヒントが優先されます。

:私たちは自動放送スパークが標準 SortMergeJoinを使用します。無効にすると

df1.join(df2, Seq("id")).explain 
== Physical Plan == 
*Project [id#0L] 
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight 
    :- *Range (0, 100, step=1, splits=Some(8)) 
    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])) 
     +- *Range (0, 100, step=1, splits=Some(8)) 

:デフォルトの設定では:

spark.conf.get("spark.sql.autoBroadcastJoinThreshold") 
String = 10485760 
val df1 = spark.range(100) 
val df2 = spark.range(100) 

スパークはautoBroadcastJoinThresholdと、自動的にデータ放送を使用します。 10

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) 
df1.join(df2, Seq("id")).explain 
== Physical Plan == 
*Project [id#0L] 
+- *SortMergeJoin [id#0L], [id#3L], Inner 
    :- *Sort [id#0L ASC NULLS FIRST], false, 0 
    : +- Exchange hashpartitioning(id#0L, 200) 
    :  +- *Range (0, 100, step=1, splits=Some(8)) 
    +- *Sort [id#3L ASC NULLS FIRST], false, 0 
     +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200) 

しかしbroadcastヒントとBroadcastHashJoinを使用するように強制することができます

df1.join(broadcast(df2), Seq("id")).explain 
== Physical Plan == 
*Project [id#0L] 
+- *BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight 
    :- *Range (0, 100, step=1, splits=Some(8)) 
    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])) 
     +- *Range (0, 100, step=1, splits=Some(8)) 

SQLは、(ハイブに使用されているものと同様)は、独自のヒントの形式になっています。

df1.createOrReplaceTempView("df1") 
df2.createOrReplaceTempView("df2") 

spark.sql(
"SELECT /*+ MAPJOIN(df2) */ * FROM df1 JOIN df2 ON df1.id = df2.id" 
).explain 
== Physical Plan == 
*BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight 
:- *Range (0, 100, step=1, splits=8) 
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])) 
    +- *Range (0, 100, step=1, splits=8) 

autoBroadcastJoinThresholdについては、Dataset APIを使用する場合は該当しますが、明示的なbroadcastヒントを使用する場合は該当しません。

さらに、大きなオブジェクトをブロードキャストすると、パフォーマンスが向上する可能性は低く、実際にはパフォーマンスが低下し、安定性の問題が発生することがあります。ブロードキャストされたオブジェクトは、最初にドライバにフェッチし、次に各ワーカに送信し、最後にメモリにロードする必要があることに注意してください。

4

@user6910411からの詳細な(コードからの)詳細な情報を共有するだけです。source code(フォーマット鉱山)を引用


spark.sql.autoBroadcastJoinThresholdは、結合を実行する場合、すべてのワーカーノードにブロードキャストされるテーブルの最大サイズをバイト単位で設定します。

この値を-1に設定すると、ブロードキャストを無効にすることができます。

現在のところ、統計情報は、コマンドANALYZE TABLE COMPUTE STATISTICS noscanが実行されているHive Metastoreテーブルと、統計データがデータファイルで直接計算されるファイルベースのデータソーステーブルでのみサポートされています。 10Mへ

spark.sql.autoBroadcastJoinThresholdデフォルト(すなわち10L * 1024 * 1024)とスパーク(JoinSelection実行計画方針を参照)を使用して参加するかどうか確認します。

ありは、選択に参加し、その中で異なっている(BroadcastHashJoinExecまたはBroadcastNestedLoopJoinExec物理演算子を使用して)放送されます。

BroadcastHashJoinExecは加入キーがある場合に選択し、以下のいずれかが成り立つれます:

  • が参加し、CROSS、INNERの1、LEFT ANTI、LEFT OUTER、LEFT SEMI、右側に参加ブロードキャストすることができますすなわち未満spark.sql.autoBroadcastJoinThreshold
  • 参加サイズはCROSS、INNERとRIGHT OUTERの一つであり、側面に参加左サイズ未満spark.sql.autoBroadcastJoinThreshold

である。すなわち、放送することができますキーが結合されていて、次のいずれかが成り立つ場合、上記の条件のうちの1つがBroadcastHashJoinExecの場合、0が選択されます。

つまり、spark.sql.autoBroadcastJoinThresholdプロパティ(他の要件もあります)に基づくBroadcastHashJoinExecに加えて、結合タイプも含めて、Sparkは自動的に正しい結合を選択します。

関連する問題