diff options
Diffstat (limited to 'boost/fiber/algo')
-rw-r--r-- | boost/fiber/algo/algorithm.hpp | 29 | ||||
-rw-r--r-- | boost/fiber/algo/numa/work_stealing.hpp | 93 | ||||
-rw-r--r-- | boost/fiber/algo/work_stealing.hpp | 34 |
3 files changed, 137 insertions, 19 deletions
diff --git a/boost/fiber/algo/algorithm.hpp b/boost/fiber/algo/algorithm.hpp index 9b846e774b..b22a5923d6 100644 --- a/boost/fiber/algo/algorithm.hpp +++ b/boost/fiber/algo/algorithm.hpp @@ -6,11 +6,13 @@ #ifndef BOOST_FIBERS_ALGO_ALGORITHM_H #define BOOST_FIBERS_ALGO_ALGORITHM_H -#include <cstddef> +#include <atomic> #include <chrono> +#include <cstddef> -#include <boost/config.hpp> #include <boost/assert.hpp> +#include <boost/config.hpp> +#include <boost/intrusive_ptr.hpp> #include <boost/fiber/properties.hpp> #include <boost/fiber/detail/config.hpp> @@ -26,7 +28,13 @@ class context; namespace algo { -struct BOOST_FIBERS_DECL algorithm { +class BOOST_FIBERS_DECL algorithm { +private: + std::atomic< std::size_t > use_count_{ 0 }; + +public: + typedef intrusive_ptr< algorithm > ptr_t; + virtual ~algorithm() {} virtual void awakened( context *) noexcept = 0; @@ -38,6 +46,19 @@ struct BOOST_FIBERS_DECL algorithm { virtual void suspend_until( std::chrono::steady_clock::time_point const&) noexcept = 0; virtual void notify() noexcept = 0; + + friend void intrusive_ptr_add_ref( algorithm * algo) noexcept { + BOOST_ASSERT( nullptr != algo); + algo->use_count_.fetch_add( 1, std::memory_order_relaxed); + } + + friend void intrusive_ptr_release( algorithm * algo) noexcept { + BOOST_ASSERT( nullptr != algo); + if ( 1 == algo->use_count_.fetch_sub( 1, std::memory_order_release) ) { + std::atomic_thread_fence( std::memory_order_acquire); + delete algo; + } + } }; class BOOST_FIBERS_DECL algorithm_with_properties_base : public algorithm { @@ -60,7 +81,7 @@ struct algorithm_with_properties : public algorithm_with_properties_base { // with: algorithm_with_properties<PROPS>::awakened(fb); virtual void awakened( context * ctx) noexcept override final { fiber_properties * props = super::get_properties( ctx); - if ( nullptr == props) { + if ( BOOST_LIKELY( nullptr == props) ) { // TODO: would be great if PROPS could be allocated on the new // fiber's stack somehow props = new_properties( ctx); diff --git a/boost/fiber/algo/numa/work_stealing.hpp b/boost/fiber/algo/numa/work_stealing.hpp new file mode 100644 index 0000000000..26032ab35e --- /dev/null +++ b/boost/fiber/algo/numa/work_stealing.hpp @@ -0,0 +1,93 @@ + +// Copyright Oliver Kowalke 2017. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_FIBERS_ALGO_NUMA_WORK_STEALING_H +#define BOOST_FIBERS_ALGO_NUMA_WORK_STEALING_H + +#include <condition_variable> +#include <chrono> +#include <cstddef> +#include <cstdint> +#include <mutex> +#include <vector> + +#include <boost/config.hpp> +#include <boost/intrusive_ptr.hpp> + +#include <boost/fiber/algo/algorithm.hpp> +#include <boost/fiber/context.hpp> +#include <boost/fiber/detail/config.hpp> +#include <boost/fiber/detail/context_spinlock_queue.hpp> +#include <boost/fiber/detail/context_spmc_queue.hpp> +#include <boost/fiber/numa/pin_thread.hpp> +#include <boost/fiber/numa/topology.hpp> +#include <boost/fiber/scheduler.hpp> + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_PREFIX +#endif + +namespace boost { +namespace fibers { +namespace algo { +namespace numa { + +class work_stealing : public algorithm { +private: + static std::vector< intrusive_ptr< work_stealing > > schedulers_; + + std::uint32_t cpu_id_; + std::vector< std::uint32_t > local_cpus_; + std::vector< std::uint32_t > remote_cpus_; +#ifdef BOOST_FIBERS_USE_SPMC_QUEUE + detail::context_spmc_queue rqueue_{}; +#else + detail::context_spinlock_queue rqueue_{}; +#endif + std::mutex mtx_{}; + std::condition_variable cnd_{}; + bool flag_{ false }; + bool suspend_; + + static void init_( std::vector< boost::fibers::numa::node > const&, + std::vector< intrusive_ptr< work_stealing > > &); + +public: + work_stealing( std::uint32_t, std::uint32_t, + std::vector< boost::fibers::numa::node > const&, + bool = false); + + work_stealing( work_stealing const&) = delete; + work_stealing( work_stealing &&) = delete; + + work_stealing & operator=( work_stealing const&) = delete; + work_stealing & operator=( work_stealing &&) = delete; + + virtual void awakened( context *) noexcept; + + virtual context * pick_next() noexcept; + + virtual context * steal() noexcept { + return rqueue_.steal(); + } + + virtual bool has_ready_fibers() const noexcept { + return ! rqueue_.empty(); + } + + virtual void suspend_until( std::chrono::steady_clock::time_point const&) noexcept; + + virtual void notify() noexcept; +}; + +}}}} + +#ifdef BOOST_HAS_ABI_HEADERS +# include BOOST_ABI_SUFFIX +#endif + +#endif // BOOST_FIBERS_ALGO_NUMA_WORK_STEALING_H diff --git a/boost/fiber/algo/work_stealing.hpp b/boost/fiber/algo/work_stealing.hpp index 66cadd12be..db4b5cf12d 100644 --- a/boost/fiber/algo/work_stealing.hpp +++ b/boost/fiber/algo/work_stealing.hpp @@ -8,19 +8,22 @@ #ifndef BOOST_FIBERS_ALGO_WORK_STEALING_H #define BOOST_FIBERS_ALGO_WORK_STEALING_H +#include <atomic> #include <condition_variable> #include <chrono> #include <cstddef> +#include <cstdint> #include <mutex> #include <vector> #include <boost/config.hpp> +#include <boost/intrusive_ptr.hpp> #include <boost/fiber/algo/algorithm.hpp> -#include <boost/fiber/detail/context_spinlock_queue.hpp> -#include <boost/fiber/detail/context_spmc_queue.hpp> #include <boost/fiber/context.hpp> #include <boost/fiber/detail/config.hpp> +#include <boost/fiber/detail/context_spinlock_queue.hpp> +#include <boost/fiber/detail/context_spmc_queue.hpp> #include <boost/fiber/scheduler.hpp> #ifdef BOOST_HAS_ABI_HEADERS @@ -33,24 +36,25 @@ namespace algo { class work_stealing : public algorithm { private: - static std::vector< work_stealing * > schedulers_; + static std::atomic< std::uint32_t > counter_; + static std::vector< intrusive_ptr< work_stealing > > schedulers_; - std::size_t idx_; - std::size_t max_idx_; + std::uint32_t id_; + std::uint32_t thread_count_; #ifdef BOOST_FIBERS_USE_SPMC_QUEUE - alignas(cache_alignment) detail::context_spmc_queue rqueue_{}; + detail::context_spmc_queue rqueue_{}; #else - alignas(cache_alignment) detail::context_spinlock_queue rqueue_{}; + detail::context_spinlock_queue rqueue_{}; #endif std::mutex mtx_{}; std::condition_variable cnd_{}; bool flag_{ false }; bool suspend_; - static void init_( std::size_t max_idx); + static void init_( std::uint32_t, std::vector< intrusive_ptr< work_stealing > > &); public: - work_stealing( std::size_t max_idx, std::size_t idx, bool suspend = false); + work_stealing( std::uint32_t, bool = false); work_stealing( work_stealing const&) = delete; work_stealing( work_stealing &&) = delete; @@ -58,21 +62,21 @@ public: work_stealing & operator=( work_stealing const&) = delete; work_stealing & operator=( work_stealing &&) = delete; - void awakened( context * ctx) noexcept; + virtual void awakened( context *) noexcept; - context * pick_next() noexcept; + virtual context * pick_next() noexcept; - context * steal() noexcept { + virtual context * steal() noexcept { return rqueue_.steal(); } - bool has_ready_fibers() const noexcept { + virtual bool has_ready_fibers() const noexcept { return ! rqueue_.empty(); } - void suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept; + virtual void suspend_until( std::chrono::steady_clock::time_point const&) noexcept; - void notify() noexcept; + virtual void notify() noexcept; }; }}} |