2013-03-04 10 views
38

私は最近、C++ 11へのポートを、トリプルバッファのstd :: atomicを使って同時実行同期機構として使用するようにしました。このスレッド同期アプローチの背後にあるアイデアは、プロデューサスレッドが遅くなりませんので、コンシューマ、トリプルバッファリングがいくつかの利点をもたらすことができるより速く実行するプロデューサを持つプロデューサ - コンシューマ状況消費者を待つ。私の場合は、〜120fpsで更新される物理スレッドと、〜60fpsで実行されるレンダースレッドがあります。明らかに、私はレンダリングスレッドが常に最新の状態を取得できるようにしたいが、速度の違いのために物理スレッドから多くのフレームをスキップすることも知っている。一方、物理スレッドは一定の更新レートを維持し、データをロックする低速のレンダスレッドによって制限されないようにしたい。C++ 11アトミックメモリの順序付け - これはリラックス(リリース消費)順序の正しい使用法ですか?

オリジナルのCコードはremis-thoughtsで作成されており、詳しい説明は彼のblogにあります。私は元の実装のさらなる理解のためにそれを読むことに興味のある人を奨励します。

私の実装はhereです。

基本的な考え方は、3つの位置(バッファ)と、比較およびスワップされるアトミックフラグを持つ配列を持つことで、どの配列要素がいつどのような状態に対応するかを定義することです。このようにして、3つの原子変数のみがアレイの3つのインデックスと3重バッファリングの背後にあるロジックをモデル化するために使用されます。バッファの3つの位置は、Dirty、Clean、Snapという名前です。 プロデューサは常にDirtyインデックスに書き込み、ライタを反転させてDirtyを現在のCleanインデックスでスワップすることができます。 コンシューマは、新しいスナップを要求できます。スナップは、現在のスナップインデックスをクリーンインデックスでスワップして、最新のバッファを取得します。 コンシューマは常にスナップ位置のバッファを読み込みます。

フラグは8ビットの符号なし整数で構成され、ビットに対応する:

(未使用)(新しい書き込み)(2×ダーティ)(2×クリーン)(2Xスナップ)

newWrite余分なビットフラグは、ライタによってセットされ、リーダによってクリアされる。読者はこれを使用して最後のスナップから書き込みがあったかどうかをチェックし、そうでなければ別のスナップを取ることはできません。フラグとインデックスは、単純なビット単位の操作を使用して取得できます。

[OK]を、今のコードのために:

template <typename T> 
class TripleBuffer 
{ 

public: 

    TripleBuffer<T>(); 
    TripleBuffer<T>(const T& init); 

    // non-copyable behavior 
    TripleBuffer<T>(const TripleBuffer<T>&) = delete; 
    TripleBuffer<T>& operator=(const TripleBuffer<T>&) = delete; 

    T snap() const; // get the current snap to read 
    void write(const T newT); // write a new value 
    bool newSnap(); // swap to the latest value, if any 
    void flipWriter(); // flip writer positions dirty/clean 

    T readLast(); // wrapper to read the last available element (newSnap + snap) 
    void update(T newT); // wrapper to update with a new element (write + flipWriter) 

private: 

    bool isNewWrite(uint_fast8_t flags); // check if the newWrite bit is 1 
    uint_fast8_t swapSnapWithClean(uint_fast8_t flags); // swap Snap and Clean indexes 
    uint_fast8_t newWriteSwapCleanWithDirty(uint_fast8_t flags); // set newWrite to 1 and swap Clean and Dirty indexes 

    // 8 bit flags are (unused) (new write) (2x dirty) (2x clean) (2x snap) 
    // newWrite = (flags & 0x40) 
    // dirtyIndex = (flags & 0x30) >> 4 
    // cleanIndex = (flags & 0xC) >> 2 
    // snapIndex = (flags & 0x3) 
    mutable atomic_uint_fast8_t flags; 

    T buffer[3]; 
}; 

実装:

template <typename T> 
TripleBuffer<T>::TripleBuffer(){ 

    T dummy = T(); 

    buffer[0] = dummy; 
    buffer[1] = dummy; 
    buffer[2] = dummy; 

    flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2 
} 

template <typename T> 
TripleBuffer<T>::TripleBuffer(const T& init){ 

    buffer[0] = init; 
    buffer[1] = init; 
    buffer[2] = init; 

    flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2 
} 

template <typename T> 
T TripleBuffer<T>::snap() const{ 

    return buffer[flags.load(std::memory_order_consume) & 0x3]; // read snap index 
} 

template <typename T> 
void TripleBuffer<T>::write(const T newT){ 

    buffer[(flags.load(std::memory_order_consume) & 0x30) >> 4] = newT; // write into dirty index 
} 

