2017-08-16 8 views
0

を静的応答で実装して、hyper::Clientを使用するコードをテストしようとしています。私は型を見つけ出しましたが、tokio-protorequest/response mismatchと言っているランタイムの問題を理解できません。ここで失敗したことを示している私のコードの簡易版です:テスト用にhyper :: client :: Connectを実装する

extern crate futures; 
extern crate hyper; 
extern crate tokio_core; 
extern crate tokio_io; 

use futures::{future, Future, Stream}; 
use std::str::from_utf8; 
use std::io::Cursor; 

struct Client<'a, C: 'a> { 
    client: &'a hyper::Client<C>, 
    url: &'a str, 
} 

impl<'a, C: hyper::client::Connect> Client<'a, C> { 
    fn get(&self) -> Box<Future<Item = String, Error = hyper::Error>> { 
     Box::new(self.client.get(self.url.parse().unwrap()).and_then(|res| { 
      let body = Vec::new(); 
      res.body() 
       .fold(body, |mut acc, chunk| { 
        acc.extend_from_slice(chunk.as_ref()); 
        Ok::<_, hyper::Error>(acc) 
       }) 
       .and_then(move |value| Ok(String::from(from_utf8(&value).unwrap()))) 
     })) 
    } 
} 

struct StaticConnector<'a> { 
    body: &'a [u8], 
} 

impl<'a> StaticConnector<'a> { 
    fn new(body: &'a [u8]) -> StaticConnector { 
     StaticConnector { body: body } 
    } 
} 

impl<'a> hyper::server::Service for StaticConnector<'a> { 
    type Request = hyper::Uri; 
    type Response = Cursor<Vec<u8>>; 
    type Error = std::io::Error; 
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>; 

    fn call(&self, _: Self::Request) -> Self::Future { 
     Box::new(future::ok(Cursor::new(self.body.to_vec()))) 
    } 
} 

fn main() { 
    let mut core = tokio_core::reactor::Core::new().unwrap(); 
    let handle = core.handle(); 

    // My StaticConnector for testing 
    let hyper_client = hyper::Client::configure() 
     .connector(StaticConnector::new(
      b"\ 
       HTTP/1.1 200 OK\r\n\ 
       Content-Length: 8\r\n\ 
       \r\n\ 
       Maldives\ 
       ", 
     )) 
     .build(&handle); 

    // Real Connector 
    /* 
    let hyper_client = hyper::Client::configure().build(&handle); 
    */ 

    let client = Client { 
     client: &hyper_client, 
     url: "http://ifconfig.co/country", 
    }; 
    let result = core.run(client.get()).unwrap(); 
    println!("{}", result); 
} 

Playground

私はそれが何らかの形で不完全であるIoためCursorの私の使用だ推測しているが、私は、デバッグに失敗しています進歩する。 1つの考えは、Cursorhyper::Clientの書き込みがおそらく期待通りに機能していないことです。たぶん私は、書き込みのためのsinkと読み取りのための静的コンテンツの組み合わせが必要ですか?私はすべてのアイデアを使用して進歩に失敗しました!

答えて

0

元のコードがうまくいかなかったのは、クライアントが要求を送信する前に読者側から応答があったためです。tokio-protorequest/response mismatchでエラーが発生しています。最初に読者がブロックするように手配する必要があるという点で、あるいは具体的にはstd::io::ErrorKind::WouldBlockでエラーを出して、イベントループに何もないことを示しているが、考慮していないという点で、修正は不便ではないことが判明したEOF 。さらに、リクエストが送信され、tokio-protoマシンが応答を待っていることを示す書き込みを取得すると、futures::task::current.notifyを使用して読み取りをブロック解除します。ここでは期待どおりに動作更新の実装です:

extern crate futures; 
extern crate hyper; 
extern crate tokio_core; 
extern crate tokio_io; 

use futures::{future, Future, Stream, task, Poll}; 
use std::str::from_utf8; 
use std::io::{self, Cursor, Read, Write}; 
use tokio_io::{AsyncRead, AsyncWrite}; 

struct Client<'a, C: 'a> { 
    client: &'a hyper::Client<C>, 
    url: &'a str, 
} 

impl<'a, C: hyper::client::Connect> Client<'a, C> { 
    fn get(&self) -> Box<Future<Item = String, Error = hyper::Error>> { 
     Box::new(self.client.get(self.url.parse().unwrap()).and_then(|res| { 
      let body = Vec::new(); 
      res.body() 
       .fold(body, |mut acc, chunk| { 
        acc.extend_from_slice(chunk.as_ref()); 
        Ok::<_, hyper::Error>(acc) 
       }) 
       .and_then(move |value| Ok(String::from(from_utf8(&value).unwrap()))) 
     })) 
    } 
} 

struct StaticStream { 
    wrote: bool, 
    body: Cursor<Vec<u8>>, 
} 

impl Read for StaticStream { 
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 
     if self.wrote { 
      self.body.read(buf) 
     } else { 
      Err(io::ErrorKind::WouldBlock.into()) 
     } 
    } 
} 

impl Write for StaticStream { 
    fn write<'a>(&mut self, buf: &'a [u8]) -> io::Result<usize> { 
     self.wrote = true; 
     task::current().notify(); 
     Ok(buf.len()) 
    } 

    fn flush(&mut self) -> io::Result<()> { 
     Ok(()) 
    } 
} 

impl AsyncRead for StaticStream {} 

impl AsyncWrite for StaticStream { 
    fn shutdown(&mut self) -> Poll<(), io::Error> { 
     Ok(().into()) 
    } 
} 

struct StaticConnector<'a> { 
    body: &'a [u8], 
} 

impl<'a> StaticConnector<'a> { 
    fn new(body: &'a [u8]) -> StaticConnector { 
     StaticConnector { body: body } 
    } 
} 

impl<'a> hyper::server::Service for StaticConnector<'a> { 
    type Request = hyper::Uri; 
    type Response = StaticStream; 
    type Error = std::io::Error; 
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>; 

    fn call(&self, _: Self::Request) -> Self::Future { 
     Box::new(future::ok(StaticStream { 
      wrote: false, 
      body: Cursor::new(self.body.to_vec()), 
     })) 
    } 
} 

fn main() { 
    let mut core = tokio_core::reactor::Core::new().unwrap(); 
    let handle = core.handle(); 

    // My StaticConnector for testing 
    let hyper_client = hyper::Client::configure() 
     .connector(StaticConnector::new(
      b"\ 
       HTTP/1.1 200 OK\r\n\ 
       Content-Length: 8\r\n\ 
       \r\n\ 
       Maldives\ 
       ", 
     )) 
     .build(&handle); 

    // Real Connector 
    /* 
    let hyper_client = hyper::Client::configure().build(&handle); 
    */ 

    let client = Client { 
     client: &hyper_client, 
     url: "http://ifconfig.co/country", 
    }; 
    let result = core.run(client.get()).unwrap(); 
    println!("{}", result); 
} 

Playground

注:この実装は、単純な場合のために動作しますが、私は、より複雑なシナリオをテストしていません。たとえば、私が確信していることは、1つ以上の読み書き呼び出しを伴う可能性があるため、要求/応答がどのように大きく動作するかです。

関連する問題