2017-06-02 1 views
0

私は、30秒ごとに(5秒もできる)ファイルシステムでいくつかのファイルを削除するアプリケーションで作業しています。私はそれを解析し、いくつかのレコードをREDISにプッシュする必要があります。スパークローカルファイルストリーミング - フォールトトレランス

すべてのレコードは独立しており、私はupdateStateByKeyが必要な計算を行っていません。

私の質問は、いくつかの問題(例えば、REDIS接続の問題、ファイル内のデータの問題など)で、一部のファイルが完全に処理されない場合です。ファイルを再解析(n回)し、すでに処理されたファイル

テスト目的のために、私はローカルフォルダから読んでいます。また、私は

val lines = ssc.textFileStream("E:\\SampleData\\GG") 
val words = lines.map(x=>x.split("_")) 
words.foreachRDD(
    x=> { 
    x.foreach(   
     x => { 
     var jedis = jPool.getResource(); 
     try{ 
      i=i+1 
      jedis.set("x"+i+"__"+x(0)+"__"+x(1), x(2)) 
     }finally{ 
      jedis.close() 
     } 
     } 
    ) 
    } 
) 

答えて

関連する問題