2017-07-01 11 views
3

私はいくつかの重い操作を実行する複数のスレッドがあり、私は仕事中にクライアントを使用する必要があります。私はHTTPクライアントとしてHyper v0.11を使用していますので、接続を開いたままにするために(keep-aliveモード)同じhyper::Clientを共有する必要があるように、接続を再利用したいと思います。別のスレッドからhyper :: clientを使用するにはどうしたらいいですか?

クライアントはスレッド間で共有できません(SyncまたはSendを実装していません)。ここで私が何をしてきたコードの小さなスニペット:このコードはコンパイルされません

let mut core = Core::new().expect("Create Client Event Loop"); 
let handle = core.handle(); 

let remote = core.remote(); 

let client = Client::new(&handle.clone()); 

thread::spawn(move || { 

    // intensive operations... 

    let response = &client.get("http://google.com".parse().unwrap()).and_then(|res| { 
     println!("Response: {}", res.status()); 
     Ok(()) 
    }); 

    remote.clone().spawn(|_| { 
     response.map(|_| {() }).map_err(|_| {() }) 
    }); 

    // more intensive operations... 
}); 
core.run(futures::future::empty::<(),()>()).unwrap(); 

thread::spawn(move || { 
^^^^^^^^^^^^^ within `[clos[email protected]/load-balancer.rs:46:19: 56:6 client:hyper::Client<hyper::client::HttpConnector>, remote:std::sync::Arc<tokio_core::reactor::Remote>]`, the trait `std::marker::Send` is not implemented for `std::rc::Weak<std::cell::RefCell<tokio_core::reactor::Inner>>` 

thread::spawn(move || { 
^^^^^^^^^^^^^ within `[[email protected]/load-balancer.rs:46:19: 56:6 client:hyper::Client<hyper::client::HttpConnector>, remote:std::sync::Arc<tokio_core::reactor::Remote>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::RefCell<hyper::client::pool::PoolInner<tokio_proto::util::client_proxy::ClientProxy<tokio_proto::streaming::message::Message<hyper::http::MessageHead<hyper::http::RequestLine>, hyper::Body>, tokio_proto::streaming::message::Message<hyper::http::MessageHead<hyper::http::RawStatus>, tokio_proto::streaming::body::Body<hyper::Chunk, hyper::Error>>, hyper::Error>>>>` 
... 
remote.clone().spawn(|_| { 
       ^^^^^ the trait `std::marker::Sync` is not implemented for `futures::Future<Error=hyper::Error, Item=hyper::Response> + 'static` 

は異なるスレッドから同じクライアントを再利用する方法またはいくつかの他はありますアプローチ?

答えて

3

短い答えはいいえですが、その方がいいです。

Clientオブジェクトには接続プールがあります。ここではハイパーのPoolがバージョン0.11.0に定義されている方法は次のとおりです。innerとして

pub struct Pool<T> { 
    inner: Rc<RefCell<PoolInner<T>>>, 
} 

、プールは確かにスレッドセーフな参照カウントRcとし、借り-チェックを実行時にRefCellではありません。そのClientを新しいスレッドに移動しようとすると、そのオブジェクトは別のスレッドに存在するプールを保持していました。これはデータ競争の原因でした。

この実装は理解できます。 HTTP接続を複数のスレッドに再利用しようとするのは、ほとんどI/O集約型のリソースへの同期アクセスを必要とするため、それほど一般的ではありません。これは、Tokioの非同期性と非常によく似ています。実際には、同じスレッドで複数のリクエストを実行し、各レスポンスを順番に待つことなく、Tokioのコアでメッセージの送信と非同期受信を処理する方が合理的です。さらに、計算集約型タスクは、futures_cpupoolのCPUプールによって実行できます。このことを念頭に置いて、以下のコードは正常に動作します:

extern crate tokio_core; 
extern crate hyper; 
extern crate futures; 
extern crate futures_cpupool; 

use tokio_core::reactor::Core; 
use hyper::client::Client; 
use futures::Future; 
use futures_cpupool::CpuPool; 

fn main() { 

    let mut core = Core::new().unwrap(); 
    let handle = core.handle(); 
    let client = Client::new(&handle.clone()); 
    let pool = CpuPool::new(1); 

    println!("Begin!"); 
    let req = client.get("http://google.com".parse().unwrap()) 
     .and_then(|res| { 
      println!("Response: {}", res.status()); 
      Ok(()) 
     }); 
    let intensive = pool.spawn_fn(|| { 
     println!("I'm working hard!!!"); 
     std::thread::sleep(std::time::Duration::from_secs(1)); 
     println!("Phew!"); 
     Ok(()) 
    }); 

    let task = req.join(intensive) 
     .map(|_|{ 
      println!("End!"); 
     }); 
    core.run(task).unwrap(); 
} 

応答が遅すぎる受信されない場合、出力は次のようになります。

Begin! 
I'm working hard!!! 
Response: 302 Found 
Phew! 
End! 

あなたが別のスレッドで実行されている複数のタスクを持っている場合は、実現可能な複数のアーキテクチャが存在するため、問題は自由になります。それらのうちの1つは、すべての通信を1つのアクターに委任して、他のすべてのワーカースレッドにデータを送信することです。あるいは、各ワーカーに1つのクライアントオブジェクトを持たせることができます。したがって、別個の接続プールも使用できます。

+1

ご回答いただきありがとうございます。私は別のスレッドからの再利用接続が使えないという事実に同意していません、あなたは先物に基づいたスキーマ(Akka HTTPのScalaに似ています)を持っていれば、未来の束を保つことができます。要求に依存する。これは、同じイベントループ内にすべてがある場合、tokioでうまくいきますが、リクエストhttpのようなタスクを外部イベントループに送信する簡単な方法はありません。 – cspinetta

+0

多くのスレッドが何をしているのか説明していません。これまでのところ、XY問題のように聞こえます。あなたは、マルチスレッド化されたタスクの結果を、他の方法ではなくクライアントのスレッドに移すことを検討しましたか? –

+0

はい、そうです、これはユースケースです:私はいくつかの暗号化と復号化操作(主に集中操作です)を備えた最小限のロードバランサを構築しています。接続、HTTPクライアントとしてのn回のイベントループを処理するためのスレッドプール。私はそれを達成するのが非常に難しいと感じました。今はN個のイベントループしかなく、それぞれがtcp接続を受け取り、トークンを復号します。いくつかの要求を実行します。しかし、これは良い方法ではないと私は思う。 – cspinetta

関連する問題