2017-10-17 9 views
4

Google Cloud PubSubトピックの長期プルサブスクリプションを確立しようとしています。 私はドキュメントhere、すなわちで与えられた例と非常によく似たコードを使用しています:私はこのことを見たStatusCode.UNAVAILABLEを返すGoogle PubSubのPythonクライアント

Exception in thread Consumer helper: consume bidirectional stream: 
Traceback (most recent call last): 
    File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner 
    self.run() 
    File "/usr/lib/python3.5/threading.py", line 862, in run 
    self._target(*self._args, **self._kwargs) 
    File "/path/to/google/cloud/pubsub_v1/subscriber/_consumer.py", line 248, in _blocking_consume 
    self._policy.on_exception(exc) 
    File "/path/to/google/cloud/pubsub_v1/subscriber/policy/thread.py", line 135, in on_exception 
    raise exception 
    File "/path/to/google/cloud/pubsub_v1/subscriber/_consumer.py", line 234, in _blocking_consume 
    for response in response_generator: 
    File "/path/to/grpc/_channel.py", line 348, in __next__ 
    return self._next() 
    File "/path/to/grpc/_channel.py", line 342, in _next 
    raise self 
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])> 

def receive_messages(project, subscription_name): 
    """Receives messages from a pull subscription.""" 
    subscriber = pubsub_v1.SubscriberClient() 
    subscription_path = subscriber.subscription_path(
     project, subscription_name) 

    def callback(message): 
     print('Received message: {}'.format(message)) 
     message.ack() 

    subscriber.subscribe(subscription_path, callback=callback) 

    # The subscriber is non-blocking, so we must keep the main thread from 
    # exiting to allow it to process messages in the background. 
    print('Listening for messages on {}'.format(subscription_path)) 
    while True: 
     time.sleep(60) 

問題は、私は時々、次のトレースバックを受けていますということですanother questionで参照されましたが、ここではPythonで正しく処理する方法を尋ねています。例外でリクエストをラップしようとしましたが、バックグラウンドで実行されているようで、そのエラーの場合は再試行できません。

答えて

5

私のために働いているややハッキリなアプローチは、カスタムですpolicy_class。デフォルトのものにはがありません。DEADLINE_EXCEEDEDを無視します。デフォルトを継承するクラスを作成し、UNAVAILABLEも無視することができます。鉱山は、次のようになります。

from google.cloud import pubsub 
from google.cloud.pubsub_v1.subscriber.policy import thread 
import grpc 

class AvailablePolicy(thread.Policy): 
    def on_exception(self, exception): 
     """The parent ignores DEADLINE_EXCEEDED. Let's also ignore UNAVAILABLE. 

     I'm not sure what triggers that error, but if you ignore it, your 
     subscriber seems to work just fine. It's probably an intermittent 
     thing and it reconnects later if you just give it a chance. 
     """ 
     # If this is UNAVAILABLE, then we want to retry. 
     # That entails just returning None. 
     unavailable = grpc.StatusCode.UNAVAILABLE 
     if getattr(exception, 'code', lambda: None)() == unavailable: 
      return 
     # For anything else, fallback on super. 
     super(AvailablePolicy, self).on_exception(exception) 

subscriber = pubsub.SubscriberClient(policy_class=AvailablePolicy) 
# Continue to set up as normal. 

それはoriginalon_exceptionなどの多くは、単に別のエラーを無視に見えます。必要に応じて、例外がスローされるたびにログを追加し、すべてが動作することを確認することができます。今後のメッセージは引き続き出ます。

+0

FWIW、約1時間後に[cpu problem](https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3965)があります。私は、あなたがエラーを無視するたびに問題がスレッドリークであると思う(他の人は 'DEADLINE_EXCEEDED'の後にそれを取得する)が、修正を見つけることができませんでした。私は古いAPIを使い、ストリームのものを使うのではなく、自分で定期的にプルすることに戻った。 [この例](https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-python/blob/master/cmdline-pull/pubsub_sample.py)は、これを設定するのに便利でした。 –

+0

返事をありがとう...非常に簡単にすべき何かのために非常に不合理なようだ。手元にあるタスクについては、魅力のように動作する 'golang'クライアントライブラリを使用して終了しました – adrpino

+0

[githubの関連issue](https://github.com/GoogleCloudPlatform/google-cloud-python/issues/2683) – Blackus

関連する問題