diff options
Diffstat (limited to 'boost/thread/future.hpp')
-rw-r--r-- | boost/thread/future.hpp | 1413 |
1 files changed, 760 insertions, 653 deletions
diff --git a/boost/thread/future.hpp b/boost/thread/future.hpp index a1e69c355a..e6e22363aa 100644 --- a/boost/thread/future.hpp +++ b/boost/thread/future.hpp @@ -1,5 +1,5 @@ // (C) Copyright 2008-10 Anthony Williams -// (C) Copyright 2011-2014 Vicente J. Botet Escriba +// (C) Copyright 2011-2015 Vicente J. Botet Escriba // // Distributed under the Boost Software License, Version 1.0. (See // accompanying file LICENSE_1_0.txt or copy at @@ -15,49 +15,52 @@ #ifndef BOOST_NO_EXCEPTIONS -#include <boost/core/scoped_enum.hpp> -#include <stdexcept> -#include <iostream> -#include <boost/thread/exceptional_ptr.hpp> +#include <boost/thread/condition_variable.hpp> #include <boost/thread/detail/move.hpp> #include <boost/thread/detail/invoker.hpp> #include <boost/thread/detail/invoke.hpp> -#include <boost/thread/thread_time.hpp> -#include <boost/thread/mutex.hpp> -#include <boost/thread/condition_variable.hpp> +#include <boost/thread/detail/is_convertible.hpp> +#include <boost/thread/exceptional_ptr.hpp> +#include <boost/thread/futures/future_error.hpp> +#include <boost/thread/futures/future_error_code.hpp> +#include <boost/thread/futures/future_status.hpp> +#include <boost/thread/futures/is_future_type.hpp> +#include <boost/thread/futures/launch.hpp> +#include <boost/thread/futures/wait_for_all.hpp> +#include <boost/thread/futures/wait_for_any.hpp> #include <boost/thread/lock_algorithms.hpp> #include <boost/thread/lock_types.hpp> -#include <boost/exception_ptr.hpp> -#include <boost/shared_ptr.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/thread_only.hpp> +#include <boost/thread/thread_time.hpp> + #if defined BOOST_THREAD_FUTURE_USES_OPTIONAL #include <boost/optional.hpp> #else #include <boost/thread/csbl/memory/unique_ptr.hpp> -//#include <boost/move/make_unique.hpp> #endif -#include <boost/type_traits/is_fundamental.hpp> -#include <boost/thread/detail/is_convertible.hpp> -#include <boost/type_traits/decay.hpp> -#include <boost/type_traits/is_void.hpp> -#include <boost/type_traits/conditional.hpp> -#include <boost/config.hpp> -#include <boost/throw_exception.hpp> -#include <algorithm> -#include <boost/function.hpp> + +#include <boost/assert.hpp> #include <boost/bind.hpp> +#ifdef BOOST_THREAD_USES_CHRONO +#include <boost/chrono/system_clocks.hpp> +#endif +#include <boost/core/enable_if.hpp> #include <boost/core/ref.hpp> -#include <boost/scoped_array.hpp> #include <boost/enable_shared_from_this.hpp> -#include <boost/core/enable_if.hpp> - -#include <list> +#include <boost/exception_ptr.hpp> +#include <boost/function.hpp> #include <boost/next_prior.hpp> -#include <vector> +#include <boost/scoped_array.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/throw_exception.hpp> +#include <boost/type_traits/conditional.hpp> +#include <boost/type_traits/decay.hpp> +#include <boost/type_traits/is_copy_constructible.hpp> +#include <boost/type_traits/is_fundamental.hpp> +#include <boost/type_traits/is_void.hpp> +#include <boost/utility/result_of.hpp> -#include <boost/thread/future_error_code.hpp> -#ifdef BOOST_THREAD_USES_CHRONO -#include <boost/chrono/system_clocks.hpp> -#endif #if defined BOOST_THREAD_PROVIDES_FUTURE_CTOR_ALLOCATORS #include <boost/thread/detail/memory.hpp> @@ -67,14 +70,16 @@ #endif #endif -#include <boost/utility/result_of.hpp> -#include <boost/thread/thread_only.hpp> - #if defined BOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY #include <boost/thread/csbl/tuple.hpp> #include <boost/thread/csbl/vector.hpp> #endif +#include <algorithm> +#include <list> +#include <vector> +#include <utility> + #if defined BOOST_THREAD_PROVIDES_FUTURE #define BOOST_THREAD_FUTURE future #else @@ -83,134 +88,26 @@ namespace boost { - - //enum class launch - BOOST_SCOPED_ENUM_DECLARE_BEGIN(launch) - { - none = 0, - async = 1, - deferred = 2, -#ifdef BOOST_THREAD_PROVIDES_EXECUTORS - executor = 4, -#endif - any = async | deferred - } - BOOST_SCOPED_ENUM_DECLARE_END(launch) - - //enum class future_status - BOOST_SCOPED_ENUM_DECLARE_BEGIN(future_status) - { - ready, - timeout, - deferred - } - BOOST_SCOPED_ENUM_DECLARE_END(future_status) - - class BOOST_SYMBOL_VISIBLE future_error - : public std::logic_error - { - system::error_code ec_; - public: - future_error(system::error_code ec) - : logic_error(ec.message()), - ec_(ec) - { - } - - const system::error_code& code() const BOOST_NOEXCEPT - { - return ec_; - } - }; - - class BOOST_SYMBOL_VISIBLE future_uninitialized: - public future_error - { - public: - future_uninitialized() : - future_error(system::make_error_code(future_errc::no_state)) - {} - }; - class BOOST_SYMBOL_VISIBLE broken_promise: - public future_error - { - public: - broken_promise(): - future_error(system::make_error_code(future_errc::broken_promise)) - {} - }; - class BOOST_SYMBOL_VISIBLE future_already_retrieved: - public future_error - { - public: - future_already_retrieved(): - future_error(system::make_error_code(future_errc::future_already_retrieved)) - {} - }; - class BOOST_SYMBOL_VISIBLE promise_already_satisfied: - public future_error - { - public: - promise_already_satisfied(): - future_error(system::make_error_code(future_errc::promise_already_satisfied)) - {} - }; - - class BOOST_SYMBOL_VISIBLE task_already_started: - public future_error - { - public: - task_already_started(): - future_error(system::make_error_code(future_errc::promise_already_satisfied)) - {} - }; - - class BOOST_SYMBOL_VISIBLE task_moved: - public future_error - { - public: - task_moved(): - future_error(system::make_error_code(future_errc::no_state)) - {} - }; - - class promise_moved: - public future_error - { - public: - promise_moved(): - future_error(system::make_error_code(future_errc::no_state)) - {} - }; - - namespace future_state - { - enum state { uninitialized, waiting, ready, moved, deferred }; - } - namespace detail { struct relocker { boost::unique_lock<boost::mutex>& lock_; - bool unlocked_; relocker(boost::unique_lock<boost::mutex>& lk): lock_(lk) { lock_.unlock(); - unlocked_=true; } ~relocker() { - if (unlocked_) { + if (! lock_.owns_lock()) { lock_.lock(); } } void lock() { - if (unlocked_) { + if (! lock_.owns_lock()) { lock_.lock(); - unlocked_=false; } } private: @@ -220,36 +117,72 @@ namespace boost struct shared_state_base : enable_shared_from_this<shared_state_base> { typedef std::list<boost::condition_variable_any*> waiter_list; + typedef waiter_list::iterator notify_when_ready_handle; // This type should be only included conditionally if interruptions are allowed, but is included to maintain the same layout. typedef shared_ptr<shared_state_base> continuation_ptr_type; + typedef std::vector<continuation_ptr_type> continuations_type; boost::exception_ptr exception; bool done; + bool is_valid_; bool is_deferred_; - launch policy_; bool is_constructed; + std::size_t cnt_; + launch policy_; mutable boost::mutex mutex; boost::condition_variable waiters; waiter_list external_waiters; boost::function<void()> callback; // This declaration should be only included conditionally, but is included to maintain the same layout. - continuation_ptr_type continuation_ptr; + continuations_type continuations; // This declaration should be only included conditionally, but is included to maintain the same layout. - virtual void launch_continuation(boost::unique_lock<boost::mutex>&) + virtual void launch_continuation(boost::unique_lock<boost::mutex>&, shared_ptr<shared_state_base>) { } shared_state_base(): done(false), + is_valid_(true), is_deferred_(false), - policy_(launch::none), is_constructed(false), - continuation_ptr() + cnt_(0), + policy_(launch::none), + continuations() {} virtual ~shared_state_base() + { + BOOST_ASSERT(cnt_==0); + } + virtual void block_if_needed(boost::unique_lock<boost::mutex>&) {} + bool valid(boost::unique_lock<boost::mutex>&) { return is_valid_; } + bool valid() { + boost::unique_lock<boost::mutex> lk(this->mutex); + return valid(lk); + } + void invalidate(boost::unique_lock<boost::mutex>&) { is_valid_ = false; } + void invalidate() { + boost::unique_lock<boost::mutex> lk(this->mutex); + invalidate(lk); + } + void validate(boost::unique_lock<boost::mutex>&) { is_valid_ = true; } + void validate() { + boost::unique_lock<boost::mutex> lk(this->mutex); + validate(lk); + } + + void inc(boost::unique_lock<boost::mutex>&) { ++cnt_; } + void inc() { boost::unique_lock<boost::mutex> lk(this->mutex); inc(lk); } + + void dec(boost::unique_lock<boost::mutex>& lk) { + if (--cnt_ == 0) { + block_if_needed(lk); + } + } + void dec() { boost::unique_lock<boost::mutex> lk(this->mutex); dec(lk); } + void set_deferred() { is_deferred_ = true; @@ -267,14 +200,14 @@ namespace boost policy_ = launch::executor; } #endif - waiter_list::iterator register_external_waiter(boost::condition_variable_any& cv) + notify_when_ready_handle notify_when_ready(boost::condition_variable_any& cv) { boost::unique_lock<boost::mutex> lock(this->mutex); do_callback(lock); return external_waiters.insert(external_waiters.end(),&cv); } - void remove_external_waiter(waiter_list::iterator it) + void unnotify_when_ready(notify_when_ready_handle it) { boost::lock_guard<boost::mutex> lock(this->mutex); external_waiters.erase(it); @@ -283,10 +216,14 @@ namespace boost #if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION void do_continuation(boost::unique_lock<boost::mutex>& lock) { - if (continuation_ptr) { - continuation_ptr_type this_continuation_ptr = continuation_ptr; - continuation_ptr.reset(); - this_continuation_ptr->launch_continuation(lock); + if (! continuations.empty()) { + continuations_type the_continuations = continuations; + continuations.clear(); + relocker rlk(lock); + for (continuations_type::iterator it = the_continuations.begin(); it != the_continuations.end(); ++it) { + boost::unique_lock<boost::mutex> cont_lock((*it)->mutex); + (*it)->launch_continuation(cont_lock, *it); + } } } #else @@ -295,9 +232,9 @@ namespace boost } #endif #if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION - void set_continuation_ptr(continuation_ptr_type continuation, boost::unique_lock<boost::mutex>& lock) + virtual void set_continuation_ptr(continuation_ptr_type continuation, boost::unique_lock<boost::mutex>& lock) { - continuation_ptr= continuation; + continuations.push_back(continuation); if (done) { do_continuation(lock); } @@ -373,12 +310,17 @@ namespace boost } } - virtual void wait(bool rethrow=true) + virtual void wait(boost::unique_lock<boost::mutex>& lock, bool rethrow=true) { - boost::unique_lock<boost::mutex> lock(this->mutex); wait_internal(lock, rethrow); } + void wait(bool rethrow=true) + { + boost::unique_lock<boost::mutex> lock(this->mutex); + wait(lock, rethrow); + } + #if defined BOOST_THREAD_USES_DATETIME bool timed_wait_until(boost::system_time const& target_time) { @@ -431,27 +373,6 @@ namespace boost mark_exceptional_finish_internal(boost::current_exception(), lock); } -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - void mark_interrupted_finish() - { - boost::unique_lock<boost::mutex> lock(this->mutex); - exception = boost::copy_exception(boost::thread_interrupted()); - mark_finished_internal(lock); - } - - void set_interrupted_at_thread_exit() - { - unique_lock<boost::mutex> lk(this->mutex); - if (has_value(lk)) - { - throw_exception(promise_already_satisfied()); - } - exception = boost::copy_exception(boost::thread_interrupted()); - this->is_constructed = true; - detail::make_ready_at_thread_exit(shared_from_this()); - } -#endif - void set_exception_at_thread_exit(exception_ptr e) { unique_lock<boost::mutex> lk(this->mutex); @@ -481,20 +402,22 @@ namespace boost return done && exception; } - bool has_exception(unique_lock<boost::mutex>&) const - { - return done && exception; - } - - bool is_deferred(boost::lock_guard<boost::mutex>&) const { - return is_deferred_; - } - launch launch_policy(boost::unique_lock<boost::mutex>&) const { return policy_; } + future_state::state get_state(boost::unique_lock<boost::mutex>& lk) const + { + if(!done) + { + return future_state::waiting; + } + else + { + return future_state::ready; + } + } future_state::state get_state() const { boost::lock_guard<boost::mutex> guard(this->mutex); @@ -511,10 +434,6 @@ namespace boost exception_ptr get_exception_ptr() { boost::unique_lock<boost::mutex> lock(this->mutex); - return get_exception_ptr(lock); - } - exception_ptr get_exception_ptr(boost::unique_lock<boost::mutex>& lock) - { wait_internal(lock, false); return exception; } @@ -580,21 +499,29 @@ namespace boost void mark_finished_with_result_internal(rvalue_source_type result_, boost::unique_lock<boost::mutex>& lock) { -#if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES #if defined BOOST_THREAD_FUTURE_USES_OPTIONAL result = boost::move(result_); -#else +#elif ! defined BOOST_NO_CXX11_RVALUE_REFERENCES result.reset(new T(boost::move(result_))); -#endif -#else -#if defined BOOST_THREAD_FUTURE_USES_OPTIONAL - result = boost::move(result_); #else result.reset(new T(static_cast<rvalue_source_type>(result_))); #endif + this->mark_finished_internal(lock); + } + + +#if ! defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES) + template <class ...Args> + void mark_finished_with_result_internal(boost::unique_lock<boost::mutex>& lock, BOOST_THREAD_FWD_REF(Args)... args) + { +#if defined BOOST_THREAD_FUTURE_USES_OPTIONAL + result.emplace(boost::forward<Args>(args)...); +#else + result.reset(new T(boost::forward<Args>(args)...)); #endif this->mark_finished_internal(lock); } +#endif void mark_finished_with_result(source_reference_type result_) { @@ -613,21 +540,29 @@ namespace boost #endif } - storage_type& get_storage(boost::unique_lock<boost::mutex>& lock) + storage_type& get_storage(boost::unique_lock<boost::mutex>& lk) { - wait_internal(lock); + wait_internal(lk); return result; } - virtual move_dest_type get() + virtual move_dest_type get(boost::unique_lock<boost::mutex>& lk) { - boost::unique_lock<boost::mutex> lock(this->mutex); - return boost::move(*get_storage(lock)); + return boost::move(*get_storage(lk)); + } + move_dest_type get() + { + boost::unique_lock<boost::mutex> lk(this->mutex); + return this->get(lk); } - virtual shared_future_get_result_type get_sh() + virtual shared_future_get_result_type get_sh(boost::unique_lock<boost::mutex>& lk) { - boost::unique_lock<boost::mutex> lock(this->mutex); - return *get_storage(lock); + return *get_storage(lk); + } + shared_future_get_result_type get_sh() + { + boost::unique_lock<boost::mutex> lk(this->mutex); + return this->get_sh(lk); } void set_value_at_thread_exit(source_reference_type result_) @@ -705,19 +640,27 @@ namespace boost mark_finished_with_result_internal(result_, lock); } - virtual T& get() + virtual T& get(boost::unique_lock<boost::mutex>& lock) { - boost::unique_lock<boost::mutex> lock(this->mutex); wait_internal(lock); return *result; } + T& get() + { + boost::unique_lock<boost::mutex> lk(this->mutex); + return get(lk); + } - virtual T& get_sh() + virtual T& get_sh(boost::unique_lock<boost::mutex>& lock) { - boost::unique_lock<boost::mutex> lock(this->mutex); wait_internal(lock); return *result; } + T& get_sh() + { + boost::unique_lock<boost::mutex> lock(this->mutex); + return get_sh(lock); + } void set_value_at_thread_exit(T& result_) { @@ -755,17 +698,25 @@ namespace boost mark_finished_with_result_internal(lock); } - virtual void get() + virtual void get(boost::unique_lock<boost::mutex>& lock) { - boost::unique_lock<boost::mutex> lock(this->mutex); this->wait_internal(lock); } - - virtual void get_sh() + void get() { boost::unique_lock<boost::mutex> lock(this->mutex); + this->get(lock); + } + + virtual void get_sh(boost::unique_lock<boost::mutex>& lock) + { this->wait_internal(lock); } + void get_sh() + { + boost::unique_lock<boost::mutex> lock(this->mutex); + this->get_sh(lock); + } void set_value_at_thread_exit() { @@ -801,15 +752,23 @@ namespace boost this->set_async(); } + virtual void block_if_needed(boost::unique_lock<boost::mutex>& lk) + { + this->wait(lk, false); + } + ~future_async_shared_state_base() { join(); } - virtual void wait(bool rethrow) + virtual void wait(boost::unique_lock<boost::mutex>& lk, bool rethrow) { - join(); - this->base_type::wait(rethrow); + { + relocker rlk(lk); + join(); + } + this->base_type::wait(lk, rethrow); } }; @@ -819,23 +778,23 @@ namespace boost template<typename Rp, typename Fp> struct future_async_shared_state: future_async_shared_state_base<Rp> { - explicit future_async_shared_state(BOOST_THREAD_FWD_REF(Fp) f) + future_async_shared_state() + { + } + + void init(BOOST_THREAD_FWD_REF(Fp) f) { - this->thr_ = thread(&future_async_shared_state::run, this, boost::forward<Fp>(f)); + shared_ptr<boost::detail::shared_state_base> that = this->shared_from_this(); + this->thr_ = thread(&future_async_shared_state::run, that, boost::forward<Fp>(f)); } - static void run(future_async_shared_state* that, BOOST_THREAD_FWD_REF(Fp) f) + static void run(shared_ptr<boost::detail::shared_state_base> that_, BOOST_THREAD_FWD_REF(Fp) f) { + future_async_shared_state* that = dynamic_cast<future_async_shared_state*>(that_.get()); try { that->mark_finished_with_result(f()); } -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - catch(thread_interrupted& ) - { - that->mark_interrupted_finish(); - } -#endif catch(...) { that->mark_exceptional_finish(); @@ -846,24 +805,19 @@ namespace boost template<typename Fp> struct future_async_shared_state<void, Fp>: public future_async_shared_state_base<void> { - explicit future_async_shared_state(BOOST_THREAD_FWD_REF(Fp) f) + void init(BOOST_THREAD_FWD_REF(Fp) f) { - this->thr_ = thread(&future_async_shared_state::run, this, boost::move(f)); + this->thr_ = thread(&future_async_shared_state::run, this->shared_from_this(), boost::move(f)); } - static void run(future_async_shared_state* that, BOOST_THREAD_FWD_REF(Fp) f) + static void run(shared_ptr<boost::detail::shared_state_base> that_, BOOST_THREAD_FWD_REF(Fp) f) { + future_async_shared_state* that = dynamic_cast<future_async_shared_state*>(that_.get()); try { f(); that->mark_finished_with_result(); } -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - catch(thread_interrupted& ) - { - that->mark_interrupted_finish(); - } -#endif catch(...) { that->mark_exceptional_finish(); @@ -874,23 +828,18 @@ namespace boost template<typename Rp, typename Fp> struct future_async_shared_state<Rp&, Fp>: future_async_shared_state_base<Rp&> { - explicit future_async_shared_state(BOOST_THREAD_FWD_REF(Fp) f) + void init(BOOST_THREAD_FWD_REF(Fp) f) { - this->thr_ = thread(&future_async_shared_state::run, this, boost::move(f)); + this->thr_ = thread(&future_async_shared_state::run, this->shared_from_this(), boost::move(f)); } - static void run(future_async_shared_state* that, BOOST_THREAD_FWD_REF(Fp) f) + static void run(shared_ptr<boost::detail::shared_state_base> that_, BOOST_THREAD_FWD_REF(Fp) f) { + future_async_shared_state* that = dynamic_cast<future_async_shared_state*>(that_.get()); try { that->mark_finished_with_result(f()); } -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - catch(thread_interrupted& ) - { - that->mark_interrupted_finish(); - } -#endif catch(...) { that->mark_exceptional_finish(); @@ -991,13 +940,13 @@ namespace boost struct registered_waiter { boost::shared_ptr<detail::shared_state_base> future_; - detail::shared_state_base::waiter_list::iterator wait_iterator; + detail::shared_state_base::notify_when_ready_handle handle; count_type index; registered_waiter(boost::shared_ptr<detail::shared_state_base> const& a_future, - detail::shared_state_base::waiter_list::iterator wait_iterator_, + detail::shared_state_base::notify_when_ready_handle handle_, count_type index_): - future_(a_future),wait_iterator(wait_iterator_),index(index_) + future_(a_future),handle(handle_),index(index_) {} }; @@ -1035,7 +984,7 @@ namespace boost }; boost::condition_variable_any cv; - std::vector<registered_waiter> futures; + std::vector<registered_waiter> futures_; count_type future_count; public: @@ -1048,11 +997,11 @@ namespace boost { if(f.future_) { - registered_waiter waiter(f.future_,f.future_->register_external_waiter(cv),future_count); + registered_waiter waiter(f.future_,f.future_->notify_when_ready(cv),future_count); try { - futures.push_back(waiter); + futures_.push_back(waiter); } catch(...) { - f.future_->remove_external_waiter(waiter.wait_iterator); + f.future_->unnotify_when_ready(waiter.handle); throw; } } @@ -1069,14 +1018,14 @@ namespace boost count_type wait() { - all_futures_lock lk(futures); + all_futures_lock lk(futures_); for(;;) { - for(count_type i=0;i<futures.size();++i) + for(count_type i=0;i<futures_.size();++i) { - if(futures[i].future_->done) + if(futures_[i].future_->done) { - return futures[i].index; + return futures_[i].index; } } cv.wait(lk); @@ -1085,9 +1034,9 @@ namespace boost ~future_waiter() { - for(count_type i=0;i<futures.size();++i) + for(count_type i=0;i<futures_.size();++i) { - futures[i].future_->remove_external_waiter(futures[i].wait_iterator); + futures_[i].future_->unnotify_when_ready(futures_[i].handle); } } }; @@ -1101,93 +1050,28 @@ namespace boost class shared_future; template<typename T> - struct is_future_type - { - BOOST_STATIC_CONSTANT(bool, value=false); - typedef void type; - }; - - template<typename T> - struct is_future_type<BOOST_THREAD_FUTURE<T> > + struct is_future_type<BOOST_THREAD_FUTURE<T> > : true_type { - BOOST_STATIC_CONSTANT(bool, value=true); - typedef T type; }; template<typename T> - struct is_future_type<shared_future<T> > + struct is_future_type<shared_future<T> > : true_type { - BOOST_STATIC_CONSTANT(bool, value=true); - typedef T type; }; - template<typename Iterator> - typename boost::disable_if<is_future_type<Iterator>,void>::type wait_for_all(Iterator begin,Iterator end) - { - for(Iterator current=begin;current!=end;++current) - { - current->wait(); - } - } - -#ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES - template<typename F1,typename F2> - typename boost::enable_if<is_future_type<F1>,void>::type wait_for_all(F1& f1,F2& f2) - { - f1.wait(); - f2.wait(); - } - - template<typename F1,typename F2,typename F3> - void wait_for_all(F1& f1,F2& f2,F3& f3) - { - f1.wait(); - f2.wait(); - f3.wait(); - } - - template<typename F1,typename F2,typename F3,typename F4> - void wait_for_all(F1& f1,F2& f2,F3& f3,F4& f4) - { - f1.wait(); - f2.wait(); - f3.wait(); - f4.wait(); - } - - template<typename F1,typename F2,typename F3,typename F4,typename F5> - void wait_for_all(F1& f1,F2& f2,F3& f3,F4& f4,F5& f5) - { - f1.wait(); - f2.wait(); - f3.wait(); - f4.wait(); - f5.wait(); - } -#else - template<typename F1, typename... Fs> - void wait_for_all(F1& f1, Fs&... fs) - { - bool dummy[] = { (f1.wait(), true), (fs.wait(), true)... }; - - // prevent unused parameter warning - (void) dummy; - } -#endif // !defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES) - - template<typename Iterator> - typename boost::disable_if<is_future_type<Iterator>,Iterator>::type wait_for_any(Iterator begin,Iterator end) - { - if(begin==end) - return end; - - detail::future_waiter waiter; - for(Iterator current=begin;current!=end;++current) - { - waiter.add(*current); - } - return boost::next(begin,waiter.wait()); - } +// template<typename Iterator> +// typename boost::disable_if<is_future_type<Iterator>,Iterator>::type wait_for_any(Iterator begin,Iterator end) +// { +// if(begin==end) +// return end; +// +// detail::future_waiter waiter; +// for(Iterator current=begin;current!=end;++current) +// { +// waiter.add(*current); +// } +// return boost::next(begin,waiter.wait()); +// } #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES template<typename F1,typename F2> @@ -1252,6 +1136,7 @@ namespace boost /// Common implementation for all the futures independently of the return type class base_future { + public: }; /// Common implementation for future and shared_future. template <typename R> @@ -1277,15 +1162,13 @@ namespace boost future_ = p.get_future().future_; } - future_ptr future_; basic_future(future_ptr a_future): future_(a_future) { + if (a_future) a_future->inc(); } - // Copy construction from a shared_future - explicit basic_future(const shared_future<R>&) BOOST_NOEXCEPT; public: typedef future_state::state state; @@ -1298,9 +1181,12 @@ namespace boost basic_future(exceptional_ptr const& ex) : future_(make_exceptional_future_ptr(ex)) { + future_->inc(); } - ~basic_future() {} + ~basic_future() { + if (future_) future_->dec(); + } basic_future(BOOST_THREAD_RV_REF(basic_future) other) BOOST_NOEXCEPT: future_(BOOST_THREAD_RV(other).future_) @@ -1309,6 +1195,9 @@ namespace boost } basic_future& operator=(BOOST_THREAD_RV_REF(basic_future) other) BOOST_NOEXCEPT { + if (this->future_) { + this->future_->dec(); + } future_=BOOST_THREAD_RV(other).future_; BOOST_THREAD_RV(other).future_.reset(); return *this; @@ -1318,6 +1207,14 @@ namespace boost future_.swap(that.future_); } // functions to check state, and wait for ready + state get_state(boost::unique_lock<boost::mutex>& lk) const + { + if(!future_) + { + return future_state::uninitialized; + } + return future_->get_state(lk); + } state get_state() const { if(!future_) @@ -1332,6 +1229,10 @@ namespace boost return get_state()==future_state::ready; } + bool is_ready(boost::unique_lock<boost::mutex>& lk) const + { + return get_state(lk)==future_state::ready; + } bool has_exception() const { return future_ && future_->has_exception(); @@ -1357,7 +1258,7 @@ namespace boost bool valid() const BOOST_NOEXCEPT { - return future_ != 0; + return future_ != 0 && future_->valid(); } void wait() const @@ -1369,6 +1270,34 @@ namespace boost future_->wait(false); } + typedef detail::shared_state_base::notify_when_ready_handle notify_when_ready_handle; + + boost::mutex& mutex() { + if(!future_) + { + boost::throw_exception(future_uninitialized()); + } + return future_->mutex; + }; + + notify_when_ready_handle notify_when_ready(boost::condition_variable_any& cv) + { + if(!future_) + { + boost::throw_exception(future_uninitialized()); + } + return future_->notify_when_ready(cv); + } + + void unnotify_when_ready(notify_when_ready_handle h) + { + if(!future_) + { + boost::throw_exception(future_uninitialized()); + } + return future_->unnotify_when_ready(h); + } + #if defined BOOST_THREAD_USES_DATETIME template<typename Duration> bool timed_wait(Duration const& rel_time) const @@ -1516,6 +1445,10 @@ namespace boost } inline BOOST_THREAD_FUTURE(BOOST_THREAD_RV_REF(BOOST_THREAD_FUTURE<BOOST_THREAD_FUTURE<R> >) other); // EXTENSION + explicit BOOST_THREAD_FUTURE(BOOST_THREAD_RV_REF(shared_future<R>) other) : + base_type(boost::move(static_cast<base_type&>(BOOST_THREAD_RV(other)))) + {} + BOOST_THREAD_FUTURE& operator=(BOOST_THREAD_RV_REF(BOOST_THREAD_FUTURE) other) BOOST_NOEXCEPT { this->base_type::operator=(boost::move(static_cast<base_type&>(BOOST_THREAD_RV(other)))); @@ -1551,32 +1484,42 @@ namespace boost // retrieving the value move_dest_type get() { - if(!this->future_) + if (this->future_ == 0) + { + boost::throw_exception(future_uninitialized()); + } + unique_lock<boost::mutex> lk(this->future_->mutex); + if (! this->future_->valid(lk)) { boost::throw_exception(future_uninitialized()); } - future_ptr fut_=this->future_; #ifdef BOOST_THREAD_PROVIDES_FUTURE_INVALID_AFTER_GET - this->future_.reset(); + this->future_->invalidate(lk); #endif - return fut_->get(); + return this->future_->get(lk); } template <typename R2> typename boost::disable_if< is_void<R2>, move_dest_type>::type get_or(BOOST_THREAD_RV_REF(R2) v) { - if(!this->future_) + + if (this->future_ == 0) + { + boost::throw_exception(future_uninitialized()); + } + unique_lock<boost::mutex> lk(this->future_->mutex); + if (! this->future_->valid(lk)) { boost::throw_exception(future_uninitialized()); } - this->future_->wait(false); - future_ptr fut_=this->future_; + this->future_->wait(lk, false); #ifdef BOOST_THREAD_PROVIDES_FUTURE_INVALID_AFTER_GET - this->future_.reset(); + this->future_->invalidate(lk); #endif - if (fut_->has_value()) { - return fut_->get(); + + if (this->future_->has_value(lk)) { + return this->future_->get(lk); } else { return boost::move(v); @@ -1587,17 +1530,21 @@ namespace boost typename boost::disable_if< is_void<R2>, move_dest_type>::type get_or(R2 const& v) // EXTENSION { - if(!this->future_) + if (this->future_ == 0) { boost::throw_exception(future_uninitialized()); } - this->future_->wait(false); - future_ptr fut_=this->future_; + unique_lock<boost::mutex> lk(this->future_->mutex); + if (! this->future_->valid(lk)) + { + boost::throw_exception(future_uninitialized()); + } + this->future_->wait(lk, false); #ifdef BOOST_THREAD_PROVIDES_FUTURE_INVALID_AFTER_GET - this->future_.reset(); + this->future_->invalidate(lk); #endif - if (fut_->has_value()) { - return fut_->get(); + if (this->future_->has_value(lk)) { + return this->future_->get(lk); } else { return v; @@ -1736,43 +1683,55 @@ namespace boost // retrieving the value move_dest_type get() { - if(!this->future_) + if (this->future_ == 0) + { + boost::throw_exception(future_uninitialized()); + } + unique_lock<boost::mutex> lk(this->future_->mutex); + if (! this->future_->valid(lk)) { boost::throw_exception(future_uninitialized()); } - future_ptr fut_=this->future_; #ifdef BOOST_THREAD_PROVIDES_FUTURE_INVALID_AFTER_GET - this->future_.reset(); + this->future_->invalidate(lk); #endif - return fut_->get(); + return this->future_->get(lk); } move_dest_type get_or(BOOST_THREAD_RV_REF(R) v) // EXTENSION { - if(!this->future_) + if (this->future_ == 0) { boost::throw_exception(future_uninitialized()); } - this->future_->wait(false); - future_ptr fut_=this->future_; + unique_lock<boost::mutex> lk(this->future_->mutex); + if (! this->future_->valid(lk)) + { + boost::throw_exception(future_uninitialized()); + } + this->future_->wait(lk, false); #ifdef BOOST_THREAD_PROVIDES_FUTURE_INVALID_AFTER_GET - this->future_.reset(); + this->future_->invalidate(lk); #endif - if (fut_->has_value()) return fut_->get(); + if (this->future_->has_value(lk)) return this->future_->get(lk); else return boost::move(v); } move_dest_type get_or(R const& v) // EXTENSION { - if(!this->future_) + if (this->future_ == 0) { boost::throw_exception(future_uninitialized()); } - this->future_->wait(false); - future_ptr fut_=this->future_; + unique_lock<boost::mutex> lk(this->future_->mutex); + if (! this->future_->valid(lk)) + { + boost::throw_exception(future_uninitialized()); + } + this->future_->wait(lk, false); #ifdef BOOST_THREAD_PROVIDES_FUTURE_INVALID_AFTER_GET - this->future_.reset(); + this->future_->invalidate(lk); #endif - if (fut_->has_value()) return fut_->get(); + if (this->future_->has_value(lk)) return this->future_->get(lk); else return v; } @@ -1836,7 +1795,7 @@ namespace boost typedef R value_type; // EXTENSION shared_future(shared_future const& other): - base_type(other) + base_type(other.future_) {} typedef future_state::state state; @@ -1851,14 +1810,19 @@ namespace boost shared_future& operator=(BOOST_THREAD_COPY_ASSIGN_REF(shared_future) other) { - shared_future(other).swap(*this); + if (other.future_) { + other.future_->inc(); + } + if (this->future_) { + this->future_->dec(); + } + this->future_ = other.future_; return *this; } shared_future(BOOST_THREAD_RV_REF(shared_future) other) BOOST_NOEXCEPT : base_type(boost::move(static_cast<base_type&>(BOOST_THREAD_RV(other)))) { - BOOST_THREAD_RV(other).future_.reset(); } shared_future(BOOST_THREAD_RV_REF( BOOST_THREAD_FUTURE<R> ) other) BOOST_NOEXCEPT : base_type(boost::move(static_cast<base_type&>(BOOST_THREAD_RV(other)))) @@ -1887,7 +1851,7 @@ namespace boost return this->future_->run_if_is_deferred_or_ready(); } // retrieving the value - typename detail::shared_state<R>::shared_future_get_result_type get() + typename detail::shared_state<R>::shared_future_get_result_type get() const { if(!this->future_) { @@ -1898,15 +1862,14 @@ namespace boost template <typename R2> typename boost::disable_if< is_void<R2>, typename detail::shared_state<R>::shared_future_get_result_type>::type - get_or(BOOST_THREAD_RV_REF(R2) v) // EXTENSION + get_or(BOOST_THREAD_RV_REF(R2) v) const // EXTENSION { if(!this->future_) { boost::throw_exception(future_uninitialized()); } - future_ptr fut_=this->future_; - fut_->wait(); - if (fut_->has_value()) return fut_->get_sh(); + this->future_->wait(); + if (this->future_->has_value()) return this->future_->get_sh(); else return boost::move(v); } @@ -1928,16 +1891,6 @@ namespace boost BOOST_THREAD_DCL_MOVABLE_BEG(T) shared_future<T> BOOST_THREAD_DCL_MOVABLE_END - namespace detail - { - /// Copy construction from a shared_future - template <typename R> - inline basic_future<R>::basic_future(const shared_future<R>& other) BOOST_NOEXCEPT - : future_(other.future_) - { - } - } - template <typename R> class promise { @@ -2077,6 +2030,22 @@ namespace boost future_->mark_finished_with_result_internal(static_cast<rvalue_source_type>(r), lock); #endif } + +#if ! defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES) + template <class ...Args> + void emplace(BOOST_THREAD_FWD_REF(Args) ...args) + { + lazy_init(); + boost::unique_lock<boost::mutex> lock(future_->mutex); + if(future_->done) + { + boost::throw_exception(promise_already_satisfied()); + } + future_->mark_finished_with_result_internal(lock, boost::forward<Args>(args)...); + } + +#endif + void set_exception(boost::exception_ptr p) { lazy_init(); @@ -2503,7 +2472,10 @@ namespace boost void reset() { + // todo The packaged_task::reset must be as if an assignemnt froma new packaged_task with the same function + // the reset function is an optimization that avoids reallocating a new task. started=false; + this->validate(); } #if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD) virtual void do_run(BOOST_THREAD_RV_REF(ArgTypes) ... args)=0; @@ -2591,6 +2563,11 @@ namespace boost f(boost::move(f_)) {} + F callable() + { + return boost::move(f); + } + #if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD) void do_apply(BOOST_THREAD_RV_REF(ArgTypes) ... args) { @@ -2606,12 +2583,6 @@ namespace boost this->set_value_at_thread_exit(f()); } #endif -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - catch(thread_interrupted& ) - { - this->set_interrupted_at_thread_exit(); - } -#endif catch(...) { this->set_exception_at_thread_exit(current_exception()); @@ -2638,12 +2609,6 @@ namespace boost #endif } #endif -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - catch(thread_interrupted& ) - { - this->mark_interrupted_finish(); - } -#endif catch(...) { this->mark_exceptional_finish(); @@ -2678,6 +2643,11 @@ namespace boost f(boost::move(f_)) {} + F callable() + { + return f; + } + #if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD) void do_apply(BOOST_THREAD_RV_REF(ArgTypes) ... args) { @@ -2693,12 +2663,6 @@ namespace boost this->set_value_at_thread_exit(f()); } #endif -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - catch(thread_interrupted& ) - { - this->set_interrupted_at_thread_exit(); - } -#endif catch(...) { this->set_exception_at_thread_exit(current_exception()); @@ -2721,12 +2685,6 @@ namespace boost this->mark_finished_with_result(res); } #endif -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - catch(thread_interrupted& ) - { - this->mark_interrupted_finish(); - } -#endif catch(...) { this->mark_exceptional_finish(); @@ -2754,18 +2712,21 @@ namespace boost { private: task_shared_state(task_shared_state&); - public: #if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD) - R (*f)(BOOST_THREAD_RV_REF(ArgTypes) ... ); - task_shared_state(R (*f_)(BOOST_THREAD_RV_REF(ArgTypes) ... )): - f(f_) - {} + typedef R (*CallableType)(BOOST_THREAD_RV_REF(ArgTypes) ... ); #else - R (*f)(); - task_shared_state(R (*f_)()): + typedef R (*CallableType)(); +#endif + public: + CallableType f; + task_shared_state(CallableType f_): f(f_) {} -#endif + + CallableType callable() + { + return f; + } #if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD) void do_apply(BOOST_THREAD_RV_REF(ArgTypes) ... args) @@ -2783,12 +2744,6 @@ namespace boost this->set_value_at_thread_exit(boost::move(r)); } #endif -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - catch(thread_interrupted& ) - { - this->set_interrupted_at_thread_exit(); - } -#endif catch(...) { this->set_exception_at_thread_exit(current_exception()); @@ -2812,12 +2767,6 @@ namespace boost this->mark_finished_with_result(boost::move(res)); } #endif -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - catch(thread_interrupted& ) - { - this->mark_interrupted_finish(); - } -#endif catch(...) { this->mark_exceptional_finish(); @@ -2844,16 +2793,19 @@ namespace boost task_shared_state(task_shared_state&); public: #if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD) - R& (*f)(BOOST_THREAD_RV_REF(ArgTypes) ... ); - task_shared_state(R& (*f_)(BOOST_THREAD_RV_REF(ArgTypes) ... )): - f(f_) - {} + typedef R& (*CallableType)(BOOST_THREAD_RV_REF(ArgTypes) ... ); #else - R& (*f)(); - task_shared_state(R& (*f_)()): + typedef R& (*CallableType)(); +#endif + CallableType f; + task_shared_state(CallableType f_): f(f_) {} -#endif + + CallableType callable() + { + return boost::move(f); + } #if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD) void do_apply(BOOST_THREAD_RV_REF(ArgTypes) ... args) @@ -2870,12 +2822,6 @@ namespace boost this->set_value_at_thread_exit(f()); } #endif -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - catch(thread_interrupted& ) - { - this->set_interrupted_at_thread_exit(); - } -#endif catch(...) { this->set_exception_at_thread_exit(current_exception()); @@ -2898,12 +2844,6 @@ namespace boost this->mark_finished_with_result(f()); } #endif -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - catch(thread_interrupted& ) - { - this->mark_interrupted_finish(); - } -#endif catch(...) { this->mark_exceptional_finish(); @@ -2930,6 +2870,7 @@ namespace boost private: task_shared_state(task_shared_state&); public: + typedef F CallableType; F f; task_shared_state(F const& f_): f(f_) @@ -2937,7 +2878,10 @@ namespace boost task_shared_state(BOOST_THREAD_RV_REF(F) f_): f(boost::move(f_)) {} - + F callable() + { + return boost::move(f); + } #if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD) void do_apply(BOOST_THREAD_RV_REF(ArgTypes) ... args) { @@ -2953,12 +2897,6 @@ namespace boost #endif this->set_value_at_thread_exit(); } -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - catch(thread_interrupted& ) - { - this->set_interrupted_at_thread_exit(); - } -#endif catch(...) { this->set_exception_at_thread_exit(current_exception()); @@ -2980,12 +2918,6 @@ namespace boost #endif this->mark_finished_with_result(); } -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - catch(thread_interrupted& ) - { - this->mark_interrupted_finish(); - } -#endif catch(...) { this->mark_exceptional_finish(); @@ -3011,12 +2943,20 @@ namespace boost { private: task_shared_state(task_shared_state&); +#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD) + typedef void (*CallableType)(BOOST_THREAD_RV_REF(ArgTypes)...); +#else + typedef void (*CallableType)(); +#endif public: - void (*f)(); - task_shared_state(void (*f_)()): + CallableType f; + task_shared_state(CallableType f_): f(f_) {} - + CallableType callable() + { + return f; + } #if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD) void do_apply(BOOST_THREAD_RV_REF(ArgTypes) ... args) { @@ -3032,12 +2972,6 @@ namespace boost #endif this->set_value_at_thread_exit(); } -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - catch(thread_interrupted& ) - { - this->set_interrupted_at_thread_exit(); - } -#endif catch(...) { this->set_exception_at_thread_exit(current_exception()); @@ -3059,12 +2993,6 @@ namespace boost #endif this->mark_finished_with_result(); } -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - catch(thread_interrupted& ) - { - this->mark_interrupted_finish(); - } -#endif catch(...) { this->mark_exceptional_finish(); @@ -3277,11 +3205,7 @@ namespace boost A2 a2(a); typedef thread_detail::allocator_destructor<A2> D; -#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD) task = task_ptr(::new(a2.allocate(1)) task_shared_state_type(boost::move(f)), D(a2, 1) ); -#else - task = task_ptr(::new(a2.allocate(1)) task_shared_state_type(boost::move(f)), D(a2, 1) ); -#endif future_obtained = false; } @@ -3314,6 +3238,9 @@ namespace boost void reset() { if (!valid()) throw future_error(system::make_error_code(future_errc::no_state)); + + // As if *this = packaged_task(task->callable()); + task->reset(); future_obtained=false; } @@ -3415,7 +3342,8 @@ namespace detail BOOST_THREAD_FUTURE<Rp> make_future_async_shared_state(BOOST_THREAD_FWD_REF(Fp) f) { shared_ptr<future_async_shared_state<Rp, Fp> > - h(new future_async_shared_state<Rp, Fp>(boost::forward<Fp>(f))); + h(new future_async_shared_state<Rp, Fp>()); + h->init(boost::forward<Fp>(f)); return BOOST_THREAD_FUTURE<Rp>(h); } } @@ -3602,10 +3530,6 @@ namespace detail { void operator()() { try { that->mark_finished_with_result(f_()); -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - } catch(thread_interrupted& ) { - that->mark_interrupted_finish(); -#endif // defined BOOST_THREAD_PROVIDES_INTERRUPTIONS } catch(...) { that->mark_exceptional_finish(); } @@ -3653,10 +3577,6 @@ namespace detail { try { f_(); that->mark_finished_with_result(); -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - } catch(thread_interrupted& ) { - that->mark_interrupted_finish(); -#endif // defined BOOST_THREAD_PROVIDES_INTERRUPTIONS } catch(...) { that->mark_exceptional_finish(); } @@ -3702,10 +3622,6 @@ namespace detail { void operator()() { try { that->mark_finished_with_result(f_()); -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - } catch(thread_interrupted& ) { - that->mark_interrupted_finish(); -#endif // defined BOOST_THREAD_PROVIDES_INTERRUPTIONS } catch(...) { that->mark_exceptional_finish(); } @@ -3728,9 +3644,12 @@ namespace detail { ex.submit(boost::move(t)); } - ~future_executor_shared_state() { - this->wait(false); + virtual void block_if_needed(boost::unique_lock<boost::mutex>&lk) + { + this->wait(lk, false); } + + ~future_executor_shared_state() {} }; //////////////////////////////// @@ -3929,14 +3848,80 @@ namespace detail { //////////////////////////////// // make_ready_future //////////////////////////////// - template <typename T> - BOOST_THREAD_FUTURE<typename decay<T>::type> make_ready_future(BOOST_THREAD_FWD_REF(T) value) { - typedef typename decay<T>::type future_value_type; + namespace detail { + template <class T> + struct deduced_type_impl + { + typedef T type; + }; + + template <class T> + struct deduced_type_impl<reference_wrapper<T> const> + { + typedef T& type; + }; + template <class T> + struct deduced_type_impl<reference_wrapper<T> > + { + typedef T& type; + }; +#if __cplusplus > 201103L + template <class T> + struct deduced_type_impl<std::reference_wrapper<T> > + { + typedef T& type; + }; +#endif + template <class T> + struct deduced_type + { + typedef typename detail::deduced_type_impl<typename decay<T>::type>::type type; + }; + + } + + +#if ! defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES) + template <int = 0, int..., class T> +#else + template <class T> +#endif + BOOST_THREAD_FUTURE<typename detail::deduced_type<T>::type> make_ready_future(BOOST_THREAD_FWD_REF(T) value) { + typedef typename detail::deduced_type<T>::type future_value_type; promise<future_value_type> p; - p.set_value(boost::forward<future_value_type>(value)); + p.set_value(boost::forward<T>(value)); return BOOST_THREAD_MAKE_RV_REF(p.get_future()); } + // explicit overloads + template <class T> + BOOST_THREAD_FUTURE<T> make_ready_future(typename remove_reference<T>::type & x) + { + promise<T> p; + p.set_value(x); + return p.get_future(); + } + + template <class T> + BOOST_THREAD_FUTURE<T> make_ready_future(BOOST_THREAD_FWD_REF(typename remove_reference<T>::type) x) + { + promise<T> p; + p.set_value(forward<typename remove_reference<T>::type>(x)); + return p.get_future(); + } + + // variadic overload +#if ! defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES) + template <class T, class ...Args> + BOOST_THREAD_FUTURE<T> make_ready_future(Args&&... args) + { + promise<T> p; + p.emplace(forward<Args>(args)...); + return p.get_future(); + + } +#endif + template <typename T, typename T1> BOOST_THREAD_FUTURE<T> make_ready_no_decay_future(T1 value) { typedef T future_value_type; @@ -3945,11 +3930,11 @@ namespace detail { return BOOST_THREAD_MAKE_RV_REF(p.get_future()); } -#if defined BOOST_THREAD_USES_MOVE +#if ! defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES) || defined BOOST_THREAD_USES_MOVE inline BOOST_THREAD_FUTURE<void> make_ready_future() { promise<void> p; p.set_value(); - return BOOST_THREAD_MAKE_RV_REF(p.get_future()); + return p.get_future(); } #endif @@ -4038,31 +4023,30 @@ namespace detail { F parent; Fp continuation; + shared_ptr<shared_state_base> centinel; public: future_async_continuation_shared_state(BOOST_THREAD_RV_REF(F) f, BOOST_THREAD_FWD_REF(Fp) c) : parent(boost::move(f)), - continuation(boost::move(c)) { + continuation(boost::move(c)), + centinel(parent.future_) { } - void launch_continuation(boost::unique_lock<boost::mutex>& ) { - this->thr_ = thread(&future_async_continuation_shared_state::run, this); + + void launch_continuation(boost::unique_lock<boost::mutex>&, shared_ptr<shared_state_base> that) { + this->thr_ = thread(&future_async_continuation_shared_state::run, that); } - static void run(future_async_continuation_shared_state* that) { + static void run(shared_ptr<boost::detail::shared_state_base> that_) { + future_async_continuation_shared_state* that = dynamic_cast<future_async_continuation_shared_state*>(that_.get()); try { that->mark_finished_with_result(that->continuation(boost::move(that->parent))); -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - } catch(thread_interrupted& ) { - that->mark_interrupted_finish(); -#endif } catch(...) { that->mark_exceptional_finish(); } } ~future_async_continuation_shared_state() { - this->join(); } }; @@ -4071,33 +4055,31 @@ namespace detail { F parent; Fp continuation; + shared_ptr<shared_state_base> centinel; public: future_async_continuation_shared_state(BOOST_THREAD_RV_REF(F) f, BOOST_THREAD_FWD_REF(Fp) c) : parent(boost::move(f)), - continuation(boost::move(c)) { + continuation(boost::move(c)), + centinel(parent.future_) { } - void launch_continuation(boost::unique_lock<boost::mutex>& ) { - this->thr_ = thread(&future_async_continuation_shared_state::run, this); + void launch_continuation(boost::unique_lock<boost::mutex>&, shared_ptr<shared_state_base> that) { + this->thr_ = thread(&future_async_continuation_shared_state::run, that); } - static void run(future_async_continuation_shared_state* that) { + static void run(shared_ptr<boost::detail::shared_state_base> that_) { + future_async_continuation_shared_state* that = dynamic_cast<future_async_continuation_shared_state*>(that_.get()); try { that->continuation(boost::move(that->parent)); that->mark_finished_with_result(); -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - } catch(thread_interrupted& ) { - that->mark_interrupted_finish(); -#endif } catch(...) { that->mark_exceptional_finish(); } } - ~future_async_continuation_shared_state() { - this->join(); - } + ~future_async_continuation_shared_state() {} + }; ///////////////////////// @@ -4105,12 +4087,16 @@ namespace detail ///////////////////////// #ifdef BOOST_THREAD_PROVIDES_EXECUTORS - template <typename Ex> + template <typename FutureExecutorContinuationSharedState> struct run_it { - Ex* that; + shared_ptr<boost::detail::shared_state_base> that_; - run_it(Ex* that) : that (that) {} - void operator()() { that->run(that); } + run_it(shared_ptr<boost::detail::shared_state_base> that) : that_ (that) {} + void operator()() + { + FutureExecutorContinuationSharedState* that = dynamic_cast<FutureExecutorContinuationSharedState*>(that_.get()); + that->run(that_); + } }; template<typename Ex, typename F, typename Rp, typename Fp> @@ -4119,34 +4105,37 @@ namespace detail Ex* ex; F parent; Fp continuation; + shared_ptr<shared_state_base> centinel; public: future_executor_continuation_shared_state(Ex& ex, BOOST_THREAD_RV_REF(F) f, BOOST_THREAD_FWD_REF(Fp) c) : ex(&ex), parent(boost::move(f)), - continuation(boost::move(c)) { + continuation(boost::move(c)), + centinel(parent.future_) { this->set_executor(); } - void launch_continuation(boost::unique_lock<boost::mutex>& ) { - run_it<future_executor_continuation_shared_state> fct(this); + void launch_continuation(boost::unique_lock<boost::mutex>& lck, shared_ptr<shared_state_base> that ) { + relocker relock(lck); + run_it<future_executor_continuation_shared_state> fct(that); ex->submit(fct); } - static void run(future_executor_continuation_shared_state* that) { + static void run(shared_ptr<boost::detail::shared_state_base> that_) { + future_executor_continuation_shared_state* that = dynamic_cast<future_executor_continuation_shared_state*>(that_.get()); try { that->mark_finished_with_result(that->continuation(boost::move(that->parent))); -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - } catch(thread_interrupted& ) { - that->mark_interrupted_finish(); -#endif } catch(...) { that->mark_exceptional_finish(); } } - ~future_executor_continuation_shared_state() { - this->wait(false); + virtual void block_if_needed(boost::unique_lock<boost::mutex>&lk) + { + this->wait(lk, false); } + + ~future_executor_continuation_shared_state() {} }; template<typename Ex, typename F, typename Fp> @@ -4155,35 +4144,38 @@ namespace detail Ex* ex; F parent; Fp continuation; + shared_ptr<shared_state_base> centinel; public: future_executor_continuation_shared_state(Ex& ex, BOOST_THREAD_RV_REF(F) f, BOOST_THREAD_FWD_REF(Fp) c) : ex(&ex), parent(boost::move(f)), - continuation(boost::move(c)) { + continuation(boost::move(c)), + centinel(parent.future_) { this->set_executor(); } - void launch_continuation(boost::unique_lock<boost::mutex>& ) { - run_it<future_executor_continuation_shared_state> fct(this); + void launch_continuation(boost::unique_lock<boost::mutex>& lck, shared_ptr<shared_state_base> that ) { + relocker relock(lck); + run_it<future_executor_continuation_shared_state> fct(that); ex->submit(fct); } - static void run(future_executor_continuation_shared_state* that) { + static void run(shared_ptr<boost::detail::shared_state_base> that_) { + future_executor_continuation_shared_state* that = dynamic_cast<future_executor_continuation_shared_state*>(that_.get()); try { that->continuation(boost::move(that->parent)); that->mark_finished_with_result(); -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - } catch(thread_interrupted& ) { - that->mark_interrupted_finish(); -#endif } catch(...) { that->mark_exceptional_finish(); } } - ~future_executor_continuation_shared_state() { - this->wait(false); + virtual void block_if_needed(boost::unique_lock<boost::mutex>&lk) + { + this->wait(lk, false); } + + ~future_executor_continuation_shared_state() {} }; #endif @@ -4196,32 +4188,29 @@ namespace detail { F parent; Fp continuation; + shared_ptr<shared_state_base> centinel; public: shared_future_async_continuation_shared_state(F f, BOOST_THREAD_FWD_REF(Fp) c) : parent(f), - continuation(boost::move(c)) { + continuation(boost::move(c)), + centinel(parent.future_) { } - void launch_continuation(boost::unique_lock<boost::mutex>& ) { - this->thr_ = thread(&shared_future_async_continuation_shared_state::run, this); + void launch_continuation(boost::unique_lock<boost::mutex>&, shared_ptr<shared_state_base> that) { + this->thr_ = thread(&shared_future_async_continuation_shared_state::run, that); } - static void run(shared_future_async_continuation_shared_state* that) { + static void run(shared_ptr<boost::detail::shared_state_base> that_) { + shared_future_async_continuation_shared_state* that = dynamic_cast<shared_future_async_continuation_shared_state*>(that_.get()); try { that->mark_finished_with_result(that->continuation(that->parent)); -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - } catch(thread_interrupted& ) { - that->mark_interrupted_finish(); -#endif } catch(...) { that->mark_exceptional_finish(); } } - ~shared_future_async_continuation_shared_state() { - this->join(); - } + ~shared_future_async_continuation_shared_state() {} }; template<typename F, typename Fp> @@ -4229,33 +4218,30 @@ namespace detail { F parent; Fp continuation; + shared_ptr<shared_state_base> centinel; public: shared_future_async_continuation_shared_state(F f, BOOST_THREAD_FWD_REF(Fp) c) : parent(f), - continuation(boost::move(c)) { + continuation(boost::move(c)), + centinel(parent.future_) { } - void launch_continuation(boost::unique_lock<boost::mutex>& ) { - this->thr_ = thread(&shared_future_async_continuation_shared_state::run, this); + void launch_continuation(boost::unique_lock<boost::mutex>&, shared_ptr<shared_state_base> that) { + this->thr_ = thread(&shared_future_async_continuation_shared_state::run, that); } - static void run(shared_future_async_continuation_shared_state* that) { + static void run(shared_ptr<boost::detail::shared_state_base> that_) { + shared_future_async_continuation_shared_state* that = dynamic_cast<shared_future_async_continuation_shared_state*>(that_.get()); try { that->continuation(that->parent); that->mark_finished_with_result(); -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - } catch(thread_interrupted& ) { - that->mark_interrupted_finish(); -#endif } catch(...) { that->mark_exceptional_finish(); } } - ~shared_future_async_continuation_shared_state() { - this->join(); - } + ~shared_future_async_continuation_shared_state() {} }; ///////////////////////// @@ -4269,34 +4255,37 @@ namespace detail Ex* ex; F parent; Fp continuation; + shared_ptr<shared_state_base> centinel; public: shared_future_executor_continuation_shared_state(Ex& ex, F f, BOOST_THREAD_FWD_REF(Fp) c) : ex(&ex), parent(f), - continuation(boost::move(c)) { + continuation(boost::move(c)), + centinel(parent.future_) { this->set_executor(); } - void launch_continuation(boost::unique_lock<boost::mutex>& ) { - run_it<shared_future_executor_continuation_shared_state> fct(this); + void launch_continuation(boost::unique_lock<boost::mutex>& lck, shared_ptr<shared_state_base> that) { + relocker relock(lck); + run_it<shared_future_executor_continuation_shared_state> fct(that); ex->submit(fct); } - static void run(shared_future_executor_continuation_shared_state* that) { + static void run(shared_ptr<boost::detail::shared_state_base> that_) { + shared_future_executor_continuation_shared_state* that = dynamic_cast<shared_future_executor_continuation_shared_state*>(that_.get()); try { that->mark_finished_with_result(that->continuation(that->parent)); -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - } catch(thread_interrupted& ) { - that->mark_interrupted_finish(); -#endif } catch(...) { that->mark_exceptional_finish(); } } - ~shared_future_executor_continuation_shared_state() { - this->wait(false); + virtual void block_if_needed(boost::unique_lock<boost::mutex>&lk) + { + this->wait(lk, false); } + + ~shared_future_executor_continuation_shared_state() {} }; template<typename Ex, typename F, typename Fp> @@ -4305,34 +4294,37 @@ namespace detail Ex* ex; F parent; Fp continuation; + shared_ptr<shared_state_base> centinel; public: shared_future_executor_continuation_shared_state(Ex& ex, F f, BOOST_THREAD_FWD_REF(Fp) c) : ex(&ex), parent(f), - continuation(boost::move(c)) { + continuation(boost::move(c)), + centinel(parent.future_) { } - void launch_continuation(boost::unique_lock<boost::mutex>& ) { - run_it<shared_future_executor_continuation_shared_state> fct(this); + void launch_continuation(boost::unique_lock<boost::mutex>& lck, shared_ptr<shared_state_base> that) { + relocker relock(lck); + run_it<shared_future_executor_continuation_shared_state> fct(that); ex->submit(fct); } - static void run(shared_future_executor_continuation_shared_state* that) { + static void run(shared_ptr<boost::detail::shared_state_base> that_) { + shared_future_executor_continuation_shared_state* that = dynamic_cast<shared_future_executor_continuation_shared_state*>(that_.get()); try { that->continuation(that->parent); that->mark_finished_with_result(); -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - } catch(thread_interrupted& ) { - that->mark_interrupted_finish(); -#endif } catch(...) { that->mark_exceptional_finish(); } } - ~shared_future_executor_continuation_shared_state() { - this->wait(false); + virtual void block_if_needed(boost::unique_lock<boost::mutex>&lk) + { + this->wait(lk, false); } + + ~shared_future_executor_continuation_shared_state() {} }; #endif ////////////////////////// @@ -4343,15 +4335,21 @@ namespace detail { F parent; Fp continuation; + shared_ptr<shared_state_base> centinel; public: future_deferred_continuation_shared_state(BOOST_THREAD_RV_REF(F) f, BOOST_THREAD_FWD_REF(Fp) c) : parent(boost::move(f)), - continuation(boost::move(c)) { + continuation(boost::move(c)), + centinel(parent.future_) { this->set_deferred(); } - virtual void launch_continuation(boost::unique_lock<boost::mutex>& ) { + virtual void launch_continuation(boost::unique_lock<boost::mutex>&lk, shared_ptr<shared_state_base> ) { + if (this->is_deferred_) { + this->is_deferred_=false; + this->execute(lk); + } } virtual void execute(boost::unique_lock<boost::mutex>& lck) { @@ -4373,15 +4371,23 @@ namespace detail { F parent; Fp continuation; + shared_ptr<shared_state_base> centinel; public: future_deferred_continuation_shared_state(BOOST_THREAD_RV_REF(F) f, BOOST_THREAD_FWD_REF(Fp) c) : parent(boost::move(f)), - continuation(boost::move(c)) { + continuation(boost::move(c)), + centinel(parent.future_) { this->set_deferred(); } - virtual void launch_continuation(boost::unique_lock<boost::mutex>& ) { + ~future_deferred_continuation_shared_state() { + } + virtual void launch_continuation(boost::unique_lock<boost::mutex>& lk, shared_ptr<shared_state_base> ) { + if (this->is_deferred_) { + this->is_deferred_=false; + this->execute(lk); + } } virtual void execute(boost::unique_lock<boost::mutex>& lck) { @@ -4406,15 +4412,21 @@ namespace detail { F parent; Fp continuation; + shared_ptr<shared_state_base> centinel; public: shared_future_deferred_continuation_shared_state(F f, BOOST_THREAD_FWD_REF(Fp) c) : parent(f), - continuation(boost::move(c)) { + continuation(boost::move(c)), + centinel(parent.future_) { this->set_deferred(); } - virtual void launch_continuation(boost::unique_lock<boost::mutex>& ) { + virtual void launch_continuation(boost::unique_lock<boost::mutex>& lk, shared_ptr<shared_state_base> ) { + if (this->is_deferred_) { + this->is_deferred_=false; + this->execute(lk); + } } virtual void execute(boost::unique_lock<boost::mutex>& lck) { @@ -4436,15 +4448,21 @@ namespace detail { F parent; Fp continuation; + shared_ptr<shared_state_base> centinel; public: shared_future_deferred_continuation_shared_state(F f, BOOST_THREAD_FWD_REF(Fp) c) : parent(f), - continuation(boost::move(c)) { + continuation(boost::move(c)), + centinel(parent.future_) { this->set_deferred(); } - virtual void launch_continuation(boost::unique_lock<boost::mutex>& ) { + virtual void launch_continuation(boost::unique_lock<boost::mutex>& lk, shared_ptr<shared_state_base> ) { + if (this->is_deferred_) { + this->is_deferred_=false; + this->execute(lk); + } } virtual void execute(boost::unique_lock<boost::mutex>& lck) { @@ -4471,7 +4489,9 @@ namespace detail BOOST_THREAD_RV_REF(F) f, BOOST_THREAD_FWD_REF(Fp) c) { shared_ptr<future_deferred_continuation_shared_state<F, Rp, Fp> > h(new future_deferred_continuation_shared_state<F, Rp, Fp>(boost::move(f), boost::forward<Fp>(c))); + lock.lock(); h->parent.future_->set_continuation_ptr(h, lock); + lock.unlock(); return BOOST_THREAD_FUTURE<Rp>(h); } @@ -4485,7 +4505,9 @@ namespace detail BOOST_THREAD_FWD_REF(Fp) c) { shared_ptr<future_async_continuation_shared_state<F,Rp, Fp> > h(new future_async_continuation_shared_state<F,Rp, Fp>(boost::move(f), boost::forward<Fp>(c))); + lock.lock(); h->parent.future_->set_continuation_ptr(h, lock); + lock.unlock(); return BOOST_THREAD_FUTURE<Rp>(h); } @@ -4502,7 +4524,9 @@ namespace detail BOOST_THREAD_FWD_REF(Fp) c) { shared_ptr<future_executor_continuation_shared_state<Ex,F,Rp, Fp> > h(new future_executor_continuation_shared_state<Ex, F,Rp, Fp>(ex, boost::move(f), boost::forward<Fp>(c))); + lock.lock(); h->parent.future_->set_continuation_ptr(h, lock); + lock.unlock(); return BOOST_THREAD_FUTURE<Rp>(h); } @@ -4518,7 +4542,10 @@ namespace detail F f, BOOST_THREAD_FWD_REF(Fp) c) { shared_ptr<shared_future_deferred_continuation_shared_state<F, Rp, Fp> > h(new shared_future_deferred_continuation_shared_state<F, Rp, Fp>(f, boost::forward<Fp>(c))); + lock.lock(); h->parent.future_->set_continuation_ptr(h, lock); + lock.unlock(); + return BOOST_THREAD_FUTURE<Rp>(h); } //////////////////////////////// @@ -4531,7 +4558,9 @@ namespace detail BOOST_THREAD_FWD_REF(Fp) c) { shared_ptr<shared_future_async_continuation_shared_state<F,Rp, Fp> > h(new shared_future_async_continuation_shared_state<F,Rp, Fp>(f, boost::forward<Fp>(c))); + lock.lock(); h->parent.future_->set_continuation_ptr(h, lock); + lock.unlock(); return BOOST_THREAD_FUTURE<Rp>(h); } @@ -4546,7 +4575,9 @@ namespace detail BOOST_THREAD_FWD_REF(Fp) c) { shared_ptr<shared_future_executor_continuation_shared_state<Ex, F, Rp, Fp> > h(new shared_future_executor_continuation_shared_state<Ex, F, Rp, Fp>(ex, f, boost::forward<Fp>(c))); + lock.lock(); h->parent.future_->set_continuation_ptr(h, lock); + lock.unlock(); return BOOST_THREAD_FUTURE<Rp>(h); } @@ -4567,14 +4598,18 @@ namespace detail boost::unique_lock<boost::mutex> lock(this->future_->mutex); if (underlying_cast<int>(policy) & int(launch::async)) { + lock.unlock(); return BOOST_THREAD_MAKE_RV_REF((boost::detail::make_future_async_continuation_shared_state<BOOST_THREAD_FUTURE<R>, future_type, F>( lock, boost::move(*this), boost::forward<F>(func) ))); } else if (underlying_cast<int>(policy) & int(launch::deferred)) { + this->future_->wait_internal(lock); + lock.unlock(); return BOOST_THREAD_MAKE_RV_REF((boost::detail::make_future_deferred_continuation_shared_state<BOOST_THREAD_FUTURE<R>, future_type, F>( lock, boost::move(*this), boost::forward<F>(func) ))); } else { + lock.unlock(); return BOOST_THREAD_MAKE_RV_REF((boost::detail::make_future_async_continuation_shared_state<BOOST_THREAD_FUTURE<R>, future_type, F>( lock, boost::move(*this), boost::forward<F>(func) ))); @@ -4589,6 +4624,7 @@ namespace detail BOOST_THREAD_ASSERT_PRECONDITION(this->future_!=0, future_uninitialized()); boost::unique_lock<boost::mutex> lock(this->future_->mutex); + lock.unlock(); return BOOST_THREAD_MAKE_RV_REF((boost::detail::make_future_executor_continuation_shared_state<Ex, BOOST_THREAD_FUTURE<R>, future_type, F>(ex, lock, boost::move(*this), boost::forward<F>(func) ))); @@ -4603,6 +4639,77 @@ namespace detail boost::unique_lock<boost::mutex> lock(this->future_->mutex); if (underlying_cast<int>(this->launch_policy(lock)) & int(launch::async)) { + lock.unlock(); + return boost::detail::make_future_async_continuation_shared_state<BOOST_THREAD_FUTURE<R>, future_type, F>( + lock, boost::move(*this), boost::forward<F>(func) + ); + } else if (underlying_cast<int>(this->launch_policy(lock)) & int(launch::deferred)) { + this->future_->wait_internal(lock); + lock.unlock(); + return boost::detail::make_future_deferred_continuation_shared_state<BOOST_THREAD_FUTURE<R>, future_type, F>( + lock, boost::move(*this), boost::forward<F>(func) + ); + } else { + lock.unlock(); + return boost::detail::make_future_async_continuation_shared_state<BOOST_THREAD_FUTURE<R>, future_type, F>( + lock, boost::move(*this), boost::forward<F>(func) + ); + } + } + + //////////////////////////////// + // template<typename F> + // auto future<future<R2>>::then(F&& func) -> BOOST_THREAD_FUTURE<decltype(func(*this))>; + //////////////////////////////// + + template <typename R2> + template <typename F> + inline BOOST_THREAD_FUTURE<typename boost::result_of<F(BOOST_THREAD_FUTURE<BOOST_THREAD_FUTURE<R2> >)>::type> + BOOST_THREAD_FUTURE<BOOST_THREAD_FUTURE<R2> >::then(launch policy, BOOST_THREAD_FWD_REF(F) func) { + typedef BOOST_THREAD_FUTURE<R2> R; + typedef typename boost::result_of<F(BOOST_THREAD_FUTURE<R>)>::type future_type; + BOOST_THREAD_ASSERT_PRECONDITION(this->future_!=0, future_uninitialized()); + + boost::unique_lock<boost::mutex> lock(this->future_->mutex); + if (underlying_cast<int>(policy) & int(launch::async)) { + return BOOST_THREAD_MAKE_RV_REF((boost::detail::make_future_async_continuation_shared_state<BOOST_THREAD_FUTURE<R>, future_type, F>( + lock, boost::move(*this), boost::forward<F>(func) + ))); + } else if (underlying_cast<int>(policy) & int(launch::deferred)) { + return BOOST_THREAD_MAKE_RV_REF((boost::detail::make_future_deferred_continuation_shared_state<BOOST_THREAD_FUTURE<R>, future_type, F>( + lock, boost::move(*this), boost::forward<F>(func) + ))); + } else { + return BOOST_THREAD_MAKE_RV_REF((boost::detail::make_future_async_continuation_shared_state<BOOST_THREAD_FUTURE<R>, future_type, F>( + lock, boost::move(*this), boost::forward<F>(func) + ))); + } + } +#ifdef BOOST_THREAD_PROVIDES_EXECUTORS + template <typename R2> + template <typename Ex, typename F> + inline BOOST_THREAD_FUTURE<typename boost::result_of<F(BOOST_THREAD_FUTURE<BOOST_THREAD_FUTURE<R2> >)>::type> + BOOST_THREAD_FUTURE<BOOST_THREAD_FUTURE<R2> >::then(Ex& ex, BOOST_THREAD_FWD_REF(F) func) { + typedef BOOST_THREAD_FUTURE<R2> R; + typedef typename boost::result_of<F(BOOST_THREAD_FUTURE<R>)>::type future_type; + BOOST_THREAD_ASSERT_PRECONDITION(this->future_!=0, future_uninitialized()); + + boost::unique_lock<boost::mutex> lock(this->future_->mutex); + return BOOST_THREAD_MAKE_RV_REF((boost::detail::make_future_executor_continuation_shared_state<Ex, BOOST_THREAD_FUTURE<R>, future_type, F>(ex, + lock, boost::move(*this), boost::forward<F>(func) + ))); + } +#endif + template <typename R2> + template <typename F> + inline BOOST_THREAD_FUTURE<typename boost::result_of<F(BOOST_THREAD_FUTURE<BOOST_THREAD_FUTURE<R2> >)>::type> + BOOST_THREAD_FUTURE<BOOST_THREAD_FUTURE<R2> >::then(BOOST_THREAD_FWD_REF(F) func) { + typedef BOOST_THREAD_FUTURE<R2> R; + typedef typename boost::result_of<F(BOOST_THREAD_FUTURE<R>)>::type future_type; + BOOST_THREAD_ASSERT_PRECONDITION(this->future_!=0, future_uninitialized()); + + boost::unique_lock<boost::mutex> lock(this->future_->mutex); + if (underlying_cast<int>(this->launch_policy(lock)) & int(launch::async)) { return boost::detail::make_future_async_continuation_shared_state<BOOST_THREAD_FUTURE<R>, future_type, F>( lock, boost::move(*this), boost::forward<F>(func) ); @@ -4618,6 +4725,11 @@ namespace detail } } + //////////////////////////////// + // template<typename F> + // auto shared_future<R>::then(F&& func) -> BOOST_THREAD_FUTURE<decltype(func(*this))>; + //////////////////////////////// + template <typename R> template <typename F> inline BOOST_THREAD_FUTURE<typename boost::result_of<F(shared_future<R>)>::type> @@ -4627,16 +4739,19 @@ namespace detail BOOST_THREAD_ASSERT_PRECONDITION(this->future_!=0, future_uninitialized()); boost::unique_lock<boost::mutex> lock(this->future_->mutex); - if (underlying_cast<int>(policy) & int(launch::async)) { + lock.unlock(); return BOOST_THREAD_MAKE_RV_REF((boost::detail::make_shared_future_async_continuation_shared_state<shared_future<R>, future_type, F>( lock, *this, boost::forward<F>(func) ))); } else if (underlying_cast<int>(policy) & int(launch::deferred)) { + this->future_->wait_internal(lock); + lock.unlock(); return BOOST_THREAD_MAKE_RV_REF((boost::detail::make_shared_future_deferred_continuation_shared_state<shared_future<R>, future_type, F>( lock, *this, boost::forward<F>(func) ))); } else { + lock.unlock(); return BOOST_THREAD_MAKE_RV_REF((boost::detail::make_shared_future_async_continuation_shared_state<shared_future<R>, future_type, F>( lock, *this, boost::forward<F>(func) ))); @@ -4652,6 +4767,7 @@ namespace detail BOOST_THREAD_ASSERT_PRECONDITION(this->future_!=0, future_uninitialized()); boost::unique_lock<boost::mutex> lock(this->future_->mutex); + lock.unlock(); return BOOST_THREAD_MAKE_RV_REF((boost::detail::make_shared_future_executor_continuation_shared_state<Ex, shared_future<R>, future_type, F>(ex, lock, *this, boost::forward<F>(func) ))); @@ -4668,13 +4784,16 @@ namespace detail boost::unique_lock<boost::mutex> lock(this->future_->mutex); if (underlying_cast<int>(this->launch_policy(lock)) & int(launch::async)) { + lock.unlock(); return boost::detail::make_shared_future_async_continuation_shared_state<shared_future<R>, future_type, F>( lock, *this, boost::forward<F>(func)); } else if (underlying_cast<int>(this->launch_policy(lock)) & int(launch::deferred)) { this->future_->wait_internal(lock); + lock.unlock(); return boost::detail::make_shared_future_deferred_continuation_shared_state<shared_future<R>, future_type, F>( lock, *this, boost::forward<F>(func)); } else { + lock.unlock(); return boost::detail::make_shared_future_async_continuation_shared_state<shared_future<R>, future_type, F>( lock, *this, boost::forward<F>(func)); } @@ -4704,7 +4823,7 @@ namespace detail : value_(v) {} - T operator()(BOOST_THREAD_FUTURE<T> fut) { + T operator()(BOOST_THREAD_FUTURE<T> fut) const { return fut.get_or(value_); } @@ -4751,14 +4870,21 @@ namespace detail return boost::move(r); } - virtual void wait(bool ) { // todo see if rethrow must be used - boost::unique_lock<boost::mutex> lk(this->mutex); + virtual void wait(boost::unique_lock<boost::mutex>& lk, bool ) { // todo see if rethrow must be used parent_value(lk).wait(); } - virtual Rp get() { - boost::unique_lock<boost::mutex> lk(this->mutex); + virtual Rp get(boost::unique_lock<boost::mutex>& lk) { return parent_value(lk).get(); } +#if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION + typedef shared_ptr<shared_state_base> continuation_ptr_type; + + virtual void set_continuation_ptr(continuation_ptr_type continuation, boost::unique_lock<boost::mutex>& lock) + { + boost::unique_lock<boost::mutex> lk(parent.future_->mutex); + parent.future_->set_continuation_ptr(continuation, lk); + } +#endif }; template <class F, class Rp> @@ -4766,7 +4892,9 @@ namespace detail make_future_unwrap_shared_state(boost::unique_lock<boost::mutex> &lock, BOOST_THREAD_RV_REF(F) f) { shared_ptr<future_unwrap_shared_state<F, Rp> > h(new future_unwrap_shared_state<F, Rp>(boost::move(f))); + lock.lock(); h->parent.future_->set_continuation_ptr(h, lock); + lock.unlock(); return BOOST_THREAD_FUTURE<Rp>(h); } } @@ -4781,6 +4909,7 @@ namespace detail { BOOST_THREAD_ASSERT_PRECONDITION(this->future_!=0, future_uninitialized()); boost::unique_lock<boost::mutex> lock(this->future_->mutex); + lock.unlock(); return boost::detail::make_future_unwrap_shared_state<BOOST_THREAD_FUTURE<BOOST_THREAD_FUTURE<R2> >, R2>(lock, boost::move(*this)); } #endif @@ -4807,14 +4936,11 @@ namespace detail typedef typename F::value_type value_type; vector_type vec_; - static void run(future_when_all_vector_shared_state* that) { + static void run(shared_ptr<boost::detail::shared_state_base> that_) { + future_when_all_vector_shared_state* that = dynamic_cast<future_when_all_vector_shared_state*>(that_.get()); try { boost::wait_for_all(that->vec_.begin(), that->vec_.end()); that->mark_finished_with_result(boost::move(that->vec_)); -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - } catch(thread_interrupted& ) { - that->mark_interrupted_finish(); -#endif } catch(...) { that->mark_exceptional_finish(); } @@ -4833,10 +4959,10 @@ namespace detail void init() { if (! run_deferred()) { - future_when_all_vector_shared_state::run(this); + future_when_all_vector_shared_state::run(this->shared_from_this()); return; } - this->thr_ = thread(&future_when_all_vector_shared_state::run, this); + this->thr_ = thread(&future_when_all_vector_shared_state::run, this->shared_from_this()); } public: @@ -4844,13 +4970,11 @@ namespace detail future_when_all_vector_shared_state(input_iterator_tag, InputIterator first, InputIterator last) : vec_(std::make_move_iterator(first), std::make_move_iterator(last)) { - init(); } future_when_all_vector_shared_state(vector_tag, BOOST_THREAD_RV_REF(csbl::vector<F>) v) : vec_(boost::move(v)) { - init(); } #if ! defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES) @@ -4862,12 +4986,10 @@ namespace detail vec_.push_back(boost::forward<T>(futures)),'0' )..., '0' }; //second part of magic unpacker - init(); } #endif - ~future_when_all_vector_shared_state() { - this->join(); - } + + ~future_when_all_vector_shared_state() {} }; //////////////////////////////// @@ -4880,15 +5002,12 @@ namespace detail typedef typename F::value_type value_type; vector_type vec_; - static void run(future_when_any_vector_shared_state* that) + static void run(shared_ptr<boost::detail::shared_state_base> that_) { + future_when_any_vector_shared_state* that = dynamic_cast<future_when_any_vector_shared_state*>(that_.get()); try { boost::wait_for_any(that->vec_.begin(), that->vec_.end()); that->mark_finished_with_result(boost::move(that->vec_)); -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - } catch(thread_interrupted& ) { - that->mark_interrupted_finish(); -#endif } catch(...) { that->mark_exceptional_finish(); } @@ -4906,11 +5025,11 @@ namespace detail void init() { if (run_deferred()) { - future_when_any_vector_shared_state::run(this); + future_when_any_vector_shared_state::run(this->shared_from_this()); return; } - this->thr_ = thread(&future_when_any_vector_shared_state::run, this); + this->thr_ = thread(&future_when_any_vector_shared_state::run, this->shared_from_this()); } public: @@ -4918,13 +5037,11 @@ namespace detail future_when_any_vector_shared_state(input_iterator_tag, InputIterator first, InputIterator last) : vec_(std::make_move_iterator(first), std::make_move_iterator(last)) { - init(); } future_when_any_vector_shared_state(vector_tag, BOOST_THREAD_RV_REF(csbl::vector<F>) v) : vec_(boost::move(v)) { - init(); } #if ! defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES) @@ -4940,13 +5057,10 @@ namespace detail )..., '0' }; //second part of magic unpacker - init(); } #endif - ~future_when_any_vector_shared_state() { - this->join(); - } + ~future_when_any_vector_shared_state() {} }; #if ! defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES) @@ -4987,16 +5101,13 @@ namespace detail Tuple tup_; typedef typename make_tuple_indices<1+sizeof...(T)>::type Index; - static void run(future_when_all_tuple_shared_state* that) { + static void run(shared_ptr<boost::detail::shared_state_base> that_) { + future_when_all_tuple_shared_state* that = dynamic_cast<future_when_all_tuple_shared_state*>(that_.get()); try { // TODO make use of apply(that->tup_, boost::detail::wait_for_all_fctor()); that->wait_for_all(Index()); that->mark_finished_with_result(boost::move(that->tup_)); -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - } catch(thread_interrupted& ) { - that->mark_interrupted_finish(); -#endif } catch(...) { that->mark_exceptional_finish(); } @@ -5018,23 +5129,21 @@ namespace detail void init() { if (! run_deferred()) { - future_when_all_tuple_shared_state::run(this); + future_when_all_tuple_shared_state::run(this->shared_from_this()); return; } - this->thr_ = thread(&future_when_all_tuple_shared_state::run, this); + this->thr_ = thread(&future_when_all_tuple_shared_state::run, this->shared_from_this()); } public: template< typename F, typename ...Fs> future_when_all_tuple_shared_state(values_tag, BOOST_THREAD_FWD_REF(F) f, BOOST_THREAD_FWD_REF(Fs) ... futures) : tup_(boost::csbl::make_tuple(boost::forward<F>(f), boost::forward<Fs>(futures)...)) { - init(); - } - ~future_when_all_tuple_shared_state() { - this->join(); } + ~future_when_all_tuple_shared_state() {} + }; @@ -5060,17 +5169,14 @@ namespace detail Tuple tup_; typedef typename make_tuple_indices<1+sizeof...(T)>::type Index; - static void run(future_when_any_tuple_shared_state* that) + static void run(shared_ptr<boost::detail::shared_state_base> that_) { + future_when_any_tuple_shared_state* that = dynamic_cast<future_when_any_tuple_shared_state*>(that_.get()); try { // TODO make use of apply(that->tup_, wait_for_any_fctr); that->wait_for_any(Index()); that->mark_finished_with_result(boost::move(that->tup_)); -#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS - } catch(thread_interrupted& ) { - that->mark_interrupted_finish(); -#endif } catch(...) { that->mark_exceptional_finish(); } @@ -5089,11 +5195,11 @@ namespace detail void init() { if (run_deferred()) { - future_when_any_tuple_shared_state::run(this); + future_when_any_tuple_shared_state::run(this->shared_from_this()); return; } - this->thr_ = thread(&future_when_any_tuple_shared_state::run, this); + this->thr_ = thread(&future_when_any_tuple_shared_state::run, this->shared_from_this()); } public: @@ -5103,12 +5209,9 @@ namespace detail ) : tup_(boost::csbl::make_tuple(boost::forward<F>(f), boost::forward<Fs>(futures)...)) { - init(); } - ~future_when_any_tuple_shared_state() { - this->join(); - } + ~future_when_any_tuple_shared_state() {} }; #endif @@ -5126,6 +5229,7 @@ namespace detail if (first==last) return make_ready_future(container_type()); shared_ptr<factory_type > h(new factory_type(detail::input_iterator_tag_value, first,last)); + h->init(); return BOOST_THREAD_FUTURE<container_type>(h); } @@ -5142,6 +5246,7 @@ namespace detail shared_ptr<factory_type> h(new factory_type(detail::values_tag_value, boost::forward<T0>(f), boost::forward<T>(futures)...)); + h->init(); return BOOST_THREAD_FUTURE<container_type>(h); } #endif @@ -5158,6 +5263,7 @@ namespace detail if (first==last) return make_ready_future(container_type()); shared_ptr<factory_type > h(new factory_type(detail::input_iterator_tag_value, first,last)); + h->init(); return BOOST_THREAD_FUTURE<container_type>(h); } @@ -5174,6 +5280,7 @@ namespace detail shared_ptr<factory_type> h(new factory_type(detail::values_tag_value, boost::forward<T0>(f), boost::forward<T>(futures)...)); + h->init(); return BOOST_THREAD_FUTURE<container_type>(h); } #endif |