summaryrefslogtreecommitdiff
path: root/boost/fiber/detail/context_spmc_queue.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'boost/fiber/detail/context_spmc_queue.hpp')
-rw-r--r--boost/fiber/detail/context_spmc_queue.hpp158
1 files changed, 158 insertions, 0 deletions
diff --git a/boost/fiber/detail/context_spmc_queue.hpp b/boost/fiber/detail/context_spmc_queue.hpp
new file mode 100644
index 0000000000..6449e3658f
--- /dev/null
+++ b/boost/fiber/detail/context_spmc_queue.hpp
@@ -0,0 +1,158 @@
+
+// Copyright Oliver Kowalke 2013.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
+#define BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <type_traits>
+#include <utility>
+
+#include <boost/assert.hpp>
+#include <boost/config.hpp>
+
+#include <boost/fiber/detail/config.hpp>
+#include <boost/fiber/context.hpp>
+
+// David Chase and Yossi Lev. Dynamic circular work-stealing deque.
+// In SPAA ’05: Proceedings of the seventeenth annual ACM symposium
+// on Parallelism in algorithms and architectures, pages 21–28,
+// New York, NY, USA, 2005. ACM.
+//
+// Nhat Minh Lê, Antoniu Pop, Albert Cohen, and Francesco Zappa Nardelli. 2013.
+// Correct and efficient work-stealing for weak memory models.
+// In Proceedings of the 18th ACM SIGPLAN symposium on Principles and practice
+// of parallel programming (PPoPP '13). ACM, New York, NY, USA, 69-80.
+
+namespace boost {
+namespace fibers {
+namespace detail {
+
+class context_spmc_queue {
+private:
+ class array {
+ private:
+ typedef std::atomic< context * > atomic_type;
+ typedef std::aligned_storage<
+ sizeof( atomic_type), cache_alignment
+ >::type storage_type;
+
+ std::size_t size_;
+ storage_type * storage_;
+
+ public:
+ array( std::size_t size) :
+ size_{ size },
+ storage_{ new storage_type[size_] } {
+ for ( std::size_t i = 0; i < size_; ++i) {
+ ::new ( static_cast< void * >( std::addressof( storage_[i]) ) ) atomic_type{ nullptr };
+ }
+ }
+
+ ~array() {
+ for ( std::size_t i = 0; i < size_; ++i) {
+ reinterpret_cast< atomic_type * >( std::addressof( storage_[i]) )->~atomic_type();
+ }
+ delete [] storage_;
+ }
+
+ std::size_t size() const noexcept {
+ return size_;
+ }
+
+ void push( std::size_t bottom, context * ctx) noexcept {
+ reinterpret_cast< atomic_type * >(
+ std::addressof( storage_[bottom % size_]) )
+ ->store( ctx, std::memory_order_relaxed);
+ }
+
+ context * pop( std::size_t top) noexcept {
+ return reinterpret_cast< atomic_type * >(
+ std::addressof( storage_[top % size_]) )
+ ->load( std::memory_order_relaxed);
+ }
+
+ array * resize( std::size_t bottom, std::size_t top) {
+ std::unique_ptr< array > tmp{ new array{ 2 * size_ } };
+ for ( std::size_t i = top; i != bottom; ++i) {
+ tmp->push( i, pop( i) );
+ }
+ return tmp.release();
+ }
+ };
+
+ alignas(cache_alignment) std::atomic< std::size_t > top_{ 0 };
+ alignas(cache_alignment) std::atomic< std::size_t > bottom_{ 0 };
+ alignas(cache_alignment) std::atomic< array * > array_;
+ std::vector< array * > old_arrays_{};
+ char padding_[cacheline_length];
+
+public:
+ context_spmc_queue() :
+ array_{ new array{ 1024 } } {
+ old_arrays_.reserve( 32);
+ }
+
+ ~context_spmc_queue() {
+ for ( array * a : old_arrays_) {
+ delete a;
+ }
+ delete array_.load();
+ }
+
+ context_spmc_queue( context_spmc_queue const&) = delete;
+ context_spmc_queue & operator=( context_spmc_queue const&) = delete;
+
+ bool empty() const noexcept {
+ std::size_t bottom{ bottom_.load( std::memory_order_relaxed) };
+ std::size_t top{ top_.load( std::memory_order_relaxed) };
+ return bottom <= top;
+ }
+
+ void push( context * ctx) {
+ std::size_t bottom{ bottom_.load( std::memory_order_relaxed) };
+ std::size_t top{ top_.load( std::memory_order_acquire) };
+ array * a{ array_.load( std::memory_order_relaxed) };
+ if ( (a->size() - 1) < (bottom - top) ) {
+ // queue is full
+ // resize
+ array * tmp{ a->resize( bottom, top) };
+ old_arrays_.push_back( a);
+ std::swap( a, tmp);
+ array_.store( a, std::memory_order_relaxed);
+ }
+ a->push( bottom, ctx);
+ std::atomic_thread_fence( std::memory_order_release);
+ bottom_.store( bottom + 1, std::memory_order_relaxed);
+ }
+
+ context * pop() {
+ std::size_t top{ top_.load( std::memory_order_acquire) };
+ std::atomic_thread_fence( std::memory_order_seq_cst);
+ std::size_t bottom{ bottom_.load( std::memory_order_acquire) };
+ context * ctx{ nullptr };
+ if ( top < bottom) {
+ // queue is not empty
+ array * a{ array_.load( std::memory_order_consume) };
+ ctx = a->pop( top);
+ if ( ctx->is_context( type::pinned_context) ||
+ ! top_.compare_exchange_strong( top, top + 1,
+ std::memory_order_seq_cst,
+ std::memory_order_relaxed) ) {
+ // lose the race
+ return nullptr;
+ }
+ }
+ return ctx;
+ }
+};
+
+}}}
+
+#endif // BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H