summaryrefslogtreecommitdiff
path: root/boost/fiber/buffered_channel.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'boost/fiber/buffered_channel.hpp')
-rw-r--r--boost/fiber/buffered_channel.hpp294
1 files changed, 258 insertions, 36 deletions
diff --git a/boost/fiber/buffered_channel.hpp b/boost/fiber/buffered_channel.hpp
index 1c32e49bae..3cf22295dd 100644
--- a/boost/fiber/buffered_channel.hpp
+++ b/boost/fiber/buffered_channel.hpp
@@ -40,7 +40,7 @@ private:
typedef context::wait_queue_t wait_queue_type;
typedef T slot_type;
- alignas(cache_alignment) mutable detail::spinlock splk_{};
+ mutable detail::spinlock splk_{};
wait_queue_type waiting_producers_{};
wait_queue_type waiting_consumers_{};
slot_type * slots_;
@@ -64,7 +64,7 @@ private:
public:
explicit buffered_channel( std::size_t capacity) :
capacity_{ capacity } {
- if ( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) {
+ if ( BOOST_UNLIKELY( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) ) {
throw fiber_error{ std::make_error_code( std::errc::invalid_argument),
"boost fiber: buffer capacity is invalid" };
}
@@ -92,20 +92,52 @@ public:
while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
- active_ctx->schedule( producer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( producer_ctx);
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( producer_ctx);
+ // re-schedule next
+ }
}
// notify all waiting consumers
while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
}
channel_op_status try_push( value_type const& value) {
context * active_ctx = context::active();
detail::spinlock_lock lk{ splk_ };
- if ( is_closed_() ) {
+ if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
} else if ( is_full_() ) {
return channel_op_status::full;
@@ -113,11 +145,28 @@ public:
slots_[pidx_] = value;
pidx_ = (pidx_ + 1) % capacity_;
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- lk.unlock();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
return channel_op_status::success;
}
@@ -126,7 +175,7 @@ public:
channel_op_status try_push( value_type && value) {
context * active_ctx = context::active();
detail::spinlock_lock lk{ splk_ };
- if ( is_closed_() ) {
+ if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
} else if ( is_full_() ) {
return channel_op_status::full;
@@ -134,11 +183,29 @@ public:
slots_[pidx_] = std::move( value);
pidx_ = (pidx_ + 1) % capacity_;
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
lk.unlock();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
return channel_op_status::success;
}
@@ -148,21 +215,40 @@ public:
context * active_ctx = context::active();
for (;;) {
detail::spinlock_lock lk{ splk_ };
- if ( is_closed_() ) {
+ if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
} else if ( is_full_() ) {
active_ctx->wait_link( waiting_producers_);
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// suspend this producer
active_ctx->suspend( lk);
} else {
slots_[pidx_] = value;
pidx_ = (pidx_ + 1) % capacity_;
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
lk.unlock();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
return channel_op_status::success;
}
@@ -173,21 +259,40 @@ public:
context * active_ctx = context::active();
for (;;) {
detail::spinlock_lock lk{ splk_ };
- if ( is_closed_() ) {
+ if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
} else if ( is_full_() ) {
active_ctx->wait_link( waiting_producers_);
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// suspend this producer
active_ctx->suspend( lk);
} else {
slots_[pidx_] = std::move( value);
pidx_ = (pidx_ + 1) % capacity_;
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
lk.unlock();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
return channel_op_status::success;
}
@@ -215,10 +320,12 @@ public:
std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
detail::spinlock_lock lk{ splk_ };
- if ( is_closed_() ) {
+ if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
} else if ( is_full_() ) {
active_ctx->wait_link( waiting_producers_);
+ intrusive_ptr_add_ref( active_ctx);
+ active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
@@ -231,11 +338,29 @@ public:
slots_[pidx_] = value;
pidx_ = (pidx_ + 1) % capacity_;
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
lk.unlock();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
return channel_op_status::success;
}
@@ -249,10 +374,12 @@ public:
std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
detail::spinlock_lock lk{ splk_ };
- if ( is_closed_() ) {
+ if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
} else if ( is_full_() ) {
active_ctx->wait_link( waiting_producers_);
+ intrusive_ptr_add_ref( active_ctx);
+ active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
@@ -265,11 +392,29 @@ public:
slots_[pidx_] = std::move( value);
pidx_ = (pidx_ + 1) % capacity_;
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
lk.unlock();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
return channel_op_status::success;
}
@@ -287,11 +432,29 @@ public:
value = std::move( slots_[cidx_]);
cidx_ = (cidx_ + 1) % capacity_;
// notify one waiting producer
- if ( ! waiting_producers_.empty() ) {
+ while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- active_ctx->schedule( producer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( producer_ctx);
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( producer_ctx);
+ // re-schedule next
+ }
}
return channel_op_status::success;
}
@@ -302,10 +465,11 @@ public:
for (;;) {
detail::spinlock_lock lk{ splk_ };
if ( is_empty_() ) {
- if ( is_closed_() ) {
+ if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
} else {
active_ctx->wait_link( waiting_consumers_);
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// suspend this consumer
active_ctx->suspend( lk);
}
@@ -313,11 +477,29 @@ public:
value = std::move( slots_[cidx_]);
cidx_ = (cidx_ + 1) % capacity_;
// notify one waiting producer
- if ( ! waiting_producers_.empty() ) {
+ while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- active_ctx->schedule( producer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( producer_ctx);
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( producer_ctx);
+ // re-schedule next
+ }
}
return channel_op_status::success;
}
@@ -329,12 +511,13 @@ public:
for (;;) {
detail::spinlock_lock lk{ splk_ };
if ( is_empty_() ) {
- if ( is_closed_() ) {
+ if ( BOOST_UNLIKELY( is_closed_() ) ) {
throw fiber_error{
std::make_error_code( std::errc::operation_not_permitted),
"boost fiber: channel is closed" };
} else {
active_ctx->wait_link( waiting_consumers_);
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// suspend this consumer
active_ctx->suspend( lk);
}
@@ -342,11 +525,29 @@ public:
value_type value = std::move( slots_[cidx_]);
cidx_ = (cidx_ + 1) % capacity_;
// notify one waiting producer
- if ( ! waiting_producers_.empty() ) {
+ while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- active_ctx->schedule( producer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( producer_ctx);
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( producer_ctx);
+ // re-schedule next
+ }
}
return std::move( value);
}
@@ -368,10 +569,12 @@ public:
for (;;) {
detail::spinlock_lock lk{ splk_ };
if ( is_empty_() ) {
- if ( is_closed_() ) {
+ if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
} else {
active_ctx->wait_link( waiting_consumers_);
+ intrusive_ptr_add_ref( active_ctx);
+ active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this consumer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
@@ -385,11 +588,29 @@ public:
value = std::move( slots_[cidx_]);
cidx_ = (cidx_ + 1) % capacity_;
// notify one waiting producer
- if ( ! waiting_producers_.empty() ) {
+ while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- active_ctx->schedule( producer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( producer_ctx);
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( producer_ctx);
+ // re-schedule next
+ }
}
return channel_op_status::success;
}
@@ -428,8 +649,9 @@ public:
}
iterator & operator=( iterator const& other) noexcept {
- if ( this == & other) return * this;
- chan_ = other.chan_;
+ if ( BOOST_LIKELY( this != & other) ) {
+ chan_ = other.chan_;
+ }
return * this;
}