入力ストリームレートメーターを作成しています。これは、基本的に要求ストリーム呼び出しを公開し、処理できる1秒あたりのメッセージ数をカウントするサービスです。ClientCallStreamObserver isReadyがtrueを返さない
メッセージを送信する際にクライアントが完全に非同期であるため、メモリのオーバーフローを避けるために、ストリームの準備ができたときにClientCallStreamObserverを使用してメッセージの送信を開始します。
クライアントコードは次のようになります。
public static void main(String[] args) throws Exception {
ManagedChannel channel = ManagedChannelBuilder.forAddress("server", 4242).usePlaintext(true).build();
ServerGrpc.ServerStub asyncStub = ServerGrpc.newStub(channel);
StreamObserver<MarketDataOuterClass.Trade> inputStream = asyncStub.reportNewTradeStream(new StreamObserver<Empty>() {
@Override
public void onNext(Empty empty) {
}
@Override
public void onError(Throwable throwable) {
logger.info("on error response stream");
}
@Override
public void onCompleted() {
logger.info("on completed response stream");
}
});
final ClientCallStreamObserver<MarketDataOuterClass.Trade> clientCallObserver = (ClientCallStreamObserver<MarketDataOuterClass.Trade>) inputStream;
while (!clientCallObserver.isReady()) {
Thread.sleep(2000);
logger.info("stream not ready yet");
}
counter.setLastTic(System.nanoTime());
while (true) {
counter.inc();
if (counter.getCounter() % 15000 == 0) {
long now = System.nanoTime();
double rate = (double) NANOSEC_TO_SEC * counter.getCounter()/(now - counter.getLastTic());
logger.info("rate: " + rate + " msgs per sec");
counter.clear();
counter.setLastTic(now);
}
inputStream.onNext(createRandomTrade());
}
}
isReady上で、私の観察ループが終了されることはありません。
OBS:kubernetesクラスターを使用してテストを行いました。サーバーは呼び出しを受け取り、StreamObserver実装を返します。
メッセージを送信する前に確認を正しく変更しました。 setOnReadyHandlerは見た目が良くて、試してみます。 –
@FelipeJunでは、RPCが完了するまでmain()をブロックする必要があります。あなたが示したコードのようなもののために、最も簡単なことは、RPCを起動した後に 'channel.shutdown()'を実行した後、 'channel.awaitTerminated()'を実行することです。 RPCが完了すると、チャネルは終了し、 'awaitTerminated()'が返ります。 –