2017-02-16 6 views
0

私は例えば、いくつかの観察可能な配列を有した後のアクションを実行します。観察可能なシーケンスの完全なすべてのオブザーバー

var period = TimeSpan.FromSeconds(0.5); 
var observable = Observable 
    .Interval(period) 
    .Publish() 
    .RefCount(); 

を私はバックグラウンドスレッドで、このシーケンスの要素のためのいくつかのハードの計算を実行し、また、いくつかの最後のアクションを実行したいですすべての計算が完了したとき。だから私はこのようなものが欲しい:

observable.ObserveOn(Scheduler.Default).Subscribe(i => ComplexComputation1(i)); 
observable.ObserveOn(Scheduler.Default).Subscribe(i => ComplexComputation2(i)); 
// next observer must be called only after ComplexComputation1/2 complete on input i 
observable.Subscribe(i => FinalAction(i)); 

私はRxでこれを行うことができますか?あるいは、これは反応性プログラミングのいくつかの原則に違反し、このような状況で別のアプローチを使うべきでしょうか?

答えて

2

リアクティブパターンで計算順序のシーケンスを使用することは非常に危険です。

あなたができることの1つは、各複雑な計算が完了した後にイベントを放出することです。それで、前のステップが完了したというメッセージを受け取ったら、計算を実行する消費者を持つことができます。


もう1つの考えられる解決策は、規則的に発射される具体的なシーケンスブロックを作成することです。これにより、ソリューションの並列性が低下します。これをテストするには

observable.ObserveOn(Scheduler.Default).Subscribe(i => 
{  
    ComplexComputation1(i)); 
    ComplexComputation2(i)); 
    FinalAction(i); 
} 
+0

私はこのアプローチについて考えました。しかし実際には、私は各要素を処理する多くのオブザーバーを知らない。この解決策はそのような状況に適用できるか? –

+0

私は可能な解決策としてSequence Blockアプローチを追加しました。 –

1

私はイベントのシーケンスを説明するのを助けるために、次のメソッドを作成しました:

 
Begin FinalAction 
Begin ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
End FinalAction 
End ComplexComputation1 
Begin FinalAction 
Begin ComplexComputation1 
Begin ComplexComputation2 
End FinalAction 
End ComplexComputation2 
End ComplexComputation1 
Begin FinalAction 
Begin ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
End ComplexComputation1 
End FinalAction 
... 

それは強制するのは簡単です:

public void ComplexComputation1(long i) 
{ 
    Console.WriteLine("Begin ComplexComputation1"); 
    Thread.Sleep(100); 
    Console.WriteLine("End ComplexComputation1"); 
} 

public void ComplexComputation2(long i) 
{ 
    Console.WriteLine("Begin ComplexComputation2"); 
    Thread.Sleep(100); 
    Console.WriteLine("End ComplexComputation2"); 
} 

public void FinalAction(long i) 
{ 
    Console.WriteLine("Begin FinalAction"); 
    Thread.Sleep(100); 
    Console.WriteLine("End FinalAction"); 
} 

あなたの元のコードは次のように走りました単一のバックグラウンドスレッド上で連続して実行するコード。ただEventLoopSchedulerを使用してください。与え

var els = new EventLoopScheduler(); 

observable.ObserveOn(els).Subscribe(i => ComplexComputation1(i)); 
observable.ObserveOn(els).Subscribe(i => ComplexComputation2(i)); 
// next observer must be called only after ComplexComputation1/2 complete on input i 
observable.ObserveOn(els).Subscribe(i => FinalAction(i)); 

 
Begin ComplexComputation1 
End ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
Begin FinalAction 
End FinalAction 
Begin ComplexComputation1 
End ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
Begin FinalAction 
End FinalAction 
Begin ComplexComputation1 
End ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
Begin FinalAction 
End FinalAction 

しかし、すぐにあなたがScheduler.Defaultを紹介すると、これは動作しません。

多かれ少なかれ、単純なオプションは、これを実行することです:期待どおりに動作

var cc1s = observable.ObserveOn(Scheduler.Default).Select(i => { ComplexComputation1(i); return Unit.Default; }); 
var cc2s = observable.ObserveOn(Scheduler.Default).Select(i => { ComplexComputation2(i); return Unit.Default; }); 

observable.Zip(cc1s.Zip(cc2s, (cc1, cc2) => Unit.Default), (i, cc) => i).Subscribe(i => FinalAction(i)); 

あなたがこのような素敵なシーケンスを取得:

 
Begin ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation1 
End ComplexComputation2 
Begin FinalAction 
End FinalAction 
Begin ComplexComputation2 
Begin ComplexComputation1 
End ComplexComputation2 
End ComplexComputation1 
Begin FinalAction 
End FinalAction 
Begin ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
End ComplexComputation1 
Begin FinalAction 
End FinalAction 
0

これは、ネストされた、観察が平坦化された組成物の単純なケースのように思える(SelectManyは/ /連結方式のマージ)し、私が撮影してきた。ここ

ジップLong Runningメソッドを引き受ける自由はTaskです。 しかし、それらがいなければ、遅いブロッキング同期メソッドは代わりにObservable.Start(()=>ComplexComputation1(x))でラップすることができます。

void Main() 
{ 
    var period = TimeSpan.FromSeconds(0.5); 
    var observable = Observable 
     .Interval(period) 
     .Publish() 
     .RefCount(); 

    var a = observable.Select(i => ComplexComputation1(i).ToObservable()) 
       .Concat(); 
    var b = observable.Select(i => ComplexComputation2(i).ToObservable()) 
       .Concat(); 

    a.Zip(b, Tuple.Create) 
     .Subscribe(pair => FinalAction(pair.Item1, pair.Item2)); 
} 

// Define other methods and classes here 
Random rnd = new Random(); 
private async Task<long> ComplexComputation1(long i) 
{ 
    await Task.Delay(rnd.Next(50, 1000)); 
    return i; 
} 
private async Task<long> ComplexComputation2(long i) 
{ 
    await Task.Delay(rnd.Next(50, 1000)); 
    return i; 
} 

private void FinalAction(long a, long b) 
{ 

}