2017-12-22 16 views
0

TLDR:同じトピックの他のものよりも早急にいくつかのメッセージを処理できるようにしたいと思います。kafkaバインダーで春のクラウドストリームを使用していくつかのメッセージを優先順位付けする方法

詳細な説明:

私はいくつかのタスクのために互いの間で話をするカフカ、バインダーと、スプリング・クラウド・ストリームに依存する多くのスプリングmicroservicesを使用して、かなり大規模なアプリケーションに取り組んでいます。

スケジュールされたタスクが多数のイベントを生成することがあります。これらのイベントが消費されると、中間イベントが生成されて送信され、初期および中間メッセージがすべて消費され、すべての作業が完了するまでにかなりの時間がかかることがあります。

この間、同じタイプのイベントを手動で送信したい場合、他のすべてが完了したときにのみキューに入れられ、処理されます。

他のすべてのイベントの前に、これらの「手動イベント」を優先処理することができないかどうかを確認するように求められました。

通常、この場合、すべてのトピック(たとえばtopic-normaltopic-urgentなど)を複製し、@StreamListenerを複製することを推奨する人が表示されます。ここにはさまざまなトピックが含まれているので、これはやや複雑な選択肢のようです。これを考慮に入れて書き直す必要があるカフカストリーム集約もあります。

私たちはおそらく、緊急のメッセージが必要な各ト​​ピックの消費者グループ(consumerGroupNormalconsumerGroupUrgentusing)を複製することを考えていました。次に、各メッセージにプライオリティヘッダーを追加し、StreamListenerを複製し、リスナの1つにpriority: urgentヘッダーのメッセージのみを消費し、もう1つのリスナにはpriority:normalのメッセージのみを消費します。

また、この "優先度"ヘッダーを自動的に追加して読み込むChannelInterceptorAdapterを作成します(a bit like Sleuth does it)。

しかし、私はバインダーの書き込みにたくさんの設定を加えなければならず、リスナーを複製し、ヘッダーをチェックしてリスナーコードを実行する小さなコードを追加する必要があります。

これは他の人には便利な非常に一般的でシンプルなユースケースのように見えるので、これは簡単な方法がないのだろうかと思っていました。

答えて

1

トピックごとに2つのコンシューマ・グループを使用することは、それを実行して2つのリスナーを持つ唯一の方法です。

ただし、カスタムチャネルインターセプタは必要ありません。 conditionプロパティを"headers['foo'].equals('urgent')"のように使用できます。

+0

Hum、私はChannelInterceptorAdapterを使用して、後続のメッセージを生成するときに自動的にヘッダーを伝播させることを考えていました(緊急性を自動的に保持するため)。これは、SleuthがSpanIdヘッダで行うことです。デベロッパーとして私たちはトレーシードを気にする必要はなく、必要に応じて自動的にそれを挿入して読み取ります。 –

+0

私は条件プロパティについては考えていませんでしたが、これは 'StreamListener'をさらに簡略化しました。ありがとう! –

+0

あなたは、春のクラウドストリームのgithubにこのような機能を求める問題を作り出す価値があると思いますか? –

関連する問題