私はのリストをS3バケットからのファイル名とペアにしています。これらは一定のメモリに圧縮し、プレイ2.6で提供される必要があります。Akka Streamsでオンザフライで[Source [ByteString、NotUsed]]のリストを圧縮する
ここにやや似て質問があります:stream a zip created on the fly with play 2.5 and akka stream with backpressure
はこちら(プレイするために必要な2.6+)アッカストリームに関連するコードスニペットで:
https://gist.github.com/kirked/03c7f111de0e9a1f74377bf95d3f0f60
私がこれまでの私の実験をベース上記の要旨は、しかし、要点は、異なる問題を解決する - それはグラフステージInputStream
を通過させることによって、ディスクからファイルをストリーミング。そこInputStream
に私Source[ByteString, NotUsed]
を変換する全く安全な方法は、しかし、ではありませんので、そのままIは、スニペットを使用することはできません。
これまでのところ、入力タイプを() => InputStream
から() => Source[ByteString, NotUsed]
に変更してから、source.runForeach(...)
を使用して消費しました。
私の変更の大半はここにある:
override def onPush(): Unit = {
val (filepath, source: StreamGenerator) = grab(in)
buffer.startEntry(filepath)
val src: Source[ByteString, NotUsed] = source()
val operation = src.runForeach(bytestring => {
val byteInputStream = new ByteArrayInputStream(bytestring.toArray)
emitMultiple(out, fileChunks(byteInputStream, buffer))
})
operation.onComplete {
case _ => buffer.endEntry()
}
Await.ready(operation, 5.minute)
}
私は、これがブロックしていることを知っているが、私はそれがこのコンテキストで許可されているかどうかわかりませんよ。私は安全な方法でこれを実現するにはどうすればよい
?
[ERROR:しかし、それはこのスタックトレースとエラーが生成さ
override def onPush(): Unit = { val (filepath, source: StreamGenerator) = grab(in) buffer.startEntry(filepath) val stream = source().runWith(StreamConverters.asInputStream(1.minute)) currentStream = Some(stream) emitMultiple(out, fileChunks(stream, buffer),() => buffer.endEntry()) }
:私も主旨にずっと近いこのバージョンを試してみました
EDIT
] [11/27/2017 09:26:38.428] [alpakka-akka.actor.default-dispatcher-3] [akka:// alpakka/user/StreamSupervisor-0/flow-0] -0-headSink] ステージのエラー[[email protected]]: 無効なストリームが終了しました。読み込みができません。 java.io.IOException:無効なストリームが終了しました。読み込みがありません akka.stream.impl.io.InputStreamAdapter.executeIfNotClosedで (InputStreamSinkStage.scala:125) akka.streamで: akka.stream.impl.io.InputStreamAdapter.subscriberClosedException(117 InputStreamSinkStage.scala)で可能 。 impl.io.InputStreamAdapter.read(InputStreamSinkStage.scala:144) の com.company.productregistration.services.s3.StreamedZip $$ anon $ 2.result $ 1(StreamedZip.scala:99) at com.company.productregistration.services.s3.StreamedZip $$ anon $ 2. $ anonfun $ fileChunks $ 1(StreamedZip.scala:105) at scala.collection.immutable.Stream $ Cons.tail(Stream.scala:1169)at scala.collection.immutable.Stream $ Cons.tail(Stream.scala:1159)at scala.collection.immutable.StreamIterator。$ anonfun $ next $ 1(Stream.scala:1058) at scala.collection.immutable。 StreamIterator $ LazyCell.v $ lzycompute(Stream.scala:1047) でscala.collection.immutable.StreamIterator $ LazyCell.v(ストリーム。スカラ:1047) scala.collection.immutable.StreamIterator.hasNextで(Stream.scala:1052) akka.stream.stage.GraphStageLogic $ EmittingIterator.onPull(GraphStage.scala時:911) akka.streamで akka.stream.impl.fusing.GraphInterpreterShell.runBatchで.impl.fusing.GraphInterpreter.processPull(GraphInterpreter.scala:506) でakka.stream.impl.fusing.GraphInterpreter.execute(412 GraphInterpreter.scala) (ActorGraphInterpreter.scala:571) at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:541) at akka.stream.impl.fusing.ActorGrap hInterpreter.tryInit(ActorGraphInterpreter.scala:659) でakka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:707): アッカでakka.actor.Actor.aroundPreStart(522 Actor.scala)で.actor.Actor.aroundPreStart $(Actor.scala:522)at akka.actor.ActorCell.create(ActorCell.scala:591)のakka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:650) でakka.actor.ActorCell.invokeAll $ 1(ActorCell.scala:462) でakka.actor.ActorCell.systemInvoke(ActorCell.scala:484) でakka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)at akka.dispatch.Mailbox.run(Mailbox.scala:223)at akka.d ispatch.Mailbox.exec(Mailbox.scala:234) akka.dispatch.forkjoin.ForkJoinPool $ WorkQueue.runTaskで akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)で(ForkJoinPool.java:1339) akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)で akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) で
EDIT2 Iの場合currentStream = Some(stream)
を設定しないと、上記のエラーは発生しません。さらに、実際にはいくつかのファイルの組み合わせに対しても機能します。 約20メガバイトのファイルがあります。最後のソースとして入れた場合、私のzipファイルが破損します。私がソースのリストの他のどこにでも置くと、すべて正常に動作します。以下は
は私の現在のグラフステージの実装の完全なリストである:
import java.io.{ByteArrayInputStream, InputStream, OutputStream}
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.util.{ByteString, ByteStringBuilder}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
import scala.util.control.NonFatal
//scalastyle:off
class StreamedZip(bufferSize: Int = 64 * 1024)(implicit ec: ExecutionContext,
mat: ActorMaterializer)
extends GraphStage[FlowShape[StreamedZip.ZipSource, ByteString]] {
import StreamedZip._
val in: Inlet[ZipSource] = Inlet("StreamedZip.in")
val out: Outlet[ByteString] = Outlet("StreamedZip.out")
override val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with StageLogging {
private val buffer = new ZipBuffer(bufferSize)
private var currentStream: Option[InputStream] = None
setHandler(
out,
new OutHandler {
override def onPull(): Unit =
if (isClosed(in)) {
if (buffer.isEmpty) completeStage()
else {
buffer.close
push(out, buffer.toByteString)
}
} else pull(in)
override def onDownstreamFinish(): Unit = {
closeInput()
buffer.close
super.onDownstreamFinish()
}
}
)
setHandler(
in,
new InHandler {
override def onPush(): Unit = {
val (filepath, source: StreamGenerator) = grab(in)
buffer.startEntry(filepath)
val stream = source().runWith(StreamConverters.asInputStream(1.minute))
emitMultiple(out, fileChunks(stream, buffer),() => { buffer.endEntry() })
}
override def onUpstreamFinish(): Unit = {
println("Updstream finish")
closeInput()
if (buffer.isEmpty) completeStage()
else {
buffer.close()
if (isAvailable(out)) {
push(out, buffer.toByteString)
}
}
}
}
)
private def closeInput(): Unit = {
currentStream.foreach(_.close)
currentStream = None
}
private def fileChunks(stream: InputStream, buffer: ZipBuffer): Iterator[ByteString] = {
// This seems like a good trade-off between single-byte
// read I/O performance and doubling the ZipBuffer size.
//
// And it's still a decent defense against DDOS resource
// limit attacks.
val readBuffer = new Array[Byte](1024)
var done = false
def result: Stream[ByteString] =
if (done) Stream.empty
else {
try {
while (!done && buffer.remaining > 0) {
val bytesToRead = Math.min(readBuffer.length, buffer.remaining)
val count = stream.read(readBuffer, 0, bytesToRead)
if (count == -1) {
stream.close
done = true
} else buffer.write(readBuffer, count)
}
buffer.toByteString #:: result
} catch {
case NonFatal(e) =>
closeInput()
throw e
}
}
result.iterator
}
}
}
object StreamedZip {
type ZipFilePath = String
type StreamGenerator =() => Source[ByteString, NotUsed]
type ZipSource = (ZipFilePath, StreamGenerator)
def apply()(implicit ec: ExecutionContext, mat: ActorMaterializer) = new StreamedZip()
}
class ZipBuffer(val bufferSize: Int = 64 * 1024) {
import java.util.zip.{ZipEntry, ZipOutputStream}
private var builder = new ByteStringBuilder()
private val zip = new ZipOutputStream(builder.asOutputStream) {
// this MUST ONLY be used after flush()!
def setOut(newStream: OutputStream): Unit = out = newStream
}
private var inEntry = false
private var closed = false
def close(): Unit = {
endEntry()
closed = true
zip.close()
}
def remaining(): Int = bufferSize - builder.length
def isEmpty(): Boolean = builder.isEmpty
def startEntry(path: String): Unit =
if (!closed) {
endEntry()
zip.putNextEntry(new ZipEntry(path))
inEntry = true
}
def endEntry(): Unit =
if (!closed && inEntry) {
inEntry = false
zip.closeEntry()
}
def write(byte: Int): Unit =
if (!closed && inEntry) zip.write(byte)
def write(bytes: Array[Byte], length: Int): Unit =
if (!closed && inEntry) zip.write(bytes, 0, length)
def toByteString(): ByteString = {
zip.flush()
val result = builder.result
builder = new ByteStringBuilder()
// set the underlying output for the zip stream to be the buffer
// directly, so we don't have to copy the zip'd byte array.
zip.setOut(builder.asOutputStream)
result
}
}
あなたがここにこの答えが役立つことがあります。https://stackoverflow.com/a/47146187/49630 –