私は、スプリングフレームワークStringRedisTemplateを使用して、複数のスレッドで発生するエントリを更新しています。このJSONエントリでRxJava非同期スレッド実装による競合状態の防止
public void processSubmission(final String key, final Map<String, String> submissionDTO) {
final String hashKey = String.valueOf(Hashing.MURMUR_HASH.hash(key));
this.stringRedisTemplate.expire(key, 60, TimeUnit.MINUTES);
final HashOperations<String, String, String> ops = this.stringRedisTemplate.opsForHash();
Map<String, String> data = findByKey(key);
String json;
if (data != null) {
data.putAll(submissionDTO);
json = convertSubmission(data);
} else {
json = convertSubmission(submissionDTO);
}
ops.put(key, hashKey, json);
}
前に、コードの中で見られるように、以下の
key (assignmentId) -> value (submissionId, status)
に見えるキャッシュエントリを更新し、私は現在のエントリを取得し、新しいエントリを追加し、それらのすべてを置きます。しかし、この操作は複数のスレッドで行うことができるため、競合状態が発生してデータが失われる可能性があります。私は上記のメソッドを同期することができますが、processSubmissionメソッドが2つの非同期スレッドでRxJava経由で呼び出されるRxJava実装の並列処理能力のボトルネックになります。
class ProcessSubmission{
@Override
public Observable<Boolean> processSubmissionSet1(List<Submission> submissionList, HttpHeaders requestHeaders) {
return Observable.create(observer -> {
for (final Submission submission : submissionList) {
//Cache entry insert method invoke via this call
final Boolean status = processSubmissionExecutor.processSubmission(submission, requestHeaders);
observer.onNext(status);
}
observer.onCompleted();
});
}
@Override
public Observable<Boolean> processSubmissionSet2(List<Submission> submissionList, HttpHeaders requestHeaders) {
return Observable.create(observer -> {
for (final Submission submission : submissionList) {
//Cache entry insert method invoke via this call
final Boolean status = processSubmissionExecutor.processSubmission(submission, requestHeaders);
observer.onNext(status);
}
observer.onCompleted();
});
}
}
上記のサービスAPIは以下のとおりです。
class MyService{
public void handleSubmissions(){
final Observable<Boolean> statusObser1 = processSubmission.processSubmissionSet1(subListDtos.get(0), requestHeaders)
.subscribeOn(Schedulers.newThread());
final Observable<Boolean> statusObser2 = processSubmission.processSubmissionSet2(subListDtos.get(1), requestHeaders)
.subscribeOn(Schedulers.newThread());
statusObser1.subscribe();
statusObser2.subscribe();
}
}
したがって、handleSubmissionsは割り当てIDごとに複数のスレッドを呼び出しています。しかし、メインスレッドごとに、2つのリアクティブなJavaスレッドを作成して呼び出し、それぞれの割り当てに関連付けられたサブミッションリストを処理します。
RxJava実装のパフォーマンスを維持しながら、REDJISエントリの競合状態を防止する最良の方法は何でしょうか?このredis操作をより効率的に行う方法がありますか?