2017-11-30 12 views
1

私はspark.sqlと結合したい2つの大きなHiveテーブルを持っています。表1に500万行、表2に7000万行のテーブル1とテーブル2があるとしましょう。テーブルはうっすらとしたフォーマットであり、ハイブにはパーケットファイルとして保存されています。2つのテーブルの結合でスパーク性能の問題

私はそれらに参加し、いくつかの列でいくつかの集計を取って、2つの条件(col1、col2に言うことができます)でフィルタリングしながら、すべての行と平均(例えばdoubleColumn)

注:私は1台のマシンでテストインストールを行っています(これはかなり強力です)。私は、クラスタ内のパフォーマンスがおそらく異なると予想しています。私はエグゼキュータとドライバあたり少なくとも8ギガバイトのメモリを与える場合でも

val stat = sqlContext.sql("select count(id), avg(doubleColumn) " + 
           " FROM db.table1 as t1 JOIN db.table2 " + 
           " ON t1.id = t2.id " + 
           " WHERE col1 = val1 AND col2 = val2").collect 

5分程度の非常に悪い残念ながら、この実行を:

私の最初の試みは次のようにスパークSQLを使用することです。私はまた、データフレームの構文を使用して最初の行をフィルタなど、より良い選択性を持っているだけで特定の列を選択しようとしてみました:

//Filter first and select only needed column 
val df = spark.sql("SELECT * FROM db.tab1") 
val tab1= df.filter($"col1" === "val1" && $"col2" === "val2").select("id") 

val tab2= spark.sql("SELECT id, doubleColumn FROM db.tab2") 
val joined = tab1.as("d1").join(tab2.as("d2"), $"d1.id" === $"d2.id") 

//Take the aggregations on the joined df 
import org.apache.spark.sql.functions; 

joined.agg(
    functions.count("id").as("count"), 
    functions.avg("doubleColumn").as("average") 
).show(); 

しかし、これは有意なパフォーマンスの向上を持っていません。参加のパフォーマンスをどのように改善できますか?

  • このspark.sqlまたはデータフレーム構文を実行する最も良い方法はどれですか?

  • 実行者やメモリを増やすと役立ちますか?

  • キャッシュを使用する必要がありますか?
    データフレームtab1、tab2と結合集計の両方をキャッシュしてもかなりの利益が得られましたが、同時にいくつかの分析クエリを求める多くのユーザーが同時性に関心を持っているので、私のデータフレームをキャッシュすることは現実的ではないと思います。

  • 単一ノードで作業しているため、クラスタ上の本番環境に移動すると問題が解決しないため、何もしませんか?

ボーナス質問:私はインパラでこのクエリを試してみましたが、それは約40秒をしましたが、それはspark.sqlよりずっといいしています。インパラはどのようにして火花よりも優れていますか?

+0

単一ノードにいくつのコアがありますか? –

+0

私は10で試してみました - どれくらい使ったらいいですか? –

答えて

2

このspark.sqlまたはデータフレーム構文を実行する最も良い方法はどれですか?

何も違いはありません。

エグゼキュータまたはメモリを追加すると役立ちますか?

問題がデータスキューによって引き起こされず、正しく構成を調整する場合のみ。

キャッシュを使用する必要がありますか?

入力データが複数回再利用される場合は、パフォーマンスを考慮して入力データを複数回使用することをお勧めします。

単一ノードで作業しているため、クラスタ上の運用環境に移行すると問題が解決しないため、何もしないでください。

一般に、単一ノード上のパフォーマンステストは完全に役に立たない。ボトルネック(ネットワークIO /通信)と利点(ディスクI/Oとリソース使用量の償却)の両方が失われます。

しかし、parallelsm(spark.sql.shuffle.partitionssql.default.parallelismおよび入力分割サイズの増加)を大幅に減らすことができます。 Counterintuitivロードを分散するために設計されたスパークスタイルの並列処理は、アセットよりも単一のマシン上での負担です。共有メモリに比べて通信が非常に遅くなる通信のシャッフル(ディスク書き込み!)に依存し、スケジューリングのオーバーヘッドは重要です。

どのようにインパラがスパークよりも優れていますか?

これは、低遅延の同時クエリ用に特別に設計されているためです。 Sparkの目標であったことではありません(データベース対ETLフレームワーク)。

我々は、並行性に興味があるようあなた

は、多くのユーザーが同時にいくつかの分析クエリを求めています。

スパークは正しい選択肢のようには聞こえません。

+0

spark.sql.shuffle.partitions、sql.default.parallelismの設定オプションについてもう少し詳しくお聞かせください。 –

1

configsを変更することができます。これは、とにかく大きなクラスタで変更する必要があります。私はすぐに2つのことを考えることができます。 spark.executor.coresを5に設定し、メモリに応じて、spark.executor.instancesspark.executor.memoryでさらにエグゼキュータとメモリを増やしてください。また、ハイブテーブルをバケットで並べ替えることができますか?テーブルをバケット化すると、テーブルに結合する前にテーブルをソートする必要がなくなります。

触媒が集計クエリをどのように処理するかに応じて、結合後にデータフレームをキャッシュした方が高速かもしれません。クエリが終了した後にもunpersist()することができますが、私はGCがそれを価値があるものにしないかもしれないことに同意します。

SQLまたはscala dslを使用しても利点はありません。両方とも完全な段階のコード生成を使用するため、本質的に同じです。

インパラは常に高速です。なぜなら、1つのノードではあまり気にする必要はありませんが、レプリケーションのためのデータを事前に準備する必要がなく、複製する。

関連する問題