diff options
Diffstat (limited to 'boost/fiber/detail/context_spmc_queue.hpp')
-rw-r--r-- | boost/fiber/detail/context_spmc_queue.hpp | 158 |
1 files changed, 158 insertions, 0 deletions
diff --git a/boost/fiber/detail/context_spmc_queue.hpp b/boost/fiber/detail/context_spmc_queue.hpp new file mode 100644 index 0000000000..6449e3658f --- /dev/null +++ b/boost/fiber/detail/context_spmc_queue.hpp @@ -0,0 +1,158 @@

// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
#define BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>

#include <boost/assert.hpp>
#include <boost/config.hpp>

#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/context.hpp>

// David Chase and Yossi Lev. Dynamic circular work-stealing deque.
// In SPAA ’05: Proceedings of the seventeenth annual ACM symposium
// on Parallelism in algorithms and architectures, pages 21–28,
// New York, NY, USA, 2005. ACM.
//
// Nhat Minh Lê, Antoniu Pop, Albert Cohen, and Francesco Zappa Nardelli. 2013.
// Correct and efficient work-stealing for weak memory models.
// In Proceedings of the 18th ACM SIGPLAN symposium on Principles and practice
// of parallel programming (PPoPP '13). ACM, New York, NY, USA, 69-80.
namespace boost {
namespace fibers {
namespace detail {

// Lock-free single-producer/multiple-consumer queue of context pointers,
// modelled on the Chase-Lev work-stealing deque (see the papers cited in the
// header comment): the single owner pushes at bottom_, any number of thieves
// steal from top_.  The queue does NOT own the context objects it stores —
// only the backing arrays are freed in the destructor.
class context_spmc_queue {
private:
    // Fixed-capacity circular buffer of atomic context pointers.
    // Slots are raw aligned storage (each slot aligned to cache_alignment,
    // per the aligned_storage parameters below); the atomics are constructed
    // with placement-new and destroyed manually in ~array().
    class array {
    private:
        typedef std::atomic< context * > atomic_type;
        typedef std::aligned_storage<
            sizeof( atomic_type), cache_alignment
        >::type storage_type;

        std::size_t size_;       // capacity; indices are taken modulo size_
        storage_type * storage_; // raw slot storage, owned by this array

    public:
        // Allocate `size` slots and value-initialize each atomic to nullptr.
        array( std::size_t size) :
            size_{ size },
            storage_{ new storage_type[size_] } {
            for ( std::size_t i = 0; i < size_; ++i) {
                ::new ( static_cast< void * >( std::addressof( storage_[i]) ) ) atomic_type{ nullptr };
            }
        }

        // Run the atomics' destructors in place, then release the raw storage.
        ~array() {
            for ( std::size_t i = 0; i < size_; ++i) {
                reinterpret_cast< atomic_type * >( std::addressof( storage_[i]) )->~atomic_type();
            }
            delete [] storage_;
        }

        std::size_t size() const noexcept {
            return size_;
        }

        // Store `ctx` into slot (bottom % size_).  Relaxed: inter-thread
        // ordering is supplied by the enclosing queue's fences, not here.
        void push( std::size_t bottom, context * ctx) noexcept {
            reinterpret_cast< atomic_type * >(
                std::addressof( storage_[bottom % size_]) )
                    ->store( ctx, std::memory_order_relaxed);
        }

        // Load slot (top % size_); relaxed, see push() above.
        context * pop( std::size_t top) noexcept {
            return reinterpret_cast< atomic_type * >(
                std::addressof( storage_[top % size_]) )
                    ->load( std::memory_order_relaxed);
        }

        // Build a new array of twice the capacity and copy the live range
        // [top, bottom) into it.  Returns a raw pointer whose ownership
        // passes to the caller; `this` is left untouched.
        array * resize( std::size_t bottom, std::size_t top) {
            std::unique_ptr< array > tmp{ new array{ 2 * size_ } };
            for ( std::size_t i = top; i != bottom; ++i) {
                tmp->push( i, pop( i) );
            }
            return tmp.release();
        }
    };

