私はKafkaを使い慣れています。私は自分のローカルマシンにJavaプロデューサを作成し、ネットワーク上でM2と言う別のマシンにKafkaブローカーをセットアップしました(私はping、SSH、このマシンに接続できます)。 Eclipseコンソールのプロデューサー側では、「Message sent」を取得します。しかし、マシンM2のコンソールコンシューマーをチェックすると、私はそれらのメッセージを見ることができません。Kafka:Javaプロデューサからメッセージが送信された後、コンソールコンシューマにメッセージが表示されません。
マイJavaプロデューサのコードは次のとおりです。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.HashMap;
import java.util.Map;
public class KafkaMessageProducer {
/**
* @param args
*/
public static void main(String[] args) {
KafkaMessageProducer reportObj = new KafkaMessageProducer();
reportObj.send();
}
public void send(){
Map<String, Object> config = new HashMap<String, Object>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "135.113.133.60:9092");
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);
int maxMessages = 5;
int count = 0;
while(count < maxMessages){
producer.send(new ProducerRecord<String, String>("test", "msg", "message --- #"+count++));
System.out.println("Message send.."+count);
}
producer.close();
}
}
あなたは私が間違っているつもりですどこ私に教えていただけますか?私は、コンソールプロデューサーからマシンM2でローカルにメッセージを送信できます。 注:IPアドレスをKafka Brokerの完全なホスト名に変更しても、同じ問題が残ります。
更新:プロデューサーがKafkaブローカーに接続してメッセージを送信できるとも思っていますが、Kafka Brokerはこれらのメッセージを消費者に渡しません。 IPアドレスまたはポートをZookeeper(Kafka Brokerと同じノードで動作している)に変更し、Zookeeperのログを見ると、プロデューサのpingが取得され、セッションが拒否されます。
Update2:プロデューサーのjarを作成し、マシンM2でこのjarファイルを実行しました。プロデューサーがKafkaブローカーに接続しようとする方法に何か問題があるようです。まだ分かりませんが、何が問題なのですか?
プロデューサがメッセージを送信する前にコンソールコンシューマを起動して実行しましたか?トピックの冒頭から読もうとしましたか? – yuyang
はい。コンソールコンシューマーが稼働していました。私はまた、プロデューサーがmsgを送った後にそれを試しました。私はトピックの最初からフォームコンソールのコンシューマーを読んでいます。 – user2441441
無関係:私は多くのdownvotesの価値がない他の質問が見つかりました。だから私はここでいくつかの補償を与えている;-) – GhostCat