私はkafkaを初めて利用しています。通常、私は小さなJavaデモアプリケーションを作成し、カフカコンシューマーを設定し、3カフカサーバークラスタからデータを取得します。それはうまく動作します。 私は props.put("bootstrap.servers", "192.168.22.1:9092,192.168.22.2:9092,192.168.22.3:9092")
のようなサーバを設定し、consumer.subscribe(Arrays.asList("test_topic_1","test_topic_2","test_topic_3"))
のようなトピックを購読します。 今私は2つの異なるクラスタからデータを消費する必要があります。同じJavaアプリで2種類のカフカサーバクラスタからデータを取得できますか?
ので、カフカのサーバーは、1つのクラスタ、 「と "192.168.22.1:9092,192.168.22.2:9092,192.168.22.3:9092" になります192.168.22.4:9092,192.168.22.5:9092,192.168.22.6 :9092 "を別のクラスターとして使用します。
トピックは、クラスタ番号1の「test_topic_1」、「test_topic_2」、「test_topic_3」、クラスタ番号2の「test_topic_4」、「test_topic_5」、「test_topic_6」になります。
同じJavaアプリケーションでこれを行うことはできますか? 私はしようとしましたが、1つのクラスタからのデータだけが消費される可能性があります。 どうすればいいですか?まことにありがとうございます。
ありがとう@yaswanth、私は2つのインスタンスを使用しました。私のコードの下を見てください。
public class Consumer {
public static void main(String[] args) {
System.out.println("begin consumer");
consume();
consume2();
System.out.println("finish consumer");
}
public static void consume() {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.116.13:9092");
props.put("group.id", "group-test1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("test_topic_1"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.topic()+"---------------------------"+record.value());
}
}
}
public static void consume2() {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.116.37:9092");
props.put("group.id", "group-test2");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer2 = new KafkaConsumer<String, String>(props);
consumer2.subscribe(Arrays.asList("test_topic_2"));
while (true) {
ConsumerRecords<String, String> records = consumer2.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.topic()+"---------------------------"+record.value());
}
}
}
}助けを
おかげで、@yaswanth、それが動作します。
異なるコンシューマのインスタンスを使用する必要があります。いくつかのコードを投稿してください。それは他の人が働いていないものについてよりよく理解するのを助けるでしょう。 – yaswanth