2016-10-09 5 views
0

パラメータObjectInputStreamをとり、ソケット上のデータを観察する小さなコードを書きました。 readObject()関数が "null"を返すと、関数observeSocket(ObjectInputStream in)がObjectだけを取るので、サブスクライバはonError()関数を実行してプログラムを終了します。Rxjavaで登録解除するまでソケットを観察する方法

しかし、私が必要とするのは、オブジェクトのソケットを観察して、オブジェクトがソケット上で観察され、観察者が関数の機能を終了する必要がある場合にのみ返すことです。必要な機能を実現するためにコードをどのように変更できますか?背圧を認識させるとの契約に準拠した観測のでObservable.create(OnSubscribe)を使用して

public Observable<Object> observeSocket(ObjectInputStream in){ 
    return Observable.create(subscriber -> { 
     while(!subscriber.isUnsubscribed()) { 
      subscriber.onNext(getData(in)); 
     } 

     subscriber.onCompleted(); 

    }); 
} 

public Object getData(ObjectInputStream in){ 

    Object streamData = null; 

    try{ 

     streamData = in.readObject(); 
    } 

    catch(IOException e){ 
     //e.printStackTrace(); 
    } 

    catch(ClassNotFoundException e){ 
     e.printStackTrace(); 
    } 

    return streamData; 

} 

答えて

2

避けてはトリッキーなビジネスです。これは、Observable.create(SyncOnSubscribe)を使用するための良い候補です。

ObjectInputStream ois = ...; 

Observable<Object> objects = 
    Observable.create(
    SyncOnSubscribe.createStateless(observer -> { 
     try { 
      Object value = ois.readObject(); 
      // you decide how end of file is indicated 
      // a common strategy is to write a null object 
      // to the end of the Object stream. 
      if (value == END_OF_FILE) { 
       observer.onCompleted(); 
      } else { 
       observer.onNext(value); 
      } 
     } catch (Exception e) { 
      observer.onError(e); 
     } 
    }));   
+0

あなたのマシンでこのコストをテストしましたか? SyncOnSubscribeジェネリックシグネチャは、アクションの戻り値が異なるため、コンパイルエラーと構文エラーが発生します。 – user64287

+0

createStatelessはAction1()とobserver.onNextのみを使用するか、他の関数は実装できません。 – user64287

+1

Observable.createを入れるのを忘れました。rxjavaを使って私のためにうまくコンパイルしました。1.2.1 –

関連する問題