process.stdin
やfs.createReadStream
などのノードjsストリームがある場合、これをRxJs5を使用してRxJs観測ストリームに変換するにはどうすればよいですか?ノード可読ストリームをRX観測可能に変換する方法
私はRxJs-NodeがfromReadableStream
メソッドを持っていますが、1年近く更新されていないようです。
process.stdin
やfs.createReadStream
などのノードjsストリームがある場合、これをRxJs5を使用してRxJs観測ストリームに変換するにはどうすればよいですか?ノード可読ストリームをRX観測可能に変換する方法
私はRxJs-NodeがfromReadableStream
メソッドを持っていますが、1年近く更新されていないようです。
についてMarkの推薦に続いて、これを探している人は、I adapted rx-node fromStream
implementation for rxjs5です。
import { Observable } from 'rxjs';
// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') {
stream.pause();
return new Observable((observer) => {
function dataHandler(data) {
observer.next(data);
}
function errorHandler(err) {
observer.error(err);
}
function endHandler() {
observer.complete();
}
stream.addListener(dataEventName, dataHandler);
stream.addListener('error', errorHandler);
stream.addListener(finishEventName, endHandler);
stream.resume();
return() => {
stream.removeListener(dataEventName, dataHandler);
stream.removeListener('error', errorHandler);
stream.removeListener(finishEventName, endHandler);
};
}).share();
}
私が作業していたものから移動して以来、私はこれをテストしていませんが、誰かが持っていて働いていれば、私はこの回答を受け入れます:) – JuanCaicedo
私はそれを使用しており、それまではうまくいっています。私は単体テストしていないと思った。 –
RxJsノード実装がRxJs4基づいているが、多くの作業をせずにRxJs5に移植することができるhttps://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
ありがとうございます。そうすれば、コンバージョンを実装する必要があるようですね。 – JuanCaicedo
rxjs5のバージョンはまだありませんので、今のところは –
V4とV5(免責未試験)の両方のために働く必要があり、次の
fromStream: function (stream, finishEventName, dataEventName) {
stream.pause();
finishEventName || (finishEventName = 'end');
dataEventName || (dataEventName = 'data');
return Observable.create(function (observer) {
// This is the "next" event
const data$ = Observable.fromEvent(stream, dataEventName);
// Map this into an error event
const error$ = Observable.fromEvent(stream, 'error')
.flatMap(err => Observable.throw(err));
// Shut down the stream
const complete$ = Observable.fromEvent(stream, finishEventName);
// Put it all together and subscribe
const sub = data$
.merge(error$)
.takeUntil(complete$)
.subscribe(observer);
// Start the underlying node stream
stream.resume();
// Return a handle to destroy the stream
return sub;
})
// Avoid recreating the stream on duplicate subscriptions
.share();
},
それが機能するのですか?だれがそれがうまくいくか気にする – smnbbrv
@smnbbrvうまくいくのは間違いないが、それはRxJS4であり、RxJS5と互換性がない。 – cartant
あなたは[ソース](https://github.com/Reactive-Extensions/rx-node/blob/master/index.js#L45-L83)を見て、それを自分で変換するために何が必要か見てみることができます - 実装はかなり小さいです。 – cartant