2017-02-10 11 views
0

apache sparkのキャッシュされたTempテーブルでパーティションのプルーニングが有効になっていますか?もしそうなら、どうすれば設定できますか?キャッシュされたテーブルのsparc SQLパーティションのプルーニング

私のデータは、さまざまなインストールでセンサーの読み取り値の束です.1行には、installationName、タグ、タイムスタンプ、および値が含まれています。今

val parquet = hc.read.parquet("/path_to_table/tablename") 
parquet.registerTempTable("tablename") 

もし私が:

私は次のコマンドを使用して、寄木細工の形式でデータを書いた:

rdd.toDF("installationName", "tag", "timestamp", "value") 
    .repartition($"installationName", $"tag") 
    .write.partitionBy("installationName","tag").mode("overwrite").parquet(config.output) 

私はスパークHiveContextを使用してSQLテーブルに次のコマンドを使用して、このデータを読みますこのテーブルでSQLクエリを実行すると、予期したとおりにパーティションを枝刈りします。

クエリには約8秒かかります。しかし、テーブルをメモリにキャッシュしてから同じクエリを実行すると、常に約50秒かかります。

hc.sql("CACHE TABLE tablename") 
hc.sql("select * from tablename where installationName = 'XXX' and tag = 'YYY'") 

私は現在Spark 1.6.1を使用しています。

+0

こんにちは、ご意見ありがとうございます。実際には、私がパーケットにデータを書き込む前に、再分割操作を行います。また、上記のクエリを再分割してテストしたところ、クエリ時間が20秒の方が効率的でしたが、キャッシングなしでパーケットファイルから読み取るよりも遅いです。私の目的は、寄せ木細工のファイルに書くのを避けることです。何らかのソースを提供できますか?キャッシュ後にパーティションプルーニングがサポートされていないことをどのように知っていますか?ここで答えを書くなら、それを受け入れることができます。 –

+0

修正、メモリ内のキャッシュはクエリ時間を1秒未満に短縮しますが、これは当然受け入れられます。私はそれがスケールかどうか疑問に思う:これは私のdastaの一部であり、私は実際には200倍以上の連続成長をしているので、私が持っているデータが多くなればなるほど、 。 –

答えて

0

これが発生する理由は、キャッシュがスパークでどのように動作するかに起因します。あなたにqueryExecution復帰計画

val df = sc.parallelize(1 to 10000).toDF("line") 
df.withColumn("new_line", col("line") * 10).queryExecution 

をコマンドを:あなたは、実行計画は以下を参照しているDATAFRAME、RDDまたはDataSetにプロセスのいくつかの種類を呼び出す

。コードの下の論理プランを参照してください。

== Parsed Logical Plan == 
Project [*,('line * 10) AS new_line#7] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at 

== Analyzed Logical Plan == 
line: int, new_line: int 
Project [line#5,(line#5 * 10) AS new_line#7] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at 

== Optimized Logical Plan == 
Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#7] 
+- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at 

== Physical Plan == 
Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#7] 
+- Scan ExistingRDD[_1#4] 

この場合、コードで実行されるすべてのプロセスが表示されます。あなたはこのようなcache関数を呼び出すとき:

df.withColumn("new_line", col("line") * 10).cache().queryExecution 

を結果は次のようになります。

== Parsed Logical Plan == 
'Project [*,('line * 10) AS new_line#8] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at <console>:34 

== Analyzed Logical Plan == 
line: int, new_line: int 
Project [line#5,(line#5 * 10) AS new_line#8] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at <console>:34 

== Optimized Logical Plan == 
InMemoryRelation [line#5,new_line#8], true, 10000, StorageLevel(true, true, false, true, 1), Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#8], None 

== Physical Plan == 
InMemoryColumnarTableScan [line#5,new_line#8], InMemoryRelation [line#5,new_line#8], true, 10000, StorageLevel(true, true, false, true, 1), Pro... 

この実行は、これが保存されます、あなたにoptmized論理的計画のInMemoryRelationの実行を返します。あなたのメモリ内のデータの構造、またはあなたのデータが本当に大きい場合、それはディスクにこぼれるでしょう。

これをクラスタに保存する時間は、最初の実行では少し遅くなりますが、DFまたはRDDが保存される別の場所で同じデータに再びアクセスする必要がある場合は、スパークは再び実行を要求しません。

+0

ありがとうございました!スパーク・テーブルでは、キャッシングは熱心な操作です。つまり、初めてクエリを実行するときにデータがキャッシュされていることを意味します。データのキャッシュには実際に500秒かかりますが、実際にはキャッシュの後ではクエリのパフォーマンスが向上し、すべてのパーティションをスキャンするのに50秒しかかかりません。クエリを何回実行しても、パフォーマンスは常に同じです。あなたの答えは、私の質問には対処していません。これは、カヒシング後のパーティションのプルーニングに関するものです。 –

関連する問題