2012-04-02 14 views
1

私は最も簡単な質問を述べることから始めます。そこにはRx Intersect演算子の実装がありますか?Rx交差演算子

基本的に私は値を生成する2つのストリームを持っています。 A、B、C、D、E、F、G とストリーム2は、生成:B、D、F

は両方の流れは、バックグラウンドのために(完全および無限大ではないであろう。それらが設けられているのは、ストリーム1が生成言う う私たちが同時に照会した2つの異なるデータソースによって)。

Rxの世界で非同期の交差演算子を実装する方法についての推奨事項はありますか?

答えて

3

私が知る限り、「公式」実装はありません。ほとんどの場合、2つのソースから値を収集し、反対側のソースに一致するものがあるかどうかをチェックする必要があります。このような何かが、あなたが始める必要があります。

<Extension()> 
Public Function Intersect(Of T)(first As IObservable(Of T), 
           second As IObservable(Of T), 
           comparer As IEqualityComparer(Of T) 
           ) As IObservable(Of T) 
    If first Is Nothing Then Throw New ArgumentNullException("first") 
    If second Is Nothing Then Throw New ArgumentNullException("second") 
    If comparer Is Nothing Then Throw New ArgumentException("comparer") 

    Return Observable.Create(Of T)(
     Function(obs) 
      Dim gate As New Object() 
      Dim firstItems As New HashSet(Of T)(comparer) 
      Dim secondItems As New HashSet(Of T)(comparer) 
      Dim firstCompleted, secondCompleted As Boolean 

      Dim disp As New CompositeDisposable(2) 
      disp.Add(first.Subscribe(Sub(v) 
             SyncLock gate 
              firstItems.Add(v) 
              If secondItems.Contains(v) Then obs.OnNext(v) 
             End SyncLock 
            End Sub, 
            AddressOf obs.OnError, 
            Sub() 
             SyncLock gate 
              firstCompleted = True 
              If secondCompleted Then obs.OnCompleted() 
             End SyncLock 
            End Sub)) 
      disp.Add(second.Subscribe(Sub(v) 
              SyncLock gate 
               secondItems.Add(v) 
               If firstItems.Contains(v) Then obs.OnNext(v) 
              End SyncLock 
             End Sub, 
             AddressOf obs.OnError, 
             Sub() 
              SyncLock gate 
               secondCompleted = True 
               If firstCompleted Then obs.OnCompleted() 
              End SyncLock 
             End Sub)) 
      Return disp 
     End Function) 
End Function 

を入力し、複数のoccurrancesが含まれている場合、この実装では、マッチを繰り返すことになりますが、それは両方のソースで発見された後にのみ。たとえば、

first ----1---2---1----2---1---1----| 
second ----------2----1-----------| 
out ----------2----1-2---1---1----| 

繰り返しが望ましくない場合は、対応するソースコレクションにないことを確認できます。最初のサブスクリプションは、

first.Subscribe(Sub(v) 
        SyncLock gate 
         'check that the first doesn't already contain this value 
         If firstItems.Add(v) AndAlso 
          secondItems.Contains(v) Then obs.OnNext(v) 
        End SyncLock 
       End Sub, 
       AddressOf obs.OnError, 
       Sub() 
        SyncLock gate 
         firstCompleted = True 
         If secondCompleted Then obs.OnCompleted() 
        End SyncLock 
       End Sub) 

に変更され、2番目のサブスクリプションは同様に変更されます。

+0

感謝。これはうまくいく。 – Damian

1

は、あなただけ行うことはできません。ここで

var intersect = from x in stream1 
       from y in stream2 
       where x == y 
       select x; 
+0

それは私のすぐれた考えでもありました、リチャード。私は、これが担当者を得るための簡単な質問になると考えました。私は逃したが、私はあなたにいくつかを与えた。 :-) – Enigmativity

+0

@Enigmativity;) –

+2

この解決策には2つの潜在的な問題があります。 stream2が熱く、stream1の一致がstream2の一致する値の後に来たらどうなりますか? stream2が寒くてstream1の各値のサブスクリプションをA Bad Thing(tm)とするとどうなりますか? –

4

が熱い観測のために働く別の実装だ、それはあなたの積集合を与え、その'c'は両方のストリームで3回表示された場合、それは一度だけ表示されます交差ストリーム内にある。

IObservable<char> stream1; 
IObservable<char> stream2; 

var intersect = Observable 
    .Merge(stream1.Distinct(), stream2.Distinct()) 
    .GroupBy(c=>c) 
    .SelectMany(g=>g.Skip(1).Take(1)); 

例:

stream1 ---a---b--c--d--e--a-b-c 
stream2 -b---a---e---------a-b-c 
intersect -----a-b--------e------c 
+0

ニース!私はグループ化を使ってこれを行う方法があると思った。私はすでに答えをマークしましたが、これはupvoteに値する。 – Damian

+0

'stream2'の最後の要素、つまり' c'は 'intersect'ストリームで放出されません。 –

+0

@MihaiPantea良い点、それはすべきです。 –