2017-04-18 20 views
0

私は基本レルム要求を扱うダガーシングルトンラッパーを持っています。その1つは次のようになります。レルム - 非同期キューの実装

public void insertOrUpdateAsync(final List<RealmMessage> messages, @Nullable final OnInsertListener listener) { 
    Realm instance = getRealmInstance(); 
    instance.executeTransactionAsync(realm -> { 
       List<RealmMessage> newMessages = insertOrUpdateMessages(realm, messages); 
      }, 
      () -> success(listener, instance), 
      error -> error(listener, error, instance)); 
} 

private List<RealmMessage> insertOrUpdateMessages(@NonNull Realm realm, @NonNull final List<RealmMessage> messages) { 
    ... 
    return realm.copyToRealmOrUpdate(unattendedMessages); 
} 

しかし、 - 長いストーリーショート - 私はinsertOrUpdateAsynch()を何度も起動するコーナーケースがあります。そして、いくつかの要求の後、私はこれを取得:

Caused by: java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 17, active threads = 17, queued tasks = 100, completed tasks = 81]

私の質問は:私は、アプリケーション全体の流れを再構築することなく、これをどのように処理しますか。 私の考えは、RxJava経由で着信要求をキューに入れることでした。私は正しい?どの事業者を自分で検討し、教育すべきですか?

まったく間違った方法でこれに近づいていますか? 私のグーグルのほとんどから、私はループの中で私のような方法を起動することに問題があることに気づいた。私は何も使っていない。私の場合、このメソッドは複数のレスポンスによって起動され、現在のバックエンド実装のために変更することは不可能です。

答えて

0

アプリケーションを再設計したくない場合は、カウントセマフォを使用することができます。 2つのスレッドが即座にロックを取得することがわかります。もう1つのスレッドは、ある呼び出しが1つのロックを解放するまでブロックします。タイムアウトなしでacquire()を使用することは推奨されません。

RxJavaを使用するには、アプリケーションのデザインを変更しなければならず、RxJavaのレート制限はそれほど簡単ではありません。

private final Semaphore semaphore = new Semaphore(2); 

@Test 
public void name() throws Exception { 
    Thread t1 = new Thread(() -> { 
     doNetworkStuff(); 
    }); 
    Thread t2 = new Thread(() -> { 
     doNetworkStuff(); 
    }); 
    Thread t3 = new Thread(() -> { 
     doNetworkStuff(); 
    }); 

    t1.start(); 
    t2.start(); 
    t3.start(); 

    Thread.sleep(1500); 
} 

private void doNetworkStuff() { 
    try { 
     System.out.println("enter doNetworkStuff"); 

     semaphore.acquire(); 

     System.out.println("acquired"); 

     Thread.sleep(1000); 

    } catch (InterruptedException e) { 
     e.printStackTrace(); // Don't do this!! 
    } finally { 
     semaphore.release(); 
    } 
} 
関連する問題