2017-12-09 13 views
1

私はhost-level API with a queueを使用しています。Source.Queueバックプレッシャを有効にする方法

private val (queueSource, connectionPool) = Source.queue[(HttpRequest, Promise[HttpResponse])](queueSize, OverflowStrategy.backpressure).async 
    .viaMat(poolFlow)(Keep.both) 
    .toMat(
     Sink.foreach({ 
     case ((Success(resp), p)) => 
      p.success(resp) 
     case ((Failure(e), p)) => p.failure(e) 
     }) 
    )(Keep.left) 
    .run() 

私は接続プール内の接続のための要求のレースがたくさんあるが、私は次のエラーを取得:。

java.lang.IllegalStateException: You have to wait for previous offer to be resolved to send another request 
    at akka.stream.impl.QueueSource$$anon$1.akka$stream$impl$QueueSource$$anon$$bufferElem(QueueSource.scala:84) 
    at akka.stream.impl.QueueSource$$anon$1$$anonfun$1.apply(QueueSource.scala:94) 
    at akka.stream.impl.QueueSource$$anon$1$$anonfun$1.apply(QueueSource.scala:91) 
    at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:447) 
    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:464) 
    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:559) 
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:741) 
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:756) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:666) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:496) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) 
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) 
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

を私は何.asyncが、背圧がまだでキックしない追加しようとしました上記のエラーとその問題を調査する方法についてはどうですか?

答えて

0

Source.queueオブジェクトメソッドを使用してSourceを既に構築しているため、queue.offerを呼び出す機能に直接背圧をかけることはできないと思います。しかし、あなたの問題は別の方法で解決される可能性があります。

異なるOverflowStrategy

あなたはOverflowStrategy.dropHeadまたはOverflowStrategy.dropTailのようなものに戦略を変更することができます。 queueSizequeue.offer呼び出しのレートと比較して十分に大きい場合、これはおそらくあなたのニーズに合っています。

+0

私はメッセージを失う余裕がありません。それがうまくいかない場合、背圧戦略のポイントは何ですか? – Rabzu

+0

@Rabzu Source.queueで背圧戦略を使用した例はわかりません。背圧がそれほど重要なのであれば、最初に 'Source.queue'を使うのはなぜですか? –

+0

代替手段は何ですか? – Rabzu

関連する問題