2017-01-23 11 views
1

リクエストを行い、そのリクエストに対するレスポンスをレスポンスストリームで待つ。待っている間、Updatesストリームの更新があるかもしれません。リアクティブエクステンション:他のストリームが完了するまでのバッファ

応答ストリームが完了したら、これらの更新が必要です。このように:

Response (cold) |  x y z | 
Updates (hot)  | 1 2 3  4 | 

Result   |  x y z 123 4 | 

私はそれを正しく理解することはできません。私が必要とするものを私に与えることができる演算子の巧妙な組み合わせがありますか?

+0

大声で考えてみましょう。実際にはこのように少し見えます:http://stackoverflow.com/questions/31362031/with-reactive-extensions-rx-is-it-possible-to-add-a-pause-commandブール値ではなくストリームの完全な値。また、一時停止していないと再び一時停止することはできません。 hot.StartWithAndBufferUntilComplete(cold)のようなものです。 – asgerhallas

+0

複数の「応答」が存在しますか? – Shlomo

+0

@Shlomo Reponseストリームに複数のイベント(上記のx y zなど)がある可能性がありますが、完了するとそれ以上のレスポンスはありません。 – asgerhallas

答えて

2

を:

応答がすべて完了するまでバッファリングし、残りの更新を表示します。 すぐに回答が表示されます。

updates.TakeUntil(response.LastAsync()) 
     .ToArray() 
     .SelectMany(x => x) 
     .Concat(updates) 
     .Merge(response); 
+0

遅れて返信して申し訳ありません!これは私がそれを必要とするように正確に動作するように思われ、少し熟考の後に私もそれを理解する:)ありがとう! – asgerhallas

1

あなたはワンライナーとしてそれを行うことができないという事実が私を悩ますが、これは、動作します:

ここ
var updatesReplayed = updates.Replay(); 
updatesReplayed.Connect(); 

var results = response.Concat(updatesReplayed); 

の機能実証実験:質問の図から

// time    | 1 2 3 4 5 6 7 8 9 
// Response(cold)  |  x y z | 
// Updates(hot)  | 1 2 3  4 | 
// Result    |  x y z 123 4 | 

var scheduler = new TestScheduler(); 
    var updates = scheduler.CreateHotObservable(
     ReactiveTest.OnNext(100.ToMsTicks(), "1"), 
     ReactiveTest.OnNext(200.ToMsTicks(), "2"), 
     ReactiveTest.OnNext(400.ToMsTicks(), "3"), 
     ReactiveTest.OnNext(800.ToMsTicks(), "4"), 
     ReactiveTest.OnCompleted<string>(900.ToMsTicks()) 
    ); 

    var response = scheduler.CreateColdObservable(
     ReactiveTest.OnNext(300.ToMsTicks(), "x"), 
     ReactiveTest.OnNext(400.ToMsTicks(), "y"), 
     ReactiveTest.OnNext(500.ToMsTicks(), "z"), 
     ReactiveTest.OnCompleted<string>(600.ToMsTicks()) 
    ); 

    var expectedResults = scheduler.CreateHotObservable(
     ReactiveTest.OnNext(300.ToMsTicks(), "x"), 
     ReactiveTest.OnNext(400.ToMsTicks(), "y"), 
     ReactiveTest.OnNext(500.ToMsTicks(), "z"), 
     ReactiveTest.OnNext(600.ToMsTicks(), "1"), 
     ReactiveTest.OnNext(600.ToMsTicks(), "2"), 
     ReactiveTest.OnNext(600.ToMsTicks(), "3"), 
     ReactiveTest.OnNext(800.ToMsTicks(), "4"), 
     ReactiveTest.OnCompleted<string>(900.ToMsTicks()) 
    ); 

    var replayed = updates.Replay(); 
    replayed.Connect(); 

    var results = response.Concat(replayed); 
    var observer = scheduler.CreateObserver<string>(); 
    results.Subscribe(observer); 

    scheduler.Start(); 
    ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages); 
+0

これはすばらしいです、ありがとう!私はReplay()を試してみましたが、振り返ってみても意味がないStartWithとの組み合わせでのみでした。これははるかに意味があります:) – asgerhallas

+0

1つの質問...再生されたストリームは、購読されているときに新しいメッセージのバッファリングを停止しますか?私はこれが元の質問の一部ではないことを知っていますが、クエリが非常に長時間実行されているので非常に重要です。 – asgerhallas

+0

はい。テスト結果は、「4」が直ちにポップアウトされるのを示しています。 – Shlomo

関連する問題