2017-03-10 12 views
1

Akka StreamにカスタムSource[ByteSting]を実装したいと思います。このソースは、指定されたファイルから、指定されたバイト範囲内のデータを読み込み、それをダウンストリームに伝播するだけです。ActorPublisherに基づいてカスタムAkka Streamsソースを実装する

私は、これは、ActorPublisherにミックスしたActorを実装することで実現できると思いました。

import java.nio.ByteBuffer 
import java.nio.channels.FileChannel 
import java.nio.file.{Path, StandardOpenOption} 

import akka.actor.{ActorLogging, DeadLetterSuppression, Props} 
import akka.stream.actor.ActorPublisher 
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request} 
import akka.util.ByteString 

import scala.annotation.tailrec 
import scala.util.control.NonFatal 

class FilePublisher(pathToFile: Path, startByte: Long, endByte: Long) extends ActorPublisher[ByteString] 
    with ActorLogging{ 

    import FilePublisher._ 

    private val chunksToBuffer = 10 
    private var bytesLeftToRead = endByte - startByte + 1 
    private var fileChannel: FileChannel = _ 
    private val buffer = ByteBuffer.allocate(8096) 

    private var bufferedChunks: Vector[ByteString] = _ 

    override def preStart(): Unit = { 
    try { 
     log.info("Starting") 
     fileChannel = FileChannel.open(pathToFile, StandardOpenOption.READ) 
     bufferedChunks = readAhead(Vector.empty, Some(startByte)) 
     log.info("Chunks {}", bufferedChunks) 
    } catch { 
     case NonFatal(ex) => onErrorThenStop(ex) 
    } 
    } 

    override def postStop(): Unit = { 

    log.info("Stopping") 
    if (fileChannel ne null) 
     try fileChannel.close() catch { 
     case NonFatal(ex) => log.error(ex, "Error during file channel close") 
    } 
    } 

    override def receive: Receive = { 
    case Request => 
     readAndSignalNext() 
     log.info("Got request") 
    case Continue => 
     log.info("Continuing reading") 
     readAndSignalNext() 
    case Cancel => 
     log.info("Cancel message got") 
     context.stop(self) 
    } 

    private def readAndSignalNext() = { 

    log.info("Reading and signaling") 
    if (isActive) { 
     bufferedChunks = readAhead(signalOnNext(bufferedChunks), None) 
     if (isActive && totalDemand > 0) self ! Continue 
    } 
    } 

    @tailrec 
    private def signalOnNext(chunks: Vector[ByteString]): Vector[ByteString] = { 

    if (chunks.nonEmpty && totalDemand > 0) { 
     log.info("Signaling") 
     onNext(chunks.head) 
     signalOnNext(chunks.tail) 
    } else { 
     if (chunks.isEmpty && bytesLeftToRead > 0) { 
     onCompleteThenStop() 
     } 
     chunks 
    } 
    } 

    @tailrec 
    private def readAhead(currentlyBufferedChunks: Vector[ByteString], startPosition: Option[Long]): Vector[ByteString] = { 

    if (currentlyBufferedChunks.size < chunksToBuffer) { 

     val bytesRead = readDataFromChannel(startPosition) 
     log.info("Bytes read {}", bytesRead) 
     bytesRead match { 
     case Int.MinValue => Vector.empty 
     case -1 => 
      log.info("EOF reached") 
      currentlyBufferedChunks // EOF reached 
     case _ => 
      buffer.flip() 
      val chunk = ByteString(buffer) 
      buffer.clear() 

      bytesLeftToRead -= bytesRead 
      val trimmedChunk = if (bytesLeftToRead >= 0) chunk else chunk.dropRight(bytesLeftToRead.toInt) 
      readAhead(currentlyBufferedChunks :+ trimmedChunk, None) 
     } 

    } else { 
     currentlyBufferedChunks 
    } 
    } 

    private def readDataFromChannel(startPosition: Option[Long]): Int = { 
    try { 
     startPosition match { 
     case Some(position) => fileChannel.read(buffer, position) 
     case None => fileChannel.read(buffer) 
     } 
    } catch { 
     case NonFatal(ex) => 
     log.error(ex, "Got error reading data from file channel") 
     Int.MinValue 
    } 
    } 
} 

object FilePublisher { 

    private case object Continue extends DeadLetterSuppression 

    def props(path: Path, startByte: Long, endByte: Long): Props = Props(classOf[FilePublisher], path, startByte, endByte) 
} 

しかし、私はこのように私のFilePublisherに裏打ちされたSourceをマテリアライズするときことが判明:この実装ではなく、与えられたバイトの範囲からちょうどデータの供給パスからファイル全体を読み込みakka.stream.impl.io.FilePublisherに似ている

val fileSource = Source.actorPublisher(FilePublisher.props(pathToFile, 0, fileLength)) 
val future = fileSource.runWith(Sink.seq) 

何も起こらず、送信元はさらに下流にデータを伝播しません。

は私のFilePublisherに基づくか、私はこのAPIを使用してちょうどhereを説明したようにカスタム処理段階を実装するべきではありませんSourceを具現するために、他の正しい方法はありますか?

CustomStageのアプローチの問題点は、その単純な実装がこの段階でIOをすぐに実行できることです。ステージからカスタムスレッドプールまたはアクタにIOを移動できますが、これにはステージとアクタの間に何らかの形の同期が必要です。おかげさまで

答えて

0

問題はreceive方法のパターンマッチングにミスによって引き起こされた:実際に思ったように単一のパラメータ(final case class Request(n: Long))とない場合のオブジェクトの場合のクラスである代わりcase Request(_)Requestため このラインcase Request =>があるべきです。

0

現在、IO操作に別のディスパッチャを使用していないことに気付きました。 Here'sこれを行わない理由を説明しているdocsセクションでは、アプリケーションで厄介なブロックが発生する可能性があります。

Akka Streamsは、特定のスレッドプールベースのディスパッチャを使用してFileSourceFilePublisherをラップします。インスピレーションのコードをチェックアウトすることができますhere

+0

ありがとう、それは有効な点です。私は、将来別のディスパッチャーの使用法を追加するつもりです。 – thereisnospoon

関連する問題