2016-08-11 6 views
7

私のアプリケーションがRedis PubSubチャンネルからストリーミングデータを受信して​​処理できるシステムを作成しようとしています。futures.rsとRedis PubSubを使用してブロッキングコールの先物ストリームを実装する方法は?

let msg = match pubsub.get_message() { 
     Ok(m) => m, 
     Err(_) => panic!("Could not get message from pubsub!") 
}; 
let payload: String = match msg.get_payload() { 
    Ok(s) => s, 
    Err(_) => panic!("Could not convert redis message to string!") 
}; 

I:私は、私が見てきた錆のための他のすべてのRedisのドライバーと一緒に、使用していますRedis driverは、それがデータを受信したときにのみ値を返すチャネルからデータを取得するためにブロック操作を使用しますfutures-rsライブラリーを使用して今後このブロッキング関数呼び出しをラップして、入力を待っている間にアプリケーション内で他のタスクを実行できるようにしたいと考えていました。

先物についてはtutorialを読んで、PubSubがデータを受け取ったときに通知するStreamを作成しようとしましたが、その方法を理解できません。

schedulepollの機能をブロッキングpubsub.get_message()の機能として作成するにはどうすればよいですか?

+5

ライブラリーを利用して、大きな発表をしました。どのように非常に野心的!^_^ – Shepmaster

答えて

10

重大な警告私はこれまでにこのライブラリを使用したことがなく、一部の概念の低レベルの知識は少し欠けています。ほとんど私はthe tutorialを読んでいます。私は、非同期作業を行った人は誰でもこれを読んで笑ってくれると確信していますが、他の人にとっては便利な出発点になるかもしれません。買い手責任負担!


少し簡単に始めて、Streamがどのように機能するかを実証しましょう。私たちは、ストリームにResult秒のイテレータを変換することができます。これは私たちにストリームを消費する1つの方法を示し

extern crate futures; 

use futures::Future; 
use futures::stream::{self, Stream}; 

fn main() { 
    let payloads: Vec<Result<String,()>> = vec![Ok("a".into()), Ok("b".into())]; 
    let payloads = stream::iter(payloads.into_iter()); 

    let foo = payloads 
     .and_then(|payload| futures::finished(println!("{}", payload))) 
     .for_each(|_| Ok(())); 

    foo.forget(); 
} 

。各ペイロードに何かを行うにはand_thenを使用します(ここではそれを印刷するだけです)。for_eachStreamFutureに変換して戻します。そして、奇妙な名前のforget methodと呼んで未来を走らせることができます。


次に、1つのメッセージだけを扱うRedisライブラリを組み合わせて使用​​します。 get_message()メソッドがブロックされているので、いくつかのスレッドをミックスに導入する必要があります。このタイプの非同期システムでは、他のすべてがブロックされるため、大量の作業を実行することはお勧めできません。 For example

他にもそうするように配置されていない限り、それは非常に迅速にこの機能仕上げの実装ことが保証されなければなりません。

理想的な世界では、レディスクレートは先物のような図書館の上に構築され、これらをネイティブに公開します。

extern crate redis; 
extern crate futures; 

use std::thread; 
use futures::Future; 
use futures::stream::{self, Stream}; 

fn main() { 
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis"); 

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle"); 
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel"); 

    let (tx, payloads) = stream::channel(); 

    let redis_thread = thread::spawn(move || { 
     let msg = pubsub.get_message().expect("Unable to get message"); 
     let payload: Result<String, _> = msg.get_payload(); 
     tx.send(payload).forget(); 
    }); 

    let foo = payloads 
     .and_then(|payload| futures::finished(println!("{}", payload))) 
     .for_each(|_| Ok(())); 

    foo.forget(); 
    redis_thread.join().expect("unable to join to thread"); 
} 

ここで私の理解は曖昧になります。別のスレッドでは、メッセージをブロックし、取得したときにメッセージをチャネルにプッシュします。私が理解していないのは、スレッドのハンドルを保持する必要がある理由です。私はfoo.forgetがストリームが空になるまで待って、それ自身をブロックしていると思います。Redisのサーバーへのtelnet接続で

、この送信:

publish rust awesome 

をそして、あなたはそれが動作表示されます。 printステートメントを追加すると、スレッドが生成される前に(私にとって)foo.forgetステートメントが実行されることが示されます。


複数のメッセージがトリッキーです。 Senderは、生成側が消費側より先に進まないように消費します。これは別の未来をsendから返すことによって達成されます!私たちは、ループの次の反復のためにそれを再利用するために戻ってそこからシャトルにそれを必要とする:

extern crate redis; 
extern crate futures; 

use std::thread; 
use std::sync::mpsc; 

use futures::Future; 
use futures::stream::{self, Stream}; 

fn main() { 
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis"); 

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle"); 
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel"); 

    let (tx, payloads) = stream::channel(); 

    let redis_thread = thread::spawn(move || { 
     let mut tx = tx; 

     while let Ok(msg) = pubsub.get_message() { 
      let payload: Result<String, _> = msg.get_payload(); 

      let (next_tx_tx, next_tx_rx) = mpsc::channel(); 

      tx.send(payload).and_then(move |new_tx| { 
       next_tx_tx.send(new_tx).expect("Unable to send successor channel tx"); 
       futures::finished(()) 
      }).forget(); 

      tx = next_tx_rx.recv().expect("Unable to receive successor channel tx"); 
     } 
    }); 

    let foo = payloads 
     .and_then(|payload| futures::finished(println!("{}", payload))) 
     .for_each(|_| Ok(())); 

    foo.forget(); 
    redis_thread.join().expect("unable to join to thread"); 
} 

私は、時間が経つにつれて相互運用のこのタイプのためのより多くの生態系が存在することを確信しています。例えば、futures-cpupoolクレートは、とおそらくを同様の用途に使用するために拡張することができます。

+0

すばらしい答えをありがとう!ちょうど1つの質問: 'redis_thread'に参加しないと、結果読み込みプロセスを非ブロック化するための全努力が無効になりますか?おそらく私は理解していないことがあります。 – Ameo

+1

"foo.forgetはストリームが空になるまで待っていることを期待しています。"実際には、先物は "準備完了までのブロック"メソッドを提供する義務はありません。 'forget()'は、その説明がある限り、未来が落ちるときに自動的に取り消されるのを防ぐために必要ですが、待ちには関係しません。たとえばScalaでは、 'Future'にはそのためのメソッドはありませんが、代わりに、いくつかのタイムアウトの中で未来ができるまで待つ別の' Await.ready'/'Await.result'メソッドのペアがあります。 –

+1

私が理解する限り、将来的には[Future :: select'](http://alexcrichton.com/futures-rs/futures/trait.Future.html#method)で同様のことを実装することは可能です.select)、2番目の未来は一定のタイムアウト後に完了します。 –

関連する問題