2016-11-03 10 views
3

私は明らかに私のプログラム全体のボトルネックである機能を持っています。私は、OpenMPとの並列化が役立つと思った。forループのOpenMP並列化:コードの効率が悪い

私の計算の実際の例です(申し訳ありませんが、機能は少し長めです)。私のプログラムでは、5つのネストされたループの前の作業のいくつかは別の場所で行われており、効率についてはまったく問題にはなりません。

#include <vector> 
#include <iostream> 
#include <cmath> 
#include <cstdio> 
#include <chrono> 
#include "boost/dynamic_bitset.hpp" 

using namespace std::chrono; 

void compute_mddr(unsigned Ns, unsigned int block_size, unsigned int sector) 
{ 
    std::vector<unsigned int> basis; 
    for (std::size_t s = 0; s != std::pow(2,Ns); s++) { 
    boost::dynamic_bitset<> s_bin(Ns,s); 
    if (s_bin.count() == Ns/2) { 
     basis.push_back(s); 
    } 
    } 
    std::vector<double> gs(basis.size()); 
    for (unsigned int i = 0; i != gs.size(); i++) 
    gs[i] = double(std::rand())/RAND_MAX; 

    unsigned int ns_A = block_size; 
    unsigned int ns_B = Ns-ns_A; 
    boost::dynamic_bitset<> mask_A(Ns,(1<<ns_A)-(1<<0)); 
    boost::dynamic_bitset<> mask_B(Ns,((1<<ns_B)-(1<<0))<<ns_A); 

    // Find the basis of the A block 
    unsigned int NAsec = sector; 
    std::vector<double> basis_NAsec; 
    for (unsigned int s = 0; s < std::pow(2,ns_A); s++) { 
    boost::dynamic_bitset<> s_bin(ns_A,s); 
    if (s_bin.count() == NAsec) 
     basis_NAsec.push_back(s); 
    } 
    unsigned int bs_A = basis_NAsec.size(); 

    // Find the basis of the B block 
    unsigned int NBsec = (Ns/2)-sector; 
    std::vector<double> basis_NBsec; 
    for (unsigned int s = 0; s < std::pow(2,ns_B); s++) { 
    boost::dynamic_bitset<> s_bin(ns_B,s); 
    if (s_bin.count() == NBsec) 
     basis_NBsec.push_back(s); 
    } 
    unsigned int bs_B = basis_NBsec.size(); 

    std::vector<std::vector<double> > mddr(bs_A); 
    for (unsigned int i = 0; i != mddr.size(); i++) { 
    mddr[i].resize(bs_A); 
    for (unsigned int j = 0; j != mddr[i].size(); j++) { 
     mddr[i][j] = 0.0; 
    } 
    } 

    // Main calculation part 
    for (unsigned int mu_A = 0; mu_A != bs_A; mu_A++) { // loop 1 
    boost::dynamic_bitset<> mu_A_bin(ns_A,basis_NAsec[mu_A]); 
    for (unsigned int nu_A = mu_A; nu_A != bs_A; nu_A++) { // loop 2 
     boost::dynamic_bitset<> nu_A_bin(ns_A,basis_NAsec[nu_A]); 

     double sum = 0.0; 
#pragma omp parallel for reduction(+:sum) 
     for (unsigned int mu_B = 0; mu_B < bs_B; mu_B++) { // loop 3 
     boost::dynamic_bitset<> mu_B_bin(ns_B,basis_NBsec[mu_B]); 

     for (unsigned int si = 0; si != basis.size(); si++) { // loop 4 
      boost::dynamic_bitset<> si_bin(Ns,basis[si]); 
      boost::dynamic_bitset<> si_A_bin = si_bin & mask_A; 
      si_A_bin.resize(ns_A); 
      if (si_A_bin != mu_A_bin) 
      continue; 
      boost::dynamic_bitset<> si_B_bin = (si_bin & mask_B)>>ns_A; 
      si_B_bin.resize(ns_B); 
      if (si_B_bin != mu_B_bin) 
      continue; 

      for (unsigned int sj = 0; sj < basis.size(); sj++) { // loop 5 
      boost::dynamic_bitset<> sj_bin(Ns,basis[sj]); 
      boost::dynamic_bitset<> sj_A_bin = sj_bin & mask_A; 
      sj_A_bin.resize(ns_A); 
      if (sj_A_bin != nu_A_bin) 
       continue; 
      boost::dynamic_bitset<> sj_B_bin = (sj_bin & mask_B)>>ns_A; 
      sj_B_bin.resize(ns_B); 
      if (sj_B_bin != mu_B_bin) 
       continue; 
      sum += gs[si]*gs[sj]; 
      } 
     } 
     } 
     mddr[nu_A][mu_A] = mddr[mu_A][nu_A] = sum; 
    } 
    } 
} 


