私のデータセットは、このように分割されている範囲:スパークSQLクエリ
Year=yyyy
|---Month=mm
| |---Day=dd
| | |---<parquet-files>
2つの日付の間のデータをロードスパークでのデータフレームを作成する最も簡単かつ効率的な方法は何ですか?
私のデータセットは、このように分割されている範囲:スパークSQLクエリ
Year=yyyy
|---Month=mm
| |---Day=dd
| | |---<parquet-files>
2つの日付の間のデータをロードスパークでのデータフレームを作成する最も簡単かつ効率的な方法は何ですか?
あなたは絶対にこのパーティション化戦略に固執する必要がある場合は、答えはあなたがパーティションを負担する意思があるかどうかに依存します発見コストかどうか。
新しいファイルを追加するまで、Sparkがすべてのパーティションを検出する必要がある場合は、ベースパスをロードしてからパーティションカラムを使用してフィルタすることができます。
たとえば、何百万ものファイルがあるためにSparkにすべてのパーティションを検出させたくない場合は、効率的な一般的な解決策は、照会したい間隔を簡単に照会できる複数のサブ間隔に分割することです@ r0bb23のアプローチを使用し、次に一緒に結合します。
上記の両方のケースを最大限に活用し、安定したスキーマが必要な場合は、外部パーティション表を定義してメタストアにパーティションを登録できます。メタストア管理テーブルが現時点でスキーマの進化を非常にうまく管理しているため、スキーマの進化が期待される場合は、これを行わないでください。これは確かに可能であるため、一般的なコードを書く
// With full discovery
spark.read.parquet("hdfs:///basepath")
.where('Year === 2017 && (
('Month === 10 && 'Day >= 6') || ('Month === 11 && 'Day <= 3')
))
// With partial discovery
val df1 = spark.read.option("basePath", "hdfs:///basepath/")
.parquet("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9], [1-3][0-9]}/*/")
val df2 = spark.read.option("basePath", "hdfs:///basepath/")
.parquet("hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")
val df = df1.union(df2)
が、私はそれに遭遇していない。例えば
は、あなたがしたい2017-10-06
間と2017-11-03
照会します。よりよいアプローチは、私が質問にしたコメントに概説されている方法で分割することです。あなたのテーブルが/basepath/ts=yyyymmddhhmm/*.parquet
のようなものを使用して分配した場合、答えは単純です:
spark.read.parquet("hdfs:///basepath")
.where('ts >= 201710060000L && 'ts <= 201711030000L)
それは時間に&分を加える価値がある理由はあなたが関係なく、あなたが分割されたデータを持っているかどうかの間隔を扱う汎用的なコードを書くことができるということです週、日、時、または15分ごとに実際には、同じテーブル内の異なる粒度でデータを管理することもできます。たとえば、古いデータを上位レベルで集約して、検出する必要があるパーティションの総数を減らすことができます。
コメントをアドレス指定するために複数のロードパスを追加するように編集しました。
正規表現の構文を使用できます。
val dataset = spark
.read
.format("parquet")
.option("filterPushdown", "true")
.option("basePath", "hdfs:///basepath/")
.load("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9],[1-3][0-9]}/*/",
"hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")
How to use regex to include/exclude some input files in sc.textFile?
注:あなたはすべての日、月をしたい場合X=*
あなただけなど、
を*
を行うことができますあなたはおそらくいくつかは、読んでくださいなければならない必要はありません。 Predicate Pushdown(filterPushdownは上記のtrueに設定されています)。
最後に、あなたは上記のBasePathのオプションに気づくでしょう、その理由は、ここで見つけることができます:Prevent DataFrame.partitionBy() from removing partitioned columns from schema
にあります。これは問題の一般的な解決策ではありません。実際、この区分化戦略を使用して日付の間隔を照会する簡単な一般的な解決策はありません。たとえば、このアプローチを「2017-10-06」と「2017-11-03」の間のクエリにどのように使用しますか? – Sim
あなたの答えにいくつかの良い情報があります。しかし、あなたの答えに表示されている組合は必要ありません(上記の編集を参照)。だから私は、あなたがそれを信用しているよりももっと一般化できると思うとは言いがたいが、それほど美しいヘルパー関数は必要ないだろう。しかし、システムの大部分ではないにしても、多くの場合、それはその価値があります。あなたが認めているように、パーティションの発見は規模の安いものではないからです。部分的な発見は規模のほうがはるかに優れています。私は同意するが、より良いパーティショニング戦略が助けになるだろう。私はヘルパー関数と上記のコードを自明にする、あなたが以下にあるもののようなものをもっと使用します。 – r0bb23
パーティションで範囲照会が簡単になるようにするには、最適な解決方法は、単一の軸上の時間(例えば、/ tbl/ts = yyyymmddhhmm/*。このトピックのセクションはhttps://spark-summit.org/east-2017/events/bulletproof-jobs-patterns-for-large-scale-spark-processing/ – Sim