2016-10-06 7 views
0

私は車のオークションのデータストリームを持っています。各車オークションにはn個のレーンがあります。私は各車両のオークションを記録したい。RxJSストリームを動的にルーティングする方法

ストリームは次のようになります...

- {レーン:1、アクション:入札} --- {レーン:2、アクション:起動} --- {レーン:1、アクション: {レーン:2、アクション:入札} --- {レーン:1、アクション:売却} ---

私は各オークションレーンをバッファリングし、

const bufferOpen$= auctionWebSocketStream$ 
    .filter(stream => stream.tag === 'CURITEM'); 

const bufferClose$ =() => auctionWebSocketStream$.filter(stream => stream.tag === 'SOLD'); 

auctionWebSocketStream$ 
    .bufferToggle(bufferOpen$, bufferClose$) 
    .subscribe(x => console.log(x)); 

オークションとレーンが1つある限り、上記はうまくいきます。複数のレーンについては、複数のレーンについての入札/販売情報があります。

ストリームをレーンでバッファに集約するにはどうすればよいですか?同様のソリューションは、常に集約パラメータを知っていました。しかし、新しいレーンがあるときはいつでもストリームを分割する必要があります。

お願いします。

UPDATE

私は私の不満やcluelessnessを披露するJSBinを作りました。これは、入力ストリームの例を示し、望ましい出力を説明します。

http://jsbin.com/tuxitev/edit?js,console

(ボーナスポイントの場合、それだけでバベルの下に空の配列を示している。未活字体が必要とされる理由を確認してください)

+1

ちょうどGROUPBYにつまずいた...私は一瞬、「すべての人の時間を無駄にして申し訳ありません」を持っていることを約だと思います。 –

+0

固定車線数はありますか?新しいオークションで土地を再開することはできますか? – Meir

+0

固定されておらず、再度開くことができます。例えば。オークション1、レーンA、B、C ---オークション2、レーンA、B ---オークション1、レーンD ---オークション1、レーンA完了---オークション3、レーンA –

答えて

0

RxJSの質問が私に教えてください、答えを取得する場所を誰かが知っている場合。私は答えを受け入れるでしょう。これは私が持っていた3番目に未解決のRxJSの質問です。

答えが不思議な人にとっては、ここにあります。

stream$ 
.groupBy(stream => stream.lane) 
.mergeMap(stream => 
     stream.scan((acc, cur) => { 
       if (cur.action === 'start') { 
        acc = []; 
       } 
       acc.push(cur) 
       return acc; 
     }, []) 
     .filter(stream => stream[stream.length-1].action === 'sold') 
) 
.subscribe(
    x => console.log(x), 
    (e) => console.error(e), 
    () => console.log('complete') 
) 

http://jsbin.com/tuxitev/edit?js,console

関連する問題