summaryrefslogtreecommitdiff
path: root/boost/fiber/buffered_channel.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'boost/fiber/buffered_channel.hpp')
-rw-r--r--boost/fiber/buffered_channel.hpp118
1 files changed, 12 insertions, 106 deletions
diff --git a/boost/fiber/buffered_channel.hpp b/boost/fiber/buffered_channel.hpp
index 3cf22295dd..592a6340d5 100644
--- a/boost/fiber/buffered_channel.hpp
+++ b/boost/fiber/buffered_channel.hpp
@@ -34,11 +34,11 @@ namespace fibers {
template< typename T >
class buffered_channel {
public:
- typedef T value_type;
+ typedef typename std::remove_reference< T >::type value_type;
private:
typedef context::wait_queue_t wait_queue_type;
- typedef T slot_type;
+ typedef value_type slot_type;
mutable detail::spinlock splk_{};
wait_queue_type waiting_producers_{};
@@ -94,20 +94,12 @@ public:
waiting_producers_.pop_front();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( producer_ctx);
// notify context
active_ctx->schedule( producer_ctx);
} else if ( static_cast< std::intptr_t >( 0) == expected) {
// no timed-wait op.
// notify context
active_ctx->schedule( producer_ctx);
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( producer_ctx);
- // re-schedule next
}
}
// notify all waiting consumers
@@ -116,20 +108,12 @@ public:
waiting_consumers_.pop_front();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( consumer_ctx);
// notify context
active_ctx->schedule( consumer_ctx);
} else if ( static_cast< std::intptr_t >( 0) == expected) {
// no timed-wait op.
// notify context
active_ctx->schedule( consumer_ctx);
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( consumer_ctx);
- // re-schedule next
}
}
}
@@ -150,8 +134,6 @@ public:
waiting_consumers_.pop_front();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( consumer_ctx);
// notify context
active_ctx->schedule( consumer_ctx);
break;
@@ -160,12 +142,6 @@ public:
// notify context
active_ctx->schedule( consumer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( consumer_ctx);
- // re-schedule next
}
}
return channel_op_status::success;
@@ -189,8 +165,6 @@ public:
lk.unlock();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( consumer_ctx);
// notify context
active_ctx->schedule( consumer_ctx);
break;
@@ -199,12 +173,6 @@ public:
// notify context
active_ctx->schedule( consumer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( consumer_ctx);
- // re-schedule next
}
}
return channel_op_status::success;
@@ -232,8 +200,6 @@ public:
lk.unlock();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( consumer_ctx);
// notify context
active_ctx->schedule( consumer_ctx);
break;
@@ -242,12 +208,6 @@ public:
// notify context
active_ctx->schedule( consumer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( consumer_ctx);
- // re-schedule next
}
}
return channel_op_status::success;
@@ -276,8 +236,6 @@ public:
lk.unlock();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( consumer_ctx);
// notify context
active_ctx->schedule( consumer_ctx);
break;
@@ -286,12 +244,6 @@ public:
// notify context
active_ctx->schedule( consumer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( consumer_ctx);
- // re-schedule next
}
}
return channel_op_status::success;
@@ -324,7 +276,6 @@ public:
return channel_op_status::closed;
} else if ( is_full_() ) {
active_ctx->wait_link( waiting_producers_);
- intrusive_ptr_add_ref( active_ctx);
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
@@ -344,8 +295,6 @@ public:
lk.unlock();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( consumer_ctx);
// notify context
active_ctx->schedule( consumer_ctx);
break;
@@ -354,12 +303,6 @@ public:
// notify context
active_ctx->schedule( consumer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( consumer_ctx);
- // re-schedule next
}
}
return channel_op_status::success;
@@ -378,7 +321,6 @@ public:
return channel_op_status::closed;
} else if ( is_full_() ) {
active_ctx->wait_link( waiting_producers_);
- intrusive_ptr_add_ref( active_ctx);
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
@@ -398,8 +340,6 @@ public:
lk.unlock();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( consumer_ctx);
// notify context
active_ctx->schedule( consumer_ctx);
break;
@@ -408,12 +348,6 @@ public:
// notify context
active_ctx->schedule( consumer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( consumer_ctx);
- // re-schedule next
}
}
return channel_op_status::success;
@@ -438,8 +372,6 @@ public:
lk.unlock();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( producer_ctx);
// notify context
active_ctx->schedule( producer_ctx);
break;
@@ -448,12 +380,6 @@ public:
// notify context
active_ctx->schedule( producer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( producer_ctx);
- // re-schedule next
}
}
return channel_op_status::success;
@@ -483,8 +409,6 @@ public:
lk.unlock();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( producer_ctx);
// notify context
active_ctx->schedule( producer_ctx);
break;
@@ -493,12 +417,6 @@ public:
// notify context
active_ctx->schedule( producer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( producer_ctx);
- // re-schedule next
}
}
return channel_op_status::success;
@@ -531,8 +449,6 @@ public:
lk.unlock();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( producer_ctx);
// notify context
active_ctx->schedule( producer_ctx);
break;
@@ -541,12 +457,6 @@ public:
// notify context
active_ctx->schedule( producer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( producer_ctx);
- // re-schedule next
}
}
return std::move( value);
@@ -573,7 +483,6 @@ public:
return channel_op_status::closed;
} else {
active_ctx->wait_link( waiting_consumers_);
- intrusive_ptr_add_ref( active_ctx);
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this consumer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
@@ -594,8 +503,6 @@ public:
lk.unlock();
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
- // notify before timeout
- intrusive_ptr_release( producer_ctx);
// notify context
active_ctx->schedule( producer_ctx);
break;
@@ -604,12 +511,6 @@ public:
// notify context
active_ctx->schedule( producer_ctx);
break;
- } else {
- // timed-wait op.
- // expected == -1: notify after timeout, same timed-wait op.
- // expected == <any>: notify after timeout, another timed-wait op. was already started
- intrusive_ptr_release( producer_ctx);
- // re-schedule next
}
}
return channel_op_status::success;
@@ -617,12 +518,12 @@ public:
}
}
- class iterator : public std::iterator< std::input_iterator_tag, typename std::remove_reference< value_type >::type > {
+ class iterator {
private:
typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
- buffered_channel * chan_{ nullptr };
- storage_type storage_;
+ buffered_channel * chan_{ nullptr };
+ storage_type storage_;
void increment_() {
BOOST_ASSERT( nullptr != chan_);
@@ -634,8 +535,13 @@ public:
}
public:
- typedef typename iterator::pointer pointer_t;
- typedef typename iterator::reference reference_t;
+ typedef std::input_iterator_tag iterator_category;
+ typedef std::ptrdiff_t difference_type;
+ typedef value_type * pointer;
+ typedef value_type & reference;
+
+ typedef pointer pointer_t;
+ typedef reference reference_t;
iterator() noexcept = default;