Diffstat (limited to 'boost/fiber/detail/context_spmc_queue.hpp')
-rw-r--r--  boost/fiber/detail/context_spmc_queue.hpp  |  99
1 file changed, 70 insertions(+), 29 deletions(-)
diff --git a/boost/fiber/detail/context_spmc_queue.hpp b/boost/fiber/detail/context_spmc_queue.hpp
index 6449e3658f..27256233cf 100644
--- a/boost/fiber/detail/context_spmc_queue.hpp
+++ b/boost/fiber/detail/context_spmc_queue.hpp
@@ -30,6 +30,11 @@
// In Proceedings of the 18th ACM SIGPLAN symposium on Principles and practice
// of parallel programming (PPoPP '13). ACM, New York, NY, USA, 69-80.
+#if BOOST_COMP_CLANG
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wunused-private-field"
+#endif
+
namespace boost {
namespace fibers {
namespace detail {
@@ -43,43 +48,43 @@ private:
sizeof( atomic_type), cache_alignment
>::type storage_type;
- std::size_t size_;
+ std::size_t capacity_;
storage_type * storage_;
public:
- array( std::size_t size) :
- size_{ size },
- storage_{ new storage_type[size_] } {
- for ( std::size_t i = 0; i < size_; ++i) {
+ array( std::size_t capacity) :
+ capacity_{ capacity },
+ storage_{ new storage_type[capacity_] } {
+ for ( std::size_t i = 0; i < capacity_; ++i) {
::new ( static_cast< void * >( std::addressof( storage_[i]) ) ) atomic_type{ nullptr };
}
}
~array() {
- for ( std::size_t i = 0; i < size_; ++i) {
+ for ( std::size_t i = 0; i < capacity_; ++i) {
reinterpret_cast< atomic_type * >( std::addressof( storage_[i]) )->~atomic_type();
}
delete [] storage_;
}
- std::size_t size() const noexcept {
- return size_;
+ std::size_t capacity() const noexcept {
+ return capacity_;
}
void push( std::size_t bottom, context * ctx) noexcept {
reinterpret_cast< atomic_type * >(
- std::addressof( storage_[bottom % size_]) )
+ std::addressof( storage_[bottom % capacity_]) )
->store( ctx, std::memory_order_relaxed);
}
context * pop( std::size_t top) noexcept {
return reinterpret_cast< atomic_type * >(
- std::addressof( storage_[top % size_]) )
+ std::addressof( storage_[top % capacity_]) )
->load( std::memory_order_relaxed);
}
array * resize( std::size_t bottom, std::size_t top) {
- std::unique_ptr< array > tmp{ new array{ 2 * size_ } };
+ std::unique_ptr< array > tmp{ new array{ 2 * capacity_ } };
for ( std::size_t i = top; i != bottom; ++i) {
tmp->push( i, pop( i) );
}
@@ -87,15 +92,15 @@ private:
}
};
- alignas(cache_alignment) std::atomic< std::size_t > top_{ 0 };
- alignas(cache_alignment) std::atomic< std::size_t > bottom_{ 0 };
+ alignas(cache_alignment) std::atomic< std::size_t > top_{ 0 };
+ alignas(cache_alignment) std::atomic< std::size_t > bottom_{ 0 };
alignas(cache_alignment) std::atomic< array * > array_;
- std::vector< array * > old_arrays_{};
+ std::vector< array * > old_arrays_{};
char padding_[cacheline_length];
public:
- context_spmc_queue() :
- array_{ new array{ 1024 } } {
+ context_spmc_queue( std::size_t capacity = 4096) :
+ array_{ new array{ capacity } } {
old_arrays_.reserve( 32);
}
@@ -110,19 +115,19 @@ public:
context_spmc_queue & operator=( context_spmc_queue const&) = delete;
bool empty() const noexcept {
- std::size_t bottom{ bottom_.load( std::memory_order_relaxed) };
- std::size_t top{ top_.load( std::memory_order_relaxed) };
+ std::size_t bottom = bottom_.load( std::memory_order_relaxed);
+ std::size_t top = top_.load( std::memory_order_relaxed);
return bottom <= top;
}
void push( context * ctx) {
- std::size_t bottom{ bottom_.load( std::memory_order_relaxed) };
- std::size_t top{ top_.load( std::memory_order_acquire) };
- array * a{ array_.load( std::memory_order_relaxed) };
- if ( (a->size() - 1) < (bottom - top) ) {
+ std::size_t bottom = bottom_.load( std::memory_order_relaxed);
+ std::size_t top = top_.load( std::memory_order_acquire);
+ array * a = array_.load( std::memory_order_relaxed);
+ if ( (a->capacity() - 1) < (bottom - top) ) {
// queue is full
// resize
- array * tmp{ a->resize( bottom, top) };
+ array * tmp = a->resize( bottom, top);
old_arrays_.push_back( a);
std::swap( a, tmp);
array_.store( a, std::memory_order_relaxed);
@@ -133,16 +138,48 @@ public:
}
context * pop() {
- std::size_t top{ top_.load( std::memory_order_acquire) };
+ std::size_t bottom = bottom_.load( std::memory_order_relaxed) - 1;
+ array * a = array_.load( std::memory_order_relaxed);
+ bottom_.store( bottom, std::memory_order_relaxed);
std::atomic_thread_fence( std::memory_order_seq_cst);
- std::size_t bottom{ bottom_.load( std::memory_order_acquire) };
- context * ctx{ nullptr };
+ std::size_t top = top_.load( std::memory_order_relaxed);
+ context * ctx = nullptr;
+ if ( top <= bottom) {
+ // queue is not empty
+ ctx = a->pop( bottom);
+ BOOST_ASSERT( nullptr != ctx);
+ if ( top == bottom) {
+ // last element dequeued
+ if ( ! top_.compare_exchange_strong( top, top + 1,
+ std::memory_order_seq_cst,
+ std::memory_order_relaxed) ) {
+ // lose the race
+ ctx = nullptr;
+ }
+ bottom_.store( bottom + 1, std::memory_order_relaxed);
+ }
+ } else {
+ // queue is empty
+ bottom_.store( bottom + 1, std::memory_order_relaxed);
+ }
+ return ctx;
+ }
+
+ context * steal() {
+ std::size_t top = top_.load( std::memory_order_acquire);
+ std::atomic_thread_fence( std::memory_order_seq_cst);
+ std::size_t bottom = bottom_.load( std::memory_order_acquire);
+ context * ctx = nullptr;
if ( top < bottom) {
// queue is not empty
- array * a{ array_.load( std::memory_order_consume) };
+ array * a = array_.load( std::memory_order_consume);
ctx = a->pop( top);
- if ( ctx->is_context( type::pinned_context) ||
- ! top_.compare_exchange_strong( top, top + 1,
+ BOOST_ASSERT( nullptr != ctx);
+ // do not steal pinned context (e.g. main-/dispatcher-context)
+ if ( ctx->is_context( type::pinned_context) ) {
+ return nullptr;
+ }
+ if ( ! top_.compare_exchange_strong( top, top + 1,
std::memory_order_seq_cst,
std::memory_order_relaxed) ) {
// lose the race
@@ -155,4 +192,8 @@ public:
}}}
+#if BOOST_COMP_CLANG
+#pragma clang diagnostic pop
+#endif
+
#endif // BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
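
Note on the change: the rewritten pop() plus the new steal() implement the Chase-Lev work-stealing deque with the memory orderings from the PPoPP '13 paper cited at the top of the file. The owning thread pushes and pops at the bottom index; other threads steal from the top index, and a compare-exchange on top_ arbitrates the race for the last remaining element. Below is a minimal, standalone sketch of that protocol for illustration only: it stores int instead of boost::fibers::context*, uses a fixed capacity instead of the resize() path, and the class name spmc_queue and the main() demo are assumptions, not boost.fiber code.

// Minimal sketch of the owner/thief protocol used above (Chase-Lev deque,
// PPoPP '13 orderings). Illustration only, not boost code.
#include <atomic>
#include <cassert>
#include <cstddef>
#include <iostream>
#include <thread>

class spmc_queue {
    static constexpr std::size_t capacity = 1024;         // never filled by the demo below
    alignas(64) std::atomic< std::size_t > top_{ 0 };     // thieves advance this
    alignas(64) std::atomic< std::size_t > bottom_{ 0 };  // the owner advances this
    std::atomic< int > slots_[capacity];                  // every slot is written before it is read

public:
    // owner thread only
    void push( int value) {
        std::size_t bottom = bottom_.load( std::memory_order_relaxed);
        std::size_t top = top_.load( std::memory_order_acquire);
        assert( bottom - top <= capacity - 1);             // the real queue resizes here
        slots_[bottom % capacity].store( value, std::memory_order_relaxed);
        std::atomic_thread_fence( std::memory_order_release);
        bottom_.store( bottom + 1, std::memory_order_relaxed);
    }

    // owner thread only; -1 means empty or the last element was lost to a thief
    int pop() {
        std::size_t bottom = bottom_.load( std::memory_order_relaxed) - 1;
        bottom_.store( bottom, std::memory_order_relaxed);
        std::atomic_thread_fence( std::memory_order_seq_cst);
        std::size_t top = top_.load( std::memory_order_relaxed);
        int value = -1;
        if ( top <= bottom) {
            value = slots_[bottom % capacity].load( std::memory_order_relaxed);
            if ( top == bottom) {
                // last element: the CAS on top_ decides between owner and thief
                if ( ! top_.compare_exchange_strong( top, top + 1,
                                                     std::memory_order_seq_cst,
                                                     std::memory_order_relaxed) ) {
                    value = -1;                            // a thief won the race
                }
                bottom_.store( bottom + 1, std::memory_order_relaxed);
            }
        } else {
            // queue was already empty; undo the speculative decrement
            bottom_.store( bottom + 1, std::memory_order_relaxed);
        }
        return value;
    }

    // any other thread; -1 means empty or the CAS lost
    int steal() {
        std::size_t top = top_.load( std::memory_order_acquire);
        std::atomic_thread_fence( std::memory_order_seq_cst);
        std::size_t bottom = bottom_.load( std::memory_order_acquire);
        int value = -1;
        if ( top < bottom) {
            value = slots_[top % capacity].load( std::memory_order_relaxed);
            if ( ! top_.compare_exchange_strong( top, top + 1,
                                                 std::memory_order_seq_cst,
                                                 std::memory_order_relaxed) ) {
                value = -1;                                // another thief or the owner won
            }
        }
        return value;
    }
};

int main() {
    spmc_queue q;
    int popped = 0;
    std::atomic< int > stolen{ 0 };
    std::thread thief{ [&q, &stolen] {                     // consumer: steals from the top
        for ( int attempts = 0; attempts < 10000; ++attempts) {
            if ( -1 != q.steal() ) { ++stolen; } else { std::this_thread::yield(); }
        }
    } };
    for ( int i = 0; i < 1000; ++i) {                      // owner: pushes, then drains the bottom
        q.push( i);
        if ( -1 != q.pop() ) { ++popped; }
    }
    thief.join();
    std::cout << "popped=" << popped << " stolen=" << stolen << std::endl;
}

Each pushed element is consumed exactly once, by either the owner's pop() or the thief's steal(), which is why the boost code can assert that a non-empty pop()/steal() never observes a nullptr slot.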