2017-12-17 13 views
0

私はシンプルなプロデューサー - コンシューマーのセットアップをしています:1人のプロデューサー(スレッドとして)と2人のコンシューマー(2つのプロセスとして)。 プロデューサーのrunメソッド:Python - Kafka:consumer failing

def run(self): 
     producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers, 
           api_version=(0, 10)) 
     while not self.stop_event.is_set(): 
      self.logger.info("Checking for new changes") 
      self.check_for_new_changes(producer) 
      self.logger.info("Sleeping for {minutes} 
          minutes...".format(minutes=self.time_to_sleep/60)) 
      time.sleep(self.time_to_sleep) 
     producer.close() 

は、基本的には、変更をチェック新しい変更が見つかった場合は、メッセージを送信した後、5分間スリープ状態になります。

runメソッド:

def run(self): 
    if self.group_id: 
     consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers, 
           consumer_timeout_ms=1000, 
           api_version=(0, 10), 
           group_id=self.group_id) 
    else: 
     consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers, 
           consumer_timeout_ms=1000, 
           api_version=(0, 10)) 
    consumer.subscribe(['new_change']) 
    while not self.stop_event.is_set(): 
     for msg in consumer: 
      self.logger.info("New message:\n{msg}".format(msg=msg)) 
      self.process_new_change(json.loads(msg.value)) 
      if self.stop_event.is_set(): 
       consumer.close() 
       return 
    consumer.close() 

正常に動作するようですが、しばらく実行した後、私はコーディネーターのログにこれらのメッセージを得る:

[2017-12-17 02:06:40,639] INFO [GroupCoordinator 0]: Member kafka-python-1.3.5-f5cdcad3-bc1a-4623-a42b-f5de5e8bded1 in group meta_data_consumer has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) 
[2017-12-17 02:06:40,659] INFO [GroupCoordinator 0]: Preparing to rebalance group meta_data_consumer with old generation 15 (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator) 
[2017-12-17 02:06:40,659] INFO [GroupCoordinator 0]: Group meta_data_consumer with generation 16 is now empty (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator) 
[2017-12-17 02:06:41,784] INFO [GroupCoordinator 0]: Member kafka-python-1.3.5-bdea8ce3-922f-4ee1-9959-13341e1730f5 in group failures_consumer has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) 
[2017-12-17 02:06:41,785] INFO [GroupCoordinator 0]: Preparing to rebalance group failures_consumer with old generation 9 (__consumer_offsets-35) (kafka.coordinator.group.GroupCoordinator) 
[2017-12-17 02:06:41,785] INFO [GroupCoordinator 0]: Group failures_consumer with generation 10 is now empty (__consumer_offsets-35) (kafka.coordinator.group.GroupCoordinator) 

これは私の消費者を殺し、彼らは実行を停止します。 コンシューマログに例外やエラーが表示されません。

何が失敗する可能性がありますか?

+0

あなたのkafkaのインストールはスタンドアロンですか、または動物園で働いていますか?それはバランスをとることに問題があるようです。 「消費者団体」を使用しているとき、または使用していないときにこれが起こりましたか? –

+0

zookeeperとgroup_idを使用する。 私は2つのグループを持っています。 –

答えて

1

あなたのzookeeper.session.timeout.msは5分以下に設定されていると思います。動物園の設定からタイムアウトを調整します。まだ失敗するかどうかを確認してください。そうであれば、カフカ設定のタイムアウトを調整する必要があります。 group.max.session.timeout.msrebalance.timeout.msheartbeat.interval.msはそれに応じて調整する必要があります。クライアントは5分間スリープし、その間にそのタイムアウト値の1つを超過し、グループコーディネーターは、消費者が失敗したと考えて消費者のバランスを取り直そうとします。

出典:Kafka Documentation

+0

しかし私のプロデューサーは眠っている人です。それは消費者に影響を与えるべきか? –

関連する問題