2016-10-27 18 views
2

私はRX 2.2.5を使用しています。私はこの問題は、それは非常にまれ要素の順序をミスしないことをGROUPBYの後に来る、それはすべての要素のシーケンスを取得する前にGROUPBYRx.Net GroupByの実装の要素シーケンスがまれにある

  _transportService 
      .ObserveSubOrder(parentOrder.OrderId) 
      .SubscribeOn(_backgroundScheduler) 
      .ObserveOn(_uiScheduler) 
      .Where(subOs => subOs != null)     
      .Snoop("BeforeGrpBy") 
      .GroupBy(subOs => subOs.OrderId) 
      .Subscribe(subOrdUpdates => 
      { 
       AddIfNew(subOrdUpdates.Key, subOrdUpdates.Snoop("AfterGrpBy" + "--" + subOrdUpdates.Key));       
      }) 

とサブオーダーを読み込む2つの景色を眺めることができます。私はログから明らかなように、その並行性の問題は考えていません。これらのログを生成するには、カスタムスヌープ拡張メソッドを使用します。

16:15:44.8169968 : (1) : BeforeGrpBy: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125}) 
16:15:44.8169968 : (1) : AfterGrpBy--9Zsj8Z4sTRb: Observable obtained 
16:15:44.8369988 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscribed to on. 
16:15:44.8379989 : (1) : BeforeGrpBy: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125}) 
16:15:44.8379989 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscription completed. 
16:15:44.8590010 : (1) : AfterGrpBy--9Zsj8Z4sTRb: Observable obtained 
16:15:44.8600011 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscribed to on. 
16:15:44.8610012 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscription completed. 
16:15:44.8620013 : (1) : AfterGrpBy--9Zsj8Z4sTRb: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125}) 

フォーマット時間:(スレッド):メッセージ

GROUPBY onNextが二回呼び出される前に、あなたが見ることができるように、それは1を逃した後。 ここでRx文法に問題がありますか、それとも問題ですか?どんな洞察も助けになるだろうか?それ以上の明確化が必要な場合は、親切にコメントしてください。

更新: 追加作業/望ましいログ:

16:15:45.1070258 : (1) : BeforeGrpBy: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875}) 
16:15:45.1280279 : (1) : AfterGrpBy--44Fqp3ubNmL: Observable obtained 
16:15:45.1310282 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscribed to on. 
16:15:45.1320283 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscription completed. 
16:15:45.1320283 : (1) : AfterGrpBy--44Fqp3ubNmL: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875}) 
16:15:45.1330284 : (1) : BeforeGrpBy: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875}) 
16:15:45.1330284 : (1) : AfterGrpBy--44Fqp3ubNmL: Observable obtained 
16:15:45.1340285 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscribed to on. 
16:15:45.1340285 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscription completed. 
16:15:45.1350286 : (1) : AfterGrpBy--44Fqp3ubNmL: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875}) 

アップデート2:fireNewMapEntryがtrueの場合にのみ可能性のあるバグや機能

GROUPBYは、(GroupBy.cs)をgroupedObservableを発射し、これが起こりますここに

if (!_map.TryGetValue(key, out writer)) 
{ 
    writer = new Subject<TElement>(); 
    _map.Add(key, writer); 
    fireNewMapEntry = true; 
    } 

ここで、_mapのタイプはDictionary<TKey, ISubject<TElement>>です。これは問題になる可能性がありますか?

答えて

0

GroupByの性質がありません。

オペレータは、新しいグループが表示された後でのみ、OnNextを発信します(実装GroupBy.cs:67を参照)。あなたの場合、orderIDは両方の通知に等しいので、1つだけOnNextが放出されます。

オペレータが発行する値は、グループ内のさらなる通知にアクセスする必要がある場合にサブスクライブできるIGroupedObservable<T>です。あなたのコードのスタイルに

+0

私は100で、ほとんど一度、それはほとんど起こっていない質問で述べたようにあなたがgroupbyの後にログを見ると、2つの異なるobservableが得られ、2つの異なるgroupedObservableになるはずです。私は2つのonNextを期待しています..私は事件のためのログで質問を更新します。 – MKMohanty

+0

