私はakka-http
を使用して、チャンク応答を返すhttpサービスにリクエストします。この場合にはJSON終了 -akka-httpチャンクレス応答連結
-----
{"data":
-----
"some text"}
-----
{"data":
-----
"this is a longer
-----
text"}
-----
{"data": "txt"}
-----
...
データの論理的な作品:
val httpRequest: HttpRequest = //build the request
val request = Http().singleRequest(httpRequest)
request.flatMap { response =>
response.entity.dataBytes.runForeach { chunk =>
println("-----")
println(chunk.utf8String)
}
}
と、コマンドラインで生成される出力は次のようになります。これは、コードの該当ビットがどのように見えるかです行末に\r\n
というシンボルが付いていますが、上記の例ではjsonが単一のhttp応答チャンクに収まるとは限りません。
私の質問は、結果のコンテナタイプがまだSource[Out,M1]
またはFlow[In,Out,M2]
のままになるように、着信チャンクデータを完全なjsonsに連結するにはどうすればよいですか?私はakka-stream
の理想に従いたいと思います。
UPDATE:
val request: HttpRequest = //build the request
request.flatMap { response =>
response.entity.dataBytes.scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String)
.filter(_.contains("\r\n"))
.runForeach { json =>
println("-----")
println(json)
}
}
機能スキャンは正確に何をしますか?それに関する文書はありません。説明していただけますか? – MaatDeamon
@MaatDeamon実際には、「折り畳みに似ていますが、ターミナル操作ではありません。ゼロから始まる現在の値を出し、次の現在の値を放出する関数fに現在値と次の値を適用します。 (http://doc.akka.io/api/akka-stream-and-http-experimental/1.0/index.html#akka.stream.scaladsl.Source)。私はそれを理解する方法は折り目のようですが、連続ストリームに適用することができます。それがなければこの解決法は決してうまくいかないでしょう。 – Caballero
また、レスポンスハンドルのチャンクは自動的に行われますか?私はあなたのコールバックがチャンクごとに有効であるということですか? "と呼ばれる" – MaatDeamon