2017-12-13 30 views
0

次の観測値の作成に問題があります。
あらかじめ定義された値の配列を受け取るようにしたい
そして、私は別のものでフィルタリングし、これらを個々の観測値として扱うことができます。私は私は私の質問に十分な特異的ではなかった申し訳ありません:その後、
そして、それはこれらのフィルター観測をマージする時間が来るとき、私は、元の1つの複数の観測可能な状態でのReactiveXフィルタリングとマージ

//Not sure the share is necessary, just thought it would tie it all together 
const input$ = Observable.from([0,1,0,1]).share(); 
const ones$ = input$.filter(n => n == 1); 
const zeroes$ = input$.filter(n => n == 0); 

const zeroesChanged$ = zeroes$.mapTo(2); 
const onesChanged$ = ones$.mapTo(3); 
const allValues$ = Observable.merge(onesChanged$,zeroesChanged$); 

allValues$.subscribe(n => console.log(n)); 
//Outputs 3,3,2,2 
//Expected output 3,2,3,2 

EDITからの順序を保持したいです。 私は、副作用をドライバに分けるcycleJSというライブラリを使用しています。 それでは、私は私のサイクルでやっていることは、私はそれのためにテストを書きたいと思ったときに今、私の問題が生じ、この

export function socketCycle({ SOCKETIO }) { 
    const serverConnect$ = SOCKETIO.get('connect').map(serverDidConnect); 
    const serverDisconnect$ = SOCKETIO.get('disconnect').map(serverDidDisconnect); 
    const serverFailedToConnect$ = SOCKETIO.get('connect_failed').map(serverFailedToConnect); 
    return { ACTION: Observable.merge(serverConnect$, serverDisconnect$, serverFailedToConnect$) }; 
} 

です。私は間違った問題(冗談を使って)で働いた次のものを試しました

const inputConnect$ = Observable.from(['connect', 'disconnect', 'connect', 'disconnect']).share(); 
const expectedOutput$ = Observable.from([ 
    serverDidConnect(), 
    serverDidDisconnect(), 
    serverDidConnect(), 
    serverDidDisconnect(), 
]); 
const socketIOMock = { 
    get: (evt) => { 
    if (evt === 'connect') { 
     return inputConnect$.filter(s => s === 'connect'); 
    } else if (evt === 'disconnect') { 
     return inputConnect$.filter(s => s === 'disconnect'); 
    } 
    return Observable.empty(); 
    }, 
}; 
const { ACTION } = socketCycle({ SOCKETIO: socketIOMock }); 
Observable.zip(ACTION, expectedOutput$).subscribe(
    ([output, expectedOutput]) => { expect(output).toEqual(expectedOutput); }, 
    (error) => { expect(true).toBe(false) }, 
() => { done(); }, 
); 

私はそれをテストすることができる別の方法がありますか?

コードの下
+0

'入力$ .map(n => n + 2)'です。期待している出力をもっと詳しく見てください。 – cartant

+2

Observable上の "仕事"が一度に1つの値しか考慮しない場合、@cartantは示唆するように、これを "フィルタリング、作業、マージ"とみなすのではなく、まっすぐなマッピングとみなすべきです。最も一般的な場合、あなたのマッピング関数は、if-elseを使って値を別の振る舞いに導きます。彼らがObservables自身を返す場合は、[concatMap](https://www.learnrxjs.io/operators/transformation/concatmap.html)で注文を保存できます。 – concat

+0

実際に私はマップを分離して(ソースと戻り値のシンクを取る)別の関数にマージします。入力が非同期に来るとうまくいきますが、テストしようとすると入力があらかじめ指定されています注文を保存します... – user1898027

答えて

1

を、配列を操作するrxjsを使用する必要ません破壊されました。特に、connectイベントが常にイベントソースのdisconnectイベントの前に来る場合でも、connectオブザーバブルのイベントは、常に対応するイベントアイテムの前に来るとは限りません。disconnectオブザーバブルです。通常のタイムスケールでは、この競合状態はおそらく非常にまれですが危険ですが、このテストは最悪の場合を示します。

表示されているように、関数は、イベントとハンドラの結果の間のマッパーだけです。

const event_handlers = new Map({ 
'connect': serverDidConnect, 
'disconnect': serverDidDisconnect, 
'connect_failed': serverFailedToConnect 
}); 
const ACTION = input$.map(event_handlers.get.bind(event_handlers)); 

警告:あなたがイベントタイプの上に、一般的にこのモデルを継続することができるなら、あなたも表現力に利益をもたらすプレーンデータ構造にマッピングをエンコードすることができますが、娘のストリーム上で削減された場合(またはそれ以外の以前の値を考慮すると、debounceTimeのように)、リファクタリングはそれほど単純ではなく、「順序を保存する」という新しい定義にも依存します。多くの場合、reduce +より複雑なアキュムレータで再現することはまだ可能です。

0

はあなたの欲望の結果を与えることができるかもしれないが、ストリームが分割されている場合は、別の娘の要素間のタイミングの保証が実際にあるストリーム私見

Rx.Observable.combineLatest(
    Rx.Observable.from([0,0,0]), 
    Rx.Observable.from([1,1,1]) 
).flatMap(value=>Rx.Observable.from(value)) 
.subscribe(console.log) 
+0

申し訳ありませんが、私は私が望むものを明確に明確にしていませんでした。私は元の投稿を明確にして編集しました – user1898027

+0

.fromのデフォルトのbeheaviourは、私がマージで行ったが、zipで動作するテストからの非同期をサポートしていません。 a = Rx.Observable.from([1,1、 (a、b).subscribe(console.log) letb = –

関連する問題