0
私は現在RxJava2フロアブルに(アッカのFILEIOを使用してファイルを読み込むから受け取ったなど)アッカソースを変換するには、次のコードを使用しています:AkkaソースをRxJava2に変換することは可能ですか?
private Flowable<Buffer> akkaConversion(Source<ByteString, NotUsed> data,
Flow<ByteString, ByteString, NotUsed> compType) {
final Publisher<ByteString> uncompressedData =
data.via(compType)
.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.materializer);
return Flowable.fromPublisher(uncompressedData)
.map(bytes -> Buffer.buffer(bytes.toArray()));
}
この(作業)溶液で私の問題があること、です少なくとも私が現在理解している範囲では、.runWith()
メソッド呼び出しはすでにコードを実行しています。つまり、指定されたSourceからすべてのデータを収集し、バッファしてからPublisherに配置します。この時点で実行する必要はありませんか?私はマテリアライザーなしでこの時点で変換を定義し、後でFlowableにサブスクライブするものだけを実行したいと思います。
ありがとうございます!