2017-08-17 3 views
0

私は、Actorからのsource.queueを使用しようとしています。私はそれがエンキューされた場合parttern試合で私がチェックする方法がわからないの申し出操作Akka-StreamでSource.Queueを使用する方法

class MarcReaderActor(file: File, sourceQueue: SourceQueueWithComplete[Record]) extends Actor { 

    val inStream = file.newInputStream 
    val reader = new MarcStreamReader(inStream) 

    override def receive: Receive = { 

    case Process => { 
     if (reader.hasNext()) { 
     val record = reader.next() 
     pipe(sourceQueue.offer(record)) to self 
     } 
    } 

    case f:Future[QueueOfferResult] => 
    } 
    } 
} 

の結果を立ち往生したり、私が書いた場合落としたり失敗

ていますF:未来[QueueOfferResult.Enqueued]

答えて

1

pipeToを使用しているため、未来の内容は未来のものではなく、未来が完了したときに俳優に送られます。これを行う:

override def receive: Receive = { 
    case Process => 
    if (reader.hasNext()) { 
     val record = reader.next() 
     pipe(sourceQueue.offer(record)) to self 
    } 

    case r: QueueOfferResult => 
    r match { 
     case QueueOfferResult.Enqueued =>  // element has been consumed 
     case QueueOfferResult.Dropped =>  // element has been ignored because of backpressure 
     case QueueOfferResult.QueueClosed => // the queue upstream has terminated 
     case QueueOfferResult.Failure(e) => // the queue upstream has failed with an exception 
    } 

    case Status.Failure(e) => // future has failed, e.g. because of invalid usage of `offer()` 
} 
関連する問題