summaryrefslogtreecommitdiff
path: root/boost/fiber/unbuffered_channel.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'boost/fiber/unbuffered_channel.hpp')
-rw-r--r--boost/fiber/unbuffered_channel.hpp104
1 files changed, 16 insertions, 88 deletions
diff --git a/boost/fiber/unbuffered_channel.hpp b/boost/fiber/unbuffered_channel.hpp
index b2717b0068..2236d9e22f 100644
--- a/boost/fiber/unbuffered_channel.hpp
+++ b/boost/fiber/unbuffered_channel.hpp
@@ -33,7 +33,7 @@ namespace fibers {
template< typename T >
class unbuffered_channel {
public:
- typedef T value_type;
+ typedef typename std::remove_reference< T >::type value_type;
private:
typedef context::wait_queue_t wait_queue_type;
@@ -54,14 +54,14 @@ private:
};
// shared cacheline
- std::atomic< slot * > slot_{ nullptr };
+ std::atomic< slot * > slot_{ nullptr };
// shared cacheline
- std::atomic_bool closed_{ false };
- mutable detail::spinlock splk_producers_{};
- wait_queue_type waiting_producers_{};
- mutable detail::spinlock splk_consumers_{};
- wait_queue_type waiting_consumers_{};
- char pad_[cacheline_length];
+ std::atomic_bool closed_{ false };
+ mutable detail::spinlock splk_producers_{};
+ wait_queue_type waiting_producers_{};
+ mutable detail::spinlock splk_consumers_{};
+ wait_queue_type waiting_consumers_{};
+ char pad_[cacheline_length];
bool is_empty_() {
return nullptr == slot_.load( std::memory_order_acquire);
@@ -118,20 +118,12 @@ public:
waiting_producers_.pop_front();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( producer_ctx);
// notify context
active_ctx->schedule( producer_ctx);
} else if ( static_cast< std::intptr_t >( 0) == expected) {
// no timed-wait op.
// notify context
active_ctx->schedule( producer_ctx);
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( producer_ctx);
- // re-schedule next
}
}
// notify all waiting consumers
@@ -141,20 +133,12 @@ public:
waiting_consumers_.pop_front();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( consumer_ctx);
// notify context
active_ctx->schedule( consumer_ctx);
} else if ( static_cast< std::intptr_t >( 0) == expected) {
// no timed-wait op.
// notify context
active_ctx->schedule( consumer_ctx);
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( consumer_ctx);
- // re-schedule next
}
}
}
@@ -174,8 +158,6 @@ public:
waiting_consumers_.pop_front();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( consumer_ctx);
// notify context
active_ctx->schedule( consumer_ctx);
break;
@@ -184,12 +166,6 @@ public:
// notify context
active_ctx->schedule( consumer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( consumer_ctx);
- // re-schedule next
}
}
// suspend till value has been consumed
@@ -228,8 +204,6 @@ public:
waiting_consumers_.pop_front();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( consumer_ctx);
// notify context
active_ctx->schedule( consumer_ctx);
break;
@@ -238,12 +212,6 @@ public:
// notify context
active_ctx->schedule( consumer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( consumer_ctx);
- // re-schedule next
}
}
// suspend till value has been consumed
@@ -299,8 +267,6 @@ public:
waiting_consumers_.pop_front();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( consumer_ctx);
// notify context
active_ctx->schedule( consumer_ctx);
break;
@@ -309,16 +275,9 @@ public:
// notify context
active_ctx->schedule( consumer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( consumer_ctx);
- // re-schedule next
}
}
// suspend this producer
- intrusive_ptr_add_ref( active_ctx);
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// clear slot
@@ -338,7 +297,6 @@ public:
continue;
}
active_ctx->wait_link( waiting_producers_);
- intrusive_ptr_add_ref( active_ctx);
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
@@ -371,8 +329,6 @@ public:
waiting_consumers_.pop_front();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( consumer_ctx);
// notify context
active_ctx->schedule( consumer_ctx);
break;
@@ -381,16 +337,9 @@ public:
// notify context
active_ctx->schedule( consumer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( consumer_ctx);
- // re-schedule next
}
}
// suspend this producer
- intrusive_ptr_add_ref( active_ctx);
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// clear slot
@@ -410,7 +359,6 @@ public:
continue;
}
active_ctx->wait_link( waiting_producers_);
- intrusive_ptr_add_ref( active_ctx);
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
@@ -439,8 +387,6 @@ public:
lk.unlock();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( producer_ctx);
// notify context
active_ctx->schedule( producer_ctx);
break;
@@ -449,12 +395,6 @@ public:
// notify context
active_ctx->schedule( producer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( producer_ctx);
- // re-schedule next
}
}
}
@@ -493,8 +433,6 @@ public:
lk.unlock();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( producer_ctx);
// notify context
active_ctx->schedule( producer_ctx);
break;
@@ -503,12 +441,6 @@ public:
// notify context
active_ctx->schedule( producer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( producer_ctx);
- // re-schedule next
}
}
}
@@ -560,8 +492,6 @@ public:
lk.unlock();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( producer_ctx);
// notify context
active_ctx->schedule( producer_ctx);
break;
@@ -570,12 +500,6 @@ public:
// notify context
active_ctx->schedule( producer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( producer_ctx);
- // re-schedule next
}
}
}
@@ -593,7 +517,6 @@ public:
continue;
}
active_ctx->wait_link( waiting_consumers_);
- intrusive_ptr_add_ref( active_ctx);
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this consumer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
@@ -607,7 +530,7 @@ public:
}
}
- class iterator : public std::iterator< std::input_iterator_tag, typename std::remove_reference< value_type >::type > {
+ class iterator {
private:
typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
@@ -624,8 +547,13 @@ public:
}
public:
- typedef typename iterator::pointer pointer_t;
- typedef typename iterator::reference reference_t;
+ typedef std::input_iterator_tag iterator_category;
+ typedef std::ptrdiff_t difference_type;
+ typedef value_type * pointer;
+ typedef value_type & reference;
+
+ typedef pointer pointer_t;
+ typedef reference reference_t;
iterator() noexcept = default;