リアクティブ・エクステンションをOracle AQとともに使用しようとしています。メッセージがOracle Queueに送信されると、メッセージがあることをコンシューマに通知する「OracleAQMessageAvailableEvent」が起動します。 OracleAQMessageAvailableEventHandlerの内部では、コンシューマはOracleAQQueue.Dequeue()をコールしてメッセージを取得します。リアクティブ・エクステンションでホット・オブザーバブルを作成する方法
私はRXで上記の作業をしています。以下は私が使ったコードです。
var messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h)
.Where(x => x.EventArgs.AvailableMessages > 0)
.Select(x =>
{
OracleAQMessage msg = _queue.Dequeue();
return (UpdateMsg) msg.Payload;
});
messages.subscribe(....)
問題は、私がメッセージを購読する場合はすべての作品たらということですが、私はメッセージを購読する場合は、複数回(つまり私のアプリケーション内で複数のコンシューマ)、その後、すべての消費者が「_queue.Dequeue()」と、すべてを呼び出すようにしようとします新しいメッセージがない場合、最初の呼び出し後の呼び出しは失敗します。
私は何をすべきか教えてください。私はシナリオが熱望できると思っていますが、私はそれを見据えて苦労しています。
ご意見ありがとうございますが、それでも複数のサブスクライバから_queue.Dequeueを呼び出そうとしています。何か案は? – tangokhi
あなたはObservable.FromEventPatternの代わりにSubjectを使用するべきだと思いますか?ユーザーは、IObservableとして公開されているサブジェクトにサブスクライブすることができます。メッセージがOracleAQから受信され、ハンドラが起動されると、メッセージを一旦デキューしてSubject.OnNext(NewMessage)をコールします。これは複数のサブスクライバを対象としますか? – tangokhi
私はあなたが今、(下のあなたの答えで)うまくいると思うが、私は答えたいと思った - いいえ、私はあなたが主題を使うべきだとは思わない。私は誰も主題を使うべきではないと思います。一般に、設計上の欠陥が指摘されています。 –