2017-04-17 4 views
2

私は、観測可能な3つのイベントストリーム(日付順)(主なイベント、顧客に関するイベント、販売に関するイベント、顧客に関するイベント)各イベントにはvehicleIDとその他のさまざまなプロパティがあります。イベントは1台の車両に表示され、次に別の車両などに表示される可能性があります。基本的に、VehicleIDに基づいて3つの独立したイベントストリームを相互に関連付けるようにしています。私はどのような形式の複雑な観測可能なプログラミングでも新しく、これはむしろ難しいことを証明しています。CombineLatestを使用して複数のグループの3つのイベントストリームを関連付ける

私はストリームのいずれかで車両の新しいイベントが表示されるといつでも関数を呼び出す必要があります(私は基本的にはcombineLatestと思います)。 1つの車両のイベントだけを含むように各ストリームをフィルタリングすると、私はそれを行うことができます。Whereですが、どのようにしてGroupByを把握してから各グループの最新情報を取得することはできません。私はストリームをマージしようとしていますが、各グループの車両にcombineLatestを組み合わせようとしています。

以下は、作成したいすべてのオブジェクトを、VehcileID = 1の場合にのみ印刷します。私は以下のことをしたいが、すべての車両にしたい。これをすべてのVehcileIDで繰り返した場合、これは私が望む出力を得ることができました。しかし、それはヒップオブザーバブルのようには見えません。すべてが私が目指すべき禅のストリーム状態です。

Observable.CombineLatest(mainEvents.Where(a=>a.VehcileID==1),saleEventsGroup.Where(a=>a.VehcileID==1),customerEventsGroup.Where(a=>a.VehcileID==1),(main, sale, customer)=>{ 
     //Basically flattening various properties from latest state of the 3 streams for current vehicle with some mapping 
     return ComplexObject(){};})  
     .Subscribe(Console.WriteLine); 

各車両の最新のイベントをどのように組み合わせることができますか?

アドバイスありがとうございます

答えて

2

どうやってですか?私はここで2つのストリームだけをしていますが、そのアイデアは3つに簡単に拡大できます。

[TestMethod] 
    public void GroupByWithMultipleStreams() 
    { 
     Subject<Notification> producer = new Subject<Notification>(); 
     Subject<RelatedToNotification> otherThingProducer = new Subject<RelatedToNotification>();    

     Observable.Merge(
      producer.Select(n => new { Id = n.Id, notification = n, relatedNotification = (RelatedToNotification)null }), 
      otherThingProducer.Select(rn => new { Id = rn.NotificationId, notification = (Notification)null, relatedNotification = rn })) 
      .GroupBy(x => x.Id) 
      .SelectMany(obs => 
      { 
       return obs.Scan(new ComplexObject() { Id = obs.Key }, (acc, input) => 
       { 
        acc.Notification = input.notification ?? acc.Notification; 
        acc.Related = input.relatedNotification ?? acc.Related; 
        return acc; 
       }); 
      }) 
      .Where(result => result.Notification != null && result.Related != null) // if you only want it to fire when everything has a value 
      .Subscribe(result => 
      { 
       //do something with the results here 
      } 
      ); 

     producer.OnNext(new Notification() { Id = 1, Version = 1 }); 
     producer.OnNext(new Notification() { Id = 1, Version = 2 }); 
     producer.OnNext(new Notification() { Id = 2, Version = 17 }); 
     producer.OnNext(new Notification() { Id = 1, Version = 3 }); 
     producer.OnNext(new Notification() { Id = 9, Version = 0 }); 
     producer.OnNext(new Notification() { Id = 9, Version = 1 }); 
     otherThingProducer.OnNext(new RelatedToNotification() { NotificationId = 2, SomeData = "2data" }); 
     otherThingProducer.OnNext(new RelatedToNotification() { NotificationId = 2, SomeData = "2data1" }); 
     otherThingProducer.OnNext(new RelatedToNotification() { NotificationId = 9, SomeData = "9Data" }); 
     producer.OnNext(new Notification() { Id = 2, Version = 1 }); 

    } 

    class ComplexObject 
    { 
     public int Id { get; set; } 
     public Notification Notification { get; set; } 
     public RelatedToNotification Related { get; set; } 
    } 

    class Notification 
    { 
     public int Id { get; set; } 
     public int Version { get; set; } 

     public string Name { get; set; } 
    } 

    public class RelatedToNotification 
    { 
     public int NotificationId { get; set; } 
     public string SomeData { get; set; } 
    } 
+1

すべてに共通の基本クラスがありましたので、これはずっと簡単です。それは合併であり、私は考えていないだろう。本当に感謝! – GraemeMiller

関連する問題