2017-01-22 11 views
2

私はAlpakka-FTPを使用していますが、多分私は一般的なakka-streamパターンを探しています。 FTPコネクタは、ファイルを一覧表示したり、それらを取得することができます。Akka Streams、別のソースとしてアイテムをソースしますか?

def ls(host: String): Source[FtpFile, NotUsed] 
def fromPath(host: String, path: Path): Source[ByteString, Future[IOResult]] 

理想的には、私はこのようなストリームを作成したいと思います:

LIST 
    .FETCH_ITEM 
    .FOREACH(do something) 

をしかし、私は2つの機能で、このようなストリームを作成することができませんよ私は上に書いた。私は、私がFlowを使用してそこに得ることができる必要がありますようにのみlsfromPath機能上記の

Ftp.ls 
    .via(some flow that uses the Ftp.fromPath above) 
    .runWith(Sink.foreach(do something)) 

のようなもの、これは可能ですが、感じ?

EDIT:

は、私は1人の俳優とmapAsyncを使用して、それをうまくすることができていますが、私はまだそれがより簡単であるべき感じます。

class Downloader extends Actor { 
    override def receive = { 
    case ftpFile: FtpFile => 
     Ftp.fromPath(Paths.get(ftpFile.path), settings) 
     .toMat(FileIO.toPath(Paths.get("testHDF.txt")))(Keep.right) 
     .run() pipeTo sender 
    } 
} 

val downloader = as.actorOf(Props(new Downloader)) 

Ftp.ls("test_path", settings) 
    .mapAsync(1)(ftpFile => (downloader ? ftpFile) (3.seconds).mapTo[IOResult]) 
    .runWith(Sink.foreach(res => println("got it!" + res))) 

答えて

0

この目的でflatMapConcatを使用できるはずです。あなたの具体的な例としては、

Ftp.ls("test_path", settings).flatMapConcat{ ftpFile => 
    Ftp.fromPath(Paths.get(ftpFile.path), settings) 
}.runWith(FileIO.toPath(Paths.get("testHDF.txt"))) 

ドキュメントhereに書き換えることができます。

関連する問題