summaryrefslogtreecommitdiff
path: root/boost/thread/concurrent_queues
diff options
context:
space:
mode:
authorDongHun Kwak <dh0128.kwak@samsung.com>2019-12-05 15:12:59 +0900
committerDongHun Kwak <dh0128.kwak@samsung.com>2019-12-05 15:12:59 +0900
commitb8cf34c691623e4ec329053cbbf68522a855882d (patch)
tree34da08632a99677f6b79ecb65e5b655a5b69a67f /boost/thread/concurrent_queues
parent3fdc3e5ee96dca5b11d1694975a65200787eab86 (diff)
downloadboost-b8cf34c691623e4ec329053cbbf68522a855882d.tar.gz
boost-b8cf34c691623e4ec329053cbbf68522a855882d.tar.bz2
boost-b8cf34c691623e4ec329053cbbf68522a855882d.zip
Imported Upstream version 1.67.0upstream/1.67.0
Diffstat (limited to 'boost/thread/concurrent_queues')
-rw-r--r--boost/thread/concurrent_queues/detail/sync_deque_base.hpp50
-rw-r--r--boost/thread/concurrent_queues/detail/sync_queue_base.hpp50
-rw-r--r--boost/thread/concurrent_queues/sync_deque.hpp12
-rw-r--r--boost/thread/concurrent_queues/sync_priority_queue.hpp24
-rw-r--r--boost/thread/concurrent_queues/sync_queue.hpp20
-rw-r--r--boost/thread/concurrent_queues/sync_timed_queue.hpp180
6 files changed, 131 insertions, 205 deletions
diff --git a/boost/thread/concurrent_queues/detail/sync_deque_base.hpp b/boost/thread/concurrent_queues/detail/sync_deque_base.hpp
index beffaf73b1..e3ecad15de 100644
--- a/boost/thread/concurrent_queues/detail/sync_deque_base.hpp
+++ b/boost/thread/concurrent_queues/detail/sync_deque_base.hpp
@@ -11,6 +11,8 @@
//
//////////////////////////////////////////////////////////////////////////////
+#include <boost/bind.hpp>
+
#include <boost/thread/detail/config.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/detail/move.hpp>
@@ -84,10 +86,13 @@ namespace detail
inline void throw_if_closed(unique_lock<mutex>&);
inline void throw_if_closed(lock_guard<mutex>&);
- inline void wait_until_not_empty(unique_lock<mutex>& lk);
+ inline bool not_empty_or_closed(unique_lock<mutex>& ) const;
+
inline bool wait_until_not_empty_or_closed(unique_lock<mutex>& lk);
template <class WClock, class Duration>
- queue_op_status wait_until_not_empty_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&);
+ queue_op_status wait_until_not_empty_or_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp);
+ template <class WClock, class Duration>
+ queue_op_status wait_until_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp);
inline void notify_not_empty_if_needed(unique_lock<mutex>& )
{
@@ -176,39 +181,38 @@ namespace detail
}
template <class ValueType, class Queue>
- void sync_deque_base<ValueType, Queue>::wait_until_not_empty(unique_lock<mutex>& lk)
+ bool sync_deque_base<ValueType, Queue>::not_empty_or_closed(unique_lock<mutex>& ) const
{
- for (;;)
- {
- if (! empty(lk)) break;
- throw_if_closed(lk);
- not_empty_.wait(lk);
- }
+ return ! data_.empty() || closed_;
}
+
template <class ValueType, class Queue>
bool sync_deque_base<ValueType, Queue>::wait_until_not_empty_or_closed(unique_lock<mutex>& lk)
{
- for (;;)
- {
- if (! empty(lk)) break;
- if (closed(lk)) return true;
- not_empty_.wait(lk);
- }
- return false;
+ not_empty_.wait(lk, boost::bind(&sync_deque_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk)));
+ if (! empty(lk)) return false; // success
+ return true; // closed
}
template <class ValueType, class Queue>
template <class WClock, class Duration>
- queue_op_status sync_deque_base<ValueType, Queue>::wait_until_not_empty_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp)
+ queue_op_status sync_deque_base<ValueType, Queue>::wait_until_not_empty_or_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp)
{
- for (;;)
- {
- if (! empty(lk)) return queue_op_status::success;
- throw_if_closed(lk);
- if (not_empty_.wait_until(lk, tp) == cv_status::timeout ) return queue_op_status::timeout;
- }
+ if (! not_empty_.wait_until(lk, tp, boost::bind(&sync_deque_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk))))
+ return queue_op_status::timeout;
+ if (! empty(lk)) return queue_op_status::success;
+ return queue_op_status::closed;
}
+ template <class ValueType, class Queue>
+ template <class WClock, class Duration>
+ queue_op_status sync_deque_base<ValueType, Queue>::wait_until_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp)
+ {
+ bool (sync_queue_base<ValueType, Queue>::*closed_function_ptr)(unique_lock<mutex>&) const = &sync_queue_base<ValueType, Queue>::closed;
+ if (! not_empty_.wait_until(lk, tp, boost::bind(closed_function_ptr, boost::ref(*this), boost::ref(lk))))
+ return queue_op_status::timeout;
+ return queue_op_status::closed;
+ }
} // detail
} // concurrent
diff --git a/boost/thread/concurrent_queues/detail/sync_queue_base.hpp b/boost/thread/concurrent_queues/detail/sync_queue_base.hpp
index 6ecb8211e3..c570da9505 100644
--- a/boost/thread/concurrent_queues/detail/sync_queue_base.hpp
+++ b/boost/thread/concurrent_queues/detail/sync_queue_base.hpp
@@ -11,6 +11,8 @@
//
//////////////////////////////////////////////////////////////////////////////
+#include <boost/bind.hpp>
+
#include <boost/thread/detail/config.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/detail/move.hpp>
@@ -84,10 +86,13 @@ namespace detail
inline void throw_if_closed(unique_lock<mutex>&);
inline void throw_if_closed(lock_guard<mutex>&);
- inline void wait_until_not_empty(unique_lock<mutex>& lk);
+ inline bool not_empty_or_closed(unique_lock<mutex>& ) const;
+
inline bool wait_until_not_empty_or_closed(unique_lock<mutex>& lk);
template <class WClock, class Duration>
- queue_op_status wait_until_not_empty_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&);
+ queue_op_status wait_until_not_empty_or_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp);
+ template <class WClock, class Duration>
+ queue_op_status wait_until_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp);
inline void notify_not_empty_if_needed(unique_lock<mutex>& )
{
@@ -176,39 +181,38 @@ namespace detail
}
template <class ValueType, class Queue>
- void sync_queue_base<ValueType, Queue>::wait_until_not_empty(unique_lock<mutex>& lk)
+ bool sync_queue_base<ValueType, Queue>::not_empty_or_closed(unique_lock<mutex>& ) const
{
- for (;;)
- {
- if (! empty(lk)) break;
- throw_if_closed(lk);
- not_empty_.wait(lk);
- }
+ return ! data_.empty() || closed_;
}
+
template <class ValueType, class Queue>
bool sync_queue_base<ValueType, Queue>::wait_until_not_empty_or_closed(unique_lock<mutex>& lk)
{
- for (;;)
- {
- if (! empty(lk)) break;
- if (closed(lk)) return true;
- not_empty_.wait(lk);
- }
- return false;
+ not_empty_.wait(lk, boost::bind(&sync_queue_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk)));
+ if (! empty(lk)) return false; // success
+ return true; // closed
}
template <class ValueType, class Queue>
template <class WClock, class Duration>
- queue_op_status sync_queue_base<ValueType, Queue>::wait_until_not_empty_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp)
+ queue_op_status sync_queue_base<ValueType, Queue>::wait_until_not_empty_or_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp)
{
- for (;;)
- {
- if (! empty(lk)) return queue_op_status::success;
- throw_if_closed(lk);
- if (not_empty_.wait_until(lk, tp) == cv_status::timeout ) return queue_op_status::timeout;
- }
+ if (! not_empty_.wait_until(lk, tp, boost::bind(&sync_queue_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk))))
+ return queue_op_status::timeout;
+ if (! empty(lk)) return queue_op_status::success;
+ return queue_op_status::closed;
}
+ template <class ValueType, class Queue>
+ template <class WClock, class Duration>
+ queue_op_status sync_queue_base<ValueType, Queue>::wait_until_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp)
+ {
+ bool (sync_queue_base<ValueType, Queue>::*closed_function_ptr)(unique_lock<mutex>&) const = &sync_queue_base<ValueType, Queue>::closed;
+ if (! not_empty_.wait_until(lk, tp, boost::bind(closed_function_ptr, boost::ref(*this), boost::ref(lk))))
+ return queue_op_status::timeout;
+ return queue_op_status::closed;
+ }
} // detail
} // concurrent
diff --git a/boost/thread/concurrent_queues/sync_deque.hpp b/boost/thread/concurrent_queues/sync_deque.hpp
index c84dae022a..c6159c4be0 100644
--- a/boost/thread/concurrent_queues/sync_deque.hpp
+++ b/boost/thread/concurrent_queues/sync_deque.hpp
@@ -149,11 +149,7 @@ namespace concurrent
template <class ValueType, class Container>
queue_op_status sync_deque<ValueType, Container>::wait_pull_front(ValueType& elem, unique_lock<mutex>& lk)
{
- if (super::empty(lk))
- {
- if (super::closed(lk)) return queue_op_status::closed;
- }
- bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
+ const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
if (has_been_closed) return queue_op_status::closed;
pull_front(elem, lk);
return queue_op_status::success;
@@ -188,7 +184,8 @@ namespace concurrent
void sync_deque<ValueType, Container>::pull_front(ValueType& elem)
{
unique_lock<mutex> lk(super::mtx_);
- super::wait_until_not_empty(lk);
+ const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
+ if (has_been_closed) super::throw_if_closed(lk);
pull_front(elem, lk);
}
@@ -197,7 +194,8 @@ namespace concurrent
ValueType sync_deque<ValueType, Container>::pull_front()
{
unique_lock<mutex> lk(super::mtx_);
- super::wait_until_not_empty(lk);
+ const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
+ if (has_been_closed) super::throw_if_closed(lk);
return pull_front(lk);
}
diff --git a/boost/thread/concurrent_queues/sync_priority_queue.hpp b/boost/thread/concurrent_queues/sync_priority_queue.hpp
index a556910cd6..2c6881f90f 100644
--- a/boost/thread/concurrent_queues/sync_priority_queue.hpp
+++ b/boost/thread/concurrent_queues/sync_priority_queue.hpp
@@ -82,7 +82,7 @@ namespace detail {
return boost::move(result);
}
- Type const& top()
+ Type const& top() const
{
return _elements.front();
}
@@ -249,7 +249,8 @@ namespace concurrent
T sync_priority_queue<T,Container,Cmp>::pull()
{
unique_lock<mutex> lk(super::mtx_);
- super::wait_until_not_empty(lk);
+ const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
+ if (has_been_closed) super::throw_if_closed(lk);
return pull(lk);
}
@@ -269,7 +270,8 @@ namespace concurrent
void sync_priority_queue<T,Container,Cmp>::pull(T& elem)
{
unique_lock<mutex> lk(super::mtx_);
- super::wait_until_not_empty(lk);
+ const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
+ if (has_been_closed) super::throw_if_closed(lk);
pull(lk, elem);
}
@@ -280,10 +282,9 @@ namespace concurrent
sync_priority_queue<T,Cont,Cmp>::pull_until(const chrono::time_point<WClock,Duration>& tp, T& elem)
{
unique_lock<mutex> lk(super::mtx_);
- if (queue_op_status::timeout == super::wait_until_not_empty_until(lk, tp))
- return queue_op_status::timeout;
- pull(lk, elem);
- return queue_op_status::success;
+ const queue_op_status rc = super::wait_until_not_empty_or_closed_until(lk, tp);
+ if (rc == queue_op_status::success) pull(lk, elem);
+ return rc;
}
//////////////////////
@@ -292,7 +293,7 @@ namespace concurrent
queue_op_status
sync_priority_queue<T,Cont,Cmp>::pull_for(const chrono::duration<Rep,Period>& dura, T& elem)
{
- return pull_until(clock::now() + dura, elem);
+ return pull_until(chrono::steady_clock::now() + dura, elem);
}
//////////////////////
@@ -334,11 +335,7 @@ namespace concurrent
template <class T,class Container, class Cmp>
queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(unique_lock<mutex>& lk, T& elem)
{
- if (super::empty(lk))
- {
- if (super::closed(lk)) return queue_op_status::closed;
- }
- bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
+ const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
if (has_been_closed) return queue_op_status::closed;
pull(lk, elem);
return queue_op_status::success;
@@ -352,7 +349,6 @@ namespace concurrent
}
//////////////////////
-
template <class T,class Container, class Cmp>
queue_op_status sync_priority_queue<T,Container,Cmp>::nonblocking_pull(T& elem)
{
diff --git a/boost/thread/concurrent_queues/sync_queue.hpp b/boost/thread/concurrent_queues/sync_queue.hpp
index 1dbbef05dd..b36b57e607 100644
--- a/boost/thread/concurrent_queues/sync_queue.hpp
+++ b/boost/thread/concurrent_queues/sync_queue.hpp
@@ -10,7 +10,6 @@
// See http://www.boost.org/libs/thread for documentation.
//
//////////////////////////////////////////////////////////////////////////////
-#include <iostream>
#include <boost/thread/detail/config.hpp>
#include <boost/thread/concurrent_queues/detail/sync_queue_base.hpp>
@@ -51,7 +50,6 @@ namespace concurrent
inline ~sync_queue();
// Modifiers
-
inline void push(const value_type& x);
inline queue_op_status try_push(const value_type& x);
inline queue_op_status nonblocking_push(const value_type& x);
@@ -151,19 +149,9 @@ namespace concurrent
template <class ValueType, class Container>
queue_op_status sync_queue<ValueType, Container>::wait_pull(ValueType& elem, unique_lock<mutex>& lk)
{
- //std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
- if (super::empty(lk))
- {
- //std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
- if (super::closed(lk)) return queue_op_status::closed;
- }
- //std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
- bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
- //std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
+ const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
if (has_been_closed) return queue_op_status::closed;
- //std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
pull(elem, lk);
- //std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
return queue_op_status::success;
}
@@ -196,7 +184,8 @@ namespace concurrent
void sync_queue<ValueType, Container>::pull(ValueType& elem)
{
unique_lock<mutex> lk(super::mtx_);
- super::wait_until_not_empty(lk);
+ const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
+ if (has_been_closed) super::throw_if_closed(lk);
pull(elem, lk);
}
@@ -205,7 +194,8 @@ namespace concurrent
ValueType sync_queue<ValueType, Container>::pull()
{
unique_lock<mutex> lk(super::mtx_);
- super::wait_until_not_empty(lk);
+ const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
+ if (has_been_closed) super::throw_if_closed(lk);
return pull(lk);
}
diff --git a/boost/thread/concurrent_queues/sync_timed_queue.hpp b/boost/thread/concurrent_queues/sync_timed_queue.hpp
index 596a625ef5..a4394c0729 100644
--- a/boost/thread/concurrent_queues/sync_timed_queue.hpp
+++ b/boost/thread/concurrent_queues/sync_timed_queue.hpp
@@ -53,11 +53,6 @@ namespace detail
return *this;
}
- bool time_not_reached() const
- {
- return time > clock::now();
- }
-
bool operator <(const scheduled_type & other) const
{
return this->time > other.time;
@@ -123,6 +118,13 @@ namespace detail
queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
private:
+ inline bool not_empty_and_time_reached(unique_lock<mutex>& lk) const;
+ inline bool not_empty_and_time_reached(lock_guard<mutex>& lk) const;
+
+ bool wait_to_pull(unique_lock<mutex>&);
+ template <class WClock, class Duration>
+ queue_op_status wait_to_pull_until(unique_lock<mutex>&, chrono::time_point<WClock, Duration> const& tp);
+
T pull(unique_lock<mutex>&);
T pull(lock_guard<mutex>&);
@@ -134,15 +136,6 @@ namespace detail
queue_op_status wait_pull(unique_lock<mutex>& lk, T& elem);
- bool wait_until_not_empty_time_reached_or_closed(unique_lock<mutex>&);
- T pull_when_time_reached(unique_lock<mutex>&);
- template <class WClock, class Duration>
- queue_op_status pull_when_time_reached_until(unique_lock<mutex>&, chrono::time_point<WClock,Duration> const& tp, T& elem);
- bool time_not_reached(unique_lock<mutex>&);
- bool time_not_reached(lock_guard<mutex>&);
- bool empty_or_time_not_reached(unique_lock<mutex>&);
- bool empty_or_time_not_reached(lock_guard<mutex>&);
-
sync_timed_queue(const sync_timed_queue&);
sync_timed_queue& operator=(const sync_timed_queue&);
sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue));
@@ -210,82 +203,55 @@ namespace detail
///////////////////////////
template <class T, class Clock, class TimePoint>
- bool sync_timed_queue<T, Clock, TimePoint>::time_not_reached(unique_lock<mutex>&)
+ bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(unique_lock<mutex>& lk) const
{
- return super::data_.top().time_not_reached();
+ return ! super::empty(lk) && clock::now() >= super::data_.top().time;
}
template <class T, class Clock, class TimePoint>
- bool sync_timed_queue<T, Clock, TimePoint>::time_not_reached(lock_guard<mutex>&)
+ bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(lock_guard<mutex>& lk) const
{
- return super::data_.top().time_not_reached();
+ return ! super::empty(lk) && clock::now() >= super::data_.top().time;
}
///////////////////////////
template <class T, class Clock, class TimePoint>
- bool sync_timed_queue<T, Clock, TimePoint>::wait_until_not_empty_time_reached_or_closed(unique_lock<mutex>& lk)
+ bool sync_timed_queue<T, Clock, TimePoint>::wait_to_pull(unique_lock<mutex>& lk)
{
for (;;)
{
- if (super::closed(lk)) return true;
- while (! super::empty(lk)) {
- if (! time_not_reached(lk)) return false;
- time_point tp = super::data_.top().time;
- super::not_empty_.wait_until(lk, tp);
- if (super::closed(lk)) return true;
- }
- if (super::closed(lk)) return true;
- super::not_empty_.wait(lk);
- }
- //return false;
- }
+ if (not_empty_and_time_reached(lk)) return false; // success
+ if (super::closed(lk)) return true; // closed
- ///////////////////////////
- template <class T, class Clock, class TimePoint>
- T sync_timed_queue<T, Clock, TimePoint>::pull_when_time_reached(unique_lock<mutex>& lk)
- {
- while (time_not_reached(lk))
- {
- super::throw_if_closed(lk);
- time_point tp = super::data_.top().time;
- super::not_empty_.wait_until(lk,tp);
- super::wait_until_not_empty(lk);
+ super::wait_until_not_empty_or_closed(lk);
+
+ if (not_empty_and_time_reached(lk)) return false; // success
+ if (super::closed(lk)) return true; // closed
+
+ const time_point tp(super::data_.top().time);
+ super::wait_until_closed_until(lk, tp);
}
- return pull(lk);
}
template <class T, class Clock, class TimePoint>
template <class WClock, class Duration>
- queue_op_status
- sync_timed_queue<T, Clock, TimePoint>::pull_when_time_reached_until(unique_lock<mutex>& lk, chrono::time_point<WClock, Duration> const& tp, T& elem)
+ queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, chrono::time_point<WClock, Duration> const& tp)
{
- chrono::time_point<WClock, Duration> tpmin = (tp < super::data_.top().time) ? tp : super::data_.top().time;
- while (time_not_reached(lk))
+ for (;;)
{
- super::throw_if_closed(lk);
- if (cv_status::timeout == super::not_empty_.wait_until(lk, tpmin)) {
- if (time_not_reached(lk)) return queue_op_status::not_ready;
- return queue_op_status::timeout;
- }
- }
- pull(lk, elem);
- return queue_op_status::success;
- }
+ if (not_empty_and_time_reached(lk)) return queue_op_status::success;
+ if (super::closed(lk)) return queue_op_status::closed;
+ if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
- ///////////////////////////
- template <class T, class Clock, class TimePoint>
- bool sync_timed_queue<T, Clock, TimePoint>::empty_or_time_not_reached(unique_lock<mutex>& lk)
- {
- if ( super::empty(lk) ) return true;
- if ( time_not_reached(lk) ) return true;
- return false;
- }
- template <class T, class Clock, class TimePoint>
- bool sync_timed_queue<T, Clock, TimePoint>::empty_or_time_not_reached(lock_guard<mutex>& lk)
- {
- if ( super::empty(lk) ) return true;
- if ( time_not_reached(lk) ) return true;
- return false;
+ super::wait_until_not_empty_or_closed_until(lk, tp);
+
+ if (not_empty_and_time_reached(lk)) return queue_op_status::success;
+ if (super::closed(lk)) return queue_op_status::closed;
+ if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
+
+ const time_point tpmin(tp < super::data_.top().time ? tp : super::data_.top().time);
+ super::wait_until_closed_until(lk, tpmin);
+ }
}
///////////////////////////
@@ -312,8 +278,9 @@ namespace detail
T sync_timed_queue<T, Clock, TimePoint>::pull()
{
unique_lock<mutex> lk(super::mtx_);
- super::wait_until_not_empty(lk);
- return pull_when_time_reached(lk);
+ const bool has_been_closed = wait_to_pull(lk);
+ if (has_been_closed) super::throw_if_closed(lk);
+ return pull(lk);
}
///////////////////////////
@@ -341,8 +308,9 @@ namespace detail
void sync_timed_queue<T, Clock, TimePoint>::pull(T& elem)
{
unique_lock<mutex> lk(super::mtx_);
- super::wait_until_not_empty(lk);
- elem = pull_when_time_reached(lk);
+ const bool has_been_closed = wait_to_pull(lk);
+ if (has_been_closed) super::throw_if_closed(lk);
+ pull(lk, elem);
}
//////////////////////
@@ -352,10 +320,9 @@ namespace detail
sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<WClock, Duration> const& tp, T& elem)
{
unique_lock<mutex> lk(super::mtx_);
-
- if (queue_op_status::timeout == super::wait_until_not_empty_until(lk, tp))
- return queue_op_status::timeout;
- return pull_when_time_reached_until(lk, tp, elem);
+ const queue_op_status rc = wait_to_pull_until(lk, tp);
+ if (rc == queue_op_status::success) pull(lk, elem);
+ return rc;
}
//////////////////////
@@ -371,35 +338,26 @@ namespace detail
template <class T, class Clock, class TimePoint>
queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(unique_lock<mutex>& lk, T& elem)
{
- if ( super::empty(lk) )
+ if (not_empty_and_time_reached(lk))
{
- if (super::closed(lk)) return queue_op_status::closed;
- return queue_op_status::empty;
- }
- if ( time_not_reached(lk) )
- {
- if (super::closed(lk)) return queue_op_status::closed;
- return queue_op_status::not_ready;
+ pull(lk, elem);
+ return queue_op_status::success;
}
-
- pull(lk, elem);
- return queue_op_status::success;
+ if (super::closed(lk)) return queue_op_status::closed;
+ if (super::empty(lk)) return queue_op_status::empty;
+ return queue_op_status::not_ready;
}
template <class T, class Clock, class TimePoint>
queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(lock_guard<mutex>& lk, T& elem)
{
- if ( super::empty(lk) )
+ if (not_empty_and_time_reached(lk))
{
- if (super::closed(lk)) return queue_op_status::closed;
- return queue_op_status::empty;
- }
- if ( time_not_reached(lk) )
- {
- if (super::closed(lk)) return queue_op_status::closed;
- return queue_op_status::not_ready;
+ pull(lk, elem);
+ return queue_op_status::success;
}
- pull(lk, elem);
- return queue_op_status::success;
+ if (super::closed(lk)) return queue_op_status::closed;
+ if (super::empty(lk)) return queue_op_status::empty;
+ return queue_op_status::not_ready;
}
template <class T, class Clock, class TimePoint>
@@ -413,11 +371,7 @@ namespace detail
template <class T, class Clock, class TimePoint>
queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(unique_lock<mutex>& lk, T& elem)
{
- if (super::empty(lk))
- {
- if (super::closed(lk)) return queue_op_status::closed;
- }
- bool has_been_closed = wait_until_not_empty_time_reached_or_closed(lk);
+ const bool has_been_closed = wait_to_pull(lk);
if (has_been_closed) return queue_op_status::closed;
pull(lk, elem);
return queue_op_status::success;
@@ -430,26 +384,6 @@ namespace detail
return wait_pull(lk, elem);
}
-// ///////////////////////////
-// template <class T, class Clock, class TimePoint>
-// queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(unique_lock<mutex> &lk, T& elem)
-// {
-// if (super::empty(lk))
-// {
-// if (super::closed(lk)) return queue_op_status::closed;
-// }
-// bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
-// if (has_been_closed) return queue_op_status::closed;
-// pull(lk, elem);
-// return queue_op_status::success;
-// }
-// template <class T, class Clock, class TimePoint>
-// queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(T& elem)
-// {
-// unique_lock<mutex> lk(super::mtx_);
-// return wait_pull(lk, elem);
-// }
-
///////////////////////////
template <class T, class Clock, class TimePoint>
queue_op_status sync_timed_queue<T, Clock, TimePoint>::nonblocking_pull(T& elem)