まあ、私はプロデューサからコンシューマに単純なデータtansferを持たせるためにPythonでKafka-pythonパッケージ(1.3.2)を使用しようとしています。OverflowErrorの取得:kafka-pythonプロデューサ - コンシューマの使用中にタイムアウト値が大きすぎる
プロデューサー:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# produce asynchronously
for _ in range(2):
producer.send('my-topic', b'message')
producer.flush()
producer = KafkaProducer()
消費者:
my-topic:0:5056: key=None value=b'message' my-topic:0:5057: key=None value=b'message'
しかし同時に、私はプロデューサーで、このエラーを持っている:私は私の消費者に以下の受信
from kafka import KafkaConsumer
consumer = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers=['localhost:9092'],fetch_min_bytes=1)
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
consumer = KafkaConsumer()
consumer.subscribe(["my-topic"])
:
デフォルトではError in atexit._run_exitfuncs: Traceback (most recent call last): File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\site-packages\kafka\producer\kafka.py", line 364, in wrapper _self.close() File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\site-packages\kafka\producer\kafka.py", line 420, in close self._sender.join(timeout) File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\threading.py", line 1060, in join self._wait_for_tstate_lock(timeout=max(timeout, 0)) File "C:\Users\VNK736\AppData\Local\Programs\Python\Python36-32\lib\threading.py", line 1072, in _wait_for_tstate_lock elif lock.acquire(block, timeout): OverflowError: timeout value is too large
タイムアウトがNONE
に設定され、かつKafka.py
に999999999
に設定されています。私はKafkaProducerでこのタイムアウトを渡すためのパラメータを私のプロデューサコードで把握することができません。
誰もこの問題に直面しましたか?あるいは誰でもこの方向で私を助けることができますか?前もって感謝します。