2017-05-26 15 views
0

私はKafkaブローカーにメッセージを投稿するSpring Cloudマイクロサービスを持っていますが、このマイクロサービスはREST APIを介してアクセスできます。ListenableFuture - 返信前に待機する方法

送信者のステータスを呼び出し元に戻したいのですが、Javaが待たないようです。私のコードが返る前にこれを成功か失敗かを待つようにするには?

相続コード:

kafkaProduc.send("topictest", msg).addCallback(
       new ListenableFutureCallback<SendResult<String, ExecutionDataMessage>>() { 
    @Override 
    public void onSuccess(SendResult<String, ExecutionDataMessage> result) { 
     eresp.status = "ok"; 
     eresp.msg = "message submitted successfully"; 
    } 

    @Override 
    public void onFailure(Throwable ex) { 
     eresp.status = "error"; 
     eresp.msg = "failure while sending data to kafka. exception: " + ex.getMessage(); 
    } 
}); 
HttpStatus erespStatus = eresp.status == "ok" ? HttpStatus.CREATED : HttpStatus.BAD_REQUEST; 
return new ResponseEntity<ExecutionResponse>(eresp, erespStatus); 

答えて

0

コールバックを使用して、非同期の結果をしたいときのためです。あなたは、呼び出し元のスレッドをブロックしたい場合は、future.get()を使用して...

ListenableFuture<SendResult<String, String>> future = template.send("foo", "bar"); 
try { 
    SendResult<String, String> sendResult = future.get(10, TimeUnit.SECONDS); 
} 
catch (InterruptedException e) { 
    // TODO Auto-generated catch block 
    e.printStackTrace(); 
    Thread.currentThread.interrupt(); 
} 
catch (ExecutionException e) { 
    // TODO Auto-generated catch block 
    e.printStackTrace(); 
} 
catch (TimeoutException e) { 
    // TODO Auto-generated catch block 
    e.printStackTrace(); 
} 
+0

ありがとう、ゲイリー、このアプローチを試してみましょう。しかし、スレッドが現在ブロックされていて、別の呼び出し元が実行メソッドを要求すると、春に新しいスレッドが作成されるか、ブロックされたままになります。 – Alexandre

+0

RESTの場合、スレッドごとに(通常は)TomcatなどのWebサーバーによって管理されます。各リクエストは独自のスレッドを取得します。テンプレートはスレッドセーフです。 –

+0

チャームのように働いた!どうもありがとう! – Alexandre

0

Kafkaproducer.sendは未来を返します。あなたはその後、待ちたい場合は、おそらくあなたは次のことを好むだろう:

kafkaProduc.send("topictest", msg).get(1L, TimeUnit.SECONDS); 

が続いて障害がエラーコールバックであなたを呼び出すのではなく、例外を発生する可能性があります。

+0

そのアプローチもうまくいきます、ありがとうジェームス! – Alexandre

関連する問題