2017-02-21 6 views
0

私はこのようなソースがあります:私は、もう一方の端のシンク時にシンクからアイテムを取得する方法

queueP.future.map(f => f.offer(someList)) 

:しかし、私はにアイテムを提供

val queueP : Promise[SourceQueueWithComplete[List[String]]] = Promise() 
val source = Source.queue(Constants.CHUNK_SIZE, OverflowStrategy.backpressure).mapMaterializedValue { 
     q : SourceQueueWithComplete[List[String]] => { 
     queueP.success(q) 
     q 
     } 
    }.watchTermination() { 
     case (_,f) => f.recoverWith { 
     case t : Exception => { 
      queueP.tryFailure(new Exception) 
      Future.failed(t) 
     } 
     } 
    } 

val sink = Sink.foreach[List[String]](someList => { 
     ... 
    }) 

    val flow = rowsSource.to(sink) 
    flow.run 

私が受け取る項目が来るの順不同で、最初は待ち行列の目的を奪っています。アイテムが強制的にキューに入れられた順序で来るようにする方法はありますか?

+0

'rowsSource'は' source'と同じですか?あなたはこの問題にもっと光を当てるためのコードを追加できますか? –

+0

@StefanoBonetti私は確信しています - 私の質問は、彼らは順番に来るはずですか?私が理解している情報によれば、アイテムは任意の順序で(先物が完成するたびに)来ることができます。しかし、元の順序を強制する方法がなければならない。 – Erix

+0

あなたのコードから、あなたはSource.queueに**一つの**要素しか提供していないようで、それは 'List [String]'です。私の結論は、シンクが受け取ったときはいつでも、同じ順序で要素が残っているということです。または、マテリアライズド・キューに複数の要素を提供していますか? –

答えて

2

offerあなたの要素がmapMaterializedValueコールの一部として含まれているため、要素を送信するたびにSource.queueを実体化(つまり実行)する必要があります。

各ストリームのマテリアライゼーションが非同期で行われるため、副作用として要素が順不同になります。

問題をより健全にアプローチするには、1つのグラフを実行し、1つのキューを保持し、複数の要素を送信します。以下のコード例を参照してください。

val queue: SourceQueueWithComplete[List[String]] = 
    Source.queue[List[String]](Constants.CHUNK_SIZE, OverflowStrategy.backpressure) 
    .to(Sink.foreach { list ⇒ /* do stuff */ }) 
    .run() 

queue.offer(List("a", "b")) 
queue.offer(List("c", "d")) 
+0

私はこれを試していただきありがとうございます – Erix

関連する問題