@LeeCampbellからメモを取って、querysコーディングスタイルを強化することをお勧めします。あなたの購読スタイルから、おそらくどこかに隠された競合状態があり、それは "乱雑さ"を作り出します。 'GroupBy'はサブスクリプションが破棄されて、やり直された後、同じキーに対して新しい' IGroupedObsevable'を生成するので、私の答えは有効だと私は信じています。 – supertopi

1

ただ、いくつかのメモ(私は@supertopiが答えたと思うと申し訳ありませんが、それは本当に答えではありません)

  1. 移動しますSubscribeOnObserveOn呼び出しあなたが前に行う最後のものであることをあなたの最終購読。現在のコードでは、WhereSnoopGroupByのすべてが_uiSchedulerの貴重なサイクルを実行しています。

  2. サブスクライブでの購読は避けてください。 AddIfNewが鍵とIObservable<T>を受け取っているように見えます。したがって、私はそれが内部的にいくつかのサブスクリプションを行っていると仮定します。代わりにあなたが知っているものに傾けてください。 GroupByを使用している場合は、グループが最初に生成されたときにキーが一意であることがわかります。だから、これは今Add(あなたがチェックしているキーの場合)だけになります。明示的にする場合は、Take(1)を使用することもできます。チェックしているキーではない値であれば、GroupByは冗長であるようです。別の開発者は、彼らがきれいに案内されているクエリを通読、代わりのsubOschildOschildUpdates間をジャンプされるようにchildOrderは(IMO)より良い名前のようだとき、あなたに一貫性のある変数名を維持する

  3. てみ

  4. 理想的には、観測可能なシーケンスにヌル値を戻さないでください。それは何の目的ですか?まれには意味をなさいかもしれませんが、しばしばOnCompletedの代わりにnullが使用され、このシーケンスの値がないことを示しています。

(スヌープとヌルチェックなし)

_transportService 
     .ObserveSubOrder(parentOrder.OrderId) 
     .Where(childOrder => childOrder != null)     
     .Snoop("BeforeGrpBy") 
     .GroupBy(childOrder => childOrder.OrderId) 
     .SelectMany(grp => grp.Take(1).Select(childOrder=>Tuple.Create(grp.key, childOrder)) 
     .SubscribeOn(_backgroundScheduler) 
     .ObserveOn(_uiScheduler) 
     .Subscribe(newGroup => 
     { 
      Add(newGroup.Item1, newGroup.Item2);       
     }, 
      ex=>//obviously we have error handling here ;-) 
     ); 

または

_transportService 
     .ObserveSubOrder(parentOrder.OrderId) 
     .Where(childOrder => childOrder != null)     
     .Snoop("BeforeGrpBy") 
     .SubscribeOn(_backgroundScheduler) 
     .ObserveOn(_uiScheduler) 
     .Subscribe(childOrder => 
      { 
      AddIfNew(childOrder.OrderId, childOrder);        
      }, 
      ex=>//obviously we have error handling here ;-) 
     ); 

とさらに良い

var subscription = _transportService 
     .ObserveSubOrder(parentOrder.OrderId) 
     .SubscribeOn(_backgroundScheduler) 
     .ObserveOn(_uiScheduler) 
     .Subscribe(
      childOrder => AddIfNew(childOrder.OrderId, childOrder), 
      ex=>//obviously we have error handling here ;-) 
     ); 

HTH

+1

得点ポイント3.ポイント4のために、私はフレームワーク内の異なるアセンブリからのものとして観測可能なシーケンスの生成を制御できません。ポイント2の場合は、購読での購読は避けてください! (ここで何の欠点がありますか)もう一度ポイント1、それは削除することができますが、それは同時性の問題を引き起こしており、したがって要素の欠如を引き起こしている合意? – MKMohanty

+0

実際の実装では、デバッグ用のSnoopはありません:)上記の理由によりヌルチェックがある – MKMohanty

+0

サブスクリプション内でサブスクリプションを行うと、フロアにサブスクリプションをドロップする可能性が高くなります。これはメモリリークを意味します。エラー処理の追加層を持たない可能性も高くなります。代わりに、クエリでフローを作成します。 –

関連する問題