author     Anas Nashif <anas.nashif@intel.com>   2013-08-26 08:15:55 -0400
committer  Anas Nashif <anas.nashif@intel.com>   2013-08-26 08:15:55 -0400
commit     bb4dd8289b351fae6b55e303f189127a394a1edd (patch)
tree       77c9c35a31b1459dd7988c2448e797d142530c41 /boost/asio/detail/impl
parent     1a78a62555be32868418fe52f8e330c9d0f95d5a (diff)
Imported Upstream version 1.51.0 (upstream/1.51.0)
Diffstat (limited to 'boost/asio/detail/impl')
-rw-r--r--  boost/asio/detail/impl/epoll_reactor.ipp    50
-rw-r--r--  boost/asio/detail/impl/socket_ops.ipp         1
-rw-r--r--  boost/asio/detail/impl/strand_service.hpp     5
-rw-r--r--  boost/asio/detail/impl/strand_service.ipp     2
-rw-r--r--  boost/asio/detail/impl/task_io_service.hpp    2
-rw-r--r--  boost/asio/detail/impl/task_io_service.ipp  176
6 files changed, 154 insertions(+), 82 deletions(-)
diff --git a/boost/asio/detail/impl/epoll_reactor.ipp b/boost/asio/detail/impl/epoll_reactor.ipp
index 771edea67c..073bd08825 100644
--- a/boost/asio/detail/impl/epoll_reactor.ipp
+++ b/boost/asio/detail/impl/epoll_reactor.ipp
@@ -127,7 +127,7 @@ void epoll_reactor::fork_service(boost::asio::io_service::fork_event fork_ev)
for (descriptor_state* state = registered_descriptors_.first();
state != 0; state = state->next_)
{
- ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLOUT | EPOLLPRI | EPOLLET;
+ ev.events = state->registered_events_;
ev.data.ptr = state;
int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, state->descriptor_, &ev);
if (result != 0)
@@ -159,7 +159,8 @@ int epoll_reactor::register_descriptor(socket_type descriptor,
}
epoll_event ev = { 0, { 0 } };
- ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLOUT | EPOLLPRI | EPOLLET;
+ ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
+ descriptor_data->registered_events_ = ev.events;
ev.data.ptr = descriptor_data;
int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
if (result != 0)
@@ -184,7 +185,8 @@ int epoll_reactor::register_internal_descriptor(
}
epoll_event ev = { 0, { 0 } };
- ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLOUT | EPOLLPRI | EPOLLET;
+ ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
+ descriptor_data->registered_events_ = ev.events;
ev.data.ptr = descriptor_data;
int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
if (result != 0)
@@ -222,23 +224,47 @@ void epoll_reactor::start_op(int op_type, socket_type descriptor,
if (descriptor_data->op_queue_[op_type].empty())
{
- if (allow_speculative)
+ if (allow_speculative
+ && (op_type != read_op
+ || descriptor_data->op_queue_[except_op].empty()))
{
- if (op_type != read_op || descriptor_data->op_queue_[except_op].empty())
+ if (op->perform())
{
- if (op->perform())
+ descriptor_lock.unlock();
+ io_service_.post_immediate_completion(op);
+ return;
+ }
+
+ if (op_type == write_op)
+ {
+ if ((descriptor_data->registered_events_ & EPOLLOUT) == 0)
{
- descriptor_lock.unlock();
- io_service_.post_immediate_completion(op);
- return;
+ epoll_event ev = { 0, { 0 } };
+ ev.events = descriptor_data->registered_events_ | EPOLLOUT;
+ ev.data.ptr = descriptor_data;
+ if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
+ {
+ descriptor_data->registered_events_ |= ev.events;
+ }
+ else
+ {
+ op->ec_ = boost::system::error_code(errno,
+ boost::asio::error::get_system_category());
+ io_service_.post_immediate_completion(op);
+ return;
+ }
}
}
}
else
{
+ if (op_type == write_op)
+ {
+ descriptor_data->registered_events_ |= EPOLLOUT;
+ }
+
epoll_event ev = { 0, { 0 } };
- ev.events = EPOLLIN | EPOLLERR | EPOLLHUP
- | EPOLLOUT | EPOLLPRI | EPOLLET;
+ ev.events = descriptor_data->registered_events_;
ev.data.ptr = descriptor_data;
epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
}
@@ -440,7 +466,7 @@ int epoll_reactor::do_epoll_create()
errno = EINVAL;
#endif // defined(EPOLL_CLOEXEC)
- if (fd == -1 && errno == EINVAL)
+ if (fd == -1 && (errno == EINVAL || errno == ENOSYS))
{
fd = epoll_create(epoll_size);
if (fd != -1)
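
The epoll_reactor changes above drop EPOLLOUT from the initial registration and instead record the per-descriptor event mask in the new registered_events_ field; EPOLLOUT is only added later, via EPOLL_CTL_MOD, the first time a speculative write cannot complete. A minimal standalone sketch of that lazy-registration pattern follows (lazy_writer and wait_writable are illustrative names, not Boost.Asio API):

// Sketch only: lazy EPOLLOUT registration mirroring the pattern in the
// diff above. Names are hypothetical.
#include <sys/epoll.h>
#include <cerrno>
#include <cstdint>

struct lazy_writer
{
  int epoll_fd;
  int fd;
  std::uint32_t registered_events;  // mask the kernel currently watches

  // Register with read-oriented events only; EPOLLOUT is deferred.
  int add()
  {
    epoll_event ev = { 0, { 0 } };
    ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
    ev.data.ptr = this;
    registered_events = ev.events;
    return epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev) == 0 ? 0 : errno;
  }

  // Call the first time a speculative write would block: widen the
  // registration to include EPOLLOUT, once, and remember that we did.
  int wait_writable()
  {
    if ((registered_events & EPOLLOUT) == 0)
    {
      epoll_event ev = { 0, { 0 } };
      ev.events = registered_events | EPOLLOUT;
      ev.data.ptr = this;
      if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ev) != 0)
        return errno;            // caller turns this into an error_code
      registered_events = ev.events;
    }
    return 0;
  }
};

With edge-triggered epoll, omitting EPOLLOUT up front avoids spurious writability wakeups on sockets that only ever read; once added, the flag stays set for the descriptor's lifetime, matching the one-way |= in start_op above.
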
diff --git a/boost/asio/detail/impl/socket_ops.ipp b/boost/asio/detail/impl/socket_ops.ipp
index 24d2d66ad1..16e95366e6 100644
--- a/boost/asio/detail/impl/socket_ops.ipp
+++ b/boost/asio/detail/impl/socket_ops.ipp
@@ -18,6 +18,7 @@
#include <boost/asio/detail/config.hpp>
#include <boost/assert.hpp>
#include <boost/detail/workaround.hpp>
+#include <cctype>
#include <cstdio>
#include <cstdlib>
#include <cstring>
diff --git a/boost/asio/detail/impl/strand_service.hpp b/boost/asio/detail/impl/strand_service.hpp
index 1d98d99055..7581852ff5 100644
--- a/boost/asio/detail/impl/strand_service.hpp
+++ b/boost/asio/detail/impl/strand_service.hpp
@@ -50,11 +50,6 @@ struct strand_service::on_dispatch_exit
}
};
-inline void strand_service::destroy(strand_service::implementation_type& impl)
-{
- impl = 0;
-}
-
template <typename Handler>
void strand_service::dispatch(strand_service::implementation_type& impl,
Handler handler)
diff --git a/boost/asio/detail/impl/strand_service.ipp b/boost/asio/detail/impl/strand_service.ipp
index 64e4cc00ce..1912d80f72 100644
--- a/boost/asio/detail/impl/strand_service.ipp
+++ b/boost/asio/detail/impl/strand_service.ipp
@@ -38,7 +38,7 @@ struct strand_service::on_do_complete_exit
impl_->mutex_.unlock();
if (more_handlers)
- owner_->post_immediate_completion(impl_);
+ owner_->post_private_immediate_completion(impl_);
}
};
diff --git a/boost/asio/detail/impl/task_io_service.hpp b/boost/asio/detail/impl/task_io_service.hpp
index 7cd7449e60..2cc7b7eea1 100644
--- a/boost/asio/detail/impl/task_io_service.hpp
+++ b/boost/asio/detail/impl/task_io_service.hpp
@@ -45,7 +45,7 @@ void task_io_service::dispatch(Handler handler)
BOOST_ASIO_HANDLER_CREATION((p.p, "io_service", this, "dispatch"));
- post_immediate_completion(p.p);
+ post_non_private_immediate_completion(p.p);
p.v = p.p = 0;
}
}
diff --git a/boost/asio/detail/impl/task_io_service.ipp b/boost/asio/detail/impl/task_io_service.ipp
index 3d679c2dca..674df638fe 100644
--- a/boost/asio/detail/impl/task_io_service.ipp
+++ b/boost/asio/detail/impl/task_io_service.ipp
@@ -30,48 +30,67 @@ namespace boost {
namespace asio {
namespace detail {
+struct task_io_service::thread_info
+{
+ event* wakeup_event;
+ op_queue<operation> private_op_queue;
+ long private_outstanding_work;
+ thread_info* next;
+};
+
struct task_io_service::task_cleanup
{
~task_cleanup()
{
+ if (this_thread_->private_outstanding_work > 0)
+ {
+ boost::asio::detail::increment(
+ task_io_service_->outstanding_work_,
+ this_thread_->private_outstanding_work);
+ }
+ this_thread_->private_outstanding_work = 0;
+
// Enqueue the completed operations and reinsert the task at the end of
// the operation queue.
lock_->lock();
task_io_service_->task_interrupted_ = true;
- task_io_service_->op_queue_.push(*ops_);
+ task_io_service_->op_queue_.push(this_thread_->private_op_queue);
task_io_service_->op_queue_.push(&task_io_service_->task_operation_);
}
task_io_service* task_io_service_;
mutex::scoped_lock* lock_;
- op_queue<operation>* ops_;
+ thread_info* this_thread_;
};
struct task_io_service::work_cleanup
{
~work_cleanup()
{
- task_io_service_->work_finished();
+ if (this_thread_->private_outstanding_work > 1)
+ {
+ boost::asio::detail::increment(
+ task_io_service_->outstanding_work_,
+ this_thread_->private_outstanding_work - 1);
+ }
+ else if (this_thread_->private_outstanding_work < 1)
+ {
+ task_io_service_->work_finished();
+ }
+ this_thread_->private_outstanding_work = 0;
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- if (!ops_->empty())
+#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+ if (!this_thread_->private_op_queue.empty())
{
lock_->lock();
- task_io_service_->op_queue_.push(*ops_);
+ task_io_service_->op_queue_.push(this_thread_->private_op_queue);
}
#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
}
task_io_service* task_io_service_;
mutex::scoped_lock* lock_;
- op_queue<operation>* ops_;
-};
-
-struct task_io_service::thread_info
-{
- event* wakeup_event;
- op_queue<operation>* private_op_queue;
- thread_info* next;
+ thread_info* this_thread_;
};
task_io_service::task_io_service(
@@ -131,19 +150,14 @@ std::size_t task_io_service::run(boost::system::error_code& ec)
thread_info this_thread;
event wakeup_event;
this_thread.wakeup_event = &wakeup_event;
- op_queue<operation> private_op_queue;
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- this_thread.private_op_queue = one_thread_ == 1 ? &private_op_queue : 0;
-#else // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- this_thread.private_op_queue = 0;
-#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+ this_thread.private_outstanding_work = 0;
this_thread.next = 0;
thread_call_stack::context ctx(this, this_thread);
mutex::scoped_lock lock(mutex_);
std::size_t n = 0;
- for (; do_run_one(lock, this_thread, private_op_queue, ec); lock.lock())
+ for (; do_run_one(lock, this_thread, ec); lock.lock())
if (n != (std::numeric_limits<std::size_t>::max)())
++n;
return n;
@@ -161,14 +175,13 @@ std::size_t task_io_service::run_one(boost::system::error_code& ec)
thread_info this_thread;
event wakeup_event;
this_thread.wakeup_event = &wakeup_event;
- op_queue<operation> private_op_queue;
- this_thread.private_op_queue = 0;
+ this_thread.private_outstanding_work = 0;
this_thread.next = 0;
thread_call_stack::context ctx(this, this_thread);
mutex::scoped_lock lock(mutex_);
- return do_run_one(lock, this_thread, private_op_queue, ec);
+ return do_run_one(lock, this_thread, ec);
}
std::size_t task_io_service::poll(boost::system::error_code& ec)
@@ -182,29 +195,23 @@ std::size_t task_io_service::poll(boost::system::error_code& ec)
thread_info this_thread;
this_thread.wakeup_event = 0;
- op_queue<operation> private_op_queue;
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- this_thread.private_op_queue = one_thread_ == 1 ? &private_op_queue : 0;
-#else // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- this_thread.private_op_queue = 0;
-#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+ this_thread.private_outstanding_work = 0;
this_thread.next = 0;
thread_call_stack::context ctx(this, this_thread);
mutex::scoped_lock lock(mutex_);
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
// We want to support nested calls to poll() and poll_one(), so any handlers
// that are already on a thread-private queue need to be put on to the main
// queue now.
if (one_thread_)
if (thread_info* outer_thread_info = ctx.next_by_key())
- if (outer_thread_info->private_op_queue)
- op_queue_.push(*outer_thread_info->private_op_queue);
+ op_queue_.push(outer_thread_info->private_op_queue);
#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
std::size_t n = 0;
- for (; do_poll_one(lock, private_op_queue, ec); lock.lock())
+ for (; do_poll_one(lock, this_thread, ec); lock.lock())
if (n != (std::numeric_limits<std::size_t>::max)())
++n;
return n;
@@ -221,24 +228,22 @@ std::size_t task_io_service::poll_one(boost::system::error_code& ec)
thread_info this_thread;
this_thread.wakeup_event = 0;
- op_queue<operation> private_op_queue;
- this_thread.private_op_queue = 0;
+ this_thread.private_outstanding_work = 0;
this_thread.next = 0;
thread_call_stack::context ctx(this, this_thread);
mutex::scoped_lock lock(mutex_);
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
// We want to support nested calls to poll() and poll_one(), so any handlers
// that are already on a thread-private queue need to be put on to the main
// queue now.
if (one_thread_)
if (thread_info* outer_thread_info = ctx.next_by_key())
- if (outer_thread_info->private_op_queue)
- op_queue_.push(*outer_thread_info->private_op_queue);
+ op_queue_.push(outer_thread_info->private_op_queue);
#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- return do_poll_one(lock, private_op_queue, ec);
+ return do_poll_one(lock, this_thread, ec);
}
void task_io_service::stop()
@@ -261,22 +266,33 @@ void task_io_service::reset()
void task_io_service::post_immediate_completion(task_io_service::operation* op)
{
+#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+ if (one_thread_)
+ {
+ if (thread_info* this_thread = thread_call_stack::contains(this))
+ {
+ ++this_thread->private_outstanding_work;
+ this_thread->private_op_queue.push(op);
+ return;
+ }
+ }
+#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+
work_started();
- post_deferred_completion(op);
+ mutex::scoped_lock lock(mutex_);
+ op_queue_.push(op);
+ wake_one_thread_and_unlock(lock);
}
void task_io_service::post_deferred_completion(task_io_service::operation* op)
{
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
if (one_thread_)
{
if (thread_info* this_thread = thread_call_stack::contains(this))
{
- if (this_thread->private_op_queue)
- {
- this_thread->private_op_queue->push(op);
- return;
- }
+ this_thread->private_op_queue.push(op);
+ return;
}
}
#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
@@ -291,16 +307,13 @@ void task_io_service::post_deferred_completions(
{
if (!ops.empty())
{
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
if (one_thread_)
{
if (thread_info* this_thread = thread_call_stack::contains(this))
{
- if (this_thread->private_op_queue)
- {
- this_thread->private_op_queue->push(ops);
- return;
- }
+ this_thread->private_op_queue.push(ops);
+ return;
}
}
#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
@@ -311,6 +324,44 @@ void task_io_service::post_deferred_completions(
}
}
+void task_io_service::post_private_immediate_completion(
+ task_io_service::operation* op)
+{
+ work_started();
+ post_private_deferred_completion(op);
+}
+
+void task_io_service::post_private_deferred_completion(
+ task_io_service::operation* op)
+{
+#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+ if (thread_info* this_thread = thread_call_stack::contains(this))
+ {
+ this_thread->private_op_queue.push(op);
+ return;
+ }
+#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+
+ mutex::scoped_lock lock(mutex_);
+ op_queue_.push(op);
+ wake_one_thread_and_unlock(lock);
+}
+
+void task_io_service::post_non_private_immediate_completion(
+ task_io_service::operation* op)
+{
+ work_started();
+ post_non_private_deferred_completion(op);
+}
+
+void task_io_service::post_non_private_deferred_completion(
+ task_io_service::operation* op)
+{
+ mutex::scoped_lock lock(mutex_);
+ op_queue_.push(op);
+ wake_one_thread_and_unlock(lock);
+}
+
void task_io_service::abandon_operations(
op_queue<task_io_service::operation>& ops)
{
@@ -320,7 +371,7 @@ void task_io_service::abandon_operations(
std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock,
task_io_service::thread_info& this_thread,
- op_queue<operation>& private_op_queue, const boost::system::error_code& ec)
+ const boost::system::error_code& ec)
{
while (!stopped_)
{
@@ -343,14 +394,13 @@ std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock,
else
lock.unlock();
- op_queue<operation> completed_ops;
- task_cleanup on_exit = { this, &lock, &completed_ops };
+ task_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit;
// Run the task. May throw an exception. Only block if the operation
// queue is empty and we're not polling, otherwise we want to return
// as soon as possible.
- task_->run(!more_handlers, completed_ops);
+ task_->run(!more_handlers, this_thread.private_op_queue);
}
else
{
@@ -362,7 +412,7 @@ std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock,
lock.unlock();
// Ensure the count of outstanding work is decremented on block exit.
- work_cleanup on_exit = { this, &lock, &private_op_queue };
+ work_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit;
// Complete the operation. May throw an exception. Deletes the object.
@@ -385,7 +435,8 @@ std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock,
}
std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock,
- op_queue<operation>& private_op_queue, const boost::system::error_code& ec)
+ task_io_service::thread_info& this_thread,
+ const boost::system::error_code& ec)
{
if (stopped_)
return 0;
@@ -397,14 +448,13 @@ std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock,
lock.unlock();
{
- op_queue<operation> completed_ops;
- task_cleanup c = { this, &lock, &completed_ops };
+ task_cleanup c = { this, &lock, &this_thread };
(void)c;
// Run the task. May throw an exception. Only block if the operation
// queue is empty and we're not polling, otherwise we want to return
// as soon as possible.
- task_->run(false, completed_ops);
+ task_->run(false, this_thread.private_op_queue);
}
o = op_queue_.front();
@@ -426,7 +476,7 @@ std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock,
lock.unlock();
// Ensure the count of outstanding work is decremented on block exit.
- work_cleanup on_exit = { this, &lock, &private_op_queue };
+ work_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit;
// Complete the operation. May throw an exception. Deletes the object.
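
Taken together, the task_io_service changes move the thread-private operation queue and a private outstanding-work counter into thread_info itself: handlers posted from the thread that is running the io_service go onto the private queue without taking the mutex, and task_cleanup/work_cleanup later fold the private counter into the shared outstanding_work_ and push any leftover private handlers onto the shared queue. A simplified sketch of that accounting pattern, using assumed names (scheduler, worker) rather than the real Boost internals and omitting work_cleanup's adjustment for the handler that has just run:

// Sketch only: per-thread private queue with deferred work accounting.
// scheduler/worker are hypothetical stand-ins for task_io_service and
// thread_info.
#include <atomic>
#include <deque>
#include <functional>
#include <mutex>

struct scheduler
{
  struct worker
  {
    std::deque<std::function<void()> > private_queue;
    long private_outstanding_work = 0;
  };

  std::mutex mutex;
  std::deque<std::function<void()> > shared_queue;
  std::atomic<long> outstanding_work{0};

  // Fast path for the thread currently running the scheduler: no lock,
  // and the shared work counter is not touched yet.
  void post_private(worker& w, std::function<void()> op)
  {
    ++w.private_outstanding_work;
    w.private_queue.push_back(std::move(op));
  }

  // Roughly what task_cleanup/work_cleanup do on scope exit: publish the
  // privately counted work in one update, then move any handlers still
  // sitting on the private queue back onto the shared queue under the lock.
  void flush(worker& w)
  {
    if (w.private_outstanding_work > 0)
      outstanding_work += w.private_outstanding_work;
    w.private_outstanding_work = 0;

    if (!w.private_queue.empty())
    {
      std::lock_guard<std::mutex> lock(mutex);
      while (!w.private_queue.empty())
      {
        shared_queue.push_back(std::move(w.private_queue.front()));
        w.private_queue.pop_front();
      }
    }
  }
};

The effect is that, in the one_thread_ case, posting a completion from inside a handler running on the io_service's own thread costs neither a mutex acquisition nor an atomic increment; both are paid once, in bulk, when the handler or the reactor task finishes.
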