重大な警告私はこれまでにこのライブラリを使用したことがなく、一部の概念の低レベルの知識は少し欠けています。ほとんど私はthe tutorialを読んでいます。私は、非同期作業を行った人は誰でもこれを読んで笑ってくれると確信していますが、他の人にとっては便利な出発点になるかもしれません。買い手責任負担!
少し簡単に始めて、Stream
がどのように機能するかを実証しましょう。私たちは、ストリームにResult
秒のイテレータを変換することができます。これは私たちにストリームを消費する1つの方法を示し
extern crate futures;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let payloads: Vec<Result<String,()>> = vec![Ok("a".into()), Ok("b".into())];
let payloads = stream::iter(payloads.into_iter());
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
}
。各ペイロードに何かを行うにはand_then
を使用します(ここではそれを印刷するだけです)。for_each
Stream
をFuture
に変換して戻します。そして、奇妙な名前のforget
methodと呼んで未来を走らせることができます。
次に、1つのメッセージだけを扱うRedisライブラリを組み合わせて使用します。 get_message()
メソッドがブロックされているので、いくつかのスレッドをミックスに導入する必要があります。このタイプの非同期システムでは、他のすべてがブロックされるため、大量の作業を実行することはお勧めできません。 For example:
他にもそうするように配置されていない限り、それは非常に迅速にこの機能仕上げの実装ことが保証されなければなりません。
理想的な世界では、レディスクレートは先物のような図書館の上に構築され、これらをネイティブに公開します。
extern crate redis;
extern crate futures;
use std::thread;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let msg = pubsub.get_message().expect("Unable to get message");
let payload: Result<String, _> = msg.get_payload();
tx.send(payload).forget();
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
ここで私の理解は曖昧になります。別のスレッドでは、メッセージをブロックし、取得したときにメッセージをチャネルにプッシュします。私が理解していないのは、スレッドのハンドルを保持する必要がある理由です。私はfoo.forget
がストリームが空になるまで待って、それ自身をブロックしていると思います。Redisのサーバーへのtelnet接続で
、この送信:
publish rust awesome
をそして、あなたはそれが動作表示されます。 printステートメントを追加すると、スレッドが生成される前に(私にとって)foo.forget
ステートメントが実行されることが示されます。
複数のメッセージがトリッキーです。 Sender
は、生成側が消費側より先に進まないように消費します。これは別の未来をsend
から返すことによって達成されます!私たちは、ループの次の反復のためにそれを再利用するために戻ってそこからシャトルにそれを必要とする:
extern crate redis;
extern crate futures;
use std::thread;
use std::sync::mpsc;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let mut tx = tx;
while let Ok(msg) = pubsub.get_message() {
let payload: Result<String, _> = msg.get_payload();
let (next_tx_tx, next_tx_rx) = mpsc::channel();
tx.send(payload).and_then(move |new_tx| {
next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
futures::finished(())
}).forget();
tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
}
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
私は、時間が経つにつれて相互運用のこのタイプのためのより多くの生態系が存在することを確信しています。例えば、futures-cpupoolクレートは、とおそらくを同様の用途に使用するために拡張することができます。
ライブラリーを利用して、大きな発表をしました。どのように非常に野心的!^_^ – Shepmaster