0
私はtokioとfutures-cpupoolを使用して単純なRPCサーバーを作成しています。サーバーは、ボックス名のクロージャーのBTreeMap
を保持し、関数名をキーとして保持します。現在の実装は非常に単純明快です:私は現在、self.functions.get(&sreq.fname)
にこの寿命の問題によりブロックされていますCpuPoolの生涯の問題
pub struct SlackerServiceSync<T>
where T: Send + Sync + 'static
{
functions: Arc<BTreeMap<String, RpcFnSync<T>>>,
pool: CpuPool,
}
impl<T> SlackerServiceSync<T>
where T: Send + Sync + 'static
{
pub fn new(functions: Arc<BTreeMap<String, RpcFnSync<T>>>,
threads: usize)
-> SlackerServiceSync<T> {
let pool = CpuPool::new(threads);
SlackerServiceSync { functions, pool }
}
}
impl<T> Service for SlackerServiceSync<T>
where T: Send + Sync + 'static
{
type Request = SlackerPacket<T>;
type Response = SlackerPacket<T>;
type Error = io::Error;
type Future = BoxFuture<Self::Response, Self::Error>;
fn call(&self, req: Self::Request) -> Self::Future {
match req {
SlackerPacket::Request(sreq) => {
debug!("getting request: {:?}", sreq.fname);
if let Some(f) = self.functions.get(&sreq.fname) {
self.pool
.spawn_fn(move || -> FutureResult<T, Self::Error> {
ok(f(&sreq.arguments))
})
.and_then(move |result| {
debug!("getting results");
ok(SlackerPacket::Response(SlackerResponse {
version: sreq.version,
code: RESULT_CODE_SUCCESS,
content_type: sreq.content_type,
serial_id: sreq.serial_id,
result: result,
}))
})
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Oneshot canceled"))
.boxed()
} else {
let error = SlackerError {
version: sreq.version,
code: RESULT_CODE_NOT_FOUND,
serial_id: sreq.serial_id,
};
ok(SlackerPacket::Error(error)).boxed()
}
}
SlackerPacket::Ping(ref ping) => {
ok(SlackerPacket::Pong(SlackerPong { version: ping.version })).boxed()
}
_ => err(io::Error::new(io::ErrorKind::InvalidInput, "Unsupported packet")).boxed(),
}
}
}
。
error[E0495]: cannot infer an appropriate lifetime for lifetime parameter in function call due to conflicting requirements
--> src/service.rs:103:49
|
103 | if let Some(f) = self.functions.get(&sreq.fname) {
| ^^^
|
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the body at 99:55...
--> src/service.rs:99:56
|
99 | fn call(&self, req: Self::Request) -> Self::Future {
| ________________________________________________________^
100 | | match req {
101 | | SlackerPacket::Request(sreq) => {
102 | | debug!("getting request: {:?}", sreq.fname);
... |
133 | | }
134 | | }
| |_____^
note: ...so that reference does not outlive borrowed content
--> src/service.rs:103:34
|
103 | if let Some(f) = self.functions.get(&sreq.fname) {
| ^^^^^^^^^^^^^^
= note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `[[email protected]/service.rs:105:35: 107:36 f:&std::boxed::Box<for<'r> std::ops::Fn(&'r std::vec::Vec<T>) -> T + std::marker::Send + std::marker::Sync>, sreq:packets::SlackerRequest<T>]` will meet its required lifetime bounds
--> src/service.rs:105:26
|
105 | .spawn_fn(move || -> FutureResult<T, Self::Error> {
| ^^^^^^^^
Similar code works without CpuPool。私は完全にコンパイラによって報告されたエラーを理解することはできません。
'sreq'の生涯は何ですか?より具体的には、 'sreq'への参照が' call'よりも長くなると思うようになります(別の実行スレッドへの参照を送るので必要です)。 @MatthieuM。 –
。 sreqは典型的なTokioリクエストです。クローズを 'Arc'にラップし、別のスレッドに送る前にクローンする必要があることが判明しました。 –