diff options
Diffstat (limited to 'boost/asio/detail/impl/task_io_service.ipp')
-rw-r--r-- | boost/asio/detail/impl/task_io_service.ipp | 176 |
1 files changed, 113 insertions, 63 deletions
diff --git a/boost/asio/detail/impl/task_io_service.ipp b/boost/asio/detail/impl/task_io_service.ipp index 3d679c2dca..674df638fe 100644 --- a/boost/asio/detail/impl/task_io_service.ipp +++ b/boost/asio/detail/impl/task_io_service.ipp @@ -30,48 +30,67 @@ namespace boost { namespace asio { namespace detail { +struct task_io_service::thread_info +{ + event* wakeup_event; + op_queue<operation> private_op_queue; + long private_outstanding_work; + thread_info* next; +}; + struct task_io_service::task_cleanup { ~task_cleanup() { + if (this_thread_->private_outstanding_work > 0) + { + boost::asio::detail::increment( + task_io_service_->outstanding_work_, + this_thread_->private_outstanding_work); + } + this_thread_->private_outstanding_work = 0; + // Enqueue the completed operations and reinsert the task at the end of // the operation queue. lock_->lock(); task_io_service_->task_interrupted_ = true; - task_io_service_->op_queue_.push(*ops_); + task_io_service_->op_queue_.push(this_thread_->private_op_queue); task_io_service_->op_queue_.push(&task_io_service_->task_operation_); } task_io_service* task_io_service_; mutex::scoped_lock* lock_; - op_queue<operation>* ops_; + thread_info* this_thread_; }; struct task_io_service::work_cleanup { ~work_cleanup() { - task_io_service_->work_finished(); + if (this_thread_->private_outstanding_work > 1) + { + boost::asio::detail::increment( + task_io_service_->outstanding_work_, + this_thread_->private_outstanding_work - 1); + } + else if (this_thread_->private_outstanding_work < 1) + { + task_io_service_->work_finished(); + } + this_thread_->private_outstanding_work = 0; -#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) - if (!ops_->empty()) +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + if (!this_thread_->private_op_queue.empty()) { lock_->lock(); - task_io_service_->op_queue_.push(*ops_); + task_io_service_->op_queue_.push(this_thread_->private_op_queue); } #endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) } task_io_service* task_io_service_; mutex::scoped_lock* lock_; - op_queue<operation>* ops_; -}; - -struct task_io_service::thread_info -{ - event* wakeup_event; - op_queue<operation>* private_op_queue; - thread_info* next; + thread_info* this_thread_; }; task_io_service::task_io_service( @@ -131,19 +150,14 @@ std::size_t task_io_service::run(boost::system::error_code& ec) thread_info this_thread; event wakeup_event; this_thread.wakeup_event = &wakeup_event; - op_queue<operation> private_op_queue; -#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) - this_thread.private_op_queue = one_thread_ == 1 ? &private_op_queue : 0; -#else // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) - this_thread.private_op_queue = 0; -#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + this_thread.private_outstanding_work = 0; this_thread.next = 0; thread_call_stack::context ctx(this, this_thread); mutex::scoped_lock lock(mutex_); std::size_t n = 0; - for (; do_run_one(lock, this_thread, private_op_queue, ec); lock.lock()) + for (; do_run_one(lock, this_thread, ec); lock.lock()) if (n != (std::numeric_limits<std::size_t>::max)()) ++n; return n; @@ -161,14 +175,13 @@ std::size_t task_io_service::run_one(boost::system::error_code& ec) thread_info this_thread; event wakeup_event; this_thread.wakeup_event = &wakeup_event; - op_queue<operation> private_op_queue; - this_thread.private_op_queue = 0; + this_thread.private_outstanding_work = 0; this_thread.next = 0; thread_call_stack::context ctx(this, this_thread); mutex::scoped_lock lock(mutex_); - return do_run_one(lock, this_thread, private_op_queue, ec); + return do_run_one(lock, this_thread, ec); } std::size_t task_io_service::poll(boost::system::error_code& ec) @@ -182,29 +195,23 @@ std::size_t task_io_service::poll(boost::system::error_code& ec) thread_info this_thread; this_thread.wakeup_event = 0; - op_queue<operation> private_op_queue; -#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) - this_thread.private_op_queue = one_thread_ == 1 ? &private_op_queue : 0; -#else // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) - this_thread.private_op_queue = 0; -#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + this_thread.private_outstanding_work = 0; this_thread.next = 0; thread_call_stack::context ctx(this, this_thread); mutex::scoped_lock lock(mutex_); -#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) // We want to support nested calls to poll() and poll_one(), so any handlers // that are already on a thread-private queue need to be put on to the main // queue now. if (one_thread_) if (thread_info* outer_thread_info = ctx.next_by_key()) - if (outer_thread_info->private_op_queue) - op_queue_.push(*outer_thread_info->private_op_queue); + op_queue_.push(outer_thread_info->private_op_queue); #endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) std::size_t n = 0; - for (; do_poll_one(lock, private_op_queue, ec); lock.lock()) + for (; do_poll_one(lock, this_thread, ec); lock.lock()) if (n != (std::numeric_limits<std::size_t>::max)()) ++n; return n; @@ -221,24 +228,22 @@ std::size_t task_io_service::poll_one(boost::system::error_code& ec) thread_info this_thread; this_thread.wakeup_event = 0; - op_queue<operation> private_op_queue; - this_thread.private_op_queue = 0; + this_thread.private_outstanding_work = 0; this_thread.next = 0; thread_call_stack::context ctx(this, this_thread); mutex::scoped_lock lock(mutex_); -#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) // We want to support nested calls to poll() and poll_one(), so any handlers // that are already on a thread-private queue need to be put on to the main // queue now. if (one_thread_) if (thread_info* outer_thread_info = ctx.next_by_key()) - if (outer_thread_info->private_op_queue) - op_queue_.push(*outer_thread_info->private_op_queue); + op_queue_.push(outer_thread_info->private_op_queue); #endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) - return do_poll_one(lock, private_op_queue, ec); + return do_poll_one(lock, this_thread, ec); } void task_io_service::stop() @@ -261,22 +266,33 @@ void task_io_service::reset() void task_io_service::post_immediate_completion(task_io_service::operation* op) { +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + if (one_thread_) + { + if (thread_info* this_thread = thread_call_stack::contains(this)) + { + ++this_thread->private_outstanding_work; + this_thread->private_op_queue.push(op); + return; + } + } +#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + work_started(); - post_deferred_completion(op); + mutex::scoped_lock lock(mutex_); + op_queue_.push(op); + wake_one_thread_and_unlock(lock); } void task_io_service::post_deferred_completion(task_io_service::operation* op) { -#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) if (one_thread_) { if (thread_info* this_thread = thread_call_stack::contains(this)) { - if (this_thread->private_op_queue) - { - this_thread->private_op_queue->push(op); - return; - } + this_thread->private_op_queue.push(op); + return; } } #endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) @@ -291,16 +307,13 @@ void task_io_service::post_deferred_completions( { if (!ops.empty()) { -#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) if (one_thread_) { if (thread_info* this_thread = thread_call_stack::contains(this)) { - if (this_thread->private_op_queue) - { - this_thread->private_op_queue->push(ops); - return; - } + this_thread->private_op_queue.push(ops); + return; } } #endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) @@ -311,6 +324,44 @@ void task_io_service::post_deferred_completions( } } +void task_io_service::post_private_immediate_completion( + task_io_service::operation* op) +{ + work_started(); + post_private_deferred_completion(op); +} + +void task_io_service::post_private_deferred_completion( + task_io_service::operation* op) +{ +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + if (thread_info* this_thread = thread_call_stack::contains(this)) + { + this_thread->private_op_queue.push(op); + return; + } +#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + + mutex::scoped_lock lock(mutex_); + op_queue_.push(op); + wake_one_thread_and_unlock(lock); +} + +void task_io_service::post_non_private_immediate_completion( + task_io_service::operation* op) +{ + work_started(); + post_non_private_deferred_completion(op); +} + +void task_io_service::post_non_private_deferred_completion( + task_io_service::operation* op) +{ + mutex::scoped_lock lock(mutex_); + op_queue_.push(op); + wake_one_thread_and_unlock(lock); +} + void task_io_service::abandon_operations( op_queue<task_io_service::operation>& ops) { @@ -320,7 +371,7 @@ void task_io_service::abandon_operations( std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock, task_io_service::thread_info& this_thread, - op_queue<operation>& private_op_queue, const boost::system::error_code& ec) + const boost::system::error_code& ec) { while (!stopped_) { @@ -343,14 +394,13 @@ std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock, else lock.unlock(); - op_queue<operation> completed_ops; - task_cleanup on_exit = { this, &lock, &completed_ops }; + task_cleanup on_exit = { this, &lock, &this_thread }; (void)on_exit; // Run the task. May throw an exception. Only block if the operation // queue is empty and we're not polling, otherwise we want to return // as soon as possible. - task_->run(!more_handlers, completed_ops); + task_->run(!more_handlers, this_thread.private_op_queue); } else { @@ -362,7 +412,7 @@ std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock, lock.unlock(); // Ensure the count of outstanding work is decremented on block exit. - work_cleanup on_exit = { this, &lock, &private_op_queue }; + work_cleanup on_exit = { this, &lock, &this_thread }; (void)on_exit; // Complete the operation. May throw an exception. Deletes the object. @@ -385,7 +435,8 @@ std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock, } std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock, - op_queue<operation>& private_op_queue, const boost::system::error_code& ec) + task_io_service::thread_info& this_thread, + const boost::system::error_code& ec) { if (stopped_) return 0; @@ -397,14 +448,13 @@ std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock, lock.unlock(); { - op_queue<operation> completed_ops; - task_cleanup c = { this, &lock, &completed_ops }; + task_cleanup c = { this, &lock, &this_thread }; (void)c; // Run the task. May throw an exception. Only block if the operation // queue is empty and we're not polling, otherwise we want to return // as soon as possible. - task_->run(false, completed_ops); + task_->run(false, this_thread.private_op_queue); } o = op_queue_.front(); @@ -426,7 +476,7 @@ std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock, lock.unlock(); // Ensure the count of outstanding work is decremented on block exit. - work_cleanup on_exit = { this, &lock, &private_op_queue }; + work_cleanup on_exit = { this, &lock, &this_thread }; (void)on_exit; // Complete the operation. May throw an exception. Deletes the object. |