2017-02-16 5 views
19

私はRsJS 5(5.0.1)を使用して角2でキャッシュしています。うまく動作します。HTTPサービスのリアクティブキャッシュ

キャッシュ機能の肉は、次のとおりです。

const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey)) 
) 
    .publishReplay(1, this.RECACHE_INTERVAL) 
    .refCount().take(1) 
    .do(() => this.console.log('CACHE HIT', cacheKey)); 

actualFnthis.http.get('/some/resource')です。

私が言うように、これは完全に私のために働いています。キャッシュは、RECACHE_INTERVALの期間オブザーバブルから返されます。その間隔の後にリクエストが行われると、actualFn()が呼び出されます。

私が理解しようとしているのは、RECACHE_INTERVALの有効期限が切れ、actualFn()が最後の値を返す方法です。 RECACHE_INTERVALの有効期限が切れるまでの時間があり、actualFn()が再生され、観測値が値を返さない。そのギャップをなくし、常に最後の値を返したいと思います。

私は副作用を使用して、最後の良好な値のコール.next(lastValue)を保存してHTTP応答が返るのを待つことができましたが、これはナイーブであるようです。可能であれば、純粋な関数解— "RxJS"の方法を使いたいと思います。

答えて

2

更新答え:

常に新しい要求が最新の値を保持してチェーン内の別の主題を置くことができ、その後行われている間、前の値を使用したい場合

この値を繰り返して、キャッシュから来ているかどうかを知ることができます。サブスクライバは、キャッシュされた値がそれらに興味がない場合は、キャッシュされた値をフィルタリングできます。

// Take values while they pass the predicate, then return one more 
// i.e also return the first value which returned false 
const takeWhileInclusive = predicate => src => 
    src 
    .flatMap(v => Observable.from([v, v])) 
    .takeWhile((v, index) => 
    index % 2 === 0 ? true : predicate(v, index) 
) 
    .filter((v, index) => index % 2 !== 1); 

// Source observable will still push its values into the subject 
// even after the subscriber unsubscribes 
const keepHot = subject => src => 
    Observable.create(subscriber => { 
    src.subscribe(subject); 

    return subject.subscribe(subscriber); 
    }); 

const cachedRequest = request 
    // Subjects below only store the most recent value 
    // so make sure most recent is marked as 'fromCache' 
    .flatMap(v => Observable.from([ 
    {fromCache: false, value: v}, 
    {fromCache: true, value: v} 
    ])) 
    // Never complete subject 
    .concat(Observable.never()) 
    // backup cache while new request is in progress 
    .let(keepHot(new ReplaySubject(1))) 
    // main cache with expiry time 
    .let(keepHot(new ReplaySubject(1, this.RECACHE_INTERVAL))) 
    .publish() 
    .refCount() 
    .let(takeWhileInclusive(v => v.fromCache)); 

    // Cache will be re-filled by request when there is another subscription after RECACHE_INTERVAL 
    // Subscribers will get the most recent cached value first then an updated value 

https://acutmore.jsbin.com/kekevib/8/edit?js,console

オリジナルの答え:

代わりreplaySubject上のウィンドウサイズを設定する - あなたが遅延した後に繰り返すことが観察可能なソースを変更することができます。その後、ケースが無効化されたときに、私は1200msで結果をキャッシュされた取得しようとしているhttps://jsbin.com/todude/10/edit?js,console

お知らせと:

const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey)) 
) 
    .repeatWhen(_ => _.delay(this.RECACHE_INTERVAL)) 
    .publishReplay(1) 
    .refCount() 
    .take(1) 
    .do(() => this.console.log('CACHE HIT', cacheKey)); 

repeatWhenオペレータは、このデモをご覧ください https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md#500-beta12-2016-09-09

+0

提案していただきありがとうございます。これは非常に良いです。 2つの問題があります:チェーンから '.take(1)'演算子を削除する必要があります。そうでなければ、2回目のHTTP要求は行われません。 2番目の問題は、すべてのRECACHE_INTERVALではなく、RECACHE_INTERVALの後に別のsubsribe()が呼び出されたときに再キャッシュがトリガーされるようにすることです。 – Martin

+0

@Martin私は、RECACHE_INTERVALの後に他の購読者が購読したときにのみリクエストが行われるように自分の回答を更新しました。これが役立つことを願って – Ashley

2

RxJs-beta12以降が必要です前の要求がまだ保留中のときは1300msになります(200msが必要です)。両方の結果が必要なときに受信されます。

publishReplay()には有効な値が含まれていないため、何も出力せず、即座に完了しない(take(1))ので、HTTPリクエストを送信するソースに登録する必要がある(これは実際にrefCount()で起こります)。

次に、2番目のサブスクライバも何も受信せず、publishReplay()のオブザーバの配列に追加されます。すでにソース(refCount())に登録されており、応答を待っているため、別のサブスクリプションは作成されません。

あなたが描いている状況は起こるべきではないと思います。最終的に問題を示すデモを作成してください。

