2016-06-18 7 views
1

私はREST経由でバスにメッセージを渡して戻したいと思っています。しかし、私は正しくメッセージバスの受信機を設定することはできません、それはjava.lang.IllegalStateExceptionをスロー:応答はすでに書かれている。実際のメッセージバスでは、さまざまなソースからメッセージを受信し、別のターゲットにメッセージを渡す必要があります。したがって、メッセージをバスに公開するだけです。しかし、メッセージを正しく読み込んでそれらのすべてを処理する方法はありますか?例えば、RESTインターフェースから:そのメッセージを読んでください! 私のシンプルなアプリの開始:Vert.xメッセージをRESTからメッセージバスに渡す/取得する方法は?

public static void main(String[] args) { 
     Vertx vertx = Vertx.vertx(); 
     vertx.deployVerticle(new RESTVerticle()); 
     vertx.deployVerticle(new Receiver()); 
     EventBus eventBus = vertx.eventBus(); 
     eventBus.registerDefaultCodec(MessageDTO.class, new CustomMessageCodec()); 

    } 

RESTの一部

public class RESTVerticle extends AbstractVerticle { 

    private EventBus eventBus = null; 

    @Override 
    public void start() throws Exception { 
     Router router = Router.router(vertx); 
     eventBus = vertx.eventBus(); 
     router.route().handler(BodyHandler.create()); 
     router.route().handler(CorsHandler.create("*") 
       .allowedMethod(HttpMethod.GET) 
       .allowedHeader("Content-Type")); 

     router.post("/api/message").handler(this::publishToEventBus); 
     // router.get("/api/messagelist").handler(this::getMessagesFromBus); 

     router.route("/*").handler(StaticHandler.create()); 
     vertx.createHttpServer().requestHandler(router::accept).listen(9999); 
     System.out.println("Service running at 0.0.0.0:9999"); 

    } 

private void publishToEventBus(RoutingContext routingContext) { 
     System.out.println("routingContext.getBodyAsString() " + routingContext.getBodyAsString()); 
     final MessageDTO message = Json.decodeValue(routingContext.getBodyAsString(), 
       MessageDTO.class); 

     HttpServerResponse response = routingContext.response(); 
     response.setStatusCode(201) 
       .putHeader("content-type", "application/json; charset=utf-8") 
       .end(Json.encodePrettily(message)); 

     eventBus.publish("messagesBus", message); 

    } 

と受信機:私は別のクラスに移動しますが、それは

public class Receiver extends AbstractVerticle { 

    @Override 
    public void start() throws Exception { 
     EventBus eventBus = vertx.eventBus(); 
     Router router = Router.router(vertx); 

     router.route().handler(BodyHandler.create()); 
     router.route().handler(CorsHandler.create("*") 
       .allowedMethod(HttpMethod.GET) 
       .allowedHeader("Content-Type")); 

     router.get("/api/messagelist").handler(this::getMessagesFromBus); 
     router.route("/*").handler(StaticHandler.create()); 

     vertx.createHttpServer().requestHandler(router::accept).listen(9998); 
     System.out.println("Service Receiver running at 0.0.0.0:9998"); 

private void getMessagesFromBus(RoutingContext routingContext) { 
     EventBus eventBus = vertx.eventBus(); 
     eventBus.consumer("messagesBus", message -> { 
      MessageDTO customMessage = (MessageDTO) message.body(); 
      HttpServerResponse response = routingContext.response(); 
      System.out.println("Receiver ->>>>>>>> " + customMessage); 
      if (customMessage != null) { 
       response.putHeader("content-type", "application/json; charset=utf-8") 
         .end(Json.encodePrettily(customMessage)); 
      } 
      response.closed(); 

     }); 
    } 

だから私が投稿した場合には役立ちません。メッセージをRESTに送信し、ハンドラをバスに公開します。実行時にhttp://localhost:9998/api/messagelistが返されます。戻り値はjsonですが、もう一度例外が発生します。

java.lang.IllegalStateException: Response has already been written 
    at io.vertx.core.http.impl.HttpServerResponseImpl.checkWritten(HttpServerResponseImpl.java:561) 
    at io.vertx.core.http.impl.HttpServerResponseImpl.putHeader(HttpServerResponseImpl.java:154) 
    at io.vertx.core.http.impl.HttpServerResponseImpl.putHeader(HttpServerResponseImpl.java:52) 
    at com.project.backend.Receiver.lambda$getMessagesFromBus$0(Receiver.java:55) 
    at io.vertx.core.eventbus.impl.HandlerRegistration.handleMessage(HandlerRegistration.java:207) 
    at io.vertx.core.eventbus.impl.HandlerRegistration.handle(HandlerRegistration.java:201) 
    at io.vertx.core.eventbus.impl.EventBusImpl.lambda$deliverToHandler$127(EventBusImpl.java:498) 
    at io.vertx.core.impl.ContextImpl.lambda$wrapTask$18(ContextImpl.java:335) 
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112) 
    at java.lang.Thread.run(Thread.java:745) 

Receiver ->>>>>>>> Message{username=Aaaewfewf2d, message=41414wefwef2d2} 

受信者からすべてのメッセージを正しく取得するにはどうすればよいですか?またはバスがメッセージを受信した場合は、すぐにデータベースに保存する必要がありますか?メッセージバスはメッセージを保持し、メッセージを失うことはできませんか?

おかげ

答えて

2

エントリポイント "/ api/messagelist"でヒットするたびに、リクエストルーティングコンテキストで新しいコンシューマが1つ作成されます。

最初のリクエストでコンシューマが作成され、リクエストに返信されます。 2番目のメッセージが公開されると、そのコンシューマはメッセージを受信し、以前の要求(インスタンス)に返信し、これは終了しました。

私はあなたがイベントバスの目的を誤解していると思います。私は実際にそのドキュメントを読むことをお勧めします。 http://vertx.io/docs/vertx-core/java/#event_bus

1

私はあなたのコードをテストする機会がなかったが、パブリッシュ操作が例外を投げているとVERTXは、エラーメッセージを返送しようとしているようです。しかし、すでに返信して接続を終了しました。

エラーがあなたのコーデックからのものかもしれませんが、非同期な性質のvertexのために、後の段階でしか表示されず、内部エラーハンドラでマングリングされます。

関連する問題