私はspark.sqlと結合したい2つの大きなHiveテーブルを持っています。表1に500万行、表2に7000万行のテーブル1とテーブル2があるとしましょう。テーブルはうっすらとしたフォーマットであり、ハイブにはパーケットファイルとして保存されています。2つのテーブルの結合でスパーク性能の問題
私はそれらに参加し、いくつかの列でいくつかの集計を取って、2つの条件(col1、col2に言うことができます)でフィルタリングしながら、すべての行と平均(例えばdoubleColumn)
注:私は1台のマシンでテストインストールを行っています(これはかなり強力です)。私は、クラスタ内のパフォーマンスがおそらく異なると予想しています。私はエグゼキュータとドライバあたり少なくとも8ギガバイトのメモリを与える場合でも
val stat = sqlContext.sql("select count(id), avg(doubleColumn) " +
" FROM db.table1 as t1 JOIN db.table2 " +
" ON t1.id = t2.id " +
" WHERE col1 = val1 AND col2 = val2").collect
5分程度の非常に悪い残念ながら、この実行を:
私の最初の試みは次のようにスパークSQLを使用することです。私はまた、データフレームの構文を使用して最初の行をフィルタなど、より良い選択性を持っているだけで特定の列を選択しようとしてみました:
//Filter first and select only needed column
val df = spark.sql("SELECT * FROM db.tab1")
val tab1= df.filter($"col1" === "val1" && $"col2" === "val2").select("id")
val tab2= spark.sql("SELECT id, doubleColumn FROM db.tab2")
val joined = tab1.as("d1").join(tab2.as("d2"), $"d1.id" === $"d2.id")
//Take the aggregations on the joined df
import org.apache.spark.sql.functions;
joined.agg(
functions.count("id").as("count"),
functions.avg("doubleColumn").as("average")
).show();
しかし、これは有意なパフォーマンスの向上を持っていません。参加のパフォーマンスをどのように改善できますか?
このspark.sqlまたはデータフレーム構文を実行する最も良い方法はどれですか?
実行者やメモリを増やすと役立ちますか?
キャッシュを使用する必要がありますか?
データフレームtab1、tab2と結合集計の両方をキャッシュしてもかなりの利益が得られましたが、同時にいくつかの分析クエリを求める多くのユーザーが同時性に関心を持っているので、私のデータフレームをキャッシュすることは現実的ではないと思います。単一ノードで作業しているため、クラスタ上の本番環境に移動すると問題が解決しないため、何もしませんか?
ボーナス質問:私はインパラでこのクエリを試してみましたが、それは約40秒をしましたが、それはspark.sqlよりずっといいしています。インパラはどのようにして火花よりも優れていますか?
単一ノードにいくつのコアがありますか? –
私は10で試してみました - どれくらい使ったらいいですか? –