websocket
ライブラリを使用して先物APIを試しています。私はこのコードを持っています:先物の接続をシンクとストリームに分割し、2つの異なるタスクで使用する
use futures::future::Future;
use futures::future;
use futures::sink::Sink;
use futures::stream::Stream;
use futures::sync::mpsc::channel;
use futures::sync::mpsc::{Sender, Receiver};
use tokio_core::reactor::Core;
use websocket::{ClientBuilder, OwnedMessage};
pub fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let handle_clone = handle.clone();
let (send, recv): (Sender<String>, Receiver<String>) = channel(100);
let f = ClientBuilder::new("wss://...")
.unwrap()
.async_connect(None, &handle_clone)
.map_err(|e| println!("error: {:?}", e))
.map(|(duplex, _)| duplex.split())
.and_then(move |(sink, stream)| {
// this task consumes the channel, writes messages to the websocket
handle_clone.spawn(future::loop_fn(recv, |recv: Receiver<String>| {
sink.send(OwnedMessage::Close(None))
.and_then(|_| future::ok(future::Loop::Break(())))
.map_err(|_|())
}));
// the main tasks listens the socket
future::loop_fn(stream, |stream| {
stream
.into_future()
.and_then(|_| future::ok(future::Loop::Break(())))
.map_err(|_|())
})
});
loop {
core.turn(None)
}
}
サーバーに接続した後、もう一方のサーバーをブロックしないで「リスナー」と「送信者」タスクを実行します。私が直接送受信するduplex
を使用することができ
error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
--> src/slack_conn.rs:29:17
|
25 | .and_then(move |(sink, stream)| {
| ---- captured outer variable
...
29 | sink.send(OwnedMessage::Close(None))
| ^^^^ cannot move out of captured outer variable in an `FnMut` closure
、それは悪いことでエラーにつながる:問題は、それが失敗したと、私は新しいタスクにsink
を使用することはできませんです。
この作品を作成する方法についてのご意見はありますか?接続から読み込み、(印刷物等をスクリーニングするために何らかのアクションをとる
- 1:確かに、私は非blockinglyサーバーに接続し、2つの非同期タスクを起動する私を可能にする任意の
futures
コードと幸せになるだろう) - MPSCチャンネルから読み取って、私は別のスタイルでそれを記述する必要があればそれは結構です、接続
に書き込みます1。
これは[ユーザーフォーラムにクロスポストされました](https://users.rust-lang.org/t/futures-question-split-connection-into-sink-stream-in-two-tasks/13156?u = shepmaster)。 – Shepmaster