2016-06-15 11 views
2

固定数のスレッド(たとえば4)が連続してスレッドセーフ待ち行列から解放されるC++ 11/14プログラムを書くつもりですキューに残っている作業はありません。固定数のスレッドとスレッドセーフ待ち行列を持つC++マルチスレッド

スレッド・セーフキューの実装:

template<typename T> 
class threadsafe_queue 
{ 
private: 
    mutable std::mutex mut; 
    std::queue<T> data_queue; 
    std::condition_variable data_cond; 
public: 
    threadsafe_queue() {} 
    threadsafe_queue(threadsafe_queue const &other) 
    { 
    std::lock_guard<std::mutex> lk(other.mut); 
    data_queue = other.data_queue; 
    } 

    void push(T new_value) 
    { 
    std::lock_guard<std::mutex> lk(mut); 
    data_queue.push(new_value); 
    data_cond.notify_one(); 
    } 

    void wait_and_pop(T &value) 
    { 
    std::unique_lock<std::mutex> lk(mut); 
    data_cond.wait(lk, [this]{return !data_queue.empty();}); 
    value = data_queue.front(); 
    data_queue.pop(); 
    } 

    std::shared_ptr<T> wait_and_pop() 
    { 
    std::unique_lock<std::mutex> lk(mut); 
    data_cond.wait(lk, [this]{return !data_queue.empty();}); 
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); 
    data_queue.pop(); 
    return res; 
    } 

    bool try_pop(T &value) 
    { 
    std::lock_guard<std::mutex> lk(mut); 
    if (data_queue.empty()) 
     return false; 
    value = data_queue.front(); 
    data_queue.pop(); 
    return true; 
    } 

    std::shared_ptr<T> try_pop() 
    { 
    std::lock_guard<std::mutex> lk(mut); 
    if (data_queue.empty()) 
     return std::shared_ptr<T>(); 
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); 
    data_queue.pop(); 
    return res; 
    } 

    bool empty() const 
    { 
    std::lock_guard<std::mutex> lk(mut); 
    return data_queue.empty(); 
    } 
}; 

機能、各スレッドの実行:

void insertintobidask(std::string connstring, std::string ziparchivename, OFStreamWriter &errlog) { /.../ } 

スレッドがに残された仕事がなくなるまでワークキューオフの仕事を取ることになっている中でメインをキュー:

int main() 
{ 
    std::ofstream errlog 
    errlog.open("/home/vorlket/Desktop/Project/Code/Test/errlog.txt", std::ofstream::out); 
    OFStreamWriter ofsw(&errlog); 

    threadsafe_queue<std::string> wqueue; 
    boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data"); 
    std::regex pattern_fx("HISTDATA_COM_ASCII_.*.zip"); 
    for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter) 
    { 
    std::string name = iter->path().filename().string(); 
    if (std::regex_match(name, pattern_fx)) 
    { 
     wqueue.push(name); 
    } 
    } 

    /* Each thread below would run once, how do I modify it to make it continuously take a work off the queue and run until there is no work left in the queue? 
    std::thread consumer1 (insertintobidask, "hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw); 
    std::thread consumer2 (insertintobidask, "hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw); 
    std::thread consumer3 (insertintobidask, "hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw); 
    std::thread consumer4 (insertintobidask, "hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw); 

    consumer1.join(); 
    consumer2.join(); 
    consumer3.join(); 
    consumer4.join(); 
    */ 

    errlog.close(); 
    return 0; 
} 

以下のNimの回答に基づく別のアプローチを試しました。 できます。高速ですが、私はそれが取り組んでいる作業負荷およびプラットフォーム1に依存推測たアプローチ

/* g++ -std=gnu++11 fxetl.cxx -o fxetl -lboost_system -lboost_filesystem -lzip -lpqxx -lpq -pthread */ 

#include <boost/filesystem.hpp> 
#include <regex> 
#include <iostream> 
#include <fstream> 
#include <string> 
#include <pqxx/pqxx> 
#include <zip.h> 
#include <thread> 
#include <boost/asio.hpp> 
#include "threadsafe_oerrlog.h" 

void insertintobidask(pqxx::nontransaction &txn, std::string ziparchivename, OFStreamWriter &errlog) 
{ 
    std::string fileyearmonth = ziparchivename.substr(27, 6); 
    std::string ziparchivepath = "/home/vorlket/Desktop/Project/Code/Test/Data/HISTDATA_COM_ASCII_AUDUSD_T" + fileyearmonth + ".zip"; 
    std::string zipfilepath = "DAT_ASCII_AUDUSD_T_" + fileyearmonth + ".csv"; 
    int err, r; 
    char buffer[39]; // each line takes up 39 bytes 

    struct zip *ziparchive = zip_open(ziparchivepath.c_str(), 0, &err); 
    if (ziparchive) 
    { 
    struct zip_file *zipfile = zip_fopen(ziparchive, zipfilepath.c_str(), 0); 
    if (zipfile) 
    { 
     while ((r = zip_fread(zipfile, buffer, sizeof(buffer))) > 0) 
     { 
     std::string str(buffer); 
     txn.exec("INSERT INTO fx.bidask VALUES('AUDUSD', to_timestamp(" +txn.quote(str.substr(0, 18)) + ", 'YYYYMMDD HH24MISSMS'), " + txn.quote(str.substr(19, 8)) + ", " + txn.quote(str.substr(28, 8)) + ")"); 
     } 
     zip_fclose(zipfile); 
     std::cout << fileyearmonth << std::endl; 
    } 
    else 
    { 
     errlog << zipfilepath; 
    } 
    } 
    else 
    { 
    errlog << ziparchivepath; 
    } 

    zip_close(ziparchive); 
} 


