2017-11-03 6 views
0

Hiveテーブルからデータを読み込むSparkSQLクエリを実行しようとしていて、特定のしきい値を超えると失敗します。マジックナンバーの一種であること500K行のSparkSQL rddパーティションがキャッシュメモリに収まらない

val 500k = spark.sql("""select myid, otherfield, count(*) as cnt from mytable 
group by otherfield, myid order by cnt desc limit 500000""").cache(); 

500k.show(); 

私は、コマンドを実行します。私は、タスクがエラーのために失敗した高い行く場合:

15:02:05 WARN MemoryStore: Not enough space to cache rdd_52_0 in memory! (computed 2046.7 MB GB so far) 
15:02:05 INFO MemoryStore: Memory use = 624.7 KB (blocks) + 2043.5 MB (scratch space shared across 1 tasks(s)) = 2044.1 MB. Storage limit = 2.7 GB. 
15:02:05 WARN CacheManager: Persisting partition rdd_52_0 to disk instead. 
15:17:56 ERROR Executor: Exception in task 1.0 in stage 4.0 (TID 24002) 
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE 

のWebUIでストレージの下で、私はディスクに書き込まれた後、サイズ3.1ギガバイトでrdd_52_0を見ることができます。

Understanding Spark partitioningwhy I got the error: "Size exceed Integer.MAX_VALUE" when using spark+cassandra? と相談したところ、rddが大きくなっていることが原因です。 WebUIでは、1つのrddしか表示されていませんが、これは問題です。シャッフル後にキャッシュされたrddの数を強制的に増やすにはどうすればよいですか?私は500k.repartition(100)でそれを再分割しようとしている

は、私はspark.default.parallelismと同じにしようとしている、spark.sessionstate.conf.setConf(SHUFFLE_PARTITIONS, 100)とshufflePartitionsの数が増加していると私は16ギガバイトの両方ドライバとエグゼキュータメモリを増加させた - すべての大成功せず。

また、2.7 GBの制限はどこから来ていますか?

+0

を "持続" あなたは[この](https://stackoverflow.com/questions/42247630/sql-query-in-spark-scala-size-exceeds-をチェックすることもできます使用してください整数 - 最大値) – MaxU

+0

@MaxUはリンクに感謝しますが、基本的に私が参照している2番目の記事と同じです。 rddは再分割を無視しています(100)。 –

答えて

0

rdd.persist(StorageLevel) where Storagelevel = { 
MEMORY_ONLY, MEMORY_AND_DISK, 
MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY} 


cachce is rdd.persist(MEMORY_ONLY). 
please use rdd.persist(MEMORY_AND_DISK) 
+0

あなたの答えをありがとう。残念ながら、これは私の問題を完全には解決しません.Sparkをディスクにダンプするだけです。 –

関連する問題