From b8cf34c691623e4ec329053cbbf68522a855882d Mon Sep 17 00:00:00 2001 From: DongHun Kwak Date: Thu, 5 Dec 2019 15:12:59 +0900 Subject: Imported Upstream version 1.67.0 --- .../concurrent_queues/detail/sync_deque_base.hpp | 50 +++--- .../concurrent_queues/detail/sync_queue_base.hpp | 50 +++--- boost/thread/concurrent_queues/sync_deque.hpp | 12 +- .../concurrent_queues/sync_priority_queue.hpp | 24 ++- boost/thread/concurrent_queues/sync_queue.hpp | 20 +-- .../thread/concurrent_queues/sync_timed_queue.hpp | 180 +++++++-------------- 6 files changed, 131 insertions(+), 205 deletions(-) (limited to 'boost/thread/concurrent_queues') diff --git a/boost/thread/concurrent_queues/detail/sync_deque_base.hpp b/boost/thread/concurrent_queues/detail/sync_deque_base.hpp index beffaf73b1..e3ecad15de 100644 --- a/boost/thread/concurrent_queues/detail/sync_deque_base.hpp +++ b/boost/thread/concurrent_queues/detail/sync_deque_base.hpp @@ -11,6 +11,8 @@ // ////////////////////////////////////////////////////////////////////////////// +#include + #include #include #include @@ -84,10 +86,13 @@ namespace detail inline void throw_if_closed(unique_lock&); inline void throw_if_closed(lock_guard&); - inline void wait_until_not_empty(unique_lock& lk); + inline bool not_empty_or_closed(unique_lock& ) const; + inline bool wait_until_not_empty_or_closed(unique_lock& lk); template - queue_op_status wait_until_not_empty_until(unique_lock& lk, chrono::time_point const&); + queue_op_status wait_until_not_empty_or_closed_until(unique_lock& lk, chrono::time_point const&tp); + template + queue_op_status wait_until_closed_until(unique_lock& lk, chrono::time_point const&tp); inline void notify_not_empty_if_needed(unique_lock& ) { @@ -176,39 +181,38 @@ namespace detail } template - void sync_deque_base::wait_until_not_empty(unique_lock& lk) + bool sync_deque_base::not_empty_or_closed(unique_lock& ) const { - for (;;) - { - if (! empty(lk)) break; - throw_if_closed(lk); - not_empty_.wait(lk); - } + return ! data_.empty() || closed_; } + template bool sync_deque_base::wait_until_not_empty_or_closed(unique_lock& lk) { - for (;;) - { - if (! empty(lk)) break; - if (closed(lk)) return true; - not_empty_.wait(lk); - } - return false; + not_empty_.wait(lk, boost::bind(&sync_deque_base::not_empty_or_closed, boost::ref(*this), boost::ref(lk))); + if (! empty(lk)) return false; // success + return true; // closed } template template - queue_op_status sync_deque_base::wait_until_not_empty_until(unique_lock& lk, chrono::time_point const&tp) + queue_op_status sync_deque_base::wait_until_not_empty_or_closed_until(unique_lock& lk, chrono::time_point const&tp) { - for (;;) - { - if (! empty(lk)) return queue_op_status::success; - throw_if_closed(lk); - if (not_empty_.wait_until(lk, tp) == cv_status::timeout ) return queue_op_status::timeout; - } + if (! not_empty_.wait_until(lk, tp, boost::bind(&sync_deque_base::not_empty_or_closed, boost::ref(*this), boost::ref(lk)))) + return queue_op_status::timeout; + if (! empty(lk)) return queue_op_status::success; + return queue_op_status::closed; } + template + template + queue_op_status sync_deque_base::wait_until_closed_until(unique_lock& lk, chrono::time_point const&tp) + { + bool (sync_queue_base::*closed_function_ptr)(unique_lock&) const = &sync_queue_base::closed; + if (! not_empty_.wait_until(lk, tp, boost::bind(closed_function_ptr, boost::ref(*this), boost::ref(lk)))) + return queue_op_status::timeout; + return queue_op_status::closed; + } } // detail } // concurrent diff --git a/boost/thread/concurrent_queues/detail/sync_queue_base.hpp b/boost/thread/concurrent_queues/detail/sync_queue_base.hpp index 6ecb8211e3..c570da9505 100644 --- a/boost/thread/concurrent_queues/detail/sync_queue_base.hpp +++ b/boost/thread/concurrent_queues/detail/sync_queue_base.hpp @@ -11,6 +11,8 @@ // ////////////////////////////////////////////////////////////////////////////// +#include + #include #include #include @@ -84,10 +86,13 @@ namespace detail inline void throw_if_closed(unique_lock&); inline void throw_if_closed(lock_guard&); - inline void wait_until_not_empty(unique_lock& lk); + inline bool not_empty_or_closed(unique_lock& ) const; + inline bool wait_until_not_empty_or_closed(unique_lock& lk); template - queue_op_status wait_until_not_empty_until(unique_lock& lk, chrono::time_point const&); + queue_op_status wait_until_not_empty_or_closed_until(unique_lock& lk, chrono::time_point const&tp); + template + queue_op_status wait_until_closed_until(unique_lock& lk, chrono::time_point const&tp); inline void notify_not_empty_if_needed(unique_lock& ) { @@ -176,39 +181,38 @@ namespace detail } template - void sync_queue_base::wait_until_not_empty(unique_lock& lk) + bool sync_queue_base::not_empty_or_closed(unique_lock& ) const { - for (;;) - { - if (! empty(lk)) break; - throw_if_closed(lk); - not_empty_.wait(lk); - } + return ! data_.empty() || closed_; } + template bool sync_queue_base::wait_until_not_empty_or_closed(unique_lock& lk) { - for (;;) - { - if (! empty(lk)) break; - if (closed(lk)) return true; - not_empty_.wait(lk); - } - return false; + not_empty_.wait(lk, boost::bind(&sync_queue_base::not_empty_or_closed, boost::ref(*this), boost::ref(lk))); + if (! empty(lk)) return false; // success + return true; // closed } template template - queue_op_status sync_queue_base::wait_until_not_empty_until(unique_lock& lk, chrono::time_point const&tp) + queue_op_status sync_queue_base::wait_until_not_empty_or_closed_until(unique_lock& lk, chrono::time_point const&tp) { - for (;;) - { - if (! empty(lk)) return queue_op_status::success; - throw_if_closed(lk); - if (not_empty_.wait_until(lk, tp) == cv_status::timeout ) return queue_op_status::timeout; - } + if (! not_empty_.wait_until(lk, tp, boost::bind(&sync_queue_base::not_empty_or_closed, boost::ref(*this), boost::ref(lk)))) + return queue_op_status::timeout; + if (! empty(lk)) return queue_op_status::success; + return queue_op_status::closed; } + template + template + queue_op_status sync_queue_base::wait_until_closed_until(unique_lock& lk, chrono::time_point const&tp) + { + bool (sync_queue_base::*closed_function_ptr)(unique_lock&) const = &sync_queue_base::closed; + if (! not_empty_.wait_until(lk, tp, boost::bind(closed_function_ptr, boost::ref(*this), boost::ref(lk)))) + return queue_op_status::timeout; + return queue_op_status::closed; + } } // detail } // concurrent diff --git a/boost/thread/concurrent_queues/sync_deque.hpp b/boost/thread/concurrent_queues/sync_deque.hpp index c84dae022a..c6159c4be0 100644 --- a/boost/thread/concurrent_queues/sync_deque.hpp +++ b/boost/thread/concurrent_queues/sync_deque.hpp @@ -149,11 +149,7 @@ namespace concurrent template queue_op_status sync_deque::wait_pull_front(ValueType& elem, unique_lock& lk) { - if (super::empty(lk)) - { - if (super::closed(lk)) return queue_op_status::closed; - } - bool has_been_closed = super::wait_until_not_empty_or_closed(lk); + const bool has_been_closed = super::wait_until_not_empty_or_closed(lk); if (has_been_closed) return queue_op_status::closed; pull_front(elem, lk); return queue_op_status::success; @@ -188,7 +184,8 @@ namespace concurrent void sync_deque::pull_front(ValueType& elem) { unique_lock lk(super::mtx_); - super::wait_until_not_empty(lk); + const bool has_been_closed = super::wait_until_not_empty_or_closed(lk); + if (has_been_closed) super::throw_if_closed(lk); pull_front(elem, lk); } @@ -197,7 +194,8 @@ namespace concurrent ValueType sync_deque::pull_front() { unique_lock lk(super::mtx_); - super::wait_until_not_empty(lk); + const bool has_been_closed = super::wait_until_not_empty_or_closed(lk); + if (has_been_closed) super::throw_if_closed(lk); return pull_front(lk); } diff --git a/boost/thread/concurrent_queues/sync_priority_queue.hpp b/boost/thread/concurrent_queues/sync_priority_queue.hpp index a556910cd6..2c6881f90f 100644 --- a/boost/thread/concurrent_queues/sync_priority_queue.hpp +++ b/boost/thread/concurrent_queues/sync_priority_queue.hpp @@ -82,7 +82,7 @@ namespace detail { return boost::move(result); } - Type const& top() + Type const& top() const { return _elements.front(); } @@ -249,7 +249,8 @@ namespace concurrent T sync_priority_queue::pull() { unique_lock lk(super::mtx_); - super::wait_until_not_empty(lk); + const bool has_been_closed = super::wait_until_not_empty_or_closed(lk); + if (has_been_closed) super::throw_if_closed(lk); return pull(lk); } @@ -269,7 +270,8 @@ namespace concurrent void sync_priority_queue::pull(T& elem) { unique_lock lk(super::mtx_); - super::wait_until_not_empty(lk); + const bool has_been_closed = super::wait_until_not_empty_or_closed(lk); + if (has_been_closed) super::throw_if_closed(lk); pull(lk, elem); } @@ -280,10 +282,9 @@ namespace concurrent sync_priority_queue::pull_until(const chrono::time_point& tp, T& elem) { unique_lock lk(super::mtx_); - if (queue_op_status::timeout == super::wait_until_not_empty_until(lk, tp)) - return queue_op_status::timeout; - pull(lk, elem); - return queue_op_status::success; + const queue_op_status rc = super::wait_until_not_empty_or_closed_until(lk, tp); + if (rc == queue_op_status::success) pull(lk, elem); + return rc; } ////////////////////// @@ -292,7 +293,7 @@ namespace concurrent queue_op_status sync_priority_queue::pull_for(const chrono::duration& dura, T& elem) { - return pull_until(clock::now() + dura, elem); + return pull_until(chrono::steady_clock::now() + dura, elem); } ////////////////////// @@ -334,11 +335,7 @@ namespace concurrent template queue_op_status sync_priority_queue::wait_pull(unique_lock& lk, T& elem) { - if (super::empty(lk)) - { - if (super::closed(lk)) return queue_op_status::closed; - } - bool has_been_closed = super::wait_until_not_empty_or_closed(lk); + const bool has_been_closed = super::wait_until_not_empty_or_closed(lk); if (has_been_closed) return queue_op_status::closed; pull(lk, elem); return queue_op_status::success; @@ -352,7 +349,6 @@ namespace concurrent } ////////////////////// - template queue_op_status sync_priority_queue::nonblocking_pull(T& elem) { diff --git a/boost/thread/concurrent_queues/sync_queue.hpp b/boost/thread/concurrent_queues/sync_queue.hpp index 1dbbef05dd..b36b57e607 100644 --- a/boost/thread/concurrent_queues/sync_queue.hpp +++ b/boost/thread/concurrent_queues/sync_queue.hpp @@ -10,7 +10,6 @@ // See http://www.boost.org/libs/thread for documentation. // ////////////////////////////////////////////////////////////////////////////// -#include #include #include @@ -51,7 +50,6 @@ namespace concurrent inline ~sync_queue(); // Modifiers - inline void push(const value_type& x); inline queue_op_status try_push(const value_type& x); inline queue_op_status nonblocking_push(const value_type& x); @@ -151,19 +149,9 @@ namespace concurrent template queue_op_status sync_queue::wait_pull(ValueType& elem, unique_lock& lk) { - //std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl; - if (super::empty(lk)) - { - //std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl; - if (super::closed(lk)) return queue_op_status::closed; - } - //std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl; - bool has_been_closed = super::wait_until_not_empty_or_closed(lk); - //std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl; + const bool has_been_closed = super::wait_until_not_empty_or_closed(lk); if (has_been_closed) return queue_op_status::closed; - //std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl; pull(elem, lk); - //std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl; return queue_op_status::success; } @@ -196,7 +184,8 @@ namespace concurrent void sync_queue::pull(ValueType& elem) { unique_lock lk(super::mtx_); - super::wait_until_not_empty(lk); + const bool has_been_closed = super::wait_until_not_empty_or_closed(lk); + if (has_been_closed) super::throw_if_closed(lk); pull(elem, lk); } @@ -205,7 +194,8 @@ namespace concurrent ValueType sync_queue::pull() { unique_lock lk(super::mtx_); - super::wait_until_not_empty(lk); + const bool has_been_closed = super::wait_until_not_empty_or_closed(lk); + if (has_been_closed) super::throw_if_closed(lk); return pull(lk); } diff --git a/boost/thread/concurrent_queues/sync_timed_queue.hpp b/boost/thread/concurrent_queues/sync_timed_queue.hpp index 596a625ef5..a4394c0729 100644 --- a/boost/thread/concurrent_queues/sync_timed_queue.hpp +++ b/boost/thread/concurrent_queues/sync_timed_queue.hpp @@ -53,11 +53,6 @@ namespace detail return *this; } - bool time_not_reached() const - { - return time > clock::now(); - } - bool operator <(const scheduled_type & other) const { return this->time > other.time; @@ -123,6 +118,13 @@ namespace detail queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration const& dura); private: + inline bool not_empty_and_time_reached(unique_lock& lk) const; + inline bool not_empty_and_time_reached(lock_guard& lk) const; + + bool wait_to_pull(unique_lock&); + template + queue_op_status wait_to_pull_until(unique_lock&, chrono::time_point const& tp); + T pull(unique_lock&); T pull(lock_guard&); @@ -134,15 +136,6 @@ namespace detail queue_op_status wait_pull(unique_lock& lk, T& elem); - bool wait_until_not_empty_time_reached_or_closed(unique_lock&); - T pull_when_time_reached(unique_lock&); - template - queue_op_status pull_when_time_reached_until(unique_lock&, chrono::time_point const& tp, T& elem); - bool time_not_reached(unique_lock&); - bool time_not_reached(lock_guard&); - bool empty_or_time_not_reached(unique_lock&); - bool empty_or_time_not_reached(lock_guard&); - sync_timed_queue(const sync_timed_queue&); sync_timed_queue& operator=(const sync_timed_queue&); sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue)); @@ -210,82 +203,55 @@ namespace detail /////////////////////////// template - bool sync_timed_queue::time_not_reached(unique_lock&) + bool sync_timed_queue::not_empty_and_time_reached(unique_lock& lk) const { - return super::data_.top().time_not_reached(); + return ! super::empty(lk) && clock::now() >= super::data_.top().time; } template - bool sync_timed_queue::time_not_reached(lock_guard&) + bool sync_timed_queue::not_empty_and_time_reached(lock_guard& lk) const { - return super::data_.top().time_not_reached(); + return ! super::empty(lk) && clock::now() >= super::data_.top().time; } /////////////////////////// template - bool sync_timed_queue::wait_until_not_empty_time_reached_or_closed(unique_lock& lk) + bool sync_timed_queue::wait_to_pull(unique_lock& lk) { for (;;) { - if (super::closed(lk)) return true; - while (! super::empty(lk)) { - if (! time_not_reached(lk)) return false; - time_point tp = super::data_.top().time; - super::not_empty_.wait_until(lk, tp); - if (super::closed(lk)) return true; - } - if (super::closed(lk)) return true; - super::not_empty_.wait(lk); - } - //return false; - } + if (not_empty_and_time_reached(lk)) return false; // success + if (super::closed(lk)) return true; // closed - /////////////////////////// - template - T sync_timed_queue::pull_when_time_reached(unique_lock& lk) - { - while (time_not_reached(lk)) - { - super::throw_if_closed(lk); - time_point tp = super::data_.top().time; - super::not_empty_.wait_until(lk,tp); - super::wait_until_not_empty(lk); + super::wait_until_not_empty_or_closed(lk); + + if (not_empty_and_time_reached(lk)) return false; // success + if (super::closed(lk)) return true; // closed + + const time_point tp(super::data_.top().time); + super::wait_until_closed_until(lk, tp); } - return pull(lk); } template template - queue_op_status - sync_timed_queue::pull_when_time_reached_until(unique_lock& lk, chrono::time_point const& tp, T& elem) + queue_op_status sync_timed_queue::wait_to_pull_until(unique_lock& lk, chrono::time_point const& tp) { - chrono::time_point tpmin = (tp < super::data_.top().time) ? tp : super::data_.top().time; - while (time_not_reached(lk)) + for (;;) { - super::throw_if_closed(lk); - if (cv_status::timeout == super::not_empty_.wait_until(lk, tpmin)) { - if (time_not_reached(lk)) return queue_op_status::not_ready; - return queue_op_status::timeout; - } - } - pull(lk, elem); - return queue_op_status::success; - } + if (not_empty_and_time_reached(lk)) return queue_op_status::success; + if (super::closed(lk)) return queue_op_status::closed; + if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready; - /////////////////////////// - template - bool sync_timed_queue::empty_or_time_not_reached(unique_lock& lk) - { - if ( super::empty(lk) ) return true; - if ( time_not_reached(lk) ) return true; - return false; - } - template - bool sync_timed_queue::empty_or_time_not_reached(lock_guard& lk) - { - if ( super::empty(lk) ) return true; - if ( time_not_reached(lk) ) return true; - return false; + super::wait_until_not_empty_or_closed_until(lk, tp); + + if (not_empty_and_time_reached(lk)) return queue_op_status::success; + if (super::closed(lk)) return queue_op_status::closed; + if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready; + + const time_point tpmin(tp < super::data_.top().time ? tp : super::data_.top().time); + super::wait_until_closed_until(lk, tpmin); + } } /////////////////////////// @@ -312,8 +278,9 @@ namespace detail T sync_timed_queue::pull() { unique_lock lk(super::mtx_); - super::wait_until_not_empty(lk); - return pull_when_time_reached(lk); + const bool has_been_closed = wait_to_pull(lk); + if (has_been_closed) super::throw_if_closed(lk); + return pull(lk); } /////////////////////////// @@ -341,8 +308,9 @@ namespace detail void sync_timed_queue::pull(T& elem) { unique_lock lk(super::mtx_); - super::wait_until_not_empty(lk); - elem = pull_when_time_reached(lk); + const bool has_been_closed = wait_to_pull(lk); + if (has_been_closed) super::throw_if_closed(lk); + pull(lk, elem); } ////////////////////// @@ -352,10 +320,9 @@ namespace detail sync_timed_queue::pull_until(chrono::time_point const& tp, T& elem) { unique_lock lk(super::mtx_); - - if (queue_op_status::timeout == super::wait_until_not_empty_until(lk, tp)) - return queue_op_status::timeout; - return pull_when_time_reached_until(lk, tp, elem); + const queue_op_status rc = wait_to_pull_until(lk, tp); + if (rc == queue_op_status::success) pull(lk, elem); + return rc; } ////////////////////// @@ -371,35 +338,26 @@ namespace detail template queue_op_status sync_timed_queue::try_pull(unique_lock& lk, T& elem) { - if ( super::empty(lk) ) + if (not_empty_and_time_reached(lk)) { - if (super::closed(lk)) return queue_op_status::closed; - return queue_op_status::empty; - } - if ( time_not_reached(lk) ) - { - if (super::closed(lk)) return queue_op_status::closed; - return queue_op_status::not_ready; + pull(lk, elem); + return queue_op_status::success; } - - pull(lk, elem); - return queue_op_status::success; + if (super::closed(lk)) return queue_op_status::closed; + if (super::empty(lk)) return queue_op_status::empty; + return queue_op_status::not_ready; } template queue_op_status sync_timed_queue::try_pull(lock_guard& lk, T& elem) { - if ( super::empty(lk) ) + if (not_empty_and_time_reached(lk)) { - if (super::closed(lk)) return queue_op_status::closed; - return queue_op_status::empty; - } - if ( time_not_reached(lk) ) - { - if (super::closed(lk)) return queue_op_status::closed; - return queue_op_status::not_ready; + pull(lk, elem); + return queue_op_status::success; } - pull(lk, elem); - return queue_op_status::success; + if (super::closed(lk)) return queue_op_status::closed; + if (super::empty(lk)) return queue_op_status::empty; + return queue_op_status::not_ready; } template @@ -413,11 +371,7 @@ namespace detail template queue_op_status sync_timed_queue::wait_pull(unique_lock& lk, T& elem) { - if (super::empty(lk)) - { - if (super::closed(lk)) return queue_op_status::closed; - } - bool has_been_closed = wait_until_not_empty_time_reached_or_closed(lk); + const bool has_been_closed = wait_to_pull(lk); if (has_been_closed) return queue_op_status::closed; pull(lk, elem); return queue_op_status::success; @@ -430,26 +384,6 @@ namespace detail return wait_pull(lk, elem); } -// /////////////////////////// -// template -// queue_op_status sync_timed_queue::wait_pull(unique_lock &lk, T& elem) -// { -// if (super::empty(lk)) -// { -// if (super::closed(lk)) return queue_op_status::closed; -// } -// bool has_been_closed = super::wait_until_not_empty_or_closed(lk); -// if (has_been_closed) return queue_op_status::closed; -// pull(lk, elem); -// return queue_op_status::success; -// } -// template -// queue_op_status sync_timed_queue::wait_pull(T& elem) -// { -// unique_lock lk(super::mtx_); -// return wait_pull(lk, elem); -// } - /////////////////////////// template queue_op_status sync_timed_queue::nonblocking_pull(T& elem) -- cgit v1.2.3