template <typename T> 
bool TripleBuffer<T>::newSnap(){ 

    uint_fast8_t flagsNow(flags.load(std::memory_order_consume)); 
    do { 
    if(!isNewWrite(flagsNow)) // nothing new, no need to swap 
     return false; 
    } while(!flags.compare_exchange_weak(flagsNow, 
             swapSnapWithClean(flagsNow), 
             memory_order_release, 
             memory_order_consume)); 
    return true; 
} 

template <typename T> 
void TripleBuffer<T>::flipWriter(){ 

    uint_fast8_t flagsNow(flags.load(std::memory_order_consume)); 
    while(!flags.compare_exchange_weak(flagsNow, 
            newWriteSwapCleanWithDirty(flagsNow), 
            memory_order_release, 
            memory_order_consume)); 
} 

template <typename T> 
T TripleBuffer<T>::readLast(){ 
    newSnap(); // get most recent value 
    return snap(); // return it 
} 

template <typename T> 
void TripleBuffer<T>::update(T newT){ 
    write(newT); // write new value 
    flipWriter(); // change dirty/clean buffer positions for the next update 
} 

template <typename T> 
bool TripleBuffer<T>::isNewWrite(uint_fast8_t flags){ 
    // check if the newWrite bit is 1 
    return ((flags & 0x40) != 0); 
} 

template <typename T> 
uint_fast8_t TripleBuffer<T>::swapSnapWithClean(uint_fast8_t flags){ 
    // swap snap with clean 
    return (flags & 0x30) | ((flags & 0x3) << 2) | ((flags & 0xC) >> 2); 
} 

template <typename T> 
uint_fast8_t TripleBuffer<T>::newWriteSwapCleanWithDirty(uint_fast8_t flags){ 
    // set newWrite bit to 1 and swap clean with dirty 
    return 0x40 | ((flags & 0xC) << 2) | ((flags & 0x30) >> 2) | (flags & 0x3); 
} 

あなたが見ることができるように、私は、メモリ順序のためのリリース-消費パターンを使用することを決定しました。 ストアのリリース(memory_order_release)は、ストアの後に、現在のスレッドでの書き込みが保証されていないことを保証します。反対側では、を消費します。は、現在ロードされている値に依存して現在のスレッドで読み込みが行われていないことを保証します。より前です。これにより、同じアトミック変数を解放する他のスレッドの従属変数への書き込みが、現在のスレッドで確実に表示されます。

フラグがアトミックに設定されている必要があるため、フラグに直接影響しない他の変数に対する操作は、コンパイラによって自由に並べ替えられ、より多くの最適化が可能です。新しいメモリモデルに関するいくつかのドキュメントを読むことから、私は、これらのリラックスアトミックがARMやPOWERなどのプラットフォームに顕著な影響を与えることを認識しています(主にそれらのために導入されました)。私はARMをターゲットにしているので、私はこれらのオペレーションの恩恵を受けることができ、より少しのパフォーマンスを奪うことができると信じています。

は今の質問には:

私が正しく、この特定の問題に関するリリース・消費リラックスした順序を使用していますか?

おかげで、

アンドレ

PS:長い記事のために申し訳ありませんが、私はいくつかのまともなコンテキストは、問題のより良いビューのために必要であると信じていました。

EDIT:デフォルトload(std::memory_order_seq_cst)を使用して、したがって、直接代入を使用していたnewSnap()flipWriter()

  • 固定flags読む:Yakkの提案@実装 。
  • 明快にするため、ビット操作を専用機能に移動しました。
  • 戻りタイプをnewSnap()に追加しました。新しいものがない場合はfalseを返し、そうでない場合はtrueを返します。
  • TripleBufferを使用していた場合、コピーコンストラクタと割り当てコンストラクタの両方が安全でないため、= deleteイディオムを使用してコピー不可として定義されたクラス。

EDIT 2:間違っていました 固定の記述、(感謝@Useless)。 コンシューマは新しいスナップを要求し、スナップインデックス(「ライター」ではなく)から読み取ります。気を散らすと、それを指摘するために無駄に感謝申し訳ありません。

EDIT 3: を効果的にループサイクル毎に2冗長load()」Sを除去し、@display名の提案に従ってnewSnap()flipriter()機能を最適化。

+2

だけで簡単にコードレビュー、ここでは回答なし: 'テンプレート TripleBuffer :: TripleBuffer(のconst TripleBuffer&Tは){'」doesnの'TripleBuffer'が使用されている場合、ほとんどのコンテキストで安全に使うように見えます。私はそれを省略したいと思うでしょう。 'operator ='と同じです。 'newSnap'は「何も読まない」ために' false'を返すべきです。あなたは、すべてのビット操作を 'uint8_t 'を受け取り、それから出力可能な/入力する単一の型に移動する必要があります。明示的なメモリの順序保証なしで 'newSnap'に' flags'を読み込みます。 – Yakk

