2017-02-27 10 views
1

は、単純なシナリオを考えます一時停止するかAが部屋から誘拐されていますか?ハートビートパターン用いた反応性拡張

Aが会話するとき、AはIObservable トークを提供し、その後BはTalk.Subscribe(string => what Aが処理したもの)にサブスクライブします。 BはObservable.Interval ハートビートをハートビートチェックとして同時に購読することができます。

私の質問は、オペレータが、私がコンバイン/ 2 IObservableをマージするために使用すべきかハートビートの2項目以上話しからの項目が存在していない場合ように、BはAが誘拐されたと仮定します。

変数を適切に同期させないと、副作用が発生する可能性があるので、変数を避けることをお勧めします。

おかげで、

答えて

2

は、あなたが「A」は、最後の話を聞いたので、状態は心拍数を表すと、に基づいて行動したい状態変数を想像してみてください。それは次のようになります。

var stateObservable = Observable.Merge(     //State represent number of heartbeats since A last spoke 
    aSource.Select(_ => new Func<int, int>(i => 0)),  //When a talks, set state to 0 
    bHeartbeat.Select(_ => new Func<int, int>(i => i + 1)) //when b heartbeats, increment state 
) 
    .Scan(0, (state, func) => func(state)); 

我々は状態を増分として0に状態をリセットする機能として話すの事件、およびBのheartbeattingのインシデントを表します。次に、Scan関数で累積します。

残りの部分は今は簡単です:

var isKidnapped = stateObservable 
    .Where(state => state >= 2) 
    .Take(1); 

isKidnapped.Subscribe(_ => Console.WriteLine("A is kidnapped")); 

EDIT:あなたの答えのための

var aSources = new Subject<Tuple<string, Subject<string>>>(); 
var bHeartbeat = Observable.Interval(TimeSpan.FromSeconds(1)).Publish().RefCount(); 

var stateObservable = aSources.SelectMany(t => 
     Observable.Merge(
      t.Item2.Select(_ => new Func<int, int>(i => 0)), 
      bHeartbeat.Select(_ => new Func<int, int>(i => i + 1)) 
     ) 
     .Scan(0, (state, func) => func(state)) 
     .Where(state => state >= 2) 
     .Take(1) 
     .Select(_ => t.Item1) 
    ); 

stateObservable.Subscribe(s => Console.WriteLine($"{s} is kidnapped")); 
aSources 
    .SelectMany(t => t.Item2.Select(s => Tuple.Create(t.Item1, s))) 
    .Subscribe(t => Console.WriteLine($"{t.Item1} says '{t.Item2}'")); 
bHeartbeat.Subscribe(_ => Console.WriteLine("**Heartbeat**")); 

var a = new Subject<string>(); 
var c = new Subject<string>(); 
var d = new Subject<string>(); 
var e = new Subject<string>(); 
var f = new Subject<string>(); 

aSources.OnNext(Tuple.Create("A", a)); 
aSources.OnNext(Tuple.Create("C", c)); 
aSources.OnNext(Tuple.Create("D", d)); 
aSources.OnNext(Tuple.Create("E", e)); 
aSources.OnNext(Tuple.Create("F", f)); 

a.OnNext("Hello"); 
c.OnNext("My name is C"); 
d.OnNext("D is for Dog"); 
await Task.Delay(TimeSpan.FromMilliseconds(1200)); 
e.OnNext("Easy-E here"); 
a.OnNext("A is for Apple"); 
await Task.Delay(TimeSpan.FromMilliseconds(2200)); 
+0

おかげ@Shlomo:ここ

N Aソースとの例を示します、このソリューションをn + 1 whに拡大する方法はありますか? ere nは、実行時にのみ決定できる話者の数ですか? – LxL

+0

問題をよりよく定義できますか? – Shlomo

+0

現在、AとBのみがあります.Aの役割のような行動がC、D、E、Fに適用できるかどうかを確認したいと思います。私が考えることができるソリューションは、IObservableのリストと、実行時に追加されるサブスクリプションのリストを持つことです。それ以上のエレガントな方法はありますか? – LxL

関連する問題