2017-10-03 8 views
0

websocketライブラリを使用して先物APIを試しています。私はこのコードを持っています:先物の接続をシンクとストリームに分割し、2つの異なるタスクで使用する

use futures::future::Future; 
use futures::future; 
use futures::sink::Sink; 
use futures::stream::Stream; 
use futures::sync::mpsc::channel; 
use futures::sync::mpsc::{Sender, Receiver}; 
use tokio_core::reactor::Core; 

use websocket::{ClientBuilder, OwnedMessage}; 

pub fn main() { 
    let mut core = Core::new().unwrap(); 
    let handle = core.handle(); 

    let handle_clone = handle.clone(); 

    let (send, recv): (Sender<String>, Receiver<String>) = channel(100); 

    let f = ClientBuilder::new("wss://...") 
     .unwrap() 
     .async_connect(None, &handle_clone) 
     .map_err(|e| println!("error: {:?}", e)) 

     .map(|(duplex, _)| duplex.split()) 
     .and_then(move |(sink, stream)| { 

      // this task consumes the channel, writes messages to the websocket 
      handle_clone.spawn(future::loop_fn(recv, |recv: Receiver<String>| { 
       sink.send(OwnedMessage::Close(None)) 
        .and_then(|_| future::ok(future::Loop::Break(()))) 
        .map_err(|_|()) 
      })); 

      // the main tasks listens the socket 
      future::loop_fn(stream, |stream| { 
       stream 
        .into_future() 
        .and_then(|_| future::ok(future::Loop::Break(()))) 
        .map_err(|_|()) 
      }) 
     }); 

    loop { 
     core.turn(None) 
    } 
} 

サーバーに接続した後、もう一方のサーバーをブロックしないで「リスナー」と「送信者」タスクを実行します。私が直接送受信するduplexを使用することができ

error[E0507]: cannot move out of captured outer variable in an `FnMut` closure 
    --> src/slack_conn.rs:29:17 
    | 
25 |   .and_then(move |(sink, stream)| { 
    |       ---- captured outer variable 
... 
29 |     sink.send(OwnedMessage::Close(None)) 
    |     ^^^^ cannot move out of captured outer variable in an `FnMut` closure 

、それは悪いことでエラーにつながる:問題は、それが失敗したと、私は新しいタスクにsinkを使用することはできませんです。

この作品を作成する方法についてのご意見はありますか?接続から読み込み、(印刷物等をスクリーニングするために何らかのアクションをとる

  • 1:確かに、私は非blockinglyサーバーに接続し、2つの非同期タスクを起動する私を可能にする任意のfuturesコードと幸せになるだろう)
  • MPSCチャンネルから読み取って、私は別のスタイルでそれを記述する必要があればそれは結構です、接続

に書き込みます1。

+0

これは[ユーザーフォーラムにクロスポストされました](https://users.rust-lang.org/t/futures-question-split-connection-into-sink-stream-in-two-tasks/13156?u = shepmaster)。 – Shepmaster

答えて

1

SplitSinkを所有取るsendを定義するSinkを実装する:一方

fn send(self, item: Self::SinkItem) -> Send<Self> 
where 
    Self: Sized, 

を、loop_fnが閉鎖が複数回呼び出されることが可能であることを必要とします。これらの2つのことは根本的に互換性がありません - あなたは値を消費する必要がある何回も何かを呼び出すことができますか?私が嘘不正のWebSocketサーバーを持っていない -


ここでコンパイルしたコードの完全にテストされていない作品です。実装中に立ち往生

#[macro_use] 
extern crate quick_error; 

extern crate futures; 
extern crate tokio_core; 
extern crate websocket; 

use futures::{Future, Stream, Sink}; 
use futures::sync::mpsc::channel; 
use tokio_core::reactor::Core; 

use websocket::ClientBuilder; 

pub fn main() { 
    let mut core = Core::new().unwrap(); 
    let handle = core.handle(); 

    let (send, recv) = channel(100); 

    let f = ClientBuilder::new("wss://...") 
     .unwrap() 
     .async_connect(None, &handle) 
     .from_err::<Error>() 
     .map(|(duplex, _)| duplex.split()) 
     .and_then(|(sink, stream)| { 
      let reader = stream 
       .for_each(|i| { 
        println!("Read a {:?}", i); 
        Ok(()) 
       }) 
       .from_err(); 

      let writer = sink 
       .sink_from_err() 
       .send_all(recv.map_err(Error::Receiver)) 
       .map(|_|()); 

      reader.join(writer) 
     }); 

    drop(send); // Close the sending channel manually 

    core.run(f).expect("Unable to run"); 
} 

quick_error! { 
    #[derive(Debug)] 
    pub enum Error { 
     WebSocket(err: websocket::WebSocketError) { 
      from() 
      description("websocket error") 
      display("WebSocket error: {}", err) 
      cause(err) 
     } 
     Receiver(err:()) { 
      description("receiver error") 
      display("Receiver error") 
     } 
    } 
} 

のポイントは以下の通りであった。持っている

  1. すべてになるFuture結局
  2. それはエラーのタイプを定義し、それに変換する簡単方法
  3. ItemErrorの関連タイプが「正しい」かどうかがわかりにくいです。私は多くの "タイプアサーション"({ let x: &Future<Item =(), Error =()> = &reader; })をやってしまった。
+0

ありがとうございます、私は〜12時間でこれを試し、それが動作するかどうかをお知らせします。 – sinan

+0

Btw、受信者はもはや 'String'を返しません。元のコード受信者は 'String'を返しました。 – sinan

+0

それは、ありがとう、働く。 – sinan

関連する問題