+0

うわー、ありがとう!あなたはコピーと代入演算子について正しいです。 'TripleBuffer'が使用されている場合、実際には安全ではありません。なぜなら、' T'型のアサインメントはアトミックであることが保証されていないからです。それらを削除してコンパイラにそれらを生成させる方が良いと思いますか?それとも、安全に動作させるかについてのアイデアはありますか?私は 'TripleBuffer'がコピーされているときにそれらのために多くの_real_を使用しているのを見ていません。私はそれらを完全に追加しましたが、すべての可能性のある状況を説明するのを忘れました。 –

+0

'newSnap'関数については、本当にtrue/falseを返すことができ、素晴らしい追加になります。私たちがあまりにも速く読もうとしていることを伝えることができます;) 'newSnap'で読み込まれた'フラグ 'についての素晴らしい見解です!それはデフォルトの 'load(memory_order_seq_cst)'です。それは害はありませんが、リリース/消費順序を使用する私の元の目的を破る:)ありがとう! –

答えて

1

はい、memory_order_acquireとmemory_order_consumeの違いですが、1秒あたり180回使用すると気付かないでしょう。回答を数字で知りたい場合は、m2 = memory_order_consumeでテストを実行できます。ただ、そのような何かにproducer_or_consumer_Threadを変更します。

TripleBuffer <int> tb; 

void producer_or_consumer_Thread(void *arg) 
{ 
    struct Arg * a = (struct Arg *) arg; 
    bool succeeded = false; 
    int i = 0, k, kold = -1, kcur; 

    while (a->run) 
    { 
     while (a->wait) a->is_waiting = true; // busy wait 
     if (a->producer) 
     { 
      i++; 
      tb.update(i); 
      a->counter[0]++; 
     } 
     else 
     { 
      kcur = tb.snap(); 
      if (kold != -1 && kcur != kold) a->counter[1]++; 
      succeeded = tb0.newSnap(); 
      if (succeeded) 
      { 
       k = tb.readLast(); 
       if (kold == -1) 
        kold = k; 
       else if (kold = k + 1) 
        kold = k; 
       else 
        succeeded = false; 
      } 
      if (succeeded) a->counter[0]++; 
     } 
    } 
    a->is_waiting = true; 
} 

テスト結果:

_#_ __Produced __Consumed _____Total 
    1 39258150 19509292 58767442 
    2 24598892 14730385 39329277 
    3 10615129 10016276 20631405 
    4 10617349 10026637 20643986 
    5 10600334 9976625 20576959 
    6 10624009 10069984 20693993 
    7 10609040 10016174 20625214 
    8 25864915 15136263 41001178 
    9 39847163 19809974 59657137 
10 29981232 16139823 46121055 
11 10555174 9870567 20425741 
12 25975381 15171559 41146940 
13 24311523 14490089 38801612 
14 10512252 9686540 20198792 
15 10520211 9693305 20213516 
16 10523458 9720930 20244388 
17 10576840 9917756 20494596 
18 11048180 9528808 20576988 
19 11500654 9530853 21031507 
20 11264789 9746040 21010829 
2

は、なぜあなたはあなたのCASループに二回、古いフラグ値をロードしているの?最初の時刻はであり、2番目の時刻はcompare_exchange_weak()で、CASの失敗時に指定すると、前の値が最初の引数にロードされます。この場合はflagsNowです。http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchangeによると

、「はそうでなければ、期待に*これをに保存されている実際の値をロードする(実行負荷運転)。」だから何あなたのループがやっていることであること、失敗した場合に、compare_exchange_weak()リロードflagsNow、その後、ループを繰り返し、ロードの直後に最初のステートメントが再度ロードされます。compare_exchange_weak()。あなたのループは、ループの外に負荷を引っ張らなければなりません。例えば、newSnap()次のようになります。

uint_fast8_t flagsNow(flags.load(std::memory_order_consume)); 
do 
{ 
    if(!isNewWrite(flagsNow)) return false; // nothing new, no need to swap 
} while(!flags.compare_exchange_weak(flagsNow, swapSnapWithClean(flagsNow), memory_order_release, memory_order_consume)); 

flipWriter()

uint_fast8_t flagsNow(flags.load(std::memory_order_consume)); 
while(!flags.compare_exchange_weak(flagsNow, newWriteSwapCleanWithDirty(flagsNow), memory_order_release, memory_order_consume)); 
+0

あなたは正しいです!最適化をお寄せいただきありがとうございます;)元の質問にあなたの変更を反映し、あなたが積極的に貢献しているのであなたの答えをアップボールドします。しかし、あなたは私の質問には答えておらず、私はあなたの答えを受け入れることができません。乾杯! –

+0

コメント欄の投稿に文字数が多すぎるため、これを答えとして追加しました。 –

関連する問題