エンドユーザーがより大きなファイルで複数のソースを処理しないようにするために集約ファイルを作成しようとしています。これを行うには: A)すべてのソースフォルダを反復処理し、最も一般的に要求される12個のフィールドを取り除き、これらの結果が同じ場所にある新しい場所でパーケットファイルを回転させます。 B)手順Aで作成したファイルを元に戻し、12個のフィールドをグループ化して再集計して、各固有の組み合わせのサマリー行に減らします。なぜSpark Parquetファイルは元のファイルよりも大きいですか?
私が見つけたのは、ステップAがペイロードを5:1に減らすということです(およそ250ギグは48.5ギグになります)。しかしステップBでは、これをさらに減らす代わりに、ステップAより50%増加します。しかし、私のカウントは一致します。
これはSpark 1.5.2を使用しています
フィールド名をfield1 ... field12に置き換えて読みやすくするために変更されたマイコードが、下に私が指摘した結果を示しています。
私は別の5:1削減を必ずしも期待していませんが、同じスキーマを使用して少ない行でストレージ側を増やすために間違って何をしているのか分かりません。私が間違ったことを理解するのを助けることができる人は誰ですか?
ありがとうございます!
//for each eventName found in separate source folders, do the following:
//spit out one row with key fields from the original dataset for quicker availability to clients
//results in a 5:1 reduction in size
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, cast(1 as bigint) as rCount from table"
sqlContext.sql(sqlCommand).coalesce(20).write.parquet("<aws folder>" + dt + "/" + eventName + "/")
//results in over 700 files with a total of 16,969,050,506 rows consuming 48.65 gigs of storage space in S3, compressed
//after all events are processed, aggregate the results
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, sum(rCount) as rCount from results group by field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12"
//Use a wildcard to search all sub-folders created above
sqlContext.read.parquet("<aws folder>" + dt + "/*/").registerTempTable("results")
sqlContext.sql(sqlStatement).coalesce(20).saveAsParquetFile("<a new aws folder>" + dt + "/")
//This results in 3,295,206,761 rows with an aggregate value of 16,969,050,506 for rCount but consumes 79.32 gigs of storage space in S3, compressed
//The parquet schemas created (both tables match):
|-- field1: string (nullable = true) (10 characters)
|-- field2: string (nullable = true) (15 characters)
|-- field3: string (nullable = true) (50 characters max)
|-- field4: string (nullable = true) (10 characters)
|-- field5: string (nullable = true) (10 characters)
|-- field6: string (nullable = true) (10 characters)
|-- field7: string (nullable = true) (16 characters)
|-- field8: string (nullable = true) (10 characters)
|-- field9 string (nullable = true) (15 characters)
|-- field10: string (nullable = true)(20 characters)
|-- field11: string (nullable = true)(14 characters)
|-- field12: string (nullable = true)(14 characters)
|-- rCount: long (nullable = true)
|-- dt: string (nullable = true)
spark 2.0.0でDataFrameWriterでbucketByを使用できないようです – eliasah