0

KafkaConsumerを使用して、別々のスレッドでメッセージを消費することができます。私はmultiprocessing.Processの代わりthreading.Threadを使用する場合Python Kafkaマルチプロセス対スレッド

はしかし、私はエラーを取得する:

OSError: [Errno 9] Bad file descriptor

このquestiondocumentationが並行してメッセージを消費するマルチプロセッシングを使用することが可能であることを示唆しています。誰かが実際の例を共有してもらえますか?

編集

ここではいくつかのサンプルコードです。申し訳ありませんが、元のコードがあまりにも複雑なので、ここでサンプルを作成して、何が起きているのかを伝えたいと思います。 multiprocessing.Processの代わりにthreading.Threadを使用すると、このコードは正常に機能します。

from multiprocessing import Process 

class KafkaWrapper(): 
    def __init__(self): 
     self.consumer = KafkaConsumer(bootstrap_servers='my.server.com') 

    def consume(self, topic): 
     self.consumer.subscribe(topic) 
     for message in self.consumer: 
      print(message.value) 

class ServiceInterface(): 
    def __init__(self): 
     self.kafka_wrapper = KafkaWrapper() 

    def start(self, topic): 
     self.kafka_wrapper.consume(topic) 

class ServiceA(ServiceInterface): 
    pass 

class ServiceB(ServiceInterface): 
    pass 


def main(): 

    serviceA = ServiceA() 
    serviceB = ServiceB() 

    jobs=[] 
    # The code works fine if I used threading.Thread here instead of Process 
    jobs.append(Process(target=serviceA.start, args=("my-topic",))) 
    jobs.append(Process(target=serviceB.start, args=("my-topic",))) 

    for job in jobs: 
     job.start() 

    for job in jobs: 
     job.join() 

if __name__ == "__main__": 
    main() 

そして、ここでは、私が見誤りです(ここでも、私の実際のコードは、上記のサンプルとは異なり、私はthreading.Threadを使用している場合、それが正常に動作しますが、私はmultiprocessing.Processを使用しない場合):

File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap 
    self.run() 
    File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 93, in run 
    self._target(*self._args, **self._kwargs) 
    File "service_interface.py", line 58, in start 
    self._kafka_wrapper.start_consuming(self.service_object_id) 
    File "kafka_wrapper.py", line 141, in start_consuming 
    for message in self._consumer: 
    File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in __next__ 
    return next(self._iterator) 
    File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1022, in _message_generator 
    self._client.poll(timeout_ms=poll_ms, sleep=True) 
    File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 556, in poll 
    responses.extend(self._poll(timeout, sleep=sleep)) 
    File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 573, in _poll 
    ready = self._selector.select(timeout) 
    File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap 
    self.run() 
    File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/selectors.py", line 577, in select 
    kev_list = self._kqueue.control(None, max_ev, timeout) 
    File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 93, in run 
    self._target(*self._args, **self._kwargs) 
    File "service_interface.py", line 58, in start 
    self._kafka_wrapper.start_consuming(self.service_object_id) 
    File "kafka_wrapper.py", line 141, in start_consuming 
    for message in self._consumer: 
    File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in __next__ 
    return next(self._iterator) 
    File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1022, in _message_generator 
    self._client.poll(timeout_ms=poll_ms, sleep=True) 
    File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 556, in poll 
    responses.extend(self._poll(timeout, sleep=sleep)) 
OSError: [Errno 9] Bad file descriptor 
    File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 573, in _poll 
    ready = self._selector.select(timeout) 
    File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/selectors.py", line 577, in select 
    kev_list = self._kqueue.control(None, max_ev, timeout) 
OSError: [Errno 9] Bad file descriptor 
+0

はい、並行してメッセージを処理するのは、カフカストリームがすべてのものです。エラーのトレースバックに加えてあなたのコードを投稿できますか? – Kyle

+0

質問にサンプルコードを追加しました。 – Deven

答えて

2

カフカ消費者はマルチプロセスでもマルチスレッドでもかまいません(Kafkaの初期バージョンではKafka Consumer Groupが正しく使用されていることを確認してください)、選択はあなた次第です。

しかし、Kafkaクライアントライブラリは、フォークセーフを保証するために、使用されている接続(Kafkaサーバーに接続)が複数のプロセスで共有されないようにする必要があります。これが接続エラーです。

回避策として、生成プロセスの前にKafkaConsumerを作成しないでください。代わりに、操作を各プロセスに移動します。

もう1つの方法は、単一のスレッド/プロセスフェッチメッセージを使用し、実際の操作を行うために余分なプロセスプールを使用することです。

+0

ありがとう!私はあなたが提案した変更を加えました。つまり、それぞれのプロセスに「KafkaConsumer」の作成を移しました。今はうまくいきます。 – Deven

関連する問題