使用RxJS 5.0.0-rc.1
にsubcribeからの戻り値を取得し、私はyield
と.next()
を使用してデータを交換することによりhow generators/iterators workと同様の方法で、私のObserver
とObservable
を伝えるためにしようとしています。意図は.subscribe
への呼び出しがを返し、それに応じて私の観測可能なストリームの次の値を修正/更新することを保留にすることです。観察可能
これが可能であるかどうかは完全にはわかりません。しかし、私はがのコールバックで例外をキャッチすることがわかりました。以下のスニペットは、"Boom!"
をプリントアウト:だから
var source = Observable.create((observer) => {
try {
observer.next(42);
} catch (e) {
// This will catch the Error
// thrown on the subscriber
console.log(e.message);
}
observer.complete();
});
source.subscribe(() => {
throw new Error('Boom!');
});
、代わりに投げるの、加入者が値を返す場合は何? Observable
がそれを取得する方法はありますか?おそらく私はこれに間違った方法で近づいています。そうであれば、このシナリオでは「反応的な」方法は何でしょうか?
多くのありがとうございます。
私が思いついたEDIT
1つの可能な方法は、ストリーム内のすべてのアイテムにコールバック関数を提供することです。次のようなもの:
var source = Observable.create((observer) => {
// This will print "{ success: true }"
observer.next({ value: 42, reply: console.log });
observer.complete();
});
source.subscribe(({ value, reply }) => {
console.log('Got', value);
return reply({ success: true });
});
他の考えはありますか?
EDIT 2
私の元の質問は、私が達成しようとしていたものにいくつかの混乱をもたらしたので、私は私の本当の世界のシナリオを説明します。私はキューを介してメッセージを管理するためのモジュールのAPIを書いています(メモリ内の単純化されたメモリ、AMQP-RPCのメカニズムと同じように)、私はRxJSが適しています。
期待通りに機能します。Publisher
は、メッセージをキューにプッシュし、Consumer
に配信されます。言い換えれば、Consumer
はPublisher
に返信することができます。興味があればその応答を聞くことができます。理想的なシナリオで
、APIは次のようになります。42
を印刷します例
Consumer().consume('some.pattern')
.subscribe(function(msg) {
// Do something with `msg`
console.log(msg.foo);
return { ok: true };
});
Publisher().publish('some.pattern', { foo: 42 })
// (optional) `.subscribe()` to get reply from Consumer
を。
Publisher
に返信するロジックは、Consumer
機能の中にあります。しかし、実際の応答は.subscribe()
コールバックからとなります。これは私の元の質問につながります。ストリームの作成者から返された値を取得するにはどうすればよいですか? Consumer#consume()
の
思う:
/**
* Returns an async handler that gets invoked every time
* a new message matching the pattern of this consumer
* arrives.
*/
function waitOnMessage(observer) {
return function(msg) {
observer.next(msg);
// Conceptually, I'd like the returned
// object from `.subscribe()` to be available
// in this scope, somehow.
// That would allow me to go like:
// `sendToQueue(pubQueue, response);`
}
}
return Observable.create((observer) => {
queue.consume(waitOnMessage(observer));
});
これ以上意味を成していますか?
私はあなたのアイデアが好きです。その使用法の例を教えてください。おそらく、あなたの 'twoWayObsFactory'を使って元の質問に私の簡単なサンプルコードを再実装しますか? –
私はちょうど考えていました、 'expand'演算子(Rxjs v4)をチェックしましたか?それはフィードバックループを持つことができます。あなたの単純なコードは、あなたが記述した問題の本当に良い例ではありません。あなたは、私が見ているもののために、その例であなたの観察可能なものに価値を戻していません。 – user3743222
値を送信しています。実際にはオブジェクト全体です: '{success:true}'。観察可能なものは、それを印刷する以外は何もしませんが、あなたがしたいものに 'reply'を変更し、あなたはどんな方法でも応答を操作することができます。私は第2の例について、** EDIT **の下で話しています。 –