データフレームを使用してクエリを実行しているときにパフォーマンス上の問題が発生しています。私は自分の研究で、最終的に長時間実行される作業は、データが最適に妨害されないという兆候となり得ることを見てきましたが、この問題を解決するための詳細なプロセスは見つかりませんでした。最後のタスクが最初の199倍よりも100倍長く、改善する方法
私はデータフレームとして2つのテーブルのロードを開始していますが、そのテーブルを1つのフィールドに結合しています。パフォーマンスを向上させるために、(パーティションを分割して)ディストリビューションを追加して並べ替えようとしましたが、この1つの長い実行中の最終的な作業はまだ見ています。ここで私のコードの単純なバージョンですが、クエリ1と2は実際には単純ではなく、UDFを使っていくつかの値を計算することに注意してください。
私はspark.sql.shuffle
のいくつかの異なる設定を試しました。私は100を試しましたが、失敗しました(私は実際にこれを多くデバッグして正直ではありませんでした)。私は300、4000、8000を試しました。私は各ファイルが1時間であるデータの1日を選択しています。
val df1 = sqlContext.sql("Select * from Table1")
val df2 = sqlContext.sql("Select * from Table2")
val distributeDf1 = df1
.repartition(df1("userId"))
.sortWithinPartitions(df1("userId"))
val distributeDf2 = df2
.repartition(df2("userId"))
.sortWithinPartitions(df2("userId"))
distributeDf1.registerTempTable("df1")
distributeDf2.registerTempTable("df2")
val df3 = sqlContext
.sql("""
Select
df1.*
from
df1
left outer join df2 on
df1.userId = df2.userId""")
userIdでパーティション化するのは理想的ではないようですので、代わりにタイムスタンプで区切ることができます。私がこれを行うならば、私はちょうど日+時間をするべきですか?このために200個以下のユニークなコンボがある場合、私は空のエグゼキュータを持つでしょうか?
どのタスクSpark web uiが最も時間がかかると報告していますか? –
ジョブは単にsaveAsTableというラベルが付けられています。ジョブ内でタスクを区別する方法がわかりません。私は最も時間を取っているエグゼクティブが最高シャッフル値を持っていることがわかります。 –
'userId'はテキスト型か数値型ですか?テキストの場合は、数値型を試してみることをお勧めします。 –