2016-09-27 10 views
0

現在、ビデオ画像を圧縮するためにnvidiaハードウェアエンコーダで見つかったコードをリファクタリングしています。元の質問はここにある:回答に基づいてwondering if I can use stl smart pointers for thisスレッドセーフなバッファ配列

は、次のように私は自分のコードを更新しました:

回答やコメントに基づいて、私はスレッドセーフバッファ配列を作成しようとしています。ここにあります。コメントしてください。

#ifndef __BUFFER_ARRAY_H__ 
#define __BUFFER_ARRAY_H__ 

#include <vector> 
#include <mutex> 
#include <thread> 

template<class T> 
class BufferArray 
{ 
public: 
    class BufferArray() 
     :num_pending_items(0), pending_index(0), available_index(0) 
    {} 

    // This method is not thread-safe. 
    // Add an item to our buffer list 
    // Note we do not take ownership of the incoming pointer. 
    void add(T * buffer) 
    { 
     buffer_array.push_back(buffer); 
    } 

    // Returns a naked pointer to an available buffer. Should not be 
    // deleted by the caller. 
    T * get_available() 
    { 
     std::lock_guard<std::mutex> lock(buffer_array_mutex); 
     if (num_pending_items == buffer_array.size()) { 
      return NULL; 
     }  
     T * buffer = buffer_array[available_index]; 
     // Update the indexes. 
     available_index = (available_index + 1) % buffer_array.size(); 
     num_pending_items += 1; 
     return buffer; 
    } 

    T * get_pending() 
    { 
     std::lock_guard<std::mutex> lock(buffer_array_mutex); 
     if (num_pending_items == 0) { 
      return NULL; 
     } 

     T * buffer = buffer_array[pending_index]; 
     pending_index = (pending_index + 1) % buffer_array.size(); 
     num_pending_items -= 1; 
     return buffer; 
    } 


private: 
    std::vector<T * >     buffer_array; 
    std::mutex       buffer_array_mutex; 
    unsigned int      num_pending_items; 
    unsigned int      pending_index; 
    unsigned int      available_index; 

    // No copy semantics 
    BufferArray(const BufferArray &) = delete; 
    void operator=(const BufferArray &) = delete; 
}; 

#endif 

私の質問は、ここでいくつかのC++の優れた推奨事項を壊しているかどうかです。また、クラスにアクセスして複数のスレッドを使用できるように、クラスを拡張しています。私が逃したかもしれないものがあるかどうか疑問に思っていた。 2.で、「処理」とは、単にint型に乗じている。このテストでは

しかし、プロセッサのスレッドが保留キューから保留中のデータをとる方法のプロセスに注目してください:

+1

あなた '追加()'スレッドセーフではありません。また、ロックガードが必要です。さもなければ、質問はあまりにも漠然としたものではありません。 –

+3

質問には関係ありませんが、二重下線で始まる記号、またはアンダースコアの後に大文字が続く記号は、すべてのスコープで予約されています。詳細については、[この古い回答](http://stackoverflow.com/a/228797/440558)を参照してください。 –

+0

@JoachimPileborgしかし、私は一重または二重のアンダースコアで始まるものは使用していませんか?インクルージョンガードを意味しますか? – Luca

答えて

1

は、私はそれをこのような何かに近づくと思います使用可能なデータを使用可能なキューにプッシュします。次に、コンシューマ(この場合はディスクライタ)が利用可能なデータを再度検索する必要があることを(条件変数を介して)通知します。

#include <vector> 
#include <mutex> 
#include <thread> 
#include <queue> 
#include <condition_variable> 
#include <iostream> 

namespace notstd { 
    template<class Mutex> auto getlock(Mutex& m) 
    { 
     return std::unique_lock<Mutex>(m); 
    } 
} 

template<class T> 
class ProcessQueue 
{ 
public: 
    ProcessQueue() 
    {} 

    // This method is not thread-safe. 
    // Add an item to our buffer list 
    // Note we do not take ownership of the incoming pointer. 
    // @pre start_processing shall not have been called 
    void add(T * buffer) 
    { 
     pending_.push(buffer); 
    } 

    void start_processing() 
    { 
     process_thread_ = std::thread([this] { 
      while(not this->pending_.empty()) 
      { 
       auto lock = notstd::getlock(this->mutex_); 
       auto buf = this->pending_.front(); 
       lock.unlock(); 

       // 
       // this is the part that processes the "buffer" 

       *buf *= 2; 

       // 
       // now notify the structure that the processing is done - buffer is available 
       // 

       lock.lock(); 
       this->pending_.pop(); 
       this->available_.push(buf); 
       lock.unlock(); 
       this->change_.notify_one(); 
      } 
     }); 
    } 

    T* wait_available() 
    { 
     auto lock = notstd::getlock(mutex_); 
     change_.wait(lock, [this] { return not this->available_.empty() or this->pending_.empty(); }); 
     if (not available_.empty()) 
     { 
      auto p = available_.front(); 
      available_.pop(); 
      return p; 
     } 

     lock.unlock(); 
     process_thread_.join(); 
     return nullptr; 
    } 

private: 
    std::queue<T * >     pending_; 
    std::queue<T * >     available_; 
    std::mutex       mutex_; 
    std::condition_variable    change_; 
    std::thread      process_thread_; 

    // No copy semantics - implicit because of the mutex 
}; 

int main() 
{ 
    ProcessQueue<int> pq; 

    std::vector<int> v = { 1, 2, 3, 4, 5, 6, 7, 8, 9 }; 
    for (auto& i : v) { 
     pq.add(std::addressof(i)); 
    } 

    pq.start_processing(); 

    while (auto p = pq.wait_available()) 
    { 
     std::cout << *p << '\n'; 
    } 
} 

予想される出力:

2 
4 
6 
8 
10 
12 
14 
16 
18 
+0

ありがとうございました。このコードから私が学ぶことはたくさんあります! – Luca

関連する問題