2016-11-03 7 views
2

私は簡単なスパークプログラムを書いて、それを分散サーバに配備したい。それはかなり簡単です:スパーク:分散システムでのパフォーマンスの低下。どのように改善する>

訓練の結果を確認するには、データ - >アレンジデータ - >トレーニングデータ - >再適用を取得します。

入力データはわずか10K行で、3つの機能があります。 "local [*]"を使ってローカルマシンで最初に実行します。それはちょうど約3分実行されます。 クラスタにデプロイすると、非常に遅く実行されます。トレーニング段階では非常に遅くなります。

私が何か問題があった場合、私は不思議です。確認してください。私はSpark 1.6.1を使用します。

私が提出:

spark-submit --packages com.databricks:spark-csv_2.11:1.5.0 orderprediction_2.11-1.0.jar --driver-cores 1 --driver-memory 4g --executor-cores 8 --executor-memory 4g 

コードはここにある:

def main(args: Array[String]) { 
    // Set the log level to only print errors 
    Logger.getLogger("org").setLevel(Level.ERROR) 

    val conf = new SparkConf() 
     .setAppName("My Prediction") 
     //.setMaster("local[*]") 
    val sc = new SparkContext(conf) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    val data = sqlContext.read 
     .option("header","true") 
     .option("delimiter", "\t") 
     .format("com.databricks.spark.csv") 
     .option("inferSchema","true") 
     .load("mydata.txt") 

    data.printSchema() 
    data.show() 

    val dataDF = data.toDF().filter("clicks >=10") 
    dataDF.show() 

    val assembler = new VectorAssembler() 
     .setInputCols(Array("feature1", "feature2", "feature3")) 
     .setOutputCol("features") 

    val trainset = assembler.transform(dataDF).select("target", "features") 
    trainset.printSchema() 
    val trainset2 = trainset.withColumnRenamed("target", "label") 

    trainset2.printSchema() 
    val trainset3 = trainset2.withColumn("label", trainset2.col("label").cast(DataTypes.DoubleType)) 
    trainset3.cache() // cache data into memory 
    trainset3.printSchema() 
    trainset3.show() 

    // Train a RandomForest model. 

    println("training Random Forest") 

    val rf = new RandomForestRegressor() 
     .setLabelCol("label") 
     .setFeaturesCol("features") 
     .setNumTrees(1000) 

    val rfmodel = rf.fit(trainset3) 

    println("prediction") 
    val result = rfmodel.transform(trainset3) 

    result.show() 
} 

更新:調査の後、私はそれはそれはすでに1.1時間を費やし

collectAsMap at RandomForest.scala:525 

で詰まりましたこの行では、まだ未完成です。データは、数メガバイトと信じています。

+0

クラスタで使用しているエグゼキュータの数はいくつですか?あなたはエグゼキュータのメモリを増やそうとしましたか、クラスタで時間がかかる段階を見ましたか? –

+0

--driver-core 1 - ドライバ - メモリ4g --executor-core 8 --executor-memory 4g。実際には、ファイルはわずか180Mなので、メモリは十分に十分なはずです。 – lserlohn

+0

デシジョンツリーのトレーニングで訓練段階でした – lserlohn

答えて

0

1000個のインスタンスを訓練する1000個のランダムツリーからなるランダムフォレストを構築しています。

コードcollectAsMapが最初のアクションであり、残りのすべてが変換(遅延評価されている)です。それで、が表示されている間にその行にがぶら下がっているので、今のところmaps, flatMaps, filters, groupBy,などが評価されています。

+0

しかし、なぜそれは時間がかかりますか? – lserlohn

関連する問題