2016-07-01 12 views
2

エンドユーザーがより大きなファイルで複数のソースを処理しないようにするために集約ファイルを作成しようとしています。これを行うには: A)すべてのソースフォルダを反復処理し、最も一般的に要求される12個のフィールドを取り除き、これらの結果が同じ場所にある新しい場所でパーケットファイルを回転させます。 B)手順Aで作成したファイルを元に戻し、12個のフィールドをグループ化して再集計して、各固有の組み合わせのサマリー行に減らします。なぜSpark Parquetファイルは元のファイルよりも大きいですか?

私が見つけたのは、ステップAがペイロードを5:1に減らすということです(およそ250ギグは48.5ギグになります)。しかしステップBでは、これをさらに減らす代わりに、ステップAより50%増加します。しかし、私のカウントは一致します。

これはSpark 1.5.2を使用しています
フィールド名をfield1 ... field12に置き換えて読みやすくするために変更されたマイコードが、下に私が指摘した結果を示しています。

私は別の5:1削減を必ずしも期待していませんが、同じスキーマを使用して少ない行でストレージ側を増やすために間違って何をしているのか分かりません。私が間違ったことを理解するのを助けることができる人は誰ですか?

ありがとうございます!

//for each eventName found in separate source folders, do the following: 
//spit out one row with key fields from the original dataset for quicker availability to clients 
//results in a 5:1 reduction in size 
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, cast(1 as bigint) as rCount from table" 
sqlContext.sql(sqlCommand).coalesce(20).write.parquet("<aws folder>" + dt + "/" + eventName + "/") 
//results in over 700 files with a total of 16,969,050,506 rows consuming 48.65 gigs of storage space in S3, compressed 

//after all events are processed, aggregate the results 
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, sum(rCount) as rCount from results group by field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12" 
//Use a wildcard to search all sub-folders created above 
sqlContext.read.parquet("<aws folder>" + dt + "/*/").registerTempTable("results") 
sqlContext.sql(sqlStatement).coalesce(20).saveAsParquetFile("<a new aws folder>" + dt + "/") 
//This results in 3,295,206,761 rows with an aggregate value of 16,969,050,506 for rCount but consumes 79.32 gigs of storage space in S3, compressed 

//The parquet schemas created (both tables match): 
|-- field1: string (nullable = true) (10 characters) 
|-- field2: string (nullable = true) (15 characters) 
|-- field3: string (nullable = true) (50 characters max) 
|-- field4: string (nullable = true) (10 characters) 
|-- field5: string (nullable = true) (10 characters) 
|-- field6: string (nullable = true) (10 characters) 
|-- field7: string (nullable = true) (16 characters) 
|-- field8: string (nullable = true) (10 characters) 
|-- field9 string (nullable = true) (15 characters) 
|-- field10: string (nullable = true)(20 characters) 
|-- field11: string (nullable = true)(14 characters) 
|-- field12: string (nullable = true)(14 characters) 
|-- rCount: long (nullable = true) 
|-- dt: string (nullable = true) 

答えて

1

一般的に、寄木細工のような柱状記憶フォーマットは、データ分配(データ編成)および個々の列のカーディナリティに関して非常に敏感です。データがより組織化され、カーディナリティが低いほどストレージが効率的になります。

集計は、適用したとおり、データをシャッフルする必要があります。実行計画をチェックすると、ハッシュ・パーティショナを使用していることがわかります。つまり、集計後の分布は、元のデータの分布よりも効率が悪い可能性があります。同時にsumは、行数を減らすことができますが、rCount列のカーディナリティーを増やすことができます。

あなたはそのために修正するためにさまざまなツールを試すことができますが、すべてではないには、Spark 1.5.2で利用可能です:低カーディナリティを持つ列によって

  • ソート完全なデータセットまたはsortWithinPartitions(フルシャッフルのために非常に高価)。
  • メソッドをDataFrameWriterに設定すると、低カーディナリティ列を使用してデータをパーティション化できます。
  • バケツティングとローカルソートを使用してデータ配信を改善するために、bucketBysortByの方法をDataFrameWriter(Spark 2.0.0+)にします。
+0

spark 2.0.0でDataFrameWriterでbucketByを使用できないようです – eliasah

関連する問題