2016-07-14 10 views
4

ここでは、さまざまな文字の文字数を表示する簡単なプログラムがあります。それは期待どおりに動作します。Publish()の動作上の混乱Refcount()

static void Main(string[] args) { 
    var word = new Subject<string>(); 
    var wordPub = word.Publish().RefCount(); 
    var length = word.Select(i => i.Length); 
    var report = 
     wordPub 
     .GroupJoin(length, 
      s => wordPub, 
      s => Observable.Empty<int>(), 
      (w, a) => new { Word = w, Lengths = a }) 
     .SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j })); 
    report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}")); 
    word.OnNext("Apple"); 
    word.OnNext("Banana"); 
    word.OnNext("Cat"); 
    word.OnNext("Donkey"); 
    word.OnNext("Elephant"); 
    word.OnNext("Zebra"); 
    Console.ReadLine(); 
} 

、出力は次のとおりです。

Apple 5 
Banana 6 
Cat 3 
Donkey 6 
Elephant 8 
Zebra 5 

"wordpubは" 二回 "レポート" に含まれているので、私は)(公開RefCountプロパティを()を使用しました。それがなければ、最初に単語が出されると、レポートの一部がコールバックによって通知され、レポートの他の部分に通知が2回通知されます。それは何が起こるかです。出力は6ではなく11の項目を持つことになります。少なくともそれは私が起こっていると思います。私はPublish()。RefCount()をこの状況で使用すると同時に、レポートの両方の部分を更新すると思います。

私もこのような公表されたソースに使用する長さ機能を変更する場合は:長さ機能も使用することができないのはなぜ

Apple 5 
Apple 6 
Banana 6 
Cat 3 
Banana 3 
Cat 6 
Donkey 6 
Elephant 8 
Donkey 8 
Elephant 5 
Zebra 5 

var length = wordPub.Select(i => i.Length); 

を次に出力がこれです同じ出版源ですか?

答えて

3

はこれでした解決すべき大きなチャレンジ! これが起こるような微妙な条件。 長い説明のために事前に謝罪しますが、私に負担してください!

TL; DR公開ソースに

サブスクリプションは、順に処理が、他のサブスクリプションの前に直接公開されていないソースにしています。キューをジャンプすることができます。 GroupJoinサブスクリプション順は、ウィンドウの開閉を決定する際に重要です。


私が最初に懸念しているのは、主題を参照することです。 これはノーオペレーションでなければなりません。 Subject<T>には購読料がありません。

ですから、Publish().RefCount()を削除する場合:

var word = new Subject<string>(); 
var wordPub = word;//.Publish().RefCount(); 
var length = word.Select(i => i.Length); 

を、あなたは同じ問題を取得します。

私はGroupJoinを見ています(私の直感ではPublish().Refcount()は赤ん坊です)。 私にとっては、これだけを目の当たりにすることは合理化するのが難しかったので、私は単純なデバッグにも頼りにしています。私は何十年も使っています - TraceまたはLog拡張メソッド。

public interface ILogger 
{ 
    void Log(string input); 
} 
public class DumpLogger : ILogger 
{ 
    public void Log(string input) 
    { 
     //LinqPad `Dump()` extension method. 
     // Could use Console.Write instead. 
     input.Dump(); 
    } 
} 


public static class ObservableLoggingExtensions 
{ 
    private static int _index = 0; 

    public static IObservable<T> Log<T>(this IObservable<T> source, ILogger logger, string name) 
    { 
     return Observable.Create<T>(o => 
     { 
      var index = Interlocked.Increment(ref _index); 
      var label = $"{index:0000}{name}"; 
      logger.Log($"{label}.Subscribe()"); 
      var disposed = Disposable.Create(() => logger.Log($"{label}.Dispose()")); 
      var subscription = source 
       .Do(
        x => logger.Log($"{label}.OnNext({x.ToString()})"), 
        ex => logger.Log($"{label}.OnError({ex})"), 
        () => logger.Log($"{label}.OnCompleted()") 
       ) 
       .Subscribe(o); 

      return new CompositeDisposable(subscription, disposed); 
     }); 
    } 
} 

私はそれがこのようになりますあなたの提供されたコードへのロギングを追加します。私のログに

var logger = new DumpLogger(); 

var word = new Subject<string>(); 
var wordPub = word.Publish().RefCount(); 
var length = word.Select(i => i.Length); 
var report = 
    wordPub.Log(logger, "lhs") 
    .GroupJoin(word.Select(i => i.Length).Log(logger, "rhs"), 
     s => wordPub.Log(logger, "lhsDuration"), 
     s => Observable.Empty<int>().Log(logger, "rhsDuration"), 
     (w, a) => new { Word = w, Lengths = a }) 
    .SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j })); 
