2017-11-27 13 views
2

私はのリストをS3バケットからのファイル名とペアにしています。これらは一定のメモリに圧縮し、プレイ2.6で提供される必要があります。Akka Streamsでオンザフライで[Source [ByteString、NotUsed]]のリストを圧縮する

ここにやや似て質問があります:stream a zip created on the fly with play 2.5 and akka stream with backpressure

はこちら(プレイするために必要な2.6+)アッカストリームに関連するコードスニペットで:

https://gist.github.com/kirked/03c7f111de0e9a1f74377bf95d3f0f60

私がこれまでの私の実験をベース上記の要旨は、しかし、要点は、異なる問題を解決する - それはグラフステージInputStreamを通過させることによって、ディスクからファイルをストリーミング。そこInputStreamに私Source[ByteString, NotUsed]を変換する全く安全な方法は、しかし、ではありませんので、そのままIは、スニペットを使用することはできません。

これまでのところ、入力タイプを() => InputStreamから() => Source[ByteString, NotUsed]に変更してから、source.runForeach(...)を使用して消費しました。

私の変更の大半はここにある:

override def onPush(): Unit = { 
    val (filepath, source: StreamGenerator) = grab(in) 
    buffer.startEntry(filepath) 
    val src: Source[ByteString, NotUsed] = source() 
    val operation = src.runForeach(bytestring => { 
    val byteInputStream = new ByteArrayInputStream(bytestring.toArray) 
    emitMultiple(out, fileChunks(byteInputStream, buffer)) 
    }) 
    operation.onComplete { 
    case _ => buffer.endEntry() 
    } 
    Await.ready(operation, 5.minute) 
} 

私は、これがブロックしていることを知っているが、私はそれがこのコンテキストで許可されているかどうかわかりませんよ。私は安全な方法でこれを実現するにはどうすればよい

