0
パラメータObjectInputStreamをとり、ソケット上のデータを観察する小さなコードを書きました。 readObject()関数が "null"を返すと、関数observeSocket(ObjectInputStream in)がObjectだけを取るので、サブスクライバはonError()関数を実行してプログラムを終了します。Rxjavaで登録解除するまでソケットを観察する方法
しかし、私が必要とするのは、オブジェクトのソケットを観察して、オブジェクトがソケット上で観察され、観察者が関数の機能を終了する必要がある場合にのみ返すことです。必要な機能を実現するためにコードをどのように変更できますか?背圧を認識させるとの契約に準拠した観測のでObservable.create(OnSubscribe)
を使用して
public Observable<Object> observeSocket(ObjectInputStream in){
return Observable.create(subscriber -> {
while(!subscriber.isUnsubscribed()) {
subscriber.onNext(getData(in));
}
subscriber.onCompleted();
});
}
public Object getData(ObjectInputStream in){
Object streamData = null;
try{
streamData = in.readObject();
}
catch(IOException e){
//e.printStackTrace();
}
catch(ClassNotFoundException e){
e.printStackTrace();
}
return streamData;
}
あなたのマシンでこのコストをテストしましたか? SyncOnSubscribeジェネリックシグネチャは、アクションの戻り値が異なるため、コンパイルエラーと構文エラーが発生します。 – user64287
createStatelessはAction1()とobserver.onNextのみを使用するか、他の関数は実装できません。 – user64287
Observable.createを入れるのを忘れました。rxjavaを使って私のためにうまくコンパイルしました。1.2.1 –