2016-12-19 10 views
0

クラスCounterがあります。このクラスには一連のキーが含まれており、各キーの値を増やしてすべての値を取得できます。ですから、解決しようとしている課題はAtomically incrementing counters stored in ConcurrentHashMapと同じです。違いは、キーのセットには制限がないため、新しいキーが頻繁に追加されることです。ConcurrentHashMapの要素の増分と削除

メモリ消費量を減らすため、読み取り後に値をクリアします。これはCounter.getAndClear()で発生します。キーも削除され、これは物事を壊すようです。

ランダムなキーを1つ増やし、別のスレッドはすべての値のスナップショットを取得してクリアします。

コードは以下の通りです:

import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ConcurrentMap; 
import java.util.concurrent.ThreadLocalRandom; 
import java.util.Map; 
import java.util.HashMap; 
import java.lang.Thread; 

class HashMapTest { 
    private final static int hashMapInitSize = 170; 
    private final static int maxKeys = 100; 
    private final static int nIterations = 10_000_000; 
    private final static int sleepMs = 100; 

    private static class Counter { 
     private ConcurrentMap<String, Long> map; 

     public Counter() { 
      map = new ConcurrentHashMap<String, Long>(hashMapInitSize); 
     } 

     public void increment(String key) { 
      Long value; 
      do { 
       value = map.computeIfAbsent(key, k -> 0L); 
      } while (!map.replace(key, value, value + 1L)); 
     } 

     public Map<String, Long> getAndClear() { 
      Map<String, Long> mapCopy = new HashMap<String, Long>(); 
      for (String key : map.keySet()) { 
       Long removedValue = map.remove(key); 
       if (removedValue != null) 
        mapCopy.put(key, removedValue); 
      } 
      return mapCopy; 
     } 
    } 

    // The code below is used for testing 
    public static void main(String[] args) throws InterruptedException { 
     Counter counter = new Counter(); 
     Thread thread = new Thread(new Runnable() { 
      public void run() { 
       for (int j = 0; j < nIterations; j++) { 
        int index = ThreadLocalRandom.current().nextInt(maxKeys); 
        counter.increment(Integer.toString(index)); 
       } 
      } 
     }, "incrementThread"); 
     Thread readerThread = new Thread(new Runnable() { 
      public void run() { 
       long sum = 0; 
       boolean isDone = false; 
       while (!isDone) { 
        try { 
         Thread.sleep(sleepMs); 
        } 
        catch (InterruptedException e) { 
         isDone = true; 
        } 
        Map<String, Long> map = counter.getAndClear(); 
        for (Map.Entry<String, Long> entry : map.entrySet()) { 
         Long value = entry.getValue(); 
         sum += value; 
        } 
        System.out.println("mapSize: " + map.size()); 
       } 
       System.out.println("sum: " + sum); 
       System.out.println("expected: " + nIterations); 
      } 
     }, "readerThread"); 
     thread.start(); 
     readerThread.start(); 
     thread.join(); 
     readerThread.interrupt(); 
     readerThread.join(); 
     // Ensure that counter is empty 
     System.out.println("elements left in map: " + counter.getAndClear().size()); 
    } 
} 

テストの間、私はいくつかの増分が失われたことに気づきました。私は、次のような結果を得る:

sum: 9993354 
expected: 10000000 
elements left in map: 0 

あなたは(その合計が予想されるよりも少ない)このエラーを再現できない場合は、大きさのmaxKeysいくつかの受注を増やすか、hashMapInitSizeを減少またはnIterationsを高めるために試すことができます(後者実行時間も増加します)。エラーがある場合は、テストコード(メインメソッド)も含めました。

実行時にConcurrentHashMapの容量を増やすと、エラーが発生している可能性があります。私のコンピュータでは、hashMapInitSizeが170のときにコードが正しく動作するように見えますが、hashMapInitSizeが171のときに失敗します。171のサイズがトリガされると考えられます(128/0.75 == 170.66、ここで0.75はハッシュマップのデフォルト負荷係数です) 。

質問:私はremove,replacecomputeIfAbsentの操作を正しく使用していますか?私はUse of ConcurrentHashMap eliminates data-visibility troubles?の答えに基づいて、ConcurrentHashMapの原子操作であると仮定します。もしそうなら、増分が失われるのはなぜですか?

EDIT:

私はincrement()は私がincrement()に明示的なロックを回避しようとするように、getAndClear()よりも頻繁に呼ばれるようになっていることをここで重要な詳細を逃したと思います。しかし、実際に問題があるかどうかを確認するために、後で異なるバージョンのパフォーマンスをテストします。

+0

