2017-02-20 12 views
0

を通して私は、次のコードスニペットがあります。私は、フローにメッセージを送信していたコードでアッカストリーム+アッカのHttpパスパラメータフロー

case class SomeClass(param1:String,param2:String,param3:String) 

    val someClassActorSource: Source[SomeClass, ActorRef] = Source 
     .actorPublisher[SomeClass](Props[SomeClassActorPublisher]) 

    val someFlow: ActorRef = Flow[SomeClass] 

     .mapAsync(3)(f=> getDocumentById(f)) 

     .map(f =>{ 
      val request = HttpRequest(method = HttpMethods.POST, uri = "http://localhost:8000/test") 
      .withEntity(ContentTypes.`text/xml(UTF-8)`, ByteString(f.a) 
      ) 
      (request,request) 

     }).via(connection) 

     //Parsing Response 
     .mapAsync(3){ 
      case (Success(HttpResponse(status, _, entity, _)),request)=> 
      entity.dataBytes.runFold(ByteString(""))(_ ++ _) 
     } 
     .map(resp =>parse(resp.utf8String,?????????????)) 
     .to(Sink.someSink{....}) 
     .runWith(someClassActorSource) 

    def parse(resp:String,parseParam:String)=???? 

とどこか:

someflow ! SomeClass("a","b","c") 
someflow ! SomeClass("a1","b1","c1") 

を私の問題はありますその方法の解析は、それがあるべきである

したがって、最初のメッセージの元のケースクラスからPARAM2使用すべき

parse(response,"b") 

と第二のメッセージのためには、そこで問題は、私は流れに提出メソッドからパラメータを取得することができますどのように、ある

parse(response,"b1") 

すべきですか?あなたのconnection値はあなたが接続をタプルにし、代わりに、単にあなたが入力されSomeClassに渡すことができタプルで二回requestを渡すので取るという事実を使用することができます

val connection = Http().cachedHostConnectionPool(...) 

を経由してインスタンス化されると仮定すると、

答えて

1

。このSomeClassインスタンスは、それぞれのFlowの値を解析して、その値を解析ステージにする必要があります。

val getDocumentFlow = 
    Flow[SomeClass].mapAsync(3)(f => getSomDocumentById(f).map(d => d -> f)) 

ので、私はちょうどDocumentを使用していgetDocumentByIdからの戻り値の型を述べていないあなたの質問:

val documentToRequest = 
    Flow[(Document, SomeClass)] map { case (document, someClass) => 
    val request = ... 

    (request, someClass) 
    } 

val parseResponse = 
    Flow[(Try[HttpResponse], SomeClass)].mapAsync(3){ 
    case (Success(HttpResponse(status, _, entity, _)), someClass) => 
     entity 
     .dataBytes 
     .runFold(ByteString(""))(_ ++ _) 
     .map(e => e -> someClass) 
    } 

val parseEntity = Flow[(ByteString, SomeClass)] map { 
    case (entity, someClass) => parse(entity.utf8String, someClass) 
} 

これらのフローは、その後に使用することができます

あなたのコードビットを変更します

val someFlow = 
    someClassActorSource 
    .via(getDocumentFlow) 
    .via(documentToRequest) 
    .via(connection) 
    .via(parseResponse) 
    .via(parseEntity) 
    .to(Sink.someSink{...}) 
    .run() 
関連する問題