2017-11-30 61 views
0

が、私はこのように観察を返すサービスがあります。別の場所に今角度:Observableをデバウンスする方法?私のアプリで

public genericService(params) { 
    //Do some stuff 
    //... 

    return this.http.post('http://foo.com', params) 
     .map((response) => { 
      //Do some generic stuff 
      //... 

      return someData; 
     }) 
     .catch((error: any) => { 
      //Manage error in a generic way + do some generic stuff 
      //... 

      return Observable.throw(error); 
     }); 
} 

let debouncePointer = debounceObservable(genericService, 200); 

public genericServiceDebounce(params) { 
    return debouncePointer(params); 
} 

を、私はこの

genericServiceDebounce(params) 
    .subscribe((response) => { 
     //Do some non-generic stuff 
    }, (error) => { 
     //Manage error in a non-generic way + do some non-generic stuff 
    }); 

ように私の関数を呼び出すしたいと思います。しかし、私は」なかったのdebounceObservable()関数の実装に成功しました。

debounceObservable(callback, delay, immediate?) { 
    let timeout; 
    return function() { 
     let context = this, args = arguments; 

     return Observable.create((observer) => { 
      let later = function() { 
       timeout = null; 

       if(!immediate) { 
        observer.next(callback.apply(context, args)); 
        //observer.onCompleted(); // don't know if this is needed 
       } 
      }; 
      let callNow = immediate && !timeout; 
      clearTimeout(timeout); 
      timeout = setTimeout(later, delay); 

      if(callNow) { 
       observer.next(callback.apply(context, args)); 
       //observer.onCompleted(); // don't know if this is needed 
      } 
     }); 
    } 
} 

しかし、予想通り、これは動作しません:

は私が約束相当(https://github.com/moszeed/es6-promise-debounce/blob/master/src/es6-promise-debounce.js)に基づいて、この実装を試してみました。

genericServiceDebounce().subscribe(obs => { 
    obs.subscribe(response => { 

    }) 
}) 
:あなたが呼び出す必要がありますを意味し、

genericServiceDebounce().then(response => { 

}) 

observer.next戻って、観測を使用して(anotherObservable)が埋め込まれて、観察を返す: 解決(anotherPromise)を返す、約束を使用して、あなたが呼び出すことができます

どのようにdebounceObservable()関数を実装しますか?(方法のような約束で)

解明1:私はObservable.debounce()関数を発見したが、これは、観察者としない観察自体をデバウンス。私は観測可能性をデバウンスしたい

明確化2:私はシングルトンであり、複数の発信者であるため、サービス側にデバウンスを配置しました。私がそれを発信者の側に置いた場合、各発信者ごとに異なるデバウンスタイマーが存在します。

編集:ここで私の問題を説明しようとしているスニペットです。異なるボタンをクリックするだけで、さまざまな動作を見ることができます(jsコードのコメントの詳細)。

Observable.debounceは、RxJsの.debounce()がどのように動作するかを示しています。 '3'だけを出力しますが、 '1'、 '2'、 '3'を入力します。

Observable.debounce x3は、コード全体をデバウンスでラッピングせずに3回コールするとどうなるかを示しています。

観測可能なラッピングx3は、私が入手したいものを示しています。私の全機能はラップされていますが、コードを見れば、購読側のパートは慎重です。

プロミスx3は、プロミスを使用するとどのくらい簡単かを示します。

let log = (logValue) => { 
 
    const list = document.querySelector('#logs'); 
 
    const li = document.createElement('li'); 
 
    li.innerHTML = logValue; 
 
    list.appendChild(li); 
 
} 
 

 
/* ************************ */ 
 
/* WITH OBSERVABLE.DEBOUNCE */ 
 
/* ************************ */ 
 

 
let doStuffObservable =() => { 
 
    Rx.Observable.create((observer) => { 
 
     log('this should be called only one time (observable.debounce)'); 
 
     setTimeout(() => { 
 
      observer.next('observable.debounce 1'); 
 
      observer.next('observable.debounce 2'); 
 
      observer.next('observable.debounce 3'); 
 
     }, 1000); 
 
    }) 
 
     .debounce(500) 
 
     .subscribe((response) => { 
 
      log(response); 
 
     }, (error) => { 
 
      log(error); 
 
     }); 
 
} 
 

 
/* *********************************** */ 
 
/* WITH OBSERVABLE WRAPPED IN DEBOUNCE */ 
 
/* *********************************** */ 
 

 
let doStuffObservable2 = (param) => { 
 
    return Rx.Observable.create((observer) => { 
 
     log('this should be called only one time (observable wrapped)'); 
 
     setTimeout(() => { 
 
      observer.next('observable wrapped ' + param); 
 
     }, 1000); 
 
    }) 
 
} 
 

 
let debounceObservable = (callback, delay, immediate) => { 
 
    let timeout; 
 
    return function() { 
 
     let context = this, args = arguments; 
 

 
     return Rx.Observable.create((observer) => { 
 
      let later = function() { 
 
       timeout = null; 
 

 
       if(!immediate) { 
 
        observer.next(callback.apply(context, args)); 
 
       } 
 
      }; 
 
      let callNow = immediate && !timeout; 
 
      clearTimeout(timeout); 
 
      timeout = setTimeout(later, delay); 
 

 
      if(callNow) { 
 
       observer.next(callback.apply(context, args)); 
 
      } 
 
     }); 
 
    } 
 
} 
 

 
let doStuffObservable2Debounced = debounceObservable(doStuffObservable2); 
 

 
    
 
/* ************* */ 
 
/* WITH PROMISES */ 
 
/* ************* */ 
 

 
let doStuffPromise = (param) => { 
 
    return new Promise((resolve, reject) => { 
 
     log('this should be called only one time (promise)'); 
 
     setTimeout(() => { 
 
      resolve('promise ' + param); 
 
     }, 1000); 
 
    }); 
 
} 
 

 
let debouncePromise = (callback, delay, immediate) => { 
 
    let timeout; 
 
    return function() { 
 
     let context = this, args = arguments; 
 
     return new Promise(function (resolve) { 
 
      let later = function() { 
 
       timeout = null; 
 
       
 
       if (!immediate) { 
 
        resolve(callback.apply(context, args)); 
 
       } 
 
      }; 
 
      let callNow = immediate && !timeout; 
 
      clearTimeout(timeout); 
 
      timeout = setTimeout(later, delay); 
 
      
 
      if (callNow) { 
 
       resolve(callback.apply(context, args)); 
 
      } 
 
     }); 
 
    } 
 
} 
 

 
    
 
/* ******* */ 
 
/* SAMPLES */ 
 
/* ******* */ 
 

 
function doObservableDebounce() { 
 
    doStuffObservable(); 
 
    
 
    // result : 
 
    
 
     // this should be called only one time (observable.debounce) 
 
     // observable.debounce 3 
 
    
 
    // this is not what i want, i want all three values in output 
 
} 
 

 
function doObservableDebounce3Times() { 
 
    doStuffObservable(); 
 
    doStuffObservable(); 
 
    doStuffObservable(); 
 
    
 
    // result : 
 
    
 
     // this should be called only one time (observable.debounce) 
 
     // this should be called only one time (observable.debounce) 
 
     // this should be called only one time (observable.debounce) 
 
     // observable.debounce 3 
 
     // observable.debounce 3 
 
     // observable.debounce 3 
 
    
 
    // this is bad 
 
} 
 

 
function doObservableWrappedDebounce3Times() { 
 
    doStuffObservable2Debounced(1) 
 
     .subscribe((response) => { 
 
      log(response); 
 
      response.subscribe((response2) => { 
 
       log(response2); 
 
      }, (error) => { 
 
       log(error); 
 
      }) 
 
     }, (error) => { 
 
      log(error); 
 
     }); 
 
    doStuffObservable2Debounced(2) 
 
     .subscribe((response) => { 
 
      log(response); 
 
      response.subscribe((response2) => { 
 
       log(response2); 
 
      }, (error) => { 
 
       log(error); 
 
      }) 
 
     }, (error) => { 
 
      log(error); 
 
     }); 
 
    doStuffObservable2Debounced(3) 
 
     .subscribe((response) => { 
 
      log(response); 
 
      response.subscribe((response2) => { 
 
       log(response2); 
 
      }, (error) => { 
 
       log(error); 
 
      }) 
 
     }, (error) => { 
 
      log(error); 
 
     }); 
 
    
 
    
 
    // result : 
 
    
 
     // AnonymousObservable { source: undefined, __subscribe: [Function] } 
 
     // this should be called only one time (observable wrapped) 
 
     // observable wrapped 3 
 
    
 
    // this is good but there are 2 embedded subscribe 
 
} 
 

 
function doPromiseDebounce3Times() { 
 
    let doStuffPromiseDebounced = debouncePromise(doStuffPromise); 
 
    
 
    doStuffPromiseDebounced(1).then(response => { 
 
     log(response); 
 
    }) 
 
    doStuffPromiseDebounced(2).then(response => { 
 
     log(response); 
 
    }) 
 
    doStuffPromiseDebounced(3).then(response => { 
 
     log(response); 
 
    }) 
 
    
 
    // result : 
 
    
 
     // this should be called only one time (promise) 
 
     // promise 3 
 
    
 
    // this is perfect 
 
}
<!DOCTYPE html> 
 
<html> 
 

 
    <head> 
 
    <script data-require="[email protected]" data-semver="4.0.6" src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script> 
 
    </head> 
 

 
    <body> 
 
    <button onclick='doObservableDebounce()'>Observable.debounce</button> 
 
    <button onclick='doObservableDebounce3Times()'>Observable.debounce x3</button> 
 
    <button onclick='doObservableWrappedDebounce3Times()'>Observable wrapped x3</button> 
 
    <button onclick='doPromiseDebounce3Times()'>Promise x3</button> 
 
    <ul id="logs"></ul> 
 
    </body> 
 

 
</html>

+1

なぜあなたが車輪の再発明していますdebounceObservable? RxJSはすでにデバウンス演算子を提供しています。だからこそ、それは単に観察可能な実装ではなく、図書館である。 – estus

+0

「オブザーバの代わりにオブザーバブルをデバウンスする」とはどういう意味ですか? '.debounce()'はストリームを通過するアイテムを破棄し、観察可能で、オブザーバーはそこで何もしません。 – olivarra1

+1

おそらく、あなたはdebounceTimeを使う必要があるのでしょうか? http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-debounceTime – olivarra1

答えて

1

申し訳ありませんが、私のコメントにあなたの返信から通知がありませんでした。

この問題のクリーナーのRx-唯一の解決策はそうのように、イベントのストリームとしてあなたのサービスコールを考えることであろう

constructor() { 
    this._genericServiceCall$ = new ReplaySubject(1); 
    this._genericServiceResult$ = this._genericServiceCall$ 
     .asObservable() 
     .debounceTime(1000) 
     .switchMap(params => this._genericService(params)); 
} 

private _genericService(params) { 
    //Do some stuff 
    //... 

    return this.http.post('http://foo.com', params) 
     .map((response) => { 
      //Do some generic stuff 
      //... 

      return someData; 
     }) 
     .catch((error: any) => { 
      //Manage error in a generic way + do some generic stuff 
      //... 

      return Observable.throw(error); 
     }); 
} 

public genericService(params) { 
    this._genericServiceCall$.next(params); 
    return this._genericServiceResult$; // Optionally add `.take(1)` so the observer has the expected behaviour of only getting 1 answer back 
} 

私もこれで何かを参照してください...どのparamsはあなたの意志プライベート_genericServiceを経由しなければならないものとして受け入れる?

とにかくここでは何が起こっているのですか?したがって、誰かがgenericService()に電話するたびに、サービスをすぐに呼び出すことはなく、代わりに新しい_genericServiceCall$を発信し、_genericServiceResult$ストリームを返します。このストリームがどのように定義されているかを見てみると、デバックされた_genericServiceCall$が必要で、それをサービスコールにマップしていることがわかります。理論的にはうまくいく - 試していない。

編集:今、私が見る - あなたはそれは、すぐに任意の観察者がそれに加入すると戻ります他、ホット観察できるそれを作るためにgenericServiceResultを公開する必要がある場合があります

constructor() { 
    this._genericServiceCall$ = new ReplaySubject(1); 
    this._genericServiceResult$ = this._genericServiceCall$ 
     .asObservable() 
     .debounceTime(1000) 
     .switchMap(params => this._genericService(params)) 
     .publish(); 
    const subscription = this._genericServiceResult$.connect(); 
    // You must store subscription somewhere and dispose it when this object is destroyed - If it's a singleton service this might not be needed. 
} 
+0

debounceの代わりに 'throttleTime'を考慮してください:それはすぐに放出され、しばらくしてから他のemisionを許可しません(イベントを取得するたびにdebounceしながら、終了したらタイマーをリセットします) – olivarra1

+0

Thxこれは魅力のように機能し、take(1)も私が必要としていたものでした。しかし、なぜ私は購読を保存する必要があるのか​​分からない。せずに働くと思われる。これはメモリ管理に関連していますか? – Ldoppea

+0

また、デバウンス後にすべてのサブスクライバがnext()イベントを受け取るため、実装は私のものより優れています。私の場合、最後の加入者だけがそれを受け取る。 – Ldoppea

0

さて、私は、私は方法を見つけたと思います。私がやるべきことは交換することです:

callback.apply(context, args).subscribe((response) => { 
     observer.next(response) 
    }, (error) => { 
     observer.error(error); 
    }); 

によって

observer.next(callback.apply(context, args)); 

最後に、これは古典的な観察可能なように使うことができます:ここで

debouncedObservable(1) 
    .subscribe((response) => { 
     log(response); 
    }, (error) => { 
     log(error); 
    }); 

は実装とスニペットです:

let log = (logValue) => { 
 
    const list = document.querySelector('#logs'); 
 
    const li = document.createElement('li'); 
 
    li.innerHTML = logValue; 
 
    list.appendChild(li); 
 
} 
 

 
/* *********************************** */ 
 
/* WITH OBSERVABLE WRAPPED IN DEBOUNCE */ 
 
/* *********************************** */ 
 

 
let doStuffObservable = (param) => { 
 
    return Rx.Observable.create((observer) => { 
 
     log('this should be called only one time (observable wrapped)'); 
 
     setTimeout(() => { 
 
      observer.next('observable wrapped ' + param); 
 
     }, 1000); 
 
    }) 
 
} 
 

 
let debounceObservable = (callback, delay, immediate) => { 
 
    let timeout; 
 
    return function() { 
 
     let context = this, args = arguments; 
 

 
     return Rx.Observable.create((observer) => { 
 
      let later = function() { 
 
       timeout = null; 
 

 
       if(!immediate) { 
 
        callback.apply(context, args).subscribe((response) => { 
 
          observer.next(response) 
 
         }, (error) => { 
 
          observer.error(error); 
 
         }); 
 
       } 
 
      }; 
 
      let callNow = immediate && !timeout; 
 
      clearTimeout(timeout); 
 
      timeout = setTimeout(later, delay); 
 

 
      if(callNow) { 
 
       callback.apply(context, args).subscribe((response) => { 
 
          observer.next(response) 
 
         }, (error) => { 
 
          observer.error(error); 
 
         }); 
 
      } 
 
     }); 
 
    } 
 
} 
 

 
let doStuffObservable2Debounced = debounceObservable(doStuffObservable); 
 

 
/* ******* */ 
 
/* SAMPLES */ 
 
/* ******* */ 
 

 
function doObservableWrappedDebounce3Times() { 
 
    doStuffObservable2Debounced(1) 
 
     .subscribe((response) => { 
 
      log(response); 
 
     }, (error) => { 
 
      log(error); 
 
     }); 
 
     
 
    doStuffObservable2Debounced(2) 
 
     .subscribe((response) => { 
 
      log(response); 
 
     }, (error) => { 
 
      log(error); 
 
     }); 
 
     
 
    doStuffObservable2Debounced(3) 
 
     .subscribe((response) => { 
 
      log(response); 
 
     }, (error) => { 
 
      log(error); 
 
     }); 
 
}
<!DOCTYPE html> 
 
<html> 
 

 
    <head> 
 
    <script data-require="[email protected]" data-semver="4.0.6" src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script> 
 
    </head> 
 

 
    <body> 
 
    <button onclick='doObservableWrappedDebounce3Times()'>Observable wrapped x3</button> 
 
    <ul id="logs"></ul> 
 
    </body> 
 

 
</html>

私が何か逃したと思ったらコメントしてください。