[ERROR:しかし、それはこのスタックトレースとエラーが生成さ

override def onPush(): Unit = { 
    val (filepath, source: StreamGenerator) = grab(in) 
    buffer.startEntry(filepath) 
    val stream = source().runWith(StreamConverters.asInputStream(1.minute)) 
    currentStream = Some(stream) 
    emitMultiple(out, fileChunks(stream, buffer),() => buffer.endEntry()) 
} 

:私も主旨にずっと近いこのバージョンを試してみました

EDIT

] [11/27/2017 09:26:38.428] [alpakka-akka.actor.default-dispatcher-3] [akka:// alpakka/user/StreamSupervisor-0/flow-0] -0-headSink] ステージのエラー[[email protected]]: 無効なストリームが終了しました。読み込みができません。 java.io.IOException:無効なストリームが終了しました。読み込みがありません akka.stream.impl.io.InputStreamAdapter.executeIfNotClosedで (InputStreamSinkStage.scala:125) akka.streamで: akka.stream.impl.io.InputStreamAdapter.subscriberClosedException(117 InputStreamSinkStage.scala)で可能 。 impl.io.InputStreamAdapter.read(InputStreamSinkStage.scala:144) の com.company.productregistration.services.s3.StreamedZip $$ anon $ 2.result $ 1(StreamedZip.scala:99) at com.company.productregistration.services.s3.StreamedZip $$ anon $ 2. $ anonfun $ fileChunks $ 1(StreamedZip.scala:105) at scala.collection.immutable.Stream $ Cons.tail(Stream.scala:1169)at scala.collection.immutable.Stream $ Cons.tail(Stream.scala:1159)at scala.collection.immutable.StreamIterator。$ anonfun $ next $ 1(Stream.scala:1058) at scala.collection.immutable。 StreamIterator $ LazyCell.v $ lzycompute(Stream.scala:1047) でscala.collection.immutable.StreamIterator $ LazyCell.v(ストリーム。スカラ:1047) scala.collection.immutable.StreamIterator.hasNextで(Stream.scala:1052) akka.stream.stage.GraphStageLogic $ EmittingIterator.onPull(GraphStage.scala時:911) akka.streamで akka.stream.impl.fusing.GraphInterpreterShell.runBatchで.impl.fusing.GraphInterpreter.processPull(GraphInterpreter.scala:506) でakka.stream.impl.fusing.GraphInterpreter.execute(412 GraphInterpreter.scala) (ActorGraphInterpreter.scala:571) at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:541) at akka.stream.impl.fusing.ActorGrap hInterpreter.tryInit(ActorGraphInterpreter.scala:659) でakka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:707): アッカでakka.actor.Actor.aroundPreStart(522 Actor.scala)で.actor.Actor.aroundPreStart $(Actor.scala:522)at akka.actor.ActorCell.create(ActorCell.scala:591)のakka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:650) でakka.actor.ActorCell.invokeAll $ 1(ActorCell.scala:462) でakka.actor.ActorCell.systemInvoke(ActorCell.scala:484) でakka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)at akka.dispatch.Mailbox.run(Mailbox.scala:223)at akka.d ispatch.Mailbox.exec(Mailbox.scala:234) akka.dispatch.forkjoin.ForkJoinPool $ WorkQueue.runTaskで akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)で(ForkJoinPool.java:1339) akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)で akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) で

EDIT2 Iの場合currentStream = Some(stream)を設定しないと、上記のエラーは発生しません。さらに、実際にはいくつかのファイルの組み合わせに対しても機能します。 約20メガバイトのファイルがあります。最後のソースとして入れた場合、私のzipファイルが破損します。私がソースのリストの他のどこにでも置くと、すべて正常に動作します。以下は

は私の現在のグラフステージの実装の完全なリストである:

import java.io.{ByteArrayInputStream, InputStream, OutputStream} 

import akka.NotUsed 
import akka.stream._ 
import akka.stream.scaladsl._ 
import akka.stream.stage._ 
import akka.util.{ByteString, ByteStringBuilder} 

import scala.concurrent.duration._ 
import scala.concurrent.{Await, ExecutionContext} 
import scala.util.control.NonFatal 

//scalastyle:off 
class StreamedZip(bufferSize: Int = 64 * 1024)(implicit ec: ExecutionContext, 
               mat: ActorMaterializer) 
    extends GraphStage[FlowShape[StreamedZip.ZipSource, ByteString]] { 

    import StreamedZip._ 

    val in: Inlet[ZipSource] = Inlet("StreamedZip.in") 
    val out: Outlet[ByteString] = Outlet("StreamedZip.out") 
    override val shape   = FlowShape.of(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) with StageLogging { 
     private val buffer        = new ZipBuffer(bufferSize) 
     private var currentStream: Option[InputStream] = None 

     setHandler(
     out, 
     new OutHandler { 
      override def onPull(): Unit = 
      if (isClosed(in)) { 
       if (buffer.isEmpty) completeStage() 
       else { 
       buffer.close 
       push(out, buffer.toByteString) 
       } 
      } else pull(in) 

      override def onDownstreamFinish(): Unit = { 
      closeInput() 
      buffer.close 
      super.onDownstreamFinish() 
      } 
     } 
    ) 

     setHandler(
     in, 
     new InHandler { 
      override def onPush(): Unit = { 
      val (filepath, source: StreamGenerator) = grab(in) 
      buffer.startEntry(filepath) 
      val stream = source().runWith(StreamConverters.asInputStream(1.minute)) 
      emitMultiple(out, fileChunks(stream, buffer),() => { buffer.endEntry() }) 
      } 

      override def onUpstreamFinish(): Unit = { 
      println("Updstream finish") 
      closeInput() 
      if (buffer.isEmpty) completeStage() 
      else { 
       buffer.close() 
       if (isAvailable(out)) { 
       push(out, buffer.toByteString) 
       } 
      } 
      } 
     } 
    ) 

     private def closeInput(): Unit = { 
     currentStream.foreach(_.close) 
     currentStream = None 
     } 

     private def fileChunks(stream: InputStream, buffer: ZipBuffer): Iterator[ByteString] = { 
     // This seems like a good trade-off between single-byte 
     // read I/O performance and doubling the ZipBuffer size. 
     // 
     // And it's still a decent defense against DDOS resource 
     // limit attacks. 
     val readBuffer = new Array[Byte](1024) 
     var done  = false 

     def result: Stream[ByteString] = 
      if (done) Stream.empty 
      else { 
      try { 
       while (!done && buffer.remaining > 0) { 
       val bytesToRead = Math.min(readBuffer.length, buffer.remaining) 
       val count  = stream.read(readBuffer, 0, bytesToRead) 
       if (count == -1) { 
        stream.close 
        done = true 
       } else buffer.write(readBuffer, count) 
       } 
       buffer.toByteString #:: result 
      } catch { 
       case NonFatal(e) => 
       closeInput() 
       throw e 
      } 
      } 

     result.iterator 
     } 
    } 
} 

object StreamedZip { 
    type ZipFilePath  = String 
    type StreamGenerator =() => Source[ByteString, NotUsed] 
    type ZipSource  = (ZipFilePath, StreamGenerator) 

    def apply()(implicit ec: ExecutionContext, mat: ActorMaterializer) = new StreamedZip() 

} 

class ZipBuffer(val bufferSize: Int = 64 * 1024) { 
    import java.util.zip.{ZipEntry, ZipOutputStream} 

    private var builder = new ByteStringBuilder() 
    private val zip = new ZipOutputStream(builder.asOutputStream) { 
    // this MUST ONLY be used after flush()! 
    def setOut(newStream: OutputStream): Unit = out = newStream 
    } 
    private var inEntry = false 
    private var closed = false 

    def close(): Unit = { 
    endEntry() 
    closed = true 
    zip.close() 
    } 

    def remaining(): Int = bufferSize - builder.length 

    def isEmpty(): Boolean = builder.isEmpty 

    def startEntry(path: String): Unit = 
    if (!closed) { 
     endEntry() 
     zip.putNextEntry(new ZipEntry(path)) 
     inEntry = true 
    } 

    def endEntry(): Unit = 
    if (!closed && inEntry) { 
     inEntry = false 
     zip.closeEntry() 
    } 

    def write(byte: Int): Unit = 
    if (!closed && inEntry) zip.write(byte) 

    def write(bytes: Array[Byte], length: Int): Unit = 
    if (!closed && inEntry) zip.write(bytes, 0, length) 

    def toByteString(): ByteString = { 
    zip.flush() 
    val result = builder.result 
    builder = new ByteStringBuilder() 
    // set the underlying output for the zip stream to be the buffer 
    // directly, so we don't have to copy the zip'd byte array. 
    zip.setOut(builder.asOutputStream) 
    result 
    } 
} 
+0

あなたがここにこの答えが役立つことがあります。https://stackoverflow.com/a/47146187/49630 –

答えて

1

私は上からZipBufferを使用して終了とアッカ・ストリームのDSLを使用して、全体的な問題を解決します。

次のように私のソリューションが見えます:

import akka.NotUsed 
import akka.actor.ActorSystem 
import akka.stream.scaladsl.Source 
import akka.stream.{ActorMaterializer, SourceShape} 
import akka.util.ByteString 
import com.company.config.AWS 
import org.log4s.getLogger 

case class S3StreamingServiceLike(awsConf: AWS, s3Client: S3ClientAlpakka)(
    implicit sys: ActorSystem, 
    mat: ActorMaterializer) 
    extends S3StreamingService { 

    private implicit class ConcatSyntax[T, U](source: Source[T, U]) { 
    def ++[TT >: T, NotUsed](that: Source[SourceShape[TT], NotUsed]): Source[Any, U] = //scalastyle:ignore 
     source.concat(that) 
    } 

    private val logger = getLogger 

    private sealed trait ZipElement 
    private case class FileStart(name: String, index: Int, outOf: Int) extends ZipElement 
    private case class FileEnd(name: String, index: Int, outOf: Int) extends ZipElement 
    private case class FilePayload(byteString: ByteString)    extends ZipElement 
    private case object EndZip           extends ZipElement 

    private def payloadSource(filename: String) = 
    s3Client.download(awsConf.s3BucketName, filename).map(FilePayload.apply) 

    private def fileNameToZipElements(filename: String, 
            index: Int, 
            outOf: Int): Source[ZipElement, NotUsed] = 
    Source.single(FileStart(filename, index, outOf)) ++ 
     payloadSource(filename) ++ 
     Source.single(FileEnd(filename, index, outOf)) 

    def streamFilesAsZip(filenames: List[String])(forUser: String): Source[ByteString, NotUsed] = { 

    val zipBuffer = new ZipBuffer() 

    val zipElementSource: Source[ZipElement, NotUsed] = 
     Source(filenames.zipWithIndex).flatMapConcat { 
     case (filename, index) => fileNameToZipElements(filename, index + 1, filenames.length) 
     } ++ Source.single(EndZip) 

    zipElementSource 
     .map { 
     case FileStart(name, index, outOf) => 
      logger.info(s"Zipping file #$index of $outOf with name $name for user $forUser") 
      zipBuffer.startEntry(name) 
      None 
     case FilePayload(byteString) => 
      if (byteString.length > zipBuffer.remaining()) { 
      throw new Exception(
       s"Bytestring size exceeded buffer size ${byteString.length} > ${zipBuffer.remaining}") 
      } 
      zipBuffer.write(byteString.toArray, byteString.length) 
      Some(zipBuffer.toByteString()) 
     case FileEnd(name, index, outOf) => 
      logger.info(s"Finished zipping file #$index of $outOf with $name for user $forUser") 
      zipBuffer.endEntry() 
      Some(zipBuffer.toByteString()) 
     case EndZip => 
      zipBuffer.close() 
      Some(zipBuffer.toByteString()) 
     } 
     .collect { 
     case Some(bytes) if bytes.length > 0 => bytes 
     } 
    } 

} 
関連する問題