2017-11-08 4 views
0
import java.util.Map; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.Future; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 


public class DedupingQueue<E> implements QueueWrapper<E> { 
    private static final Logger LOGGER = LoggerFactory.getLogger(DedupingQueue.class); 

private final Map<E, Future<E>> itemsBeingWorkedOn = new ConcurrentHashMap<>(); 
private final AsyncWorker<E> asyncWorker; //contains async method backed by a thread pool 

public DedupingQueue(AsyncWorker<E> asyncWorker) { 
    this.asyncWorker = asyncWorker; 
} 

@Override 
public Future<E> submit(E e) { 
    if (!itemsBeingWorkedOn.containsKey(e)) { 
     itemsBeingWorkedOn.put(e, asyncWorker.executeWorkAsync(e, this)); 
    } else { 
     LOGGER.debug("Rejected [{}] as it's already being worked on", e); 
    } 
    return itemsBeingWorkedOn.get(e); 
} 

@Override 
public void complete(E e) { 
    LOGGER.debug("Completed [{}]", e); 
    itemsBeingWorkedOn.remove(e); 
} 

@Override 
public void rejectAndRetry(E e) { 
    itemsBeingWorkedOn.putIfAbsent(e, asyncWorker.executeWorkAsync(e, this)); 
} 

} 

私は上記のコードのスレッド安全性を推論するいくつかの困難を抱えています。ConcurrentHashMapを使用したキューのスレッド安全性

私はマップがスレッドセーフなので、completerejectAndretryは完全にスレッドセーフであると考えます。しかし、submitについては、AsyncWorkerそれ自体はスレッドセーフではありません。また、どのようにして​​(ConcurrentHashMapの組み込み保証を使用して)を使用せずに、スレッドを最も効率的な方法で安全にすることができますか?

+1

スレッドの安全性は、どのような動作が期待されるかによって異なります。既存のキーを上書きしたり同じ値を返さない場合は、 'submit()'はスレッドセーフです。 – m0skit0

答えて

2

がそこには現在のマッピングません、とnullが返された場合、それをマップしておく、または削除する値を返す場合、ファンクションキー、現在の値またはnullを取るcompute()方法、あります。

@Override 
public Future<E> submit(E e) { 
    return itemsBeingWorkedOn.compute(e, (k, v) -> { 
     if (v == null) { 
      return asyncWorker.executeWorkAsync(k, this); 
     } else { 
      LOGGER.debug("Rejected [{}] as it's already being worked on", k); 
      return v; 
     } 
    }); 
} 

あなたのキューを発注されていないことに注意してください。 FIFOまたはLIFO注文が必要な場合は、LinkedHashMapと​​のようなより適切なデータ構造を使用する必要があります。

+1

ロギング行がない場合は、 'return itemsBeingWorkedOn.computeIfAbsent(e、(k) - > {return asyncWorker.executeWorkAsync(k、this);));'に短縮できます。 – Kayaman

2

submit()あなたのcheck-then-actシーケンスはアトミックではないため、スレッドセーフではありません。あるスレッドの特定のキーの状態は、チェックの直後と呼び出しの前に他のスレッドによって変更することができます。この問題を解決するには、ConcurrentHashMapのアトミックメソッド(おそらくcomputeIfAbsent())を使用します。

if (!itemsBeingWorkedOn.containsKey(e)) { 
    itemsBeingWorkedOn.put(e, asyncWorker.executeWorkAsync(e, this)); 
} 

AsyncWorkerがスレッドセーフでない場合、どのクラス全体もスレッドセーフではありません。しかし、それはAsyncWorkerソースコードなしではそれを理由づけるのは難しいです。

+2

部分的なクレジット。アトミックではないシーケンスについては正しいですが、 'putIfAbsent'はまだ' asyncWorker.executeWorkAsync(e、this) 'を実行しますが、必ずしもマップに入れる必要はないので、適切ではありません。したがって、 'compute()'または 'computeIfAbsent()'が必要です。 'AsyncWorker'のスレッド安全性は、クラス全体に影響を与えません。特に' complete'と 'rejectAndRetry'では問題になりません。 – Kayaman

+0

ありがとうございます、あなたが正しいです、 'putIfAbsent'はここでは動作せず、' computeIfAbsent'を使うべきです。私は疑問を持って「原子メソッドから」を書いた。私の答えを編集します。 2番目の部分については、特定のクラスメソッドのスレッドセーフについての推論を見たことがないのに対して、クラスロジックからは、 'submit()'なしで他のメソッドが意味をなさないことは明らかです。さらに 'rejectAndRetry()'も 'AsyncWorker'を使います。 –

+0

'complete()'は 'remove()'がスレッドセーフであるため、「論理的に」スレッドセーフです。しかし、重複するロギングラインを得ることができます。厳密に言えば、スレッドセーフではありません。 'rejectAndRetry'をもっと詳しく見ると、その想定される目的が本当に分かりません。 – Kayaman

関連する問題