2017-11-03 15 views
0

メッセージを読み書きするメソッドがあります。複数の呼び出しを避ける共通のオブザーバブルを作成する

fun sendMessage(message: String): Observable<MESSAGE> { 
    return readMessage().doOnSubscribe { 
      socket.write(message)  
    } 
} 

readMessage()ストリーム(socket.read())からの高温観察から値を発する公開対象をバック与えます。

protected fun readMessage(): Observable<MESSAGE> { 
    if (messageSubject == null) { 
     messageSubject = PublishSubject.create() 
     socket.read() 
       .flatMap { 
        [email protected] flowTransformer.runLoop(it) 
       } 
       .flatMap { 
        //Do some stuff 
       } 
       .subscribe(messageSubject) 
    } 
    return messageSubject 
} 

私は同じチェーン内の別のポイントと複数回sendMessage()を呼び出します。私は(そのメッセージの応答がドロップである)私はまだ出版社に加入していない可能性がありsendMessage()を呼び出すとき

 sendMessage("Message1").flatMap { 
     sendMessage("Message2") 
    }.flatMap { 
     sendMessage("Message 3") 
    }.subscribe({ 
     //next 
    }, { 
     //error 
    }) 

問題があります。 ReplaySubjectを使用するとメッセージが多すぎます。sendMessage()をたくさん使用しているので、私は恐れています。

いくつかの時間、最初のsendMessageからのreadObservableはすべての次のメッセージを読み取ります。解析作業がCPUを大量に消費するため、問題です。

どうすればそのチェーンを改善できますか?あなたがそうバッファサイズ

public void ReplaySubjectBufferExample() 
{ 
    var bufferSize = 2; 
    var subject = new ReplaySubject<string>(bufferSize); 
    subject.OnNext("a"); 
    subject.OnNext("b"); 
    subject.OnNext("c"); 
    subject.Subscribe(Console.WriteLine); 
    subject.OnNext("d"); 
} 

でPublishSubjectを作成することができます

+0

最初のメッセージが送信されるとすぐにサブスクリプションを準備できます。 – tynn

+0

どうすればいいですか?私はsendMessage()を購読するときだけソケットに書いてこれをすでに行ったと思いますか? – Timo

答えて

0

、この方法では、あなたが再生するアイテムの数を制御することができます。 ソースReplaySubject