int main() 
{ 
    pqxx::connection conn1("hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J"); 
    pqxx::nontransaction txn1(conn1); 
    pqxx::connection conn2("hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J"); 
    pqxx::nontransaction txn2(conn2); 
    pqxx::connection conn3("hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J"); 
    pqxx::nontransaction txn3(conn3); 
    pqxx::connection conn4("hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J"); 
    pqxx::nontransaction txn4(conn4); 

    std::ofstream errlog("/home/vorlket/Desktop/Project/Code/Test/errlog.txt"); 
    OFStreamWriter ofsw(&errlog); 

    boost::asio::io_service service1; // queue 
    boost::asio::io_service service2; 
    boost::asio::io_service service3; 
    boost::asio::io_service service4; 

    boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data"); 
    std::regex pattern_fx("HISTDATA_COM_ASCII_.*.zip"); 
    int serviceid = 0; 
    for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter) 
    { 
    std::string name = iter->path().filename().string(); 
    if (std::regex_match(name, pattern_fx)) 
    { 
     serviceid %= 3; 
     switch (serviceid) 
     { 
     case 0 : 
      service1.post([&txn1, name, &ofsw]() { insertintobidask(txn1, name, ofsw); }); 
      break; 
     case 1 : 
      service2.post([&txn2, name, &ofsw]() { insertintobidask(txn2, name, ofsw); }); 
      break; 
     case 2 : 
      service3.post([&txn3, name, &ofsw]() { insertintobidask(txn3, name, ofsw); }); 
      break; 
     case 3 : 
      service4.post([&txn4, name, &ofsw]() { insertintobidask(txn4, name, ofsw); }); 
      break; 
     } 
     ++serviceid; 
    } 
    } 

    std::thread t1([&service1]() { service1.run(); }); 
    std::thread t2([&service2]() { service2.run(); }); 
    std::thread t3([&service3]() { service3.run(); }); 
    std::thread t4([&service4]() { service4.run(); }); 

    t1.join(); 
    t2.join(); 
    t3.join(); 
    t4.join(); 

} 

わかりません。どちらが速いかを見てみる価値がある。どのアプローチがより速く、誰に感謝しているかについてのコメント。

+0

質問はここに歓迎し、しばしば重いdownvotingにつながるされていません。今まで実装/試したことをお見せして、コード内の何が機能していないかについて具体的な質問をしてください。それは、ようこそ、SO! – Arunmu

+0

[スレッドサポートライブラリ](http://en.cppreference.com/w/cpp/thread)を見てください。あなたはそこに必要なもののほとんどを見つけるでしょう。 – Aconcagua

+0

私が試したことを共有するために質問を編集しました。 – vorlket

答えて

2

これは学習のためのものではないが、それが十分に速いものでない限り、私はこれらの粗悪な操作を既存のメカニズムに委ねるだろう。そして、私は

コードは次のようになり...事のまさにこのタイプにboost::asio::io_serviceを使用することを好む:このような

// Additional header 
#include <boost/asio.hpp> 

int main() 
{ 
    std::ofstream errlog 
    errlog.open("/home/vorlket/Desktop/Project/Code/Test/errlog.txt", std::ofstream::out); 
    OFStreamWriter ofsw(&errlog); 

    boost::asio::io_service service; // queue 
    boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data"); 
    std::regex pattern_fx("HISTDATA_COM_ASCII_.*.zip"); 
    for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter) 
    { 
    std::string name = iter->path().filename().string(); 
    if (std::regex_match(name, pattern_fx)) 
    { 
     service.post([name]() { 
     // Do something with this file 
     }); 
    } 
    } 
    // Now start-up n-threads to dispatch on the io_service 
    std::thread t1([&service]() { service.run(); }); // this will dispatch on queue until there is nothing left to do... 
    std::thread t2([&service]() { service.run(); }); 
    std::thread t3([&service]() { service.run(); }); 
    std::thread t4([&service]() { service.run(); }); 
    : 

    // Wait for them to complete 
    t1.join(); 
    t2.join(); 
    t3.join(); 
    t4.join(); 
} 
+0

Nim、私はラムダで関数呼び出しを挿入しましたが、スレッドは何の仕事もしていないようです。私が試したコードは上の編集された質問にあります。 – vorlket

+0

@vorlket、変更はうまく見える、私は特にそれが間違って何かを見ることができない - いくつかのロギングは物事がどこにある参照するのに役立つことがありますか? – Nim

+0

inserintabidask関数に相対パスまたは絶対パスで間違いがありました。できます。共有してくれてありがとう。 – vorlket

関連する問題