8

トピックからメッセージをプッシュ/プルするためにKafkaプロデューサとコンシューマ(0.9.0)スクリプトを使用しようとすると、以下のエラーが表示されます。トピックメタデータを取得するときにカフカの消費者が「リーダーを見つけられませんでした」

プロデューサーエラー

[2016-01-13 02:49:40,078] ERROR Error when sending message to topic test with key: null, value: 11 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 

コンシューマーエラー

> [2016-01-13 02:47:18,620] WARN 
> [console-consumer-90116_f89a0b380f19-1452653212738-9f857257-leader-finder-thread], 
> Failed to find leader for Set([test,0]) 
> (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread) 
> kafka.common.KafkaException: fetching topic metadata for topics 
> [Set(test)] from broker 
> [ArrayBuffer(BrokerEndPoint(0,192.168.99.100,9092))] failed at 
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73) at 
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) at 
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) 
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) 
> Caused by: java.io.EOFException at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83) 
> at 
> kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) 
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) 
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:77) 
> at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74) 
> at kafka.producer.SyncProducer.send(SyncProducer.scala:119)  at 
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) 
> ... 3 more 

なぜ私はエラーを取得していますし、どのように私はそれを解決するのですか?マックにドッカーコンテナ内のすべてのコンポーネントを実行

設定

。 ZooKeeperとKafkaは別々のDockerコンテナで動作します。

ドッカーマシン(boot2docker)IPアドレス:192.168.99.100 ZooKeeperのポート:2181 カフカポート:以下を設定しますserver.properties9092

カフカの設定ファイル:

host.name=localhost 
broker.id=0 
port=9092 
advertised.host.name=192.168.99.100 
advertised.port=9092 

コマンド

私が実行カフカサーバーのDockerコンテナからのコマンドに従います。 1つのパーティションと1のレプリケーションファクタを持つトピックを作成しました。

のリーダーの指定は、問題の一部である可能性があります0です。

[email protected]:/opt/kafka/dist# ./bin/kafka-topics.sh --zookeeper 192.168.99.100:2181 --topic test --describe 
Topic:test PartitionCount:1 ReplicationFactor:1 Configs: 
    Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 

私はその後、いくつかのメッセージを送信するには、次の操作を行います。

[email protected]:/opt/kafka/dist# ./bin/kafka-console-producer.sh --broker-list 192.168.99.100:9092 --topic test 
one message 
two message 
three message 
four message 
[2016-01-13 02:49:40,078] ERROR Error when sending message to topic test with key: null, value: 11 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
[2016-01-13 02:50:40,080] ERROR Error when sending message to topic test with key: null, value: 11 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
[2016-01-13 02:51:40,081] ERROR Error when sending message to topic test with key: null, value: 13 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
[2016-01-13 02:52:40,083] ERROR Error when sending message to topic test with key: null, value: 12 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 

これは私が、私は上記の投稿、消費者のエラーを生成するメッセージを消費しようとするために使用しているコマンドです。私は、ポート21819092を確認しました

[email protected]:/opt/kafka/dist# ./bin/kafka-console-consumer.sh --zookeeper 192.168.99.100:2181 --topic test --from-beginning 

はオープ​​ンとカフカドッカーコンテナ内からアクセスできます。

[email protected]:/# nc -z 192.168.99.100 2181; echo $?; 
0 
[email protected]:/# nc -z 192.168.99.100 9092; echo $?; 
0 

答えて

4

ソリューションは、私は全く期待したものではなかったです。エラーメッセージは実際に起こっていたものと一線を画しませんでした。

主な問題は、Dockerのログディレクトリをローカルファイルシステムにマウントすることでした。私のdocker runコマンドはボリュームマウントを使用して、コンテナ内のKafka log.dirフォルダを、実際にMacにマウントされたホストVMのローカルディレクトリにマウントしました。後者の点が問題でした。例えば

docker run --name kafka -v /Users/<me>/kafka/logs:/var/opt/kafka:rw -p 9092:9092 -d kafka 

私はMac上でだと(例えばboot2docker)ドッカマシンを使用しているので、私はホストVMに自動マウントをboot2docker私/Users/経路を通ってマウントする必要があります。基礎となるVM自体がバインドマウントを使用するため、KafkaのI/Oエンジンは正しく通信できませんでした。ボリュームのマウントが、ホストのLinux VM(つまりboot2dockerマシン)のディレクトリに直接行われていた場合は、正しく動作します。

私はKafka I/Oのイン/アウトを知らないので、正確な詳細は説明できませんが、マウントされたボリュームをMacファイルシステムに削除すると機能します。