diff options
Diffstat (limited to 'boost/fiber/unbuffered_channel.hpp')
-rw-r--r-- | boost/fiber/unbuffered_channel.hpp | 104 |
1 files changed, 16 insertions, 88 deletions
diff --git a/boost/fiber/unbuffered_channel.hpp b/boost/fiber/unbuffered_channel.hpp index b2717b0068..2236d9e22f 100644 --- a/boost/fiber/unbuffered_channel.hpp +++ b/boost/fiber/unbuffered_channel.hpp @@ -33,7 +33,7 @@ namespace fibers { template< typename T > class unbuffered_channel { public: - typedef T value_type; + typedef typename std::remove_reference< T >::type value_type; private: typedef context::wait_queue_t wait_queue_type; @@ -54,14 +54,14 @@ private: }; // shared cacheline - std::atomic< slot * > slot_{ nullptr }; + std::atomic< slot * > slot_{ nullptr }; // shared cacheline - std::atomic_bool closed_{ false }; - mutable detail::spinlock splk_producers_{}; - wait_queue_type waiting_producers_{}; - mutable detail::spinlock splk_consumers_{}; - wait_queue_type waiting_consumers_{}; - char pad_[cacheline_length]; + std::atomic_bool closed_{ false }; + mutable detail::spinlock splk_producers_{}; + wait_queue_type waiting_producers_{}; + mutable detail::spinlock splk_consumers_{}; + wait_queue_type waiting_consumers_{}; + char pad_[cacheline_length]; bool is_empty_() { return nullptr == slot_.load( std::memory_order_acquire); @@ -118,20 +118,12 @@ public: waiting_producers_.pop_front(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( producer_ctx); // notify context active_ctx->schedule( producer_ctx); } else if ( static_cast< std::intptr_t >( 0) == expected) { // no timed-wait op. // notify context active_ctx->schedule( producer_ctx); - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( producer_ctx); - // re-schedule next } } // notify all waiting consumers @@ -141,20 +133,12 @@ public: waiting_consumers_.pop_front(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( consumer_ctx); // notify context active_ctx->schedule( consumer_ctx); } else if ( static_cast< std::intptr_t >( 0) == expected) { // no timed-wait op. // notify context active_ctx->schedule( consumer_ctx); - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( consumer_ctx); - // re-schedule next } } } @@ -174,8 +158,6 @@ public: waiting_consumers_.pop_front(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( consumer_ctx); // notify context active_ctx->schedule( consumer_ctx); break; @@ -184,12 +166,6 @@ public: // notify context active_ctx->schedule( consumer_ctx); break; - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( consumer_ctx); - // re-schedule next } } // suspend till value has been consumed @@ -228,8 +204,6 @@ public: waiting_consumers_.pop_front(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( consumer_ctx); // notify context active_ctx->schedule( consumer_ctx); break; @@ -238,12 +212,6 @@ public: // notify context active_ctx->schedule( consumer_ctx); break; - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( consumer_ctx); - // re-schedule next } } // suspend till value has been consumed @@ -299,8 +267,6 @@ public: waiting_consumers_.pop_front(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( consumer_ctx); // notify context active_ctx->schedule( consumer_ctx); break; @@ -309,16 +275,9 @@ public: // notify context active_ctx->schedule( consumer_ctx); break; - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( consumer_ctx); - // re-schedule next } } // suspend this producer - intrusive_ptr_add_ref( active_ctx); active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); if ( ! active_ctx->wait_until( timeout_time, lk) ) { // clear slot @@ -338,7 +297,6 @@ public: continue; } active_ctx->wait_link( waiting_producers_); - intrusive_ptr_add_ref( active_ctx); active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); // suspend this producer if ( ! active_ctx->wait_until( timeout_time, lk) ) { @@ -371,8 +329,6 @@ public: waiting_consumers_.pop_front(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( consumer_ctx); // notify context active_ctx->schedule( consumer_ctx); break; @@ -381,16 +337,9 @@ public: // notify context active_ctx->schedule( consumer_ctx); break; - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( consumer_ctx); - // re-schedule next } } // suspend this producer - intrusive_ptr_add_ref( active_ctx); active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); if ( ! active_ctx->wait_until( timeout_time, lk) ) { // clear slot @@ -410,7 +359,6 @@ public: continue; } active_ctx->wait_link( waiting_producers_); - intrusive_ptr_add_ref( active_ctx); active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); // suspend this producer if ( ! active_ctx->wait_until( timeout_time, lk) ) { @@ -439,8 +387,6 @@ public: lk.unlock(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( producer_ctx); // notify context active_ctx->schedule( producer_ctx); break; @@ -449,12 +395,6 @@ public: // notify context active_ctx->schedule( producer_ctx); break; - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( producer_ctx); - // re-schedule next } } } @@ -493,8 +433,6 @@ public: lk.unlock(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( producer_ctx); // notify context active_ctx->schedule( producer_ctx); break; @@ -503,12 +441,6 @@ public: // notify context active_ctx->schedule( producer_ctx); break; - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( producer_ctx); - // re-schedule next } } } @@ -560,8 +492,6 @@ public: lk.unlock(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( producer_ctx); // notify context active_ctx->schedule( producer_ctx); break; @@ -570,12 +500,6 @@ public: // notify context active_ctx->schedule( producer_ctx); break; - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( producer_ctx); - // re-schedule next } } } @@ -593,7 +517,6 @@ public: continue; } active_ctx->wait_link( waiting_consumers_); - intrusive_ptr_add_ref( active_ctx); active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); // suspend this consumer if ( ! active_ctx->wait_until( timeout_time, lk) ) { @@ -607,7 +530,7 @@ public: } } - class iterator : public std::iterator< std::input_iterator_tag, typename std::remove_reference< value_type >::type > { + class iterator { private: typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type; @@ -624,8 +547,13 @@ public: } public: - typedef typename iterator::pointer pointer_t; - typedef typename iterator::reference reference_t; + typedef std::input_iterator_tag iterator_category; + typedef std::ptrdiff_t difference_type; + typedef value_type * pointer; + typedef value_type & reference; + + typedef pointer pointer_t; + typedef reference reference_t; iterator() noexcept = default; |