2017-05-24 34 views
1

MQTTベースのメッセージング・バックエンドで、現在netty 3.xをnetty 4.1にアップグレード中です。私たちのアプリケーションでは、カスタムMQTTメッセージ・デコーダとエンコーダを使用します。 nettyで参照カウントのByteBufオブジェクトを正しく解放します。4.1

public class MqttMessageDecoder extends ByteToMessageDecoder { 

    @Override 
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 
     if (in.readableBytes() < 2) { 
      return; 
     } 

     ..... 
     ..... 
     ..... 

     byte[] data = new byte[msglength]; 
     in.resetReaderIndex(); 
     in.readBytes(data); 
     MessageInputStream mis = new MessageInputStream(
       new ByteArrayInputStream(data)); 
     Message msg = mis.readMessage(); 
     out.add(msg); 
     ReferenceCountUtil.release(in); 
    } 
} 

Messageは、当社のカスタムオブジェクトがある

が、それは次の ChannelHandlerchannelRead()に渡され、次のように私たちのデコーダについては

、私は現在 ByteToMessageDecoderを使用しています。ご覧のとおり、 Messageオブジェクトを作成するとすぐに、 ByteBufオブジェクト inを受信しました。したがって、 ByteBufは参照カウントがネッティであるため、ここでオブジェクトを解放する必要があることは間違いありません。 ReferenceCountUtil.release(in)を呼び出してください。理想的には、これは docに従って正しいと思われます。しかし、私はこれを行うとき、私は例外に直面しているように見える:

Wed May 24 io.netty.channel.DefaultChannelPipeline:? WARN netty-workers-7 An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception. 
io.netty.channel.ChannelPipelineException: com.bsb.hike.mqtt.MqttMessageDecoder.handlerRemoved() has thrown an exception. 
    at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:631) [netty-all-4.1.0.Final.jar:4.1.0.Final] 
    at io.netty.channel.DefaultChannelPipeline.destroyDown(DefaultChannelPipeline.java:867) [netty-all-4.1.0.Final.jar:4.1.0.Final] 
    at io.netty.channel.DefaultChannelPipeline.access$300(DefaultChannelPipeline.java:45) [netty-all-4.1.0.Final.jar:4.1.0.Final] 
    at io.netty.channel.DefaultChannelPipeline$9.run(DefaultChannelPipeline.java:874) [netty-all-4.1.0.Final.jar:4.1.0.Final] 
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:339) [netty-all-4.1.0.Final.jar:4.1.0.Final] 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:374) [netty-all-4.1.0.Final.jar:4.1.0.Final] 
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742) [netty-all-4.1.0.Final.jar:4.1.0.Final] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_72-internal] 
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1 
    at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:111) ~[netty-all-4.1.0.Final.jar:4.1.0.Final] 
    at io.netty.handler.codec.ByteToMessageDecoder.handlerRemoved(ByteToMessageDecoder.java:217) ~[netty-all-4.1.0.Final.jar:4.1.0.Final] 
    at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:626) [netty-all-4.1.0.Final.jar:4.1.0.Final] 
    ... 7 common frames omitted 

これは、子チャンネルが閉じているときに、パイプライン内のすべてのハンドラが次々に削除されていることを私に伝えます。このデコーダハンドラが閉じられると、がこのデコーダにアタッチされ、以下のメソッドが呼び出されるとIllegalReferenceCountException例外が発生します。

これはAbstractReferenceCountedByteBuf#releaseです:

@Override 
    public boolean release() { 
     for (;;) { 
      int refCnt = this.refCnt; 
      if (refCnt == 0) { 
       throw new IllegalReferenceCountException(0, -1); 
      } 

      if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) { 
       if (refCnt == 1) { 
        deallocate(); 
        return true; 
       } 
       return false; 
      } 
     } 
    } 

この問題が発生しないように、そしてByteBufオブジェクトを解放するための正しい方法は何ですか?あなたが設定に関するこれ以上の情報が必要な場合は私に知らせてください

new ServerBootstrap().childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) 

-

私はPooledByteBufAllocatorを使用しています。


EDIT

アドオンFerrybigの答えについては、ByteToMessageDecoder#channelReadは、それ自体で、着信ByteBuf Sの解放を処理します。 finallyブロックを参照してください -

@Override 
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
     if (msg instanceof ByteBuf) { 
      CodecOutputList out = CodecOutputList.newInstance(); 
      try { 
       ByteBuf data = (ByteBuf) msg; 
       first = cumulation == null; 
       if (first) { 
        cumulation = data; 
       } else { 
        cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); 
       } 
       callDecode(ctx, cumulation, out); 
      } catch (DecoderException e) { 
       throw e; 
      } catch (Throwable t) { 
       throw new DecoderException(t); 
      } finally { 
       if (cumulation != null && !cumulation.isReadable()) { 
        numReads = 0; 
        cumulation.release(); 
        cumulation = null; 
       } else if (++ numReads >= discardAfterReads) { 
        // We did enough reads already try to discard some bytes so we not risk to see a OOME. 
        // See https://github.com/netty/netty/issues/4275 
        numReads = 0; 
        discardSomeReadBytes(); 
       } 

       int size = out.size(); 
       decodeWasNull = !out.insertSinceRecycled(); 
       fireChannelRead(ctx, out, size); 
       out.recycle(); 
      } 
     } else { 
      ctx.fireChannelRead(msg); 
     } 
    } 

インバウンドByteBufがあなたのデコーダの後の次のハンドラは、あなたのビジネスであれば、このByteBufの参照カウントがByteBuf#retainなどにより増加され、パイプライン下り次のチャネルハンドラに転送されている場合ハンドラ(通常はそうです)の場合は、メモリリークを避けるためにそのオブジェクトByteBufを解放する必要があります。これはdocsにも記載されています。

答えて

1

すべてのハンドラで、渡されたbytebufを破棄する必要はありません。 ByteToMessageDecoderがその1つです。

この理由は、このハンドラは、複数の着信bytebufsを収集し、コーディングを容易にするために、バイトの1つの連続ストリームとしてアプリケーションにそれらを公開し、自分が

ことを覚えておいてくださいこれらのチャンクを処理する必要がないということですjavadocで述べたように、readBytesまたはreadSliceを使用して、作成したbytebufsを手動で解放する必要があります。

関連する問題