diff options
Diffstat (limited to 'boost/fiber')
38 files changed, 1301 insertions, 643 deletions
diff --git a/boost/fiber/algo/algorithm.hpp b/boost/fiber/algo/algorithm.hpp index 9b846e774b..b22a5923d6 100644 --- a/boost/fiber/algo/algorithm.hpp +++ b/boost/fiber/algo/algorithm.hpp @@ -6,11 +6,13 @@ #ifndef BOOST_FIBERS_ALGO_ALGORITHM_H #define BOOST_FIBERS_ALGO_ALGORITHM_H -#include <cstddef> +#include <atomic> #include <chrono> +#include <cstddef> -#include <boost/config.hpp> #include <boost/assert.hpp> +#include <boost/config.hpp> +#include <boost/intrusive_ptr.hpp> #include <boost/fiber/properties.hpp> #include <boost/fiber/detail/config.hpp> @@ -26,7 +28,13 @@ class context; namespace algo { -struct BOOST_FIBERS_DECL algorithm { +class BOOST_FIBERS_DECL algorithm { +private: + std::atomic< std::size_t > use_count_{ 0 }; + +public: + typedef intrusive_ptr< algorithm > ptr_t; + virtual ~algorithm() {} virtual void awakened( context *) noexcept = 0; @@ -38,6 +46,19 @@ struct BOOST_FIBERS_DECL algorithm { virtual void suspend_until( std::chrono::steady_clock::time_point const&) noexcept = 0; virtual void notify() noexcept = 0; + + friend void intrusive_ptr_add_ref( algorithm * algo) noexcept { + BOOST_ASSERT( nullptr != algo); + algo->use_count_.fetch_add( 1, std::memory_order_relaxed); + } + + friend void intrusive_ptr_release( algorithm * algo) noexcept { + BOOST_ASSERT( nullptr != algo); + if ( 1 == algo->use_count_.fetch_sub( 1, std::memory_order_release) ) { + std::atomic_thread_fence( std::memory_order_acquire); + delete algo; + } + } }; class BOOST_FIBERS_DECL algorithm_with_properties_base : public algorithm { @@ -60,7 +81,7 @@ struct algorithm_with_properties : public algorithm_with_properties_base { // with: algorithm_with_properties<PROPS>::awakened(fb); virtual void awakened( context * ctx) noexcept override final { fiber_properties * props = super::get_properties( ctx); - if ( nullptr == props) { + if ( BOOST_LIKELY( nullptr == props) ) { // TODO: would be great if PROPS could be allocated on the new // fiber's stack somehow props = new_properties( ctx); diff --git a/boost/fiber/algo/numa/work_stealing.hpp b/boost/fiber/algo/numa/work_stealing.hpp new file mode 100644 index 0000000000..26032ab35e --- /dev/null +++ b/boost/fiber/algo/numa/work_stealing.hpp @@ -0,0 +1,93 @@ + +// Copyright Oliver Kowalke 2017. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_FIBERS_ALGO_NUMA_WORK_STEALING_H +#define BOOST_FIBERS_ALGO_NUMA_WORK_STEALING_H + +#include <condition_variable> +#include <chrono> +#include <cstddef> +#include <cstdint> +#include <mutex> +#include <vector> + +#include <boost/config.hpp> +#include <boost/intrusive_ptr.hpp> + +#include <boost/fiber/algo/algorithm.hpp> +#include <boost/fiber/context.hpp> +#include <boost/fiber/detail/config.hpp> +#include <boost/fiber/detail/context_spinlock_queue.hpp> +#include <boost/fiber/detail/context_spmc_queue.hpp> +#include <boost/fiber/numa/pin_thread.hpp> +#include <boost/fiber/numa/topology.hpp> +#include <boost/fiber/scheduler.hpp> + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace algo { +namespace numa { + +class work_stealing : public algorithm { +private: + static std::vector< intrusive_ptr< work_stealing > > schedulers_; + + std::uint32_t cpu_id_; + std::vector< std::uint32_t > local_cpus_; + std::vector< std::uint32_t > remote_cpus_; +#ifdef BOOST_FIBERS_USE_SPMC_QUEUE + detail::context_spmc_queue rqueue_{}; +#else + detail::context_spinlock_queue rqueue_{}; +#endif + std::mutex mtx_{}; + std::condition_variable cnd_{}; + bool flag_{ false }; + bool suspend_; + + static void init_( std::vector< boost::fibers::numa::node > const&, + std::vector< intrusive_ptr< work_stealing > > &); + +public: + work_stealing( std::uint32_t, std::uint32_t, + std::vector< boost::fibers::numa::node > const&, + bool = false); + + work_stealing( work_stealing const&) = delete; + work_stealing( work_stealing &&) = delete; + + work_stealing & operator=( work_stealing const&) = delete; + work_stealing & operator=( work_stealing &&) = delete; + + virtual void awakened( context *) noexcept; + + virtual context * pick_next() noexcept; + + virtual context * steal() noexcept { + return rqueue_.steal(); + } + + virtual bool has_ready_fibers() const noexcept { + return ! rqueue_.empty(); + } + + virtual void suspend_until( std::chrono::steady_clock::time_point const&) noexcept; + + virtual void notify() noexcept; +}; + +}}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_ALGO_NUMA_WORK_STEALING_H diff --git a/boost/fiber/algo/work_stealing.hpp b/boost/fiber/algo/work_stealing.hpp index 66cadd12be..db4b5cf12d 100644 --- a/boost/fiber/algo/work_stealing.hpp +++ b/boost/fiber/algo/work_stealing.hpp @@ -8,19 +8,22 @@ #ifndef BOOST_FIBERS_ALGO_WORK_STEALING_H #define BOOST_FIBERS_ALGO_WORK_STEALING_H +#include <atomic> #include <condition_variable> #include <chrono> #include <cstddef> +#include <cstdint> #include <mutex> #include <vector> #include <boost/config.hpp> +#include <boost/intrusive_ptr.hpp> #include <boost/fiber/algo/algorithm.hpp> -#include <boost/fiber/detail/context_spinlock_queue.hpp> -#include <boost/fiber/detail/context_spmc_queue.hpp> #include <boost/fiber/context.hpp> #include <boost/fiber/detail/config.hpp> +#include <boost/fiber/detail/context_spinlock_queue.hpp> +#include <boost/fiber/detail/context_spmc_queue.hpp> #include <boost/fiber/scheduler.hpp> #ifdef BOOST_HAS_ABI_HEADERS @@ -33,24 +36,25 @@ namespace algo { class work_stealing : public algorithm { private: - static std::vector< work_stealing * > schedulers_; + static std::atomic< std::uint32_t > counter_; + static std::vector< intrusive_ptr< work_stealing > > schedulers_; - std::size_t idx_; - std::size_t max_idx_; + std::uint32_t id_; + std::uint32_t thread_count_; #ifdef BOOST_FIBERS_USE_SPMC_QUEUE - alignas(cache_alignment) detail::context_spmc_queue rqueue_{}; + detail::context_spmc_queue rqueue_{}; #else - alignas(cache_alignment) detail::context_spinlock_queue rqueue_{}; + detail::context_spinlock_queue rqueue_{}; #endif std::mutex mtx_{}; std::condition_variable cnd_{}; bool flag_{ false }; bool suspend_; - static void init_( std::size_t max_idx); + static void init_( std::uint32_t, std::vector< intrusive_ptr< work_stealing > > &); public: - work_stealing( std::size_t max_idx, std::size_t idx, bool suspend = false); + work_stealing( std::uint32_t, bool = false); work_stealing( work_stealing const&) = delete; work_stealing( work_stealing &&) = delete; @@ -58,21 +62,21 @@ public: work_stealing & operator=( work_stealing const&) = delete; work_stealing & operator=( work_stealing &&) = delete; - void awakened( context * ctx) noexcept; + virtual void awakened( context *) noexcept; - context * pick_next() noexcept; + virtual context * pick_next() noexcept; - context * steal() noexcept { + virtual context * steal() noexcept { return rqueue_.steal(); } - bool has_ready_fibers() const noexcept { + virtual bool has_ready_fibers() const noexcept { return ! rqueue_.empty(); } - void suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept; + virtual void suspend_until( std::chrono::steady_clock::time_point const&) noexcept; - void notify() noexcept; + virtual void notify() noexcept; }; }}} diff --git a/boost/fiber/all.hpp b/boost/fiber/all.hpp index 460df5295b..17ceca45cc 100644 --- a/boost/fiber/all.hpp +++ b/boost/fiber/all.hpp @@ -11,6 +11,7 @@ #include <boost/fiber/algo/round_robin.hpp> #include <boost/fiber/algo/shared_work.hpp> #include <boost/fiber/algo/work_stealing.hpp> +#include <boost/fiber/algo/numa/work_stealing.hpp> #include <boost/fiber/barrier.hpp> #include <boost/fiber/buffered_channel.hpp> #include <boost/fiber/channel_op_status.hpp> @@ -21,6 +22,8 @@ #include <boost/fiber/fixedsize_stack.hpp> #include <boost/fiber/fss.hpp> #include <boost/fiber/future.hpp> +#include <boost/fiber/numa/pin_thread.hpp> +#include <boost/fiber/numa/topology.hpp> #include <boost/fiber/mutex.hpp> #include <boost/fiber/operations.hpp> #include <boost/fiber/policy.hpp> diff --git a/boost/fiber/buffered_channel.hpp b/boost/fiber/buffered_channel.hpp index 1c32e49bae..3cf22295dd 100644 --- a/boost/fiber/buffered_channel.hpp +++ b/boost/fiber/buffered_channel.hpp @@ -40,7 +40,7 @@ private: typedef context::wait_queue_t wait_queue_type; typedef T slot_type; - alignas(cache_alignment) mutable detail::spinlock splk_{}; + mutable detail::spinlock splk_{}; wait_queue_type waiting_producers_{}; wait_queue_type waiting_consumers_{}; slot_type * slots_; @@ -64,7 +64,7 @@ private: public: explicit buffered_channel( std::size_t capacity) : capacity_{ capacity } { - if ( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) { + if ( BOOST_UNLIKELY( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) ) { throw fiber_error{ std::make_error_code( std::errc::invalid_argument), "boost fiber: buffer capacity is invalid" }; } @@ -92,20 +92,52 @@ public: while ( ! waiting_producers_.empty() ) { context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); - active_ctx->schedule( producer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( producer_ctx); + // notify context + active_ctx->schedule( producer_ctx); + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( producer_ctx); + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( producer_ctx); + // re-schedule next + } } // notify all waiting consumers while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } } channel_op_status try_push( value_type const& value) { context * active_ctx = context::active(); detail::spinlock_lock lk{ splk_ }; - if ( is_closed_() ) { + if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } else if ( is_full_() ) { return channel_op_status::full; @@ -113,11 +145,28 @@ public: slots_[pidx_] = value; pidx_ = (pidx_ + 1) % capacity_; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - lk.unlock(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } return channel_op_status::success; } @@ -126,7 +175,7 @@ public: channel_op_status try_push( value_type && value) { context * active_ctx = context::active(); detail::spinlock_lock lk{ splk_ }; - if ( is_closed_() ) { + if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } else if ( is_full_() ) { return channel_op_status::full; @@ -134,11 +183,29 @@ public: slots_[pidx_] = std::move( value); pidx_ = (pidx_ + 1) % capacity_; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); lk.unlock(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } return channel_op_status::success; } @@ -148,21 +215,40 @@ public: context * active_ctx = context::active(); for (;;) { detail::spinlock_lock lk{ splk_ }; - if ( is_closed_() ) { + if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } else if ( is_full_() ) { active_ctx->wait_link( waiting_producers_); + active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release); // suspend this producer active_ctx->suspend( lk); } else { slots_[pidx_] = value; pidx_ = (pidx_ + 1) % capacity_; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); lk.unlock(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } return channel_op_status::success; } @@ -173,21 +259,40 @@ public: context * active_ctx = context::active(); for (;;) { detail::spinlock_lock lk{ splk_ }; - if ( is_closed_() ) { + if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } else if ( is_full_() ) { active_ctx->wait_link( waiting_producers_); + active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release); // suspend this producer active_ctx->suspend( lk); } else { slots_[pidx_] = std::move( value); pidx_ = (pidx_ + 1) % capacity_; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); lk.unlock(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } return channel_op_status::success; } @@ -215,10 +320,12 @@ public: std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); for (;;) { detail::spinlock_lock lk{ splk_ }; - if ( is_closed_() ) { + if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } else if ( is_full_() ) { active_ctx->wait_link( waiting_producers_); + intrusive_ptr_add_ref( active_ctx); + active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); // suspend this producer if ( ! active_ctx->wait_until( timeout_time, lk) ) { // relock local lk @@ -231,11 +338,29 @@ public: slots_[pidx_] = value; pidx_ = (pidx_ + 1) % capacity_; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); lk.unlock(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } return channel_op_status::success; } @@ -249,10 +374,12 @@ public: std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); for (;;) { detail::spinlock_lock lk{ splk_ }; - if ( is_closed_() ) { + if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } else if ( is_full_() ) { active_ctx->wait_link( waiting_producers_); + intrusive_ptr_add_ref( active_ctx); + active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); // suspend this producer if ( ! active_ctx->wait_until( timeout_time, lk) ) { // relock local lk @@ -265,11 +392,29 @@ public: slots_[pidx_] = std::move( value); pidx_ = (pidx_ + 1) % capacity_; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); lk.unlock(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } return channel_op_status::success; } @@ -287,11 +432,29 @@ public: value = std::move( slots_[cidx_]); cidx_ = (cidx_ + 1) % capacity_; // notify one waiting producer - if ( ! waiting_producers_.empty() ) { + while ( ! waiting_producers_.empty() ) { context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - active_ctx->schedule( producer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( producer_ctx); + // notify context + active_ctx->schedule( producer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( producer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( producer_ctx); + // re-schedule next + } } return channel_op_status::success; } @@ -302,10 +465,11 @@ public: for (;;) { detail::spinlock_lock lk{ splk_ }; if ( is_empty_() ) { - if ( is_closed_() ) { + if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } else { active_ctx->wait_link( waiting_consumers_); + active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release); // suspend this consumer active_ctx->suspend( lk); } @@ -313,11 +477,29 @@ public: value = std::move( slots_[cidx_]); cidx_ = (cidx_ + 1) % capacity_; // notify one waiting producer - if ( ! waiting_producers_.empty() ) { + while ( ! waiting_producers_.empty() ) { context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - active_ctx->schedule( producer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( producer_ctx); + // notify context + active_ctx->schedule( producer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( producer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( producer_ctx); + // re-schedule next + } } return channel_op_status::success; } @@ -329,12 +511,13 @@ public: for (;;) { detail::spinlock_lock lk{ splk_ }; if ( is_empty_() ) { - if ( is_closed_() ) { + if ( BOOST_UNLIKELY( is_closed_() ) ) { throw fiber_error{ std::make_error_code( std::errc::operation_not_permitted), "boost fiber: channel is closed" }; } else { active_ctx->wait_link( waiting_consumers_); + active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release); // suspend this consumer active_ctx->suspend( lk); } @@ -342,11 +525,29 @@ public: value_type value = std::move( slots_[cidx_]); cidx_ = (cidx_ + 1) % capacity_; // notify one waiting producer - if ( ! waiting_producers_.empty() ) { + while ( ! waiting_producers_.empty() ) { context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - active_ctx->schedule( producer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( producer_ctx); + // notify context + active_ctx->schedule( producer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( producer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( producer_ctx); + // re-schedule next + } } return std::move( value); } @@ -368,10 +569,12 @@ public: for (;;) { detail::spinlock_lock lk{ splk_ }; if ( is_empty_() ) { - if ( is_closed_() ) { + if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } else { active_ctx->wait_link( waiting_consumers_); + intrusive_ptr_add_ref( active_ctx); + active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); // suspend this consumer if ( ! active_ctx->wait_until( timeout_time, lk) ) { // relock local lk @@ -385,11 +588,29 @@ public: value = std::move( slots_[cidx_]); cidx_ = (cidx_ + 1) % capacity_; // notify one waiting producer - if ( ! waiting_producers_.empty() ) { + while ( ! waiting_producers_.empty() ) { context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - active_ctx->schedule( producer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( producer_ctx); + // notify context + active_ctx->schedule( producer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( producer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( producer_ctx); + // re-schedule next + } } return channel_op_status::success; } @@ -428,8 +649,9 @@ public: } iterator & operator=( iterator const& other) noexcept { - if ( this == & other) return * this; - chan_ = other.chan_; + if ( BOOST_LIKELY( this != & other) ) { + chan_ = other.chan_; + } return * this; } diff --git a/boost/fiber/condition_variable.hpp b/boost/fiber/condition_variable.hpp index cd0e7cb022..2cd3ecc719 100644 --- a/boost/fiber/condition_variable.hpp +++ b/boost/fiber/condition_variable.hpp @@ -70,6 +70,7 @@ public: detail::spinlock_lock lk{ wait_queue_splk_ }; BOOST_ASSERT( ! active_ctx->wait_is_linked() ); active_ctx->wait_link( wait_queue_); + active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release); // unlock external lt lt.unlock(); // suspend this fiber @@ -101,6 +102,8 @@ public: detail::spinlock_lock lk{ wait_queue_splk_ }; BOOST_ASSERT( ! active_ctx->wait_is_linked() ); active_ctx->wait_link( wait_queue_); + intrusive_ptr_add_ref( active_ctx); + active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); // unlock external lt lt.unlock(); // suspend this fiber diff --git a/boost/fiber/context.hpp b/boost/fiber/context.hpp index 773528e3a1..e2d1aeff5f 100644 --- a/boost/fiber/context.hpp +++ b/boost/fiber/context.hpp @@ -7,11 +7,12 @@ #ifndef BOOST_FIBERS_CONTEXT_H #define BOOST_FIBERS_CONTEXT_H -#include <iostream> #include <atomic> #include <chrono> +#include <cstdint> #include <exception> #include <functional> +#include <iostream> #include <map> #include <memory> #include <tuple> @@ -22,11 +23,7 @@ #if defined(BOOST_NO_CXX17_STD_APPLY) #include <boost/context/detail/apply.hpp> #endif -#if (BOOST_EXECUTION_CONTEXT==1) -# include <boost/context/execution_context.hpp> -#else -# include <boost/context/continuation.hpp> -#endif +#include <boost/context/continuation.hpp> #include <boost/context/stack_context.hpp> #include <boost/intrusive/list.hpp> #include <boost/intrusive/parent_from_member.hpp> @@ -39,7 +36,6 @@ #include <boost/fiber/detail/decay_copy.hpp> #include <boost/fiber/detail/fss.hpp> #include <boost/fiber/detail/spinlock.hpp> -#include <boost/fiber/detail/wrap.hpp> #include <boost/fiber/exceptions.hpp> #include <boost/fiber/fixedsize_stack.hpp> #include <boost/fiber/policy.hpp> @@ -66,10 +62,10 @@ class scheduler; namespace detail { struct wait_tag; -typedef intrusive::slist_member_hook< +typedef intrusive::list_member_hook< intrusive::tag< wait_tag >, intrusive::link_mode< - intrusive::safe_link + intrusive::auto_unlink > > wait_hook; // declaration of the functor that converts between @@ -132,25 +128,18 @@ typedef intrusive::slist_member_hook< } -struct main_context_t {}; -const main_context_t main_context{}; - -struct dispatcher_context_t {}; -const dispatcher_context_t dispatcher_context{}; - -struct worker_context_t {}; -const worker_context_t worker_context{}; - class BOOST_FIBERS_DECL context { public: - typedef intrusive::slist< + typedef intrusive::list< context, intrusive::function_hook< detail::wait_functor >, - intrusive::linear< true >, - intrusive::cache_last< true > + intrusive::constant_time_size< false > > wait_queue_t; private: + friend class dispatcher_context; + friend class main_context; + template< typename Fn, typename ... Arg > friend class worker_context; friend class scheduler; struct fss_data { @@ -175,88 +164,39 @@ private: typedef std::map< uintptr_t, fss_data > fss_data_t; #if ! defined(BOOST_FIBERS_NO_ATOMICS) - alignas(cache_alignment) std::atomic< std::size_t > use_count_{ 0 }; + std::atomic< std::size_t > use_count_; #else - alignas(cache_alignment) std::size_t use_count_{ 0 }; + std::size_t use_count_; #endif #if ! defined(BOOST_FIBERS_NO_ATOMICS) - alignas(cache_alignment) detail::remote_ready_hook remote_ready_hook_{}; - std::atomic< context * > remote_nxt_{ nullptr }; + detail::remote_ready_hook remote_ready_hook_{}; #endif - alignas(cache_alignment) detail::spinlock splk_{}; + detail::spinlock splk_{}; bool terminated_{ false }; wait_queue_t wait_queue_{}; public: detail::wait_hook wait_hook_{}; +#if ! defined(BOOST_FIBERS_NO_ATOMICS) + std::atomic< std::intptr_t > twstatus{ 0 }; +#endif private: - alignas(cache_alignment) scheduler * scheduler_{ nullptr }; + scheduler * scheduler_{ nullptr }; fss_data_t fss_data_{}; detail::sleep_hook sleep_hook_{}; detail::ready_hook ready_hook_{}; detail::terminated_hook terminated_hook_{}; detail::worker_hook worker_hook_{}; -#if (BOOST_EXECUTION_CONTEXT==1) - boost::context::execution_context ctx_; -#else - boost::context::continuation c_; -#endif fiber_properties * properties_{ nullptr }; std::chrono::steady_clock::time_point tp_{ (std::chrono::steady_clock::time_point::max)() }; + boost::context::continuation c_{}; type type_; launch policy_; - void resume_( detail::data_t &) noexcept; - void schedule_( context *) noexcept; - -#if (BOOST_EXECUTION_CONTEXT==1) - template< typename Fn, typename Tpl > - void run_( Fn && fn_, Tpl && tpl_, detail::data_t * dp) noexcept { - { - // fn and tpl must be destroyed before calling terminate() - typename std::decay< Fn >::type fn = std::forward< Fn >( fn_); - typename std::decay< Tpl >::type tpl = std::forward< Tpl >( tpl_); - if ( nullptr != dp->lk) { - dp->lk->unlock(); - } else if ( nullptr != dp->ctx) { - active()->schedule_( dp->ctx); - } -#if defined(BOOST_NO_CXX17_STD_APPLY) - boost::context::detail::apply( std::move( fn), std::move( tpl) ); -#else - std::apply( std::move( fn), std::move( tpl) ); -#endif - } - // terminate context - terminate(); - BOOST_ASSERT_MSG( false, "fiber already terminated"); - } -#else - template< typename Fn, typename Tpl > - boost::context::continuation - run_( boost::context::continuation && c, Fn && fn_, Tpl && tpl_) noexcept { - { - // fn and tpl must be destroyed before calling terminate() - typename std::decay< Fn >::type fn = std::forward< Fn >( fn_); - typename std::decay< Tpl >::type tpl = std::forward< Tpl >( tpl_); - c = c.resume(); - detail::data_t * dp = c.get_data< detail::data_t * >(); - // update contiunation of calling fiber - dp->from->c_ = std::move( c); - if ( nullptr != dp->lk) { - dp->lk->unlock(); - } else if ( nullptr != dp->ctx) { - active()->schedule_( dp->ctx); - } -#if defined(BOOST_NO_CXX17_STD_APPLY) - boost::context::detail::apply( std::move( fn), std::move( tpl) ); -#else - std::apply( std::move( fn), std::move( tpl) ); -#endif - } - // terminate context - return terminate(); + context( std::size_t initial_count, type t, launch policy) noexcept : + use_count_{ initial_count }, + type_{ t }, + policy_{ policy } { } -#endif public: class id { @@ -317,73 +257,6 @@ public: static void reset_active() noexcept; - // main fiber context - explicit context( main_context_t) noexcept; - - // dispatcher fiber context - context( dispatcher_context_t, boost::context::preallocated const&, - default_stack const&, scheduler *); - - // worker fiber context - template< typename StackAlloc, - typename Fn, - typename Tpl - > - context( worker_context_t, - launch policy, - boost::context::preallocated palloc, StackAlloc salloc, - Fn && fn, Tpl && tpl) : - use_count_{ 1 }, // fiber instance or scheduler owner -#if (BOOST_EXECUTION_CONTEXT==1) -# if defined(BOOST_NO_CXX14_GENERIC_LAMBDAS) - ctx_{ std::allocator_arg, palloc, salloc, - detail::wrap( - [this]( typename std::decay< Fn >::type & fn, typename std::decay< Tpl >::type & tpl, - boost::context::execution_context & ctx, void * vp) mutable noexcept { - run_( std::move( fn), std::move( tpl), static_cast< detail::data_t * >( vp) ); - }, - std::forward< Fn >( fn), - std::forward< Tpl >( tpl), - boost::context::execution_context::current() ) - }, - type_{ type::worker_context }, - policy_{ policy } -# else - ctx_{ std::allocator_arg, palloc, salloc, - [this,fn=detail::decay_copy( std::forward< Fn >( fn) ),tpl=std::forward< Tpl >( tpl), - ctx=boost::context::execution_context::current()] (void * vp) mutable noexcept { - run_( std::move( fn), std::move( tpl), static_cast< detail::data_t * >( vp) ); - }}, - type_{ type::worker_context }, - policy_{ policy } -# endif - {} -#else - c_{}, - type_{ type::worker_context }, - policy_{ policy } - { -# if defined(BOOST_NO_CXX14_GENERIC_LAMBDAS) - c_ = boost::context::callcc( - std::allocator_arg, palloc, salloc, - detail::wrap( - [this]( typename std::decay< Fn >::type & fn, typename std::decay< Tpl >::type & tpl, - boost::context::continuation && c) mutable noexcept { - return run_( std::forward< boost::context::continuation >( c), std::move( fn), std::move( tpl) ); - }, - std::forward< Fn >( fn), - std::forward< Tpl >( tpl) ) ); -# else - c_ = boost::context::callcc( - std::allocator_arg, palloc, salloc, - [this,fn=detail::decay_copy( std::forward< Fn >( fn) ),tpl=std::forward< Tpl >( tpl)] - (boost::context::continuation && c) mutable noexcept { - return run_( std::forward< boost::context::continuation >( c), std::move( fn), std::move( tpl) ); - }); -# endif - } -#endif - context( context const&) = delete; context & operator=( context const&) = delete; @@ -412,12 +285,9 @@ public: void suspend() noexcept; void suspend( detail::spinlock_lock &) noexcept; -#if (BOOST_EXECUTION_CONTEXT==1) - void terminate() noexcept; -#else boost::context::continuation suspend_with_cc() noexcept; boost::context::continuation terminate() noexcept; -#endif + void join(); void yield() noexcept; @@ -510,6 +380,8 @@ public: void sleep_unlink() noexcept; + void wait_unlink() noexcept; + void detach() noexcept; void attach( context *) noexcept; @@ -523,18 +395,11 @@ public: BOOST_ASSERT( nullptr != ctx); if ( 1 == ctx->use_count_.fetch_sub( 1, std::memory_order_release) ) { std::atomic_thread_fence( std::memory_order_acquire); -#if (BOOST_EXECUTION_CONTEXT==1) - boost::context::execution_context ec = ctx->ctx_; - // destruct context - // deallocates stack (execution_context is ref counted) - ctx->~context(); -#else boost::context::continuation c = std::move( ctx->c_); // destruct context ctx->~context(); // deallocated stack - c.resume( nullptr); -#endif + c.resume(); } } }; @@ -544,36 +409,66 @@ bool operator<( context const& l, context const& r) noexcept { return l.get_id() < r.get_id(); } -template< typename StackAlloc, typename Fn, typename ... Args > +template< typename Fn, typename ... Arg > +class worker_context final : public context { +private: + typename std::decay< Fn >::type fn_; + std::tuple< Arg ... > arg_; + + boost::context::continuation + run_( boost::context::continuation && c) { + { + // fn and tpl must be destroyed before calling terminate() + auto fn = std::move( fn_); + auto arg = std::move( arg_); + c.resume(); +#if defined(BOOST_NO_CXX17_STD_APPLY) + boost::context::detail::apply( std::move( fn_), std::move( arg_) ); +#else + std::apply( std::move( fn_), std::move( arg_) ); +#endif + } + // terminate context + return terminate(); + } + +public: + template< typename StackAlloc > + worker_context( launch policy, + boost::context::preallocated const& palloc, StackAlloc const& salloc, + Fn && fn, Arg ... arg) : + context{ 1, type::worker_context, policy }, + fn_( std::forward< Fn >( fn) ), + arg_( std::forward< Arg >( arg) ... ) { + c_ = boost::context::callcc( + std::allocator_arg, palloc, salloc, + std::bind( & worker_context::run_, this, std::placeholders::_1) ); + } +}; + + +template< typename StackAlloc, typename Fn, typename ... Arg > static intrusive_ptr< context > make_worker_context( launch policy, StackAlloc salloc, - Fn && fn, Args && ... args) { - boost::context::stack_context sctx = salloc.allocate(); -#if defined(BOOST_NO_CXX14_CONSTEXPR) || defined(BOOST_NO_CXX11_STD_ALIGN) + Fn && fn, Arg ... arg) { + typedef worker_context< Fn, Arg ... > context_t; + + auto sctx = salloc.allocate(); // reserve space for control structure - const std::size_t size = sctx.size - sizeof( context); - void * sp = static_cast< char * >( sctx.sp) - sizeof( context); -#else - constexpr std::size_t func_alignment = 64; // alignof( context); - constexpr std::size_t func_size = sizeof( context); - // reserve space on stack - void * sp = static_cast< char * >( sctx.sp) - func_size - func_alignment; - // align sp pointer - std::size_t space = func_size + func_alignment; - sp = std::align( func_alignment, func_size, sp, space); - BOOST_ASSERT( nullptr != sp); - // calculate remaining size - const std::size_t size = sctx.size - ( static_cast< char * >( sctx.sp) - static_cast< char * >( sp) ); -#endif + void * storage = reinterpret_cast< void * >( + ( reinterpret_cast< uintptr_t >( sctx.sp) - static_cast< uintptr_t >( sizeof( context_t) ) ) + & ~ static_cast< uintptr_t >( 0xff) ); + void * stack_bottom = reinterpret_cast< void * >( + reinterpret_cast< uintptr_t >( sctx.sp) - static_cast< uintptr_t >( sctx.size) ); + const std::size_t size = reinterpret_cast< uintptr_t >( storage) - reinterpret_cast< uintptr_t >( stack_bottom); // placement new of context on top of fiber's stack return intrusive_ptr< context >{ - ::new ( sp) context{ - worker_context, + new ( storage) context_t{ policy, - boost::context::preallocated{ sp, size, sctx }, + boost::context::preallocated{ storage, size, sctx }, salloc, std::forward< Fn >( fn), - std::make_tuple( std::forward< Args >( args) ... ) } }; + std::forward< Arg >( arg) ... } }; } namespace detail { diff --git a/boost/fiber/detail/config.hpp b/boost/fiber/detail/config.hpp index 7c7119e1fb..21dea693ac 100644 --- a/boost/fiber/detail/config.hpp +++ b/boost/fiber/detail/config.hpp @@ -47,19 +47,20 @@ # error "futex not supported on this platform" #endif -#if !defined(BOOST_FIBERS_SPIN_MAX_COLLISIONS) -# define BOOST_FIBERS_SPIN_MAX_COLLISIONS 16 +#if !defined(BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD) +# define BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD 16 #endif -#if !defined(BOOST_FIBERS_SPIN_MAX_TESTS) -# define BOOST_FIBERS_SPIN_MAX_TESTS 500 +#if !defined(BOOST_FIBERS_RETRY_THRESHOLD) +# define BOOST_FIBERS_RETRY_THRESHOLD 64 #endif -// modern architectures have cachelines with 64byte length -// ARM Cortex-A15 32/64byte, Cortex-A9 16/32/64bytes -// MIPS 74K: 32byte, 4KEc: 16byte -// ist shoudl be safe to use 64byte for all -static constexpr std::size_t cache_alignment{ 64 }; -static constexpr std::size_t cacheline_length{ 64 }; +#if !defined(BOOST_FIBERS_SPIN_BEFORE_SLEEP0) +# define BOOST_FIBERS_SPIN_BEFORE_SLEEP0 32 +#endif + +#if !defined(BOOST_FIBERS_SPIN_BEFORE_YIELD) +# define BOOST_FIBERS_SPIN_BEFORE_YIELD 64 +#endif #endif // BOOST_FIBERS_DETAIL_CONFIG_H diff --git a/boost/fiber/detail/context_spinlock_queue.hpp b/boost/fiber/detail/context_spinlock_queue.hpp index e0ebdabda6..f58fbd2296 100644 --- a/boost/fiber/detail/context_spinlock_queue.hpp +++ b/boost/fiber/detail/context_spinlock_queue.hpp @@ -30,7 +30,7 @@ class context_spinlock_queue { private: typedef context * slot_type; - alignas(cache_alignment) mutable spinlock splk_{}; + mutable spinlock splk_{}; std::size_t pidx_{ 0 }; std::size_t cidx_{ 0 }; std::size_t capacity_; diff --git a/boost/fiber/detail/context_spmc_queue.hpp b/boost/fiber/detail/context_spmc_queue.hpp index 27256233cf..89f93044f9 100644 --- a/boost/fiber/detail/context_spmc_queue.hpp +++ b/boost/fiber/detail/context_spmc_queue.hpp @@ -44,9 +44,7 @@ private: class array { private: typedef std::atomic< context * > atomic_type; - typedef std::aligned_storage< - sizeof( atomic_type), cache_alignment - >::type storage_type; + typedef atomic_type storage_type; std::size_t capacity_; storage_type * storage_; @@ -92,9 +90,9 @@ private: } }; - alignas(cache_alignment) std::atomic< std::size_t > top_{ 0 }; - alignas(cache_alignment) std::atomic< std::size_t > bottom_{ 0 }; - alignas(cache_alignment) std::atomic< array * > array_; + std::atomic< std::size_t > top_{ 0 }; + std::atomic< std::size_t > bottom_{ 0 }; + std::atomic< array * > array_; std::vector< array * > old_arrays_{}; char padding_[cacheline_length]; diff --git a/boost/fiber/detail/convert.hpp b/boost/fiber/detail/convert.hpp index ac190d8528..ba3bbbd0aa 100644 --- a/boost/fiber/detail/convert.hpp +++ b/boost/fiber/detail/convert.hpp @@ -34,22 +34,6 @@ std::chrono::steady_clock::time_point convert( return std::chrono::steady_clock::now() + ( timeout_time - Clock::now() ); } -// suggested by Howard Hinnant -template< typename T > -inline -T * convert( T * p) noexcept { - return p; -} - -template< typename Pointer > -inline -typename std::pointer_traits< Pointer >::element_type * -convert( Pointer p) noexcept { - return nullptr != p - ? to_raw_pointer( p.operator->() ) - : nullptr; -} - }}} #ifdef BOOST_HAS_ABI_HEADERS diff --git a/boost/fiber/detail/cpu_relax.hpp b/boost/fiber/detail/cpu_relax.hpp index 541b46dfd0..8a20aae059 100644 --- a/boost/fiber/detail/cpu_relax.hpp +++ b/boost/fiber/detail/cpu_relax.hpp @@ -16,7 +16,7 @@ #include <boost/fiber/detail/config.hpp> #if BOOST_COMP_MSVC || BOOST_COMP_MSVC_EMULATED -# include <Windows.h> +# include <windows.h> #endif #ifdef BOOST_HAS_ABI_HEADERS diff --git a/boost/fiber/detail/data.hpp b/boost/fiber/detail/data.hpp index e2b119ec3e..c363817a09 100644 --- a/boost/fiber/detail/data.hpp +++ b/boost/fiber/detail/data.hpp @@ -23,22 +23,6 @@ class context; namespace detail { -#if (BOOST_EXECUTION_CONTEXT==1) -struct data_t { - spinlock_lock * lk{ nullptr }; - context * ctx{ nullptr }; - - data_t() = default; - - explicit data_t( spinlock_lock * lk_) noexcept : - lk{ lk_ } { - } - - explicit data_t( context * ctx_) noexcept : - ctx{ ctx_ } { - } -}; -#else struct data_t { spinlock_lock * lk{ nullptr }; context * ctx{ nullptr }; @@ -60,7 +44,6 @@ struct data_t { from{ from_ } { } }; -#endif }}} diff --git a/boost/fiber/detail/futex.hpp b/boost/fiber/detail/futex.hpp index d383dc4077..e64bd5990d 100644 --- a/boost/fiber/detail/futex.hpp +++ b/boost/fiber/detail/futex.hpp @@ -18,7 +18,7 @@ extern "C" { #include <sys/syscall.h> } #elif BOOST_OS_WINDOWS -#include <Windows.h> +#include <windows.h> #endif namespace boost { @@ -26,28 +26,28 @@ namespace fibers { namespace detail { #if BOOST_OS_LINUX -inline +BOOST_FORCEINLINE int sys_futex( void * addr, std::int32_t op, std::int32_t x) { return ::syscall( SYS_futex, addr, op, x, nullptr, nullptr, 0); } -inline +BOOST_FORCEINLINE int futex_wake( std::atomic< std::int32_t > * addr) { return 0 <= sys_futex( static_cast< void * >( addr), FUTEX_WAKE_PRIVATE, 1) ? 0 : -1; } -inline +BOOST_FORCEINLINE int futex_wait( std::atomic< std::int32_t > * addr, std::int32_t x) { return 0 <= sys_futex( static_cast< void * >( addr), FUTEX_WAIT_PRIVATE, x) ? 0 : -1; } #elif BOOST_OS_WINDOWS -inline +BOOST_FORCEINLINE int futex_wake( std::atomic< std::int32_t > * addr) { ::WakeByAddressSingle( static_cast< void * >( addr) ); return 0; } -inline +BOOST_FORCEINLINE int futex_wait( std::atomic< std::int32_t > * addr, std::int32_t x) { ::WaitOnAddress( static_cast< volatile void * >( addr), & x, sizeof( x), INFINITE); return 0; diff --git a/boost/fiber/detail/rtm.hpp b/boost/fiber/detail/rtm.hpp new file mode 100644 index 0000000000..5188b0d216 --- /dev/null +++ b/boost/fiber/detail/rtm.hpp @@ -0,0 +1,94 @@ + +// Copyright Oliver Kowalke 2017. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_FIBER_DETAIL_RTM_H +#define BOOST_FIBER_DETAIL_RTM_H + +#include <cstdint> + +#include <boost/assert.hpp> +#include <boost/config.hpp> + +#include <boost/fiber/detail/config.hpp> + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace detail { + +struct rtm_status { + enum { + none = 0, + explicit_abort = 1 << 0, + may_retry = 1 << 1, + memory_conflict = 1 << 2, + buffer_overflow = 1 << 3, + debug_hit = 1 << 4, + nested_abort = 1 << 5 + }; + + static constexpr std::uint32_t success = ~std::uint32_t{ 0 }; +}; + +static BOOST_FORCEINLINE +std::uint32_t rtm_begin() noexcept { + std::uint32_t result = rtm_status::success; + __asm__ __volatile__ + ( + ".byte 0xc7,0xf8 ; .long 0" + : "+a" (result) + : + : "memory" + ); + return result; +} + +static BOOST_FORCEINLINE +void rtm_end() noexcept { + __asm__ __volatile__ + ( + ".byte 0x0f,0x01,0xd5" + : + : + : "memory" + ); +} + +static BOOST_FORCEINLINE +void rtm_abort_lock_not_free() noexcept { + __asm__ __volatile__ + ( + ".byte 0xc6,0xf8,0xff" + : + : + : "memory" + ); +} + +static BOOST_FORCEINLINE +bool rtm_test() noexcept { + bool result; + __asm__ __volatile__ + ( + ".byte 0x0f,0x01,0xd6; setz %0" + : "=q" (result) + : + : "memory" + ); + return result; +} + +}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBER_DETAIL_RTM_H diff --git a/boost/fiber/detail/spinlock.hpp b/boost/fiber/detail/spinlock.hpp index 89a6d51a6f..59d2a5cd2b 100644 --- a/boost/fiber/detail/spinlock.hpp +++ b/boost/fiber/detail/spinlock.hpp @@ -13,11 +13,14 @@ #if !defined(BOOST_FIBERS_NO_ATOMICS) # include <mutex> -# include <boost/fiber/detail/spinlock_ttas.hpp> # include <boost/fiber/detail/spinlock_ttas_adaptive.hpp> +# include <boost/fiber/detail/spinlock_ttas.hpp> # if defined(BOOST_FIBERS_HAS_FUTEX) -# include <boost/fiber/detail/spinlock_ttas_futex.hpp> # include <boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp> +# include <boost/fiber/detail/spinlock_ttas_futex.hpp> +# endif +# if defined(BOOST_USE_TSX) +# include <boost/fiber/detail/spinlock_rtm.hpp> # endif #endif @@ -29,7 +32,7 @@ namespace boost { namespace fibers { namespace detail { -#if defined(BOOST_FIBERS_NO_ATOMICS) +#if defined(BOOST_FIBERS_NO_ATOMICS) struct spinlock { constexpr spinlock() noexcept {} void lock() noexcept {} @@ -42,16 +45,32 @@ struct spinlock_lock { void unlock() noexcept {} }; #else -# if defined(BOOST_FIBERS_SPINLOCK_STD_MUTEX) +# if defined(BOOST_FIBERS_SPINLOCK_STD_MUTEX) using spinlock = std::mutex; # elif defined(BOOST_FIBERS_SPINLOCK_TTAS_FUTEX) +# if defined(BOOST_USE_TSX) +using spinlock = spinlock_rtm< spinlock_ttas_futex >; +# else using spinlock = spinlock_ttas_futex; +# endif # elif defined(BOOST_FIBERS_SPINLOCK_TTAS_ADAPTIVE_FUTEX) +# if defined(BOOST_USE_TSX) +using spinlock = spinlock_rtm< spinlock_ttas_adaptive_futex >; +# else using spinlock = spinlock_ttas_adaptive_futex; -# elif defined(BOOST_FIBERS_SPINLOCK_TTAS_ADAPTIVE) +# endif +# elif defined(BOOST_FIBERS_SPINLOCK_TTAS_ADAPTIVE) +# if defined(BOOST_USE_TSX) +using spinlock = spinlock_rtm< spinlock_ttas_adaptive >; +# else using spinlock = spinlock_ttas_adaptive; +# endif # else +# if defined(BOOST_USE_TSX) +using spinlock = spinlock_rtm< spinlock_ttas >; +# else using spinlock = spinlock_ttas; +# endif # endif using spinlock_lock = std::unique_lock< spinlock >; #endif diff --git a/boost/fiber/detail/spinlock_rtm.hpp b/boost/fiber/detail/spinlock_rtm.hpp new file mode 100644 index 0000000000..5cc4a5e9af --- /dev/null +++ b/boost/fiber/detail/spinlock_rtm.hpp @@ -0,0 +1,126 @@ + +// Copyright Oliver Kowalke 2017. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#ifndef BOOST_FIBERS_SPINLOCK_RTM_H +#define BOOST_FIBERS_SPINLOCK_RTM_H + +#include <atomic> +#include <chrono> +#include <cmath> +#include <random> +#include <thread> + +#include <boost/fiber/detail/config.hpp> +#include <boost/fiber/detail/cpu_relax.hpp> +#include <boost/fiber/detail/rtm.hpp> +#include <boost/fiber/detail/spinlock_status.hpp> + +namespace boost { +namespace fibers { +namespace detail { + +template< typename FBSplk > +class spinlock_rtm { +private: + FBSplk splk_{}; + +public: + spinlock_rtm() = default; + + spinlock_rtm( spinlock_rtm const&) = delete; + spinlock_rtm & operator=( spinlock_rtm const&) = delete; + + void lock() noexcept { + static thread_local std::minstd_rand generator{ std::random_device{}() }; + std::size_t collisions = 0 ; + for ( std::size_t retries = 0; retries < BOOST_FIBERS_RETRY_THRESHOLD; ++retries) { + std::uint32_t status; + if ( rtm_status::success == ( status = rtm_begin() ) ) { + // add lock to read-set + if ( spinlock_status::unlocked == splk_.state_.load( std::memory_order_relaxed) ) { + // lock is free, enter critical section + return; + } + // lock was acquired by another thread + // explicit abort of transaction with abort argument 'lock not free' + rtm_abort_lock_not_free(); + } + // transaction aborted + if ( rtm_status::none != (status & rtm_status::may_retry) || + rtm_status::none != (status & rtm_status::memory_conflict) ) { + // another logical processor conflicted with a memory address that was + // part or the read-/write-set + if ( BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD > collisions) { + std::uniform_int_distribution< std::size_t > distribution{ + 0, static_cast< std::size_t >( 1) << (std::min)(collisions, static_cast< std::size_t >( BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD)) }; + const std::size_t z = distribution( generator); + ++collisions; + for ( std::size_t i = 0; i < z; ++i) { + cpu_relax(); + } + } else { + std::this_thread::yield(); + } + } else if ( rtm_status::none != (status & rtm_status::explicit_abort) && + rtm_status::none == (status & rtm_status::nested_abort) ) { + // another logical processor has acquired the lock and + // abort was not caused by a nested transaction + // wait till lock becomes free again + std::size_t count = 0; + while ( spinlock_status::locked == splk_.state_.load( std::memory_order_relaxed) ) { + if ( BOOST_FIBERS_SPIN_BEFORE_SLEEP0 > count) { + ++count; + cpu_relax(); + } else if ( BOOST_FIBERS_SPIN_BEFORE_YIELD > count) { + ++count; + static constexpr std::chrono::microseconds us0{ 0 }; + std::this_thread::sleep_for( us0); +#if 0 + using namespace std::chrono_literals; + std::this_thread::sleep_for( 0ms); +#endif + } else { + std::this_thread::yield(); + } + } + } else { + // transaction aborted due: + // - internal buffer to track transactional state overflowed + // - debug exception or breakpoint exception was hit + // - abort during execution of nested transactions (max nesting limit exceeded) + // -> use fallback path + break; + } + } + splk_.lock(); + } + + bool try_lock() noexcept { + if ( rtm_status::success != rtm_begin() ) { + return false; + } + + // add lock to read-set + if ( spinlock_status::unlocked != splk_.state_.load( std::memory_order_relaxed) ) { + // lock was acquired by another thread + // explicit abort of transaction with abort argument 'lock not free' + rtm_abort_lock_not_free(); + } + return true; + } + + void unlock() noexcept { + if ( spinlock_status::unlocked == splk_.state_.load( std::memory_order_acquire) ) { + rtm_end(); + } else { + splk_.unlock(); + } + } +}; + +}}} + +#endif // BOOST_FIBERS_SPINLOCK_RTM_H diff --git a/boost/fiber/detail/spinlock_status.hpp b/boost/fiber/detail/spinlock_status.hpp new file mode 100644 index 0000000000..74f09e4acc --- /dev/null +++ b/boost/fiber/detail/spinlock_status.hpp @@ -0,0 +1,21 @@ + +// Copyright Oliver Kowalke 2017. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#ifndef BOOST_FIBERS_SPINLOCK_STATUS_H +#define BOOST_FIBERS_SPINLOCK_STATUS_H + +namespace boost { +namespace fibers { +namespace detail { + +enum class spinlock_status { + locked = 0, + unlocked +}; + +}}} + +#endif // BOOST_FIBERS_SPINLOCK_STATUS_H diff --git a/boost/fiber/detail/spinlock_ttas.hpp b/boost/fiber/detail/spinlock_ttas.hpp index 380773ad6d..f3302ed17e 100644 --- a/boost/fiber/detail/spinlock_ttas.hpp +++ b/boost/fiber/detail/spinlock_ttas.hpp @@ -9,41 +9,37 @@ #include <atomic> #include <chrono> +#include <cmath> #include <random> #include <thread> #include <boost/fiber/detail/config.hpp> #include <boost/fiber/detail/cpu_relax.hpp> +#include <boost/fiber/detail/spinlock_status.hpp> // based on informations from: // https://software.intel.com/en-us/articles/benefitting-power-and-performance-sleep-loops // https://software.intel.com/en-us/articles/long-duration-spin-wait-loops-on-hyper-threading-technology-enabled-intel-processors -#if BOOST_COMP_CLANG -#pragma clang diagnostic push -#pragma clang diagnostic ignored "-Wunused-private-field" -#endif - namespace boost { namespace fibers { namespace detail { class spinlock_ttas { private: - enum class spinlock_status { - locked = 0, - unlocked - }; + template< typename FBSplk > + friend class spinlock_rtm; - std::atomic< spinlock_status > state_{ spinlock_status::unlocked }; + std::atomic< spinlock_status > state_{ spinlock_status::unlocked }; public: - spinlock_ttas() noexcept = default; + spinlock_ttas() = default; spinlock_ttas( spinlock_ttas const&) = delete; spinlock_ttas & operator=( spinlock_ttas const&) = delete; void lock() noexcept { + static thread_local std::minstd_rand generator{ std::random_device{}() }; std::size_t collisions = 0 ; for (;;) { // avoid using multiple pause instructions for a delay of a specific cycle count @@ -51,7 +47,7 @@ public: // the cycle count can not guaranteed from one system to the next // -> check the shared variable 'state_' in between each cpu_relax() to prevent // unnecessarily long delays on some systems - std::size_t tests = 0; + std::size_t retries = 0; // test shared variable 'status_' // first access to 'state_' -> chache miss // sucessive acccess to 'state_' -> cache hit @@ -59,21 +55,26 @@ public: // cached 'state_' is invalidated -> cache miss while ( spinlock_status::locked == state_.load( std::memory_order_relaxed) ) { #if !defined(BOOST_FIBERS_SPIN_SINGLE_CORE) - if ( BOOST_FIBERS_SPIN_MAX_TESTS > tests) { - ++tests; + if ( BOOST_FIBERS_SPIN_BEFORE_SLEEP0 > retries) { + ++retries; // give CPU a hint that this thread is in a "spin-wait" loop // delays the next instruction's execution for a finite period of time (depends on processor family) // the CPU is not under demand, parts of the pipeline are no longer being used // -> reduces the power consumed by the CPU // -> prevent pipeline stalls cpu_relax(); - } else { + } else if ( BOOST_FIBERS_SPIN_BEFORE_YIELD > retries) { // std::this_thread::sleep_for( 0us) has a fairly long instruction path length, // combined with an expensive ring3 to ring 0 transition costing about 1000 cycles // std::this_thread::sleep_for( 0us) lets give up this_thread the remaining part of its time slice // if and only if a thread of equal or greater priority is ready to run static constexpr std::chrono::microseconds us0{ 0 }; std::this_thread::sleep_for( us0); + } else { + // std::this_thread::yield() allows this_thread to give up the remaining part of its time slice, + // but only to another thread on the same processor + // instead of constant checking, a thread only checks if no other useful work is pending + std::this_thread::yield(); } #else std::this_thread::yield(); @@ -85,8 +86,8 @@ public: // spinlock now contended // utilize 'Binary Exponential Backoff' algorithm // linear_congruential_engine is a random number engine based on Linear congruential generator (LCG) - static thread_local std::minstd_rand generator; - static std::uniform_int_distribution< std::size_t > distribution{ 0, static_cast< std::size_t >( 1) << collisions }; + std::uniform_int_distribution< std::size_t > distribution{ + 0, static_cast< std::size_t >( 1) << (std::min)(collisions, static_cast< std::size_t >( BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD)) }; const std::size_t z = distribution( generator); ++collisions; for ( std::size_t i = 0; i < z; ++i) { @@ -101,6 +102,10 @@ public: } } + bool try_lock() noexcept { + return spinlock_status::unlocked == state_.exchange( spinlock_status::locked, std::memory_order_acquire); + } + void unlock() noexcept { state_.store( spinlock_status::unlocked, std::memory_order_release); } @@ -108,8 +113,4 @@ public: }}} -#if BOOST_COMP_CLANG -#pragma clang diagnostic pop -#endif - #endif // BOOST_FIBERS_SPINLOCK_TTAS_H diff --git a/boost/fiber/detail/spinlock_ttas_adaptive.hpp b/boost/fiber/detail/spinlock_ttas_adaptive.hpp index da044b6298..d1f8b73cf3 100644 --- a/boost/fiber/detail/spinlock_ttas_adaptive.hpp +++ b/boost/fiber/detail/spinlock_ttas_adaptive.hpp @@ -15,6 +15,7 @@ #include <boost/fiber/detail/config.hpp> #include <boost/fiber/detail/cpu_relax.hpp> +#include <boost/fiber/detail/spinlock_status.hpp> // based on informations from: // https://software.intel.com/en-us/articles/benefitting-power-and-performance-sleep-loops @@ -26,26 +27,28 @@ namespace detail { class spinlock_ttas_adaptive { private: - enum class spinlock_status { - locked = 0, - unlocked - }; + template< typename FBSplk > + friend class spinlock_rtm; - std::atomic< spinlock_status > state_{ spinlock_status::unlocked }; - std::atomic< std::size_t > tests_{ 0 }; + std::atomic< spinlock_status > state_{ spinlock_status::unlocked }; + std::atomic< std::size_t > retries_{ 0 }; public: - spinlock_ttas_adaptive() noexcept = default; + spinlock_ttas_adaptive() = default; spinlock_ttas_adaptive( spinlock_ttas_adaptive const&) = delete; spinlock_ttas_adaptive & operator=( spinlock_ttas_adaptive const&) = delete; void lock() noexcept { + static thread_local std::minstd_rand generator{ std::random_device{}() }; std::size_t collisions = 0 ; for (;;) { - std::size_t tests = 0; - const std::size_t prev_tests = tests_.load( std::memory_order_relaxed); - const std::size_t max_tests = (std::min)( static_cast< std::size_t >( BOOST_FIBERS_SPIN_MAX_TESTS), 2 * prev_tests + 10); + std::size_t retries = 0; + const std::size_t prev_retries = retries_.load( std::memory_order_relaxed); + const std::size_t max_relax_retries = (std::min)( + static_cast< std::size_t >( BOOST_FIBERS_SPIN_BEFORE_SLEEP0), 2 * prev_retries + 10); + const std::size_t max_sleep_retries = (std::min)( + static_cast< std::size_t >( BOOST_FIBERS_SPIN_BEFORE_YIELD), 2 * prev_retries + 10); // avoid using multiple pause instructions for a delay of a specific cycle count // the delay of cpu_relax() (pause on Intel) depends on the processor family // the cycle count can not guaranteed from one system to the next @@ -58,22 +61,27 @@ public: // cached 'state_' is invalidated -> cache miss while ( spinlock_status::locked == state_.load( std::memory_order_relaxed) ) { #if !defined(BOOST_FIBERS_SPIN_SINGLE_CORE) - if ( max_tests > tests) { - ++tests; + if ( max_relax_retries > retries) { + ++retries; // give CPU a hint that this thread is in a "spin-wait" loop // delays the next instruction's execution for a finite period of time (depends on processor family) // the CPU is not under demand, parts of the pipeline are no longer being used // -> reduces the power consumed by the CPU // -> prevent pipeline stalls cpu_relax(); - } else { - ++tests; + } else if ( max_sleep_retries > retries) { + ++retries; // std::this_thread::sleep_for( 0us) has a fairly long instruction path length, // combined with an expensive ring3 to ring 0 transition costing about 1000 cycles // std::this_thread::sleep_for( 0us) lets give up this_thread the remaining part of its time slice // if and only if a thread of equal or greater priority is ready to run static constexpr std::chrono::microseconds us0{ 0 }; std::this_thread::sleep_for( us0); + } else { + // std::this_thread::yield() allows this_thread to give up the remaining part of its time slice, + // but only to another thread on the same processor + // instead of constant checking, a thread only checks if no other useful work is pending + std::this_thread::yield(); } #else std::this_thread::yield(); @@ -85,8 +93,8 @@ public: // spinlock now contended // utilize 'Binary Exponential Backoff' algorithm // linear_congruential_engine is a random number engine based on Linear congruential generator (LCG) - static thread_local std::minstd_rand generator; - static std::uniform_int_distribution< std::size_t > distribution{ 0, static_cast< std::size_t >( 1) << collisions }; + std::uniform_int_distribution< std::size_t > distribution{ + 0, static_cast< std::size_t >( 1) << (std::min)(collisions, static_cast< std::size_t >( BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD)) }; const std::size_t z = distribution( generator); ++collisions; for ( std::size_t i = 0; i < z; ++i) { @@ -95,13 +103,17 @@ public: cpu_relax(); } } else { - tests_.store( prev_tests + (tests - prev_tests) / 8, std::memory_order_relaxed); + retries_.store( prev_retries + (retries - prev_retries) / 8, std::memory_order_relaxed); // success, thread has acquired the lock break; } } } + bool try_lock() noexcept { + return spinlock_status::unlocked == state_.exchange( spinlock_status::locked, std::memory_order_acquire); + } + void unlock() noexcept { state_.store( spinlock_status::unlocked, std::memory_order_release); } diff --git a/boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp b/boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp index 61ab47691e..0f0b191e67 100644 --- a/boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp +++ b/boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp @@ -26,21 +26,28 @@ namespace detail { class spinlock_ttas_adaptive_futex { private: - std::atomic< std::int32_t > value_{ 0 }; - std::atomic< std::int32_t > tests_{ 0 }; + template< typename FBSplk > + friend class spinlock_rtm; + + std::atomic< std::int32_t > value_{ 0 }; + std::atomic< std::int32_t > retries_{ 0 }; public: - spinlock_ttas_adaptive_futex() noexcept = default; + spinlock_ttas_adaptive_futex() = default; spinlock_ttas_adaptive_futex( spinlock_ttas_adaptive_futex const&) = delete; spinlock_ttas_adaptive_futex & operator=( spinlock_ttas_adaptive_futex const&) = delete; void lock() noexcept { - std::int32_t collisions = 0, tests = 0, expected = 0; - const std::int32_t prev_tests = tests_.load( std::memory_order_relaxed); - const std::int32_t max_tests = (std::min)( static_cast< std::int32_t >( BOOST_FIBERS_SPIN_MAX_TESTS), 2 * prev_tests + 10); + static thread_local std::minstd_rand generator{ std::random_device{}() }; + std::int32_t collisions = 0, retries = 0, expected = 0; + const std::int32_t prev_retries = retries_.load( std::memory_order_relaxed); + const std::int32_t max_relax_retries = (std::min)( + static_cast< std::int32_t >( BOOST_FIBERS_SPIN_BEFORE_SLEEP0), 2 * prev_retries + 10); + const std::int32_t max_sleep_retries = (std::min)( + static_cast< std::int32_t >( BOOST_FIBERS_SPIN_BEFORE_YIELD), 2 * prev_retries + 10); // after max. spins or collisions suspend via futex - while ( max_tests > tests && BOOST_FIBERS_SPIN_MAX_COLLISIONS > collisions) { + while ( retries++ < BOOST_FIBERS_RETRY_THRESHOLD) { // avoid using multiple pause instructions for a delay of a specific cycle count // the delay of cpu_relax() (pause on Intel) depends on the processor family // the cycle count can not guaranteed from one system to the next @@ -52,26 +59,39 @@ public: // if 'value_' was released by other fiber // cached 'value_' is invalidated -> cache miss if ( 0 != ( expected = value_.load( std::memory_order_relaxed) ) ) { - ++tests; #if !defined(BOOST_FIBERS_SPIN_SINGLE_CORE) - // give CPU a hint that this thread is in a "spin-wait" loop - // delays the next instruction's execution for a finite period of time (depends on processor family) - // the CPU is not under demand, parts of the pipeline are no longer being used - // -> reduces the power consumed by the CPU - // -> prevent pipeline stalls - cpu_relax(); + if ( max_relax_retries > retries) { + // give CPU a hint that this thread is in a "spin-wait" loop + // delays the next instruction's execution for a finite period of time (depends on processor family) + // the CPU is not under demand, parts of the pipeline are no longer being used + // -> reduces the power consumed by the CPU + // -> prevent pipeline stalls + cpu_relax(); + } else if ( max_sleep_retries > retries) { + // std::this_thread::sleep_for( 0us) has a fairly long instruction path length, + // combined with an expensive ring3 to ring 0 transition costing about 1000 cycles + // std::this_thread::sleep_for( 0us) lets give up this_thread the remaining part of its time slice + // if and only if a thread of equal or greater priority is ready to run + static constexpr std::chrono::microseconds us0{ 0 }; + std::this_thread::sleep_for( us0); + } else { + // std::this_thread::yield() allows this_thread to give up the remaining part of its time slice, + // but only to another thread on the same processor + // instead of constant checking, a thread only checks if no other useful work is pending + std::this_thread::yield(); + } #else // std::this_thread::yield() allows this_thread to give up the remaining part of its time slice, // but only to another thread on the same processor // instead of constant checking, a thread only checks if no other useful work is pending std::this_thread::yield(); #endif - } else if ( ! value_.compare_exchange_strong( expected, 1, std::memory_order_acquire, std::memory_order_release) ) { + } else if ( ! value_.compare_exchange_strong( expected, 1, std::memory_order_acquire) ) { // spinlock now contended // utilize 'Binary Exponential Backoff' algorithm // linear_congruential_engine is a random number engine based on Linear congruential generator (LCG) - static thread_local std::minstd_rand generator; - static std::uniform_int_distribution< std::int32_t > distribution{ 0, static_cast< std::int32_t >( 1) << collisions }; + std::uniform_int_distribution< std::int32_t > distribution{ + 0, static_cast< std::int32_t >( 1) << (std::min)(collisions, static_cast< std::int32_t >( BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD)) }; const std::int32_t z = distribution( generator); ++collisions; for ( std::int32_t i = 0; i < z; ++i) { @@ -81,7 +101,7 @@ public: } } else { // success, lock acquired - tests_.store( prev_tests + (tests - prev_tests) / 8, std::memory_order_relaxed); + retries_.store( prev_retries + (retries - prev_retries) / 8, std::memory_order_relaxed); return; } } @@ -95,7 +115,12 @@ public: expected = value_.exchange( 2, std::memory_order_acquire); } // success, lock acquired - tests_.store( prev_tests + (tests - prev_tests) / 8, std::memory_order_relaxed); + retries_.store( prev_retries + (retries - prev_retries) / 8, std::memory_order_relaxed); + } + + bool try_lock() noexcept { + std::int32_t expected = 0; + return value_.compare_exchange_strong( expected, 1, std::memory_order_acquire); } void unlock() noexcept { diff --git a/boost/fiber/detail/spinlock_ttas_futex.hpp b/boost/fiber/detail/spinlock_ttas_futex.hpp index a427b73ba5..fd30c4120e 100644 --- a/boost/fiber/detail/spinlock_ttas_futex.hpp +++ b/boost/fiber/detail/spinlock_ttas_futex.hpp @@ -8,6 +8,7 @@ #define BOOST_FIBERS_spinlock_ttas_futex_FUTEX_H #include <atomic> +#include <cmath> #include <random> #include <thread> @@ -25,18 +26,22 @@ namespace detail { class spinlock_ttas_futex { private: - std::atomic< std::int32_t > value_{ 0 }; + template< typename FBSplk > + friend class spinlock_rtm; + + std::atomic< std::int32_t > value_{ 0 }; public: - spinlock_ttas_futex() noexcept = default; + spinlock_ttas_futex() = default; spinlock_ttas_futex( spinlock_ttas_futex const&) = delete; spinlock_ttas_futex & operator=( spinlock_ttas_futex const&) = delete; void lock() noexcept { - std::int32_t collisions = 0, tests = 0, expected = 0; + static thread_local std::minstd_rand generator{ std::random_device{}() }; + std::int32_t collisions = 0, retries = 0, expected = 0; // after max. spins or collisions suspend via futex - while ( BOOST_FIBERS_SPIN_MAX_TESTS > tests && BOOST_FIBERS_SPIN_MAX_COLLISIONS > collisions) { + while ( retries++ < BOOST_FIBERS_RETRY_THRESHOLD) { // avoid using multiple pause instructions for a delay of a specific cycle count // the delay of cpu_relax() (pause on Intel) depends on the processor family // the cycle count can not guaranteed from one system to the next @@ -48,26 +53,39 @@ public: // if 'value_' was released by other fiber // cached 'value_' is invalidated -> cache miss if ( 0 != ( expected = value_.load( std::memory_order_relaxed) ) ) { - ++tests; #if !defined(BOOST_FIBERS_SPIN_SINGLE_CORE) - // give CPU a hint that this thread is in a "spin-wait" loop - // delays the next instruction's execution for a finite period of time (depends on processor family) - // the CPU is not under demand, parts of the pipeline are no longer being used - // -> reduces the power consumed by the CPU - // -> prevent pipeline stalls - cpu_relax(); + if ( BOOST_FIBERS_SPIN_BEFORE_SLEEP0 > retries) { + // give CPU a hint that this thread is in a "spin-wait" loop + // delays the next instruction's execution for a finite period of time (depends on processor family) + // the CPU is not under demand, parts of the pipeline are no longer being used + // -> reduces the power consumed by the CPU + // -> prevent pipeline stalls + cpu_relax(); + } else if ( BOOST_FIBERS_SPIN_BEFORE_YIELD > retries) { + // std::this_thread::sleep_for( 0us) has a fairly long instruction path length, + // combined with an expensive ring3 to ring 0 transition costing about 1000 cycles + // std::this_thread::sleep_for( 0us) lets give up this_thread the remaining part of its time slice + // if and only if a thread of equal or greater priority is ready to run + static constexpr std::chrono::microseconds us0{ 0 }; + std::this_thread::sleep_for( us0); + } else { + // std::this_thread::yield() allows this_thread to give up the remaining part of its time slice, + // but only to another thread on the same processor + // instead of constant checking, a thread only checks if no other useful work is pending + std::this_thread::yield(); + } #else // std::this_thread::yield() allows this_thread to give up the remaining part of its time slice, // but only to another thread on the same processor // instead of constant checking, a thread only checks if no other useful work is pending std::this_thread::yield(); #endif - } else if ( ! value_.compare_exchange_strong( expected, 1, std::memory_order_acquire, std::memory_order_release) ) { + } else if ( ! value_.compare_exchange_strong( expected, 1, std::memory_order_acquire) ) { // spinlock now contended // utilize 'Binary Exponential Backoff' algorithm // linear_congruential_engine is a random number engine based on Linear congruential generator (LCG) - static thread_local std::minstd_rand generator; - static std::uniform_int_distribution< std::int32_t > distribution{ 0, static_cast< std::int32_t >( 1) << collisions }; + std::uniform_int_distribution< std::int32_t > distribution{ + 0, static_cast< std::int32_t >( 1) << (std::min)(collisions, static_cast< std::int32_t >( BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD)) }; const std::int32_t z = distribution( generator); ++collisions; for ( std::int32_t i = 0; i < z; ++i) { @@ -91,6 +109,11 @@ public: } } + bool try_lock() noexcept { + std::int32_t expected = 0; + return value_.compare_exchange_strong( expected, 1, std::memory_order_acquire); + } + void unlock() noexcept { if ( 1 != value_.fetch_sub( 1, std::memory_order_acquire) ) { value_.store( 0, std::memory_order_release); diff --git a/boost/fiber/detail/wrap.hpp b/boost/fiber/detail/wrap.hpp deleted file mode 100644 index 558de6bd94..0000000000 --- a/boost/fiber/detail/wrap.hpp +++ /dev/null @@ -1,131 +0,0 @@ - -// Copyright Oliver Kowalke 2014. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// http://www.boost.org/LICENSE_1_0.txt) - -#ifndef BOOST_FIBER_DETAIL_WRAP_H -#define BOOST_FIBER_DETAIL_WRAP_H - -#include <type_traits> - -#include <boost/config.hpp> -#if defined(BOOST_NO_CXX17_STD_INVOKE) -#include <boost/context/detail/invoke.hpp> -#endif -#if (BOOST_EXECUTION_CONTEXT==1) -# include <boost/context/execution_context.hpp> -#else -# include <boost/context/continuation.hpp> -#endif - -#include <boost/fiber/detail/config.hpp> -#include <boost/fiber/detail/data.hpp> - -#ifdef BOOST_HAS_ABI_HEADERS -# include BOOST_ABI_PREFIX -#endif - -namespace boost { -namespace fibers { -namespace detail { - -#if (BOOST_EXECUTION_CONTEXT==1) -template< typename Fn1, typename Fn2, typename Tpl > -class wrapper { -private: - typename std::decay< Fn1 >::type fn1_; - typename std::decay< Fn2 >::type fn2_; - typename std::decay< Tpl >::type tpl_; - boost::context::execution_context ctx_; - -public: - wrapper( Fn1 && fn1, Fn2 && fn2, Tpl && tpl, - boost::context::execution_context const& ctx) : - fn1_{ std::move( fn1) }, - fn2_{ std::move( fn2) }, - tpl_{ std::move( tpl) }, - ctx_{ ctx } { - } - - wrapper( wrapper const&) = delete; - wrapper & operator=( wrapper const&) = delete; - - wrapper( wrapper && other) = default; - wrapper & operator=( wrapper && other) = default; - - void operator()( void * vp) { -#if defined(BOOST_NO_CXX17_STD_INVOKE) - boost::context::detail::invoke( std::move( fn1_), fn2_, tpl_, ctx_, vp); -#else - std::invoke( std::move( fn1_), fn2_, tpl_, ctx_, vp); -#endif - } -}; - -template< typename Fn1, typename Fn2, typename Tpl > -wrapper< Fn1, Fn2, Tpl > -wrap( Fn1 && fn1, Fn2 && fn2, Tpl && tpl, - boost::context::execution_context const& ctx) { - return wrapper< Fn1, Fn2, Tpl >{ - std::forward< Fn1 >( fn1), - std::forward< Fn2 >( fn2), - std::forward< Tpl >( tpl), - ctx }; -} -#else -template< typename Fn1, typename Fn2, typename Tpl > -class wrapper { -private: - typename std::decay< Fn1 >::type fn1_; - typename std::decay< Fn2 >::type fn2_; - typename std::decay< Tpl >::type tpl_; - -public: - wrapper( Fn1 && fn1, Fn2 && fn2, Tpl && tpl) : - fn1_{ std::move( fn1) }, - fn2_{ std::move( fn2) }, - tpl_{ std::move( tpl) } { - } - - wrapper( wrapper const&) = delete; - wrapper & operator=( wrapper const&) = delete; - - wrapper( wrapper && other) = default; - wrapper & operator=( wrapper && other) = default; - - boost::context::continuation - operator()( boost::context::continuation && c) { -#if defined(BOOST_NO_CXX17_STD_INVOKE) - return boost::context::detail::invoke( - std::move( fn1_), - fn2_, - tpl_, - std::forward< boost::context::continuation >( c) ); -#else - return std::invoke( - std::move( fn1_), - fn2_, - tpl_, - std::forward< boost::context::continuation >( c) ); -#endif - } -}; - -template< typename Fn1, typename Fn2, typename Tpl > -wrapper< Fn1, Fn2, Tpl > -wrap( Fn1 && fn1, Fn2 && fn2, Tpl && tpl) { - return wrapper< Fn1, Fn2, Tpl >{ - std::forward< Fn1 >( fn1), - std::forward< Fn2 >( fn2), - std::forward< Tpl >( tpl) }; -} -#endif - -}}} - -#ifdef BOOST_HAS_ABI_HEADERS -#include BOOST_ABI_SUFFIX -#endif - -#endif // BOOST_FIBER_DETAIL_WRAP_H diff --git a/boost/fiber/fiber.hpp b/boost/fiber/fiber.hpp index 1508a9b5a6..eb10487c66 100644 --- a/boost/fiber/fiber.hpp +++ b/boost/fiber/fiber.hpp @@ -15,6 +15,7 @@ #include <boost/assert.hpp> #include <boost/config.hpp> #include <boost/intrusive_ptr.hpp> +#include <boost/predef.h> #include <boost/fiber/detail/config.hpp> #include <boost/fiber/detail/disable_overload.hpp> @@ -52,41 +53,59 @@ public: fiber() = default; template< typename Fn, - typename ... Args, - typename = detail::disable_overload< fiber, Fn > + typename ... Arg, + typename = detail::disable_overload< fiber, Fn >, + typename = detail::disable_overload< launch, Fn >, + typename = detail::disable_overload< std::allocator_arg_t, Fn > > - fiber( Fn && fn, Args && ... args) : +#if BOOST_COMP_GNUC < 50000000 + fiber( Fn && fn, Arg && ... arg) : +#else + fiber( Fn && fn, Arg ... arg) : +#endif fiber{ launch::post, std::allocator_arg, default_stack(), - std::forward< Fn >( fn), std::forward< Args >( args) ... } { + std::forward< Fn >( fn), std::forward< Arg >( arg) ... } { } template< typename Fn, - typename ... Args, + typename ... Arg, typename = detail::disable_overload< fiber, Fn > > - fiber( launch policy, Fn && fn, Args && ... args) : +#if BOOST_COMP_GNUC < 50000000 + fiber( launch policy, Fn && fn, Arg && ... arg) : +#else + fiber( launch policy, Fn && fn, Arg ... arg) : +#endif fiber{ policy, std::allocator_arg, default_stack(), - std::forward< Fn >( fn), std::forward< Args >( args) ... } { + std::forward< Fn >( fn), std::forward< Arg >( arg) ... } { } template< typename StackAllocator, typename Fn, - typename ... Args + typename ... Arg > - fiber( std::allocator_arg_t, StackAllocator salloc, Fn && fn, Args && ... args) : +#if BOOST_COMP_GNUC < 50000000 + fiber( std::allocator_arg_t, StackAllocator salloc, Fn && fn, Arg && ... arg) : +#else + fiber( std::allocator_arg_t, StackAllocator salloc, Fn && fn, Arg ... arg) : +#endif fiber{ launch::post, std::allocator_arg, salloc, - std::forward< Fn >( fn), std::forward< Args >( args) ... } { + std::forward< Fn >( fn), std::forward< Arg >( arg) ... } { } template< typename StackAllocator, typename Fn, - typename ... Args + typename ... Arg > - fiber( launch policy, std::allocator_arg_t, StackAllocator salloc, Fn && fn, Args && ... args) : - impl_{ make_worker_context( policy, salloc, std::forward< Fn >( fn), std::forward< Args >( args) ... ) } { +#if BOOST_COMP_GNUC < 50000000 + fiber( launch policy, std::allocator_arg_t, StackAllocator salloc, Fn && fn, Arg && ... arg) : +#else + fiber( launch policy, std::allocator_arg_t, StackAllocator salloc, Fn && fn, Arg ... arg) : +#endif + impl_{ make_worker_context( policy, salloc, std::forward< Fn >( fn), std::forward< Arg >( arg) ... ) } { start_(); } @@ -108,7 +127,7 @@ public: if ( joinable() ) { std::terminate(); } - if ( this == & other) { + if ( BOOST_UNLIKELY( this == & other) ) { return * this; } impl_.swap( other.impl_); diff --git a/boost/fiber/fss.hpp b/boost/fiber/fss.hpp index f65d7353b3..0a321bb661 100644 --- a/boost/fiber/fss.hpp +++ b/boost/fiber/fss.hpp @@ -37,8 +37,8 @@ private: fn{ fn_ } { } - void operator()( void* data) { - if ( fn) { + void operator()( void * data) { + if ( BOOST_LIKELY( nullptr != fn) ) { fn( static_cast< T * >( data) ); } } @@ -91,7 +91,7 @@ public: void reset( T * t) { T * c = get(); - if ( c != t) { + if ( BOOST_LIKELY( c != t) ) { context::active()->set_fss_data( this, cleanup_fn_, t, true); } diff --git a/boost/fiber/future/async.hpp b/boost/fiber/future/async.hpp index e68b2c28fa..9601e8cdfe 100644 --- a/boost/fiber/future/async.hpp +++ b/boost/fiber/future/async.hpp @@ -30,7 +30,7 @@ future< >::type( typename std::decay< Args >::type ... ) >::type > -async( Fn && fn, Args && ... args) { +async( Fn && fn, Args ... args) { typedef typename std::result_of< typename std::decay< Fn >::type( typename std::decay< Args >::type ... ) >::type result_type; @@ -51,7 +51,7 @@ future< >::type( typename std::decay< Args >::type ...) >::type > -async( Policy policy, Fn && fn, Args && ... args) { +async( Policy policy, Fn && fn, Args ... args) { typedef typename std::result_of< typename std::decay< Fn >::type( typename std::decay< Args >::type ... ) >::type result_type; @@ -72,7 +72,7 @@ future< >::type( typename std::decay< Args >::type ... ) >::type > -async( Policy policy, std::allocator_arg_t, StackAllocator salloc, Fn && fn, Args && ... args) { +async( Policy policy, std::allocator_arg_t, StackAllocator salloc, Fn && fn, Args ... args) { typedef typename std::result_of< typename std::decay< Fn >::type( typename std::decay< Args >::type ... ) >::type result_type; @@ -94,7 +94,7 @@ future< >::type( typename std::decay< Args >::type ... ) >::type > -async( Policy policy, std::allocator_arg_t, StackAllocator salloc, Allocator alloc, Fn && fn, Args && ... args) { +async( Policy policy, std::allocator_arg_t, StackAllocator salloc, Allocator alloc, Fn && fn, Args ... args) { typedef typename std::result_of< typename std::decay< Fn >::type( typename std::decay< Args >::type ... ) >::type result_type; diff --git a/boost/fiber/future/detail/shared_state.hpp b/boost/fiber/future/detail/shared_state.hpp index 898fdaffd4..28e0e3cfb9 100644 --- a/boost/fiber/future/detail/shared_state.hpp +++ b/boost/fiber/future/detail/shared_state.hpp @@ -62,7 +62,7 @@ protected: void set_exception_( std::exception_ptr except, std::unique_lock< mutex > & lk) { BOOST_ASSERT( lk.owns_lock() ); - if ( ready_) { + if ( BOOST_UNLIKELY( ready_) ) { throw promise_already_satisfied(); } except_ = except; @@ -161,19 +161,19 @@ private: void set_value_( R const& value, std::unique_lock< mutex > & lk) { BOOST_ASSERT( lk.owns_lock() ); - if ( ready_) { + if ( BOOST_UNLIKELY( ready_) ) { throw promise_already_satisfied{}; } - ::new ( static_cast< void * >( std::addressof( storage_) ) ) R{ value }; + ::new ( static_cast< void * >( std::addressof( storage_) ) ) R( value ); mark_ready_and_notify_( lk); } void set_value_( R && value, std::unique_lock< mutex > & lk) { BOOST_ASSERT( lk.owns_lock() ); - if ( ready_) { + if ( BOOST_UNLIKELY( ready_) ) { throw promise_already_satisfied{}; } - ::new ( static_cast< void * >( std::addressof( storage_) ) ) R{ std::move( value) }; + ::new ( static_cast< void * >( std::addressof( storage_) ) ) R( std::move( value) ); mark_ready_and_notify_( lk); } @@ -223,7 +223,7 @@ private: void set_value_( R & value, std::unique_lock< mutex > & lk) { BOOST_ASSERT( lk.owns_lock() ); - if ( ready_) { + if ( BOOST_UNLIKELY( ready_) ) { throw promise_already_satisfied(); } value_ = std::addressof( value); @@ -266,7 +266,7 @@ private: inline void set_value_( std::unique_lock< mutex > & lk) { BOOST_ASSERT( lk.owns_lock() ); - if ( ready_) { + if ( BOOST_UNLIKELY( ready_) ) { throw promise_already_satisfied(); } mark_ready_and_notify_( lk); diff --git a/boost/fiber/future/detail/task_object.hpp b/boost/fiber/future/detail/task_object.hpp index 3a48929a58..abb4c8d877 100644 --- a/boost/fiber/future/detail/task_object.hpp +++ b/boost/fiber/future/detail/task_object.hpp @@ -16,6 +16,7 @@ #if defined(BOOST_NO_CXX17_STD_APPLY) #include <boost/context/detail/apply.hpp> #endif +#include <boost/core/pointer_traits.hpp> #include <boost/fiber/detail/config.hpp> #include <boost/fiber/future/detail/task_base.hpp> @@ -68,15 +69,17 @@ public: typename base_type::ptr_type reset() override final { typedef std::allocator_traits< allocator_type > traity_type; + typedef pointer_traits< typename traity_type::pointer> ptrait_type; typename traity_type::pointer ptr{ traity_type::allocate( alloc_, 1) }; + typename ptrait_type::element_type* p = ptrait_type::to_address(ptr); try { - traity_type::construct( alloc_, ptr, alloc_, std::move( fn_) ); + traity_type::construct( alloc_, p, alloc_, std::move( fn_) ); } catch (...) { traity_type::deallocate( alloc_, ptr, 1); throw; } - return { convert( ptr) }; + return { p }; } protected: @@ -134,15 +137,17 @@ public: typename base_type::ptr_type reset() override final { typedef std::allocator_traits< allocator_type > traity_type; + typedef pointer_traits< typename traity_type::pointer> ptrait_type; typename traity_type::pointer ptr{ traity_type::allocate( alloc_, 1) }; + typename ptrait_type::element_type* p = ptrait_type::to_address(ptr); try { - traity_type::construct( alloc_, ptr, alloc_, std::move( fn_) ); + traity_type::construct( alloc_, p, alloc_, std::move( fn_) ); } catch (...) { traity_type::deallocate( alloc_, ptr, 1); throw; } - return { convert( ptr) }; + return { p }; } protected: diff --git a/boost/fiber/future/future.hpp b/boost/fiber/future/future.hpp index 5d4ad78ab5..2191080d69 100644 --- a/boost/fiber/future/future.hpp +++ b/boost/fiber/future/future.hpp @@ -46,14 +46,14 @@ struct future_base { } future_base & operator=( future_base const& other) noexcept { - if ( this != & other) { + if ( BOOST_LIKELY( this != & other) ) { state_ = other.state_; } return * this; } future_base & operator=( future_base && other) noexcept { - if ( this != & other) { + if ( BOOST_LIKELY( this != & other) ) { state_ = other.state_; other.state_.reset(); } @@ -65,14 +65,14 @@ struct future_base { } std::exception_ptr get_exception_ptr() { - if ( ! valid() ) { + if ( BOOST_UNLIKELY( ! valid() ) ) { throw future_uninitialized{}; } return state_->get_exception_ptr(); } void wait() const { - if ( ! valid() ) { + if ( BOOST_UNLIKELY( ! valid() ) ) { throw future_uninitialized{}; } state_->wait(); @@ -80,7 +80,7 @@ struct future_base { template< typename Rep, typename Period > future_status wait_for( std::chrono::duration< Rep, Period > const& timeout_duration) const { - if ( ! valid() ) { + if ( BOOST_UNLIKELY( ! valid() ) ) { throw future_uninitialized{}; } return state_->wait_for( timeout_duration); @@ -88,7 +88,7 @@ struct future_base { template< typename Clock, typename Duration > future_status wait_until( std::chrono::time_point< Clock, Duration > const& timeout_time) const { - if ( ! valid() ) { + if ( BOOST_UNLIKELY( ! valid() ) ) { throw future_uninitialized{}; } return state_->wait_until( timeout_time); @@ -131,7 +131,7 @@ public: } future & operator=( future && other) noexcept { - if ( this != & other) { + if ( BOOST_LIKELY( this != & other) ) { base_type::operator=( std::move( other) ); } return * this; @@ -140,7 +140,7 @@ public: shared_future< R > share(); R get() { - if ( ! base_type::valid() ) { + if ( BOOST_UNLIKELY( ! base_type::valid() ) ) { throw future_uninitialized{}; } typename base_type::ptr_type tmp{}; @@ -180,7 +180,7 @@ public: } future & operator=( future && other) noexcept { - if ( this != & other) { + if ( BOOST_LIKELY( this != & other) ) { base_type::operator=( std::move( other) ); } return * this; @@ -189,7 +189,7 @@ public: shared_future< R & > share(); R & get() { - if ( ! base_type::valid() ) { + if ( BOOST_UNLIKELY( ! base_type::valid() ) ) { throw future_uninitialized{}; } typename base_type::ptr_type tmp{}; @@ -231,7 +231,7 @@ public: inline future & operator=( future && other) noexcept { - if ( this != & other) { + if ( BOOST_LIKELY( this != & other) ) { base_type::operator=( std::move( other) ); } return * this; @@ -241,7 +241,7 @@ public: inline void get() { - if ( ! base_type::valid() ) { + if ( BOOST_UNLIKELY( ! base_type::valid() ) ) { throw future_uninitialized{}; } base_type::ptr_type tmp{}; @@ -284,14 +284,14 @@ public: } shared_future & operator=( shared_future const& other) noexcept { - if ( this != & other) { + if ( BOOST_LIKELY( this != & other) ) { base_type::operator=( other); } return * this; } shared_future & operator=( shared_future && other) noexcept { - if ( this != & other) { + if ( BOOST_LIKELY( this != & other) ) { base_type::operator=( std::move( other) ); } return * this; @@ -303,7 +303,7 @@ public: } R const& get() const { - if ( ! valid() ) { + if ( BOOST_UNLIKELY( ! valid() ) ) { throw future_uninitialized{}; } return base_type::state_->get(); @@ -343,14 +343,14 @@ public: } shared_future & operator=( shared_future const& other) noexcept { - if ( this != & other) { + if ( BOOST_LIKELY( this != & other) ) { base_type::operator=( other); } return * this; } shared_future & operator=( shared_future && other) noexcept { - if ( this != & other) { + if ( BOOST_LIKELY( this != & other) ) { base_type::operator=( std::move( other) ); } return * this; @@ -362,7 +362,7 @@ public: } R & get() const { - if ( ! valid() ) { + if ( BOOST_UNLIKELY( ! valid() ) ) { throw future_uninitialized{}; } return base_type::state_->get(); @@ -406,7 +406,7 @@ public: inline shared_future & operator=( shared_future const& other) noexcept { - if ( this != & other) { + if ( BOOST_LIKELY( this != & other) ) { base_type::operator=( other); } return * this; @@ -414,7 +414,7 @@ public: inline shared_future & operator=( shared_future && other) noexcept { - if ( this != & other) { + if ( BOOST_LIKELY( this != & other) ) { base_type::operator=( std::move( other) ); } return * this; @@ -428,7 +428,7 @@ public: inline void get() const { - if ( ! valid() ) { + if ( BOOST_UNLIKELY( ! valid() ) ) { throw future_uninitialized{}; } base_type::state_->get(); @@ -445,7 +445,7 @@ public: template< typename R > shared_future< R > future< R >::share() { - if ( ! base_type::valid() ) { + if ( BOOST_UNLIKELY( ! base_type::valid() ) ) { throw future_uninitialized{}; } return shared_future< R >{ std::move( * this) }; @@ -454,7 +454,7 @@ future< R >::share() { template< typename R > shared_future< R & > future< R & >::share() { - if ( ! base_type::valid() ) { + if ( BOOST_UNLIKELY( ! base_type::valid() ) ) { throw future_uninitialized{}; } return shared_future< R & >{ std::move( * this) }; @@ -463,7 +463,7 @@ future< R & >::share() { inline shared_future< void > future< void >::share() { - if ( ! base_type::valid() ) { + if ( BOOST_UNLIKELY( ! base_type::valid() ) ) { throw future_uninitialized{}; } return shared_future< void >{ std::move( * this) }; diff --git a/boost/fiber/future/packaged_task.hpp b/boost/fiber/future/packaged_task.hpp index 31838ee41f..c8b10d43e0 100644 --- a/boost/fiber/future/packaged_task.hpp +++ b/boost/fiber/future/packaged_task.hpp @@ -14,7 +14,6 @@ #include <boost/config.hpp> -#include <boost/fiber/detail/convert.hpp> #include <boost/fiber/detail/disable_overload.hpp> #include <boost/fiber/exceptions.hpp> #include <boost/fiber/future/detail/task_base.hpp> @@ -57,16 +56,18 @@ public: typedef std::allocator_traits< typename object_type::allocator_type > traits_type; + typedef pointer_traits< typename traits_type::pointer > ptrait_type; typename object_type::allocator_type a{ alloc }; typename traits_type::pointer ptr{ traits_type::allocate( a, 1) }; + typename ptrait_type::element_type* p = ptrait_type::to_address(ptr); try { - traits_type::construct( a, ptr, a, std::forward< Fn >( fn) ); + traits_type::construct( a, p, a, std::forward< Fn >( fn) ); } catch (...) { traits_type::deallocate( a, ptr, 1); throw; } - task_.reset( convert( ptr) ); + task_.reset(p); } ~packaged_task() { @@ -85,7 +86,7 @@ public: } packaged_task & operator=( packaged_task && other) noexcept { - if ( this != & other) { + if ( BOOST_LIKELY( this != & other) ) { packaged_task tmp{ std::move( other) }; swap( tmp); } @@ -105,7 +106,7 @@ public: if ( obtained_) { throw future_already_retrieved{}; } - if ( ! valid() ) { + if ( BOOST_UNLIKELY( ! valid() ) ) { throw packaged_task_uninitialized{}; } obtained_ = true; @@ -114,14 +115,14 @@ public: } void operator()( Args ... args) { - if ( ! valid() ) { + if ( BOOST_UNLIKELY( ! valid() ) ) { throw packaged_task_uninitialized{}; } task_->run( std::forward< Args >( args) ... ); } void reset() { - if ( ! valid() ) { + if ( BOOST_UNLIKELY( ! valid() ) ) { throw packaged_task_uninitialized{}; } packaged_task tmp; diff --git a/boost/fiber/future/promise.hpp b/boost/fiber/future/promise.hpp index 83ba63fb23..661c8b0480 100644 --- a/boost/fiber/future/promise.hpp +++ b/boost/fiber/future/promise.hpp @@ -12,8 +12,8 @@ #include <utility> #include <boost/config.hpp> +#include <boost/core/pointer_traits.hpp> -#include <boost/fiber/detail/convert.hpp> #include <boost/fiber/exceptions.hpp> #include <boost/fiber/future/detail/shared_state.hpp> #include <boost/fiber/future/detail/shared_state_object.hpp> @@ -38,16 +38,18 @@ struct promise_base { promise_base( std::allocator_arg_t, Allocator alloc) { typedef detail::shared_state_object< R, Allocator > object_type; typedef std::allocator_traits< typename object_type::allocator_type > traits_type; + typedef pointer_traits< typename traits_type::pointer > ptrait_type; typename object_type::allocator_type a{ alloc }; typename traits_type::pointer ptr{ traits_type::allocate( a, 1) }; + typename ptrait_type::element_type* p = ptrait_type::to_address(ptr); try { - traits_type::construct( a, ptr, a); + traits_type::construct( a, p, a); } catch (...) { traits_type::deallocate( a, ptr, 1); throw; } - future_.reset( convert( ptr) ); + future_.reset(p); } ~promise_base() { @@ -66,7 +68,7 @@ struct promise_base { } promise_base & operator=( promise_base && other) noexcept { - if ( this != & other) { + if ( BOOST_LIKELY( this != & other) ) { promise_base tmp{ std::move( other) }; swap( tmp); } @@ -74,10 +76,10 @@ struct promise_base { } future< R > get_future() { - if ( obtained_) { + if ( BOOST_UNLIKELY( obtained_) ) { throw future_already_retrieved{}; } - if ( ! future_) { + if ( BOOST_UNLIKELY( ! future_) ) { throw promise_uninitialized{}; } obtained_ = true; @@ -90,7 +92,7 @@ struct promise_base { } void set_exception( std::exception_ptr p) { - if ( ! future_) { + if ( BOOST_UNLIKELY( ! future_) ) { throw promise_uninitialized{}; } future_->set_exception( p); @@ -119,14 +121,14 @@ public: promise & operator=( promise && other) = default; void set_value( R const& value) { - if ( ! base_type::future_) { + if ( BOOST_UNLIKELY( ! base_type::future_) ) { throw promise_uninitialized{}; } base_type::future_->set_value( value); } void set_value( R && value) { - if ( ! base_type::future_) { + if ( BOOST_UNLIKELY( ! base_type::future_) ) { throw promise_uninitialized{}; } base_type::future_->set_value( std::move( value) ); @@ -160,7 +162,7 @@ public: promise & operator=( promise && other) noexcept = default; void set_value( R & value) { - if ( ! base_type::future_) { + if ( BOOST_UNLIKELY( ! base_type::future_) ) { throw promise_uninitialized{}; } base_type::future_->set_value( value); @@ -195,7 +197,7 @@ public: inline void set_value() { - if ( ! base_type::future_) { + if ( BOOST_UNLIKELY( ! base_type::future_) ) { throw promise_uninitialized{}; } base_type::future_->set_value(); diff --git a/boost/fiber/numa/pin_thread.hpp b/boost/fiber/numa/pin_thread.hpp new file mode 100644 index 0000000000..c8d48395c6 --- /dev/null +++ b/boost/fiber/numa/pin_thread.hpp @@ -0,0 +1,33 @@ + +// Copyright Oliver Kowalke 2017. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#ifndef BOOST_FIBERS_NUMA_PIN_THREAD_H +#define BOOST_FIBERS_NUMA_PIN_THREAD_H + +#include <cstdint> + +#include <boost/config.hpp> + +#include <boost/fiber/detail/config.hpp> + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace numa { + +BOOST_FIBERS_DECL +void pin_thread( std::uint32_t); + +}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_NUMA_PIN_THREAD_H diff --git a/boost/fiber/numa/topology.hpp b/boost/fiber/numa/topology.hpp new file mode 100644 index 0000000000..7b67fad288 --- /dev/null +++ b/boost/fiber/numa/topology.hpp @@ -0,0 +1,46 @@ + +// Copyright Oliver Kowalke 2017. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#ifndef BOOST_FIBERS_NUMA_TOPOLOGY_H +#define BOOST_FIBERS_NUMA_TOPOLOGY_H + +#include <cstdint> +#include <set> +#include <vector> + +#include <boost/config.hpp> + +#include <boost/fiber/detail/config.hpp> + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace numa { + +struct node { + std::uint32_t id; + std::set< std::uint32_t > logical_cpus; + std::vector< std::uint32_t > distance; +}; + +inline +bool operator<( node const& lhs, node const& rhs) noexcept { + return lhs.id < rhs.id; +} + +BOOST_FIBERS_DECL +std::vector< node > topology(); + +}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_NUMA_TOPOLOGY_H diff --git a/boost/fiber/operations.hpp b/boost/fiber/operations.hpp index 95ab9ba452..22b58562da 100644 --- a/boost/fiber/operations.hpp +++ b/boost/fiber/operations.hpp @@ -10,6 +10,7 @@ #include <boost/config.hpp> +#include <boost/fiber/algo/algorithm.hpp> #include <boost/fiber/context.hpp> #include <boost/fiber/detail/config.hpp> #include <boost/fiber/detail/convert.hpp> @@ -36,19 +37,22 @@ void yield() noexcept { template< typename Clock, typename Duration > void sleep_until( std::chrono::time_point< Clock, Duration > const& sleep_time_) { std::chrono::steady_clock::time_point sleep_time = boost::fibers::detail::convert( sleep_time_); - fibers::context::active()->wait_until( sleep_time); + fibers::context * active_ctx = fibers::context::active(); + active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release); + active_ctx->wait_until( sleep_time); } template< typename Rep, typename Period > void sleep_for( std::chrono::duration< Rep, Period > const& timeout_duration) { - fibers::context::active()->wait_until( - std::chrono::steady_clock::now() + timeout_duration); + fibers::context * active_ctx = fibers::context::active(); + active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release); + active_ctx->wait_until( std::chrono::steady_clock::now() + timeout_duration); } template< typename PROPS > PROPS & properties() { fibers::fiber_properties * props = fibers::context::active()->get_properties(); - if ( ! props) { + if ( BOOST_LIKELY( nullptr == props) ) { // props could be nullptr if the thread's main fiber has not yet // yielded (not yet passed through algorithm_with_properties:: // awakened()). Address that by yielding right now. @@ -76,9 +80,7 @@ bool has_ready_fibers() noexcept { template< typename SchedAlgo, typename ... Args > void use_scheduling_algorithm( Args && ... args) noexcept { boost::fibers::context::active()->get_scheduler() - ->set_algo( - std::unique_ptr< SchedAlgo >( - new SchedAlgo( std::forward< Args >( args) ... ) ) ); + ->set_algo( new SchedAlgo( std::forward< Args >( args) ... ) ); } }} diff --git a/boost/fiber/properties.hpp b/boost/fiber/properties.hpp index 09ac7405d6..2f8b7a008d 100644 --- a/boost/fiber/properties.hpp +++ b/boost/fiber/properties.hpp @@ -28,7 +28,7 @@ class context; namespace algo { -struct algorithm; +class algorithm; } diff --git a/boost/fiber/scheduler.hpp b/boost/fiber/scheduler.hpp index 4a5f0dab8c..accedf03d7 100644 --- a/boost/fiber/scheduler.hpp +++ b/boost/fiber/scheduler.hpp @@ -13,11 +13,7 @@ #include <vector> #include <boost/config.hpp> -#if (BOOST_EXECUTION_CONTEXT==1) -# include <boost/context/execution_context.hpp> -#else -# include <boost/context/continuation.hpp> -#endif +#include <boost/context/continuation.hpp> #include <boost/intrusive/list.hpp> #include <boost/intrusive_ptr.hpp> #include <boost/intrusive/set.hpp> @@ -90,7 +86,7 @@ private: detail::spinlock remote_ready_splk_{}; remote_ready_queue_type remote_ready_queue_{}; #endif - alignas(cache_alignment) std::unique_ptr< algo::algorithm > algo_; + algo::algorithm::ptr_t algo_; // sleep-queue contains context' which have been called // scheduler::wait_until() sleep_queue_type sleep_queue_{}; @@ -126,15 +122,9 @@ public: void schedule_from_remote( context *) noexcept; #endif -#if (BOOST_EXECUTION_CONTEXT==1) - void dispatch() noexcept; - - void terminate( detail::spinlock_lock &, context *) noexcept; -#else boost::context::continuation dispatch() noexcept; boost::context::continuation terminate( detail::spinlock_lock &, context *) noexcept; -#endif void yield( context *) noexcept; @@ -149,7 +139,7 @@ public: bool has_ready_fibers() const noexcept; - void set_algo( std::unique_ptr< algo::algorithm >) noexcept; + void set_algo( algo::algorithm::ptr_t) noexcept; void attach_main_context( context *) noexcept; diff --git a/boost/fiber/type.hpp b/boost/fiber/type.hpp index 065d22fccb..d9ab0a94d7 100644 --- a/boost/fiber/type.hpp +++ b/boost/fiber/type.hpp @@ -29,7 +29,6 @@ #include <boost/fiber/detail/decay_copy.hpp> #include <boost/fiber/detail/fss.hpp> #include <boost/fiber/detail/spinlock.hpp> -#include <boost/fiber/detail/wrap.hpp> #include <boost/fiber/exceptions.hpp> #include <boost/fiber/fixedsize_stack.hpp> #include <boost/fiber/properties.hpp> diff --git a/boost/fiber/unbuffered_channel.hpp b/boost/fiber/unbuffered_channel.hpp index 38a2d6111e..1474299ded 100644 --- a/boost/fiber/unbuffered_channel.hpp +++ b/boost/fiber/unbuffered_channel.hpp @@ -38,7 +38,7 @@ public: private: typedef context::wait_queue_t wait_queue_type; - struct alignas(cache_alignment) slot { + struct slot { value_type value; context * ctx; @@ -54,12 +54,12 @@ private: }; // shared cacheline - alignas(cache_alignment) std::atomic< slot * > slot_{ nullptr }; + std::atomic< slot * > slot_{ nullptr }; // shared cacheline - alignas(cache_alignment) std::atomic_bool closed_{ false }; - alignas(cache_alignment) mutable detail::spinlock splk_producers_{}; + std::atomic_bool closed_{ false }; + mutable detail::spinlock splk_producers_{}; wait_queue_type waiting_producers_{}; - alignas( cache_alignment) mutable detail::spinlock splk_consumers_{}; + mutable detail::spinlock splk_consumers_{}; wait_queue_type waiting_consumers_{}; char pad_[cacheline_length]; @@ -98,13 +98,6 @@ public: ~unbuffered_channel() { close(); - slot * s = nullptr; - if ( nullptr != ( s = try_pop_() ) ) { - BOOST_ASSERT( nullptr != s); - BOOST_ASSERT( nullptr != s->ctx); - // value will be destructed in the context of the waiting fiber - context::active()->schedule( s->ctx); - } } unbuffered_channel( unbuffered_channel const&) = delete; @@ -122,14 +115,46 @@ public: while ( ! waiting_producers_.empty() ) { context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); - active_ctx->schedule( producer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( producer_ctx); + // notify context + active_ctx->schedule( producer_ctx); + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( producer_ctx); + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( producer_ctx); + // re-schedule next + } } // notify all waiting consumers detail::spinlock_lock lk2{ splk_consumers_ }; while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } } @@ -137,16 +162,34 @@ public: context * active_ctx = context::active(); slot s{ value, active_ctx }; for (;;) { - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( try_push_( & s) ) { detail::spinlock_lock lk{ splk_consumers_ }; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } // suspend till value has been consumed active_ctx->suspend( lk); @@ -154,13 +197,14 @@ public: return channel_op_status::success; } else { detail::spinlock_lock lk{ splk_producers_ }; - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( is_empty_() ) { continue; } active_ctx->wait_link( waiting_producers_); + active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release); // suspend this producer active_ctx->suspend( lk); // resumed, slot mabye free @@ -172,16 +216,34 @@ public: context * active_ctx = context::active(); slot s{ std::move( value), active_ctx }; for (;;) { - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( try_push_( & s) ) { detail::spinlock_lock lk{ splk_consumers_ }; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } // suspend till value has been consumed active_ctx->suspend( lk); @@ -189,13 +251,14 @@ public: return channel_op_status::success; } else { detail::spinlock_lock lk{ splk_producers_ }; - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( is_empty_() ) { continue; } active_ctx->wait_link( waiting_producers_); + active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release); // suspend this producer active_ctx->suspend( lk); // resumed, slot mabye free @@ -224,18 +287,38 @@ public: slot s{ value, active_ctx }; std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); for (;;) { - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( try_push_( & s) ) { detail::spinlock_lock lk{ splk_consumers_ }; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } // suspend this producer + intrusive_ptr_add_ref( active_ctx); + active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); if ( ! active_ctx->wait_until( timeout_time, lk) ) { // clear slot slot * nil_slot = nullptr, * own_slot = & s; @@ -247,13 +330,15 @@ public: return channel_op_status::success; } else { detail::spinlock_lock lk{ splk_producers_ }; - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( is_empty_() ) { continue; } active_ctx->wait_link( waiting_producers_); + intrusive_ptr_add_ref( active_ctx); + active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); // suspend this producer if ( ! active_ctx->wait_until( timeout_time, lk) ) { // relock local lk @@ -274,18 +359,38 @@ public: slot s{ std::move( value), active_ctx }; std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); for (;;) { - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( try_push_( & s) ) { detail::spinlock_lock lk{ splk_consumers_ }; // notify one waiting consumer - if ( ! waiting_consumers_.empty() ) { + while ( ! waiting_consumers_.empty() ) { context * consumer_ctx = & waiting_consumers_.front(); waiting_consumers_.pop_front(); - active_ctx->schedule( consumer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( consumer_ctx); + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( consumer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( consumer_ctx); + // re-schedule next + } } // suspend this producer + intrusive_ptr_add_ref( active_ctx); + active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); if ( ! active_ctx->wait_until( timeout_time, lk) ) { // clear slot slot * nil_slot = nullptr, * own_slot = & s; @@ -297,13 +402,15 @@ public: return channel_op_status::success; } else { detail::spinlock_lock lk{ splk_producers_ }; - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( is_empty_() ) { continue; } active_ctx->wait_link( waiting_producers_); + intrusive_ptr_add_ref( active_ctx); + active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); // suspend this producer if ( ! active_ctx->wait_until( timeout_time, lk) ) { // relock local lk @@ -325,27 +432,45 @@ public: { detail::spinlock_lock lk{ splk_producers_ }; // notify one waiting producer - if ( ! waiting_producers_.empty() ) { + while ( ! waiting_producers_.empty() ) { context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - active_ctx->schedule( producer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( producer_ctx); + // notify context + active_ctx->schedule( producer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( producer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( producer_ctx); + // re-schedule next + } } } - // consume value value = std::move( s->value); - // resume suspended producer + // notify context active_ctx->schedule( s->ctx); return channel_op_status::success; } else { detail::spinlock_lock lk{ splk_consumers_ }; - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( ! is_empty_() ) { continue; } active_ctx->wait_link( waiting_consumers_); + active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release); // suspend this consumer active_ctx->suspend( lk); // resumed, slot mabye set @@ -361,21 +486,39 @@ public: { detail::spinlock_lock lk{ splk_producers_ }; // notify one waiting producer - if ( ! waiting_producers_.empty() ) { + while ( ! waiting_producers_.empty() ) { context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - active_ctx->schedule( producer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( producer_ctx); + // notify context + active_ctx->schedule( producer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( producer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( producer_ctx); + // re-schedule next + } } } // consume value - value_type value{ std::move( s->value) }; - // resume suspended producer + value_type value = std::move( s->value); + // notify context active_ctx->schedule( s->ctx); return std::move( value); } else { detail::spinlock_lock lk{ splk_consumers_ }; - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { throw fiber_error{ std::make_error_code( std::errc::operation_not_permitted), "boost fiber: channel is closed" }; @@ -384,6 +527,7 @@ public: continue; } active_ctx->wait_link( waiting_consumers_); + active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release); // suspend this consumer active_ctx->suspend( lk); // resumed, slot mabye set @@ -409,27 +553,47 @@ public: { detail::spinlock_lock lk{ splk_producers_ }; // notify one waiting producer - if ( ! waiting_producers_.empty() ) { + while ( ! waiting_producers_.empty() ) { context * producer_ctx = & waiting_producers_.front(); waiting_producers_.pop_front(); lk.unlock(); - active_ctx->schedule( producer_ctx); + std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); + if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { + // notify before timeout + intrusive_ptr_release( producer_ctx); + // notify context + active_ctx->schedule( producer_ctx); + break; + } else if ( static_cast< std::intptr_t >( 0) == expected) { + // no timed-wait op. + // notify context + active_ctx->schedule( producer_ctx); + break; + } else { + // timed-wait op. + // expected == -1: notify after timeout, same timed-wait op. + // expected == <any>: notify after timeout, another timed-wait op. was already started + intrusive_ptr_release( producer_ctx); + // re-schedule next + } } } // consume value value = std::move( s->value); - // resume suspended producer + // notify context active_ctx->schedule( s->ctx); return channel_op_status::success; } else { detail::spinlock_lock lk{ splk_consumers_ }; - if ( is_closed() ) { + if ( BOOST_UNLIKELY( is_closed() ) ) { return channel_op_status::closed; } if ( ! is_empty_() ) { continue; } active_ctx->wait_link( waiting_consumers_); + intrusive_ptr_add_ref( active_ctx); + active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); // suspend this consumer if ( ! active_ctx->wait_until( timeout_time, lk) ) { // relock local lk |