大きなシステムで処理されるファイルの進行状況を監視して維持するユーティリティを構築しています。このファイルは、大きな「テキスト」ファイル、.csv、.xls、.txtなどです。これは、カフカからのデータのストリーミング、Avroへの書き込み、またはSQL DBのバルクバッチでの書き込みです。私は処理された行の数を記録し、RESTful API呼び出しを使用してDBに進捗を持続させる "catchall"ユーティリティを構築しようとしています。将来呼び出されるwhileループは、毎回呼び出されません
処理は、処理のタイプに関係なく、常にAkka Actor内で実行されます。私は処理の進行をブロックしないように、非同期的にロギングの進行を試みています。進行は非常に迅速に起こります。
//inside my processing actor
var fileIsProcessing = true
val allLines = KafkaUtil.getConnect(fileKey)
val totalLines = KafkaUtil.getSize
val batchSize = 500
val dBUtil = new DBUtil(totalLines)
while (fileIsProcessing) {
// consumes @ 500 lines at a time to process, returns empty if done consuming
val batch:List[Pollable] = allLines.poll
//for batch identification purposes
val myMax = batch.map(_.toInt).max
println("Starting new batch with max line: " + myMax)
//processing work happens here
batch.map(processSync)
println("Finished processing batch with max line: " + myMax)
//send a progress update to be persisted to the DB
val progressCall = Future[Unit] {dBUtil.incrementProgress(batch.size)}
progressCall.onComplete{
case Success(s) => // don't care
case Failure(e) => logger.error("Unable to persist progress from actor ")
}
if (batch.isEmpty) fileIsProcessing = false //this is horribly non-functional.
}
そして、シンプルな表現:時にはそれがここでは、インクリメンタルに一つずつ行くんが、それのほとんどはちょうどデモの処理に何が起こるかの基本的な表現である、同様のバッチスタイルのフォーマットで起こります私のDBUTILの、処理やっクラス:今
class DBUtil(totalLines:Int) {
//store both the number processed and the total to process in db, even if there is currently a percentage
var rate = 0 //lines per second
var totalFinished = 0
var percentageFin:Double = 0
var lastUpdate = DateTime.now()
def incrementProgress(totalProcessed: Int, currentTime:DateTime): Unit = {
//simulate write the data and calculated progress percentage to db
rate = totalProcessed/((currentTime.getMillis() - lastUpdate.getMillis())/1000)
totalFinished += totalProcessed
percentageFin = (totalFinished.toDouble/totalLines.toDouble) * 100
println(s"Simulating DB persist of total processed:$totalFinished lines at $percentageFin% from my total lines: $totalLines at rate:$rate")
}
}
は、本当に奇妙なものを、生産され、処理が行Future[Unit] { dBUtil.incrementProgress(batch.size)}
が確実に毎回呼び出されませんので、こんなに早く起きます。 while
ループが終了しますが、私のDBには進行状況が50%または80%でハングアップすることに注意します。私がlogger
またはprintln
ステートメントを使ってシステムを動かすと、遅くなります。
なぜ私の未来の電話は毎回確実に電話しませんか?
あなたは 'DBUtil'で同期せずに擬似コードを表示しています。そこに例外をスローして気付かないことは想像しやすいです。 –
私は誤りを間違いなくチェックしていますが、適切なところでは '試行 'が使われていますが、Futureはよく.onComplete {case Success => ... case Failure => ...}'上記の擬似コードはできるだけ取り除かれます抽象化の目的のために。将来のコールが何らかの理由で「スキップ」する理由があるのであれば、私はちょっと不思議です。 – NateH06