diff options
Diffstat (limited to 'boost/fiber/detail/context_spmc_queue.hpp')
-rw-r--r-- | boost/fiber/detail/context_spmc_queue.hpp | 99 |
1 files changed, 70 insertions, 29 deletions
diff --git a/boost/fiber/detail/context_spmc_queue.hpp b/boost/fiber/detail/context_spmc_queue.hpp index 6449e3658f..27256233cf 100644 --- a/boost/fiber/detail/context_spmc_queue.hpp +++ b/boost/fiber/detail/context_spmc_queue.hpp @@ -30,6 +30,11 @@ // In Proceedings of the 18th ACM SIGPLAN symposium on Principles and practice // of parallel programming (PPoPP '13). ACM, New York, NY, USA, 69-80. +#if BOOST_COMP_CLANG +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wunused-private-field" +#endif + namespace boost { namespace fibers { namespace detail { @@ -43,43 +48,43 @@ private: sizeof( atomic_type), cache_alignment >::type storage_type; - std::size_t size_; + std::size_t capacity_; storage_type * storage_; public: - array( std::size_t size) : - size_{ size }, - storage_{ new storage_type[size_] } { - for ( std::size_t i = 0; i < size_; ++i) { + array( std::size_t capacity) : + capacity_{ capacity }, + storage_{ new storage_type[capacity_] } { + for ( std::size_t i = 0; i < capacity_; ++i) { ::new ( static_cast< void * >( std::addressof( storage_[i]) ) ) atomic_type{ nullptr }; } } ~array() { - for ( std::size_t i = 0; i < size_; ++i) { + for ( std::size_t i = 0; i < capacity_; ++i) { reinterpret_cast< atomic_type * >( std::addressof( storage_[i]) )->~atomic_type(); } delete [] storage_; } - std::size_t size() const noexcept { - return size_; + std::size_t capacity() const noexcept { + return capacity_; } void push( std::size_t bottom, context * ctx) noexcept { reinterpret_cast< atomic_type * >( - std::addressof( storage_[bottom % size_]) ) + std::addressof( storage_[bottom % capacity_]) ) ->store( ctx, std::memory_order_relaxed); } context * pop( std::size_t top) noexcept { return reinterpret_cast< atomic_type * >( - std::addressof( storage_[top % size_]) ) + std::addressof( storage_[top % capacity_]) ) ->load( std::memory_order_relaxed); } array * resize( std::size_t bottom, std::size_t top) { - std::unique_ptr< array > tmp{ new array{ 2 * size_ } }; + std::unique_ptr< array > tmp{ new array{ 2 * capacity_ } }; for ( std::size_t i = top; i != bottom; ++i) { tmp->push( i, pop( i) ); } @@ -87,15 +92,15 @@ private: } }; - alignas(cache_alignment) std::atomic< std::size_t > top_{ 0 }; - alignas(cache_alignment) std::atomic< std::size_t > bottom_{ 0 }; + alignas(cache_alignment) std::atomic< std::size_t > top_{ 0 }; + alignas(cache_alignment) std::atomic< std::size_t > bottom_{ 0 }; alignas(cache_alignment) std::atomic< array * > array_; - std::vector< array * > old_arrays_{}; + std::vector< array * > old_arrays_{}; char padding_[cacheline_length]; public: - context_spmc_queue() : - array_{ new array{ 1024 } } { + context_spmc_queue( std::size_t capacity = 4096) : + array_{ new array{ capacity } } { old_arrays_.reserve( 32); } @@ -110,19 +115,19 @@ public: context_spmc_queue & operator=( context_spmc_queue const&) = delete; bool empty() const noexcept { - std::size_t bottom{ bottom_.load( std::memory_order_relaxed) }; - std::size_t top{ top_.load( std::memory_order_relaxed) }; + std::size_t bottom = bottom_.load( std::memory_order_relaxed); + std::size_t top = top_.load( std::memory_order_relaxed); return bottom <= top; } void push( context * ctx) { - std::size_t bottom{ bottom_.load( std::memory_order_relaxed) }; - std::size_t top{ top_.load( std::memory_order_acquire) }; - array * a{ array_.load( std::memory_order_relaxed) }; - if ( (a->size() - 1) < (bottom - top) ) { + std::size_t bottom = bottom_.load( std::memory_order_relaxed); + std::size_t top = top_.load( std::memory_order_acquire); + array * a = array_.load( std::memory_order_relaxed); + if ( (a->capacity() - 1) < (bottom - top) ) { // queue is full // resize - array * tmp{ a->resize( bottom, top) }; + array * tmp = a->resize( bottom, top); old_arrays_.push_back( a); std::swap( a, tmp); array_.store( a, std::memory_order_relaxed); @@ -133,16 +138,48 @@ public: } context * pop() { - std::size_t top{ top_.load( std::memory_order_acquire) }; + std::size_t bottom = bottom_.load( std::memory_order_relaxed) - 1; + array * a = array_.load( std::memory_order_relaxed); + bottom_.store( bottom, std::memory_order_relaxed); std::atomic_thread_fence( std::memory_order_seq_cst); - std::size_t bottom{ bottom_.load( std::memory_order_acquire) }; - context * ctx{ nullptr }; + std::size_t top = top_.load( std::memory_order_relaxed); + context * ctx = nullptr; + if ( top <= bottom) { + // queue is not empty + ctx = a->pop( bottom); + BOOST_ASSERT( nullptr != ctx); + if ( top == bottom) { + // last element dequeued + if ( ! top_.compare_exchange_strong( top, top + 1, + std::memory_order_seq_cst, + std::memory_order_relaxed) ) { + // lose the race + ctx = nullptr; + } + bottom_.store( bottom + 1, std::memory_order_relaxed); + } + } else { + // queue is empty + bottom_.store( bottom + 1, std::memory_order_relaxed); + } + return ctx; + } + + context * steal() { + std::size_t top = top_.load( std::memory_order_acquire); + std::atomic_thread_fence( std::memory_order_seq_cst); + std::size_t bottom = bottom_.load( std::memory_order_acquire); + context * ctx = nullptr; if ( top < bottom) { // queue is not empty - array * a{ array_.load( std::memory_order_consume) }; + array * a = array_.load( std::memory_order_consume); ctx = a->pop( top); - if ( ctx->is_context( type::pinned_context) || - ! top_.compare_exchange_strong( top, top + 1, + BOOST_ASSERT( nullptr != ctx); + // do not steal pinned context (e.g. main-/dispatcher-context) + if ( ctx->is_context( type::pinned_context) ) { + return nullptr; + } + if ( ! top_.compare_exchange_strong( top, top + 1, std::memory_order_seq_cst, std::memory_order_relaxed) ) { // lose the race @@ -155,4 +192,8 @@ public: }}} +#if BOOST_COMP_CLANG +#pragma clang diagnostic pop +#endif + #endif // BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H |