私は、30秒ごとに(5秒もできる)ファイルシステムでいくつかのファイルを削除するアプリケーションで作業しています。私はそれを解析し、いくつかのレコードをREDISにプッシュする必要があります。スパークローカルファイルストリーミング - フォールトトレランス
すべてのレコードは独立しており、私はupdateStateByKey
が必要な計算を行っていません。
私の質問は、いくつかの問題(例えば、REDIS接続の問題、ファイル内のデータの問題など)で、一部のファイルが完全に処理されない場合です。ファイルを再解析(n回)し、すでに処理されたファイル
テスト目的のために、私はローカルフォルダから読んでいます。また、私は
val lines = ssc.textFileStream("E:\\SampleData\\GG")
val words = lines.map(x=>x.split("_"))
words.foreachRDD(
x=> {
x.foreach(
x => {
var jedis = jPool.getResource();
try{
i=i+1
jedis.set("x"+i+"__"+x(0)+"__"+x(1), x(2))
}finally{
jedis.close()
}
}
)
}
)