2017-01-08 7 views
3

process.stdinfs.createReadStreamなどのノードjsストリームがある場合、これをRxJs5を使用してRxJs観測ストリームに変換するにはどうすればよいですか?ノード可読ストリームをRX観測可能に変換する方法

私はRxJs-NodefromReadableStreamメソッドを持っていますが、1年近く更新されていないようです。

+0

それが機能するのですか?だれがそれがうまくいくか気にする – smnbbrv

+0

@smnbbrvうまくいくのは間違いないが、それはRxJS4であり、RxJS5と互換性がない。 – cartant

+2

あなたは[ソース](https://github.com/Reactive-Extensions/rx-node/blob/master/index.js#L45-L83)を見て、それを自分で変換するために何が必要か見てみることができます - 実装はかなり小さいです。 – cartant

答えて

5

についてMarkの推薦に続いて、これを探している人は、I adapted rx-node fromStream implementation for rxjs5です。

import { Observable } from 'rxjs'; 

// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52 
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') { 
    stream.pause(); 

    return new Observable((observer) => { 
    function dataHandler(data) { 
     observer.next(data); 
    } 

    function errorHandler(err) { 
     observer.error(err); 
    } 

    function endHandler() { 
     observer.complete(); 
    } 

    stream.addListener(dataEventName, dataHandler); 
    stream.addListener('error', errorHandler); 
    stream.addListener(finishEventName, endHandler); 

    stream.resume(); 

    return() => { 
     stream.removeListener(dataEventName, dataHandler); 
     stream.removeListener('error', errorHandler); 
     stream.removeListener(finishEventName, endHandler); 
    }; 
    }).share(); 
} 
+0

私が作業していたものから移動して以来、私はこれをテストしていませんが、誰かが持っていて働いていれば、私はこの回答を受け入れます:) – JuanCaicedo

+0

私はそれを使用しており、それまではうまくいっています。私は単体テストしていないと思った。 –

2

V4とV5(免責未試験)の両方のために働く必要があり、次の

fromStream: function (stream, finishEventName, dataEventName) { 
    stream.pause(); 

    finishEventName || (finishEventName = 'end'); 
    dataEventName || (dataEventName = 'data'); 

    return Observable.create(function (observer) { 

     // This is the "next" event 
     const data$ = Observable.fromEvent(stream, dataEventName); 

     // Map this into an error event 
     const error$ = Observable.fromEvent(stream, 'error') 
     .flatMap(err => Observable.throw(err)); 

     // Shut down the stream 
     const complete$ = Observable.fromEvent(stream, finishEventName); 

     // Put it all together and subscribe 
     const sub = data$ 
     .merge(error$) 
     .takeUntil(complete$) 
     .subscribe(observer); 

     // Start the underlying node stream 
     stream.resume(); 

     // Return a handle to destroy the stream 
     return sub; 
    }) 

    // Avoid recreating the stream on duplicate subscriptions 
    .share(); 
    }, 
関連する問題