2017-02-15 13 views
9

Sparkを使用しているS3イベントに対して簡単なSQLクエリを作成しようとしています。私は次のようにJSONファイルの〜30ギガバイトをロードしています:Spark/scalaサイズのSQLクエリがInteger.MAX_VALUEを超えています

val d2 = spark.read.json("s3n://myData/2017/02/01/1234"); 
d2.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK); 
d2.registerTempTable("d2"); 

その後、私は私のクエリの結果をファイルに記述しようとしています:

val users_count = sql("select count(distinct data.user_id) from d2"); 
users_count.write.format("com.databricks.spark.csv").option("header", "true").save("s3n://myfolder/UsersCount.csv"); 

しかし、スパークは次の例外投げている:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE 
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869) 
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103) 
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91) 
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287) 
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105) 
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439) 
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672) 
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
at org.apache.spark.scheduler.Task.run(Task.scala:85) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

同じクエリが、より少量のデータに対して機能することに注意してください。ここで何が問題なの?

+0

パーティションサイズが制限を超えている可能性が最も高い問題です。 '.repartition(100)'などを試してみてください。データを読み込んだ後でこれを解決してください –

+0

'%d2 = spark.read.json(" s3n:// myData/2017/02/01/1234 ")。repartition(1000)'リファレンスhttps://issues.apache.org/jira/browse/SPARK-1476 –

+0

補足として、新しい 's3a 's3n'の代わりに' '。例えば、 http://stackoverflow.com/questions/33356041/technically-what-is-the-difference-between-s3n-s3a-and-s3 – sgvd

答えて

22

いいえスパークシャッフルブロックは2GB(Integer.MAX_VALUEバイト)より大きくても構いませんので、より多くの/より小さなパーティションが必要です。

spark.default.parallelismとspark.sql.shuffle.partitions(デフォルトは200)を調整して、パーティションの数が2GBの制限に達することなくデータに対応できるようにする必要があります(256MB/200GBの場合は800パーティションになります)。何千ものパーティションが非常に一般的ですので、1000に再パーティションすることをお勧めします。

FYI、あなたがrdd.getNumPartitions(すなわちd2.rdd.getNumPartitions)のようなものでRDDのためのパーティションの数を確認することが

(オープン、様々な2GBの制限に対処する努力を追跡する話があったですしばらくの間):https://issues.apache.org/jira/browse/SPARK-6235

このエラーの詳細については、http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications/25を参照してください。

+0

答えをありがとう! – eexxoo

+0

説明をありがとう!また、デフォルトパーティションの数を編集するにはhttps://stackoverflow.com/questions/45704156/what-is-the-difference-between-spark-sql-shuffle-partitions-and-spark-default-paをご覧ください。 – Raphvanns

関連する問題