2016-04-10 13 views
0

Rxについて多くの文献を読んでいますが、一方ではすべてが明確ですが、他方では何も明らかではありません。私はこのライブラリを使ってサーバーからデータを簡単にフィルタリングして保存しようとしていますが、期待どおりに動作していません。私は、単純なチャンネルリスト管理を実装する必要があります。RxAndroid。単純なキャッシングとデータのフィルタリング

1. Cache on disk and in memory 
2. When user requested - return filtered channels 
3. All Subscribers that was attached to this Observable must be notified if filtered cahnnels list was changed (call onNext() for all Subscribers) 

私は次のように書いた:

ArrayList<Channel> channels = null; 
ArrayList<Channel> filteredChannels = null; 

Observable<ArrayList<Channel>> filteredObservable = Observable.create() 
// here I need to check if cache is valid, if no - download from server 
// then filter channels and return filteredChannels in onNext call 
// all subscribers that called subscribe before and not called unsubscribe must be notified if filteredChannels changed 
// how to implement this? 
.subscribeOn(Schedulers.io()); 

public void setFilter(Filter filter) { 
// update filteredChannels with the new filter and notify all Subscribers (call onNext()). How to implement this? 
} 

public Observable<ArrayList<Channel>> getFilteredChannels() { 
    return filteredObservable; 
} 

私が正しく受信パターンの論理を理解したりしないでください?前もって感謝します。

答えて

0

私は、このタスクのための解決策を見つけた:私はflatMapmapを使用する必要があると私は、変数の変化のために、すべての加入者に通知するBehaviourSubjectを使用しようとしましたが、このソリューションは、なぜ私それはだ、エラーが原因MissingBackpressureExceptionの、全く安定していません次のObsevableValueクラスを作成しました:

public class ObservableValue<T> { 
    private static final String TAG = ObservableValue.class.getSimpleName(); 

    private ArrayList<Registration> subscriptions = new ArrayList<>(); 
    private T value; 
    private int index; 
    private boolean firstNotify; 
    private ReentrantLock lock = new ReentrantLock(); 

    public ObservableValue() { 
     firstNotify = false; 
    } 

    public ObservableValue(T defaultValue) { 
     value = defaultValue; 
     firstNotify = true; 
    } 

    @SuppressWarnings("unchecked") 
    public void setValue(T value) { 
     lock.lock(); 
     this.value = value; 
     for (Registration listener: subscriptions) { 
      if (!listener.isFinished()) { 
       listener.getListener().valueChanged(value); 
      } 
     } 
     lock.unlock(); 
    } 

    public T getValue() { 
     lock.lock(); 
     T result = value; 
     lock.unlock(); 
     return result; 
    } 

    public Registration subscribe(ValueChangeListener<T> listener) { 
     lock.lock(); 
     Registration s = new Registration(this, index, listener); 
     index++; 
     subscriptions.add(s); 
     if (firstNotify||value!=null) { 
      listener.valueChanged(value); 
     } 
     lock.unlock(); 
     return s; 
    } 

    protected void finish(int index) { 
     lock.lock(); 
     for (int i=0;i<subscriptions.size();i++) { 
      Registration s = subscriptions.get(i); 
      if (s.getIndex()==index) { 
       subscriptions.remove(i); 
       break; 
      } 
     } 
     lock.unlock(); 
    } 
} 

public abstract class ValueChangeListener<T> { 
    private Looper looper; 

    public ValueChangeListener() {} 

    public ValueChangeListener(Looper looper) { 
     this.looper = looper; 
    } 

    public void valueChanged(T data) { 
     if (looper!=null) { 
      Handler handler = new Handler(looper, msg -> { 
       onValueChanged(data); 
       return true; 
      }); 
      handler.sendEmptyMessage(0); 
     } else { 
      onValueChanged(data); 
     } 
    } 

    public abstract void onValueChanged(T data); 
} 

public class Registration { 
    private ObservableValue observableValue; 
    private int index; 
    private ValueChangeListener listener; 
    private volatile boolean finished = false; 

    protected Registration(ObservableValue observableValue, int index, ValueChangeListener listener) { 
     this.observableValue = observableValue; 
     this.index = index; 
     this.listener = listener; 
    } 

    protected ValueChangeListener getListener() { 
     return listener; 
    } 

    protected int getIndex() { 
     return index; 
    } 

    public boolean isFinished() { 
     return finished; 
    } 

    public void finish() { 
     finished = true; 
     observableValue.finish(index); 
    } 
} 

これは期待どおりに動作しています。 Rxのように、ObservableValueは登録オブジェクトを返します。登録オブジェクトは不要になったときにfinished()にする必要があります。

0

私は.map()の関数を使ってあなたのリストをフィルターし、あなたの出力を返します。これは、Observableがフィルタリングされた結果を出力することを意味します。

次に、フィルタリングされたリストを.subscribe()関数のキャッシュにコミットできます。

関連する問題