2017-04-05 7 views
4

私はRx Java 1でコールバックをラップするためにこのコードを持っていますが、それはうまくコンパイルされますが、RX Java 2に切り替えるとコンパイルされません。Rx Java 2:コールバックを折り返す方法は?

return Observable.fromEmitter(new Action1<AsyncEmitter<Integer>>() { 
      @Override 
      public void call(AsyncEmitter<Integer> emitter) { 

       transObs.setTransferListener(new TransferListener() { 
        @Override 
        public void onStateChanged(int id, TransferState state) { 
         if (state == TransferState.COMPLETED) 
          emitter.onCompleted(); 
        } 

        @Override 
        public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) { 

        } 

        @Override 
        public void onError(int id, Exception ex) { 
         emitter.onError(ex); 
        } 
       }); 

       emitter.setCancellation(new AsyncEmitter.Cancellable() { 
        @Override 
        public void cancel() throws Exception { 

         transObs.cleanTransferListener(); 
        } 
       }); 
      } 
     }, AsyncEmitter.BackpressureMode.BUFFER); 

UPDATE:

私はこの思い付いたが、あなたはそのOnCreate関数呼び出し以降背圧に対処する必要がありますか?

return Observable.create(new ObservableOnSubscribe<List<DigitsUser>>() { 

     @Override 
     public void subscribe(final ObservableEmitter<List<DigitsUser>> emitter) throws Exception { 

      mDigitFriends.findFriends((gotEm, users) -> { 
       emitter.onNext(users); 
      }); 

      emitter.setCancellable(() -> { 
       emitter.onNext(null); 
      }); 
     } 
    }); 
+0

よ男が、何が何をコンパイルしていないのですか? RxJava2のfromEmitter()と等価である必要があり、その答えはcreate()です – yosriz

+0

1)あなたはより具体的に、エラーは何か、何を試していますか?どこでも 'emitter.onNext'をどこにも呼び出さないでください。データがないのも不思議ではありません。 2)キャンセル部分を中止しました。 3)Observableが必要な場合は、Observable!を使用してください。 – akarnokd

+0

私の潜在的な解決策を更新しました – Mike6679

答えて

6

背圧が心配な場合は、Flowableクラスを使用する必要があります。ここでRxJava2 Wikiからの引用です:実際には、1.xのfromEmitter(旧fromAsync)はFlowable.createに

名前が変更されました。ここで

フロアブルクラスを使用して、あなたの例である:

return Flowable.create(new FlowableEmitter<List<DigitsUser>>() { 

     @Override 
     public void subscribe(final FlowableEmitter<List<DigitsUser>> emitter) throws Exception { 

      mDigitFriends.findFriends((gotEm, users) -> { 
       emitter.onNext(users); 
      }); 

      emitter.setCancellable(() -> { 
       emitter.onNext(null); 
      }); 
     } 
    }, BackpressureStrategy.BUFFER); 
関連する問題