From 08c1e93fa36a49f49325a07fe91ff92c964c2b6c Mon Sep 17 00:00:00 2001 From: Chanho Park Date: Thu, 11 Dec 2014 18:55:56 +0900 Subject: Imported Upstream version 1.57.0 --- boost/asio/detail/impl/kqueue_reactor.ipp | 228 +++++++++++++----------------- 1 file changed, 100 insertions(+), 128 deletions(-) (limited to 'boost/asio/detail/impl/kqueue_reactor.ipp') diff --git a/boost/asio/detail/impl/kqueue_reactor.ipp b/boost/asio/detail/impl/kqueue_reactor.ipp index a819eb9b74..e0a711c4c3 100644 --- a/boost/asio/detail/impl/kqueue_reactor.ipp +++ b/boost/asio/detail/impl/kqueue_reactor.ipp @@ -2,7 +2,7 @@ // detail/impl/kqueue_reactor.ipp // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // -// Copyright (c) 2003-2012 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// Copyright (c) 2003-2014 Christopher M. Kohlhoff (chris at kohlhoff dot com) // Copyright (c) 2005 Stefan Arentz (stefan at soze dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -47,10 +47,15 @@ kqueue_reactor::kqueue_reactor(boost::asio::io_service& io_service) interrupter_(), shutdown_(false) { - // The interrupter is put into a permanently readable state. Whenever we want - // to interrupt the blocked kevent call we register a read operation against - // the descriptor. - interrupter_.interrupt(); + struct kevent events[1]; + BOOST_ASIO_KQUEUE_EV_SET(&events[0], interrupter_.read_descriptor(), + EVFILT_READ, EV_ADD, 0, 0, &interrupter_); + if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1) + { + boost::system::error_code error(errno, + boost::asio::error::get_system_category()); + boost::asio::detail::throw_error(error); + } } kqueue_reactor::~kqueue_reactor() @@ -89,30 +94,33 @@ void kqueue_reactor::fork_service(boost::asio::io_service::fork_event fork_ev) interrupter_.recreate(); + struct kevent events[2]; + BOOST_ASIO_KQUEUE_EV_SET(&events[0], interrupter_.read_descriptor(), + EVFILT_READ, EV_ADD, 0, 0, &interrupter_); + if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1) + { + boost::system::error_code ec(errno, + boost::asio::error::get_system_category()); + boost::asio::detail::throw_error(ec, "kqueue interrupter registration"); + } + // Re-register all descriptors with kqueue. mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_); for (descriptor_state* state = registered_descriptors_.first(); state != 0; state = state->next_) { - struct kevent events[2]; - int num_events = 0; - - if (!state->op_queue_[read_op].empty()) - BOOST_ASIO_KQUEUE_EV_SET(&events[num_events++], state->descriptor_, + if (state->num_kevents_ > 0) + { + BOOST_ASIO_KQUEUE_EV_SET(&events[0], state->descriptor_, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, state); - else if (!state->op_queue_[except_op].empty()) - BOOST_ASIO_KQUEUE_EV_SET(&events[num_events++], state->descriptor_, - EVFILT_READ, EV_ADD | EV_CLEAR, EV_OOBAND, 0, state); - - if (!state->op_queue_[write_op].empty()) - BOOST_ASIO_KQUEUE_EV_SET(&events[num_events++], state->descriptor_, + BOOST_ASIO_KQUEUE_EV_SET(&events[1], state->descriptor_, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, state); - - if (num_events && ::kevent(kqueue_fd_, events, num_events, 0, 0, 0) == -1) - { - boost::system::error_code error(errno, - boost::asio::error::get_system_category()); - boost::asio::detail::throw_error(error); + if (::kevent(kqueue_fd_, events, state->num_kevents_, 0, 0, 0) == -1) + { + boost::system::error_code ec(errno, + boost::asio::error::get_system_category()); + boost::asio::detail::throw_error(ec, "kqueue re-registration"); + } } } } @@ -131,6 +139,7 @@ int kqueue_reactor::register_descriptor(socket_type descriptor, mutex::scoped_lock lock(descriptor_data->mutex_); descriptor_data->descriptor_ = descriptor; + descriptor_data->num_kevents_ = 0; descriptor_data->shutdown_ = false; return 0; @@ -145,26 +154,15 @@ int kqueue_reactor::register_internal_descriptor( mutex::scoped_lock lock(descriptor_data->mutex_); descriptor_data->descriptor_ = descriptor; + descriptor_data->num_kevents_ = 1; descriptor_data->shutdown_ = false; descriptor_data->op_queue_[op_type].push(op); - struct kevent event; - switch (op_type) - { - case read_op: - BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ, - EV_ADD | EV_CLEAR, 0, 0, descriptor_data); - break; - case write_op: - BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_WRITE, - EV_ADD | EV_CLEAR, 0, 0, descriptor_data); - break; - case except_op: - BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ, - EV_ADD | EV_CLEAR, EV_OOBAND, 0, descriptor_data); - break; - } - ::kevent(kqueue_fd_, &event, 1, 0, 0, 0); + struct kevent events[1]; + BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ, + EV_ADD | EV_CLEAR, 0, 0, descriptor_data); + if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1) + return errno; return 0; } @@ -178,13 +176,13 @@ void kqueue_reactor::move_descriptor(socket_type, } void kqueue_reactor::start_op(int op_type, socket_type descriptor, - kqueue_reactor::per_descriptor_data& descriptor_data, - reactor_op* op, bool allow_speculative) + kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op, + bool is_continuation, bool allow_speculative) { if (!descriptor_data) { op->ec_ = boost::asio::error::bad_descriptor; - post_immediate_completion(op); + post_immediate_completion(op, is_continuation); return; } @@ -192,59 +190,61 @@ void kqueue_reactor::start_op(int op_type, socket_type descriptor, if (descriptor_data->shutdown_) { - post_immediate_completion(op); + post_immediate_completion(op, is_continuation); return; } - bool first = descriptor_data->op_queue_[op_type].empty(); - if (first) + if (descriptor_data->op_queue_[op_type].empty()) { - if (allow_speculative) + static const int num_kevents[max_ops] = { 1, 2, 1 }; + + if (allow_speculative + && (op_type != read_op + || descriptor_data->op_queue_[except_op].empty())) { - if (op_type != read_op || descriptor_data->op_queue_[except_op].empty()) + if (op->perform()) + { + descriptor_lock.unlock(); + io_service_.post_immediate_completion(op, is_continuation); + return; + } + + if (descriptor_data->num_kevents_ < num_kevents[op_type]) { - if (op->perform()) + struct kevent events[2]; + BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ, + EV_ADD | EV_CLEAR, 0, 0, descriptor_data); + BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE, + EV_ADD | EV_CLEAR, 0, 0, descriptor_data); + if (::kevent(kqueue_fd_, events, num_kevents[op_type], 0, 0, 0) != -1) + { + descriptor_data->num_kevents_ = num_kevents[op_type]; + } + else { - descriptor_lock.unlock(); - io_service_.post_immediate_completion(op); + op->ec_ = boost::system::error_code(errno, + boost::asio::error::get_system_category()); + io_service_.post_immediate_completion(op, is_continuation); return; } } } - } - - descriptor_data->op_queue_[op_type].push(op); - io_service_.work_started(); - - if (first) - { - struct kevent event; - switch (op_type) + else { - case read_op: - BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ, + if (descriptor_data->num_kevents_ < num_kevents[op_type]) + descriptor_data->num_kevents_ = num_kevents[op_type]; + + struct kevent events[2]; + BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, descriptor_data); - break; - case write_op: - BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_WRITE, + BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, descriptor_data); - break; - case except_op: - if (!descriptor_data->op_queue_[read_op].empty()) - return; // Already registered for read events. - BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ, - EV_ADD | EV_CLEAR, EV_OOBAND, 0, descriptor_data); - break; - } - - if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1) - { - op->ec_ = boost::system::error_code(errno, - boost::asio::error::get_system_category()); - descriptor_data->op_queue_[op_type].pop(); - io_service_.post_deferred_completion(op); + ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0); } } + + descriptor_data->op_queue_[op_type].push(op); + io_service_.work_started(); } void kqueue_reactor::cancel_ops(socket_type, @@ -293,7 +293,7 @@ void kqueue_reactor::deregister_descriptor(socket_type descriptor, EVFILT_READ, EV_DELETE, 0, 0, 0); BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE, EV_DELETE, 0, 0, 0); - ::kevent(kqueue_fd_, events, 2, 0, 0, 0); + ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0); } op_queue ops; @@ -334,7 +334,7 @@ void kqueue_reactor::deregister_internal_descriptor(socket_type descriptor, EVFILT_READ, EV_DELETE, 0, 0, 0); BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE, EV_DELETE, 0, 0, 0); - ::kevent(kqueue_fd_, events, 2, 0, 0, 0); + ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0); op_queue ops; for (int i = 0; i < max_ops; ++i) @@ -367,18 +367,31 @@ void kqueue_reactor::run(bool block, op_queue& ops) // Dispatch the waiting events. for (int i = 0; i < num_events; ++i) { - int descriptor = events[i].ident; void* ptr = reinterpret_cast(events[i].udata); if (ptr == &interrupter_) { - // No need to reset the interrupter since we're leaving the descriptor - // in a ready-to-read state and relying on edge-triggered notifications. + interrupter_.reset(); } else { descriptor_state* descriptor_data = static_cast(ptr); mutex::scoped_lock descriptor_lock(descriptor_data->mutex_); + if (events[i].filter == EVFILT_WRITE + && descriptor_data->num_kevents_ == 2 + && descriptor_data->op_queue_[write_op].empty()) + { + // Some descriptor types, like serial ports, don't seem to support + // EV_CLEAR with EVFILT_WRITE. Since we have no pending write + // operations we'll remove the EVFILT_WRITE registration here so that + // we don't end up in a tight spin. + struct kevent delete_events[1]; + BOOST_ASIO_KQUEUE_EV_SET(&delete_events[0], + descriptor_data->descriptor_, EVFILT_WRITE, EV_DELETE, 0, 0, 0); + ::kevent(kqueue_fd_, delete_events, 1, 0, 0, 0); + descriptor_data->num_kevents_ = 1; + } + // Exception operations must be processed first to ensure that any // out-of-band data is read before normal data. #if defined(__NetBSD__) @@ -397,7 +410,8 @@ void kqueue_reactor::run(bool block, op_queue& ops) { if (events[i].flags & EV_ERROR) { - op->ec_ = boost::system::error_code(events[i].data, + op->ec_ = boost::system::error_code( + static_cast(events[i].data), boost::asio::error::get_system_category()); descriptor_data->op_queue_[j].pop(); ops.push(op); @@ -413,45 +427,6 @@ void kqueue_reactor::run(bool block, op_queue& ops) } } } - - // Renew registration for event notifications. - struct kevent event; - switch (events[i].filter) - { - case EVFILT_READ: - if (!descriptor_data->op_queue_[read_op].empty()) - BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ, - EV_ADD | EV_CLEAR, 0, 0, descriptor_data); - else if (!descriptor_data->op_queue_[except_op].empty()) - BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_READ, - EV_ADD | EV_CLEAR, EV_OOBAND, 0, descriptor_data); - else - continue; - break; - case EVFILT_WRITE: - if (!descriptor_data->op_queue_[write_op].empty()) - BOOST_ASIO_KQUEUE_EV_SET(&event, descriptor, EVFILT_WRITE, - EV_ADD | EV_CLEAR, 0, 0, descriptor_data); - else - continue; - break; - default: - break; - } - if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1) - { - boost::system::error_code error(errno, - boost::asio::error::get_system_category()); - for (int j = 0; j < max_ops; ++j) - { - while (reactor_op* op = descriptor_data->op_queue_[j].front()) - { - op->ec_ = error; - descriptor_data->op_queue_[j].pop(); - ops.push(op); - } - } - } } } @@ -461,10 +436,7 @@ void kqueue_reactor::run(bool block, op_queue& ops) void kqueue_reactor::interrupt() { - struct kevent event; - BOOST_ASIO_KQUEUE_EV_SET(&event, interrupter_.read_descriptor(), - EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &interrupter_); - ::kevent(kqueue_fd_, &event, 1, 0, 0, 0); + interrupter_.interrupt(); } int kqueue_reactor::do_kqueue_create() -- cgit v1.2.3