summaryrefslogtreecommitdiff
path: root/boost/fiber/buffered_channel.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'boost/fiber/buffered_channel.hpp')
-rw-r--r--boost/fiber/buffered_channel.hpp510
1 files changed, 201 insertions, 309 deletions
diff --git a/boost/fiber/buffered_channel.hpp b/boost/fiber/buffered_channel.hpp
index 25109a4976..1c32e49bae 100644
--- a/boost/fiber/buffered_channel.hpp
+++ b/boost/fiber/buffered_channel.hpp
@@ -4,8 +4,6 @@
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
//
-// based on Dmitry Vyukov's MPMC queue
-// (http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue)
#ifndef BOOST_FIBERS_BUFFERED_CHANNEL_H
#define BOOST_FIBERS_BUFFERED_CHANNEL_H
@@ -39,120 +37,42 @@ public:
typedef T value_type;
private:
- typedef typename std::aligned_storage< sizeof( T), alignof( T) >::type storage_type;
- typedef context::wait_queue_t wait_queue_type;
-
- struct alignas(cache_alignment) slot {
- std::atomic< std::size_t > cycle{ 0 };
- storage_type storage{};
-
- slot() = default;
- };
-
- // procuder cacheline
- alignas(cache_alignment) std::atomic< std::size_t > producer_idx_{ 0 };
- // consumer cacheline
- alignas(cache_alignment) std::atomic< std::size_t > consumer_idx_{ 0 };
- // shared write cacheline
- alignas(cache_alignment) std::atomic_bool closed_{ false };
- mutable detail::spinlock splk_{};
- wait_queue_type waiting_producers_{};
- wait_queue_type waiting_consumers_{};
- // shared read cacheline
- alignas(cache_alignment) slot * slots_{ nullptr };
- std::size_t capacity_;
- char pad_[cacheline_length];
-
- bool is_full_() {
- std::size_t idx{ producer_idx_.load( std::memory_order_relaxed) };
- return 0 > static_cast< std::intptr_t >( slots_[idx & (capacity_ - 1)].cycle.load( std::memory_order_acquire) ) - static_cast< std::intptr_t >( idx);
- }
-
- bool is_empty_() {
- std::size_t idx{ consumer_idx_.load( std::memory_order_relaxed) };
- return 0 > static_cast< std::intptr_t >( slots_[idx & (capacity_ - 1)].cycle.load( std::memory_order_acquire) ) - static_cast< std::intptr_t >( idx + 1);
- }
-
- template< typename ValueType >
- channel_op_status try_push_( ValueType && value) {
- slot * s{ nullptr };
- std::size_t idx{ producer_idx_.load( std::memory_order_relaxed) };
- for (;;) {
- s = & slots_[idx & (capacity_ - 1)];
- std::size_t cycle{ s->cycle.load( std::memory_order_acquire) };
- std::intptr_t diff{ static_cast< std::intptr_t >( cycle) - static_cast< std::intptr_t >( idx) };
- if ( 0 == diff) {
- if ( producer_idx_.compare_exchange_weak( idx, idx + 1, std::memory_order_relaxed) ) {
- break;
- }
- } else if ( 0 > diff) {
- return channel_op_status::full;
- } else {
- idx = producer_idx_.load( std::memory_order_relaxed);
- }
- }
- ::new ( static_cast< void * >( std::addressof( s->storage) ) ) value_type( std::forward< ValueType >( value) );
- s->cycle.store( idx + 1, std::memory_order_release);
- return channel_op_status::success;
- }
-
- channel_op_status try_value_pop_( slot *& s, std::size_t & idx) {
- idx = consumer_idx_.load( std::memory_order_relaxed);
- for (;;) {
- s = & slots_[idx & (capacity_ - 1)];
- std::size_t cycle = s->cycle.load( std::memory_order_acquire);
- std::intptr_t diff{ static_cast< std::intptr_t >( cycle) - static_cast< std::intptr_t >( idx + 1) };
- if ( 0 == diff) {
- if ( consumer_idx_.compare_exchange_weak( idx, idx + 1, std::memory_order_relaxed) ) {
- break;
- }
- } else if ( 0 > diff) {
- return channel_op_status::empty;
- } else {
- idx = consumer_idx_.load( std::memory_order_relaxed);
- }
- }
- // incrementing the slot cycle must be deferred till the value has been consumed
- // slot cycle tells procuders that the cell can be re-used (store new value)
- return channel_op_status::success;
- }
-
- channel_op_status try_pop_( value_type & value) {
- slot * s{ nullptr };
- std::size_t idx{ 0 };
- channel_op_status status{ try_value_pop_( s, idx) };
- if ( channel_op_status::success == status) {
- value = std::move( * reinterpret_cast< value_type * >( std::addressof( s->storage) ) );
- s->cycle.store( idx + capacity_, std::memory_order_release);
- }
- return status;
+ typedef context::wait_queue_t wait_queue_type;
+ typedef T slot_type;
+
+ alignas(cache_alignment) mutable detail::spinlock splk_{};
+ wait_queue_type waiting_producers_{};
+ wait_queue_type waiting_consumers_{};
+ slot_type * slots_;
+ std::size_t pidx_{ 0 };
+ std::size_t cidx_{ 0 };
+ std::size_t capacity_;
+ bool closed_{ false };
+
+ bool is_full_() const noexcept {
+ return cidx_ == ((pidx_ + 1) % capacity_);
+ }
+
+ bool is_empty_() const noexcept {
+ return cidx_ == pidx_;
+ }
+
+ bool is_closed_() const noexcept {
+ return closed_;
}
public:
explicit buffered_channel( std::size_t capacity) :
- capacity_{ capacity } {
- if ( 0 == capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) {
- throw fiber_error( std::make_error_code( std::errc::invalid_argument),
- "boost fiber: buffer capacity is invalid");
- }
- slots_ = new slot[capacity_]();
- for ( std::size_t i = 0; i < capacity_; ++i) {
- slots_[i].cycle.store( i, std::memory_order_relaxed);
+ capacity_{ capacity } {
+ if ( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) {
+ throw fiber_error{ std::make_error_code( std::errc::invalid_argument),
+ "boost fiber: buffer capacity is invalid" };
}
+ slots_ = new slot_type[capacity_];
}
~buffered_channel() {
close();
- for (;;) {
- slot * s{ nullptr };
- std::size_t idx{ 0 };
- if ( channel_op_status::success == try_value_pop_( s, idx) ) {
- reinterpret_cast< value_type * >( std::addressof( s->storage) )->~value_type();
- s->cycle.store( idx + capacity_, std::memory_order_release);
- } else {
- break;
- }
- }
delete [] slots_;
}
@@ -160,109 +80,116 @@ public:
buffered_channel & operator=( buffered_channel const&) = delete;
bool is_closed() const noexcept {
- return closed_.load( std::memory_order_acquire);
+ detail::spinlock_lock lk{ splk_ };
+ return is_closed_();
}
void close() noexcept {
- context * ctx{ context::active() };
+ context * active_ctx = context::active();
detail::spinlock_lock lk{ splk_ };
- closed_.store( true, std::memory_order_release);
+ closed_ = true;
// notify all waiting producers
while ( ! waiting_producers_.empty() ) {
- context * producer_ctx{ & waiting_producers_.front() };
+ context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
- ctx->set_ready( producer_ctx);
+ active_ctx->schedule( producer_ctx);
}
// notify all waiting consumers
while ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx{ & waiting_consumers_.front() };
+ context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- ctx->set_ready( consumer_ctx);
+ active_ctx->schedule( consumer_ctx);
}
}
channel_op_status try_push( value_type const& value) {
- if ( is_closed() ) {
+ context * active_ctx = context::active();
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_closed_() ) {
return channel_op_status::closed;
+ } else if ( is_full_() ) {
+ return channel_op_status::full;
+ } else {
+ slots_[pidx_] = value;
+ pidx_ = (pidx_ + 1) % capacity_;
+ // notify one waiting consumer
+ if ( ! waiting_consumers_.empty() ) {
+ context * consumer_ctx = & waiting_consumers_.front();
+ waiting_consumers_.pop_front();
+ lk.unlock();
+ active_ctx->schedule( consumer_ctx);
+ }
+ return channel_op_status::success;
}
- return try_push_( value);
}
channel_op_status try_push( value_type && value) {
- if ( is_closed() ) {
+ context * active_ctx = context::active();
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_closed_() ) {
return channel_op_status::closed;
+ } else if ( is_full_() ) {
+ return channel_op_status::full;
+ } else {
+ slots_[pidx_] = std::move( value);
+ pidx_ = (pidx_ + 1) % capacity_;
+ // notify one waiting consumer
+ if ( ! waiting_consumers_.empty() ) {
+ context * consumer_ctx = & waiting_consumers_.front();
+ waiting_consumers_.pop_front();
+ lk.unlock();
+ active_ctx->schedule( consumer_ctx);
+ }
+ return channel_op_status::success;
}
- return try_push_( std::move( value) );
}
channel_op_status push( value_type const& value) {
- context * ctx{ context::active() };
+ context * active_ctx = context::active();
for (;;) {
- if ( is_closed() ) {
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_closed_() ) {
return channel_op_status::closed;
- }
- channel_op_status status{ try_push_( value) };
- if ( channel_op_status::success == status) {
- detail::spinlock_lock lk{ splk_ };
+ } else if ( is_full_() ) {
+ active_ctx->wait_link( waiting_producers_);
+ // suspend this producer
+ active_ctx->suspend( lk);
+ } else {
+ slots_[pidx_] = value;
+ pidx_ = (pidx_ + 1) % capacity_;
// notify one waiting consumer
if ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx{ & waiting_consumers_.front() };
+ context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
lk.unlock();
- ctx->set_ready( consumer_ctx);
- }
- return status;
- } else if ( channel_op_status::full == status) {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
- if ( is_closed() ) {
- return channel_op_status::closed;
+ active_ctx->schedule( consumer_ctx);
}
- if ( ! is_full_() ) {
- continue;
- }
- ctx->wait_link( waiting_producers_);
- // suspend this producer
- ctx->suspend( lk);
- } else {
- BOOST_ASSERT( channel_op_status::closed == status);
- return status;
+ return channel_op_status::success;
}
}
}
channel_op_status push( value_type && value) {
- context * ctx{ context::active() };
+ context * active_ctx = context::active();
for (;;) {
- if ( is_closed() ) {
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_closed_() ) {
return channel_op_status::closed;
- }
- channel_op_status status{ try_push_( std::move( value) ) };
- if ( channel_op_status::success == status) {
- detail::spinlock_lock lk{ splk_ };
+ } else if ( is_full_() ) {
+ active_ctx->wait_link( waiting_producers_);
+ // suspend this producer
+ active_ctx->suspend( lk);
+ } else {
+ slots_[pidx_] = std::move( value);
+ pidx_ = (pidx_ + 1) % capacity_;
// notify one waiting consumer
if ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx{ & waiting_consumers_.front() };
+ context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
lk.unlock();
- ctx->set_ready( consumer_ctx);
+ active_ctx->schedule( consumer_ctx);
}
- return status;
- } else if ( channel_op_status::full == status) {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
- if ( is_closed() ) {
- return channel_op_status::closed;
- }
- if ( ! is_full_() ) {
- continue;
- }
- ctx->wait_link( waiting_producers_);
- // suspend this producer
- ctx->suspend( lk);
- } else {
- BOOST_ASSERT( channel_op_status::closed == status);
- return status;
+ return channel_op_status::success;
}
}
}
@@ -284,44 +211,33 @@ public:
template< typename Clock, typename Duration >
channel_op_status push_wait_until( value_type const& value,
std::chrono::time_point< Clock, Duration > const& timeout_time_) {
- std::chrono::steady_clock::time_point timeout_time( detail::convert( timeout_time_) );
- context * ctx{ context::active() };
+ context * active_ctx = context::active();
+ std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
- if ( is_closed() ) {
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_closed_() ) {
return channel_op_status::closed;
- }
- channel_op_status status{ try_push_( value) };
- if ( channel_op_status::success == status) {
- detail::spinlock_lock lk{ splk_ };
- // notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx{ & waiting_consumers_.front() };
- waiting_consumers_.pop_front();
- lk.unlock();
- ctx->set_ready( consumer_ctx);
- }
- return status;
- } else if ( channel_op_status::full == status) {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
- if ( is_closed() ) {
- return channel_op_status::closed;
- }
- if ( ! is_full_() ) {
- continue;
- }
- ctx->wait_link( waiting_producers_);
+ } else if ( is_full_() ) {
+ active_ctx->wait_link( waiting_producers_);
// suspend this producer
- if ( ! ctx->wait_until( timeout_time, lk) ) {
+ if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
lk.lock();
// remove from waiting-queue
- ctx->wait_unlink();
+ waiting_producers_.remove( * active_ctx);
return channel_op_status::timeout;
}
} else {
- BOOST_ASSERT( channel_op_status::closed == status);
- return status;
+ slots_[pidx_] = value;
+ pidx_ = (pidx_ + 1) % capacity_;
+ // notify one waiting consumer
+ if ( ! waiting_consumers_.empty() ) {
+ context * consumer_ctx = & waiting_consumers_.front();
+ waiting_consumers_.pop_front();
+ lk.unlock();
+ active_ctx->schedule( consumer_ctx);
+ }
+ return channel_op_status::success;
}
}
}
@@ -329,128 +245,110 @@ public:
template< typename Clock, typename Duration >
channel_op_status push_wait_until( value_type && value,
std::chrono::time_point< Clock, Duration > const& timeout_time_) {
- std::chrono::steady_clock::time_point timeout_time( detail::convert( timeout_time_) );
- context * ctx{ context::active() };
+ context * active_ctx = context::active();
+ std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
- if ( is_closed() ) {
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_closed_() ) {
return channel_op_status::closed;
- }
- channel_op_status status{ try_push_( std::move( value) ) };
- if ( channel_op_status::success == status) {
- detail::spinlock_lock lk{ splk_ };
- // notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx{ & waiting_consumers_.front() };
- waiting_consumers_.pop_front();
- lk.unlock();
- ctx->set_ready( consumer_ctx);
- }
- return status;
- } else if ( channel_op_status::full == status) {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
- if ( is_closed() ) {
- return channel_op_status::closed;
- }
- if ( ! is_full_() ) {
- continue;
- }
- ctx->wait_link( waiting_producers_);
+ } else if ( is_full_() ) {
+ active_ctx->wait_link( waiting_producers_);
// suspend this producer
- if ( ! ctx->wait_until( timeout_time, lk) ) {
+ if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
lk.lock();
// remove from waiting-queue
- ctx->wait_unlink();
+ waiting_producers_.remove( * active_ctx);
return channel_op_status::timeout;
}
} else {
- BOOST_ASSERT( channel_op_status::closed == status);
- return status;
+ slots_[pidx_] = std::move( value);
+ pidx_ = (pidx_ + 1) % capacity_;
+ // notify one waiting consumer
+ if ( ! waiting_consumers_.empty() ) {
+ context * consumer_ctx = & waiting_consumers_.front();
+ waiting_consumers_.pop_front();
+ lk.unlock();
+ active_ctx->schedule( consumer_ctx);
+ }
+ return channel_op_status::success;
}
}
}
channel_op_status try_pop( value_type & value) {
- channel_op_status status{ try_pop_( value) };
- if ( channel_op_status::success != status) {
- if ( is_closed() ) {
- status = channel_op_status::closed;
+ context * active_ctx = context::active();
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_empty_() ) {
+ return is_closed_()
+ ? channel_op_status::closed
+ : channel_op_status::empty;
+ } else {
+ value = std::move( slots_[cidx_]);
+ cidx_ = (cidx_ + 1) % capacity_;
+ // notify one waiting producer
+ if ( ! waiting_producers_.empty() ) {
+ context * producer_ctx = & waiting_producers_.front();
+ waiting_producers_.pop_front();
+ lk.unlock();
+ active_ctx->schedule( producer_ctx);
}
+ return channel_op_status::success;
}
- return status;
}
channel_op_status pop( value_type & value) {
- context * ctx{ context::active() };
+ context * active_ctx = context::active();
for (;;) {
- channel_op_status status{ try_pop_( value) };
- if ( channel_op_status::success == status) {
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_empty_() ) {
+ if ( is_closed_() ) {
+ return channel_op_status::closed;
+ } else {
+ active_ctx->wait_link( waiting_consumers_);
+ // suspend this consumer
+ active_ctx->suspend( lk);
+ }
+ } else {
+ value = std::move( slots_[cidx_]);
+ cidx_ = (cidx_ + 1) % capacity_;
// notify one waiting producer
if ( ! waiting_producers_.empty() ) {
- context * producer_ctx{ & waiting_producers_.front() };
+ context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- ctx->set_ready( producer_ctx);
+ active_ctx->schedule( producer_ctx);
}
- return status;
- } else if ( channel_op_status::empty == status) {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
- if ( is_closed() ) {
- return channel_op_status::closed;
- }
- if ( ! is_empty_() ) {
- continue;
- }
- ctx->wait_link( waiting_consumers_);
- // suspend this consumer
- ctx->suspend( lk);
- } else {
- BOOST_ASSERT( channel_op_status::closed == status);
- return status;
+ return channel_op_status::success;
}
}
}
value_type value_pop() {
- context * ctx{ context::active() };
+ context * active_ctx = context::active();
for (;;) {
- slot * s{ nullptr };
- std::size_t idx{ 0 };
- channel_op_status status{ try_value_pop_( s, idx) };
- if ( channel_op_status::success == status) {
- value_type value{ std::move( * reinterpret_cast< value_type * >( std::addressof( s->storage) ) ) };
- s->cycle.store( idx + capacity_, std::memory_order_release);
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_empty_() ) {
+ if ( is_closed_() ) {
+ throw fiber_error{
+ std::make_error_code( std::errc::operation_not_permitted),
+ "boost fiber: channel is closed" };
+ } else {
+ active_ctx->wait_link( waiting_consumers_);
+ // suspend this consumer
+ active_ctx->suspend( lk);
+ }
+ } else {
+ value_type value = std::move( slots_[cidx_]);
+ cidx_ = (cidx_ + 1) % capacity_;
// notify one waiting producer
if ( ! waiting_producers_.empty() ) {
- context * producer_ctx{ & waiting_producers_.front() };
+ context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- ctx->set_ready( producer_ctx);
+ active_ctx->schedule( producer_ctx);
}
return std::move( value);
- } else if ( channel_op_status::empty == status) {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
- if ( is_closed() ) {
- throw fiber_error{
- std::make_error_code( std::errc::operation_not_permitted),
- "boost fiber: channel is closed" };
- }
- if ( ! is_empty_() ) {
- continue;
- }
- ctx->wait_link( waiting_consumers_);
- // suspend this consumer
- ctx->suspend( lk);
- } else {
- BOOST_ASSERT( channel_op_status::closed == status);
- throw fiber_error{
- std::make_error_code( std::errc::operation_not_permitted),
- "boost fiber: channel is closed" };
}
}
}
@@ -465,41 +363,35 @@ public:
template< typename Clock, typename Duration >
channel_op_status pop_wait_until( value_type & value,
std::chrono::time_point< Clock, Duration > const& timeout_time_) {
- std::chrono::steady_clock::time_point timeout_time( detail::convert( timeout_time_) );
- context * ctx{ context::active() };
+ context * active_ctx = context::active();
+ std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
- channel_op_status status{ try_pop_( value) };
- if ( channel_op_status::success == status) {
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_empty_() ) {
+ if ( is_closed_() ) {
+ return channel_op_status::closed;
+ } else {
+ active_ctx->wait_link( waiting_consumers_);
+ // suspend this consumer
+ if ( ! active_ctx->wait_until( timeout_time, lk) ) {
+ // relock local lk
+ lk.lock();
+ // remove from waiting-queue
+ waiting_consumers_.remove( * active_ctx);
+ return channel_op_status::timeout;
+ }
+ }
+ } else {
+ value = std::move( slots_[cidx_]);
+ cidx_ = (cidx_ + 1) % capacity_;
// notify one waiting producer
if ( ! waiting_producers_.empty() ) {
- context * producer_ctx{ & waiting_producers_.front() };
+ context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- context::active()->set_ready( producer_ctx);
- }
- return status;
- } else if ( channel_op_status::empty == status) {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
- if ( is_closed() ) {
- return channel_op_status::closed;
- }
- if ( ! is_empty_() ) {
- continue;
+ active_ctx->schedule( producer_ctx);
}
- ctx->wait_link( waiting_consumers_);
- // suspend this consumer
- if ( ! ctx->wait_until( timeout_time, lk) ) {
- // relock local lk
- lk.lock();
- // remove from waiting-queue
- ctx->wait_unlink();
- return channel_op_status::timeout;
- }
- } else {
- BOOST_ASSERT( channel_op_status::closed == status);
- return status;
+ return channel_op_status::success;
}
}
}