2017-03-21 8 views
2

私は今、Rustの先物/ tokioで実験を始めました。私は本当に基本的なことを先物やストリームだけでやることができます。私はあなたが将来と川の間でどのように選択できるのだろうと思っていました。Rustの将来とストリームの選択方法は?

tokioのドキュメントからおもちゃの問題を拡張して、tokio_timer::Timerを使用してタイムアウトHTTPSリクエストを行うにはどうすればよいですか?

extern crate futures; 
extern crate native_tls; 
extern crate tokio_core; 
extern crate tokio_io; 
extern crate tokio_tls; 

use std::io; 
use std::net::ToSocketAddrs; 

use futures::Future; 
use native_tls::TlsConnector; 
use tokio_core::net::TcpStream; 
use tokio_core::reactor::Core; 
use tokio_tls::TlsConnectorExt; 

fn main() { 
    let mut core = Core::new().unwrap(); 
    let handle = core.handle(); 
    let addr = "www.rust-lang.org:443".to_socket_addrs().unwrap().next().unwrap(); 

    let cx = TlsConnector::builder().unwrap().build().unwrap(); 
    let socket = TcpStream::connect(&addr, &handle); 

    let tls_handshake = socket.and_then(|socket| { 
     let tls = cx.connect_async("www.rust-lang.org", socket); 
     tls.map_err(|e| { 
      io::Error::new(io::ErrorKind::Other, e) 
     }) 
    }); 
    let request = tls_handshake.and_then(|socket| { 
     tokio_io::io::write_all(socket, "\ 
      GET/HTTP/1.0\r\n\ 
      Host: www.rust-lang.org\r\n\ 
      \r\n\ 
     ".as_bytes()) 
    }); 
    let response = request.and_then(|(socket, _request)| { 
     tokio_io::io::read_to_end(socket, Vec::new()) 
    }); 

    let (_socket, data) = core.run(response).unwrap(); 
    println!("{}", String::from_utf8_lossy(&data)); 
} 
+0

接続にタイムアウトが必要なのですか? – 46bit

+0

はい - 私は先物箱で遊んだ後にそれを把握しました – opensourcegeek

答えて

2

あなたはStreamFutureを変換して、二つの流れに選択することができます。

extern crate futures; 

use futures::{Future, Stream}; 

fn select_stream_or_future_as_stream<S, F>(
    stream: S, 
    future: F, 
) -> Box<Stream<Item = S::Item, Error = S::Error>> 
where 
    S: Stream + 'static, 
    F: Future<Item = S::Item, Error = S::Error> + 'static, 
{ 
    let future_as_stream = future.into_stream(); 

    Box::new(future_as_stream.select(stream)) 
} 
0

私は、サンプルコードを適応方法は、以下の通りです初心者のために有用である可能性があります。

let timer = tokio_timer::Timer::default(); 
    // Error out when timeout is reached 
    let timeout = timer.sleep(time::Duration::from_millis(950)).then(|_| { 
     future::err(io::Error::new(io::ErrorKind::Other, "Timeout")) 
    }); 

    let handle = core.handle(); 

    // this returns IoFuture = BoxFuture<T, io::Error>; 
    let addresses = tokio_dns::CpuPoolResolver::new(1 as usize).resolve("www.google.cz"); 
    let socket = addresses.and_then(|all_addresses| { 
     let mut ipv4_addresses = all_addresses.iter().filter(|x| is_ipv4(**x)); 
     let addr = ipv4_addresses.next().unwrap(); 
     let sock = TcpStream::connect(&SocketAddr::new(*addr, 443), &handle); 
     sock.map_err(|e| { 
      println!("{:?}", e); 
      io::Error::new(io::ErrorKind::Other, e) 
     }) 
    }); 

    let tls_handshake = socket.and_then(|socket| { 
     println!("Got socket"); 
     let cx = TlsConnector::builder().unwrap().build().unwrap(); 
     let tls = cx.connect_async("www.google.cz", socket); 
     tls.map_err(|e| { 
      println!("{:?}", e); 
      io::Error::new(io::ErrorKind::Other, e) 
     }) 
    }); 

    let request = tls_handshake.and_then(|socket| { 
     println!("SSL Handshake Successful"); 
     let write_all = tokio_io::io::write_all(socket, "\ 
      GET/HTTP/1.0\r\n\ 
      Host: www.google.cz\r\n\ 
      \r\n\ 
     ".as_bytes()); 
     println!("Wrote to socket"); 
     write_all.map_err(|e| { 
      println!("{:?}", e); 
      io::Error::new(io::ErrorKind::Other, e) 
     }) 
    }); 

    let response = request.and_then(|(socket, _request)| { 
     let read_till_end = tokio_io::io::read_to_end(socket, Vec::new()); 
     println!("Read till end of socket"); 
     read_till_end 
    }); 

    let waiter = response.select(timeout).map(|(win, _)| { 
     let (_socket, data) = win; 
     data 
    }); 

    let result = core.run(waiter); 
関連する問題