int main() 
{ 
    unsigned int l = 8; 
    unsigned int Ns = 2*l; 
    unsigned block_size = 6; // must be between 1 and l 
    unsigned sector = (block_size%2 == 0) ? block_size/2 : (block_size+1)/2; 

    high_resolution_clock::time_point t1 = high_resolution_clock::now(); 
    compute_mddr(Ns,block_size,sector); 
    high_resolution_clock::time_point t2 = high_resolution_clock::now(); 
    duration<double> time_span = duration_cast<duration<double>>(t2 - t1); 
    std::cout << "Function took " << time_span.count() << " seconds."; 
    std::cout << std::endl; 
} 

compute_mddr機能は、基本的には完全にマトリクスmddrを充填され、これは私はそれが本質的に和を計算なので、ループ3を並列化することを決定し、最も外側のループ1および2 に相当します。大きさのオーダーを与えるために、basis_NBsecベクトルのループ3は〜50〜100要素を超え、ベクトルbasisの〜10000要素を超える2つの最内側ループsiおよびが実行されます。

しかし、(gcc 5.4.0、ubuntu 16.0.4、i5-4440 cpuで-O3 -fopenmpでコンパイルされた)コードを実行すると、スピードアップ(2スレッド)または非常に限られたゲイン3と4スレッド):

time OMP_NUM_THREADS=1 ./a.out 
Function took 230.435 seconds. 
real 3m50.439s 
user 3m50.428s 
sys 0m0.000s 


time OMP_NUM_THREADS=2 ./a.out 
Function took 227.754 seconds. 
real 3m47.758s 
user 7m2.140s 
sys 0m0.048s 


time OMP_NUM_THREADS=3 ./a.out 
Function took 181.492 seconds. 
real 3m1.495s 
user 7m36.056s 
sys 0m0.036s 


time OMP_NUM_THREADS=4 ./a.out 
Function took 150.564 seconds. 
real 2m30.568s 
user 7m56.156s 
sys 0m0.096s 

私が正しくユーザーから数字を理解していれば、コードが実行されると、CPU使用率は確かに良い(とではありません3と4スレッドのために、私はのために〜250%のCPU使用率を取得します3スレッド、4スレッドの場合はわずか300%)。

私はOpenMPを初めて使用しました。私は単純な例で非常に早くそれを試しました。私が見る限り、共有部分のベクトルbasis_NAsecbasis_NBsecおよびbasisをパラレル部分に変更するのではなく、読んだだけです(これは私が読んだいくつかの関連する質問で指摘された側面でした)。

私は間違っていますか? perf recordとプログラムのパフォーマンスを簡単に見て撮影

答えて

3

は、スレッド数のregaradlessは、ほとんどの時間をmalloc & freeに費やされていることを示しています。これは一般的に悪い兆候であり、並列化も阻害します。

Samples: 1M of event 'cycles:pp', Event count (approx.): 743045339605                               
    Children  Self Command Shared Object  Symbol                                  
+ 17.14% 17.12% a.out a.out    [.] _Z12compute_mddrjjj._omp_fn.0                           
+ 15.45% 15.43% a.out libc-2.23.so   [.] __memcmp_sse4_1                              
+ 15.21% 15.19% a.out libc-2.23.so   [.] __memset_avx2                               
+ 13.09% 13.07% a.out libc-2.23.so   [.] _int_free                                
+ 11.66% 11.65% a.out libc-2.23.so   [.] _int_malloc                               
+ 10.21% 10.20% a.out libc-2.23.so   [.] malloc                                 

malloc & freeの原因は、基本的にstd::vectorのあるboost::dynamic_bitsetオブジェクト、一定の創造です。注:perfでは、特定の機能のcan be challenging to find the callersです。 gdbで実行し、実行フェーズ中に割り込み、break balloccontinueを呼び出して発信者を把握することができます。

パフォーマンスを向上させるための直接的なアプローチは、これらのオブジェクトを可能な限り長く維持して、何度も何度も再割り当てを避けることです。これは、できるだけ局所的に変数を宣言するための通常の良い習慣に反するものです。 dynamic_bitsetオブジェクトを再利用するの変換は、次のようになります。

