2017-10-10 10 views
0

消費者に消費量を動的に更新させようとしています。新しいトピックがカフカで作成されたことを消費者に通知するには?

動物を使った具体的な例を挙げておきます。私がペットショップを持っていると想像してみましょう。すべての話題は動物の一種です(犬、猫、魚など)。私のカフカの消費者の主な責任は、私たちがカフカで持っているログ/記録/メッセージを取得し、それらをデータベースに保存することです。

私の消費者がdogscatsのトピックで積極的に消費していて、すべてうまくいけば、今度は新しいタイプの動物が店に入って、新しい話題がカフカクラスターに生成されるとします。新しいトピックが追加されたことを消費者にどのように通知するのですか?

私は2つの提案をしていますが、どちらが良いと思いますか?または、より良い第3のオプションがある場合は、私に知らせてください。

1.)プロデューサは、httpリクエストをコンシューマに送信し、プロデューサが新しいトピックを作成しようとしていることをコンシューマに知らせる。このアプローチの問題は、競合状態が存在することです。トピックが作成される前に消費者が消費しようとする可能性があります。 (私が実際にauto.topic.creation.enableをtrueに設定した場合、競合状態は実際問題ではないことが分かりました)

2.)カフカクラスターにtopic_updatesという余分なトピックを作成します。したがって、プロデューサーがKafkaクラスターにメッセージを正常にコミットすると、それはtopic_updatesを通じてニュースをブロードキャストします。消費者はこのトピックの更新を積極的に聞いています。

3)わかりませんが、理想的には、新しいトピックが作成されるたびにカフカがイベントを発行できることを願っています。

は新しいの消費者が認識して持っているmetadata.max.age.msを下に下げてもらえ

答えて

1

消費者は、自動的に新しいが作成したトピックを見つけることができている、とあなたは、単にconsumer.subscribe(Pattern.compile(".*"));

を呼び出すことによって、すべてのトピックをサブスクライブでき、事前にありがとうトピックをより迅速に作成できます。

0

新しいKafkaAdminClientを使用して、何らかの形でトピックのリストを監視し、新しい追加を確認できます。トピックのリストを提供するサンプルコード(内部トピックを除く):

Properties properties = new Properties(); 
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
KafkaAdminClient kafkaAdminClient = (KafkaAdminClient) AdminClient.create(properties); 
ListTopicsResult listTopicResult = kafkaAdminClient.listTopics(); 
System.out.println(listTopicResult.names().get().toString()); 
関連する問題