2016-03-20 8 views
3

例シナリオ:(整数の)別のストリームによって決定されたサイズのチャンクへのストリームのグループバイト。あるAkkaストリームのエレメントを別のエレメントに基づいて集約する方法は?

def partition[A, B, C](
    first:Source[A, NotUsed], 
    second:Source[B, NotUsed], 
    aggregate:(Int => Seq[A], B) => C 
):Source[C, NotUsed] = ??? 

val bytes:Source[Byte, NotUsed] = ??? 
val sizes:Source[Int, NotUsed] = ??? 

val chunks:Source[ByteString, NotUsed] = 
    partition(bytes, sizes, (grab, count) => ByteString(grab(count))) 

私の最初の試みは、Flow#scanFlow#prefixAndTailの組み合わせを含むが、それは(下記参照)かなり右に感じることはありません。私もFramingを見ましたが、上の例のシナリオには当てはまりません(また、非バイトストリームを受け入れるのに十分な一般的なものでもありません)。私は私の唯一の選択肢はGraphs(またはより一般的なFlowOps#transform)を使用することだと思っていますが、私はそれを試みるためにAkkaストリームで十分に堪能ではありません。


は、ここで私はこれまで(具体的な例のシナリオへの)を思い付くことができたものです。

val chunks:Source[ByteString, NotUsed] = sizes 
    .scan(bytes prefixAndTail 0) { 
    (grouped, count) => grouped flatMapConcat { 
     case (chunk, remainder) => remainder prefixAndTail count 
    } 
    } 
    .flatMapConcat(identity) 
    .collect { case (chunk, _) if chunk.nonEmpty => ByteString(chunk:_*) } 

答えて

4

は、私はあなたがカスタムGraphStageような処理を実装することができると思います。ステージには2つのInlet要素があります。 1つはバイトを取り、もう1つはサイズを取る。値を生成するのは1つのOutlet要素です。

次の入力ストリームを検討してください。

そして、あなたが GraphStageを構築できるカスタムストリーム処理( http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html)を記述した情報を使用して
def randomChars = Iterator.continually(Random.nextPrintableChar()) 
def randomNumbers = Iterator.continually(math.abs(Random.nextInt() % 50)) 

val bytes: Source[Char, NotUsed] = 
    Source.fromIterator(() => randomChars) 

val sizes: Source[Int, NotUsed] = 
    Source.fromIterator(() => randomNumbers).filter(_ != 0) 

case class ZipFraming() extends GraphStage[FanInShape2[Int, Char, (Int, ByteString)]] { 

    override def initialAttributes = Attributes.name("ZipFraming") 

    override val shape: FanInShape2[Int, Char, (Int, ByteString)] = 
    new FanInShape2[Int, Char, (Int, ByteString)]("ZipFraming") 

    val inFrameSize: Inlet[Int] = shape.in0 
    val inElements: Inlet[Char] = shape.in1 

    def out: Outlet[(Int, ByteString)] = shape.out 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 
     // we will buffer as much as 512 characters from the input 
     val MaxBufferSize = 512 
     // the buffer for the received chars 
     var buffer = Vector.empty[Char] 
     // the needed number of elements 
     var needed: Int = -1 
     // if the downstream is waiting 
     var isDemanding = false 

     override def preStart(): Unit = { 
     pull(inFrameSize) 
     pull(inElements) 
     } 

     setHandler(inElements, new InHandler { 
     override def onPush(): Unit = { 
      // we buffer elements as long as we can 
      if (buffer.size < MaxBufferSize) { 
      buffer = buffer :+ grab(inElements) 
      pull(inElements) 
      } 
      emit() 
     } 
     }) 

     setHandler(inFrameSize, new InHandler { 
     override def onPush(): Unit = { 
      needed = grab(inFrameSize) 
      emit() 
     } 
     }) 

     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      isDemanding = true 
      emit() 
     } 
     }) 

     def emit(): Unit = { 
     if (needed > 0 && buffer.length >= needed && isDemanding) { 
      val (emit, reminder) = buffer.splitAt(needed) 
      push(out, (needed, ByteString(emit.map(_.toByte).toArray))) 
      buffer = reminder 
      needed = -1 
      isDemanding = false 
      pull(inFrameSize) 
      if (!hasBeenPulled(inElements)) pull(inElements) 
     } 
     } 
    } 
} 

これは実行方法です。

RunnableGraph.fromGraph(GraphDSL.create(bytes, sizes)(Keep.none) { implicit b => 
    (bs, ss) => 
    import GraphDSL.Implicits._ 

    val zipFraming = b.add(ZipFraming()) 

    ss ~> zipFraming.in0 
    bs ~> zipFraming.in1 

    zipFraming.out ~> Sink.foreach[(Int, ByteString)](e => println((e._1, e._2.utf8String))) 

    ClosedShape 
}).run() 
+0

このアプローチの問題点は、結果として生じたストリームの要素はすべて同じプレフィックスを共有するので、文字の 'size'数は*常に*、bytes''の*正面*から採取されることです。 – Andrey

+0

@Andreyあなたは正しいです。私はZipとFramingの間の何かとして働く 'GraphStage'の実際の実装で私の答えを更新しました。 – lpiepiora

関連する問題