diff options
Diffstat (limited to 'boost/thread/concurrent_queues/sync_timed_queue.hpp')
-rw-r--r-- | boost/thread/concurrent_queues/sync_timed_queue.hpp | 94 |
1 files changed, 80 insertions, 14 deletions
diff --git a/boost/thread/concurrent_queues/sync_timed_queue.hpp b/boost/thread/concurrent_queues/sync_timed_queue.hpp index a4394c0729..fd8d5a3c46 100644 --- a/boost/thread/concurrent_queues/sync_timed_queue.hpp +++ b/boost/thread/concurrent_queues/sync_timed_queue.hpp @@ -16,6 +16,8 @@ #include <boost/chrono/system_clocks.hpp> #include <boost/chrono/chrono_io.hpp> +#include <algorithm> // std::min + #include <boost/config/abi_prefix.hpp> namespace boost @@ -59,6 +61,45 @@ namespace detail } }; //end struct + template <class Duration> + chrono::time_point<chrono::steady_clock,Duration> + limit_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp) + { + // Clock == chrono::steady_clock + return tp; + } + + template <class Clock, class Duration> + chrono::time_point<Clock,Duration> + limit_timepoint(chrono::time_point<Clock,Duration> const& tp) + { + // Clock != chrono::steady_clock + // The system time may jump while wait_until() is waiting. To compensate for this and time out near + // the correct time, we limit how long wait_until() can wait before going around the loop again. + const chrono::time_point<Clock,Duration> tpmax(chrono::time_point_cast<Duration>(Clock::now() + chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS))); + return (std::min)(tp, tpmax); + } + + template <class Duration> + chrono::steady_clock::time_point + convert_to_steady_clock_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp) + { + // Clock == chrono::steady_clock + return chrono::time_point_cast<chrono::steady_clock::duration>(tp); + } + + template <class Clock, class Duration> + chrono::steady_clock::time_point + convert_to_steady_clock_timepoint(chrono::time_point<Clock,Duration> const& tp) + { + // Clock != chrono::steady_clock + // The system time may jump while wait_until() is waiting. To compensate for this and time out near + // the correct time, we limit how long wait_until() can wait before going around the loop again. + const chrono::steady_clock::duration dura(chrono::duration_cast<chrono::steady_clock::duration>(tp - Clock::now())); + const chrono::steady_clock::duration duramax(chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS)); + return chrono::steady_clock::now() + (std::min)(dura, duramax); + } + } //end detail namespace template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point> @@ -88,8 +129,8 @@ namespace detail T pull(); void pull(T& elem); - template <class WClock, class Duration> - queue_op_status pull_until(chrono::time_point<WClock,Duration> const& tp, T& elem); + template <class Duration> + queue_op_status pull_until(chrono::time_point<clock,Duration> const& tp, T& elem); template <class Rep, class Period> queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem); @@ -122,8 +163,9 @@ namespace detail inline bool not_empty_and_time_reached(lock_guard<mutex>& lk) const; bool wait_to_pull(unique_lock<mutex>&); - template <class WClock, class Duration> - queue_op_status wait_to_pull_until(unique_lock<mutex>&, chrono::time_point<WClock, Duration> const& tp); + queue_op_status wait_to_pull_until(unique_lock<mutex>&, TimePoint const& tp); + template <class Rep, class Period> + queue_op_status wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura); T pull(unique_lock<mutex>&); T pull(lock_guard<mutex>&); @@ -228,14 +270,13 @@ namespace detail if (not_empty_and_time_reached(lk)) return false; // success if (super::closed(lk)) return true; // closed - const time_point tp(super::data_.top().time); - super::wait_until_closed_until(lk, tp); + const time_point tpmin(detail::limit_timepoint(super::data_.top().time)); + super::cond_.wait_until(lk, tpmin); } } template <class T, class Clock, class TimePoint> - template <class WClock, class Duration> - queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, chrono::time_point<WClock, Duration> const& tp) + queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, TimePoint const& tp) { for (;;) { @@ -249,8 +290,30 @@ namespace detail if (super::closed(lk)) return queue_op_status::closed; if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready; - const time_point tpmin(tp < super::data_.top().time ? tp : super::data_.top().time); - super::wait_until_closed_until(lk, tpmin); + const time_point tpmin((std::min)(tp, detail::limit_timepoint(super::data_.top().time))); + super::cond_.wait_until(lk, tpmin); + } + } + + template <class T, class Clock, class TimePoint> + template <class Rep, class Period> + queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura) + { + const chrono::steady_clock::time_point tp(chrono::steady_clock::now() + chrono::duration_cast<chrono::steady_clock::duration>(dura)); + for (;;) + { + if (not_empty_and_time_reached(lk)) return queue_op_status::success; + if (super::closed(lk)) return queue_op_status::closed; + if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready; + + super::wait_until_not_empty_or_closed_until(lk, tp); + + if (not_empty_and_time_reached(lk)) return queue_op_status::success; + if (super::closed(lk)) return queue_op_status::closed; + if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready; + + const chrono::steady_clock::time_point tpmin((std::min)(tp, detail::convert_to_steady_clock_timepoint(super::data_.top().time))); + super::cond_.wait_until(lk, tpmin); } } @@ -315,12 +378,12 @@ namespace detail ////////////////////// template <class T, class Clock, class TimePoint> - template <class WClock, class Duration> + template <class Duration> queue_op_status - sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<WClock, Duration> const& tp, T& elem) + sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<clock,Duration> const& tp, T& elem) { unique_lock<mutex> lk(super::mtx_); - const queue_op_status rc = wait_to_pull_until(lk, tp); + const queue_op_status rc = wait_to_pull_until(lk, chrono::time_point_cast<typename time_point::duration>(tp)); if (rc == queue_op_status::success) pull(lk, elem); return rc; } @@ -331,7 +394,10 @@ namespace detail queue_op_status sync_timed_queue<T, Clock, TimePoint>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem) { - return pull_until(chrono::steady_clock::now() + dura, elem); + unique_lock<mutex> lk(super::mtx_); + const queue_op_status rc = wait_to_pull_for(lk, dura); + if (rc == queue_op_status::success) pull(lk, elem); + return rc; } /////////////////////////// |