2017-03-18 15 views
0

私は並行して処理したいRustのツリー構造を持っています。私の本当の問題はより複雑であるが、これは私が今持っている基本的にシリアルバージョンです:Rustの並行処理ツリー

#[derive(Debug)] 
enum BinaryTree { 
    Node(Box<BinaryTree>, Box<BinaryTree>), 
    Leaf(i32), 
} 

use BinaryTree::*; 

impl BinaryTree { 
    /// Applies the given function to every leaf in the tree 
    fn map<F: Fn(i32) -> i32>(&mut self, f: &F) { 
     match *self { 
      Node(ref mut tree1, ref mut tree2) => { 
       tree1.map(f); 
       tree2.map(f); 
      } 
      Leaf(ref mut n) => *n = f(*n), 
     } 
    } 
} 

私が使用していないこれを並列化したいと思います:

  • はロック
  • スレッドプールを、またはそうでなければ、安全でないコード

問題は非常に自然トンを思わ再作成スレッド

  • (好ましく)に持っていません並列化:すべてのノードで、各子ノードを同時に処理し、ある時点でシリアル・バージョンに落ちる可能性があります。しかし、これにはスコープ付きのスレッドが必要です。これは標準ライブラリにはまだありません。私はscoped-poolクレートのために定住し、この溶液に到着しました:

    extern crate scoped_pool; 
    
    impl BinaryTree { 
    /// Applies the given function to every leaf in the tree 
        fn map_parallel<F>(&mut self, f: &F, scope: &scoped_pool::Scope) 
         where F: Fn(i32) -> i32 + Send + Sync 
        { 
         match self { 
          &mut Node(ref mut tree1, ref mut tree2) => { 
           // Create new, smaller scope 
           scope.zoom(|scope2| { 
            // Concurrently process child nodes 
            scope2.recurse(|scope3| { 
             tree1.map_parallel(f, scope3); 
            }); 
            scope2.recurse(|scope3| { 
             tree2.map_parallel(f, scope3); 
            }); 
           } 
              );}, 
          &mut Leaf(ref mut n) => *n = f(*n), 
         } 
        } 
    } 
    fn main() { 
        let mut tree = Node(
         Box::new(Node(
          Box::new(Node(
           Box::new(Node(
            Box::new(Node(
             Box::new(Leaf(11)), 
             Box::new(Leaf(15)))), 
            Box::new(Leaf(13)))), 
           Box::new(Leaf(19)))), 
          Box::new(Leaf(5)))), 
         Box::new(Leaf(10))); 
    
        let thread_pool = scoped_pool::Pool::new(4); 
        tree.map_parallel(&|n| n + 1, &scoped_pool::Scope::forever(thread_pool)); 
        println!("{:?}", tree); 
    } 
    

    しかし、これはデッドロックにはまり込むように見える、と私はその理由を理解していません。 Rustで並行して木を処理する慣用方法は何ですか?デッドロックで

  • +0

    VisualVMなどのプロファイリングツールを使用して、デッドロックしているスレッド(存在する場合)とそのオブジェクトを見つけるのに役立つことをお勧めします。 –

    答えて

    0

    、そしてあなたが4つのスレッドを持っている理由

    私が提携し、それが終了する前に自分自身map_parallelを呼び出すスレッドを、提携しており、一度map_parallelを呼び出して、理解していませんスレッドなどが含まれます。スレッドが終了する前にスレッドがなくなった。この例では、5つのスレッドに切り替えてデッドロックを「修正」します。明らかに、ツリーの深さに応じてスレッドの数を切り替えることは理想的ではありません。


    コードを別の懸念事項に分割することをおすすめします。既に並列処理をしているので、各ノードは互いに独立している必要があります。それはあなたがイテレータを実装することができることを意味:それはデータ構造とスレッドライブラリの任意のもつれを除去するので

    impl BinaryTree { 
        fn iter_mut(&mut self) -> IterMut { 
         IterMut { queue: vec![self] } 
        } 
    } 
    
    struct IterMut<'a> { 
        queue: Vec<&'a mut BinaryTree>, 
    } 
    
    impl<'a> Iterator for IterMut<'a> { 
        type Item = &'a mut i32; 
    
        fn next(&mut self) -> Option<Self::Item> { 
         match self.queue.pop() { 
          Some(tree) => { 
           match *tree { 
            Leaf(ref mut n) => Some(n), 
            Node(ref mut left, ref mut right) => { 
             self.queue.push(right); 
             self.queue.push(left); 
             self.next() // bad recursive, see note 
            } 
           } 
          } 
          None => None, 
         } 
        } 
    } 
    

    これはいいです。あなたは、私がイテレータで末尾再帰コードを使用することに注意しましょう

    let thread_pool = scoped_pool::Pool::new(2); 
    thread_pool.scoped(|scope| { 
        for node in tree.iter_mut() { 
         scope.execute(move || { 
          *node += 1; 
         }) 
        } 
    }); 
    

    for node in tree.iter_mut() { 
        *node += 1; 
    } 
    

    それとも、ネジ:これで、あなたはその後、シングルスレッドの突然変異を行うことを選択することができます。錆はこれを最適化しないので、あなたは必然的に同等にそれを変更する必要があります。

    イテレータにzipperを使用して調査することもできます。

    関連する問題