2017-10-20 13 views
0

4つのパーティションを持つ単純なプロデューサを作成し、コンシューマグループに4つのコンシューマを作成して各パーティションのデータを消費するようにしました。 どうすればいいですか?4つのパーティションを持つ単一プロデューサ用の複数のコンシューマセットアップKafka Java

コンシューマコードコードの

public class KafkaConsumer { 
    static List<String> list = new ArrayList<String>(); 
    public static DataFrame reqFieldBOIDDf; 
    public static DataFrame df ; 
    static SparkConf conf = new SparkConf() 

        .setAppName("kafka-sandbox") 

        .setMaster("local[*]") 

        .set("spark.cassandra.connection.host","localhost"); //for cassandra 

      static JavaSparkContext sc = new JavaSparkContext(conf); 

    private static long lastOffset; 
      public static void main(String[] str) throws InterruptedException { 
       execute(); 
      } 
      private static void execute() throws InterruptedException { 

        KafkaConsumer<String, String> consumer = createConsumer();  consumer.subscribe(Arrays.asList("KafkaConsumerTopic6")); 
        processRecords(consumer); 
        System.out.println("Inside execute"); 
      } 
      private static KafkaConsumer<String, String> createConsumer() { 
        Properties props = new Properties(); 
        props.put("bootstrap.servers", "localhost:9092"); 
        String consumeGroup = "cg1"; 
        props.put("group.id", consumeGroup); 
        props.put("enable.auto.commit", "true"); 
        props.put("auto.commit.interval.ms", "101"); 
        props.put("max.partition.fetch.bytes", "1035"); 
        props.put("heartbeat.interval.ms", "3000"); 
        props.put("session.timeout.ms", "6001");props.put("max.poll.records","500"); 
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        return new KafkaConsumer<String, String>(props); 
      } 
      private static void processRecords(KafkaConsumer<String, String> consumer) throws InterruptedException { 
        while (true){ 
          ConsumerRecords<String, String> records = consumer.poll(1000); 
          lastOffset = 0; 
          for (ConsumerRecord<String, String> record : records) { 
          lastOffset = record.offset(); 
          list.add(record.value()); 
          } 
        } 
      } 

小片は、非常に参考になります。

ありがとうございます。

答えて

0

4人のプロデューサーを開始するのと同じ方法で4人の消費者を開始してください。

すべてが同じgroup.idの設定になっていることを確認し、トピック(1つのパーティションで4つのトピックがある場合はトピック)をすべて購読します。彼らは同じグループに入るので、Kafkaは自動的に1つのコンシューマを各パーティションに割り当てます。

関連する問題