    // top_/bottom_/array_ each get their own cache line to avoid false
    // sharing between the owner (bottom_) and thieves (top_).
    alignas(cache_alignment) std::atomic< std::size_t > top_{ 0 };
    alignas(cache_alignment) std::atomic< std::size_t > bottom_{ 0 };
    alignas(cache_alignment) std::atomic< array * > array_;
    // Retired arrays from resize(): kept alive (not deleted) until the queue
    // is destroyed, because a concurrent thief may still be reading through a
    // stale array pointer loaded from array_.
    std::vector< array * > old_arrays_{};
    // presumably trailing padding to keep old_arrays_/foreign data off the
    // last hot cache line — TODO confirm intent
    char padding_[cacheline_length];

public:
    // Start with capacity 1024; pre-reserve room for 32 retired arrays so
    // resize() in push() normally avoids a vector reallocation.
    context_spmc_queue() :
        array_{ new array{ 1024 } } {
        old_arrays_.reserve( 32);
    }

    // Frees the current and all retired arrays; contexts are not owned and
    // are not deleted.  Must not run concurrently with push()/pop().
    ~context_spmc_queue() {
        for ( array * a : old_arrays_) {
            delete a;
        }
        delete array_.load();
    }

    context_spmc_queue( context_spmc_queue const&) = delete;
    context_spmc_queue & operator=( context_spmc_queue const&) = delete;

    // Advisory emptiness snapshot (both loads relaxed) — may be stale by the
    // time the caller acts on it.
    bool empty() const noexcept {
        std::size_t bottom{ bottom_.load( std::memory_order_relaxed) };
        std::size_t top{ top_.load( std::memory_order_relaxed) };
        return bottom <= top;
    }

    // Owner-only: append `ctx` at bottom_.  Grows the array (2x) when fewer
    // than one free slot remains, retiring the old array into old_arrays_.
    // The release fence orders the slot store before the bottom_ publish, so
    // a thief that observes the new bottom_ also observes the stored ctx.
    void push( context * ctx) {
        std::size_t bottom{ bottom_.load( std::memory_order_relaxed) };
        std::size_t top{ top_.load( std::memory_order_acquire) };
        array * a{ array_.load( std::memory_order_relaxed) };
        if ( (a->size() - 1) < (bottom - top) ) {
            // queue is full
            // resize
            array * tmp{ a->resize( bottom, top) };
            old_arrays_.push_back( a);
            std::swap( a, tmp);
            array_.store( a, std::memory_order_relaxed);
        }
        a->push( bottom, ctx);
        std::atomic_thread_fence( std::memory_order_release);
        bottom_.store( bottom + 1, std::memory_order_relaxed);
    }

    // Steal one context from top_, or return nullptr when the queue is empty
    // or this thief loses the CAS race on top_.  The seq_cst fence between
    // the top_ and bottom_ loads gives a consistent snapshot across thieves.
    //
    // NOTE(review): ctx is dereferenced (is_context) before the CAS claims
    // the slot — this relies on every slot in [top, bottom) being non-null;
    // confirm against the producer.  Also, a pinned_context at top_ returns
    // nullptr WITHOUT advancing top_, so it blocks all further steals until
    // removed by some other path — confirm this is intended.
    context * pop() {
        std::size_t top{ top_.load( std::memory_order_acquire) };
        std::atomic_thread_fence( std::memory_order_seq_cst);
        std::size_t bottom{ bottom_.load( std::memory_order_acquire) };
        context * ctx{ nullptr };
        if ( top < bottom) {
            // queue is not empty
            array * a{ array_.load( std::memory_order_consume) };
            ctx = a->pop( top);
            if ( ctx->is_context( type::pinned_context) ||
                 ! top_.compare_exchange_strong( top, top + 1,
                                                 std::memory_order_seq_cst,
                                                 std::memory_order_relaxed) ) {
                // lose the race
                return nullptr;
            }
        }
        return ctx;
    }
};

}}}

#endif // BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H