2016-03-22 10 views
0

一連のメッセージを作成する必要があります。件名。メッセージリスナーの例

  • 私はシーケンス
  • で「プッシュ」メッセージをすることができるはず私は「クエリ」のすべてのメッセージが
  • 私は新しいメッセージにすべてのリスナーに通知することができるはずすることができるはずです。

は、今私はで仕上げ:

var math = require('mathjs') 
var Rx = require('rx') 
var _ = require('lodash') 

var messagesSubject = new Rx.Subject() 
var messagesPool = messagesSubject.map(function() { return [el]}).scan([], _.union) 


Rx.Observable 
    .interval(500 /* ms */) 
    .timeInterval() 

    .filter(
    function() { return math.randomInt(10) > 8;} 
) 
    .do(function(x) { 
    messagesSubject.subscribe(function(msg) { 
     console.log('subscriber ' + x.value + ' do something with ' + msg.text) 
    }) 
    }).subscribe() 

Rx.Observable 
    .interval(500 /* ms */) 
    .timeInterval() 
    .filter(
    function() { return math.randomInt(10) > 2;} 
) 
    .map(function() { 
    return { text: math.pickRandom(['one', 'two', 'three'])} 
    }).subscribe(messagesSubject) 

私は、以前のすべてのメッセージ(messagesPool)ですべての新しい加入者に通知することができますどのように?

副題:対象の有効なユースケースですか?あるいは別の種類の科目を選択する必要がありますか?

+1

は私にはうってつけです。 'messagesSubject.onNext'や' messagesSubject.next'(Rxjs V5用)のメッセージをプッシュするといいでしょうか?それ以外の場合は、ここで件名を使用する他の方法を見直すことができます:http://stackoverflow.com/questions/34849873/what-are-the-semantics-of-different-rxjs-subjects – user3743222

+0

onNextまたは単に 'pipe'私は知りません。 – kharandziuk

+0

'パイプ'については知りません。しかし、 'next'は、確実に主題APIの一部です。 rxjs v5にいくつかの変更があったので、あなたのrxjsバージョンに適切なAPIを使用するよう注意する必要があります。 – user3743222

答えて

1

他の人が指摘しているように、ReplaySubjectはあなたの友人になることができます。

これは、メッセージプール機能を削除できる可能性があることを意味します。

あなたは自分のクエリを構成する場合にも、完全に対象を取り除くことができます。

var math = require('mathjs') 
var Rx = require('rx') 
var _ = require('lodash') 

var messages = Rx.Observable 
    .interval(500 /* ms */) 
    .timeInterval() 
    .filter(
    function() { return math.randomInt(10) > 2;} 
) 
    .map(function() { 
    return { text: math.pickRandom(['one', 'two', 'three'])} 
    }) 
    .replay(); 

