0
4つのパーティションを持つ単純なプロデューサを作成し、コンシューマグループに4つのコンシューマを作成して各パーティションのデータを消費するようにしました。 どうすればいいですか?4つのパーティションを持つ単一プロデューサ用の複数のコンシューマセットアップKafka Java
コンシューマコードコードの
public class KafkaConsumer {
static List<String> list = new ArrayList<String>();
public static DataFrame reqFieldBOIDDf;
public static DataFrame df ;
static SparkConf conf = new SparkConf()
.setAppName("kafka-sandbox")
.setMaster("local[*]")
.set("spark.cassandra.connection.host","localhost"); //for cassandra
static JavaSparkContext sc = new JavaSparkContext(conf);
private static long lastOffset;
public static void main(String[] str) throws InterruptedException {
execute();
}
private static void execute() throws InterruptedException {
KafkaConsumer<String, String> consumer = createConsumer(); consumer.subscribe(Arrays.asList("KafkaConsumerTopic6"));
processRecords(consumer);
System.out.println("Inside execute");
}
private static KafkaConsumer<String, String> createConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
String consumeGroup = "cg1";
props.put("group.id", consumeGroup);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "101");
props.put("max.partition.fetch.bytes", "1035");
props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "6001");props.put("max.poll.records","500");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer<String, String>(props);
}
private static void processRecords(KafkaConsumer<String, String> consumer) throws InterruptedException {
while (true){
ConsumerRecords<String, String> records = consumer.poll(1000);
lastOffset = 0;
for (ConsumerRecord<String, String> record : records) {
lastOffset = record.offset();
list.add(record.value());
}
}
}
小片は、非常に参考になります。
ありがとうございます。