2017-04-15 12 views
2

私はTokioライブラリによって提供されている例を使用しており、現在アクティブなすべてのTCP接続のベクトルを取得しようとしています。最終的には、アクティブな接続のそれぞれにメッセージをループし、メッセージをソケットに書き込むことで、メッセージをブロードキャストできるようにしたいと考えています。mutexロックを使用した複数のスレッドからのベクトルへの同時アクセス

まず、別のスレッドで接続を受け入れている間に、現在の接続数を1つのスレッドでプリントしようとしています。

これを行うには、共有ベクターを使用しようとしています。私はまだベクトルからの接続の切断を実装解除していません。現時点で

// A tiny async echo server with tokio-core 
extern crate futures; 
extern crate tokio_core; 
extern crate tokio_io; 

use futures::{Future, Stream}; 
use tokio_io::{io, AsyncRead}; 
use tokio_core::net::TcpListener; 
use tokio_core::reactor::Core; 
use std::thread; 
use std::sync::{Arc, Mutex}; 
use std::io::stdout; 
use std::io::Write; 

fn main() { 
    // Create the event loop that will drive this server 
    let mut core = Core::new().unwrap(); 
    let handle = core.handle(); 

    // Bind the server's socket 
    let addr = "127.0.0.1:12345".parse().unwrap(); 
    let tcp = TcpListener::bind(&addr, &handle).unwrap(); 

    let mut connections = Arc::new((Mutex::new(Vec::new()))); 

    thread::spawn(move || { 
     //Every 10 seconds print out the current number of connections 
     let mut i; 
     loop {    
      i = connections.lock().unwrap().len(); 
      println!("There are {} connections", i); 
      stdout().flush(); 
      thread::sleep_ms(10000); 
     } 
    }); 



    // Iterate incoming connections 
    let server = tcp.incoming().for_each(|(tcp, _)| { 

     connections.lock().unwrap().push(tcp); 
     // Split up the read and write halves 
     let (reader, writer) = tcp.split(); 

     // Future of the copy 
     let bytes_copied = io::copy(reader, writer); 

     // ... after which we'll print what happened 
     let handle_conn = bytes_copied.map(|(n, _, _)| { 
      println!("wrote {} bytes", n) 
     }).map_err(|err| { 
      println!("IO error {:?}", err) 
     }); 

     // Spawn the future as a concurrent task 
     handle.spawn(handle_conn); 

     Ok(()) 
    }); 

    // Spin up the server on the event loop 
    core.run(server).unwrap(); 

} 

これは、次のエラーでビルドに失敗します

error[E0382]: capture of moved value: `connections` 
    --> src/main.rs:36:42 
    | 
26 |  thread::spawn(move || { 
    |     ------- value moved (into closure) here 
... 
36 |  let server = tcp.incoming().for_each(|(tcp, _)| { 
    |           ^^^^^^^^^^ value captured here after move 
    | 
    = note: move occurs because `connections` has type `std::sync::Arc<std::sync::Mutex<std::vec::Vec<tokio_core::net::TcpStream>>>`, which does not implement the `Copy` trait 

error[E0382]: use of moved value: `tcp` 
    --> src/main.rs:40:32 
    | 
38 |   connections.lock().unwrap().push(tcp); 
    |           --- value moved here 
39 |   // Split up the read and write halves 
40 |   let (reader, writer) = tcp.split(); 
    |        ^^^ value used here after move 
    | 
    = note: move occurs because `tcp` has type `tokio_core::net::TcpStream`, which does not implement the `Copy` trait 

は、それがどんな危険なコードを記述することなく、これを達成することは可能ですか?

答えて

5

あなたが原因で移動閉鎖の最初のエラーが表示されます。

let mut connections = Arc::new((Mutex::new(Vec::new()))); 
thread::spawn(move || { 
    let mut i = connections.lock().unwrap().len(); 
    .... 
} 

これは実際にあなただけ「の部分を」移動したいしながら、それを、全体Arcを動かす(つまり、このようにそれを移動参照カウントがインクリメントされ、両方のスレッドがそれを使用できるようにします)。

これを行うために、我々はArc::cloneを使用することができます。

let mut connections = Arc::new((Mutex::new(Vec::new()))); 
let conn = connections.clone(); 
thread::spawn(move || { 
    let mut i = conn.lock().unwrap().len(); 
    .... 
} 

この方法では、クローン化されたArcconnは、閉鎖に移動され、元のArcconnectionsは、ないので、まだ使用可能。

2番目のエラーで何をやっているのか正確にはわかりませんが、接続を数えるためには、全体をpushにする必要はありません。

+0

多くのお返事をいただきありがとうございます。質問で述べたように、最終的には、現在アクティブなすべての接続のベクトルを持つことができ、ベクトルを反復処理してそれぞれにメッセージをブロードキャストできるようにしたいと考えています。それぞれのtcp接続にベクトルを押し込むことは可能でしょうか? – John

+0

あなたはベクトルのように 'Arc >'で接続をラップしようとすることができますが、これはあなたができることを制限します(例えば、 '.split'呼び出しで移動することはできません)。 – MartinHaTh

+0

私は最初のスレッドからループを逃したことに気付きました。おそらくそれは今より意味をなさないでしょう。 – John

関連する問題