2017-02-10 20 views
2

オブザーバブルを使用してオブジェクトがアレイにプッシュされたときを監視したい。私は空の配列から始めたいと思います。プッシュが発生したら、オブザーバブルがそれを検出して処理し、次のプッシュまで待つようにします。これは、観測対象がイベントを待つ "fromEvent"と非常によく似ています。以下のコードは、配列が空であるため、直ちにcompleted()を呼び出します。プッシュを待つ方法を教えてください。rxjsアレイのプッシュを観察する

var testArray = []; 

    test(){ 
     var o = {timestamp: new Date()} 
     testArray.push(o) 
    } 

    var o = Observable 
     .from(testArray) 
     .concatMap(x => { 
      return x; 
    }); 

    o.subscribe( 
     x => { console.log("onNext x=",x.timestamp) }, 
     e => console.log('onError:', e), 
    () => {console.log('onCompleted');}); 

注:入力メカニズムは配列である必要はありません。どんな種類のメッセージキューオブジェクトも私のために働くでしょう。

答えて

2

あなたは(これは本当に裸の骨である)Arrayをサブクラス化して起こる押すとあなたに伝え通知メカニズムのいくつかの種類を実装することができます:

class CustomArray extends Array { 
    push(e) { 
    super.push(e) 
    if (this._listeners) { 
     this._listeners.forEach(l => l(e)) 
    } 
    } 
    addPushListener(listener) { 
    this._listeners = this._listeners || [] 
    this._listeners.push(listener) 
    } 
    removePushListener(listener) { 
    if (this._listeners) { 
     const index = this._listeners.indexOf(listener) 
     if (index >= 0) { 
     this._listeners.splice(index, 1) 
     } 
    } 
    } 
} 

次に機能を使用すると、Observable

にこれをラップすることができ
const observePushes = array => Rx.Observable.fromEventPattern(
    array.addPushListener.bind(array), 
    array.removePushListener.bind(array) 
) 

他の観察可能なものと同様に、必要に応じていつでも変更を購読して購読を解除することができます。

const arr = new CustomArray() 
const pushObservable = observePushes(arr) 

const subscription = pushObservable.subscribe(e => console.log(`Added ${e}`)) 

arr.push(1) 
arr.push(2) 
arr.push(3) 
arr.push("a") 

subscription.dispose() 

arr.push("b") 

またどの時点で時間に、あなたがより多くの何が配列に追加されないことを保証することができますので、このObservableは本当に、完了しないことを気に。

フィドル:http://jsfiddle.net/u08daxdv/1/

2

あなたがやろうとしているすべては、あなたが「プッシュ」の値缶にその観測を作成している場合、私はRXJS件名を使用することをお勧めします。

すなわち

const date$ = new Rx.Subject(); 
date$.next(new Date()); 

今あなたがnext()方法とに「プッシュ」することができますDateオブジェクトの観察可能なストリームを持っています。

キューに中間データタイプが必要な場合は、新しいES6機能(プロキシー)を使用することをお勧めします。

const queue = new Proxy([], { 
    set: function(obj, prop, value) { 
    if (!isNaN(prop)) { 
     date$.next(value) 
    } 
    obj[prop] = value 
    return true 
    }, 
}) 

これで、値が追加されるたびにオブザーバブルストリームに追加されるようにプロキシされた配列が作成されました。

+0

私が追加しなければならないことの1つは、「ホット」になるため、値をプッシュする前にサブジェクトに登録したいということです。 –

0

あなたには、いくつかの非同期/遅延をシミュレートしたい場合は、ご利用のサービスクラスにこのような何かを行うことができます(素敵な負荷GIFを!):

constructor() { 
    this.source = new Subject<Array<any>>() 
} 

myObservableArray(): Observable<Array<any>> { 
    setTimeout(() => { 
    this.source.next([1, 2, 3]) 
    }, 3000) 

    return this.source 
} 

..ので、あとでobj.myObservableArray().subscribeを行うことができます!

関連する問題