2017-05-29 12 views
2

gRPCアーキテクチャを使用するpub/subパターンのような無限ストリーム応答を実装しています。python - gRPC無限ストリーム中にクライアントが検出されないようにする

応答ストリームをオープンし、クライアントがドロップするまでそれを保持するエンドポイントがあります。これを行うには、キーがgRPCコンテキストで、値が送信メッセージをポーリングするために使用するキューであるキー値ハッシュを格納します。このコードは、クライアントに変更をサブスクライブし、公開用の正常に動作します

def StreamTrades(self, request, context): 
    self.feeds[context] = queue.Queue() 

    callback_queue = queue.Queue() 

    def remove_feed(): 
     if self.feeds.get(context) is not None: 
      del self.feeds[context] 

    def stop_stream(): 
     remove_feed() 

     def raise_stop_stream_exception(): 
      raise StopStream('stopping stream') 

     callback_queue.put(raise_stop_stream_exception) 

    context.add_callback(stop_stream) 

    def output_generator(): 
     while True: 
      try: 
       try: 
        callback = callback_queue.get(False) 
        callback() 
       except queue.Empty: 
        pass 
       if self.feeds.get(context) is not None: 
        trade = self.feeds[context].get() 
        if isinstance(trade, trades_pb2.Trade): 
         yield trade 
       else: 
        raise StopStream('stopping stream') 
      except IndexError: 
       pass 
      except StopStream: 
       return 

    return output_generator() 

:よう

私の終点コードが見えます。しかし、購読解除に関連する問題があります。クライアントのドロップを検出する良い方法は何ですか? Context.add_callback(callBack)の使用は、サーバーが終了してストリームを閉じるときにのみコールバックが呼び出されるため、動作していないようです。そして、クライアントがもう存在しないとき、発電機はどんな種類のステータスも提起しません。 Javaで、streamObserverでonNextが呼び出され、Status.CANCELLEDがスローされたStatusRuntimeExceptionがクライアントに存在しない場合、それはすでに十分な遅延解除を可能にします。

応答ストリーム中に接続を切断するクライアントを検出する方法はありますか。

答えて

3

ServicerContext.add_callbackで登録したコールバックは、クライアントが接続を切断したときに呼び出す必要があります。それが呼び出されていないということは、あなたがthis bugに苦しんでいることを示しています。 ではない "サーバーが終了してストリームを閉じるときにコールバックが呼び出される"場合。

+0

確かに私はそれが正しい行動であることを期待しました。それは既知の報告されたバグであるため、私はこのシステムをJavaで再実装し、Pythonの新しいリリースを待つでしょう。 –

関連する問題