2017-04-30 10 views
0

私はtokioとfutures-cpupoolを使用して単純なRPCサーバーを作成しています。サーバーは、ボックス名のクロージャーのBTreeMapを保持し、関数名をキーとして保持します。現在の実装は非常に単純明快です:私は現在、self.functions.get(&sreq.fname)にこの寿命の問題によりブロックされていますCpuPoolの生涯の問題

pub struct SlackerServiceSync<T> 
    where T: Send + Sync + 'static 
{ 
    functions: Arc<BTreeMap<String, RpcFnSync<T>>>, 
    pool: CpuPool, 
} 

impl<T> SlackerServiceSync<T> 
    where T: Send + Sync + 'static 
{ 
    pub fn new(functions: Arc<BTreeMap<String, RpcFnSync<T>>>, 
       threads: usize) 
       -> SlackerServiceSync<T> { 
     let pool = CpuPool::new(threads); 
     SlackerServiceSync { functions, pool } 
    } 
} 

impl<T> Service for SlackerServiceSync<T> 
    where T: Send + Sync + 'static 
{ 
    type Request = SlackerPacket<T>; 
    type Response = SlackerPacket<T>; 
    type Error = io::Error; 
    type Future = BoxFuture<Self::Response, Self::Error>; 

    fn call(&self, req: Self::Request) -> Self::Future { 
     match req { 
      SlackerPacket::Request(sreq) => { 
       debug!("getting request: {:?}", sreq.fname); 
       if let Some(f) = self.functions.get(&sreq.fname) { 
        self.pool 
         .spawn_fn(move || -> FutureResult<T, Self::Error> { 
             ok(f(&sreq.arguments)) 
            }) 
         .and_then(move |result| { 
          debug!("getting results"); 
          ok(SlackerPacket::Response(SlackerResponse { 
                  version: sreq.version, 
                  code: RESULT_CODE_SUCCESS, 
                  content_type: sreq.content_type, 
                  serial_id: sreq.serial_id, 
                  result: result, 
                 })) 
         }) 
         .map_err(|_| io::Error::new(io::ErrorKind::Other, "Oneshot canceled")) 
         .boxed() 
       } else { 
        let error = SlackerError { 
         version: sreq.version, 
         code: RESULT_CODE_NOT_FOUND, 
         serial_id: sreq.serial_id, 
        }; 
        ok(SlackerPacket::Error(error)).boxed() 
       } 
      } 
      SlackerPacket::Ping(ref ping) => { 
       ok(SlackerPacket::Pong(SlackerPong { version: ping.version })).boxed() 
      } 
      _ => err(io::Error::new(io::ErrorKind::InvalidInput, "Unsupported packet")).boxed(), 
     } 
    } 
} 

error[E0495]: cannot infer an appropriate lifetime for lifetime parameter in function call due to conflicting requirements 
    --> src/service.rs:103:49 
    | 
103 |     if let Some(f) = self.functions.get(&sreq.fname) { 
    |             ^^^ 
    | 
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the body at 99:55... 
    --> src/service.rs:99:56 
    | 
99 |  fn call(&self, req: Self::Request) -> Self::Future { 
    | ________________________________________________________^ 
100 | |   match req { 
101 | |    SlackerPacket::Request(sreq) => { 
102 | |     debug!("getting request: {:?}", sreq.fname); 
... | 
133 | |   } 
134 | |  } 
    | |_____^ 
note: ...so that reference does not outlive borrowed content 
    --> src/service.rs:103:34 
    | 
103 |     if let Some(f) = self.functions.get(&sreq.fname) { 
    |         ^^^^^^^^^^^^^^ 
    = note: but, the lifetime must be valid for the static lifetime... 
note: ...so that the type `[[email protected]/service.rs:105:35: 107:36 f:&std::boxed::Box<for<'r> std::ops::Fn(&'r std::vec::Vec<T>) -> T + std::marker::Send + std::marker::Sync>, sreq:packets::SlackerRequest<T>]` will meet its required lifetime bounds 
    --> src/service.rs:105:26 
    | 
105 |       .spawn_fn(move || -> FutureResult<T, Self::Error> { 
    |       ^^^^^^^^ 

Similar code works without CpuPool。私は完全にコンパイラによって報告されたエラーを理解することはできません。

Full code is here

+1

'sreq'の生涯は何ですか?より具体的には、 'sreq'への参照が' call'よりも長くなると思うようになります(別の実行スレッドへの参照を送るので必要です)。 @MatthieuM。 –

+0

。 sreqは典型的なTokioリクエストです。クローズを 'Arc'にラップし、別のスレッドに送る前にクローンする必要があることが判明しました。 –

答えて

0

それは私がこのようなRcpFnSyncを宣言することにより、Arcに閉鎖をラップする必要が判明:

pub type RpcFnSync<T> = Arc<Fn(&Vec<T>) -> T + Send + Sync + 'static>; 

その後、別のスレッドに送信する前にそれをクローン:

fn call(&self, req: Self::Request) -> Self::Future { 
    match req { 
     SlackerPacket::Request(sreq) => { 
      debug!("getting request: {:?}", sreq.fname); 
      if let Some(fi) = self.functions.get(&sreq.fname) { 
       let f = fi.clone(); 

       self.pool 
        .spawn_fn(move || -> FutureResult<Self::Response, Self::Error> { 
         let result = f(&sreq.arguments); 
         ok(SlackerPacket::Response(SlackerResponse { 
                 version: sreq.version, 
                 code: RESULT_CODE_SUCCESS, 
                 content_type: sreq.content_type, 
                 serial_id: sreq.serial_id, 
                 result: result, 
                })) 
        }) 
        .boxed() 
      } else { 
       let error = SlackerError { 
        version: sreq.version, 
        code: RESULT_CODE_NOT_FOUND, 
        serial_id: sreq.serial_id, 
       }; 
       ok(SlackerPacket::Error(error)).boxed() 
      } 
     } 
     SlackerPacket::Ping(ref ping) => { 
      ok(SlackerPacket::Pong(SlackerPong { version: ping.version })).boxed() 
     } 
     _ => err(io::Error::new(io::ErrorKind::InvalidInput, "Unsupported packet")).boxed(), 
    } 
} 
関連する問題