私はスパークとカフカに新しく、カフカでスパークストリーミングの使用パターンが少し異なります。私は同じカフカトピックを購読しているスパークストリーミングアプリケーション
spark-core_2.10 - 2.1.1
spark-streaming_2.10 - 2.1.1
spark-streaming-kafka-0-10_2.10 - 2.0.0
kafka_2.10 - 0.10.1.1
を使用しています 連続イベントデータは、私は、複数のスパークストリーミングアプリケーションから処理する必要がカフカのトピックにストリーミングされています。しかし、スパークストリーミングアプリを実行すると、そのうちの1人だけがデータを受信します。
Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("group.id", "test-consumer-group");
kafkaParams.put("enable.auto.commit", "true");
kafkaParams.put("auto.commit.interval.ms", "1000");
kafkaParams.put("session.timeout.ms", "30000");
Collection<String> topics = Arrays.asList("4908100105999_000005");;
JavaInputDStream<ConsumerRecord<String, String>> stream = org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
... //spark processing
私は2つのスパークストリーミングアプリケーションを使用しています。通常、最初に提出したものは、カフカメッセージを消費します。 2番目のアプリケーションは、メッセージを待つだけで、処理が進まない。 私が読んだように、カフカの話題は複数の消費者から購読することができますが、スパークストリーミングには当てはまりませんか?それとも、カフカの話題とその構成に欠けているものがありますか?
ありがとうございます。
両方のコンシューマで同じグループIDを使用していたため、1人のコンシューマしかメッセージを受信していませんでした。同じトピックに購読している異なるgroup.idを持つ消費者は、メッセージを別々に/並行して受信します。 – Gurubg
はい、同じグループIDを使用すると、1つだけがメッセージを受信します。 –