電流応答のポイントです以下は、このアプローチの概要を示す注釈付きコードですが、最初は暫定です。 あなたが体を必要としない場合は無視することができますが、ここではHttpEntity
をaから(a)に変換する関数を使用します。厳密エンティティに(潜在的な)ストリーム:
import scala.concurrent.duration._
def convertToStrict(r: HttpResponse): Future[HttpResponse] =
r.entity.toStrict(10.minutes).map(e => r.withEntity(e))
次に、HttpResponse
からOption[HttpRequest]
を作成するための機能のいくつ。 <https://api.github.com/...> rel="next"
:
def nextUri(r: HttpResponse): Seq[Uri] = for {
linkHeader <- r.header[Link].toSeq
value <- linkHeader.values
params <- value.params if params.key == "rel" && params.value() == "next"
} yield value.uri
def getNextRequest(r: HttpResponse): Option[HttpRequest] =
nextUri(r).headOption.map(next => HttpRequest(HttpMethods.GET, next))
次に、我々はunfoldAsync
に渡します実際の関数この例ではLinks
ヘッダが含まれているのGithubのページネーションリンク、例えばのような方式を採用しています。それはHttpRequest
を取り、生産するアッカHTTP Http().singleRequest()
APIを使用していますFuture[HttpResponse]
:
def chainRequests(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] =
reqOption match {
case Some(req) => Http().singleRequest(req).flatMap { response =>
// handle the error case. Here we just return the errored response
// with no next item.
if (response.status.isFailure()) Future.successful(Some(None -> response))
// Otherwise, convert the response to a strict response by
// taking up the body and looking for a next request.
else convertToStrict(response).map { strictResponse =>
getNextRequest(strictResponse) match {
// If we have no next request, return Some containing an
// empty state, but the current value
case None => Some(None -> strictResponse)
// Otherwise, pass on the request...
case next => Some(next -> strictResponse)
}
}
}
// Finally, there's no next request, end the stream by
// returning none as the state.
case None => Future.successful(None)
}
我々は、エラーが発生応答を取得する場合、我々は次の状態でNone
を返すために、ストリームが継続しないことに注意してください。
あなたがそうのようなHttpResponse
オブジェクトのストリームを取得し、これを呼び出すことができます。最後の(または、エラーが発生)応答の値を返すよう
val initialRequest = HttpRequest(HttpMethods.GET, "http://www.my-url.com")
Source.unfoldAsync[Option[HttpRequest], HttpResponse](
Some(initialRequest)(chainRequests)
を、あなたは、単にストリームになるので、Sink.last
を使用する必要があります正常に完了した場合、または最初にエラーが発生した場合に終了します。たとえば、次のようになります。
def getStatus: Future[StatusCode] = Source.unfoldAsync[Option[HttpRequest], HttpResponse](
Some(initialRequest))(chainRequests)
.map(_.status)
.runWith(Sink.last)
すべての要求が成功した場合、ストリームは具体化する必要がありますか?おそらくあなたはいくつかのデータを取得したいのですか? – Mikesname
はい、ストリームの最後にHTTPResponseを取得したいと思います。具体化したときに、それが失敗の成功と失敗の理由であるかどうかを知りたい。 – Rabzu