2016-06-30 12 views
4

私は、Spark 1.6のいくつかのコードをSpark 2.0の新しいものを使ってSpark 2.0に移植しようとしています。まず、Spark 2.0のcsvリーダーを使用したいと思います。ところで、私はpysparkを使用しています。Spark 2.0 csvパーティション数(PySpark)

「古い」textFile機能では、パーティションの最小数を設定できます。例:

df = spark.read.csv('/home/xpto/text.csv', header=True) 
... 
をしかし、私は minPartitionsを設定する方法を見つけることができませんでした:

file= sc.textFile('/home/xpto/text.csv', minPartitions=10) 
header = file.first() #extract header 
data = file.filter(lambda x:x !=header) #csv without header 
... 

、スパーク2.0で私が直接csvファイルを読むことができます。

私のコードのパフォーマンスをテストするために必要です。

Thxを、 フレッド

答えて

4

行の数は、あなたがに分割しようとしているパーティションの数より少ない場合、スパークは基本的にパーティを無視します。 Coalesce(狭変換)は、増やさないパーティションの数を減らすために常に使用されます。増やすには rdd.repartition(300)を使用できます。また、coalesce()を使用しているときに気づきました。 coalesce(100,shuffle=True)のパフォーマンスはcoalesce(100)よりはるかに優れています。あなたのコードで試してみて、後で私に感謝してください。 更新:パーティション数を増やしたり、パーティションを減らしたり、データを均等にシャッフルしたりするためのrddの場合、coalesce(パーティション数、シャッフル= true)を使用できます。注意すべき点は、これによりすべてのノードシャッフルのデータがトリガされることです。

1

は、私はそれを考え出しました。 DataFrame(およびRDD)には、 "合体"というメソッドがあります。パーティションの数を設定できる場所。

例:私の場合は

>>> df = spark.read.csv('/home/xpto/text.csv', header=True).coalesce(2) 
>>> df.rdd.getNumPartitions() 
2 

、スパークは、153個のパーティションに私のファイルをsplited。私はパーティション数を10に設定できますが、300に設定しようとすると153を無視して再度使用します(理由はわかりません)。

REF: https://spark.apache.org/docs/2.0.0-preview/api/python/pyspark.sql.html#pyspark.sql.DataFrame.coalesce

+1

'coalesce()'はパーティションの数を減らすためにのみ使用できます。 'coalesce()'はパーティションの数を増やしません。そのためには、 'repartition()'を使ってspark workerの周りのデータシャッフルのコストを支払う必要があります。 – DavidF

関連する問題