2016-07-22 7 views
14

データフレームを使用してクエリを実行しているときにパフォーマンス上の問題が発生しています。私は自分の研究で、最終的に長時間実行される作業は、データが最適に妨害されないという兆候となり得ることを見てきましたが、この問題を解決するための詳細なプロセスは見つかりませんでした。最後のタスクが最初の199倍よりも100倍長く、改善する方法

私はデータフレームとして2つのテーブルのロードを開始していますが、そのテーブルを1つのフィールドに結合しています。パフォーマンスを向上させるために、(パーティションを分割して)ディストリビューションを追加して並べ替えようとしましたが、この1つの長い実行中の最終的な作業はまだ見ています。ここで私のコードの単純なバージョンですが、クエリ1と2は実際には単純ではなく、UDFを使っていくつかの値を計算することに注意してください。

私はspark.sql.shuffleのいくつかの異なる設定を試しました。私は100を試しましたが、失敗しました(私は実際にこれを多くデバッグして正直ではありませんでした)。私は300、4000、8000を試しました。私は各ファイルが1時間であるデータの1日を選択しています。

val df1 = sqlContext.sql("Select * from Table1") 
val df2 = sqlContext.sql("Select * from Table2") 

val distributeDf1 = df1 
    .repartition(df1("userId")) 
    .sortWithinPartitions(df1("userId")) 

val distributeDf2 = df2 
    .repartition(df2("userId")) 
    .sortWithinPartitions(df2("userId")) 

distributeDf1.registerTempTable("df1") 
distributeDf2.registerTempTable("df2") 

val df3 = sqlContext 
    .sql(""" 
    Select 
     df1.* 
    from 
     df1 
    left outer join df2 on 
     df1.userId = df2.userId""") 

userIdでパーティション化するのは理想的ではないようですので、代わりにタイムスタンプで区切ることができます。私がこれを行うならば、私はちょうど日+時間をするべきですか?このために200個以下のユニークなコンボがある場合、私は空のエグゼキュータを持つでしょうか?

+0

どのタスクSpark web uiが最も時間がかかると報告していますか? –

+0

ジョブは単にsaveAsTableというラベルが付けられています。ジョブ内でタスクを区別する方法がわかりません。私は最も時間を取っているエグゼクティブが最高シャッフル値を持っていることがわかります。 –

+0

'userId'はテキスト型か数値型ですか?テキストの場合は、数値型を試してみることをお勧めします。 –

答えて

10

明らかに、巨大な問題のデータスキューがあります。

df1 = [mean=4.989209978967438, stddev=2255.654165352454, count=2400088] 
df2 = [mean=1.0, stddev=0.0, count=18408194] 

5の周りの平均値と標準偏差で2000の上にあなたがロングテールを得る:statistics you've providedを見てみましょう。

一部のエグゼキュータは、パーティションを分割した後に他のキーよりも頻繁に頻繁になるため、残っているエグゼキュータよりもはるかに多くの作業が必要になります。

さらに、同じパーティションにハッシュする1つまたは複数のキーで問題が発生する可能性があります。

それでは、最初の外れ値(擬似コード)を識別してみましょう:

val mean = 4.989209978967438 
val sd = 2255.654165352454 

val df1 = sqlContext.sql("Select * from Table1") 
val counts = df.groupBy("userId").count.cache 

val frequent = counts 
    .where($"count" > mean + 2 * sd) // Adjust threshold based on actual dist. 
    .alias("frequent") 
    .join(df1, Seq("userId")) 

残り:それは期待する何かが本当に

val infrequent = counts 
    .where($"count" <= mean + 2 * sd) 
    .alias("infrequent") 
    .join(df1, Seq("userId")) 

ですが?そうでない場合は、上流の問題の原因を特定してください。

ことが予想されている場合は、あなたがを試すことができます。

  • 小さなテーブルを放送:

    val df2 = sqlContext.sql("Select * from Table2") 
    df2.join(broadcast(df1), Seq("userId"), "rightouter") 
    
  • 分割、(union)を統一し、だけ頻繁に放送:

    df2.join(broadcast(frequent), Seq("userId"), "rightouter") 
        .union(df2.join(infrequent, Seq("userId"), "rightouter")) 
    
  • あなたがいけないいくつかのランダムなデータでuserId

を塩漬けしかし:

  • 配分ソートローカルにすべてのデータとは、(単独で、ローカルでソートすることは問題ではありませんが)
  • 行います完全なデータに対する標準ハッシュジョイン
+0

この回答には多くの有効な点があります! – eliasah

+0

私のinfrequentCount/frequentCountの比率は2.43です。私が参加しているテーブルは、多くのギグ(テーブルごとに約24ギガバイト)なので、放送はオプションだとは思わない。私は分割して統合しようとしています。ソルトの疑似コードを提供できますか?私はすべてのコードを整理し、ここでそれを編集/投稿します。 –

+2

@ zero323 "ランダムなデータでユーザーIDを塩漬けする"の例が好きです。私はこのようなものを実装しようとしていますが、完全には視覚化できません。私はあなたがキーに乱数を加えなければならないことを理解していますが、どのようにしてそれを他のテーブルに結合しますか?ありがとう! –

関連する問題