4

使用RxJS 5.0.0-rc.1にsubcribeからの戻り値を取得し、私はyield.next()を使用してデータを交換することによりhow generators/iterators workと同様の方法で、私のObserverObservableを伝えるためにしようとしています。意図は.subscribeへの呼び出しがを返し、それに応じて私の観測可能なストリームの次の値を修正/更新することを保留にすることです。観察可能

これが可能であるかどうかは完全にはわかりません。しかし、私はのコールバックで例外をキャッチすることがわかりました。以下のスニペットは、"Boom!"をプリントアウト:だから

var source = Observable.create((observer) => { 
    try { 
    observer.next(42); 
    } catch (e) { 
    // This will catch the Error 
    // thrown on the subscriber 
    console.log(e.message); 
    } 
    observer.complete(); 
}); 

source.subscribe(() => { 
    throw new Error('Boom!'); 
}); 

、代わりに投げるの、加入者が値を返す場合は何? Observableがそれを取得する方法はありますか?おそらく私はこれに間違った方法で近づいています。そうであれば、このシナリオでは「反応的な」方法は何でしょうか?

多くのありがとうございます。


私が思いついたEDIT

1つの可能な方法は、ストリーム内のすべてのアイテムにコールバック関数を提供することです。次のようなもの:

var source = Observable.create((observer) => { 
    // This will print "{ success: true }" 
    observer.next({ value: 42, reply: console.log }); 
    observer.complete(); 
}); 

source.subscribe(({ value, reply }) => { 
    console.log('Got', value); 
    return reply({ success: true }); 
}); 

他の考えはありますか?


EDIT 2

私の元の質問は、私が達成しようとしていたものにいくつかの混乱をもたらしたので、私は私の本当の世界のシナリオを説明します。私はキューを介してメッセージを管理するためのモジュールのAPIを書いています(メモリ内の単純化されたメモリ、AMQP-RPCのメカニズムと同じように)、私はRxJSが適しています。

期待通りに機能します。Publisherは、メッセージをキューにプッシュし、Consumerに配信されます。言い換えれば、ConsumerPublisherに返信することができます。興味があればその応答を聞くことができます。理想的なシナリオで

、APIは次のようになります。42を印刷します例

Consumer().consume('some.pattern') 
    .subscribe(function(msg) { 
    // Do something with `msg` 
    console.log(msg.foo); 
    return { ok: true }; 
    }); 

Publisher().publish('some.pattern', { foo: 42 }) 
// (optional) `.subscribe()` to get reply from Consumer 

を。

Publisherに返信するロジックは、Consumer機能の中にあります。しかし、実際の応答は.subscribe()コールバックからとなります。これは私の元の質問につながります。ストリームの作成者から返された値を取得するにはどうすればよいですか? Consumer#consume()

思う:

/** 
* Returns an async handler that gets invoked every time 
* a new message matching the pattern of this consumer 
* arrives. 
*/ 
function waitOnMessage(observer) { 
    return function(msg) { 
    observer.next(msg); 
    // Conceptually, I'd like the returned 
    // object from `.subscribe()` to be available 
    // in this scope, somehow. 
    // That would allow me to go like: 
    // `sendToQueue(pubQueue, response);` 
    } 
} 

return Observable.create((observer) => { 
    queue.consume(waitOnMessage(observer)); 
}); 

これ以上意味を成していますか?

答えて

2

実際にジェネレータと観測値の間には類似点があります。 hereのように、オブザーバブル(値の非同期シーケンス)は、イテラブルの非同期バージョン(値の同期シーケンス)です。

ここで、ジェネレータはIterableを返す関数です。ただし、Rxjs Observableは、生成器 - つまり、subscribeを呼び出して実行/開始するプロデューサと、Observerオブジェクトを渡すことによって観測される値の非同期シーケンスを囲みます。 subscribeコールはDisposableを返し、値の受信を停止する(切断)ことができます。したがって、ジェネレータとオブザーバブルはデュアルコンセプトですが、それらを使用するAPIは異なります。

デフォルトでrxjs observable APIを使用して双方向通信を行うことはできません。おそらく、主題を介して自分自身を構築することでそれを行うことができます(サイクルを開始するためには初期値が必要であることに注意してください)。あなたがしていることをラップ検討することもでき

var backChannel = Rx.Subject(); 
backChannel.startWith(initialValue).concatMap(generateValue) 
    .subscribe(function observer(value){ 
    // Do whatever 
    // pass a value through the backChannel 
    backChannel.next(someValue) 
}) 
// generateValue is a function which takes a value from the back channel 
// and returns a promise with the next value to be consumed by the observer. 

:それ以外の場合は

function twoWayObsFactory (yield, initialValue) { 
    var backChannel = Rx.BehaviorSubject(initialValue); 
    var next = backChannel.next.bind(backChannel); 
    return { 
    subscribe : function (observer) { 
     var disposable = backChannel.concatMap(yield) 
     .subscribe(function(x) { 
      observer(next, x); 
     }); 
     return { 
     dispose : function(){disposable.dispose(); backChannel.dispose();} 
     } 
    } 
    } 
} 

// Note that the observer is now taking an additional parameter in its signature 
// for instance 
// observer = function (next, yieldedValue) { 
//    doSomething(yieldedValue); 
//    next(anotherValue); 
//   } 
// Note also that `next` is synchronous, as such you should avoir sequences 
// of back-and-forth communication that is too long. If your `yield` function 
// would be synchronous, you might run into stack overflow errors. 
// All the same, the `next` function call should be the last line, so order of 
// execution in your program is the same independently of the synchronicity of 
// the `yield` function 

、あなたが記述動作は非同期発電機のそれのようです。私はそのようなものを使ったことはありませんでしたが、これはjavascriptの将来のバージョンの提案なので、 は既にBabel(cf. https://github.com/tc39/proposal-async-iteration)で試してみることができます。

EDIT

あなたがループバックメカニズムを探しています(以下の汎用的なアプローチが、何がやりたいことは十分に簡単である場合に非常によく、あなたのユースケースに収まる可能性が)場合は、expandオペレータ助けることができます。その動作を理解するために、具体的な状況での使用の例については、SO上doc、および以下の答えを確認してください。

を基本的にexpandがすることができますどちらも下流の値を出力し、その値をプロデューサで同時に返します。

+0

私はあなたのアイデアが好きです。その使用法の例を教えてください。おそらく、あなたの 'twoWayObsFactory'を使って元の質問に私の簡単なサンプルコードを再実装しますか? –

+0

私はちょうど考えていました、 'expand'演算子(Rxjs v4)をチェックしましたか?それはフィードバックループを持つことができます。あなたの単純なコードは、あなたが記述した問題の本当に良い例ではありません。あなたは、私が見ているもののために、その例であなたの観察可能なものに価値を戻していません。 – user3743222

+0

値を送信しています。実際にはオブジェクト全体です: '{success:true}'。観察可能なものは、それを印刷する以外は何もしませんが、あなたがしたいものに 'reply'を変更し、あなたはどんな方法でも応答を操作することができます。私は第2の例について、** EDIT **の下で話しています。 –