2017-08-29 7 views
2

私はスパークとカフカに新しく、カフカでスパークストリーミングの使用パターンが少し異なります。私は同じカフカトピックを購読しているスパークストリーミングアプリケーション

spark-core_2.10 - 2.1.1 
spark-streaming_2.10 - 2.1.1 
spark-streaming-kafka-0-10_2.10 - 2.0.0 
kafka_2.10 - 0.10.1.1 

を使用しています 連続イベントデータは、私は、複数のスパークストリーミングアプリケーションから処理する必要がカフカのトピックにストリーミングされています。しかし、スパークストリーミングアプリを実行すると、そのうちの1人だけがデータを受信します。

 Map<String, Object> kafkaParams = new HashMap<String, Object>(); 

    kafkaParams.put("bootstrap.servers", "localhost:9092"); 
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    kafkaParams.put("auto.offset.reset", "latest"); 
    kafkaParams.put("group.id", "test-consumer-group"); 
    kafkaParams.put("enable.auto.commit", "true"); 
    kafkaParams.put("auto.commit.interval.ms", "1000"); 
    kafkaParams.put("session.timeout.ms", "30000"); 

    Collection<String> topics = Arrays.asList("4908100105999_000005");; 
    JavaInputDStream<ConsumerRecord<String, String>> stream = org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(
        ssc, 
        LocationStrategies.PreferConsistent(), 
        ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams)); 

     ... //spark processing 

私は2つのスパークストリーミングアプリケーションを使用しています。通常、最初に提出したものは、カフカメッセージを消費します。 2番目のアプリケーションは、メッセージを待つだけで、処理が進まない。 私が読んだように、カフカの話題は複数の消費者から購読することができますが、スパークストリーミングには当てはまりませんか?それとも、カフカの話題とその構成に欠けているものがありますか?

ありがとうございます。

答えて

0

同じグループIDで異な​​るストリームを作成できます。

複数カフカ入力DStreamsが異なるグループで作成することができますレシーバベースのアプローチとのため トピック:

アプローチ1:ここでは0.8の統合のためのオンラインドキュメントからのより多くの詳細があり、二つのアプローチがあります複数の受信機を使用するデータの並列受信。

アプローチ2:直接的なアプローチ(ノーレシーバ)

複数カフカストリーム入力と組合それらを作成する必要はありません。 directStreamを使用すると、Spark Streamingは、 と同じくらい多くのRDDパーティションを作成し、消費するカフカパーティションがあり、すべてがパラレルで カフカのデータを読み込みます。そのため、Kafkaと RDDパーティションの間には1対1のマッピングがあり、理解しやすく調整することができます。あなたは0.10を使用しているようにあなたがあなたのコードからSpark Streaming + Kafka Integration Guide 0.8

で詳細を読むことができます

が見えます、ご参照Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0

はしても、すべてがそう依存カフカのプロパティによって制御され、それがスパークストリーミングAPIを使用していると思いましたプロパティファイルで指定したグループIDを使用すると、異なるグループIDを持つ複数のストリームを開始できます。

乾杯!

+1

両方のコンシューマで同じグループIDを使用していたため、1人のコンシューマしかメッセージを受信して​​いませんでした。同じトピックに購読している異なるgroup.idを持つ消費者は、メッセージを別々に/並行して受信します。 – Gurubg

+0

はい、同じグループIDを使用すると、1つだけがメッセージを受信します。 –

1

[消費者グループの下で]コンシューマの数は、トピック内のパーティションの数を超えることはできません。メッセージを並行して消費する場合は、適切な数のパーティションを導入し、各パーティションを処理する受信者を作成する必要があります。

+0

2つのコンシューマ・グループを同じコンシューマ・グループの下に2つのパーティションを持つことの違いは何ですか? – Gurubg

+0

私はカフカパーティションを意味しました。あなたのKafkaトピックに2つのパーティションがあり、メッセージを並行して処理したい場合は、コンシューマグループを導入することができます[このコンシューマグループのコンシューマ数は、使用しているトピックのパーティション数を超えないようにしてください]消費者グループIDによって識別される。 2つの消費者グループが同じグループIDを持つ場合、Kafkaはこれらの両方の消費者グループを1つとみなします。両方のアプリケーションで同じコードを使用している場合は、2番目のアプリケーションのkafkaParams.put( "group.id"、 "test-consumer-group1")を変更してみてください。 –

関連する問題