以下は実行しているサンプルコードです。このスパークジョブが実行されると、broadcastjoinではなくsortmergejoinを使用してDataframe結合が行われています。Spark 1.6でデータフレームを結合している間にブロードキャストが発生しない
def joinedDf (sqlContext: SQLContext,
txnTable: DataFrame,
countriesDfBroadcast: Broadcast[DataFrame]):
DataFrame = {
txnTable.as("df1").join((countriesDfBroadcast.value).withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"),
$"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
}
joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp")
joinステートメントでbroadcast()ヒントを指定しても、broadcastjoinは実行されません。
オプティマイザはデータフレームをハッシュパーティション化しており、データのスキューが発生しています。
誰でもこの動作を見ましたか?
私はこれをSpark 1.6とHiveContextをSQLContextとして使用しています。スパークジョブは200人のエグゼキュータで実行されます。 txnTableのデータサイズは240GB、countriesDfのデータサイズは5MBです。
ここで、BroadcastHashJoinが1回実行され、SortMergeJoinが別の実行で表示されます。 (同じコード、異なるデータセット)。 –
ブロードキャスト・ジョインのサイズのしきい値を超えていると思います。 – zero323
私は非常に高いspark.sql.autoBroadcastJoinThresholdを持っています。約1GB。また、ブロードキャストされるファイルは約5 MBです。しかし、他の実行では、上記の推奨事項は素晴らしいです。 –