2017-11-05 15 views
1

私は、スプリングフレームワークStringRedisTemplateを使用して、複数のスレッドで発生するエントリを更新しています。このJSONエントリでRxJava非同期スレッド実装による競合状態の防止

public void processSubmission(final String key, final Map<String, String> submissionDTO) { 
    final String hashKey = String.valueOf(Hashing.MURMUR_HASH.hash(key)); 
    this.stringRedisTemplate.expire(key, 60, TimeUnit.MINUTES); 
    final HashOperations<String, String, String> ops = this.stringRedisTemplate.opsForHash(); 
    Map<String, String> data = findByKey(key); 
    String json; 
    if (data != null) { 
     data.putAll(submissionDTO); 
     json = convertSubmission(data); 
    } else { 
     json = convertSubmission(submissionDTO); 
    } 
    ops.put(key, hashKey, json); 
} 

前に、コードの中で見られるように、以下の

key (assignmentId) -> value (submissionId, status) 

に見えるキャッシュエントリを更新し、私は現在のエントリを取得し、新しいエントリを追加し、それらのすべてを置きます。しかし、この操作は複数のスレッドで行うことができるため、競合状態が発生してデータが失われる可能性があります。私は上記のメソッドを同期することができますが、processSubmissionメソッドが2つの非同期スレッドでRxJava経由で呼び出されるRxJava実装の並列処理能力のボトルネックになります。

class ProcessSubmission{ 

@Override 
    public Observable<Boolean> processSubmissionSet1(List<Submission> submissionList, HttpHeaders requestHeaders) { 
     return Observable.create(observer -> { 
      for (final Submission submission : submissionList) {    
       //Cache entry insert method invoke via this call 
       final Boolean status = processSubmissionExecutor.processSubmission(submission, requestHeaders); 
       observer.onNext(status); 
      } 
      observer.onCompleted(); 
     }); 
    } 

    @Override 
    public Observable<Boolean> processSubmissionSet2(List<Submission> submissionList, HttpHeaders requestHeaders) { 
     return Observable.create(observer -> { 
      for (final Submission submission : submissionList) { 
       //Cache entry insert method invoke via this call 
       final Boolean status = processSubmissionExecutor.processSubmission(submission, requestHeaders); 
       observer.onNext(status); 
      } 
      observer.onCompleted(); 
     }); 
    } 

} 

上記のサービスAPIは以下のとおりです。

class MyService{  
public void handleSubmissions(){ 
    final Observable<Boolean> statusObser1 = processSubmission.processSubmissionSet1(subListDtos.get(0), requestHeaders) 
       .subscribeOn(Schedulers.newThread()); 
    final Observable<Boolean> statusObser2 = processSubmission.processSubmissionSet2(subListDtos.get(1), requestHeaders) 
        .subscribeOn(Schedulers.newThread());     
    statusObser1.subscribe(); 
    statusObser2.subscribe(); 
    }  
} 

したがって、handleSubmissionsは割り当てIDごとに複数のスレッドを呼び出しています。しかし、メインスレッドごとに、2つのリアクティブなJavaスレッドを作成して呼び出し、それぞれの割り当てに関連付けられたサブミッションリストを処理します。

RxJava実装のパフォーマンスを維持しながら、REDJISエントリの競合状態を防止する最良の方法は何でしょうか?このredis操作をより効率的に行う方法がありますか?

答えて

0

最後にput操作を実行するために、ops変数だけを使用しているように見えます。同期が必要な場所を特定することができます。

私が行った短い研究では、HashOperationsがまだスレッドセーフでない場合は見つかりませんでした。

しかし、あなたはちょうどあなたが心配一部のようなものを行うことです切り離すことができる方法の例:

public void processSubmission(final String key, final Map<String, String> submissionDTO) { 
    final String hashKey = String.valueOf(Hashing.MURMUR_HASH.hash(key)); 
    this.stringRedisTemplate.expire(key, 60, TimeUnit.MINUTES); 
    Map<String, String> data = findByKey(key); 
    String json; 
    if (data != null) { 
     data.putAll(submissionDTO); 
     json = convertSubmission(data); 
    } else { 
     json = convertSubmission(submissionDTO); 
    } 
    putThreadSafeValue(key, hashKey, json); 
} 

をそして、ちょうどput操作に同期されている方法があります。

private synchronized void putThreadSafeValue(key, hashKey, json) { 
    final HashOperations<String, String, String> ops = this.stringRedisTemplate.opsForHash(); 
    ops.put(key, hashKey, json); 
} 

これを行うにはいくつかの方法がありますが、スレッドの競合をそのput操作に制限することができるようです。

関連する問題