2016-09-07 9 views
1

遠隔測定装置からクラウド上の場所にポンピングされるcsvファイルを読み込み、Mongodbストアに関連データを保存するという要件があります。 Spark Streamingを使用して新しいファイル(毎分、時にはさらに頻繁に到着する)を読み込み、MomgoDB-Sparkコネクタを使用しています。問題は、データがMomgoDBに読み込まれていないことです。私は自分のコードにDataframeのshow()ステップを追加しました。これはコンソールに正しく表示されています。つまり、Streamingアプリケーションはファイルを読み込み、処理しています。しかし、MongoDBに保存する最後のステップは起こっていません。私のコードは次のようになりますMongoDBバックエンドを使用したSparkストリーミング

reqdata.foreachRDD { edata => 
    import sqlContext.implicits._ 
    val loaddata = edata.map(w => EnergyData(w(0).toString,w(1).toString,w(2).toString)).toDF() 
    loaddata.show() 
    loaddata.printSchema(); 
    MongoSpark.save(loaddata.write.option("uri","mongodb://127.0.0.1:27017/storedata.energydata").mode("overwrite")) 
} 

ssc.start() 

loaddata.show()の機能はデータをうまく表示しています。

は、私のようないくつかの奇妙なラインをMongoDBのログをチェックし、発見した

「2016-09-07T08:12:30.109から0700 I NETWORKは、[initandlisten]接続127.0.0.1:55694#212から受け入れました(今オープン3つの接続) 2016-09-07T08:12:30.111から0700私は[conn212] CMDをコマンド:今すぐ「

をstoredata.energydataをドロップモンゴがまったくコレクションをドロップする理由、私は理解していません。 非常に私はappendに省電力モードを変更することにより、それを自分で解決

+3

_IモンゴがALL_のコレクションをドロップする理由を理解していません - '.mode(「上書き」 ) ' – zero323

答えて

0

をいただければ幸いですすべてのヘルプ:

MongoSpark.save(loaddata.write.option("uri","mongodb://127.0.0.1:27017/storedata.energydata").mode("append")) 
関連する問題