2015-12-14 17 views
7

は、それが動的にがドメインオブジェクトにアッカのHTTPから、未知の長さで、ByteStringストリームを外部をデシリアライズすることは可能ですか?アッカHTTPストリーミングJSONデシリアライズ


コンテキスト

私は成長し続けてJSON Arrayを出力無限長いHTTPエンドポイントを呼び出します。

[ 
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" }, 
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" }, 
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" }, 
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" }, 
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" }, 
    ... 
] <- Never sees the daylight 
+0

明確にするために、このJSONストリームを受け取ったり、このストリームをブロードキャストしようとしていますか?ブロードキャストの場合、内部表現(イテレータ、スカラストリームなど)は何ですか?また、コミュニケーションは配列でなければならないのですか、それとも個々のドメインオブジェクトのストリームである可能性がありますか? –

+0

@RamonJRomeroyVigilこのストリームは完全に外部になります。 – Martijn

+0

あなたの特別なケースでは、 '}'を閉じるのを待つことができ、その間にテキスト用にあなたが選んだデシリアライザを呼び出すことができます。これにはいくつかの操作が必要で、おそらくByteStringのバッファリングが必要ですが、かなり基本的です。 –

答えて

0

私はplay-iteratees-extrasはあなたを助けなければならないと思います。このライブラリは、Enumerator/IterateeパターンでJsonを解析することを可能にし、もちろん、すべてのデータを受け取るのを待っていません。

たとえば、「無限」のJson配列を表す、無限のバイトストリームを作成しないようにします。

import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee} 

var i = 0 
var isFirstWas = false 

val max = 10000 

val stream = Enumerator("[".getBytes) andThen Enumerator.generateM { 
    Future { 
    i += 1 
    if (i < max) { 
     val json = Json.stringify(Json.obj(
     "prop" -> Random.nextBoolean(), 
     "prop2" -> Random.nextBoolean(), 
     "prop3" -> Random.nextInt(), 
     "prop4" -> Random.alphanumeric.take(5).mkString("") 
    )) 

     val string = if (isFirstWas) { 
     "," + json 
     } else { 
     isFirstWas = true 
     json 
     } 


     Some(Codec.utf_8.encode(string)) 
    } else if (i == max) Some("]".getBytes) // <------ this is the last jsArray closing tag 
    else None 

    } 
} 

いいえ、この値には10000(以上)のオブジェクトのjsArrayが含まれています。配列内の各オブジェクトのデータを含むケースクラスを定義します。

case class Props(prop: Boolean, prop2: Boolean, prop3: Int, prop4: String) 

今、各項目

import play.extras.iteratees._  
import JsonBodyParser._ 
import JsonIteratees._ 
import JsonEnumeratees._ 

val parser = jsArray(jsValues(jsSimpleObject)) ><> Enumeratee.map { json => 
    for { 
    prop <- json.\("prop").asOpt[Boolean] 
    prop2 <- json.\("prop2").asOpt[Boolean] 
    prop3 <- json.\("prop3").asOpt[Int] 
    prop4 <- json.\("prop4").asOpt[String] 
    } yield Props(prop, prop2, prop3, prop4) 
} 

をしてください解析されること、パーサを書き、jsArrayjsValuesjsSimpleObjectためdocを参照してください。パッケージはCharStringとしてバイトをデコードします

val result = stream &> Encoding.decode() ><> parser 

Encoding.decode() JsonIterateesから:結果のプロデューサーを構築します。 resultの値にはEnumerator[Option[Item]]の型があり、この列挙子にはiterateeを適用して解析処理を開始できます。

合計で、どのようにバイトを受け取るのかわかりません(解決策はこれに大きく依存します)が、問題の解決策の1つを表示すると思います。

0

Twitterのストリーム(無限の文字列)をドメインオブジェクトに解析しようとすると、非常によく似た問題が発生しました。 私はこのように、Json4sを使用して、それを解く:

case class Tweet(username: String, geolocation: Option[Geo]) 
case class Geo(latitude: Float, longitude: Float) 
object Tweet{ 
    def apply(s: String): Tweet = { 
     parse(StringInput(s), useBigDecimalForDouble = false, useBigIntForLong = false).extract[Tweet] 
    } 
} 

その後、私はちょうどストリームをバッファリングし、ツイートにそれをマッピングされた:

val reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(inputStream), "UTF-8")) 
var line = reader.readLine() 
while(line != null){ 
    store(Tweet.apply(line)) 
    line = reader.readLine() 
} 

Json4sは内部でフルオプションの上にサポート(またはカスタムオブジェクトを持っていますこの例ではGeoのようにオブジェクトです)。したがって、私がしたようにOptionを置くことができます。フィールドがJsonになければ、Noneに設定されます。

希望すると助かります!

関連する問題