summaryrefslogtreecommitdiff
path: root/boost/fiber/unbuffered_channel.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'boost/fiber/unbuffered_channel.hpp')
-rw-r--r--boost/fiber/unbuffered_channel.hpp540
1 files changed, 540 insertions, 0 deletions
diff --git a/boost/fiber/unbuffered_channel.hpp b/boost/fiber/unbuffered_channel.hpp
new file mode 100644
index 0000000000..582d9ae5a7
--- /dev/null
+++ b/boost/fiber/unbuffered_channel.hpp
@@ -0,0 +1,540 @@
+
+// Copyright Oliver Kowalke 2016.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_FIBERS_UNBUFFERED_CHANNEL_H
+#define BOOST_FIBERS_UNBUFFERED_CHANNEL_H
+
+#include <atomic>
+#include <chrono>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include <boost/config.hpp>
+
+#include <boost/fiber/channel_op_status.hpp>
+#include <boost/fiber/context.hpp>
+#include <boost/fiber/detail/config.hpp>
+#include <boost/fiber/detail/convert.hpp>
+#include <boost/fiber/detail/spinlock.hpp>
+#include <boost/fiber/exceptions.hpp>
+
+#ifdef BOOST_HAS_ABI_HEADERS
+# include BOOST_ABI_PREFIX
+#endif
+
+namespace boost {
+namespace fibers {
+
+template< typename T >
+class unbuffered_channel {
+public:
+ typedef T value_type;
+
+private:
+ typedef context::wait_queue_t wait_queue_type;
+
+ struct alignas(cache_alignment) slot {
+ value_type value;
+ context * ctx;
+
+ slot( value_type const& value_, context * ctx_) :
+ value{ value_ },
+ ctx{ ctx_ } {
+ }
+
+ slot( value_type && value_, context * ctx_) :
+ value{ std::move( value_) },
+ ctx{ ctx_ } {
+ }
+ };
+
+ // shared cacheline
+ alignas(cache_alignment) std::atomic< slot * > slot_{ nullptr };
+ // shared cacheline
+ alignas(cache_alignment) std::atomic_bool closed_{ false };
+ mutable detail::spinlock splk_{};
+ wait_queue_type waiting_producers_{};
+ wait_queue_type waiting_consumers_{};
+ char pad_[cacheline_length];
+
+ bool is_empty_() {
+ return nullptr == slot_.load( std::memory_order_acquire);
+ }
+
+ bool try_push_( slot * own_slot) {
+ for (;;) {
+ slot * s{ slot_.load( std::memory_order_acquire) };
+ if ( nullptr == s) {
+ if ( ! slot_.compare_exchange_strong( s, own_slot, std::memory_order_acq_rel) ) {
+ continue;
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ slot * try_pop_() {
+ slot * nil_slot{ nullptr };
+ for (;;) {
+ slot * s{ slot_.load( std::memory_order_acquire) };
+ if ( nullptr != s) {
+ if ( ! slot_.compare_exchange_strong( s, nil_slot, std::memory_order_acq_rel) ) {
+ continue;}
+ }
+ return s;
+ }
+ }
+
+public:
+ unbuffered_channel() = default;
+
+ ~unbuffered_channel() {
+ close();
+ slot * s{ nullptr };
+ if ( nullptr != ( s = try_pop_() ) ) {
+ BOOST_ASSERT( nullptr != s);
+ BOOST_ASSERT( nullptr != s->ctx);
+ // value will be destructed in the context of the waiting fiber
+ context::active()->set_ready( s->ctx);
+ }
+ }
+
+ unbuffered_channel( unbuffered_channel const&) = delete;
+ unbuffered_channel & operator=( unbuffered_channel const&) = delete;
+
+ bool is_closed() const noexcept {
+ return closed_.load( std::memory_order_acquire);
+ }
+
+ void close() noexcept {
+ context * ctx{ context::active() };
+ detail::spinlock_lock lk{ splk_ };
+ closed_.store( true, std::memory_order_release);
+ // notify all waiting producers
+ while ( ! waiting_producers_.empty() ) {
+ context * producer_ctx{ & waiting_producers_.front() };
+ waiting_producers_.pop_front();
+ ctx->set_ready( producer_ctx);
+ }
+ // notify all waiting consumers
+ while ( ! waiting_consumers_.empty() ) {
+ context * consumer_ctx{ & waiting_consumers_.front() };
+ waiting_consumers_.pop_front();
+ ctx->set_ready( consumer_ctx);
+ }
+ }
+
+ channel_op_status push( value_type const& value) {
+ context * ctx{ context::active() };
+ slot s{ value, ctx };
+ for (;;) {
+ if ( is_closed() ) {
+ return channel_op_status::closed;
+ }
+ if ( try_push_( & s) ) {
+ detail::spinlock_lock lk{ splk_ };
+ // notify one waiting consumer
+ if ( ! waiting_consumers_.empty() ) {
+ context * consumer_ctx{ & waiting_consumers_.front() };
+ waiting_consumers_.pop_front();
+ ctx->set_ready( consumer_ctx);
+ }
+ // suspend till value has been consumed
+ ctx->suspend( lk);
+ // resumed, value has been consumed
+ return channel_op_status::success;
+ } else {
+ BOOST_ASSERT( ! ctx->wait_is_linked() );
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_closed() ) {
+ return channel_op_status::closed;
+ }
+ if ( is_empty_() ) {
+ continue;
+ }
+ ctx->wait_link( waiting_producers_);
+ // suspend this producer
+ ctx->suspend( lk);
+ // resumed, slot mabye free
+ }
+ }
+ }
+
+ channel_op_status push( value_type && value) {
+ context * ctx{ context::active() };
+ slot s{ std::move( value), ctx };
+ for (;;) {
+ if ( is_closed() ) {
+ return channel_op_status::closed;
+ }
+ if ( try_push_( & s) ) {
+ detail::spinlock_lock lk{ splk_ };
+ // notify one waiting consumer
+ if ( ! waiting_consumers_.empty() ) {
+ context * consumer_ctx{ & waiting_consumers_.front() };
+ waiting_consumers_.pop_front();
+ ctx->set_ready( consumer_ctx);
+ }
+ // suspend till value has been consumed
+ ctx->suspend( lk);
+ // resumed, value has been consumed
+ return channel_op_status::success;
+ } else {
+ BOOST_ASSERT( ! ctx->wait_is_linked() );
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_closed() ) {
+ return channel_op_status::closed;
+ }
+ if ( is_empty_() ) {
+ continue;
+ }
+ ctx->wait_link( waiting_producers_);
+ // suspend this producer
+ ctx->suspend( lk);
+ // resumed, slot mabye free
+ }
+ }
+ }
+
+ template< typename Rep, typename Period >
+ channel_op_status push_wait_for( value_type const& value,
+ std::chrono::duration< Rep, Period > const& timeout_duration) {
+ return push_wait_until( value,
+ std::chrono::steady_clock::now() + timeout_duration);
+ }
+
+ template< typename Rep, typename Period >
+ channel_op_status push_wait_for( value_type && value,
+ std::chrono::duration< Rep, Period > const& timeout_duration) {
+ return push_wait_until( std::forward< value_type >( value),
+ std::chrono::steady_clock::now() + timeout_duration);
+ }
+
+ template< typename Clock, typename Duration >
+ channel_op_status push_wait_until( value_type const& value,
+ std::chrono::time_point< Clock, Duration > const& timeout_time_) {
+ std::chrono::steady_clock::time_point timeout_time( detail::convert( timeout_time_) );
+ context * ctx{ context::active() };
+ slot s{ value, ctx };
+ for (;;) {
+ if ( is_closed() ) {
+ return channel_op_status::closed;
+ }
+ if ( try_push_( & s) ) {
+ detail::spinlock_lock lk{ splk_ };
+ // notify one waiting consumer
+ if ( ! waiting_consumers_.empty() ) {
+ context * consumer_ctx{ & waiting_consumers_.front() };
+ waiting_consumers_.pop_front();
+ ctx->set_ready( consumer_ctx);
+ }
+ // suspend this producer
+ if ( ! ctx->wait_until( timeout_time, lk) ) {
+ // clear slot
+ slot * nil_slot{ nullptr }, * own_slot{ & s };
+ slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
+ // relock local lk
+ lk.lock();
+ // remove from waiting-queue
+ ctx->wait_unlink();
+ // resumed, value has not been consumed
+ return channel_op_status::timeout;
+ }
+ // resumed, value has been consumed
+ return channel_op_status::success;
+ } else {
+ BOOST_ASSERT( ! ctx->wait_is_linked() );
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_closed() ) {
+ return channel_op_status::closed;
+ }
+ if ( is_empty_() ) {
+ continue;
+ }
+ ctx->wait_link( waiting_producers_);
+ // suspend this producer
+ if ( ! ctx->wait_until( timeout_time, lk) ) {
+ // relock local lk
+ lk.lock();
+ // remove from waiting-queue
+ ctx->wait_unlink();
+ return channel_op_status::timeout;
+ }
+ // resumed, slot maybe free
+ }
+ }
+ }
+
+ template< typename Clock, typename Duration >
+ channel_op_status push_wait_until( value_type && value,
+ std::chrono::time_point< Clock, Duration > const& timeout_time_) {
+ std::chrono::steady_clock::time_point timeout_time( detail::convert( timeout_time_) );
+ context * ctx{ context::active() };
+ slot s{ std::move( value), ctx };
+ for (;;) {
+ if ( is_closed() ) {
+ return channel_op_status::closed;
+ }
+ if ( try_push_( & s) ) {
+ detail::spinlock_lock lk{ splk_ };
+ // notify one waiting consumer
+ if ( ! waiting_consumers_.empty() ) {
+ context * consumer_ctx{ & waiting_consumers_.front() };
+ waiting_consumers_.pop_front();
+ ctx->set_ready( consumer_ctx);
+ }
+ // suspend this producer
+ if ( ! ctx->wait_until( timeout_time, lk) ) {
+ // clear slot
+ slot * nil_slot{ nullptr }, * own_slot{ & s };
+ slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
+ // relock local lk
+ lk.lock();
+ // remove from waiting-queue
+ ctx->wait_unlink();
+ // resumed, value has not been consumed
+ return channel_op_status::timeout;
+ }
+ // resumed, value has been consumed
+ return channel_op_status::success;
+ } else {
+ BOOST_ASSERT( ! ctx->wait_is_linked() );
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_closed() ) {
+ return channel_op_status::closed;
+ }
+ if ( is_empty_() ) {
+ continue;
+ }
+ ctx->wait_link( waiting_producers_);
+ // suspend this producer
+ if ( ! ctx->wait_until( timeout_time, lk) ) {
+ // relock local lk
+ lk.lock();
+ // remove from waiting-queue
+ ctx->wait_unlink();
+ return channel_op_status::timeout;
+ }
+ // resumed, slot maybe free
+ }
+ }
+ }
+
+ channel_op_status pop( value_type & value) {
+ context * ctx{ context::active() };
+ slot * s{ nullptr };
+ for (;;) {
+ if ( nullptr != ( s = try_pop_() ) ) {
+ {
+ detail::spinlock_lock lk{ splk_ };
+ // notify one waiting producer
+ if ( ! waiting_producers_.empty() ) {
+ context * producer_ctx{ & waiting_producers_.front() };
+ waiting_producers_.pop_front();
+ lk.unlock();
+ ctx->set_ready( producer_ctx);
+ }
+ }
+ // consume value
+ value = std::move( s->value);
+ // resume suspended producer
+ ctx->set_ready( s->ctx);
+ return channel_op_status::success;
+ } else {
+ BOOST_ASSERT( ! ctx->wait_is_linked() );
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_closed() ) {
+ return channel_op_status::closed;
+ }
+ if ( ! is_empty_() ) {
+ continue;
+ }
+ ctx->wait_link( waiting_consumers_);
+ // suspend this consumer
+ ctx->suspend( lk);
+ // resumed, slot mabye set
+ }
+ }
+ }
+
+ value_type value_pop() {
+ context * ctx{ context::active() };
+ slot * s{ nullptr };
+ for (;;) {
+ if ( nullptr != ( s = try_pop_() ) ) {
+ {
+ detail::spinlock_lock lk{ splk_ };
+ // notify one waiting producer
+ if ( ! waiting_producers_.empty() ) {
+ context * producer_ctx{ & waiting_producers_.front() };
+ waiting_producers_.pop_front();
+ lk.unlock();
+ ctx->set_ready( producer_ctx);
+ }
+ }
+ // consume value
+ value_type value{ std::move( s->value) };
+ // resume suspended producer
+ ctx->set_ready( s->ctx);
+ return std::move( value);
+ } else {
+ BOOST_ASSERT( ! ctx->wait_is_linked() );
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_closed() ) {
+ throw fiber_error{
+ std::make_error_code( std::errc::operation_not_permitted),
+ "boost fiber: channel is closed" };
+ }
+ if ( ! is_empty_() ) {
+ continue;
+ }
+ ctx->wait_link( waiting_consumers_);
+ // suspend this consumer
+ ctx->suspend( lk);
+ // resumed, slot mabye set
+ }
+ }
+ }
+
+ template< typename Rep, typename Period >
+ channel_op_status pop_wait_for( value_type & value,
+ std::chrono::duration< Rep, Period > const& timeout_duration) {
+ return pop_wait_until( value,
+ std::chrono::steady_clock::now() + timeout_duration);
+ }
+
+ template< typename Clock, typename Duration >
+ channel_op_status pop_wait_until( value_type & value,
+ std::chrono::time_point< Clock, Duration > const& timeout_time_) {
+ std::chrono::steady_clock::time_point timeout_time( detail::convert( timeout_time_) );
+ context * ctx{ context::active() };
+ slot * s{ nullptr };
+ for (;;) {
+ if ( nullptr != ( s = try_pop_() ) ) {
+ {
+ detail::spinlock_lock lk{ splk_ };
+ // notify one waiting producer
+ if ( ! waiting_producers_.empty() ) {
+ context * producer_ctx{ & waiting_producers_.front() };
+ waiting_producers_.pop_front();
+ lk.unlock();
+ ctx->set_ready( producer_ctx);
+ }
+ }
+ // consume value
+ value = std::move( s->value);
+ // resume suspended producer
+ ctx->set_ready( s->ctx);
+ return channel_op_status::success;
+ } else {
+ BOOST_ASSERT( ! ctx->wait_is_linked() );
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_closed() ) {
+ return channel_op_status::closed;
+ }
+ if ( ! is_empty_() ) {
+ continue;
+ }
+ ctx->wait_link( waiting_consumers_);
+ // suspend this consumer
+ if ( ! ctx->wait_until( timeout_time, lk) ) {
+ // relock local lk
+ lk.lock();
+ // remove from waiting-queue
+ ctx->wait_unlink();
+ return channel_op_status::timeout;
+ }
+ }
+ }
+ }
+
+ class iterator : public std::iterator< std::input_iterator_tag, typename std::remove_reference< value_type >::type > {
+ private:
+ typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
+
+ unbuffered_channel * chan_{ nullptr };
+ storage_type storage_;
+
+ void increment_() {
+ BOOST_ASSERT( nullptr != chan_);
+ try {
+ ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
+ } catch ( fiber_error const&) {
+ chan_ = nullptr;
+ }
+ }
+
+ public:
+ typedef typename iterator::pointer pointer_t;
+ typedef typename iterator::reference reference_t;
+
+ iterator() noexcept = default;
+
+ explicit iterator( unbuffered_channel< T > * chan) noexcept :
+ chan_{ chan } {
+ increment_();
+ }
+
+ iterator( iterator const& other) noexcept :
+ chan_{ other.chan_ } {
+ }
+
+ iterator & operator=( iterator const& other) noexcept {
+ if ( this == & other) return * this;
+ chan_ = other.chan_;
+ return * this;
+ }
+
+ bool operator==( iterator const& other) const noexcept {
+ return other.chan_ == chan_;
+ }
+
+ bool operator!=( iterator const& other) const noexcept {
+ return other.chan_ != chan_;
+ }
+
+ iterator & operator++() {
+ increment_();
+ return * this;
+ }
+
+ iterator operator++( int) = delete;
+
+ reference_t operator*() noexcept {
+ return * reinterpret_cast< value_type * >( std::addressof( storage_) );
+ }
+
+ pointer_t operator->() noexcept {
+ return reinterpret_cast< value_type * >( std::addressof( storage_) );
+ }
+ };
+
+ friend class iterator;
+};
+
+template< typename T >
+typename unbuffered_channel< T >::iterator
+begin( unbuffered_channel< T > & chan) {
+ return typename unbuffered_channel< T >::iterator( & chan);
+}
+
+template< typename T >
+typename unbuffered_channel< T >::iterator
+end( unbuffered_channel< T > &) {
+ return typename unbuffered_channel< T >::iterator();
+}
+
+}}
+
+#ifdef BOOST_HAS_ABI_HEADERS
+# include BOOST_ABI_SUFFIX
+#endif
+
+#endif // BOOST_FIBERS_UNBUFFERED_CHANNEL_H