summaryrefslogtreecommitdiff
path: root/boost/thread/concurrent_queues/sync_timed_queue.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'boost/thread/concurrent_queues/sync_timed_queue.hpp')
-rw-r--r--boost/thread/concurrent_queues/sync_timed_queue.hpp180
1 files changed, 57 insertions, 123 deletions
diff --git a/boost/thread/concurrent_queues/sync_timed_queue.hpp b/boost/thread/concurrent_queues/sync_timed_queue.hpp
index 596a625ef5..a4394c0729 100644
--- a/boost/thread/concurrent_queues/sync_timed_queue.hpp
+++ b/boost/thread/concurrent_queues/sync_timed_queue.hpp
@@ -53,11 +53,6 @@ namespace detail
return *this;
}
- bool time_not_reached() const
- {
- return time > clock::now();
- }
-
bool operator <(const scheduled_type & other) const
{
return this->time > other.time;
@@ -123,6 +118,13 @@ namespace detail
queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
private:
+ inline bool not_empty_and_time_reached(unique_lock<mutex>& lk) const;
+ inline bool not_empty_and_time_reached(lock_guard<mutex>& lk) const;
+
+ bool wait_to_pull(unique_lock<mutex>&);
+ template <class WClock, class Duration>
+ queue_op_status wait_to_pull_until(unique_lock<mutex>&, chrono::time_point<WClock, Duration> const& tp);
+
T pull(unique_lock<mutex>&);
T pull(lock_guard<mutex>&);
@@ -134,15 +136,6 @@ namespace detail
queue_op_status wait_pull(unique_lock<mutex>& lk, T& elem);
- bool wait_until_not_empty_time_reached_or_closed(unique_lock<mutex>&);
- T pull_when_time_reached(unique_lock<mutex>&);
- template <class WClock, class Duration>
- queue_op_status pull_when_time_reached_until(unique_lock<mutex>&, chrono::time_point<WClock,Duration> const& tp, T& elem);
- bool time_not_reached(unique_lock<mutex>&);
- bool time_not_reached(lock_guard<mutex>&);
- bool empty_or_time_not_reached(unique_lock<mutex>&);
- bool empty_or_time_not_reached(lock_guard<mutex>&);
-
sync_timed_queue(const sync_timed_queue&);
sync_timed_queue& operator=(const sync_timed_queue&);
sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue));
@@ -210,82 +203,55 @@ namespace detail
///////////////////////////
template <class T, class Clock, class TimePoint>
- bool sync_timed_queue<T, Clock, TimePoint>::time_not_reached(unique_lock<mutex>&)
+ bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(unique_lock<mutex>& lk) const
{
- return super::data_.top().time_not_reached();
+ return ! super::empty(lk) && clock::now() >= super::data_.top().time;
}
template <class T, class Clock, class TimePoint>
- bool sync_timed_queue<T, Clock, TimePoint>::time_not_reached(lock_guard<mutex>&)
+ bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(lock_guard<mutex>& lk) const
{
- return super::data_.top().time_not_reached();
+ return ! super::empty(lk) && clock::now() >= super::data_.top().time;
}
///////////////////////////
template <class T, class Clock, class TimePoint>
- bool sync_timed_queue<T, Clock, TimePoint>::wait_until_not_empty_time_reached_or_closed(unique_lock<mutex>& lk)
+ bool sync_timed_queue<T, Clock, TimePoint>::wait_to_pull(unique_lock<mutex>& lk)
{
for (;;)
{
- if (super::closed(lk)) return true;
- while (! super::empty(lk)) {
- if (! time_not_reached(lk)) return false;
- time_point tp = super::data_.top().time;
- super::not_empty_.wait_until(lk, tp);
- if (super::closed(lk)) return true;
- }
- if (super::closed(lk)) return true;
- super::not_empty_.wait(lk);
- }
- //return false;
- }
+ if (not_empty_and_time_reached(lk)) return false; // success
+ if (super::closed(lk)) return true; // closed
- ///////////////////////////
- template <class T, class Clock, class TimePoint>
- T sync_timed_queue<T, Clock, TimePoint>::pull_when_time_reached(unique_lock<mutex>& lk)
- {
- while (time_not_reached(lk))
- {
- super::throw_if_closed(lk);
- time_point tp = super::data_.top().time;
- super::not_empty_.wait_until(lk,tp);
- super::wait_until_not_empty(lk);
+ super::wait_until_not_empty_or_closed(lk);
+
+ if (not_empty_and_time_reached(lk)) return false; // success
+ if (super::closed(lk)) return true; // closed
+
+ const time_point tp(super::data_.top().time);
+ super::wait_until_closed_until(lk, tp);
}
- return pull(lk);
}
template <class T, class Clock, class TimePoint>
template <class WClock, class Duration>
- queue_op_status
- sync_timed_queue<T, Clock, TimePoint>::pull_when_time_reached_until(unique_lock<mutex>& lk, chrono::time_point<WClock, Duration> const& tp, T& elem)
+ queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, chrono::time_point<WClock, Duration> const& tp)
{
- chrono::time_point<WClock, Duration> tpmin = (tp < super::data_.top().time) ? tp : super::data_.top().time;
- while (time_not_reached(lk))
+ for (;;)
{
- super::throw_if_closed(lk);
- if (cv_status::timeout == super::not_empty_.wait_until(lk, tpmin)) {
- if (time_not_reached(lk)) return queue_op_status::not_ready;
- return queue_op_status::timeout;
- }
- }
- pull(lk, elem);
- return queue_op_status::success;
- }
+ if (not_empty_and_time_reached(lk)) return queue_op_status::success;
+ if (super::closed(lk)) return queue_op_status::closed;
+ if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
- ///////////////////////////
- template <class T, class Clock, class TimePoint>
- bool sync_timed_queue<T, Clock, TimePoint>::empty_or_time_not_reached(unique_lock<mutex>& lk)
- {
- if ( super::empty(lk) ) return true;
- if ( time_not_reached(lk) ) return true;
- return false;
- }
- template <class T, class Clock, class TimePoint>
- bool sync_timed_queue<T, Clock, TimePoint>::empty_or_time_not_reached(lock_guard<mutex>& lk)
- {
- if ( super::empty(lk) ) return true;
- if ( time_not_reached(lk) ) return true;
- return false;
+ super::wait_until_not_empty_or_closed_until(lk, tp);
+
+ if (not_empty_and_time_reached(lk)) return queue_op_status::success;
+ if (super::closed(lk)) return queue_op_status::closed;
+ if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
+
+ const time_point tpmin(tp < super::data_.top().time ? tp : super::data_.top().time);
+ super::wait_until_closed_until(lk, tpmin);
+ }
}
///////////////////////////
@@ -312,8 +278,9 @@ namespace detail
T sync_timed_queue<T, Clock, TimePoint>::pull()
{
unique_lock<mutex> lk(super::mtx_);
- super::wait_until_not_empty(lk);
- return pull_when_time_reached(lk);
+ const bool has_been_closed = wait_to_pull(lk);
+ if (has_been_closed) super::throw_if_closed(lk);
+ return pull(lk);
}
///////////////////////////
@@ -341,8 +308,9 @@ namespace detail
void sync_timed_queue<T, Clock, TimePoint>::pull(T& elem)
{
unique_lock<mutex> lk(super::mtx_);
- super::wait_until_not_empty(lk);
- elem = pull_when_time_reached(lk);
+ const bool has_been_closed = wait_to_pull(lk);
+ if (has_been_closed) super::throw_if_closed(lk);
+ pull(lk, elem);
}
//////////////////////
@@ -352,10 +320,9 @@ namespace detail
sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<WClock, Duration> const& tp, T& elem)
{
unique_lock<mutex> lk(super::mtx_);
-
- if (queue_op_status::timeout == super::wait_until_not_empty_until(lk, tp))
- return queue_op_status::timeout;
- return pull_when_time_reached_until(lk, tp, elem);
+ const queue_op_status rc = wait_to_pull_until(lk, tp);
+ if (rc == queue_op_status::success) pull(lk, elem);
+ return rc;
}
//////////////////////
@@ -371,35 +338,26 @@ namespace detail
template <class T, class Clock, class TimePoint>
queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(unique_lock<mutex>& lk, T& elem)
{
- if ( super::empty(lk) )
+ if (not_empty_and_time_reached(lk))
{
- if (super::closed(lk)) return queue_op_status::closed;
- return queue_op_status::empty;
- }
- if ( time_not_reached(lk) )
- {
- if (super::closed(lk)) return queue_op_status::closed;
- return queue_op_status::not_ready;
+ pull(lk, elem);
+ return queue_op_status::success;
}
-
- pull(lk, elem);
- return queue_op_status::success;
+ if (super::closed(lk)) return queue_op_status::closed;
+ if (super::empty(lk)) return queue_op_status::empty;
+ return queue_op_status::not_ready;
}
template <class T, class Clock, class TimePoint>
queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(lock_guard<mutex>& lk, T& elem)
{
- if ( super::empty(lk) )
+ if (not_empty_and_time_reached(lk))
{
- if (super::closed(lk)) return queue_op_status::closed;
- return queue_op_status::empty;
- }
- if ( time_not_reached(lk) )
- {
- if (super::closed(lk)) return queue_op_status::closed;
- return queue_op_status::not_ready;
+ pull(lk, elem);
+ return queue_op_status::success;
}
- pull(lk, elem);
- return queue_op_status::success;
+ if (super::closed(lk)) return queue_op_status::closed;
+ if (super::empty(lk)) return queue_op_status::empty;
+ return queue_op_status::not_ready;
}
template <class T, class Clock, class TimePoint>
@@ -413,11 +371,7 @@ namespace detail
template <class T, class Clock, class TimePoint>
queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(unique_lock<mutex>& lk, T& elem)
{
- if (super::empty(lk))
- {
- if (super::closed(lk)) return queue_op_status::closed;
- }
- bool has_been_closed = wait_until_not_empty_time_reached_or_closed(lk);
+ const bool has_been_closed = wait_to_pull(lk);
if (has_been_closed) return queue_op_status::closed;
pull(lk, elem);
return queue_op_status::success;
@@ -430,26 +384,6 @@ namespace detail
return wait_pull(lk, elem);
}
-// ///////////////////////////
-// template <class T, class Clock, class TimePoint>
-// queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(unique_lock<mutex> &lk, T& elem)
-// {
-// if (super::empty(lk))
-// {
-// if (super::closed(lk)) return queue_op_status::closed;
-// }
-// bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
-// if (has_been_closed) return queue_op_status::closed;
-// pull(lk, elem);
-// return queue_op_status::success;
-// }
-// template <class T, class Clock, class TimePoint>
-// queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(T& elem)
-// {
-// unique_lock<mutex> lk(super::mtx_);
-// return wait_pull(lk, elem);
-// }
-
///////////////////////////
template <class T, class Clock, class TimePoint>
queue_op_status sync_timed_queue<T, Clock, TimePoint>::nonblocking_pull(T& elem)