2017-07-12 7 views
2

ブロック機能を検討してください。this_thread :: sleep_for(milliseconds(3000));RXCPP:ブロック機能のタイムアウト

私は、次の動作を取得しようとしている:

Trigger Blocking Function    

|---------------------------------------------X 

私はブロッキング機能をトリガーにしたいし、それが(2秒以上)長すぎるがかかる場合、それはタイムアウトする必要があります。

私は次のことをやった:

my_connection = observable<>::create<int>([](subscriber<int> s) { 
    auto s2 = observable<>::just(1, observe_on_new_thread()) | 
    subscribe<int>([&](auto x) { 
     this_thread::sleep_for(milliseconds(3000)); 
     s.on_next(1); 
    }); 
}) | 
timeout(seconds(2), observe_on_new_thread()); 

私はこの仕事を得ることができません。まずは、別のスレッドからon_nextできないと思います。

私の質問は、これを行う正しい反応的な方法は何ですか? rxcppでブロック機能をラップし、タイムアウトを追加するにはどうすればよいですか?

Trigger    Cleanup 

|------------------------X 
          (Delay) Trigger   Cleanup 
             |-----------------X 
+0

私はこれを行うことができると思ったもう一つの方法は次のとおりです。 '自動接続= timeout.amb(blocking_function_observable)' – jc211

答えて

0

グレート質問:

はその後、私はこのように振る舞うRXストリームを取得したいです!上記はかなり近いです。

次に、rxcppにブロッキング操作を適用する方法の例を示します。 HTTP要求を行うにはlibcurl pollingを実行します。

以下は、あなたが意図した通りに動作するはずです。

auto sharedThreads = observe_on_event_loop(); 

auto my_connection = observable<>::create<int>([](subscriber<int> s) { 
     this_thread::sleep_for(milliseconds(3000)); 
     s.on_next(1); 
     s.on_completed(); 
    }) | 
    subscribe_on(observe_on_new_thread()) | 
    //start_with(0) | // workaround bug in timeout 
    timeout(seconds(2), sharedThreads); 
    //skip(1); // workaround bug in timeout 

my_connection.as_blocking().subscribe(
    [](int){}, 
    [](exception_ptr ep){cout << "timed out" << endl;} 
); 
  • subscribe_onは、専用のスレッドでcreateを実行するため、createそのスレッドをブロックすることが許可されています。
  • timeoutは、他のスレッドと共有できる別のスレッドでタイマーを実行し、同じスレッドへのすべてのon_next/on_error/on_completedコールを転送します。
  • as_blockingsubscribeは完了するまで返されません。これは、main()が終了するのを防ぐためにのみ使用されます。ほとんどの場合、テストまたはサンプルプログラムで使用されます。

EDIT:timeoutのバグの回避策を追加しました。現時点では、最初の値が到着するまでの最初のタイムアウトはスケジュールされません。

EDIT-2:timeoutバグが修正されました。回避方法はもう必要ありません。

+0

こんにちはカーク、この上のご入力いただきありがとうございます。あなたが提案したコードスニペットを実行しました。 'on_next'が' on_error'の前にトリガされているようです。私の疑惑は 'this_thread :: sleep_for()'が内部タイマーの開始からのタイムアウトをブロックしていることです。 's.on_completed'が削除された場合、' on_next'が発生し、続いて 'on_error'がタイムアウトします。 – jc211

+0

これは私が最初にこれを試していないために得るものです。タイムアウト演算子にバグが見つかりました。バグの回避策を使って答えを更新しました。 –

+0

回避策とその動作をテストしました。ありがとうございました! – jc211

関連する問題