-1
私はcsvを入力とし、各行を読み込み、すべての行のドキュメント分類を行い、予測されたドキュメントラベルをMySQLデータベースに保存するスケーラコードを持っています。Scalaシリアライズコードをパラレルオペレーションに変換する
スニペットの問題は、csvに3200行があることがありますが、操作全体を完了するまでに多くの時間がかかります。私はこのコードを変換する必要があります。例えば、csvはエグゼキュータの間で配布され、ドキュメント予測を行い、ラベルを保存します。これは、ANにあなたのCSVの内容に変わります
sparkSession.read
.option("header", "true")
.option("inferSchema", "true") //Maybe
.csv(args(4))
.rdd { row =>
...
}
:すべてを行う必要がスパークでの組み込みのCSV機能を使用している
val reader = new CSVReader(new FileReader(args(4)))
var readFirstLine = false;
for (row <- reader.readAll) {
if(readFirstLine) {
var date = row(1).split(" ");
var split_date = date(0).split('-').toList;
val documentTransformed = tf.transform(row(2).split(" "))
val emotionPredicted = model.predict(documentTransformed)
val emotionMapped = emotionMaps(emotionPredicted);
//Insert Emotions
var query = "insert into emotions_values(user_id, year, month, day, emotion)" + "values ('"+ args(5) +"', '"+ split_date(0) +"', '"+ split_date(1) +"', '"+ split_date(2) +"', '"+ emotionMapped +"')";
statement.executeUpdate(query)
val polarityPredicted = polarityModel.predict(documentTransformed)
val polarityMapped = polarityMaps(polarityPredicted);
//Insert Polarity
var polarityQuery = "insert into polarity_values(user_id, year, month, day, polarity)" + "values ('"+ args(5) +"', '"+ split_date(0) +"', '"+ split_date(1) +"', '"+ split_date(2) +"', '"+ polarityMapped +"')";
statement.executeUpdate(polarityQuery)
}
else {
readFirstLine = true;
}
}
これは非常に広い質問です。スパークの仕事をあなたのために書くようにお願いしているようです。あなたが欲しいSparkの特定の質問に絞り込むことができれば、より良い回答を得ることができます。 – DNA
csvからエグゼキュータに行を配布するために必要なことは、ドキュメントのラベリングと予測したラベルをmySQLに挿入することですか?私はすでに文書のラベル付けとmysqlへのデータの挿入を行っています。私はexecutorにcsvの行を配布する方法を見つける必要がありますか? – user2738965
スパークのためのcsvリーダーがあります:https://github.com/databricks/spark-csv 私はそれが時代遅れであるかどうかわかりません。 – vefthym