2016-10-31 7 views
1

私はBoost ASIOのソースコードを読んでいます。epoll_waitを呼び出すためのスレッドは単なるスレッドであることがわかります(もちろん、epoll reactorを使用している場合はもちろんです)。
epoll_waitを呼び出すための複数のスレッドに関する解決策を探したいのですが、これは同時に同じソケットの読み込みを異なるスレッドで行う可能性があります。 次のように私はいくつかのキーコードを読み取る:ブーストio_serviceの場合、epoll_waitで唯一のスレッドがブロックされていますか?

// Prepare to execute first handler from queue. 
     operation* o = op_queue_.front(); 
     op_queue_.pop(); 
     bool more_handlers = (!op_queue_.empty()); 

     if (o == &task_operation_) 
     { 
     task_interrupted_ = more_handlers; 

     if (more_handlers && !one_thread_) 
      wakeup_event_.unlock_and_signal_one(lock); 
     else 
      lock.unlock(); 

     task_cleanup on_exit = { this, &lock, &this_thread }; 
     (void)on_exit; 

     // Run the task. May throw an exception. Only block if the operation 
     // queue is empty and we're not polling, otherwise we want to return 
     // as soon as possible. 
     task_->run(!more_handlers, this_thread.private_op_queue); 
     } 

task_はファイルディスクリプタ反応器である、それは、私はそれがop_queue_で唯一の「task_operation_」ので、それを呼び出すためのスレッドを1つだけかもしれないと思い 、実行中のイベントがepoll_waitを呼び出します。私は正しい?
マルチスレッドでepollを使用したい場合や、1つのスレッドが1つのソケットを一度に処理できるように "EPOLONESHOT"を使用することができます。あなたがio_serviceの単一のインスタンスを使用して、複数のスレッドからio_service::runメソッドを呼び出しているとき

+3

特別なことは何もありません。あなたがASIO開発者よりもあなたのプラットフォームでマルチスレッド化する方法をよく知っていると思っているのでなければ、混乱させないでください。それはすでに彼らが思いついた最高のデザインです。 –

答えて

0
  • 最初のケースは、あります。

    std::size_t scheduler::run(asio::error_code& ec) 
    { 
        mutex::scoped_lock lock(mutex_); 
    
        std::size_t n = 0; 
        for (; do_run_one(lock, this_thread, ec); lock.lock()) 
        if (n != (std::numeric_limits<std::size_t>::max)()) 
         ++n; 
        return n; 
    } 
    

    ので、開催されたロックで、それはdo_run_oneメソッドを呼び出し、のようなものです::

    std::size_t scheduler::do_run_one(mutex::scoped_lock& lock, 
        scheduler::thread_info& this_thread, 
        const asio::error_code& ec) 
    { 
        while (!stopped_) 
        { 
        if (!op_queue_.empty()) 
        { 
         // Prepare to execute first handler from queue. 
         operation* o = op_queue_.front(); 
         op_queue_.pop(); 
         bool more_handlers = (!op_queue_.empty()); 
    
         if (o == &task_operation_) 
         { 
         task_interrupted_ = more_handlers; 
    
         if (more_handlers && !one_thread_) 
          wakeup_event_.unlock_and_signal_one(lock); 
         else 
          lock.unlock(); 
    
         task_cleanup on_exit = { this, &lock, &this_thread }; 
         (void)on_exit; 
    
         task_->run(!more_handlers, this_thread.private_op_queue); 
         } 
         else 
         { 
         //...... 
         } 
        } 
        else 
        { 
         wakeup_event_.clear(lock); 
         wakeup_event_.wait(lock); 
        } 
        } 
    
        return 0; 
    } 
    

    興味深い部分を

は(簡体字)schduler::run機能を見てみましょうコードのうち、次の行:

if (more_handlers && !one_thread_) 
    wakeup_event_.unlock_and_signal_one(lock); 
else 
    lock.unlock(); 

ここで議論しているケースは複数のスレッドを持つケースなので、最初の条件は満たされます(op_queue_には多数の保留中のタスクがあると仮定します)。

wakeup_event_.unlock_and_signal_oneは、lockをリリース/アンロックし、条件付き待機を待機しているスレッドのいずれかに通知します。だから、これで少なくとも1つのスレッド(ロックを取得した人)はdo_run_oneに今呼び出すことができます。

