2011-10-10 4 views
2

1つのクエリを実行するために2つのObservableを生成/テストしました。Rxウィンドウ、参加、GroupJoin?

ユーザーは複数の役割を持つことができます。ロールIDが変更されるたびに、データを更新する必要があります。しかし、データが更新されるのは、クエリがアクティブな場合のみです(現在、データが必要なコントロールがいくつかあります)。

ロールIDの変更は、クエリが中断された場合にも発生します。クエリが再度アクティブになると、データも読み込まれます。

//Tuple has the Id of the current Role and the time that the Id updated 
IObservable<Tuple<Guid, DateTime>> idUpdate 

//Tuple has the state of the query (true=active or false=suspended) 
//and the time the state of the query updated 
IObservable<Tuple<bool, DateTime>> queryStateUpdate 

私がマージされる可能性がありますが、私はケースの観測を作成する方法を見つけ出すことができない2例にそれを壊した

//A hot observable that pushes true whenever the query should execute 
IObservable<bool> execute 

を作成したいと思います。

  • ケースa)役割Idは最後の状態は、私はロールIDが

を更新してから、これが最初のアクティブ状態であるアクティブ& &へ

  • ケースb)の状態が更新されアクティブだった&を更新ビデオ、リーキャンプベルサイト、初心者TOCなどを見てきましたが、このrx参加の良い例は見つけられないようです。どのように実行するか、またはケースオブザーバブルを作成する上の任意のアイデア?

  • +0

    これら2つを結ぶID(GUID)はありますか?同様に、queryStateUpdateは 'IObservable >'であるか、すべてのクエリに対して1つのクエリ状態しか観測できません。ケースBの「最後に中断された後に更新されたID」に問題があります。 –

    +0

    すべてのクエリに対して1つのクエリ状態が観測可能です。 – jonperl

    +0

    @AndersonImes - 私はあなたに同意します。質問は現時点で特に明確ではない。 「最後に中断された後に更新されたID」や特定のオブザーバブルのシグネチャのようなステートメントは、この質問で明確に現れていないより深い要件がここにあると私に思い出させます。 – Enigmativity

    答えて

    1

    実際のID(Guid)が使用されているか、DateTimeの値が表示されていないので、少し曖昧です。問題を解決するように見える次のクエリがあります。 :

    IObservable<bool> execute = 
        idUpdate 
         .Publish(_idUpdate => 
          from qsu in queryStateUpdate 
          select qsu.Item1 
           ? _idUpdate.Select(x => true) 
           : Observable.Empty<bool>()) 
         .Switch(); 
    

    私は、次のidUpdate &​​観測でこれをテストしてみました。

    var rnd = new Random(); 
    
    IObservable<Tuple<Guid, DateTime>> idUpdate = 
        Observable 
         .Generate(
          0, 
          n => n < 10000, 
          n => n + 1, 
          n => Tuple.Create(Guid.NewGuid(), DateTime.Now), 
          n => TimeSpan.FromSeconds(rnd.NextDouble() * 0.1)); 
    
    IObservable<Tuple<bool, DateTime>> queryStateUpdate = 
        Observable 
         .Generate(
          0, 
          n => n < 100, 
          n => n + 1, 
          n => n % 2 == 0, 
          n => TimeSpan.FromSeconds(rnd.NextDouble() * 2.0)) 
         .StartWith(true) 
         .DistinctUntilChanged() 
         .Select(b => Tuple.Create(b, DateTime.Now)); 
    

    あなたの問題を明確にすることができれば、おそらくあなたのニーズに合ったより良い答えを提供することができます。


    EDIT:非アクティブ時にIdが変更されたときに必要な「replay(1)」動作が追加されました。

    タプルをDateTimeにする必要がなくなりましたのでご注意ください。

    IObservable<Guid> idUpdate = ... 
    IObservable<bool> queryStateUpdate = ... 
    
    var replay = new ReplaySubject<Guid>(1); 
    var disposer = new SerialDisposable(); 
    Func<bool, IObservable<bool>, IObservable<Guid>, 
        IObservable<Guid>> getSwitch = (qsu, qsus, iu) => 
    { 
        if (qsu) 
        { 
         return replay.Merge(iu); 
        } 
        else 
        { 
         replay.Dispose(); 
         replay = new ReplaySubject<Guid>(1); 
         disposer.Disposable = iu.TakeUntil(qsus).Subscribe(replay); 
         return Observable.Empty<Guid>(); 
        } 
    }; 
    
    var query = 
        queryStateUpdate 
         .DistinctUntilChanged() 
         .Publish(qsus => 
          idUpdate 
          .Publish(ius => 
           qsus 
            .Select(qsu => 
             getSwitch(qsu, qsus, ius)))) 
         .Switch(); 
    
    +0

    ありがとうございました。ケースAの場合はこれで動作します。ケースBのソリューションはありますか? (私は記事を明確にしようとした)。 – jonperl

    +0

    訂正、これは現在の状態に関係なく、どのIDの更新でも実行されるようです。 – jonperl

    +0

    @jonperl - これをかなりテストしましたが、テスト中に状態がアクティブでない場合は、値を生成しませんでした。あなたは私にそれがどこにある例を与えることができますか? – Enigmativity

    1

    私は限り​​が設定されているように処理される、通知idUpdateの流れがあることを言うように質問をお読みください。​​が設定されていない場合、通知は再び​​に設定されるまで一時停止する必要があります。

    この場合、結合演算子は問題を解決しません。

    私は、​​が設定されているときは、キャッシュのいくつかのフォームを必要とすることEnigmativityとAlSkiにすなわち

    List<Tuple<Guid,DateTime>> cache = new List<Tuple<Guid,DateTime>>(); 
    Subject<Tuple<Guid,DateTime>> execute = new Subject<Tuple<Guid,DateTime>>(); 
    
    idUpdate.Subscribe(x => { 
        if (queryStateUpdate.Last().Item1) //might be missing something here with Last, you might need to copy the state out 
         exeucte.OnNext(x); 
        else 
         cache.Add(x); 
        }); 
    
    queryStateUpdate.Subscribe(x=> { 
        if (x.Item1) 
        { 
         //needs threadsafety 
         foreach(var x in cache) 
          execute.OnNext(x); 
         cache.Clear(); 
        }); 
    
    +0

    AlSkiは助けてくれてありがとう。問題は、Last()がスレッドをブロックし、Silverlightを使用しているために機能しないことです。 – jonperl

    +0

    それについて考えた後、私はCombineLatestとStackキャッシュで何かになるかもしれないと思う。 – jonperl

    +0

    私はもっとユニットテストをする必要があります:) – jonperl

    0

    感謝をお勧めします。キャッシュを使用して、私は答えを思いつきました。

    var execute = new Subject<Guid>(); 
    var cache = new Stack<Guid>(); 
    idUpdate.CombineLatest(queryStateUpdate, (id, qs) => new { id, qs }).Subscribe(anon => 
    { 
        var id = anon.id; 
        var queryState = anon.qs; 
        //The roleId updated after the queryState updated 
        if (id.Item2 > queryState.Item2) 
        { 
         //If the queryState is active, call execute 
         if (observationState.Item1) 
         { 
         cache.Clear(); 
         execute.OnNext(roleId.Item1); 
         return; 
         } 
         //If the id updated and the state is suspended, cache it 
         cache.Push(id.Item1); 
        } 
        //The queryState updated after the roleId 
        else if (queryState.Item2 > roleId.Item2) 
        { 
        //If the queryState is active and a roleId update has been cached, call execute 
        if (queryState.Item1 && cache.Count > 0) 
        { 
         execute.OnNext(cache.Pop()); 
         cache.Clear(); 
        } 
        }}); 
    
    +0

    このソリューションは、あなたの観測値が同時に(DateTimeの解決に比例して)値を生成する場合を除いて動作します。私のアップデートを見てください。タプルに 'DateTime'を付けます。 – Enigmativity