2017-08-24 4 views
0

私は現在RxJava2フロアブルに(アッカのFILEIOを使用してファイルを読み込むから受け取ったなど)アッカソースを変換するには、次のコードを使用しています:AkkaソースをRxJava2に変換することは可能ですか?

private Flowable<Buffer> akkaConversion(Source<ByteString, NotUsed> data, 
     Flow<ByteString, ByteString, NotUsed> compType) { 
    final Publisher<ByteString> uncompressedData = 
     data.via(compType) 
      .runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.materializer); 
    return Flowable.fromPublisher(uncompressedData) 
     .map(bytes -> Buffer.buffer(bytes.toArray())); 
} 

この(作業)溶液で私の問題があること、です少なくとも私が現在理解している範囲では、.runWith()メソッド呼び出しはすでにコードを実行しています。つまり、指定されたSourceからすべてのデータを収集し、バッファしてからPublisherに配置します。この時点で実行する必要はありませんか?私はマテリアライザーなしでこの時点で変換を定義し、後でFlowableにサブスクライブするものだけを実行したいと思います。

ありがとうございます!

答えて

0

利用延期

private Flowable<Buffer> akkaConversion(Source<ByteString, NotUsed> data, 
     Flow<ByteString, ByteString, NotUsed> compType) { 

    return Flowable.defer(() -> data.via(compType) 
     .runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.materializer) 
    ).map(bytes -> Buffer.buffer(bytes.toArray())); 
} 
:(追記:私はアッカソースはワンショットであるため、これを何回もしなければなりませんでした)
関連する問題