0

私は動物園の飼育係と3カフカのブローカーをローカルに運営しています。 私は1人のプロデューサーと1人の消費者を始めました。私は消費者がメッセージを消費しているのを見ることができます。春のデータストリームプロジェクトで同じ消費者グループのすべての消費者にメッセージを公開

その後、同じコンシューマグループ名(春のブートプロジェクト以降に異なるポート)で3人のコンシューマを起動しました。私が見つけたのは、すべての消費者が現在メッセージを消費している(受信している)ということです。しかし、私はメッセージが消費者を越えて繰り返されないという点でメッセージが負荷分散されることを期待しています。私は問題が何であるか分からない。

は、ここに私のプロパティファイル

ここ
spring.cloud.stream.bindings.input.destination=timerTopicLocal 
spring.cloud.stream.kafka.binder.zkNodes=localhost 
spring.cloud.stream.kafka.binder.brokers=localhost 
spring.cloud.stream.bindings.input.group=timerGroup 

グループがtimerGroupです。

コンシューマコード:https://github.com/codecentric/edmp-sample-stream-sink

プロデューサーコード:https://github.com/codecentric/edmp-sample-stream-source

答えて

0

はあなたがCamden.RELEASEへの依存関係を更新(および0.9+カフカを使用して起動)してくださいことはできますか? Brixton.RELEASEでは、Kafkaのコンシューマは0.8ベースで、パーティションを正しく配布するためにinstanceIndex/instanceCountをプロパティとして渡す必要がありました。

Camden.RELEASEでは、予想通りにロードバランシングを行うKafka 0.9+コンシューマクライアントを使用しています(instanceIndex/instanceCountによる静的パーティション割り当てもサポートしていますが、これはあなたが望むものではないと思われます)。私はBrixtonでこれを設定する方法の詳細を入力することができますが、アップグレードははるかに簡単な方法であるはずです。

関連する問題