report.Subscribe(i => ($"{i.Word} {i.Length}").Dump("OnNext")); 
word.OnNext("Apple"); 
word.OnNext("Banana"); 
word.OnNext("Cat"); 
word.OnNext("Donkey"); 
word.OnNext("Elephant"); 
word.OnNext("Zebra"); 

これます、出力の公開では、次の

ログ(ようなもの)。 RefCount()が使用されました

0001lhs.Subscribe()    
0002rhs.Subscribe()    
0001lhs.OnNext(Apple) 
0003lhsDuration.Subscribe()  
0002rhs.OnNext(5) 
0004rhsDuration.Subscribe() 
0004rhsDuration.OnCompleted() 
0004rhsDuration.Dispose() 

    OnNext 
    Apple 5 

0001lhs.OnNext(Banana) 
0005lhsDuration.Subscribe()  
0003lhsDuration.OnNext(Banana) 
0003lhsDuration.Dispose()  
0002rhs.OnNext(6) 
0006rhsDuration.Subscribe() 
0006rhsDuration.OnCompleted() 
0006rhsDuration.Dispose() 

    OnNext 
    Banana 6 
... 

私は、使用Publish().RefCount()を削除し、次のように新しいログ出力は次のとおりです。我々は、ログに注釈を付ける起動したときにこれが私たちにいくつかの洞察力を与えるだけ件名

0001lhs.Subscribe()     
0002rhs.Subscribe()     
0001lhs.OnNext(Apple) 
0003lhsDuration.Subscribe()   
0002rhs.OnNext(5) 
0004rhsDuration.Subscribe() 
0004rhsDuration.OnCompleted() 
0004rhsDuration.Dispose() 

    OnNext 
    Apple 5 

0001lhs.OnNext(Banana) 
0005lhsDuration.Subscribe()   
0002rhs.OnNext(6) 
0006rhsDuration.Subscribe() 
0006rhsDuration.OnCompleted() 
0006rhsDuration.Dispose() 

    OnNext 
    Apple 6 

    OnNext 
    Banana 6 

0003lhsDuration.OnNext(Banana) 
0003lhsDuration.Dispose() 
... 

なし

ログ、しかし問題は本当に明らかになったときでありますサブスクリプションの論理リストを使用します。私たちの注釈がword.OnNext("Banana");が実行されたときに、この例では、オブザーバーのチェーンがこの順

    にリンクされている。この

    //word.Subsribers.Add(wordPub) 
    
    0001lhs.Subscribe()    //wordPub.Subsribers.Add(0001lhs) 
    0002rhs.Subscribe()    //word.Subsribers.Add(0002rhs) 
    0001lhs.OnNext(Apple) 
    0003lhsDuration.Subscribe()  //wordPub.Subsribers.Add(0003lhsDuration) 
    0002rhs.OnNext(5) 
    0004rhsDuration.Subscribe() 
    0004rhsDuration.OnCompleted() 
    0004rhsDuration.Dispose() 
    
        OnNext 
        Apple 5 
    
    0001lhs.OnNext(Banana) 
    0005lhsDuration.Subscribe()  //wordPub.Subsribers.Add(0005lhsDuration) 
    0003lhsDuration.OnNext(Banana) 
    0003lhsDuration.Dispose()  //wordPub.Subsribers.Remove(0003lhsDuration) 
    0002rhs.OnNext(6) 
    0006rhsDuration.Subscribe() 
    0006rhsDuration.OnCompleted() 
    0006rhsDuration.Dispose() 
    
        OnNext 
        Banana 6 
    

    ように見えるかもしれませんRefCountプロパティを持つ元(作業)コードで

  1. wordPub
  2. 0002rhs

