2016-09-11 5 views
6

ストリームとしてakka-http-clientを使用してHTTPリクエストをチェーンしたいとします。チェーン内の各HTTP要求は、以前の要求の成功/応答に依存し、新しい要求を構築するためにそれを使用します。リクエストが成功しなかった場合、Streamは失敗したリクエストのレスポンスを返します。StreamでのAkka-http-clientリクエストのチェーン

どのようにakka-httpでこのようなストリームを構築できますか? akka-httpクライアントレベルのAPIを使用する必要がありますか?

+0

すべての要求が成功した場合、ストリームは具体化する必要がありますか?おそらくあなたはいくつかのデータを取得したいのですか? – Mikesname

+0

はい、ストリームの最後にHTTPResponseを取得したいと思います。具体化したときに、それが失敗の成功と失敗の理由であるかどうかを知りたい。 – Rabzu

答えて

8

ウェブクローラーを作成する場合は、this postをご覧ください。この答えは、次のページへのリンクが現在のページ応答のヘッダーにあるページネーションされたリソースのダウンロードなど、より単純なケースに取り組んでいます。

Source.unfoldAsyncメソッドを使用して、1つのアイテムが次のアイテムにつながる連鎖ソースを作成できます。これは、要素Sを受け取り、を返す関数をとり、ストリームがタイプEの要素を放出し続け、次の呼び出しに状態を渡すかどうかを決定します。

お使いの場合には

、これは一種のようなものです:

Some(request -> response)を返し、別のURLへの応答ポイント場合 Future[HttpResponse]
  • を生産するそうNone
  • を初期 HttpRequest
  • を取っ
    1. にはシワがあります。これはこのwi次のリクエストへのポインタが含まれていない場合は、ストリームからレスポンスを送出しません。

      これを回避するには、関数をunfoldAsync return Future[Option[(Option[HttpRequest], HttpResponse)]]に渡すことができます。これは、次のような状況に対処することができます:

      • 電流応答は、電流応答が

      別の要求を指していない別の要求

    2. にエラー
    3. 電流応答のポイントです以下は、このアプローチの概要を示す注釈付きコードですが、最初は暫定です。

      あなたが体を必要としない場合は無視することができますが、ここではHttpEntityをaから(a)に変換する関数を使用します。厳密エンティティに(潜在的な)ストリーム:

      import scala.concurrent.duration._ 
      
      def convertToStrict(r: HttpResponse): Future[HttpResponse] = 
          r.entity.toStrict(10.minutes).map(e => r.withEntity(e)) 
      

      次に、HttpResponseからOption[HttpRequest]を作成するための機能のいくつ。 <https://api.github.com/...> rel="next"

      def nextUri(r: HttpResponse): Seq[Uri] = for { 
          linkHeader <- r.header[Link].toSeq 
          value <- linkHeader.values 
          params <- value.params if params.key == "rel" && params.value() == "next" 
      } yield value.uri 
      
      def getNextRequest(r: HttpResponse): Option[HttpRequest] = 
          nextUri(r).headOption.map(next => HttpRequest(HttpMethods.GET, next)) 
      

      次に、我々はunfoldAsyncに渡します実際の関数この例ではLinksヘッダが含まれているのGithubのページネーションリンク、例えばのような方式を採用しています。それはHttpRequestを取り、生産するアッカHTTP Http().singleRequest() APIを使用していますFuture[HttpResponse]

      def chainRequests(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] = 
          reqOption match { 
          case Some(req) => Http().singleRequest(req).flatMap { response => 
           // handle the error case. Here we just return the errored response 
           // with no next item. 
           if (response.status.isFailure()) Future.successful(Some(None -> response)) 
      
           // Otherwise, convert the response to a strict response by 
           // taking up the body and looking for a next request. 
           else convertToStrict(response).map { strictResponse => 
           getNextRequest(strictResponse) match { 
            // If we have no next request, return Some containing an 
            // empty state, but the current value 
            case None => Some(None -> strictResponse) 
      
            // Otherwise, pass on the request... 
            case next => Some(next -> strictResponse) 
           } 
           } 
          } 
          // Finally, there's no next request, end the stream by 
          // returning none as the state. 
          case None => Future.successful(None) 
          } 
      

      我々は、エラーが発生応答を取得する場合、我々は次の状態でNoneを返すために、ストリームが継続しないことに注意してください。

      あなたがそうのようなHttpResponseオブジェクトのストリームを取得し、これを呼び出すことができます。最後の(または、エラーが発生)応答の値を返すよう

      val initialRequest = HttpRequest(HttpMethods.GET, "http://www.my-url.com") 
      Source.unfoldAsync[Option[HttpRequest], HttpResponse](
          Some(initialRequest)(chainRequests) 
      

      を、あなたは、単にストリームになるので、Sink.lastを使用する必要があります正常に完了した場合、または最初にエラーが発生した場合に終了します。たとえば、次のようになります。

      def getStatus: Future[StatusCode] = Source.unfoldAsync[Option[HttpRequest], HttpResponse](
           Some(initialRequest))(chainRequests) 
          .map(_.status) 
          .runWith(Sink.last) 
      
  • +0

    ありがとうございます。非常に感動的な答え – Rabzu

    関連する問題