MQTTベースのメッセージング・バックエンドで、現在netty 3.xをnetty 4.1にアップグレード中です。私たちのアプリケーションでは、カスタムMQTTメッセージ・デコーダとエンコーダを使用します。 nettyで参照カウントのByteBufオブジェクトを正しく解放します。4.1
public class MqttMessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 2) {
return;
}
.....
.....
.....
byte[] data = new byte[msglength];
in.resetReaderIndex();
in.readBytes(data);
MessageInputStream mis = new MessageInputStream(
new ByteArrayInputStream(data));
Message msg = mis.readMessage();
out.add(msg);
ReferenceCountUtil.release(in);
}
}
Message
は、当社のカスタムオブジェクトがある
ChannelHandler
の
channelRead()
に渡され、次のように私たちのデコーダについては
、私は現在
ByteToMessageDecoderを使用しています。ご覧のとおり、
Message
オブジェクトを作成するとすぐに、
ByteBuf
オブジェクト
in
を受信しました。したがって、
ByteBuf
は参照カウントがネッティであるため、ここでオブジェクトを解放する必要があることは間違いありません。
ReferenceCountUtil.release(in)
を呼び出してください。理想的には、これは
docに従って正しいと思われます。しかし、私はこれを行うとき、私は例外に直面しているように見える:
Wed May 24 io.netty.channel.DefaultChannelPipeline:? WARN netty-workers-7 An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.channel.ChannelPipelineException: com.bsb.hike.mqtt.MqttMessageDecoder.handlerRemoved() has thrown an exception.
at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:631) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.DefaultChannelPipeline.destroyDown(DefaultChannelPipeline.java:867) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.DefaultChannelPipeline.access$300(DefaultChannelPipeline.java:45) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.DefaultChannelPipeline$9.run(DefaultChannelPipeline.java:874) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:339) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:374) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_72-internal]
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:111) ~[netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.handler.codec.ByteToMessageDecoder.handlerRemoved(ByteToMessageDecoder.java:217) ~[netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:626) [netty-all-4.1.0.Final.jar:4.1.0.Final]
... 7 common frames omitted
これは、子チャンネルが閉じているときに、パイプライン内のすべてのハンドラが次々に削除されていることを私に伝えます。このデコーダハンドラが閉じられると、がこのデコーダにアタッチされ、以下のメソッドが呼び出されるとIllegalReferenceCountException
例外が発生します。
これはAbstractReferenceCountedByteBuf#release
です:
@Override
public boolean release() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, -1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
if (refCnt == 1) {
deallocate();
return true;
}
return false;
}
}
}
この問題が発生しないように、そしてByteBuf
オブジェクトを解放するための正しい方法は何ですか?あなたが設定に関するこれ以上の情報が必要な場合は私に知らせてください
new ServerBootstrap().childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
-
私はPooledByteBufAllocator
を使用しています。
EDIT:
アドオンFerrybigの答えについては、ByteToMessageDecoder#channelRead
は、それ自体で、着信ByteBuf
Sの解放を処理します。 finally
ブロックを参照してください -
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
インバウンドByteBuf
があなたのデコーダの後の次のハンドラは、あなたのビジネスであれば、このByteBuf
の参照カウントがByteBuf#retain
などにより増加され、パイプライン下り次のチャネルハンドラに転送されている場合ハンドラ(通常はそうです)の場合は、メモリリークを避けるためにそのオブジェクトByteBuf
を解放する必要があります。これはdocsにも記載されています。