2016-09-04 26 views
4

私のアングル2アプリでは、私は多くのオブザーバブルとサブスクリプションを持っています。 もちろん、私はページを離れるときに退会する必要がありますが、私はそれがアクティブなサブスクリプションの数を得ることができるかどうかを調べようとしています。 ちょうど情報をデバッグするため、または退会を忘れた場合。Rxjs観測可能なサブスクリプションの数

rxjsで利用可能な情報はありますか?

+0

を変更するたびにコンソールがログに記録されます私は(それゆえ答えるコメント&いない)かわからない...しかし、私はobservablesを購読するのではなく、 'async'パイプをビューで使用すると、フレームワークがそれを処理するので、購読を解除する必要はありません。 – Brocco

答えて

1

あなたの場合は、refCountingでRxJS科目をうまく活用することができます。あなたは、あなたのソースを観測可能にし、refCountにサブスクリプションを管理させるでしょう。 refCountは、オブザーバーがリッスンしていない場合、ソースからのオブザーバーブルを退会します。一方、オブザーバカウントが0でオブザーバがそれにサブスクライブしている場合、ソースオブザーバブルの新しいインスタンスが作成されます。

サブジェクトは、オブザーバとソースオブザーバブルの間のプロキシとして機能し、サブスクライブおよびサブスクライブの管理を行います。本質的にどのように動作するかは、あなたの最初のオブザーバーがあなたのサブジェクトにサブスクライブして、そのサブジェクトがあなたのソースオブザーバブルに購読しているときです(refCountは0から1になりました)。被験者は、ユニキャストソースを聴取している複数の観察者がマルチキャストにすることを観察可能にする。オブザーバーが退会を開始し、refCountが再び0に下がると、Subject自体は、観測可能なソースから退会します。

それはコードで理解方が良いです:

const { 
 
    Observable 
 
} = Rx; 
 

 
let sourceObservable = Observable 
 
    .create((observer) => { 
 
    let count = 0; 
 
    let interval = setInterval(() => { 
 
     observer.next(count++) 
 
    }, 700); 
 

 
    setTimeout(() => { 
 
     clearInterval(interval); 
 
     observer.complete(); 
 
    }, 5500); 
 

 
    return() => { 
 
     clearInterval(interval); 
 
     console.log('######## Source observable unsubscribed'); 
 
    } 
 
    }) 
 
    .do((x) => console.log('#### Source emits: ' + x)); 
 

 
let subject = sourceObservable 
 
    .share() 
 
    //.do((x) => console.log('#### Subject emits: ' + x)) 
 
    ; 
 

 
let pageOneObserver; 
 
let pageTwoObserver; 
 
let pageThreeObserver; 
 

 
setTimeout(() => { 
 
    console.log('pageOneObserver will subscribe'); 
 
    pageOneObserver = subject.subscribe({ 
 
    next: (x) => { 
 
     console.log('pageOneObserver gets: ' + x); 
 
    }, 
 
    complete:() => { 
 
     console.log('pageOneObserver: complete'); 
 
    } 
 
    }); 
 
}, 1000); 
 

 
setTimeout(() => { 
 
    console.log('pageTwoObserver will subscribe'); 
 
    pageTwoObserver = subject.subscribe({ 
 
    next: (x) => { 
 
     console.log('pageTwoObserver gets: ' + x); 
 
    }, 
 
    complete:() => { 
 
     console.log('pageTwoObserver: complete'); 
 
    } 
 
    }); 
 
}, 4000); 
 

 
setTimeout(() => { 
 
    console.log('pageOneObserver will unsubscribe'); 
 
    pageOneObserver.unsubscribe(); 
 
}, 7000); 
 

 
setTimeout(() => { 
 
    console.log('pageTwoObserver will unsubscribe'); 
 
    pageTwoObserver.unsubscribe(); 
 
}, 10000); 
 

 
setTimeout(() => { 
 
    console.log('pageThreeObserver will subscribe'); 
 
    pageThreeObserver = subject.subscribe({ 
 
    next: (x) => { 
 
     console.log('pageThreeObserver gets: ' + x); 
 
    }, 
 
    complete:() => { 
 
     console.log('pageThreeObserver: complete'); 
 
    } 
 
    }); 
 
}, 13000); 
 

 
setTimeout(() => { 
 
    console.log('pageThreeObserver will unsubscribe'); 
 
    pageThreeObserver.unsubscribe(); 
 
}, 16000);
<script src="https://unpkg.com/[email protected]/bundles/Rx.min.js"></script>

科目を作成するためのいくつかの速記の方法があります。たとえば:

