2017-04-21 7 views
-1

私はcsvを入力とし、各行を読み込み、すべての行のドキュメント分類を行い、予測されたドキュメントラベルをMySQLデータベースに保存するスケーラコードを持っています。Scalaシリアライズコードをパラレルオペレーションに変換する

スニペットの問題は、csvに3200行があることがありますが、操作全体を完了するまでに多くの時間がかかります。私はこのコードを変換する必要があります。例えば、csvはエグゼキュータの間で配布され、ドキュメント予測を行い、ラベルを保存します。これは、ANにあなたのCSVの内容に変わります

sparkSession.read 
    .option("header", "true") 
    .option("inferSchema", "true") //Maybe 
    .csv(args(4)) 
    .rdd { row => 
     ... 
    } 

:すべてを行う必要がスパークでの組み込みのCSV機能を使用している

val reader = new CSVReader(new FileReader(args(4))) 
    var readFirstLine = false; 

    for (row <- reader.readAll) { 
     if(readFirstLine) { 
      var date = row(1).split(" "); 
      var split_date = date(0).split('-').toList; 
      val documentTransformed = tf.transform(row(2).split(" ")) 
      val emotionPredicted = model.predict(documentTransformed) 
      val emotionMapped = emotionMaps(emotionPredicted);   

      //Insert Emotions    
      var query = "insert into emotions_values(user_id, year, month, day, emotion)" + "values ('"+ args(5) +"', '"+ split_date(0) +"', '"+ split_date(1) +"', '"+ split_date(2) +"', '"+ emotionMapped +"')"; 
      statement.executeUpdate(query) 

      val polarityPredicted = polarityModel.predict(documentTransformed) 
      val polarityMapped = polarityMaps(polarityPredicted); 

      //Insert Polarity 
      var polarityQuery = "insert into polarity_values(user_id, year, month, day, polarity)" + "values ('"+ args(5) +"', '"+ split_date(0) +"', '"+ split_date(1) +"', '"+ split_date(2) +"', '"+ polarityMapped +"')"; 
      statement.executeUpdate(polarityQuery) 
     } 
     else { 
      readFirstLine = true; 
     } 
    } 
+1

これは非常に広い質問です。スパークの仕事をあなたのために書くようにお願いしているようです。あなたが欲しいSparkの特定の質問に絞り込むことができれば、より良い回答を得ることができます。 – DNA

+0

csvからエグゼキュータに行を配布するために必要なことは、ドキュメントのラベリングと予測したラベルをmySQLに挿入することですか?私はすでに文書のラベル付けとmysqlへのデータの挿入を行っています。私はexecutorにcsvの行を配布する方法を見つける必要がありますか? – user2738965

+1

スパークのためのcsvリーダーがあります:https://github.com/databricks/spark-csv 私はそれが時代遅れであるかどうかわかりません。 – vefthym

答えて

0

- 後

は、コードスニペットですRDDを編集し、必要に応じて操作することができます。単にheaderオプションをtrueに設定すると、最初の行は無視されます。

私はあなたがcsvメソッドによって返さDataFrameで動作することができるかどうかを検討して助言する - というrddメソッドによって返さRDDより - スパークにCatalyst optimizationsを利用することができます。

関連する問題