summaryrefslogtreecommitdiff
path: root/boost/thread/concurrent_queues/sync_timed_queue.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'boost/thread/concurrent_queues/sync_timed_queue.hpp')
-rw-r--r--boost/thread/concurrent_queues/sync_timed_queue.hpp94
1 files changed, 80 insertions, 14 deletions
diff --git a/boost/thread/concurrent_queues/sync_timed_queue.hpp b/boost/thread/concurrent_queues/sync_timed_queue.hpp
index a4394c0729..fd8d5a3c46 100644
--- a/boost/thread/concurrent_queues/sync_timed_queue.hpp
+++ b/boost/thread/concurrent_queues/sync_timed_queue.hpp
@@ -16,6 +16,8 @@
#include <boost/chrono/system_clocks.hpp>
#include <boost/chrono/chrono_io.hpp>
+#include <algorithm> // std::min
+
#include <boost/config/abi_prefix.hpp>
namespace boost
@@ -59,6 +61,45 @@ namespace detail
}
}; //end struct
+ template <class Duration>
+ chrono::time_point<chrono::steady_clock,Duration>
+ limit_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
+ {
+ // Clock == chrono::steady_clock
+ return tp;
+ }
+
+ template <class Clock, class Duration>
+ chrono::time_point<Clock,Duration>
+ limit_timepoint(chrono::time_point<Clock,Duration> const& tp)
+ {
+ // Clock != chrono::steady_clock
+ // The system time may jump while wait_until() is waiting. To compensate for this and time out near
+ // the correct time, we limit how long wait_until() can wait before going around the loop again.
+ const chrono::time_point<Clock,Duration> tpmax(chrono::time_point_cast<Duration>(Clock::now() + chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS)));
+ return (std::min)(tp, tpmax);
+ }
+
+ template <class Duration>
+ chrono::steady_clock::time_point
+ convert_to_steady_clock_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
+ {
+ // Clock == chrono::steady_clock
+ return chrono::time_point_cast<chrono::steady_clock::duration>(tp);
+ }
+
+ template <class Clock, class Duration>
+ chrono::steady_clock::time_point
+ convert_to_steady_clock_timepoint(chrono::time_point<Clock,Duration> const& tp)
+ {
+ // Clock != chrono::steady_clock
+ // The system time may jump while wait_until() is waiting. To compensate for this and time out near
+ // the correct time, we limit how long wait_until() can wait before going around the loop again.
+ const chrono::steady_clock::duration dura(chrono::duration_cast<chrono::steady_clock::duration>(tp - Clock::now()));
+ const chrono::steady_clock::duration duramax(chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS));
+ return chrono::steady_clock::now() + (std::min)(dura, duramax);
+ }
+
} //end detail namespace
template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
@@ -88,8 +129,8 @@ namespace detail
T pull();
void pull(T& elem);
- template <class WClock, class Duration>
- queue_op_status pull_until(chrono::time_point<WClock,Duration> const& tp, T& elem);
+ template <class Duration>
+ queue_op_status pull_until(chrono::time_point<clock,Duration> const& tp, T& elem);
template <class Rep, class Period>
queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem);
@@ -122,8 +163,9 @@ namespace detail
inline bool not_empty_and_time_reached(lock_guard<mutex>& lk) const;
bool wait_to_pull(unique_lock<mutex>&);
- template <class WClock, class Duration>
- queue_op_status wait_to_pull_until(unique_lock<mutex>&, chrono::time_point<WClock, Duration> const& tp);
+ queue_op_status wait_to_pull_until(unique_lock<mutex>&, TimePoint const& tp);
+ template <class Rep, class Period>
+ queue_op_status wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura);
T pull(unique_lock<mutex>&);
T pull(lock_guard<mutex>&);
@@ -228,14 +270,13 @@ namespace detail
if (not_empty_and_time_reached(lk)) return false; // success
if (super::closed(lk)) return true; // closed
- const time_point tp(super::data_.top().time);
- super::wait_until_closed_until(lk, tp);
+ const time_point tpmin(detail::limit_timepoint(super::data_.top().time));
+ super::cond_.wait_until(lk, tpmin);
}
}
template <class T, class Clock, class TimePoint>
- template <class WClock, class Duration>
- queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, chrono::time_point<WClock, Duration> const& tp)
+ queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, TimePoint const& tp)
{
for (;;)
{
@@ -249,8 +290,30 @@ namespace detail
if (super::closed(lk)) return queue_op_status::closed;
if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
- const time_point tpmin(tp < super::data_.top().time ? tp : super::data_.top().time);
- super::wait_until_closed_until(lk, tpmin);
+ const time_point tpmin((std::min)(tp, detail::limit_timepoint(super::data_.top().time)));
+ super::cond_.wait_until(lk, tpmin);
+ }
+ }
+
+ template <class T, class Clock, class TimePoint>
+ template <class Rep, class Period>
+ queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura)
+ {
+ const chrono::steady_clock::time_point tp(chrono::steady_clock::now() + chrono::duration_cast<chrono::steady_clock::duration>(dura));
+ for (;;)
+ {
+ if (not_empty_and_time_reached(lk)) return queue_op_status::success;
+ if (super::closed(lk)) return queue_op_status::closed;
+ if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
+
+ super::wait_until_not_empty_or_closed_until(lk, tp);
+
+ if (not_empty_and_time_reached(lk)) return queue_op_status::success;
+ if (super::closed(lk)) return queue_op_status::closed;
+ if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
+
+ const chrono::steady_clock::time_point tpmin((std::min)(tp, detail::convert_to_steady_clock_timepoint(super::data_.top().time)));
+ super::cond_.wait_until(lk, tpmin);
}
}
@@ -315,12 +378,12 @@ namespace detail
//////////////////////
template <class T, class Clock, class TimePoint>
- template <class WClock, class Duration>
+ template <class Duration>
queue_op_status
- sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<WClock, Duration> const& tp, T& elem)
+ sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<clock,Duration> const& tp, T& elem)
{
unique_lock<mutex> lk(super::mtx_);
- const queue_op_status rc = wait_to_pull_until(lk, tp);
+ const queue_op_status rc = wait_to_pull_until(lk, chrono::time_point_cast<typename time_point::duration>(tp));
if (rc == queue_op_status::success) pull(lk, elem);
return rc;
}
@@ -331,7 +394,10 @@ namespace detail
queue_op_status
sync_timed_queue<T, Clock, TimePoint>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem)
{
- return pull_until(chrono::steady_clock::now() + dura, elem);
+ unique_lock<mutex> lk(super::mtx_);
+ const queue_op_status rc = wait_to_pull_for(lk, dura);
+ if (rc == queue_op_status::success) pull(lk, elem);
+ return rc;
}
///////////////////////////