From bb4dd8289b351fae6b55e303f189127a394a1edd Mon Sep 17 00:00:00 2001 From: Anas Nashif Date: Mon, 26 Aug 2013 08:15:55 -0400 Subject: Imported Upstream version 1.51.0 --- boost/asio/detail/impl/epoll_reactor.ipp | 50 ++++++-- boost/asio/detail/impl/socket_ops.ipp | 1 + boost/asio/detail/impl/strand_service.hpp | 5 - boost/asio/detail/impl/strand_service.ipp | 2 +- boost/asio/detail/impl/task_io_service.hpp | 2 +- boost/asio/detail/impl/task_io_service.ipp | 176 ++++++++++++++++++----------- 6 files changed, 154 insertions(+), 82 deletions(-) (limited to 'boost/asio/detail/impl') diff --git a/boost/asio/detail/impl/epoll_reactor.ipp b/boost/asio/detail/impl/epoll_reactor.ipp index 771edea67c..073bd08825 100644 --- a/boost/asio/detail/impl/epoll_reactor.ipp +++ b/boost/asio/detail/impl/epoll_reactor.ipp @@ -127,7 +127,7 @@ void epoll_reactor::fork_service(boost::asio::io_service::fork_event fork_ev) for (descriptor_state* state = registered_descriptors_.first(); state != 0; state = state->next_) { - ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLOUT | EPOLLPRI | EPOLLET; + ev.events = state->registered_events_; ev.data.ptr = state; int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, state->descriptor_, &ev); if (result != 0) @@ -159,7 +159,8 @@ int epoll_reactor::register_descriptor(socket_type descriptor, } epoll_event ev = { 0, { 0 } }; - ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLOUT | EPOLLPRI | EPOLLET; + ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET; + descriptor_data->registered_events_ = ev.events; ev.data.ptr = descriptor_data; int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev); if (result != 0) @@ -184,7 +185,8 @@ int epoll_reactor::register_internal_descriptor( } epoll_event ev = { 0, { 0 } }; - ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLOUT | EPOLLPRI | EPOLLET; + ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET; + descriptor_data->registered_events_ = ev.events; ev.data.ptr = descriptor_data; int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev); if (result != 0) @@ -222,23 +224,47 @@ void epoll_reactor::start_op(int op_type, socket_type descriptor, if (descriptor_data->op_queue_[op_type].empty()) { - if (allow_speculative) + if (allow_speculative + && (op_type != read_op + || descriptor_data->op_queue_[except_op].empty())) { - if (op_type != read_op || descriptor_data->op_queue_[except_op].empty()) + if (op->perform()) { - if (op->perform()) + descriptor_lock.unlock(); + io_service_.post_immediate_completion(op); + return; + } + + if (op_type == write_op) + { + if ((descriptor_data->registered_events_ & EPOLLOUT) == 0) { - descriptor_lock.unlock(); - io_service_.post_immediate_completion(op); - return; + epoll_event ev = { 0, { 0 } }; + ev.events = descriptor_data->registered_events_ | EPOLLOUT; + ev.data.ptr = descriptor_data; + if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0) + { + descriptor_data->registered_events_ |= ev.events; + } + else + { + op->ec_ = boost::system::error_code(errno, + boost::asio::error::get_system_category()); + io_service_.post_immediate_completion(op); + return; + } } } } else { + if (op_type == write_op) + { + descriptor_data->registered_events_ |= EPOLLOUT; + } + epoll_event ev = { 0, { 0 } }; - ev.events = EPOLLIN | EPOLLERR | EPOLLHUP - | EPOLLOUT | EPOLLPRI | EPOLLET; + ev.events = descriptor_data->registered_events_; ev.data.ptr = descriptor_data; epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev); } @@ -440,7 +466,7 @@ int epoll_reactor::do_epoll_create() errno = EINVAL; #endif // defined(EPOLL_CLOEXEC) - if (fd == -1 && errno == EINVAL) + if (fd == -1 && (errno == EINVAL || errno == ENOSYS)) { fd = epoll_create(epoll_size); if (fd != -1) diff --git a/boost/asio/detail/impl/socket_ops.ipp b/boost/asio/detail/impl/socket_ops.ipp index 24d2d66ad1..16e95366e6 100644 --- a/boost/asio/detail/impl/socket_ops.ipp +++ b/boost/asio/detail/impl/socket_ops.ipp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include diff --git a/boost/asio/detail/impl/strand_service.hpp b/boost/asio/detail/impl/strand_service.hpp index 1d98d99055..7581852ff5 100644 --- a/boost/asio/detail/impl/strand_service.hpp +++ b/boost/asio/detail/impl/strand_service.hpp @@ -50,11 +50,6 @@ struct strand_service::on_dispatch_exit } }; -inline void strand_service::destroy(strand_service::implementation_type& impl) -{ - impl = 0; -} - template void strand_service::dispatch(strand_service::implementation_type& impl, Handler handler) diff --git a/boost/asio/detail/impl/strand_service.ipp b/boost/asio/detail/impl/strand_service.ipp index 64e4cc00ce..1912d80f72 100644 --- a/boost/asio/detail/impl/strand_service.ipp +++ b/boost/asio/detail/impl/strand_service.ipp @@ -38,7 +38,7 @@ struct strand_service::on_do_complete_exit impl_->mutex_.unlock(); if (more_handlers) - owner_->post_immediate_completion(impl_); + owner_->post_private_immediate_completion(impl_); } }; diff --git a/boost/asio/detail/impl/task_io_service.hpp b/boost/asio/detail/impl/task_io_service.hpp index 7cd7449e60..2cc7b7eea1 100644 --- a/boost/asio/detail/impl/task_io_service.hpp +++ b/boost/asio/detail/impl/task_io_service.hpp @@ -45,7 +45,7 @@ void task_io_service::dispatch(Handler handler) BOOST_ASIO_HANDLER_CREATION((p.p, "io_service", this, "dispatch")); - post_immediate_completion(p.p); + post_non_private_immediate_completion(p.p); p.v = p.p = 0; } } diff --git a/boost/asio/detail/impl/task_io_service.ipp b/boost/asio/detail/impl/task_io_service.ipp index 3d679c2dca..674df638fe 100644 --- a/boost/asio/detail/impl/task_io_service.ipp +++ b/boost/asio/detail/impl/task_io_service.ipp @@ -30,48 +30,67 @@ namespace boost { namespace asio { namespace detail { +struct task_io_service::thread_info +{ + event* wakeup_event; + op_queue private_op_queue; + long private_outstanding_work; + thread_info* next; +}; + struct task_io_service::task_cleanup { ~task_cleanup() { + if (this_thread_->private_outstanding_work > 0) + { + boost::asio::detail::increment( + task_io_service_->outstanding_work_, + this_thread_->private_outstanding_work); + } + this_thread_->private_outstanding_work = 0; + // Enqueue the completed operations and reinsert the task at the end of // the operation queue. lock_->lock(); task_io_service_->task_interrupted_ = true; - task_io_service_->op_queue_.push(*ops_); + task_io_service_->op_queue_.push(this_thread_->private_op_queue); task_io_service_->op_queue_.push(&task_io_service_->task_operation_); } task_io_service* task_io_service_; mutex::scoped_lock* lock_; - op_queue* ops_; + thread_info* this_thread_; }; struct task_io_service::work_cleanup { ~work_cleanup() { - task_io_service_->work_finished(); + if (this_thread_->private_outstanding_work > 1) + { + boost::asio::detail::increment( + task_io_service_->outstanding_work_, + this_thread_->private_outstanding_work - 1); + } + else if (this_thread_->private_outstanding_work < 1) + { + task_io_service_->work_finished(); + } + this_thread_->private_outstanding_work = 0; -#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) - if (!ops_->empty()) +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + if (!this_thread_->private_op_queue.empty()) { lock_->lock(); - task_io_service_->op_queue_.push(*ops_); + task_io_service_->op_queue_.push(this_thread_->private_op_queue); } #endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) } task_io_service* task_io_service_; mutex::scoped_lock* lock_; - op_queue* ops_; -}; - -struct task_io_service::thread_info -{ - event* wakeup_event; - op_queue* private_op_queue; - thread_info* next; + thread_info* this_thread_; }; task_io_service::task_io_service( @@ -131,19 +150,14 @@ std::size_t task_io_service::run(boost::system::error_code& ec) thread_info this_thread; event wakeup_event; this_thread.wakeup_event = &wakeup_event; - op_queue private_op_queue; -#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) - this_thread.private_op_queue = one_thread_ == 1 ? &private_op_queue : 0; -#else // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) - this_thread.private_op_queue = 0; -#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + this_thread.private_outstanding_work = 0; this_thread.next = 0; thread_call_stack::context ctx(this, this_thread); mutex::scoped_lock lock(mutex_); std::size_t n = 0; - for (; do_run_one(lock, this_thread, private_op_queue, ec); lock.lock()) + for (; do_run_one(lock, this_thread, ec); lock.lock()) if (n != (std::numeric_limits::max)()) ++n; return n; @@ -161,14 +175,13 @@ std::size_t task_io_service::run_one(boost::system::error_code& ec) thread_info this_thread; event wakeup_event; this_thread.wakeup_event = &wakeup_event; - op_queue private_op_queue; - this_thread.private_op_queue = 0; + this_thread.private_outstanding_work = 0; this_thread.next = 0; thread_call_stack::context ctx(this, this_thread); mutex::scoped_lock lock(mutex_); - return do_run_one(lock, this_thread, private_op_queue, ec); + return do_run_one(lock, this_thread, ec); } std::size_t task_io_service::poll(boost::system::error_code& ec) @@ -182,29 +195,23 @@ std::size_t task_io_service::poll(boost::system::error_code& ec) thread_info this_thread; this_thread.wakeup_event = 0; - op_queue private_op_queue; -#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) - this_thread.private_op_queue = one_thread_ == 1 ? &private_op_queue : 0; -#else // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) - this_thread.private_op_queue = 0; -#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + this_thread.private_outstanding_work = 0; this_thread.next = 0; thread_call_stack::context ctx(this, this_thread); mutex::scoped_lock lock(mutex_); -#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) // We want to support nested calls to poll() and poll_one(), so any handlers // that are already on a thread-private queue need to be put on to the main // queue now. if (one_thread_) if (thread_info* outer_thread_info = ctx.next_by_key()) - if (outer_thread_info->private_op_queue) - op_queue_.push(*outer_thread_info->private_op_queue); + op_queue_.push(outer_thread_info->private_op_queue); #endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) std::size_t n = 0; - for (; do_poll_one(lock, private_op_queue, ec); lock.lock()) + for (; do_poll_one(lock, this_thread, ec); lock.lock()) if (n != (std::numeric_limits::max)()) ++n; return n; @@ -221,24 +228,22 @@ std::size_t task_io_service::poll_one(boost::system::error_code& ec) thread_info this_thread; this_thread.wakeup_event = 0; - op_queue private_op_queue; - this_thread.private_op_queue = 0; + this_thread.private_outstanding_work = 0; this_thread.next = 0; thread_call_stack::context ctx(this, this_thread); mutex::scoped_lock lock(mutex_); -#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) // We want to support nested calls to poll() and poll_one(), so any handlers // that are already on a thread-private queue need to be put on to the main // queue now. if (one_thread_) if (thread_info* outer_thread_info = ctx.next_by_key()) - if (outer_thread_info->private_op_queue) - op_queue_.push(*outer_thread_info->private_op_queue); + op_queue_.push(outer_thread_info->private_op_queue); #endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) - return do_poll_one(lock, private_op_queue, ec); + return do_poll_one(lock, this_thread, ec); } void task_io_service::stop() @@ -261,22 +266,33 @@ void task_io_service::reset() void task_io_service::post_immediate_completion(task_io_service::operation* op) { +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + if (one_thread_) + { + if (thread_info* this_thread = thread_call_stack::contains(this)) + { + ++this_thread->private_outstanding_work; + this_thread->private_op_queue.push(op); + return; + } + } +#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + work_started(); - post_deferred_completion(op); + mutex::scoped_lock lock(mutex_); + op_queue_.push(op); + wake_one_thread_and_unlock(lock); } void task_io_service::post_deferred_completion(task_io_service::operation* op) { -#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) if (one_thread_) { if (thread_info* this_thread = thread_call_stack::contains(this)) { - if (this_thread->private_op_queue) - { - this_thread->private_op_queue->push(op); - return; - } + this_thread->private_op_queue.push(op); + return; } } #endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) @@ -291,16 +307,13 @@ void task_io_service::post_deferred_completions( { if (!ops.empty()) { -#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) if (one_thread_) { if (thread_info* this_thread = thread_call_stack::contains(this)) { - if (this_thread->private_op_queue) - { - this_thread->private_op_queue->push(ops); - return; - } + this_thread->private_op_queue.push(ops); + return; } } #endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) @@ -311,6 +324,44 @@ void task_io_service::post_deferred_completions( } } +void task_io_service::post_private_immediate_completion( + task_io_service::operation* op) +{ + work_started(); + post_private_deferred_completion(op); +} + +void task_io_service::post_private_deferred_completion( + task_io_service::operation* op) +{ +#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + if (thread_info* this_thread = thread_call_stack::contains(this)) + { + this_thread->private_op_queue.push(op); + return; + } +#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS) + + mutex::scoped_lock lock(mutex_); + op_queue_.push(op); + wake_one_thread_and_unlock(lock); +} + +void task_io_service::post_non_private_immediate_completion( + task_io_service::operation* op) +{ + work_started(); + post_non_private_deferred_completion(op); +} + +void task_io_service::post_non_private_deferred_completion( + task_io_service::operation* op) +{ + mutex::scoped_lock lock(mutex_); + op_queue_.push(op); + wake_one_thread_and_unlock(lock); +} + void task_io_service::abandon_operations( op_queue& ops) { @@ -320,7 +371,7 @@ void task_io_service::abandon_operations( std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock, task_io_service::thread_info& this_thread, - op_queue& private_op_queue, const boost::system::error_code& ec) + const boost::system::error_code& ec) { while (!stopped_) { @@ -343,14 +394,13 @@ std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock, else lock.unlock(); - op_queue completed_ops; - task_cleanup on_exit = { this, &lock, &completed_ops }; + task_cleanup on_exit = { this, &lock, &this_thread }; (void)on_exit; // Run the task. May throw an exception. Only block if the operation // queue is empty and we're not polling, otherwise we want to return // as soon as possible. - task_->run(!more_handlers, completed_ops); + task_->run(!more_handlers, this_thread.private_op_queue); } else { @@ -362,7 +412,7 @@ std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock, lock.unlock(); // Ensure the count of outstanding work is decremented on block exit. - work_cleanup on_exit = { this, &lock, &private_op_queue }; + work_cleanup on_exit = { this, &lock, &this_thread }; (void)on_exit; // Complete the operation. May throw an exception. Deletes the object. @@ -385,7 +435,8 @@ std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock, } std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock, - op_queue& private_op_queue, const boost::system::error_code& ec) + task_io_service::thread_info& this_thread, + const boost::system::error_code& ec) { if (stopped_) return 0; @@ -397,14 +448,13 @@ std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock, lock.unlock(); { - op_queue completed_ops; - task_cleanup c = { this, &lock, &completed_ops }; + task_cleanup c = { this, &lock, &this_thread }; (void)c; // Run the task. May throw an exception. Only block if the operation // queue is empty and we're not polling, otherwise we want to return // as soon as possible. - task_->run(false, completed_ops); + task_->run(false, this_thread.private_op_queue); } o = op_queue_.front(); @@ -426,7 +476,7 @@ std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock, lock.unlock(); // Ensure the count of outstanding work is decremented on block exit. - work_cleanup on_exit = { this, &lock, &private_op_queue }; + work_cleanup on_exit = { this, &lock, &this_thread }; (void)on_exit; // Complete the operation. May throw an exception. Deletes the object. -- cgit v1.2.3