2017-11-28 20 views
1

Brian GoetzのJCIPを読んでいて、volatileの動作を実験するための次のコードを書いています。揮発性が期待通りに機能しない

public class StatefulObject { 

    private static final int NUMBER_OF_THREADS = 10; 

    private volatile State state; 

    public StatefulObject() { 
     state = new State(); 
    } 

    public State getState() { 
     return state; 
    } 

    public void setState(State state) { 
     this.state = state; 
    } 

    public static class State { 
     private volatile AtomicInteger counter; 

     public State() { 
      counter = new AtomicInteger(); 
     } 

     public AtomicInteger getCounter() { 
      return counter; 
     } 

     public void setCounter(AtomicInteger counter) { 
      this.counter = counter; 
     } 
    } 

    public static void main(String[] args) throws InterruptedException { 
     StatefulObject object = new StatefulObject(); 

     ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_THREADS); 

     AtomicInteger oldCounter = new AtomicInteger(); 
     AtomicInteger newCounter = new AtomicInteger(); 

     object.getState().setCounter(oldCounter); 

     ConcurrentMap<Integer, Long> lastSeen = new ConcurrentHashMap<>(); 
     ConcurrentMap<Integer, Long> firstSeen = new ConcurrentHashMap<>(); 
     lastSeen.put(oldCounter.hashCode(), 0L); 
     firstSeen.put(newCounter.hashCode(), Long.MAX_VALUE); 

     List<Future> futures = IntStream.range(0, NUMBER_OF_THREADS) 
      .mapToObj(num -> executorService.submit(() -> { 
       for (int i = 0; i < 1000; i++) { 
        object.getState().getCounter().incrementAndGet(); 
        lastSeen.computeIfPresent(object.getState().getCounter().hashCode(), (key, oldValue) -> Math.max(oldValue, System.nanoTime())); 
        firstSeen.computeIfPresent(object.getState().getCounter().hashCode(), (key, oldValue) -> Math.min(oldValue, System.nanoTime())); 
       } 
      })).collect(Collectors.toList()); 

     executorService.shutdown(); 

     object.getState().setCounter(newCounter); 

     futures.forEach(future -> { 
      try { 
       future.get(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } catch (ExecutionException e) { 
       e.printStackTrace(); 
      } 
     }); 

     System.out.printf("Counter: %s\n", object.getState().getCounter().get()); 
     long lastSeenOld = lastSeen.get(oldCounter.hashCode()); 
     long firstSeenNew = firstSeen.get(newCounter.hashCode()); 
     System.out.printf("Last seen old counter: %s\n", lastSeenOld); 
     System.out.printf("First seen new counter: %s\n", firstSeenNew); 
     System.out.printf("Old was seen after the new: %s\n", lastSeenOld > firstSeenNew); 
     System.out.printf("Old was seen %s nanoseconds after the new\n", lastSeenOld - firstSeenNew); 
    } 
} 

だから私はnewCounterは常に最初の(私はどれも古いカウンタを参照していないので、すべてのスレッドが更新を気づくことを期待)oldCounterが最後に見られた後にのみ認められていることを期待しています。この動作を観察するために、2つのマップを使用します。しかし、意外にも、私は常にこのような出力が得られます。私が間違っているところ

Counter: 9917 
Last seen old counter: 695372684800871 
First seen new counter: 695372684441226 
Old was seen after the update: true 
Old was seen 359645 nanoseconds after the new 

あなたが説明していただけますか?

ありがとうございます!

+0

は、私はちょうど、上記の問題点を省略するためにあなたのコード内でいくつかのことを変更しました。 – nos

+0

@nosしたがって、私は揮発性の 'counter'フィールドを更新した後、誰も古いカウンタを参照しないように、すべてのスレッドが更新を参照すると期待します。 – alxg2112

答えて

1

あなたの観察の背後にある理由は、Javaのバグではありませんが、あなたのコードには1つあります。あなたのコードでは、lastseenfirstSeenのマップをアトミックに実行すると、computeIfPresentの呼び出しを保証することはできません(Javadocsを参照、computeIfPresentはアトミックではありません)。これは、object.getState().getCounter()を取得して実際に地図を更新するまでに時間差があることを意味します。

このギャップのスレッドA(ナノタイミングを取得する前にすでにカウンタ参照 - 古いスレッド)とスレッドBがobject.getState().getCounter()を取得する直前に設定されている場合、newCounterが発生した場合。したがって、この正確なモーメントカウンター参照が更新された場合、スレッドAは古いカウンターキーを更新し、スレッドBは新しいカウンターキーを更新します。スレッドBがスレッドAの前にナノタイマーを取った場合(これはスレッドが分離されているために実際のCPUスケジューリングが分からないために発生する可能性があります)、完全にあなたの観察につながる可能性があります。

