2017-01-07 12 views
1

を完了するために連鎖観測待ち - 私はシンプルな観測鎖を有していて、Rx.Observable.intervalで開始:いくつかのトラブル私はRxJS5で何をしたい達成を持つ

const Rx = require('rxjs'); 

var i = 0; 

const obs = Rx.Observable.interval(100) 
    .flatMap(function() { 
     return Rx.Observable.timer(Math.ceil(500*Math.random())) 
      .map(function(val){ 
       console.log(' => These should all log first => ', val); 
       return i++; 
      }); 
    }) 
    .take(5) 
    .merge() // this doesn't seem to do what I want to do 
    .map(function (val) { 
     console.log('all done = > ', val); 
    }); 

obs.subscribe(); 

上記ログ本:

=> These should all log first => 0 
all done = > 0 
=> These should all log first => 0 
all done = > 1 
=> These should all log first => 0 
all done = > 2 
=> These should all log first => 0 
all done = > 3 
=> These should all log first => 0 
all done = > 4 

=> These should all log first => 0 
=> These should all log first => 0 
=> These should all log first => 0 
=> These should all log first => 0 
=> These should all log first => 0 

all done = > [0,1,2,3,4] 

それは、私たちはすべてのティムを待っていないことは明らかだ:私はこれを記録しているよ

あなたが "すべて完了しました!"何度もログに記録され、「これはすべて最初にログする必要があります」と散在しています。

私が探している出力を得るにはどうすればよいですか?

通常、これにはzipを使用できますが、zipのAPIは、同時に1つの場所にすべてのタイマーオブザーバブルが存在しないため、この使用例には当てはまりません。このこと

const async = require('async'); 
var i = 0; 

async.forever(function(cb){ 

    process.nextTick(function(){ 
     console.log('These should all log first'); 
     const err = i++ === 5; 
     cb(err, i); 
    }); 

}, function done(err, results){ 
    // let's pretend results contains all the i values 
    console.log('all done'); 
}); 
+0

我々はマージのためtakeLastを()に置き換えた場合ことに注意してください()、我々は、すべての後に「すべてが終わっ」ログに記録する「正しい」である「これらのすべての最初のログインする必要があります」が、その後、記録され、「すべて完了します」 50倍!私は "すべて完了"を一度だけ記録したいと思っています。 –

+0

'take'はそれらをバッチしません。十分に見えたらストリームを終了させます。そして、単一のストリーム上の 'merge'は何もしません。例えばを見てください。 http://rxmarbles.com/#mergeでは、何が起きているのかを説明しています。 – jonrsharpe

+0

ええ、あなたはマッピングしているので、要素があるときと同じくらい何度もログに記録すると "すべて完了しました"と思いませんか?このコードに基づいて、私は実際にあなたが "取る"何回も地図コンソールログトリガーを見ることを期待しています。私はあなたのマージの使用は、おそらくあなたが思っているものをマージするものではないので、おそらくあなたが期待していることをしていないと思う(そこに座ってすべての "取られた"オブジェクトを待ってから、 )。 –

答えて

0

注:私たちは任意に終了し、我々はすべての結果を収集するまで

私の質問は十分に明確ではなかった場合は、ここで私が何をしたいのアナログであり、我々はすべてのコールバックをブロック基本的に私が見て期待していたことをログ順序を生成するが、私は、これはそれを行うには、右/最良の方法だとは思わない:

const {Observable} = require('rxjs'); 

    const obs = Observable.interval(100) 
     .flatMap(function() { 
      return Observable.timer(Math.ceil(500*Math.random())) 
       .map(function(val){ 
        console.log(' => These should all log first => ', val); 
       }); 
     }) 
     .take(5) 
     .takeLast() // <<<<<<<<<< 
     .map(function() { 
      console.log('all done'); 
     }) 
     .take(1) // <<<<<<<<<< 

    obs.subscribe(); 

私はこれを達成するためのより良い方法がまだあると思います。

+2

あなたは 'take(50)'と 'subscribe'の間のすべてを' last(function(){console.log( 'all done');}) 'と置き換えたり、 。 – jonrsharpe

+0

'flatMap'を' concatMap'に置き換えます(無作為タイマーが完了して値を返す順序は関係ありません)、 'takeLast'を' toArray'に置き換えて 'take(1)'を削除してください –

0

@ jonsharpeが私にこの答えをくれました。これは基本的に動作します。問題は実際にはRx.Observable.intervalに蒸留することができます。Rx.Observable.timerマッピングを取り除くことができます。ここに私たちの基本的な答えがあります:

const Rx = require('rxjs'); 

const obs = Rx.Observable.interval(100) 
    .take(5) 
    .map(function(v){ 
     console.log(v); 
     return v; 
    }) 
    .reduce(function (prev, curr) { 
     return prev.concat(curr); 
    },[]) 
    .last(function (results) { 
     return results; 
    }) 
    .map(function(v){ 
     console.log(v); 
    }); 


obs.subscribe(); 

私は減らすせずにこのを行う方法では、しかし、非常に興味があると思います。

+0

'reduce'が必要ですすべての値を配列に集める。私はあなたがなぜ「最後」を必要とするのかわかりませんが、とにかく結果を1つ減らします。空の 'subscribe'を使って' map'の代わりに 'subscribe'で' all done'を呼び出すことも意味的に優れています。 –

+0

@jonrsharpeあなたは正しいかもしれませんが、それはreduceの意味に反していません:) –

+0

これはOPが望んでいないものであっても、あなたは "reduce"と "last"の意味が面白いと言うようにこれは興味深いコードです。私はRxの進化した理解を助けるためにそれを捉えました。 –

0

ので、要件は次のとおりです。

  • がトップレベル区間の実行N倍にしてみましょう。
  • 各インターバルは、Yタイマオブザーバブルの配列を返す必要があります。
  • 次に、Y個のオブザーバブルの配列をN回トレースする必要があります。

これはそうです。ちょっと素朴ですが、うまくいきます。

let timerArrLength = [ 4, 2, 3 ]; 
    let svc = Rx.Observable.interval(1000) 
     .take(timerArrLength.length) 
     .map(function (index) { 
      let arr = []; 
      for (let i = 0; i < timerArrLength [ index ]; i++) { 
       arr.push (Rx.Observable.timer (1000)); 
      } 
      return arr; 
     }); 


     svc.subscribe(
      function onNext(v){ 
       console.log('=> v =>',v); 
      }, 
      function onError(e){ 
       console.error(e); 
      }, 
      function onComplete(){ 
       console.log('complete'); 
      } 
     ); 
+0

私はこれを試してみましょう:) –

+0

注意最初のlet varを削除しました。あなたはそれを必要とせず、配列の長さだけを使用します。 –

+0

ちょっとティム、私がログアウトするのは、一連のオブザーバブルです。すべての値を1つの最終コールバックでアンラップできるかどうかを確認してください。蒸留の目的のために、ちょうどタイマーを取り除き、ちょうど間隔を使用してください。 –

関連する問題