2012-05-11 3 views
1

最初に達成したいことを説明しましょう。Rxでのデータベースのバッファリング

私は、次のデータの受信形式にイベントストリーム

var data = new string[] { 
       "hello", 
       "Using", 
       "ok:michael", 
       "ok", 
       "begin:events", 
       "1:232", 
       "2:343", 
       "end:events", 
       "error:dfljsdf", 
       "fdl", 
       "error:fjkdjslf", 
       "ok" 
      }; 

私は、データ・ソースをサブスクライブするとき、私は私がしたい、基本的に次のような結果

"ok:michael" 
"ok" 
"begin:events 1:232 2:343 end:events" 
"error:dfljsdf" 
"error:fjkdjslf" 
"ok" 

を取得したいと思いを持っているとしましょうがokかエラーで開始し、データがから開始と終了の間でになるようにしてください。私は反応性プログラミングに非常に新しいですので、私はこれまでのところ、これを試してみました

..

var data = new string[] { 
       "hello", 
       "Using", 
       "ok:michael", 
       "ok", 
       "begin:events", 
       "1:232", 
       "2:343", 
       "end:events", 
       "error:dfljsdf", 
       "fdl", 
       "error:fjkdjslf", 
       "ok" 
      }; 



      var dataStream = Observable.Generate(
           data.GetEnumerator(), 
           e => e.MoveNext(), 
           e => e, 
           e => e.Current.ToString(), 
           e => TimeSpan.FromSeconds(0.1));   

      var onelineStream = from d in dataStream 
           where d.StartsWith("ok") || d.StartsWith("error") 
           select d; 

      // ??? 
      // may be need to buffer? I want to get data like "begin:events 1:232 2:343 end:events" 
      // but it is not working... 
      var multiLineStream = from list in dataStream.Buffer<string, string, string>(
           bufferOpenings: dataStream.Where(d => d.StartsWith("begin")), 
           bufferClosingSelector: b => dataStream.Where(d => d.StartsWith("end"))) 
           select String.Join(" ", list); 

      // merge two stream???? 
      // but I have no clue how to merge these twos :(

      mergeStream .Subscribe(d => 
      { 
       Console.WriteLine(d); 
       Console.WriteLine(); 
      }); 

、私は自分自身が反応性の方法で考えるようにすることはできません。 :(あなたが正しい答えにとても、とても接近していた事前に

感謝。

答えて

6

基本的にあなたがちょうど約onelineStream & multiLineStreamクエリを持っていた。

それらを一緒にされたマージ非常に簡単です:

onelineStream.Merge(multiLineStream) 

ただし、あなたのクエリが落ちた場所tは値の間に遅延を導入するために使用したObservable.Generateです。これは、あなたが複数の加入者を持っている場合に、ある種の「ファンアウト」という値を観測できるようにします。

このコードがどのように動作するか

は、あなたのデータとdataStream一見のためのあなたの定義を考える:いくつかは、1つのサブスクリプションで扱う持って、他の人が取り扱わしまったことを

!hello 
@Using 
!ok:michael 
@ok 
@1:232 
!begin:events 
@2:343 
!end:events 
!fdl 
@error:dfljsdf 
!error:fjkdjslf 
@ok 

お知らせ:

dataStream.Select(x => "!" + x).Subscribe(Console.WriteLine); 
dataStream.Select(x => "@" + x).Subscribe(Console.WriteLine); 

あなたは、これらの値を取得しますもう片方によって。これは、あなたのonelineStream & multiLineStreamのクエリがちょうど正しかったにもかかわらず、データのそれぞれが一部しか表示されず、したがって期待どおりに動作しないことを意味します。

また、値をスキップして重複する可能性がある競合状態を取得することもできます。ですから、この種の観測を避けるのが最善です。

値の間の遅延を導入するためのより良いアプローチは、これを行うことです。

var dataStream = data.ToObservable().Do(_ => Thread.Sleep(100)); 

さて、これはすべての新しい加入者がsoから観察できるの新鮮なサブスクリプションを開始してしまいます。つまり、観察可能な「冷たい」を作成します最初の値。

multiLineStreamクエリは、低温観測で正しく機能しません。

データストリームを観測可能(「加入者間で値を共有する」)にするには、Publish演算子を使用します。

ので、multiLineStreamは、次のようになります。

var multiLineStream = 
    dataStream.Publish(ds => 
     from list in ds.Buffer(
      ds.Where(d => d.StartsWith("begin")), 
      b => ds.Where(d => d.StartsWith("end"))) 
     select String.Join(" ", list)); 

あなたはそのようにのようなあなたの結果を得ることができます:

ok:michael 
ok 
begin:events 1:232 2:343 end:events 
error:dfljsdf 
error:fjkdjslf 
ok 

私をみましょう:

onelineStream.Merge(multiLineStream).Subscribe(d => 
{ 
    Console.WriteLine(d); 
    Console.WriteLine(); 
}); 

これは私が得たものですそれがあなたのために働くかどうかを知る。

+0

私は10回upvoteできたらいいなあ。 :) –

+1

他の訪問者のために 'dataStream'がイベント(私の場合は' Observable.FromEvent'ファクトリメソッド)から作成された場合、 'Publish'メソッドは必要ありません。 –

+1

これは本当に非常に教育的なスタイルで元のコードのすべての問題を完全にカバーする優れた答えです。フルクレジット。 – yamen

関連する問題