summaryrefslogtreecommitdiff
path: root/boost/fiber/unbuffered_channel.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'boost/fiber/unbuffered_channel.hpp')
-rw-r--r--boost/fiber/unbuffered_channel.hpp252
1 files changed, 208 insertions, 44 deletions
diff --git a/boost/fiber/unbuffered_channel.hpp b/boost/fiber/unbuffered_channel.hpp
index 38a2d6111e..1474299ded 100644
--- a/boost/fiber/unbuffered_channel.hpp
+++ b/boost/fiber/unbuffered_channel.hpp
@@ -38,7 +38,7 @@ public:
private:
typedef context::wait_queue_t wait_queue_type;
- struct alignas(cache_alignment) slot {
+ struct slot {
value_type value;
context * ctx;
@@ -54,12 +54,12 @@ private:
};
// shared cacheline
- alignas(cache_alignment) std::atomic< slot * > slot_{ nullptr };
+ std::atomic< slot * > slot_{ nullptr };
// shared cacheline
- alignas(cache_alignment) std::atomic_bool closed_{ false };
- alignas(cache_alignment) mutable detail::spinlock splk_producers_{};
+ std::atomic_bool closed_{ false };
+ mutable detail::spinlock splk_producers_{};
wait_queue_type waiting_producers_{};
- alignas( cache_alignment) mutable detail::spinlock splk_consumers_{};
+ mutable detail::spinlock splk_consumers_{};
wait_queue_type waiting_consumers_{};
char pad_[cacheline_length];
@@ -98,13 +98,6 @@ public:
~unbuffered_channel() {
close();
- slot * s = nullptr;
- if ( nullptr != ( s = try_pop_() ) ) {
- BOOST_ASSERT( nullptr != s);
- BOOST_ASSERT( nullptr != s->ctx);
- // value will be destructed in the context of the waiting fiber
- context::active()->schedule( s->ctx);
- }
}
unbuffered_channel( unbuffered_channel const&) = delete;
@@ -122,14 +115,46 @@ public:
while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
- active_ctx->schedule( producer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( producer_ctx);
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( producer_ctx);
+ // re-schedule next
+ }
}
// notify all waiting consumers
detail::spinlock_lock lk2{ splk_consumers_ };
while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
}
@@ -137,16 +162,34 @@ public:
context * active_ctx = context::active();
slot s{ value, active_ctx };
for (;;) {
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( try_push_( & s) ) {
detail::spinlock_lock lk{ splk_consumers_ };
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
// suspend till value has been consumed
active_ctx->suspend( lk);
@@ -154,13 +197,14 @@ public:
return channel_op_status::success;
} else {
detail::spinlock_lock lk{ splk_producers_ };
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( is_empty_() ) {
continue;
}
active_ctx->wait_link( waiting_producers_);
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// suspend this producer
active_ctx->suspend( lk);
// resumed, slot mabye free
@@ -172,16 +216,34 @@ public:
context * active_ctx = context::active();
slot s{ std::move( value), active_ctx };
for (;;) {
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( try_push_( & s) ) {
detail::spinlock_lock lk{ splk_consumers_ };
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
// suspend till value has been consumed
active_ctx->suspend( lk);
@@ -189,13 +251,14 @@ public:
return channel_op_status::success;
} else {
detail::spinlock_lock lk{ splk_producers_ };
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( is_empty_() ) {
continue;
}
active_ctx->wait_link( waiting_producers_);
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// suspend this producer
active_ctx->suspend( lk);
// resumed, slot mabye free
@@ -224,18 +287,38 @@ public:
slot s{ value, active_ctx };
std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( try_push_( & s) ) {
detail::spinlock_lock lk{ splk_consumers_ };
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
// suspend this producer
+ intrusive_ptr_add_ref( active_ctx);
+ active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// clear slot
slot * nil_slot = nullptr, * own_slot = & s;
@@ -247,13 +330,15 @@ public:
return channel_op_status::success;
} else {
detail::spinlock_lock lk{ splk_producers_ };
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( is_empty_() ) {
continue;
}
active_ctx->wait_link( waiting_producers_);
+ intrusive_ptr_add_ref( active_ctx);
+ active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
@@ -274,18 +359,38 @@ public:
slot s{ std::move( value), active_ctx };
std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( try_push_( & s) ) {
detail::spinlock_lock lk{ splk_consumers_ };
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
// suspend this producer
+ intrusive_ptr_add_ref( active_ctx);
+ active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// clear slot
slot * nil_slot = nullptr, * own_slot = & s;
@@ -297,13 +402,15 @@ public:
return channel_op_status::success;
} else {
detail::spinlock_lock lk{ splk_producers_ };
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( is_empty_() ) {
continue;
}
active_ctx->wait_link( waiting_producers_);
+ intrusive_ptr_add_ref( active_ctx);
+ active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
@@ -325,27 +432,45 @@ public:
{
detail::spinlock_lock lk{ splk_producers_ };
// notify one waiting producer
- if ( ! waiting_producers_.empty() ) {
+ while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- active_ctx->schedule( producer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( producer_ctx);
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( producer_ctx);
+ // re-schedule next
+ }
}
}
- // consume value
value = std::move( s->value);
- // resume suspended producer
+ // notify context
active_ctx->schedule( s->ctx);
return channel_op_status::success;
} else {
detail::spinlock_lock lk{ splk_consumers_ };
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( ! is_empty_() ) {
continue;
}
active_ctx->wait_link( waiting_consumers_);
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// suspend this consumer
active_ctx->suspend( lk);
// resumed, slot mabye set
@@ -361,21 +486,39 @@ public:
{
detail::spinlock_lock lk{ splk_producers_ };
// notify one waiting producer
- if ( ! waiting_producers_.empty() ) {
+ while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- active_ctx->schedule( producer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( producer_ctx);
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( producer_ctx);
+ // re-schedule next
+ }
}
}
// consume value
- value_type value{ std::move( s->value) };
- // resume suspended producer
+ value_type value = std::move( s->value);
+ // notify context
active_ctx->schedule( s->ctx);
return std::move( value);
} else {
detail::spinlock_lock lk{ splk_consumers_ };
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
throw fiber_error{
std::make_error_code( std::errc::operation_not_permitted),
"boost fiber: channel is closed" };
@@ -384,6 +527,7 @@ public:
continue;
}
active_ctx->wait_link( waiting_consumers_);
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// suspend this consumer
active_ctx->suspend( lk);
// resumed, slot mabye set
@@ -409,27 +553,47 @@ public:
{
detail::spinlock_lock lk{ splk_producers_ };
// notify one waiting producer
- if ( ! waiting_producers_.empty() ) {
+ while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- active_ctx->schedule( producer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( producer_ctx);
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( producer_ctx);
+ // re-schedule next
+ }
}
}
// consume value
value = std::move( s->value);
- // resume suspended producer
+ // notify context
active_ctx->schedule( s->ctx);
return channel_op_status::success;
} else {
detail::spinlock_lock lk{ splk_consumers_ };
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( ! is_empty_() ) {
continue;
}
active_ctx->wait_link( waiting_consumers_);
+ intrusive_ptr_add_ref( active_ctx);
+ active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this consumer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk