Diffstat (limited to 'boost/asio/detail/impl/task_io_service.ipp')
-rw-r--r--  boost/asio/detail/impl/task_io_service.ipp  176
1 file changed, 113 insertions, 63 deletions
diff --git a/boost/asio/detail/impl/task_io_service.ipp b/boost/asio/detail/impl/task_io_service.ipp
index 3d679c2dca..674df638fe 100644
--- a/boost/asio/detail/impl/task_io_service.ipp
+++ b/boost/asio/detail/impl/task_io_service.ipp
@@ -30,48 +30,67 @@ namespace boost {
namespace asio {
namespace detail {
+struct task_io_service::thread_info
+{
+ event* wakeup_event;
+ op_queue<operation> private_op_queue;
+ long private_outstanding_work;
+ thread_info* next;
+};
+
struct task_io_service::task_cleanup
{
~task_cleanup()
{
+ if (this_thread_->private_outstanding_work > 0)
+ {
+ boost::asio::detail::increment(
+ task_io_service_->outstanding_work_,
+ this_thread_->private_outstanding_work);
+ }
+ this_thread_->private_outstanding_work = 0;
+
// Enqueue the completed operations and reinsert the task at the end of
// the operation queue.
lock_->lock();
task_io_service_->task_interrupted_ = true;
- task_io_service_->op_queue_.push(*ops_);
+ task_io_service_->op_queue_.push(this_thread_->private_op_queue);
task_io_service_->op_queue_.push(&task_io_service_->task_operation_);
}
task_io_service* task_io_service_;
mutex::scoped_lock* lock_;
- op_queue<operation>* ops_;
+ thread_info* this_thread_;
};
struct task_io_service::work_cleanup
{
~work_cleanup()
{
- task_io_service_->work_finished();
+ if (this_thread_->private_outstanding_work > 1)
+ {
+ boost::asio::detail::increment(
+ task_io_service_->outstanding_work_,
+ this_thread_->private_outstanding_work - 1);
+ }
+ else if (this_thread_->private_outstanding_work < 1)
+ {
+ task_io_service_->work_finished();
+ }
+ this_thread_->private_outstanding_work = 0;
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- if (!ops_->empty())
+#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+ if (!this_thread_->private_op_queue.empty())
{
lock_->lock();
- task_io_service_->op_queue_.push(*ops_);
+ task_io_service_->op_queue_.push(this_thread_->private_op_queue);
}
#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
}
task_io_service* task_io_service_;
mutex::scoped_lock* lock_;
- op_queue<operation>* ops_;
-};
-
-struct task_io_service::thread_info
-{
- event* wakeup_event;
- op_queue<operation>* private_op_queue;
- thread_info* next;
+ thread_info* this_thread_;
};
task_io_service::task_io_service(
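
Note on the hunk above: thread_info now owns its op_queue by value and carries a private_outstanding_work counter, so each thread batches work-count updates locally and the task_cleanup/work_cleanup guards fold them into the shared outstanding_work_ counter at most once per cycle (work_cleanup compares against 1 because the operation it has just run already accounts for one unit). A minimal sketch of that batching idea, using standard atomics and hypothetical names rather than Asio's own detail::increment and guard types:

#include <atomic>

// Hypothetical illustration only: increments accumulate in a plain long
// (cf. thread_info::private_outstanding_work) and are applied to the shared
// atomic counter (cf. task_io_service::outstanding_work_) exactly once, when
// the guard leaves scope - the same shape as task_cleanup/work_cleanup above.
struct work_batch_guard
{
  std::atomic<long>& shared_count;
  long local_count;

  ~work_batch_guard()
  {
    if (local_count > 0)
      shared_count.fetch_add(local_count, std::memory_order_relaxed);
  }
};
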
@@ -131,19 +150,14 @@ std::size_t task_io_service::run(boost::system::error_code& ec)
thread_info this_thread;
event wakeup_event;
this_thread.wakeup_event = &wakeup_event;
- op_queue<operation> private_op_queue;
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- this_thread.private_op_queue = one_thread_ == 1 ? &private_op_queue : 0;
-#else // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- this_thread.private_op_queue = 0;
-#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+ this_thread.private_outstanding_work = 0;
this_thread.next = 0;
thread_call_stack::context ctx(this, this_thread);
mutex::scoped_lock lock(mutex_);
std::size_t n = 0;
- for (; do_run_one(lock, this_thread, private_op_queue, ec); lock.lock())
+ for (; do_run_one(lock, this_thread, ec); lock.lock())
if (n != (std::numeric_limits<std::size_t>::max)())
++n;
return n;
@@ -161,14 +175,13 @@ std::size_t task_io_service::run_one(boost::system::error_code& ec)
thread_info this_thread;
event wakeup_event;
this_thread.wakeup_event = &wakeup_event;
- op_queue<operation> private_op_queue;
- this_thread.private_op_queue = 0;
+ this_thread.private_outstanding_work = 0;
this_thread.next = 0;
thread_call_stack::context ctx(this, this_thread);
mutex::scoped_lock lock(mutex_);
- return do_run_one(lock, this_thread, private_op_queue, ec);
+ return do_run_one(lock, this_thread, ec);
}
std::size_t task_io_service::poll(boost::system::error_code& ec)
@@ -182,29 +195,23 @@ std::size_t task_io_service::poll(boost::system::error_code& ec)
thread_info this_thread;
this_thread.wakeup_event = 0;
- op_queue<operation> private_op_queue;
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- this_thread.private_op_queue = one_thread_ == 1 ? &private_op_queue : 0;
-#else // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- this_thread.private_op_queue = 0;
-#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+ this_thread.private_outstanding_work = 0;
this_thread.next = 0;
thread_call_stack::context ctx(this, this_thread);
mutex::scoped_lock lock(mutex_);
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
// We want to support nested calls to poll() and poll_one(), so any handlers
// that are already on a thread-private queue need to be put on to the main
// queue now.
if (one_thread_)
if (thread_info* outer_thread_info = ctx.next_by_key())
- if (outer_thread_info->private_op_queue)
- op_queue_.push(*outer_thread_info->private_op_queue);
+ op_queue_.push(outer_thread_info->private_op_queue);
#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
std::size_t n = 0;
- for (; do_poll_one(lock, private_op_queue, ec); lock.lock())
+ for (; do_poll_one(lock, this_thread, ec); lock.lock())
if (n != (std::numeric_limits<std::size_t>::max)())
++n;
return n;
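
The splice of outer_thread_info->private_op_queue onto op_queue_ above exists so that nested poll()/poll_one() calls still see handlers that the enclosing run() thread has parked on its private queue. A small usage sketch of the situation being handled, assuming a C++11 compiler and a Boost release that still ships io_service; the concurrency hint of 1 is what enables the one_thread_ fast path:

#include <boost/asio.hpp>
#include <iostream>

int main()
{
  boost::asio::io_service ios(1); // concurrency hint 1 -> one_thread_ is set

  ios.post([&ios]
  {
    // Posted from inside a handler: with one_thread_ set, this handler may be
    // parked on the run() thread's private queue rather than the shared one.
    ios.post([]{ std::cout << "nested handler ran\n"; });

    // The nested poll() must still be able to run it, which is why poll()
    // pushes the outer thread's private queue onto op_queue_ first.
    ios.poll();
  });

  ios.run();
  return 0;
}
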
@@ -221,24 +228,22 @@ std::size_t task_io_service::poll_one(boost::system::error_code& ec)
thread_info this_thread;
this_thread.wakeup_event = 0;
- op_queue<operation> private_op_queue;
- this_thread.private_op_queue = 0;
+ this_thread.private_outstanding_work = 0;
this_thread.next = 0;
thread_call_stack::context ctx(this, this_thread);
mutex::scoped_lock lock(mutex_);
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
// We want to support nested calls to poll() and poll_one(), so any handlers
// that are already on a thread-private queue need to be put on to the main
// queue now.
if (one_thread_)
if (thread_info* outer_thread_info = ctx.next_by_key())
- if (outer_thread_info->private_op_queue)
- op_queue_.push(*outer_thread_info->private_op_queue);
+ op_queue_.push(outer_thread_info->private_op_queue);
#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- return do_poll_one(lock, private_op_queue, ec);
+ return do_poll_one(lock, this_thread, ec);
}
void task_io_service::stop()
@@ -261,22 +266,33 @@ void task_io_service::reset()
void task_io_service::post_immediate_completion(task_io_service::operation* op)
{
+#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+ if (one_thread_)
+ {
+ if (thread_info* this_thread = thread_call_stack::contains(this))
+ {
+ ++this_thread->private_outstanding_work;
+ this_thread->private_op_queue.push(op);
+ return;
+ }
+ }
+#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+
work_started();
- post_deferred_completion(op);
+ mutex::scoped_lock lock(mutex_);
+ op_queue_.push(op);
+ wake_one_thread_and_unlock(lock);
}
void task_io_service::post_deferred_completion(task_io_service::operation* op)
{
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
if (one_thread_)
{
if (thread_info* this_thread = thread_call_stack::contains(this))
{
- if (this_thread->private_op_queue)
- {
- this_thread->private_op_queue->push(op);
- return;
- }
+ this_thread->private_op_queue.push(op);
+ return;
}
}
#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
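
The new fast path in post_immediate_completion() above avoids both the mutex and the shared atomic when the io_service was built with a concurrency hint of one and the post originates from a thread already running it: the work count goes to private_outstanding_work and the handler to private_op_queue. The check relies on Asio's call stack of (owner, thread_info) frames; a simplified, hypothetical sketch of that lookup (the real call_stack is an intrusive list threaded through thread-local storage, not a vector):

#include <vector>

// Simplified, hypothetical sketch of thread_call_stack::contains(): every
// thread that enters run()/poll() pushes an (owner, thread_info*) frame onto
// a thread-local stack, so contains(owner) answers "is the current thread
// already inside this particular io_service, and with which thread_info?".
template <typename Owner, typename Value>
struct call_stack_sketch
{
  struct frame { Owner* owner; Value* value; };

  static thread_local std::vector<frame> frames;

  static Value* contains(Owner* owner)
  {
    for (auto it = frames.rbegin(); it != frames.rend(); ++it)
      if (it->owner == owner)
        return it->value;
    return nullptr;
  }
};

template <typename Owner, typename Value>
thread_local std::vector<typename call_stack_sketch<Owner, Value>::frame>
    call_stack_sketch<Owner, Value>::frames;
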
@@ -291,16 +307,13 @@ void task_io_service::post_deferred_completions(
{
if (!ops.empty())
{
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
if (one_thread_)
{
if (thread_info* this_thread = thread_call_stack::contains(this))
{
- if (this_thread->private_op_queue)
- {
- this_thread->private_op_queue->push(ops);
- return;
- }
+ this_thread->private_op_queue.push(ops);
+ return;
}
}
#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
@@ -311,6 +324,44 @@ void task_io_service::post_deferred_completions(
}
}
+void task_io_service::post_private_immediate_completion(
+ task_io_service::operation* op)
+{
+ work_started();
+ post_private_deferred_completion(op);
+}
+
+void task_io_service::post_private_deferred_completion(
+ task_io_service::operation* op)
+{
+#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+ if (thread_info* this_thread = thread_call_stack::contains(this))
+ {
+ this_thread->private_op_queue.push(op);
+ return;
+ }
+#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+
+ mutex::scoped_lock lock(mutex_);
+ op_queue_.push(op);
+ wake_one_thread_and_unlock(lock);
+}
+
+void task_io_service::post_non_private_immediate_completion(
+ task_io_service::operation* op)
+{
+ work_started();
+ post_non_private_deferred_completion(op);
+}
+
+void task_io_service::post_non_private_deferred_completion(
+ task_io_service::operation* op)
+{
+ mutex::scoped_lock lock(mutex_);
+ op_queue_.push(op);
+ wake_one_thread_and_unlock(lock);
+}
+
void task_io_service::abandon_operations(
op_queue<task_io_service::operation>& ops)
{
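
The post_private_* / post_non_private_* entry points added above give callers an explicit choice: the private variants prefer the calling thread's private_op_queue when that thread is already running the io_service, while the non-private variants always go through the shared queue under the mutex and wake one idle thread. A minimal sketch of that shared-queue posting pattern with standard primitives and hypothetical names (wake_one_thread_and_unlock() likewise releases the lock before signalling):

#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>

// Hypothetical illustration of the lock / push / wake-one pattern used by
// post_non_private_deferred_completion() above.
struct shared_queue_sketch
{
  std::mutex mutex_;
  std::condition_variable wakeup_;
  std::deque<std::function<void()>> ops_;

  void post(std::function<void()> op)
  {
    std::unique_lock<std::mutex> lock(mutex_);
    ops_.push_back(std::move(op));
    lock.unlock();        // drop the lock first, as wake_one_thread_and_unlock() does
    wakeup_.notify_one(); // then wake exactly one waiting thread
  }
};
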
@@ -320,7 +371,7 @@ void task_io_service::abandon_operations(
std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock,
task_io_service::thread_info& this_thread,
- op_queue<operation>& private_op_queue, const boost::system::error_code& ec)
+ const boost::system::error_code& ec)
{
while (!stopped_)
{
@@ -343,14 +394,13 @@ std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock,
else
lock.unlock();
- op_queue<operation> completed_ops;
- task_cleanup on_exit = { this, &lock, &completed_ops };
+ task_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit;
// Run the task. May throw an exception. Only block if the operation
// queue is empty and we're not polling, otherwise we want to return
// as soon as possible.
- task_->run(!more_handlers, completed_ops);
+ task_->run(!more_handlers, this_thread.private_op_queue);
}
else
{
@@ -362,7 +412,7 @@ std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock,
lock.unlock();
// Ensure the count of outstanding work is decremented on block exit.
- work_cleanup on_exit = { this, &lock, &private_op_queue };
+ work_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit;
// Complete the operation. May throw an exception. Deletes the object.
@@ -385,7 +435,8 @@ std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock,
}
std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock,
- op_queue<operation>& private_op_queue, const boost::system::error_code& ec)
+ task_io_service::thread_info& this_thread,
+ const boost::system::error_code& ec)
{
if (stopped_)
return 0;
@@ -397,14 +448,13 @@ std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock,
lock.unlock();
{
- op_queue<operation> completed_ops;
- task_cleanup c = { this, &lock, &completed_ops };
+ task_cleanup c = { this, &lock, &this_thread };
(void)c;
// Run the task. May throw an exception. Only block if the operation
// queue is empty and we're not polling, otherwise we want to return
// as soon as possible.
- task_->run(false, completed_ops);
+ task_->run(false, this_thread.private_op_queue);
}
o = op_queue_.front();
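
In do_poll_one() the task (the reactor) is run with its first argument false, i.e. it must not block, and after this change it delivers completions straight into this_thread.private_op_queue instead of a temporary queue that then had to be spliced back. A rough, hypothetical stand-in for that interface, using std::deque in place of Asio's intrusive op_queue:

#include <deque>
#include <functional>

// Hypothetical stand-in for the task interface driven above: run() is told
// whether it may block, and appends whatever completions are ready to the
// queue it is handed - after this patch, the caller's thread-private queue.
struct reactor_task_sketch
{
  std::deque<std::function<void()>> ready_;

  void run(bool block, std::deque<std::function<void()>>& ops)
  {
    (void)block; // a real reactor would wait on epoll/kqueue/select when allowed to block
    while (!ready_.empty())
    {
      ops.push_back(std::move(ready_.front()));
      ready_.pop_front();
    }
  }
};
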
@@ -426,7 +476,7 @@ std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock,
lock.unlock();
// Ensure the count of outstanding work is decremented on block exit.
- work_cleanup on_exit = { this, &lock, &private_op_queue };
+ work_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit;
// Complete the operation. May throw an exception. Deletes the object.