ただし、wordPubには、サブスクリプションがあります! だから、本当の購読リストは

  1. wordPub
    1. 0001lhs
    2. 0003lhsDuration
    3. 0005lhsDuration
  2. 0002rhs
のように見えます我々は件名に注釈を付ける場合は210

のみword.OnNext("Banana");はオブザーバーのチェーンが0003lhsDurationように、この順で

1. 0001lhs 
2. 0002rhs 
3. 0003lhsDuration 
4. 0005lhsDuration 

をリンクされて実行されたとき、我々は微妙この例では

0001lhs.Subscribe()     //word.Subsribers.Add(0001lhs) 
0002rhs.Subscribe()     //word.Subsribers.Add(0002rhs) 
0001lhs.OnNext(Apple) 
0003lhsDuration.Subscribe()   //word.Subsribers.Add(0003lhsDuration) 
0002rhs.OnNext(5) 
0004rhsDuration.Subscribe() 
0004rhsDuration.OnCompleted() 
0004rhsDuration.Dispose() 

    OnNext 
    Apple 5 

0001lhs.OnNext(Banana) 
0005lhsDuration.Subscribe()   //word.Subsribers.Add(0005lhsDuration) 
0002rhs.OnNext(6) 
0006rhsDuration.Subscribe() 
0006rhsDuration.OnCompleted() 
0006rhsDuration.Dispose() 

    OnNext 
    Apple 6 

    OnNext 
    Banana 6 

0003lhsDuration.OnNext(Banana) 
0003lhsDuration.Dispose() 

だから、どこにあるか見るログインサブスクリプションが0002rhsの後にアクティブになると、rhsの値が送信され、まだ開いているウィンドウでそれを生成するまで、ウィンドウを終了する "バナナ"値は表示されません。

やれやれ

@ francezu13k50はあなたの問題を明らかにし、簡単な解決策はただword.Select(x => new { Word = x, Length = x.Length });を使用することです指摘しているように、私はあなたが私は理解して私達にあなたの本当の問題を単純化したバージョン(感謝)を与えていると思うとなぜこれは適切ではないのですか? しかし、あなたの本当の問題空間が何であるか分からないので、現在のコードを持っていることを除いて、ソリューションを提供するために何を提案するべきか分かりません。

+0

偉大な分析!私は自分のツールボックスにロガーを追加しています。実際の問題空間は、ビューモデルにrxを使用しています。電子メールアドレスのように、ユーザーがフォームに入力するプロパティがあります。電子メールアドレスが有効かどうかなど、これらのプロパティに関する計算があります。私は入力と出力を相関させようとしています。私は電子メールとisEmailValidに参加するかもしれません。相関関係のように見えるのは難しいです。たぶん私は出力に入力を含めるでしょう - isEmailValidは{Email = string、IsValid = bool}になるので、相関は必要ありません。 – JustinM

+0

私がグループに参加している理由:私の実際の問題空間では、私はいくつかの遅い検証があります。 URLを検証するために、答えはnull(未知数)として開始されるので、UIを「計算中...」メッセージに続けてtrue/falseを更新して、UIを「良いURL」で更新することができます。または "bad URL!"と表示されます。 SelectManyを使用すると、UIに表示される検証メッセージを平坦化できます。だから実際にはかなり単純な問題であり、簡単な答えがないことが悲惨です。私の答えはPublish()に依存しています。RefCount()は不必要であると言います。 – JustinM

+0

スキャンに関する私のソリューションを参照してください。私はこれがGroupJoinの予期せぬ/トリッキーさがなければ私が望むものを達成すると思う。繰り返しますが、私はここで本当に簡単なことをしようとしています - 入力と対応する出力を相関させます。これを拡張メソッドにして、簡単に再利用できるようにすることもできます。 – JustinM

0

