2017-11-24 2 views
0

私はkafkaを初めて利用しています。通常、私は小さなJavaデモアプリケーションを作成し、カフカコンシューマーを設定し、3カフカサーバークラスタからデータを取得します。それはうまく動作します。 私は props.put("bootstrap.servers", "192.168.22.1:9092,192.168.22.2:9092,192.168.22.3:9092")のようなサーバを設定し、consumer.subscribe(Arrays.asList("test_topic_1","test_topic_2","test_topic_3"))のようなトピックを購読します。 今私は2つの異なるクラスタからデータを消費する必要があります。同じJavaアプリで2種類のカフカサーバクラスタからデータを取得できますか?

ので、カフカのサーバーは、1つのクラスタ、 「と "192.168.22.1:9092,192.168.22.2:9092,192.168.22.3:9092" になります192.168.22.4:9092,192.168.22.5:9092,192.168.22.6 :9092 "を別のクラスターとして使用します。

トピックは、クラスタ番号1の「test_topic_1」、「test_topic_2」、「test_topic_3」、クラスタ番号2の「test_topic_4」、「test_topic_5」、「test_topic_6」になります。

同じJavaアプリケーションでこれを行うことはできますか? 私はしようとしましたが、1つのクラスタからのデータだけが消費される可能性があります。 どうすればいいですか?まことにありがとうございます。

ありがとう@yaswanth、私は2つのインスタンスを使用しました。私のコードの下を見てください。

public class Consumer { 
    public static void main(String[] args) { 
    System.out.println("begin consumer"); 
    consume(); 
    consume2(); 
    System.out.println("finish consumer"); 
    } 
    public static void consume() { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "192.168.116.13:9092"); 
    props.put("group.id", "group-test1"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("auto.offset.reset", "earliest"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer",   "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); 
    consumer.subscribe(Arrays.asList("test_topic_1")); 
    while (true) { 
     ConsumerRecords<String, String> records = consumer.poll(100); 
     for (ConsumerRecord<String, String> record : records) { 
      System.out.println(record.topic()+"---------------------------"+record.value()); 
     } 
    } 
} 
    public static void consume2() { 

    Properties props = new Properties(); 
    props.put("bootstrap.servers", "192.168.116.37:9092"); 
    props.put("group.id", "group-test2"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("auto.offset.reset", "earliest"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    KafkaConsumer<String, String> consumer2 = new KafkaConsumer<String, String>(props); 
    consumer2.subscribe(Arrays.asList("test_topic_2"));  
    while (true) { 
     ConsumerRecords<String, String> records = consumer2.poll(100); 
     for (ConsumerRecord<String, String> record : records) { 
      System.out.println(record.topic()+"---------------------------"+record.value()); 
     } 
    } 
} 

}助けを

おかげで、@yaswanth、それが動作します。

+1

異なるコンシューマのインスタンスを使用する必要があります。いくつかのコードを投稿してください。それは他の人が働いていないものについてよりよく理解するのを助けるでしょう。 – yaswanth

答えて

0

consume2()が呼び出されないため、これは機能しません。 consume()consume2()は個別に正しいようです。 2つの異なるスレッドでconsume()consume2()を実行する必要があります。あなたはこれを行うことができます。メインから2つの異なるスレッドを開始し、RunnableまたはCallableconsume()consume2()を入れてください。

ただ、あなたのアイデアを与えるために

public class Consumer2 implements Runnable { 

    @Override 
    public void run() { 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "192.168.116.37:9092"); 
     props.put("group.id", "group-test2"); 
     props.put("enable.auto.commit", "true"); 
     props.put("auto.commit.interval.ms", "1000"); 
     props.put("auto.offset.reset", "earliest"); 
     props.put("session.timeout.ms", "30000"); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     KafkaConsumer<String, String> consumer2 = new KafkaConsumer<String, String>(props); 
     consumer2.subscribe(Arrays.asList("test_topic_2")); 
     while (true) { 
      ConsumerRecords<String, String> records = consumer2.poll(100); 
      for (ConsumerRecord<String, String> record : records) { 
       System.out.println(record.topic()+"---------------------------"+record.value()); 
      } 
     } 
    } 
} 

public class Consumer1 implements Runnable { 

    @Override 
    public void run() { 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "192.168.116.13:9092"); 
     props.put("group.id", "group-test1"); 
     props.put("enable.auto.commit", "true"); 
     props.put("auto.commit.interval.ms", "1000"); 
     props.put("auto.offset.reset", "earliest"); 
     props.put("session.timeout.ms", "30000"); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); 
     consumer.subscribe(Arrays.asList("test_topic_1")); 
     while (true) { 
      ConsumerRecords<String, String> records = consumer.poll(100); 
      for (ConsumerRecord<String, String> record : records) { 
       System.out.println(record.topic() + "---------------------------" + record.value()); 
      } 
     } 
    } 
} 

public class AppStarter { 

    public void init() { 
     ExecutorService executorService = Executors.newFixedThreadPool(2); 
     List<Runnable> runnables = new ArrayList<>(); 
     Future future1 = executorService.submit(new Consumer1()); 
     Future future2 = executorService.submit(new Consumer2()); 

     try { 
      future1.get(); 
      future2.get(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } catch (ExecutionException e) { 
      e.printStackTrace(); 
     } 
    } 

    public static void main(String[] args) { 
     AppStarter appStarter = new AppStarter(); 
     appStarter.init(); 
    } 
} 

:上記のコードはあなたにそれを実装する方法のアイデアを与えることだけです。プロパティをパラメータ化してコンストラクタargとして渡し、Consumerクラスの実装を1つだけ持つことができます。

関連する問題