2017-07-19 19 views
0

10-200センサーボックスからデータを読み取るには、JBossにJAVA Nioサーバーアプリケーションを構築する必要があります。彼らはストリームを開いて、いつも私にデータを送ります。コミュニケーションは双方向です。さて、ときには、これらのボックス(またはサーバー)に内部エラーがあることがあります。この種の問題を検出するために、オブザーバスレッドは、最後のチェック以降にデータブロックが入ったかどうかを5秒ごとにチェックします。私のBoxesのどれもがそれまでデータを送信しなかったなら、何か悪いことが起きました。そして私はソケット通信全体を再開したいと思います。JAVA NIOサーバー:すべての接続をリセットする方法

NIOとのソケット接続を構築する方法はよく文書化されていますが、リセットする方法の複雑な例を見つけるのは難しいです。私のウォッチドッグが最後の5秒間にデータがなかったことを検出するとclose()とstartEngine()を呼び出します。しかしその後も、データは到着しません。何かがブロックされているように見えますが、一部のリソースは依然として関連付けられています。 JBossを再起動すると、データが再び届きます。誰かが私にヒントを与えることができますか?

お時間をいただきありがとうございます。 ステファン

public class TestServer 
{ 
    private NIOServer server; 
    private HashMap<String, SocketChannel> clientsList = new HashMap<String, SocketChannel>(); 

    class NIOServer extends Thread 
    { 
     class MessageBuffer 
     { 
       int [] msgAsByte = new int[msgSize]; 
       int pos = 0; 
       int lastSign = 0;          
       int bytesRead = 0; 
     } 
     private ByteBuffer readBuffer = ByteBuffer.allocate(256); 
     private Selector selector; 
     private boolean stop = false; 
     private int[] ports; 
     private int msgSize = 48; 
     private HashMap<String,MessageBuffer> buffer = new HashMap<String, MessageBuffer>(); 

     private List<ServerSocketChannel> channels; 
     // Maps a SocketChannel to a list of ByteBuffer instances 
     private Map<SocketChannel, List<ByteBuffer>> pendingDataToWrite = new HashMap<SocketChannel, List<ByteBuffer>>(); 

     public NIOServer(int[] ports) { 
       this.ports = ports; 
     } 

     private void stopAll() 
     { 
       stop = true; 

       try 
       { 
        server.interrupt(); 
        server.join(3000); 
       } 
       catch (InterruptedException e) { 
        Thread.currentThread().interrupt(); 
       } 
       closeConnections(); 
     } 

     public void sendData(SocketChannel socket, byte[] data) 
     { 
       // And queue the data we want written 
       synchronized (this.pendingDataToWrite) { 
        List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingDataToWrite.get(socket); 
        if (queue == null) { 
          queue = new ArrayList<ByteBuffer>(); 
          this.pendingDataToWrite.put(socket, queue); 
        } 
        queue.add(ByteBuffer.wrap(data)); 
       } 

       SelectionKey key = socket.keyFor(this.selector); 
       if(key != null) 
        key.interestOps(SelectionKey.OP_WRITE); 
       // Finally, wake up our selecting thread so it can make the required changes 
       this.selector.wakeup(); 
     } 

     public void run() 
     { 
       try 
       { 
        stop = false; 
        selector = Selector.open(); 
        channels = new ArrayList<ServerSocketChannel>(); 
        ServerSocketChannel serverchannel; 
        for (int port : ports) 
        { 
          try 
          { 
           serverchannel = ServerSocketChannel.open(); 
           serverchannel.configureBlocking(false); 
           try 
           { 
             serverchannel.socket().setReuseAddress(true); 
           } 
           catch(SocketException se) 
           { 
             // 
           } 
           serverchannel.socket().bind(new InetSocketAddress(port)); 
           serverchannel.register(selector, SelectionKey.OP_ACCEPT); 
           channels.add(serverchannel); 
          } 
          catch(Exception e) 
          { 
           // 
          } 
        } 
        while (!stop) 
        { 

          SelectionKey key = null; 
          try 
          { 
           selector.select(); 
           Iterator<SelectionKey> keysIterator = selector.selectedKeys() 
              .iterator(); 
           while (keysIterator.hasNext()) 
           { 
             key = keysIterator.next(); 

             if(key.isValid()) 
             { 
              if (key.isAcceptable()) 
              { 
                accept(key); 
              } 
              else if (key.isReadable()) 
              { 
                readData(key); 
              } 
              else if (key.isWritable()) 
              { 
                writeData(key); 
              } 
             } 
             else 
             { 
              SocketChannel sc = (SocketChannel) key.channel(); 
             } 
             keysIterator.remove(); 
           } 
          } 
          catch (Exception e) 
          { 
           if(e instanceof IOException || e instanceof ClosedSelectorException) 
           { 
             try 
             { 
              ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); 
              channels.remove(ssc); 
              ssc.close(); 
              key.cancel(); 
             } 
             catch(Exception ex) 
             { 
              // 
             } 

           } 
           else 
           { 
             // 
           } 
          } 
        } 
       } 
       catch(Exception e1) 
       { 
        // 
       } 