あなたの場合のtask_epoll_reactorです。そして、runメソッドではepoll_waitlock_を持たず、scheduler)を呼び出します。

ここで興味深いのは、が返されたすべての準備完了ディスクリプタを反復処理するときの動作です。引数の中で参照として受け取った操作キューに戻します。操作は今descriptor_state代わりのtask_operation_の実行時の型持ってプッシュ:だから

for (int i = 0; i < num_events; ++i) 
    { 
    void* ptr = events[i].data.ptr; 
    if (ptr == &interrupter_) 
    { 
     // don't call work_started() here. This still allows the scheduler to 
     // stop if the only remaining operations are descriptor operations. 
     descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr); 
     descriptor_data->set_ready_events(events[i].events); 
     ops.push(descriptor_data); 
    } 
    } 

を、scheduler::do_run_one内側のwhileループの次の反復に、完了したタスクのために、それは私が省かelse枝を(ヒットします)以前の私のペースト中:

おそらく async_readまたは async_write APIに渡されたユーザーのハンドルを呼び出します。これにより、次に complete関数ポインタを呼び出す
 else 
     { 
     std::size_t task_result = o->task_result_; 

     if (more_handlers && !one_thread_) 
      wake_one_thread_and_unlock(lock); 
     else 
      lock.unlock(); 

     // Ensure the count of outstanding work is decremented on block exit. 
     work_cleanup on_exit = { this, &lock, &this_thread }; 
     (void)on_exit; 

     // Complete the operation. May throw an exception. Deletes the object. 
     o->complete(this, ec, task_result); 

     return 1; 
     } 

。1または1:Nとしては、アプリケーションに合わせて、あなたがio_serviceオブジェクトのプールを作成し、1かもしれない1つの以上のスレッドがio_servicethreadの間のマッピングをすなわちにそのrunメソッドを呼び出すところ

  • 第二の場合は、です。この方法でio_serviceオブジェクトをラウンドロビン方式でsoucketオブジェクトに割り当てることができます。

今、あなたの質問に来る:それは一つのスレッドが1で1つのソケットを扱うことを確認することができ

私は、マルチスレッドでの使用のepollをしたい場合、または私は「EPOLLONESHOT」 を使用することができます時間。

これを正しく理解していれば、スレッドごとにすべてのイベントをソケットに処理したいと思っていますか?これはアプローチ番号2に従うこと、つまりio_serviceオブジェクトのプールを作成して1つのスレッドにマップすることで可能と考えています。こうすることで、特定のソケット上のすべてのアクティビティが1つのスレッド、つまりそのスレッドio_service:runでのみ処理されることを確認できます。

上記の場合、EPOLLONESHOTを設定することを心配する必要はありません。

マルチスレッドと1 io_serviceという最初のアプローチを使用して同じ動作を得ることについてはあまり確信がありません。

しかし、スレッドを全く使用していない場合、つまり、io_serviceがシングルスレッドで実行されている場合は、これらのすべてを抽象化することをすべての目的のために心配する必要はありません。

+0

そんなに書いていただきありがとうございます。それは私の多くを助けます。どうもありがとうございます。 – NeoLiu

+0

@ NeoLiu Stackoverflowに関する一般的な情報 - あなたの質問に本当に役立つ答えが見つかった場合は、カウンターの下にある「チェック」記号(回答の右上)をクリックして「受け入れる」必要があります。私はあなたが今までそれを知って以来、あなたはそれらを受け入れることができます今までの任意の答えを受け入れていない参照してください:) – Arunmu

1

1つのスレッドだけがepoll_waitを呼び出します。スレッドがディスクリプタのイベント通知を受信すると、ディスクリプタをio_serviceを実行するすべてのスレッドにデマルチプレクスします。 Platform-Specific Implementation Notesあたり:

スレッド:epollを用い

  • 多重分離をio_service::run()io_service::run_one()io_service::poll()又はio_service::poll_one()を呼び出すスレッドのいずれかで行われます。

単一のディスクリプタは、I/Oを実行する単一スレッドで処理されます。したがって、非同期操作を使用する場合、I/Oは特定のソケットに対して同時に実行されません。

関連する問題