2017-06-13 20 views
0

入力ストリームレートメーターを作成しています。これは、基本的に要求ストリーム呼び出しを公開し、処理できる1秒あたりのメッセージ数をカウントするサービスです。ClientCallStreamObserver isReadyがtrueを返さない

メッセージを送信する際にクライアントが完全に非同期であるため、メモリのオーバーフローを避けるために、ストリームの準備ができたときにClientCallStreamObserverを使用してメッセージの送信を開始します。

クライアントコードは次のようになります。

public static void main(String[] args) throws Exception { 
    ManagedChannel channel = ManagedChannelBuilder.forAddress("server", 4242).usePlaintext(true).build(); 
    ServerGrpc.ServerStub asyncStub = ServerGrpc.newStub(channel); 


    StreamObserver<MarketDataOuterClass.Trade> inputStream = asyncStub.reportNewTradeStream(new StreamObserver<Empty>() { 
     @Override 
     public void onNext(Empty empty) { 

     } 

     @Override 
     public void onError(Throwable throwable) { 
      logger.info("on error response stream"); 
     } 

     @Override 
     public void onCompleted() { 
      logger.info("on completed response stream"); 
     } 
    }); 

    final ClientCallStreamObserver<MarketDataOuterClass.Trade> clientCallObserver = (ClientCallStreamObserver<MarketDataOuterClass.Trade>) inputStream; 

    while (!clientCallObserver.isReady()) { 
     Thread.sleep(2000); 
     logger.info("stream not ready yet"); 
    } 

    counter.setLastTic(System.nanoTime()); 

    while (true) { 
     counter.inc(); 
     if (counter.getCounter() % 15000 == 0) { 
      long now = System.nanoTime(); 
      double rate = (double) NANOSEC_TO_SEC * counter.getCounter()/(now - counter.getLastTic()); 
      logger.info("rate: " + rate + " msgs per sec"); 
      counter.clear(); 
      counter.setLastTic(now); 
     } 
     inputStream.onNext(createRandomTrade()); 
    } 
} 

isReady上で、私の観察ループが終了されることはありません。

OBS:kubernetesクラスターを使用してテストを行いました。サーバーは呼び出しを受け取り、StreamObserver実装を返します。

答えて

1

isReady RPCはすぐにエラー/完了しない限り、最終的にtrueを返す必要があります。しかし、コードはフロー制御を適切に監視していません。

onNext()を呼び出すたびに、isReady()の要求を送信すると、falseが返されます。あなたのwhile (true)ループの代わりに、各反復の開始時にisReady()チェックが必要です。

ポーリングの代わりに、serverCallObserver.setOnReadyHandler(yourRunnable)にコールを送信する準備ができたときに通知するほうがよいです。 yourRunnableの中にはまだisReady()をチェックする必要があります。偽/期限切れの通知がある可能性があります。

+0

メッセージを送信する前に確認を正しく変更しました。 setOnReadyHandlerは見た目が良くて、試してみます。 –

+1

@FelipeJunでは、RPCが完了するまでmain()をブロックする必要があります。あなたが示したコードのようなもののために、最も簡単なことは、RPCを起動した後に 'channel.shutdown()'を実行した後、 'channel.awaitTerminated()'を実行することです。 RPCが完了すると、チャネルは終了し、 'awaitTerminated()'が返ります。 –

関連する問題