2017-12-06 12 views
0

私は単純なVertxベースのWebSocketチャットアプリケーションを持っています。これは、MsgServerVerticleとMsgClientVerticleの2つの部分(以下のソースコード)で構成されています。したがって、1つのサーバーと1つのクライアントのみをインスタンス化する場合、正常に動作するように見えます。 2番目のクライアントが接続すると、サーバーは他のクライアントに通知しようとします。そして、奇妙なこと。 Logは、ループバックされたネットは、ループ内で連続してwebsocketフレームをエンコード・デコードしていると言っています。どのタイプのフレームを使用しているのか、バイナリかテキストか、違いはありません。問題は同じです。 log screenshot hereVertx Websocketsは、第2のクライアントを接続した後、ループでエンコードデコンディングを開始します。

何が問題なのですか。

MsgClientVerticleソースコード:

private Logger L; 

private String eBusTag; 
private String backwardTag; 
private String targetHost; 
private int port; 
private String id; 
private String path; 

private EventBus eBus; 

private HttpClient client; 

public MsgClientVerticle(String eBusTag, String targetHost, int port, String path, String id, String backwardTag) { 
    this.eBusTag = eBusTag; 
    this.targetHost = targetHost; 
    this.path = path; 
    this.port = port; 
    this.id = id; 
    this.backwardTag = backwardTag; 

    L = LoggerFactory.getLogger(eBusTag); 
} 

@Override 
public void start(Future<Void> startFuture) throws Exception { 
    L.info("Initializing client connection to " + targetHost + ":" + port + path); 
    eBus = vertx.eventBus(); 

    try { 


     client = vertx.createHttpClient(); 

     client.websocket(port, targetHost, path, webSock -> { 
      L.info("Connected to " + targetHost + ":" + port + "/" + path); 
      eBus.publish(backwardTag, Utils.msg("Connected")); 
      webSock.binaryMessageHandler(buf -> { 
       eBus.publish(backwardTag, Utils.bufToJson(buf)); 
      }); 
      eBus.consumer(eBusTag).handler(msg -> { 
       JsonObject message = (JsonObject) msg.body(); 
       webSock.writeBinaryMessage(Utils.jsonToBuf(message)); 
      }); 
     }); 
    } catch (NullPointerException e) { 
     L.error("Null Pointer: " + e.getLocalizedMessage()); 
     e.printStackTrace(); 
    } 


    startFuture.complete(); 
} 

@Override 
public void stop(Future<Void> stopFuture) throws Exception { 
    L.info("Connection to " + targetHost + ":" + port + "/" + path + " closed"); 
    client.close(); 
    stopFuture.complete(); 
} 

そしてMsgServerVerticleソース:

private Logger L; 

private String path; 
private int port; 
private String eBusTag; 
private String backwardTag; 

private HttpServer server; 

private EventBus eBus; 

private Set<ServerWebSocket> conns; 

public MsgServerVerticle(int port, String eBusTag, String backwardTag) { 
    this.port = port; 
    this.eBusTag = eBusTag; 
    this.backwardTag = backwardTag; 

    conns = new ConcurrentSet<>(); 
    path = eBusTag; 

    L = LoggerFactory.getLogger(eBusTag); 

} 

@Override 
public void start(Future<Void> startFuture) throws Exception { 
    eBus = vertx.eventBus(); 
    L.info("Initializing server instance at port " + port); 

    server = vertx.createHttpServer(); 

    server.websocketHandler(webSock -> { 

     if (!webSock.path().equals(path)) { 

      webSock.reject(); 

     } else { 

      conns.add(webSock); 

      conns.forEach(sock -> { 
       if (sock != webSock) { 
        sock.writeBinaryMessage(Utils.jsonToBuf(Utils.msg("SERVER: new client " + webSock.remoteAddress().toString()))); 
       } 
      }); 

      eBus.publish(backwardTag, Utils.msg("SERVER: new client " + webSock.remoteAddress().toString())); 

      webSock.binaryMessageHandler(buf -> { 
       JsonObject msg = Utils.bufToJson(buf); 
       conns.forEach(sock -> { 
        if (sock != webSock) { 
         sock.writeBinaryMessage(buf); 
        } 
       }); 
       eBus.publish(backwardTag, msg); 
      }); 

     } 

    }); 

    server.listen(port); 

    startFuture.complete(); 
} 

@Override 
public void stop(Future<Void> stopFuture) throws Exception { 
    conns.forEach(sock -> { 
     sock.writeFinalTextFrame("Server is shutting down..."); 
    }); 
    server.close(); 
    stopFuture.complete(); 
} 

答えて

0

私はあなたの元の問題を再現することができませんでした。しかし、まずそれをテストするためにいくつかの変更を加えなければなりませんでした。

this.path = "/" + eBusTag;

は、そうでない場合は、このチェックは必ず失敗します:

if (!webSock.path().equals(this.path)) {

websock.path()ので常に/anythingしたがって、/を開始します=/

1つの変更は、サーバーの初期化です= anything

第二に、私はクライアントを初期化する方法を見て、そしてあなたが同じことを行うかどうかを確認してください:

final Vertx vertx = Vertx.vertx(); 

    vertx.deployVerticle(new MsgServerVerticle(8080, 
      "ebus", 
      "back"), new DeploymentOptions().setWorker(true), (r) -> { 
     vertx.deployVerticle(new MsgClientVerticle("ebus", 
       "127.0.0.1", 
       8080, 
       "/ebus", 
       "a", 
       "back"), new DeploymentOptions().setWorker(true), (r2) -> { 
      vertx.deployVerticle(new MsgClientVerticle("ebus", 
        "127.0.0.1", 
        8080, 
        "/ebus", 
        "b", 
        "back"), new DeploymentOptions().setWorker(true), (r3) -> { 
       System.out.println("Done"); 
      }); 
     }); 
    }); 

そして第三に、私の知る限り理解できるよう、あなたのUtilsのクラスを使用すると、実装ものです。私の実装は次のようになります:

public class Utils { 
    public static Buffer jsonToBuf(final JsonObject message) { 
     return message.toBuffer(); 
    } 

    public static JsonObject bufToJson(final Buffer buf) { 
     return buf.toJsonObject(); 
    } 

    public static JsonObject msg(final String msg) { 
     return new JsonObject("{\"value\":\"" + msg + "\"}"); 
    } 
} 

希望はあなたの問題を特定するのに役立ちます。

関連する問題