diff options
Diffstat (limited to 'boost/fiber/buffered_channel.hpp')
-rw-r--r-- | boost/fiber/buffered_channel.hpp | 294 |
1 files changed, 258 insertions, 36 deletions
diff --git a/boost/fiber/buffered_channel.hpp b/boost/fiber/buffered_channel.hpp index 1c32e49bae..3cf22295dd 100644 --- a/boost/fiber/buffered_channel.hpp +++ b/boost/fiber/buffered_channel.hpp @@ -40,7 +40,7 @@ private: typedef context::wait_queue_t wait_queue_type; typedef T slot_type; - alignas(cache_alignment) mutable detail::spinlock splk_{}; + mutable detail::spinlock splk_{}; wait_queue_type waiting_producers_{}; wait_queue_type waiting_consumers_{}; slot_type * slots_; @@ -64,7 +64,7 @@ private: public: explicit buffered_channel( std::size_t capacity) : capacity_{ capacity } { - if ( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) { + if ( BOOST_UNLIKELY( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) ) { throw fiber_error{ std::make_error_code( std::errc::invalid_argument), "boost fiber: buffer capacity is invalid" }; } @@ -92,20 +92,52 @@ public: while ( ! waiting_producers_.empty() ) { context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); - active_ctx->schedule( producer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( producer_ctx); + // notify context + active_ctx->schedule( producer_ctx); + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( producer_ctx); + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( producer_ctx); + // re-schedule next + } } // notify all waiting consumers while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } } channel_op_status try_push( value_type const& value) { context * active_ctx = context::active(); detail::spinlock_lock lk{ splk_ }; - if ( is_closed_() ) { + if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } else if ( is_full_() ) { return channel_op_status::full; @@ -113,11 +145,28 @@ public: slots_[pidx_] = value; pidx_ = (pidx_ + 1) % capacity_; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - lk.unlock(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } return channel_op_status::success; } @@ -126,7 +175,7 @@ public: channel_op_status try_push( value_type && value) { context * active_ctx = context::active(); detail::spinlock_lock lk{ splk_ }; - if ( is_closed_() ) { + if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } else if ( is_full_() ) { return channel_op_status::full; @@ -134,11 +183,29 @@ public: slots_[pidx_] = std::move( value); pidx_ = (pidx_ + 1) % capacity_; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); lk.unlock(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } return channel_op_status::success; } @@ -148,21 +215,40 @@ public: context * active_ctx = context::active(); for (;;) { detail::spinlock_lock lk{ splk_ }; - if ( is_closed_() ) { + if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } else if ( is_full_() ) { active_ctx->wait_link( waiting_producers_); + active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release); // suspend this producer active_ctx->suspend( lk); } else { slots_[pidx_] = value; pidx_ = (pidx_ + 1) % capacity_; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); lk.unlock(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } return channel_op_status::success; } @@ -173,21 +259,40 @@ public: context * active_ctx = context::active(); for (;;) { detail::spinlock_lock lk{ splk_ }; - if ( is_closed_() ) { + if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } else if ( is_full_() ) { active_ctx->wait_link( waiting_producers_); + active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release); // suspend this producer active_ctx->suspend( lk); } else { slots_[pidx_] = std::move( value); pidx_ = (pidx_ + 1) % capacity_; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); lk.unlock(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } return channel_op_status::success; } @@ -215,10 +320,12 @@ public: std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); for (;;) { detail::spinlock_lock lk{ splk_ }; - if ( is_closed_() ) { + if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } else if ( is_full_() ) { active_ctx->wait_link( waiting_producers_); + intrusive_ptr_add_ref( active_ctx); + active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); // suspend this producer if ( ! active_ctx->wait_until( timeout_time, lk) ) { // relock local lk @@ -231,11 +338,29 @@ public: slots_[pidx_] = value; pidx_ = (pidx_ + 1) % capacity_; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); lk.unlock(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } return channel_op_status::success; } @@ -249,10 +374,12 @@ public: std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); for (;;) { detail::spinlock_lock lk{ splk_ }; - if ( is_closed_() ) { + if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } else if ( is_full_() ) { active_ctx->wait_link( waiting_producers_); + intrusive_ptr_add_ref( active_ctx); + active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); // suspend this producer if ( ! active_ctx->wait_until( timeout_time, lk) ) { // relock local lk @@ -265,11 +392,29 @@ public: slots_[pidx_] = std::move( value); pidx_ = (pidx_ + 1) % capacity_; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); lk.unlock(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } return channel_op_status::success; } @@ -287,11 +432,29 @@ public: value = std::move( slots_[cidx_]); cidx_ = (cidx_ + 1) % capacity_; // notify one waiting producer - if ( ! waiting_producers_.empty() ) { + while ( ! waiting_producers_.empty() ) { context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - active_ctx->schedule( producer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( producer_ctx); + // notify context + active_ctx->schedule( producer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( producer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( producer_ctx); + // re-schedule next + } } return channel_op_status::success; } @@ -302,10 +465,11 @@ public: for (;;) { detail::spinlock_lock lk{ splk_ }; if ( is_empty_() ) { - if ( is_closed_() ) { + if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } else { active_ctx->wait_link( waiting_consumers_); + active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release); // suspend this consumer active_ctx->suspend( lk); } @@ -313,11 +477,29 @@ public: value = std::move( slots_[cidx_]); cidx_ = (cidx_ + 1) % capacity_; // notify one waiting producer - if ( ! waiting_producers_.empty() ) { + while ( ! waiting_producers_.empty() ) { context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - active_ctx->schedule( producer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( producer_ctx); + // notify context + active_ctx->schedule( producer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( producer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( producer_ctx); + // re-schedule next + } } return channel_op_status::success; } @@ -329,12 +511,13 @@ public: for (;;) { detail::spinlock_lock lk{ splk_ }; if ( is_empty_() ) { - if ( is_closed_() ) { + if ( BOOST_UNLIKELY( is_closed_() ) ) { throw fiber_error{ std::make_error_code( std::errc::operation_not_permitted), "boost fiber: channel is closed" }; } else { active_ctx->wait_link( waiting_consumers_); + active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release); // suspend this consumer active_ctx->suspend( lk); } @@ -342,11 +525,29 @@ public: value_type value = std::move( slots_[cidx_]); cidx_ = (cidx_ + 1) % capacity_; // notify one waiting producer - if ( ! waiting_producers_.empty() ) { + while ( ! waiting_producers_.empty() ) { context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - active_ctx->schedule( producer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( producer_ctx); + // notify context + active_ctx->schedule( producer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( producer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( producer_ctx); + // re-schedule next + } } return std::move( value); } @@ -368,10 +569,12 @@ public: for (;;) { detail::spinlock_lock lk{ splk_ }; if ( is_empty_() ) { - if ( is_closed_() ) { + if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } else { active_ctx->wait_link( waiting_consumers_); + intrusive_ptr_add_ref( active_ctx); + active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); // suspend this consumer if ( ! active_ctx->wait_until( timeout_time, lk) ) { // relock local lk @@ -385,11 +588,29 @@ public: value = std::move( slots_[cidx_]); cidx_ = (cidx_ + 1) % capacity_; // notify one waiting producer - if ( ! waiting_producers_.empty() ) { + while ( ! waiting_producers_.empty() ) { context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - active_ctx->schedule( producer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( producer_ctx); + // notify context + active_ctx->schedule( producer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( producer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( producer_ctx); + // re-schedule next + } } return channel_op_status::success; } @@ -428,8 +649,9 @@ public: } iterator & operator=( iterator const& other) noexcept { - if ( this == & other) return * this; - chan_ = other.chan_; + if ( BOOST_LIKELY( this != & other) ) { + chan_ = other.chan_; + } return * this; } |