2

kafka-python(1.3.5)KafkaProducerを使用して、データをKafkaキューにプッシュするPython(2.7)マルチプロセッシングを使用しています。Pythonマルチプロセッシングを使用してデータがKafkaキューにプッシュされない

from kafka import KafkaProducer 
import multiprocessing 
# other imports 


class TestClass(object): 
    def __init__(self, producer): 
     self.kafka_producer = producer 

    def main(self, conf, nthreads): 
     try: 
      for i in range(nthreads): 
       logger.info("Starting process number = %d " % (i + 1)) 
       p = Process(target=self.do_some_task, args=(conf, 2)) 
       p.start() 
       processes.append(p) 
      for p in processes: 
       logger.info("Joining process") 
      p.join() 
     except Exception, ex: 
      logger.error("Exception occurred : %s" % str(ex)) 

    def do_some_task(conf, retry): 
     # some task happening 
     self.record(arg1, arg2) 

    # pushing to kafka 
    def record(self, arg1, arg2) 
     message = json.dumps({"a": "arg1", "b": "arg2"}) 
     self.kafka_producer.send(KAFKA_TOPIC, message) 


if __name__ == '__main__': 
    kafka_producer = KafkaProducer(
     bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, 
     request_timeout_ms=60000, 
     retries=2) 
    obj = TestClass(kafka_producer) 

    try: 
     parser = argparse.ArgumentParser(description='Description') 
     parser.add_argument('-threads', type=int, default=1) # 20 threads 
     parser.add_argument('-debug', type=int, default=0) 
     args = parser.parse_args() 
     me = SingleInstance(args.src) 
     TestClass.main(CONF[args.src], args.threads) 

20スレッドが内部で生成され、kafkaに書き込まれます。私はログを見て、プロセスがメッセージがkafkaで書かれるのを待つが、最終的にそれはKafkaに書かれていないまま移動することが分かった。例外はありません。

私はPythonのコマンドラインからスレッドを使わずに同じコードを実行しようとしましたが、すべてが期待通りに機能しました。何が問題なのか。

答えて

0

フォークプロセスの後にkafkaへの接続を開始してください。そして、接続を閉じて、接続に関連するエラーが発生したら再接続してください。

関連する問題