1

リアクティブ・エクステンションをOracle AQとともに使用しようとしています。メッセージがOracle Queueに送信されると、メッセージがあることをコンシューマに通知する「OracleAQMessageAvailableEvent」が起動します。 OracleAQMessageAvailableEventHandlerの内部では、コンシューマはOracleAQQueue.Dequeue()をコールしてメッセージを取得します。リアクティブ・エクステンションでホット・オブザーバブルを作成する方法

私はRXで上記の作業をしています。以下は私が使ったコードです。

var messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
        h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h) 
       .Where(x => x.EventArgs.AvailableMessages > 0) 
       .Select(x => 
       { 
        OracleAQMessage msg = _queue.Dequeue(); 
        return (UpdateMsg) msg.Payload; 
       }); 
messages.subscribe(....) 

問題は、私がメッセージを購読する場合はすべての作品たらということですが、私はメッセージを購読する場合は、複数回(つまり私のアプリケーション内で複数のコンシューマ)、その後、すべての消費者が「_queue.Dequeue()」と、すべてを呼び出すようにしようとします新しいメッセージがない場合、最初の呼び出し後の呼び出しは失敗します。

私は何をすべきか教えてください。私はシナリオが熱望できると思っていますが、私はそれを見据えて苦労しています。

答えて

3

あなたはあなたが熱い観測可能性を探しているのは正しいと思います。 コードに従えば、なぜ_queue.Dequeue();が何度も呼び出されているのが分かります。

まず、あなたは、Oracle

Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
    h => _queue.MessageAvailable += h, 
    h => _queue.MessageAvailable -= h) 

からイベントをサブスクライブこれだけでは、事前のRxの世界を持っていると同じようにイベントハンドラをフックのようなものです。 誰もが聞いている(購読している)人は同じイベントを受け取ります。 イベントが発生した後に購読すると、見逃してしまいます。

次に、空のセットを除外します。

.Where(x => x.EventArgs.AvailableMessages > 0) 

何もありません。

次に、クエリ内から副作用を実行します。

.Select(x => 
    { 
     OracleAQMessage msg = _queue.Dequeue(); 
     return (UpdateMsg) msg.Payload; 
    }); 

ここ副作用は、あなたが破壊読み出し(Dequeue)を作っているということです。 すべてのユーザは、アップストリーム_queue.MessageAvailableからイベントをプッシュすると、すべてDequeue()にコールしようとします。

サブスクライバのすべてが副作用を引き起こすのを避けるために、あなたが示唆したようにシーケンスをホットにすることができます。 これを行うには、Publish()オペレータを参照してください。

Publish()オペレータはメソッドを追加することでIObservable<T>に拡張されたIConnectableObservable<T>を返します。 これにより、サブスクリプションロジックの実行時期を細かく制御できます。 しかし、これはあまりにもあなたのためのコントロール、おそらくRefCount()が必要なものであることがわかります。今

Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
    h => _queue.MessageAvailable += h, 
    h => _queue.MessageAvailable -= h) 
.Where(x => x.EventArgs.AvailableMessages > 0) 
.Select(x => 
    { 
     OracleAQMessage msg = _queue.Dequeue(); 
     return (UpdateMsg) msg.Payload; 
    }) 
.Publish() 
.Refcount(); 

ご契約者のそれぞれは、同じメッセージが表示されます、そして、あなたのDequeue()副作用は唯一のイベントごとに一度呼び出されます(そして加入者が存在している間のみ)。

ホットとコールド観察できるのはhere

+0

ご意見ありがとうございますが、それでも複数のサブスクライバから_queue.Dequeueを呼び出そうとしています。何か案は? – tangokhi

+0

あなたはObservable.FromEventPatternの代わりにSubjectを使用するべきだと思いますか?ユーザーは、IObservableとして公開されているサブジェクトにサブスクライブすることができます。メッセージがOracleAQから受信され、ハンドラが起動されると、メッセージを一旦デキューしてSubject.OnNext(NewMessage)をコールします。これは複数のサブスクライバを対象としますか? – tangokhi

+0

私はあなたが今、(下のあなたの答えで)うまくいると思うが、私は答えたいと思った - いいえ、私はあなたが主題を使うべきだとは思わない。私は誰も主題を使うべきではないと思います。一般に、設計上の欠陥が指摘されています。 –

-1

リー・キャンベル、申し訳ありませんが私の悪い覆われています。あなたが言及したソリューションは動作します。実際、私はそれを間違って使っていました。私は、メッセージと呼ばれるプロパティを持つクラスコールQueueWrapperを持っています。私はメッセージ

public IObservable<UpdateMsg> Messages { 
     get { return Observable.FromEventPattern<OracleAQMessageAvailableEventHandler,   OracleAQMessageAvailableEventArgs> (
     h => _queue.MessageAvailable += h, 
     h => _queue.MessageAvailable -= h) 
     .Where(x => x.EventArgs.AvailableMessages > 0) 
     .Select(x => 
     { 
      OracleAQMessage msg = _queue.Dequeue(); 
      return (UpdateMsg) msg.Payload; 
     }) 
     .Publish() 
     .Refcount(); 
}} 

のこの実装を持っていたし、私のクライアントコードは、メッセージのプロパティは新しいIObservableを返した各サブスクリプションのように、この

// First Subscription 
_queueWrapper.Messages.Subscribe(....) 

// Second Subscription 
_queueWrapper.Messages.Subscribe(....) 

のようなメッセージプロパティを使用して加入しました。この問題を解決するために、私は次のコードQueueWrapperのすなわちのコンストラクタに観察できるの初期化を移動:

public QueueWrapper() { 
    _messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
     h => _queue.MessageAvailable += h, 
     h => _queue.MessageAvailable -= h) 
    .Where(x => x.EventArgs.AvailableMessages > 0) 
    .Select(x => 
     { 
      OracleAQMessage msg = _queue.Dequeue(); 
      return (UpdateMsg) msg.Payload; 
     }) 
    .Publish() 
    .Refcount(); 
} 

と私のメッセージプロパティをちょうど_messagesを返します。

public IObservable<UpdateMsg> Messages { get { return _messages; } } 

その後、すべてが正常に動作するようになりました。

関連する問題