2017-01-04 2 views
0

私はこれらの2つのメソッドをキューに入れています。私はバックプレッシャーの何らかの形を実装しました。これにより、メソッドから作成されたオブザーバブルは、ユーザーがコールバックを発生させた場合にのみ、オブザーバブルを介してイベントを発生させます。問題は、メインのサブスクライバで起動するonCompletedハンドラをdrain()にすることができないことです。私が驚いているのは、onNextが同じ加入者に対して発射するということです。なぜ、火事で火事が発生しないのですか?私は絶対的な狂気を駆動している何RxJS5 =>サブスクライバのonCompletedコールバックが起動しない

Queue.prototype.isEmpty = function (obs) { 

    if (!obs) { 
     // this is just a dummy observable 
     // I wish Rx had Rx.Observable.dummy() alongside 
     // Rx.Observable.empty(), but oh well 
     obs = Rx.Observable.of('dummy'); 
    } 

    return this.init() 
     .flatMap(() => { 
      return obs; // when you call obs.next(), it should fire this chain again 
     }) 
     .flatMap(() => { 
      return acquireLock(this) 
       .flatMap(obj => { 
        return acquireLockRetry(this, obj) 
       }) 
     }) 
     .flatMap(obj => { 
      return findFirstLine(this) 
       .flatMap(l => { 
        return releaseLock(this, obj.id) 
         .map(() => { 
          console.log(' => LLLL1 => ', l); 
          return l; 
         }); 
       }); 
     }) 
     .filter(l => { 
      // filter out any lines => only fire event if there is no line 

      return !l; 
     }) 
     .map(() => { 
      // the queue is now empty 
      obs.complete(); // <<<<<<<<<< note this call 
      return {isEmpty: true} 
     }); 


}; 


Queue.prototype.drain = function (obs, opts) { 

    opts = opts || {}; 

    const isConnect = opts.isConnect || false; 
    const delay = opts.delay || 500; 

    let $obs = obs.takeUntil(this.isEmpty(obs)) 
     .flatMap(() => { 
      return this.init(); 
     }) 
     .flatMap(() => { 
      return acquireLock(this) 
       .flatMap(obj => { 
        return acquireLockRetry(this, obj) 
       }); 
     }) 
     .flatMap(obj => { 
      return removeOneLine(this) 
       .flatMap(l => { 
        return releaseLock(this, obj.id) 
         .map(() => l); 
       }); 
     }); 


    process.nextTick(function(){ 
     obs.next('foo foo foo'); 
     $obs.next('bar bar bar'); 
     $obs.complete(); 
    }); 


    return $obs; 

}; 

...加入者でonCompletedハンドラが起動でしょうtakeUntilコールと重い利きの$ obs.complete()の間ということは、私はonCompletedを得ることができないということであると思うだろう私は上記を呼び出すと、私はこれを取得

const q = new Queue(); 

const obs = new Rx.Subject(); 

q.drain(obs).subscribe(

    function (v) { 

     console.log('end result => ', colors.yellow(util.inspect(v))); 

     setTimeout(function() { 
      // the following call serves as the callback which will fire the observables in the methods again 
      obs.next(); 
     }, 100); 

    }, 
    function (e) { 
     console.log('on error => ', e); 
    }, 
    function (c) { 
     // this never gets called and it is driving me f*cking crazy 
     console.log(colors.red(' DRAIN on completed => '), c); 
    } 

); 

obs.subscribe(
    function (v) { 
     console.log('next item that was drained => ', v); 
    }, 
    function (e) { 
     console.log('on error => ', e); 
    }, 
    function (c) { 
     // this gets called! 
     console.log(colors.red(' => obs on completed => '), c); 
    } 
); 

::私は、Dので

next item that was drained => foo foo foo 
next item that was drained => bar bar bar 
=> obs on completed => undefined 

私はちょうどそれらの3行を取得する理由は、私は上記のようにのように呼び出したときのコールバックは、発射しますOこの:

process.nextTick(function(){ 
    obs.next('foo foo foo'); 
    $obs.next('bar bar bar'); 
    $obs.complete(); 
}); 

しかしなぜが明示的にこのコールバック$obs.complete();火を呼び出していないでしょう。

function (c) { 
      // this never gets called and it is driving me f*cking crazy 
      console.log(colors.red(' DRAIN on completed => '), c); 
     } 

答えて

0

さてさて、私はこれを考え出したと思うが、これはRxJSクレイジーなものをライブラリ

、あなたはおそらくテイク()またはtakeUntil()または同様のを使用する必要があります右

だから、物事を行うには、最も可能性の高いIこれをしました:

Queue.prototype.drain = function (obs, opts) { 

    if (!(obs instanceof Rx.Observable)) { 
     opts = obs || {}; 
     obs = new Rx.Subject(); 
    } 
    else { 
     opts = opts || {}; 
    } 


    const isConnect = opts.isConnect || false; 
    const delay = opts.delay || 500; 

    process.nextTick(function() { 
     obs.next(); 
    }); 


    let $obs = obs 
     .flatMap(() => { 
      return this.init(); 
     }) 
     .flatMap(() => { 
      return acquireLock(this) 
       .flatMap(obj => { 
        return acquireLockRetry(this, obj) 
       }); 
     }) 
     .flatMap(obj => { 
      return removeOneLine(this) 
       .flatMap(l => { 
        return releaseLock(this, obj.id) 
         .map(() => ({data: l, cb: obs.next.bind(obs)})); 
       }); 
     }) 
     // here is the key part! 
     .takeUntil(this.isEmpty(obs)); 


    return $obs; 

}; 

トリックをしたようです。私はしばらくは絶望的でした。この仕組みの詳細な説明が必要な場合は、お問い合わせください。

関連する問題