gRPCアーキテクチャを使用するpub/subパターンのような無限ストリーム応答を実装しています。python - gRPC無限ストリーム中にクライアントが検出されないようにする
応答ストリームをオープンし、クライアントがドロップするまでそれを保持するエンドポイントがあります。これを行うには、キーがgRPCコンテキストで、値が送信メッセージをポーリングするために使用するキューであるキー値ハッシュを格納します。このコードは、クライアントに変更をサブスクライブし、公開用の正常に動作します
def StreamTrades(self, request, context):
self.feeds[context] = queue.Queue()
callback_queue = queue.Queue()
def remove_feed():
if self.feeds.get(context) is not None:
del self.feeds[context]
def stop_stream():
remove_feed()
def raise_stop_stream_exception():
raise StopStream('stopping stream')
callback_queue.put(raise_stop_stream_exception)
context.add_callback(stop_stream)
def output_generator():
while True:
try:
try:
callback = callback_queue.get(False)
callback()
except queue.Empty:
pass
if self.feeds.get(context) is not None:
trade = self.feeds[context].get()
if isinstance(trade, trades_pb2.Trade):
yield trade
else:
raise StopStream('stopping stream')
except IndexError:
pass
except StopStream:
return
return output_generator()
:よう
私の終点コードが見えます。しかし、購読解除に関連する問題があります。クライアントのドロップを検出する良い方法は何ですか? Context.add_callback(callBack)の使用は、サーバーが終了してストリームを閉じるときにのみコールバックが呼び出されるため、動作していないようです。そして、クライアントがもう存在しないとき、発電機はどんな種類のステータスも提起しません。 Javaで、streamObserverでonNextが呼び出され、Status.CANCELLEDがスローされたStatusRuntimeExceptionがクライアントに存在しない場合、それはすでに十分な遅延解除を可能にします。
応答ストリーム中に接続を切断するクライアントを検出する方法はありますか。
確かに私はそれが正しい行動であることを期待しました。それは既知の報告されたバグであるため、私はこのシステムをJavaで再実装し、Pythonの新しいリリースを待つでしょう。 –