2017-11-15 3 views
0

私はS3からCSVファイルを読み込むことになっている非常に単純なpysparkプログラムがあります:ローカルスパークノードを実行しているときSparkがパーティションをファイルサイズ(バイト単位)に設定するのはなぜですか?

r = sc.textFile('s3a://some-bucket/some-file.csv') 
    .map(etc... you know the drill...) 

これは失敗していたが(それはEMRで動作します)。 OOMエラーやGCのクラッシュが発生しました。さらに調べると、パーティションの数が非常に多くなっていることがわかりました。この特定の場合、r.getNumPartitions()2358041を返します。

私はそれがバイト単位のファイルサイズと同じであることに気付きました。これはもちろん、スパークが悲惨にクラッシュする原因となります。

私はmapred.min.split.sizeをchaningのように、異なる構成を試してみた:

conf = SparkConf() 
conf.setAppName('iRank {}'.format(datetime.now())) 
conf.set("mapred.min.split.size", "536870912") 
conf.set("mapred.max.split.size", "536870912") 
conf.set("mapreduce.input.fileinputformat.split.minsize", "536870912") 

私も無駄に、repartitionを使用して、またはtextFileにパーティションの引数を渡す変更しようとしました。

私は、ファイルサイズからパーティションの数を派生させるのが良い考えであるとSparkが考えていることを知りたいです。

答えて

1

一般にはそうではありません。 eliasahによってhis answerからSpark RDD default number of partitionsによく説明されているように、maxminPartitions(2が指定されていない場合は2)であり、分割はHadoop入力形式で計算されます。

後者は、構成によって指示された場合に限り、不合理に高くなります。これは、いくつかの設定ファイルがあなたのプログラムを妨害することを示唆しています。

コードで考えられる問題は、間違った設定を使用していることです。 Hadoopオプションは、hadoopConfigurationを使用してSparkの設定で設定する必要があります。あなたはプライベートJavaSparkContextインスタンスを使用する必要がありますので、あなたは、Pythonを使うように見えます:

sc = ... # type: SparkContext 

sc._jsc.hadoopConfiguration().setInt("mapred.min.split.size", min_value) 
sc._jsc.hadoopConfiguration().setInt("mapred.max.split.size", max_value) 
+0

それは魅力的なように機能しました。ありがとうございました。 – Cristian

1

これを行うだろうHadoopの2.6のバグが実際にありました。最初のS3AリリースではSparkに分割するブロックサイズは提供されませんでした。デフォルトの「0」は1ジョブあたり1バイトを意味しました。

以降のバージョンでは、ブロックサイズを指定するconfigオプションとしてfs.s3a.block.sizeが必要です.33554432(= 32 MB)のようなものが開始になります。

Hadoop 2.6.xを使用している場合。 S3Aは使用しないでください。それが私のお勧めです。

関連する問題