       closeConnections(); 

     } 

     private void closeConnections() 
     { 
       //if thread is stopped, close all 
       try 
       { 
        try 
        { 
          if(this.selector == null || this.selector.keys() == null) 
          { 
           log.debug("No selectors or keys found to close"); 
          } 
          else 
          { 
           Iterator<SelectionKey> keys = this.selector.keys().iterator(); 
           while(keys.hasNext()) 
           { 
             SelectionKey key = keys.next(); 
             key.cancel(); 
           } 
          } 
        } 
        catch(Exception ex) { 
          // 
        } 
        if(selector != null) 
          selector.close(); 
        if(channels != null) 
        { 
          for(ServerSocketChannel channel:channels) 
          { 
           channel.socket().close(); 
           channel.close(); 
          } 
        } 

        if(clientsList != null) 
        { 
          Iterator<Map.Entry<String, SocketChannel>> hfm = clientsList.entrySet().iterator(); 
          while(hfm.hasNext()) 
          { 
           Map.Entry<String, SocketChannel> s = hfm.next(); 
           s.getValue().close(); 
          } 
        } 
        clientsList=null; 

        selector = null; 
        channels = null; 
        pendingDataToWrite = null; 
       } 
       catch(Exception e) 
       { 
        // 
       } 

     } 

     private void accept(SelectionKey key) throws IOException 
     { 

       ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); 
       SocketChannel sc = ssc.accept(); 
       sc.configureBlocking(false); 
       sc.register(selector, SelectionKey.OP_READ); 

       String ip = sc.socket().getRemoteSocketAddress().toString(); 
       if(!buffer.containsKey(ip)) 
        buffer.put(ip, new MessageBuffer()); 
     } 

     private void readData(SelectionKey key) throws Exception 
     { 

       SocketChannel sc = (SocketChannel) key.channel();  

       MessageBuffer buf = buffer.get(sc.socket().getRemoteSocketAddress().toString()); 
       try 
       { 
        buf.bytesRead = sc.read(readBuffer); //read into buffer. 
       } 
       catch(Exception e2) 
       { 
        sc.close(); 
        buffer.remove(sc); 
       } 

       //close connection 
       if (buf.bytesRead == -1) 
       { 
        sc.close(); 
        key.cancel(); 
        return; 
       } 

       readBuffer.flip();  //make buffer ready for read 

       while(readBuffer.hasRemaining()) 
       { 
        //Read the data and forward it to another Process... 
       } 

       readBuffer.compact(); //make buffer ready for writing 

     } 

     private void writeData(SelectionKey key) throws Exception 
     { 
       SocketChannel socketChannel = (SocketChannel) key.channel(); 
       synchronized (this.pendingDataToWrite) { 
        List queue = (List) this.pendingDataToWrite.get(socketChannel); 

        // Write until there's not more data ... 
        while (!queue.isEmpty()) { 
          ByteBuffer buf = (ByteBuffer) queue.get(0); 
          try 
          { 
           socketChannel.write(buf); 
          } 
          catch(Exception e) 
          { 
           // 
          } 
          finally 
          { 
           queue.remove(0); 
          } 
          if (buf.remaining() > 0) { 
           // ... or the socket's buffer fills up 
           break; 
          } 
        } 

        key.interestOps(SelectionKey.OP_READ); 
       } 
     } 
    } 



    public void close() { 

     if (server != null && server.isAlive()) 
     {  
        server.stopAll(); 
     } 
     if(clientsList != null) 
     { 
       clientsList.clear(); 
     } 
     server = null; 

    } 

    public void startEngine(int[] ports) { 
     if (ports != null) { 
       for (int port : ports) 
        log.info("Listening on port " + port); 
       server= new NIOServer(ports); 
       server.start(); 
     } 
    } 

} 

答えて

1

select()タイムアウトを使用してください。

タイムアウトが発生した場合は、登録済みのすべてのSocketChannelsを閉じます。

さらに細分化したい場合は、各チャネルの最後のI/O時間を追跡し、各select()ループの最後に期限切れになったI/O時間を閉じます。

NBあなたのOP_WRITEのテクニックは正しくありません。適切に使用する方法を示す多くの回答があります。

+0

あなたのご意見ありがとうございます。このようなスレッドへのあなたのご意見は、このhttps://stackoverflow.com/questions/17556901/java-high-load-nio-tcp-serverのようなものですか?私が書きたいときに書く必要があることと、この操作がゼロを返す場合にのみ、OP_WRITEを登録します。正しい? select + timeoutは、新しいデータが到着しなかったときに認識するのに役立ちますが、close()およびrestartEngine()を呼び出した後に問題は解決しませんでした。まだ新しいデータを取得しません... – user3354754

+0

クライアントが再接続するまで何かを得る。 – EJP

+0

よく、私のjbossを再起動すると、再接続しても問題ありません。しかし、私はソケットを閉じてもそれをしないでください。再接続を強制する方法はありますか? – user3354754

関連する問題