2017-10-17 15 views
0

私はカフカを完全に新しくしましたドッカー、修正する問題がありました。カフカ(アパッチ)のキューのための当社の継続的インテグレーションテストはエラーのこの種で失敗時折、ローカルマシン上でうまく実行されますが、ときジェンキンスCIサーバー上:ドッカーの待ち時間のためにカフカがタイムアウトします

%3|1508247800.270|FAIL|art#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused 
%3|1508247800.270|ERROR|art#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused 
%3|1508247800.270|ERROR|art#producer-1| [thrd:localhost:9092/bootstrap]: 1/1 brokers are down 

作業理論はドッカーの画像は時間がかかることです開始するには、カフカのプロデューサーが諦めていた時間。問題のコードは、上記のエラーラインがプロデューサの作成時に表示され、

producer_properties = { 
     'bootstrap.servers': self._job_queue.bootstrap_server, 
     'client.id': self._job_queue.client_id 
    } 
    try: 
     self._producer = kafka.Producer(**producer_properties) 
    except: 
     print("Bang!") 

です。ただし、例外は発生せず、コールはそうでなければ有効な探しているプロデューサを返すので、ブローカエンドポイントの存在をプログラムでテストすることはできません。ブローカのステータスを確認するAPIはありますか?

+0

同じドッカーコンテナーからカフカブローカーを使用していますか?そして、どのPython Kafkaライブラリを使っていますか? –

+0

Kafkaブローカーは同じDockerコンテナーにあり、Adobe Confluent Kafkaライブラリーを使用していると思います。 –

答えて

1

ブローカへの接続に失敗した場合、クライアントは例外をスローしないようです。プロデューサを最初にプロデューサがメッセージを送信しようとすると、実際にはブートストラップサーバに接続しようとします。接続に失敗すると、ブートストラップリストに渡されたブローカのいずれかに繰り返し接続しようとします。最終的に、ブローカーが来たら、sendが実行されます(そして、コールバック関数の状態をチェックするかもしれません)。 コンフルエントなkafkaのpythonライブラリはlibrdkafkaライブラリを使用していますが、このクライアントは適切なドキュメントを持っていないようです。 Kafkaプロトコルで指定されたKafkaプロデューサオプションの一部は、librdkafkaによってサポートされていないようです。ここで

は、私が使用するコールバックのサンプルコードです:

from confluent_kafka import Producer 

def notifyme(err, msg): 
    print err, msg.key(), msg.value() 

p = Producer({'bootstrap.servers': '127.0.0.1:9092', 'retry.backoff.ms' : 100, 
     'message.send.max.retries' : 20, 
     "reconnect.backoff.jitter.ms" : 2000}) 
try: 
    p.produce(topic='sometopic', value='this is data', on_delivery=notifyme) 
except Exception as e: 
    print e 
p.flush() 

をまた、ブローカーの存在をチェックする、あなただけのポート上でブローカーのIPにtelnetこと(この例では、それは9092です) 。カフカクラスターが使用している動物園では、ブローカー/ IDの下にあるzノードの内容を確認することができます。

+0

これは役に立ちます。私は、エラーコールバック(on_deliveryコールバックではない)を登録しています。これは、produce()の後にflush()が来たときにヒットします。しかし、ブローカーが利用できるようになったら、カフカはまだメッセージを追加しようとしますか?または、要求を再試行する必要があると仮定することができます。 –

+0

どのようにエラーコールバックを登録していますか?一般に、エラーまたは成功のコールバックは、メッセージがKafkaに正常に送信された後、または失敗したときにそのメッセージをあきらめた後(数回の再試行の後である可能性があります)、ライブラリによって呼び出され、コードは生産者(失敗の場合)。 –

+0

'producer_properties = { 'bootstrap.servers':self._job_queue.bootstrap_server、 'error_cb':自己.__ ON_ERROR、 'client.id':self._job_queue.client_id、 }戻りkafka.Producer(** producer_properties) ' flush()を数回呼び出すと、エラーが戻ってくるようになります(一度はそうしないようです)。 –

0

私にとってはうまくいくようなコードです。それが少しフランケンシュタインに見えたら、そうです、それはあります!清潔な解決策がある場合、私はそれを見ることを楽しみにしています:

import time 
import uuid 
from threading import Event 
from typing import Dict 

import confluent_kafka as kafka 
# pylint: disable=no-name-in-module 
from confluent_kafka.cimpl import KafkaError 

# more imports... 

LOG = # ... 


# Default number of times to retry connection to Kafka Broker 
_DEFAULT_RETRIES = 3 

# Default time in seconds to wait between connection attempts 
_DEFAULT_RETRY_DELAY_S = 5.0 

# Number of times to scan for an error after initiating the connection. It appears that calling 
# flush() once on a producer after construction isn't sufficient to catch the 'broker not available' 
# # error. At least twice seems to work. 
_NUM_ERROR_SCANS = 2 


class JobProducer(object): 
    def __init__(self, connection_retries: int=_DEFAULT_RETRIES, 
       retry_delay_s: float=_DEFAULT_RETRY_DELAY_S) -> None: 
     """ 
     Constructs a producer. 
     :param connection_retries: how many times to retry the connection before raising a 
     RuntimeError. If 0, retry forever. 
     :param retry_delay_s: how long to wait between retries in seconds. 
     """ 
     self.__error_event = Event() 
     self._job_queue = JobQueue() 
     self._producer = self.__wait_for_broker(connection_retries, retry_delay_s) 
     self._topic = self._job_queue.topic 

    def produce_job(self, job_definition: Dict) -> None: 
     """ 
     Produce a job definition on the queue 
     :param job_definition: definition of the job to be executed 
     """ 
     value = ... # Conversion to JSON 
     key = str(uuid.uuid4()) 
     LOG.info('Produced message: %s', value) 

     self.__error_event.clear() 
     self._producer.produce(self._topic, 
           value=value, 
           key=key, 
           on_delivery=self._on_delivery) 
     self._producer.flush(self._job_queue.flush_timeout) 

    @staticmethod 
    def _on_delivery(error, message): 
     if error: 
      LOG.error('Failed to produce job %s, with error: %s', message.key(), error) 

    def __create_producer(self) -> kafka.Producer: 
     producer_properties = { 
      'bootstrap.servers': self._job_queue.bootstrap_server, 
      'error_cb': self.__on_error, 
      'client.id': self._job_queue.client_id, 
     } 
     return kafka.Producer(**producer_properties) 

    def __wait_for_broker(self, retries: int, delay: float) -> kafka.Producer: 
     retry_count = 0 
     while True: 
      self.__error_event.clear() 
      producer = self.__create_producer() 
      # Need to call flush() several times with a delay between to ensure errors are caught. 
      if not self.__error_event.is_set(): 
       for _ in range(_NUM_ERROR_SCANS): 
        producer.flush(0.1) 
        if self.__error_event.is_set(): 
         break 
        time.sleep(0.1) 
       else: 
        # Success: no errors. 
        return producer 

      # If we get to here, the error callback was invoked. 
      retry_count += 1 
      if retries == 0: 
       msg = '({})'.format(retry_count) 
      else: 
       if retry_count <= retries: 
        msg = '({}/{})'.format(retry_count, retries) 
       else: 
        raise RuntimeError('JobProducer timed out') 

      LOG.warn('JobProducer: could not connect to broker, will retry %s', msg) 
      time.sleep(delay) 

    def __on_error(self, error: KafkaError) -> None: 
     LOG.error('KafkaError: %s', error.str()) 
     self.__error_event.set() 
関連する問題