は[AtomicLong's ']を使用しない理由(ありhttps://docs.oracle.com/javase/8 /docs/api/java/util/concurrent/atomic/AtomicLong.html)? –

+0

@ ChaiT.Rex実際、私はAtomicLongを使い始めましたが、ConcurrentHashMapとAtomicLongの両方に原子操作が含まれていたので、はるかに複雑になっていましたが、結合したときにはアトミックではありません(map.computeIfAbsent(key、新しいAtomicLong(0L))。incrementAndGet() ')。これは明示的な同期で解決されている可能性がありますが、スループットが大幅に低下します。 – user502144

+0

あなたの 'increment'メソッドは本当に' map.merge(key、1L、Long :: sum) 'でなければなりません。私はあなたが何をしたとしても、スレッドセーフな 'getAndClear()'を作ることができて本当に疑わしいです。 –

答えて

1

問題は、keySetを繰り返しながら、removeを使用していると思われます。これは、JavaDocがMap#keySet()(私の強調)について言うものです。

このマップに含まれるキーのセットビューを返します。セットはマップによってサポートされているため、マップの変更はセットに反映され、その逆もあります。 イテレータ自身の削除操作を除いて、セット上の反復処理中にマップが変更された場合、反復結果は未定義です。

のJavaDoc ConcurrentHashMapためのさらなる手がかりを与える:反復子/列挙の作成時または以降のある時点で、ハッシュテーブルの状態を反映

同様に、イテレータ、Spliteratorsと列挙リターン要素。

結論は、キーの反復処理中にマップを変更することは予告できないということです。

解決策の1つは、getAndClear()操作の新しいマップを作成し、古いマップを返すことです。スイッチを保護する必要があり、実施例では、私はReentrantReadWriteLock用いる下記

class HashMapTest { 
private final static int hashMapInitSize = 170; 
private final static int maxKeys = 100; 
private final static int nIterations = 10_000_000; 
private final static int sleepMs = 100; 

private static class Counter { 
    private ConcurrentMap<String, Long> map; 
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 
    ReadLock readLock = lock.readLock(); 
    WriteLock writeLock = lock.writeLock(); 

    public Counter() { 
     map = new ConcurrentHashMap<>(hashMapInitSize); 
    } 

    public void increment(String key) { 
     readLock.lock(); 
     try { 
      map.merge(key, 1L, Long::sum); 
     } finally { 
      readLock.unlock(); 
     } 
    } 

    public Map<String, Long> getAndClear() { 
     ConcurrentMap<String, Long> oldMap; 
     writeLock.lock(); 
     try { 
      oldMap = map; 
      map = new ConcurrentHashMap<>(hashMapInitSize); 
     } finally { 
      writeLock.unlock(); 
     } 

     return oldMap; 
    } 
} 

// The code below is used for testing 
public static void main(String[] args) throws InterruptedException { 
    final AtomicBoolean ready = new AtomicBoolean(false); 

    Counter counter = new Counter(); 
    Thread thread = new Thread(new Runnable() { 
     public void run() { 
      for (int j = 0; j < nIterations; j++) { 
       int index = ThreadLocalRandom.current().nextInt(maxKeys); 
       counter.increment(Integer.toString(index)); 
      } 
     } 
    }, "incrementThread"); 

    Thread readerThread = new Thread(new Runnable() { 
     public void run() { 
      long sum = 0; 
      while (!ready.get()) { 
       try { 
        Thread.sleep(sleepMs); 
       } catch (InterruptedException e) { 
        // 
       } 
       Map<String, Long> map = counter.getAndClear(); 
       for (Map.Entry<String, Long> entry : map.entrySet()) { 
        Long value = entry.getValue(); 
        sum += value; 
       } 
       System.out.println("mapSize: " + map.size()); 
      } 
      System.out.println("sum: " + sum); 
      System.out.println("expected: " + nIterations); 
     } 
    }, "readerThread"); 
    thread.start(); 
    readerThread.start(); 
    thread.join(); 
    ready.set(true); 
    readerThread.join(); 
    // Ensure that counter is empty 
    System.out.println("elements left in map: " + counter.getAndClear().size()); 
} 
} 
+0

問題を指摘してくれてありがとう!私はConcurrentHashMapから要素を取り除いた後、反復処理中に 'Map.remove'を呼び出すのが間違っているという印象を残して、' Iterator.remove'と 'Map.remove'という頭の中で2つのメソッドが反復されるのを読んで読んでいます。しかし、 'Iterator.remove'を呼び出すだけで安全です。 – user502144

+0

要素を削除する前にキーのセットをコピーしようとしました。 – user502144

関連する問題