diff options
Diffstat (limited to 'boost/interprocess/sync/windows/condition.hpp')
-rw-r--r-- | boost/interprocess/sync/windows/condition.hpp | 315 |
1 files changed, 50 insertions, 265 deletions
diff --git a/boost/interprocess/sync/windows/condition.hpp b/boost/interprocess/sync/windows/condition.hpp index 167b8730c3..9695c21044 100644 --- a/boost/interprocess/sync/windows/condition.hpp +++ b/boost/interprocess/sync/windows/condition.hpp @@ -20,133 +20,8 @@ #include <boost/interprocess/exceptions.hpp> #include <boost/interprocess/sync/windows/semaphore.hpp> #include <boost/interprocess/sync/windows/mutex.hpp> -#include <boost/cstdint.hpp> -#include <limits> - -//////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////// -// -// Condition variable algorithm taken from pthreads-win32 discussion. -// -// The algorithm was developed by Alexander Terekhov in colaboration with -// Louis Thomas. -// -// Algorithm 8a / IMPL_SEM,UNBLOCK_STRATEGY == UNBLOCK_ALL -// -// semBlockLock - bin.semaphore -// semBlockQueue - semaphore -// mtxExternal - mutex or CS -// mtxUnblockLock - mutex or CS -// nWaitersGone - int -// nWaitersBlocked - int -// nWaitersToUnblock - int -// -// wait( timeout ) { -// -// [auto: register int result ] // error checking omitted -// [auto: register int nSignalsWasLeft ] -// [auto: register int nWaitersWasGone ] -// -// sem_wait( semBlockLock ); -// nWaitersBlocked++; -// sem_post( semBlockLock ); -// -// unlock( mtxExternal ); -// bTimedOut = sem_wait( semBlockQueue,timeout ); -// -// lock( mtxUnblockLock ); -// if ( 0 != (nSignalsWasLeft = nWaitersToUnblock) ) { -// if ( bTimedOut ) { // timeout (or canceled) -// if ( 0 != nWaitersBlocked ) { -// nWaitersBlocked--; -// } -// else { -// nWaitersGone++; // count spurious wakeups. -// } -// } -// if ( 0 == --nWaitersToUnblock ) { -// if ( 0 != nWaitersBlocked ) { -// sem_post( semBlockLock ); // open the gate. -// nSignalsWasLeft = 0; // do not open the gate -// // below again. -// } -// else if ( 0 != (nWaitersWasGone = nWaitersGone) ) { -// nWaitersGone = 0; -// } -// } -// } -// else if ( INT_MAX/2 == ++nWaitersGone ) { // timeout/canceled or -// // spurious semaphore :-) -// sem_wait( semBlockLock ); -// nWaitersBlocked -= nWaitersGone; // something is going on here -// // - test of timeouts? :-) -// sem_post( semBlockLock ); -// nWaitersGone = 0; -// } -// unlock( mtxUnblockLock ); -// -// if ( 1 == nSignalsWasLeft ) { -// if ( 0 != nWaitersWasGone ) { -// // sem_adjust( semBlockQueue,-nWaitersWasGone ); -// while ( nWaitersWasGone-- ) { -// sem_wait( semBlockQueue ); // better now than spurious later -// } -// } sem_post( semBlockLock ); // open the gate -// } -// -// lock( mtxExternal ); -// -// return ( bTimedOut ) ? ETIMEOUT : 0; -// } -// -// signal(bAll) { -// -// [auto: register int result ] -// [auto: register int nSignalsToIssue] -// -// lock( mtxUnblockLock ); -// -// if ( 0 != nWaitersToUnblock ) { // the gate is closed!!! -// if ( 0 == nWaitersBlocked ) { // NO-OP -// return unlock( mtxUnblockLock ); -// } -// if (bAll) { -// nWaitersToUnblock += nSignalsToIssue=nWaitersBlocked; -// nWaitersBlocked = 0; -// } -// else { -// nSignalsToIssue = 1; -// nWaitersToUnblock++; -// nWaitersBlocked--; -// } -// } -// else if ( nWaitersBlocked > nWaitersGone ) { // HARMLESS RACE CONDITION! -// sem_wait( semBlockLock ); // close the gate -// if ( 0 != nWaitersGone ) { -// nWaitersBlocked -= nWaitersGone; -// nWaitersGone = 0; -// } -// if (bAll) { -// nSignalsToIssue = nWaitersToUnblock = nWaitersBlocked; -// nWaitersBlocked = 0; -// } -// else { -// nSignalsToIssue = nWaitersToUnblock = 1; -// nWaitersBlocked--; -// } -// } -// else { // NO-OP -// return unlock( mtxUnblockLock ); -// } -// -// unlock( mtxUnblockLock ); -// sem_post( semBlockQueue,nSignalsToIssue ); -// return result; -// } -//////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////// +#include <boost/interprocess/sync/detail/condition_algorithm_8a.hpp> + namespace boost { namespace interprocess { @@ -156,6 +31,7 @@ class windows_condition { windows_condition(const windows_condition &); windows_condition &operator=(const windows_condition &); + public: windows_condition(); ~windows_condition(); @@ -217,162 +93,71 @@ class windows_condition private: - template<class InterprocessMutex> - bool do_timed_wait(bool timeout_enabled, const boost::posix_time::ptime &abs_time, InterprocessMutex &mut); - void do_signal (bool broadcast); - - boost::int32_t m_nwaiters_blocked; - boost::int32_t m_nwaiters_gone; - boost::int32_t m_nwaiters_to_unblock; - windows_semaphore m_sem_block_queue; - windows_semaphore m_sem_block_lock; - windows_mutex m_mtx_unblock_lock; + struct condition_data + { + typedef boost::int32_t integer_type; + typedef windows_semaphore semaphore_type; + typedef windows_mutex mutex_type; + + condition_data() + : m_nwaiters_blocked(0) + , m_nwaiters_gone(0) + , m_nwaiters_to_unblock(0) + , m_sem_block_queue(0) + , m_sem_block_lock(1) + , m_mtx_unblock_lock() + {} + + integer_type &get_nwaiters_blocked() + { return m_nwaiters_blocked; } + + integer_type &get_nwaiters_gone() + { return m_nwaiters_gone; } + + integer_type &get_nwaiters_to_unblock() + { return m_nwaiters_to_unblock; } + + semaphore_type &get_sem_block_queue() + { return m_sem_block_queue; } + + semaphore_type &get_sem_block_lock() + { return m_sem_block_lock; } + + mutex_type &get_mtx_unblock_lock() + { return m_mtx_unblock_lock; } + + boost::int32_t m_nwaiters_blocked; + boost::int32_t m_nwaiters_gone; + boost::int32_t m_nwaiters_to_unblock; + windows_semaphore m_sem_block_queue; + windows_semaphore m_sem_block_lock; + windows_mutex m_mtx_unblock_lock; + } m_condition_data; + + typedef condition_algorithm_8a<condition_data> algorithm_type; }; inline windows_condition::windows_condition() - : m_nwaiters_blocked(0) - , m_nwaiters_gone(0) - , m_nwaiters_to_unblock(0) - , m_sem_block_queue(0) - , m_sem_block_lock(1) - , m_mtx_unblock_lock() + : m_condition_data() {} inline windows_condition::~windows_condition() {} inline void windows_condition::notify_one() -{ this->do_signal(false); } +{ algorithm_type::signal(m_condition_data, false); } inline void windows_condition::notify_all() -{ this->do_signal(true); } - -inline void windows_condition::do_signal(bool broadcast) -{ - boost::int32_t nsignals_to_issue; - - { - scoped_lock<windows_mutex> locker(m_mtx_unblock_lock); - - if ( 0 != m_nwaiters_to_unblock ) { // the gate is closed!!! - if ( 0 == m_nwaiters_blocked ) { // NO-OP - //locker's destructor triggers m_mtx_unblock_lock.unlock() - return; - } - if (broadcast) { - m_nwaiters_to_unblock += nsignals_to_issue = m_nwaiters_blocked; - m_nwaiters_blocked = 0; - } - else { - nsignals_to_issue = 1; - m_nwaiters_to_unblock++; - m_nwaiters_blocked--; - } - } - else if ( m_nwaiters_blocked > m_nwaiters_gone ) { // HARMLESS RACE CONDITION! - m_sem_block_lock.wait(); // close the gate - if ( 0 != m_nwaiters_gone ) { - m_nwaiters_blocked -= m_nwaiters_gone; - m_nwaiters_gone = 0; - } - if (broadcast) { - nsignals_to_issue = m_nwaiters_to_unblock = m_nwaiters_blocked; - m_nwaiters_blocked = 0; - } - else { - nsignals_to_issue = m_nwaiters_to_unblock = 1; - m_nwaiters_blocked--; - } - } - else { // NO-OP - //locker's destructor triggers m_mtx_unblock_lock.unlock() - return; - } - //locker's destructor triggers m_mtx_unblock_lock.unlock() - } - m_sem_block_queue.post(nsignals_to_issue); -} +{ algorithm_type::signal(m_condition_data, true); } template<class InterprocessMutex> inline void windows_condition::do_wait(InterprocessMutex &mut) -{ this->do_timed_wait(false, boost::posix_time::ptime(), mut); } +{ algorithm_type::wait(m_condition_data, false, boost::posix_time::ptime(), mut); } template<class InterprocessMutex> inline bool windows_condition::do_timed_wait (const boost::posix_time::ptime &abs_time, InterprocessMutex &mut) -{ return this->do_timed_wait(true, abs_time, mut); } - -template<class InterprocessMutex> -inline bool windows_condition::do_timed_wait - (bool tout_enabled, const boost::posix_time::ptime &abs_time, InterprocessMutex &mtxExternal) -{ - //Initialize to avoid warnings - boost::int32_t nsignals_was_left = 0; - boost::int32_t nwaiters_was_gone = 0; - - m_sem_block_lock.wait(); - ++m_nwaiters_blocked; - m_sem_block_lock.post(); - - struct scoped_unlock - { - InterprocessMutex & mut; - scoped_unlock(InterprocessMutex & m) - : mut(m) - { m.unlock(); } - - ~scoped_unlock() - { mut.lock(); } - } unlocker(mtxExternal); - - - bool bTimedOut = tout_enabled ? !m_sem_block_queue.timed_wait(abs_time) : (m_sem_block_queue.wait(), false); - - { - scoped_lock<windows_mutex> locker(m_mtx_unblock_lock); - if ( 0 != (nsignals_was_left = m_nwaiters_to_unblock) ) { - if ( bTimedOut ) { // timeout (or canceled) - if ( 0 != m_nwaiters_blocked ) { - m_nwaiters_blocked--; - } - else { - m_nwaiters_gone++; // count spurious wakeups. - } - } - if ( 0 == --m_nwaiters_to_unblock ) { - if ( 0 != m_nwaiters_blocked ) { - m_sem_block_lock.post(); // open the gate. - nsignals_was_left = 0; // do not open the gate below again. - } - else if ( 0 != (nwaiters_was_gone = m_nwaiters_gone) ) { - m_nwaiters_gone = 0; - } - } - } - else if ( (std::numeric_limits<boost::int32_t>::max)()/2 - == ++m_nwaiters_gone ) { // timeout/canceled or spurious semaphore :-) - m_sem_block_lock.wait(); - m_nwaiters_blocked -= m_nwaiters_gone; // something is going on here - test of timeouts? :-) - m_sem_block_lock.post(); - m_nwaiters_gone = 0; - } - //locker's destructor triggers m_mtx_unblock_lock.unlock() - } - - if ( 1 == nsignals_was_left ) { - if ( 0 != nwaiters_was_gone ) { - // sem_adjust( m_sem_block_queue,-nwaiters_was_gone ); - while ( nwaiters_was_gone-- ) { - m_sem_block_queue.wait(); // better now than spurious later - } - } - m_sem_block_lock.post(); // open the gate - } - - //mtxExternal.lock(); called from unlocker - - return ( bTimedOut ) ? false : true; -} +{ return algorithm_type::wait(m_condition_data, true, abs_time, mut); } } //namespace ipcdetail } //namespace interprocess |