summaryrefslogtreecommitdiff
path: root/boost/fiber
diff options
context:
space:
mode:
Diffstat (limited to 'boost/fiber')
-rw-r--r--boost/fiber/algo/algorithm.hpp34
-rw-r--r--boost/fiber/algo/detail/chase_lev_queue.hpp172
-rw-r--r--boost/fiber/algo/round_robin.hpp4
-rw-r--r--boost/fiber/algo/shared_work.hpp12
-rw-r--r--boost/fiber/algo/work_stealing.hpp42
-rw-r--r--boost/fiber/barrier.hpp14
-rw-r--r--boost/fiber/bounded_channel.hpp433
-rw-r--r--boost/fiber/buffered_channel.hpp510
-rw-r--r--boost/fiber/condition_variable.hpp47
-rw-r--r--boost/fiber/context.hpp267
-rw-r--r--boost/fiber/detail/config.hpp2
-rw-r--r--boost/fiber/detail/context_mpsc_queue.hpp98
-rw-r--r--boost/fiber/detail/context_spinlock_queue.hpp118
-rw-r--r--boost/fiber/detail/context_spmc_queue.hpp99
-rw-r--r--boost/fiber/detail/cpu_relax.hpp36
-rw-r--r--boost/fiber/detail/data.hpp2
-rw-r--r--boost/fiber/detail/fss.hpp5
-rw-r--r--boost/fiber/detail/futex.hpp2
-rw-r--r--boost/fiber/detail/spinlock_ttas.hpp29
-rw-r--r--boost/fiber/detail/spinlock_ttas_adaptive.hpp21
-rw-r--r--boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp14
-rw-r--r--boost/fiber/detail/spinlock_ttas_futex.hpp12
-rw-r--r--boost/fiber/detail/wrap.hpp51
-rw-r--r--boost/fiber/exceptions.hpp30
-rw-r--r--boost/fiber/fiber.hpp8
-rw-r--r--boost/fiber/fss.hpp6
-rw-r--r--boost/fiber/future/async.hpp78
-rw-r--r--boost/fiber/future/detail/shared_state.hpp47
-rw-r--r--boost/fiber/future/detail/shared_state_object.hpp13
-rw-r--r--boost/fiber/future/detail/task_base.hpp4
-rw-r--r--boost/fiber/future/detail/task_object.hpp88
-rw-r--r--boost/fiber/future/future.hpp233
-rw-r--r--boost/fiber/future/packaged_task.hpp27
-rw-r--r--boost/fiber/future/promise.hpp69
-rw-r--r--boost/fiber/mutex.hpp6
-rw-r--r--boost/fiber/operations.hpp8
-rw-r--r--boost/fiber/recursive_mutex.hpp6
-rw-r--r--boost/fiber/recursive_timed_mutex.hpp9
-rw-r--r--boost/fiber/scheduler.hpp71
-rw-r--r--boost/fiber/timed_mutex.hpp9
-rw-r--r--boost/fiber/type.hpp1
-rw-r--r--boost/fiber/unbounded_channel.hpp272
-rw-r--r--boost/fiber/unbuffered_channel.hpp199
43 files changed, 1207 insertions, 2001 deletions
diff --git a/boost/fiber/algo/algorithm.hpp b/boost/fiber/algo/algorithm.hpp
index 515fc5ed5c..9b846e774b 100644
--- a/boost/fiber/algo/algorithm.hpp
+++ b/boost/fiber/algo/algorithm.hpp
@@ -43,11 +43,11 @@ struct BOOST_FIBERS_DECL algorithm {
class BOOST_FIBERS_DECL algorithm_with_properties_base : public algorithm {
public:
// called by fiber_properties::notify() -- don't directly call
- virtual void property_change_( context * f, fiber_properties * props) noexcept = 0;
+ virtual void property_change_( context * ctx, fiber_properties * props) noexcept = 0;
protected:
- static fiber_properties* get_properties( context * f) noexcept;
- static void set_properties( context * f, fiber_properties * p) noexcept;
+ static fiber_properties* get_properties( context * ctx) noexcept;
+ static void set_properties( context * ctx, fiber_properties * p) noexcept;
};
template< typename PROPS >
@@ -58,18 +58,18 @@ struct algorithm_with_properties : public algorithm_with_properties_base {
// must override awakened() with properties parameter instead. Otherwise
// you'd have to remember to start every subclass awakened() override
// with: algorithm_with_properties<PROPS>::awakened(fb);
- virtual void awakened( context * f) noexcept override final {
- fiber_properties * props = super::get_properties( f);
+ virtual void awakened( context * ctx) noexcept override final {
+ fiber_properties * props = super::get_properties( ctx);
if ( nullptr == props) {
// TODO: would be great if PROPS could be allocated on the new
// fiber's stack somehow
- props = new_properties( f);
+ props = new_properties( ctx);
// It is not good for new_properties() to return 0.
- BOOST_ASSERT_MSG(props, "new_properties() must return non-NULL");
+ BOOST_ASSERT_MSG( props, "new_properties() must return non-NULL");
// new_properties() must return instance of (a subclass of) PROPS
BOOST_ASSERT_MSG( dynamic_cast< PROPS * >( props),
"new_properties() must return properties class");
- super::set_properties( f, props);
+ super::set_properties( ctx, props);
}
// Set algo_ again every time this fiber becomes READY. That
// handles the case of a fiber migrating to a new thread with a new
@@ -77,31 +77,31 @@ struct algorithm_with_properties : public algorithm_with_properties_base {
props->set_algorithm( this);
// Okay, now forward the call to subclass override.
- awakened( f, properties(f) );
+ awakened( ctx, properties( ctx) );
}
// subclasses override this method instead of the original awakened()
- virtual void awakened( context *, PROPS& ) noexcept = 0;
+ virtual void awakened( context *, PROPS &) noexcept = 0;
// used for all internal calls
- PROPS & properties( context * f) noexcept {
- return static_cast< PROPS & >( * super::get_properties( f) );
+ PROPS & properties( context * ctx) noexcept {
+ return static_cast< PROPS & >( * super::get_properties( ctx) );
}
// override this to be notified by PROPS::notify()
- virtual void property_change( context * f, PROPS & props) noexcept {
+ virtual void property_change( context * ctx, PROPS & props) noexcept {
}
// implementation for algorithm_with_properties_base method
- void property_change_( context * f, fiber_properties * props ) noexcept override final {
- property_change( f, * static_cast< PROPS * >( props) );
+ void property_change_( context * ctx, fiber_properties * props) noexcept override final {
+ property_change( ctx, * static_cast< PROPS * >( props) );
}
// Override this to customize instantiation of PROPS, e.g. use a different
// allocator. Each PROPS instance is associated with a particular
// context.
- virtual fiber_properties * new_properties( context * f) {
- return new PROPS( f);
+ virtual fiber_properties * new_properties( context * ctx) {
+ return new PROPS( ctx);
}
};
diff --git a/boost/fiber/algo/detail/chase_lev_queue.hpp b/boost/fiber/algo/detail/chase_lev_queue.hpp
deleted file mode 100644
index f51556020d..0000000000
--- a/boost/fiber/algo/detail/chase_lev_queue.hpp
+++ /dev/null
@@ -1,172 +0,0 @@
-
-// Copyright Oliver Kowalke 2013.
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE_1_0.txt or copy at
-// http://www.boost.org/LICENSE_1_0.txt)
-
-#ifndef BOOST_FIBERS_ALGO_DETAIL_CHASE_LEV_QUEUE_H
-#define BOOST_FIBERS_ALGO_DETAIL_CHASE_LEV_QUEUE_H
-
-#include <atomic>
-#include <cstddef>
-#include <memory>
-#include <type_traits>
-#include <vector>
-
-#include <boost/assert.hpp>
-#include <boost/config.hpp>
-
-#include <boost/fiber/detail/config.hpp>
-#include <boost/fiber/context.hpp>
-
-// David Chase and Yossi Lev. Dynamic circular work-stealing deque.
-// In SPAA ’05: Proceedings of the seventeenth annual ACM symposium
-// on Parallelism in algorithms and architectures, pages 21–28,
-// New York, NY, USA, 2005. ACM.
-//
-// Nhat Minh Lê, Antoniu Pop, Albert Cohen, and Francesco Zappa Nardelli. 2013.
-// Correct and efficient work-stealing for weak memory models.
-// In Proceedings of the 18th ACM SIGPLAN symposium on Principles and practice
-// of parallel programming (PPoPP '13). ACM, New York, NY, USA, 69-80.
-namespace boost {
-namespace fibers {
-namespace algo {
-namespace detail {
-
-class chase_lev_queue {
-private:
- class circular_buffer {
- private:
- typedef typename std::aligned_storage< sizeof( context *), alignof( context *) >::type storage_t;
-
- int64_t size_;
- context ** items;
- chase_lev_queue * queue_;
-
- public:
- circular_buffer( int64_t size, chase_lev_queue * queue) noexcept :
- size_{ size },
- items{ reinterpret_cast< context ** >( new storage_t[size_] ) },
- queue_{ queue } {
- }
-
- ~circular_buffer() {
- delete [] reinterpret_cast< storage_t * >( items);
- }
-
- int64_t size() const noexcept {
- return size_;
- }
-
- context * get( int64_t idx) noexcept {
- BOOST_ASSERT( 0 <= idx);
- return * (items + (idx & (size() - 1)));
- }
-
- void put( int64_t idx, context * ctx) noexcept {
- BOOST_ASSERT( 0 <= idx);
- * (items + (idx & (size() - 1))) = ctx;
- }
-
- circular_buffer * grow( int64_t top, int64_t bottom) {
- BOOST_ASSERT( 0 <= top);
- BOOST_ASSERT( 0 <= bottom);
- circular_buffer * buffer = new circular_buffer{ size() * 2, queue_ };
- queue_->old_buffers_.push_back( this);
- for ( int64_t i = top; i != bottom; ++i) {
- buffer->put( i, get( i) );
- }
- return buffer;
- }
- };
-
- std::atomic< int64_t > top_{ 0 };
- std::atomic< int64_t > bottom_{ 0 };
- std::atomic< circular_buffer * > buffer_;
- std::vector< circular_buffer * > old_buffers_;
-
-public:
- chase_lev_queue() :
- buffer_{ new circular_buffer{ 1024, this } } {
- old_buffers_.resize( 10);
- }
-
- ~chase_lev_queue() {
- delete buffer_.load( std::memory_order_seq_cst);
- for ( circular_buffer * buffer : old_buffers_) {
- delete buffer;
- }
- }
-
- chase_lev_queue( chase_lev_queue const&) = delete;
- chase_lev_queue( chase_lev_queue &&) = delete;
-
- chase_lev_queue & operator=( chase_lev_queue const&) = delete;
- chase_lev_queue & operator=( chase_lev_queue &&) = delete;
-
- bool empty() const noexcept {
- int64_t bottom = bottom_.load( std::memory_order_relaxed);
- int64_t top = top_.load( std::memory_order_relaxed);
- return bottom <= top;
- }
-
- void push( context * ctx) {
- int64_t bottom = bottom_.load( std::memory_order_relaxed);
- int64_t top = top_.load( std::memory_order_acquire);
- circular_buffer * buffer = buffer_.load( std::memory_order_relaxed);
- if ( (bottom - top) > buffer->size() - 1) {
- // queue is full
- buffer = buffer->grow( top, bottom);
- buffer_.store( buffer, std::memory_order_release);
- }
- buffer->put( bottom, ctx);
- std::atomic_thread_fence( std::memory_order_release);
- bottom_.store( bottom + 1, std::memory_order_relaxed);
- }
-
- context * pop() {
- int64_t bottom = bottom_.load( std::memory_order_relaxed) - 1;
- circular_buffer * buffer = buffer_.load( std::memory_order_relaxed);
- bottom_.store( bottom, std::memory_order_relaxed);
- std::atomic_thread_fence( std::memory_order_seq_cst);
- int64_t top = top_.load( std::memory_order_relaxed);
- context * ctx = nullptr;
- if ( top <= bottom) {
- // queue is not empty
- ctx = buffer->get( bottom);
- // last element
- if ( top == bottom) {
- if ( ! top_.compare_exchange_strong( top, top + 1,
- std::memory_order_seq_cst, std::memory_order_relaxed) ) {
- return nullptr;
- }
- bottom_.store( bottom + 1, std::memory_order_relaxed);
- }
- } else {
- // queue is empty
- bottom_.store( bottom + 1, std::memory_order_relaxed);
- }
- return ctx;
- }
-
- context * steal() {
- int64_t top = top_.load( std::memory_order_acquire);
- std::atomic_thread_fence( std::memory_order_seq_cst);
- int64_t bottom = bottom_.load( std::memory_order_acquire);
- context * ctx = nullptr;
- if ( top < bottom) {
- // queue is not empty
- circular_buffer * buffer = buffer_.load( std::memory_order_consume);
- ctx = buffer->get( top);
- if ( ! top_.compare_exchange_strong( top, top + 1,
- std::memory_order_seq_cst, std::memory_order_relaxed) ) {
- return nullptr;
- }
- }
- return ctx;
- }
-};
-
-}}}}
-
-#endif // #define BOOST_FIBERS_ALGO_DETAIL_CHASE_LEV_QUEUE_H
diff --git a/boost/fiber/algo/round_robin.hpp b/boost/fiber/algo/round_robin.hpp
index 038b424529..e384982567 100644
--- a/boost/fiber/algo/round_robin.hpp
+++ b/boost/fiber/algo/round_robin.hpp
@@ -32,9 +32,9 @@ namespace algo {
class BOOST_FIBERS_DECL round_robin : public algorithm {
private:
- typedef scheduler::ready_queue_t rqueue_t;
+ typedef scheduler::ready_queue_type rqueue_type;
- rqueue_t rqueue_{};
+ rqueue_type rqueue_{};
std::mutex mtx_{};
std::condition_variable cnd_{};
bool flag_{ false };
diff --git a/boost/fiber/algo/shared_work.hpp b/boost/fiber/algo/shared_work.hpp
index e648c5b19f..23bc926e55 100644
--- a/boost/fiber/algo/shared_work.hpp
+++ b/boost/fiber/algo/shared_work.hpp
@@ -34,17 +34,17 @@ namespace algo {
class BOOST_FIBERS_DECL shared_work : public algorithm {
private:
- typedef std::deque< context * > rqueue_t;
- typedef scheduler::ready_queue_t lqueue_t;
+ typedef std::deque< context * > rqueue_type;
+ typedef scheduler::ready_queue_type lqueue_type;
- static rqueue_t rqueue_;
+ static rqueue_type rqueue_;
static std::mutex rqueue_mtx_;
- lqueue_t lqueue_{};
+ lqueue_type lqueue_{};
std::mutex mtx_{};
std::condition_variable cnd_{};
bool flag_{ false };
- bool suspend_;
+ bool suspend_{ false };
public:
shared_work() = default;
@@ -64,7 +64,7 @@ public:
context * pick_next() noexcept;
bool has_ready_fibers() const noexcept {
- std::unique_lock< std::mutex > lock( rqueue_mtx_);
+ std::unique_lock< std::mutex > lock{ rqueue_mtx_ };
return ! rqueue_.empty() || ! lqueue_.empty();
}
diff --git a/boost/fiber/algo/work_stealing.hpp b/boost/fiber/algo/work_stealing.hpp
index 12364ece8c..66cadd12be 100644
--- a/boost/fiber/algo/work_stealing.hpp
+++ b/boost/fiber/algo/work_stealing.hpp
@@ -17,6 +17,7 @@
#include <boost/config.hpp>
#include <boost/fiber/algo/algorithm.hpp>
+#include <boost/fiber/detail/context_spinlock_queue.hpp>
#include <boost/fiber/detail/context_spmc_queue.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
@@ -32,45 +33,46 @@ namespace algo {
class work_stealing : public algorithm {
private:
- typedef scheduler::ready_queue_t lqueue_t;
-
- static std::vector< work_stealing * > schedulers_;
-
- std::size_t idx_;
- std::size_t max_idx_;
- detail::context_spmc_queue rqueue_{};
- lqueue_t lqueue_{};
- std::mutex mtx_{};
- std::condition_variable cnd_{};
- bool flag_{ false };
- bool suspend_;
+ static std::vector< work_stealing * > schedulers_;
+
+ std::size_t idx_;
+ std::size_t max_idx_;
+#ifdef BOOST_FIBERS_USE_SPMC_QUEUE
+ alignas(cache_alignment) detail::context_spmc_queue rqueue_{};
+#else
+ alignas(cache_alignment) detail::context_spinlock_queue rqueue_{};
+#endif
+ std::mutex mtx_{};
+ std::condition_variable cnd_{};
+ bool flag_{ false };
+ bool suspend_;
static void init_( std::size_t max_idx);
public:
work_stealing( std::size_t max_idx, std::size_t idx, bool suspend = false);
- work_stealing( work_stealing const&) = delete;
- work_stealing( work_stealing &&) = delete;
+ work_stealing( work_stealing const&) = delete;
+ work_stealing( work_stealing &&) = delete;
- work_stealing & operator=( work_stealing const&) = delete;
- work_stealing & operator=( work_stealing &&) = delete;
+ work_stealing & operator=( work_stealing const&) = delete;
+ work_stealing & operator=( work_stealing &&) = delete;
void awakened( context * ctx) noexcept;
context * pick_next() noexcept;
context * steal() noexcept {
- return rqueue_.pop();
+ return rqueue_.steal();
}
bool has_ready_fibers() const noexcept {
- return ! rqueue_.empty() || ! lqueue_.empty();
+ return ! rqueue_.empty();
}
- void suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept;
+ void suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept;
- void notify() noexcept;
+ void notify() noexcept;
};
}}}
diff --git a/boost/fiber/barrier.hpp b/boost/fiber/barrier.hpp
index debe7d725a..31a2cd617d 100644
--- a/boost/fiber/barrier.hpp
+++ b/boost/fiber/barrier.hpp
@@ -24,19 +24,19 @@ namespace fibers {
class BOOST_FIBERS_DECL barrier {
private:
- std::size_t initial_;
- std::size_t current_;
- bool cycle_{ true };
- mutex mtx_{};
- condition_variable cond_{};
+ std::size_t initial_;
+ std::size_t current_;
+ std::size_t cycle_{ 0 };
+ mutex mtx_{};
+ condition_variable cond_{};
public:
- explicit barrier( std::size_t);
+ explicit barrier( std::size_t);
barrier( barrier const&) = delete;
barrier & operator=( barrier const&) = delete;
- bool wait();
+ bool wait();
};
}}
diff --git a/boost/fiber/bounded_channel.hpp b/boost/fiber/bounded_channel.hpp
deleted file mode 100644
index ac257b4ff5..0000000000
--- a/boost/fiber/bounded_channel.hpp
+++ /dev/null
@@ -1,433 +0,0 @@
-
-// Copyright Oliver Kowalke 2013.
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE_1_0.txt or copy at
-// http://www.boost.org/LICENSE_1_0.txt)
-//
-
-#ifndef BOOST_FIBERS_BOUNDED_CHANNEL_H
-#define BOOST_FIBERS_BOUNDED_CHANNEL_H
-
-#warn "template bounded_channel is deprecated"
-
-#include <algorithm>
-#include <atomic>
-#include <chrono>
-#include <cstddef>
-#include <memory>
-#include <mutex>
-#include <system_error>
-#include <utility>
-
-#include <boost/config.hpp>
-#include <boost/intrusive_ptr.hpp>
-
-#include <boost/fiber/detail/config.hpp>
-#include <boost/fiber/exceptions.hpp>
-#include <boost/fiber/exceptions.hpp>
-#include <boost/fiber/condition_variable.hpp>
-#include <boost/fiber/mutex.hpp>
-#include <boost/fiber/channel_op_status.hpp>
-
-#ifdef BOOST_HAS_ABI_HEADERS
-# include BOOST_ABI_PREFIX
-#endif
-
-namespace boost {
-namespace fibers {
-
-template< typename T,
- typename Allocator = std::allocator< T >
->
-class bounded_channel {
-public:
- typedef T value_type;
-
-private:
- struct node {
- typedef intrusive_ptr< node > ptr_t;
- typedef typename std::allocator_traits< Allocator >::template rebind_alloc<
- node
- > allocator_t;
- typedef std::allocator_traits< allocator_t > allocator_traits_t;
-
-#if ! defined(BOOST_FIBERS_NO_ATOMICS)
- std::atomic< std::size_t > use_count{ 0 };
-#else
- std::size_t use_count{ 0 };
-#endif
- allocator_t alloc;
- T va;
- ptr_t nxt{};
-
- node( T const& t, allocator_t const& alloc_) noexcept :
- alloc{ alloc_ },
- va{ t } {
- }
-
- node( T && t, allocator_t & alloc_) noexcept :
- alloc{ alloc_ },
- va{ std::move( t) } {
- }
-
- friend
- void intrusive_ptr_add_ref( node * p) noexcept {
- ++p->use_count;
- }
-
- friend
- void intrusive_ptr_release( node * p) noexcept {
- if ( 0 == --p->use_count) {
- allocator_t alloc( p->alloc);
- allocator_traits_t::destroy( alloc, p);
- allocator_traits_t::deallocate( alloc, p, 1);
- }
- }
- };
-
- using ptr_t = typename node::ptr_t;
- using allocator_t = typename node::allocator_t;
- using allocator_traits_t = typename node::allocator_traits_t;
-
- enum class queue_status {
- open = 0,
- closed
- };
-
- allocator_t alloc_;
- queue_status state_{ queue_status::open };
- std::size_t count_{ 0 };
- ptr_t head_{};
- ptr_t * tail_;
- mutable mutex mtx_{};
- condition_variable not_empty_cond_{};
- condition_variable not_full_cond_{};
- std::size_t hwm_;
- std::size_t lwm_;
-
- bool is_closed_() const noexcept {
- return queue_status::closed == state_;
- }
-
- void close_( std::unique_lock< boost::fibers::mutex > & lk) noexcept {
- state_ = queue_status::closed;
- lk.unlock();
- not_empty_cond_.notify_all();
- not_full_cond_.notify_all();
- }
-
- std::size_t size_() const noexcept {
- return count_;
- }
-
- bool is_empty_() const noexcept {
- return ! head_;
- }
-
- bool is_full_() const noexcept {
- return count_ >= hwm_;
- }
-
- channel_op_status push_( ptr_t new_node,
- std::unique_lock< boost::fibers::mutex > & lk) {
- if ( is_closed_() ) {
- return channel_op_status::closed;
- }
- not_full_cond_.wait( lk,
- [this](){
- return ! is_full_();
- });
- return push_and_notify_( new_node, lk);
- }
-
- channel_op_status try_push_( ptr_t new_node,
- std::unique_lock< boost::fibers::mutex > & lk) noexcept {
- if ( is_closed_() ) {
- return channel_op_status::closed;
- }
- if ( is_full_() ) {
- return channel_op_status::full;
- }
- return push_and_notify_( new_node, lk);
- }
-
- template< typename Clock, typename Duration >
- channel_op_status push_wait_until_( ptr_t new_node,
- std::chrono::time_point< Clock, Duration > const& timeout_time,
- std::unique_lock< boost::fibers::mutex > & lk) {
- if ( is_closed_() ) {
- return channel_op_status::closed;
- }
- if ( ! not_full_cond_.wait_until( lk, timeout_time,
- [this](){
- return ! is_full_();
- })) {
- return channel_op_status::timeout;
- }
- return push_and_notify_( new_node, lk);
- }
-
- channel_op_status push_and_notify_( ptr_t new_node,
- std::unique_lock< boost::fibers::mutex > & lk) noexcept {
- push_tail_( new_node);
- lk.unlock();
- not_empty_cond_.notify_one();
- return channel_op_status::success;
- }
-
- void push_tail_( ptr_t new_node) noexcept {
- * tail_ = new_node;
- tail_ = & new_node->nxt;
- ++count_;
- }
-
- value_type value_pop_( std::unique_lock< boost::fibers::mutex > & lk) {
- BOOST_ASSERT( ! is_empty_() );
- auto old_head = pop_head_();
- if ( size_() <= lwm_) {
- if ( lwm_ == hwm_) {
- lk.unlock();
- not_full_cond_.notify_one();
- } else {
- lk.unlock();
- // more than one producer could be waiting
- // to push a value
- not_full_cond_.notify_all();
- }
- }
- return std::move( old_head->va);
- }
-
- ptr_t pop_head_() noexcept {
- auto old_head = head_;
- head_ = old_head->nxt;
- if ( ! head_) {
- tail_ = & head_;
- }
- old_head->nxt.reset();
- --count_;
- return old_head;
- }
-
-public:
- bounded_channel( std::size_t hwm, std::size_t lwm,
- Allocator const& alloc = Allocator() ) :
- alloc_{ alloc },
- tail_{ & head_ },
- hwm_{ hwm },
- lwm_{ lwm } {
- if ( hwm_ <= lwm_) {
- throw fiber_error( std::make_error_code( std::errc::invalid_argument),
- "boost fiber: high-watermark is less than or equal to low-watermark for bounded_channel");
- }
- if ( 0 == hwm) {
- throw fiber_error( std::make_error_code( std::errc::invalid_argument),
- "boost fiber: high-watermark is zero");
- }
- }
-
- bounded_channel( std::size_t wm,
- Allocator const& alloc = Allocator() ) :
- alloc_{ alloc },
- tail_{ & head_ },
- hwm_{ wm },
- lwm_{ wm - 1 } {
- if ( 0 == wm) {
- throw fiber_error( std::make_error_code( std::errc::invalid_argument),
- "boost fiber: watermark is zero");
- }
- }
-
- bounded_channel( bounded_channel const&) = delete;
- bounded_channel & operator=( bounded_channel const&) = delete;
-
- std::size_t upper_bound() const noexcept {
- return hwm_;
- }
-
- std::size_t lower_bound() const noexcept {
- return lwm_;
- }
-
- void close() noexcept {
- std::unique_lock< mutex > lk( mtx_);
- close_( lk);
- }
-
- channel_op_status push( value_type const& va) {
- typename allocator_traits_t::pointer ptr{
- allocator_traits_t::allocate( alloc_, 1) };
- try {
- allocator_traits_t::construct( alloc_, ptr, va, alloc_);
- } catch (...) {
- allocator_traits_t::deallocate( alloc_, ptr, 1);
- throw;
- }
- std::unique_lock< mutex > lk( mtx_);
- return push_( { detail::convert( ptr) }, lk);
- }
-
- channel_op_status push( value_type && va) {
- typename allocator_traits_t::pointer ptr{
- allocator_traits_t::allocate( alloc_, 1) };
- try {
- allocator_traits_t::construct(
- alloc_, ptr, std::move( va), alloc_);
- } catch (...) {
- allocator_traits_t::deallocate( alloc_, ptr, 1);
- throw;
- }
- std::unique_lock< mutex > lk( mtx_);
- return push_( { detail::convert( ptr) }, lk);
- }
-
- template< typename Rep, typename Period >
- channel_op_status push_wait_for( value_type const& va,
- std::chrono::duration< Rep, Period > const& timeout_duration) {
- return push_wait_until( va,
- std::chrono::steady_clock::now() + timeout_duration);
- }
-
- template< typename Rep, typename Period >
- channel_op_status push_wait_for( value_type && va,
- std::chrono::duration< Rep, Period > const& timeout_duration) {
- return push_wait_until( std::forward< value_type >( va),
- std::chrono::steady_clock::now() + timeout_duration);
- }
-
- template< typename Clock, typename Duration >
- channel_op_status push_wait_until( value_type const& va,
- std::chrono::time_point< Clock, Duration > const& timeout_time) {
- typename allocator_traits_t::pointer ptr{
- allocator_traits_t::allocate( alloc_, 1) };
- try {
- allocator_traits_t::construct( alloc_, ptr, va, alloc_);
- } catch (...) {
- allocator_traits_t::deallocate( alloc_, ptr, 1);
- throw;
- }
- std::unique_lock< mutex > lk( mtx_);
- return push_wait_until_( { detail::convert( ptr) }, timeout_time, lk);
- }
-
- template< typename Clock, typename Duration >
- channel_op_status push_wait_until( value_type && va,
- std::chrono::time_point< Clock, Duration > const& timeout_time) {
- typename allocator_traits_t::pointer ptr{
- allocator_traits_t::allocate( alloc_, 1) };
- try {
- allocator_traits_t::construct(
- alloc_, ptr, std::move( va), alloc_);
- } catch (...) {
- allocator_traits_t::deallocate( alloc_, ptr, 1);
- throw;
- }
- std::unique_lock< mutex > lk( mtx_);
- return push_wait_until_( { detail::convert( ptr) }, timeout_time, lk);
- }
-
- channel_op_status try_push( value_type const& va) {
- typename allocator_traits_t::pointer ptr{
- allocator_traits_t::allocate( alloc_, 1) };
- try {
- allocator_traits_t::construct( alloc_, ptr, va, alloc_);
- } catch (...) {
- allocator_traits_t::deallocate( alloc_, ptr, 1);
- throw;
- }
- std::unique_lock< mutex > lk( mtx_);
- return try_push_( { detail::convert( ptr) }, lk);
- }
-
- channel_op_status try_push( value_type && va) {
- typename allocator_traits_t::pointer ptr{
- allocator_traits_t::allocate( alloc_, 1) };
- try {
- allocator_traits_t::construct(
- alloc_, ptr, std::move( va), alloc_);
- } catch (...) {
- allocator_traits_t::deallocate( alloc_, ptr, 1);
- throw;
- }
- std::unique_lock< mutex > lk( mtx_);
- return try_push_( { detail::convert( ptr) }, lk);
- }
-
- channel_op_status pop( value_type & va) {
- std::unique_lock< mutex > lk( mtx_);
- not_empty_cond_.wait( lk,
- [this](){
- return is_closed_() || ! is_empty_();
- });
- if ( is_closed_() && is_empty_() ) {
- return channel_op_status::closed;
- }
- va = value_pop_( lk);
- return channel_op_status::success;
- }
-
- value_type value_pop() {
- std::unique_lock< mutex > lk( mtx_);
- not_empty_cond_.wait( lk,
- [this](){
- return is_closed_() || ! is_empty_();
- });
- if ( is_closed_() && is_empty_() ) {
- throw fiber_error(
- std::make_error_code( std::errc::operation_not_permitted),
- "boost fiber: queue is closed");
- }
- return value_pop_( lk);
- }
-
- channel_op_status try_pop( value_type & va) {
- std::unique_lock< mutex > lk( mtx_);
- if ( is_closed_() && is_empty_() ) {
- // let other fibers run
- lk.unlock();
- this_fiber::yield();
- return channel_op_status::closed;
- }
- if ( is_empty_() ) {
- // let other fibers run
- lk.unlock();
- this_fiber::yield();
- return channel_op_status::empty;
- }
- va = value_pop_( lk);
- return channel_op_status::success;
- }
-
- template< typename Rep, typename Period >
- channel_op_status pop_wait_for( value_type & va,
- std::chrono::duration< Rep, Period > const& timeout_duration) {
- return pop_wait_until( va,
- std::chrono::steady_clock::now() + timeout_duration);
- }
-
- template< typename Clock, typename Duration >
- channel_op_status pop_wait_until( value_type & va,
- std::chrono::time_point< Clock, Duration > const& timeout_time) {
- std::unique_lock< mutex > lk( mtx_);
- if ( ! not_empty_cond_.wait_until( lk,
- timeout_time,
- [this](){
- return is_closed_() || ! is_empty_();
- })) {
- return channel_op_status::timeout;
- }
- if ( is_closed_() && is_empty_() ) {
- return channel_op_status::closed;
- }
- va = value_pop_( lk);
- return channel_op_status::success;
- }
-};
-
-}}
-
-#ifdef BOOST_HAS_ABI_HEADERS
-# include BOOST_ABI_SUFFIX
-#endif
-
-#endif // BOOST_FIBERS_BOUNDED_CHANNEL_H
diff --git a/boost/fiber/buffered_channel.hpp b/boost/fiber/buffered_channel.hpp
index 25109a4976..1c32e49bae 100644
--- a/boost/fiber/buffered_channel.hpp
+++ b/boost/fiber/buffered_channel.hpp
@@ -4,8 +4,6 @@
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
//
-// based on Dmitry Vyukov's MPMC queue
-// (http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue)
#ifndef BOOST_FIBERS_BUFFERED_CHANNEL_H
#define BOOST_FIBERS_BUFFERED_CHANNEL_H
@@ -39,120 +37,42 @@ public:
typedef T value_type;
private:
- typedef typename std::aligned_storage< sizeof( T), alignof( T) >::type storage_type;
- typedef context::wait_queue_t wait_queue_type;
-
- struct alignas(cache_alignment) slot {
- std::atomic< std::size_t > cycle{ 0 };
- storage_type storage{};
-
- slot() = default;
- };
-
- // procuder cacheline
- alignas(cache_alignment) std::atomic< std::size_t > producer_idx_{ 0 };
- // consumer cacheline
- alignas(cache_alignment) std::atomic< std::size_t > consumer_idx_{ 0 };
- // shared write cacheline
- alignas(cache_alignment) std::atomic_bool closed_{ false };
- mutable detail::spinlock splk_{};
- wait_queue_type waiting_producers_{};
- wait_queue_type waiting_consumers_{};
- // shared read cacheline
- alignas(cache_alignment) slot * slots_{ nullptr };
- std::size_t capacity_;
- char pad_[cacheline_length];
-
- bool is_full_() {
- std::size_t idx{ producer_idx_.load( std::memory_order_relaxed) };
- return 0 > static_cast< std::intptr_t >( slots_[idx & (capacity_ - 1)].cycle.load( std::memory_order_acquire) ) - static_cast< std::intptr_t >( idx);
- }
-
- bool is_empty_() {
- std::size_t idx{ consumer_idx_.load( std::memory_order_relaxed) };
- return 0 > static_cast< std::intptr_t >( slots_[idx & (capacity_ - 1)].cycle.load( std::memory_order_acquire) ) - static_cast< std::intptr_t >( idx + 1);
- }
-
- template< typename ValueType >
- channel_op_status try_push_( ValueType && value) {
- slot * s{ nullptr };
- std::size_t idx{ producer_idx_.load( std::memory_order_relaxed) };
- for (;;) {
- s = & slots_[idx & (capacity_ - 1)];
- std::size_t cycle{ s->cycle.load( std::memory_order_acquire) };
- std::intptr_t diff{ static_cast< std::intptr_t >( cycle) - static_cast< std::intptr_t >( idx) };
- if ( 0 == diff) {
- if ( producer_idx_.compare_exchange_weak( idx, idx + 1, std::memory_order_relaxed) ) {
- break;
- }
- } else if ( 0 > diff) {
- return channel_op_status::full;
- } else {
- idx = producer_idx_.load( std::memory_order_relaxed);
- }
- }
- ::new ( static_cast< void * >( std::addressof( s->storage) ) ) value_type( std::forward< ValueType >( value) );
- s->cycle.store( idx + 1, std::memory_order_release);
- return channel_op_status::success;
- }
-
- channel_op_status try_value_pop_( slot *& s, std::size_t & idx) {
- idx = consumer_idx_.load( std::memory_order_relaxed);
- for (;;) {
- s = & slots_[idx & (capacity_ - 1)];
- std::size_t cycle = s->cycle.load( std::memory_order_acquire);
- std::intptr_t diff{ static_cast< std::intptr_t >( cycle) - static_cast< std::intptr_t >( idx + 1) };
- if ( 0 == diff) {
- if ( consumer_idx_.compare_exchange_weak( idx, idx + 1, std::memory_order_relaxed) ) {
- break;
- }
- } else if ( 0 > diff) {
- return channel_op_status::empty;
- } else {
- idx = consumer_idx_.load( std::memory_order_relaxed);
- }
- }
- // incrementing the slot cycle must be deferred till the value has been consumed
- // slot cycle tells procuders that the cell can be re-used (store new value)
- return channel_op_status::success;
- }
-
- channel_op_status try_pop_( value_type & value) {
- slot * s{ nullptr };
- std::size_t idx{ 0 };
- channel_op_status status{ try_value_pop_( s, idx) };
- if ( channel_op_status::success == status) {
- value = std::move( * reinterpret_cast< value_type * >( std::addressof( s->storage) ) );
- s->cycle.store( idx + capacity_, std::memory_order_release);
- }
- return status;
+ typedef context::wait_queue_t wait_queue_type;
+ typedef T slot_type;
+
+ alignas(cache_alignment) mutable detail::spinlock splk_{};
+ wait_queue_type waiting_producers_{};
+ wait_queue_type waiting_consumers_{};
+ slot_type * slots_;
+ std::size_t pidx_{ 0 };
+ std::size_t cidx_{ 0 };
+ std::size_t capacity_;
+ bool closed_{ false };
+
+ bool is_full_() const noexcept {
+ return cidx_ == ((pidx_ + 1) % capacity_);
+ }
+
+ bool is_empty_() const noexcept {
+ return cidx_ == pidx_;
+ }
+
+ bool is_closed_() const noexcept {
+ return closed_;
}
public:
explicit buffered_channel( std::size_t capacity) :
- capacity_{ capacity } {
- if ( 0 == capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) {
- throw fiber_error( std::make_error_code( std::errc::invalid_argument),
- "boost fiber: buffer capacity is invalid");
- }
- slots_ = new slot[capacity_]();
- for ( std::size_t i = 0; i < capacity_; ++i) {
- slots_[i].cycle.store( i, std::memory_order_relaxed);
+ capacity_{ capacity } {
+ if ( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) {
+ throw fiber_error{ std::make_error_code( std::errc::invalid_argument),
+ "boost fiber: buffer capacity is invalid" };
}
+ slots_ = new slot_type[capacity_];
}
~buffered_channel() {
close();
- for (;;) {
- slot * s{ nullptr };
- std::size_t idx{ 0 };
- if ( channel_op_status::success == try_value_pop_( s, idx) ) {
- reinterpret_cast< value_type * >( std::addressof( s->storage) )->~value_type();
- s->cycle.store( idx + capacity_, std::memory_order_release);
- } else {
- break;
- }
- }
delete [] slots_;
}
@@ -160,109 +80,116 @@ public:
buffered_channel & operator=( buffered_channel const&) = delete;
bool is_closed() const noexcept {
- return closed_.load( std::memory_order_acquire);
+ detail::spinlock_lock lk{ splk_ };
+ return is_closed_();
}
void close() noexcept {
- context * ctx{ context::active() };
+ context * active_ctx = context::active();
detail::spinlock_lock lk{ splk_ };
- closed_.store( true, std::memory_order_release);
+ closed_ = true;
// notify all waiting producers
while ( ! waiting_producers_.empty() ) {
- context * producer_ctx{ & waiting_producers_.front() };
+ context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
- ctx->set_ready( producer_ctx);
+ active_ctx->schedule( producer_ctx);
}
// notify all waiting consumers
while ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx{ & waiting_consumers_.front() };
+ context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- ctx->set_ready( consumer_ctx);
+ active_ctx->schedule( consumer_ctx);
}
}
channel_op_status try_push( value_type const& value) {
- if ( is_closed() ) {
+ context * active_ctx = context::active();
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_closed_() ) {
return channel_op_status::closed;
+ } else if ( is_full_() ) {
+ return channel_op_status::full;
+ } else {
+ slots_[pidx_] = value;
+ pidx_ = (pidx_ + 1) % capacity_;
+ // notify one waiting consumer
+ if ( ! waiting_consumers_.empty() ) {
+ context * consumer_ctx = & waiting_consumers_.front();
+ waiting_consumers_.pop_front();
+ lk.unlock();
+ active_ctx->schedule( consumer_ctx);
+ }
+ return channel_op_status::success;
}
- return try_push_( value);
}
channel_op_status try_push( value_type && value) {
- if ( is_closed() ) {
+ context * active_ctx = context::active();
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_closed_() ) {
return channel_op_status::closed;
+ } else if ( is_full_() ) {
+ return channel_op_status::full;
+ } else {
+ slots_[pidx_] = std::move( value);
+ pidx_ = (pidx_ + 1) % capacity_;
+ // notify one waiting consumer
+ if ( ! waiting_consumers_.empty() ) {
+ context * consumer_ctx = & waiting_consumers_.front();
+ waiting_consumers_.pop_front();
+ lk.unlock();
+ active_ctx->schedule( consumer_ctx);
+ }
+ return channel_op_status::success;
}
- return try_push_( std::move( value) );
}
channel_op_status push( value_type const& value) {
- context * ctx{ context::active() };
+ context * active_ctx = context::active();
for (;;) {
- if ( is_closed() ) {
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_closed_() ) {
return channel_op_status::closed;
- }
- channel_op_status status{ try_push_( value) };
- if ( channel_op_status::success == status) {
- detail::spinlock_lock lk{ splk_ };
+ } else if ( is_full_() ) {
+ active_ctx->wait_link( waiting_producers_);
+ // suspend this producer
+ active_ctx->suspend( lk);
+ } else {
+ slots_[pidx_] = value;
+ pidx_ = (pidx_ + 1) % capacity_;
// notify one waiting consumer
if ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx{ & waiting_consumers_.front() };
+ context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
lk.unlock();
- ctx->set_ready( consumer_ctx);
- }
- return status;
- } else if ( channel_op_status::full == status) {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
- if ( is_closed() ) {
- return channel_op_status::closed;
+ active_ctx->schedule( consumer_ctx);
}
- if ( ! is_full_() ) {
- continue;
- }
- ctx->wait_link( waiting_producers_);
- // suspend this producer
- ctx->suspend( lk);
- } else {
- BOOST_ASSERT( channel_op_status::closed == status);
- return status;
+ return channel_op_status::success;
}
}
}
channel_op_status push( value_type && value) {
- context * ctx{ context::active() };
+ context * active_ctx = context::active();
for (;;) {
- if ( is_closed() ) {
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_closed_() ) {
return channel_op_status::closed;
- }
- channel_op_status status{ try_push_( std::move( value) ) };
- if ( channel_op_status::success == status) {
- detail::spinlock_lock lk{ splk_ };
+ } else if ( is_full_() ) {
+ active_ctx->wait_link( waiting_producers_);
+ // suspend this producer
+ active_ctx->suspend( lk);
+ } else {
+ slots_[pidx_] = std::move( value);
+ pidx_ = (pidx_ + 1) % capacity_;
// notify one waiting consumer
if ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx{ & waiting_consumers_.front() };
+ context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
lk.unlock();
- ctx->set_ready( consumer_ctx);
+ active_ctx->schedule( consumer_ctx);
}
- return status;
- } else if ( channel_op_status::full == status) {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
- if ( is_closed() ) {
- return channel_op_status::closed;
- }
- if ( ! is_full_() ) {
- continue;
- }
- ctx->wait_link( waiting_producers_);
- // suspend this producer
- ctx->suspend( lk);
- } else {
- BOOST_ASSERT( channel_op_status::closed == status);
- return status;
+ return channel_op_status::success;
}
}
}
@@ -284,44 +211,33 @@ public:
template< typename Clock, typename Duration >
channel_op_status push_wait_until( value_type const& value,
std::chrono::time_point< Clock, Duration > const& timeout_time_) {
- std::chrono::steady_clock::time_point timeout_time( detail::convert( timeout_time_) );
- context * ctx{ context::active() };
+ context * active_ctx = context::active();
+ std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
- if ( is_closed() ) {
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_closed_() ) {
return channel_op_status::closed;
- }
- channel_op_status status{ try_push_( value) };
- if ( channel_op_status::success == status) {
- detail::spinlock_lock lk{ splk_ };
- // notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx{ & waiting_consumers_.front() };
- waiting_consumers_.pop_front();
- lk.unlock();
- ctx->set_ready( consumer_ctx);
- }
- return status;
- } else if ( channel_op_status::full == status) {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
- if ( is_closed() ) {
- return channel_op_status::closed;
- }
- if ( ! is_full_() ) {
- continue;
- }
- ctx->wait_link( waiting_producers_);
+ } else if ( is_full_() ) {
+ active_ctx->wait_link( waiting_producers_);
// suspend this producer
- if ( ! ctx->wait_until( timeout_time, lk) ) {
+ if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
lk.lock();
// remove from waiting-queue
- ctx->wait_unlink();
+ waiting_producers_.remove( * active_ctx);
return channel_op_status::timeout;
}
} else {
- BOOST_ASSERT( channel_op_status::closed == status);
- return status;
+ slots_[pidx_] = value;
+ pidx_ = (pidx_ + 1) % capacity_;
+ // notify one waiting consumer
+ if ( ! waiting_consumers_.empty() ) {
+ context * consumer_ctx = & waiting_consumers_.front();
+ waiting_consumers_.pop_front();
+ lk.unlock();
+ active_ctx->schedule( consumer_ctx);
+ }
+ return channel_op_status::success;
}
}
}
@@ -329,128 +245,110 @@ public:
template< typename Clock, typename Duration >
channel_op_status push_wait_until( value_type && value,
std::chrono::time_point< Clock, Duration > const& timeout_time_) {
- std::chrono::steady_clock::time_point timeout_time( detail::convert( timeout_time_) );
- context * ctx{ context::active() };
+ context * active_ctx = context::active();
+ std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
- if ( is_closed() ) {
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_closed_() ) {
return channel_op_status::closed;
- }
- channel_op_status status{ try_push_( std::move( value) ) };
- if ( channel_op_status::success == status) {
- detail::spinlock_lock lk{ splk_ };
- // notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx{ & waiting_consumers_.front() };
- waiting_consumers_.pop_front();
- lk.unlock();
- ctx->set_ready( consumer_ctx);
- }
- return status;
- } else if ( channel_op_status::full == status) {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
- if ( is_closed() ) {
- return channel_op_status::closed;
- }
- if ( ! is_full_() ) {
- continue;
- }
- ctx->wait_link( waiting_producers_);
+ } else if ( is_full_() ) {
+ active_ctx->wait_link( waiting_producers_);
// suspend this producer
- if ( ! ctx->wait_until( timeout_time, lk) ) {
+ if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
lk.lock();
// remove from waiting-queue
- ctx->wait_unlink();
+ waiting_producers_.remove( * active_ctx);
return channel_op_status::timeout;
}
} else {
- BOOST_ASSERT( channel_op_status::closed == status);
- return status;
+ slots_[pidx_] = std::move( value);
+ pidx_ = (pidx_ + 1) % capacity_;
+ // notify one waiting consumer
+ if ( ! waiting_consumers_.empty() ) {
+ context * consumer_ctx = & waiting_consumers_.front();
+ waiting_consumers_.pop_front();
+ lk.unlock();
+ active_ctx->schedule( consumer_ctx);
+ }
+ return channel_op_status::success;
}
}
}
channel_op_status try_pop( value_type & value) {
- channel_op_status status{ try_pop_( value) };
- if ( channel_op_status::success != status) {
- if ( is_closed() ) {
- status = channel_op_status::closed;
+ context * active_ctx = context::active();
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_empty_() ) {
+ return is_closed_()
+ ? channel_op_status::closed
+ : channel_op_status::empty;
+ } else {
+ value = std::move( slots_[cidx_]);
+ cidx_ = (cidx_ + 1) % capacity_;
+ // notify one waiting producer
+ if ( ! waiting_producers_.empty() ) {
+ context * producer_ctx = & waiting_producers_.front();
+ waiting_producers_.pop_front();
+ lk.unlock();
+ active_ctx->schedule( producer_ctx);
}
+ return channel_op_status::success;
}
- return status;
}
channel_op_status pop( value_type & value) {
- context * ctx{ context::active() };
+ context * active_ctx = context::active();
for (;;) {
- channel_op_status status{ try_pop_( value) };
- if ( channel_op_status::success == status) {
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_empty_() ) {
+ if ( is_closed_() ) {
+ return channel_op_status::closed;
+ } else {
+ active_ctx->wait_link( waiting_consumers_);
+ // suspend this consumer
+ active_ctx->suspend( lk);
+ }
+ } else {
+ value = std::move( slots_[cidx_]);
+ cidx_ = (cidx_ + 1) % capacity_;
// notify one waiting producer
if ( ! waiting_producers_.empty() ) {
- context * producer_ctx{ & waiting_producers_.front() };
+ context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- ctx->set_ready( producer_ctx);
+ active_ctx->schedule( producer_ctx);
}
- return status;
- } else if ( channel_op_status::empty == status) {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
- if ( is_closed() ) {
- return channel_op_status::closed;
- }
- if ( ! is_empty_() ) {
- continue;
- }
- ctx->wait_link( waiting_consumers_);
- // suspend this consumer
- ctx->suspend( lk);
- } else {
- BOOST_ASSERT( channel_op_status::closed == status);
- return status;
+ return channel_op_status::success;
}
}
}
value_type value_pop() {
- context * ctx{ context::active() };
+ context * active_ctx = context::active();
for (;;) {
- slot * s{ nullptr };
- std::size_t idx{ 0 };
- channel_op_status status{ try_value_pop_( s, idx) };
- if ( channel_op_status::success == status) {
- value_type value{ std::move( * reinterpret_cast< value_type * >( std::addressof( s->storage) ) ) };
- s->cycle.store( idx + capacity_, std::memory_order_release);
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_empty_() ) {
+ if ( is_closed_() ) {
+ throw fiber_error{
+ std::make_error_code( std::errc::operation_not_permitted),
+ "boost fiber: channel is closed" };
+ } else {
+ active_ctx->wait_link( waiting_consumers_);
+ // suspend this consumer
+ active_ctx->suspend( lk);
+ }
+ } else {
+ value_type value = std::move( slots_[cidx_]);
+ cidx_ = (cidx_ + 1) % capacity_;
// notify one waiting producer
if ( ! waiting_producers_.empty() ) {
- context * producer_ctx{ & waiting_producers_.front() };
+ context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- ctx->set_ready( producer_ctx);
+ active_ctx->schedule( producer_ctx);
}
return std::move( value);
- } else if ( channel_op_status::empty == status) {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
- if ( is_closed() ) {
- throw fiber_error{
- std::make_error_code( std::errc::operation_not_permitted),
- "boost fiber: channel is closed" };
- }
- if ( ! is_empty_() ) {
- continue;
- }
- ctx->wait_link( waiting_consumers_);
- // suspend this consumer
- ctx->suspend( lk);
- } else {
- BOOST_ASSERT( channel_op_status::closed == status);
- throw fiber_error{
- std::make_error_code( std::errc::operation_not_permitted),
- "boost fiber: channel is closed" };
}
}
}
@@ -465,41 +363,35 @@ public:
template< typename Clock, typename Duration >
channel_op_status pop_wait_until( value_type & value,
std::chrono::time_point< Clock, Duration > const& timeout_time_) {
- std::chrono::steady_clock::time_point timeout_time( detail::convert( timeout_time_) );
- context * ctx{ context::active() };
+ context * active_ctx = context::active();
+ std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
- channel_op_status status{ try_pop_( value) };
- if ( channel_op_status::success == status) {
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_ };
+ if ( is_empty_() ) {
+ if ( is_closed_() ) {
+ return channel_op_status::closed;
+ } else {
+ active_ctx->wait_link( waiting_consumers_);
+ // suspend this consumer
+ if ( ! active_ctx->wait_until( timeout_time, lk) ) {
+ // relock local lk
+ lk.lock();
+ // remove from waiting-queue
+ waiting_consumers_.remove( * active_ctx);
+ return channel_op_status::timeout;
+ }
+ }
+ } else {
+ value = std::move( slots_[cidx_]);
+ cidx_ = (cidx_ + 1) % capacity_;
// notify one waiting producer
if ( ! waiting_producers_.empty() ) {
- context * producer_ctx{ & waiting_producers_.front() };
+ context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- context::active()->set_ready( producer_ctx);
- }
- return status;
- } else if ( channel_op_status::empty == status) {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
- if ( is_closed() ) {
- return channel_op_status::closed;
- }
- if ( ! is_empty_() ) {
- continue;
+ active_ctx->schedule( producer_ctx);
}
- ctx->wait_link( waiting_consumers_);
- // suspend this consumer
- if ( ! ctx->wait_until( timeout_time, lk) ) {
- // relock local lk
- lk.lock();
- // remove from waiting-queue
- ctx->wait_unlink();
- return channel_op_status::timeout;
- }
- } else {
- BOOST_ASSERT( channel_op_status::closed == status);
- return status;
+ return channel_op_status::success;
}
}
}
diff --git a/boost/fiber/condition_variable.hpp b/boost/fiber/condition_variable.hpp
index 0dca7ef6a0..cd0e7cb022 100644
--- a/boost/fiber/condition_variable.hpp
+++ b/boost/fiber/condition_variable.hpp
@@ -45,8 +45,8 @@ class BOOST_FIBERS_DECL condition_variable_any {
private:
typedef context::wait_queue_t wait_queue_t;
- wait_queue_t wait_queue_{};
detail::spinlock wait_queue_splk_{};
+ wait_queue_t wait_queue_{};
public:
condition_variable_any() = default;
@@ -64,22 +64,16 @@ public:
template< typename LockType >
void wait( LockType & lt) {
- context * ctx = context::active();
+ context * active_ctx = context::active();
// atomically call lt.unlock() and block on *this
// store this fiber in waiting-queue
- detail::spinlock_lock lk( wait_queue_splk_);
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- ctx->wait_link( wait_queue_);
+ detail::spinlock_lock lk{ wait_queue_splk_ };
+ BOOST_ASSERT( ! active_ctx->wait_is_linked() );
+ active_ctx->wait_link( wait_queue_);
// unlock external lt
lt.unlock();
// suspend this fiber
- ctx->suspend( lk);
- // relock local lk
- lk.lock();
- // remove from waiting-queue
- ctx->wait_unlink();
- // unlock local lk
- lk.unlock();
+ active_ctx->suspend( lk);
// relock external again before returning
try {
lt.lock();
@@ -87,7 +81,7 @@ public:
std::terminate();
}
// post-conditions
- BOOST_ASSERT( ! ctx->wait_is_linked() );
+ BOOST_ASSERT( ! active_ctx->wait_is_linked() );
}
template< typename LockType, typename Pred >
@@ -99,27 +93,26 @@ public:
template< typename LockType, typename Clock, typename Duration >
cv_status wait_until( LockType & lt, std::chrono::time_point< Clock, Duration > const& timeout_time_) {
+ context * active_ctx = context::active();
cv_status status = cv_status::no_timeout;
- std::chrono::steady_clock::time_point timeout_time(
- detail::convert( timeout_time_) );
- context * ctx = context::active();
+ std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
// atomically call lt.unlock() and block on *this
// store this fiber in waiting-queue
- detail::spinlock_lock lk( wait_queue_splk_);
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- ctx->wait_link( wait_queue_);
+ detail::spinlock_lock lk{ wait_queue_splk_ };
+ BOOST_ASSERT( ! active_ctx->wait_is_linked() );
+ active_ctx->wait_link( wait_queue_);
// unlock external lt
lt.unlock();
// suspend this fiber
- if ( ! ctx->wait_until( timeout_time, lk) ) {
+ if ( ! active_ctx->wait_until( timeout_time, lk) ) {
status = cv_status::timeout;
+ // relock local lk
+ lk.lock();
+ // remove from waiting-queue
+ wait_queue_.remove( * active_ctx);
+ // unlock local lk
+ lk.unlock();
}
- // relock local lk
- lk.lock();
- // remove from waiting-queue
- ctx->wait_unlink();
- // unlock local lk
- lk.unlock();
// relock external again before returning
try {
lt.lock();
@@ -127,7 +120,7 @@ public:
std::terminate();
}
// post-conditions
- BOOST_ASSERT( ! ctx->wait_is_linked() );
+ BOOST_ASSERT( ! active_ctx->wait_is_linked() );
return status;
}
diff --git a/boost/fiber/context.hpp b/boost/fiber/context.hpp
index d873343538..773528e3a1 100644
--- a/boost/fiber/context.hpp
+++ b/boost/fiber/context.hpp
@@ -7,23 +7,32 @@
#ifndef BOOST_FIBERS_CONTEXT_H
#define BOOST_FIBERS_CONTEXT_H
+#include <iostream>
#include <atomic>
#include <chrono>
#include <exception>
#include <functional>
#include <map>
#include <memory>
+#include <tuple>
#include <type_traits>
#include <boost/assert.hpp>
#include <boost/config.hpp>
+#if defined(BOOST_NO_CXX17_STD_APPLY)
#include <boost/context/detail/apply.hpp>
-#include <boost/context/execution_context.hpp>
+#endif
+#if (BOOST_EXECUTION_CONTEXT==1)
+# include <boost/context/execution_context.hpp>
+#else
+# include <boost/context/continuation.hpp>
+#endif
#include <boost/context/stack_context.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/parent_from_member.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/intrusive/set.hpp>
+#include <boost/intrusive/slist.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/data.hpp>
@@ -57,10 +66,10 @@ class scheduler;
namespace detail {
struct wait_tag;
-typedef intrusive::list_member_hook<
+typedef intrusive::slist_member_hook<
intrusive::tag< wait_tag >,
intrusive::link_mode<
- intrusive::auto_unlink
+ intrusive::safe_link
>
> wait_hook;
// declaration of the functor that converts between
@@ -97,21 +106,29 @@ typedef intrusive::set_member_hook<
>
> sleep_hook;
-struct terminated_tag;
+struct worker_tag;
typedef intrusive::list_member_hook<
- intrusive::tag< terminated_tag >,
+ intrusive::tag< worker_tag >,
intrusive::link_mode<
intrusive::auto_unlink
>
+> worker_hook;
+
+struct terminated_tag;
+typedef intrusive::slist_member_hook<
+ intrusive::tag< terminated_tag >,
+ intrusive::link_mode<
+ intrusive::safe_link
+ >
> terminated_hook;
-struct worker_tag;
-typedef intrusive::list_member_hook<
- intrusive::tag< worker_tag >,
+struct remote_ready_tag;
+typedef intrusive::slist_member_hook<
+ intrusive::tag< remote_ready_tag >,
intrusive::link_mode<
- intrusive::auto_unlink
+ intrusive::safe_link
>
-> worker_hook;
+> remote_ready_hook;
}
@@ -125,13 +142,17 @@ struct worker_context_t {};
const worker_context_t worker_context{};
class BOOST_FIBERS_DECL context {
+public:
+ typedef intrusive::slist<
+ context,
+ intrusive::function_hook< detail::wait_functor >,
+ intrusive::linear< true >,
+ intrusive::cache_last< true >
+ > wait_queue_t;
+
private:
friend class scheduler;
- enum flag_t {
- flag_terminated = 1 << 1
- };
-
struct fss_data {
void * vp{ nullptr };
detail::fss_cleanup_function::ptr_t cleanup_function{};
@@ -151,100 +172,102 @@ private:
}
};
- typedef std::map< uintptr_t, fss_data > fss_data_t;
+ typedef std::map< uintptr_t, fss_data > fss_data_t;
#if ! defined(BOOST_FIBERS_NO_ATOMICS)
- std::atomic< std::size_t > use_count_{ 0 };
- std::atomic< unsigned int > flags_;
- std::atomic< type > type_;
- std::atomic< scheduler * > scheduler_{ nullptr };
+ alignas(cache_alignment) std::atomic< std::size_t > use_count_{ 0 };
#else
- std::size_t use_count_{ 0 };
- unsigned int flags_;
- type type_;
- scheduler * scheduler_{ nullptr };
+ alignas(cache_alignment) std::size_t use_count_{ 0 };
+#endif
+#if ! defined(BOOST_FIBERS_NO_ATOMICS)
+ alignas(cache_alignment) detail::remote_ready_hook remote_ready_hook_{};
+ std::atomic< context * > remote_nxt_{ nullptr };
#endif
- launch policy_{ launch::post };
+ alignas(cache_alignment) detail::spinlock splk_{};
+ bool terminated_{ false };
+ wait_queue_t wait_queue_{};
+public:
+ detail::wait_hook wait_hook_{};
+private:
+ alignas(cache_alignment) scheduler * scheduler_{ nullptr };
+ fss_data_t fss_data_{};
+ detail::sleep_hook sleep_hook_{};
+ detail::ready_hook ready_hook_{};
+ detail::terminated_hook terminated_hook_{};
+ detail::worker_hook worker_hook_{};
#if (BOOST_EXECUTION_CONTEXT==1)
- boost::context::execution_context ctx_;
+ boost::context::execution_context ctx_;
#else
- boost::context::execution_context< detail::data_t * > ctx_;
+ boost::context::continuation c_;
#endif
+ fiber_properties * properties_{ nullptr };
+ std::chrono::steady_clock::time_point tp_{ (std::chrono::steady_clock::time_point::max)() };
+ type type_;
+ launch policy_;
void resume_( detail::data_t &) noexcept;
- void set_ready_( context *) noexcept;
+ void schedule_( context *) noexcept;
#if (BOOST_EXECUTION_CONTEXT==1)
template< typename Fn, typename Tpl >
void run_( Fn && fn_, Tpl && tpl_, detail::data_t * dp) noexcept {
{
- // fn and tpl must be destroyed before calling set_terminated()
+ // fn and tpl must be destroyed before calling terminate()
typename std::decay< Fn >::type fn = std::forward< Fn >( fn_);
typename std::decay< Tpl >::type tpl = std::forward< Tpl >( tpl_);
if ( nullptr != dp->lk) {
dp->lk->unlock();
} else if ( nullptr != dp->ctx) {
- active()->set_ready_( dp->ctx);
+ active()->schedule_( dp->ctx);
}
+#if defined(BOOST_NO_CXX17_STD_APPLY)
boost::context::detail::apply( std::move( fn), std::move( tpl) );
+#else
+ std::apply( std::move( fn), std::move( tpl) );
+#endif
}
// terminate context
- set_terminated();
+ terminate();
BOOST_ASSERT_MSG( false, "fiber already terminated");
}
#else
template< typename Fn, typename Tpl >
- boost::context::execution_context< detail::data_t * >
- run_( boost::context::execution_context< detail::data_t * > && ctx, Fn && fn_, Tpl && tpl_, detail::data_t * dp) noexcept {
+ boost::context::continuation
+ run_( boost::context::continuation && c, Fn && fn_, Tpl && tpl_) noexcept {
{
- // fn and tpl must be destroyed before calling set_terminated()
+ // fn and tpl must be destroyed before calling terminate()
typename std::decay< Fn >::type fn = std::forward< Fn >( fn_);
typename std::decay< Tpl >::type tpl = std::forward< Tpl >( tpl_);
- // update execution_context of calling fiber
- dp->from->ctx_ = std::move( ctx);
+ c = c.resume();
+ detail::data_t * dp = c.get_data< detail::data_t * >();
+            // update continuation of calling fiber
+ dp->from->c_ = std::move( c);
if ( nullptr != dp->lk) {
dp->lk->unlock();
} else if ( nullptr != dp->ctx) {
- active()->set_ready_( dp->ctx);
+ active()->schedule_( dp->ctx);
}
+#if defined(BOOST_NO_CXX17_STD_APPLY)
boost::context::detail::apply( std::move( fn), std::move( tpl) );
+#else
+ std::apply( std::move( fn), std::move( tpl) );
+#endif
}
// terminate context
- return set_terminated();
+ return terminate();
}
#endif
public:
- detail::ready_hook ready_hook_{};
- detail::sleep_hook sleep_hook_{};
- detail::terminated_hook terminated_hook_{};
- detail::wait_hook wait_hook_{};
- detail::worker_hook worker_hook_{};
- std::atomic< context * > remote_nxt_{ nullptr };
- std::chrono::steady_clock::time_point tp_{ (std::chrono::steady_clock::time_point::max)() };
-
- typedef intrusive::list<
- context,
- intrusive::function_hook< detail::wait_functor >,
- intrusive::constant_time_size< false > > wait_queue_t;
-
-private:
- fss_data_t fss_data_{};
- wait_queue_t wait_queue_{};
- detail::spinlock splk_{};
- fiber_properties * properties_{ nullptr };
-
-public:
class id {
private:
context * impl_{ nullptr };
public:
- id() noexcept {
- }
+ id() = default;
explicit id( context * impl) noexcept :
- impl_( impl) {
+ impl_{ impl } {
}
bool operator==( id const& other) const noexcept {
@@ -311,9 +334,6 @@ public:
boost::context::preallocated palloc, StackAlloc salloc,
Fn && fn, Tpl && tpl) :
use_count_{ 1 }, // fiber instance or scheduler owner
- flags_{ 0 },
- type_{ type::worker_context },
- policy_{ policy },
#if (BOOST_EXECUTION_CONTEXT==1)
# if defined(BOOST_NO_CXX14_GENERIC_LAMBDAS)
ctx_{ std::allocator_arg, palloc, salloc,
@@ -325,50 +345,66 @@ public:
std::forward< Fn >( fn),
std::forward< Tpl >( tpl),
boost::context::execution_context::current() )
- }
+ },
+ type_{ type::worker_context },
+ policy_{ policy }
# else
ctx_{ std::allocator_arg, palloc, salloc,
[this,fn=detail::decay_copy( std::forward< Fn >( fn) ),tpl=std::forward< Tpl >( tpl),
ctx=boost::context::execution_context::current()] (void * vp) mutable noexcept {
run_( std::move( fn), std::move( tpl), static_cast< detail::data_t * >( vp) );
- }}
+ }},
+ type_{ type::worker_context },
+ policy_{ policy }
# endif
+ {}
#else
+ c_{},
+ type_{ type::worker_context },
+ policy_{ policy }
+ {
# if defined(BOOST_NO_CXX14_GENERIC_LAMBDAS)
- ctx_{ std::allocator_arg, palloc, salloc,
- detail::wrap(
- [this]( typename std::decay< Fn >::type & fn, typename std::decay< Tpl >::type & tpl,
- boost::context::execution_context< detail::data_t * > && ctx, detail::data_t * dp) mutable noexcept {
- return run_( std::forward< boost::context::execution_context< detail::data_t * > >( ctx), std::move( fn), std::move( tpl), dp);
- },
- std::forward< Fn >( fn),
- std::forward< Tpl >( tpl) )}
-
+ c_ = boost::context::callcc(
+ std::allocator_arg, palloc, salloc,
+ detail::wrap(
+ [this]( typename std::decay< Fn >::type & fn, typename std::decay< Tpl >::type & tpl,
+ boost::context::continuation && c) mutable noexcept {
+ return run_( std::forward< boost::context::continuation >( c), std::move( fn), std::move( tpl) );
+ },
+ std::forward< Fn >( fn),
+ std::forward< Tpl >( tpl) ) );
# else
- ctx_{ std::allocator_arg, palloc, salloc,
- [this,fn=detail::decay_copy( std::forward< Fn >( fn) ),tpl=std::forward< Tpl >( tpl)]
- (boost::context::execution_context< detail::data_t * > && ctx, detail::data_t * dp) mutable noexcept {
- return run_( std::forward< boost::context::execution_context< detail::data_t * > >( ctx), std::move( fn), std::move( tpl), dp);
- }}
+ c_ = boost::context::callcc(
+ std::allocator_arg, palloc, salloc,
+ [this,fn=detail::decay_copy( std::forward< Fn >( fn) ),tpl=std::forward< Tpl >( tpl)]
+ (boost::context::continuation && c) mutable noexcept {
+ return run_( std::forward< boost::context::continuation >( c), std::move( fn), std::move( tpl) );
+ });
# endif
+ }
#endif
- {}
context( context const&) = delete;
context & operator=( context const&) = delete;
+ friend bool
+ operator==( context const& lhs, context const& rhs) noexcept {
+ return & lhs == & rhs;
+ }
+
virtual ~context();
scheduler * get_scheduler() const noexcept {
-#if ! defined(BOOST_FIBERS_NO_ATOMICS)
- return scheduler_.load( std::memory_order_relaxed);
-#else
return scheduler_;
-#endif
}
id get_id() const noexcept;
+ bool is_resumable() const noexcept {
+ if ( c_) return true;
+ else return false;
+ }
+
void resume() noexcept;
void resume( detail::spinlock_lock &) noexcept;
void resume( context *) noexcept;
@@ -377,10 +413,10 @@ public:
void suspend( detail::spinlock_lock &) noexcept;
#if (BOOST_EXECUTION_CONTEXT==1)
- void set_terminated() noexcept;
+ void terminate() noexcept;
#else
- boost::context::execution_context< detail::data_t * > suspend_with_cc() noexcept;
- boost::context::execution_context< detail::data_t * > set_terminated() noexcept;
+ boost::context::continuation suspend_with_cc() noexcept;
+ boost::context::continuation terminate() noexcept;
#endif
void join();
@@ -390,16 +426,12 @@ public:
bool wait_until( std::chrono::steady_clock::time_point const&,
detail::spinlock_lock &) noexcept;
- void set_ready( context *) noexcept;
+ void schedule( context *) noexcept;
bool is_context( type t) const noexcept {
return type::none != ( type_ & t);
}
- bool is_terminated() const noexcept {
- return 0 != ( flags_ & flag_terminated);
- }
-
void * get_fss_data( void const * vp) const;
void set_fss_data(
@@ -418,77 +450,90 @@ public:
return policy_;
}
+ bool worker_is_linked() const noexcept;
+
bool ready_is_linked() const noexcept;
+ bool remote_ready_is_linked() const noexcept;
+
bool sleep_is_linked() const noexcept;
bool terminated_is_linked() const noexcept;
bool wait_is_linked() const noexcept;
- bool worker_is_linked() const noexcept;
+ template< typename List >
+ void worker_link( List & lst) noexcept {
+ static_assert( std::is_same< typename List::value_traits::hook_type, detail::worker_hook >::value, "not a worker-queue");
+ BOOST_ASSERT( ! worker_is_linked() );
+ lst.push_back( * this);
+ }
template< typename List >
void ready_link( List & lst) noexcept {
static_assert( std::is_same< typename List::value_traits::hook_type, detail::ready_hook >::value, "not a ready-queue");
+ BOOST_ASSERT( ! ready_is_linked() );
+ lst.push_back( * this);
+ }
+
+ template< typename List >
+ void remote_ready_link( List & lst) noexcept {
+ static_assert( std::is_same< typename List::value_traits::hook_type, detail::remote_ready_hook >::value, "not a remote-ready-queue");
+ BOOST_ASSERT( ! remote_ready_is_linked() );
lst.push_back( * this);
}
template< typename Set >
void sleep_link( Set & set) noexcept {
static_assert( std::is_same< typename Set::value_traits::hook_type,detail::sleep_hook >::value, "not a sleep-queue");
+ BOOST_ASSERT( ! sleep_is_linked() );
set.insert( * this);
}
template< typename List >
void terminated_link( List & lst) noexcept {
static_assert( std::is_same< typename List::value_traits::hook_type, detail::terminated_hook >::value, "not a terminated-queue");
+ BOOST_ASSERT( ! terminated_is_linked() );
lst.push_back( * this);
}
template< typename List >
void wait_link( List & lst) noexcept {
static_assert( std::is_same< typename List::value_traits::hook_type, detail::wait_hook >::value, "not a wait-queue");
+ BOOST_ASSERT( ! wait_is_linked() );
lst.push_back( * this);
}
- template< typename List >
- void worker_link( List & lst) noexcept {
- static_assert( std::is_same< typename List::value_traits::hook_type, detail::worker_hook >::value, "not a worker-queue");
- lst.push_back( * this);
- }
+ void worker_unlink() noexcept;
void ready_unlink() noexcept;
void sleep_unlink() noexcept;
- void wait_unlink() noexcept;
-
- void worker_unlink() noexcept;
-
void detach() noexcept;
void attach( context *) noexcept;
friend void intrusive_ptr_add_ref( context * ctx) noexcept {
BOOST_ASSERT( nullptr != ctx);
- ++ctx->use_count_;
+ ctx->use_count_.fetch_add( 1, std::memory_order_relaxed);
}
friend void intrusive_ptr_release( context * ctx) noexcept {
BOOST_ASSERT( nullptr != ctx);
- if ( 0 == --ctx->use_count_) {
+ if ( 1 == ctx->use_count_.fetch_sub( 1, std::memory_order_release) ) {
+ std::atomic_thread_fence( std::memory_order_acquire);
#if (BOOST_EXECUTION_CONTEXT==1)
- boost::context::execution_context ec( ctx->ctx_);
+ boost::context::execution_context ec = ctx->ctx_;
// destruct context
// deallocates stack (execution_context is ref counted)
ctx->~context();
#else
- boost::context::execution_context< detail::data_t * > cc( std::move( ctx->ctx_) );
+ boost::context::continuation c = std::move( ctx->c_);
// destruct context
ctx->~context();
// deallocated stack
- cc( nullptr);
+ c.resume( nullptr);
#endif
}
}
@@ -521,14 +566,14 @@ static intrusive_ptr< context > make_worker_context( launch policy,
const std::size_t size = sctx.size - ( static_cast< char * >( sctx.sp) - static_cast< char * >( sp) );
#endif
// placement new of context on top of fiber's stack
- return intrusive_ptr< context >(
- ::new ( sp) context(
+ return intrusive_ptr< context >{
+ ::new ( sp) context{
worker_context,
policy,
- boost::context::preallocated( sp, size, sctx),
+ boost::context::preallocated{ sp, size, sctx },
salloc,
std::forward< Fn >( fn),
- std::make_tuple( std::forward< Args >( args) ... ) ) );
+ std::make_tuple( std::forward< Args >( args) ... ) } };
}
namespace detail {
diff --git a/boost/fiber/detail/config.hpp b/boost/fiber/detail/config.hpp
index f65d48910d..7c7119e1fb 100644
--- a/boost/fiber/detail/config.hpp
+++ b/boost/fiber/detail/config.hpp
@@ -52,7 +52,7 @@
#endif
#if !defined(BOOST_FIBERS_SPIN_MAX_TESTS)
-# define BOOST_FIBERS_SPIN_MAX_TESTS 100
+# define BOOST_FIBERS_SPIN_MAX_TESTS 500
#endif
// modern architectures have cachelines with 64byte length
diff --git a/boost/fiber/detail/context_mpsc_queue.hpp b/boost/fiber/detail/context_mpsc_queue.hpp
deleted file mode 100644
index f7e664659c..0000000000
--- a/boost/fiber/detail/context_mpsc_queue.hpp
+++ /dev/null
@@ -1,98 +0,0 @@
-
-// Copyright Dmitry Vyukov 2010-2011.
-// Copyright Oliver Kowalke 2016.
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE_1_0.txt or copy at
-// http://www.boost.org/LICENSE_1_0.txt)
-//
-// based on Dmitry Vyukov's intrusive MPSC queue
-// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
-// https://groups.google.com/forum/#!topic/lock-free/aFHvZhu1G-0
-
-#ifndef BOOST_FIBERS_DETAIL_CONTEXT_MPSC_QUEUE_H
-#define BOOST_FIBERS_DETAIL_CONTEXT_MPSC_QUEUE_H
-
-#include <atomic>
-#include <memory>
-#include <type_traits>
-
-#include <boost/assert.hpp>
-#include <boost/config.hpp>
-
-#include <boost/fiber/context.hpp>
-#include <boost/fiber/detail/config.hpp>
-
-#ifdef BOOST_HAS_ABI_HEADERS
-# include BOOST_ABI_PREFIX
-#endif
-
-namespace boost {
-namespace fibers {
-namespace detail {
-
-// a MPSC queue
-// multiple threads push ready fibers (belonging to local scheduler)
-// (thread) local scheduler pops fibers
-class context_mpsc_queue {
-private:
- // not default constructor for context - use aligned_storage instead
- alignas(cache_alignment) std::aligned_storage< sizeof( context), alignof( context) >::type storage_{};
- context * dummy_;
- alignas(cache_alignment) std::atomic< context * > head_;
- alignas(cache_alignment) context * tail_;
- char pad_[cacheline_length];
-
-public:
- context_mpsc_queue() :
- dummy_{ reinterpret_cast< context * >( std::addressof( storage_) ) },
- head_{ dummy_ },
- tail_{ dummy_ } {
- dummy_->remote_nxt_.store( nullptr, std::memory_order_release);
- }
-
- context_mpsc_queue( context_mpsc_queue const&) = delete;
- context_mpsc_queue & operator=( context_mpsc_queue const&) = delete;
-
- void push( context * ctx) noexcept {
- BOOST_ASSERT( nullptr != ctx);
- ctx->remote_nxt_.store( nullptr, std::memory_order_release);
- context * prev = head_.exchange( ctx, std::memory_order_acq_rel);
- prev->remote_nxt_.store( ctx, std::memory_order_release);
- }
-
- context * pop() noexcept {
- context * tail = tail_;
- context * next = tail->remote_nxt_.load( std::memory_order_acquire);
- if ( dummy_ == tail) {
- if ( nullptr == next) {
- return nullptr;
- }
- tail_ = next;
- tail = next;
- next = next->remote_nxt_.load( std::memory_order_acquire);;
- }
- if ( nullptr != next) {
- tail_ = next;
- return tail;
- }
- context * head = head_.load( std::memory_order_acquire);
- if ( tail != head) {
- return nullptr;
- }
- push( dummy_);
- next = tail->remote_nxt_.load( std::memory_order_acquire);
- if ( nullptr != next) {
- tail_= next;
- return tail;
- }
- return nullptr;
- }
-};
-
-}}}
-
-#ifdef BOOST_HAS_ABI_HEADERS
-# include BOOST_ABI_SUFFIX
-#endif
-
-#endif // BOOST_FIBERS_DETAIL_CONTEXT_MPSC_QUEUE_H
diff --git a/boost/fiber/detail/context_spinlock_queue.hpp b/boost/fiber/detail/context_spinlock_queue.hpp
new file mode 100644
index 0000000000..e0ebdabda6
--- /dev/null
+++ b/boost/fiber/detail/context_spinlock_queue.hpp
@@ -0,0 +1,118 @@
+
+// Copyright Oliver Kowalke 2015.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_FIBERS_DETAIL_SPINLOCK_QUEUE_H
+#define BOOST_FIBERS_DETAIL_SPINLOCK_QUEUE_H
+
+#include <cstddef>
+#include <cstring>
+#include <mutex>
+
+#include <boost/config.hpp>
+
+#include <boost/fiber/context.hpp>
+#include <boost/fiber/detail/config.hpp>
+#include <boost/fiber/detail/spinlock.hpp>
+
+#ifdef BOOST_HAS_ABI_HEADERS
+# include BOOST_ABI_PREFIX
+#endif
+
+namespace boost {
+namespace fibers {
+namespace detail {
+
+class context_spinlock_queue {
+private:
+ typedef context * slot_type;
+
+ alignas(cache_alignment) mutable spinlock splk_{};
+ std::size_t pidx_{ 0 };
+ std::size_t cidx_{ 0 };
+ std::size_t capacity_;
+ slot_type * slots_;
+
+ void resize_() {
+ slot_type * old_slots = slots_;
+ slots_ = new slot_type[2*capacity_];
+ std::size_t offset = capacity_ - cidx_;
+ std::memcpy( slots_, old_slots + cidx_, offset * sizeof( slot_type) );
+ if ( 0 < cidx_) {
+ std::memcpy( slots_ + offset, old_slots, pidx_ * sizeof( slot_type) );
+ }
+ cidx_ = 0;
+ pidx_ = capacity_ - 1;
+ capacity_ *= 2;
+ delete [] old_slots;
+ }
+
+ bool is_full_() const noexcept {
+ return cidx_ == ((pidx_ + 1) % capacity_);
+ }
+
+ bool is_empty_() const noexcept {
+ return cidx_ == pidx_;
+ }
+
+public:
+ context_spinlock_queue( std::size_t capacity = 4096) :
+ capacity_{ capacity } {
+ slots_ = new slot_type[capacity_];
+ }
+
+ ~context_spinlock_queue() {
+ delete [] slots_;
+ }
+
+ context_spinlock_queue( context_spinlock_queue const&) = delete;
+ context_spinlock_queue & operator=( context_spinlock_queue const&) = delete;
+
+ bool empty() const noexcept {
+ spinlock_lock lk{ splk_ };
+ return is_empty_();
+ }
+
+ void push( context * c) {
+ spinlock_lock lk{ splk_ };
+ if ( is_full_() ) {
+ resize_();
+ }
+ slots_[pidx_] = c;
+ pidx_ = (pidx_ + 1) % capacity_;
+ }
+
+ context * pop() {
+ spinlock_lock lk{ splk_ };
+ context * c = nullptr;
+ if ( ! is_empty_() ) {
+ c = slots_[cidx_];
+ cidx_ = (cidx_ + 1) % capacity_;
+ }
+ return c;
+ }
+
+ context * steal() {
+ spinlock_lock lk{ splk_ };
+ context * c = nullptr;
+ if ( ! is_empty_() ) {
+ c = slots_[cidx_];
+ if ( c->is_context( type::pinned_context) ) {
+ return nullptr;
+ }
+ cidx_ = (cidx_ + 1) % capacity_;
+ }
+ return c;
+ }
+};
+
+}}}
+
+#ifdef BOOST_HAS_ABI_HEADERS
+# include BOOST_ABI_SUFFIX
+#endif
+
+#endif // BOOST_FIBERS_DETAIL_SPINLOCK_QUEUE_H
diff --git a/boost/fiber/detail/context_spmc_queue.hpp b/boost/fiber/detail/context_spmc_queue.hpp
index 6449e3658f..27256233cf 100644
--- a/boost/fiber/detail/context_spmc_queue.hpp
+++ b/boost/fiber/detail/context_spmc_queue.hpp
@@ -30,6 +30,11 @@
// In Proceedings of the 18th ACM SIGPLAN symposium on Principles and practice
// of parallel programming (PPoPP '13). ACM, New York, NY, USA, 69-80.
+#if BOOST_COMP_CLANG
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wunused-private-field"
+#endif
+
namespace boost {
namespace fibers {
namespace detail {
@@ -43,43 +48,43 @@ private:
sizeof( atomic_type), cache_alignment
>::type storage_type;
- std::size_t size_;
+ std::size_t capacity_;
storage_type * storage_;
public:
- array( std::size_t size) :
- size_{ size },
- storage_{ new storage_type[size_] } {
- for ( std::size_t i = 0; i < size_; ++i) {
+ array( std::size_t capacity) :
+ capacity_{ capacity },
+ storage_{ new storage_type[capacity_] } {
+ for ( std::size_t i = 0; i < capacity_; ++i) {
::new ( static_cast< void * >( std::addressof( storage_[i]) ) ) atomic_type{ nullptr };
}
}
~array() {
- for ( std::size_t i = 0; i < size_; ++i) {
+ for ( std::size_t i = 0; i < capacity_; ++i) {
reinterpret_cast< atomic_type * >( std::addressof( storage_[i]) )->~atomic_type();
}
delete [] storage_;
}
- std::size_t size() const noexcept {
- return size_;
+ std::size_t capacity() const noexcept {
+ return capacity_;
}
void push( std::size_t bottom, context * ctx) noexcept {
reinterpret_cast< atomic_type * >(
- std::addressof( storage_[bottom % size_]) )
+ std::addressof( storage_[bottom % capacity_]) )
->store( ctx, std::memory_order_relaxed);
}
context * pop( std::size_t top) noexcept {
return reinterpret_cast< atomic_type * >(
- std::addressof( storage_[top % size_]) )
+ std::addressof( storage_[top % capacity_]) )
->load( std::memory_order_relaxed);
}
array * resize( std::size_t bottom, std::size_t top) {
- std::unique_ptr< array > tmp{ new array{ 2 * size_ } };
+ std::unique_ptr< array > tmp{ new array{ 2 * capacity_ } };
for ( std::size_t i = top; i != bottom; ++i) {
tmp->push( i, pop( i) );
}
@@ -87,15 +92,15 @@ private:
}
};
- alignas(cache_alignment) std::atomic< std::size_t > top_{ 0 };
- alignas(cache_alignment) std::atomic< std::size_t > bottom_{ 0 };
+ alignas(cache_alignment) std::atomic< std::size_t > top_{ 0 };
+ alignas(cache_alignment) std::atomic< std::size_t > bottom_{ 0 };
alignas(cache_alignment) std::atomic< array * > array_;
- std::vector< array * > old_arrays_{};
+ std::vector< array * > old_arrays_{};
char padding_[cacheline_length];
public:
- context_spmc_queue() :
- array_{ new array{ 1024 } } {
+ context_spmc_queue( std::size_t capacity = 4096) :
+ array_{ new array{ capacity } } {
old_arrays_.reserve( 32);
}
@@ -110,19 +115,19 @@ public:
context_spmc_queue & operator=( context_spmc_queue const&) = delete;
bool empty() const noexcept {
- std::size_t bottom{ bottom_.load( std::memory_order_relaxed) };
- std::size_t top{ top_.load( std::memory_order_relaxed) };
+ std::size_t bottom = bottom_.load( std::memory_order_relaxed);
+ std::size_t top = top_.load( std::memory_order_relaxed);
return bottom <= top;
}
void push( context * ctx) {
- std::size_t bottom{ bottom_.load( std::memory_order_relaxed) };
- std::size_t top{ top_.load( std::memory_order_acquire) };
- array * a{ array_.load( std::memory_order_relaxed) };
- if ( (a->size() - 1) < (bottom - top) ) {
+ std::size_t bottom = bottom_.load( std::memory_order_relaxed);
+ std::size_t top = top_.load( std::memory_order_acquire);
+ array * a = array_.load( std::memory_order_relaxed);
+ if ( (a->capacity() - 1) < (bottom - top) ) {
// queue is full
// resize
- array * tmp{ a->resize( bottom, top) };
+ array * tmp = a->resize( bottom, top);
old_arrays_.push_back( a);
std::swap( a, tmp);
array_.store( a, std::memory_order_relaxed);
@@ -133,16 +138,48 @@ public:
}
context * pop() {
- std::size_t top{ top_.load( std::memory_order_acquire) };
+ std::size_t bottom = bottom_.load( std::memory_order_relaxed) - 1;
+ array * a = array_.load( std::memory_order_relaxed);
+ bottom_.store( bottom, std::memory_order_relaxed);
std::atomic_thread_fence( std::memory_order_seq_cst);
- std::size_t bottom{ bottom_.load( std::memory_order_acquire) };
- context * ctx{ nullptr };
+ std::size_t top = top_.load( std::memory_order_relaxed);
+ context * ctx = nullptr;
+ if ( top <= bottom) {
+ // queue is not empty
+ ctx = a->pop( bottom);
+ BOOST_ASSERT( nullptr != ctx);
+ if ( top == bottom) {
+ // last element dequeued
+ if ( ! top_.compare_exchange_strong( top, top + 1,
+ std::memory_order_seq_cst,
+ std::memory_order_relaxed) ) {
+ // lose the race
+ ctx = nullptr;
+ }
+ bottom_.store( bottom + 1, std::memory_order_relaxed);
+ }
+ } else {
+ // queue is empty
+ bottom_.store( bottom + 1, std::memory_order_relaxed);
+ }
+ return ctx;
+ }
+
+ context * steal() {
+ std::size_t top = top_.load( std::memory_order_acquire);
+ std::atomic_thread_fence( std::memory_order_seq_cst);
+ std::size_t bottom = bottom_.load( std::memory_order_acquire);
+ context * ctx = nullptr;
if ( top < bottom) {
// queue is not empty
- array * a{ array_.load( std::memory_order_consume) };
+ array * a = array_.load( std::memory_order_consume);
ctx = a->pop( top);
- if ( ctx->is_context( type::pinned_context) ||
- ! top_.compare_exchange_strong( top, top + 1,
+ BOOST_ASSERT( nullptr != ctx);
+ // do not steal pinned context (e.g. main-/dispatcher-context)
+ if ( ctx->is_context( type::pinned_context) ) {
+ return nullptr;
+ }
+ if ( ! top_.compare_exchange_strong( top, top + 1,
std::memory_order_seq_cst,
std::memory_order_relaxed) ) {
// lose the race
@@ -155,4 +192,8 @@ public:
}}}
+#if BOOST_COMP_CLANG
+#pragma clang diagnostic pop
+#endif
+
#endif // BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
diff --git a/boost/fiber/detail/cpu_relax.hpp b/boost/fiber/detail/cpu_relax.hpp
index d00020a23b..541b46dfd0 100644
--- a/boost/fiber/detail/cpu_relax.hpp
+++ b/boost/fiber/detail/cpu_relax.hpp
@@ -7,6 +7,7 @@
#ifndef BOOST_FIBERS_DETAIL_CPU_RELAX_H
#define BOOST_FIBERS_DETAIL_CPU_RELAX_H
+#include <chrono>
#include <thread>
#include <boost/config.hpp>
@@ -14,7 +15,7 @@
#include <boost/fiber/detail/config.hpp>
-#if BOOST_COMP_MSVC
+#if BOOST_COMP_MSVC || BOOST_COMP_MSVC_EMULATED
# include <Windows.h>
#endif
@@ -29,22 +30,47 @@ namespace detail {
#if BOOST_ARCH_ARM
# if BOOST_COMP_MSVC
# define cpu_relax() YieldProcessor();
-# else
+# elif (defined(__ARM_ARCH_6K__) || \
+ defined(__ARM_ARCH_6Z__) || \
+ defined(__ARM_ARCH_6ZK__) || \
+ defined(__ARM_ARCH_6T2__) || \
+ defined(__ARM_ARCH_7__) || \
+ defined(__ARM_ARCH_7A__) || \
+ defined(__ARM_ARCH_7R__) || \
+ defined(__ARM_ARCH_7M__) || \
+ defined(__ARM_ARCH_7S__) || \
+ defined(__ARM_ARCH_8A__) || \
+ defined(__aarch64__))
+// http://groups.google.com/a/chromium.org/forum/#!msg/chromium-dev/YGVrZbxYOlU/Vpgy__zeBQAJ
+// mnemonic 'yield' is supported from ARMv6k onwards
# define cpu_relax() asm volatile ("yield" ::: "memory");
+# else
+# define cpu_relax() asm volatile ("nop" ::: "memory");
# endif
#elif BOOST_ARCH_MIPS
# define cpu_relax() asm volatile ("pause" ::: "memory");
#elif BOOST_ARCH_PPC
+// http://code.metager.de/source/xref/gnu/glibc/sysdeps/powerpc/sys/platform/ppc.h
+// http://stackoverflow.com/questions/5425506/equivalent-of-x86-pause-instruction-for-ppc
+// mnemonic 'or' shared resource hints
+// or 27, 27, 27 This form of 'or' provides a hint that performance
+// will probably be improved if shared resources dedicated
+// to the executing processor are released for use by other
+// processors
+// extended mnemonics (available with POWER7)
+// yield == or 27, 27, 27
# define cpu_relax() asm volatile ("or 27,27,27" ::: "memory");
#elif BOOST_ARCH_X86
-# if BOOST_COMP_MSVC
+# if BOOST_COMP_MSVC || BOOST_COMP_MSVC_EMULATED
# define cpu_relax() YieldProcessor();
# else
# define cpu_relax() asm volatile ("pause" ::: "memory");
# endif
#else
-# warning "architecture does not support yield/pause mnemonic"
-# define cpu_relax() std::this_thread::yield();
+# define cpu_relax() { \
+ static constexpr std::chrono::microseconds us0{ 0 }; \
+ std::this_thread::sleep_for( us0); \
+ }
#endif
}}}
diff --git a/boost/fiber/detail/data.hpp b/boost/fiber/detail/data.hpp
index 24e833a9e8..e2b119ec3e 100644
--- a/boost/fiber/detail/data.hpp
+++ b/boost/fiber/detail/data.hpp
@@ -28,7 +28,7 @@ struct data_t {
spinlock_lock * lk{ nullptr };
context * ctx{ nullptr };
- data_t() noexcept = default;
+ data_t() = default;
explicit data_t( spinlock_lock * lk_) noexcept :
lk{ lk_ } {
diff --git a/boost/fiber/detail/fss.hpp b/boost/fiber/detail/fss.hpp
index 54dc5b79d3..27a7d67f26 100644
--- a/boost/fiber/detail/fss.hpp
+++ b/boost/fiber/detail/fss.hpp
@@ -38,12 +38,13 @@ public:
friend inline
void intrusive_ptr_add_ref( fss_cleanup_function * p) noexcept {
- ++p->use_count_;
+ p->use_count_.fetch_add( 1, std::memory_order_relaxed);
}
friend inline
void intrusive_ptr_release( fss_cleanup_function * p) noexcept {
- if ( --p->use_count_ == 0) {
+ if ( 1 == p->use_count_.fetch_sub( 1, std::memory_order_release) ) {
+ std::atomic_thread_fence( std::memory_order_acquire);
delete p;
}
}
diff --git a/boost/fiber/detail/futex.hpp b/boost/fiber/detail/futex.hpp
index 4c966867c5..d383dc4077 100644
--- a/boost/fiber/detail/futex.hpp
+++ b/boost/fiber/detail/futex.hpp
@@ -49,7 +49,7 @@ int futex_wake( std::atomic< std::int32_t > * addr) {
inline
int futex_wait( std::atomic< std::int32_t > * addr, std::int32_t x) {
- ::WaitOnAddress( static_cast< volatile void * >( addr), & x, sizeof( x), -1);
+ ::WaitOnAddress( static_cast< volatile void * >( addr), & x, sizeof( x), INFINITE);
return 0;
}
#else
diff --git a/boost/fiber/detail/spinlock_ttas.hpp b/boost/fiber/detail/spinlock_ttas.hpp
index d64630d84d..380773ad6d 100644
--- a/boost/fiber/detail/spinlock_ttas.hpp
+++ b/boost/fiber/detail/spinlock_ttas.hpp
@@ -19,6 +19,11 @@
// https://software.intel.com/en-us/articles/benefitting-power-and-performance-sleep-loops
// https://software.intel.com/en-us/articles/long-duration-spin-wait-loops-on-hyper-threading-technology-enabled-intel-processors
+#if BOOST_COMP_CLANG
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wunused-private-field"
+#endif
+
namespace boost {
namespace fibers {
namespace detail {
@@ -30,10 +35,7 @@ private:
unlocked
};
- // align shared variable 'state_' at cache line to prevent false sharing
- alignas(cache_alignment) std::atomic< spinlock_status > state_{ spinlock_status::unlocked };
- // padding to avoid other data one the cacheline of shared variable 'state_'
- char pad[cacheline_length];
+ std::atomic< spinlock_status > state_{ spinlock_status::unlocked };
public:
spinlock_ttas() noexcept = default;
@@ -63,20 +65,15 @@ public:
// delays the next instruction's execution for a finite period of time (depends on processor family)
// the CPU is not under demand, parts of the pipeline are no longer being used
// -> reduces the power consumed by the CPU
+ // -> prevent pipeline stalls
cpu_relax();
- } else if ( BOOST_FIBERS_SPIN_MAX_TESTS + 20 > tests) {
- ++tests;
+ } else {
// std::this_thread::sleep_for( 0us) has a fairly long instruction path length,
// combined with an expensive ring3 to ring 0 transition costing about 1000 cycles
// std::this_thread::sleep_for( 0us) lets give up this_thread the remaining part of its time slice
// if and only if a thread of equal or greater priority is ready to run
static constexpr std::chrono::microseconds us0{ 0 };
std::this_thread::sleep_for( us0);
- } else {
- // std::this_thread::yield() allows this_thread to give up the remaining part of its time slice,
- // but only to another thread on the same processor
- // instead of constant checking, a thread only checks if no other useful work is pending
- std::this_thread::yield();
}
#else
std::this_thread::yield();
@@ -89,10 +86,12 @@ public:
// utilize 'Binary Exponential Backoff' algorithm
// linear_congruential_engine is a random number engine based on Linear congruential generator (LCG)
static thread_local std::minstd_rand generator;
- const std::size_t z =
- std::uniform_int_distribution< std::size_t >{ 0, static_cast< std::size_t >( 1) << collisions }( generator);
+ static std::uniform_int_distribution< std::size_t > distribution{ 0, static_cast< std::size_t >( 1) << collisions };
+ const std::size_t z = distribution( generator);
++collisions;
for ( std::size_t i = 0; i < z; ++i) {
+ // -> reduces the power consumed by the CPU
+ // -> prevent pipeline stalls
cpu_relax();
}
} else {
@@ -109,4 +108,8 @@ public:
}}}
+#if BOOST_COMP_CLANG
+#pragma clang diagnostic pop
+#endif
+
#endif // BOOST_FIBERS_SPINLOCK_TTAS_H
diff --git a/boost/fiber/detail/spinlock_ttas_adaptive.hpp b/boost/fiber/detail/spinlock_ttas_adaptive.hpp
index c6a9a57d79..da044b6298 100644
--- a/boost/fiber/detail/spinlock_ttas_adaptive.hpp
+++ b/boost/fiber/detail/spinlock_ttas_adaptive.hpp
@@ -31,11 +31,8 @@ private:
unlocked
};
- // align shared variable 'state_' at cache line to prevent false sharing
- alignas(cache_alignment) std::atomic< spinlock_status > state_{ spinlock_status::unlocked };
- std::atomic< std::size_t > tests_{ 0 };
- // padding to avoid other data one the cacheline of shared variable 'state_'
- char pad[cacheline_length];
+ std::atomic< spinlock_status > state_{ spinlock_status::unlocked };
+ std::atomic< std::size_t > tests_{ 0 };
public:
spinlock_ttas_adaptive() noexcept = default;
@@ -67,8 +64,9 @@ public:
// delays the next instruction's execution for a finite period of time (depends on processor family)
// the CPU is not under demand, parts of the pipeline are no longer being used
// -> reduces the power consumed by the CPU
+ // -> prevent pipeline stalls
cpu_relax();
- } else if ( BOOST_FIBERS_SPIN_MAX_TESTS + 20 > tests) {
+ } else {
++tests;
// std::this_thread::sleep_for( 0us) has a fairly long instruction path length,
// combined with an expensive ring3 to ring 0 transition costing about 1000 cycles
@@ -76,11 +74,6 @@ public:
// if and only if a thread of equal or greater priority is ready to run
static constexpr std::chrono::microseconds us0{ 0 };
std::this_thread::sleep_for( us0);
- } else {
- // std::this_thread::yield() allows this_thread to give up the remaining part of its time slice,
- // but only to another thread on the same processor
- // instead of constant checking, a thread only checks if no other useful work is pending
- std::this_thread::yield();
}
#else
std::this_thread::yield();
@@ -93,10 +86,12 @@ public:
// utilize 'Binary Exponential Backoff' algorithm
// linear_congruential_engine is a random number engine based on Linear congruential generator (LCG)
static thread_local std::minstd_rand generator;
- const std::size_t z =
- std::uniform_int_distribution< std::size_t >{ 0, static_cast< std::size_t >( 1) << collisions }( generator);
+ static std::uniform_int_distribution< std::size_t > distribution{ 0, static_cast< std::size_t >( 1) << collisions };
+ const std::size_t z = distribution( generator);
++collisions;
for ( std::size_t i = 0; i < z; ++i) {
+ // -> reduces the power consumed by the CPU
+ // -> prevent pipeline stalls
cpu_relax();
}
} else {
diff --git a/boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp b/boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp
index fbd6a0e4d2..61ab47691e 100644
--- a/boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp
+++ b/boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp
@@ -26,11 +26,8 @@ namespace detail {
class spinlock_ttas_adaptive_futex {
private:
- // align shared variable 'value_' at cache line to prevent false sharing
- alignas(cache_alignment) std::atomic< std::int32_t > value_{ 0 };
- std::atomic< std::int32_t > tests_{ 0 };
- // padding to avoid other data one the cacheline of shared variable 'value_'
- char pad_[cacheline_length];
+ std::atomic< std::int32_t > value_{ 0 };
+ std::atomic< std::int32_t > tests_{ 0 };
public:
spinlock_ttas_adaptive_futex() noexcept = default;
@@ -61,6 +58,7 @@ public:
// delays the next instruction's execution for a finite period of time (depends on processor family)
// the CPU is not under demand, parts of the pipeline are no longer being used
// -> reduces the power consumed by the CPU
+ // -> prevent pipeline stalls
cpu_relax();
#else
// std::this_thread::yield() allows this_thread to give up the remaining part of its time slice,
@@ -73,10 +71,12 @@ public:
// utilize 'Binary Exponential Backoff' algorithm
// linear_congruential_engine is a random number engine based on Linear congruential generator (LCG)
static thread_local std::minstd_rand generator;
- const std::int32_t z = std::uniform_int_distribution< std::int32_t >{
- 0, static_cast< std::int32_t >( 1) << collisions }( generator);
+ static std::uniform_int_distribution< std::int32_t > distribution{ 0, static_cast< std::int32_t >( 1) << collisions };
+ const std::int32_t z = distribution( generator);
++collisions;
for ( std::int32_t i = 0; i < z; ++i) {
+ // -> reduces the power consumed by the CPU
+ // -> prevent pipeline stalls
cpu_relax();
}
} else {
diff --git a/boost/fiber/detail/spinlock_ttas_futex.hpp b/boost/fiber/detail/spinlock_ttas_futex.hpp
index b11e63b587..a427b73ba5 100644
--- a/boost/fiber/detail/spinlock_ttas_futex.hpp
+++ b/boost/fiber/detail/spinlock_ttas_futex.hpp
@@ -25,10 +25,7 @@ namespace detail {
class spinlock_ttas_futex {
private:
- // align shared variable 'value_' at cache line to prevent false sharing
- alignas(cache_alignment) std::atomic< std::int32_t > value_{ 0 };
- // padding to avoid other data one the cacheline of shared variable 'value_'
- char pad_[cacheline_length];
+ std::atomic< std::int32_t > value_{ 0 };
public:
spinlock_ttas_futex() noexcept = default;
@@ -57,6 +54,7 @@ public:
// delays the next instruction's execution for a finite period of time (depends on processor family)
// the CPU is not under demand, parts of the pipeline are no longer being used
// -> reduces the power consumed by the CPU
+ // -> prevent pipeline stalls
cpu_relax();
#else
// std::this_thread::yield() allows this_thread to give up the remaining part of its time slice,
@@ -69,10 +67,12 @@ public:
// utilize 'Binary Exponential Backoff' algorithm
// linear_congruential_engine is a random number engine based on Linear congruential generator (LCG)
static thread_local std::minstd_rand generator;
- const std::int32_t z = std::uniform_int_distribution< std::int32_t >{
- 0, static_cast< std::int32_t >( 1) << collisions }( generator);
+ static std::uniform_int_distribution< std::int32_t > distribution{ 0, static_cast< std::int32_t >( 1) << collisions };
+ const std::int32_t z = distribution( generator);
++collisions;
for ( std::int32_t i = 0; i < z; ++i) {
+ // -> reduces the power consumed by the CPU
+ // -> prevent pipeline stalls
cpu_relax();
}
} else {
diff --git a/boost/fiber/detail/wrap.hpp b/boost/fiber/detail/wrap.hpp
index 0369e61ee6..558de6bd94 100644
--- a/boost/fiber/detail/wrap.hpp
+++ b/boost/fiber/detail/wrap.hpp
@@ -10,8 +10,14 @@
#include <type_traits>
#include <boost/config.hpp>
+#if defined(BOOST_NO_CXX17_STD_INVOKE)
#include <boost/context/detail/invoke.hpp>
-#include <boost/context/execution_context.hpp>
+#endif
+#if (BOOST_EXECUTION_CONTEXT==1)
+# include <boost/context/execution_context.hpp>
+#else
+# include <boost/context/continuation.hpp>
+#endif
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/data.hpp>
@@ -36,9 +42,9 @@ private:
public:
wrapper( Fn1 && fn1, Fn2 && fn2, Tpl && tpl,
boost::context::execution_context const& ctx) :
- fn1_( std::move( fn1) ),
- fn2_( std::move( fn2) ),
- tpl_( std::move( tpl) ),
+ fn1_{ std::move( fn1) },
+ fn2_{ std::move( fn2) },
+ tpl_{ std::move( tpl) },
ctx_{ ctx } {
}
@@ -49,9 +55,11 @@ public:
wrapper & operator=( wrapper && other) = default;
void operator()( void * vp) {
- boost::context::detail::invoke(
- std::move( fn1_),
- fn2_, tpl_, ctx_, vp);
+#if defined(BOOST_NO_CXX17_STD_INVOKE)
+ boost::context::detail::invoke( std::move( fn1_), fn2_, tpl_, ctx_, vp);
+#else
+ std::invoke( std::move( fn1_), fn2_, tpl_, ctx_, vp);
+#endif
}
};
@@ -59,11 +67,11 @@ template< typename Fn1, typename Fn2, typename Tpl >
wrapper< Fn1, Fn2, Tpl >
wrap( Fn1 && fn1, Fn2 && fn2, Tpl && tpl,
boost::context::execution_context const& ctx) {
- return wrapper< Fn1, Fn2, Tpl >(
+ return wrapper< Fn1, Fn2, Tpl >{
std::forward< Fn1 >( fn1),
std::forward< Fn2 >( fn2),
std::forward< Tpl >( tpl),
- ctx);
+ ctx };
}
#else
template< typename Fn1, typename Fn2, typename Tpl >
@@ -75,9 +83,9 @@ private:
public:
wrapper( Fn1 && fn1, Fn2 && fn2, Tpl && tpl) :
- fn1_( std::move( fn1) ),
- fn2_( std::move( fn2) ),
- tpl_( std::move( tpl) ) {
+ fn1_{ std::move( fn1) },
+ fn2_{ std::move( fn2) },
+ tpl_{ std::move( tpl) } {
}
wrapper( wrapper const&) = delete;
@@ -86,24 +94,31 @@ public:
wrapper( wrapper && other) = default;
wrapper & operator=( wrapper && other) = default;
- boost::context::execution_context< data_t * >
- operator()( boost::context::execution_context< data_t * > && ctx, data_t * dp) {
+ boost::context::continuation
+ operator()( boost::context::continuation && c) {
+#if defined(BOOST_NO_CXX17_STD_INVOKE)
return boost::context::detail::invoke(
std::move( fn1_),
fn2_,
tpl_,
- std::forward< boost::context::execution_context< data_t * > >( ctx),
- dp);
+ std::forward< boost::context::continuation >( c) );
+#else
+ return std::invoke(
+ std::move( fn1_),
+ fn2_,
+ tpl_,
+ std::forward< boost::context::continuation >( c) );
+#endif
}
};
template< typename Fn1, typename Fn2, typename Tpl >
wrapper< Fn1, Fn2, Tpl >
wrap( Fn1 && fn1, Fn2 && fn2, Tpl && tpl) {
- return wrapper< Fn1, Fn2, Tpl >(
+ return wrapper< Fn1, Fn2, Tpl >{
std::forward< Fn1 >( fn1),
std::forward< Fn2 >( fn2),
- std::forward< Tpl >( tpl) );
+ std::forward< Tpl >( tpl) };
}
#endif
diff --git a/boost/fiber/exceptions.hpp b/boost/fiber/exceptions.hpp
index ddbf45eadf..5a5f7e4f1b 100644
--- a/boost/fiber/exceptions.hpp
+++ b/boost/fiber/exceptions.hpp
@@ -28,15 +28,15 @@ namespace fibers {
class fiber_error : public std::system_error {
public:
fiber_error( std::error_code ec) :
- std::system_error( ec) {
+ std::system_error{ ec } {
}
fiber_error( std::error_code ec, const char * what_arg) :
- std::system_error( ec, what_arg) {
+ std::system_error{ ec, what_arg } {
}
fiber_error( std::error_code ec, std::string const& what_arg) :
- std::system_error( ec, what_arg) {
+ std::system_error{ ec, what_arg } {
}
virtual ~fiber_error() = default;
@@ -45,15 +45,15 @@ public:
class lock_error : public fiber_error {
public:
lock_error( std::error_code ec) :
- fiber_error( ec) {
+ fiber_error{ ec } {
}
lock_error( std::error_code ec, const char * what_arg) :
- fiber_error( ec, what_arg) {
+ fiber_error{ ec, what_arg } {
}
lock_error( std::error_code ec, std::string const& what_arg) :
- fiber_error( ec, what_arg) {
+ fiber_error{ ec, what_arg } {
}
};
@@ -77,12 +77,12 @@ struct is_error_code_enum< boost::fibers::future_errc > : public true_type {
inline
std::error_code make_error_code( boost::fibers::future_errc e) noexcept {
- return std::error_code( static_cast< int >( e), boost::fibers::future_category() );
+ return std::error_code{ static_cast< int >( e), boost::fibers::future_category() };
}
inline
std::error_condition make_error_condition( boost::fibers::future_errc e) noexcept {
- return std::error_condition( static_cast< int >( e), boost::fibers::future_category() );
+ return std::error_condition{ static_cast< int >( e), boost::fibers::future_category() };
}
}
@@ -93,49 +93,49 @@ namespace fibers {
class future_error : public fiber_error {
public:
future_error( std::error_code ec) :
- fiber_error( ec) {
+ fiber_error{ ec } {
}
};
class future_uninitialized : public future_error {
public:
future_uninitialized() :
- future_error( std::make_error_code( future_errc::no_state) ) {
+ future_error{ std::make_error_code( future_errc::no_state) } {
}
};
class future_already_retrieved : public future_error {
public:
future_already_retrieved() :
- future_error( std::make_error_code( future_errc::future_already_retrieved) ) {
+ future_error{ std::make_error_code( future_errc::future_already_retrieved) } {
}
};
class broken_promise : public future_error {
public:
broken_promise() :
- future_error( std::make_error_code( future_errc::broken_promise) ) {
+ future_error{ std::make_error_code( future_errc::broken_promise) } {
}
};
class promise_already_satisfied : public future_error {
public:
promise_already_satisfied() :
- future_error( std::make_error_code( future_errc::promise_already_satisfied) ) {
+ future_error{ std::make_error_code( future_errc::promise_already_satisfied) } {
}
};
class promise_uninitialized : public future_error {
public:
promise_uninitialized() :
- future_error( std::make_error_code( future_errc::no_state) ) {
+ future_error{ std::make_error_code( future_errc::no_state) } {
}
};
class packaged_task_uninitialized : public future_error {
public:
packaged_task_uninitialized() :
- future_error( std::make_error_code( future_errc::no_state) ) {
+ future_error{ std::make_error_code( future_errc::no_state) } {
}
};
diff --git a/boost/fiber/fiber.hpp b/boost/fiber/fiber.hpp
index 0fbc84ad12..1508a9b5a6 100644
--- a/boost/fiber/fiber.hpp
+++ b/boost/fiber/fiber.hpp
@@ -49,7 +49,7 @@ private:
public:
typedef context::id id;
- fiber() noexcept = default;
+ fiber() = default;
template< typename Fn,
typename ... Args,
@@ -108,7 +108,9 @@ public:
if ( joinable() ) {
std::terminate();
}
- if ( this == & other) return * this;
+ if ( this == & other) {
+ return * this;
+ }
impl_.swap( other.impl_);
return * this;
}
@@ -132,7 +134,7 @@ public:
template< typename PROPS >
PROPS & properties() {
auto props = impl_->get_properties();
- BOOST_ASSERT_MSG(props, "fiber::properties not set");
+ BOOST_ASSERT_MSG( props, "fiber::properties not set");
return dynamic_cast< PROPS & >( * props );
}
};
diff --git a/boost/fiber/fss.hpp b/boost/fiber/fss.hpp
index a578d40a7f..f65d7353b3 100644
--- a/boost/fiber/fss.hpp
+++ b/boost/fiber/fss.hpp
@@ -58,9 +58,9 @@ public:
}
~fiber_specific_ptr() {
- context * f = context::active();
- if ( nullptr != f) {
- f->set_fss_data(
+ context * active_ctx = context::active();
+ if ( nullptr != active_ctx) {
+ active_ctx->set_fss_data(
this, cleanup_fn_, nullptr, true);
}
}
diff --git a/boost/fiber/future/async.hpp b/boost/fiber/future/async.hpp
index d969d19ce3..e68b2c28fa 100644
--- a/boost/fiber/future/async.hpp
+++ b/boost/fiber/future/async.hpp
@@ -23,63 +23,85 @@ namespace fibers {
template< typename Fn, typename ... Args >
future<
- typename std::result_of<
- typename std::enable_if<
- ! detail::is_launch_policy< typename std::decay< Fn >::type >::value,
- typename std::decay< Fn >::type
- >::type( typename std::decay< Args >::type ... )
- >::type
+ typename std::result_of<
+ typename std::enable_if<
+ ! detail::is_launch_policy< typename std::decay< Fn >::type >::value,
+ typename std::decay< Fn >::type
+ >::type( typename std::decay< Args >::type ... )
+ >::type
>
async( Fn && fn, Args && ... args) {
typedef typename std::result_of<
typename std::decay< Fn >::type( typename std::decay< Args >::type ... )
- >::type result_t;
+ >::type result_type;
- packaged_task< result_t( typename std::decay< Args >::type ... ) > pt{
+ packaged_task< result_type( typename std::decay< Args >::type ... ) > pt{
std::forward< Fn >( fn) };
- future< result_t > f{ pt.get_future() };
+ future< result_type > f{ pt.get_future() };
fiber{ std::move( pt), std::forward< Args >( args) ... }.detach();
return f;
}
template< typename Policy, typename Fn, typename ... Args >
future<
- typename std::result_of<
- typename std::enable_if<
- detail::is_launch_policy< Policy >::value,
- typename std::decay< Fn >::type
- >::type( typename std::decay< Args >::type ...)
- >::type
+ typename std::result_of<
+ typename std::enable_if<
+ detail::is_launch_policy< Policy >::value,
+ typename std::decay< Fn >::type
+ >::type( typename std::decay< Args >::type ...)
+ >::type
>
async( Policy policy, Fn && fn, Args && ... args) {
typedef typename std::result_of<
typename std::decay< Fn >::type( typename std::decay< Args >::type ... )
- >::type result_t;
+ >::type result_type;
- packaged_task< result_t( typename std::decay< Args >::type ... ) > pt{
+ packaged_task< result_type( typename std::decay< Args >::type ... ) > pt{
std::forward< Fn >( fn) };
- future< result_t > f{ pt.get_future() };
+ future< result_type > f{ pt.get_future() };
fiber{ policy, std::move( pt), std::forward< Args >( args) ... }.detach();
return f;
}
template< typename Policy, typename StackAllocator, typename Fn, typename ... Args >
future<
- typename std::result_of<
- typename std::enable_if<
- detail::is_launch_policy< Policy >::value,
- typename std::decay< Fn >::type
- >::type( typename std::decay< Args >::type ... )
- >::type
+ typename std::result_of<
+ typename std::enable_if<
+ detail::is_launch_policy< Policy >::value,
+ typename std::decay< Fn >::type
+ >::type( typename std::decay< Args >::type ... )
+ >::type
>
async( Policy policy, std::allocator_arg_t, StackAllocator salloc, Fn && fn, Args && ... args) {
typedef typename std::result_of<
typename std::decay< Fn >::type( typename std::decay< Args >::type ... )
- >::type result_t;
+ >::type result_type;
- packaged_task< result_t( typename std::decay< Args >::type ... ) > pt{
- std::allocator_arg, salloc, std::forward< Fn >( fn) };
- future< result_t > f{ pt.get_future() };
+ packaged_task< result_type( typename std::decay< Args >::type ... ) > pt{
+ std::forward< Fn >( fn) };
+ future< result_type > f{ pt.get_future() };
+ fiber{ policy, std::allocator_arg, salloc,
+ std::move( pt), std::forward< Args >( args) ... }.detach();
+ return f;
+}
+
+template< typename Policy, typename StackAllocator, typename Allocator, typename Fn, typename ... Args >
+future<
+ typename std::result_of<
+ typename std::enable_if<
+ detail::is_launch_policy< Policy >::value,
+ typename std::decay< Fn >::type
+ >::type( typename std::decay< Args >::type ... )
+ >::type
+>
+async( Policy policy, std::allocator_arg_t, StackAllocator salloc, Allocator alloc, Fn && fn, Args && ... args) {
+ typedef typename std::result_of<
+ typename std::decay< Fn >::type( typename std::decay< Args >::type ... )
+ >::type result_type;
+
+ packaged_task< result_type( typename std::decay< Args >::type ... ) > pt{
+ std::allocator_arg, alloc, std::forward< Fn >( fn) };
+ future< result_type > f{ pt.get_future() };
fiber{ policy, std::allocator_arg, salloc,
std::move( pt), std::forward< Args >( args) ... }.detach();
return f;
diff --git a/boost/fiber/future/detail/shared_state.hpp b/boost/fiber/future/detail/shared_state.hpp
index 5ec6858cf3..898fdaffd4 100644
--- a/boost/fiber/future/detail/shared_state.hpp
+++ b/boost/fiber/future/detail/shared_state.hpp
@@ -109,46 +109,47 @@ public:
shared_state_base & operator=( shared_state_base const&) = delete;
void owner_destroyed() {
- std::unique_lock< mutex > lk( mtx_);
+ std::unique_lock< mutex > lk{ mtx_ };
owner_destroyed_( lk);
}
void set_exception( std::exception_ptr except) {
- std::unique_lock< mutex > lk( mtx_);
+ std::unique_lock< mutex > lk{ mtx_ };
set_exception_( except, lk);
}
std::exception_ptr get_exception_ptr() {
- std::unique_lock< mutex > lk( mtx_);
+ std::unique_lock< mutex > lk{ mtx_ };
return get_exception_ptr_( lk);
}
void wait() const {
- std::unique_lock< mutex > lk( mtx_);
+ std::unique_lock< mutex > lk{ mtx_ };
wait_( lk);
}
template< typename Rep, typename Period >
future_status wait_for( std::chrono::duration< Rep, Period > const& timeout_duration) const {
- std::unique_lock< mutex > lk( mtx_);
+ std::unique_lock< mutex > lk{ mtx_ };
return wait_for_( lk, timeout_duration);
}
template< typename Clock, typename Duration >
future_status wait_until( std::chrono::time_point< Clock, Duration > const& timeout_time) const {
- std::unique_lock< mutex > lk( mtx_);
+ std::unique_lock< mutex > lk{ mtx_ };
return wait_until_( lk, timeout_time);
}
friend inline
void intrusive_ptr_add_ref( shared_state_base * p) noexcept {
- ++p->use_count_;
+ p->use_count_.fetch_add( 1, std::memory_order_relaxed);
}
friend inline
void intrusive_ptr_release( shared_state_base * p) noexcept {
- if ( 0 == --p->use_count_) {
- p->deallocate_future();
+ if ( 1 == p->use_count_.fetch_sub( 1, std::memory_order_release) ) {
+ std::atomic_thread_fence( std::memory_order_acquire);
+ p->deallocate_future();
}
}
};
@@ -161,18 +162,18 @@ private:
void set_value_( R const& value, std::unique_lock< mutex > & lk) {
BOOST_ASSERT( lk.owns_lock() );
if ( ready_) {
- throw promise_already_satisfied();
+ throw promise_already_satisfied{};
}
- ::new ( static_cast< void * >( std::addressof( storage_) ) ) R( value);
+ ::new ( static_cast< void * >( std::addressof( storage_) ) ) R{ value };
mark_ready_and_notify_( lk);
}
void set_value_( R && value, std::unique_lock< mutex > & lk) {
BOOST_ASSERT( lk.owns_lock() );
if ( ready_) {
- throw promise_already_satisfied();
+ throw promise_already_satisfied{};
}
- ::new ( static_cast< void * >( std::addressof( storage_) ) ) R( std::move( value) );
+ ::new ( static_cast< void * >( std::addressof( storage_) ) ) R{ std::move( value) };
mark_ready_and_notify_( lk);
}
@@ -186,7 +187,7 @@ private:
}
public:
- typedef intrusive_ptr< shared_state > ptr_t;
+ typedef intrusive_ptr< shared_state > ptr_type;
shared_state() = default;
@@ -200,17 +201,17 @@ public:
shared_state & operator=( shared_state const&) = delete;
void set_value( R const& value) {
- std::unique_lock< mutex > lk( mtx_);
+ std::unique_lock< mutex > lk{ mtx_ };
set_value_( value, lk);
}
void set_value( R && value) {
- std::unique_lock< mutex > lk( mtx_);
+ std::unique_lock< mutex > lk{ mtx_ };
set_value_( std::move( value), lk);
}
R & get() {
- std::unique_lock< mutex > lk( mtx_);
+ std::unique_lock< mutex > lk{ mtx_ };
return get_( lk);
}
};
@@ -239,7 +240,7 @@ private:
}
public:
- typedef intrusive_ptr< shared_state > ptr_t;
+ typedef intrusive_ptr< shared_state > ptr_type;
shared_state() = default;
@@ -249,12 +250,12 @@ public:
shared_state & operator=( shared_state const&) = delete;
void set_value( R & value) {
- std::unique_lock< mutex > lk( mtx_);
+ std::unique_lock< mutex > lk{ mtx_ };
set_value_( value, lk);
}
R & get() {
- std::unique_lock< mutex > lk( mtx_);
+ std::unique_lock< mutex > lk{ mtx_ };
return get_( lk);
}
};
@@ -281,7 +282,7 @@ private:
}
public:
- typedef intrusive_ptr< shared_state > ptr_t;
+ typedef intrusive_ptr< shared_state > ptr_type;
shared_state() = default;
@@ -292,13 +293,13 @@ public:
inline
void set_value() {
- std::unique_lock< mutex > lk( mtx_);
+ std::unique_lock< mutex > lk{ mtx_ };
set_value_( lk);
}
inline
void get() {
- std::unique_lock< mutex > lk( mtx_);
+ std::unique_lock< mutex > lk{ mtx_ };
get_( lk);
}
};
diff --git a/boost/fiber/future/detail/shared_state_object.hpp b/boost/fiber/future/detail/shared_state_object.hpp
index dcdafef65c..50e08a6393 100644
--- a/boost/fiber/future/detail/shared_state_object.hpp
+++ b/boost/fiber/future/detail/shared_state_object.hpp
@@ -27,10 +27,11 @@ class shared_state_object : public shared_state< R > {
public:
typedef typename std::allocator_traits< Allocator >::template rebind_alloc<
shared_state_object
- > allocator_t;
+ > allocator_type;
- shared_state_object( allocator_t const& alloc) :
- shared_state< R >(), alloc_( alloc) {
+ shared_state_object( allocator_type const& alloc) :
+ shared_state< R >{},
+ alloc_{ alloc } {
}
protected:
@@ -39,10 +40,10 @@ protected:
}
private:
- allocator_t alloc_;
+ allocator_type alloc_;
- static void destroy_( allocator_t const& alloc, shared_state_object * p) noexcept {
- allocator_t a{ alloc };
+ static void destroy_( allocator_type const& alloc, shared_state_object * p) noexcept {
+ allocator_type a{ alloc };
a.destroy( p);
a.deallocate( p, 1);
}
diff --git a/boost/fiber/future/detail/task_base.hpp b/boost/fiber/future/detail/task_base.hpp
index 907e820470..83abd7e5ab 100644
--- a/boost/fiber/future/detail/task_base.hpp
+++ b/boost/fiber/future/detail/task_base.hpp
@@ -22,14 +22,14 @@ namespace detail {
template< typename R, typename ... Args >
struct task_base : public shared_state< R > {
- typedef intrusive_ptr< task_base > ptr_t;
+ typedef intrusive_ptr< task_base > ptr_type;
virtual ~task_base() {
}
virtual void run( Args && ... args) = 0;
- virtual ptr_t reset() = 0;
+ virtual ptr_type reset() = 0;
};
}}}
diff --git a/boost/fiber/future/detail/task_object.hpp b/boost/fiber/future/detail/task_object.hpp
index 37bfd3fd11..3a48929a58 100644
--- a/boost/fiber/future/detail/task_object.hpp
+++ b/boost/fiber/future/detail/task_object.hpp
@@ -9,10 +9,13 @@
#include <exception>
#include <memory>
+#include <tuple>
#include <utility>
#include <boost/config.hpp>
+#if defined(BOOST_NO_CXX17_STD_APPLY)
#include <boost/context/detail/apply.hpp>
+#endif
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/future/detail/task_base.hpp>
@@ -28,43 +31,49 @@ namespace detail {
template< typename Fn, typename Allocator, typename R, typename ... Args >
class task_object : public task_base< R, Args ... > {
private:
- typedef task_base< R, Args ... > base_t;
+ typedef task_base< R, Args ... > base_type;
public:
typedef typename std::allocator_traits< Allocator >::template rebind_alloc<
task_object
- > allocator_t;
+ > allocator_type;
- task_object( allocator_t const& alloc, Fn const& fn) :
- base_t(),
- fn_( fn),
- alloc_( alloc) {
+ task_object( allocator_type const& alloc, Fn const& fn) :
+ base_type{},
+ fn_{ fn },
+ alloc_{ alloc } {
}
- task_object( allocator_t const& alloc, Fn && fn) :
- base_t(),
- fn_( std::move( fn) ),
- alloc_( alloc) {
+ task_object( allocator_type const& alloc, Fn && fn) :
+ base_type{},
+ fn_{ std::move( fn) },
+ alloc_{ alloc } {
}
void run( Args && ... args) override final {
try {
this->set_value(
+#if defined(BOOST_NO_CXX17_STD_APPLY)
boost::context::detail::apply(
- fn_, std::make_tuple( std::forward< Args >( args) ... ) ) );
+ fn_, std::make_tuple( std::forward< Args >( args) ... ) )
+#else
+ std::apply(
+ fn_, std::make_tuple( std::forward< Args >( args) ... ) )
+#endif
+ );
} catch (...) {
this->set_exception( std::current_exception() );
}
}
- typename base_t::ptr_t reset() override final {
- typedef std::allocator_traits< allocator_t > traits_t;
+ typename base_type::ptr_type reset() override final {
+ typedef std::allocator_traits< allocator_type > traits_type;
- typename traits_t::pointer ptr{ traits_t::allocate( alloc_, 1) };
+ typename traits_type::pointer ptr{ traits_type::allocate( alloc_, 1) };
try {
- traits_t::construct( alloc_, ptr, alloc_, std::move( fn_) );
+ traits_type::construct( alloc_, ptr, alloc_, std::move( fn_) );
} catch (...) {
- traits_t::deallocate( alloc_, ptr, 1);
+ traits_type::deallocate( alloc_, ptr, 1);
throw;
}
return { convert( ptr) };
@@ -77,10 +86,10 @@ protected:
private:
Fn fn_;
- allocator_t alloc_;
+ allocator_type alloc_;
- static void destroy_( allocator_t const& alloc, task_object * p) noexcept {
- allocator_t a{ alloc };
+ static void destroy_( allocator_type const& alloc, task_object * p) noexcept {
+ allocator_type a{ alloc };
a.destroy( p);
a.deallocate( p, 1);
}
@@ -89,43 +98,48 @@ private:
template< typename Fn, typename Allocator, typename ... Args >
class task_object< Fn, Allocator, void, Args ... > : public task_base< void, Args ... > {
private:
- typedef task_base< void, Args ... > base_t;
+ typedef task_base< void, Args ... > base_type;
public:
typedef typename Allocator::template rebind<
task_object< Fn, Allocator, void, Args ... >
- >::other allocator_t;
+ >::other allocator_type;
- task_object( allocator_t const& alloc, Fn const& fn) :
- base_t(),
- fn_( fn),
- alloc_( alloc) {
+ task_object( allocator_type const& alloc, Fn const& fn) :
+ base_type{},
+ fn_{ fn },
+ alloc_{ alloc } {
}
- task_object( allocator_t const& alloc, Fn && fn) :
- base_t(),
- fn_( std::move( fn) ),
- alloc_( alloc) {
+ task_object( allocator_type const& alloc, Fn && fn) :
+ base_type{},
+ fn_{ std::move( fn) },
+ alloc_{ alloc } {
}
void run( Args && ... args) override final {
try {
+#if defined(BOOST_NO_CXX17_STD_APPLY)
boost::context::detail::apply(
fn_, std::make_tuple( std::forward< Args >( args) ... ) );
+#else
+ std::apply(
+ fn_, std::make_tuple( std::forward< Args >( args) ... ) );
+#endif
this->set_value();
} catch (...) {
this->set_exception( std::current_exception() );
}
}
- typename base_t::ptr_t reset() override final {
- typedef std::allocator_traits< allocator_t > traits_t;
+ typename base_type::ptr_type reset() override final {
+ typedef std::allocator_traits< allocator_type > traits_type;
- typename traits_t::pointer ptr{ traits_t::allocate( alloc_, 1) };
+ typename traits_type::pointer ptr{ traits_type::allocate( alloc_, 1) };
try {
- traits_t::construct( alloc_, ptr, alloc_, std::move( fn_) );
+ traits_type::construct( alloc_, ptr, alloc_, std::move( fn_) );
} catch (...) {
- traits_t::deallocate( alloc_, ptr, 1);
+ traits_type::deallocate( alloc_, ptr, 1);
throw;
}
return { convert( ptr) };
@@ -138,10 +152,10 @@ protected:
private:
Fn fn_;
- allocator_t alloc_;
+ allocator_type alloc_;
- static void destroy_( allocator_t const& alloc, task_object * p) noexcept {
- allocator_t a{ alloc };
+ static void destroy_( allocator_type const& alloc, task_object * p) noexcept {
+ allocator_type a{ alloc };
a.destroy( p);
a.deallocate( p, 1);
}
diff --git a/boost/fiber/future/future.hpp b/boost/fiber/future/future.hpp
index d92820eb58..5d4ad78ab5 100644
--- a/boost/fiber/future/future.hpp
+++ b/boost/fiber/future/future.hpp
@@ -24,13 +24,13 @@ namespace detail {
template< typename R >
struct future_base {
- typedef typename shared_state< R >::ptr_t ptr_t;
+ typedef typename shared_state< R >::ptr_type ptr_type;
- ptr_t state_{};
+ ptr_type state_{};
- future_base() noexcept = default;
+ future_base() = default;
- explicit future_base( ptr_t const& p) noexcept :
+ explicit future_base( ptr_type const& p) noexcept :
state_{ p } {
}
@@ -46,15 +46,17 @@ struct future_base {
}
future_base & operator=( future_base const& other) noexcept {
- if ( this == & other) return * this;
- state_ = other.state_;
+ if ( this != & other) {
+ state_ = other.state_;
+ }
return * this;
}
future_base & operator=( future_base && other) noexcept {
- if ( this == & other) return * this;
- state_ = other.state_;
- other.state_.reset();
+ if ( this != & other) {
+ state_ = other.state_;
+ other.state_.reset();
+ }
return * this;
}
@@ -107,128 +109,131 @@ class packaged_task;
template< typename R >
class future : private detail::future_base< R > {
private:
- typedef detail::future_base< R > base_t;
+ typedef detail::future_base< R > base_type;
friend struct detail::promise_base< R >;
friend class shared_future< R >;
template< typename Signature >
friend class packaged_task;
- explicit future( typename base_t::ptr_t const& p) noexcept :
- base_t{ p } {
+ explicit future( typename base_type::ptr_type const& p) noexcept :
+ base_type{ p } {
}
public:
- future() noexcept = default;
+ future() = default;
future( future const&) = delete;
future & operator=( future const&) = delete;
future( future && other) noexcept :
- base_t{ std::move( other) } {
+ base_type{ std::move( other) } {
}
future & operator=( future && other) noexcept {
- if ( this == & other) return * this;
- base_t::operator=( std::move( other) );
+ if ( this != & other) {
+ base_type::operator=( std::move( other) );
+ }
return * this;
}
shared_future< R > share();
R get() {
- if ( ! base_t::valid() ) {
+ if ( ! base_type::valid() ) {
throw future_uninitialized{};
}
- typename base_t::ptr_t tmp{};
- tmp.swap( base_t::state_);
+ typename base_type::ptr_type tmp{};
+ tmp.swap( base_type::state_);
return std::move( tmp->get() );
}
- using base_t::valid;
- using base_t::get_exception_ptr;
- using base_t::wait;
- using base_t::wait_for;
- using base_t::wait_until;
+ using base_type::valid;
+ using base_type::get_exception_ptr;
+ using base_type::wait;
+ using base_type::wait_for;
+ using base_type::wait_until;
};
template< typename R >
class future< R & > : private detail::future_base< R & > {
private:
- typedef detail::future_base< R & > base_t;
+ typedef detail::future_base< R & > base_type;
friend struct detail::promise_base< R & >;
friend class shared_future< R & >;
template< typename Signature >
friend class packaged_task;
- explicit future( typename base_t::ptr_t const& p) noexcept :
- base_t{ p } {
+ explicit future( typename base_type::ptr_type const& p) noexcept :
+ base_type{ p } {
}
public:
- future() noexcept = default;
+ future() = default;
future( future const&) = delete;
future & operator=( future const&) = delete;
future( future && other) noexcept :
- base_t{ std::move( other) } {
+ base_type{ std::move( other) } {
}
future & operator=( future && other) noexcept {
- if ( this == & other) return * this;
- base_t::operator=( std::move( other) );
+ if ( this != & other) {
+ base_type::operator=( std::move( other) );
+ }
return * this;
}
shared_future< R & > share();
R & get() {
- if ( ! base_t::valid() ) {
+ if ( ! base_type::valid() ) {
throw future_uninitialized{};
}
- typename base_t::ptr_t tmp{};
- tmp.swap( base_t::state_);
+ typename base_type::ptr_type tmp{};
+ tmp.swap( base_type::state_);
return tmp->get();
}
- using base_t::valid;
- using base_t::get_exception_ptr;
- using base_t::wait;
- using base_t::wait_for;
- using base_t::wait_until;
+ using base_type::valid;
+ using base_type::get_exception_ptr;
+ using base_type::wait;
+ using base_type::wait_for;
+ using base_type::wait_until;
};
template<>
class future< void > : private detail::future_base< void > {
private:
- typedef detail::future_base< void > base_t;
+ typedef detail::future_base< void > base_type;
friend struct detail::promise_base< void >;
friend class shared_future< void >;
template< typename Signature >
friend class packaged_task;
- explicit future( base_t::ptr_t const& p) noexcept :
- base_t{ p } {
+ explicit future( base_type::ptr_type const& p) noexcept :
+ base_type{ p } {
}
public:
- future() noexcept = default;
+ future() = default;
future( future const&) = delete;
future & operator=( future const&) = delete;
inline
future( future && other) noexcept :
- base_t{ std::move( other) } {
+ base_type{ std::move( other) } {
}
inline
future & operator=( future && other) noexcept {
- if ( this == & other) return * this;
- base_t::operator=( std::move( other) );
+ if ( this != & other) {
+ base_type::operator=( std::move( other) );
+ }
return * this;
}
@@ -236,62 +241,64 @@ public:
inline
void get() {
- if ( ! base_t::valid() ) {
+ if ( ! base_type::valid() ) {
throw future_uninitialized{};
}
- base_t::ptr_t tmp{};
- tmp.swap( base_t::state_);
+ base_type::ptr_type tmp{};
+ tmp.swap( base_type::state_);
tmp->get();
}
- using base_t::valid;
- using base_t::get_exception_ptr;
- using base_t::wait;
- using base_t::wait_for;
- using base_t::wait_until;
+ using base_type::valid;
+ using base_type::get_exception_ptr;
+ using base_type::wait;
+ using base_type::wait_for;
+ using base_type::wait_until;
};
template< typename R >
class shared_future : private detail::future_base< R > {
private:
- typedef detail::future_base< R > base_t;
+ typedef detail::future_base< R > base_type;
- explicit shared_future( typename base_t::ptr_t const& p) noexcept :
- base_t{ p } {
+ explicit shared_future( typename base_type::ptr_type const& p) noexcept :
+ base_type{ p } {
}
public:
- shared_future() noexcept = default;
+ shared_future() = default;
~shared_future() = default;
shared_future( shared_future const& other) :
- base_t{ other } {
+ base_type{ other } {
}
shared_future( shared_future && other) noexcept :
- base_t{ std::move( other) } {
+ base_type{ std::move( other) } {
}
shared_future( future< R > && other) noexcept :
- base_t{ std::move( other) } {
+ base_type{ std::move( other) } {
}
shared_future & operator=( shared_future const& other) noexcept {
- if ( this == & other) return * this;
- base_t::operator=( other);
+ if ( this != & other) {
+ base_type::operator=( other);
+ }
return * this;
}
shared_future & operator=( shared_future && other) noexcept {
- if ( this == & other) return * this;
- base_t::operator=( std::move( other) );
+ if ( this != & other) {
+ base_type::operator=( std::move( other) );
+ }
return * this;
}
shared_future & operator=( future< R > && other) noexcept {
- base_t::operator=( std::move( other) );
+ base_type::operator=( std::move( other) );
return * this;
}
@@ -299,56 +306,58 @@ public:
if ( ! valid() ) {
throw future_uninitialized{};
}
- return base_t::state_->get();
+ return base_type::state_->get();
}
- using base_t::valid;
- using base_t::get_exception_ptr;
- using base_t::wait;
- using base_t::wait_for;
- using base_t::wait_until;
+ using base_type::valid;
+ using base_type::get_exception_ptr;
+ using base_type::wait;
+ using base_type::wait_for;
+ using base_type::wait_until;
};
template< typename R >
class shared_future< R & > : private detail::future_base< R & > {
private:
- typedef detail::future_base< R & > base_t;
+ typedef detail::future_base< R & > base_type;
- explicit shared_future( typename base_t::ptr_t const& p) noexcept :
- base_t{ p } {
+ explicit shared_future( typename base_type::ptr_type const& p) noexcept :
+ base_type{ p } {
}
public:
- shared_future() noexcept = default;
+ shared_future() = default;
~shared_future() = default;
shared_future( shared_future const& other) :
- base_t{ other } {
+ base_type{ other } {
}
shared_future( shared_future && other) noexcept :
- base_t{ std::move( other) } {
+ base_type{ std::move( other) } {
}
shared_future( future< R & > && other) noexcept :
- base_t{ std::move( other) } {
+ base_type{ std::move( other) } {
}
shared_future & operator=( shared_future const& other) noexcept {
- if ( this == & other) return * this;
- base_t::operator=( other);
+ if ( this != & other) {
+ base_type::operator=( other);
+ }
return * this;
}
shared_future & operator=( shared_future && other) noexcept {
- if ( this == & other) return * this;
- base_t::operator=( std::move( other) );
+ if ( this != & other) {
+ base_type::operator=( std::move( other) );
+ }
return * this;
}
shared_future & operator=( future< R & > && other) noexcept {
- base_t::operator=( std::move( other) );
+ base_type::operator=( std::move( other) );
return * this;
}
@@ -356,62 +365,64 @@ public:
if ( ! valid() ) {
throw future_uninitialized{};
}
- return base_t::state_->get();
+ return base_type::state_->get();
}
- using base_t::valid;
- using base_t::get_exception_ptr;
- using base_t::wait;
- using base_t::wait_for;
- using base_t::wait_until;
+ using base_type::valid;
+ using base_type::get_exception_ptr;
+ using base_type::wait;
+ using base_type::wait_for;
+ using base_type::wait_until;
};
template<>
class shared_future< void > : private detail::future_base< void > {
private:
- typedef detail::future_base< void > base_t;
+ typedef detail::future_base< void > base_type;
- explicit shared_future( base_t::ptr_t const& p) noexcept :
- base_t{ p } {
+ explicit shared_future( base_type::ptr_type const& p) noexcept :
+ base_type{ p } {
}
public:
- shared_future() noexcept = default;
+ shared_future() = default;
~shared_future() = default;
inline
shared_future( shared_future const& other) :
- base_t{ other } {
+ base_type{ other } {
}
inline
shared_future( shared_future && other) noexcept :
- base_t{ std::move( other) } {
+ base_type{ std::move( other) } {
}
inline
shared_future( future< void > && other) noexcept :
- base_t{ std::move( other) } {
+ base_type{ std::move( other) } {
}
inline
shared_future & operator=( shared_future const& other) noexcept {
- if ( this == & other) return * this;
- base_t::operator=( other);
+ if ( this != & other) {
+ base_type::operator=( other);
+ }
return * this;
}
inline
shared_future & operator=( shared_future && other) noexcept {
- if ( this == & other) return * this;
- base_t::operator=( std::move( other) );
+ if ( this != & other) {
+ base_type::operator=( std::move( other) );
+ }
return * this;
}
inline
shared_future & operator=( future< void > && other) noexcept {
- base_t::operator=( std::move( other) );
+ base_type::operator=( std::move( other) );
return * this;
}
@@ -420,21 +431,21 @@ public:
if ( ! valid() ) {
throw future_uninitialized{};
}
- base_t::state_->get();
+ base_type::state_->get();
}
- using base_t::valid;
- using base_t::get_exception_ptr;
- using base_t::wait;
- using base_t::wait_for;
- using base_t::wait_until;
+ using base_type::valid;
+ using base_type::get_exception_ptr;
+ using base_type::wait;
+ using base_type::wait_for;
+ using base_type::wait_until;
};
template< typename R >
shared_future< R >
future< R >::share() {
- if ( ! base_t::valid() ) {
+ if ( ! base_type::valid() ) {
throw future_uninitialized{};
}
return shared_future< R >{ std::move( * this) };
@@ -443,7 +454,7 @@ future< R >::share() {
template< typename R >
shared_future< R & >
future< R & >::share() {
- if ( ! base_t::valid() ) {
+ if ( ! base_type::valid() ) {
throw future_uninitialized{};
}
return shared_future< R & >{ std::move( * this) };
@@ -452,7 +463,7 @@ future< R & >::share() {
inline
shared_future< void >
future< void >::share() {
- if ( ! base_t::valid() ) {
+ if ( ! base_type::valid() ) {
throw future_uninitialized{};
}
return shared_future< void >{ std::move( * this) };
diff --git a/boost/fiber/future/packaged_task.hpp b/boost/fiber/future/packaged_task.hpp
index 7ea16bfee7..31838ee41f 100644
--- a/boost/fiber/future/packaged_task.hpp
+++ b/boost/fiber/future/packaged_task.hpp
@@ -30,13 +30,13 @@ class packaged_task;
template< typename R, typename ... Args >
class packaged_task< R( Args ... ) > {
private:
- typedef typename detail::task_base< R, Args ... >::ptr_t ptr_t;
+ typedef typename detail::task_base< R, Args ... >::ptr_type ptr_type;
bool obtained_{ false };
- ptr_t task_{};
+ ptr_type task_{};
public:
- constexpr packaged_task() noexcept = default;
+ packaged_task() = default;
template< typename Fn,
typename = detail::disable_overload< packaged_task, Fn >
@@ -53,17 +53,17 @@ public:
explicit packaged_task( std::allocator_arg_t, Allocator const& alloc, Fn && fn) {
typedef detail::task_object<
typename std::decay< Fn >::type, Allocator, R, Args ...
- > object_t;
+ > object_type;
typedef std::allocator_traits<
- typename object_t::allocator_t
- > traits_t;
+ typename object_type::allocator_type
+ > traits_type;
- typename object_t::allocator_t a{ alloc };
- typename traits_t::pointer ptr{ traits_t::allocate( a, 1) };
+ typename object_type::allocator_type a{ alloc };
+ typename traits_type::pointer ptr{ traits_type::allocate( a, 1) };
try {
- traits_t::construct( a, ptr, a, std::forward< Fn >( fn) );
+ traits_type::construct( a, ptr, a, std::forward< Fn >( fn) );
} catch (...) {
- traits_t::deallocate( a, ptr, 1);
+ traits_type::deallocate( a, ptr, 1);
throw;
}
task_.reset( convert( ptr) );
@@ -85,9 +85,10 @@ public:
}
packaged_task & operator=( packaged_task && other) noexcept {
- if ( this == & other) return * this;
- packaged_task tmp{ std::move( other) };
- swap( tmp);
+ if ( this != & other) {
+ packaged_task tmp{ std::move( other) };
+ swap( tmp);
+ }
return * this;
}
diff --git a/boost/fiber/future/promise.hpp b/boost/fiber/future/promise.hpp
index 278c839700..83ba63fb23 100644
--- a/boost/fiber/future/promise.hpp
+++ b/boost/fiber/future/promise.hpp
@@ -25,10 +25,10 @@ namespace detail {
template< typename R >
struct promise_base {
- typedef typename shared_state< R >::ptr_t ptr_t;
+ typedef typename shared_state< R >::ptr_type ptr_type;
bool obtained_{ false };
- ptr_t future_{};
+ ptr_type future_{};
promise_base() :
promise_base{ std::allocator_arg, std::allocator< promise_base >{} } {
@@ -36,15 +36,15 @@ struct promise_base {
template< typename Allocator >
promise_base( std::allocator_arg_t, Allocator alloc) {
- typedef detail::shared_state_object< R, Allocator > object_t;
- typedef std::allocator_traits< typename object_t::allocator_t > traits_t;
- typename object_t::allocator_t a{ alloc };
- typename traits_t::pointer ptr{ traits_t::allocate( a, 1) };
+ typedef detail::shared_state_object< R, Allocator > object_type;
+ typedef std::allocator_traits< typename object_type::allocator_type > traits_type;
+ typename object_type::allocator_type a{ alloc };
+ typename traits_type::pointer ptr{ traits_type::allocate( a, 1) };
try {
- traits_t::construct( a, ptr, a);
+ traits_type::construct( a, ptr, a);
} catch (...) {
- traits_t::deallocate( a, ptr, 1);
+ traits_type::deallocate( a, ptr, 1);
throw;
}
future_.reset( convert( ptr) );
@@ -66,9 +66,10 @@ struct promise_base {
}
promise_base & operator=( promise_base && other) noexcept {
- if ( this == & other) return * this;
- promise_base tmp{ std::move( other) };
- swap( tmp);
+ if ( this != & other) {
+ promise_base tmp{ std::move( other) };
+ swap( tmp);
+ }
return * this;
}
@@ -101,14 +102,14 @@ struct promise_base {
template< typename R >
class promise : private detail::promise_base< R > {
private:
- typedef detail::promise_base< R > base_t;
+ typedef detail::promise_base< R > base_type;
public:
promise() = default;
template< typename Allocator >
promise( std::allocator_arg_t, Allocator alloc) :
- base_t{ std::allocator_arg, alloc } {
+ base_type{ std::allocator_arg, alloc } {
}
promise( promise const&) = delete;
@@ -118,38 +119,38 @@ public:
promise & operator=( promise && other) = default;
void set_value( R const& value) {
- if ( ! base_t::future_) {
+ if ( ! base_type::future_) {
throw promise_uninitialized{};
}
- base_t::future_->set_value( value);
+ base_type::future_->set_value( value);
}
void set_value( R && value) {
- if ( ! base_t::future_) {
+ if ( ! base_type::future_) {
throw promise_uninitialized{};
}
- base_t::future_->set_value( std::move( value) );
+ base_type::future_->set_value( std::move( value) );
}
void swap( promise & other) noexcept {
- base_t::swap( other);
+ base_type::swap( other);
}
- using base_t::get_future;
- using base_t::set_exception;
+ using base_type::get_future;
+ using base_type::set_exception;
};
template< typename R >
class promise< R & > : private detail::promise_base< R & > {
private:
- typedef detail::promise_base< R & > base_t;
+ typedef detail::promise_base< R & > base_type;
public:
promise() = default;
template< typename Allocator >
promise( std::allocator_arg_t, Allocator alloc) :
- base_t{ std::allocator_arg, alloc } {
+ base_type{ std::allocator_arg, alloc } {
}
promise( promise const&) = delete;
@@ -159,31 +160,31 @@ public:
promise & operator=( promise && other) noexcept = default;
void set_value( R & value) {
- if ( ! base_t::future_) {
+ if ( ! base_type::future_) {
throw promise_uninitialized{};
}
- base_t::future_->set_value( value);
+ base_type::future_->set_value( value);
}
void swap( promise & other) noexcept {
- base_t::swap( other);
+ base_type::swap( other);
}
- using base_t::get_future;
- using base_t::set_exception;
+ using base_type::get_future;
+ using base_type::set_exception;
};
template<>
class promise< void > : private detail::promise_base< void > {
private:
- typedef detail::promise_base< void > base_t;
+ typedef detail::promise_base< void > base_type;
public:
promise() = default;
template< typename Allocator >
promise( std::allocator_arg_t, Allocator alloc) :
- base_t{ std::allocator_arg, alloc } {
+ base_type{ std::allocator_arg, alloc } {
}
promise( promise const&) = delete;
@@ -194,19 +195,19 @@ public:
inline
void set_value() {
- if ( ! base_t::future_) {
+ if ( ! base_type::future_) {
throw promise_uninitialized{};
}
- base_t::future_->set_value();
+ base_type::future_->set_value();
}
inline
void swap( promise & other) noexcept {
- base_t::swap( other);
+ base_type::swap( other);
}
- using base_t::get_future;
- using base_t::set_exception;
+ using base_type::get_future;
+ using base_type::set_exception;
};
template< typename R >
diff --git a/boost/fiber/mutex.hpp b/boost/fiber/mutex.hpp
index b56e96802a..e25ef04d9e 100644
--- a/boost/fiber/mutex.hpp
+++ b/boost/fiber/mutex.hpp
@@ -33,11 +33,11 @@ class BOOST_FIBERS_DECL mutex {
private:
friend class condition_variable;
- typedef context::wait_queue_t wait_queue_t;
+ typedef context::wait_queue_t wait_queue_type;
- context * owner_{ nullptr };
- wait_queue_t wait_queue_{};
detail::spinlock wait_queue_splk_{};
+ wait_queue_type wait_queue_{};
+ context * owner_{ nullptr };
public:
mutex() = default;
diff --git a/boost/fiber/operations.hpp b/boost/fiber/operations.hpp
index 490ba462f6..95ab9ba452 100644
--- a/boost/fiber/operations.hpp
+++ b/boost/fiber/operations.hpp
@@ -35,8 +35,7 @@ void yield() noexcept {
template< typename Clock, typename Duration >
void sleep_until( std::chrono::time_point< Clock, Duration > const& sleep_time_) {
- std::chrono::steady_clock::time_point sleep_time(
- boost::fibers::detail::convert( sleep_time_) );
+ std::chrono::steady_clock::time_point sleep_time = boost::fibers::detail::convert( sleep_time_);
fibers::context::active()->wait_until( sleep_time);
}
@@ -48,8 +47,7 @@ void sleep_for( std::chrono::duration< Rep, Period > const& timeout_duration) {
template< typename PROPS >
PROPS & properties() {
- fibers::fiber_properties * props =
- fibers::context::active()->get_properties();
+ fibers::fiber_properties * props = fibers::context::active()->get_properties();
if ( ! props) {
// props could be nullptr if the thread's main fiber has not yet
// yielded (not yet passed through algorithm_with_properties::
@@ -61,7 +59,7 @@ PROPS & properties() {
props = fibers::context::active()->get_properties();
// Could still be hosed if the running manager isn't a subclass of
// algorithm_with_properties.
- BOOST_ASSERT_MSG(props, "this_fiber::properties not set");
+ BOOST_ASSERT_MSG( props, "this_fiber::properties not set");
}
return dynamic_cast< PROPS & >( * props );
}
diff --git a/boost/fiber/recursive_mutex.hpp b/boost/fiber/recursive_mutex.hpp
index 66fb5aa7c3..864c65cb8e 100644
--- a/boost/fiber/recursive_mutex.hpp
+++ b/boost/fiber/recursive_mutex.hpp
@@ -37,12 +37,12 @@ class BOOST_FIBERS_DECL recursive_mutex {
private:
friend class condition_variable;
- typedef context::wait_queue_t wait_queue_t;
+ typedef context::wait_queue_t wait_queue_type;
+ detail::spinlock wait_queue_splk_{};
+ wait_queue_type wait_queue_{};
context * owner_{ nullptr };
std::size_t count_{ 0 };
- wait_queue_t wait_queue_{};
- detail::spinlock wait_queue_splk_{};
public:
recursive_mutex() = default;
diff --git a/boost/fiber/recursive_timed_mutex.hpp b/boost/fiber/recursive_timed_mutex.hpp
index 2852252cd0..339427b895 100644
--- a/boost/fiber/recursive_timed_mutex.hpp
+++ b/boost/fiber/recursive_timed_mutex.hpp
@@ -39,12 +39,12 @@ class BOOST_FIBERS_DECL recursive_timed_mutex {
private:
friend class condition_variable;
- typedef context::wait_queue_t wait_queue_t;
+ typedef context::wait_queue_t wait_queue_type;
+ detail::spinlock wait_queue_splk_{};
+ wait_queue_type wait_queue_{};
context * owner_{ nullptr };
std::size_t count_{ 0 };
- wait_queue_t wait_queue_{};
- detail::spinlock wait_queue_splk_{};
bool try_lock_until_( std::chrono::steady_clock::time_point const& timeout_time) noexcept;
@@ -66,8 +66,7 @@ public:
template< typename Clock, typename Duration >
bool try_lock_until( std::chrono::time_point< Clock, Duration > const& timeout_time_) {
- std::chrono::steady_clock::time_point timeout_time(
- detail::convert( timeout_time_) );
+ std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
return try_lock_until_( timeout_time);
}
diff --git a/boost/fiber/scheduler.hpp b/boost/fiber/scheduler.hpp
index 4503bc0b5f..4a5f0dab8c 100644
--- a/boost/fiber/scheduler.hpp
+++ b/boost/fiber/scheduler.hpp
@@ -13,15 +13,19 @@
#include <vector>
#include <boost/config.hpp>
-#include <boost/context/execution_context.hpp>
+#if (BOOST_EXECUTION_CONTEXT==1)
+# include <boost/context/execution_context.hpp>
+#else
+# include <boost/context/continuation.hpp>
+#endif
#include <boost/intrusive/list.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/intrusive/set.hpp>
+#include <boost/intrusive/slist.hpp>
#include <boost/fiber/algo/algorithm.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
-#include <boost/fiber/detail/context_mpsc_queue.hpp>
#include <boost/fiber/detail/data.hpp>
#include <boost/fiber/detail/spinlock.hpp>
@@ -49,45 +53,56 @@ public:
context,
intrusive::member_hook<
context, detail::ready_hook, & context::ready_hook_ >,
- intrusive::constant_time_size< false > > ready_queue_t;
+ intrusive::constant_time_size< false >
+ > ready_queue_type;
private:
typedef intrusive::multiset<
context,
intrusive::member_hook<
context, detail::sleep_hook, & context::sleep_hook_ >,
intrusive::constant_time_size< false >,
- intrusive::compare< timepoint_less > > sleep_queue_t;
+ intrusive::compare< timepoint_less >
+ > sleep_queue_type;
typedef intrusive::list<
context,
intrusive::member_hook<
+ context, detail::worker_hook, & context::worker_hook_ >,
+ intrusive::constant_time_size< false >
+ > worker_queue_type;
+ typedef intrusive::slist<
+ context,
+ intrusive::member_hook<
context, detail::terminated_hook, & context::terminated_hook_ >,
- intrusive::constant_time_size< false > > terminated_queue_t;
- typedef intrusive::list<
+ intrusive::linear< true >,
+ intrusive::cache_last< true >
+ > terminated_queue_type;
+ typedef intrusive::slist<
context,
intrusive::member_hook<
- context, detail::worker_hook, & context::worker_hook_ >,
- intrusive::constant_time_size< false > > worker_queue_t;
+ context, detail::remote_ready_hook, & context::remote_ready_hook_ >,
+ intrusive::linear< true >,
+ intrusive::cache_last< true >
+ > remote_ready_queue_type;
- std::unique_ptr< algo::algorithm > algo_;
- context * main_ctx_{ nullptr };
- intrusive_ptr< context > dispatcher_ctx_{};
- // worker-queue contains all context' mananged by this scheduler
- // except main-context and dispatcher-context
- // unlink happens on destruction of a context
- worker_queue_t worker_queue_{};
- // terminated-queue contains context' which have been terminated
- terminated_queue_t terminated_queue_{};
#if ! defined(BOOST_FIBERS_NO_ATOMICS)
// remote ready-queue contains context' signaled by schedulers
// running in other threads
- detail::context_mpsc_queue remote_ready_queue_{};
- // sleep-queue contains context' which have been called
+ detail::spinlock remote_ready_splk_{};
+ remote_ready_queue_type remote_ready_queue_{};
#endif
+ alignas(cache_alignment) std::unique_ptr< algo::algorithm > algo_;
+ // sleep-queue contains context' which have been called
// scheduler::wait_until()
- sleep_queue_t sleep_queue_{};
- bool shutdown_{ false };
-
- context * get_next_() noexcept;
+ sleep_queue_type sleep_queue_{};
+ // worker-queue contains all context' managed by this scheduler
+ // except main-context and dispatcher-context
+ // unlink happens on destruction of a context
+ worker_queue_type worker_queue_{};
+ // terminated-queue contains context' which have been terminated
+ terminated_queue_type terminated_queue_{};
+ intrusive_ptr< context > dispatcher_ctx_{};
+ context * main_ctx_{ nullptr };
+ bool shutdown_{ false };
void release_terminated_() noexcept;
@@ -105,20 +120,20 @@ public:
virtual ~scheduler();
- void set_ready( context *) noexcept;
+ void schedule( context *) noexcept;
#if ! defined(BOOST_FIBERS_NO_ATOMICS)
- void set_remote_ready( context *) noexcept;
+ void schedule_from_remote( context *) noexcept;
#endif
#if (BOOST_EXECUTION_CONTEXT==1)
void dispatch() noexcept;
- void set_terminated( context *) noexcept;
+ void terminate( detail::spinlock_lock &, context *) noexcept;
#else
- boost::context::execution_context< detail::data_t * > dispatch() noexcept;
+ boost::context::continuation dispatch() noexcept;
- boost::context::execution_context< detail::data_t * > set_terminated( context *) noexcept;
+ boost::context::continuation terminate( detail::spinlock_lock &, context *) noexcept;
#endif
void yield( context *) noexcept;
diff --git a/boost/fiber/timed_mutex.hpp b/boost/fiber/timed_mutex.hpp
index ed6dbd458e..f74c5e41ad 100644
--- a/boost/fiber/timed_mutex.hpp
+++ b/boost/fiber/timed_mutex.hpp
@@ -35,11 +35,11 @@ class BOOST_FIBERS_DECL timed_mutex {
private:
friend class condition_variable;
- typedef context::wait_queue_t wait_queue_t;
+ typedef context::wait_queue_t wait_queue_type;
- context * owner_{ nullptr };
- wait_queue_t wait_queue_{};
detail::spinlock wait_queue_splk_{};
+ wait_queue_type wait_queue_{};
+ context * owner_{ nullptr };
bool try_lock_until_( std::chrono::steady_clock::time_point const& timeout_time) noexcept;
@@ -60,8 +60,7 @@ public:
template< typename Clock, typename Duration >
bool try_lock_until( std::chrono::time_point< Clock, Duration > const& timeout_time_) {
- std::chrono::steady_clock::time_point timeout_time(
- detail::convert( timeout_time_) );
+ std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
return try_lock_until_( timeout_time);
}
diff --git a/boost/fiber/type.hpp b/boost/fiber/type.hpp
index 9346265856..065d22fccb 100644
--- a/boost/fiber/type.hpp
+++ b/boost/fiber/type.hpp
@@ -18,7 +18,6 @@
#include <boost/assert.hpp>
#include <boost/config.hpp>
#include <boost/context/detail/apply.hpp>
-#include <boost/context/execution_context.hpp>
#include <boost/context/stack_context.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/parent_from_member.hpp>
diff --git a/boost/fiber/unbounded_channel.hpp b/boost/fiber/unbounded_channel.hpp
deleted file mode 100644
index 6de5a6f5f0..0000000000
--- a/boost/fiber/unbounded_channel.hpp
+++ /dev/null
@@ -1,272 +0,0 @@
-
-// Copyright Oliver Kowalke 2013.
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE_1_0.txt or copy at
-// http://www.boost.org/LICENSE_1_0.txt)
-//
-
-#ifndef BOOST_FIBERS_UNBOUNDED_CHANNEL_H
-#define BOOST_FIBERS_UNBOUNDED_CHANNEL_H
-
-#warn "template unbounded_channel is deprecated"
-
-#include <atomic>
-#include <algorithm>
-#include <chrono>
-#include <cstddef>
-#include <deque>
-#include <memory>
-#include <mutex>
-#include <utility>
-
-#include <boost/config.hpp>
-#include <boost/intrusive_ptr.hpp>
-
-#include <boost/fiber/detail/config.hpp>
-#include <boost/fiber/channel_op_status.hpp>
-#include <boost/fiber/condition_variable.hpp>
-#include <boost/fiber/detail/convert.hpp>
-#include <boost/fiber/exceptions.hpp>
-#include <boost/fiber/mutex.hpp>
-#include <boost/fiber/operations.hpp>
-
-#ifdef BOOST_HAS_ABI_HEADERS
-# include BOOST_ABI_PREFIX
-#endif
-
-namespace boost {
-namespace fibers {
-
-template< typename T,
- typename Allocator = std::allocator< T >
->
-class unbounded_channel {
-public:
- typedef T value_type;
-
-private:
- struct node {
- typedef intrusive_ptr< node > ptr_t;
- typedef typename std::allocator_traits< Allocator >::template rebind_alloc<
- node
- > allocator_t;
- typedef std::allocator_traits< allocator_t > allocator_traits_t;
-
-#if ! defined(BOOST_FIBERS_NO_ATOMICS)
- std::atomic< std::size_t > use_count{ 0 };
-#else
- std::size_t use_count{ 0 };
-#endif
- allocator_t alloc;
- T va;
- ptr_t nxt{};
-
- node( T const& t, allocator_t const& alloc_) noexcept :
- alloc{ alloc_ },
- va{ t } {
- }
-
- node( T && t, allocator_t const& alloc_) noexcept :
- alloc{ alloc_ },
- va{ std::move( t) } {
- }
-
- friend
- void intrusive_ptr_add_ref( node * p) noexcept {
- ++p->use_count;
- }
-
- friend
- void intrusive_ptr_release( node * p) noexcept {
- if ( 0 == --p->use_count) {
- allocator_t alloc( p->alloc);
- allocator_traits_t::destroy( alloc, p);
- allocator_traits_t::deallocate( alloc, p, 1);
- }
- }
- };
-
- using ptr_t = typename node::ptr_t;
- using allocator_t = typename node::allocator_t;
- using allocator_traits_t = typename node::allocator_traits_t;
-
- enum class queue_status {
- open = 0,
- closed
- };
-
- allocator_t alloc_;
- queue_status state_{ queue_status::open };
- ptr_t head_{};
- ptr_t * tail_;
- mutable mutex mtx_{};
- condition_variable not_empty_cond_{};
-
- bool is_closed_() const noexcept {
- return queue_status::closed == state_;
- }
-
- void close_( std::unique_lock< mutex > & lk) noexcept {
- state_ = queue_status::closed;
- lk.unlock();
- not_empty_cond_.notify_all();
- }
-
- bool is_empty_() const noexcept {
- return ! head_;
- }
-
- channel_op_status push_( ptr_t new_node,
- std::unique_lock< mutex > & lk) noexcept {
- if ( is_closed_() ) {
- return channel_op_status::closed;
- }
- return push_and_notify_( new_node, lk);
- }
-
- channel_op_status push_and_notify_( ptr_t new_node,
- std::unique_lock< mutex > & lk) noexcept {
- push_tail_( new_node);
- lk.unlock();
- not_empty_cond_.notify_one();
- return channel_op_status::success;
- }
-
- void push_tail_( ptr_t new_node) noexcept {
- * tail_ = new_node;
- tail_ = & new_node->nxt;
- }
-
- value_type value_pop_( std::unique_lock< mutex > & lk) {
- BOOST_ASSERT( ! is_empty_() );
- auto old_head = pop_head_();
- return std::move( old_head->va);
- }
-
- ptr_t pop_head_() noexcept {
- auto old_head = head_;
- head_ = old_head->nxt;
- if ( ! head_) {
- tail_ = & head_;
- }
- old_head->nxt.reset();
- return old_head;
- }
-
-public:
- explicit unbounded_channel( Allocator const& alloc = Allocator() ) noexcept :
- alloc_{ alloc },
- tail_{ & head_ } {
- }
-
- unbounded_channel( unbounded_channel const&) = delete;
- unbounded_channel & operator=( unbounded_channel const&) = delete;
-
- void close() noexcept {
- std::unique_lock< mutex > lk( mtx_);
- close_( lk);
- }
-
- channel_op_status push( value_type const& va) {
- typename allocator_traits_t::pointer ptr{
- allocator_traits_t::allocate( alloc_, 1) };
- try {
- allocator_traits_t::construct( alloc_, ptr, va, alloc_);
- } catch (...) {
- allocator_traits_t::deallocate( alloc_, ptr, 1);
- throw;
- }
- std::unique_lock< mutex > lk( mtx_);
- return push_( { detail::convert( ptr) }, lk);
- }
-
- channel_op_status push( value_type && va) {
- typename allocator_traits_t::pointer ptr{
- allocator_traits_t::allocate( alloc_, 1) };
- try {
- allocator_traits_t::construct(
- alloc_, ptr, std::move( va), alloc_);
- } catch (...) {
- allocator_traits_t::deallocate( alloc_, ptr, 1);
- throw;
- }
- std::unique_lock< mutex > lk( mtx_);
- return push_( { detail::convert( ptr) }, lk);
- }
-
- channel_op_status pop( value_type & va) {
- std::unique_lock< mutex > lk( mtx_);
- not_empty_cond_.wait( lk,
- [this](){
- return is_closed_() || ! is_empty_();
- });
- if ( is_closed_() && is_empty_() ) {
- return channel_op_status::closed;
- }
- va = value_pop_( lk);
- return channel_op_status::success;
- }
-
- value_type value_pop() {
- std::unique_lock< mutex > lk( mtx_);
- not_empty_cond_.wait( lk,
- [this](){
- return is_closed_() || ! is_empty_();
- });
- if ( is_closed_() && is_empty_() ) {
- throw fiber_error(
- std::make_error_code( std::errc::operation_not_permitted),
- "boost fiber: queue is closed");
- }
- return value_pop_( lk);
- }
-
- channel_op_status try_pop( value_type & va) {
- std::unique_lock< mutex > lk( mtx_);
- if ( is_closed_() && is_empty_() ) {
- // let other fibers run
- lk.unlock();
- this_fiber::yield();
- return channel_op_status::closed;
- }
- if ( is_empty_() ) {
- // let other fibers run
- lk.unlock();
- this_fiber::yield();
- return channel_op_status::empty;
- }
- va = value_pop_( lk);
- return channel_op_status::success;
- }
-
- template< typename Rep, typename Period >
- channel_op_status pop_wait_for( value_type & va,
- std::chrono::duration< Rep, Period > const& timeout_duration) {
- return pop_wait_until( va, std::chrono::steady_clock::now() + timeout_duration);
- }
-
- template< typename Clock, typename Duration >
- channel_op_status pop_wait_until( value_type & va,
- std::chrono::time_point< Clock, Duration > const& timeout_time) {
- std::unique_lock< mutex > lk( mtx_);
- if ( ! not_empty_cond_.wait_until( lk, timeout_time,
- [this](){
- return is_closed_() || ! is_empty_();
- })) {
- return channel_op_status::timeout;
- }
- if ( is_closed_() && is_empty_() ) {
- return channel_op_status::closed;
- }
- va = value_pop_( lk);
- return channel_op_status::success;
- }
-};
-
-}}
-
-#ifdef BOOST_HAS_ABI_HEADERS
-# include BOOST_ABI_SUFFIX
-#endif
-
-#endif // BOOST_FIBERS_UNBOUNDED_CHANNEL_H
diff --git a/boost/fiber/unbuffered_channel.hpp b/boost/fiber/unbuffered_channel.hpp
index 582d9ae5a7..38a2d6111e 100644
--- a/boost/fiber/unbuffered_channel.hpp
+++ b/boost/fiber/unbuffered_channel.hpp
@@ -54,13 +54,14 @@ private:
};
// shared cacheline
- alignas(cache_alignment) std::atomic< slot * > slot_{ nullptr };
+ alignas(cache_alignment) std::atomic< slot * > slot_{ nullptr };
// shared cacheline
- alignas(cache_alignment) std::atomic_bool closed_{ false };
- mutable detail::spinlock splk_{};
- wait_queue_type waiting_producers_{};
- wait_queue_type waiting_consumers_{};
- char pad_[cacheline_length];
+ alignas(cache_alignment) std::atomic_bool closed_{ false };
+ alignas(cache_alignment) mutable detail::spinlock splk_producers_{};
+ wait_queue_type waiting_producers_{};
+ alignas( cache_alignment) mutable detail::spinlock splk_consumers_{};
+ wait_queue_type waiting_consumers_{};
+ char pad_[cacheline_length];
bool is_empty_() {
return nullptr == slot_.load( std::memory_order_acquire);
@@ -68,7 +69,7 @@ private:
bool try_push_( slot * own_slot) {
for (;;) {
- slot * s{ slot_.load( std::memory_order_acquire) };
+ slot * s = slot_.load( std::memory_order_acquire);
if ( nullptr == s) {
if ( ! slot_.compare_exchange_strong( s, own_slot, std::memory_order_acq_rel) ) {
continue;
@@ -81,9 +82,9 @@ private:
}
slot * try_pop_() {
- slot * nil_slot{ nullptr };
+ slot * nil_slot = nullptr;
for (;;) {
- slot * s{ slot_.load( std::memory_order_acquire) };
+ slot * s = slot_.load( std::memory_order_acquire);
if ( nullptr != s) {
if ( ! slot_.compare_exchange_strong( s, nil_slot, std::memory_order_acq_rel) ) {
continue;}
@@ -97,12 +98,12 @@ public:
~unbuffered_channel() {
close();
- slot * s{ nullptr };
+ slot * s = nullptr;
if ( nullptr != ( s = try_pop_() ) ) {
BOOST_ASSERT( nullptr != s);
BOOST_ASSERT( nullptr != s->ctx);
// value will be destructed in the context of the waiting fiber
- context::active()->set_ready( s->ctx);
+ context::active()->schedule( s->ctx);
}
}
@@ -114,90 +115,89 @@ public:
}
void close() noexcept {
- context * ctx{ context::active() };
- detail::spinlock_lock lk{ splk_ };
- closed_.store( true, std::memory_order_release);
+ context * active_ctx = context::active();
// notify all waiting producers
+ closed_.store( true, std::memory_order_release);
+ detail::spinlock_lock lk1{ splk_producers_ };
while ( ! waiting_producers_.empty() ) {
- context * producer_ctx{ & waiting_producers_.front() };
+ context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
- ctx->set_ready( producer_ctx);
+ active_ctx->schedule( producer_ctx);
}
// notify all waiting consumers
+ detail::spinlock_lock lk2{ splk_consumers_ };
while ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx{ & waiting_consumers_.front() };
+ context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- ctx->set_ready( consumer_ctx);
+ active_ctx->schedule( consumer_ctx);
}
}
channel_op_status push( value_type const& value) {
- context * ctx{ context::active() };
- slot s{ value, ctx };
+ context * active_ctx = context::active();
+ slot s{ value, active_ctx };
for (;;) {
if ( is_closed() ) {
return channel_op_status::closed;
}
if ( try_push_( & s) ) {
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_consumers_ };
// notify one waiting consumer
if ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx{ & waiting_consumers_.front() };
+ context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- ctx->set_ready( consumer_ctx);
+ active_ctx->schedule( consumer_ctx);
}
// suspend till value has been consumed
- ctx->suspend( lk);
+ active_ctx->suspend( lk);
// resumed, value has been consumed
return channel_op_status::success;
} else {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_producers_ };
if ( is_closed() ) {
return channel_op_status::closed;
}
if ( is_empty_() ) {
continue;
}
- ctx->wait_link( waiting_producers_);
+ active_ctx->wait_link( waiting_producers_);
// suspend this producer
- ctx->suspend( lk);
+ active_ctx->suspend( lk);
// resumed, slot mabye free
}
}
}
channel_op_status push( value_type && value) {
- context * ctx{ context::active() };
- slot s{ std::move( value), ctx };
+ context * active_ctx = context::active();
+ slot s{ std::move( value), active_ctx };
for (;;) {
if ( is_closed() ) {
return channel_op_status::closed;
}
if ( try_push_( & s) ) {
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_consumers_ };
// notify one waiting consumer
if ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx{ & waiting_consumers_.front() };
+ context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- ctx->set_ready( consumer_ctx);
+ active_ctx->schedule( consumer_ctx);
}
// suspend till value has been consumed
- ctx->suspend( lk);
+ active_ctx->suspend( lk);
// resumed, value has been consumed
return channel_op_status::success;
} else {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_producers_ };
if ( is_closed() ) {
return channel_op_status::closed;
}
if ( is_empty_() ) {
continue;
}
- ctx->wait_link( waiting_producers_);
+ active_ctx->wait_link( waiting_producers_);
// suspend this producer
- ctx->suspend( lk);
+ active_ctx->suspend( lk);
// resumed, slot mabye free
}
}
@@ -220,51 +220,46 @@ public:
template< typename Clock, typename Duration >
channel_op_status push_wait_until( value_type const& value,
std::chrono::time_point< Clock, Duration > const& timeout_time_) {
- std::chrono::steady_clock::time_point timeout_time( detail::convert( timeout_time_) );
- context * ctx{ context::active() };
- slot s{ value, ctx };
+ context * active_ctx = context::active();
+ slot s{ value, active_ctx };
+ std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
if ( is_closed() ) {
return channel_op_status::closed;
}
if ( try_push_( & s) ) {
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_consumers_ };
// notify one waiting consumer
if ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx{ & waiting_consumers_.front() };
+ context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- ctx->set_ready( consumer_ctx);
+ active_ctx->schedule( consumer_ctx);
}
// suspend this producer
- if ( ! ctx->wait_until( timeout_time, lk) ) {
+ if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// clear slot
- slot * nil_slot{ nullptr }, * own_slot{ & s };
+ slot * nil_slot = nullptr, * own_slot = & s;
slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
- // relock local lk
- lk.lock();
- // remove from waiting-queue
- ctx->wait_unlink();
// resumed, value has not been consumed
return channel_op_status::timeout;
}
// resumed, value has been consumed
return channel_op_status::success;
} else {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_producers_ };
if ( is_closed() ) {
return channel_op_status::closed;
}
if ( is_empty_() ) {
continue;
}
- ctx->wait_link( waiting_producers_);
+ active_ctx->wait_link( waiting_producers_);
// suspend this producer
- if ( ! ctx->wait_until( timeout_time, lk) ) {
+ if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
lk.lock();
// remove from waiting-queue
- ctx->wait_unlink();
+ waiting_producers_.remove( * active_ctx);
return channel_op_status::timeout;
}
// resumed, slot maybe free
@@ -275,51 +270,46 @@ public:
template< typename Clock, typename Duration >
channel_op_status push_wait_until( value_type && value,
std::chrono::time_point< Clock, Duration > const& timeout_time_) {
- std::chrono::steady_clock::time_point timeout_time( detail::convert( timeout_time_) );
- context * ctx{ context::active() };
- slot s{ std::move( value), ctx };
+ context * active_ctx = context::active();
+ slot s{ std::move( value), active_ctx };
+ std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
if ( is_closed() ) {
return channel_op_status::closed;
}
if ( try_push_( & s) ) {
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_consumers_ };
// notify one waiting consumer
if ( ! waiting_consumers_.empty() ) {
- context * consumer_ctx{ & waiting_consumers_.front() };
+ context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- ctx->set_ready( consumer_ctx);
+ active_ctx->schedule( consumer_ctx);
}
// suspend this producer
- if ( ! ctx->wait_until( timeout_time, lk) ) {
+ if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// clear slot
- slot * nil_slot{ nullptr }, * own_slot{ & s };
+ slot * nil_slot = nullptr, * own_slot = & s;
slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
- // relock local lk
- lk.lock();
- // remove from waiting-queue
- ctx->wait_unlink();
// resumed, value has not been consumed
return channel_op_status::timeout;
}
// resumed, value has been consumed
return channel_op_status::success;
} else {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_producers_ };
if ( is_closed() ) {
return channel_op_status::closed;
}
if ( is_empty_() ) {
continue;
}
- ctx->wait_link( waiting_producers_);
+ active_ctx->wait_link( waiting_producers_);
// suspend this producer
- if ( ! ctx->wait_until( timeout_time, lk) ) {
+ if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
lk.lock();
// remove from waiting-queue
- ctx->wait_unlink();
+ waiting_producers_.remove( * active_ctx);
return channel_op_status::timeout;
}
// resumed, slot maybe free
@@ -328,65 +318,63 @@ public:
}
channel_op_status pop( value_type & value) {
- context * ctx{ context::active() };
- slot * s{ nullptr };
+ context * active_ctx = context::active();
+ slot * s = nullptr;
for (;;) {
if ( nullptr != ( s = try_pop_() ) ) {
{
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_producers_ };
// notify one waiting producer
if ( ! waiting_producers_.empty() ) {
- context * producer_ctx{ & waiting_producers_.front() };
+ context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- ctx->set_ready( producer_ctx);
+ active_ctx->schedule( producer_ctx);
}
}
// consume value
value = std::move( s->value);
// resume suspended producer
- ctx->set_ready( s->ctx);
+ active_ctx->schedule( s->ctx);
return channel_op_status::success;
} else {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_consumers_ };
if ( is_closed() ) {
return channel_op_status::closed;
}
if ( ! is_empty_() ) {
continue;
}
- ctx->wait_link( waiting_consumers_);
+ active_ctx->wait_link( waiting_consumers_);
// suspend this consumer
- ctx->suspend( lk);
+ active_ctx->suspend( lk);
// resumed, slot mabye set
}
}
}
value_type value_pop() {
- context * ctx{ context::active() };
- slot * s{ nullptr };
+ context * active_ctx = context::active();
+ slot * s = nullptr;
for (;;) {
if ( nullptr != ( s = try_pop_() ) ) {
{
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_producers_ };
// notify one waiting producer
if ( ! waiting_producers_.empty() ) {
- context * producer_ctx{ & waiting_producers_.front() };
+ context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- ctx->set_ready( producer_ctx);
+ active_ctx->schedule( producer_ctx);
}
}
// consume value
value_type value{ std::move( s->value) };
// resume suspended producer
- ctx->set_ready( s->ctx);
+ active_ctx->schedule( s->ctx);
return std::move( value);
} else {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_consumers_ };
if ( is_closed() ) {
throw fiber_error{
std::make_error_code( std::errc::operation_not_permitted),
@@ -395,9 +383,9 @@ public:
if ( ! is_empty_() ) {
continue;
}
- ctx->wait_link( waiting_consumers_);
+ active_ctx->wait_link( waiting_consumers_);
// suspend this consumer
- ctx->suspend( lk);
+ active_ctx->suspend( lk);
// resumed, slot mabye set
}
}
@@ -413,42 +401,41 @@ public:
template< typename Clock, typename Duration >
channel_op_status pop_wait_until( value_type & value,
std::chrono::time_point< Clock, Duration > const& timeout_time_) {
- std::chrono::steady_clock::time_point timeout_time( detail::convert( timeout_time_) );
- context * ctx{ context::active() };
- slot * s{ nullptr };
+ context * active_ctx = context::active();
+ slot * s = nullptr;
+ std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
if ( nullptr != ( s = try_pop_() ) ) {
{
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_producers_ };
// notify one waiting producer
if ( ! waiting_producers_.empty() ) {
- context * producer_ctx{ & waiting_producers_.front() };
+ context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- ctx->set_ready( producer_ctx);
+ active_ctx->schedule( producer_ctx);
}
}
// consume value
value = std::move( s->value);
// resume suspended producer
- ctx->set_ready( s->ctx);
+ active_ctx->schedule( s->ctx);
return channel_op_status::success;
} else {
- BOOST_ASSERT( ! ctx->wait_is_linked() );
- detail::spinlock_lock lk{ splk_ };
+ detail::spinlock_lock lk{ splk_consumers_ };
if ( is_closed() ) {
return channel_op_status::closed;
}
if ( ! is_empty_() ) {
continue;
}
- ctx->wait_link( waiting_consumers_);
+ active_ctx->wait_link( waiting_consumers_);
// suspend this consumer
- if ( ! ctx->wait_until( timeout_time, lk) ) {
+ if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
lk.lock();
// remove from waiting-queue
- ctx->wait_unlink();
+ waiting_consumers_.remove( * active_ctx);
return channel_op_status::timeout;
}
}
@@ -459,8 +446,8 @@ public:
private:
typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
- unbuffered_channel * chan_{ nullptr };
- storage_type storage_;
+ unbuffered_channel * chan_{ nullptr };
+ storage_type storage_;
void increment_() {
BOOST_ASSERT( nullptr != chan_);