2017-12-18 26 views
4

警告:ここにRxJS newbがあります。ここでRxJS:複数のネストされたオブザーバをバッファと組み合わせる方法

は私の課題です:

  1. onUnlink$観測可能発する...
  2. は、すぐに(私はこのパーティションonAddBuffer$と呼ぶことにします)1秒の最大のために、観測可能onAdd$から値をキャプチャを開始すると。
  3. onAddBuffer$に観察試合doc$値からの値のいずれかが、我々は
  4. onAdd$値のいずれかにマッチするために使用しますモデルをフェッチするクエリ(観測可能doc$を作成する)データベース、
  5. を放出しないでください
  6. 観測可能onAddBuffer$からの値はいずれもdoc$値と一致していない、または、観察発することはありませんonAddBuffer$場合、これは私の最高の推測だった

doc$値に放出した場合:

// for starters, concatMap doesn't seem right -- I want a whole new stream 
const docsToRemove$ = onUnlink$.concatMap(unlinkValue => { 

    const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue })) 

    const onAddBuffer$ = onAdd$ 
    .buffer(doc$) // capture events while fetching from db -- not sure about this 
    .takeUntil(Rx.Observable.timer(1000)); 

    // if there is a match, emit nothing. otherwise wait 1 second and emit doc 
    return doc$.switchMap(doc => 
    Rx.Observable.race( 
     onAddBuffer$.single(added => doc.attr === added.attr).mapTo(Rx.Observable.empty()), 
     Rx.Observable.timer(1000).mapTo(doc) 
    ) 
); 
}); 

docsToRemove$.subscribe(doc => { 
    // should only ever be invoked (with doc -- the doc$ value) 1 second 
    // after `onUnlink$` emits, when there are no matching `onAdd$` 
    // values within that 1 second window. 
}) 

これは、常にEmptyObservableを発します。たぶんそれはsingleは一致がない場合undefinedを放出するように見えるので、一致がないときは全く発光しないと思っていますか? findでも同じことが起こります。

singlefilterに変更すると、何も出力されません。

FYI:これはファイルシステムイベントと名前変更シナリオがある - addイベントがunlinkイベントの1秒以内に次の放出されたファイルは、試合をハッシュし、それはrenameだから何もしない場合。そうでない場合は、それは真のunlinkであり、削除されるべきデータベースdocを放出するはずです。

+0

あなたはここでかなり厄介な競争状態を構築しているようですね。タイムアウトは通常これに対処する良い方法ではありません。何らかの理由で何かが長くかかると、データが失われます。 –

+1

はいここに競争条件がある可能性は間違いありません。最終的にはこのアプローチを打破するかもしれない。rxjsを学ぶ良い機会のように思えました。 – glortho

答えて

3

これは、あなたがこれを行うことができますどのように私の推測です:

onUnlink$.concatMap(unlinkValue => { 
    const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue })).share(); 
    const bufferDuration$ = Rx.Observable.race(Rx.Observable.timer(1000), doc$); 
    const onAddBuffer$ = onAdd$.buffer(bufferDuration$); 

    return Observable.forkJoin(onAddBuffer$, doc$) 
    .map(([buffer, docResponse]) => { /* whatever logic you need here */ }); 
}); 

それだけ観察可能でを完了(または放出源の後に述語関数に一致する項目を発するのでsingle()オペレータは少しトリッキーです2つの項目がある場合、または一致する項目がない場合はエラーです)。

race()もトリッキーです。ソースObservablesのいずれかが完了し、値を放出しない場合、race()は完了し、何も出力しません。私は以前これを報告しましたが、これは正しい動作です。https://github.com/ReactiveX/rxjs/issues/2641を参照してください。
これはあなたのコードで間違っていたと思います。

また、.mapTo(Rx.Observable.empty())は、各値をObservableのインスタンスにマップすることに注意してください。すべての値を無視する場合は、filter(() => false)またはignoreElements()の演算子を使用できます。

関連する問題