2017-05-26 13 views
3

私は(入力がcontinuosストリームとTCPソケットである現実には)このような3行で入力ファイルからSource[ByteString, _]ていますデコードは、アッカストリームでJSONをチャンク

{"a":[2 
33] 
} 

は、今の問題は、Iこれを解析してSource[ChangeMessage,_]にしたいと思っていますが、各JSONメッセージが複数の行で断片化されるときではなく、すべての行についてJSONメッセージ全体があるときに唯一の例があります。

この例ではthisというライブラリが見つかりましたが、最終的な文字は}または,であり、これは1行あたり1つのJSONです。下の例はこの設定を示しています。

"My decoder" should "decode chunked json" in { 
    implicit val sys = ActorSystem("test") 
    implicit val mat = ActorMaterializer() 
    val file = Paths.get("chunked_json_stream.json") 
    val data = FileIO.fromPath(file) 
    .via(CirceStreamSupport.decode[ChangeMessage]) 
    .runWith(TestSink.probe[ChangeMessage]) 
    .request(1) 
    .expectComplete() 
    } 

別の選択肢は倍とバランス}を使用し、全体のJSONが完了したときにのみ発光するようになります。この問題は、折り畳み演算子がストリームの完成時にのみ出力され、これは連続ストリームなのでここでは使用できません。

私の質問は:JSONはアッカストリームで ストリーム、すでに これを行い、利用可能なソフトウェアがあるチャンク解析するために最速の方法は何ですか?可能であれば、私は使用したいcirce

答えて

2

は言う:

この流れはさらに、ストリーム/ SSEベースのAPIを消費するための偉大である、彼らは到着するかもしれないものは何でも断片化で複数のJSON文書の解析をサポートしています。あなたのケースでは

すべてを行う必要があるだけで、着信バイト文字列を区切るためにある:

"My decoder" should "decode chunked json" in { 
    implicit val sys = ActorSystem("test") 
    implicit val mat = ActorMaterializer() 
    val file = Paths.get("chunked_json_stream.json") 

    val sourceUnderTest = 
     FileIO.fromPath(file) 
     .via(Framing.delimiter(ByteString("\n"), 8192, allowTruncation = true)) 
     .via(CirceStreamSupport.decode[ChangeMessage]) 

    sourceUnderTest 
     .runWith(TestSink.probe[ChangeMessage]) 
     .request(1) 
     .expectNext(ChangeMessage(List(233))) 
     .expectComplete() 
} 

ファイルから読み込むときに、バイト文字列の要素が複数の行が含まれているため、キルケは、不正な解析できないためだことジョーンズ新しい行で区切ると、ストリームの各要素は別々の行になります。そのため、Circeは前述の機能を使用して解析できます。

+0

実際には、JsonFramingなしでCirceStreamSupportを使用して作業していますか? – user3139545

+0

@ user3139545あなたの発言をありがとう。私は私の答えを明らかにした。 –

0

残念ながら、残念ながら、私はJSONのストリームベースの解析をサポートするScalaライブラリは認識していません。 と思われますが、Google Gsonでこれをサポートしていますが、「壊れた」入力を適切に処理できるかどうかは完全にはわかりません。

ただし、できることは、Framing.delimiterと同様に、JSONドキュメントをストリーミング形式で収集することです。これはあなたが言及した選択肢に非常に似ていますが、fold()を使用していません。このようにすれば、おそらくFraming.delimiterのようなものを模倣する必要がありますが、1つの区切り文字を探す代わりに中括弧(必要に応じて最上位の配列が可能な場合はかっこ)のバランスをとり、中間データをバッファリングする必要がありますドキュメント全体が処理されるまで、あなたは解析に適した単一のチャンクとして発行されます。それはもう読まなく、そこにすることができない場合

trait Parser { 
    def update(data: Array[Byte]) // or String 
    def pull(): Option[Either[Error, JsonEvent]] 
} 

pull()戻りNone:アッカストリームで使用するのに適したサイドノート、ストリーミングJSONパーサーの適切なインタフェースは、次のようになりますと同じように

JsonEventは、ストリーミングパーサー(すなわち、BeginObject,BeginArray,EndObject,,Stringなどのサブクラスを持つ密閉された特性)を記述するための標準的な構造です。そのようなライブラリを見つけたり作成したりする場合は、それを使って、Akkaストリームのデータを解析します(ByteString)。 knutwalker/akka-stream-jsonのドキュメントとして