1
も私のために働いていないようです。アイテムは、そこに定義されたシンクにならない。ここに私が持っているものがあります。に定義されているシンクには何も来ません。
val merged: Source[ArticleWithKeywords, _] = ...
val (ks, fut) = merged
.alsoTo(Flow[ArticleWithKeywords].map { a => a.id -> a.ids.toList }.to(queueManager.getIdsForAnsSink))
.map(_.id)
.groupedWithin(100, 5 seconds)
.mapAsync(4) { ids => runReferenceFetching(ids) }
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.ignore)(Keep.both)
.run()
しかし、項目がrunReferenceFetchingに達しています。私は何が欠けていますか?
'queueManager.getIdsForAnsSink'を' Sink.foreach'に置き換えても、期待どおりに動作しませんか? (またはそれがそれを修正した場合、そのシンクで何か間違っている/予期せぬことがあります) – johanandren
@johanandren 'Sink.foreach'で動作します。しかし、一方では単純な 'Sourse(List(..))。runWith(queueManager.getIdsForAnsSink)も機能します。だから私はここで何が失敗しているのか混乱している。 – expert
また、実際にはいずれかの下流ブランチバックプレッシャーがある場合に背圧をかけるブロードキャストステージなので、要素がシンクに到達していても何とか失われてしまうと言います。多分並行性の問題? – johanandren