2012-05-12 10 views
0

私はRx(リアクティブフレームワーク)を使ってバスを実装していますが、これまでのところよく見えます。今直面している問題は、ストリームの開始前にイベントを追加する方法です。ReplaySubjectを使ってRx観測点の先頭にイベントを追加することは可能ですか?

これは、より多くのコンテキストを提供するために、これはイベントソース(la CQRS/ES)です。バスが始まると、多くの加入者がIObservableを取得します。この時点で、バスは加入者にどのイベント番号/シーケンスを開始する必要があるかを尋ねる。イベントはディスクからロードされ、最も小さい番号から開始され、ストリームに追加され、各サブスクライバはwhereステートメントを使用して正しいイベントから開始します。アプリケーションが実行されると、新しいイベントが追加され、サブスクライバに移動します。この部分は素晴らしい作品です。

遅れているが、まだすべてのイベントが必要なサブスクライバもあります。 ReplaySubjectは、イベントの数が十分に少ない限り、請求書に適合します。私はおそらくいくつかのディスク/オフラインのキャッシュを行うことができます考えているより多くの周り(任意のポインタを歓迎!)

大きな問題は、一部の加入者がIObservableを取得すると、最初にロードされたイベントの前に発生したイベントを取得する必要があることです。次のような状況。

バスが始まったとき、それはイベント#50以降をロードしました(一番早い者が望みました)。今、新しいサブスクライバはIObservableを要求しますが、20番以降は必要ありません。だから私がしたいのは、ロード20-49で、ストリームの開始の前に追加してください。既存の加入者のいずれも、イベントのいずれも見るべきではありません(Whereによってフィルタリングされます)。

これは可能であるはずですが、私はそれのまわりで頭を上げることはできません。これはRxで可能ですか?もしそうなら、どうですか?

ありがとうございます! エリックどの程度だけ

答えて

3

ReplaySubjectを使用しないで、すべての「ライブ」イベントに単一のSubjectを使用し、各サブスクライバに対してこれの前に履歴イベントを連結します。

私はあなたが履歴レコードを追跡する何らかの種類のデータ構造を持っていると思います。その後、

var fromIndex = 20; 
var eventSubject = new Subject<Event>(); 
capturedEvents 
    .GetFromIndex(fromIndex) // Gets a enumerable list of events 
    .ToObservable() 
    .Concat(eventSubject) 
    .Subscribe(...); 

:あなたは既にメモリに#80を言って#50から/キャプチャしたイベントをロードした場合はそれが#20からイベントを要求に沿って、新たな加入者が来るとき、私はこのような何かをしたいですeventSubjectは、新しいイベントをキャプチャするために使用することができ、そのよう:

eventSubject.Subscribe(capturedEvents.CaptureEvent); 

これはあなたのニーズを満たしていますか?

+0

午前中にテストする必要があると思います。私はRxで作業することはちょっと精神的な調整が必要だと言わなければなりません! –

+1

@エリック - 私はいくつかの精神的調整を取ることに同意しますが、あなたが私のようなものであれば、以前のやり方に戻ることは望ましくないことがわかります。私はちょうど今、普通のイベントハンドラを使っているのが嫌いです。多くのコードは、Rxを使って短く、簡潔に、安全に作られています。 – Enigmativity

+0

私は基本的に上記のものを持っていますが、私は古い出来事が到着するのを見ていません。購読する必要がありますか? –

2

:私は非常に単純なのRxの方法でこれを見ます

itemsFromTwentyToFourtyNine.Concat(currentItems); 
関連する問題