summaryrefslogtreecommitdiff
path: root/boost/thread/concurrent_queues
diff options
context:
space:
mode:
Diffstat (limited to 'boost/thread/concurrent_queues')
-rw-r--r--boost/thread/concurrent_queues/detail/sync_deque_base.hpp28
-rw-r--r--boost/thread/concurrent_queues/detail/sync_queue_base.hpp28
-rw-r--r--boost/thread/concurrent_queues/sync_deque.hpp6
-rw-r--r--boost/thread/concurrent_queues/sync_priority_queue.hpp8
-rw-r--r--boost/thread/concurrent_queues/sync_queue.hpp6
-rw-r--r--boost/thread/concurrent_queues/sync_timed_queue.hpp94
6 files changed, 106 insertions, 64 deletions
diff --git a/boost/thread/concurrent_queues/detail/sync_deque_base.hpp b/boost/thread/concurrent_queues/detail/sync_deque_base.hpp
index e3ecad15de..704efc70c5 100644
--- a/boost/thread/concurrent_queues/detail/sync_deque_base.hpp
+++ b/boost/thread/concurrent_queues/detail/sync_deque_base.hpp
@@ -63,7 +63,7 @@ namespace detail
protected:
mutable mutex mtx_;
- condition_variable not_empty_;
+ condition_variable cond_;
underlying_queue_type data_;
bool closed_;
@@ -91,16 +91,14 @@ namespace detail
inline bool wait_until_not_empty_or_closed(unique_lock<mutex>& lk);
template <class WClock, class Duration>
queue_op_status wait_until_not_empty_or_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp);
- template <class WClock, class Duration>
- queue_op_status wait_until_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp);
- inline void notify_not_empty_if_needed(unique_lock<mutex>& )
+ inline void notify_elem_added(unique_lock<mutex>& )
{
- not_empty_.notify_one();
+ cond_.notify_all();
}
- inline void notify_not_empty_if_needed(lock_guard<mutex>& )
+ inline void notify_elem_added(lock_guard<mutex>& )
{
- not_empty_.notify_one();
+ cond_.notify_all();
}
};
@@ -124,7 +122,7 @@ namespace detail
lock_guard<mutex> lk(mtx_);
closed_ = true;
}
- not_empty_.notify_all();
+ cond_.notify_all();
}
template <class ValueType, class Queue>
@@ -189,7 +187,7 @@ namespace detail
template <class ValueType, class Queue>
bool sync_deque_base<ValueType, Queue>::wait_until_not_empty_or_closed(unique_lock<mutex>& lk)
{
- not_empty_.wait(lk, boost::bind(&sync_deque_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk)));
+ cond_.wait(lk, boost::bind(&sync_deque_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk)));
if (! empty(lk)) return false; // success
return true; // closed
}
@@ -198,22 +196,12 @@ namespace detail
template <class WClock, class Duration>
queue_op_status sync_deque_base<ValueType, Queue>::wait_until_not_empty_or_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp)
{
- if (! not_empty_.wait_until(lk, tp, boost::bind(&sync_deque_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk))))
+ if (! cond_.wait_until(lk, tp, boost::bind(&sync_deque_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk))))
return queue_op_status::timeout;
if (! empty(lk)) return queue_op_status::success;
return queue_op_status::closed;
}
- template <class ValueType, class Queue>
- template <class WClock, class Duration>
- queue_op_status sync_deque_base<ValueType, Queue>::wait_until_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp)
- {
- bool (sync_queue_base<ValueType, Queue>::*closed_function_ptr)(unique_lock<mutex>&) const = &sync_queue_base<ValueType, Queue>::closed;
- if (! not_empty_.wait_until(lk, tp, boost::bind(closed_function_ptr, boost::ref(*this), boost::ref(lk))))
- return queue_op_status::timeout;
- return queue_op_status::closed;
- }
-
} // detail
} // concurrent
} // boost
diff --git a/boost/thread/concurrent_queues/detail/sync_queue_base.hpp b/boost/thread/concurrent_queues/detail/sync_queue_base.hpp
index c570da9505..26d4dcd14a 100644
--- a/boost/thread/concurrent_queues/detail/sync_queue_base.hpp
+++ b/boost/thread/concurrent_queues/detail/sync_queue_base.hpp
@@ -63,7 +63,7 @@ namespace detail
protected:
mutable mutex mtx_;
- condition_variable not_empty_;
+ condition_variable cond_;
underlying_queue_type data_;
bool closed_;
@@ -91,16 +91,14 @@ namespace detail
inline bool wait_until_not_empty_or_closed(unique_lock<mutex>& lk);
template <class WClock, class Duration>
queue_op_status wait_until_not_empty_or_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp);
- template <class WClock, class Duration>
- queue_op_status wait_until_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp);
- inline void notify_not_empty_if_needed(unique_lock<mutex>& )
+ inline void notify_elem_added(unique_lock<mutex>& )
{
- not_empty_.notify_one();
+ cond_.notify_all();
}
- inline void notify_not_empty_if_needed(lock_guard<mutex>& )
+ inline void notify_elem_added(lock_guard<mutex>& )
{
- not_empty_.notify_one();
+ cond_.notify_all();
}
};
@@ -124,7 +122,7 @@ namespace detail
lock_guard<mutex> lk(mtx_);
closed_ = true;
}
- not_empty_.notify_all();
+ cond_.notify_all();
}
template <class ValueType, class Queue>
@@ -189,7 +187,7 @@ namespace detail
template <class ValueType, class Queue>
bool sync_queue_base<ValueType, Queue>::wait_until_not_empty_or_closed(unique_lock<mutex>& lk)
{
- not_empty_.wait(lk, boost::bind(&sync_queue_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk)));
+ cond_.wait(lk, boost::bind(&sync_queue_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk)));
if (! empty(lk)) return false; // success
return true; // closed
}
@@ -198,22 +196,12 @@ namespace detail
template <class WClock, class Duration>
queue_op_status sync_queue_base<ValueType, Queue>::wait_until_not_empty_or_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp)
{
- if (! not_empty_.wait_until(lk, tp, boost::bind(&sync_queue_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk))))
+ if (! cond_.wait_until(lk, tp, boost::bind(&sync_queue_base<ValueType, Queue>::not_empty_or_closed, boost::ref(*this), boost::ref(lk))))
return queue_op_status::timeout;
if (! empty(lk)) return queue_op_status::success;
return queue_op_status::closed;
}
- template <class ValueType, class Queue>
- template <class WClock, class Duration>
- queue_op_status sync_queue_base<ValueType, Queue>::wait_until_closed_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp)
- {
- bool (sync_queue_base<ValueType, Queue>::*closed_function_ptr)(unique_lock<mutex>&) const = &sync_queue_base<ValueType, Queue>::closed;
- if (! not_empty_.wait_until(lk, tp, boost::bind(closed_function_ptr, boost::ref(*this), boost::ref(lk))))
- return queue_op_status::timeout;
- return queue_op_status::closed;
- }
-
} // detail
} // concurrent
} // boost
diff --git a/boost/thread/concurrent_queues/sync_deque.hpp b/boost/thread/concurrent_queues/sync_deque.hpp
index c6159c4be0..95e36409bd 100644
--- a/boost/thread/concurrent_queues/sync_deque.hpp
+++ b/boost/thread/concurrent_queues/sync_deque.hpp
@@ -92,13 +92,13 @@ namespace concurrent
inline void push_back(const value_type& elem, unique_lock<mutex>& lk)
{
super::data_.push_back(elem);
- super::notify_not_empty_if_needed(lk);
+ super::notify_elem_added(lk);
}
inline void push_back(BOOST_THREAD_RV_REF(value_type) elem, unique_lock<mutex>& lk)
{
super::data_.push_back(boost::move(elem));
- super::notify_not_empty_if_needed(lk);
+ super::notify_elem_added(lk);
}
};
@@ -122,7 +122,7 @@ namespace concurrent
// {
// data_.push(boost::move(*cur));;
// }
-// notify_not_empty_if_needed(lk);
+// notify_elem_added(lk);
// }
// catch (...)
// {
diff --git a/boost/thread/concurrent_queues/sync_priority_queue.hpp b/boost/thread/concurrent_queues/sync_priority_queue.hpp
index 2c6881f90f..f5676335c7 100644
--- a/boost/thread/concurrent_queues/sync_priority_queue.hpp
+++ b/boost/thread/concurrent_queues/sync_priority_queue.hpp
@@ -174,14 +174,14 @@ namespace concurrent
{
super::throw_if_closed(lk);
super::data_.push(elem);
- super::notify_not_empty_if_needed(lk);
+ super::notify_elem_added(lk);
}
template <class T, class Container,class Cmp>
void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, const T& elem)
{
super::throw_if_closed(lk);
super::data_.push(elem);
- super::notify_not_empty_if_needed(lk);
+ super::notify_elem_added(lk);
}
template <class T, class Container,class Cmp>
void sync_priority_queue<T,Container,Cmp>::push(const T& elem)
@@ -196,14 +196,14 @@ namespace concurrent
{
super::throw_if_closed(lk);
super::data_.push(boost::move(elem));
- super::notify_not_empty_if_needed(lk);
+ super::notify_elem_added(lk);
}
template <class T, class Container,class Cmp>
void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
{
super::throw_if_closed(lk);
super::data_.push(boost::move(elem));
- super::notify_not_empty_if_needed(lk);
+ super::notify_elem_added(lk);
}
template <class T, class Container,class Cmp>
void sync_priority_queue<T,Container,Cmp>::push(BOOST_THREAD_RV_REF(T) elem)
diff --git a/boost/thread/concurrent_queues/sync_queue.hpp b/boost/thread/concurrent_queues/sync_queue.hpp
index b36b57e607..ac0fe68d77 100644
--- a/boost/thread/concurrent_queues/sync_queue.hpp
+++ b/boost/thread/concurrent_queues/sync_queue.hpp
@@ -92,13 +92,13 @@ namespace concurrent
inline void push(const value_type& elem, unique_lock<mutex>& lk)
{
super::data_.push_back(elem);
- super::notify_not_empty_if_needed(lk);
+ super::notify_elem_added(lk);
}
inline void push(BOOST_THREAD_RV_REF(value_type) elem, unique_lock<mutex>& lk)
{
super::data_.push_back(boost::move(elem));
- super::notify_not_empty_if_needed(lk);
+ super::notify_elem_added(lk);
}
};
@@ -122,7 +122,7 @@ namespace concurrent
// {
// data_.push(boost::move(*cur));;
// }
-// notify_not_empty_if_needed(lk);
+// notify_elem_added(lk);
// }
// catch (...)
// {
diff --git a/boost/thread/concurrent_queues/sync_timed_queue.hpp b/boost/thread/concurrent_queues/sync_timed_queue.hpp
index a4394c0729..fd8d5a3c46 100644
--- a/boost/thread/concurrent_queues/sync_timed_queue.hpp
+++ b/boost/thread/concurrent_queues/sync_timed_queue.hpp
@@ -16,6 +16,8 @@
#include <boost/chrono/system_clocks.hpp>
#include <boost/chrono/chrono_io.hpp>
+#include <algorithm> // std::min
+
#include <boost/config/abi_prefix.hpp>
namespace boost
@@ -59,6 +61,45 @@ namespace detail
}
}; //end struct
+ template <class Duration>
+ chrono::time_point<chrono::steady_clock,Duration>
+ limit_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
+ {
+ // Clock == chrono::steady_clock
+ return tp;
+ }
+
+ template <class Clock, class Duration>
+ chrono::time_point<Clock,Duration>
+ limit_timepoint(chrono::time_point<Clock,Duration> const& tp)
+ {
+ // Clock != chrono::steady_clock
+ // The system time may jump while wait_until() is waiting. To compensate for this and time out near
+ // the correct time, we limit how long wait_until() can wait before going around the loop again.
+ const chrono::time_point<Clock,Duration> tpmax(chrono::time_point_cast<Duration>(Clock::now() + chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS)));
+ return (std::min)(tp, tpmax);
+ }
+
+ template <class Duration>
+ chrono::steady_clock::time_point
+ convert_to_steady_clock_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
+ {
+ // Clock == chrono::steady_clock
+ return chrono::time_point_cast<chrono::steady_clock::duration>(tp);
+ }
+
+ template <class Clock, class Duration>
+ chrono::steady_clock::time_point
+ convert_to_steady_clock_timepoint(chrono::time_point<Clock,Duration> const& tp)
+ {
+ // Clock != chrono::steady_clock
+ // The system time may jump while wait_until() is waiting. To compensate for this and time out near
+ // the correct time, we limit how long wait_until() can wait before going around the loop again.
+ const chrono::steady_clock::duration dura(chrono::duration_cast<chrono::steady_clock::duration>(tp - Clock::now()));
+ const chrono::steady_clock::duration duramax(chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS));
+ return chrono::steady_clock::now() + (std::min)(dura, duramax);
+ }
+
} //end detail namespace
template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
@@ -88,8 +129,8 @@ namespace detail
T pull();
void pull(T& elem);
- template <class WClock, class Duration>
- queue_op_status pull_until(chrono::time_point<WClock,Duration> const& tp, T& elem);
+ template <class Duration>
+ queue_op_status pull_until(chrono::time_point<clock,Duration> const& tp, T& elem);
template <class Rep, class Period>
queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem);
@@ -122,8 +163,9 @@ namespace detail
inline bool not_empty_and_time_reached(lock_guard<mutex>& lk) const;
bool wait_to_pull(unique_lock<mutex>&);
- template <class WClock, class Duration>
- queue_op_status wait_to_pull_until(unique_lock<mutex>&, chrono::time_point<WClock, Duration> const& tp);
+ queue_op_status wait_to_pull_until(unique_lock<mutex>&, TimePoint const& tp);
+ template <class Rep, class Period>
+ queue_op_status wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura);
T pull(unique_lock<mutex>&);
T pull(lock_guard<mutex>&);
@@ -228,14 +270,13 @@ namespace detail
if (not_empty_and_time_reached(lk)) return false; // success
if (super::closed(lk)) return true; // closed
- const time_point tp(super::data_.top().time);
- super::wait_until_closed_until(lk, tp);
+ const time_point tpmin(detail::limit_timepoint(super::data_.top().time));
+ super::cond_.wait_until(lk, tpmin);
}
}
template <class T, class Clock, class TimePoint>
- template <class WClock, class Duration>
- queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, chrono::time_point<WClock, Duration> const& tp)
+ queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, TimePoint const& tp)
{
for (;;)
{
@@ -249,8 +290,30 @@ namespace detail
if (super::closed(lk)) return queue_op_status::closed;
if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
- const time_point tpmin(tp < super::data_.top().time ? tp : super::data_.top().time);
- super::wait_until_closed_until(lk, tpmin);
+ const time_point tpmin((std::min)(tp, detail::limit_timepoint(super::data_.top().time)));
+ super::cond_.wait_until(lk, tpmin);
+ }
+ }
+
+ template <class T, class Clock, class TimePoint>
+ template <class Rep, class Period>
+ queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura)
+ {
+ const chrono::steady_clock::time_point tp(chrono::steady_clock::now() + chrono::duration_cast<chrono::steady_clock::duration>(dura));
+ for (;;)
+ {
+ if (not_empty_and_time_reached(lk)) return queue_op_status::success;
+ if (super::closed(lk)) return queue_op_status::closed;
+ if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
+
+ super::wait_until_not_empty_or_closed_until(lk, tp);
+
+ if (not_empty_and_time_reached(lk)) return queue_op_status::success;
+ if (super::closed(lk)) return queue_op_status::closed;
+ if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
+
+ const chrono::steady_clock::time_point tpmin((std::min)(tp, detail::convert_to_steady_clock_timepoint(super::data_.top().time)));
+ super::cond_.wait_until(lk, tpmin);
}
}
@@ -315,12 +378,12 @@ namespace detail
//////////////////////
template <class T, class Clock, class TimePoint>
- template <class WClock, class Duration>
+ template <class Duration>
queue_op_status
- sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<WClock, Duration> const& tp, T& elem)
+ sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<clock,Duration> const& tp, T& elem)
{
unique_lock<mutex> lk(super::mtx_);
- const queue_op_status rc = wait_to_pull_until(lk, tp);
+ const queue_op_status rc = wait_to_pull_until(lk, chrono::time_point_cast<typename time_point::duration>(tp));
if (rc == queue_op_status::success) pull(lk, elem);
return rc;
}
@@ -331,7 +394,10 @@ namespace detail
queue_op_status
sync_timed_queue<T, Clock, TimePoint>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem)
{
- return pull_until(chrono::steady_clock::now() + dura, elem);
+ unique_lock<mutex> lk(super::mtx_);
+ const queue_op_status rc = wait_to_pull_for(lk, dura);
+ if (rc == queue_op_status::success) pull(lk, elem);
+ return rc;
}
///////////////////////////