2016-06-12 14 views
0

Java API v0.9.0.1を使用して簡単なKafkaコンシューマを動作させようとしています。私が使用しているkafkaサーバはドッキング・コンテナで、バージョン0.9.0.1も稼動しています。以下はコンシューマーコードです:Kafka 0.9.0.1 JavaのコンシューマがawaitMetadataUpdate()にスタックしました

ただし、コンシューマーを起動すると、上記のpoll(100)メソッドが呼び出され、返されません。いつものように見える

public void awaitMetadataUpdate() { 
    int version = this.metadata.requestUpdate(); 

    do { 
     this.poll(9223372036854775807L); 
    } while(this.metadata.version() == version); 

} 

(両方のバージョンとthis.metadata.versionを():それが立ち往生のようなデバッグは、それが永遠にorg.apache.kafka.clients.consumer.internals.ConsumerNetworkClientで次のメソッドを実行しているに見えます== 2)。さらに、エラーは発生しませんが、Javaプロデューサからのメッセージは待ち行列に届くことはありませんでした。コマンドラインのkafkaツールを使用して、キューからメッセージを送受信できることを確認しました。

誰でも何が起こっているのか分かりません。場合

+0

これは、通常、プロデューサとコンシューマが到達可能なエンドポイントを宣伝していないブローカに関連する問題です。あなたのブローカーは外部から到達可能なアドレスを聞いていますか?ブローカーのプロパティを確認してくださいadvertized.listeners –

+0

ルチアーノ、あなたにスポットがあります。サーバー上で環境変数ADVERTISED_PORTとADVERTISED_HOSTを設定すると、問題が解決されました。これがなければ、コマンドラインのコンシューマ/プロデューサは正しく機能することができますが、Javaの実装ではできないことはちょっと分かりません。 – cacois

+0

既知の問題であると思われます。https://issues.apache.org/jira/browse/KAFKA-3727 –

答えて

1

これは私のためのソリューションは、以下の環境変数を設定した、同様の問題を持つ他の誰に役立ちます:

ADVERTISED_HOST=localhost 
ADVERTISED_PORT=9092 

(もちろん、ここでの値は、インストールに合わせて変更する場合があります)

コマンドラインのコンシューマとプロデューサのスクリプトは、これらのenv変数が設定されていなくても、ブローカを正しく検索して通信することができますが、Java APIの実装ではできません。エラーはスローされません。メタデータを更新しようとすると、最初のポーリングでは無限ループになります。

関連する問題