diff options
author | DongHun Kwak <dh0128.kwak@samsung.com> | 2019-12-05 15:12:59 +0900 |
---|---|---|
committer | DongHun Kwak <dh0128.kwak@samsung.com> | 2019-12-05 15:12:59 +0900 |
commit | b8cf34c691623e4ec329053cbbf68522a855882d (patch) | |
tree | 34da08632a99677f6b79ecb65e5b655a5b69a67f /boost/fiber/buffered_channel.hpp | |
parent | 3fdc3e5ee96dca5b11d1694975a65200787eab86 (diff) | |
download | boost-upstream/1.67.0.tar.gz boost-upstream/1.67.0.tar.bz2 boost-upstream/1.67.0.zip |
Imported Upstream version 1.67.0upstream/1.67.0
Diffstat (limited to 'boost/fiber/buffered_channel.hpp')
-rw-r--r-- | boost/fiber/buffered_channel.hpp | 118 |
1 files changed, 12 insertions, 106 deletions
diff --git a/boost/fiber/buffered_channel.hpp b/boost/fiber/buffered_channel.hpp index 3cf22295dd..592a6340d5 100644 --- a/boost/fiber/buffered_channel.hpp +++ b/boost/fiber/buffered_channel.hpp @@ -34,11 +34,11 @@ namespace fibers { template< typename T > class buffered_channel { public: - typedef T value_type; + typedef typename std::remove_reference< T >::type value_type; private: typedef context::wait_queue_t wait_queue_type; - typedef T slot_type; + typedef value_type slot_type; mutable detail::spinlock splk_{}; wait_queue_type waiting_producers_{}; @@ -94,20 +94,12 @@ public: waiting_producers_.pop_front(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( producer_ctx); // notify context active_ctx->schedule( producer_ctx); } else if ( static_cast< std::intptr_t >( 0) == expected) { // no timed-wait op. // notify context active_ctx->schedule( producer_ctx); - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( producer_ctx); - // re-schedule next } } // notify all waiting consumers @@ -116,20 +108,12 @@ public: waiting_consumers_.pop_front(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( consumer_ctx); // notify context active_ctx->schedule( consumer_ctx); } else if ( static_cast< std::intptr_t >( 0) == expected) { // no timed-wait op. // notify context active_ctx->schedule( consumer_ctx); - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( consumer_ctx); - // re-schedule next } } } @@ -150,8 +134,6 @@ public: waiting_consumers_.pop_front(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( consumer_ctx); // notify context active_ctx->schedule( consumer_ctx); break; @@ -160,12 +142,6 @@ public: // notify context active_ctx->schedule( consumer_ctx); break; - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( consumer_ctx); - // re-schedule next } } return channel_op_status::success; @@ -189,8 +165,6 @@ public: lk.unlock(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( consumer_ctx); // notify context active_ctx->schedule( consumer_ctx); break; @@ -199,12 +173,6 @@ public: // notify context active_ctx->schedule( consumer_ctx); break; - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( consumer_ctx); - // re-schedule next } } return channel_op_status::success; @@ -232,8 +200,6 @@ public: lk.unlock(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( consumer_ctx); // notify context active_ctx->schedule( consumer_ctx); break; @@ -242,12 +208,6 @@ public: // notify context active_ctx->schedule( consumer_ctx); break; - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( consumer_ctx); - // re-schedule next } } return channel_op_status::success; @@ -276,8 +236,6 @@ public: lk.unlock(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( consumer_ctx); // notify context active_ctx->schedule( consumer_ctx); break; @@ -286,12 +244,6 @@ public: // notify context active_ctx->schedule( consumer_ctx); break; - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( consumer_ctx); - // re-schedule next } } return channel_op_status::success; @@ -324,7 +276,6 @@ public: return channel_op_status::closed; } else if ( is_full_() ) { active_ctx->wait_link( waiting_producers_); - intrusive_ptr_add_ref( active_ctx); active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); // suspend this producer if ( ! active_ctx->wait_until( timeout_time, lk) ) { @@ -344,8 +295,6 @@ public: lk.unlock(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( consumer_ctx); // notify context active_ctx->schedule( consumer_ctx); break; @@ -354,12 +303,6 @@ public: // notify context active_ctx->schedule( consumer_ctx); break; - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( consumer_ctx); - // re-schedule next } } return channel_op_status::success; @@ -378,7 +321,6 @@ public: return channel_op_status::closed; } else if ( is_full_() ) { active_ctx->wait_link( waiting_producers_); - intrusive_ptr_add_ref( active_ctx); active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); // suspend this producer if ( ! active_ctx->wait_until( timeout_time, lk) ) { @@ -398,8 +340,6 @@ public: lk.unlock(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( consumer_ctx); // notify context active_ctx->schedule( consumer_ctx); break; @@ -408,12 +348,6 @@ public: // notify context active_ctx->schedule( consumer_ctx); break; - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( consumer_ctx); - // re-schedule next } } return channel_op_status::success; @@ -438,8 +372,6 @@ public: lk.unlock(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( producer_ctx); // notify context active_ctx->schedule( producer_ctx); break; @@ -448,12 +380,6 @@ public: // notify context active_ctx->schedule( producer_ctx); break; - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( producer_ctx); - // re-schedule next } } return channel_op_status::success; @@ -483,8 +409,6 @@ public: lk.unlock(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( producer_ctx); // notify context active_ctx->schedule( producer_ctx); break; @@ -493,12 +417,6 @@ public: // notify context active_ctx->schedule( producer_ctx); break; - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( producer_ctx); - // re-schedule next } } return channel_op_status::success; @@ -531,8 +449,6 @@ public: lk.unlock(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( producer_ctx); // notify context active_ctx->schedule( producer_ctx); break; @@ -541,12 +457,6 @@ public: // notify context active_ctx->schedule( producer_ctx); break; - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( producer_ctx); - // re-schedule next } } return std::move( value); @@ -573,7 +483,6 @@ public: return channel_op_status::closed; } else { active_ctx->wait_link( waiting_consumers_); - intrusive_ptr_add_ref( active_ctx); active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); // suspend this consumer if ( ! active_ctx->wait_until( timeout_time, lk) ) { @@ -594,8 +503,6 @@ public: lk.unlock(); std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { - // notify before timeout - intrusive_ptr_release( producer_ctx); // notify context active_ctx->schedule( producer_ctx); break; @@ -604,12 +511,6 @@ public: // notify context active_ctx->schedule( producer_ctx); break; - } else { - // timed-wait op. - // expected == -1: notify after timeout, same timed-wait op. - // expected == <any>: notify after timeout, another timed-wait op. was already started - intrusive_ptr_release( producer_ctx); - // re-schedule next } } return channel_op_status::success; @@ -617,12 +518,12 @@ public: } } - class iterator : public std::iterator< std::input_iterator_tag, typename std::remove_reference< value_type >::type > { + class iterator { private: typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type; - buffered_channel * chan_{ nullptr }; - storage_type storage_; + buffered_channel * chan_{ nullptr }; + storage_type storage_; void increment_() { BOOST_ASSERT( nullptr != chan_); @@ -634,8 +535,13 @@ public: } public: - typedef typename iterator::pointer pointer_t; - typedef typename iterator::reference reference_t; + typedef std::input_iterator_tag iterator_category; + typedef std::ptrdiff_t difference_type; + typedef value_type * pointer; + typedef value_type & reference; + + typedef pointer pointer_t; + typedef reference reference_t; iterator() noexcept = default; |