2016-05-22 11 views
5

私はKafkaを使い慣れています。私は自分のローカルマシンにJavaプロデューサを作成し、ネットワーク上でM2と言う別のマシンにKafkaブローカーをセットアップしました(私はping、SSH、このマシンに接続できます)。 Eclipseコンソールのプロデューサー側では、「Message sent」を取得します。しかし、マシンM2のコンソールコンシューマーをチェックすると、私はそれらのメッセージを見ることができません。Kafka:Javaプロデューサからメッセージが送信された後、コンソールコンシューマにメッセージが表示されません。

マイJavaプロデューサのコードは次のとおりです。

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.clients.producer.ProducerRecord; 


import java.util.HashMap; 
import java.util.Map; 

public class KafkaMessageProducer { 

    /** 
    * @param args 
    */ 
    public static void main(String[] args) { 

     KafkaMessageProducer reportObj = new KafkaMessageProducer(); 
     reportObj.send(); 

    } 

    public void send(){ 

     Map<String, Object> config = new HashMap<String, Object>(); 
     config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "135.113.133.60:9092"); 
     config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

     KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config); 
     int maxMessages = 5; 
     int count = 0; 
     while(count < maxMessages){ 
      producer.send(new ProducerRecord<String, String>("test", "msg", "message --- #"+count++)); 
      System.out.println("Message send.."+count); 
     } 
     producer.close(); 
    } 

} 

あなたは私が間違っているつもりですどこ私に教えていただけますか?私は、コンソールプロデューサーからマシンM2でローカルにメッセージを送信できます。 注:IPアドレスをKafka Brokerの完全なホスト名に変更しても、同じ問題が残ります。

更新:プロデューサーがKafkaブローカーに接続してメッセージを送信できるとも思っていますが、Kafka Brokerはこれらのメッセージを消費者に渡しません。 IPアドレスまたはポートをZookeeper(Kafka Brokerと同じノードで動作している)に変更し、Zookeeperのログを見ると、プロデューサのpingが取得され、セッションが拒否されます。

Update2:プロデューサーのjarを作成し、マシンM2でこのjarファイルを実行しました。プロデューサーがKafkaブローカーに接続しようとする方法に何か問題があるようです。まだ分​​かりませんが、何が問題なのですか?

+0

プロデューサがメッセージを送信する前にコンソールコンシューマを起動して実行しましたか?トピックの冒頭から読もうとしましたか? – yuyang

+0

はい。コンソールコンシューマーが稼働していました。私はまた、プロデューサーがmsgを送った後にそれを試しました。私はトピックの最初からフォームコンソールのコンシューマーを読んでいます。 – user2441441

+0

無関係:私は多くのdownvotesの価値がない他の質問が見つかりました。だから私はここでいくつかの補償を与えている;-) – GhostCat

答えて

3

私は結局答えを見つけました。他の誰かが同じ問題を抱えている場合に私はここに投稿しています。リモート接続しようとしているときは、Kafkaブローカー設定のadvertised.hostnameを使用してください。これは私のために働いた。

+0

もう少し詳しく説明してください。あなたが追加/変更した設定ライン全体を提供してください。 (あなたが質問を記述したやり方で、あなたの答えも説明してください)。あなたの応答を待って、事前に感謝します。 –

+1

@DynamicRemo \ Kafka \ config \ server.propertiesに、これを追加する必要があります: 'port = 9092 advertised.host.name = localhost'私はリモートm/cを使用していないので、localhostが使用されていることに注意してください。 –

1

次のコードを使用して、ブローカーがメッセージを受信したかどうかをkafkaトピックのメタデータ情報で確認できます。これはデバッグに役立ちます。 producer.send(/* record */).get();ある は、send()メソッドから返さFutureからの結果を待ってみてください - ただ、デバッグのためのアイデアとして

SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), 100000, 
     64 * 1024, "your_group_id"); 
List<String> topics = new ArrayList<>(); 
topics.add(topic); 
TopicMetadataRequest req = new TopicMetadataRequest(topics); 

TopicMetadataResponse resp = simpleConsumer.send(req); 
if (resp.topicsMetadata().size() != 1) { 
    throw new RuntimeException("Expected one metadata for topic " 
     + topic + " found " + resp.topicsMetadata().size()); 
} 

TopicMetadata topicMetaData = resp.topicsMetadata().get(0); 
+0

私はこれをどこで実行するのですか?私のローカルマシンまたはマシンM2で? – user2441441

+0

これは、kafkaブローカと通信できる任意のホストから実行できます。 – yuyang

+0

もう1つの質問 - マシンM2で上記の詳細を言いますか?ローカルマシンでこれを実行するとどうなりますか? – user2441441

2

。プロデューサー側で例外があり、単にバックグラウンドで無視されている可能性があります。

+0

それをやった後、それは固まったままです。コンソールに表示される唯一のものは警告です: log4j:WARN logger(org.apache.kafka.clients.producer.ProducerConfig)のアペンダーは見つかりませんでした。 log4j:WARN log4jシステムを適切に初期化してください。 log4j:WARN詳細については、http://logging.apache.org/log4j/1.2/faq.html#noconfigをご覧ください。 – user2441441

関連する問題