2017-07-05 18 views
0

Alpakkaを使用してS3にファイルをアップロードし、同時にTikaと解析してMimeTypeを取得したいとします。2種類のシンクを組み合わせるには?

が、私は、現時点では、グラフの3つの部分を持っている:

val fileSource: Source[ByteString, Any] // comes from Akka-HTTP 
val fileUpload: Sink[ByteString, Future[MultipartUploadResult]] // created by S3Client from Alpakka 
val mimeTypeDetection: Sink[ByteString, Future[MediaType.Binary]] // my implementation using Apache Tika 

私は1つの場所で両方の結果を得るしたい、何かのように:

Future[(MultipartUploadResult, MediaType.Binary)] 

私は放送部分と全く問題がありません。

val broadcast = builder.add(Broadcast[ByteString](2)) 

source ~> broadcast ~> fileUpload 
      broadcast ~> mimeTypeDetection 

は、しかし、私はシンクを構成する問題があります。私がAPIとドキュメントで見つけた方法は、結合されたシンクが同じタイプであるか、シンクではなく、Zipping Flowsであると仮定しています。

このような場合には、どのような方法がありますか?

答えて

1

二つの方法:

1)alsoToMat(簡単に、何GraphDSLを使用していない、あなたの例のための十分な)

val mat1: (Future[MultipartUploadResult], Future[Binary]) = 
    fileSource 
    .alsoToMat(fileUpload)(Keep.right) 
    .toMat(mimeTypeDetection)(Keep.both) 
    .run() 

2)カスタムでGraphDSLを使用しては)(、より詳細な、より柔軟な値をマテリアライズド。 docsのこちらの詳細情報

val mat2: (Future[MultipartUploadResult], Future[Binary]) = 
    RunnableGraph.fromGraph(GraphDSL.create(fileUpload, mimeTypeDetection)((_, _)) { implicit builder => 
     (fileUpload, mimeTypeDetection) => 
     import GraphDSL.Implicits._ 
     val broadcast = builder.add(Broadcast[ByteString](2)) 

     fileSource ~> broadcast ~> fileUpload 
         broadcast ~> mimeTypeDetection 
     ClosedShape 
    }).run() 
関連する問題