2016-10-14 12 views
2

EMR 4.3でSpark 1.6を使用して、ハイブメタストア内のテーブルに属する15TBのデータをクエリします(S3のgzippedパーケットファイルを利用)。私のクラスタでは、私はr3.8xlargeのマスターノードと15のr3.8xlargeコアノード(3.6TB RAM、9.6TB SSD)を持っています。Spark SQL 1.6.0 - 単純なクエリの大容量メモリ使用

〜15TBのデータは、おそらく90億行に含まれています。各行には、長さが5-50の文字列を格納する〜15個の列と、〜30個の文字列の配列を含む1つの列(それぞれ10〜20文字)があります。配列に格納されるユニークな文字列は〜100万個だけです。私がしようとしているのは、配列の列の一意の文字列を数えることですが、私は次のようにメモリが不足しているようです。OutOfMemoryError:エグゼキュータで新しいネイティブスレッドを作成できません。メモリ不足エラーのためにタスクが失敗し、エグゼキュータが無効になり、ジョブが失敗します。

私は5-10TBのデータを照会するときに動作します。私は何がメモリに格納されるのかを正しく理解してはいけません(これは私が理解しようとしているものです)。 Btwは、上記のクラスタで、私は設定しています:

私はSpark SQLが中間テーブルをメモリに保存したとは思わなかった。一意の文字列は1M以上ありませんので、その数のある文字列はメモリに簡単に収まるはずです。ここでは、クエリです:

val initial_df = sqlContext.sql("select unique_strings_col from Table where timestamp_partition between '2016-09-20T07:00:00Z' and '2016-09-23T07:00:00Z'") 
initial_df.registerTempTable("initial_table") // ~15TB compressed data to read in from S3 

val unique_strings_df = sqlContext.sql("select posexplode(unique_strings_col) as (string_pos, string) from initial_table").select($"string_pos", $"string") 
unique_strings_df.registerTempTable("unique_strings_table") // ~70% initial data remaining at this point 

val strings_count_df = sqlContext.sql("select string, count(*) as unique_string_count from unique_strings_table where string_pos < 21 group by string order by unique_string_count desc") // ~50% initial data remaining at this point 
strings_count_df.write.parquet("s3://mybucket/counts/2016-09-20-2016-09-23") 

圧縮された寄木細工のファイルは、(各5メガバイトと言う)小さいです。一度に1つずつ読むことができ、フィルタリングされ、カウントされて保存できるようです。私は何が欠けていますか?

答えて

1

最初のRDDを保存するのに十分なディスク+メモリスペースが必要です。一時テーブルを作成する前に、最初のRDDでフィルタリングを行うと、クエリを正常に実行できます。わーい!

関連する問題