2017-03-07 10 views
9

Rayonのpar_iter()を使用して機能を最適化しようとしています。Rayonのスレッドごとの初期化

シングルスレッド版のようなものです:

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) { 

    let result = txs.iter().map(|tx| { 

     tx.verify_and_store(store) 

    }).collect(); 

    ... 
} 

Storeインスタンスは一つのスレッドでのみ使用されなければならないが、Storeの複数のインスタンスを同時に使用することができますので、私は、これはclone -ingにより、マルチスレッド化することができますstore

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) { 

    let result = txs.par_iter().map(|tx| { 

     let mut local_store = store.clone(); 

     tx.verify_and_store(&mut local_store) 

    }).collect(); 

    ... 
} 

しかし、これはワシントン州でstoreすべてのの反復をクローンyが遅すぎる。スレッドごとに1つのストアインスタンスを使用したいと思います。

これはRayonでも可能ですか?または私は手動のスレッドと作業キューに頼るべきですか?

答えて

5

スレッドローカル変数を使用して、特定のスレッドでlocal_storeが複数回作成されないようにすることができます。例えば

、これは(full source)コンパイル:

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) { 
    use std::cell::RefCell; 
    thread_local!(static STORE: RefCell<Option<Store>> = RefCell::new(None)); 

    let mut result = Vec::new(); 

    txs.par_iter().map(|tx| { 
     STORE.with(|cell| { 
      let mut local_store = cell.borrow_mut(); 
      if local_store.is_none() { 
       *local_store = Some(store.clone()); 
      } 
      tx.verify_and_store(local_store.as_mut().unwrap()) 
     }) 
    }).collect_into(&mut result); 
} 

このコードを持つ2つの問題は、しかしながら、あります。一つは、storeのクローンが、par_iter()が完了したときにバッファをフラッシュするなどの処理が必要な場合です。Dropは、Rayonのワーカースレッドが終了したときだけ呼び出され、その場合でもis not guaranteedが呼び出されます。

さらに深刻な問題は、storeのクローンがワーカースレッドごとに1回だけ作成されていることです。 Rayonがスレッドプールをキャッシュしているとすれば、これは後でverify_and_storeへの無関係の呼び出しが、最後に知られているstoreのクローンで動作し続けることを意味します。これは現在のストアとは関係ありません。彼らはpar_iter()を呼び出したスレッドによってアクセスできるように、

  • ストアMutex<Option<...>>代わりにOptionでクローン化された変数:

    これは多少コードが複雑で整流することができます。これにより、すべてのアクセスでmutexロックが発生しますが、ロックは競合しないため安価です。

  • ベクトル内に作成されたストアクローンへの参照を収集するために、mutexの周囲にArcを使用してください。このベクトルは、反復の終了後にNoneにリセットすることによってストアをクリーンアップするために使用されます。

  • verify_and_storeへの2つの並列呼び出しがお互いのストアクローンを見ることにならないように、コール全体を無関係のミューテックスでラップします。 (これは、新しいスレッドプールが作成され、繰り返しの前にインストールされている場合は避けることができます)。この呼び出しはスレッドプール全体を利用するため、このシリアル化はverify_and_storeのパフォーマンスには影響しません。

結果はかなりありませんが、コンパイルし、唯一の安全なコードを使用し、動作するように表示されます。

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) { 
    use std::sync::{Arc, Mutex}; 
    type SharedStore = Arc<Mutex<Option<Store>>>; 

    lazy_static! { 
     static ref STORE_CLONES: Mutex<Vec<SharedStore>> = Mutex::new(Vec::new()); 
     static ref NO_REENTRY: Mutex<()> = Mutex::new(()); 
    } 
    thread_local!(static STORE: SharedStore = Arc::new(Mutex::new(None))); 

    let mut result = Vec::new(); 
    let _no_reentry = NO_REENTRY.lock(); 

    txs.par_iter().map({ 
     |tx| { 
      STORE.with(|arc_mtx| { 
       let mut local_store = arc_mtx.lock().unwrap(); 
       if local_store.is_none() { 
        *local_store = Some(store.clone()); 
        STORE_CLONES.lock().unwrap().push(arc_mtx.clone()); 
       } 
       tx.verify_and_store(local_store.as_mut().unwrap()) 
      }) 
     } 
    }).collect_into(&mut result); 

    let mut store_clones = STORE_CLONES.lock().unwrap(); 
    for store in store_clones.drain(..) { 
     store.lock().unwrap().take(); 
    } 
} 
+1

それはしかし(この呼び出しにスコープものがあるようには思えない残念ですこれは明らかに適切なケースのサブセットで有用です)。 –

+0

@ChrisEmersonはい、この回答について私が心配しているのは、安全なコードを使用して、作成した店舗を清掃する方法(またはディスクにフラッシュするなど、すべてが完了したら他の任意のコマンドを実行する方法)を考えることができないということです。さらに悪いことに、 'verify_and_store'の次回の呼び出しは** last **の既知の' Store'クローンで動作し続けます。おそらく現在の 'store'とは関係ありません。 – user4815162342

+0

ありがとうございます。これはうまくいきますが、私の場合、私はRayonがクローンの数を減らすために 'par_chunks'を持っていることを発見しました。これは依然としてスレッドごとに複数のクローンが発生する可能性がありますが、@ user4815162342が説明している範囲の問題はありません。 – Tomas

関連する問題