#pragma omp parallel for reduction(+:sum) 
    for (unsigned int mu_B = 0; mu_B < bs_B; mu_B++) { // loop 3 
    boost::dynamic_bitset<> mu_B_bin(ns_B,basis_NBsec[mu_B]); 

    boost::dynamic_bitset<> si_bin(Ns); 
    boost::dynamic_bitset<> si_A_bin(Ns); 
    boost::dynamic_bitset<> si_B_bin(Ns); 

    boost::dynamic_bitset<> sj_bin(Ns); 
    boost::dynamic_bitset<> sj_A_bin(Ns); 

    boost::dynamic_bitset<> sj_B_bin(Ns); 

    for (unsigned int si = 0; si != basis.size(); si++) { // loop 4 
     si_bin = basis[si]; 
     si_A_bin = si_bin; 
     assert(si_bin.size() == Ns); 
     assert(si_A_bin.size() == Ns); 
     assert(mask_A.size() == Ns); 
     si_A_bin &= mask_A; 
     si_A_bin.resize(ns_A); 
     if (si_A_bin != mu_A_bin) 
     continue; 
     si_B_bin = si_bin; 
     assert(si_bin.size() == Ns); 
     assert(si_B_bin.size() == Ns); 
     assert(mask_B.size() == Ns); 
     // Optimization note: dynamic_bitset::operator& 
     // does create a new object, operator&= does not 
     // Same for >> 
     si_B_bin &= mask_B; 
     si_B_bin >>= ns_A; 
     si_B_bin.resize(ns_B); 
     if (si_B_bin != mu_B_bin) 
     continue; 

     for (unsigned int sj = 0; sj < basis.size(); sj++) { // loop 5 
     sj_bin = basis[sj]; 
     sj_A_bin = sj_bin; 
     assert(sj_bin.size() == Ns); 
     assert(sj_A_bin.size() == Ns); 
     assert(mask_A.size() == Ns); 
     sj_A_bin &= mask_A; 
     sj_A_bin.resize(ns_A); 
     if (sj_A_bin != nu_A_bin) 
      continue; 

     sj_B_bin = sj_bin; 

     assert(sj_bin.size() == Ns); 
     assert(sj_B_bin.size() == Ns); 
     assert(mask_B.size() == Ns); 

     sj_B_bin &= mask_B; 
     sj_B_bin >>= ns_A; 
     sj_B_bin.resize(ns_B); 
     if (sj_B_bin != mu_B_bin) 
      continue; 
     sum += gs[si]*gs[sj]; 
     } 
    } 
    } 

これは、すでに289秒〜39秒〜から私のシステムでシングルスレッド実行時間を削減します。また、プログラムは〜10スレッド(4.1秒)までほぼ完全にスケーリングされます。

スレッド数が多い場合、並列ループに負荷バランスの問題があります。schedule(dynamic)を追加することで少し緩和することができますが、どのように関連性があるのか​​よくわかりません。

さらに重要なことに、std::bitsetの使用を検討する必要があります。非常に高価なboost::dynamic_bitsetコンストラクタがなくても、非常に高価です。大部分の時間は超音波でdynamic_bitest/vectorコードであり、memmove/memcmpである。あなただけstd::bitsetの非常にいくつかの単語を使用する場合、基本的に消える

+ 32.18% 32.15% ope_gcc_dyn ope_gcc_dyn   [.] _ZNSt6vectorImSaImEEaSERKS1_                          
+ 29.13% 29.10% ope_gcc_dyn ope_gcc_dyn   [.] _Z12compute_mddrjjj._omp_fn.0                          
+ 21.65%  0.00% ope_gcc_dyn [unknown]   [.] 0000000000000000                             
+ 16.24% 16.23% ope_gcc_dyn ope_gcc_dyn   [.] _ZN5boost14dynamic_bitsetImSaImEE6resizeEmb.constprop.102                   
+ 10.25% 10.23% ope_gcc_dyn libc-2.23.so   [.] __memcmp_sse4_1                             
+ 9.61%  0.00% ope_gcc_dyn libc-2.23.so   [.] 0xffffd47cb9d83b78                             
+ 7.74%  7.73% ope_gcc_dyn libc-2.23.so   [.] __memmove_avx_unaligned 

。たぶん64ビットで十分でしょう。大きな範囲で動的な場合は、関数全体のテンプレートを作成し、さまざまなビット数のインスタンスを作成して、適切なものを動的に選択することができます。私はあなたがパフォーマンスにおいてもう一つの桁違いを得るべきだと思っています。これは、並行効率を低下させ、パフォーマンス分析の別のラウンドを必要とする可能性があります。

コードのパフォーマンスを理解するツールを使用することは非常に重要です。あらゆる種類の事例に対して非常に単純で非常に優れたツールがあります。あなたの場合、perfなどの単純なもので十分です。

関連する問題