RefCountは、返されたObservableに少なくとも1つのサブスクリプションがある限り、ソースに接続されたObservableを返します。最後のサブスクリプションが廃棄されると、RefCountはソースとの接続を破棄し、新しいサブスクリプションが作成されると再接続します。レポートクエリでは、 'wordPub'へのすべてのサブスクリプションがクエリが実行される前に破棄されることがあります。

var report = word.Select(x => new { Word = x, Length = x.Length }); 

編集:代わりに、あなたは、単に行うことができ、複雑なGroupJoinクエリの

あなたがGroupJoin演算子を使用する場合は、このにレポートクエリを変更し :

var report = 
     wordPub 
     .GroupJoin(length, 
      s => wordPub, 
      s => Observable.Empty<int>(), 
      (w, a) => new { Word = w, Lengths = a }) 
     .SelectMany(i => i.Lengths.FirstAsync().Select(j => new { Word = i.Word, Length = j })); 
+0

私のクエリを書く方法はずっと簡単です。私はちょうど別々のストリームを一緒に引っ張る方法について学ぼうとしています。 – JustinM

+0

問題は、 'GroupJoin'によって生成された各グループを一連の長さに投影することです。現在の長さだけシーケンスを取り除く必要があります。 – francezu13k50

0

GroupJoinは非常に扱いにくいようですが、ここでは関数の入力と出力を関連付ける別の方法があります。すべての入力は、(1)出力のみ入力と同じ順序で到着し、(2)各出力は、その最新の入力に対応することが制約を受ける0以上の出力を有する場合

static void Main(string[] args) { 
    var word = new Subject<string>(); 
    var length = new Subject<int>(); 
    var report = 
     word 
     .CombineLatest(length, (w, l) => new { Word = w, Length = l }) 
     .Scan((a, b) => new { Word = b.Word, Length = a.Word == b.Word ? b.Length : -1 }) 
     .Where(i => i.Length != -1); 
    report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}")); 
    word.OnNext("Apple"); length.OnNext(5); 
    word.OnNext("Banana"); 
    word.OnNext("Cat"); length.OnNext(3); 
    word.OnNext("Donkey"); 
    word.OnNext("Elephant"); length.OnNext(8); 
    word.OnNext("Zebra"); length.OnNext(5); 
    Console.ReadLine(); 
} 

このアプローチは機能します。これはLeftJoinと似ています。最初のリスト(単語)の各アイテムは、最初のリストの別のアイテムが放出されるまで、その後に到着する右側のリスト(長さ)のアイテムとペアになります。

0

GroupJoinの代わりに通常のJoinを使用しようとしています。新しい単語が作成されたときに、新しいウィンドウを作成して現在のウィンドウを終了することにJoinの競合状態があったという問題があると思いました。そこでここでは、すべての単語をウィンドウの終わりを示すヌルとペアにすることでそれをエリートしようとしました。最初のバージョンと同じように動作しません。前のウィンドウが最初に閉じられることなく、各ウィンドウに新しいウィンドウが作成されるのはどうでしょうか?完全に混乱しています。

static void Main(string[] args) { 
    var lgr = new DelegateLogger(Console.WriteLine); 
    var word = new Subject<string>(); 
    var wordDelimited = 
     word 
     .Select(i => Observable.Return<string>(null).StartWith(i)) 
     .SelectMany(i => i); 
    var wordStart = wordDelimited.Where(i => i != null); 
    var wordEnd = wordDelimited.Where(i => i == null); 
    var report = Observable 
     .Join(
      wordStart.Log(lgr, "word"), // starts window 
      wordStart.Select(i => i.Length), 
      s => wordEnd.Log(lgr, "expireWord"), // ends current window 
      s => Observable.Empty<int>(), 
      (l, r) => new { Word = l, Length = r }); 
    report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}")); 
    word.OnNext("Apple"); 
    word.OnNext("Banana"); 
    word.OnNext("Cat"); 
    word.OnNext("Zebra"); 
    word.OnNext("Elephant"); 
    word.OnNext("Bear"); 
    Console.ReadLine(); 
}