diff options
author | DongHun Kwak <dh0128.kwak@samsung.com> | 2017-09-13 11:08:07 +0900 |
---|---|---|
committer | DongHun Kwak <dh0128.kwak@samsung.com> | 2017-09-13 11:09:00 +0900 |
commit | b5c87084afaef42b2d058f68091be31988a6a874 (patch) | |
tree | adef9a65870a41181687e11d57fdf98e7629de3c /boost/fiber | |
parent | 34bd32e225e2a8a94104489b31c42e5801cc1f4a (diff) | |
download | boost-b5c87084afaef42b2d058f68091be31988a6a874.tar.gz boost-b5c87084afaef42b2d058f68091be31988a6a874.tar.bz2 boost-b5c87084afaef42b2d058f68091be31988a6a874.zip |
Imported Upstream version 1.64.0upstream/1.64.0
Change-Id: Id9212edd016dd55f21172c427aa7894d1d24148b
Signed-off-by: DongHun Kwak <dh0128.kwak@samsung.com>
Diffstat (limited to 'boost/fiber')
43 files changed, 1207 insertions, 2001 deletions
diff --git a/boost/fiber/algo/algorithm.hpp b/boost/fiber/algo/algorithm.hpp index 515fc5ed5c..9b846e774b 100644 --- a/boost/fiber/algo/algorithm.hpp +++ b/boost/fiber/algo/algorithm.hpp @@ -43,11 +43,11 @@ struct BOOST_FIBERS_DECL algorithm { class BOOST_FIBERS_DECL algorithm_with_properties_base : public algorithm { public: // called by fiber_properties::notify() -- don't directly call - virtual void property_change_( context * f, fiber_properties * props) noexcept = 0; + virtual void property_change_( context * ctx, fiber_properties * props) noexcept = 0; protected: - static fiber_properties* get_properties( context * f) noexcept; - static void set_properties( context * f, fiber_properties * p) noexcept; + static fiber_properties* get_properties( context * ctx) noexcept; + static void set_properties( context * ctx, fiber_properties * p) noexcept; }; template< typename PROPS > @@ -58,18 +58,18 @@ struct algorithm_with_properties : public algorithm_with_properties_base { // must override awakened() with properties parameter instead. Otherwise // you'd have to remember to start every subclass awakened() override // with: algorithm_with_properties<PROPS>::awakened(fb); - virtual void awakened( context * f) noexcept override final { - fiber_properties * props = super::get_properties( f); + virtual void awakened( context * ctx) noexcept override final { + fiber_properties * props = super::get_properties( ctx); if ( nullptr == props) { // TODO: would be great if PROPS could be allocated on the new // fiber's stack somehow - props = new_properties( f); + props = new_properties( ctx); // It is not good for new_properties() to return 0. - BOOST_ASSERT_MSG(props, "new_properties() must return non-NULL"); + BOOST_ASSERT_MSG( props, "new_properties() must return non-NULL"); // new_properties() must return instance of (a subclass of) PROPS BOOST_ASSERT_MSG( dynamic_cast< PROPS * >( props), "new_properties() must return properties class"); - super::set_properties( f, props); + super::set_properties( ctx, props); } // Set algo_ again every time this fiber becomes READY. That // handles the case of a fiber migrating to a new thread with a new @@ -77,31 +77,31 @@ struct algorithm_with_properties : public algorithm_with_properties_base { props->set_algorithm( this); // Okay, now forward the call to subclass override. - awakened( f, properties(f) ); + awakened( ctx, properties( ctx) ); } // subclasses override this method instead of the original awakened() - virtual void awakened( context *, PROPS& ) noexcept = 0; + virtual void awakened( context *, PROPS &) noexcept = 0; // used for all internal calls - PROPS & properties( context * f) noexcept { - return static_cast< PROPS & >( * super::get_properties( f) ); + PROPS & properties( context * ctx) noexcept { + return static_cast< PROPS & >( * super::get_properties( ctx) ); } // override this to be notified by PROPS::notify() - virtual void property_change( context * f, PROPS & props) noexcept { + virtual void property_change( context * ctx, PROPS & props) noexcept { } // implementation for algorithm_with_properties_base method - void property_change_( context * f, fiber_properties * props ) noexcept override final { - property_change( f, * static_cast< PROPS * >( props) ); + void property_change_( context * ctx, fiber_properties * props) noexcept override final { + property_change( ctx, * static_cast< PROPS * >( props) ); } // Override this to customize instantiation of PROPS, e.g. use a different // allocator. Each PROPS instance is associated with a particular // context. - virtual fiber_properties * new_properties( context * f) { - return new PROPS( f); + virtual fiber_properties * new_properties( context * ctx) { + return new PROPS( ctx); } }; diff --git a/boost/fiber/algo/detail/chase_lev_queue.hpp b/boost/fiber/algo/detail/chase_lev_queue.hpp deleted file mode 100644 index f51556020d..0000000000 --- a/boost/fiber/algo/detail/chase_lev_queue.hpp +++ /dev/null @@ -1,172 +0,0 @@ - -// Copyright Oliver Kowalke 2013. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// http://www.boost.org/LICENSE_1_0.txt) - -#ifndef BOOST_FIBERS_ALGO_DETAIL_CHASE_LEV_QUEUE_H -#define BOOST_FIBERS_ALGO_DETAIL_CHASE_LEV_QUEUE_H - -#include <atomic> -#include <cstddef> -#include <memory> -#include <type_traits> -#include <vector> - -#include <boost/assert.hpp> -#include <boost/config.hpp> - -#include <boost/fiber/detail/config.hpp> -#include <boost/fiber/context.hpp> - -// David Chase and Yossi Lev. Dynamic circular work-stealing deque. -// In SPAA ’05: Proceedings of the seventeenth annual ACM symposium -// on Parallelism in algorithms and architectures, pages 21–28, -// New York, NY, USA, 2005. ACM. -// -// Nhat Minh Lê, Antoniu Pop, Albert Cohen, and Francesco Zappa Nardelli. 2013. -// Correct and efficient work-stealing for weak memory models. -// In Proceedings of the 18th ACM SIGPLAN symposium on Principles and practice -// of parallel programming (PPoPP '13). ACM, New York, NY, USA, 69-80. -namespace boost { -namespace fibers { -namespace algo { -namespace detail { - -class chase_lev_queue { -private: - class circular_buffer { - private: - typedef typename std::aligned_storage< sizeof( context *), alignof( context *) >::type storage_t; - - int64_t size_; - context ** items; - chase_lev_queue * queue_; - - public: - circular_buffer( int64_t size, chase_lev_queue * queue) noexcept : - size_{ size }, - items{ reinterpret_cast< context ** >( new storage_t[size_] ) }, - queue_{ queue } { - } - - ~circular_buffer() { - delete [] reinterpret_cast< storage_t * >( items); - } - - int64_t size() const noexcept { - return size_; - } - - context * get( int64_t idx) noexcept { - BOOST_ASSERT( 0 <= idx); - return * (items + (idx & (size() - 1))); - } - - void put( int64_t idx, context * ctx) noexcept { - BOOST_ASSERT( 0 <= idx); - * (items + (idx & (size() - 1))) = ctx; - } - - circular_buffer * grow( int64_t top, int64_t bottom) { - BOOST_ASSERT( 0 <= top); - BOOST_ASSERT( 0 <= bottom); - circular_buffer * buffer = new circular_buffer{ size() * 2, queue_ }; - queue_->old_buffers_.push_back( this); - for ( int64_t i = top; i != bottom; ++i) { - buffer->put( i, get( i) ); - } - return buffer; - } - }; - - std::atomic< int64_t > top_{ 0 }; - std::atomic< int64_t > bottom_{ 0 }; - std::atomic< circular_buffer * > buffer_; - std::vector< circular_buffer * > old_buffers_; - -public: - chase_lev_queue() : - buffer_{ new circular_buffer{ 1024, this } } { - old_buffers_.resize( 10); - } - - ~chase_lev_queue() { - delete buffer_.load( std::memory_order_seq_cst); - for ( circular_buffer * buffer : old_buffers_) { - delete buffer; - } - } - - chase_lev_queue( chase_lev_queue const&) = delete; - chase_lev_queue( chase_lev_queue &&) = delete; - - chase_lev_queue & operator=( chase_lev_queue const&) = delete; - chase_lev_queue & operator=( chase_lev_queue &&) = delete; - - bool empty() const noexcept { - int64_t bottom = bottom_.load( std::memory_order_relaxed); - int64_t top = top_.load( std::memory_order_relaxed); - return bottom <= top; - } - - void push( context * ctx) { - int64_t bottom = bottom_.load( std::memory_order_relaxed); - int64_t top = top_.load( std::memory_order_acquire); - circular_buffer * buffer = buffer_.load( std::memory_order_relaxed); - if ( (bottom - top) > buffer->size() - 1) { - // queue is full - buffer = buffer->grow( top, bottom); - buffer_.store( buffer, std::memory_order_release); - } - buffer->put( bottom, ctx); - std::atomic_thread_fence( std::memory_order_release); - bottom_.store( bottom + 1, std::memory_order_relaxed); - } - - context * pop() { - int64_t bottom = bottom_.load( std::memory_order_relaxed) - 1; - circular_buffer * buffer = buffer_.load( std::memory_order_relaxed); - bottom_.store( bottom, std::memory_order_relaxed); - std::atomic_thread_fence( std::memory_order_seq_cst); - int64_t top = top_.load( std::memory_order_relaxed); - context * ctx = nullptr; - if ( top <= bottom) { - // queue is not empty - ctx = buffer->get( bottom); - // last element - if ( top == bottom) { - if ( ! top_.compare_exchange_strong( top, top + 1, - std::memory_order_seq_cst, std::memory_order_relaxed) ) { - return nullptr; - } - bottom_.store( bottom + 1, std::memory_order_relaxed); - } - } else { - // queue is empty - bottom_.store( bottom + 1, std::memory_order_relaxed); - } - return ctx; - } - - context * steal() { - int64_t top = top_.load( std::memory_order_acquire); - std::atomic_thread_fence( std::memory_order_seq_cst); - int64_t bottom = bottom_.load( std::memory_order_acquire); - context * ctx = nullptr; - if ( top < bottom) { - // queue is not empty - circular_buffer * buffer = buffer_.load( std::memory_order_consume); - ctx = buffer->get( top); - if ( ! top_.compare_exchange_strong( top, top + 1, - std::memory_order_seq_cst, std::memory_order_relaxed) ) { - return nullptr; - } - } - return ctx; - } -}; - -}}}} - -#endif // #define BOOST_FIBERS_ALGO_DETAIL_CHASE_LEV_QUEUE_H diff --git a/boost/fiber/algo/round_robin.hpp b/boost/fiber/algo/round_robin.hpp index 038b424529..e384982567 100644 --- a/boost/fiber/algo/round_robin.hpp +++ b/boost/fiber/algo/round_robin.hpp @@ -32,9 +32,9 @@ namespace algo { class BOOST_FIBERS_DECL round_robin : public algorithm { private: - typedef scheduler::ready_queue_t rqueue_t; + typedef scheduler::ready_queue_type rqueue_type; - rqueue_t rqueue_{}; + rqueue_type rqueue_{}; std::mutex mtx_{}; std::condition_variable cnd_{}; bool flag_{ false }; diff --git a/boost/fiber/algo/shared_work.hpp b/boost/fiber/algo/shared_work.hpp index e648c5b19f..23bc926e55 100644 --- a/boost/fiber/algo/shared_work.hpp +++ b/boost/fiber/algo/shared_work.hpp @@ -34,17 +34,17 @@ namespace algo { class BOOST_FIBERS_DECL shared_work : public algorithm { private: - typedef std::deque< context * > rqueue_t; - typedef scheduler::ready_queue_t lqueue_t; + typedef std::deque< context * > rqueue_type; + typedef scheduler::ready_queue_type lqueue_type; - static rqueue_t rqueue_; + static rqueue_type rqueue_; static std::mutex rqueue_mtx_; - lqueue_t lqueue_{}; + lqueue_type lqueue_{}; std::mutex mtx_{}; std::condition_variable cnd_{}; bool flag_{ false }; - bool suspend_; + bool suspend_{ false }; public: shared_work() = default; @@ -64,7 +64,7 @@ public: context * pick_next() noexcept; bool has_ready_fibers() const noexcept { - std::unique_lock< std::mutex > lock( rqueue_mtx_); + std::unique_lock< std::mutex > lock{ rqueue_mtx_ }; return ! rqueue_.empty() || ! lqueue_.empty(); } diff --git a/boost/fiber/algo/work_stealing.hpp b/boost/fiber/algo/work_stealing.hpp index 12364ece8c..66cadd12be 100644 --- a/boost/fiber/algo/work_stealing.hpp +++ b/boost/fiber/algo/work_stealing.hpp @@ -17,6 +17,7 @@ #include <boost/config.hpp> #include <boost/fiber/algo/algorithm.hpp> +#include <boost/fiber/detail/context_spinlock_queue.hpp> #include <boost/fiber/detail/context_spmc_queue.hpp> #include <boost/fiber/context.hpp> #include <boost/fiber/detail/config.hpp> @@ -32,45 +33,46 @@ namespace algo { class work_stealing : public algorithm { private: - typedef scheduler::ready_queue_t lqueue_t; - - static std::vector< work_stealing * > schedulers_; - - std::size_t idx_; - std::size_t max_idx_; - detail::context_spmc_queue rqueue_{}; - lqueue_t lqueue_{}; - std::mutex mtx_{}; - std::condition_variable cnd_{}; - bool flag_{ false }; - bool suspend_; + static std::vector< work_stealing * > schedulers_; + + std::size_t idx_; + std::size_t max_idx_; +#ifdef BOOST_FIBERS_USE_SPMC_QUEUE + alignas(cache_alignment) detail::context_spmc_queue rqueue_{}; +#else + alignas(cache_alignment) detail::context_spinlock_queue rqueue_{}; +#endif + std::mutex mtx_{}; + std::condition_variable cnd_{}; + bool flag_{ false }; + bool suspend_; static void init_( std::size_t max_idx); public: work_stealing( std::size_t max_idx, std::size_t idx, bool suspend = false); - work_stealing( work_stealing const&) = delete; - work_stealing( work_stealing &&) = delete; + work_stealing( work_stealing const&) = delete; + work_stealing( work_stealing &&) = delete; - work_stealing & operator=( work_stealing const&) = delete; - work_stealing & operator=( work_stealing &&) = delete; + work_stealing & operator=( work_stealing const&) = delete; + work_stealing & operator=( work_stealing &&) = delete; void awakened( context * ctx) noexcept; context * pick_next() noexcept; context * steal() noexcept { - return rqueue_.pop(); + return rqueue_.steal(); } bool has_ready_fibers() const noexcept { - return ! rqueue_.empty() || ! lqueue_.empty(); + return ! rqueue_.empty(); } - void suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept; + void suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept; - void notify() noexcept; + void notify() noexcept; }; }}} diff --git a/boost/fiber/barrier.hpp b/boost/fiber/barrier.hpp index debe7d725a..31a2cd617d 100644 --- a/boost/fiber/barrier.hpp +++ b/boost/fiber/barrier.hpp @@ -24,19 +24,19 @@ namespace fibers { class BOOST_FIBERS_DECL barrier { private: - std::size_t initial_; - std::size_t current_; - bool cycle_{ true }; - mutex mtx_{}; - condition_variable cond_{}; + std::size_t initial_; + std::size_t current_; + std::size_t cycle_{ 0 }; + mutex mtx_{}; + condition_variable cond_{}; public: - explicit barrier( std::size_t); + explicit barrier( std::size_t); barrier( barrier const&) = delete; barrier & operator=( barrier const&) = delete; - bool wait(); + bool wait(); }; }} diff --git a/boost/fiber/bounded_channel.hpp b/boost/fiber/bounded_channel.hpp deleted file mode 100644 index ac257b4ff5..0000000000 --- a/boost/fiber/bounded_channel.hpp +++ /dev/null @@ -1,433 +0,0 @@ - -// Copyright Oliver Kowalke 2013. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// http://www.boost.org/LICENSE_1_0.txt) -// - -#ifndef BOOST_FIBERS_BOUNDED_CHANNEL_H -#define BOOST_FIBERS_BOUNDED_CHANNEL_H - -#warn "template bounded_channel is deprecated" - -#include <algorithm> -#include <atomic> -#include <chrono> -#include <cstddef> -#include <memory> -#include <mutex> -#include <system_error> -#include <utility> - -#include <boost/config.hpp> -#include <boost/intrusive_ptr.hpp> - -#include <boost/fiber/detail/config.hpp> -#include <boost/fiber/exceptions.hpp> -#include <boost/fiber/exceptions.hpp> -#include <boost/fiber/condition_variable.hpp> -#include <boost/fiber/mutex.hpp> -#include <boost/fiber/channel_op_status.hpp> - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_PREFIX -#endif - -namespace boost { -namespace fibers { - -template< typename T, - typename Allocator = std::allocator< T > -> -class bounded_channel { -public: - typedef T value_type; - -private: - struct node { - typedef intrusive_ptr< node > ptr_t; - typedef typename std::allocator_traits< Allocator >::template rebind_alloc< - node - > allocator_t; - typedef std::allocator_traits< allocator_t > allocator_traits_t; - -#if ! defined(BOOST_FIBERS_NO_ATOMICS) - std::atomic< std::size_t > use_count{ 0 }; -#else - std::size_t use_count{ 0 }; -#endif - allocator_t alloc; - T va; - ptr_t nxt{}; - - node( T const& t, allocator_t const& alloc_) noexcept : - alloc{ alloc_ }, - va{ t } { - } - - node( T && t, allocator_t & alloc_) noexcept : - alloc{ alloc_ }, - va{ std::move( t) } { - } - - friend - void intrusive_ptr_add_ref( node * p) noexcept { - ++p->use_count; - } - - friend - void intrusive_ptr_release( node * p) noexcept { - if ( 0 == --p->use_count) { - allocator_t alloc( p->alloc); - allocator_traits_t::destroy( alloc, p); - allocator_traits_t::deallocate( alloc, p, 1); - } - } - }; - - using ptr_t = typename node::ptr_t; - using allocator_t = typename node::allocator_t; - using allocator_traits_t = typename node::allocator_traits_t; - - enum class queue_status { - open = 0, - closed - }; - - allocator_t alloc_; - queue_status state_{ queue_status::open }; - std::size_t count_{ 0 }; - ptr_t head_{}; - ptr_t * tail_; - mutable mutex mtx_{}; - condition_variable not_empty_cond_{}; - condition_variable not_full_cond_{}; - std::size_t hwm_; - std::size_t lwm_; - - bool is_closed_() const noexcept { - return queue_status::closed == state_; - } - - void close_( std::unique_lock< boost::fibers::mutex > & lk) noexcept { - state_ = queue_status::closed; - lk.unlock(); - not_empty_cond_.notify_all(); - not_full_cond_.notify_all(); - } - - std::size_t size_() const noexcept { - return count_; - } - - bool is_empty_() const noexcept { - return ! head_; - } - - bool is_full_() const noexcept { - return count_ >= hwm_; - } - - channel_op_status push_( ptr_t new_node, - std::unique_lock< boost::fibers::mutex > & lk) { - if ( is_closed_() ) { - return channel_op_status::closed; - } - not_full_cond_.wait( lk, - [this](){ - return ! is_full_(); - }); - return push_and_notify_( new_node, lk); - } - - channel_op_status try_push_( ptr_t new_node, - std::unique_lock< boost::fibers::mutex > & lk) noexcept { - if ( is_closed_() ) { - return channel_op_status::closed; - } - if ( is_full_() ) { - return channel_op_status::full; - } - return push_and_notify_( new_node, lk); - } - - template< typename Clock, typename Duration > - channel_op_status push_wait_until_( ptr_t new_node, - std::chrono::time_point< Clock, Duration > const& timeout_time, - std::unique_lock< boost::fibers::mutex > & lk) { - if ( is_closed_() ) { - return channel_op_status::closed; - } - if ( ! not_full_cond_.wait_until( lk, timeout_time, - [this](){ - return ! is_full_(); - })) { - return channel_op_status::timeout; - } - return push_and_notify_( new_node, lk); - } - - channel_op_status push_and_notify_( ptr_t new_node, - std::unique_lock< boost::fibers::mutex > & lk) noexcept { - push_tail_( new_node); - lk.unlock(); - not_empty_cond_.notify_one(); - return channel_op_status::success; - } - - void push_tail_( ptr_t new_node) noexcept { - * tail_ = new_node; - tail_ = & new_node->nxt; - ++count_; - } - - value_type value_pop_( std::unique_lock< boost::fibers::mutex > & lk) { - BOOST_ASSERT( ! is_empty_() ); - auto old_head = pop_head_(); - if ( size_() <= lwm_) { - if ( lwm_ == hwm_) { - lk.unlock(); - not_full_cond_.notify_one(); - } else { - lk.unlock(); - // more than one producer could be waiting - // to push a value - not_full_cond_.notify_all(); - } - } - return std::move( old_head->va); - } - - ptr_t pop_head_() noexcept { - auto old_head = head_; - head_ = old_head->nxt; - if ( ! head_) { - tail_ = & head_; - } - old_head->nxt.reset(); - --count_; - return old_head; - } - -public: - bounded_channel( std::size_t hwm, std::size_t lwm, - Allocator const& alloc = Allocator() ) : - alloc_{ alloc }, - tail_{ & head_ }, - hwm_{ hwm }, - lwm_{ lwm } { - if ( hwm_ <= lwm_) { - throw fiber_error( std::make_error_code( std::errc::invalid_argument), - "boost fiber: high-watermark is less than or equal to low-watermark for bounded_channel"); - } - if ( 0 == hwm) { - throw fiber_error( std::make_error_code( std::errc::invalid_argument), - "boost fiber: high-watermark is zero"); - } - } - - bounded_channel( std::size_t wm, - Allocator const& alloc = Allocator() ) : - alloc_{ alloc }, - tail_{ & head_ }, - hwm_{ wm }, - lwm_{ wm - 1 } { - if ( 0 == wm) { - throw fiber_error( std::make_error_code( std::errc::invalid_argument), - "boost fiber: watermark is zero"); - } - } - - bounded_channel( bounded_channel const&) = delete; - bounded_channel & operator=( bounded_channel const&) = delete; - - std::size_t upper_bound() const noexcept { - return hwm_; - } - - std::size_t lower_bound() const noexcept { - return lwm_; - } - - void close() noexcept { - std::unique_lock< mutex > lk( mtx_); - close_( lk); - } - - channel_op_status push( value_type const& va) { - typename allocator_traits_t::pointer ptr{ - allocator_traits_t::allocate( alloc_, 1) }; - try { - allocator_traits_t::construct( alloc_, ptr, va, alloc_); - } catch (...) { - allocator_traits_t::deallocate( alloc_, ptr, 1); - throw; - } - std::unique_lock< mutex > lk( mtx_); - return push_( { detail::convert( ptr) }, lk); - } - - channel_op_status push( value_type && va) { - typename allocator_traits_t::pointer ptr{ - allocator_traits_t::allocate( alloc_, 1) }; - try { - allocator_traits_t::construct( - alloc_, ptr, std::move( va), alloc_); - } catch (...) { - allocator_traits_t::deallocate( alloc_, ptr, 1); - throw; - } - std::unique_lock< mutex > lk( mtx_); - return push_( { detail::convert( ptr) }, lk); - } - - template< typename Rep, typename Period > - channel_op_status push_wait_for( value_type const& va, - std::chrono::duration< Rep, Period > const& timeout_duration) { - return push_wait_until( va, - std::chrono::steady_clock::now() + timeout_duration); - } - - template< typename Rep, typename Period > - channel_op_status push_wait_for( value_type && va, - std::chrono::duration< Rep, Period > const& timeout_duration) { - return push_wait_until( std::forward< value_type >( va), - std::chrono::steady_clock::now() + timeout_duration); - } - - template< typename Clock, typename Duration > - channel_op_status push_wait_until( value_type const& va, - std::chrono::time_point< Clock, Duration > const& timeout_time) { - typename allocator_traits_t::pointer ptr{ - allocator_traits_t::allocate( alloc_, 1) }; - try { - allocator_traits_t::construct( alloc_, ptr, va, alloc_); - } catch (...) { - allocator_traits_t::deallocate( alloc_, ptr, 1); - throw; - } - std::unique_lock< mutex > lk( mtx_); - return push_wait_until_( { detail::convert( ptr) }, timeout_time, lk); - } - - template< typename Clock, typename Duration > - channel_op_status push_wait_until( value_type && va, - std::chrono::time_point< Clock, Duration > const& timeout_time) { - typename allocator_traits_t::pointer ptr{ - allocator_traits_t::allocate( alloc_, 1) }; - try { - allocator_traits_t::construct( - alloc_, ptr, std::move( va), alloc_); - } catch (...) { - allocator_traits_t::deallocate( alloc_, ptr, 1); - throw; - } - std::unique_lock< mutex > lk( mtx_); - return push_wait_until_( { detail::convert( ptr) }, timeout_time, lk); - } - - channel_op_status try_push( value_type const& va) { - typename allocator_traits_t::pointer ptr{ - allocator_traits_t::allocate( alloc_, 1) }; - try { - allocator_traits_t::construct( alloc_, ptr, va, alloc_); - } catch (...) { - allocator_traits_t::deallocate( alloc_, ptr, 1); - throw; - } - std::unique_lock< mutex > lk( mtx_); - return try_push_( { detail::convert( ptr) }, lk); - } - - channel_op_status try_push( value_type && va) { - typename allocator_traits_t::pointer ptr{ - allocator_traits_t::allocate( alloc_, 1) }; - try { - allocator_traits_t::construct( - alloc_, ptr, std::move( va), alloc_); - } catch (...) { - allocator_traits_t::deallocate( alloc_, ptr, 1); - throw; - } - std::unique_lock< mutex > lk( mtx_); - return try_push_( { detail::convert( ptr) }, lk); - } - - channel_op_status pop( value_type & va) { - std::unique_lock< mutex > lk( mtx_); - not_empty_cond_.wait( lk, - [this](){ - return is_closed_() || ! is_empty_(); - }); - if ( is_closed_() && is_empty_() ) { - return channel_op_status::closed; - } - va = value_pop_( lk); - return channel_op_status::success; - } - - value_type value_pop() { - std::unique_lock< mutex > lk( mtx_); - not_empty_cond_.wait( lk, - [this](){ - return is_closed_() || ! is_empty_(); - }); - if ( is_closed_() && is_empty_() ) { - throw fiber_error( - std::make_error_code( std::errc::operation_not_permitted), - "boost fiber: queue is closed"); - } - return value_pop_( lk); - } - - channel_op_status try_pop( value_type & va) { - std::unique_lock< mutex > lk( mtx_); - if ( is_closed_() && is_empty_() ) { - // let other fibers run - lk.unlock(); - this_fiber::yield(); - return channel_op_status::closed; - } - if ( is_empty_() ) { - // let other fibers run - lk.unlock(); - this_fiber::yield(); - return channel_op_status::empty; - } - va = value_pop_( lk); - return channel_op_status::success; - } - - template< typename Rep, typename Period > - channel_op_status pop_wait_for( value_type & va, - std::chrono::duration< Rep, Period > const& timeout_duration) { - return pop_wait_until( va, - std::chrono::steady_clock::now() + timeout_duration); - } - - template< typename Clock, typename Duration > - channel_op_status pop_wait_until( value_type & va, - std::chrono::time_point< Clock, Duration > const& timeout_time) { - std::unique_lock< mutex > lk( mtx_); - if ( ! not_empty_cond_.wait_until( lk, - timeout_time, - [this](){ - return is_closed_() || ! is_empty_(); - })) { - return channel_op_status::timeout; - } - if ( is_closed_() && is_empty_() ) { - return channel_op_status::closed; - } - va = value_pop_( lk); - return channel_op_status::success; - } -}; - -}} - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_SUFFIX -#endif - -#endif // BOOST_FIBERS_BOUNDED_CHANNEL_H diff --git a/boost/fiber/buffered_channel.hpp b/boost/fiber/buffered_channel.hpp index 25109a4976..1c32e49bae 100644 --- a/boost/fiber/buffered_channel.hpp +++ b/boost/fiber/buffered_channel.hpp @@ -4,8 +4,6 @@ // (See accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) // -// based on Dmitry Vyukov's MPMC queue -// (http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue) #ifndef BOOST_FIBERS_BUFFERED_CHANNEL_H #define BOOST_FIBERS_BUFFERED_CHANNEL_H @@ -39,120 +37,42 @@ public: typedef T value_type; private: - typedef typename std::aligned_storage< sizeof( T), alignof( T) >::type storage_type; - typedef context::wait_queue_t wait_queue_type; - - struct alignas(cache_alignment) slot { - std::atomic< std::size_t > cycle{ 0 }; - storage_type storage{}; - - slot() = default; - }; - - // procuder cacheline - alignas(cache_alignment) std::atomic< std::size_t > producer_idx_{ 0 }; - // consumer cacheline - alignas(cache_alignment) std::atomic< std::size_t > consumer_idx_{ 0 }; - // shared write cacheline - alignas(cache_alignment) std::atomic_bool closed_{ false }; - mutable detail::spinlock splk_{}; - wait_queue_type waiting_producers_{}; - wait_queue_type waiting_consumers_{}; - // shared read cacheline - alignas(cache_alignment) slot * slots_{ nullptr }; - std::size_t capacity_; - char pad_[cacheline_length]; - - bool is_full_() { - std::size_t idx{ producer_idx_.load( std::memory_order_relaxed) }; - return 0 > static_cast< std::intptr_t >( slots_[idx & (capacity_ - 1)].cycle.load( std::memory_order_acquire) ) - static_cast< std::intptr_t >( idx); - } - - bool is_empty_() { - std::size_t idx{ consumer_idx_.load( std::memory_order_relaxed) }; - return 0 > static_cast< std::intptr_t >( slots_[idx & (capacity_ - 1)].cycle.load( std::memory_order_acquire) ) - static_cast< std::intptr_t >( idx + 1); - } - - template< typename ValueType > - channel_op_status try_push_( ValueType && value) { - slot * s{ nullptr }; - std::size_t idx{ producer_idx_.load( std::memory_order_relaxed) }; - for (;;) { - s = & slots_[idx & (capacity_ - 1)]; - std::size_t cycle{ s->cycle.load( std::memory_order_acquire) }; - std::intptr_t diff{ static_cast< std::intptr_t >( cycle) - static_cast< std::intptr_t >( idx) }; - if ( 0 == diff) { - if ( producer_idx_.compare_exchange_weak( idx, idx + 1, std::memory_order_relaxed) ) { - break; - } - } else if ( 0 > diff) { - return channel_op_status::full; - } else { - idx = producer_idx_.load( std::memory_order_relaxed); - } - } - ::new ( static_cast< void * >( std::addressof( s->storage) ) ) value_type( std::forward< ValueType >( value) ); - s->cycle.store( idx + 1, std::memory_order_release); - return channel_op_status::success; - } - - channel_op_status try_value_pop_( slot *& s, std::size_t & idx) { - idx = consumer_idx_.load( std::memory_order_relaxed); - for (;;) { - s = & slots_[idx & (capacity_ - 1)]; - std::size_t cycle = s->cycle.load( std::memory_order_acquire); - std::intptr_t diff{ static_cast< std::intptr_t >( cycle) - static_cast< std::intptr_t >( idx + 1) }; - if ( 0 == diff) { - if ( consumer_idx_.compare_exchange_weak( idx, idx + 1, std::memory_order_relaxed) ) { - break; - } - } else if ( 0 > diff) { - return channel_op_status::empty; - } else { - idx = consumer_idx_.load( std::memory_order_relaxed); - } - } - // incrementing the slot cycle must be deferred till the value has been consumed - // slot cycle tells procuders that the cell can be re-used (store new value) - return channel_op_status::success; - } - - channel_op_status try_pop_( value_type & value) { - slot * s{ nullptr }; - std::size_t idx{ 0 }; - channel_op_status status{ try_value_pop_( s, idx) }; - if ( channel_op_status::success == status) { - value = std::move( * reinterpret_cast< value_type * >( std::addressof( s->storage) ) ); - s->cycle.store( idx + capacity_, std::memory_order_release); - } - return status; + typedef context::wait_queue_t wait_queue_type; + typedef T slot_type; + + alignas(cache_alignment) mutable detail::spinlock splk_{}; + wait_queue_type waiting_producers_{}; + wait_queue_type waiting_consumers_{}; + slot_type * slots_; + std::size_t pidx_{ 0 }; + std::size_t cidx_{ 0 }; + std::size_t capacity_; + bool closed_{ false }; + + bool is_full_() const noexcept { + return cidx_ == ((pidx_ + 1) % capacity_); + } + + bool is_empty_() const noexcept { + return cidx_ == pidx_; + } + + bool is_closed_() const noexcept { + return closed_; } public: explicit buffered_channel( std::size_t capacity) : - capacity_{ capacity } { - if ( 0 == capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) { - throw fiber_error( std::make_error_code( std::errc::invalid_argument), - "boost fiber: buffer capacity is invalid"); - } - slots_ = new slot[capacity_](); - for ( std::size_t i = 0; i < capacity_; ++i) { - slots_[i].cycle.store( i, std::memory_order_relaxed); + capacity_{ capacity } { + if ( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) { + throw fiber_error{ std::make_error_code( std::errc::invalid_argument), + "boost fiber: buffer capacity is invalid" }; } + slots_ = new slot_type[capacity_]; } ~buffered_channel() { close(); - for (;;) { - slot * s{ nullptr }; - std::size_t idx{ 0 }; - if ( channel_op_status::success == try_value_pop_( s, idx) ) { - reinterpret_cast< value_type * >( std::addressof( s->storage) )->~value_type(); - s->cycle.store( idx + capacity_, std::memory_order_release); - } else { - break; - } - } delete [] slots_; } @@ -160,109 +80,116 @@ public: buffered_channel & operator=( buffered_channel const&) = delete; bool is_closed() const noexcept { - return closed_.load( std::memory_order_acquire); + detail::spinlock_lock lk{ splk_ }; + return is_closed_(); } void close() noexcept { - context * ctx{ context::active() }; + context * active_ctx = context::active(); detail::spinlock_lock lk{ splk_ }; - closed_.store( true, std::memory_order_release); + closed_ = true; // notify all waiting producers while ( ! waiting_producers_.empty() ) { - context * producer_ctx{ & waiting_producers_.front() }; + context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); - ctx->set_ready( producer_ctx); + active_ctx->schedule( producer_ctx); } // notify all waiting consumers while ( ! waiting_consumers_.empty() ) { - context * consumer_ctx{ & waiting_consumers_.front() }; + context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - ctx->set_ready( consumer_ctx); + active_ctx->schedule( consumer_ctx); } } channel_op_status try_push( value_type const& value) { - if ( is_closed() ) { + context * active_ctx = context::active(); + detail::spinlock_lock lk{ splk_ }; + if ( is_closed_() ) { return channel_op_status::closed; + } else if ( is_full_() ) { + return channel_op_status::full; + } else { + slots_[pidx_] = value; + pidx_ = (pidx_ + 1) % capacity_; + // notify one waiting consumer + if ( ! waiting_consumers_.empty() ) { + context * consumer_ctx = & waiting_consumers_.front(); + waiting_consumers_.pop_front(); + lk.unlock(); + active_ctx->schedule( consumer_ctx); + } + return channel_op_status::success; } - return try_push_( value); } channel_op_status try_push( value_type && value) { - if ( is_closed() ) { + context * active_ctx = context::active(); + detail::spinlock_lock lk{ splk_ }; + if ( is_closed_() ) { return channel_op_status::closed; + } else if ( is_full_() ) { + return channel_op_status::full; + } else { + slots_[pidx_] = std::move( value); + pidx_ = (pidx_ + 1) % capacity_; + // notify one waiting consumer + if ( ! waiting_consumers_.empty() ) { + context * consumer_ctx = & waiting_consumers_.front(); + waiting_consumers_.pop_front(); + lk.unlock(); + active_ctx->schedule( consumer_ctx); + } + return channel_op_status::success; } - return try_push_( std::move( value) ); } channel_op_status push( value_type const& value) { - context * ctx{ context::active() }; + context * active_ctx = context::active(); for (;;) { - if ( is_closed() ) { + detail::spinlock_lock lk{ splk_ }; + if ( is_closed_() ) { return channel_op_status::closed; - } - channel_op_status status{ try_push_( value) }; - if ( channel_op_status::success == status) { - detail::spinlock_lock lk{ splk_ }; + } else if ( is_full_() ) { + active_ctx->wait_link( waiting_producers_); + // suspend this producer + active_ctx->suspend( lk); + } else { + slots_[pidx_] = value; + pidx_ = (pidx_ + 1) % capacity_; // notify one waiting consumer if ( ! waiting_consumers_.empty() ) { - context * consumer_ctx{ & waiting_consumers_.front() }; + context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); lk.unlock(); - ctx->set_ready( consumer_ctx); - } - return status; - } else if ( channel_op_status::full == status) { - BOOST_ASSERT( ! ctx->wait_is_linked() ); - detail::spinlock_lock lk{ splk_ }; - if ( is_closed() ) { - return channel_op_status::closed; + active_ctx->schedule( consumer_ctx); } - if ( ! is_full_() ) { - continue; - } - ctx->wait_link( waiting_producers_); - // suspend this producer - ctx->suspend( lk); - } else { - BOOST_ASSERT( channel_op_status::closed == status); - return status; + return channel_op_status::success; } } } channel_op_status push( value_type && value) { - context * ctx{ context::active() }; + context * active_ctx = context::active(); for (;;) { - if ( is_closed() ) { + detail::spinlock_lock lk{ splk_ }; + if ( is_closed_() ) { return channel_op_status::closed; - } - channel_op_status status{ try_push_( std::move( value) ) }; - if ( channel_op_status::success == status) { - detail::spinlock_lock lk{ splk_ }; + } else if ( is_full_() ) { + active_ctx->wait_link( waiting_producers_); + // suspend this producer + active_ctx->suspend( lk); + } else { + slots_[pidx_] = std::move( value); + pidx_ = (pidx_ + 1) % capacity_; // notify one waiting consumer if ( ! waiting_consumers_.empty() ) { - context * consumer_ctx{ & waiting_consumers_.front() }; + context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); lk.unlock(); - ctx->set_ready( consumer_ctx); + active_ctx->schedule( consumer_ctx); } - return status; - } else if ( channel_op_status::full == status) { - BOOST_ASSERT( ! ctx->wait_is_linked() ); - detail::spinlock_lock lk{ splk_ }; - if ( is_closed() ) { - return channel_op_status::closed; - } - if ( ! is_full_() ) { - continue; - } - ctx->wait_link( waiting_producers_); - // suspend this producer - ctx->suspend( lk); - } else { - BOOST_ASSERT( channel_op_status::closed == status); - return status; + return channel_op_status::success; } } } @@ -284,44 +211,33 @@ public: template< typename Clock, typename Duration > channel_op_status push_wait_until( value_type const& value, std::chrono::time_point< Clock, Duration > const& timeout_time_) { - std::chrono::steady_clock::time_point timeout_time( detail::convert( timeout_time_) ); - context * ctx{ context::active() }; + context * active_ctx = context::active(); + std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); for (;;) { - if ( is_closed() ) { + detail::spinlock_lock lk{ splk_ }; + if ( is_closed_() ) { return channel_op_status::closed; - } - channel_op_status status{ try_push_( value) }; - if ( channel_op_status::success == status) { - detail::spinlock_lock lk{ splk_ }; - // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { - context * consumer_ctx{ & waiting_consumers_.front() }; - waiting_consumers_.pop_front(); - lk.unlock(); - ctx->set_ready( consumer_ctx); - } - return status; - } else if ( channel_op_status::full == status) { - BOOST_ASSERT( ! ctx->wait_is_linked() ); - detail::spinlock_lock lk{ splk_ }; - if ( is_closed() ) { - return channel_op_status::closed; - } - if ( ! is_full_() ) { - continue; - } - ctx->wait_link( waiting_producers_); + } else if ( is_full_() ) { + active_ctx->wait_link( waiting_producers_); // suspend this producer - if ( ! ctx->wait_until( timeout_time, lk) ) { + if ( ! active_ctx->wait_until( timeout_time, lk) ) { // relock local lk lk.lock(); // remove from waiting-queue - ctx->wait_unlink(); + waiting_producers_.remove( * active_ctx); return channel_op_status::timeout; } } else { - BOOST_ASSERT( channel_op_status::closed == status); - return status; + slots_[pidx_] = value; + pidx_ = (pidx_ + 1) % capacity_; + // notify one waiting consumer + if ( ! waiting_consumers_.empty() ) { + context * consumer_ctx = & waiting_consumers_.front(); + waiting_consumers_.pop_front(); + lk.unlock(); + active_ctx->schedule( consumer_ctx); + } + return channel_op_status::success; } } } @@ -329,128 +245,110 @@ public: template< typename Clock, typename Duration > channel_op_status push_wait_until( value_type && value, std::chrono::time_point< Clock, Duration > const& timeout_time_) { - std::chrono::steady_clock::time_point timeout_time( detail::convert( timeout_time_) ); - context * ctx{ context::active() }; + context * active_ctx = context::active(); + std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); for (;;) { - if ( is_closed() ) { + detail::spinlock_lock lk{ splk_ }; + if ( is_closed_() ) { return channel_op_status::closed; - } - channel_op_status status{ try_push_( std::move( value) ) }; - if ( channel_op_status::success == status) { - detail::spinlock_lock lk{ splk_ }; - // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { - context * consumer_ctx{ & waiting_consumers_.front() }; - waiting_consumers_.pop_front(); - lk.unlock(); - ctx->set_ready( consumer_ctx); - } - return status; - } else if ( channel_op_status::full == status) { - BOOST_ASSERT( ! ctx->wait_is_linked() ); - detail::spinlock_lock lk{ splk_ }; - if ( is_closed() ) { - return channel_op_status::closed; - } - if ( ! is_full_() ) { - continue; - } - ctx->wait_link( waiting_producers_); + } else if ( is_full_() ) { + active_ctx->wait_link( waiting_producers_); // suspend this producer - if ( ! ctx->wait_until( timeout_time, lk) ) { + if ( ! active_ctx->wait_until( timeout_time, lk) ) { // relock local lk lk.lock(); // remove from waiting-queue - ctx->wait_unlink(); + waiting_producers_.remove( * active_ctx); return channel_op_status::timeout; } } else { - BOOST_ASSERT( channel_op_status::closed == status); - return status; + slots_[pidx_] = std::move( value); + pidx_ = (pidx_ + 1) % capacity_; + // notify one waiting consumer + if ( ! waiting_consumers_.empty() ) { + context * consumer_ctx = & waiting_consumers_.front(); + waiting_consumers_.pop_front(); + lk.unlock(); + active_ctx->schedule( consumer_ctx); + } + return channel_op_status::success; } } } channel_op_status try_pop( value_type & value) { - channel_op_status status{ try_pop_( value) }; - if ( channel_op_status::success != status) { - if ( is_closed() ) { - status = channel_op_status::closed; + context * active_ctx = context::active(); + detail::spinlock_lock lk{ splk_ }; + if ( is_empty_() ) { + return is_closed_() + ? channel_op_status::closed + : channel_op_status::empty; + } else { + value = std::move( slots_[cidx_]); + cidx_ = (cidx_ + 1) % capacity_; + // notify one waiting producer + if ( ! waiting_producers_.empty() ) { + context * producer_ctx = & waiting_producers_.front(); + waiting_producers_.pop_front(); + lk.unlock(); + active_ctx->schedule( producer_ctx); } + return channel_op_status::success; } - return status; } channel_op_status pop( value_type & value) { - context * ctx{ context::active() }; + context * active_ctx = context::active(); for (;;) { - channel_op_status status{ try_pop_( value) }; - if ( channel_op_status::success == status) { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{ splk_ }; + if ( is_empty_() ) { + if ( is_closed_() ) { + return channel_op_status::closed; + } else { + active_ctx->wait_link( waiting_consumers_); + // suspend this consumer + active_ctx->suspend( lk); + } + } else { + value = std::move( slots_[cidx_]); + cidx_ = (cidx_ + 1) % capacity_; // notify one waiting producer if ( ! waiting_producers_.empty() ) { - context * producer_ctx{ & waiting_producers_.front() }; + context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - ctx->set_ready( producer_ctx); + active_ctx->schedule( producer_ctx); } - return status; - } else if ( channel_op_status::empty == status) { - BOOST_ASSERT( ! ctx->wait_is_linked() ); - detail::spinlock_lock lk{ splk_ }; - if ( is_closed() ) { - return channel_op_status::closed; - } - if ( ! is_empty_() ) { - continue; - } - ctx->wait_link( waiting_consumers_); - // suspend this consumer - ctx->suspend( lk); - } else { - BOOST_ASSERT( channel_op_status::closed == status); - return status; + return channel_op_status::success; } } } value_type value_pop() { - context * ctx{ context::active() }; + context * active_ctx = context::active(); for (;;) { - slot * s{ nullptr }; - std::size_t idx{ 0 }; - channel_op_status status{ try_value_pop_( s, idx) }; - if ( channel_op_status::success == status) { - value_type value{ std::move( * reinterpret_cast< value_type * >( std::addressof( s->storage) ) ) }; - s->cycle.store( idx + capacity_, std::memory_order_release); - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{ splk_ }; + if ( is_empty_() ) { + if ( is_closed_() ) { + throw fiber_error{ + std::make_error_code( std::errc::operation_not_permitted), + "boost fiber: channel is closed" }; + } else { + active_ctx->wait_link( waiting_consumers_); + // suspend this consumer + active_ctx->suspend( lk); + } + } else { + value_type value = std::move( slots_[cidx_]); + cidx_ = (cidx_ + 1) % capacity_; // notify one waiting producer if ( ! waiting_producers_.empty() ) { - context * producer_ctx{ & waiting_producers_.front() }; + context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - ctx->set_ready( producer_ctx); + active_ctx->schedule( producer_ctx); } return std::move( value); - } else if ( channel_op_status::empty == status) { - BOOST_ASSERT( ! ctx->wait_is_linked() ); - detail::spinlock_lock lk{ splk_ }; - if ( is_closed() ) { - throw fiber_error{ - std::make_error_code( std::errc::operation_not_permitted), - "boost fiber: channel is closed" }; - } - if ( ! is_empty_() ) { - continue; - } - ctx->wait_link( waiting_consumers_); - // suspend this consumer - ctx->suspend( lk); - } else { - BOOST_ASSERT( channel_op_status::closed == status); - throw fiber_error{ - std::make_error_code( std::errc::operation_not_permitted), - "boost fiber: channel is closed" }; } } } @@ -465,41 +363,35 @@ public: template< typename Clock, typename Duration > channel_op_status pop_wait_until( value_type & value, std::chrono::time_point< Clock, Duration > const& timeout_time_) { - std::chrono::steady_clock::time_point timeout_time( detail::convert( timeout_time_) ); - context * ctx{ context::active() }; + context * active_ctx = context::active(); + std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); for (;;) { - channel_op_status status{ try_pop_( value) }; - if ( channel_op_status::success == status) { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{ splk_ }; + if ( is_empty_() ) { + if ( is_closed_() ) { + return channel_op_status::closed; + } else { + active_ctx->wait_link( waiting_consumers_); + // suspend this consumer + if ( ! active_ctx->wait_until( timeout_time, lk) ) { + // relock local lk + lk.lock(); + // remove from waiting-queue + waiting_consumers_.remove( * active_ctx); + return channel_op_status::timeout; + } + } + } else { + value = std::move( slots_[cidx_]); + cidx_ = (cidx_ + 1) % capacity_; // notify one waiting producer if ( ! waiting_producers_.empty() ) { - context * producer_ctx{ & waiting_producers_.front() }; + context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - context::active()->set_ready( producer_ctx); - } - return status; - } else if ( channel_op_status::empty == status) { - BOOST_ASSERT( ! ctx->wait_is_linked() ); - detail::spinlock_lock lk{ splk_ }; - if ( is_closed() ) { - return channel_op_status::closed; - } - if ( ! is_empty_() ) { - continue; + active_ctx->schedule( producer_ctx); } - ctx->wait_link( waiting_consumers_); - // suspend this consumer - if ( ! ctx->wait_until( timeout_time, lk) ) { - // relock local lk - lk.lock(); - // remove from waiting-queue - ctx->wait_unlink(); - return channel_op_status::timeout; - } - } else { - BOOST_ASSERT( channel_op_status::closed == status); - return status; + return channel_op_status::success; } } } diff --git a/boost/fiber/condition_variable.hpp b/boost/fiber/condition_variable.hpp index 0dca7ef6a0..cd0e7cb022 100644 --- a/boost/fiber/condition_variable.hpp +++ b/boost/fiber/condition_variable.hpp @@ -45,8 +45,8 @@ class BOOST_FIBERS_DECL condition_variable_any { private: typedef context::wait_queue_t wait_queue_t; - wait_queue_t wait_queue_{}; detail::spinlock wait_queue_splk_{}; + wait_queue_t wait_queue_{}; public: condition_variable_any() = default; @@ -64,22 +64,16 @@ public: template< typename LockType > void wait( LockType & lt) { - context * ctx = context::active(); + context * active_ctx = context::active(); // atomically call lt.unlock() and block on *this // store this fiber in waiting-queue - detail::spinlock_lock lk( wait_queue_splk_); - BOOST_ASSERT( ! ctx->wait_is_linked() ); - ctx->wait_link( wait_queue_); + detail::spinlock_lock lk{ wait_queue_splk_ }; + BOOST_ASSERT( ! active_ctx->wait_is_linked() ); + active_ctx->wait_link( wait_queue_); // unlock external lt lt.unlock(); // suspend this fiber - ctx->suspend( lk); - // relock local lk - lk.lock(); - // remove from waiting-queue - ctx->wait_unlink(); - // unlock local lk - lk.unlock(); + active_ctx->suspend( lk); // relock external again before returning try { lt.lock(); @@ -87,7 +81,7 @@ public: std::terminate(); } // post-conditions - BOOST_ASSERT( ! ctx->wait_is_linked() ); + BOOST_ASSERT( ! active_ctx->wait_is_linked() ); } template< typename LockType, typename Pred > @@ -99,27 +93,26 @@ public: template< typename LockType, typename Clock, typename Duration > cv_status wait_until( LockType & lt, std::chrono::time_point< Clock, Duration > const& timeout_time_) { + context * active_ctx = context::active(); cv_status status = cv_status::no_timeout; - std::chrono::steady_clock::time_point timeout_time( - detail::convert( timeout_time_) ); - context * ctx = context::active(); + std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); // atomically call lt.unlock() and block on *this // store this fiber in waiting-queue - detail::spinlock_lock lk( wait_queue_splk_); - BOOST_ASSERT( ! ctx->wait_is_linked() ); - ctx->wait_link( wait_queue_); + detail::spinlock_lock lk{ wait_queue_splk_ }; + BOOST_ASSERT( ! active_ctx->wait_is_linked() ); + active_ctx->wait_link( wait_queue_); // unlock external lt lt.unlock(); // suspend this fiber - if ( ! ctx->wait_until( timeout_time, lk) ) { + if ( ! active_ctx->wait_until( timeout_time, lk) ) { status = cv_status::timeout; + // relock local lk + lk.lock(); + // remove from waiting-queue + wait_queue_.remove( * active_ctx); + // unlock local lk + lk.unlock(); } - // relock local lk - lk.lock(); - // remove from waiting-queue - ctx->wait_unlink(); - // unlock local lk - lk.unlock(); // relock external again before returning try { lt.lock(); @@ -127,7 +120,7 @@ public: std::terminate(); } // post-conditions - BOOST_ASSERT( ! ctx->wait_is_linked() ); + BOOST_ASSERT( ! active_ctx->wait_is_linked() ); return status; } diff --git a/boost/fiber/context.hpp b/boost/fiber/context.hpp index d873343538..773528e3a1 100644 --- a/boost/fiber/context.hpp +++ b/boost/fiber/context.hpp @@ -7,23 +7,32 @@ #ifndef BOOST_FIBERS_CONTEXT_H #define BOOST_FIBERS_CONTEXT_H +#include <iostream> #include <atomic> #include <chrono> #include <exception> #include <functional> #include <map> #include <memory> +#include <tuple> #include <type_traits> #include <boost/assert.hpp> #include <boost/config.hpp> +#if defined(BOOST_NO_CXX17_STD_APPLY) #include <boost/context/detail/apply.hpp> -#include <boost/context/execution_context.hpp> +#endif +#if (BOOST_EXECUTION_CONTEXT==1) +# include <boost/context/execution_context.hpp> +#else +# include <boost/context/continuation.hpp> +#endif #include <boost/context/stack_context.hpp> #include <boost/intrusive/list.hpp> #include <boost/intrusive/parent_from_member.hpp> #include <boost/intrusive_ptr.hpp> #include <boost/intrusive/set.hpp> +#include <boost/intrusive/slist.hpp> #include <boost/fiber/detail/config.hpp> #include <boost/fiber/detail/data.hpp> @@ -57,10 +66,10 @@ class scheduler; namespace detail { struct wait_tag; -typedef intrusive::list_member_hook< +typedef intrusive::slist_member_hook< intrusive::tag< wait_tag >, intrusive::link_mode< - intrusive::auto_unlink + intrusive::safe_link > > wait_hook; // declaration of the functor that converts between @@ -97,21 +106,29 @@ typedef intrusive::set_member_hook< > > sleep_hook; -struct terminated_tag; +struct worker_tag; typedef intrusive::list_member_hook< - intrusive::tag< terminated_tag >, + intrusive::tag< worker_tag >, intrusive::link_mode< intrusive::auto_unlink > +> worker_hook; + +struct terminated_tag; +typedef intrusive::slist_member_hook< + intrusive::tag< terminated_tag >, + intrusive::link_mode< + intrusive::safe_link + > > terminated_hook; -struct worker_tag; -typedef intrusive::list_member_hook< - intrusive::tag< worker_tag >, +struct remote_ready_tag; +typedef intrusive::slist_member_hook< + intrusive::tag< remote_ready_tag >, intrusive::link_mode< - intrusive::auto_unlink + intrusive::safe_link > -> worker_hook; +> remote_ready_hook; } @@ -125,13 +142,17 @@ struct worker_context_t {}; const worker_context_t worker_context{}; class BOOST_FIBERS_DECL context { +public: + typedef intrusive::slist< + context, + intrusive::function_hook< detail::wait_functor >, + intrusive::linear< true >, + intrusive::cache_last< true > + > wait_queue_t; + private: friend class scheduler; - enum flag_t { - flag_terminated = 1 << 1 - }; - struct fss_data { void * vp{ nullptr }; detail::fss_cleanup_function::ptr_t cleanup_function{}; @@ -151,100 +172,102 @@ private: } }; - typedef std::map< uintptr_t, fss_data > fss_data_t; + typedef std::map< uintptr_t, fss_data > fss_data_t; #if ! defined(BOOST_FIBERS_NO_ATOMICS) - std::atomic< std::size_t > use_count_{ 0 }; - std::atomic< unsigned int > flags_; - std::atomic< type > type_; - std::atomic< scheduler * > scheduler_{ nullptr }; + alignas(cache_alignment) std::atomic< std::size_t > use_count_{ 0 }; #else - std::size_t use_count_{ 0 }; - unsigned int flags_; - type type_; - scheduler * scheduler_{ nullptr }; + alignas(cache_alignment) std::size_t use_count_{ 0 }; +#endif +#if ! defined(BOOST_FIBERS_NO_ATOMICS) + alignas(cache_alignment) detail::remote_ready_hook remote_ready_hook_{}; + std::atomic< context * > remote_nxt_{ nullptr }; #endif - launch policy_{ launch::post }; + alignas(cache_alignment) detail::spinlock splk_{}; + bool terminated_{ false }; + wait_queue_t wait_queue_{}; +public: + detail::wait_hook wait_hook_{}; +private: + alignas(cache_alignment) scheduler * scheduler_{ nullptr }; + fss_data_t fss_data_{}; + detail::sleep_hook sleep_hook_{}; + detail::ready_hook ready_hook_{}; + detail::terminated_hook terminated_hook_{}; + detail::worker_hook worker_hook_{}; #if (BOOST_EXECUTION_CONTEXT==1) - boost::context::execution_context ctx_; + boost::context::execution_context ctx_; #else - boost::context::execution_context< detail::data_t * > ctx_; + boost::context::continuation c_; #endif + fiber_properties * properties_{ nullptr }; + std::chrono::steady_clock::time_point tp_{ (std::chrono::steady_clock::time_point::max)() }; + type type_; + launch policy_; void resume_( detail::data_t &) noexcept; - void set_ready_( context *) noexcept; + void schedule_( context *) noexcept; #if (BOOST_EXECUTION_CONTEXT==1) template< typename Fn, typename Tpl > void run_( Fn && fn_, Tpl && tpl_, detail::data_t * dp) noexcept { { - // fn and tpl must be destroyed before calling set_terminated() + // fn and tpl must be destroyed before calling terminate() typename std::decay< Fn >::type fn = std::forward< Fn >( fn_); typename std::decay< Tpl >::type tpl = std::forward< Tpl >( tpl_); if ( nullptr != dp->lk) { dp->lk->unlock(); } else if ( nullptr != dp->ctx) { - active()->set_ready_( dp->ctx); + active()->schedule_( dp->ctx); } +#if defined(BOOST_NO_CXX17_STD_APPLY) boost::context::detail::apply( std::move( fn), std::move( tpl) ); +#else + std::apply( std::move( fn), std::move( tpl) ); +#endif } // terminate context - set_terminated(); + terminate(); BOOST_ASSERT_MSG( false, "fiber already terminated"); } #else template< typename Fn, typename Tpl > - boost::context::execution_context< detail::data_t * > - run_( boost::context::execution_context< detail::data_t * > && ctx, Fn && fn_, Tpl && tpl_, detail::data_t * dp) noexcept { + boost::context::continuation + run_( boost::context::continuation && c, Fn && fn_, Tpl && tpl_) noexcept { { - // fn and tpl must be destroyed before calling set_terminated() + // fn and tpl must be destroyed before calling terminate() typename std::decay< Fn >::type fn = std::forward< Fn >( fn_); typename std::decay< Tpl >::type tpl = std::forward< Tpl >( tpl_); - // update execution_context of calling fiber - dp->from->ctx_ = std::move( ctx); + c = c.resume(); + detail::data_t * dp = c.get_data< detail::data_t * >(); + // update contiunation of calling fiber + dp->from->c_ = std::move( c); if ( nullptr != dp->lk) { dp->lk->unlock(); } else if ( nullptr != dp->ctx) { - active()->set_ready_( dp->ctx); + active()->schedule_( dp->ctx); } +#if defined(BOOST_NO_CXX17_STD_APPLY) boost::context::detail::apply( std::move( fn), std::move( tpl) ); +#else + std::apply( std::move( fn), std::move( tpl) ); +#endif } // terminate context - return set_terminated(); + return terminate(); } #endif public: - detail::ready_hook ready_hook_{}; - detail::sleep_hook sleep_hook_{}; - detail::terminated_hook terminated_hook_{}; - detail::wait_hook wait_hook_{}; - detail::worker_hook worker_hook_{}; - std::atomic< context * > remote_nxt_{ nullptr }; - std::chrono::steady_clock::time_point tp_{ (std::chrono::steady_clock::time_point::max)() }; - - typedef intrusive::list< - context, - intrusive::function_hook< detail::wait_functor >, - intrusive::constant_time_size< false > > wait_queue_t; - -private: - fss_data_t fss_data_{}; - wait_queue_t wait_queue_{}; - detail::spinlock splk_{}; - fiber_properties * properties_{ nullptr }; - -public: class id { private: context * impl_{ nullptr }; public: - id() noexcept { - } + id() = default; explicit id( context * impl) noexcept : - impl_( impl) { + impl_{ impl } { } bool operator==( id const& other) const noexcept { @@ -311,9 +334,6 @@ public: boost::context::preallocated palloc, StackAlloc salloc, Fn && fn, Tpl && tpl) : use_count_{ 1 }, // fiber instance or scheduler owner - flags_{ 0 }, - type_{ type::worker_context }, - policy_{ policy }, #if (BOOST_EXECUTION_CONTEXT==1) # if defined(BOOST_NO_CXX14_GENERIC_LAMBDAS) ctx_{ std::allocator_arg, palloc, salloc, @@ -325,50 +345,66 @@ public: std::forward< Fn >( fn), std::forward< Tpl >( tpl), boost::context::execution_context::current() ) - } + }, + type_{ type::worker_context }, + policy_{ policy } # else ctx_{ std::allocator_arg, palloc, salloc, [this,fn=detail::decay_copy( std::forward< Fn >( fn) ),tpl=std::forward< Tpl >( tpl), ctx=boost::context::execution_context::current()] (void * vp) mutable noexcept { run_( std::move( fn), std::move( tpl), static_cast< detail::data_t * >( vp) ); - }} + }}, + type_{ type::worker_context }, + policy_{ policy } # endif + {} #else + c_{}, + type_{ type::worker_context }, + policy_{ policy } + { # if defined(BOOST_NO_CXX14_GENERIC_LAMBDAS) - ctx_{ std::allocator_arg, palloc, salloc, - detail::wrap( - [this]( typename std::decay< Fn >::type & fn, typename std::decay< Tpl >::type & tpl, - boost::context::execution_context< detail::data_t * > && ctx, detail::data_t * dp) mutable noexcept { - return run_( std::forward< boost::context::execution_context< detail::data_t * > >( ctx), std::move( fn), std::move( tpl), dp); - }, - std::forward< Fn >( fn), - std::forward< Tpl >( tpl) )} - + c_ = boost::context::callcc( + std::allocator_arg, palloc, salloc, + detail::wrap( + [this]( typename std::decay< Fn >::type & fn, typename std::decay< Tpl >::type & tpl, + boost::context::continuation && c) mutable noexcept { + return run_( std::forward< boost::context::continuation >( c), std::move( fn), std::move( tpl) ); + }, + std::forward< Fn >( fn), + std::forward< Tpl >( tpl) ) ); # else - ctx_{ std::allocator_arg, palloc, salloc, - [this,fn=detail::decay_copy( std::forward< Fn >( fn) ),tpl=std::forward< Tpl >( tpl)] - (boost::context::execution_context< detail::data_t * > && ctx, detail::data_t * dp) mutable noexcept { - return run_( std::forward< boost::context::execution_context< detail::data_t * > >( ctx), std::move( fn), std::move( tpl), dp); - }} + c_ = boost::context::callcc( + std::allocator_arg, palloc, salloc, + [this,fn=detail::decay_copy( std::forward< Fn >( fn) ),tpl=std::forward< Tpl >( tpl)] + (boost::context::continuation && c) mutable noexcept { + return run_( std::forward< boost::context::continuation >( c), std::move( fn), std::move( tpl) ); + }); # endif + } #endif - {} context( context const&) = delete; context & operator=( context const&) = delete; + friend bool + operator==( context const& lhs, context const& rhs) noexcept { + return & lhs == & rhs; + } + virtual ~context(); scheduler * get_scheduler() const noexcept { -#if ! defined(BOOST_FIBERS_NO_ATOMICS) - return scheduler_.load( std::memory_order_relaxed); -#else return scheduler_; -#endif } id get_id() const noexcept; + bool is_resumable() const noexcept { + if ( c_) return true; + else return false; + } + void resume() noexcept; void resume( detail::spinlock_lock &) noexcept; void resume( context *) noexcept; @@ -377,10 +413,10 @@ public: void suspend( detail::spinlock_lock &) noexcept; #if (BOOST_EXECUTION_CONTEXT==1) - void set_terminated() noexcept; + void terminate() noexcept; #else - boost::context::execution_context< detail::data_t * > suspend_with_cc() noexcept; - boost::context::execution_context< detail::data_t * > set_terminated() noexcept; + boost::context::continuation suspend_with_cc() noexcept; + boost::context::continuation terminate() noexcept; #endif void join(); @@ -390,16 +426,12 @@ public: bool wait_until( std::chrono::steady_clock::time_point const&, detail::spinlock_lock &) noexcept; - void set_ready( context *) noexcept; + void schedule( context *) noexcept; bool is_context( type t) const noexcept { return type::none != ( type_ & t); } - bool is_terminated() const noexcept { - return 0 != ( flags_ & flag_terminated); - } - void * get_fss_data( void const * vp) const; void set_fss_data( @@ -418,77 +450,90 @@ public: return policy_; } + bool worker_is_linked() const noexcept; + bool ready_is_linked() const noexcept; + bool remote_ready_is_linked() const noexcept; + bool sleep_is_linked() const noexcept; bool terminated_is_linked() const noexcept; bool wait_is_linked() const noexcept; - bool worker_is_linked() const noexcept; + template< typename List > + void worker_link( List & lst) noexcept { + static_assert( std::is_same< typename List::value_traits::hook_type, detail::worker_hook >::value, "not a worker-queue"); + BOOST_ASSERT( ! worker_is_linked() ); + lst.push_back( * this); + } template< typename List > void ready_link( List & lst) noexcept { static_assert( std::is_same< typename List::value_traits::hook_type, detail::ready_hook >::value, "not a ready-queue"); + BOOST_ASSERT( ! ready_is_linked() ); + lst.push_back( * this); + } + + template< typename List > + void remote_ready_link( List & lst) noexcept { + static_assert( std::is_same< typename List::value_traits::hook_type, detail::remote_ready_hook >::value, "not a remote-ready-queue"); + BOOST_ASSERT( ! remote_ready_is_linked() ); lst.push_back( * this); } template< typename Set > void sleep_link( Set & set) noexcept { static_assert( std::is_same< typename Set::value_traits::hook_type,detail::sleep_hook >::value, "not a sleep-queue"); + BOOST_ASSERT( ! sleep_is_linked() ); set.insert( * this); } template< typename List > void terminated_link( List & lst) noexcept { static_assert( std::is_same< typename List::value_traits::hook_type, detail::terminated_hook >::value, "not a terminated-queue"); + BOOST_ASSERT( ! terminated_is_linked() ); lst.push_back( * this); } template< typename List > void wait_link( List & lst) noexcept { static_assert( std::is_same< typename List::value_traits::hook_type, detail::wait_hook >::value, "not a wait-queue"); + BOOST_ASSERT( ! wait_is_linked() ); lst.push_back( * this); } - template< typename List > - void worker_link( List & lst) noexcept { - static_assert( std::is_same< typename List::value_traits::hook_type, detail::worker_hook >::value, "not a worker-queue"); - lst.push_back( * this); - } + void worker_unlink() noexcept; void ready_unlink() noexcept; void sleep_unlink() noexcept; - void wait_unlink() noexcept; - - void worker_unlink() noexcept; - void detach() noexcept; void attach( context *) noexcept; friend void intrusive_ptr_add_ref( context * ctx) noexcept { BOOST_ASSERT( nullptr != ctx); - ++ctx->use_count_; + ctx->use_count_.fetch_add( 1, std::memory_order_relaxed); } friend void intrusive_ptr_release( context * ctx) noexcept { BOOST_ASSERT( nullptr != ctx); - if ( 0 == --ctx->use_count_) { + if ( 1 == ctx->use_count_.fetch_sub( 1, std::memory_order_release) ) { + std::atomic_thread_fence( std::memory_order_acquire); #if (BOOST_EXECUTION_CONTEXT==1) - boost::context::execution_context ec( ctx->ctx_); + boost::context::execution_context ec = ctx->ctx_; // destruct context // deallocates stack (execution_context is ref counted) ctx->~context(); #else - boost::context::execution_context< detail::data_t * > cc( std::move( ctx->ctx_) ); + boost::context::continuation c = std::move( ctx->c_); // destruct context ctx->~context(); // deallocated stack - cc( nullptr); + c.resume( nullptr); #endif } } @@ -521,14 +566,14 @@ static intrusive_ptr< context > make_worker_context( launch policy, const std::size_t size = sctx.size - ( static_cast< char * >( sctx.sp) - static_cast< char * >( sp) ); #endif // placement new of context on top of fiber's stack - return intrusive_ptr< context >( - ::new ( sp) context( + return intrusive_ptr< context >{ + ::new ( sp) context{ worker_context, policy, - boost::context::preallocated( sp, size, sctx), + boost::context::preallocated{ sp, size, sctx }, salloc, std::forward< Fn >( fn), - std::make_tuple( std::forward< Args >( args) ... ) ) ); + std::make_tuple( std::forward< Args >( args) ... ) } }; } namespace detail { diff --git a/boost/fiber/detail/config.hpp b/boost/fiber/detail/config.hpp index f65d48910d..7c7119e1fb 100644 --- a/boost/fiber/detail/config.hpp +++ b/boost/fiber/detail/config.hpp @@ -52,7 +52,7 @@ #endif #if !defined(BOOST_FIBERS_SPIN_MAX_TESTS) -# define BOOST_FIBERS_SPIN_MAX_TESTS 100 +# define BOOST_FIBERS_SPIN_MAX_TESTS 500 #endif // modern architectures have cachelines with 64byte length diff --git a/boost/fiber/detail/context_mpsc_queue.hpp b/boost/fiber/detail/context_mpsc_queue.hpp deleted file mode 100644 index f7e664659c..0000000000 --- a/boost/fiber/detail/context_mpsc_queue.hpp +++ /dev/null @@ -1,98 +0,0 @@ - -// Copyright Dmitry Vyukov 2010-2011. -// Copyright Oliver Kowalke 2016. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// http://www.boost.org/LICENSE_1_0.txt) -// -// based on Dmitry Vyukov's intrusive MPSC queue -// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue -// https://groups.google.com/forum/#!topic/lock-free/aFHvZhu1G-0 - -#ifndef BOOST_FIBERS_DETAIL_CONTEXT_MPSC_QUEUE_H -#define BOOST_FIBERS_DETAIL_CONTEXT_MPSC_QUEUE_H - -#include <atomic> -#include <memory> -#include <type_traits> - -#include <boost/assert.hpp> -#include <boost/config.hpp> - -#include <boost/fiber/context.hpp> -#include <boost/fiber/detail/config.hpp> - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_PREFIX -#endif - -namespace boost { -namespace fibers { -namespace detail { - -// a MPSC queue -// multiple threads push ready fibers (belonging to local scheduler) -// (thread) local scheduler pops fibers -class context_mpsc_queue { -private: - // not default constructor for context - use aligned_storage instead - alignas(cache_alignment) std::aligned_storage< sizeof( context), alignof( context) >::type storage_{}; - context * dummy_; - alignas(cache_alignment) std::atomic< context * > head_; - alignas(cache_alignment) context * tail_; - char pad_[cacheline_length]; - -public: - context_mpsc_queue() : - dummy_{ reinterpret_cast< context * >( std::addressof( storage_) ) }, - head_{ dummy_ }, - tail_{ dummy_ } { - dummy_->remote_nxt_.store( nullptr, std::memory_order_release); - } - - context_mpsc_queue( context_mpsc_queue const&) = delete; - context_mpsc_queue & operator=( context_mpsc_queue const&) = delete; - - void push( context * ctx) noexcept { - BOOST_ASSERT( nullptr != ctx); - ctx->remote_nxt_.store( nullptr, std::memory_order_release); - context * prev = head_.exchange( ctx, std::memory_order_acq_rel); - prev->remote_nxt_.store( ctx, std::memory_order_release); - } - - context * pop() noexcept { - context * tail = tail_; - context * next = tail->remote_nxt_.load( std::memory_order_acquire); - if ( dummy_ == tail) { - if ( nullptr == next) { - return nullptr; - } - tail_ = next; - tail = next; - next = next->remote_nxt_.load( std::memory_order_acquire);; - } - if ( nullptr != next) { - tail_ = next; - return tail; - } - context * head = head_.load( std::memory_order_acquire); - if ( tail != head) { - return nullptr; - } - push( dummy_); - next = tail->remote_nxt_.load( std::memory_order_acquire); - if ( nullptr != next) { - tail_= next; - return tail; - } - return nullptr; - } -}; - -}}} - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_SUFFIX -#endif - -#endif // BOOST_FIBERS_DETAIL_CONTEXT_MPSC_QUEUE_H diff --git a/boost/fiber/detail/context_spinlock_queue.hpp b/boost/fiber/detail/context_spinlock_queue.hpp new file mode 100644 index 0000000000..e0ebdabda6 --- /dev/null +++ b/boost/fiber/detail/context_spinlock_queue.hpp @@ -0,0 +1,118 @@ + +// Copyright Oliver Kowalke 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_FIBERS_DETAIL_SPINLOCK_QUEUE_H +#define BOOST_FIBERS_DETAIL_SPINLOCK_QUEUE_H + +#include <cstddef> +#include <cstring> +#include <mutex> + +#include <boost/config.hpp> + +#include <boost/fiber/context.hpp> +#include <boost/fiber/detail/config.hpp> +#include <boost/fiber/detail/spinlock.hpp> + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace detail { + +class context_spinlock_queue { +private: + typedef context * slot_type; + + alignas(cache_alignment) mutable spinlock splk_{}; + std::size_t pidx_{ 0 }; + std::size_t cidx_{ 0 }; + std::size_t capacity_; + slot_type * slots_; + + void resize_() { + slot_type * old_slots = slots_; + slots_ = new slot_type[2*capacity_]; + std::size_t offset = capacity_ - cidx_; + std::memcpy( slots_, old_slots + cidx_, offset * sizeof( slot_type) ); + if ( 0 < cidx_) { + std::memcpy( slots_ + offset, old_slots, pidx_ * sizeof( slot_type) ); + } + cidx_ = 0; + pidx_ = capacity_ - 1; + capacity_ *= 2; + delete [] old_slots; + } + + bool is_full_() const noexcept { + return cidx_ == ((pidx_ + 1) % capacity_); + } + + bool is_empty_() const noexcept { + return cidx_ == pidx_; + } + +public: + context_spinlock_queue( std::size_t capacity = 4096) : + capacity_{ capacity } { + slots_ = new slot_type[capacity_]; + } + + ~context_spinlock_queue() { + delete [] slots_; + } + + context_spinlock_queue( context_spinlock_queue const&) = delete; + context_spinlock_queue & operator=( context_spinlock_queue const&) = delete; + + bool empty() const noexcept { + spinlock_lock lk{ splk_ }; + return is_empty_(); + } + + void push( context * c) { + spinlock_lock lk{ splk_ }; + if ( is_full_() ) { + resize_(); + } + slots_[pidx_] = c; + pidx_ = (pidx_ + 1) % capacity_; + } + + context * pop() { + spinlock_lock lk{ splk_ }; + context * c = nullptr; + if ( ! is_empty_() ) { + c = slots_[cidx_]; + cidx_ = (cidx_ + 1) % capacity_; + } + return c; + } + + context * steal() { + spinlock_lock lk{ splk_ }; + context * c = nullptr; + if ( ! is_empty_() ) { + c = slots_[cidx_]; + if ( c->is_context( type::pinned_context) ) { + return nullptr; + } + cidx_ = (cidx_ + 1) % capacity_; + } + return c; + } +}; + +}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_DETAIL_SPINLOCK_QUEUE_H diff --git a/boost/fiber/detail/context_spmc_queue.hpp b/boost/fiber/detail/context_spmc_queue.hpp index 6449e3658f..27256233cf 100644 --- a/boost/fiber/detail/context_spmc_queue.hpp +++ b/boost/fiber/detail/context_spmc_queue.hpp @@ -30,6 +30,11 @@ // In Proceedings of the 18th ACM SIGPLAN symposium on Principles and practice // of parallel programming (PPoPP '13). ACM, New York, NY, USA, 69-80. +#if BOOST_COMP_CLANG +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wunused-private-field" +#endif + namespace boost { namespace fibers { namespace detail { @@ -43,43 +48,43 @@ private: sizeof( atomic_type), cache_alignment >::type storage_type; - std::size_t size_; + std::size_t capacity_; storage_type * storage_; public: - array( std::size_t size) : - size_{ size }, - storage_{ new storage_type[size_] } { - for ( std::size_t i = 0; i < size_; ++i) { + array( std::size_t capacity) : + capacity_{ capacity }, + storage_{ new storage_type[capacity_] } { + for ( std::size_t i = 0; i < capacity_; ++i) { ::new ( static_cast< void * >( std::addressof( storage_[i]) ) ) atomic_type{ nullptr }; } } ~array() { - for ( std::size_t i = 0; i < size_; ++i) { + for ( std::size_t i = 0; i < capacity_; ++i) { reinterpret_cast< atomic_type * >( std::addressof( storage_[i]) )->~atomic_type(); } delete [] storage_; } - std::size_t size() const noexcept { - return size_; + std::size_t capacity() const noexcept { + return capacity_; } void push( std::size_t bottom, context * ctx) noexcept { reinterpret_cast< atomic_type * >( - std::addressof( storage_[bottom % size_]) ) + std::addressof( storage_[bottom % capacity_]) ) ->store( ctx, std::memory_order_relaxed); } context * pop( std::size_t top) noexcept { return reinterpret_cast< atomic_type * >( - std::addressof( storage_[top % size_]) ) + std::addressof( storage_[top % capacity_]) ) ->load( std::memory_order_relaxed); } array * resize( std::size_t bottom, std::size_t top) { - std::unique_ptr< array > tmp{ new array{ 2 * size_ } }; + std::unique_ptr< array > tmp{ new array{ 2 * capacity_ } }; for ( std::size_t i = top; i != bottom; ++i) { tmp->push( i, pop( i) ); } @@ -87,15 +92,15 @@ private: } }; - alignas(cache_alignment) std::atomic< std::size_t > top_{ 0 }; - alignas(cache_alignment) std::atomic< std::size_t > bottom_{ 0 }; + alignas(cache_alignment) std::atomic< std::size_t > top_{ 0 }; + alignas(cache_alignment) std::atomic< std::size_t > bottom_{ 0 }; alignas(cache_alignment) std::atomic< array * > array_; - std::vector< array * > old_arrays_{}; + std::vector< array * > old_arrays_{}; char padding_[cacheline_length]; public: - context_spmc_queue() : - array_{ new array{ 1024 } } { + context_spmc_queue( std::size_t capacity = 4096) : + array_{ new array{ capacity } } { old_arrays_.reserve( 32); } @@ -110,19 +115,19 @@ public: context_spmc_queue & operator=( context_spmc_queue const&) = delete; bool empty() const noexcept { - std::size_t bottom{ bottom_.load( std::memory_order_relaxed) }; - std::size_t top{ top_.load( std::memory_order_relaxed) }; + std::size_t bottom = bottom_.load( std::memory_order_relaxed); + std::size_t top = top_.load( std::memory_order_relaxed); return bottom <= top; } void push( context * ctx) { - std::size_t bottom{ bottom_.load( std::memory_order_relaxed) }; - std::size_t top{ top_.load( std::memory_order_acquire) }; - array * a{ array_.load( std::memory_order_relaxed) }; - if ( (a->size() - 1) < (bottom - top) ) { + std::size_t bottom = bottom_.load( std::memory_order_relaxed); + std::size_t top = top_.load( std::memory_order_acquire); + array * a = array_.load( std::memory_order_relaxed); + if ( (a->capacity() - 1) < (bottom - top) ) { // queue is full // resize - array * tmp{ a->resize( bottom, top) }; + array * tmp = a->resize( bottom, top); old_arrays_.push_back( a); std::swap( a, tmp); array_.store( a, std::memory_order_relaxed); @@ -133,16 +138,48 @@ public: } context * pop() { - std::size_t top{ top_.load( std::memory_order_acquire) }; + std::size_t bottom = bottom_.load( std::memory_order_relaxed) - 1; + array * a = array_.load( std::memory_order_relaxed); + bottom_.store( bottom, std::memory_order_relaxed); std::atomic_thread_fence( std::memory_order_seq_cst); - std::size_t bottom{ bottom_.load( std::memory_order_acquire) }; - context * ctx{ nullptr }; + std::size_t top = top_.load( std::memory_order_relaxed); + context * ctx = nullptr; + if ( top <= bottom) { + // queue is not empty + ctx = a->pop( bottom); + BOOST_ASSERT( nullptr != ctx); + if ( top == bottom) { + // last element dequeued + if ( ! top_.compare_exchange_strong( top, top + 1, + std::memory_order_seq_cst, + std::memory_order_relaxed) ) { + // lose the race + ctx = nullptr; + } + bottom_.store( bottom + 1, std::memory_order_relaxed); + } + } else { + // queue is empty + bottom_.store( bottom + 1, std::memory_order_relaxed); + } + return ctx; + } + + context * steal() { + std::size_t top = top_.load( std::memory_order_acquire); + std::atomic_thread_fence( std::memory_order_seq_cst); + std::size_t bottom = bottom_.load( std::memory_order_acquire); + context * ctx = nullptr; if ( top < bottom) { // queue is not empty - array * a{ array_.load( std::memory_order_consume) }; + array * a = array_.load( std::memory_order_consume); ctx = a->pop( top); - if ( ctx->is_context( type::pinned_context) || - ! top_.compare_exchange_strong( top, top + 1, + BOOST_ASSERT( nullptr != ctx); + // do not steal pinned context (e.g. main-/dispatcher-context) + if ( ctx->is_context( type::pinned_context) ) { + return nullptr; + } + if ( ! top_.compare_exchange_strong( top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed) ) { // lose the race @@ -155,4 +192,8 @@ public: }}} +#if BOOST_COMP_CLANG +#pragma clang diagnostic pop +#endif + #endif // BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H diff --git a/boost/fiber/detail/cpu_relax.hpp b/boost/fiber/detail/cpu_relax.hpp index d00020a23b..541b46dfd0 100644 --- a/boost/fiber/detail/cpu_relax.hpp +++ b/boost/fiber/detail/cpu_relax.hpp @@ -7,6 +7,7 @@ #ifndef BOOST_FIBERS_DETAIL_CPU_RELAX_H #define BOOST_FIBERS_DETAIL_CPU_RELAX_H +#include <chrono> #include <thread> #include <boost/config.hpp> @@ -14,7 +15,7 @@ #include <boost/fiber/detail/config.hpp> -#if BOOST_COMP_MSVC +#if BOOST_COMP_MSVC || BOOST_COMP_MSVC_EMULATED # include <Windows.h> #endif @@ -29,22 +30,47 @@ namespace detail { #if BOOST_ARCH_ARM # if BOOST_COMP_MSVC # define cpu_relax() YieldProcessor(); -# else +# elif (defined(__ARM_ARCH_6K__) || \ + defined(__ARM_ARCH_6Z__) || \ + defined(__ARM_ARCH_6ZK__) || \ + defined(__ARM_ARCH_6T2__) || \ + defined(__ARM_ARCH_7__) || \ + defined(__ARM_ARCH_7A__) || \ + defined(__ARM_ARCH_7R__) || \ + defined(__ARM_ARCH_7M__) || \ + defined(__ARM_ARCH_7S__) || \ + defined(__ARM_ARCH_8A__) || \ + defined(__aarch64__)) +// http://groups.google.com/a/chromium.org/forum/#!msg/chromium-dev/YGVrZbxYOlU/Vpgy__zeBQAJ +// mnemonic 'yield' is supported from ARMv6k onwards # define cpu_relax() asm volatile ("yield" ::: "memory"); +# else +# define cpu_relax() asm volatile ("nop" ::: "memory"); # endif #elif BOOST_ARCH_MIPS # define cpu_relax() asm volatile ("pause" ::: "memory"); #elif BOOST_ARCH_PPC +// http://code.metager.de/source/xref/gnu/glibc/sysdeps/powerpc/sys/platform/ppc.h +// http://stackoverflow.com/questions/5425506/equivalent-of-x86-pause-instruction-for-ppc +// mnemonic 'or' shared resource hints +// or 27, 27, 27 This form of 'or' provides a hint that performance +// will probably be imrpoved if shared resources dedicated +// to the executing processor are released for use by other +// processors +// extended mnemonics (available with POWER7) +// yield == or 27, 27, 27 # define cpu_relax() asm volatile ("or 27,27,27" ::: "memory"); #elif BOOST_ARCH_X86 -# if BOOST_COMP_MSVC +# if BOOST_COMP_MSVC || BOOST_COMP_MSVC_EMULATED # define cpu_relax() YieldProcessor(); # else # define cpu_relax() asm volatile ("pause" ::: "memory"); # endif #else -# warning "architecture does not support yield/pause mnemonic" -# define cpu_relax() std::this_thread::yield(); +# define cpu_relax() { \ + static constexpr std::chrono::microseconds us0{ 0 }; \ + std::this_thread::sleep_for( us0); \ + } #endif }}} diff --git a/boost/fiber/detail/data.hpp b/boost/fiber/detail/data.hpp index 24e833a9e8..e2b119ec3e 100644 --- a/boost/fiber/detail/data.hpp +++ b/boost/fiber/detail/data.hpp @@ -28,7 +28,7 @@ struct data_t { spinlock_lock * lk{ nullptr }; context * ctx{ nullptr }; - data_t() noexcept = default; + data_t() = default; explicit data_t( spinlock_lock * lk_) noexcept : lk{ lk_ } { diff --git a/boost/fiber/detail/fss.hpp b/boost/fiber/detail/fss.hpp index 54dc5b79d3..27a7d67f26 100644 --- a/boost/fiber/detail/fss.hpp +++ b/boost/fiber/detail/fss.hpp @@ -38,12 +38,13 @@ public: friend inline void intrusive_ptr_add_ref( fss_cleanup_function * p) noexcept { - ++p->use_count_; + p->use_count_.fetch_add( 1, std::memory_order_relaxed); } friend inline void intrusive_ptr_release( fss_cleanup_function * p) noexcept { - if ( --p->use_count_ == 0) { + if ( 1 == p->use_count_.fetch_sub( 1, std::memory_order_release) ) { + std::atomic_thread_fence( std::memory_order_acquire); delete p; } } diff --git a/boost/fiber/detail/futex.hpp b/boost/fiber/detail/futex.hpp index 4c966867c5..d383dc4077 100644 --- a/boost/fiber/detail/futex.hpp +++ b/boost/fiber/detail/futex.hpp @@ -49,7 +49,7 @@ int futex_wake( std::atomic< std::int32_t > * addr) { inline int futex_wait( std::atomic< std::int32_t > * addr, std::int32_t x) { - ::WaitOnAddress( static_cast< volatile void * >( addr), & x, sizeof( x), -1); + ::WaitOnAddress( static_cast< volatile void * >( addr), & x, sizeof( x), INFINITE); return 0; } #else diff --git a/boost/fiber/detail/spinlock_ttas.hpp b/boost/fiber/detail/spinlock_ttas.hpp index d64630d84d..380773ad6d 100644 --- a/boost/fiber/detail/spinlock_ttas.hpp +++ b/boost/fiber/detail/spinlock_ttas.hpp @@ -19,6 +19,11 @@ // https://software.intel.com/en-us/articles/benefitting-power-and-performance-sleep-loops // https://software.intel.com/en-us/articles/long-duration-spin-wait-loops-on-hyper-threading-technology-enabled-intel-processors +#if BOOST_COMP_CLANG +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wunused-private-field" +#endif + namespace boost { namespace fibers { namespace detail { @@ -30,10 +35,7 @@ private: unlocked }; - // align shared variable 'state_' at cache line to prevent false sharing - alignas(cache_alignment) std::atomic< spinlock_status > state_{ spinlock_status::unlocked }; - // padding to avoid other data one the cacheline of shared variable 'state_' - char pad[cacheline_length]; + std::atomic< spinlock_status > state_{ spinlock_status::unlocked }; public: spinlock_ttas() noexcept = default; @@ -63,20 +65,15 @@ public: // delays the next instruction's execution for a finite period of time (depends on processor family) // the CPU is not under demand, parts of the pipeline are no longer being used // -> reduces the power consumed by the CPU + // -> prevent pipeline stalls cpu_relax(); - } else if ( BOOST_FIBERS_SPIN_MAX_TESTS + 20 > tests) { - ++tests; + } else { // std::this_thread::sleep_for( 0us) has a fairly long instruction path length, // combined with an expensive ring3 to ring 0 transition costing about 1000 cycles // std::this_thread::sleep_for( 0us) lets give up this_thread the remaining part of its time slice // if and only if a thread of equal or greater priority is ready to run static constexpr std::chrono::microseconds us0{ 0 }; std::this_thread::sleep_for( us0); - } else { - // std::this_thread::yield() allows this_thread to give up the remaining part of its time slice, - // but only to another thread on the same processor - // instead of constant checking, a thread only checks if no other useful work is pending - std::this_thread::yield(); } #else std::this_thread::yield(); @@ -89,10 +86,12 @@ public: // utilize 'Binary Exponential Backoff' algorithm // linear_congruential_engine is a random number engine based on Linear congruential generator (LCG) static thread_local std::minstd_rand generator; - const std::size_t z = - std::uniform_int_distribution< std::size_t >{ 0, static_cast< std::size_t >( 1) << collisions }( generator); + static std::uniform_int_distribution< std::size_t > distribution{ 0, static_cast< std::size_t >( 1) << collisions }; + const std::size_t z = distribution( generator); ++collisions; for ( std::size_t i = 0; i < z; ++i) { + // -> reduces the power consumed by the CPU + // -> prevent pipeline stalls cpu_relax(); } } else { @@ -109,4 +108,8 @@ public: }}} +#if BOOST_COMP_CLANG +#pragma clang diagnostic pop +#endif + #endif // BOOST_FIBERS_SPINLOCK_TTAS_H diff --git a/boost/fiber/detail/spinlock_ttas_adaptive.hpp b/boost/fiber/detail/spinlock_ttas_adaptive.hpp index c6a9a57d79..da044b6298 100644 --- a/boost/fiber/detail/spinlock_ttas_adaptive.hpp +++ b/boost/fiber/detail/spinlock_ttas_adaptive.hpp @@ -31,11 +31,8 @@ private: unlocked }; - // align shared variable 'state_' at cache line to prevent false sharing - alignas(cache_alignment) std::atomic< spinlock_status > state_{ spinlock_status::unlocked }; - std::atomic< std::size_t > tests_{ 0 }; - // padding to avoid other data one the cacheline of shared variable 'state_' - char pad[cacheline_length]; + std::atomic< spinlock_status > state_{ spinlock_status::unlocked }; + std::atomic< std::size_t > tests_{ 0 }; public: spinlock_ttas_adaptive() noexcept = default; @@ -67,8 +64,9 @@ public: // delays the next instruction's execution for a finite period of time (depends on processor family) // the CPU is not under demand, parts of the pipeline are no longer being used // -> reduces the power consumed by the CPU + // -> prevent pipeline stalls cpu_relax(); - } else if ( BOOST_FIBERS_SPIN_MAX_TESTS + 20 > tests) { + } else { ++tests; // std::this_thread::sleep_for( 0us) has a fairly long instruction path length, // combined with an expensive ring3 to ring 0 transition costing about 1000 cycles @@ -76,11 +74,6 @@ public: // if and only if a thread of equal or greater priority is ready to run static constexpr std::chrono::microseconds us0{ 0 }; std::this_thread::sleep_for( us0); - } else { - // std::this_thread::yield() allows this_thread to give up the remaining part of its time slice, - // but only to another thread on the same processor - // instead of constant checking, a thread only checks if no other useful work is pending - std::this_thread::yield(); } #else std::this_thread::yield(); @@ -93,10 +86,12 @@ public: // utilize 'Binary Exponential Backoff' algorithm // linear_congruential_engine is a random number engine based on Linear congruential generator (LCG) static thread_local std::minstd_rand generator; - const std::size_t z = - std::uniform_int_distribution< std::size_t >{ 0, static_cast< std::size_t >( 1) << collisions }( generator); + static std::uniform_int_distribution< std::size_t > distribution{ 0, static_cast< std::size_t >( 1) << collisions }; + const std::size_t z = distribution( generator); ++collisions; for ( std::size_t i = 0; i < z; ++i) { + // -> reduces the power consumed by the CPU + // -> prevent pipeline stalls cpu_relax(); } } else { diff --git a/boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp b/boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp index fbd6a0e4d2..61ab47691e 100644 --- a/boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp +++ b/boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp @@ -26,11 +26,8 @@ namespace detail { class spinlock_ttas_adaptive_futex { private: - // align shared variable 'value_' at cache line to prevent false sharing - alignas(cache_alignment) std::atomic< std::int32_t > value_{ 0 }; - std::atomic< std::int32_t > tests_{ 0 }; - // padding to avoid other data one the cacheline of shared variable 'value_' - char pad_[cacheline_length]; + std::atomic< std::int32_t > value_{ 0 }; + std::atomic< std::int32_t > tests_{ 0 }; public: spinlock_ttas_adaptive_futex() noexcept = default; @@ -61,6 +58,7 @@ public: // delays the next instruction's execution for a finite period of time (depends on processor family) // the CPU is not under demand, parts of the pipeline are no longer being used // -> reduces the power consumed by the CPU + // -> prevent pipeline stalls cpu_relax(); #else // std::this_thread::yield() allows this_thread to give up the remaining part of its time slice, @@ -73,10 +71,12 @@ public: // utilize 'Binary Exponential Backoff' algorithm // linear_congruential_engine is a random number engine based on Linear congruential generator (LCG) static thread_local std::minstd_rand generator; - const std::int32_t z = std::uniform_int_distribution< std::int32_t >{ - 0, static_cast< std::int32_t >( 1) << collisions }( generator); + static std::uniform_int_distribution< std::int32_t > distribution{ 0, static_cast< std::int32_t >( 1) << collisions }; + const std::int32_t z = distribution( generator); ++collisions; for ( std::int32_t i = 0; i < z; ++i) { + // -> reduces the power consumed by the CPU + // -> prevent pipeline stalls cpu_relax(); } } else { diff --git a/boost/fiber/detail/spinlock_ttas_futex.hpp b/boost/fiber/detail/spinlock_ttas_futex.hpp index b11e63b587..a427b73ba5 100644 --- a/boost/fiber/detail/spinlock_ttas_futex.hpp +++ b/boost/fiber/detail/spinlock_ttas_futex.hpp @@ -25,10 +25,7 @@ namespace detail { class spinlock_ttas_futex { private: - // align shared variable 'value_' at cache line to prevent false sharing - alignas(cache_alignment) std::atomic< std::int32_t > value_{ 0 }; - // padding to avoid other data one the cacheline of shared variable 'value_' - char pad_[cacheline_length]; + std::atomic< std::int32_t > value_{ 0 }; public: spinlock_ttas_futex() noexcept = default; @@ -57,6 +54,7 @@ public: // delays the next instruction's execution for a finite period of time (depends on processor family) // the CPU is not under demand, parts of the pipeline are no longer being used // -> reduces the power consumed by the CPU + // -> prevent pipeline stalls cpu_relax(); #else // std::this_thread::yield() allows this_thread to give up the remaining part of its time slice, @@ -69,10 +67,12 @@ public: // utilize 'Binary Exponential Backoff' algorithm // linear_congruential_engine is a random number engine based on Linear congruential generator (LCG) static thread_local std::minstd_rand generator; - const std::int32_t z = std::uniform_int_distribution< std::int32_t >{ - 0, static_cast< std::int32_t >( 1) << collisions }( generator); + static std::uniform_int_distribution< std::int32_t > distribution{ 0, static_cast< std::int32_t >( 1) << collisions }; + const std::int32_t z = distribution( generator); ++collisions; for ( std::int32_t i = 0; i < z; ++i) { + // -> reduces the power consumed by the CPU + // -> prevent pipeline stalls cpu_relax(); } } else { diff --git a/boost/fiber/detail/wrap.hpp b/boost/fiber/detail/wrap.hpp index 0369e61ee6..558de6bd94 100644 --- a/boost/fiber/detail/wrap.hpp +++ b/boost/fiber/detail/wrap.hpp @@ -10,8 +10,14 @@ #include <type_traits> #include <boost/config.hpp> +#if defined(BOOST_NO_CXX17_STD_INVOKE) #include <boost/context/detail/invoke.hpp> -#include <boost/context/execution_context.hpp> +#endif +#if (BOOST_EXECUTION_CONTEXT==1) +# include <boost/context/execution_context.hpp> +#else +# include <boost/context/continuation.hpp> +#endif #include <boost/fiber/detail/config.hpp> #include <boost/fiber/detail/data.hpp> @@ -36,9 +42,9 @@ private: public: wrapper( Fn1 && fn1, Fn2 && fn2, Tpl && tpl, boost::context::execution_context const& ctx) : - fn1_( std::move( fn1) ), - fn2_( std::move( fn2) ), - tpl_( std::move( tpl) ), + fn1_{ std::move( fn1) }, + fn2_{ std::move( fn2) }, + tpl_{ std::move( tpl) }, ctx_{ ctx } { } @@ -49,9 +55,11 @@ public: wrapper & operator=( wrapper && other) = default; void operator()( void * vp) { - boost::context::detail::invoke( - std::move( fn1_), - fn2_, tpl_, ctx_, vp); +#if defined(BOOST_NO_CXX17_STD_INVOKE) + boost::context::detail::invoke( std::move( fn1_), fn2_, tpl_, ctx_, vp); +#else + std::invoke( std::move( fn1_), fn2_, tpl_, ctx_, vp); +#endif } }; @@ -59,11 +67,11 @@ template< typename Fn1, typename Fn2, typename Tpl > wrapper< Fn1, Fn2, Tpl > wrap( Fn1 && fn1, Fn2 && fn2, Tpl && tpl, boost::context::execution_context const& ctx) { - return wrapper< Fn1, Fn2, Tpl >( + return wrapper< Fn1, Fn2, Tpl >{ std::forward< Fn1 >( fn1), std::forward< Fn2 >( fn2), std::forward< Tpl >( tpl), - ctx); + ctx }; } #else template< typename Fn1, typename Fn2, typename Tpl > @@ -75,9 +83,9 @@ private: public: wrapper( Fn1 && fn1, Fn2 && fn2, Tpl && tpl) : - fn1_( std::move( fn1) ), - fn2_( std::move( fn2) ), - tpl_( std::move( tpl) ) { + fn1_{ std::move( fn1) }, + fn2_{ std::move( fn2) }, + tpl_{ std::move( tpl) } { } wrapper( wrapper const&) = delete; @@ -86,24 +94,31 @@ public: wrapper( wrapper && other) = default; wrapper & operator=( wrapper && other) = default; - boost::context::execution_context< data_t * > - operator()( boost::context::execution_context< data_t * > && ctx, data_t * dp) { + boost::context::continuation + operator()( boost::context::continuation && c) { +#if defined(BOOST_NO_CXX17_STD_INVOKE) return boost::context::detail::invoke( std::move( fn1_), fn2_, tpl_, - std::forward< boost::context::execution_context< data_t * > >( ctx), - dp); + std::forward< boost::context::continuation >( c) ); +#else + return std::invoke( + std::move( fn1_), + fn2_, + tpl_, + std::forward< boost::context::continuation >( c) ); +#endif } }; template< typename Fn1, typename Fn2, typename Tpl > wrapper< Fn1, Fn2, Tpl > wrap( Fn1 && fn1, Fn2 && fn2, Tpl && tpl) { - return wrapper< Fn1, Fn2, Tpl >( + return wrapper< Fn1, Fn2, Tpl >{ std::forward< Fn1 >( fn1), std::forward< Fn2 >( fn2), - std::forward< Tpl >( tpl) ); + std::forward< Tpl >( tpl) }; } #endif diff --git a/boost/fiber/exceptions.hpp b/boost/fiber/exceptions.hpp index ddbf45eadf..5a5f7e4f1b 100644 --- a/boost/fiber/exceptions.hpp +++ b/boost/fiber/exceptions.hpp @@ -28,15 +28,15 @@ namespace fibers { class fiber_error : public std::system_error { public: fiber_error( std::error_code ec) : - std::system_error( ec) { + std::system_error{ ec } { } fiber_error( std::error_code ec, const char * what_arg) : - std::system_error( ec, what_arg) { + std::system_error{ ec, what_arg } { } fiber_error( std::error_code ec, std::string const& what_arg) : - std::system_error( ec, what_arg) { + std::system_error{ ec, what_arg } { } virtual ~fiber_error() = default; @@ -45,15 +45,15 @@ public: class lock_error : public fiber_error { public: lock_error( std::error_code ec) : - fiber_error( ec) { + fiber_error{ ec } { } lock_error( std::error_code ec, const char * what_arg) : - fiber_error( ec, what_arg) { + fiber_error{ ec, what_arg } { } lock_error( std::error_code ec, std::string const& what_arg) : - fiber_error( ec, what_arg) { + fiber_error{ ec, what_arg } { } }; @@ -77,12 +77,12 @@ struct is_error_code_enum< boost::fibers::future_errc > : public true_type { inline std::error_code make_error_code( boost::fibers::future_errc e) noexcept { - return std::error_code( static_cast< int >( e), boost::fibers::future_category() ); + return std::error_code{ static_cast< int >( e), boost::fibers::future_category() }; } inline std::error_condition make_error_condition( boost::fibers::future_errc e) noexcept { - return std::error_condition( static_cast< int >( e), boost::fibers::future_category() ); + return std::error_condition{ static_cast< int >( e), boost::fibers::future_category() }; } } @@ -93,49 +93,49 @@ namespace fibers { class future_error : public fiber_error { public: future_error( std::error_code ec) : - fiber_error( ec) { + fiber_error{ ec } { } }; class future_uninitialized : public future_error { public: future_uninitialized() : - future_error( std::make_error_code( future_errc::no_state) ) { + future_error{ std::make_error_code( future_errc::no_state) } { } }; class future_already_retrieved : public future_error { public: future_already_retrieved() : - future_error( std::make_error_code( future_errc::future_already_retrieved) ) { + future_error{ std::make_error_code( future_errc::future_already_retrieved) } { } }; class broken_promise : public future_error { public: broken_promise() : - future_error( std::make_error_code( future_errc::broken_promise) ) { + future_error{ std::make_error_code( future_errc::broken_promise) } { } }; class promise_already_satisfied : public future_error { public: promise_already_satisfied() : - future_error( std::make_error_code( future_errc::promise_already_satisfied) ) { + future_error{ std::make_error_code( future_errc::promise_already_satisfied) } { } }; class promise_uninitialized : public future_error { public: promise_uninitialized() : - future_error( std::make_error_code( future_errc::no_state) ) { + future_error{ std::make_error_code( future_errc::no_state) } { } }; class packaged_task_uninitialized : public future_error { public: packaged_task_uninitialized() : - future_error( std::make_error_code( future_errc::no_state) ) { + future_error{ std::make_error_code( future_errc::no_state) } { } }; diff --git a/boost/fiber/fiber.hpp b/boost/fiber/fiber.hpp index 0fbc84ad12..1508a9b5a6 100644 --- a/boost/fiber/fiber.hpp +++ b/boost/fiber/fiber.hpp @@ -49,7 +49,7 @@ private: public: typedef context::id id; - fiber() noexcept = default; + fiber() = default; template< typename Fn, typename ... Args, @@ -108,7 +108,9 @@ public: if ( joinable() ) { std::terminate(); } - if ( this == & other) return * this; + if ( this == & other) { + return * this; + } impl_.swap( other.impl_); return * this; } @@ -132,7 +134,7 @@ public: template< typename PROPS > PROPS & properties() { auto props = impl_->get_properties(); - BOOST_ASSERT_MSG(props, "fiber::properties not set"); + BOOST_ASSERT_MSG( props, "fiber::properties not set"); return dynamic_cast< PROPS & >( * props ); } }; diff --git a/boost/fiber/fss.hpp b/boost/fiber/fss.hpp index a578d40a7f..f65d7353b3 100644 --- a/boost/fiber/fss.hpp +++ b/boost/fiber/fss.hpp @@ -58,9 +58,9 @@ public: } ~fiber_specific_ptr() { - context * f = context::active(); - if ( nullptr != f) { - f->set_fss_data( + context * active_ctx = context::active(); + if ( nullptr != active_ctx) { + active_ctx->set_fss_data( this, cleanup_fn_, nullptr, true); } } diff --git a/boost/fiber/future/async.hpp b/boost/fiber/future/async.hpp index d969d19ce3..e68b2c28fa 100644 --- a/boost/fiber/future/async.hpp +++ b/boost/fiber/future/async.hpp @@ -23,63 +23,85 @@ namespace fibers { template< typename Fn, typename ... Args > future< - typename std::result_of< - typename std::enable_if< - ! detail::is_launch_policy< typename std::decay< Fn >::type >::value, - typename std::decay< Fn >::type - >::type( typename std::decay< Args >::type ... ) - >::type + typename std::result_of< + typename std::enable_if< + ! detail::is_launch_policy< typename std::decay< Fn >::type >::value, + typename std::decay< Fn >::type + >::type( typename std::decay< Args >::type ... ) + >::type > async( Fn && fn, Args && ... args) { typedef typename std::result_of< typename std::decay< Fn >::type( typename std::decay< Args >::type ... ) - >::type result_t; + >::type result_type; - packaged_task< result_t( typename std::decay< Args >::type ... ) > pt{ + packaged_task< result_type( typename std::decay< Args >::type ... ) > pt{ std::forward< Fn >( fn) }; - future< result_t > f{ pt.get_future() }; + future< result_type > f{ pt.get_future() }; fiber{ std::move( pt), std::forward< Args >( args) ... }.detach(); return f; } template< typename Policy, typename Fn, typename ... Args > future< - typename std::result_of< - typename std::enable_if< - detail::is_launch_policy< Policy >::value, - typename std::decay< Fn >::type - >::type( typename std::decay< Args >::type ...) - >::type + typename std::result_of< + typename std::enable_if< + detail::is_launch_policy< Policy >::value, + typename std::decay< Fn >::type + >::type( typename std::decay< Args >::type ...) + >::type > async( Policy policy, Fn && fn, Args && ... args) { typedef typename std::result_of< typename std::decay< Fn >::type( typename std::decay< Args >::type ... ) - >::type result_t; + >::type result_type; - packaged_task< result_t( typename std::decay< Args >::type ... ) > pt{ + packaged_task< result_type( typename std::decay< Args >::type ... ) > pt{ std::forward< Fn >( fn) }; - future< result_t > f{ pt.get_future() }; + future< result_type > f{ pt.get_future() }; fiber{ policy, std::move( pt), std::forward< Args >( args) ... }.detach(); return f; } template< typename Policy, typename StackAllocator, typename Fn, typename ... Args > future< - typename std::result_of< - typename std::enable_if< - detail::is_launch_policy< Policy >::value, - typename std::decay< Fn >::type - >::type( typename std::decay< Args >::type ... ) - >::type + typename std::result_of< + typename std::enable_if< + detail::is_launch_policy< Policy >::value, + typename std::decay< Fn >::type + >::type( typename std::decay< Args >::type ... ) + >::type > async( Policy policy, std::allocator_arg_t, StackAllocator salloc, Fn && fn, Args && ... args) { typedef typename std::result_of< typename std::decay< Fn >::type( typename std::decay< Args >::type ... ) - >::type result_t; + >::type result_type; - packaged_task< result_t( typename std::decay< Args >::type ... ) > pt{ - std::allocator_arg, salloc, std::forward< Fn >( fn) }; - future< result_t > f{ pt.get_future() }; + packaged_task< result_type( typename std::decay< Args >::type ... ) > pt{ + std::forward< Fn >( fn) }; + future< result_type > f{ pt.get_future() }; + fiber{ policy, std::allocator_arg, salloc, + std::move( pt), std::forward< Args >( args) ... }.detach(); + return f; +} + +template< typename Policy, typename StackAllocator, typename Allocator, typename Fn, typename ... Args > +future< + typename std::result_of< + typename std::enable_if< + detail::is_launch_policy< Policy >::value, + typename std::decay< Fn >::type + >::type( typename std::decay< Args >::type ... ) + >::type +> +async( Policy policy, std::allocator_arg_t, StackAllocator salloc, Allocator alloc, Fn && fn, Args && ... args) { + typedef typename std::result_of< + typename std::decay< Fn >::type( typename std::decay< Args >::type ... ) + >::type result_type; + + packaged_task< result_type( typename std::decay< Args >::type ... ) > pt{ + std::allocator_arg, alloc, std::forward< Fn >( fn) }; + future< result_type > f{ pt.get_future() }; fiber{ policy, std::allocator_arg, salloc, std::move( pt), std::forward< Args >( args) ... }.detach(); return f; diff --git a/boost/fiber/future/detail/shared_state.hpp b/boost/fiber/future/detail/shared_state.hpp index 5ec6858cf3..898fdaffd4 100644 --- a/boost/fiber/future/detail/shared_state.hpp +++ b/boost/fiber/future/detail/shared_state.hpp @@ -109,46 +109,47 @@ public: shared_state_base & operator=( shared_state_base const&) = delete; void owner_destroyed() { - std::unique_lock< mutex > lk( mtx_); + std::unique_lock< mutex > lk{ mtx_ }; owner_destroyed_( lk); } void set_exception( std::exception_ptr except) { - std::unique_lock< mutex > lk( mtx_); + std::unique_lock< mutex > lk{ mtx_ }; set_exception_( except, lk); } std::exception_ptr get_exception_ptr() { - std::unique_lock< mutex > lk( mtx_); + std::unique_lock< mutex > lk{ mtx_ }; return get_exception_ptr_( lk); } void wait() const { - std::unique_lock< mutex > lk( mtx_); + std::unique_lock< mutex > lk{ mtx_ }; wait_( lk); } template< typename Rep, typename Period > future_status wait_for( std::chrono::duration< Rep, Period > const& timeout_duration) const { - std::unique_lock< mutex > lk( mtx_); + std::unique_lock< mutex > lk{ mtx_ }; return wait_for_( lk, timeout_duration); } template< typename Clock, typename Duration > future_status wait_until( std::chrono::time_point< Clock, Duration > const& timeout_time) const { - std::unique_lock< mutex > lk( mtx_); + std::unique_lock< mutex > lk{ mtx_ }; return wait_until_( lk, timeout_time); } friend inline void intrusive_ptr_add_ref( shared_state_base * p) noexcept { - ++p->use_count_; + p->use_count_.fetch_add( 1, std::memory_order_relaxed); } friend inline void intrusive_ptr_release( shared_state_base * p) noexcept { - if ( 0 == --p->use_count_) { - p->deallocate_future(); + if ( 1 == p->use_count_.fetch_sub( 1, std::memory_order_release) ) { + std::atomic_thread_fence( std::memory_order_acquire); + p->deallocate_future(); } } }; @@ -161,18 +162,18 @@ private: void set_value_( R const& value, std::unique_lock< mutex > & lk) { BOOST_ASSERT( lk.owns_lock() ); if ( ready_) { - throw promise_already_satisfied(); + throw promise_already_satisfied{}; } - ::new ( static_cast< void * >( std::addressof( storage_) ) ) R( value); + ::new ( static_cast< void * >( std::addressof( storage_) ) ) R{ value }; mark_ready_and_notify_( lk); } void set_value_( R && value, std::unique_lock< mutex > & lk) { BOOST_ASSERT( lk.owns_lock() ); if ( ready_) { - throw promise_already_satisfied(); + throw promise_already_satisfied{}; } - ::new ( static_cast< void * >( std::addressof( storage_) ) ) R( std::move( value) ); + ::new ( static_cast< void * >( std::addressof( storage_) ) ) R{ std::move( value) }; mark_ready_and_notify_( lk); } @@ -186,7 +187,7 @@ private: } public: - typedef intrusive_ptr< shared_state > ptr_t; + typedef intrusive_ptr< shared_state > ptr_type; shared_state() = default; @@ -200,17 +201,17 @@ public: shared_state & operator=( shared_state const&) = delete; void set_value( R const& value) { - std::unique_lock< mutex > lk( mtx_); + std::unique_lock< mutex > lk{ mtx_ }; set_value_( value, lk); } void set_value( R && value) { - std::unique_lock< mutex > lk( mtx_); + std::unique_lock< mutex > lk{ mtx_ }; set_value_( std::move( value), lk); } R & get() { - std::unique_lock< mutex > lk( mtx_); + std::unique_lock< mutex > lk{ mtx_ }; return get_( lk); } }; @@ -239,7 +240,7 @@ private: } public: - typedef intrusive_ptr< shared_state > ptr_t; + typedef intrusive_ptr< shared_state > ptr_type; shared_state() = default; @@ -249,12 +250,12 @@ public: shared_state & operator=( shared_state const&) = delete; void set_value( R & value) { - std::unique_lock< mutex > lk( mtx_); + std::unique_lock< mutex > lk{ mtx_ }; set_value_( value, lk); } R & get() { - std::unique_lock< mutex > lk( mtx_); + std::unique_lock< mutex > lk{ mtx_ }; return get_( lk); } }; @@ -281,7 +282,7 @@ private: } public: - typedef intrusive_ptr< shared_state > ptr_t; + typedef intrusive_ptr< shared_state > ptr_type; shared_state() = default; @@ -292,13 +293,13 @@ public: inline void set_value() { - std::unique_lock< mutex > lk( mtx_); + std::unique_lock< mutex > lk{ mtx_ }; set_value_( lk); } inline void get() { - std::unique_lock< mutex > lk( mtx_); + std::unique_lock< mutex > lk{ mtx_ }; get_( lk); } }; diff --git a/boost/fiber/future/detail/shared_state_object.hpp b/boost/fiber/future/detail/shared_state_object.hpp index dcdafef65c..50e08a6393 100644 --- a/boost/fiber/future/detail/shared_state_object.hpp +++ b/boost/fiber/future/detail/shared_state_object.hpp @@ -27,10 +27,11 @@ class shared_state_object : public shared_state< R > { public: typedef typename std::allocator_traits< Allocator >::template rebind_alloc< shared_state_object - > allocator_t; + > allocator_type; - shared_state_object( allocator_t const& alloc) : - shared_state< R >(), alloc_( alloc) { + shared_state_object( allocator_type const& alloc) : + shared_state< R >{}, + alloc_{ alloc } { } protected: @@ -39,10 +40,10 @@ protected: } private: - allocator_t alloc_; + allocator_type alloc_; - static void destroy_( allocator_t const& alloc, shared_state_object * p) noexcept { - allocator_t a{ alloc }; + static void destroy_( allocator_type const& alloc, shared_state_object * p) noexcept { + allocator_type a{ alloc }; a.destroy( p); a.deallocate( p, 1); } diff --git a/boost/fiber/future/detail/task_base.hpp b/boost/fiber/future/detail/task_base.hpp index 907e820470..83abd7e5ab 100644 --- a/boost/fiber/future/detail/task_base.hpp +++ b/boost/fiber/future/detail/task_base.hpp @@ -22,14 +22,14 @@ namespace detail { template< typename R, typename ... Args > struct task_base : public shared_state< R > { - typedef intrusive_ptr< task_base > ptr_t; + typedef intrusive_ptr< task_base > ptr_type; virtual ~task_base() { } virtual void run( Args && ... args) = 0; - virtual ptr_t reset() = 0; + virtual ptr_type reset() = 0; }; }}} diff --git a/boost/fiber/future/detail/task_object.hpp b/boost/fiber/future/detail/task_object.hpp index 37bfd3fd11..3a48929a58 100644 --- a/boost/fiber/future/detail/task_object.hpp +++ b/boost/fiber/future/detail/task_object.hpp @@ -9,10 +9,13 @@ #include <exception> #include <memory> +#include <tuple> #include <utility> #include <boost/config.hpp> +#if defined(BOOST_NO_CXX17_STD_APPLY) #include <boost/context/detail/apply.hpp> +#endif #include <boost/fiber/detail/config.hpp> #include <boost/fiber/future/detail/task_base.hpp> @@ -28,43 +31,49 @@ namespace detail { template< typename Fn, typename Allocator, typename R, typename ... Args > class task_object : public task_base< R, Args ... > { private: - typedef task_base< R, Args ... > base_t; + typedef task_base< R, Args ... > base_type; public: typedef typename std::allocator_traits< Allocator >::template rebind_alloc< task_object - > allocator_t; + > allocator_type; - task_object( allocator_t const& alloc, Fn const& fn) : - base_t(), - fn_( fn), - alloc_( alloc) { + task_object( allocator_type const& alloc, Fn const& fn) : + base_type{}, + fn_{ fn }, + alloc_{ alloc } { } - task_object( allocator_t const& alloc, Fn && fn) : - base_t(), - fn_( std::move( fn) ), - alloc_( alloc) { + task_object( allocator_type const& alloc, Fn && fn) : + base_type{}, + fn_{ std::move( fn) }, + alloc_{ alloc } { } void run( Args && ... args) override final { try { this->set_value( +#if defined(BOOST_NO_CXX17_STD_APPLY) boost::context::detail::apply( - fn_, std::make_tuple( std::forward< Args >( args) ... ) ) ); + fn_, std::make_tuple( std::forward< Args >( args) ... ) ) +#else + std::apply( + fn_, std::make_tuple( std::forward< Args >( args) ... ) ) +#endif + ); } catch (...) { this->set_exception( std::current_exception() ); } } - typename base_t::ptr_t reset() override final { - typedef std::allocator_traits< allocator_t > traits_t; + typename base_type::ptr_type reset() override final { + typedef std::allocator_traits< allocator_type > traity_type; - typename traits_t::pointer ptr{ traits_t::allocate( alloc_, 1) }; + typename traity_type::pointer ptr{ traity_type::allocate( alloc_, 1) }; try { - traits_t::construct( alloc_, ptr, alloc_, std::move( fn_) ); + traity_type::construct( alloc_, ptr, alloc_, std::move( fn_) ); } catch (...) { - traits_t::deallocate( alloc_, ptr, 1); + traity_type::deallocate( alloc_, ptr, 1); throw; } return { convert( ptr) }; @@ -77,10 +86,10 @@ protected: private: Fn fn_; - allocator_t alloc_; + allocator_type alloc_; - static void destroy_( allocator_t const& alloc, task_object * p) noexcept { - allocator_t a{ alloc }; + static void destroy_( allocator_type const& alloc, task_object * p) noexcept { + allocator_type a{ alloc }; a.destroy( p); a.deallocate( p, 1); } @@ -89,43 +98,48 @@ private: template< typename Fn, typename Allocator, typename ... Args > class task_object< Fn, Allocator, void, Args ... > : public task_base< void, Args ... > { private: - typedef task_base< void, Args ... > base_t; + typedef task_base< void, Args ... > base_type; public: typedef typename Allocator::template rebind< task_object< Fn, Allocator, void, Args ... > - >::other allocator_t; + >::other allocator_type; - task_object( allocator_t const& alloc, Fn const& fn) : - base_t(), - fn_( fn), - alloc_( alloc) { + task_object( allocator_type const& alloc, Fn const& fn) : + base_type{}, + fn_{ fn }, + alloc_{ alloc } { } - task_object( allocator_t const& alloc, Fn && fn) : - base_t(), - fn_( std::move( fn) ), - alloc_( alloc) { + task_object( allocator_type const& alloc, Fn && fn) : + base_type{}, + fn_{ std::move( fn) }, + alloc_{ alloc } { } void run( Args && ... args) override final { try { +#if defined(BOOST_NO_CXX17_STD_APPLY) boost::context::detail::apply( fn_, std::make_tuple( std::forward< Args >( args) ... ) ); +#else + std::apply( + fn_, std::make_tuple( std::forward< Args >( args) ... ) ); +#endif this->set_value(); } catch (...) { this->set_exception( std::current_exception() ); } } - typename base_t::ptr_t reset() override final { - typedef std::allocator_traits< allocator_t > traits_t; + typename base_type::ptr_type reset() override final { + typedef std::allocator_traits< allocator_type > traity_type; - typename traits_t::pointer ptr{ traits_t::allocate( alloc_, 1) }; + typename traity_type::pointer ptr{ traity_type::allocate( alloc_, 1) }; try { - traits_t::construct( alloc_, ptr, alloc_, std::move( fn_) ); + traity_type::construct( alloc_, ptr, alloc_, std::move( fn_) ); } catch (...) { - traits_t::deallocate( alloc_, ptr, 1); + traity_type::deallocate( alloc_, ptr, 1); throw; } return { convert( ptr) }; @@ -138,10 +152,10 @@ protected: private: Fn fn_; - allocator_t alloc_; + allocator_type alloc_; - static void destroy_( allocator_t const& alloc, task_object * p) noexcept { - allocator_t a{ alloc }; + static void destroy_( allocator_type const& alloc, task_object * p) noexcept { + allocator_type a{ alloc }; a.destroy( p); a.deallocate( p, 1); } diff --git a/boost/fiber/future/future.hpp b/boost/fiber/future/future.hpp index d92820eb58..5d4ad78ab5 100644 --- a/boost/fiber/future/future.hpp +++ b/boost/fiber/future/future.hpp @@ -24,13 +24,13 @@ namespace detail { template< typename R > struct future_base { - typedef typename shared_state< R >::ptr_t ptr_t; + typedef typename shared_state< R >::ptr_type ptr_type; - ptr_t state_{}; + ptr_type state_{}; - future_base() noexcept = default; + future_base() = default; - explicit future_base( ptr_t const& p) noexcept : + explicit future_base( ptr_type const& p) noexcept : state_{ p } { } @@ -46,15 +46,17 @@ struct future_base { } future_base & operator=( future_base const& other) noexcept { - if ( this == & other) return * this; - state_ = other.state_; + if ( this != & other) { + state_ = other.state_; + } return * this; } future_base & operator=( future_base && other) noexcept { - if ( this == & other) return * this; - state_ = other.state_; - other.state_.reset(); + if ( this != & other) { + state_ = other.state_; + other.state_.reset(); + } return * this; } @@ -107,128 +109,131 @@ class packaged_task; template< typename R > class future : private detail::future_base< R > { private: - typedef detail::future_base< R > base_t; + typedef detail::future_base< R > base_type; friend struct detail::promise_base< R >; friend class shared_future< R >; template< typename Signature > friend class packaged_task; - explicit future( typename base_t::ptr_t const& p) noexcept : - base_t{ p } { + explicit future( typename base_type::ptr_type const& p) noexcept : + base_type{ p } { } public: - future() noexcept = default; + future() = default; future( future const&) = delete; future & operator=( future const&) = delete; future( future && other) noexcept : - base_t{ std::move( other) } { + base_type{ std::move( other) } { } future & operator=( future && other) noexcept { - if ( this == & other) return * this; - base_t::operator=( std::move( other) ); + if ( this != & other) { + base_type::operator=( std::move( other) ); + } return * this; } shared_future< R > share(); R get() { - if ( ! base_t::valid() ) { + if ( ! base_type::valid() ) { throw future_uninitialized{}; } - typename base_t::ptr_t tmp{}; - tmp.swap( base_t::state_); + typename base_type::ptr_type tmp{}; + tmp.swap( base_type::state_); return std::move( tmp->get() ); } - using base_t::valid; - using base_t::get_exception_ptr; - using base_t::wait; - using base_t::wait_for; - using base_t::wait_until; + using base_type::valid; + using base_type::get_exception_ptr; + using base_type::wait; + using base_type::wait_for; + using base_type::wait_until; }; template< typename R > class future< R & > : private detail::future_base< R & > { private: - typedef detail::future_base< R & > base_t; + typedef detail::future_base< R & > base_type; friend struct detail::promise_base< R & >; friend class shared_future< R & >; template< typename Signature > friend class packaged_task; - explicit future( typename base_t::ptr_t const& p) noexcept : - base_t{ p } { + explicit future( typename base_type::ptr_type const& p) noexcept : + base_type{ p } { } public: - future() noexcept = default; + future() = default; future( future const&) = delete; future & operator=( future const&) = delete; future( future && other) noexcept : - base_t{ std::move( other) } { + base_type{ std::move( other) } { } future & operator=( future && other) noexcept { - if ( this == & other) return * this; - base_t::operator=( std::move( other) ); + if ( this != & other) { + base_type::operator=( std::move( other) ); + } return * this; } shared_future< R & > share(); R & get() { - if ( ! base_t::valid() ) { + if ( ! base_type::valid() ) { throw future_uninitialized{}; } - typename base_t::ptr_t tmp{}; - tmp.swap( base_t::state_); + typename base_type::ptr_type tmp{}; + tmp.swap( base_type::state_); return tmp->get(); } - using base_t::valid; - using base_t::get_exception_ptr; - using base_t::wait; - using base_t::wait_for; - using base_t::wait_until; + using base_type::valid; + using base_type::get_exception_ptr; + using base_type::wait; + using base_type::wait_for; + using base_type::wait_until; }; template<> class future< void > : private detail::future_base< void > { private: - typedef detail::future_base< void > base_t; + typedef detail::future_base< void > base_type; friend struct detail::promise_base< void >; friend class shared_future< void >; template< typename Signature > friend class packaged_task; - explicit future( base_t::ptr_t const& p) noexcept : - base_t{ p } { + explicit future( base_type::ptr_type const& p) noexcept : + base_type{ p } { } public: - future() noexcept = default; + future() = default; future( future const&) = delete; future & operator=( future const&) = delete; inline future( future && other) noexcept : - base_t{ std::move( other) } { + base_type{ std::move( other) } { } inline future & operator=( future && other) noexcept { - if ( this == & other) return * this; - base_t::operator=( std::move( other) ); + if ( this != & other) { + base_type::operator=( std::move( other) ); + } return * this; } @@ -236,62 +241,64 @@ public: inline void get() { - if ( ! base_t::valid() ) { + if ( ! base_type::valid() ) { throw future_uninitialized{}; } - base_t::ptr_t tmp{}; - tmp.swap( base_t::state_); + base_type::ptr_type tmp{}; + tmp.swap( base_type::state_); tmp->get(); } - using base_t::valid; - using base_t::get_exception_ptr; - using base_t::wait; - using base_t::wait_for; - using base_t::wait_until; + using base_type::valid; + using base_type::get_exception_ptr; + using base_type::wait; + using base_type::wait_for; + using base_type::wait_until; }; template< typename R > class shared_future : private detail::future_base< R > { private: - typedef detail::future_base< R > base_t; + typedef detail::future_base< R > base_type; - explicit shared_future( typename base_t::ptr_t const& p) noexcept : - base_t{ p } { + explicit shared_future( typename base_type::ptr_type const& p) noexcept : + base_type{ p } { } public: - shared_future() noexcept = default; + shared_future() = default; ~shared_future() = default; shared_future( shared_future const& other) : - base_t{ other } { + base_type{ other } { } shared_future( shared_future && other) noexcept : - base_t{ std::move( other) } { + base_type{ std::move( other) } { } shared_future( future< R > && other) noexcept : - base_t{ std::move( other) } { + base_type{ std::move( other) } { } shared_future & operator=( shared_future const& other) noexcept { - if ( this == & other) return * this; - base_t::operator=( other); + if ( this != & other) { + base_type::operator=( other); + } return * this; } shared_future & operator=( shared_future && other) noexcept { - if ( this == & other) return * this; - base_t::operator=( std::move( other) ); + if ( this != & other) { + base_type::operator=( std::move( other) ); + } return * this; } shared_future & operator=( future< R > && other) noexcept { - base_t::operator=( std::move( other) ); + base_type::operator=( std::move( other) ); return * this; } @@ -299,56 +306,58 @@ public: if ( ! valid() ) { throw future_uninitialized{}; } - return base_t::state_->get(); + return base_type::state_->get(); } - using base_t::valid; - using base_t::get_exception_ptr; - using base_t::wait; - using base_t::wait_for; - using base_t::wait_until; + using base_type::valid; + using base_type::get_exception_ptr; + using base_type::wait; + using base_type::wait_for; + using base_type::wait_until; }; template< typename R > class shared_future< R & > : private detail::future_base< R & > { private: - typedef detail::future_base< R & > base_t; + typedef detail::future_base< R & > base_type; - explicit shared_future( typename base_t::ptr_t const& p) noexcept : - base_t{ p } { + explicit shared_future( typename base_type::ptr_type const& p) noexcept : + base_type{ p } { } public: - shared_future() noexcept = default; + shared_future() = default; ~shared_future() = default; shared_future( shared_future const& other) : - base_t{ other } { + base_type{ other } { } shared_future( shared_future && other) noexcept : - base_t{ std::move( other) } { + base_type{ std::move( other) } { } shared_future( future< R & > && other) noexcept : - base_t{ std::move( other) } { + base_type{ std::move( other) } { } shared_future & operator=( shared_future const& other) noexcept { - if ( this == & other) return * this; - base_t::operator=( other); + if ( this != & other) { + base_type::operator=( other); + } return * this; } shared_future & operator=( shared_future && other) noexcept { - if ( this == & other) return * this; - base_t::operator=( std::move( other) ); + if ( this != & other) { + base_type::operator=( std::move( other) ); + } return * this; } shared_future & operator=( future< R & > && other) noexcept { - base_t::operator=( std::move( other) ); + base_type::operator=( std::move( other) ); return * this; } @@ -356,62 +365,64 @@ public: if ( ! valid() ) { throw future_uninitialized{}; } - return base_t::state_->get(); + return base_type::state_->get(); } - using base_t::valid; - using base_t::get_exception_ptr; - using base_t::wait; - using base_t::wait_for; - using base_t::wait_until; + using base_type::valid; + using base_type::get_exception_ptr; + using base_type::wait; + using base_type::wait_for; + using base_type::wait_until; }; template<> class shared_future< void > : private detail::future_base< void > { private: - typedef detail::future_base< void > base_t; + typedef detail::future_base< void > base_type; - explicit shared_future( base_t::ptr_t const& p) noexcept : - base_t{ p } { + explicit shared_future( base_type::ptr_type const& p) noexcept : + base_type{ p } { } public: - shared_future() noexcept = default; + shared_future() = default; ~shared_future() = default; inline shared_future( shared_future const& other) : - base_t{ other } { + base_type{ other } { } inline shared_future( shared_future && other) noexcept : - base_t{ std::move( other) } { + base_type{ std::move( other) } { } inline shared_future( future< void > && other) noexcept : - base_t{ std::move( other) } { + base_type{ std::move( other) } { } inline shared_future & operator=( shared_future const& other) noexcept { - if ( this == & other) return * this; - base_t::operator=( other); + if ( this != & other) { + base_type::operator=( other); + } return * this; } inline shared_future & operator=( shared_future && other) noexcept { - if ( this == & other) return * this; - base_t::operator=( std::move( other) ); + if ( this != & other) { + base_type::operator=( std::move( other) ); + } return * this; } inline shared_future & operator=( future< void > && other) noexcept { - base_t::operator=( std::move( other) ); + base_type::operator=( std::move( other) ); return * this; } @@ -420,21 +431,21 @@ public: if ( ! valid() ) { throw future_uninitialized{}; } - base_t::state_->get(); + base_type::state_->get(); } - using base_t::valid; - using base_t::get_exception_ptr; - using base_t::wait; - using base_t::wait_for; - using base_t::wait_until; + using base_type::valid; + using base_type::get_exception_ptr; + using base_type::wait; + using base_type::wait_for; + using base_type::wait_until; }; template< typename R > shared_future< R > future< R >::share() { - if ( ! base_t::valid() ) { + if ( ! base_type::valid() ) { throw future_uninitialized{}; } return shared_future< R >{ std::move( * this) }; @@ -443,7 +454,7 @@ future< R >::share() { template< typename R > shared_future< R & > future< R & >::share() { - if ( ! base_t::valid() ) { + if ( ! base_type::valid() ) { throw future_uninitialized{}; } return shared_future< R & >{ std::move( * this) }; @@ -452,7 +463,7 @@ future< R & >::share() { inline shared_future< void > future< void >::share() { - if ( ! base_t::valid() ) { + if ( ! base_type::valid() ) { throw future_uninitialized{}; } return shared_future< void >{ std::move( * this) }; diff --git a/boost/fiber/future/packaged_task.hpp b/boost/fiber/future/packaged_task.hpp index 7ea16bfee7..31838ee41f 100644 --- a/boost/fiber/future/packaged_task.hpp +++ b/boost/fiber/future/packaged_task.hpp @@ -30,13 +30,13 @@ class packaged_task; template< typename R, typename ... Args > class packaged_task< R( Args ... ) > { private: - typedef typename detail::task_base< R, Args ... >::ptr_t ptr_t; + typedef typename detail::task_base< R, Args ... >::ptr_type ptr_type; bool obtained_{ false }; - ptr_t task_{}; + ptr_type task_{}; public: - constexpr packaged_task() noexcept = default; + packaged_task() = default; template< typename Fn, typename = detail::disable_overload< packaged_task, Fn > @@ -53,17 +53,17 @@ public: explicit packaged_task( std::allocator_arg_t, Allocator const& alloc, Fn && fn) { typedef detail::task_object< typename std::decay< Fn >::type, Allocator, R, Args ... - > object_t; + > object_type; typedef std::allocator_traits< - typename object_t::allocator_t - > traits_t; + typename object_type::allocator_type + > traits_type; - typename object_t::allocator_t a{ alloc }; - typename traits_t::pointer ptr{ traits_t::allocate( a, 1) }; + typename object_type::allocator_type a{ alloc }; + typename traits_type::pointer ptr{ traits_type::allocate( a, 1) }; try { - traits_t::construct( a, ptr, a, std::forward< Fn >( fn) ); + traits_type::construct( a, ptr, a, std::forward< Fn >( fn) ); } catch (...) { - traits_t::deallocate( a, ptr, 1); + traits_type::deallocate( a, ptr, 1); throw; } task_.reset( convert( ptr) ); @@ -85,9 +85,10 @@ public: } packaged_task & operator=( packaged_task && other) noexcept { - if ( this == & other) return * this; - packaged_task tmp{ std::move( other) }; - swap( tmp); + if ( this != & other) { + packaged_task tmp{ std::move( other) }; + swap( tmp); + } return * this; } diff --git a/boost/fiber/future/promise.hpp b/boost/fiber/future/promise.hpp index 278c839700..83ba63fb23 100644 --- a/boost/fiber/future/promise.hpp +++ b/boost/fiber/future/promise.hpp @@ -25,10 +25,10 @@ namespace detail { template< typename R > struct promise_base { - typedef typename shared_state< R >::ptr_t ptr_t; + typedef typename shared_state< R >::ptr_type ptr_type; bool obtained_{ false }; - ptr_t future_{}; + ptr_type future_{}; promise_base() : promise_base{ std::allocator_arg, std::allocator< promise_base >{} } { @@ -36,15 +36,15 @@ struct promise_base { template< typename Allocator > promise_base( std::allocator_arg_t, Allocator alloc) { - typedef detail::shared_state_object< R, Allocator > object_t; - typedef std::allocator_traits< typename object_t::allocator_t > traits_t; - typename object_t::allocator_t a{ alloc }; - typename traits_t::pointer ptr{ traits_t::allocate( a, 1) }; + typedef detail::shared_state_object< R, Allocator > object_type; + typedef std::allocator_traits< typename object_type::allocator_type > traits_type; + typename object_type::allocator_type a{ alloc }; + typename traits_type::pointer ptr{ traits_type::allocate( a, 1) }; try { - traits_t::construct( a, ptr, a); + traits_type::construct( a, ptr, a); } catch (...) { - traits_t::deallocate( a, ptr, 1); + traits_type::deallocate( a, ptr, 1); throw; } future_.reset( convert( ptr) ); @@ -66,9 +66,10 @@ struct promise_base { } promise_base & operator=( promise_base && other) noexcept { - if ( this == & other) return * this; - promise_base tmp{ std::move( other) }; - swap( tmp); + if ( this != & other) { + promise_base tmp{ std::move( other) }; + swap( tmp); + } return * this; } @@ -101,14 +102,14 @@ struct promise_base { template< typename R > class promise : private detail::promise_base< R > { private: - typedef detail::promise_base< R > base_t; + typedef detail::promise_base< R > base_type; public: promise() = default; template< typename Allocator > promise( std::allocator_arg_t, Allocator alloc) : - base_t{ std::allocator_arg, alloc } { + base_type{ std::allocator_arg, alloc } { } promise( promise const&) = delete; @@ -118,38 +119,38 @@ public: promise & operator=( promise && other) = default; void set_value( R const& value) { - if ( ! base_t::future_) { + if ( ! base_type::future_) { throw promise_uninitialized{}; } - base_t::future_->set_value( value); + base_type::future_->set_value( value); } void set_value( R && value) { - if ( ! base_t::future_) { + if ( ! base_type::future_) { throw promise_uninitialized{}; } - base_t::future_->set_value( std::move( value) ); + base_type::future_->set_value( std::move( value) ); } void swap( promise & other) noexcept { - base_t::swap( other); + base_type::swap( other); } - using base_t::get_future; - using base_t::set_exception; + using base_type::get_future; + using base_type::set_exception; }; template< typename R > class promise< R & > : private detail::promise_base< R & > { private: - typedef detail::promise_base< R & > base_t; + typedef detail::promise_base< R & > base_type; public: promise() = default; template< typename Allocator > promise( std::allocator_arg_t, Allocator alloc) : - base_t{ std::allocator_arg, alloc } { + base_type{ std::allocator_arg, alloc } { } promise( promise const&) = delete; @@ -159,31 +160,31 @@ public: promise & operator=( promise && other) noexcept = default; void set_value( R & value) { - if ( ! base_t::future_) { + if ( ! base_type::future_) { throw promise_uninitialized{}; } - base_t::future_->set_value( value); + base_type::future_->set_value( value); } void swap( promise & other) noexcept { - base_t::swap( other); + base_type::swap( other); } - using base_t::get_future; - using base_t::set_exception; + using base_type::get_future; + using base_type::set_exception; }; template<> class promise< void > : private detail::promise_base< void > { private: - typedef detail::promise_base< void > base_t; + typedef detail::promise_base< void > base_type; public: promise() = default; template< typename Allocator > promise( std::allocator_arg_t, Allocator alloc) : - base_t{ std::allocator_arg, alloc } { + base_type{ std::allocator_arg, alloc } { } promise( promise const&) = delete; @@ -194,19 +195,19 @@ public: inline void set_value() { - if ( ! base_t::future_) { + if ( ! base_type::future_) { throw promise_uninitialized{}; } - base_t::future_->set_value(); + base_type::future_->set_value(); } inline void swap( promise & other) noexcept { - base_t::swap( other); + base_type::swap( other); } - using base_t::get_future; - using base_t::set_exception; + using base_type::get_future; + using base_type::set_exception; }; template< typename R > diff --git a/boost/fiber/mutex.hpp b/boost/fiber/mutex.hpp index b56e96802a..e25ef04d9e 100644 --- a/boost/fiber/mutex.hpp +++ b/boost/fiber/mutex.hpp @@ -33,11 +33,11 @@ class BOOST_FIBERS_DECL mutex { private: friend class condition_variable; - typedef context::wait_queue_t wait_queue_t; + typedef context::wait_queue_t wait_queue_type; - context * owner_{ nullptr }; - wait_queue_t wait_queue_{}; detail::spinlock wait_queue_splk_{}; + wait_queue_type wait_queue_{}; + context * owner_{ nullptr }; public: mutex() = default; diff --git a/boost/fiber/operations.hpp b/boost/fiber/operations.hpp index 490ba462f6..95ab9ba452 100644 --- a/boost/fiber/operations.hpp +++ b/boost/fiber/operations.hpp @@ -35,8 +35,7 @@ void yield() noexcept { template< typename Clock, typename Duration > void sleep_until( std::chrono::time_point< Clock, Duration > const& sleep_time_) { - std::chrono::steady_clock::time_point sleep_time( - boost::fibers::detail::convert( sleep_time_) ); + std::chrono::steady_clock::time_point sleep_time = boost::fibers::detail::convert( sleep_time_); fibers::context::active()->wait_until( sleep_time); } @@ -48,8 +47,7 @@ void sleep_for( std::chrono::duration< Rep, Period > const& timeout_duration) { template< typename PROPS > PROPS & properties() { - fibers::fiber_properties * props = - fibers::context::active()->get_properties(); + fibers::fiber_properties * props = fibers::context::active()->get_properties(); if ( ! props) { // props could be nullptr if the thread's main fiber has not yet // yielded (not yet passed through algorithm_with_properties:: @@ -61,7 +59,7 @@ PROPS & properties() { props = fibers::context::active()->get_properties(); // Could still be hosed if the running manager isn't a subclass of // algorithm_with_properties. - BOOST_ASSERT_MSG(props, "this_fiber::properties not set"); + BOOST_ASSERT_MSG( props, "this_fiber::properties not set"); } return dynamic_cast< PROPS & >( * props ); } diff --git a/boost/fiber/recursive_mutex.hpp b/boost/fiber/recursive_mutex.hpp index 66fb5aa7c3..864c65cb8e 100644 --- a/boost/fiber/recursive_mutex.hpp +++ b/boost/fiber/recursive_mutex.hpp @@ -37,12 +37,12 @@ class BOOST_FIBERS_DECL recursive_mutex { private: friend class condition_variable; - typedef context::wait_queue_t wait_queue_t; + typedef context::wait_queue_t wait_queue_type; + detail::spinlock wait_queue_splk_{}; + wait_queue_type wait_queue_{}; context * owner_{ nullptr }; std::size_t count_{ 0 }; - wait_queue_t wait_queue_{}; - detail::spinlock wait_queue_splk_{}; public: recursive_mutex() = default; diff --git a/boost/fiber/recursive_timed_mutex.hpp b/boost/fiber/recursive_timed_mutex.hpp index 2852252cd0..339427b895 100644 --- a/boost/fiber/recursive_timed_mutex.hpp +++ b/boost/fiber/recursive_timed_mutex.hpp @@ -39,12 +39,12 @@ class BOOST_FIBERS_DECL recursive_timed_mutex { private: friend class condition_variable; - typedef context::wait_queue_t wait_queue_t; + typedef context::wait_queue_t wait_queue_type; + detail::spinlock wait_queue_splk_{}; + wait_queue_type wait_queue_{}; context * owner_{ nullptr }; std::size_t count_{ 0 }; - wait_queue_t wait_queue_{}; - detail::spinlock wait_queue_splk_{}; bool try_lock_until_( std::chrono::steady_clock::time_point const& timeout_time) noexcept; @@ -66,8 +66,7 @@ public: template< typename Clock, typename Duration > bool try_lock_until( std::chrono::time_point< Clock, Duration > const& timeout_time_) { - std::chrono::steady_clock::time_point timeout_time( - detail::convert( timeout_time_) ); + std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); return try_lock_until_( timeout_time); } diff --git a/boost/fiber/scheduler.hpp b/boost/fiber/scheduler.hpp index 4503bc0b5f..4a5f0dab8c 100644 --- a/boost/fiber/scheduler.hpp +++ b/boost/fiber/scheduler.hpp @@ -13,15 +13,19 @@ #include <vector> #include <boost/config.hpp> -#include <boost/context/execution_context.hpp> +#if (BOOST_EXECUTION_CONTEXT==1) +# include <boost/context/execution_context.hpp> +#else +# include <boost/context/continuation.hpp> +#endif #include <boost/intrusive/list.hpp> #include <boost/intrusive_ptr.hpp> #include <boost/intrusive/set.hpp> +#include <boost/intrusive/slist.hpp> #include <boost/fiber/algo/algorithm.hpp> #include <boost/fiber/context.hpp> #include <boost/fiber/detail/config.hpp> -#include <boost/fiber/detail/context_mpsc_queue.hpp> #include <boost/fiber/detail/data.hpp> #include <boost/fiber/detail/spinlock.hpp> @@ -49,45 +53,56 @@ public: context, intrusive::member_hook< context, detail::ready_hook, & context::ready_hook_ >, - intrusive::constant_time_size< false > > ready_queue_t; + intrusive::constant_time_size< false > + > ready_queue_type; private: typedef intrusive::multiset< context, intrusive::member_hook< context, detail::sleep_hook, & context::sleep_hook_ >, intrusive::constant_time_size< false >, - intrusive::compare< timepoint_less > > sleep_queue_t; + intrusive::compare< timepoint_less > + > sleep_queue_type; typedef intrusive::list< context, intrusive::member_hook< + context, detail::worker_hook, & context::worker_hook_ >, + intrusive::constant_time_size< false > + > worker_queue_type; + typedef intrusive::slist< + context, + intrusive::member_hook< context, detail::terminated_hook, & context::terminated_hook_ >, - intrusive::constant_time_size< false > > terminated_queue_t; - typedef intrusive::list< + intrusive::linear< true >, + intrusive::cache_last< true > + > terminated_queue_type; + typedef intrusive::slist< context, intrusive::member_hook< - context, detail::worker_hook, & context::worker_hook_ >, - intrusive::constant_time_size< false > > worker_queue_t; + context, detail::remote_ready_hook, & context::remote_ready_hook_ >, + intrusive::linear< true >, + intrusive::cache_last< true > + > remote_ready_queue_type; - std::unique_ptr< algo::algorithm > algo_; - context * main_ctx_{ nullptr }; - intrusive_ptr< context > dispatcher_ctx_{}; - // worker-queue contains all context' mananged by this scheduler - // except main-context and dispatcher-context - // unlink happens on destruction of a context - worker_queue_t worker_queue_{}; - // terminated-queue contains context' which have been terminated - terminated_queue_t terminated_queue_{}; #if ! defined(BOOST_FIBERS_NO_ATOMICS) // remote ready-queue contains context' signaled by schedulers // running in other threads - detail::context_mpsc_queue remote_ready_queue_{}; - // sleep-queue contains context' which have been called + detail::spinlock remote_ready_splk_{}; + remote_ready_queue_type remote_ready_queue_{}; #endif + alignas(cache_alignment) std::unique_ptr< algo::algorithm > algo_; + // sleep-queue contains context' which have been called // scheduler::wait_until() - sleep_queue_t sleep_queue_{}; - bool shutdown_{ false }; - - context * get_next_() noexcept; + sleep_queue_type sleep_queue_{}; + // worker-queue contains all context' mananged by this scheduler + // except main-context and dispatcher-context + // unlink happens on destruction of a context + worker_queue_type worker_queue_{}; + // terminated-queue contains context' which have been terminated + terminated_queue_type terminated_queue_{}; + intrusive_ptr< context > dispatcher_ctx_{}; + context * main_ctx_{ nullptr }; + bool shutdown_{ false }; void release_terminated_() noexcept; @@ -105,20 +120,20 @@ public: virtual ~scheduler(); - void set_ready( context *) noexcept; + void schedule( context *) noexcept; #if ! defined(BOOST_FIBERS_NO_ATOMICS) - void set_remote_ready( context *) noexcept; + void schedule_from_remote( context *) noexcept; #endif #if (BOOST_EXECUTION_CONTEXT==1) void dispatch() noexcept; - void set_terminated( context *) noexcept; + void terminate( detail::spinlock_lock &, context *) noexcept; #else - boost::context::execution_context< detail::data_t * > dispatch() noexcept; + boost::context::continuation dispatch() noexcept; - boost::context::execution_context< detail::data_t * > set_terminated( context *) noexcept; + boost::context::continuation terminate( detail::spinlock_lock &, context *) noexcept; #endif void yield( context *) noexcept; diff --git a/boost/fiber/timed_mutex.hpp b/boost/fiber/timed_mutex.hpp index ed6dbd458e..f74c5e41ad 100644 --- a/boost/fiber/timed_mutex.hpp +++ b/boost/fiber/timed_mutex.hpp @@ -35,11 +35,11 @@ class BOOST_FIBERS_DECL timed_mutex { private: friend class condition_variable; - typedef context::wait_queue_t wait_queue_t; + typedef context::wait_queue_t wait_queue_type; - context * owner_{ nullptr }; - wait_queue_t wait_queue_{}; detail::spinlock wait_queue_splk_{}; + wait_queue_type wait_queue_{}; + context * owner_{ nullptr }; bool try_lock_until_( std::chrono::steady_clock::time_point const& timeout_time) noexcept; @@ -60,8 +60,7 @@ public: template< typename Clock, typename Duration > bool try_lock_until( std::chrono::time_point< Clock, Duration > const& timeout_time_) { - std::chrono::steady_clock::time_point timeout_time( - detail::convert( timeout_time_) ); + std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); return try_lock_until_( timeout_time); } diff --git a/boost/fiber/type.hpp b/boost/fiber/type.hpp index 9346265856..065d22fccb 100644 --- a/boost/fiber/type.hpp +++ b/boost/fiber/type.hpp @@ -18,7 +18,6 @@ #include <boost/assert.hpp> #include <boost/config.hpp> #include <boost/context/detail/apply.hpp> -#include <boost/context/execution_context.hpp> #include <boost/context/stack_context.hpp> #include <boost/intrusive/list.hpp> #include <boost/intrusive/parent_from_member.hpp> diff --git a/boost/fiber/unbounded_channel.hpp b/boost/fiber/unbounded_channel.hpp deleted file mode 100644 index 6de5a6f5f0..0000000000 --- a/boost/fiber/unbounded_channel.hpp +++ /dev/null @@ -1,272 +0,0 @@ - -// Copyright Oliver Kowalke 2013. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// http://www.boost.org/LICENSE_1_0.txt) -// - -#ifndef BOOST_FIBERS_UNBOUNDED_CHANNEL_H -#define BOOST_FIBERS_UNBOUNDED_CHANNEL_H - -#warn "template unbounded_channel is deprecated" - -#include <atomic> -#include <algorithm> -#include <chrono> -#include <cstddef> -#include <deque> -#include <memory> -#include <mutex> -#include <utility> - -#include <boost/config.hpp> -#include <boost/intrusive_ptr.hpp> - -#include <boost/fiber/detail/config.hpp> -#include <boost/fiber/channel_op_status.hpp> -#include <boost/fiber/condition_variable.hpp> -#include <boost/fiber/detail/convert.hpp> -#include <boost/fiber/exceptions.hpp> -#include <boost/fiber/mutex.hpp> -#include <boost/fiber/operations.hpp> - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_PREFIX -#endif - -namespace boost { -namespace fibers { - -template< typename T, - typename Allocator = std::allocator< T > -> -class unbounded_channel { -public: - typedef T value_type; - -private: - struct node { - typedef intrusive_ptr< node > ptr_t; - typedef typename std::allocator_traits< Allocator >::template rebind_alloc< - node - > allocator_t; - typedef std::allocator_traits< allocator_t > allocator_traits_t; - -#if ! defined(BOOST_FIBERS_NO_ATOMICS) - std::atomic< std::size_t > use_count{ 0 }; -#else - std::size_t use_count{ 0 }; -#endif - allocator_t alloc; - T va; - ptr_t nxt{}; - - node( T const& t, allocator_t const& alloc_) noexcept : - alloc{ alloc_ }, - va{ t } { - } - - node( T && t, allocator_t const& alloc_) noexcept : - alloc{ alloc_ }, - va{ std::move( t) } { - } - - friend - void intrusive_ptr_add_ref( node * p) noexcept { - ++p->use_count; - } - - friend - void intrusive_ptr_release( node * p) noexcept { - if ( 0 == --p->use_count) { - allocator_t alloc( p->alloc); - allocator_traits_t::destroy( alloc, p); - allocator_traits_t::deallocate( alloc, p, 1); - } - } - }; - - using ptr_t = typename node::ptr_t; - using allocator_t = typename node::allocator_t; - using allocator_traits_t = typename node::allocator_traits_t; - - enum class queue_status { - open = 0, - closed - }; - - allocator_t alloc_; - queue_status state_{ queue_status::open }; - ptr_t head_{}; - ptr_t * tail_; - mutable mutex mtx_{}; - condition_variable not_empty_cond_{}; - - bool is_closed_() const noexcept { - return queue_status::closed == state_; - } - - void close_( std::unique_lock< mutex > & lk) noexcept { - state_ = queue_status::closed; - lk.unlock(); - not_empty_cond_.notify_all(); - } - - bool is_empty_() const noexcept { - return ! head_; - } - - channel_op_status push_( ptr_t new_node, - std::unique_lock< mutex > & lk) noexcept { - if ( is_closed_() ) { - return channel_op_status::closed; - } - return push_and_notify_( new_node, lk); - } - - channel_op_status push_and_notify_( ptr_t new_node, - std::unique_lock< mutex > & lk) noexcept { - push_tail_( new_node); - lk.unlock(); - not_empty_cond_.notify_one(); - return channel_op_status::success; - } - - void push_tail_( ptr_t new_node) noexcept { - * tail_ = new_node; - tail_ = & new_node->nxt; - } - - value_type value_pop_( std::unique_lock< mutex > & lk) { - BOOST_ASSERT( ! is_empty_() ); - auto old_head = pop_head_(); - return std::move( old_head->va); - } - - ptr_t pop_head_() noexcept { - auto old_head = head_; - head_ = old_head->nxt; - if ( ! head_) { - tail_ = & head_; - } - old_head->nxt.reset(); - return old_head; - } - -public: - explicit unbounded_channel( Allocator const& alloc = Allocator() ) noexcept : - alloc_{ alloc }, - tail_{ & head_ } { - } - - unbounded_channel( unbounded_channel const&) = delete; - unbounded_channel & operator=( unbounded_channel const&) = delete; - - void close() noexcept { - std::unique_lock< mutex > lk( mtx_); - close_( lk); - } - - channel_op_status push( value_type const& va) { - typename allocator_traits_t::pointer ptr{ - allocator_traits_t::allocate( alloc_, 1) }; - try { - allocator_traits_t::construct( alloc_, ptr, va, alloc_); - } catch (...) { - allocator_traits_t::deallocate( alloc_, ptr, 1); - throw; - } - std::unique_lock< mutex > lk( mtx_); - return push_( { detail::convert( ptr) }, lk); - } - - channel_op_status push( value_type && va) { - typename allocator_traits_t::pointer ptr{ - allocator_traits_t::allocate( alloc_, 1) }; - try { - allocator_traits_t::construct( - alloc_, ptr, std::move( va), alloc_); - } catch (...) { - allocator_traits_t::deallocate( alloc_, ptr, 1); - throw; - } - std::unique_lock< mutex > lk( mtx_); - return push_( { detail::convert( ptr) }, lk); - } - - channel_op_status pop( value_type & va) { - std::unique_lock< mutex > lk( mtx_); - not_empty_cond_.wait( lk, - [this](){ - return is_closed_() || ! is_empty_(); - }); - if ( is_closed_() && is_empty_() ) { - return channel_op_status::closed; - } - va = value_pop_( lk); - return channel_op_status::success; - } - - value_type value_pop() { - std::unique_lock< mutex > lk( mtx_); - not_empty_cond_.wait( lk, - [this](){ - return is_closed_() || ! is_empty_(); - }); - if ( is_closed_() && is_empty_() ) { - throw fiber_error( - std::make_error_code( std::errc::operation_not_permitted), - "boost fiber: queue is closed"); - } - return value_pop_( lk); - } - - channel_op_status try_pop( value_type & va) { - std::unique_lock< mutex > lk( mtx_); - if ( is_closed_() && is_empty_() ) { - // let other fibers run - lk.unlock(); - this_fiber::yield(); - return channel_op_status::closed; - } - if ( is_empty_() ) { - // let other fibers run - lk.unlock(); - this_fiber::yield(); - return channel_op_status::empty; - } - va = value_pop_( lk); - return channel_op_status::success; - } - - template< typename Rep, typename Period > - channel_op_status pop_wait_for( value_type & va, - std::chrono::duration< Rep, Period > const& timeout_duration) { - return pop_wait_until( va, std::chrono::steady_clock::now() + timeout_duration); - } - - template< typename Clock, typename Duration > - channel_op_status pop_wait_until( value_type & va, - std::chrono::time_point< Clock, Duration > const& timeout_time) { - std::unique_lock< mutex > lk( mtx_); - if ( ! not_empty_cond_.wait_until( lk, timeout_time, - [this](){ - return is_closed_() || ! is_empty_(); - })) { - return channel_op_status::timeout; - } - if ( is_closed_() && is_empty_() ) { - return channel_op_status::closed; - } - va = value_pop_( lk); - return channel_op_status::success; - } -}; - -}} - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_SUFFIX -#endif - -#endif // BOOST_FIBERS_UNBOUNDED_CHANNEL_H diff --git a/boost/fiber/unbuffered_channel.hpp b/boost/fiber/unbuffered_channel.hpp index 582d9ae5a7..38a2d6111e 100644 --- a/boost/fiber/unbuffered_channel.hpp +++ b/boost/fiber/unbuffered_channel.hpp @@ -54,13 +54,14 @@ private: }; // shared cacheline - alignas(cache_alignment) std::atomic< slot * > slot_{ nullptr }; + alignas(cache_alignment) std::atomic< slot * > slot_{ nullptr }; // shared cacheline - alignas(cache_alignment) std::atomic_bool closed_{ false }; - mutable detail::spinlock splk_{}; - wait_queue_type waiting_producers_{}; - wait_queue_type waiting_consumers_{}; - char pad_[cacheline_length]; + alignas(cache_alignment) std::atomic_bool closed_{ false }; + alignas(cache_alignment) mutable detail::spinlock splk_producers_{}; + wait_queue_type waiting_producers_{}; + alignas( cache_alignment) mutable detail::spinlock splk_consumers_{}; + wait_queue_type waiting_consumers_{}; + char pad_[cacheline_length]; bool is_empty_() { return nullptr == slot_.load( std::memory_order_acquire); @@ -68,7 +69,7 @@ private: bool try_push_( slot * own_slot) { for (;;) { - slot * s{ slot_.load( std::memory_order_acquire) }; + slot * s = slot_.load( std::memory_order_acquire); if ( nullptr == s) { if ( ! slot_.compare_exchange_strong( s, own_slot, std::memory_order_acq_rel) ) { continue; @@ -81,9 +82,9 @@ private: } slot * try_pop_() { - slot * nil_slot{ nullptr }; + slot * nil_slot = nullptr; for (;;) { - slot * s{ slot_.load( std::memory_order_acquire) }; + slot * s = slot_.load( std::memory_order_acquire); if ( nullptr != s) { if ( ! slot_.compare_exchange_strong( s, nil_slot, std::memory_order_acq_rel) ) { continue;} @@ -97,12 +98,12 @@ public: ~unbuffered_channel() { close(); - slot * s{ nullptr }; + slot * s = nullptr; if ( nullptr != ( s = try_pop_() ) ) { BOOST_ASSERT( nullptr != s); BOOST_ASSERT( nullptr != s->ctx); // value will be destructed in the context of the waiting fiber - context::active()->set_ready( s->ctx); + context::active()->schedule( s->ctx); } } @@ -114,90 +115,89 @@ public: } void close() noexcept { - context * ctx{ context::active() }; - detail::spinlock_lock lk{ splk_ }; - closed_.store( true, std::memory_order_release); + context * active_ctx = context::active(); // notify all waiting producers + closed_.store( true, std::memory_order_release); + detail::spinlock_lock lk1{ splk_producers_ }; while ( ! waiting_producers_.empty() ) { - context * producer_ctx{ & waiting_producers_.front() }; + context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); - ctx->set_ready( producer_ctx); + active_ctx->schedule( producer_ctx); } // notify all waiting consumers + detail::spinlock_lock lk2{ splk_consumers_ }; while ( ! waiting_consumers_.empty() ) { - context * consumer_ctx{ & waiting_consumers_.front() }; + context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - ctx->set_ready( consumer_ctx); + active_ctx->schedule( consumer_ctx); } } channel_op_status push( value_type const& value) { - context * ctx{ context::active() }; - slot s{ value, ctx }; + context * active_ctx = context::active(); + slot s{ value, active_ctx }; for (;;) { if ( is_closed() ) { return channel_op_status::closed; } if ( try_push_( & s) ) { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{ splk_consumers_ }; // notify one waiting consumer if ( ! waiting_consumers_.empty() ) { - context * consumer_ctx{ & waiting_consumers_.front() }; + context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - ctx->set_ready( consumer_ctx); + active_ctx->schedule( consumer_ctx); } // suspend till value has been consumed - ctx->suspend( lk); + active_ctx->suspend( lk); // resumed, value has been consumed return channel_op_status::success; } else { - BOOST_ASSERT( ! ctx->wait_is_linked() ); - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{ splk_producers_ }; if ( is_closed() ) { return channel_op_status::closed; } if ( is_empty_() ) { continue; } - ctx->wait_link( waiting_producers_); + active_ctx->wait_link( waiting_producers_); // suspend this producer - ctx->suspend( lk); + active_ctx->suspend( lk); // resumed, slot mabye free } } } channel_op_status push( value_type && value) { - context * ctx{ context::active() }; - slot s{ std::move( value), ctx }; + context * active_ctx = context::active(); + slot s{ std::move( value), active_ctx }; for (;;) { if ( is_closed() ) { return channel_op_status::closed; } if ( try_push_( & s) ) { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{ splk_consumers_ }; // notify one waiting consumer if ( ! waiting_consumers_.empty() ) { - context * consumer_ctx{ & waiting_consumers_.front() }; + context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - ctx->set_ready( consumer_ctx); + active_ctx->schedule( consumer_ctx); } // suspend till value has been consumed - ctx->suspend( lk); + active_ctx->suspend( lk); // resumed, value has been consumed return channel_op_status::success; } else { - BOOST_ASSERT( ! ctx->wait_is_linked() ); - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{ splk_producers_ }; if ( is_closed() ) { return channel_op_status::closed; } if ( is_empty_() ) { continue; } - ctx->wait_link( waiting_producers_); + active_ctx->wait_link( waiting_producers_); // suspend this producer - ctx->suspend( lk); + active_ctx->suspend( lk); // resumed, slot mabye free } } @@ -220,51 +220,46 @@ public: template< typename Clock, typename Duration > channel_op_status push_wait_until( value_type const& value, std::chrono::time_point< Clock, Duration > const& timeout_time_) { - std::chrono::steady_clock::time_point timeout_time( detail::convert( timeout_time_) ); - context * ctx{ context::active() }; - slot s{ value, ctx }; + context * active_ctx = context::active(); + slot s{ value, active_ctx }; + std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); for (;;) { if ( is_closed() ) { return channel_op_status::closed; } if ( try_push_( & s) ) { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{ splk_consumers_ }; // notify one waiting consumer if ( ! waiting_consumers_.empty() ) { - context * consumer_ctx{ & waiting_consumers_.front() }; + context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - ctx->set_ready( consumer_ctx); + active_ctx->schedule( consumer_ctx); } // suspend this producer - if ( ! ctx->wait_until( timeout_time, lk) ) { + if ( ! active_ctx->wait_until( timeout_time, lk) ) { // clear slot - slot * nil_slot{ nullptr }, * own_slot{ & s }; + slot * nil_slot = nullptr, * own_slot = & s; slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel); - // relock local lk - lk.lock(); - // remove from waiting-queue - ctx->wait_unlink(); // resumed, value has not been consumed return channel_op_status::timeout; } // resumed, value has been consumed return channel_op_status::success; } else { - BOOST_ASSERT( ! ctx->wait_is_linked() ); - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{ splk_producers_ }; if ( is_closed() ) { return channel_op_status::closed; } if ( is_empty_() ) { continue; } - ctx->wait_link( waiting_producers_); + active_ctx->wait_link( waiting_producers_); // suspend this producer - if ( ! ctx->wait_until( timeout_time, lk) ) { + if ( ! active_ctx->wait_until( timeout_time, lk) ) { // relock local lk lk.lock(); // remove from waiting-queue - ctx->wait_unlink(); + waiting_producers_.remove( * active_ctx); return channel_op_status::timeout; } // resumed, slot maybe free @@ -275,51 +270,46 @@ public: template< typename Clock, typename Duration > channel_op_status push_wait_until( value_type && value, std::chrono::time_point< Clock, Duration > const& timeout_time_) { - std::chrono::steady_clock::time_point timeout_time( detail::convert( timeout_time_) ); - context * ctx{ context::active() }; - slot s{ std::move( value), ctx }; + context * active_ctx = context::active(); + slot s{ std::move( value), active_ctx }; + std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); for (;;) { if ( is_closed() ) { return channel_op_status::closed; } if ( try_push_( & s) ) { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{ splk_consumers_ }; // notify one waiting consumer if ( ! waiting_consumers_.empty() ) { - context * consumer_ctx{ & waiting_consumers_.front() }; + context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - ctx->set_ready( consumer_ctx); + active_ctx->schedule( consumer_ctx); } // suspend this producer - if ( ! ctx->wait_until( timeout_time, lk) ) { + if ( ! active_ctx->wait_until( timeout_time, lk) ) { // clear slot - slot * nil_slot{ nullptr }, * own_slot{ & s }; + slot * nil_slot = nullptr, * own_slot = & s; slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel); - // relock local lk - lk.lock(); - // remove from waiting-queue - ctx->wait_unlink(); // resumed, value has not been consumed return channel_op_status::timeout; } // resumed, value has been consumed return channel_op_status::success; } else { - BOOST_ASSERT( ! ctx->wait_is_linked() ); - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{ splk_producers_ }; if ( is_closed() ) { return channel_op_status::closed; } if ( is_empty_() ) { continue; } - ctx->wait_link( waiting_producers_); + active_ctx->wait_link( waiting_producers_); // suspend this producer - if ( ! ctx->wait_until( timeout_time, lk) ) { + if ( ! active_ctx->wait_until( timeout_time, lk) ) { // relock local lk lk.lock(); // remove from waiting-queue - ctx->wait_unlink(); + waiting_producers_.remove( * active_ctx); return channel_op_status::timeout; } // resumed, slot maybe free @@ -328,65 +318,63 @@ public: } channel_op_status pop( value_type & value) { - context * ctx{ context::active() }; - slot * s{ nullptr }; + context * active_ctx = context::active(); + slot * s = nullptr; for (;;) { if ( nullptr != ( s = try_pop_() ) ) { { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{ splk_producers_ }; // notify one waiting producer if ( ! waiting_producers_.empty() ) { - context * producer_ctx{ & waiting_producers_.front() }; + context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - ctx->set_ready( producer_ctx); + active_ctx->schedule( producer_ctx); } } // consume value value = std::move( s->value); // resume suspended producer - ctx->set_ready( s->ctx); + active_ctx->schedule( s->ctx); return channel_op_status::success; } else { - BOOST_ASSERT( ! ctx->wait_is_linked() ); - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{ splk_consumers_ }; if ( is_closed() ) { return channel_op_status::closed; } if ( ! is_empty_() ) { continue; } - ctx->wait_link( waiting_consumers_); + active_ctx->wait_link( waiting_consumers_); // suspend this consumer - ctx->suspend( lk); + active_ctx->suspend( lk); // resumed, slot mabye set } } } value_type value_pop() { - context * ctx{ context::active() }; - slot * s{ nullptr }; + context * active_ctx = context::active(); + slot * s = nullptr; for (;;) { if ( nullptr != ( s = try_pop_() ) ) { { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{ splk_producers_ }; // notify one waiting producer if ( ! waiting_producers_.empty() ) { - context * producer_ctx{ & waiting_producers_.front() }; + context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - ctx->set_ready( producer_ctx); + active_ctx->schedule( producer_ctx); } } // consume value value_type value{ std::move( s->value) }; // resume suspended producer - ctx->set_ready( s->ctx); + active_ctx->schedule( s->ctx); return std::move( value); } else { - BOOST_ASSERT( ! ctx->wait_is_linked() ); - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{ splk_consumers_ }; if ( is_closed() ) { throw fiber_error{ std::make_error_code( std::errc::operation_not_permitted), @@ -395,9 +383,9 @@ public: if ( ! is_empty_() ) { continue; } - ctx->wait_link( waiting_consumers_); + active_ctx->wait_link( waiting_consumers_); // suspend this consumer - ctx->suspend( lk); + active_ctx->suspend( lk); // resumed, slot mabye set } } @@ -413,42 +401,41 @@ public: template< typename Clock, typename Duration > channel_op_status pop_wait_until( value_type & value, std::chrono::time_point< Clock, Duration > const& timeout_time_) { - std::chrono::steady_clock::time_point timeout_time( detail::convert( timeout_time_) ); - context * ctx{ context::active() }; - slot * s{ nullptr }; + context * active_ctx = context::active(); + slot * s = nullptr; + std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); for (;;) { if ( nullptr != ( s = try_pop_() ) ) { { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{ splk_producers_ }; // notify one waiting producer if ( ! waiting_producers_.empty() ) { - context * producer_ctx{ & waiting_producers_.front() }; + context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - ctx->set_ready( producer_ctx); + active_ctx->schedule( producer_ctx); } } // consume value value = std::move( s->value); // resume suspended producer - ctx->set_ready( s->ctx); + active_ctx->schedule( s->ctx); return channel_op_status::success; } else { - BOOST_ASSERT( ! ctx->wait_is_linked() ); - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{ splk_consumers_ }; if ( is_closed() ) { return channel_op_status::closed; } if ( ! is_empty_() ) { continue; } - ctx->wait_link( waiting_consumers_); + active_ctx->wait_link( waiting_consumers_); // suspend this consumer - if ( ! ctx->wait_until( timeout_time, lk) ) { + if ( ! active_ctx->wait_until( timeout_time, lk) ) { // relock local lk lk.lock(); // remove from waiting-queue - ctx->wait_unlink(); + waiting_consumers_.remove( * active_ctx); return channel_op_status::timeout; } } @@ -459,8 +446,8 @@ public: private: typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type; - unbuffered_channel * chan_{ nullptr }; - storage_type storage_; + unbuffered_channel * chan_{ nullptr }; + storage_type storage_; void increment_() { BOOST_ASSERT( nullptr != chan_); |