2017-02-01 18 views
0

NettyまたはRatpackを使用してjsonデータのストリームを消費したい。私のユースケースはリクエストボディに大きなjsonデータ(MB単位のjsonの配列)が含まれていることです。データを処理する1つの方法は、完全なデータを受け取ってから処理を開始するまでブロックです。しかし、jsonオブジェクトの1つのチャンクが処理されるとすぐに、非同期処理の意味があります。Netty/Ratpackのjsonストリームデータを使用する

私はNettyのJsonObjectDecoderに出会ったが、私はそれを使用して運がない。ここ は私ChannelInitializerクラスです:

public class ServerInitializer extends ChannelInitializer<SocketChannel> { 

    @Override 
    public void initChannel(SocketChannel ch) { 
     ChannelPipeline p = ch.pipeline(); 

     p.addLast(new JsonObjectDecoder(true)); 

     // HttpServerCodec is a combination of HttpRequestDecoder and HttpResponseEncoder 
     p.addLast(new HttpServerCodec()); 
     // 
     // add gizp compressor for http response content 
     p.addLast(new HttpContentCompressor()); 

     p.addLast(new HttpObjectAggregator(1048576)); 

     p.addLast(new ChunkedWriteHandler()); 

     p.addLast(new ServerHandler()); 
    } 
} 

私は、このデータを送信しています:

[ 
    { 
     "timestamp": "2016-11-14 11:08:09+0100", 
     "message": "message 120", 
     "hostname": "myhost.com", 
     "device_product": "product123", 
     "device_vendor": "vendor123", 
     "device_version": "1", 
     "severity": "High" 
    }, 
    ..... 
    { 
     "timestamp": "2016-11-14 11:08:09+0100", 
     "message": "message 121", 
     "hostname": "myhost.com", 
     "device_product": "product123", 
     "device_vendor": "vendor123", 
     "device_version": "1", 
     "severity": "High" 
    } 
] 

しかし、私はこのエラーを取得しています:

io.netty.handler.codec.CorruptedFrameException: invalid JSON received at byte position 0: 504f5354202f6c6f677320485454502f312e310d0a486f73743a206c6f63616c686f73743a383038300d0a436f6e6e656374696f6e3a206b6565702d616c6976650d0a436f6e74656e742d4c656e6774683a203230380d0a4163636570743a206170706c69636174696f6e2f6a736f6e0d0a506f73746d616e2d546f6b656e3a2062383064306264352d663234302d346563622d353631322d3863376139396434633934360d0a43616368652d436f6e74726f6c3a206e6f2d63616368650d0a4f726967696e3a206368726f6d652d657874656e73696f6e3a2f2f6668626a676269666c696e6a62646767656863646463626e636464646f6d6f700d0a557365722d4167656e743a204d6f7a696c6c612f352e30202857696e646f7773204e5420362e313b2057696e36343b2078363429204170706c655765624b69742f3533372e333620284b48544d4c2c206c696b65204765636b6f29204368726f6d652f35352e302e323838332e3837205361666172692f3533372e33360d0a436f6e74656e742d547970653a206170706c69636174696f6e2f6a736f6e0d0a4163636570742d456e636f64696e673a20677a69702c206465666c6174652c2062720d0a4163636570742d4c616e67756167653a20656e2d55532c656e3b713d302e382c6a613b713d302e362c66722d46523b713d302e342c66723b713d302e322c66722d43413b713d302e320d0a0d0a7b2274696d657374616d70223a2022323031362d31312d31342031313a30383a30392b30313030222c226d657373616765223a20226d65737361676520313230222c22686f73746e616d65223a20226d79686f73742e636f6d222c200a09226465766963655f70726f64756374223a202270726f64756374313233222c200a09226465766963655f76656e646f72223a202276656e646f72313233222c200a09226465766963655f76657273696f6e223a202231222c200a09227365766572697479223a202248696768220a090a097d 
    at io.netty.handler.codec.json.JsonObjectDecoder.decode(JsonObjectDecoder.java:163) 
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:316) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:230) 
    at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84) 
    at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153) 
    at io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:514) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:471) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:385) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:351) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) 
    at io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412) 
    at io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280) 
    at io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877) 
    at io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706) 
    at io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661) 
    at io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126) 

私は、私が行方不明ですか分かりません。 Ratpackを使ってこれを達成する方法が分かっている人は、私を助けてください。 ありがとうございます。

答えて

1

問題はJSONデコーダがパイプラインの最初のハンドラで、HTTPポストのデコードを試みていることです。

POST /logs HTTP/1.1 Host: localhost:8080 Connection: keep-alive Content-Length: 208 Accept: application/json Postman-Token: <TOKEN REMOVED> Cache-Control: no-cache Origin: chrome-extension://<ID REMOVED> User-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36 Content-Type: application/json Accept-Encoding: gzip, deflate, br Accept-Language: en-US,en;q=0.8,ja;q=0.6,fr-FR;q=0.4,fr;q=0.2,fr-CA;q=0.2

{"timestamp": "2016-11-14 11:08:09+0100","message": "message 120","hostname": "myhost.com", "device_product": "product123", "device_vendor": "vendor123", "device_version": "1", "severity": "High" }

:私はあなたが投稿エラーメッセージから無効なデータ・ストリームを取る場合、結果は...

import javax.xml.bind.DatatypeConverter; 
v = "504f5354202f6c6f677320485...<snip>"; 
byte[] bytes = DatatypeConverter.parseHexBinary(v); 
println new String(bytes) 

をバイトに戻ってそれを解析し、それから文字列を(Groovyで)を作成だから、あなたはJSONデコーダの前にパイプラインにこれ​​らを追加する必要があります。

  1. HttpServerCodec
  2. HttpObjectAggregator(大柱のために、データをチャンクすることができる)
  3. [完全] HttpRequestを受け入れ、(ByteBufとして)コンテンツを転送するMessageToMessageDecodee。

JSONデコーダはJSONバイトのチャンクを取得し、解析されたメッセージを上流に送信し始めます。

+0

お返事ありがとうございます。しかし、それは私の非同期要求を満たしません。ストリーム内のデータを処理したいのですが、HttpServerCodec、HttpObjectDecoderなどを追加すると、その前にFullHttpRequestを構築するためにブロックされます。したがって、実際のストリーム処理ではありません。 –

+0

HTTPを送信しません。 JSONをバイトストリームとして直接処理するだけです。 HTTPはそれをストリームしません。 – Nicholas

+0

どうすればいいですか...非常に大きなサイズのデータ​​を含むAPIを作成していました... –

1

HTTP POSTでこれを行うには、要求がチャンクされていることを確認する必要があります。 - HttpRequestになります最初のものを除いて、HttpContentのインスタンスを転送します

  1. HttpServerCodec: これは、あなたのパイプラインで実行する必要がありますものの近似です。
  2. HttpContentインスタンスを受け入れるMessageToMessageDecoderは、コンテンツByteBufを抽出して転送します。
  3. JSONデコーダ。
  4. JSONハンドラ。

ある時点で、最後のチャンクとなるLastHttpContentのインスタンスでもあるHttpContentが取得されます。

難しいところは、HttpContentの1つにJSONデコーダのエラーがトリガされる不完全なJSONシーケンスがあることです。この時点で、ByteBufを最後に知られていた「正常な」状態に巻き戻す必要があります。私はこれが自動的に処理されるとは思わないので、次のチャンクが来て終了するのを待ちます。

+0

ありがとうございました。私はあなたが示唆したように試みます。私はそれがうまくいくと思う。 –

関連する問題