クラスCounter
があります。このクラスには一連のキーが含まれており、各キーの値を増やしてすべての値を取得できます。ですから、解決しようとしている課題はAtomically incrementing counters stored in ConcurrentHashMapと同じです。違いは、キーのセットには制限がないため、新しいキーが頻繁に追加されることです。ConcurrentHashMapの要素の増分と削除
メモリ消費量を減らすため、読み取り後に値をクリアします。これはCounter.getAndClear()
で発生します。キーも削除され、これは物事を壊すようです。
ランダムなキーを1つ増やし、別のスレッドはすべての値のスナップショットを取得してクリアします。
コードは以下の通りです:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.Map;
import java.util.HashMap;
import java.lang.Thread;
class HashMapTest {
private final static int hashMapInitSize = 170;
private final static int maxKeys = 100;
private final static int nIterations = 10_000_000;
private final static int sleepMs = 100;
private static class Counter {
private ConcurrentMap<String, Long> map;
public Counter() {
map = new ConcurrentHashMap<String, Long>(hashMapInitSize);
}
public void increment(String key) {
Long value;
do {
value = map.computeIfAbsent(key, k -> 0L);
} while (!map.replace(key, value, value + 1L));
}
public Map<String, Long> getAndClear() {
Map<String, Long> mapCopy = new HashMap<String, Long>();
for (String key : map.keySet()) {
Long removedValue = map.remove(key);
if (removedValue != null)
mapCopy.put(key, removedValue);
}
return mapCopy;
}
}
// The code below is used for testing
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
Thread thread = new Thread(new Runnable() {
public void run() {
for (int j = 0; j < nIterations; j++) {
int index = ThreadLocalRandom.current().nextInt(maxKeys);
counter.increment(Integer.toString(index));
}
}
}, "incrementThread");
Thread readerThread = new Thread(new Runnable() {
public void run() {
long sum = 0;
boolean isDone = false;
while (!isDone) {
try {
Thread.sleep(sleepMs);
}
catch (InterruptedException e) {
isDone = true;
}
Map<String, Long> map = counter.getAndClear();
for (Map.Entry<String, Long> entry : map.entrySet()) {
Long value = entry.getValue();
sum += value;
}
System.out.println("mapSize: " + map.size());
}
System.out.println("sum: " + sum);
System.out.println("expected: " + nIterations);
}
}, "readerThread");
thread.start();
readerThread.start();
thread.join();
readerThread.interrupt();
readerThread.join();
// Ensure that counter is empty
System.out.println("elements left in map: " + counter.getAndClear().size());
}
}
テストの間、私はいくつかの増分が失われたことに気づきました。私は、次のような結果を得る:
sum: 9993354
expected: 10000000
elements left in map: 0
あなたは(その合計が予想されるよりも少ない)このエラーを再現できない場合は、大きさのmaxKeysいくつかの受注を増やすか、hashMapInitSizeを減少またはnIterationsを高めるために試すことができます(後者実行時間も増加します)。エラーがある場合は、テストコード(メインメソッド)も含めました。
実行時にConcurrentHashMapの容量を増やすと、エラーが発生している可能性があります。私のコンピュータでは、hashMapInitSize
が170のときにコードが正しく動作するように見えますが、hashMapInitSize
が171のときに失敗します。171のサイズがトリガされると考えられます(128/0.75 == 170.66、ここで0.75はハッシュマップのデフォルト負荷係数です) 。
質問:私はremove
,replace
とcomputeIfAbsent
の操作を正しく使用していますか?私はUse of ConcurrentHashMap eliminates data-visibility troubles?の答えに基づいて、ConcurrentHashMap
の原子操作であると仮定します。もしそうなら、増分が失われるのはなぜですか?
EDIT:
私はincrement()
は私がincrement()
に明示的なロックを回避しようとするように、getAndClear()
よりも頻繁に呼ばれるようになっていることをここで重要な詳細を逃したと思います。しかし、実際に問題があるかどうかを確認するために、後で異なるバージョンのパフォーマンスをテストします。
は[AtomicLong's ']を使用しない理由(ありhttps://docs.oracle.com/javase/8 /docs/api/java/util/concurrent/atomic/AtomicLong.html)? –
@ ChaiT.Rex実際、私はAtomicLongを使い始めましたが、ConcurrentHashMapとAtomicLongの両方に原子操作が含まれていたので、はるかに複雑になっていましたが、結合したときにはアトミックではありません(map.computeIfAbsent(key、新しいAtomicLong(0L))。incrementAndGet() ')。これは明示的な同期で解決されている可能性がありますが、スループットが大幅に低下します。 – user502144
あなたの 'increment'メソッドは本当に' map.merge(key、1L、Long :: sum) 'でなければなりません。私はあなたが何をしたとしても、スレッドセーフな 'getAndClear()'を作ることができて本当に疑わしいです。 –