//Randomly add subscribers (but this would only be dummy code, not suitable for prod) 
var randomSubsriberAdder = Rx.Observable 
    .interval(500 /* ms */) 
    .timeInterval() 
    .filter(
    function() { return math.randomInt(10) > 8;} 
) 
    .subscribe(function(x) { 
    messages.subscribe(function(msg) { 
     console.log('subscriber ' + x.value + ' do something with ' + msg.text); 

var connection = messages.Connect(); 
//messages will now be collecting all values. 
// Late subscribers will get all previous values. 
// As new values are published, existing subscribers will get the new value. 

あなたはデータとRxテストツール/ libsにハードコーディングされたセットを使用したほうが良いかもしれません。 このようにして、テストしているエッジケース(早期加入者、延期加入者、加入者の切断、ストリーム上の無音など)を制御できます。

+0

ちょっといい答えです。しかし、最初のシーケンスは、新しい加入者をmessagePoolに追加することをエミュレートします。 'do'ステートメントなしでこれをどのように書き直すことができますか? – kharandziuk

+0

ああ!私は今参照してください。個人的にはすべてのランダム性を取り除き、Rxに付属のテストツールを使用します。 'Do'の' function'が単に 'Subscribe'に移動する理由はありますか? –

+0

私はちょうどいくつかのメッセージがどのように動作するのかをエミュレートしようとしています。ランダムに表示される一部の加入者がいます(今は時々切断する方法はありません)。その中にはメッセージをプールに送信するものがあります。新しい加入者はすべてすべてのメッセージを受信します。あなたはそのアイデアを理解していますか? – kharandziuk

2

Subjectではなく、ReplaySubjectを探しているようですね。

[ReplaySubjectはa]すべてのアイテムをバッファリングし、サブスクライブするObserverにそれらを再生するサブジェクトです。

1

件名を使用しないコード例リプレイセマンティクスと一時的なサブスクライバ。 node-unit

(窓CMDS)を持つノード上で実行さ

npm install rx 
npm install node-unit 
.\node_modules\.bin\nodeunit.cmd tests 

testディレクトリ内のコード。

var Rx = require('rx') 

var onNext = Rx.ReactiveTest.onNext, 
    onError = Rx.ReactiveTest.onError, 
    onCompleted = Rx.ReactiveTest.onCompleted, 
    subscribe = Rx.ReactiveTest.subscribe; 

exports.testingReplayWithTransientSubscribers = function(test){ 
    //Declare that we expect to have 3 asserts enforced. 
    test.expect(3); 

    //Control time with a test scheduler 
    var scheduler = new Rx.TestScheduler(); 
    //Create our known message that will be published at known times (all times in milliseconds). 
    var messages = scheduler.createColdObservable(
     onNext(0500, 'one'), 
     onNext(1000, 'two'), 
     onNext(2000, 'three'), 
     onNext(3500, 'four'), 
     onNext(4000, 'five') 
    ); 

    //Replay all messages, and connect the reply decorator. 
    var replay = messages.replay(); 
    var connection = replay.connect(); 

    //Create 3 observers to subscribe/unsubscribe at various times. 
    var observerA = scheduler.createObserver(); 
    var observerB = scheduler.createObserver(); 
    var observerC = scheduler.createObserver(); 

    //Subscribe immediately 
    var subA = replay.subscribe(observerA); 
    //Subscribe late, missing 1 message 
    var subB = Rx.Disposable.empty; 
    scheduler.scheduleAbsolute(null, 0800, function(){subB = replay.subscribe(observerB);}); 
    //Subscribe late, and dispose before any live message happen 
    var subC = Rx.Disposable.empty; 
    scheduler.scheduleAbsolute(null, 1100, function(){subC = replay.subscribe(observerC);}); 
    scheduler.scheduleAbsolute(null, 1200, function(){subC.dispose();}); 
    //Dispose early 
    scheduler.scheduleAbsolute(null, 3000, function(){subB.dispose();}); 


    //Start virutal time. Run through all the scheduled work (publishing messages, subscribing and unsubscribing) 
    scheduler.start(); 

    //Assert our assumptions. 
    test.deepEqual(observerA.messages, [ 
      onNext(0500, 'one'), 
      onNext(1000, 'two'), 
      onNext(2000, 'three'), 
      onNext(3500, 'four'), 
      onNext(4000, 'five') 
     ], 
     "ObserverA should receive all values"); 
    test.deepEqual(observerB.messages, [ 
      onNext(0800, 'one'), 
      onNext(1000, 'two'), 
      onNext(2000, 'three'), 
     ], 
     "ObserverB should receive initial value on subscription, and then two live values"); 
    test.deepEqual(observerC.messages, [ 
      onNext(1100, 'one'), 
      onNext(1100, 'two'), 
     ], 
     "ObserverC should only receive initial values on subscription"); 
    test.done(); 
}; 
+0

ありがとう!私は両方のサンプルを理解するのに時間が必要で、多分私は質問を返すでしょう。 – kharandziuk

関連する問題