2016-12-14 6 views
0

多くの操作を実行するために使用されるかなり複雑なチェーンのRxコードがあります。基本的に、Observableが起動すると、アイテムのリストが出力されます。各アイテムについて、接続が行われ(これらのアイテムはBLEデバイスです)、特性が書き込まれます。エラーイベントを伝統的なリスナーを使用せずにチェーンに伝える

この特性は、エラーが発生するまで(バッテリーを引き出すなど)10秒ごとに書き換えられます。その後、元のリストの次の項目が接続され、以下同様に続きます。

public Observable<String> connectForPolice(String name, RemovalListener listener) { 

     StringBuilder builder = new StringBuilder(mAddress); 
     builder.setCharAt(mAddress.length() - 1, '4'); 
     RxBleDevice device = mRxBleClient.getBleDevice(builder.toString()); 

     return device.establishConnection(mContext, false) 
       .timeout(12, TimeUnit.SECONDS) 
       .doOnError(throwable -> { 
        Log.d(TAG, "Error thrown after timeout."); 
        throwable.printStackTrace(); 
       }) 
       .flatMap(new Func1<RxBleConnection, Observable<byte[]>>() { 
        @Override 
        public Observable<byte[]> call(RxBleConnection rxBleConnection) { 
         byte[] value = new byte[1]; 
         value[0] = (byte) (3 & 0xFF); 

         return Observable // ... we return an observable ... 
           .defer(() -> { 
            return rxBleConnection.writeCharacteristic(Constants.BUZZER_SELECT, value); 
           }) 
           .repeatWhen(observable -> { 
            return observable.delay(10, TimeUnit.SECONDS); 
           }); 
        } 
       }) 
       .observeOn(AndroidSchedulers.mainThread()) 
       .onErrorResumeNext(throwable -> { 
         throwable.printStackTrace(); 
         listener.onFinished(name); 
        return Observable.empty(); 
       }); 
    } 

いくつかの問題がここにあります:アクションのほとんどが発生するのはここ

です。ご覧のとおり、従来のリスナーがこのメソッドに渡され、このリスナーはonErrorResumeNextで呼び出されます。このリスナーは、アクティビティ内のRecyclerViewからアイテムを削除するために使用されていますが、それは重要ではありません。問題は、この方法でリスナーを使用することは、Rxの反パターンのビットです。上記の例を呼び出すコードを含め、関連するコードの残りは以下のとおりです。

@Override 
    public void onFinished(String deviceName) { 
     mAdapter.removeItem(deviceName); 
    } 

    private void startConnecting() { 
     mIsBeepingAll = true; 
     mConnectingSubscription = Observable.from(mAdapter.getItems()) 
       .flatMap((Func1<Device, Observable<?>>) device -> { 
        Log.d(TAG, "connecting for policing"); 
        return device.connectForPolice(device.getName(), PoliceActivity.this); 
       }, 1) 
       .doOnError(throwable -> throwable.printStackTrace()) 
       .observeOn(AndroidSchedulers.mainThread()) 
       .subscribe(new Subscriber<Object>() { 
        @Override 
        public void onCompleted() { 
         Log.d(TAG, "onCompleted..."); 
        } 

        @Override 
        public void onError(Throwable e) { 
         e.printStackTrace(); 
         Log.d(TAG, "onError..."); 
        } 

        @Override 
        public void onNext(Object o) { 
         Log.d(TAG, "onNext... "); 
        } 
       }); 
    } 

リスナーの実装が含まれています。問題は、代わりにRxを使用してリスナーと同等の処理を実行するにはどうすればよいのでしょうか?

答えて

1

最も単純には、この変更するには、次のようになります。

  .onErrorResumeNext(throwable -> { 
       throwable.printStackTrace(); 
       return Observable.just(name); 
      }); 

そして、発信者が使用する:このに

  .onErrorResumeNext(throwable -> { 
       throwable.printStackTrace(); 
       listener.onFinished(name); 
       return Observable.empty(); 
      }); 

private void startConnecting() { 
    mIsBeepingAll = true; 
    mConnectingSubscription = Observable.from(mAdapter.getItems()) 
      .flatMap((Func1<Device, Observable<?>>) device -> { 
       Log.d(TAG, "connecting for policing"); 
       return device.connectForPolice(device.getName(), PoliceActivity.this); 
      }, 1) 
      .doOnError(throwable -> throwable.printStackTrace()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(new Subscriber<Object>() { 
       @Override 
       public void onCompleted() { 
        Log.d(TAG, "onCompleted..."); 
       } 

       @Override 
       public void onError(Throwable e) { 
        e.printStackTrace(); 
        Log.d(TAG, "onError..."); 
       } 

       @Override 
       public void onNext(String deviceName) { 
        Log.d(TAG, "onNext... "); 
        mAdapter.removeItem(deviceName); 
       } 
      }); 
} 
+0

これは私が最初に行うことを考えていたものです。私は、Observable.empty()が、onCompleted()が直ちに呼び出され、リスト上の次の項目で操作を開始するために再登録されることを前提としていました。これは当てはまりませんか?私はこれをもう一度試さなければならないでしょう。 – Orbit

+0

それは本当です。 'Observable.empty()'は何も出ません。完了するだけです。 'Observable.just(object)'は 'object'を一つ送出して終了します。外部切断(通常の状況では例外です)に依存しており、ストリーム全体をエラーアウトしたくないために使用されます。 –

+0

ああ、私はObservable.just()がすぐに完了したとは思っていませんでした。 – Orbit

関連する問題