遠隔測定装置からクラウド上の場所にポンピングされるcsvファイルを読み込み、Mongodbストアに関連データを保存するという要件があります。 Spark Streamingを使用して新しいファイル(毎分、時にはさらに頻繁に到着する)を読み込み、MomgoDB-Sparkコネクタを使用しています。問題は、データがMomgoDBに読み込まれていないことです。私は自分のコードにDataframeのshow()ステップを追加しました。これはコンソールに正しく表示されています。つまり、Streamingアプリケーションはファイルを読み込み、処理しています。しかし、MongoDBに保存する最後のステップは起こっていません。私のコードは次のようになりますMongoDBバックエンドを使用したSparkストリーミング
reqdata.foreachRDD { edata =>
import sqlContext.implicits._
val loaddata = edata.map(w => EnergyData(w(0).toString,w(1).toString,w(2).toString)).toDF()
loaddata.show()
loaddata.printSchema();
MongoSpark.save(loaddata.write.option("uri","mongodb://127.0.0.1:27017/storedata.energydata").mode("overwrite"))
}
ssc.start()
loaddata.show()
の機能はデータをうまく表示しています。
は、私のようないくつかの奇妙なラインをMongoDBのログをチェックし、発見した
「2016-09-07T08:12:30.109から0700 I NETWORKは、[initandlisten]接続127.0.0.1:55694#212から受け入れました(今オープン3つの接続) 2016-09-07T08:12:30.111から0700私は[conn212] CMDをコマンド:今すぐ「
をstoredata.energydataをドロップモンゴがまったくコレクションをドロップする理由、私は理解していません。 非常に私はappend
に省電力モードを変更することにより、それを自分で解決
_IモンゴがALL_のコレクションをドロップする理由を理解していません - '.mode(「上書き」 ) ' – zero323