問題の説明:ネッティー(ネッティー-すべて-4.1.12.Final.jar)にjava.io.IOException:既存の接続はリモートホストに強制的に切断された
私のプログラムはネッティーサーバとクライアントを作成しますそのサーバーに2^17の接続を行い、クライアントがこの例外の受信を開始することがあります。
java.io.IOException: Istniejące połączenie zostało gwałtownie zamknięte przez zdalnego hosta
英語と同等です:
java.io.IOException: An existing connection was forcibly closed by the remote host
もちろんサーバが強制的に既存の接続を閉じることが望まれていません。
再現手順:私はそれを再生し、この「単一の実行可能なjavaファイル」プログラムを作成しました。この問題を再現するために喜んで誰の便宜上、それだけでnetty-all-4.1.12.Final.jar
依存性を必要とする
。これは、いくつかの空きポートでnettyサーバーを起動し、クライアントを作成し、要求を実行し、要求を処理するチャンスを与えるために少し待ってから、接続の数に関する統計情報を出力します。 サーバが遭遇した回数、および何種類の例外が発生したかクライアントが発生しました。
package netty.exception.tst;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyException {
public static void main(String[] args) throws InterruptedException {
System.out.println("starting server");
NettyServer server = new NettyServer(0);
int port = server.getPort();
System.out.println("server started at port: " + port);
System.out.println("staring client");
NettyClient client = new NettyClient();
System.out.println("client started");
int noOfConnectionsToPerform = 1 << 17;
System.out.println("performing " + noOfConnectionsToPerform + " connections");
for (int n = 0; n < noOfConnectionsToPerform; n++) {
// send a request
ChannelFuture f = client.getBootstrap().connect("localhost", port);
}
System.out.println("client performed " + noOfConnectionsToPerform + " connections");
System.out.println("wait a bit to give a chance for server to finish processing incoming requests");
Thread.currentThread().sleep(80000);
System.out.println("shutting down server and client");
server.stop();
client.stop();
System.out.println("stopped, server received: " + server.connectionsCount() + " connections");
int numberOfLostConnections = noOfConnectionsToPerform - server.connectionsCount();
if (numberOfLostConnections > 0) {
System.out.println("Where do we lost " + numberOfLostConnections + " connections?");
}
System.out.println("srerver exceptions: ");
printExceptions(server.getExceptions());
System.out.println("client exceptions: ");
printExceptions(client.getExceptions());
}
private static void printExceptions(Map<String, Integer> exceptions) {
if (exceptions.isEmpty()) {
System.out.println("There was no exceptions");
}
for (Entry<String, Integer> exception : exceptions.entrySet()) {
System.out.println("There was " + exception.getValue() + " times this exception:");
System.out.println(exception.getKey());
}
}
public static class NettyServer {
private ChannelFuture channelFuture;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private AtomicInteger connections = new AtomicInteger(0);
private ExceptionCounter exceptionCounter = new ExceptionCounter();
public NettyServer(int port) throws InterruptedException {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler() {
@Override
public void channelActive(final ChannelHandlerContext ctx) {
connections.incrementAndGet();
super.channelActive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
exceptionCounter.countException(cause);
super.exceptionCaught(ctx, cause);
}
});
}
}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
channelFuture = serverBootstrap.bind(port).sync();
}
public int getPort() {
return ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
}
public int connectionsCount() {
return connections.get();
}
public Map<String, Integer> getExceptions() {
return exceptionCounter.getExceptions();
}
public void stop() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
try {
bossGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
workerGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static class NettyClient {
private Bootstrap bootstrap;
private EventLoopGroup workerGroup;
private ExceptionCounter exceptionCounter = new ExceptionCounter();
public NettyClient() {
workerGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(workerGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
exceptionCounter.countException(cause);
super.exceptionCaught(ctx, cause);
}
});
}
});
}
public Bootstrap getBootstrap() {
return bootstrap;
}
public void stop() {
workerGroup.shutdownGracefully();
try {
workerGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public Map<String, Integer> getExceptions() {
return exceptionCounter.getExceptions();
}
}
public static class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) {
final ByteBuf time = ctx.alloc().buffer(4);
time.writeInt((int) (System.currentTimeMillis()/1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time);
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
public static class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ThreadLocal<ByteBuf> buf = new ThreadLocal<ByteBuf>();
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf.set(ctx.alloc().buffer(4));
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.get().release();
buf.remove();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.get().writeBytes(m);
m.release();
if (buf.get().readableBytes() >= 4) {
long currentTimeMillis = (buf.get().readUnsignedInt() - 2208988800L) * 1000L;
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
public static class ExceptionCounter {
private ConcurrentHashMap<String, AtomicInteger> exceptions = new ConcurrentHashMap<String, AtomicInteger>();
private void countException(Throwable cause) {
StringWriter writer = new StringWriter();
cause.printStackTrace(new PrintWriter(writer));
String stackTrace = writer.toString();
AtomicInteger exceptionCount = exceptions.get(stackTrace);
if (exceptionCount == null) {
exceptionCount = new AtomicInteger(0);
AtomicInteger prevCount = exceptions.putIfAbsent(stackTrace, exceptionCount);
if (prevCount != null) {
exceptionCount = prevCount;
}
}
exceptionCount.incrementAndGet();
}
public Map<String, Integer> getExceptions() {
Map<String, Integer> newMap = exceptions.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
return Collections.unmodifiableMap(newMap);
}
}
}
出力は次のようになります。
starting server
server started at port: 56069
staring client
client started
performing 131072 connections
client performed 131072 connections
wait a bit to give a chance for server to finish processing incoming requests
shutting down server and client
stopped, server received: 34735 connections
Where do we lost 96337 connections?
srerver exceptions:
There was no exceptions
client exceptions:
There was 258 times this exception:
java.io.IOException: Istniejące połączenie zostało gwałtownie zamknięte przez zdalnego hosta
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1100)
at io.netty.buffer.WrappedByteBuf.writeBytes(WrappedByteBuf.java:813)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:372)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:745)
There was 30312 times this exception:
java.io.IOException: Istniejące połączenie zostało gwałtownie zamknięte przez zdalnego hosta
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1100)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:372)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:745)
質問:
- この例外がスローされるのはなぜ?
- 接続が失われた場所はどこですか?なぜ彼らにエラーがないのですか?
- これを避けるには、この種の「ハイスループット」アプリケーションをプログラムする正しい方法は、既存の接続を失う/壊すような問題がないようにすることですか?
- いいえNettyの専門家が知っていると思います:
TimeClientHandler
のフィールド宣言が静的になるように変更すると、TimeClientHandler.handlerRemoved
にnullポインタ例外がありますか?これは非常に奇妙です、このクラスは何とか複製されますか?またはNioEventLoopGroup
のスレッドが何とか変なのですか?
環境:
- 網状バージョン:網状-全4.1.12.Final.jar
- JVMバージョン: jdk1.8.0_111 64ビット
- OSバージョン: Windows 10 64ビット
確かに、それは物語の一部かもしれませんが、なぜそれが既存の接続を終了させているのですか、そして限界を超えることについて何の誤りもないのですか?私がしようとしているのは、限界を超えて、そのような「トラフィックの多い」アプリケーションをどのようにプログラムするかを学ぶときに、何が起こるのかを過小評価することです。 –