2015-10-22 14 views
5

私はakka-httpを使用して、チャンク応答を返すhttpサービスにリクエストします。この場合にはJSON終了 -akka-httpチャンクレス応答連結

----- 
{"data": 
----- 
"some text"} 

----- 
{"data": 
----- 
"this is a longer 
----- 
text"} 

----- 
{"data": "txt"} 

----- 
... 

データの論理的な作品:

val httpRequest: HttpRequest = //build the request 
val request = Http().singleRequest(httpRequest) 
request.flatMap { response => 
    response.entity.dataBytes.runForeach { chunk => 
     println("-----") 
     println(chunk.utf8String) 
    } 
} 

と、コマンドラインで生成される出力は次のようになります。これは、コードの該当ビットがどのように見えるかです行末に\r\nというシンボルが付いていますが、上記の例ではjsonが単一のhttp応答チャンクに収まるとは限りません。

私の質問は、結果のコンテナタイプがまだSource[Out,M1]またはFlow[In,Out,M2]のままになるように、着信チャンクデータを完全なjsonsに連結するにはどうすればよいですか?私はakka-streamの理想に従いたいと思います。

UPDATE:

val request: HttpRequest = //build the request 
request.flatMap { response => 
    response.entity.dataBytes.scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String) 
     .filter(_.contains("\r\n")) 
     .runForeach { json => 
      println("-----") 
      println(json) 
     } 
} 

答えて

3

は、ソリューションが見つかりリアルタイムに行われなければならないこと、また、言及する価値があります非常に問題: "ByteStringのストリームから行を解析する"。彼らの解決策は非常に冗長ですが、単一のチャンクが複数の行を含む状況にも対応できます。これは、チャンクサイズが複数のjsonメッセージを処理するのに十分な大きさに変化する可能性があるため、より堅牢に見えます。

+0

機能スキャンは正確に何をしますか?それに関する文書はありません。説明していただけますか? – MaatDeamon

+0

@MaatDeamon実際には、「折り畳みに似ていますが、ターミナル操作ではありません。ゼロから始まる現在の値を出し、次の現在の値を放出する関数fに現在値と次の値を適用します。 (http://doc.akka.io/api/akka-stream-and-http-experimental/1.0/index.html#akka.stream.scaladsl.Source)。私はそれを理解する方法は折り目のようですが、連続ストリームに適用することができます。それがなければこの解決法は決してうまくいかないでしょう。 – Caballero

+0

また、レスポンスハンドルのチャンクは自動的に行われますか?私はあなたのコールバックがチャンクごとに有効であるということですか? "と呼ばれる" – MaatDeamon

0

akka stream documentationこのため料理のエントリを持っている:それは応答が無限であると凝集が

+0

Akka 2.4へのリンクが更新されました:http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-cookbook.html#Parsing_lines_from_a_stream_of_ByteStrings – akauppi

+0

akka-httpにはまもなくJSONの特定のフレーミングサポートが含まれます。このサポートの早期プレビューは、@ ktosoのサンプルプロジェクトで見ることができます。関連するJSONフレーミングコードへの直接リンクは、https://github.com/ktoso/scaladays-berlin-akka-streams/blob/master/src/main/scala/akka/http/scaladsl/server/JsonEntityStreamingです。スカラ –

0
response.entity.dataBytes 
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8096)) 
.mapAsyncUnordered(Runtime.getRuntime.availableProcessors()) { data => 
    if (response.status == OK) { 
    val event: Future[Event] = Unmarshal(data).to[Event] 
    event.foreach(x => log.debug("Received event: {}.", x)) 
    event.map(Right(_)) 
    } else { 
    Future.successful(data.utf8String) 
     .map(Left(_)) 
    } 
} 

唯一の要件は、1つのレコードの最大サイズを知っていることです。小さいものから始めた場合、デフォルトの動作は、レコードが制限よりも大きい場合に失敗することです。失敗の代わりに切り捨てるように設定できますが、JSONの部分は意味をなさない。

関連する問題