2
私はAlpakka-FTPを使用していますが、多分私は一般的なakka-streamパターンを探しています。 FTPコネクタは、ファイルを一覧表示したり、それらを取得することができます。Akka Streams、別のソースとしてアイテムをソースしますか?
def ls(host: String): Source[FtpFile, NotUsed]
def fromPath(host: String, path: Path): Source[ByteString, Future[IOResult]]
理想的には、私はこのようなストリームを作成したいと思います:
LIST
.FETCH_ITEM
.FOREACH(do something)
をしかし、私は2つの機能で、このようなストリームを作成することができませんよ私は上に書いた。私は、私がFlow
を使用してそこに得ることができる必要がありますようにのみls
とfromPath
機能上記の
Ftp.ls
.via(some flow that uses the Ftp.fromPath above)
.runWith(Sink.foreach(do something))
のようなもの、これは可能ですが、感じ?
EDIT:
は、私は1人の俳優とmapAsync
を使用して、それをうまくすることができていますが、私はまだそれがより簡単であるべき感じます。
class Downloader extends Actor {
override def receive = {
case ftpFile: FtpFile =>
Ftp.fromPath(Paths.get(ftpFile.path), settings)
.toMat(FileIO.toPath(Paths.get("testHDF.txt")))(Keep.right)
.run() pipeTo sender
}
}
val downloader = as.actorOf(Props(new Downloader))
Ftp.ls("test_path", settings)
.mapAsync(1)(ftpFile => (downloader ? ftpFile) (3.seconds).mapTo[IOResult])
.runWith(Sink.foreach(res => println("got it!" + res)))