2016-12-29 58 views
4

データフレームをファイルに書き込むテストプログラムがあります。データフレームは、データフレームで100000行がありSparkクラスタ上のファイルへのデータフレームの書き込みが非常に遅い

1,2,3,4,5,6,7.....11 
2,3,4,5,6,7,8.....12 
...... 

のように、行ごとに連続番号を追加することによって生成されるが、私はそれが大きすぎるとは思いません。 Sparkタスクを送信すると、データフレームをHDFS上のファイルに書き込むのに20分ほどかかります。なぜそれが遅いのか、パフォーマンスを改善する方法が不思議です。

val sc = new SparkContext(conf) 
val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
val numCol = 11 

val arraydataInt = 1 to 100000 toArray 
val arraydata = arraydataInt.map(x => x.toDouble) 
val slideddata = arraydata.sliding(numCol).toSeq 
val rows = arraydata.sliding(numCol).map { x => Row(x: _*) } 
val datasetsize = arraydataInt.size 

val myrdd = sc.makeRDD(rows.toSeq, arraydata.size - numCol).persist() 

val schemaString = "value1 value2 value3 value4 value5 " + 
        "value6 value7 value8 value9 value10 label" 

val schema = 
StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, DoubleType, true))) 

val df = sqlContext.createDataFrame(myrdd, schema).cache() 

    val splitsH = df.randomSplit(Array(0.8, 0.1)) 
val trainsetH = splitsH(0).cache() 
val testsetH = splitsH(1).cache() 

println("now saving training and test samples into files") 

trainsetH.write.save("TrainingSample.parquet") 
testsetH.write.save("TestSample.parquet") 

答えて

2

電源を入れ

val myrdd = sc.makeRDD(rows.toSeq, arraydata.size - numCol).persist() 

val myrdd = sc.makeRDD(rows.toSeq, 100).persist() 

するには、arraydata.size - numColパーティションでRDDを作ったし、各パーティションには、余分な実行時間を要するタスクにつながります。一般に、パーティションの数は、並列度とその余分なコストの間のトレードオフです。 100個のパーティションを試してください。

ところで、公式Guideでは、クラスタ内のCPUの数の2〜3倍を設定することをお勧めします。

+0

答えに感謝します!それは本質的に私の問題を解決しました~~~! – lserlohn

関連する問題