summaryrefslogtreecommitdiff
path: root/boost/fiber
diff options
context:
space:
mode:
Diffstat (limited to 'boost/fiber')
-rw-r--r--boost/fiber/algo/algorithm.hpp29
-rw-r--r--boost/fiber/algo/numa/work_stealing.hpp93
-rw-r--r--boost/fiber/algo/work_stealing.hpp34
-rw-r--r--boost/fiber/all.hpp3
-rw-r--r--boost/fiber/buffered_channel.hpp294
-rw-r--r--boost/fiber/condition_variable.hpp3
-rw-r--r--boost/fiber/context.hpp263
-rw-r--r--boost/fiber/detail/config.hpp21
-rw-r--r--boost/fiber/detail/context_spinlock_queue.hpp2
-rw-r--r--boost/fiber/detail/context_spmc_queue.hpp10
-rw-r--r--boost/fiber/detail/convert.hpp16
-rw-r--r--boost/fiber/detail/cpu_relax.hpp2
-rw-r--r--boost/fiber/detail/data.hpp17
-rw-r--r--boost/fiber/detail/futex.hpp12
-rw-r--r--boost/fiber/detail/rtm.hpp94
-rw-r--r--boost/fiber/detail/spinlock.hpp29
-rw-r--r--boost/fiber/detail/spinlock_rtm.hpp126
-rw-r--r--boost/fiber/detail/spinlock_status.hpp21
-rw-r--r--boost/fiber/detail/spinlock_ttas.hpp43
-rw-r--r--boost/fiber/detail/spinlock_ttas_adaptive.hpp46
-rw-r--r--boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp63
-rw-r--r--boost/fiber/detail/spinlock_ttas_futex.hpp51
-rw-r--r--boost/fiber/detail/wrap.hpp131
-rw-r--r--boost/fiber/fiber.hpp47
-rw-r--r--boost/fiber/fss.hpp6
-rw-r--r--boost/fiber/future/async.hpp8
-rw-r--r--boost/fiber/future/detail/shared_state.hpp14
-rw-r--r--boost/fiber/future/detail/task_object.hpp13
-rw-r--r--boost/fiber/future/future.hpp48
-rw-r--r--boost/fiber/future/packaged_task.hpp15
-rw-r--r--boost/fiber/future/promise.hpp24
-rw-r--r--boost/fiber/numa/pin_thread.hpp33
-rw-r--r--boost/fiber/numa/topology.hpp46
-rw-r--r--boost/fiber/operations.hpp16
-rw-r--r--boost/fiber/properties.hpp2
-rw-r--r--boost/fiber/scheduler.hpp16
-rw-r--r--boost/fiber/type.hpp1
-rw-r--r--boost/fiber/unbuffered_channel.hpp252
38 files changed, 1301 insertions, 643 deletions
diff --git a/boost/fiber/algo/algorithm.hpp b/boost/fiber/algo/algorithm.hpp
index 9b846e774b..b22a5923d6 100644
--- a/boost/fiber/algo/algorithm.hpp
+++ b/boost/fiber/algo/algorithm.hpp
@@ -6,11 +6,13 @@
#ifndef BOOST_FIBERS_ALGO_ALGORITHM_H
#define BOOST_FIBERS_ALGO_ALGORITHM_H
-#include <cstddef>
+#include <atomic>
#include <chrono>
+#include <cstddef>
-#include <boost/config.hpp>
#include <boost/assert.hpp>
+#include <boost/config.hpp>
+#include <boost/intrusive_ptr.hpp>
#include <boost/fiber/properties.hpp>
#include <boost/fiber/detail/config.hpp>
@@ -26,7 +28,13 @@ class context;
namespace algo {
-struct BOOST_FIBERS_DECL algorithm {
+class BOOST_FIBERS_DECL algorithm {
+private:
+ std::atomic< std::size_t > use_count_{ 0 };
+
+public:
+ typedef intrusive_ptr< algorithm > ptr_t;
+
virtual ~algorithm() {}
virtual void awakened( context *) noexcept = 0;
@@ -38,6 +46,19 @@ struct BOOST_FIBERS_DECL algorithm {
virtual void suspend_until( std::chrono::steady_clock::time_point const&) noexcept = 0;
virtual void notify() noexcept = 0;
+
+ friend void intrusive_ptr_add_ref( algorithm * algo) noexcept {
+ BOOST_ASSERT( nullptr != algo);
+ algo->use_count_.fetch_add( 1, std::memory_order_relaxed);
+ }
+
+ friend void intrusive_ptr_release( algorithm * algo) noexcept {
+ BOOST_ASSERT( nullptr != algo);
+ if ( 1 == algo->use_count_.fetch_sub( 1, std::memory_order_release) ) {
+ std::atomic_thread_fence( std::memory_order_acquire);
+ delete algo;
+ }
+ }
};
class BOOST_FIBERS_DECL algorithm_with_properties_base : public algorithm {
@@ -60,7 +81,7 @@ struct algorithm_with_properties : public algorithm_with_properties_base {
// with: algorithm_with_properties<PROPS>::awakened(fb);
virtual void awakened( context * ctx) noexcept override final {
fiber_properties * props = super::get_properties( ctx);
- if ( nullptr == props) {
+ if ( BOOST_LIKELY( nullptr == props) ) {
// TODO: would be great if PROPS could be allocated on the new
// fiber's stack somehow
props = new_properties( ctx);
diff --git a/boost/fiber/algo/numa/work_stealing.hpp b/boost/fiber/algo/numa/work_stealing.hpp
new file mode 100644
index 0000000000..26032ab35e
--- /dev/null
+++ b/boost/fiber/algo/numa/work_stealing.hpp
@@ -0,0 +1,93 @@
+
+// Copyright Oliver Kowalke 2017.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_FIBERS_ALGO_NUMA_WORK_STEALING_H
+#define BOOST_FIBERS_ALGO_NUMA_WORK_STEALING_H
+
+#include <condition_variable>
+#include <chrono>
+#include <cstddef>
+#include <cstdint>
+#include <mutex>
+#include <vector>
+
+#include <boost/config.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+#include <boost/fiber/algo/algorithm.hpp>
+#include <boost/fiber/context.hpp>
+#include <boost/fiber/detail/config.hpp>
+#include <boost/fiber/detail/context_spinlock_queue.hpp>
+#include <boost/fiber/detail/context_spmc_queue.hpp>
+#include <boost/fiber/numa/pin_thread.hpp>
+#include <boost/fiber/numa/topology.hpp>
+#include <boost/fiber/scheduler.hpp>
+
+#ifdef BOOST_HAS_ABI_HEADERS
+# include BOOST_ABI_PREFIX
+#endif
+
+namespace boost {
+namespace fibers {
+namespace algo {
+namespace numa {
+
+class work_stealing : public algorithm {
+private:
+ static std::vector< intrusive_ptr< work_stealing > > schedulers_;
+
+ std::uint32_t cpu_id_;
+ std::vector< std::uint32_t > local_cpus_;
+ std::vector< std::uint32_t > remote_cpus_;
+#ifdef BOOST_FIBERS_USE_SPMC_QUEUE
+ detail::context_spmc_queue rqueue_{};
+#else
+ detail::context_spinlock_queue rqueue_{};
+#endif
+ std::mutex mtx_{};
+ std::condition_variable cnd_{};
+ bool flag_{ false };
+ bool suspend_;
+
+ static void init_( std::vector< boost::fibers::numa::node > const&,
+ std::vector< intrusive_ptr< work_stealing > > &);
+
+public:
+ work_stealing( std::uint32_t, std::uint32_t,
+ std::vector< boost::fibers::numa::node > const&,
+ bool = false);
+
+ work_stealing( work_stealing const&) = delete;
+ work_stealing( work_stealing &&) = delete;
+
+ work_stealing & operator=( work_stealing const&) = delete;
+ work_stealing & operator=( work_stealing &&) = delete;
+
+ virtual void awakened( context *) noexcept;
+
+ virtual context * pick_next() noexcept;
+
+ virtual context * steal() noexcept {
+ return rqueue_.steal();
+ }
+
+ virtual bool has_ready_fibers() const noexcept {
+ return ! rqueue_.empty();
+ }
+
+ virtual void suspend_until( std::chrono::steady_clock::time_point const&) noexcept;
+
+ virtual void notify() noexcept;
+};
+
+}}}}
+
+#ifdef BOOST_HAS_ABI_HEADERS
+# include BOOST_ABI_SUFFIX
+#endif
+
+#endif // BOOST_FIBERS_ALGO_NUMA_WORK_STEALING_H
diff --git a/boost/fiber/algo/work_stealing.hpp b/boost/fiber/algo/work_stealing.hpp
index 66cadd12be..db4b5cf12d 100644
--- a/boost/fiber/algo/work_stealing.hpp
+++ b/boost/fiber/algo/work_stealing.hpp
@@ -8,19 +8,22 @@
#ifndef BOOST_FIBERS_ALGO_WORK_STEALING_H
#define BOOST_FIBERS_ALGO_WORK_STEALING_H
+#include <atomic>
#include <condition_variable>
#include <chrono>
#include <cstddef>
+#include <cstdint>
#include <mutex>
#include <vector>
#include <boost/config.hpp>
+#include <boost/intrusive_ptr.hpp>
#include <boost/fiber/algo/algorithm.hpp>
-#include <boost/fiber/detail/context_spinlock_queue.hpp>
-#include <boost/fiber/detail/context_spmc_queue.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
+#include <boost/fiber/detail/context_spinlock_queue.hpp>
+#include <boost/fiber/detail/context_spmc_queue.hpp>
#include <boost/fiber/scheduler.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
@@ -33,24 +36,25 @@ namespace algo {
class work_stealing : public algorithm {
private:
- static std::vector< work_stealing * > schedulers_;
+ static std::atomic< std::uint32_t > counter_;
+ static std::vector< intrusive_ptr< work_stealing > > schedulers_;
- std::size_t idx_;
- std::size_t max_idx_;
+ std::uint32_t id_;
+ std::uint32_t thread_count_;
#ifdef BOOST_FIBERS_USE_SPMC_QUEUE
- alignas(cache_alignment) detail::context_spmc_queue rqueue_{};
+ detail::context_spmc_queue rqueue_{};
#else
- alignas(cache_alignment) detail::context_spinlock_queue rqueue_{};
+ detail::context_spinlock_queue rqueue_{};
#endif
std::mutex mtx_{};
std::condition_variable cnd_{};
bool flag_{ false };
bool suspend_;
- static void init_( std::size_t max_idx);
+ static void init_( std::uint32_t, std::vector< intrusive_ptr< work_stealing > > &);
public:
- work_stealing( std::size_t max_idx, std::size_t idx, bool suspend = false);
+ work_stealing( std::uint32_t, bool = false);
work_stealing( work_stealing const&) = delete;
work_stealing( work_stealing &&) = delete;
@@ -58,21 +62,21 @@ public:
work_stealing & operator=( work_stealing const&) = delete;
work_stealing & operator=( work_stealing &&) = delete;
- void awakened( context * ctx) noexcept;
+ virtual void awakened( context *) noexcept;
- context * pick_next() noexcept;
+ virtual context * pick_next() noexcept;
- context * steal() noexcept {
+ virtual context * steal() noexcept {
return rqueue_.steal();
}
- bool has_ready_fibers() const noexcept {
+ virtual bool has_ready_fibers() const noexcept {
return ! rqueue_.empty();
}
- void suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept;
+ virtual void suspend_until( std::chrono::steady_clock::time_point const&) noexcept;
- void notify() noexcept;
+ virtual void notify() noexcept;
};
}}}
diff --git a/boost/fiber/all.hpp b/boost/fiber/all.hpp
index 460df5295b..17ceca45cc 100644
--- a/boost/fiber/all.hpp
+++ b/boost/fiber/all.hpp
@@ -11,6 +11,7 @@
#include <boost/fiber/algo/round_robin.hpp>
#include <boost/fiber/algo/shared_work.hpp>
#include <boost/fiber/algo/work_stealing.hpp>
+#include <boost/fiber/algo/numa/work_stealing.hpp>
#include <boost/fiber/barrier.hpp>
#include <boost/fiber/buffered_channel.hpp>
#include <boost/fiber/channel_op_status.hpp>
@@ -21,6 +22,8 @@
#include <boost/fiber/fixedsize_stack.hpp>
#include <boost/fiber/fss.hpp>
#include <boost/fiber/future.hpp>
+#include <boost/fiber/numa/pin_thread.hpp>
+#include <boost/fiber/numa/topology.hpp>
#include <boost/fiber/mutex.hpp>
#include <boost/fiber/operations.hpp>
#include <boost/fiber/policy.hpp>
diff --git a/boost/fiber/buffered_channel.hpp b/boost/fiber/buffered_channel.hpp
index 1c32e49bae..3cf22295dd 100644
--- a/boost/fiber/buffered_channel.hpp
+++ b/boost/fiber/buffered_channel.hpp
@@ -40,7 +40,7 @@ private:
typedef context::wait_queue_t wait_queue_type;
typedef T slot_type;
- alignas(cache_alignment) mutable detail::spinlock splk_{};
+ mutable detail::spinlock splk_{};
wait_queue_type waiting_producers_{};
wait_queue_type waiting_consumers_{};
slot_type * slots_;
@@ -64,7 +64,7 @@ private:
public:
explicit buffered_channel( std::size_t capacity) :
capacity_{ capacity } {
- if ( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) {
+ if ( BOOST_UNLIKELY( 2 > capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) ) {
throw fiber_error{ std::make_error_code( std::errc::invalid_argument),
"boost fiber: buffer capacity is invalid" };
}
@@ -92,20 +92,52 @@ public:
while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
- active_ctx->schedule( producer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( producer_ctx);
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( producer_ctx);
+ // re-schedule next
+ }
}
// notify all waiting consumers
while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
}
channel_op_status try_push( value_type const& value) {
context * active_ctx = context::active();
detail::spinlock_lock lk{ splk_ };
- if ( is_closed_() ) {
+ if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
} else if ( is_full_() ) {
return channel_op_status::full;
@@ -113,11 +145,28 @@ public:
slots_[pidx_] = value;
pidx_ = (pidx_ + 1) % capacity_;
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- lk.unlock();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
return channel_op_status::success;
}
@@ -126,7 +175,7 @@ public:
channel_op_status try_push( value_type && value) {
context * active_ctx = context::active();
detail::spinlock_lock lk{ splk_ };
- if ( is_closed_() ) {
+ if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
} else if ( is_full_() ) {
return channel_op_status::full;
@@ -134,11 +183,29 @@ public:
slots_[pidx_] = std::move( value);
pidx_ = (pidx_ + 1) % capacity_;
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
lk.unlock();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
return channel_op_status::success;
}
@@ -148,21 +215,40 @@ public:
context * active_ctx = context::active();
for (;;) {
detail::spinlock_lock lk{ splk_ };
- if ( is_closed_() ) {
+ if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
} else if ( is_full_() ) {
active_ctx->wait_link( waiting_producers_);
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// suspend this producer
active_ctx->suspend( lk);
} else {
slots_[pidx_] = value;
pidx_ = (pidx_ + 1) % capacity_;
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
lk.unlock();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
return channel_op_status::success;
}
@@ -173,21 +259,40 @@ public:
context * active_ctx = context::active();
for (;;) {
detail::spinlock_lock lk{ splk_ };
- if ( is_closed_() ) {
+ if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
} else if ( is_full_() ) {
active_ctx->wait_link( waiting_producers_);
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// suspend this producer
active_ctx->suspend( lk);
} else {
slots_[pidx_] = std::move( value);
pidx_ = (pidx_ + 1) % capacity_;
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
lk.unlock();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
return channel_op_status::success;
}
@@ -215,10 +320,12 @@ public:
std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
detail::spinlock_lock lk{ splk_ };
- if ( is_closed_() ) {
+ if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
} else if ( is_full_() ) {
active_ctx->wait_link( waiting_producers_);
+ intrusive_ptr_add_ref( active_ctx);
+ active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
@@ -231,11 +338,29 @@ public:
slots_[pidx_] = value;
pidx_ = (pidx_ + 1) % capacity_;
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
lk.unlock();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
return channel_op_status::success;
}
@@ -249,10 +374,12 @@ public:
std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
detail::spinlock_lock lk{ splk_ };
- if ( is_closed_() ) {
+ if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
} else if ( is_full_() ) {
active_ctx->wait_link( waiting_producers_);
+ intrusive_ptr_add_ref( active_ctx);
+ active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
@@ -265,11 +392,29 @@ public:
slots_[pidx_] = std::move( value);
pidx_ = (pidx_ + 1) % capacity_;
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
lk.unlock();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
return channel_op_status::success;
}
@@ -287,11 +432,29 @@ public:
value = std::move( slots_[cidx_]);
cidx_ = (cidx_ + 1) % capacity_;
// notify one waiting producer
- if ( ! waiting_producers_.empty() ) {
+ while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- active_ctx->schedule( producer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( producer_ctx);
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( producer_ctx);
+ // re-schedule next
+ }
}
return channel_op_status::success;
}
@@ -302,10 +465,11 @@ public:
for (;;) {
detail::spinlock_lock lk{ splk_ };
if ( is_empty_() ) {
- if ( is_closed_() ) {
+ if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
} else {
active_ctx->wait_link( waiting_consumers_);
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// suspend this consumer
active_ctx->suspend( lk);
}
@@ -313,11 +477,29 @@ public:
value = std::move( slots_[cidx_]);
cidx_ = (cidx_ + 1) % capacity_;
// notify one waiting producer
- if ( ! waiting_producers_.empty() ) {
+ while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- active_ctx->schedule( producer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( producer_ctx);
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( producer_ctx);
+ // re-schedule next
+ }
}
return channel_op_status::success;
}
@@ -329,12 +511,13 @@ public:
for (;;) {
detail::spinlock_lock lk{ splk_ };
if ( is_empty_() ) {
- if ( is_closed_() ) {
+ if ( BOOST_UNLIKELY( is_closed_() ) ) {
throw fiber_error{
std::make_error_code( std::errc::operation_not_permitted),
"boost fiber: channel is closed" };
} else {
active_ctx->wait_link( waiting_consumers_);
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// suspend this consumer
active_ctx->suspend( lk);
}
@@ -342,11 +525,29 @@ public:
value_type value = std::move( slots_[cidx_]);
cidx_ = (cidx_ + 1) % capacity_;
// notify one waiting producer
- if ( ! waiting_producers_.empty() ) {
+ while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- active_ctx->schedule( producer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( producer_ctx);
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( producer_ctx);
+ // re-schedule next
+ }
}
return std::move( value);
}
@@ -368,10 +569,12 @@ public:
for (;;) {
detail::spinlock_lock lk{ splk_ };
if ( is_empty_() ) {
- if ( is_closed_() ) {
+ if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
} else {
active_ctx->wait_link( waiting_consumers_);
+ intrusive_ptr_add_ref( active_ctx);
+ active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this consumer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
@@ -385,11 +588,29 @@ public:
value = std::move( slots_[cidx_]);
cidx_ = (cidx_ + 1) % capacity_;
// notify one waiting producer
- if ( ! waiting_producers_.empty() ) {
+ while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- active_ctx->schedule( producer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( producer_ctx);
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( producer_ctx);
+ // re-schedule next
+ }
}
return channel_op_status::success;
}
@@ -428,8 +649,9 @@ public:
}
iterator & operator=( iterator const& other) noexcept {
- if ( this == & other) return * this;
- chan_ = other.chan_;
+ if ( BOOST_LIKELY( this != & other) ) {
+ chan_ = other.chan_;
+ }
return * this;
}
diff --git a/boost/fiber/condition_variable.hpp b/boost/fiber/condition_variable.hpp
index cd0e7cb022..2cd3ecc719 100644
--- a/boost/fiber/condition_variable.hpp
+++ b/boost/fiber/condition_variable.hpp
@@ -70,6 +70,7 @@ public:
detail::spinlock_lock lk{ wait_queue_splk_ };
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
active_ctx->wait_link( wait_queue_);
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// unlock external lt
lt.unlock();
// suspend this fiber
@@ -101,6 +102,8 @@ public:
detail::spinlock_lock lk{ wait_queue_splk_ };
BOOST_ASSERT( ! active_ctx->wait_is_linked() );
active_ctx->wait_link( wait_queue_);
+ intrusive_ptr_add_ref( active_ctx);
+ active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// unlock external lt
lt.unlock();
// suspend this fiber
diff --git a/boost/fiber/context.hpp b/boost/fiber/context.hpp
index 773528e3a1..e2d1aeff5f 100644
--- a/boost/fiber/context.hpp
+++ b/boost/fiber/context.hpp
@@ -7,11 +7,12 @@
#ifndef BOOST_FIBERS_CONTEXT_H
#define BOOST_FIBERS_CONTEXT_H
-#include <iostream>
#include <atomic>
#include <chrono>
+#include <cstdint>
#include <exception>
#include <functional>
+#include <iostream>
#include <map>
#include <memory>
#include <tuple>
@@ -22,11 +23,7 @@
#if defined(BOOST_NO_CXX17_STD_APPLY)
#include <boost/context/detail/apply.hpp>
#endif
-#if (BOOST_EXECUTION_CONTEXT==1)
-# include <boost/context/execution_context.hpp>
-#else
-# include <boost/context/continuation.hpp>
-#endif
+#include <boost/context/continuation.hpp>
#include <boost/context/stack_context.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/parent_from_member.hpp>
@@ -39,7 +36,6 @@
#include <boost/fiber/detail/decay_copy.hpp>
#include <boost/fiber/detail/fss.hpp>
#include <boost/fiber/detail/spinlock.hpp>
-#include <boost/fiber/detail/wrap.hpp>
#include <boost/fiber/exceptions.hpp>
#include <boost/fiber/fixedsize_stack.hpp>
#include <boost/fiber/policy.hpp>
@@ -66,10 +62,10 @@ class scheduler;
namespace detail {
struct wait_tag;
-typedef intrusive::slist_member_hook<
+typedef intrusive::list_member_hook<
intrusive::tag< wait_tag >,
intrusive::link_mode<
- intrusive::safe_link
+ intrusive::auto_unlink
>
> wait_hook;
// declaration of the functor that converts between
@@ -132,25 +128,18 @@ typedef intrusive::slist_member_hook<
}
-struct main_context_t {};
-const main_context_t main_context{};
-
-struct dispatcher_context_t {};
-const dispatcher_context_t dispatcher_context{};
-
-struct worker_context_t {};
-const worker_context_t worker_context{};
-
class BOOST_FIBERS_DECL context {
public:
- typedef intrusive::slist<
+ typedef intrusive::list<
context,
intrusive::function_hook< detail::wait_functor >,
- intrusive::linear< true >,
- intrusive::cache_last< true >
+ intrusive::constant_time_size< false >
> wait_queue_t;
private:
+ friend class dispatcher_context;
+ friend class main_context;
+ template< typename Fn, typename ... Arg > friend class worker_context;
friend class scheduler;
struct fss_data {
@@ -175,88 +164,39 @@ private:
typedef std::map< uintptr_t, fss_data > fss_data_t;
#if ! defined(BOOST_FIBERS_NO_ATOMICS)
- alignas(cache_alignment) std::atomic< std::size_t > use_count_{ 0 };
+ std::atomic< std::size_t > use_count_;
#else
- alignas(cache_alignment) std::size_t use_count_{ 0 };
+ std::size_t use_count_;
#endif
#if ! defined(BOOST_FIBERS_NO_ATOMICS)
- alignas(cache_alignment) detail::remote_ready_hook remote_ready_hook_{};
- std::atomic< context * > remote_nxt_{ nullptr };
+ detail::remote_ready_hook remote_ready_hook_{};
#endif
- alignas(cache_alignment) detail::spinlock splk_{};
+ detail::spinlock splk_{};
bool terminated_{ false };
wait_queue_t wait_queue_{};
public:
detail::wait_hook wait_hook_{};
+#if ! defined(BOOST_FIBERS_NO_ATOMICS)
+ std::atomic< std::intptr_t > twstatus{ 0 };
+#endif
private:
- alignas(cache_alignment) scheduler * scheduler_{ nullptr };
+ scheduler * scheduler_{ nullptr };
fss_data_t fss_data_{};
detail::sleep_hook sleep_hook_{};
detail::ready_hook ready_hook_{};
detail::terminated_hook terminated_hook_{};
detail::worker_hook worker_hook_{};
-#if (BOOST_EXECUTION_CONTEXT==1)
- boost::context::execution_context ctx_;
-#else
- boost::context::continuation c_;
-#endif
fiber_properties * properties_{ nullptr };
std::chrono::steady_clock::time_point tp_{ (std::chrono::steady_clock::time_point::max)() };
+ boost::context::continuation c_{};
type type_;
launch policy_;
- void resume_( detail::data_t &) noexcept;
- void schedule_( context *) noexcept;
-
-#if (BOOST_EXECUTION_CONTEXT==1)
- template< typename Fn, typename Tpl >
- void run_( Fn && fn_, Tpl && tpl_, detail::data_t * dp) noexcept {
- {
- // fn and tpl must be destroyed before calling terminate()
- typename std::decay< Fn >::type fn = std::forward< Fn >( fn_);
- typename std::decay< Tpl >::type tpl = std::forward< Tpl >( tpl_);
- if ( nullptr != dp->lk) {
- dp->lk->unlock();
- } else if ( nullptr != dp->ctx) {
- active()->schedule_( dp->ctx);
- }
-#if defined(BOOST_NO_CXX17_STD_APPLY)
- boost::context::detail::apply( std::move( fn), std::move( tpl) );
-#else
- std::apply( std::move( fn), std::move( tpl) );
-#endif
- }
- // terminate context
- terminate();
- BOOST_ASSERT_MSG( false, "fiber already terminated");
- }
-#else
- template< typename Fn, typename Tpl >
- boost::context::continuation
- run_( boost::context::continuation && c, Fn && fn_, Tpl && tpl_) noexcept {
- {
- // fn and tpl must be destroyed before calling terminate()
- typename std::decay< Fn >::type fn = std::forward< Fn >( fn_);
- typename std::decay< Tpl >::type tpl = std::forward< Tpl >( tpl_);
- c = c.resume();
- detail::data_t * dp = c.get_data< detail::data_t * >();
- // update contiunation of calling fiber
- dp->from->c_ = std::move( c);
- if ( nullptr != dp->lk) {
- dp->lk->unlock();
- } else if ( nullptr != dp->ctx) {
- active()->schedule_( dp->ctx);
- }
-#if defined(BOOST_NO_CXX17_STD_APPLY)
- boost::context::detail::apply( std::move( fn), std::move( tpl) );
-#else
- std::apply( std::move( fn), std::move( tpl) );
-#endif
- }
- // terminate context
- return terminate();
+ context( std::size_t initial_count, type t, launch policy) noexcept :
+ use_count_{ initial_count },
+ type_{ t },
+ policy_{ policy } {
}
-#endif
public:
class id {
@@ -317,73 +257,6 @@ public:
static void reset_active() noexcept;
- // main fiber context
- explicit context( main_context_t) noexcept;
-
- // dispatcher fiber context
- context( dispatcher_context_t, boost::context::preallocated const&,
- default_stack const&, scheduler *);
-
- // worker fiber context
- template< typename StackAlloc,
- typename Fn,
- typename Tpl
- >
- context( worker_context_t,
- launch policy,
- boost::context::preallocated palloc, StackAlloc salloc,
- Fn && fn, Tpl && tpl) :
- use_count_{ 1 }, // fiber instance or scheduler owner
-#if (BOOST_EXECUTION_CONTEXT==1)
-# if defined(BOOST_NO_CXX14_GENERIC_LAMBDAS)
- ctx_{ std::allocator_arg, palloc, salloc,
- detail::wrap(
- [this]( typename std::decay< Fn >::type & fn, typename std::decay< Tpl >::type & tpl,
- boost::context::execution_context & ctx, void * vp) mutable noexcept {
- run_( std::move( fn), std::move( tpl), static_cast< detail::data_t * >( vp) );
- },
- std::forward< Fn >( fn),
- std::forward< Tpl >( tpl),
- boost::context::execution_context::current() )
- },
- type_{ type::worker_context },
- policy_{ policy }
-# else
- ctx_{ std::allocator_arg, palloc, salloc,
- [this,fn=detail::decay_copy( std::forward< Fn >( fn) ),tpl=std::forward< Tpl >( tpl),
- ctx=boost::context::execution_context::current()] (void * vp) mutable noexcept {
- run_( std::move( fn), std::move( tpl), static_cast< detail::data_t * >( vp) );
- }},
- type_{ type::worker_context },
- policy_{ policy }
-# endif
- {}
-#else
- c_{},
- type_{ type::worker_context },
- policy_{ policy }
- {
-# if defined(BOOST_NO_CXX14_GENERIC_LAMBDAS)
- c_ = boost::context::callcc(
- std::allocator_arg, palloc, salloc,
- detail::wrap(
- [this]( typename std::decay< Fn >::type & fn, typename std::decay< Tpl >::type & tpl,
- boost::context::continuation && c) mutable noexcept {
- return run_( std::forward< boost::context::continuation >( c), std::move( fn), std::move( tpl) );
- },
- std::forward< Fn >( fn),
- std::forward< Tpl >( tpl) ) );
-# else
- c_ = boost::context::callcc(
- std::allocator_arg, palloc, salloc,
- [this,fn=detail::decay_copy( std::forward< Fn >( fn) ),tpl=std::forward< Tpl >( tpl)]
- (boost::context::continuation && c) mutable noexcept {
- return run_( std::forward< boost::context::continuation >( c), std::move( fn), std::move( tpl) );
- });
-# endif
- }
-#endif
-
context( context const&) = delete;
context & operator=( context const&) = delete;
@@ -412,12 +285,9 @@ public:
void suspend() noexcept;
void suspend( detail::spinlock_lock &) noexcept;
-#if (BOOST_EXECUTION_CONTEXT==1)
- void terminate() noexcept;
-#else
boost::context::continuation suspend_with_cc() noexcept;
boost::context::continuation terminate() noexcept;
-#endif
+
void join();
void yield() noexcept;
@@ -510,6 +380,8 @@ public:
void sleep_unlink() noexcept;
+ void wait_unlink() noexcept;
+
void detach() noexcept;
void attach( context *) noexcept;
@@ -523,18 +395,11 @@ public:
BOOST_ASSERT( nullptr != ctx);
if ( 1 == ctx->use_count_.fetch_sub( 1, std::memory_order_release) ) {
std::atomic_thread_fence( std::memory_order_acquire);
-#if (BOOST_EXECUTION_CONTEXT==1)
- boost::context::execution_context ec = ctx->ctx_;
- // destruct context
- // deallocates stack (execution_context is ref counted)
- ctx->~context();
-#else
boost::context::continuation c = std::move( ctx->c_);
// destruct context
ctx->~context();
// deallocated stack
- c.resume( nullptr);
-#endif
+ c.resume();
}
}
};
@@ -544,36 +409,66 @@ bool operator<( context const& l, context const& r) noexcept {
return l.get_id() < r.get_id();
}
-template< typename StackAlloc, typename Fn, typename ... Args >
+template< typename Fn, typename ... Arg >
+class worker_context final : public context {
+private:
+ typename std::decay< Fn >::type fn_;
+ std::tuple< Arg ... > arg_;
+
+ boost::context::continuation
+ run_( boost::context::continuation && c) {
+ {
+ // fn and tpl must be destroyed before calling terminate()
+ auto fn = std::move( fn_);
+ auto arg = std::move( arg_);
+ c.resume();
+#if defined(BOOST_NO_CXX17_STD_APPLY)
+ boost::context::detail::apply( std::move( fn_), std::move( arg_) );
+#else
+ std::apply( std::move( fn_), std::move( arg_) );
+#endif
+ }
+ // terminate context
+ return terminate();
+ }
+
+public:
+ template< typename StackAlloc >
+ worker_context( launch policy,
+ boost::context::preallocated const& palloc, StackAlloc const& salloc,
+ Fn && fn, Arg ... arg) :
+ context{ 1, type::worker_context, policy },
+ fn_( std::forward< Fn >( fn) ),
+ arg_( std::forward< Arg >( arg) ... ) {
+ c_ = boost::context::callcc(
+ std::allocator_arg, palloc, salloc,
+ std::bind( & worker_context::run_, this, std::placeholders::_1) );
+ }
+};
+
+
+template< typename StackAlloc, typename Fn, typename ... Arg >
static intrusive_ptr< context > make_worker_context( launch policy,
StackAlloc salloc,
- Fn && fn, Args && ... args) {
- boost::context::stack_context sctx = salloc.allocate();
-#if defined(BOOST_NO_CXX14_CONSTEXPR) || defined(BOOST_NO_CXX11_STD_ALIGN)
+ Fn && fn, Arg ... arg) {
+ typedef worker_context< Fn, Arg ... > context_t;
+
+ auto sctx = salloc.allocate();
// reserve space for control structure
- const std::size_t size = sctx.size - sizeof( context);
- void * sp = static_cast< char * >( sctx.sp) - sizeof( context);
-#else
- constexpr std::size_t func_alignment = 64; // alignof( context);
- constexpr std::size_t func_size = sizeof( context);
- // reserve space on stack
- void * sp = static_cast< char * >( sctx.sp) - func_size - func_alignment;
- // align sp pointer
- std::size_t space = func_size + func_alignment;
- sp = std::align( func_alignment, func_size, sp, space);
- BOOST_ASSERT( nullptr != sp);
- // calculate remaining size
- const std::size_t size = sctx.size - ( static_cast< char * >( sctx.sp) - static_cast< char * >( sp) );
-#endif
+ void * storage = reinterpret_cast< void * >(
+ ( reinterpret_cast< uintptr_t >( sctx.sp) - static_cast< uintptr_t >( sizeof( context_t) ) )
+ & ~ static_cast< uintptr_t >( 0xff) );
+ void * stack_bottom = reinterpret_cast< void * >(
+ reinterpret_cast< uintptr_t >( sctx.sp) - static_cast< uintptr_t >( sctx.size) );
+ const std::size_t size = reinterpret_cast< uintptr_t >( storage) - reinterpret_cast< uintptr_t >( stack_bottom);
// placement new of context on top of fiber's stack
return intrusive_ptr< context >{
- ::new ( sp) context{
- worker_context,
+ new ( storage) context_t{
policy,
- boost::context::preallocated{ sp, size, sctx },
+ boost::context::preallocated{ storage, size, sctx },
salloc,
std::forward< Fn >( fn),
- std::make_tuple( std::forward< Args >( args) ... ) } };
+ std::forward< Arg >( arg) ... } };
}
namespace detail {
diff --git a/boost/fiber/detail/config.hpp b/boost/fiber/detail/config.hpp
index 7c7119e1fb..21dea693ac 100644
--- a/boost/fiber/detail/config.hpp
+++ b/boost/fiber/detail/config.hpp
@@ -47,19 +47,20 @@
# error "futex not supported on this platform"
#endif
-#if !defined(BOOST_FIBERS_SPIN_MAX_COLLISIONS)
-# define BOOST_FIBERS_SPIN_MAX_COLLISIONS 16
+#if !defined(BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD)
+# define BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD 16
#endif
-#if !defined(BOOST_FIBERS_SPIN_MAX_TESTS)
-# define BOOST_FIBERS_SPIN_MAX_TESTS 500
+#if !defined(BOOST_FIBERS_RETRY_THRESHOLD)
+# define BOOST_FIBERS_RETRY_THRESHOLD 64
#endif
-// modern architectures have cachelines with 64byte length
-// ARM Cortex-A15 32/64byte, Cortex-A9 16/32/64bytes
-// MIPS 74K: 32byte, 4KEc: 16byte
-// ist shoudl be safe to use 64byte for all
-static constexpr std::size_t cache_alignment{ 64 };
-static constexpr std::size_t cacheline_length{ 64 };
+#if !defined(BOOST_FIBERS_SPIN_BEFORE_SLEEP0)
+# define BOOST_FIBERS_SPIN_BEFORE_SLEEP0 32
+#endif
+
+#if !defined(BOOST_FIBERS_SPIN_BEFORE_YIELD)
+# define BOOST_FIBERS_SPIN_BEFORE_YIELD 64
+#endif
#endif // BOOST_FIBERS_DETAIL_CONFIG_H
diff --git a/boost/fiber/detail/context_spinlock_queue.hpp b/boost/fiber/detail/context_spinlock_queue.hpp
index e0ebdabda6..f58fbd2296 100644
--- a/boost/fiber/detail/context_spinlock_queue.hpp
+++ b/boost/fiber/detail/context_spinlock_queue.hpp
@@ -30,7 +30,7 @@ class context_spinlock_queue {
private:
typedef context * slot_type;
- alignas(cache_alignment) mutable spinlock splk_{};
+ mutable spinlock splk_{};
std::size_t pidx_{ 0 };
std::size_t cidx_{ 0 };
std::size_t capacity_;
diff --git a/boost/fiber/detail/context_spmc_queue.hpp b/boost/fiber/detail/context_spmc_queue.hpp
index 27256233cf..89f93044f9 100644
--- a/boost/fiber/detail/context_spmc_queue.hpp
+++ b/boost/fiber/detail/context_spmc_queue.hpp
@@ -44,9 +44,7 @@ private:
class array {
private:
typedef std::atomic< context * > atomic_type;
- typedef std::aligned_storage<
- sizeof( atomic_type), cache_alignment
- >::type storage_type;
+ typedef atomic_type storage_type;
std::size_t capacity_;
storage_type * storage_;
@@ -92,9 +90,9 @@ private:
}
};
- alignas(cache_alignment) std::atomic< std::size_t > top_{ 0 };
- alignas(cache_alignment) std::atomic< std::size_t > bottom_{ 0 };
- alignas(cache_alignment) std::atomic< array * > array_;
+ std::atomic< std::size_t > top_{ 0 };
+ std::atomic< std::size_t > bottom_{ 0 };
+ std::atomic< array * > array_;
std::vector< array * > old_arrays_{};
char padding_[cacheline_length];
diff --git a/boost/fiber/detail/convert.hpp b/boost/fiber/detail/convert.hpp
index ac190d8528..ba3bbbd0aa 100644
--- a/boost/fiber/detail/convert.hpp
+++ b/boost/fiber/detail/convert.hpp
@@ -34,22 +34,6 @@ std::chrono::steady_clock::time_point convert(
return std::chrono::steady_clock::now() + ( timeout_time - Clock::now() );
}
-// suggested by Howard Hinnant
-template< typename T >
-inline
-T * convert( T * p) noexcept {
- return p;
-}
-
-template< typename Pointer >
-inline
-typename std::pointer_traits< Pointer >::element_type *
-convert( Pointer p) noexcept {
- return nullptr != p
- ? to_raw_pointer( p.operator->() )
- : nullptr;
-}
-
}}}
#ifdef BOOST_HAS_ABI_HEADERS
diff --git a/boost/fiber/detail/cpu_relax.hpp b/boost/fiber/detail/cpu_relax.hpp
index 541b46dfd0..8a20aae059 100644
--- a/boost/fiber/detail/cpu_relax.hpp
+++ b/boost/fiber/detail/cpu_relax.hpp
@@ -16,7 +16,7 @@
#include <boost/fiber/detail/config.hpp>
#if BOOST_COMP_MSVC || BOOST_COMP_MSVC_EMULATED
-# include <Windows.h>
+# include <windows.h>
#endif
#ifdef BOOST_HAS_ABI_HEADERS
diff --git a/boost/fiber/detail/data.hpp b/boost/fiber/detail/data.hpp
index e2b119ec3e..c363817a09 100644
--- a/boost/fiber/detail/data.hpp
+++ b/boost/fiber/detail/data.hpp
@@ -23,22 +23,6 @@ class context;
namespace detail {
-#if (BOOST_EXECUTION_CONTEXT==1)
-struct data_t {
- spinlock_lock * lk{ nullptr };
- context * ctx{ nullptr };
-
- data_t() = default;
-
- explicit data_t( spinlock_lock * lk_) noexcept :
- lk{ lk_ } {
- }
-
- explicit data_t( context * ctx_) noexcept :
- ctx{ ctx_ } {
- }
-};
-#else
struct data_t {
spinlock_lock * lk{ nullptr };
context * ctx{ nullptr };
@@ -60,7 +44,6 @@ struct data_t {
from{ from_ } {
}
};
-#endif
}}}
diff --git a/boost/fiber/detail/futex.hpp b/boost/fiber/detail/futex.hpp
index d383dc4077..e64bd5990d 100644
--- a/boost/fiber/detail/futex.hpp
+++ b/boost/fiber/detail/futex.hpp
@@ -18,7 +18,7 @@ extern "C" {
#include <sys/syscall.h>
}
#elif BOOST_OS_WINDOWS
-#include <Windows.h>
+#include <windows.h>
#endif
namespace boost {
@@ -26,28 +26,28 @@ namespace fibers {
namespace detail {
#if BOOST_OS_LINUX
-inline
+BOOST_FORCEINLINE
int sys_futex( void * addr, std::int32_t op, std::int32_t x) {
return ::syscall( SYS_futex, addr, op, x, nullptr, nullptr, 0);
}
-inline
+BOOST_FORCEINLINE
int futex_wake( std::atomic< std::int32_t > * addr) {
return 0 <= sys_futex( static_cast< void * >( addr), FUTEX_WAKE_PRIVATE, 1) ? 0 : -1;
}
-inline
+BOOST_FORCEINLINE
int futex_wait( std::atomic< std::int32_t > * addr, std::int32_t x) {
return 0 <= sys_futex( static_cast< void * >( addr), FUTEX_WAIT_PRIVATE, x) ? 0 : -1;
}
#elif BOOST_OS_WINDOWS
-inline
+BOOST_FORCEINLINE
int futex_wake( std::atomic< std::int32_t > * addr) {
::WakeByAddressSingle( static_cast< void * >( addr) );
return 0;
}
-inline
+BOOST_FORCEINLINE
int futex_wait( std::atomic< std::int32_t > * addr, std::int32_t x) {
::WaitOnAddress( static_cast< volatile void * >( addr), & x, sizeof( x), INFINITE);
return 0;
diff --git a/boost/fiber/detail/rtm.hpp b/boost/fiber/detail/rtm.hpp
new file mode 100644
index 0000000000..5188b0d216
--- /dev/null
+++ b/boost/fiber/detail/rtm.hpp
@@ -0,0 +1,94 @@
+
+// Copyright Oliver Kowalke 2017.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_FIBER_DETAIL_RTM_H
+#define BOOST_FIBER_DETAIL_RTM_H
+
+#include <cstdint>
+
+#include <boost/assert.hpp>
+#include <boost/config.hpp>
+
+#include <boost/fiber/detail/config.hpp>
+
+#ifdef BOOST_HAS_ABI_HEADERS
+# include BOOST_ABI_PREFIX
+#endif
+
+namespace boost {
+namespace fibers {
+namespace detail {
+
+struct rtm_status {
+ enum {
+ none = 0,
+ explicit_abort = 1 << 0,
+ may_retry = 1 << 1,
+ memory_conflict = 1 << 2,
+ buffer_overflow = 1 << 3,
+ debug_hit = 1 << 4,
+ nested_abort = 1 << 5
+ };
+
+ static constexpr std::uint32_t success = ~std::uint32_t{ 0 };
+};
+
+static BOOST_FORCEINLINE
+std::uint32_t rtm_begin() noexcept {
+ std::uint32_t result = rtm_status::success;
+ __asm__ __volatile__
+ (
+ ".byte 0xc7,0xf8 ; .long 0"
+ : "+a" (result)
+ :
+ : "memory"
+ );
+ return result;
+}
+
+static BOOST_FORCEINLINE
+void rtm_end() noexcept {
+ __asm__ __volatile__
+ (
+ ".byte 0x0f,0x01,0xd5"
+ :
+ :
+ : "memory"
+ );
+}
+
+static BOOST_FORCEINLINE
+void rtm_abort_lock_not_free() noexcept {
+ __asm__ __volatile__
+ (
+ ".byte 0xc6,0xf8,0xff"
+ :
+ :
+ : "memory"
+ );
+}
+
+static BOOST_FORCEINLINE
+bool rtm_test() noexcept {
+ bool result;
+ __asm__ __volatile__
+ (
+ ".byte 0x0f,0x01,0xd6; setz %0"
+ : "=q" (result)
+ :
+ : "memory"
+ );
+ return result;
+}
+
+}}}
+
+#ifdef BOOST_HAS_ABI_HEADERS
+# include BOOST_ABI_SUFFIX
+#endif
+
+#endif // BOOST_FIBER_DETAIL_RTM_H
diff --git a/boost/fiber/detail/spinlock.hpp b/boost/fiber/detail/spinlock.hpp
index 89a6d51a6f..59d2a5cd2b 100644
--- a/boost/fiber/detail/spinlock.hpp
+++ b/boost/fiber/detail/spinlock.hpp
@@ -13,11 +13,14 @@
#if !defined(BOOST_FIBERS_NO_ATOMICS)
# include <mutex>
-# include <boost/fiber/detail/spinlock_ttas.hpp>
# include <boost/fiber/detail/spinlock_ttas_adaptive.hpp>
+# include <boost/fiber/detail/spinlock_ttas.hpp>
# if defined(BOOST_FIBERS_HAS_FUTEX)
-# include <boost/fiber/detail/spinlock_ttas_futex.hpp>
# include <boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp>
+# include <boost/fiber/detail/spinlock_ttas_futex.hpp>
+# endif
+# if defined(BOOST_USE_TSX)
+# include <boost/fiber/detail/spinlock_rtm.hpp>
# endif
#endif
@@ -29,7 +32,7 @@ namespace boost {
namespace fibers {
namespace detail {
-#if defined(BOOST_FIBERS_NO_ATOMICS)
+#if defined(BOOST_FIBERS_NO_ATOMICS)
struct spinlock {
constexpr spinlock() noexcept {}
void lock() noexcept {}
@@ -42,16 +45,32 @@ struct spinlock_lock {
void unlock() noexcept {}
};
#else
-# if defined(BOOST_FIBERS_SPINLOCK_STD_MUTEX)
+# if defined(BOOST_FIBERS_SPINLOCK_STD_MUTEX)
using spinlock = std::mutex;
# elif defined(BOOST_FIBERS_SPINLOCK_TTAS_FUTEX)
+# if defined(BOOST_USE_TSX)
+using spinlock = spinlock_rtm< spinlock_ttas_futex >;
+# else
using spinlock = spinlock_ttas_futex;
+# endif
# elif defined(BOOST_FIBERS_SPINLOCK_TTAS_ADAPTIVE_FUTEX)
+# if defined(BOOST_USE_TSX)
+using spinlock = spinlock_rtm< spinlock_ttas_adaptive_futex >;
+# else
using spinlock = spinlock_ttas_adaptive_futex;
-# elif defined(BOOST_FIBERS_SPINLOCK_TTAS_ADAPTIVE)
+# endif
+# elif defined(BOOST_FIBERS_SPINLOCK_TTAS_ADAPTIVE)
+# if defined(BOOST_USE_TSX)
+using spinlock = spinlock_rtm< spinlock_ttas_adaptive >;
+# else
using spinlock = spinlock_ttas_adaptive;
+# endif
# else
+# if defined(BOOST_USE_TSX)
+using spinlock = spinlock_rtm< spinlock_ttas >;
+# else
using spinlock = spinlock_ttas;
+# endif
# endif
using spinlock_lock = std::unique_lock< spinlock >;
#endif
diff --git a/boost/fiber/detail/spinlock_rtm.hpp b/boost/fiber/detail/spinlock_rtm.hpp
new file mode 100644
index 0000000000..5cc4a5e9af
--- /dev/null
+++ b/boost/fiber/detail/spinlock_rtm.hpp
@@ -0,0 +1,126 @@
+
+// Copyright Oliver Kowalke 2017.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_FIBERS_SPINLOCK_RTM_H
+#define BOOST_FIBERS_SPINLOCK_RTM_H
+
+#include <atomic>
+#include <chrono>
+#include <cmath>
+#include <random>
+#include <thread>
+
+#include <boost/fiber/detail/config.hpp>
+#include <boost/fiber/detail/cpu_relax.hpp>
+#include <boost/fiber/detail/rtm.hpp>
+#include <boost/fiber/detail/spinlock_status.hpp>
+
+namespace boost {
+namespace fibers {
+namespace detail {
+
+template< typename FBSplk >
+class spinlock_rtm {
+private:
+ FBSplk splk_{};
+
+public:
+ spinlock_rtm() = default;
+
+ spinlock_rtm( spinlock_rtm const&) = delete;
+ spinlock_rtm & operator=( spinlock_rtm const&) = delete;
+
+ void lock() noexcept {
+ static thread_local std::minstd_rand generator{ std::random_device{}() };
+ std::size_t collisions = 0 ;
+ for ( std::size_t retries = 0; retries < BOOST_FIBERS_RETRY_THRESHOLD; ++retries) {
+ std::uint32_t status;
+ if ( rtm_status::success == ( status = rtm_begin() ) ) {
+ // add lock to read-set
+ if ( spinlock_status::unlocked == splk_.state_.load( std::memory_order_relaxed) ) {
+ // lock is free, enter critical section
+ return;
+ }
+ // lock was acquired by another thread
+ // explicit abort of transaction with abort argument 'lock not free'
+ rtm_abort_lock_not_free();
+ }
+ // transaction aborted
+ if ( rtm_status::none != (status & rtm_status::may_retry) ||
+ rtm_status::none != (status & rtm_status::memory_conflict) ) {
+ // another logical processor conflicted with a memory address that was
+ // part of the read-/write-set
+ if ( BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD > collisions) {
+ std::uniform_int_distribution< std::size_t > distribution{
+ 0, static_cast< std::size_t >( 1) << (std::min)(collisions, static_cast< std::size_t >( BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD)) };
+ const std::size_t z = distribution( generator);
+ ++collisions;
+ for ( std::size_t i = 0; i < z; ++i) {
+ cpu_relax();
+ }
+ } else {
+ std::this_thread::yield();
+ }
+ } else if ( rtm_status::none != (status & rtm_status::explicit_abort) &&
+ rtm_status::none == (status & rtm_status::nested_abort) ) {
+ // another logical processor has acquired the lock and
+ // abort was not caused by a nested transaction
+ // wait till lock becomes free again
+ std::size_t count = 0;
+ while ( spinlock_status::locked == splk_.state_.load( std::memory_order_relaxed) ) {
+ if ( BOOST_FIBERS_SPIN_BEFORE_SLEEP0 > count) {
+ ++count;
+ cpu_relax();
+ } else if ( BOOST_FIBERS_SPIN_BEFORE_YIELD > count) {
+ ++count;
+ static constexpr std::chrono::microseconds us0{ 0 };
+ std::this_thread::sleep_for( us0);
+#if 0
+ using namespace std::chrono_literals;
+ std::this_thread::sleep_for( 0ms);
+#endif
+ } else {
+ std::this_thread::yield();
+ }
+ }
+ } else {
+ // transaction aborted due to:
+ // - internal buffer to track transactional state overflowed
+ // - debug exception or breakpoint exception was hit
+ // - abort during execution of nested transactions (max nesting limit exceeded)
+ // -> use fallback path
+ break;
+ }
+ }
+ splk_.lock();
+ }
+
+ bool try_lock() noexcept {
+ if ( rtm_status::success != rtm_begin() ) {
+ return false;
+ }
+
+ // add lock to read-set
+ if ( spinlock_status::unlocked != splk_.state_.load( std::memory_order_relaxed) ) {
+ // lock was acquired by another thread
+ // explicit abort of transaction with abort argument 'lock not free'
+ rtm_abort_lock_not_free();
+ }
+ return true;
+ }
+
+ void unlock() noexcept {
+ if ( spinlock_status::unlocked == splk_.state_.load( std::memory_order_acquire) ) {
+ rtm_end();
+ } else {
+ splk_.unlock();
+ }
+ }
+};
+
+}}}
+
+#endif // BOOST_FIBERS_SPINLOCK_RTM_H
diff --git a/boost/fiber/detail/spinlock_status.hpp b/boost/fiber/detail/spinlock_status.hpp
new file mode 100644
index 0000000000..74f09e4acc
--- /dev/null
+++ b/boost/fiber/detail/spinlock_status.hpp
@@ -0,0 +1,21 @@
+
+// Copyright Oliver Kowalke 2017.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_FIBERS_SPINLOCK_STATUS_H
+#define BOOST_FIBERS_SPINLOCK_STATUS_H
+
+namespace boost {
+namespace fibers {
+namespace detail {
+
+enum class spinlock_status {
+ locked = 0,
+ unlocked
+};
+
+}}}
+
+#endif // BOOST_FIBERS_SPINLOCK_STATUS_H
diff --git a/boost/fiber/detail/spinlock_ttas.hpp b/boost/fiber/detail/spinlock_ttas.hpp
index 380773ad6d..f3302ed17e 100644
--- a/boost/fiber/detail/spinlock_ttas.hpp
+++ b/boost/fiber/detail/spinlock_ttas.hpp
@@ -9,41 +9,37 @@
#include <atomic>
#include <chrono>
+#include <cmath>
#include <random>
#include <thread>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/cpu_relax.hpp>
+#include <boost/fiber/detail/spinlock_status.hpp>
// based on informations from:
// https://software.intel.com/en-us/articles/benefitting-power-and-performance-sleep-loops
// https://software.intel.com/en-us/articles/long-duration-spin-wait-loops-on-hyper-threading-technology-enabled-intel-processors
-#if BOOST_COMP_CLANG
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wunused-private-field"
-#endif
-
namespace boost {
namespace fibers {
namespace detail {
class spinlock_ttas {
private:
- enum class spinlock_status {
- locked = 0,
- unlocked
- };
+ template< typename FBSplk >
+ friend class spinlock_rtm;
- std::atomic< spinlock_status > state_{ spinlock_status::unlocked };
+ std::atomic< spinlock_status > state_{ spinlock_status::unlocked };
public:
- spinlock_ttas() noexcept = default;
+ spinlock_ttas() = default;
spinlock_ttas( spinlock_ttas const&) = delete;
spinlock_ttas & operator=( spinlock_ttas const&) = delete;
void lock() noexcept {
+ static thread_local std::minstd_rand generator{ std::random_device{}() };
std::size_t collisions = 0 ;
for (;;) {
// avoid using multiple pause instructions for a delay of a specific cycle count
@@ -51,7 +47,7 @@ public:
// the cycle count can not guaranteed from one system to the next
// -> check the shared variable 'state_' in between each cpu_relax() to prevent
// unnecessarily long delays on some systems
- std::size_t tests = 0;
+ std::size_t retries = 0;
// test shared variable 'status_'
// first access to 'state_' -> chache miss
// sucessive acccess to 'state_' -> cache hit
@@ -59,21 +55,26 @@ public:
// cached 'state_' is invalidated -> cache miss
while ( spinlock_status::locked == state_.load( std::memory_order_relaxed) ) {
#if !defined(BOOST_FIBERS_SPIN_SINGLE_CORE)
- if ( BOOST_FIBERS_SPIN_MAX_TESTS > tests) {
- ++tests;
+ if ( BOOST_FIBERS_SPIN_BEFORE_SLEEP0 > retries) {
+ ++retries;
// give CPU a hint that this thread is in a "spin-wait" loop
// delays the next instruction's execution for a finite period of time (depends on processor family)
// the CPU is not under demand, parts of the pipeline are no longer being used
// -> reduces the power consumed by the CPU
// -> prevent pipeline stalls
cpu_relax();
- } else {
+ } else if ( BOOST_FIBERS_SPIN_BEFORE_YIELD > retries) {
// std::this_thread::sleep_for( 0us) has a fairly long instruction path length,
// combined with an expensive ring3 to ring 0 transition costing about 1000 cycles
// std::this_thread::sleep_for( 0us) lets give up this_thread the remaining part of its time slice
// if and only if a thread of equal or greater priority is ready to run
static constexpr std::chrono::microseconds us0{ 0 };
std::this_thread::sleep_for( us0);
+ } else {
+ // std::this_thread::yield() allows this_thread to give up the remaining part of its time slice,
+ // but only to another thread on the same processor
+ // instead of constant checking, a thread only checks if no other useful work is pending
+ std::this_thread::yield();
}
#else
std::this_thread::yield();
@@ -85,8 +86,8 @@ public:
// spinlock now contended
// utilize 'Binary Exponential Backoff' algorithm
// linear_congruential_engine is a random number engine based on Linear congruential generator (LCG)
- static thread_local std::minstd_rand generator;
- static std::uniform_int_distribution< std::size_t > distribution{ 0, static_cast< std::size_t >( 1) << collisions };
+ std::uniform_int_distribution< std::size_t > distribution{
+ 0, static_cast< std::size_t >( 1) << (std::min)(collisions, static_cast< std::size_t >( BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD)) };
const std::size_t z = distribution( generator);
++collisions;
for ( std::size_t i = 0; i < z; ++i) {
@@ -101,6 +102,10 @@ public:
}
}
+ bool try_lock() noexcept {
+ return spinlock_status::unlocked == state_.exchange( spinlock_status::locked, std::memory_order_acquire);
+ }
+
void unlock() noexcept {
state_.store( spinlock_status::unlocked, std::memory_order_release);
}
@@ -108,8 +113,4 @@ public:
}}}
-#if BOOST_COMP_CLANG
-#pragma clang diagnostic pop
-#endif
-
#endif // BOOST_FIBERS_SPINLOCK_TTAS_H
diff --git a/boost/fiber/detail/spinlock_ttas_adaptive.hpp b/boost/fiber/detail/spinlock_ttas_adaptive.hpp
index da044b6298..d1f8b73cf3 100644
--- a/boost/fiber/detail/spinlock_ttas_adaptive.hpp
+++ b/boost/fiber/detail/spinlock_ttas_adaptive.hpp
@@ -15,6 +15,7 @@
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/cpu_relax.hpp>
+#include <boost/fiber/detail/spinlock_status.hpp>
// based on informations from:
// https://software.intel.com/en-us/articles/benefitting-power-and-performance-sleep-loops
@@ -26,26 +27,28 @@ namespace detail {
class spinlock_ttas_adaptive {
private:
- enum class spinlock_status {
- locked = 0,
- unlocked
- };
+ template< typename FBSplk >
+ friend class spinlock_rtm;
- std::atomic< spinlock_status > state_{ spinlock_status::unlocked };
- std::atomic< std::size_t > tests_{ 0 };
+ std::atomic< spinlock_status > state_{ spinlock_status::unlocked };
+ std::atomic< std::size_t > retries_{ 0 };
public:
- spinlock_ttas_adaptive() noexcept = default;
+ spinlock_ttas_adaptive() = default;
spinlock_ttas_adaptive( spinlock_ttas_adaptive const&) = delete;
spinlock_ttas_adaptive & operator=( spinlock_ttas_adaptive const&) = delete;
void lock() noexcept {
+ static thread_local std::minstd_rand generator{ std::random_device{}() };
std::size_t collisions = 0 ;
for (;;) {
- std::size_t tests = 0;
- const std::size_t prev_tests = tests_.load( std::memory_order_relaxed);
- const std::size_t max_tests = (std::min)( static_cast< std::size_t >( BOOST_FIBERS_SPIN_MAX_TESTS), 2 * prev_tests + 10);
+ std::size_t retries = 0;
+ const std::size_t prev_retries = retries_.load( std::memory_order_relaxed);
+ const std::size_t max_relax_retries = (std::min)(
+ static_cast< std::size_t >( BOOST_FIBERS_SPIN_BEFORE_SLEEP0), 2 * prev_retries + 10);
+ const std::size_t max_sleep_retries = (std::min)(
+ static_cast< std::size_t >( BOOST_FIBERS_SPIN_BEFORE_YIELD), 2 * prev_retries + 10);
// avoid using multiple pause instructions for a delay of a specific cycle count
// the delay of cpu_relax() (pause on Intel) depends on the processor family
// the cycle count can not guaranteed from one system to the next
@@ -58,22 +61,27 @@ public:
// cached 'state_' is invalidated -> cache miss
while ( spinlock_status::locked == state_.load( std::memory_order_relaxed) ) {
#if !defined(BOOST_FIBERS_SPIN_SINGLE_CORE)
- if ( max_tests > tests) {
- ++tests;
+ if ( max_relax_retries > retries) {
+ ++retries;
// give CPU a hint that this thread is in a "spin-wait" loop
// delays the next instruction's execution for a finite period of time (depends on processor family)
// the CPU is not under demand, parts of the pipeline are no longer being used
// -> reduces the power consumed by the CPU
// -> prevent pipeline stalls
cpu_relax();
- } else {
- ++tests;
+ } else if ( max_sleep_retries > retries) {
+ ++retries;
// std::this_thread::sleep_for( 0us) has a fairly long instruction path length,
// combined with an expensive ring3 to ring 0 transition costing about 1000 cycles
// std::this_thread::sleep_for( 0us) lets give up this_thread the remaining part of its time slice
// if and only if a thread of equal or greater priority is ready to run
static constexpr std::chrono::microseconds us0{ 0 };
std::this_thread::sleep_for( us0);
+ } else {
+ // std::this_thread::yield() allows this_thread to give up the remaining part of its time slice,
+ // but only to another thread on the same processor
+ // instead of constant checking, a thread only checks if no other useful work is pending
+ std::this_thread::yield();
}
#else
std::this_thread::yield();
@@ -85,8 +93,8 @@ public:
// spinlock now contended
// utilize 'Binary Exponential Backoff' algorithm
// linear_congruential_engine is a random number engine based on Linear congruential generator (LCG)
- static thread_local std::minstd_rand generator;
- static std::uniform_int_distribution< std::size_t > distribution{ 0, static_cast< std::size_t >( 1) << collisions };
+ std::uniform_int_distribution< std::size_t > distribution{
+ 0, static_cast< std::size_t >( 1) << (std::min)(collisions, static_cast< std::size_t >( BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD)) };
const std::size_t z = distribution( generator);
++collisions;
for ( std::size_t i = 0; i < z; ++i) {
@@ -95,13 +103,17 @@ public:
cpu_relax();
}
} else {
- tests_.store( prev_tests + (tests - prev_tests) / 8, std::memory_order_relaxed);
+ retries_.store( prev_retries + (retries - prev_retries) / 8, std::memory_order_relaxed);
// success, thread has acquired the lock
break;
}
}
}
+ bool try_lock() noexcept {
+ return spinlock_status::unlocked == state_.exchange( spinlock_status::locked, std::memory_order_acquire);
+ }
+
void unlock() noexcept {
state_.store( spinlock_status::unlocked, std::memory_order_release);
}
diff --git a/boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp b/boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp
index 61ab47691e..0f0b191e67 100644
--- a/boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp
+++ b/boost/fiber/detail/spinlock_ttas_adaptive_futex.hpp
@@ -26,21 +26,28 @@ namespace detail {
class spinlock_ttas_adaptive_futex {
private:
- std::atomic< std::int32_t > value_{ 0 };
- std::atomic< std::int32_t > tests_{ 0 };
+ template< typename FBSplk >
+ friend class spinlock_rtm;
+
+ std::atomic< std::int32_t > value_{ 0 };
+ std::atomic< std::int32_t > retries_{ 0 };
public:
- spinlock_ttas_adaptive_futex() noexcept = default;
+ spinlock_ttas_adaptive_futex() = default;
spinlock_ttas_adaptive_futex( spinlock_ttas_adaptive_futex const&) = delete;
spinlock_ttas_adaptive_futex & operator=( spinlock_ttas_adaptive_futex const&) = delete;
void lock() noexcept {
- std::int32_t collisions = 0, tests = 0, expected = 0;
- const std::int32_t prev_tests = tests_.load( std::memory_order_relaxed);
- const std::int32_t max_tests = (std::min)( static_cast< std::int32_t >( BOOST_FIBERS_SPIN_MAX_TESTS), 2 * prev_tests + 10);
+ static thread_local std::minstd_rand generator{ std::random_device{}() };
+ std::int32_t collisions = 0, retries = 0, expected = 0;
+ const std::int32_t prev_retries = retries_.load( std::memory_order_relaxed);
+ const std::int32_t max_relax_retries = (std::min)(
+ static_cast< std::int32_t >( BOOST_FIBERS_SPIN_BEFORE_SLEEP0), 2 * prev_retries + 10);
+ const std::int32_t max_sleep_retries = (std::min)(
+ static_cast< std::int32_t >( BOOST_FIBERS_SPIN_BEFORE_YIELD), 2 * prev_retries + 10);
// after max. spins or collisions suspend via futex
- while ( max_tests > tests && BOOST_FIBERS_SPIN_MAX_COLLISIONS > collisions) {
+ while ( retries++ < BOOST_FIBERS_RETRY_THRESHOLD) {
// avoid using multiple pause instructions for a delay of a specific cycle count
// the delay of cpu_relax() (pause on Intel) depends on the processor family
// the cycle count can not guaranteed from one system to the next
@@ -52,26 +59,39 @@ public:
// if 'value_' was released by other fiber
// cached 'value_' is invalidated -> cache miss
if ( 0 != ( expected = value_.load( std::memory_order_relaxed) ) ) {
- ++tests;
#if !defined(BOOST_FIBERS_SPIN_SINGLE_CORE)
- // give CPU a hint that this thread is in a "spin-wait" loop
- // delays the next instruction's execution for a finite period of time (depends on processor family)
- // the CPU is not under demand, parts of the pipeline are no longer being used
- // -> reduces the power consumed by the CPU
- // -> prevent pipeline stalls
- cpu_relax();
+ if ( max_relax_retries > retries) {
+ // give CPU a hint that this thread is in a "spin-wait" loop
+ // delays the next instruction's execution for a finite period of time (depends on processor family)
+ // the CPU is not under demand, parts of the pipeline are no longer being used
+ // -> reduces the power consumed by the CPU
+ // -> prevent pipeline stalls
+ cpu_relax();
+ } else if ( max_sleep_retries > retries) {
+ // std::this_thread::sleep_for( 0us) has a fairly long instruction path length,
+ // combined with an expensive ring3 to ring 0 transition costing about 1000 cycles
+ // std::this_thread::sleep_for( 0us) lets give up this_thread the remaining part of its time slice
+ // if and only if a thread of equal or greater priority is ready to run
+ static constexpr std::chrono::microseconds us0{ 0 };
+ std::this_thread::sleep_for( us0);
+ } else {
+ // std::this_thread::yield() allows this_thread to give up the remaining part of its time slice,
+ // but only to another thread on the same processor
+ // instead of constant checking, a thread only checks if no other useful work is pending
+ std::this_thread::yield();
+ }
#else
// std::this_thread::yield() allows this_thread to give up the remaining part of its time slice,
// but only to another thread on the same processor
// instead of constant checking, a thread only checks if no other useful work is pending
std::this_thread::yield();
#endif
- } else if ( ! value_.compare_exchange_strong( expected, 1, std::memory_order_acquire, std::memory_order_release) ) {
+ } else if ( ! value_.compare_exchange_strong( expected, 1, std::memory_order_acquire) ) {
// spinlock now contended
// utilize 'Binary Exponential Backoff' algorithm
// linear_congruential_engine is a random number engine based on Linear congruential generator (LCG)
- static thread_local std::minstd_rand generator;
- static std::uniform_int_distribution< std::int32_t > distribution{ 0, static_cast< std::int32_t >( 1) << collisions };
+ std::uniform_int_distribution< std::int32_t > distribution{
+ 0, static_cast< std::int32_t >( 1) << (std::min)(collisions, static_cast< std::int32_t >( BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD)) };
const std::int32_t z = distribution( generator);
++collisions;
for ( std::int32_t i = 0; i < z; ++i) {
@@ -81,7 +101,7 @@ public:
}
} else {
// success, lock acquired
- tests_.store( prev_tests + (tests - prev_tests) / 8, std::memory_order_relaxed);
+ retries_.store( prev_retries + (retries - prev_retries) / 8, std::memory_order_relaxed);
return;
}
}
@@ -95,7 +115,12 @@ public:
expected = value_.exchange( 2, std::memory_order_acquire);
}
// success, lock acquired
- tests_.store( prev_tests + (tests - prev_tests) / 8, std::memory_order_relaxed);
+ retries_.store( prev_retries + (retries - prev_retries) / 8, std::memory_order_relaxed);
+ }
+
+ bool try_lock() noexcept {
+ std::int32_t expected = 0;
+ return value_.compare_exchange_strong( expected, 1, std::memory_order_acquire);
}
void unlock() noexcept {
diff --git a/boost/fiber/detail/spinlock_ttas_futex.hpp b/boost/fiber/detail/spinlock_ttas_futex.hpp
index a427b73ba5..fd30c4120e 100644
--- a/boost/fiber/detail/spinlock_ttas_futex.hpp
+++ b/boost/fiber/detail/spinlock_ttas_futex.hpp
@@ -8,6 +8,7 @@
#define BOOST_FIBERS_spinlock_ttas_futex_FUTEX_H
#include <atomic>
+#include <cmath>
#include <random>
#include <thread>
@@ -25,18 +26,22 @@ namespace detail {
class spinlock_ttas_futex {
private:
- std::atomic< std::int32_t > value_{ 0 };
+ template< typename FBSplk >
+ friend class spinlock_rtm;
+
+ std::atomic< std::int32_t > value_{ 0 };
public:
- spinlock_ttas_futex() noexcept = default;
+ spinlock_ttas_futex() = default;
spinlock_ttas_futex( spinlock_ttas_futex const&) = delete;
spinlock_ttas_futex & operator=( spinlock_ttas_futex const&) = delete;
void lock() noexcept {
- std::int32_t collisions = 0, tests = 0, expected = 0;
+ static thread_local std::minstd_rand generator{ std::random_device{}() };
+ std::int32_t collisions = 0, retries = 0, expected = 0;
// after max. spins or collisions suspend via futex
- while ( BOOST_FIBERS_SPIN_MAX_TESTS > tests && BOOST_FIBERS_SPIN_MAX_COLLISIONS > collisions) {
+ while ( retries++ < BOOST_FIBERS_RETRY_THRESHOLD) {
// avoid using multiple pause instructions for a delay of a specific cycle count
// the delay of cpu_relax() (pause on Intel) depends on the processor family
// the cycle count can not guaranteed from one system to the next
@@ -48,26 +53,39 @@ public:
// if 'value_' was released by other fiber
// cached 'value_' is invalidated -> cache miss
if ( 0 != ( expected = value_.load( std::memory_order_relaxed) ) ) {
- ++tests;
#if !defined(BOOST_FIBERS_SPIN_SINGLE_CORE)
- // give CPU a hint that this thread is in a "spin-wait" loop
- // delays the next instruction's execution for a finite period of time (depends on processor family)
- // the CPU is not under demand, parts of the pipeline are no longer being used
- // -> reduces the power consumed by the CPU
- // -> prevent pipeline stalls
- cpu_relax();
+ if ( BOOST_FIBERS_SPIN_BEFORE_SLEEP0 > retries) {
+ // give CPU a hint that this thread is in a "spin-wait" loop
+ // delays the next instruction's execution for a finite period of time (depends on processor family)
+ // the CPU is not under demand, parts of the pipeline are no longer being used
+ // -> reduces the power consumed by the CPU
+ // -> prevent pipeline stalls
+ cpu_relax();
+ } else if ( BOOST_FIBERS_SPIN_BEFORE_YIELD > retries) {
+ // std::this_thread::sleep_for( 0us) has a fairly long instruction path length,
+ // combined with an expensive ring3 to ring 0 transition costing about 1000 cycles
+ // std::this_thread::sleep_for( 0us) lets give up this_thread the remaining part of its time slice
+ // if and only if a thread of equal or greater priority is ready to run
+ static constexpr std::chrono::microseconds us0{ 0 };
+ std::this_thread::sleep_for( us0);
+ } else {
+ // std::this_thread::yield() allows this_thread to give up the remaining part of its time slice,
+ // but only to another thread on the same processor
+ // instead of constant checking, a thread only checks if no other useful work is pending
+ std::this_thread::yield();
+ }
#else
// std::this_thread::yield() allows this_thread to give up the remaining part of its time slice,
// but only to another thread on the same processor
// instead of constant checking, a thread only checks if no other useful work is pending
std::this_thread::yield();
#endif
- } else if ( ! value_.compare_exchange_strong( expected, 1, std::memory_order_acquire, std::memory_order_release) ) {
+ } else if ( ! value_.compare_exchange_strong( expected, 1, std::memory_order_acquire) ) {
// spinlock now contended
// utilize 'Binary Exponential Backoff' algorithm
// linear_congruential_engine is a random number engine based on Linear congruential generator (LCG)
- static thread_local std::minstd_rand generator;
- static std::uniform_int_distribution< std::int32_t > distribution{ 0, static_cast< std::int32_t >( 1) << collisions };
+ std::uniform_int_distribution< std::int32_t > distribution{
+ 0, static_cast< std::int32_t >( 1) << (std::min)(collisions, static_cast< std::int32_t >( BOOST_FIBERS_CONTENTION_WINDOW_THRESHOLD)) };
const std::int32_t z = distribution( generator);
++collisions;
for ( std::int32_t i = 0; i < z; ++i) {
@@ -91,6 +109,11 @@ public:
}
}
+ bool try_lock() noexcept {
+ std::int32_t expected = 0;
+ return value_.compare_exchange_strong( expected, 1, std::memory_order_acquire);
+ }
+
void unlock() noexcept {
if ( 1 != value_.fetch_sub( 1, std::memory_order_acquire) ) {
value_.store( 0, std::memory_order_release);
diff --git a/boost/fiber/detail/wrap.hpp b/boost/fiber/detail/wrap.hpp
deleted file mode 100644
index 558de6bd94..0000000000
--- a/boost/fiber/detail/wrap.hpp
+++ /dev/null
@@ -1,131 +0,0 @@
-
-// Copyright Oliver Kowalke 2014.
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE_1_0.txt or copy at
-// http://www.boost.org/LICENSE_1_0.txt)
-
-#ifndef BOOST_FIBER_DETAIL_WRAP_H
-#define BOOST_FIBER_DETAIL_WRAP_H
-
-#include <type_traits>
-
-#include <boost/config.hpp>
-#if defined(BOOST_NO_CXX17_STD_INVOKE)
-#include <boost/context/detail/invoke.hpp>
-#endif
-#if (BOOST_EXECUTION_CONTEXT==1)
-# include <boost/context/execution_context.hpp>
-#else
-# include <boost/context/continuation.hpp>
-#endif
-
-#include <boost/fiber/detail/config.hpp>
-#include <boost/fiber/detail/data.hpp>
-
-#ifdef BOOST_HAS_ABI_HEADERS
-# include BOOST_ABI_PREFIX
-#endif
-
-namespace boost {
-namespace fibers {
-namespace detail {
-
-#if (BOOST_EXECUTION_CONTEXT==1)
-template< typename Fn1, typename Fn2, typename Tpl >
-class wrapper {
-private:
- typename std::decay< Fn1 >::type fn1_;
- typename std::decay< Fn2 >::type fn2_;
- typename std::decay< Tpl >::type tpl_;
- boost::context::execution_context ctx_;
-
-public:
- wrapper( Fn1 && fn1, Fn2 && fn2, Tpl && tpl,
- boost::context::execution_context const& ctx) :
- fn1_{ std::move( fn1) },
- fn2_{ std::move( fn2) },
- tpl_{ std::move( tpl) },
- ctx_{ ctx } {
- }
-
- wrapper( wrapper const&) = delete;
- wrapper & operator=( wrapper const&) = delete;
-
- wrapper( wrapper && other) = default;
- wrapper & operator=( wrapper && other) = default;
-
- void operator()( void * vp) {
-#if defined(BOOST_NO_CXX17_STD_INVOKE)
- boost::context::detail::invoke( std::move( fn1_), fn2_, tpl_, ctx_, vp);
-#else
- std::invoke( std::move( fn1_), fn2_, tpl_, ctx_, vp);
-#endif
- }
-};
-
-template< typename Fn1, typename Fn2, typename Tpl >
-wrapper< Fn1, Fn2, Tpl >
-wrap( Fn1 && fn1, Fn2 && fn2, Tpl && tpl,
- boost::context::execution_context const& ctx) {
- return wrapper< Fn1, Fn2, Tpl >{
- std::forward< Fn1 >( fn1),
- std::forward< Fn2 >( fn2),
- std::forward< Tpl >( tpl),
- ctx };
-}
-#else
-template< typename Fn1, typename Fn2, typename Tpl >
-class wrapper {
-private:
- typename std::decay< Fn1 >::type fn1_;
- typename std::decay< Fn2 >::type fn2_;
- typename std::decay< Tpl >::type tpl_;
-
-public:
- wrapper( Fn1 && fn1, Fn2 && fn2, Tpl && tpl) :
- fn1_{ std::move( fn1) },
- fn2_{ std::move( fn2) },
- tpl_{ std::move( tpl) } {
- }
-
- wrapper( wrapper const&) = delete;
- wrapper & operator=( wrapper const&) = delete;
-
- wrapper( wrapper && other) = default;
- wrapper & operator=( wrapper && other) = default;
-
- boost::context::continuation
- operator()( boost::context::continuation && c) {
-#if defined(BOOST_NO_CXX17_STD_INVOKE)
- return boost::context::detail::invoke(
- std::move( fn1_),
- fn2_,
- tpl_,
- std::forward< boost::context::continuation >( c) );
-#else
- return std::invoke(
- std::move( fn1_),
- fn2_,
- tpl_,
- std::forward< boost::context::continuation >( c) );
-#endif
- }
-};
-
-template< typename Fn1, typename Fn2, typename Tpl >
-wrapper< Fn1, Fn2, Tpl >
-wrap( Fn1 && fn1, Fn2 && fn2, Tpl && tpl) {
- return wrapper< Fn1, Fn2, Tpl >{
- std::forward< Fn1 >( fn1),
- std::forward< Fn2 >( fn2),
- std::forward< Tpl >( tpl) };
-}
-#endif
-
-}}}
-
-#ifdef BOOST_HAS_ABI_HEADERS
-#include BOOST_ABI_SUFFIX
-#endif
-
-#endif // BOOST_FIBER_DETAIL_WRAP_H
diff --git a/boost/fiber/fiber.hpp b/boost/fiber/fiber.hpp
index 1508a9b5a6..eb10487c66 100644
--- a/boost/fiber/fiber.hpp
+++ b/boost/fiber/fiber.hpp
@@ -15,6 +15,7 @@
#include <boost/assert.hpp>
#include <boost/config.hpp>
#include <boost/intrusive_ptr.hpp>
+#include <boost/predef.h>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/disable_overload.hpp>
@@ -52,41 +53,59 @@ public:
fiber() = default;
template< typename Fn,
- typename ... Args,
- typename = detail::disable_overload< fiber, Fn >
+ typename ... Arg,
+ typename = detail::disable_overload< fiber, Fn >,
+ typename = detail::disable_overload< launch, Fn >,
+ typename = detail::disable_overload< std::allocator_arg_t, Fn >
>
- fiber( Fn && fn, Args && ... args) :
+#if BOOST_COMP_GNUC < 50000000
+ fiber( Fn && fn, Arg && ... arg) :
+#else
+ fiber( Fn && fn, Arg ... arg) :
+#endif
fiber{ launch::post,
std::allocator_arg, default_stack(),
- std::forward< Fn >( fn), std::forward< Args >( args) ... } {
+ std::forward< Fn >( fn), std::forward< Arg >( arg) ... } {
}
template< typename Fn,
- typename ... Args,
+ typename ... Arg,
typename = detail::disable_overload< fiber, Fn >
>
- fiber( launch policy, Fn && fn, Args && ... args) :
+#if BOOST_COMP_GNUC < 50000000
+ fiber( launch policy, Fn && fn, Arg && ... arg) :
+#else
+ fiber( launch policy, Fn && fn, Arg ... arg) :
+#endif
fiber{ policy,
std::allocator_arg, default_stack(),
- std::forward< Fn >( fn), std::forward< Args >( args) ... } {
+ std::forward< Fn >( fn), std::forward< Arg >( arg) ... } {
}
template< typename StackAllocator,
typename Fn,
- typename ... Args
+ typename ... Arg
>
- fiber( std::allocator_arg_t, StackAllocator salloc, Fn && fn, Args && ... args) :
+#if BOOST_COMP_GNUC < 50000000
+ fiber( std::allocator_arg_t, StackAllocator salloc, Fn && fn, Arg && ... arg) :
+#else
+ fiber( std::allocator_arg_t, StackAllocator salloc, Fn && fn, Arg ... arg) :
+#endif
fiber{ launch::post,
std::allocator_arg, salloc,
- std::forward< Fn >( fn), std::forward< Args >( args) ... } {
+ std::forward< Fn >( fn), std::forward< Arg >( arg) ... } {
}
template< typename StackAllocator,
typename Fn,
- typename ... Args
+ typename ... Arg
>
- fiber( launch policy, std::allocator_arg_t, StackAllocator salloc, Fn && fn, Args && ... args) :
- impl_{ make_worker_context( policy, salloc, std::forward< Fn >( fn), std::forward< Args >( args) ... ) } {
+#if BOOST_COMP_GNUC < 50000000
+ fiber( launch policy, std::allocator_arg_t, StackAllocator salloc, Fn && fn, Arg && ... arg) :
+#else
+ fiber( launch policy, std::allocator_arg_t, StackAllocator salloc, Fn && fn, Arg ... arg) :
+#endif
+ impl_{ make_worker_context( policy, salloc, std::forward< Fn >( fn), std::forward< Arg >( arg) ... ) } {
start_();
}
@@ -108,7 +127,7 @@ public:
if ( joinable() ) {
std::terminate();
}
- if ( this == & other) {
+ if ( BOOST_UNLIKELY( this == & other) ) {
return * this;
}
impl_.swap( other.impl_);
diff --git a/boost/fiber/fss.hpp b/boost/fiber/fss.hpp
index f65d7353b3..0a321bb661 100644
--- a/boost/fiber/fss.hpp
+++ b/boost/fiber/fss.hpp
@@ -37,8 +37,8 @@ private:
fn{ fn_ } {
}
- void operator()( void* data) {
- if ( fn) {
+ void operator()( void * data) {
+ if ( BOOST_LIKELY( nullptr != fn) ) {
fn( static_cast< T * >( data) );
}
}
@@ -91,7 +91,7 @@ public:
void reset( T * t) {
T * c = get();
- if ( c != t) {
+ if ( BOOST_LIKELY( c != t) ) {
context::active()->set_fss_data(
this, cleanup_fn_, t, true);
}
diff --git a/boost/fiber/future/async.hpp b/boost/fiber/future/async.hpp
index e68b2c28fa..9601e8cdfe 100644
--- a/boost/fiber/future/async.hpp
+++ b/boost/fiber/future/async.hpp
@@ -30,7 +30,7 @@ future<
>::type( typename std::decay< Args >::type ... )
>::type
>
-async( Fn && fn, Args && ... args) {
+async( Fn && fn, Args ... args) {
typedef typename std::result_of<
typename std::decay< Fn >::type( typename std::decay< Args >::type ... )
>::type result_type;
@@ -51,7 +51,7 @@ future<
>::type( typename std::decay< Args >::type ...)
>::type
>
-async( Policy policy, Fn && fn, Args && ... args) {
+async( Policy policy, Fn && fn, Args ... args) {
typedef typename std::result_of<
typename std::decay< Fn >::type( typename std::decay< Args >::type ... )
>::type result_type;
@@ -72,7 +72,7 @@ future<
>::type( typename std::decay< Args >::type ... )
>::type
>
-async( Policy policy, std::allocator_arg_t, StackAllocator salloc, Fn && fn, Args && ... args) {
+async( Policy policy, std::allocator_arg_t, StackAllocator salloc, Fn && fn, Args ... args) {
typedef typename std::result_of<
typename std::decay< Fn >::type( typename std::decay< Args >::type ... )
>::type result_type;
@@ -94,7 +94,7 @@ future<
>::type( typename std::decay< Args >::type ... )
>::type
>
-async( Policy policy, std::allocator_arg_t, StackAllocator salloc, Allocator alloc, Fn && fn, Args && ... args) {
+async( Policy policy, std::allocator_arg_t, StackAllocator salloc, Allocator alloc, Fn && fn, Args ... args) {
typedef typename std::result_of<
typename std::decay< Fn >::type( typename std::decay< Args >::type ... )
>::type result_type;
diff --git a/boost/fiber/future/detail/shared_state.hpp b/boost/fiber/future/detail/shared_state.hpp
index 898fdaffd4..28e0e3cfb9 100644
--- a/boost/fiber/future/detail/shared_state.hpp
+++ b/boost/fiber/future/detail/shared_state.hpp
@@ -62,7 +62,7 @@ protected:
void set_exception_( std::exception_ptr except, std::unique_lock< mutex > & lk) {
BOOST_ASSERT( lk.owns_lock() );
- if ( ready_) {
+ if ( BOOST_UNLIKELY( ready_) ) {
throw promise_already_satisfied();
}
except_ = except;
@@ -161,19 +161,19 @@ private:
void set_value_( R const& value, std::unique_lock< mutex > & lk) {
BOOST_ASSERT( lk.owns_lock() );
- if ( ready_) {
+ if ( BOOST_UNLIKELY( ready_) ) {
throw promise_already_satisfied{};
}
- ::new ( static_cast< void * >( std::addressof( storage_) ) ) R{ value };
+ ::new ( static_cast< void * >( std::addressof( storage_) ) ) R( value );
mark_ready_and_notify_( lk);
}
void set_value_( R && value, std::unique_lock< mutex > & lk) {
BOOST_ASSERT( lk.owns_lock() );
- if ( ready_) {
+ if ( BOOST_UNLIKELY( ready_) ) {
throw promise_already_satisfied{};
}
- ::new ( static_cast< void * >( std::addressof( storage_) ) ) R{ std::move( value) };
+ ::new ( static_cast< void * >( std::addressof( storage_) ) ) R( std::move( value) );
mark_ready_and_notify_( lk);
}
@@ -223,7 +223,7 @@ private:
void set_value_( R & value, std::unique_lock< mutex > & lk) {
BOOST_ASSERT( lk.owns_lock() );
- if ( ready_) {
+ if ( BOOST_UNLIKELY( ready_) ) {
throw promise_already_satisfied();
}
value_ = std::addressof( value);
@@ -266,7 +266,7 @@ private:
inline
void set_value_( std::unique_lock< mutex > & lk) {
BOOST_ASSERT( lk.owns_lock() );
- if ( ready_) {
+ if ( BOOST_UNLIKELY( ready_) ) {
throw promise_already_satisfied();
}
mark_ready_and_notify_( lk);
diff --git a/boost/fiber/future/detail/task_object.hpp b/boost/fiber/future/detail/task_object.hpp
index 3a48929a58..abb4c8d877 100644
--- a/boost/fiber/future/detail/task_object.hpp
+++ b/boost/fiber/future/detail/task_object.hpp
@@ -16,6 +16,7 @@
#if defined(BOOST_NO_CXX17_STD_APPLY)
#include <boost/context/detail/apply.hpp>
#endif
+#include <boost/core/pointer_traits.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/future/detail/task_base.hpp>
@@ -68,15 +69,17 @@ public:
typename base_type::ptr_type reset() override final {
typedef std::allocator_traits< allocator_type > traity_type;
+ typedef pointer_traits< typename traity_type::pointer> ptrait_type;
typename traity_type::pointer ptr{ traity_type::allocate( alloc_, 1) };
+ typename ptrait_type::element_type* p = ptrait_type::to_address(ptr);
try {
- traity_type::construct( alloc_, ptr, alloc_, std::move( fn_) );
+ traity_type::construct( alloc_, p, alloc_, std::move( fn_) );
} catch (...) {
traity_type::deallocate( alloc_, ptr, 1);
throw;
}
- return { convert( ptr) };
+ return { p };
}
protected:
@@ -134,15 +137,17 @@ public:
typename base_type::ptr_type reset() override final {
typedef std::allocator_traits< allocator_type > traity_type;
+ typedef pointer_traits< typename traity_type::pointer> ptrait_type;
typename traity_type::pointer ptr{ traity_type::allocate( alloc_, 1) };
+ typename ptrait_type::element_type* p = ptrait_type::to_address(ptr);
try {
- traity_type::construct( alloc_, ptr, alloc_, std::move( fn_) );
+ traity_type::construct( alloc_, p, alloc_, std::move( fn_) );
} catch (...) {
traity_type::deallocate( alloc_, ptr, 1);
throw;
}
- return { convert( ptr) };
+ return { p };
}
protected:
diff --git a/boost/fiber/future/future.hpp b/boost/fiber/future/future.hpp
index 5d4ad78ab5..2191080d69 100644
--- a/boost/fiber/future/future.hpp
+++ b/boost/fiber/future/future.hpp
@@ -46,14 +46,14 @@ struct future_base {
}
future_base & operator=( future_base const& other) noexcept {
- if ( this != & other) {
+ if ( BOOST_LIKELY( this != & other) ) {
state_ = other.state_;
}
return * this;
}
future_base & operator=( future_base && other) noexcept {
- if ( this != & other) {
+ if ( BOOST_LIKELY( this != & other) ) {
state_ = other.state_;
other.state_.reset();
}
@@ -65,14 +65,14 @@ struct future_base {
}
std::exception_ptr get_exception_ptr() {
- if ( ! valid() ) {
+ if ( BOOST_UNLIKELY( ! valid() ) ) {
throw future_uninitialized{};
}
return state_->get_exception_ptr();
}
void wait() const {
- if ( ! valid() ) {
+ if ( BOOST_UNLIKELY( ! valid() ) ) {
throw future_uninitialized{};
}
state_->wait();
@@ -80,7 +80,7 @@ struct future_base {
template< typename Rep, typename Period >
future_status wait_for( std::chrono::duration< Rep, Period > const& timeout_duration) const {
- if ( ! valid() ) {
+ if ( BOOST_UNLIKELY( ! valid() ) ) {
throw future_uninitialized{};
}
return state_->wait_for( timeout_duration);
@@ -88,7 +88,7 @@ struct future_base {
template< typename Clock, typename Duration >
future_status wait_until( std::chrono::time_point< Clock, Duration > const& timeout_time) const {
- if ( ! valid() ) {
+ if ( BOOST_UNLIKELY( ! valid() ) ) {
throw future_uninitialized{};
}
return state_->wait_until( timeout_time);
@@ -131,7 +131,7 @@ public:
}
future & operator=( future && other) noexcept {
- if ( this != & other) {
+ if ( BOOST_LIKELY( this != & other) ) {
base_type::operator=( std::move( other) );
}
return * this;
@@ -140,7 +140,7 @@ public:
shared_future< R > share();
R get() {
- if ( ! base_type::valid() ) {
+ if ( BOOST_UNLIKELY( ! base_type::valid() ) ) {
throw future_uninitialized{};
}
typename base_type::ptr_type tmp{};
@@ -180,7 +180,7 @@ public:
}
future & operator=( future && other) noexcept {
- if ( this != & other) {
+ if ( BOOST_LIKELY( this != & other) ) {
base_type::operator=( std::move( other) );
}
return * this;
@@ -189,7 +189,7 @@ public:
shared_future< R & > share();
R & get() {
- if ( ! base_type::valid() ) {
+ if ( BOOST_UNLIKELY( ! base_type::valid() ) ) {
throw future_uninitialized{};
}
typename base_type::ptr_type tmp{};
@@ -231,7 +231,7 @@ public:
inline
future & operator=( future && other) noexcept {
- if ( this != & other) {
+ if ( BOOST_LIKELY( this != & other) ) {
base_type::operator=( std::move( other) );
}
return * this;
@@ -241,7 +241,7 @@ public:
inline
void get() {
- if ( ! base_type::valid() ) {
+ if ( BOOST_UNLIKELY( ! base_type::valid() ) ) {
throw future_uninitialized{};
}
base_type::ptr_type tmp{};
@@ -284,14 +284,14 @@ public:
}
shared_future & operator=( shared_future const& other) noexcept {
- if ( this != & other) {
+ if ( BOOST_LIKELY( this != & other) ) {
base_type::operator=( other);
}
return * this;
}
shared_future & operator=( shared_future && other) noexcept {
- if ( this != & other) {
+ if ( BOOST_LIKELY( this != & other) ) {
base_type::operator=( std::move( other) );
}
return * this;
@@ -303,7 +303,7 @@ public:
}
R const& get() const {
- if ( ! valid() ) {
+ if ( BOOST_UNLIKELY( ! valid() ) ) {
throw future_uninitialized{};
}
return base_type::state_->get();
@@ -343,14 +343,14 @@ public:
}
shared_future & operator=( shared_future const& other) noexcept {
- if ( this != & other) {
+ if ( BOOST_LIKELY( this != & other) ) {
base_type::operator=( other);
}
return * this;
}
shared_future & operator=( shared_future && other) noexcept {
- if ( this != & other) {
+ if ( BOOST_LIKELY( this != & other) ) {
base_type::operator=( std::move( other) );
}
return * this;
@@ -362,7 +362,7 @@ public:
}
R & get() const {
- if ( ! valid() ) {
+ if ( BOOST_UNLIKELY( ! valid() ) ) {
throw future_uninitialized{};
}
return base_type::state_->get();
@@ -406,7 +406,7 @@ public:
inline
shared_future & operator=( shared_future const& other) noexcept {
- if ( this != & other) {
+ if ( BOOST_LIKELY( this != & other) ) {
base_type::operator=( other);
}
return * this;
@@ -414,7 +414,7 @@ public:
inline
shared_future & operator=( shared_future && other) noexcept {
- if ( this != & other) {
+ if ( BOOST_LIKELY( this != & other) ) {
base_type::operator=( std::move( other) );
}
return * this;
@@ -428,7 +428,7 @@ public:
inline
void get() const {
- if ( ! valid() ) {
+ if ( BOOST_UNLIKELY( ! valid() ) ) {
throw future_uninitialized{};
}
base_type::state_->get();
@@ -445,7 +445,7 @@ public:
template< typename R >
shared_future< R >
future< R >::share() {
- if ( ! base_type::valid() ) {
+ if ( BOOST_UNLIKELY( ! base_type::valid() ) ) {
throw future_uninitialized{};
}
return shared_future< R >{ std::move( * this) };
@@ -454,7 +454,7 @@ future< R >::share() {
template< typename R >
shared_future< R & >
future< R & >::share() {
- if ( ! base_type::valid() ) {
+ if ( BOOST_UNLIKELY( ! base_type::valid() ) ) {
throw future_uninitialized{};
}
return shared_future< R & >{ std::move( * this) };
@@ -463,7 +463,7 @@ future< R & >::share() {
inline
shared_future< void >
future< void >::share() {
- if ( ! base_type::valid() ) {
+ if ( BOOST_UNLIKELY( ! base_type::valid() ) ) {
throw future_uninitialized{};
}
return shared_future< void >{ std::move( * this) };
diff --git a/boost/fiber/future/packaged_task.hpp b/boost/fiber/future/packaged_task.hpp
index 31838ee41f..c8b10d43e0 100644
--- a/boost/fiber/future/packaged_task.hpp
+++ b/boost/fiber/future/packaged_task.hpp
@@ -14,7 +14,6 @@
#include <boost/config.hpp>
-#include <boost/fiber/detail/convert.hpp>
#include <boost/fiber/detail/disable_overload.hpp>
#include <boost/fiber/exceptions.hpp>
#include <boost/fiber/future/detail/task_base.hpp>
@@ -57,16 +56,18 @@ public:
typedef std::allocator_traits<
typename object_type::allocator_type
> traits_type;
+ typedef pointer_traits< typename traits_type::pointer > ptrait_type;
typename object_type::allocator_type a{ alloc };
typename traits_type::pointer ptr{ traits_type::allocate( a, 1) };
+ typename ptrait_type::element_type* p = ptrait_type::to_address(ptr);
try {
- traits_type::construct( a, ptr, a, std::forward< Fn >( fn) );
+ traits_type::construct( a, p, a, std::forward< Fn >( fn) );
} catch (...) {
traits_type::deallocate( a, ptr, 1);
throw;
}
- task_.reset( convert( ptr) );
+ task_.reset(p);
}
~packaged_task() {
@@ -85,7 +86,7 @@ public:
}
packaged_task & operator=( packaged_task && other) noexcept {
- if ( this != & other) {
+ if ( BOOST_LIKELY( this != & other) ) {
packaged_task tmp{ std::move( other) };
swap( tmp);
}
@@ -105,7 +106,7 @@ public:
if ( obtained_) {
throw future_already_retrieved{};
}
- if ( ! valid() ) {
+ if ( BOOST_UNLIKELY( ! valid() ) ) {
throw packaged_task_uninitialized{};
}
obtained_ = true;
@@ -114,14 +115,14 @@ public:
}
void operator()( Args ... args) {
- if ( ! valid() ) {
+ if ( BOOST_UNLIKELY( ! valid() ) ) {
throw packaged_task_uninitialized{};
}
task_->run( std::forward< Args >( args) ... );
}
void reset() {
- if ( ! valid() ) {
+ if ( BOOST_UNLIKELY( ! valid() ) ) {
throw packaged_task_uninitialized{};
}
packaged_task tmp;
diff --git a/boost/fiber/future/promise.hpp b/boost/fiber/future/promise.hpp
index 83ba63fb23..661c8b0480 100644
--- a/boost/fiber/future/promise.hpp
+++ b/boost/fiber/future/promise.hpp
@@ -12,8 +12,8 @@
#include <utility>
#include <boost/config.hpp>
+#include <boost/core/pointer_traits.hpp>
-#include <boost/fiber/detail/convert.hpp>
#include <boost/fiber/exceptions.hpp>
#include <boost/fiber/future/detail/shared_state.hpp>
#include <boost/fiber/future/detail/shared_state_object.hpp>
@@ -38,16 +38,18 @@ struct promise_base {
promise_base( std::allocator_arg_t, Allocator alloc) {
typedef detail::shared_state_object< R, Allocator > object_type;
typedef std::allocator_traits< typename object_type::allocator_type > traits_type;
+ typedef pointer_traits< typename traits_type::pointer > ptrait_type;
typename object_type::allocator_type a{ alloc };
typename traits_type::pointer ptr{ traits_type::allocate( a, 1) };
+ typename ptrait_type::element_type* p = ptrait_type::to_address(ptr);
try {
- traits_type::construct( a, ptr, a);
+ traits_type::construct( a, p, a);
} catch (...) {
traits_type::deallocate( a, ptr, 1);
throw;
}
- future_.reset( convert( ptr) );
+ future_.reset(p);
}
~promise_base() {
@@ -66,7 +68,7 @@ struct promise_base {
}
promise_base & operator=( promise_base && other) noexcept {
- if ( this != & other) {
+ if ( BOOST_LIKELY( this != & other) ) {
promise_base tmp{ std::move( other) };
swap( tmp);
}
@@ -74,10 +76,10 @@ struct promise_base {
}
future< R > get_future() {
- if ( obtained_) {
+ if ( BOOST_UNLIKELY( obtained_) ) {
throw future_already_retrieved{};
}
- if ( ! future_) {
+ if ( BOOST_UNLIKELY( ! future_) ) {
throw promise_uninitialized{};
}
obtained_ = true;
@@ -90,7 +92,7 @@ struct promise_base {
}
void set_exception( std::exception_ptr p) {
- if ( ! future_) {
+ if ( BOOST_UNLIKELY( ! future_) ) {
throw promise_uninitialized{};
}
future_->set_exception( p);
@@ -119,14 +121,14 @@ public:
promise & operator=( promise && other) = default;
void set_value( R const& value) {
- if ( ! base_type::future_) {
+ if ( BOOST_UNLIKELY( ! base_type::future_) ) {
throw promise_uninitialized{};
}
base_type::future_->set_value( value);
}
void set_value( R && value) {
- if ( ! base_type::future_) {
+ if ( BOOST_UNLIKELY( ! base_type::future_) ) {
throw promise_uninitialized{};
}
base_type::future_->set_value( std::move( value) );
@@ -160,7 +162,7 @@ public:
promise & operator=( promise && other) noexcept = default;
void set_value( R & value) {
- if ( ! base_type::future_) {
+ if ( BOOST_UNLIKELY( ! base_type::future_) ) {
throw promise_uninitialized{};
}
base_type::future_->set_value( value);
@@ -195,7 +197,7 @@ public:
inline
void set_value() {
- if ( ! base_type::future_) {
+ if ( BOOST_UNLIKELY( ! base_type::future_) ) {
throw promise_uninitialized{};
}
base_type::future_->set_value();
diff --git a/boost/fiber/numa/pin_thread.hpp b/boost/fiber/numa/pin_thread.hpp
new file mode 100644
index 0000000000..c8d48395c6
--- /dev/null
+++ b/boost/fiber/numa/pin_thread.hpp
@@ -0,0 +1,33 @@
+
+// Copyright Oliver Kowalke 2017.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_FIBERS_NUMA_PIN_THREAD_H
+#define BOOST_FIBERS_NUMA_PIN_THREAD_H
+
+#include <cstdint>
+
+#include <boost/config.hpp>
+
+#include <boost/fiber/detail/config.hpp>
+
+#ifdef BOOST_HAS_ABI_HEADERS
+# include BOOST_ABI_PREFIX
+#endif
+
+namespace boost {
+namespace fibers {
+namespace numa {
+
+BOOST_FIBERS_DECL
+void pin_thread( std::uint32_t);
+
+}}}
+
+#ifdef BOOST_HAS_ABI_HEADERS
+# include BOOST_ABI_SUFFIX
+#endif
+
+#endif // BOOST_FIBERS_NUMA_PIN_THREAD_H
diff --git a/boost/fiber/numa/topology.hpp b/boost/fiber/numa/topology.hpp
new file mode 100644
index 0000000000..7b67fad288
--- /dev/null
+++ b/boost/fiber/numa/topology.hpp
@@ -0,0 +1,46 @@
+
+// Copyright Oliver Kowalke 2017.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_FIBERS_NUMA_TOPOLOGY_H
+#define BOOST_FIBERS_NUMA_TOPOLOGY_H
+
+#include <cstdint>
+#include <set>
+#include <vector>
+
+#include <boost/config.hpp>
+
+#include <boost/fiber/detail/config.hpp>
+
+#ifdef BOOST_HAS_ABI_HEADERS
+# include BOOST_ABI_PREFIX
+#endif
+
+namespace boost {
+namespace fibers {
+namespace numa {
+
+struct node {
+ std::uint32_t id;
+ std::set< std::uint32_t > logical_cpus;
+ std::vector< std::uint32_t > distance;
+};
+
+inline
+bool operator<( node const& lhs, node const& rhs) noexcept {
+ return lhs.id < rhs.id;
+}
+
+BOOST_FIBERS_DECL
+std::vector< node > topology();
+
+}}}
+
+#ifdef BOOST_HAS_ABI_HEADERS
+# include BOOST_ABI_SUFFIX
+#endif
+
+#endif // BOOST_FIBERS_NUMA_TOPOLOGY_H
diff --git a/boost/fiber/operations.hpp b/boost/fiber/operations.hpp
index 95ab9ba452..22b58562da 100644
--- a/boost/fiber/operations.hpp
+++ b/boost/fiber/operations.hpp
@@ -10,6 +10,7 @@
#include <boost/config.hpp>
+#include <boost/fiber/algo/algorithm.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/convert.hpp>
@@ -36,19 +37,22 @@ void yield() noexcept {
template< typename Clock, typename Duration >
void sleep_until( std::chrono::time_point< Clock, Duration > const& sleep_time_) {
std::chrono::steady_clock::time_point sleep_time = boost::fibers::detail::convert( sleep_time_);
- fibers::context::active()->wait_until( sleep_time);
+ fibers::context * active_ctx = fibers::context::active();
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
+ active_ctx->wait_until( sleep_time);
}
template< typename Rep, typename Period >
void sleep_for( std::chrono::duration< Rep, Period > const& timeout_duration) {
- fibers::context::active()->wait_until(
- std::chrono::steady_clock::now() + timeout_duration);
+ fibers::context * active_ctx = fibers::context::active();
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
+ active_ctx->wait_until( std::chrono::steady_clock::now() + timeout_duration);
}
template< typename PROPS >
PROPS & properties() {
fibers::fiber_properties * props = fibers::context::active()->get_properties();
- if ( ! props) {
+ if ( BOOST_LIKELY( nullptr == props) ) {
// props could be nullptr if the thread's main fiber has not yet
// yielded (not yet passed through algorithm_with_properties::
// awakened()). Address that by yielding right now.
@@ -76,9 +80,7 @@ bool has_ready_fibers() noexcept {
template< typename SchedAlgo, typename ... Args >
void use_scheduling_algorithm( Args && ... args) noexcept {
boost::fibers::context::active()->get_scheduler()
- ->set_algo(
- std::unique_ptr< SchedAlgo >(
- new SchedAlgo( std::forward< Args >( args) ... ) ) );
+ ->set_algo( new SchedAlgo( std::forward< Args >( args) ... ) );
}
}}
diff --git a/boost/fiber/properties.hpp b/boost/fiber/properties.hpp
index 09ac7405d6..2f8b7a008d 100644
--- a/boost/fiber/properties.hpp
+++ b/boost/fiber/properties.hpp
@@ -28,7 +28,7 @@ class context;
namespace algo {
-struct algorithm;
+class algorithm;
}
diff --git a/boost/fiber/scheduler.hpp b/boost/fiber/scheduler.hpp
index 4a5f0dab8c..accedf03d7 100644
--- a/boost/fiber/scheduler.hpp
+++ b/boost/fiber/scheduler.hpp
@@ -13,11 +13,7 @@
#include <vector>
#include <boost/config.hpp>
-#if (BOOST_EXECUTION_CONTEXT==1)
-# include <boost/context/execution_context.hpp>
-#else
-# include <boost/context/continuation.hpp>
-#endif
+#include <boost/context/continuation.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/intrusive/set.hpp>
@@ -90,7 +86,7 @@ private:
detail::spinlock remote_ready_splk_{};
remote_ready_queue_type remote_ready_queue_{};
#endif
- alignas(cache_alignment) std::unique_ptr< algo::algorithm > algo_;
+ algo::algorithm::ptr_t algo_;
// sleep-queue contains context' which have been called
// scheduler::wait_until()
sleep_queue_type sleep_queue_{};
@@ -126,15 +122,9 @@ public:
void schedule_from_remote( context *) noexcept;
#endif
-#if (BOOST_EXECUTION_CONTEXT==1)
- void dispatch() noexcept;
-
- void terminate( detail::spinlock_lock &, context *) noexcept;
-#else
boost::context::continuation dispatch() noexcept;
boost::context::continuation terminate( detail::spinlock_lock &, context *) noexcept;
-#endif
void yield( context *) noexcept;
@@ -149,7 +139,7 @@ public:
bool has_ready_fibers() const noexcept;
- void set_algo( std::unique_ptr< algo::algorithm >) noexcept;
+ void set_algo( algo::algorithm::ptr_t) noexcept;
void attach_main_context( context *) noexcept;
diff --git a/boost/fiber/type.hpp b/boost/fiber/type.hpp
index 065d22fccb..d9ab0a94d7 100644
--- a/boost/fiber/type.hpp
+++ b/boost/fiber/type.hpp
@@ -29,7 +29,6 @@
#include <boost/fiber/detail/decay_copy.hpp>
#include <boost/fiber/detail/fss.hpp>
#include <boost/fiber/detail/spinlock.hpp>
-#include <boost/fiber/detail/wrap.hpp>
#include <boost/fiber/exceptions.hpp>
#include <boost/fiber/fixedsize_stack.hpp>
#include <boost/fiber/properties.hpp>
diff --git a/boost/fiber/unbuffered_channel.hpp b/boost/fiber/unbuffered_channel.hpp
index 38a2d6111e..1474299ded 100644
--- a/boost/fiber/unbuffered_channel.hpp
+++ b/boost/fiber/unbuffered_channel.hpp
@@ -38,7 +38,7 @@ public:
private:
typedef context::wait_queue_t wait_queue_type;
- struct alignas(cache_alignment) slot {
+ struct slot {
value_type value;
context * ctx;
@@ -54,12 +54,12 @@ private:
};
// shared cacheline
- alignas(cache_alignment) std::atomic< slot * > slot_{ nullptr };
+ std::atomic< slot * > slot_{ nullptr };
// shared cacheline
- alignas(cache_alignment) std::atomic_bool closed_{ false };
- alignas(cache_alignment) mutable detail::spinlock splk_producers_{};
+ std::atomic_bool closed_{ false };
+ mutable detail::spinlock splk_producers_{};
wait_queue_type waiting_producers_{};
- alignas( cache_alignment) mutable detail::spinlock splk_consumers_{};
+ mutable detail::spinlock splk_consumers_{};
wait_queue_type waiting_consumers_{};
char pad_[cacheline_length];
@@ -98,13 +98,6 @@ public:
~unbuffered_channel() {
close();
- slot * s = nullptr;
- if ( nullptr != ( s = try_pop_() ) ) {
- BOOST_ASSERT( nullptr != s);
- BOOST_ASSERT( nullptr != s->ctx);
- // value will be destructed in the context of the waiting fiber
- context::active()->schedule( s->ctx);
- }
}
unbuffered_channel( unbuffered_channel const&) = delete;
@@ -122,14 +115,46 @@ public:
while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
- active_ctx->schedule( producer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( producer_ctx);
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( producer_ctx);
+ // re-schedule next
+ }
}
// notify all waiting consumers
detail::spinlock_lock lk2{ splk_consumers_ };
while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
}
@@ -137,16 +162,34 @@ public:
context * active_ctx = context::active();
slot s{ value, active_ctx };
for (;;) {
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( try_push_( & s) ) {
detail::spinlock_lock lk{ splk_consumers_ };
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
// suspend till value has been consumed
active_ctx->suspend( lk);
@@ -154,13 +197,14 @@ public:
return channel_op_status::success;
} else {
detail::spinlock_lock lk{ splk_producers_ };
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( is_empty_() ) {
continue;
}
active_ctx->wait_link( waiting_producers_);
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// suspend this producer
active_ctx->suspend( lk);
// resumed, slot mabye free
@@ -172,16 +216,34 @@ public:
context * active_ctx = context::active();
slot s{ std::move( value), active_ctx };
for (;;) {
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( try_push_( & s) ) {
detail::spinlock_lock lk{ splk_consumers_ };
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
// suspend till value has been consumed
active_ctx->suspend( lk);
@@ -189,13 +251,14 @@ public:
return channel_op_status::success;
} else {
detail::spinlock_lock lk{ splk_producers_ };
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( is_empty_() ) {
continue;
}
active_ctx->wait_link( waiting_producers_);
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// suspend this producer
active_ctx->suspend( lk);
// resumed, slot mabye free
@@ -224,18 +287,38 @@ public:
slot s{ value, active_ctx };
std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( try_push_( & s) ) {
detail::spinlock_lock lk{ splk_consumers_ };
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
// suspend this producer
+ intrusive_ptr_add_ref( active_ctx);
+ active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// clear slot
slot * nil_slot = nullptr, * own_slot = & s;
@@ -247,13 +330,15 @@ public:
return channel_op_status::success;
} else {
detail::spinlock_lock lk{ splk_producers_ };
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( is_empty_() ) {
continue;
}
active_ctx->wait_link( waiting_producers_);
+ intrusive_ptr_add_ref( active_ctx);
+ active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
@@ -274,18 +359,38 @@ public:
slot s{ std::move( value), active_ctx };
std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( try_push_( & s) ) {
detail::spinlock_lock lk{ splk_consumers_ };
// notify one waiting consumer
- if ( ! waiting_consumers_.empty() ) {
+ while ( ! waiting_consumers_.empty() ) {
context * consumer_ctx = & waiting_consumers_.front();
waiting_consumers_.pop_front();
- active_ctx->schedule( consumer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( consumer_ctx);
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( consumer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( consumer_ctx);
+ // re-schedule next
+ }
}
// suspend this producer
+ intrusive_ptr_add_ref( active_ctx);
+ active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// clear slot
slot * nil_slot = nullptr, * own_slot = & s;
@@ -297,13 +402,15 @@ public:
return channel_op_status::success;
} else {
detail::spinlock_lock lk{ splk_producers_ };
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( is_empty_() ) {
continue;
}
active_ctx->wait_link( waiting_producers_);
+ intrusive_ptr_add_ref( active_ctx);
+ active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this producer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk
@@ -325,27 +432,45 @@ public:
{
detail::spinlock_lock lk{ splk_producers_ };
// notify one waiting producer
- if ( ! waiting_producers_.empty() ) {
+ while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- active_ctx->schedule( producer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( producer_ctx);
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( producer_ctx);
+ // re-schedule next
+ }
}
}
- // consume value
value = std::move( s->value);
- // resume suspended producer
+ // notify context
active_ctx->schedule( s->ctx);
return channel_op_status::success;
} else {
detail::spinlock_lock lk{ splk_consumers_ };
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( ! is_empty_() ) {
continue;
}
active_ctx->wait_link( waiting_consumers_);
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// suspend this consumer
active_ctx->suspend( lk);
// resumed, slot mabye set
@@ -361,21 +486,39 @@ public:
{
detail::spinlock_lock lk{ splk_producers_ };
// notify one waiting producer
- if ( ! waiting_producers_.empty() ) {
+ while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- active_ctx->schedule( producer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( producer_ctx);
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( producer_ctx);
+ // re-schedule next
+ }
}
}
// consume value
- value_type value{ std::move( s->value) };
- // resume suspended producer
+ value_type value = std::move( s->value);
+ // notify context
active_ctx->schedule( s->ctx);
return std::move( value);
} else {
detail::spinlock_lock lk{ splk_consumers_ };
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
throw fiber_error{
std::make_error_code( std::errc::operation_not_permitted),
"boost fiber: channel is closed" };
@@ -384,6 +527,7 @@ public:
continue;
}
active_ctx->wait_link( waiting_consumers_);
+ active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
// suspend this consumer
active_ctx->suspend( lk);
// resumed, slot mabye set
@@ -409,27 +553,47 @@ public:
{
detail::spinlock_lock lk{ splk_producers_ };
// notify one waiting producer
- if ( ! waiting_producers_.empty() ) {
+ while ( ! waiting_producers_.empty() ) {
context * producer_ctx = & waiting_producers_.front();
waiting_producers_.pop_front();
lk.unlock();
- active_ctx->schedule( producer_ctx);
+ std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
+ if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
+ // notify before timeout
+ intrusive_ptr_release( producer_ctx);
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else if ( static_cast< std::intptr_t >( 0) == expected) {
+ // no timed-wait op.
+ // notify context
+ active_ctx->schedule( producer_ctx);
+ break;
+ } else {
+ // timed-wait op.
+ // expected == -1: notify after timeout, same timed-wait op.
+ // expected == <any>: notify after timeout, another timed-wait op. was already started
+ intrusive_ptr_release( producer_ctx);
+ // re-schedule next
+ }
}
}
// consume value
value = std::move( s->value);
- // resume suspended producer
+ // notify context
active_ctx->schedule( s->ctx);
return channel_op_status::success;
} else {
detail::spinlock_lock lk{ splk_consumers_ };
- if ( is_closed() ) {
+ if ( BOOST_UNLIKELY( is_closed() ) ) {
return channel_op_status::closed;
}
if ( ! is_empty_() ) {
continue;
}
active_ctx->wait_link( waiting_consumers_);
+ intrusive_ptr_add_ref( active_ctx);
+ active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
// suspend this consumer
if ( ! active_ctx->wait_until( timeout_time, lk) ) {
// relock local lk