2017-10-22 3 views
0

から2つの非同期メソッド、リファクタリングを同期する方法:RxJava - 私は2つのコレクション、バッファの位置更新イベントを持っているJavaの

 private final ScheduledExecutorService mSaveDataExecutor = Executors.newSingleThreadScheduledExecutor(); 
    private boolean mSaveDataScheduled; 
    private final Object mEventsMonitor = new Object(); 

    private ScheduledFuture<?> mScheduledStopLocationUpdatesFuture; 
    private final ScheduledExecutorService mStopLocationUpdatesExecutor = Executors.newSingleThreadScheduledExecutor(); 

:また、私のコードに存在がある

 private List<LocationGeoEvent> mUpdateGeoEvents = new ArrayList<>(); 
    private List<LocationRSSIEvent> mUpdateRSSIEvents = new ArrayList<>(); 

私はこのようなこのcolectionsにイベントを追加します。

public void appendGeoEvent(LocationGeoEvent event) { 
      synchronized (mEventsMonitor) { 
       mUpdateGeoEvents.add(event); 
       scheduleSaveEvents(); 
      } 
    } 

同じことがRSSIのイベントのために行く

は今、scheduleSaveEvents方法は、次のようになります。

 private void scheduleSaveEvents() { 

     synchronized (mSaveDataExecutor) { 
      if (!mSaveDataScheduled) { 
       mSaveDataScheduled = true; 
       mSaveDataExecutor.schedule(
         new Runnable() { 
          @Override 
          public void run() { 
           synchronized (mSaveDataExecutor) { 
            saveEvents(false); 
            mSaveDataScheduled = false; 
           } 
          } 
         }, 
         30, 
         TimeUnit.SECONDS); 
      } 
     } 

    } 

問題は、私は更新を停止し、他の方法を同期する必要があること、です。

private void saveEvents(boolean locationUpdatesAboutToStop) { 

     synchronized (mEventsMonitor) { 
      if (mUpdateGeoEvents.size() > 0 || mUpdateRSSIEvents.size() > 0) { 

       //do something with the data from buffered collection arrayLists and with the boolean locationUpdatesAboutToStop 

       mUpdateGeoEvents.clear(); 
       mUpdateRSSIEvents.clear(); 
      } 

     } 

    } 

はKotlinを使用してRxJavaこのsimplierをリファクタリングする方法はあります:私はsaveEvents方法で

 private void scheduleStopLocationUpdates() { 

     synchronized (mStopLocationUpdatesExecutor) { 
      if (mScheduledStopLocationUpdatesFuture != null) 
       mScheduledStopLocationUpdatesFuture.cancel(true); 

      mScheduledStopLocationUpdatesFuture = mStopLocationUpdatesExecutor.schedule(
        new Runnable() { 
         @Override 
         public void run() { 
          synchronized (mStopLocationUpdatesExecutor) { 
           stopLocationUpdates(); 
           saveEvents(true); 
           cleanAllReadingsData(); 
          } 
         } 
        }, 
        45, 
        TimeUnit.SECONDS); 
     } 

    } 

:それは次のようにトリガーされましたか?

private fun appendRSSIEvent(event: LocationRSSIEvent) { 
    synchronized(mEventsMonitor) { 
     if (!shouldSkipRSSIData(event.nexoIdentifier)) { 
      mUpdateRSSIEvents.add(event) 
      acknowledgeDevice(event.nexoIdentifier) 
      scheduleSaveEvents() 
      startLocationUpdates() 
     } else 
      removeExpiredData() 
    } 
} 

答えて

1

あなたは2つのデータストリームをバッファリングして、保存のためにそれらを組み合わせることができます。

UPDATE

は、ここに私のappendRSSIevents方法です。また、バッファトリガを使用して更新を停止することもできます。

PublishSubject<LocationGeoEvent> mUpdateGeoEventsSubject = PublishSubject.create(); 
PublishSubject<LocationRSSIEvent> mUpdateRSSIEventsSubject = PublishSubject.create(); 

public void appendGeoEvent(LocationGeoEvent event) { 
    mUpdateGeoEventsSubject.onNext(event); 
    triggerSave.onNext(Boolean.TRUE); 
} 

とRSSフィードでも同じです。

ここで、保存手順を実行するために使用されるトリガーが必要です。

PublishSubject<Boolean> triggerSave = PublishSubject.create(); 
PublishSubject<Boolean> triggerStopAndSave = PublishSubject.create(); 

Observable<Boolean> normalSaveTrigger = triggerSave.debounce(30, TimeUnit.SECONDS); 
Observable<Boolean> trigger = Observable.merge(normalSaveTrigger, triggerStopAndSave); 

trigger観測可能な火災のいずれか、通常の保存プロセス火災や私たちが保存を停止している場合。

private void saveEvents(
    List<LocationGeoEvent> geo, 
    List<LocationRSSIEvent> rss, 
    boolean locationUpdatesAboutToStop) { 

    synchronized (mEventsMonitor) { 
     if (geo.size() > 0 || rss.size() > 0) { 
      //do something with the data from buffered collection arrayLists and with the boolean locationUpdatesAboutToStop 
     } 
    } 
} 
private void scheduleStopLocationUpdates() { 
    stopLocationUpdates(); 
    triggerStopAndSave.onNext(Boolean.FALSE); 
    cleanAllReadingsData(); 
} 

Observable.zip(mUpdateGeoEventsSubject.buffer(trigger), 
       mUpdateRSSIEventsSubject.buffer(trigger), 
       trigger, (geo, rss, trgr) -> saveEvents(geo, rss, trgr)) 
    .subscribe(); 

マルチスレッドと安全性については、まだチューニングが必要です。最初のステップは、複数のスレッドがイベントを発行できるように、さまざまなサブジェクトをSerializedSubjectに切り替えることです。

あなたはsaveEventsは、特定のスケジューラ上で実行したい場合、あなたは、中間データ構造を追加する必要がトリプル、observeOn()オペレータによってパラメータを渡すために、またはzip()引数のそれぞれにobserveOn()演算子を適用しますか。

+0

ありがとう、私はあなたのソリューションを使用しようとしますが、あなたはAndroid Studio IDEでこのソリューションを試してみましたか? Observable.zip(..)の部分に問題があります。しかし、私はKotlinを使用しています(これはKotlinでも間違っています)。 IDEは '機能インタフェースタイプを推論できません'と下線を引いて(geo、rss、trgr) - > saveEvents(geo、rss、trgr)の部分を示します – Konrad

+0

ああ、申し訳ありません。 'saveEvents'は渡すべき値があるようにブール値を返すべきです。 –

+0

また、私はappendRssiEventについて重要なことは言及していませんでした。何とか違って見えるかもしれません。私は私の質問を更新しました。また、scheduleStopLocationUpdatesは、scheduleStopLocationとは異なる時刻に45秒間スケジュールされます。だから私はそれがより複雑であると思います。助けてくれてありがとう、本当に感謝しています。 – Konrad

関連する問題