diff options
Diffstat (limited to 'boost/thread/concurrent_queues')
6 files changed, 106 insertions, 64 deletions
diff --git a/boost/thread/concurrent_queues/detail/sync_deque_base.hpp b/boost/thread/concurrent_queues/detail/sync_deque_base.hpp index e3ecad15de..704efc70c5 100644 --- a/boost/thread/concurrent_queues/detail/sync_deque_base.hpp +++ b/boost/thread/concurrent_queues/detail/sync_deque_base.hpp @@ -63,7 +63,7 @@ namespace detail protected: mutable mutex mtx_; - condition_variable not_empty_; + condition_variable cond_; underlying_queue_type data_; bool closed_; @@ -91,16 +91,14 @@ namespace detail inline bool wait_until_not_empty_or_closed(unique_lock<mutex>& lk); template <class WClock, class Duration> queue_op_status wait_until_not_empty_or_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp); - template <class WClock, class Duration> - queue_op_status wait_until_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp); - inline void notify_not_empty_if_needed(unique_lock<mutex>& ) + inline void notify_elem_added(unique_lock<mutex>& ) { - not_empty_.notify_one(); + cond_.notify_all(); } - inline void notify_not_empty_if_needed(lock_guard<mutex>& ) + inline void notify_elem_added(lock_guard<mutex>& ) { - not_empty_.notify_one(); + cond_.notify_all(); } }; @@ -124,7 +122,7 @@ namespace detail lock_guard<mutex> lk(mtx_); closed_ = true; } - not_empty_.notify_all(); + cond_.notify_all(); } template <class ValueType, class Queue> @@ -189,7 +187,7 @@ namespace detail template <class ValueType, class Queue> bool sync_deque_base<ValueType, Queue>::wait_until_not_empty_or_closed(unique_lock<mutex>& lk) { - not_empty_.wait(lk, boost::bind(&sync_deque_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk))); + cond_.wait(lk, boost::bind(&sync_deque_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk))); if (! empty(lk)) return false; // success return true; // closed } @@ -198,22 +196,12 @@ namespace detail template <class WClock, class Duration> queue_op_status sync_deque_base<ValueType, Queue>::wait_until_not_empty_or_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp) { - if (! not_empty_.wait_until(lk, tp, boost::bind(&sync_deque_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk)))) + if (! cond_.wait_until(lk, tp, boost::bind(&sync_deque_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk)))) return queue_op_status::timeout; if (! empty(lk)) return queue_op_status::success; return queue_op_status::closed; } - template <class ValueType, class Queue> - template <class WClock, class Duration> - queue_op_status sync_deque_base<ValueType, Queue>::wait_until_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp) - { - bool (sync_queue_base<ValueType, Queue>::*closed_function_ptr)(unique_lock<mutex>&) const = &sync_queue_base<ValueType, Queue>::closed; - if (! not_empty_.wait_until(lk, tp, boost::bind(closed_function_ptr, boost::ref(*this), boost::ref(lk)))) - return queue_op_status::timeout; - return queue_op_status::closed; - } - } // detail } // concurrent } // boost diff --git a/boost/thread/concurrent_queues/detail/sync_queue_base.hpp b/boost/thread/concurrent_queues/detail/sync_queue_base.hpp index c570da9505..26d4dcd14a 100644 --- a/boost/thread/concurrent_queues/detail/sync_queue_base.hpp +++ b/boost/thread/concurrent_queues/detail/sync_queue_base.hpp @@ -63,7 +63,7 @@ namespace detail protected: mutable mutex mtx_; - condition_variable not_empty_; + condition_variable cond_; underlying_queue_type data_; bool closed_; @@ -91,16 +91,14 @@ namespace detail inline bool wait_until_not_empty_or_closed(unique_lock<mutex>& lk); template <class WClock, class Duration> queue_op_status wait_until_not_empty_or_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp); - template <class WClock, class Duration> - queue_op_status wait_until_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp); - inline void notify_not_empty_if_needed(unique_lock<mutex>& ) + inline void notify_elem_added(unique_lock<mutex>& ) { - not_empty_.notify_one(); + cond_.notify_all(); } - inline void notify_not_empty_if_needed(lock_guard<mutex>& ) + inline void notify_elem_added(lock_guard<mutex>& ) { - not_empty_.notify_one(); + cond_.notify_all(); } }; @@ -124,7 +122,7 @@ namespace detail lock_guard<mutex> lk(mtx_); closed_ = true; } - not_empty_.notify_all(); + cond_.notify_all(); } template <class ValueType, class Queue> @@ -189,7 +187,7 @@ namespace detail template <class ValueType, class Queue> bool sync_queue_base<ValueType, Queue>::wait_until_not_empty_or_closed(unique_lock<mutex>& lk) { - not_empty_.wait(lk, boost::bind(&sync_queue_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk))); + cond_.wait(lk, boost::bind(&sync_queue_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk))); if (! empty(lk)) return false; // success return true; // closed } @@ -198,22 +196,12 @@ namespace detail template <class WClock, class Duration> queue_op_status sync_queue_base<ValueType, Queue>::wait_until_not_empty_or_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp) { - if (! not_empty_.wait_until(lk, tp, boost::bind(&sync_queue_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk)))) + if (! cond_.wait_until(lk, tp, boost::bind(&sync_queue_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk)))) return queue_op_status::timeout; if (! empty(lk)) return queue_op_status::success; return queue_op_status::closed; } - template <class ValueType, class Queue> - template <class WClock, class Duration> - queue_op_status sync_queue_base<ValueType, Queue>::wait_until_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp) - { - bool (sync_queue_base<ValueType, Queue>::*closed_function_ptr)(unique_lock<mutex>&) const = &sync_queue_base<ValueType, Queue>::closed; - if (! not_empty_.wait_until(lk, tp, boost::bind(closed_function_ptr, boost::ref(*this), boost::ref(lk)))) - return queue_op_status::timeout; - return queue_op_status::closed; - } - } // detail } // concurrent } // boost diff --git a/boost/thread/concurrent_queues/sync_deque.hpp b/boost/thread/concurrent_queues/sync_deque.hpp index c6159c4be0..95e36409bd 100644 --- a/boost/thread/concurrent_queues/sync_deque.hpp +++ b/boost/thread/concurrent_queues/sync_deque.hpp @@ -92,13 +92,13 @@ namespace concurrent inline void push_back(const value_type& elem, unique_lock<mutex>& lk) { super::data_.push_back(elem); - super::notify_not_empty_if_needed(lk); + super::notify_elem_added(lk); } inline void push_back(BOOST_THREAD_RV_REF(value_type) elem, unique_lock<mutex>& lk) { super::data_.push_back(boost::move(elem)); - super::notify_not_empty_if_needed(lk); + super::notify_elem_added(lk); } }; @@ -122,7 +122,7 @@ namespace concurrent // { // data_.push(boost::move(*cur));; // } -// notify_not_empty_if_needed(lk); +// notify_elem_added(lk); // } // catch (...) // { diff --git a/boost/thread/concurrent_queues/sync_priority_queue.hpp b/boost/thread/concurrent_queues/sync_priority_queue.hpp index 2c6881f90f..f5676335c7 100644 --- a/boost/thread/concurrent_queues/sync_priority_queue.hpp +++ b/boost/thread/concurrent_queues/sync_priority_queue.hpp @@ -174,14 +174,14 @@ namespace concurrent { super::throw_if_closed(lk); super::data_.push(elem); - super::notify_not_empty_if_needed(lk); + super::notify_elem_added(lk); } template <class T, class Container,class Cmp> void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, const T& elem) { super::throw_if_closed(lk); super::data_.push(elem); - super::notify_not_empty_if_needed(lk); + super::notify_elem_added(lk); } template <class T, class Container,class Cmp> void sync_priority_queue<T,Container,Cmp>::push(const T& elem) @@ -196,14 +196,14 @@ namespace concurrent { super::throw_if_closed(lk); super::data_.push(boost::move(elem)); - super::notify_not_empty_if_needed(lk); + super::notify_elem_added(lk); } template <class T, class Container,class Cmp> void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, BOOST_THREAD_RV_REF(T) elem) { super::throw_if_closed(lk); super::data_.push(boost::move(elem)); - super::notify_not_empty_if_needed(lk); + super::notify_elem_added(lk); } template <class T, class Container,class Cmp> void sync_priority_queue<T,Container,Cmp>::push(BOOST_THREAD_RV_REF(T) elem) diff --git a/boost/thread/concurrent_queues/sync_queue.hpp b/boost/thread/concurrent_queues/sync_queue.hpp index b36b57e607..ac0fe68d77 100644 --- a/boost/thread/concurrent_queues/sync_queue.hpp +++ b/boost/thread/concurrent_queues/sync_queue.hpp @@ -92,13 +92,13 @@ namespace concurrent inline void push(const value_type& elem, unique_lock<mutex>& lk) { super::data_.push_back(elem); - super::notify_not_empty_if_needed(lk); + super::notify_elem_added(lk); } inline void push(BOOST_THREAD_RV_REF(value_type) elem, unique_lock<mutex>& lk) { super::data_.push_back(boost::move(elem)); - super::notify_not_empty_if_needed(lk); + super::notify_elem_added(lk); } }; @@ -122,7 +122,7 @@ namespace concurrent // { // data_.push(boost::move(*cur));; // } -// notify_not_empty_if_needed(lk); +// notify_elem_added(lk); // } // catch (...) // { diff --git a/boost/thread/concurrent_queues/sync_timed_queue.hpp b/boost/thread/concurrent_queues/sync_timed_queue.hpp index a4394c0729..fd8d5a3c46 100644 --- a/boost/thread/concurrent_queues/sync_timed_queue.hpp +++ b/boost/thread/concurrent_queues/sync_timed_queue.hpp @@ -16,6 +16,8 @@ #include <boost/chrono/system_clocks.hpp> #include <boost/chrono/chrono_io.hpp> +#include <algorithm> // std::min + #include <boost/config/abi_prefix.hpp> namespace boost @@ -59,6 +61,45 @@ namespace detail } }; //end struct + template <class Duration> + chrono::time_point<chrono::steady_clock,Duration> + limit_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp) + { + // Clock == chrono::steady_clock + return tp; + } + + template <class Clock, class Duration> + chrono::time_point<Clock,Duration> + limit_timepoint(chrono::time_point<Clock,Duration> const& tp) + { + // Clock != chrono::steady_clock + // The system time may jump while wait_until() is waiting. To compensate for this and time out near + // the correct time, we limit how long wait_until() can wait before going around the loop again. + const chrono::time_point<Clock,Duration> tpmax(chrono::time_point_cast<Duration>(Clock::now() + chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS))); + return (std::min)(tp, tpmax); + } + + template <class Duration> + chrono::steady_clock::time_point + convert_to_steady_clock_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp) + { + // Clock == chrono::steady_clock + return chrono::time_point_cast<chrono::steady_clock::duration>(tp); + } + + template <class Clock, class Duration> + chrono::steady_clock::time_point + convert_to_steady_clock_timepoint(chrono::time_point<Clock,Duration> const& tp) + { + // Clock != chrono::steady_clock + // The system time may jump while wait_until() is waiting. To compensate for this and time out near + // the correct time, we limit how long wait_until() can wait before going around the loop again. + const chrono::steady_clock::duration dura(chrono::duration_cast<chrono::steady_clock::duration>(tp - Clock::now())); + const chrono::steady_clock::duration duramax(chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS)); + return chrono::steady_clock::now() + (std::min)(dura, duramax); + } + } //end detail namespace template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point> @@ -88,8 +129,8 @@ namespace detail T pull(); void pull(T& elem); - template <class WClock, class Duration> - queue_op_status pull_until(chrono::time_point<WClock,Duration> const& tp, T& elem); + template <class Duration> + queue_op_status pull_until(chrono::time_point<clock,Duration> const& tp, T& elem); template <class Rep, class Period> queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem); @@ -122,8 +163,9 @@ namespace detail inline bool not_empty_and_time_reached(lock_guard<mutex>& lk) const; bool wait_to_pull(unique_lock<mutex>&); - template <class WClock, class Duration> - queue_op_status wait_to_pull_until(unique_lock<mutex>&, chrono::time_point<WClock, Duration> const& tp); + queue_op_status wait_to_pull_until(unique_lock<mutex>&, TimePoint const& tp); + template <class Rep, class Period> + queue_op_status wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura); T pull(unique_lock<mutex>&); T pull(lock_guard<mutex>&); @@ -228,14 +270,13 @@ namespace detail if (not_empty_and_time_reached(lk)) return false; // success if (super::closed(lk)) return true; // closed - const time_point tp(super::data_.top().time); - super::wait_until_closed_until(lk, tp); + const time_point tpmin(detail::limit_timepoint(super::data_.top().time)); + super::cond_.wait_until(lk, tpmin); } } template <class T, class Clock, class TimePoint> - template <class WClock, class Duration> - queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, chrono::time_point<WClock, Duration> const& tp) + queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, TimePoint const& tp) { for (;;) { @@ -249,8 +290,30 @@ namespace detail if (super::closed(lk)) return queue_op_status::closed; if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready; - const time_point tpmin(tp < super::data_.top().time ? tp : super::data_.top().time); - super::wait_until_closed_until(lk, tpmin); + const time_point tpmin((std::min)(tp, detail::limit_timepoint(super::data_.top().time))); + super::cond_.wait_until(lk, tpmin); + } + } + + template <class T, class Clock, class TimePoint> + template <class Rep, class Period> + queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura) + { + const chrono::steady_clock::time_point tp(chrono::steady_clock::now() + chrono::duration_cast<chrono::steady_clock::duration>(dura)); + for (;;) + { + if (not_empty_and_time_reached(lk)) return queue_op_status::success; + if (super::closed(lk)) return queue_op_status::closed; + if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready; + + super::wait_until_not_empty_or_closed_until(lk, tp); + + if (not_empty_and_time_reached(lk)) return queue_op_status::success; + if (super::closed(lk)) return queue_op_status::closed; + if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready; + + const chrono::steady_clock::time_point tpmin((std::min)(tp, detail::convert_to_steady_clock_timepoint(super::data_.top().time))); + super::cond_.wait_until(lk, tpmin); } } @@ -315,12 +378,12 @@ namespace detail ////////////////////// template <class T, class Clock, class TimePoint> - template <class WClock, class Duration> + template <class Duration> queue_op_status - sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<WClock, Duration> const& tp, T& elem) + sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<clock,Duration> const& tp, T& elem) { unique_lock<mutex> lk(super::mtx_); - const queue_op_status rc = wait_to_pull_until(lk, tp); + const queue_op_status rc = wait_to_pull_until(lk, chrono::time_point_cast<typename time_point::duration>(tp)); if (rc == queue_op_status::success) pull(lk, elem); return rc; } @@ -331,7 +394,10 @@ namespace detail queue_op_status sync_timed_queue<T, Clock, TimePoint>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem) { - return pull_until(chrono::steady_clock::now() + dura, elem); + unique_lock<mutex> lk(super::mtx_); + const queue_op_status rc = wait_to_pull_for(lk, dura); + if (rc == queue_op_status::success) pull(lk, elem); + return rc; } /////////////////////////// |