2017-04-16 8 views
2

私はRxJavaとリアクティブプログラミングにとって全く新しいです。 ファイルを読んでObservableに保存する必要がある課題があります。私は内部でBufferedReaderでCallableを作成しようとしましたが、Observable.fromCallable()を使用しましたが、うまく動作しませんでした。RxJava。可読性のあるファイルを読む

どうすればいいですか?

私はRxJava 2.0を使用しています。

答えて

0

this implementationのようなことができます。このように

そしてそのクラスが使用されているhere

public static Observable<byte[]> from(InputStream is, int size) { 
    return Observable.create(new OnSubscribeInputStream(is, size)); 
} 

は最終的にあなたがそれを使用することができます。

Observable<byte[]> chunks = Bytes.from(file, chunkSize); 

詳細hereを。

+0

しかし、どのような場合、私はチャンクサイズを知らないと、ファイルの行を読む必要があります行ごとに? –

+1

その後、 'InputStream'を' ReadLine'メソッドを使って 'BufferedReader'に変換して、そのコードを適応させる必要があります。 – GVillani82

2

私はデータを生成して、オブザーバーが加入するまで観察可能の作成を延期する入れ子になったクラスFileObservableSourceを使用し、基本的なソリューション:

import io.reactivex.Observable; 
import io.reactivex.ObservableSource; 
import io.reactivex.Observer; 

import java.io.IOException; 
import java.nio.file.Files; 
import java.nio.file.Paths; 

public class StackOverflow { 

    public static void main(String[] args) { 
     final Observable<String> observable = Observable.defer(() -> new FileObservableSource("pom.xml")); 
     observable.subscribe(
       line -> System.out.println("next line: " + line), 
         Throwable::printStackTrace, 
         () -> System.out.println("finished") 
       ); 
    } 

    static class FileObservableSource implements ObservableSource<String> { 

     private final String filename; 

     FileObservableSource(String filename) { 
      this.filename = filename; 
     } 

     @Override 
     public void subscribe(Observer<? super String> observer) { 
      try { 
       Files.lines(Paths.get(filename)).forEach(observer::onNext); 
       observer.onComplete(); 
      } catch (IOException e) { 
       observer.onError(e); 
      } 
     } 
    } 
} 
+0

これはバックプレッシャー下でどのようにふるまいますか?言い換えれば、私は加入者が読んでいるよりも遅い各行を処理している別のスレッドを観察しているとしましょう。 – 0cd

+0

それはひどく振舞うでしょう。バックプレッシャーはここでは扱いません。私はこれに対してリアクティブストリームの実装を使用しますが、この質問は基本的な例です。 –