summaryrefslogtreecommitdiff
path: root/boost/asio/detail/impl/task_io_service.ipp
diff options
context:
space:
mode:
Diffstat (limited to 'boost/asio/detail/impl/task_io_service.ipp')
-rw-r--r--boost/asio/detail/impl/task_io_service.ipp128
1 files changed, 30 insertions, 98 deletions
diff --git a/boost/asio/detail/impl/task_io_service.ipp b/boost/asio/detail/impl/task_io_service.ipp
index 674df638fe..e58e11e5e3 100644
--- a/boost/asio/detail/impl/task_io_service.ipp
+++ b/boost/asio/detail/impl/task_io_service.ipp
@@ -2,7 +2,7 @@
// detail/impl/task_io_service.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
-// Copyright (c) 2003-2012 Christopher M. Kohlhoff (chris at kohlhoff dot com)
+// Copyright (c) 2003-2014 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -19,10 +19,11 @@
#if !defined(BOOST_ASIO_HAS_IOCP)
-#include <boost/limits.hpp>
#include <boost/asio/detail/event.hpp>
+#include <boost/asio/detail/limits.hpp>
#include <boost/asio/detail/reactor.hpp>
#include <boost/asio/detail/task_io_service.hpp>
+#include <boost/asio/detail/task_io_service_thread_info.hpp>
#include <boost/asio/detail/push_options.hpp>
@@ -30,14 +31,6 @@ namespace boost {
namespace asio {
namespace detail {
-struct task_io_service::thread_info
-{
- event* wakeup_event;
- op_queue<operation> private_op_queue;
- long private_outstanding_work;
- thread_info* next;
-};
-
struct task_io_service::task_cleanup
{
~task_cleanup()
@@ -79,13 +72,13 @@ struct task_io_service::work_cleanup
}
this_thread_->private_outstanding_work = 0;
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+#if defined(BOOST_ASIO_HAS_THREADS)
if (!this_thread_->private_op_queue.empty())
{
lock_->lock();
task_io_service_->op_queue_.push(this_thread_->private_op_queue);
}
-#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+#endif // defined(BOOST_ASIO_HAS_THREADS)
}
task_io_service* task_io_service_;
@@ -102,8 +95,7 @@ task_io_service::task_io_service(
task_interrupted_(true),
outstanding_work_(0),
stopped_(false),
- shutdown_(false),
- first_idle_thread_(0)
+ shutdown_(false)
{
BOOST_ASIO_HANDLER_TRACKING_INIT;
}
@@ -148,10 +140,7 @@ std::size_t task_io_service::run(boost::system::error_code& ec)
}
thread_info this_thread;
- event wakeup_event;
- this_thread.wakeup_event = &wakeup_event;
this_thread.private_outstanding_work = 0;
- this_thread.next = 0;
thread_call_stack::context ctx(this, this_thread);
mutex::scoped_lock lock(mutex_);
@@ -173,10 +162,7 @@ std::size_t task_io_service::run_one(boost::system::error_code& ec)
}
thread_info this_thread;
- event wakeup_event;
- this_thread.wakeup_event = &wakeup_event;
this_thread.private_outstanding_work = 0;
- this_thread.next = 0;
thread_call_stack::context ctx(this, this_thread);
mutex::scoped_lock lock(mutex_);
@@ -194,21 +180,19 @@ std::size_t task_io_service::poll(boost::system::error_code& ec)
}
thread_info this_thread;
- this_thread.wakeup_event = 0;
this_thread.private_outstanding_work = 0;
- this_thread.next = 0;
thread_call_stack::context ctx(this, this_thread);
mutex::scoped_lock lock(mutex_);
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+#if defined(BOOST_ASIO_HAS_THREADS)
// We want to support nested calls to poll() and poll_one(), so any handlers
// that are already on a thread-private queue need to be put on to the main
// queue now.
if (one_thread_)
if (thread_info* outer_thread_info = ctx.next_by_key())
op_queue_.push(outer_thread_info->private_op_queue);
-#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+#endif // defined(BOOST_ASIO_HAS_THREADS)
std::size_t n = 0;
for (; do_poll_one(lock, this_thread, ec); lock.lock())
@@ -227,21 +211,19 @@ std::size_t task_io_service::poll_one(boost::system::error_code& ec)
}
thread_info this_thread;
- this_thread.wakeup_event = 0;
this_thread.private_outstanding_work = 0;
- this_thread.next = 0;
thread_call_stack::context ctx(this, this_thread);
mutex::scoped_lock lock(mutex_);
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+#if defined(BOOST_ASIO_HAS_THREADS)
// We want to support nested calls to poll() and poll_one(), so any handlers
// that are already on a thread-private queue need to be put on to the main
// queue now.
if (one_thread_)
if (thread_info* outer_thread_info = ctx.next_by_key())
op_queue_.push(outer_thread_info->private_op_queue);
-#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+#endif // defined(BOOST_ASIO_HAS_THREADS)
return do_poll_one(lock, this_thread, ec);
}
@@ -264,10 +246,11 @@ void task_io_service::reset()
stopped_ = false;
}
-void task_io_service::post_immediate_completion(task_io_service::operation* op)
+void task_io_service::post_immediate_completion(
+ task_io_service::operation* op, bool is_continuation)
{
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- if (one_thread_)
+#if defined(BOOST_ASIO_HAS_THREADS)
+ if (one_thread_ || is_continuation)
{
if (thread_info* this_thread = thread_call_stack::contains(this))
{
@@ -276,7 +259,9 @@ void task_io_service::post_immediate_completion(task_io_service::operation* op)
return;
}
}
-#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+#else // defined(BOOST_ASIO_HAS_THREADS)
+ (void)is_continuation;
+#endif // defined(BOOST_ASIO_HAS_THREADS)
work_started();
mutex::scoped_lock lock(mutex_);
@@ -286,7 +271,7 @@ void task_io_service::post_immediate_completion(task_io_service::operation* op)
void task_io_service::post_deferred_completion(task_io_service::operation* op)
{
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+#if defined(BOOST_ASIO_HAS_THREADS)
if (one_thread_)
{
if (thread_info* this_thread = thread_call_stack::contains(this))
@@ -295,7 +280,7 @@ void task_io_service::post_deferred_completion(task_io_service::operation* op)
return;
}
}
-#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+#endif // defined(BOOST_ASIO_HAS_THREADS)
mutex::scoped_lock lock(mutex_);
op_queue_.push(op);
@@ -307,7 +292,7 @@ void task_io_service::post_deferred_completions(
{
if (!ops.empty())
{
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+#if defined(BOOST_ASIO_HAS_THREADS)
if (one_thread_)
{
if (thread_info* this_thread = thread_call_stack::contains(this))
@@ -316,7 +301,7 @@ void task_io_service::post_deferred_completions(
return;
}
}
-#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+#endif // defined(BOOST_ASIO_HAS_THREADS)
mutex::scoped_lock lock(mutex_);
op_queue_.push(ops);
@@ -324,39 +309,10 @@ void task_io_service::post_deferred_completions(
}
}
-void task_io_service::post_private_immediate_completion(
- task_io_service::operation* op)
-{
- work_started();
- post_private_deferred_completion(op);
-}
-
-void task_io_service::post_private_deferred_completion(
- task_io_service::operation* op)
-{
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- if (thread_info* this_thread = thread_call_stack::contains(this))
- {
- this_thread->private_op_queue.push(op);
- return;
- }
-#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
-
- mutex::scoped_lock lock(mutex_);
- op_queue_.push(op);
- wake_one_thread_and_unlock(lock);
-}
-
-void task_io_service::post_non_private_immediate_completion(
+void task_io_service::do_dispatch(
task_io_service::operation* op)
{
work_started();
- post_non_private_deferred_completion(op);
-}
-
-void task_io_service::post_non_private_deferred_completion(
- task_io_service::operation* op)
-{
mutex::scoped_lock lock(mutex_);
op_queue_.push(op);
wake_one_thread_and_unlock(lock);
@@ -387,10 +343,7 @@ std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock,
task_interrupted_ = more_handlers;
if (more_handlers && !one_thread_)
- {
- if (!wake_one_idle_thread_and_unlock(lock))
- lock.unlock();
- }
+ wakeup_event_.unlock_and_signal_one(lock);
else
lock.unlock();
@@ -423,11 +376,8 @@ std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock,
}
else
{
- // Nothing to run right now, so just wait for work to do.
- this_thread.next = first_idle_thread_;
- first_idle_thread_ = &this_thread;
- this_thread.wakeup_event->clear(lock);
- this_thread.wakeup_event->wait(lock);
+ wakeup_event_.clear(lock);
+ wakeup_event_.wait(lock);
}
}
@@ -459,7 +409,10 @@ std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock,
o = op_queue_.front();
if (o == &task_operation_)
+ {
+ wakeup_event_.maybe_unlock_and_signal_one(lock);
return 0;
+ }
}
if (o == 0)
@@ -489,14 +442,7 @@ void task_io_service::stop_all_threads(
mutex::scoped_lock& lock)
{
stopped_ = true;
-
- while (first_idle_thread_)
- {
- thread_info* idle_thread = first_idle_thread_;
- first_idle_thread_ = idle_thread->next;
- idle_thread->next = 0;
- idle_thread->wakeup_event->signal(lock);
- }
+ wakeup_event_.signal_all(lock);
if (!task_interrupted_ && task_)
{
@@ -505,24 +451,10 @@ void task_io_service::stop_all_threads(
}
}
-bool task_io_service::wake_one_idle_thread_and_unlock(
- mutex::scoped_lock& lock)
-{
- if (first_idle_thread_)
- {
- thread_info* idle_thread = first_idle_thread_;
- first_idle_thread_ = idle_thread->next;
- idle_thread->next = 0;
- idle_thread->wakeup_event->signal_and_unlock(lock);
- return true;
- }
- return false;
-}
-
void task_io_service::wake_one_thread_and_unlock(
mutex::scoped_lock& lock)
{
- if (!wake_one_idle_thread_and_unlock(lock))
+ if (!wakeup_event_.maybe_unlock_and_signal_one(lock))
{
if (!task_interrupted_ && task_)
{