私の説明は明らかです。さらに明確にするもう1つのことは、StateクラスではAtomicInteger counterも揮発性と宣言しています。 AtomicIntegerは本質的に揮発性であるため、これは不要です。「不揮発性」のAtomic **はありません。あなたはoldCounterが最後に見られた後にのみnewCounterを見ることを期待し、なぜあなたはまた、正確に説明した場合にそれが役立つだろう

import java.util.Collections; 
import java.util.List; 
import java.util.concurrent.*; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.stream.Collectors; 
import java.util.stream.IntStream; 

public class StatefulObject { 

    private static final int NUMBER_OF_THREADS = 10; 

    private volatile State state; 

    public StatefulObject() { 
     state = new State(); 
    } 

    public State getState() { 
     return state; 
    } 

    public void setState(State state) { 
     this.state = state; 
    } 

    public static class State { 
     private volatile AtomicInteger counter; 

     public State() { 
      counter = new AtomicInteger(); 
     } 

     public AtomicInteger getCounter() { 
      return counter; 
     } 

     public void setCounter(AtomicInteger counter) { 
      this.counter = counter; 
     } 
    } 

    public static void main(String[] args) throws InterruptedException { 
     StatefulObject object = new StatefulObject(); 

     ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_THREADS); 

     AtomicInteger oldCounter = new AtomicInteger(); 
     AtomicInteger newCounter = new AtomicInteger(); 

     object.getState().setCounter(oldCounter); 

     List<Long> oldList = new CopyOnWriteArrayList<>(); 
     List<Long> newList = new CopyOnWriteArrayList<>(); 

     List<Future> futures = IntStream.range(0, NUMBER_OF_THREADS) 
      .mapToObj(num -> executorService.submit(() -> { 
       for (int i = 0; i < 1000; i++) { 
        long l = System.nanoTime(); 
        object.getState().getCounter().incrementAndGet(); 
        if (object.getState().getCounter().equals(oldCounter)) { 
         oldList.add(l); 
        } else { 
         newList.add(l); 
        } 
       } 
      })).collect(Collectors.toList()); 

     executorService.shutdown(); 

     object.getState().setCounter(newCounter); 

     futures.forEach(future -> { 
      try { 
       future.get(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } catch (ExecutionException e) { 
       e.printStackTrace(); 
      } 
     }); 

     System.out.printf("Counter: %s\n", object.getState().getCounter().get()); 
     Collections.sort(oldList); 
     Collections.sort(newList); 
     long lastSeenOld = oldList.get(oldList.size() - 1); 
     long firstSeenNew = newList.get(0); 
     System.out.printf("Last seen old counter: %s\n", lastSeenOld); 
     System.out.printf("First seen new counter: %s\n", firstSeenNew); 
     System.out.printf("Old was seen after the new: %s\n", lastSeenOld > firstSeenNew); 
     System.out.printf("Old was seen %s nanoseconds after the new\n", lastSeenOld - firstSeenNew); 
    } 
} 
+0

ありがとう、あなたの答えとトーマス '私の頭の中で物事をクリア:) – alxg2112

0

あなたが見ているのは、揮発性の影響ではなく、ConcurrentMap<> lastSeenの同期の影響です。

10スレッドがほぼ同時に開始したとします。それぞれがほぼ平行にobject.getState().getCounter().incrementAndGet();を実行するので、oldCounterが増加します。

次に、これらのスレッドはlastSeen.computeIfPresent(object.getState().getCounter().hashCode(), (key, oldValue) -> Math.max(oldValue, System.nanoTime()));を実行しようとします。つまり、すべてobject.getState().getCounter().hashCode()を並列に評価し、それぞれがoldCounterの同じハッシュコードを取得してから、同じハッシュ値でConcurrentHashMap.computeIfPresent(Integer, ..)を呼び出します。

これらすべてのスレッドは同じキーの値を更新しようとしているため、ConcurrentHashMapはこれらの更新を同期する必要があります。最初のスレッドがメインスレッドはobject.getState().setCounter(newCounter);を実行lastSeenを更新している時間の間に

、いくつかのスレッドがまだ lastSeenを更新するために待っている間、最初のスレッドは、 newCounterため firstSeen実行されますので。


より良い結果を得るには、情報収集ステップを分析ステップから分離する方がよいでしょう。

たとえば、スレッドはカウンタのハッシュコードと更新のタイムスタンプをすべての計算が完了した後で解析する配列に取り込むことができます。

+0

非常に明確な説明をありがとう! – alxg2112

関連する問題