例シナリオ:(整数の)別のストリームによって決定されたサイズのチャンクへのストリームのグループバイト。あるAkkaストリームのエレメントを別のエレメントに基づいて集約する方法は?
def partition[A, B, C](
first:Source[A, NotUsed],
second:Source[B, NotUsed],
aggregate:(Int => Seq[A], B) => C
):Source[C, NotUsed] = ???
val bytes:Source[Byte, NotUsed] = ???
val sizes:Source[Int, NotUsed] = ???
val chunks:Source[ByteString, NotUsed] =
partition(bytes, sizes, (grab, count) => ByteString(grab(count)))
私の最初の試みは、Flow#scanとFlow#prefixAndTailの組み合わせを含むが、それは(下記参照)かなり右に感じることはありません。私もFramingを見ましたが、上の例のシナリオには当てはまりません(また、非バイトストリームを受け入れるのに十分な一般的なものでもありません)。私は私の唯一の選択肢はGraphs(またはより一般的なFlowOps#transform)を使用することだと思っていますが、私はそれを試みるためにAkkaストリームで十分に堪能ではありません。
は、ここで私はこれまで(具体的な例のシナリオへの)を思い付くことができたものです。
val chunks:Source[ByteString, NotUsed] = sizes
.scan(bytes prefixAndTail 0) {
(grouped, count) => grouped flatMapConcat {
case (chunk, remainder) => remainder prefixAndTail count
}
}
.flatMapConcat(identity)
.collect { case (chunk, _) if chunk.nonEmpty => ByteString(chunk:_*) }
このアプローチの問題点は、結果として生じたストリームの要素はすべて同じプレフィックスを共有するので、文字の 'size'数は*常に*、bytes''の*正面*から採取されることです。 – Andrey
@Andreyあなたは正しいです。私はZipとFramingの間の何かとして働く 'GraphStage'の実際の実装で私の答えを更新しました。 – lpiepiora