diff options
author | DongHun Kwak <dh0128.kwak@samsung.com> | 2017-09-13 11:24:46 +0900 |
---|---|---|
committer | DongHun Kwak <dh0128.kwak@samsung.com> | 2017-09-13 11:25:39 +0900 |
commit | 4fadd968fa12130524c8380f33fcfe25d4de79e5 (patch) | |
tree | fd26a490cd15388d42fc6652b3c5c13012e7f93e /boost/fiber/unbuffered_channel.hpp | |
parent | b5c87084afaef42b2d058f68091be31988a6a874 (diff) | |
download | boost-4fadd968fa12130524c8380f33fcfe25d4de79e5.tar.gz boost-4fadd968fa12130524c8380f33fcfe25d4de79e5.tar.bz2 boost-4fadd968fa12130524c8380f33fcfe25d4de79e5.zip |
Imported Upstream version 1.65.0upstream/1.65.0
Change-Id: Icf8400b375482cb11bcf77440a6934ba360d6ba4
Signed-off-by: DongHun Kwak <dh0128.kwak@samsung.com>
Diffstat (limited to 'boost/fiber/unbuffered_channel.hpp')
-rw-r--r-- | boost/fiber/unbuffered_channel.hpp | 252 |
1 files changed, 208 insertions, 44 deletions
diff --git a/boost/fiber/unbuffered_channel.hpp b/boost/fiber/unbuffered_channel.hpp index 38a2d6111e..1474299ded 100644 --- a/boost/fiber/unbuffered_channel.hpp +++ b/boost/fiber/unbuffered_channel.hpp @@ -38,7 +38,7 @@ public: private: typedef context::wait_queue_t wait_queue_type; - struct alignas(cache_alignment) slot { + struct slot { value_type value; context * ctx; @@ -54,12 +54,12 @@ private: }; // shared cacheline - alignas(cache_alignment) std::atomic< slot * > slot_{ nullptr }; + std::atomic< slot * > slot_{ nullptr }; // shared cacheline - alignas(cache_alignment) std::atomic_bool closed_{ false }; - alignas(cache_alignment) mutable detail::spinlock splk_producers_{}; + std::atomic_bool closed_{ false }; + mutable detail::spinlock splk_producers_{}; wait_queue_type waiting_producers_{}; - alignas( cache_alignment) mutable detail::spinlock splk_consumers_{}; + mutable detail::spinlock splk_consumers_{}; wait_queue_type waiting_consumers_{}; char pad_[cacheline_length]; @@ -98,13 +98,6 @@ public: ~unbuffered_channel() { close(); - slot * s = nullptr; - if ( nullptr != ( s = try_pop_() ) ) { - BOOST_ASSERT( nullptr != s); - BOOST_ASSERT( nullptr != s->ctx); - // value will be destructed in the context of the waiting fiber - context::active()->schedule( s->ctx); - } } unbuffered_channel( unbuffered_channel const&) = delete; @@ -122,14 +115,46 @@ public: while ( ! waiting_producers_.empty() ) { context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); - active_ctx->schedule( producer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( producer_ctx); + // notify context + active_ctx->schedule( producer_ctx); + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( producer_ctx); + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( producer_ctx); + // re-schedule next + } } // notify all waiting consumers detail::spinlock_lock lk2{ splk_consumers_ }; while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } } @@ -137,16 +162,34 @@ public: context * active_ctx = context::active(); slot s{ value, active_ctx }; for (;;) { - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( try_push_( & s) ) { detail::spinlock_lock lk{ splk_consumers_ }; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } // suspend till value has been consumed active_ctx->suspend( lk); @@ -154,13 +197,14 @@ public: return channel_op_status::success; } else { detail::spinlock_lock lk{ splk_producers_ }; - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( is_empty_() ) { continue; } active_ctx->wait_link( waiting_producers_); + active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release); // suspend this producer active_ctx->suspend( lk); // resumed, slot mabye free @@ -172,16 +216,34 @@ public: context * active_ctx = context::active(); slot s{ std::move( value), active_ctx }; for (;;) { - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( try_push_( & s) ) { detail::spinlock_lock lk{ splk_consumers_ }; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } // suspend till value has been consumed active_ctx->suspend( lk); @@ -189,13 +251,14 @@ public: return channel_op_status::success; } else { detail::spinlock_lock lk{ splk_producers_ }; - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( is_empty_() ) { continue; } active_ctx->wait_link( waiting_producers_); + active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release); // suspend this producer active_ctx->suspend( lk); // resumed, slot mabye free @@ -224,18 +287,38 @@ public: slot s{ value, active_ctx }; std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); for (;;) { - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( try_push_( & s) ) { detail::spinlock_lock lk{ splk_consumers_ }; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } // suspend this producer + intrusive_ptr_add_ref( active_ctx); + active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); if ( ! active_ctx->wait_until( timeout_time, lk) ) { // clear slot slot * nil_slot = nullptr, * own_slot = & s; @@ -247,13 +330,15 @@ public: return channel_op_status::success; } else { detail::spinlock_lock lk{ splk_producers_ }; - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( is_empty_() ) { continue; } active_ctx->wait_link( waiting_producers_); + intrusive_ptr_add_ref( active_ctx); + active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); // suspend this producer if ( ! active_ctx->wait_until( timeout_time, lk) ) { // relock local lk @@ -274,18 +359,38 @@ public: slot s{ std::move( value), active_ctx }; std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); for (;;) { - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( try_push_( & s) ) { detail::spinlock_lock lk{ splk_consumers_ }; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } // suspend this producer + intrusive_ptr_add_ref( active_ctx); + active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); if ( ! active_ctx->wait_until( timeout_time, lk) ) { // clear slot slot * nil_slot = nullptr, * own_slot = & s; @@ -297,13 +402,15 @@ public: return channel_op_status::success; } else { detail::spinlock_lock lk{ splk_producers_ }; - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( is_empty_() ) { continue; } active_ctx->wait_link( waiting_producers_); + intrusive_ptr_add_ref( active_ctx); + active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); // suspend this producer if ( ! active_ctx->wait_until( timeout_time, lk) ) { // relock local lk @@ -325,27 +432,45 @@ public: { detail::spinlock_lock lk{ splk_producers_ }; // notify one waiting producer - if ( ! waiting_producers_.empty() ) { + while ( ! waiting_producers_.empty() ) { context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - active_ctx->schedule( producer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( producer_ctx); + // notify context + active_ctx->schedule( producer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( producer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( producer_ctx); + // re-schedule next + } } } - // consume value value = std::move( s->value); - // resume suspended producer + // notify context active_ctx->schedule( s->ctx); return channel_op_status::success; } else { detail::spinlock_lock lk{ splk_consumers_ }; - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( ! is_empty_() ) { continue; } active_ctx->wait_link( waiting_consumers_); + active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release); // suspend this consumer active_ctx->suspend( lk); // resumed, slot mabye set @@ -361,21 +486,39 @@ public: { detail::spinlock_lock lk{ splk_producers_ }; // notify one waiting producer - if ( ! waiting_producers_.empty() ) { + while ( ! waiting_producers_.empty() ) { context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - active_ctx->schedule( producer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( producer_ctx); + // notify context + active_ctx->schedule( producer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( producer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( producer_ctx); + // re-schedule next + } } } // consume value - value_type value{ std::move( s->value) }; - // resume suspended producer + value_type value = std::move( s->value); + // notify context active_ctx->schedule( s->ctx); return std::move( value); } else { detail::spinlock_lock lk{ splk_consumers_ }; - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { throw fiber_error{ std::make_error_code( std::errc::operation_not_permitted), "boost fiber: channel is closed" }; @@ -384,6 +527,7 @@ public: continue; } active_ctx->wait_link( waiting_consumers_); + active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release); // suspend this consumer active_ctx->suspend( lk); // resumed, slot mabye set @@ -409,27 +553,47 @@ public: { detail::spinlock_lock lk{ splk_producers_ }; // notify one waiting producer - if ( ! waiting_producers_.empty() ) { + while ( ! waiting_producers_.empty() ) { context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - active_ctx->schedule( producer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( producer_ctx); + // notify context + active_ctx->schedule( producer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( producer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( producer_ctx); + // re-schedule next + } } } // consume value value = std::move( s->value); - // resume suspended producer + // notify context active_ctx->schedule( s->ctx); return channel_op_status::success; } else { detail::spinlock_lock lk{ splk_consumers_ }; - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( ! is_empty_() ) { continue; } active_ctx->wait_link( waiting_consumers_); + intrusive_ptr_add_ref( active_ctx); + active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); // suspend this consumer if ( ! active_ctx->wait_until( timeout_time, lk) ) { // relock local lk |