私はこのようなソースがあります:私は、もう一方の端のシンク時にシンクからアイテムを取得する方法
queueP.future.map(f => f.offer(someList))
:しかし、私はにアイテムを提供
val queueP : Promise[SourceQueueWithComplete[List[String]]] = Promise()
val source = Source.queue(Constants.CHUNK_SIZE, OverflowStrategy.backpressure).mapMaterializedValue {
q : SourceQueueWithComplete[List[String]] => {
queueP.success(q)
q
}
}.watchTermination() {
case (_,f) => f.recoverWith {
case t : Exception => {
queueP.tryFailure(new Exception)
Future.failed(t)
}
}
}
val sink = Sink.foreach[List[String]](someList => {
...
})
val flow = rowsSource.to(sink)
flow.run
私が受け取る項目が来るの順不同で、最初は待ち行列の目的を奪っています。アイテムが強制的にキューに入れられた順序で来るようにする方法はありますか?
'rowsSource'は' source'と同じですか?あなたはこの問題にもっと光を当てるためのコードを追加できますか? –
@StefanoBonetti私は確信しています - 私の質問は、彼らは順番に来るはずですか?私が理解している情報によれば、アイテムは任意の順序で(先物が完成するたびに)来ることができます。しかし、元の順序を強制する方法がなければならない。 – Erix
あなたのコードから、あなたはSource.queueに**一つの**要素しか提供していないようで、それは 'List [String]'です。私の結論は、シンクが受け取ったときはいつでも、同じ順序で要素が残っているということです。または、マテリアライズド・キューに複数の要素を提供していますか? –