2017-10-25 7 views
1

私はrabbitMQのイベントのストリームを
Event: {id, type, timestamp}のように表示しています。
値は次のとおりです。
id:いくつかのユニークな文字列
type:(a)のrrive /(d)は、私が(出発予定イベントに私が到着したイベントと一致するイベントの新しいストリームを生成したい開始イベントと終了イベントを無限ストリームからどのように関連付けるのですか?

をepart連続する)。

id | type | time 
1 | a | 0 
1 | d | 1 
2 | a | 2 
3 | a | 3 
3 | d | 4 
1 | a | 5 
2 | d | 6 
1 | d | 7 

はIタイプCorrelated:{id, duration}の新しいストリーム生成する:同じIDを持つイベントがイベントのストリーム所与例えばので を表示することができdurationは、2つの相関イベントからのタイムスタンプの差がある を

id | duration 
{1, 1} 
{3, 1} 
{2, 4} 
{1, 1} 

受信ストリームをIDでグループ化できましたが、イベントの関連付けに関するドキュメントを見つけられませんでした。私はこのように、期間を計算するために、groupBy後、あなただけの逸脱を考慮し、連続したイベントを組み合わせることfilterpairwiseを使用してmapでき到着は出発の前にあると仮定するとRxJS

+0

だから 'length'は本当に' duration'ですか?すなわち、到着時刻と出発時刻との間の差異である。 – cartant

+0

はい、私はこの点を明確にするために質問を編集しました。 –

+0

特定のIDに対して複数の到着と出発がある場合は、実際にそのIDをサンプルデータに含めることで、それを明確にする必要があります。 – cartant

答えて

1

を使用しています:

const source = Rx.Observable.of(
 
    { id: 1, type: "a", time: 0 }, 
 
    { id: 1, type: "d", time: 1 }, 
 
    { id: 2, type: "a", time: 2 }, 
 
    { id: 3, type: "a", time: 3 }, 
 
    { id: 3, type: "d", time: 4 }, 
 
    { id: 1, type: "a", time: 5 }, 
 
    { id: 2, type: "d", time: 6 }, 
 
    { id: 1, type: "d", time: 7 }, 
 
    Rx.Scheduler.async 
 
); 
 

 
const grouped = source 
 
    .groupBy(event => event.id) 
 
    .mergeMap(group => group 
 
    .pairwise() 
 
    .filter(([, last]) => last.type === "d") 
 
    .map(([prev, last]) => ({ 
 
     id: last.id, 
 
     duration: last.time - prev.time 
 
    })) 
 
); 
 

 
grouped.subscribe(value => console.log(value));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/[email protected]/bundles/Rx.min.js"></script>

+0

到着は出発に先行しますが、各到着/出発は到着/出発することはありません。つまり、各到着とそのIDの次の出発時刻とを関連付ける必要があります。私はこれを行って、どこに行くのかを見てみよう。 –

+0

答えを更新して元に戻した。 – cartant

+0

これは素晴らしいです、あなたの助けをありがとう –

関連する問題