2017-09-14 6 views
1

私はSpark SQLを使用してHiveのデータを照会しています。データはパーティション化され、Spark SQLはクエリを実行するときにパーティションを正しくプルーニングします。ハイブテーブルのパーティションプルーンされた入力を表示するにはどうすればよいですか?

しかし、ソーステーブルとパーティションフィルタのどちらかを指定する必要があります(.inputFilesは明らかですが、枝刈りは反映されません)データが計算されます。

最も近い私が得ることができたのは、df.queryExecution.executedPlan.collectLeaves()を呼び出すことでした。これには、関連プランノードがHiveTableScanExecインスタンスとして含まれています。ただし、このクラスはorg.apache.spark.sql.hiveパッケージの場合はprivate[hive]です。私は、関連するフィールドがrelationpartitionPruningPredだと思う。

これを達成する方法はありますか?

更新:私はヤツェクの提案へと戻っrelationgetHiveQlPartitionsを使用して、パラメータとしてpartitionPruningPredを提供することにより、関連する情報のおかげで得ることができた。これは私が必要なすべてのデータが含まれてい

scan.findHiveTables(execPlan).flatMap(e => e.relation.getHiveQlPartitions(e.partitionPruningPred)) 

を、すべての入力ファイルへのパスを含め、適切にパーティションを整理します。

答えて

0

さて、あなたはクエリの実行の低レベルの詳細を求めています。物事はそこにうんざりしています。 あなたは警告されました:)

コメントに記載されているとおり、すべての実行情報はprivate[hive] HiveTableScanExecです。

HiveTableScanExec物理演算子(実行時にHiveテーブル)を理解するための1つの方法は、private[hive]ではないorg.apache.spark.sql.hiveパッケージのバックドアの種類を作成することです。

package org.apache.spark.sql.hive 

import org.apache.spark.sql.hive.execution.HiveTableScanExec 
object scan { 
    def findHiveTables(execPlan: org.apache.spark.sql.execution.SparkPlan) = execPlan.collect { case hiveTables: HiveTableScanExec => hiveTables } 
} 

必要に応じてコードを変更してください。

scan.findHiveTablesの場合、私は通常:paste -rawを使用し、spark-shellでは「未知の領域」に侵入します。

あなたは、単に次の操作を行うことができます:

scala> spark.version 
res0: String = 2.4.0-SNAPSHOT 

// Create a Hive table 
import org.apache.spark.sql.types.StructType 
spark.catalog.createTable(
    tableName = "h1", 
    source = "hive", // <-- that makes for a Hive table 
    schema = new StructType().add($"id".long), 
    options = Map.empty[String, String]) 

// select * from h1 
val q = spark.table("h1") 
val execPlan = q.queryExecution.executedPlan 
scala> println(execPlan.numberedTreeString) 
00 HiveTableScan [id#22L], HiveTableRelation `default`.`h1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#22L] 

// Use the above code and :paste -raw in spark-shell 

import org.apache.spark.sql.hive.scan 
scala> scan.findHiveTables(execPlan).size 
res11: Int = 1 

relationフィールドは、それがスパーク・アナライザは、データ・ソースとハイブのテーブルを解決するために使用するResolveRelationsFindDataSourceTable論理ルールを使用して解決されています後ハイブテーブルです。

spark.sharedState.externalCatalogとして利用可能なExternalCatalogインターフェイスを使用して、SparkがHiveメタストアから使用するすべての情報を取得できます。これは、SparkがHiveテーブルを使ってクエリを計画するために使用するすべてのメタデータを提供します。

+0

ありがとうございます!私は関連する情報を、返された 'relation'に対して' getHiveQlPartitions'を使って取得し、パラメータとして 'partitionPruningPred'を提供することができました: ' scan.findHiveTables(execPlan).flatMap(e => e.relation.getHiveQlPartitionspartitionPruningPred)) ' これには、すべての入力ファイルへのパスを含め、必要なすべてのデータが含まれており、適切にパーティションをプルーニングしました。 これには低レベルのパッケージプライベートアクセスが必要であり、標準の 'inputFiles'はそれ自体ではありませんが残念です。私はそれがパフォーマンス上の理由だと思いますか? – binarek

関連する問題