2017-10-08 4 views
0

2つのストリームの重複アイテムを取得する必要があります。私はほとんどそれをやっていたと思うが、第二の流れの複製であるそれらの項目が順番に行く場合にのみ、例の場合:RxJs - 2つのオブザーバブルの重複アイテムを取得

これは動作します:

first = Observable.of(1, 2, 3) 
second = Observable.of(2, 3, 1) 

しかし、これはしていません:

first = Observable.of(1, 4, 3) 
second = Observable.of(1, 2, 3) 

私のループが4になると、それが壊れる:

EmptyError {name: "EmptyError", stack: "EmptyError: no elements in sequence↵ at new Emp…e (http://localhost:4200/vendor.bundle.js:161:22)", message: "no elements in sequence"}

全体の私コードは1つの機能に含まれているため、コピー/貼り付けしてテストすることができます:

findDublicates() { 

    let match = 0; // setting it to 0, so later could assign other number 
    let keys = []; // list of maching keys 
    let elementAt = 0; // index of item of first observable   

    let allKeys$; 
    let validKeys$; 

    // counting the length of both observables, so this will be the number of loops 
    // that checks for dublicates 
    let allKeysLength; 
    let validKeysLength; 
    let allKeysLength$ = Observable.of(2, 1, 4, 5, 7).count() 
    allKeysLength$.subscribe(val => allKeysLength = val) 
    let validKeysLength$ = Observable.of(1, 2, 3, 8, 5).count() 
    validKeysLength$.subscribe(val => validKeysLength = val) 

    let cycles = Math.min(allKeysLength,validKeysLength); // length of the shorter observable    

    // wrapping it in a function so when called variables will take new values 
    function defineObs() { 

    allKeys$ = Observable.of(2, 1, 4, 5, 7) 
     .elementAt(elementAt).take(1); 

    validKeys$ = Observable.of(1, 2, 3, 8, 5) 
     .filter((x) => (x === match)).first(); 
    } 

    for (var i=0; i<=cycles; i++) { 

    defineObs(); 

    allKeys$.subscribe(
     function (val) { match = val }, 
     function (err) { console.log(err) }, 
     function() { console.log('Done filter')} 
    ); 
    validKeys$.subscribe(
     function (val) { keys.push(val) }, 
     function (err) { console.log(err) }, 
     function() { console.log('Done push')} 
    ); 

    elementAt += 1; 
    cycles -= 1; 

    } 

    return console.log(keys); 

} 

ありがとうございました。

+2

なぜこれをすべて観測値を使って扱っていないのですか?オブザーバブルでうまく動作しない別のパラダイムを適用しようとしています。 – Everest

+0

これは観測可能な場合にのみ行われるかもしれないと私は思うが、私はRPの初心者であるので、どうすればいいのか分からない –

+0

2ストリームの複製を得る方法を自由に教えてください:) –

答えて

3

、あなたはちょうどそれらをマージし、単一のストリーム上で重複する値を見つけることとして扱うことがあります。

first.merge(second) 
    .scan(([ dupes, uniques ], next) => 
     [ uniques.has(next) ? dupes.add(next) : dupes, uniques.add(next) ], 
     [ new Set(), new Set() ] 
    ) 
    .map(([ dupes ]) => dupes) 

注:セットscanの未定義の振る舞いを避けるために、上記は不変です。

+0

これは素晴らしいです!私が見ているように、RPと関数型プログラミングは本当によく分かります。良くやった。私はそれが何であるか完全に理解したときに受け入れます:D –

+0

@ J.D。私は要点を整理することができます:両方のストリームからのすべてのアイテムを1つに統合した後、すべてのアイテム(「ユニーク」)からセットを作成し、前のアイテムが見られた場合にのみ追加します。 'scan'はスニペットの中で最も密度の高い部分なので、[docs](http://reactivex.io/documentation/operators/scan.html)をお勧めします。 'scan'は両方のSetのすべての中間値を与えるので、' map'は 'dupes'だけを抽出します。 – concat

+0

@ J.D。演算子と連鎖は、ビヘイビアを明瞭かつ無意味にするように設計されています。通常、演算子をメモリにコミットするのに支払う、最も単純で最も堅牢なアプローチがあります。それ以外の場合は、必要なものを正確に固定することが通常は最大の努力です。ハッピーRx学習! – concat

0

私はObservable.combineLatestscanメソッドを観察可能なシーケンスでチェックします。ここで

は私がcombineLatestを使用して2つの観測を組み合わせ、考え、その上scan演算子を適用するんですよ。一意性を保証するためにSet、さらにはmapfilterを使用することもできます。あなたは、ストリームは、重複のセットの最初の値を放出する気にしないのであれば

+0

完全な答えを投稿してください - 私のすべてのコードは私の質問にあります –

関連する問題