2017-12-20 21 views
0

大きなシステムで処理されるファイルの進行状況を監視して維持するユーティリティを構築しています。このファイルは、大きな「テキスト」ファイル、.csv、.xls、.txtなどです。これは、カフカからのデータのストリーミング、Avroへの書き込み、またはSQL DBのバルクバッチでの書き込みです。私は処理された行の数を記録し、RESTful API呼び出しを使用してDBに進捗を持続させる "catchall"ユーティリティを構築しようとしています。将来呼び出されるwhileループは、毎回呼び出されません

処理は、処理のタイプに関係なく、常にAkka Actor内で実行されます。私は処理の進行をブロックしないように、非同期的にロギングの進行を試みています。進行は非常に迅速に起こります。

//inside my processing actor 

    var fileIsProcessing = true 
    val allLines = KafkaUtil.getConnect(fileKey) 
    val totalLines = KafkaUtil.getSize 
    val batchSize = 500 
    val dBUtil = new DBUtil(totalLines) 

while (fileIsProcessing) { 

    // consumes @ 500 lines at a time to process, returns empty if done consuming 
    val batch:List[Pollable] = allLines.poll 
    //for batch identification purposes 
    val myMax = batch.map(_.toInt).max 
    println("Starting new batch with max line: " + myMax) 

    //processing work happens here 
    batch.map(processSync) 
    println("Finished processing batch with max line: " + myMax) 

    //send a progress update to be persisted to the DB 
    val progressCall = Future[Unit] {dBUtil.incrementProgress(batch.size)} 
    progressCall.onComplete{ 
      case Success(s) => // don't care 
      case Failure(e) => logger.error("Unable to persist progress from actor ") 
    } 

if (batch.isEmpty) fileIsProcessing = false //this is horribly non-functional. 
} 

そして、シンプルな表現:時にはそれがここでは、インクリメンタルに一つずつ行くんが、それのほとんどはちょうどデモの処理に何が起こるかの基本的な表現である、同様のバッチスタイルのフォーマットで起こります私のDBUTILの、処理やっクラス:今

class DBUtil(totalLines:Int) { 

    //store both the number processed and the total to process in db, even if there is currently a percentage 

var rate = 0 //lines per second 
var totalFinished = 0 
var percentageFin:Double = 0 
var lastUpdate = DateTime.now() 

def incrementProgress(totalProcessed: Int, currentTime:DateTime): Unit = { 
    //simulate write the data and calculated progress percentage to db 
    rate = totalProcessed/((currentTime.getMillis() - lastUpdate.getMillis())/1000) 
    totalFinished += totalProcessed 
    percentageFin = (totalFinished.toDouble/totalLines.toDouble) * 100 
    println(s"Simulating DB persist of total processed:$totalFinished lines at $percentageFin% from my total lines: $totalLines at rate:$rate") 
} 

}

は、本当に奇妙なものを、生産され、処理が行Future[Unit] { dBUtil.incrementProgress(batch.size)}が確実に毎回呼び出されませんので、こんなに早く起きます。 whileループが終了しますが、私のDBには進行状況が50%または80%でハングアップすることに注意します。私がloggerまたはprintlnステートメントを使ってシステムを動かすと、遅くなります。

なぜ私の未来の電話は毎回確実に電話しませんか?

+1

あなたは 'DBUtil'で同期せずに擬似コードを表示しています。そこに例外をスローして気付かないことは想像しやすいです。 –

+0

私は誤りを間違いなくチェックしていますが、適切なところでは '試行 'が使われていますが、Futureはよく.onComplete {case Success => ... case Failure => ...}'上記の擬似コードはできるだけ取り除かれます抽象化の目的のために。将来のコールが何らかの理由で「スキップ」する理由があるのであれば、私はちょっと不思議です。 – NateH06

答えて

1

は、うーん...ので、あなたが持っているコード、

あなたはちょうどあなたのwhileループで先物を開始され、その後、あなたのループは将来が終了するのを待たずに次の反復のために行くといくつかの問題があります。つまり、先物が実際にエグゼクターによって実行される前に、プログラムが終了する可能性があります。

また、あなたのループはdBUtil.incrementProgress(batch.size)への "未来"の呼び出しを作成しています。同時に複数のスレッドが同じ機能を実行するようになります。これにより、可変状態を使用している場合に競合状態が発生します。

def processFileWithIncrementalUpdates(
    allLines: ????, 
    totalLines: Int, 
    batchSize: Int, 
    dbUtil: DBUtil 
): Future[Unit] = { 
    val promise = Promise[Unit]() 
    Future { 
    val batch: List[Pollable] = allLines.poll 
    if (batch.isEmpty) { 
     promise.completeWith(Future.successful[Unit]()) 
    } 
    else { 
     val myMax = batch.map(_.toInt).max 
     println("Starting new batch with max line: " + myMax) 

     //processing work happens here 
     batch.map(processSync) 
     println("Finished processing batch with max line: " + myMax) 

     //send a progress update to be persisted to the DB 
     val progressCall = Future[Unit] { dBUtil.incrementProgress(batch.size) } 

     progressCall.onComplete{ 
     case Success(s) => // don't care 
     case Failure(e) => logger.error("Unable to persist progress from actor ") 
     } 

     progressCall.onComplete({ 
     case _ => promise.completeWith(processFileWithIncrementalUpdates(allLines, totalLines, batchSize, dBUtil)) 
     }) 
    } 
    promise.future 
    } 
} 

val allLines = KafkaUtil.getConnect(fileKey) 
val totalLines = KafkaUtil.getSize 
val batchSize = 500 
val dBUtil = new DBUtil(totalLines) 

val processingFuture = processFileWithIncrementalUpdates(allLines, totalLines, batchSize, dBUtil) 
+0

>あなたはwhileループで先物を開始しています。そして、あなたのループ>は、未来が終わるのを待たずに次の繰り返しのために行きます。 which>は、futureが実際にexecutorによって実行される前に、あなたのプログラムが終了することを意味します。 これはまさに私が欲しいものですが、未来への呼びかけは本当に "失われる"でしょうか?私はすべてのループが終了すると思うだろうが、その後戻って、すべての将来の呼び出しを "追いつく"しようとします。私は本当に、 "火と忘れ"のために俳優、そして注文を保存している待ち行列を呼び出すことを主張しましたが、オーバーヘッドのために拒否されました。 – NateH06

+0

あなたの引用符のフォーマットについて申し訳ありませんが、十分に速く編集しませんでした。 – NateH06

+0

主な問題は、 'incrementProgress'関数を呼び出す異なるスレッド間で可変状態を共有していることです。 –

関連する問題