2017-08-09 2 views
3

私はリアクティブプログラミングに慣れていて、 "すべてがストリーム"というマントラを使用するのが難しいです。私は、次のシナリオを検討している - 私はこのようなdefiniedのWebSocketイベントのストリームを持っている:反応的なストリームでイベントをキャンセルするにはどうすればよいですか?

Rx.Observable.create((observer) => { 

    io.on('connect', function(socket){ 

     socket.on("enroll", function(player) { 
     observer.next({ 
      event: 'enroll', 
      player, 
      socket 
     }); 
     }); 

     socket.on('resign', function(player){ 
     observer.next({ 
      event: 'resign', 
      player, 
      socket 
     }); 
     }); 

    }); 

    return { 
     dispose: io.close 
    }; 
    }); 

その後、私は

enrollmentStream = events$ 
     .filter(find({ event: "enroll" })) 
     .map(pick('player')); 

と同様に

resignationStream = events$ 
     .filter(find({ event: "resign" })) 
     .map(pick('player')); 

ような何かを行うことができます私は登録された選手を4人でバッチ処理するストリームに集めたいが、明らかにこれは登録中のユーザのためだけに行うべきであるストリームが、辞表には含まれていないか、少なくとも最後のイベントは登録されています。これはどうすればいいですか?

ここに大理石の図があります。

Marble diagram

登録5人の選手があります。 4人のプレーヤーが登録されているとゲームが始まります。 2人目のプレイヤー(バイオレット1)は入会した後辞任するので、青い大理石で始まるのではなく、次の黄色でゲームが始まるので、その後は本当に4人のプレイヤーがいる。

のようなストリーム操作がある必要がありますか? ...がありますか?

+2

私はあなたが必要としているかどうかはわかりませんが、処理する必要のある一連のアクションの大理石またはシーケンス図を追加できますか? – paulpdaniels

+0

更新された質問をご覧ください。 – kboom

答えて

2

私は、このシナリオでは、あなたがcombineLatest()scan()演算子を使用して、unresigned選手のリストを自分で作ることができると思う:

const bufferedEnrollment = enrollmentStream.scan((acc, val) => { acc.push(val); return acc; }, []); 
const bufferedResignation = enrollmentStream.scan((acc, val) => { acc.push(val); return acc; }, []); 

Observable.combineLatest(bufferedEnrollment, bufferedResignation) 
    .map(values => { 
    const enrolled = values[0]; 
    const resigned = values[1]; 

    // remove resigned players from `enrolled` array 
    return enrolled; 
    }) 
    .filter(players => players.length === 4) 
    .subscribe(...) 

scan()オペレータのみが配列に選手を収集するために使用されます。たとえば、配列をリセットできるようにしたい場合、別のObservableとマージすることができます。

enrollmentStream 
    .merge(resetStream) 
    .scan((acc, val) => { 
    if (!val) { 
     return []; 
    } 
    acc.push(val); 
    return acc; 
    }, []); 

(このコードはテストしていません)。

関連する問題