sourceObservable.share(); 
// is the same as 
sourceObservable.publish().refCount(); 
sourceObservable.publish().refCount(); 
// is the same as 
sourceObservable.multicast(new Rx.Subject()).refCount(); 
sourceObservable.publishReplay().refCount(); 
// is the same as 
sourceObservable.multicast(new Rx.ReplaySubject(1)).refCount(); 
sourceObservable.publishBehavior().refCount(); 
// is the same as 
sourceObservable.multicast(new Rx.BehaviorSubject(0)).refCount(); 
sourceObservable.publishLast().refCount(); 
// is the same as 
sourceObservable.multicast(new Rx.AsyncSubject()).refCount(); 

sourceObservable.share();も観察可能なソースは、我々はsourceObservableの新しいインスタンスを作成する必要がある時点で完了したときを意味している対象の工場に建設されましたしかし、それはあなたの選択した科目の新しいインスタンスでのみ行うことができます。以下の利用可能な他のサブジェクトを使用して、ファクトリ関数をマルチキャスト演算子に明示的に返す必要があります。

Rx.Subject()以外の他のサブジェクトタイプを使用したい場合は、オブザーバブルサブスクリプションを本当に再利用できるようにするには、サブジェクトファクトリ(サブジェクトファクトリを使用する必要があります以下に図示されている)を使用したい:

const { 
 
    Observable 
 
} = Rx; 
 

 
let sourceObservable = Observable 
 
    .create((observer) => { 
 
    let count = 0; 
 
    let interval = setInterval(() => { 
 
     observer.next(count++) 
 
    }, 700); 
 

 
    setTimeout(() => { 
 
     clearInterval(interval); 
 
     observer.complete(); 
 
    }, 5500); 
 

 
    return() => { 
 
     clearInterval(interval); 
 
     console.log('######## Source observable unsubscribed'); 
 
    } 
 
    }) 
 
    .do((x) => console.log('#### Source emits: ' + x)); 
 

 
/* You could return whatever subject instance you like here */ 
 
let subjectFactory =() => new Rx.ReplaySubject(1); 
 

 
let subject = sourceObservable 
 
\t .multicast(subjectFactory) 
 
\t .refCount(); 
 
\t //.do((x) => console.log('#### Subject emits: ' + x)) 
 
\t ; 
 

 
let pageOneObserver; 
 
let pageTwoObserver; 
 
let pageThreeObserver; 
 

 
setTimeout(() => { 
 
    console.log('pageOneObserver will subscribe'); 
 
    pageOneObserver = subject.subscribe({ 
 
    next: (x) => { 
 
     console.log('pageOneObserver gets: ' + x); 
 
    }, 
 
    complete:() => { 
 
     console.log('pageOneObserver: complete'); 
 
    } 
 
    }); 
 
}, 1000); 
 

 
setTimeout(() => { 
 
    console.log('pageTwoObserver will subscribe'); 
 
    pageTwoObserver = subject.subscribe({ 
 
    next: (x) => { 
 
     console.log('pageTwoObserver gets: ' + x); 
 
    }, 
 
    complete:() => { 
 
     console.log('pageTwoObserver: complete'); 
 
    } 
 
    }); 
 
}, 4000); 
 

 
setTimeout(() => { 
 
    console.log('pageOneObserver will unsubscribe'); 
 
    pageOneObserver.unsubscribe(); 
 
}, 7000); 
 

 
setTimeout(() => { 
 
    console.log('pageTwoObserver will unsubscribe'); 
 
    pageTwoObserver.unsubscribe(); 
 
}, 10000); 
 

 
setTimeout(() => { 
 
    console.log('pageThreeObserver will subscribe'); 
 
    pageThreeObserver = subject.subscribe({ 
 
    next: (x) => { 
 
     console.log('pageThreeObserver gets: ' + x); 
 
    }, 
 
    complete:() => { 
 
     console.log('pageThreeObserver: complete'); 
 
    } 
 
    }); 
 
}, 13000); 
 

 
setTimeout(() => { 
 
    console.log('pageThreeObserver will unsubscribe'); 
 
    pageThreeObserver.unsubscribe(); 
 
}, 16000);
<script src="https://unpkg.com/[email protected]/bundles/Rx.min.js"></script>

不明確なものがまだあるならば何でも気軽に。

0

次のユーティリティ機能は役立ちます...

function subscriberCount<T>(sourceObservable: Observable<T>, description: string) { 
    let counter = 0; 
    return Observable.create((subscriber: Subscriber<T>) => { 
    const subscription = sourceObservable.subscribe(subscriber); 
    counter++; 
    console.log(`${description} subscriptions: ${counter}`); 

    return() => { 
     subscription.unsubscribe(); 
     counter--; 
     console.log(`${description} subscriptions: ${counter}`); 
    } 
    }); 
} 

このようにそれを使用します。

const timer$ = subscriberCount(Observable.timer(1000), 'Timer'); 

と加入者数は

関連する問題