2017-11-22 8 views
1

私はKryoシリアル化されたバイナリデータをS3(数千のシリアル化オブジェクト)に保存しています。Alpakka - S3からKryoシリアル化されたオブジェクトを読み取る

アルパッカは、data: Source[ByteString, NotUsed]と内容を読み取ることができます。しかし、Kryoフォーマットはデリミタを使用しないので、シリアル化された各オブジェクトをdata.via(Framing.delimiter(...))を使用して別のByteStringに分割することはできません。

実際、Kryoは、オブジェクトが終了するときにデータを読み取る必要があり、ストリーミングには適していません。

このケースをストリーミング方式で実装して、1日の終わりにSource[MyObject, NotUsed]になることは可能ですか?

答えて

1

これを行うグラフの段階があります。シリアライズされたオブジェクトが2バイトの文字列にまたがる場合を処理します。オブジェクトが大きい場合(私のユースケースではない)オブジェクトを改善する必要があり、Source[ByteString, NotUsed]に2つ以上のバイト文字列を使用することができます。

object KryoReadStage { 
    def flow[T](kryoSupport: KryoSupport, 
       `class`: Class[T], 
       serializer: Serializer[_]): Flow[ByteString, immutable.Seq[T], NotUsed] = 
    Flow.fromGraph(new KryoReadStage[T](kryoSupport, `class`, serializer)) 
} 

final class KryoReadStage[T](kryoSupport: KryoSupport, 
          `class`: Class[T], 
          serializer: Serializer[_]) 
    extends GraphStage[FlowShape[ByteString, immutable.Seq[T]]] { 

    override def shape: FlowShape[ByteString, immutable.Seq[T]] = FlowShape.of(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { 
    new GraphStageLogic(shape) { 

     setHandler(in, new InHandler { 

     override def onPush(): Unit = { 
      val bytes = 
      if (previousBytes.length == 0) grab(in) 
      else ByteString.fromArrayUnsafe(previousBytes) ++ grab(in) 

      Managed(new Input(new ByteBufferBackedInputStream(bytes.asByteBuffer))) { input => 
      var position = 0 
      val acc = ListBuffer[T]() 

      kryoSupport.withKryo { kryo => 
       var last = false 

       while (!last && !input.eof()) { 
       tryRead(kryo, input) match { 
        case Some(t) => 
        acc += t 
        position = input.total().toInt 
        previousBytes = EmptyArray 
        case None => 
        val bytesLeft = new Array[Byte](bytes.length - position) 

        val bb = bytes.asByteBuffer 
        bb.position(position) 
        bb.get(bytesLeft) 

        last = true 
        previousBytes = bytesLeft 
       } 
       } 

       push(out, acc.toList) 
      } 
      } 
     } 

     private def tryRead(kryo: Kryo, input: Input): Option[T] = 
      try { 
      Some(kryo.readObject(input, `class`, serializer)) 
      } catch { 
      case _: KryoException => None 
      } 

     }) 

     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      pull(in) 
     } 
     }) 

     private val EmptyArray: Array[Byte] = Array.empty 

     private var previousBytes: Array[Byte] = EmptyArray 

    } 
    } 

    override def toString: String = "KryoReadStage" 

    private lazy val in: Inlet[ByteString] = Inlet("KryoReadStage.in") 
    private lazy val out: Outlet[immutable.Seq[T]] = Outlet("KryoReadStage.out") 

} 

使用例

client.download(BucketName, key) 
    .via(KryoReadStage.flow(kryoSupport, `class`, serializer)) 
    .flatMapConcat(Source(_)) 

それは以下のいくつかの追加のヘルパーを使用しています。

ByteBufferBackedInputStream:マネージド

class ByteBufferBackedInputStream(buf: ByteBuffer) extends InputStream { 

    override def read: Int = { 
    if (!buf.hasRemaining) -1 
    else buf.get & 0xFF 
    } 

    override def read(bytes: Array[Byte], off: Int, len: Int): Int = { 
    if (!buf.hasRemaining) -1 
    else { 
     val read = Math.min(len, buf.remaining) 
     buf.get(bytes, off, read) 
     read 
    } 
    } 

} 

object Managed { 

    type AutoCloseableView[T] = T => AutoCloseable 

    def apply[T: AutoCloseableView, V](resource: T)(op: T => V): V = 
    try { 
     op(resource) 
    } finally { 
     resource.close() 
    } 
} 

KryoSupport

trait KryoSupport { 
    def withKryo[T](f: Kryo => T): T 
} 

class PooledKryoSupport(serializers: (Class[_], Serializer[_])*) extends KryoSupport { 

    override def withKryo[T](f: Kryo => T): T = { 
    pool.run(new KryoCallback[T] { 
     override def execute(kryo: Kryo): T = f(kryo) 
    }) 
    } 

    private val pool = { 
    val factory = new KryoFactory() { 
     override def create(): Kryo = { 
     val kryo = new Kryo 

     (KryoSupport.ScalaSerializers ++ serializers).foreach { 
      case ((clazz, serializer)) => 
      kryo.register(clazz, serializer) 
     } 

     kryo 
     } 
    } 

    new KryoPool.Builder(factory).softReferences().build() 
    } 

} 
関連する問題