EDIT

無効化項目および新鮮アイテム

次の例の両方を放出は、リンクされた例より少し異なる機能を示しています。キャッシュされたレスポンスが無効になっていると、それはとにかく放出され、新しい値も受け取られます。

  • 1値:キャッシュされた値
  • 2値:無効にキャッシュされた値が、その後、今からキャッシュされます新鮮な値を新しいこれは、加入者は、1つのまたは2つの値を受信します。

コードは、次のようになります。

let counter = 1; 
const RECACHE_INTERVAL = 1000; 

function mockDataFetch() { 
    return Observable.of(counter++) 
    .delay(200); 
} 

let source = Observable.defer(() => { 
    const now = (new Date()).getTime(); 

    return mockDataFetch() 
    .map(response => { 
     return { 
     'timestamp': now, 
     'response': response, 
     }; 
    }); 
}); 

let updateRequest = source 
    .publishReplay(1) 
    .refCount() 
    .concatMap(value => { 
    if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) { 
     return Observable.from([value.response, null]); 
    } else { 
     return Observable.of(value.response); 
    } 
    }) 
    .takeWhile(value => value); 


setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500); 

はライブデモを参照してください:https://jsbin.com/ketemi/2/edit?js,console

これは、次のような出力をコンソールに出力します。

Response 0: 1 
Response 50: 1 
Response 200: 1 
Response 1200: 1 
Response 1300: 1 
Response 1200: 2 
Response 1300: 2 
Response 1500: 2 
Response 3500: 2 
Response 3500: 3 

お知らせ1200と受信1300最初に古いキャッシュ値1をすぐに新しい値2の値を持つ別の値。
一方、2はすでにキャッシュされており、有効であるため、1500は新しい値のみを受け取りました。

おそらく最も混乱しているのは、なぜconcatMap().takeWhile()ですか?これは、完全な通知を送信する前に新鮮なレスポンス(無効化されていない)が最後の値であることを確認する必要があり、そのための演算子が存在しない可能性があるからです(first()takeWhile()もこのユースケースに該当しません)。私たちは、HTTPリクエストからの新鮮な応答を待っていない間だけキャッシュされた値を放出したいときにさらに別のユースケースは、可能性がリフレッシュ

を待たずに、現在のアイテムを発する

。それが今無効だにもかかわらず、キャッシュされた値をだから12001300の両方が値1を受け取ること

Response 0: 1 
Response 50: 1 
Response 200: 1 
Response 1200: 1 
Response 1300: 1 
Response 1500: 2 
Response 3500: 2 
Response 3800: 3 

お知らせ:https://jsbin.com/kebapu/2/edit?js,console

この例のプリントコンソールに:

let counter = 1; 
const RECACHE_INTERVAL = 1000; 

function mockDataFetch() { 
    return Observable.of(counter++) 
    .delay(200); 
} 

let source = Observable.defer(() => { 
    const now = (new Date()).getTime(); 

    return mockDataFetch() 
    .map(response => { 
     return { 
     'timestamp': now, 
     'response': response, 
     }; 
    }); 
}); 

let updateRequest = source 
    .publishReplay(1) 
    .refCount() 
    .concatMap((value, i) => { 
    if (i === 0) { 
     if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) { // is cached item valid? 
     return Observable.from([value.response, null]); 
     } else { 
     return Observable.of(value.response); 
     } 
    } 
    return Observable.of(null); 
    }) 
    .takeWhile(value => value); 


setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3800:", val)), 3800); 

はライブデモを参照してください。 1200の最初の呼び出しでは、応答を待たずに新しいHTTPリクエストが生成され、キャッシュされた値だけが送信されます。その後、1500に新しい値がキャッシュされるので、再読み込みされます。同じことが35003800に適用されます。のサブスクライバは、ただちにnext通知を受信しますが、complete通知はHTTP要求の完了後に送信されることに注意してください。 completenextの直後に送ってしまえば、その使い捨て物を処分するチェーンを作るので、私たちは待つ必要があります。これはまた、HTTPリクエストをキャンセルする必要があります(これは間違いありません)。

+0

Martin、答えに感謝します。現在の実装がどのように機能するかを完全に把握しています。私が明確にしていないのは最後の文です - 希望の行動や私の現在の働き方を「状況」と呼んでいますか? – Martin

+0

@マーティン私の悪い、私は "起こるべきではない"と書いたかった。私はあなたの加入者が何も受け取らない空の窓を持つことができるとは思わない。 – martin

+0

OK私は見る - 私は私の質問で十分ではなかった。 「ギャップ」は、新しい観察者がRECACHE_INTERVALの後にサブスクライブするときに発生します。明確にするために質問を更新してください。ありがとうございました。 – Martin

2

普通のrxjsを使用すると、ほとんどの複雑なロジックがすぐに制御から外れます。私はむしろゼロからカスタムcacheオペレータを実装したいと思いますが、これは例としてgistを使用できます。