diff options
Diffstat (limited to 'boost/interprocess/sync/detail/condition_algorithm_8a.hpp')
-rw-r--r-- | boost/interprocess/sync/detail/condition_algorithm_8a.hpp | 316 |
1 files changed, 316 insertions, 0 deletions
diff --git a/boost/interprocess/sync/detail/condition_algorithm_8a.hpp b/boost/interprocess/sync/detail/condition_algorithm_8a.hpp new file mode 100644 index 0000000000..eaad671cdf --- /dev/null +++ b/boost/interprocess/sync/detail/condition_algorithm_8a.hpp @@ -0,0 +1,316 @@ +////////////////////////////////////////////////////////////////////////////// +// +// (C) Copyright Ion Gaztanaga 2005-2011. Distributed under the Boost +// Software License, Version 1.0. (See accompanying file +// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// See http://www.boost.org/libs/interprocess for documentation. +// +////////////////////////////////////////////////////////////////////////////// + +#ifndef BOOST_INTERPROCESS_DETAIL_CONDITION_ALGORITHM_8A_HPP +#define BOOST_INTERPROCESS_DETAIL_CONDITION_ALGORITHM_8A_HPP + +#include <boost/interprocess/detail/config_begin.hpp> +#include <boost/interprocess/detail/workaround.hpp> +#include <boost/interprocess/sync/scoped_lock.hpp> +#include <limits> + +namespace boost { +namespace interprocess { +namespace ipcdetail { + +//////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////// +// +// Condition variable algorithm taken from pthreads-win32 discussion. +// +// The algorithm was developed by Alexander Terekhov in colaboration with +// Louis Thomas. +// +// Algorithm 8a / IMPL_SEM,UNBLOCK_STRATEGY == UNBLOCK_ALL +// +// semBlockLock - bin.semaphore +// semBlockQueue - semaphore +// mtxExternal - mutex or CS +// mtxUnblockLock - mutex or CS +// nWaitersGone - int +// nWaitersBlocked - int +// nWaitersToUnblock - int +// +// wait( timeout ) { +// +// [auto: register int result ] // error checking omitted +// [auto: register int nSignalsWasLeft ] +// [auto: register int nWaitersWasGone ] +// +// sem_wait( semBlockLock ); +// nWaitersBlocked++; +// sem_post( semBlockLock ); +// +// unlock( mtxExternal ); +// bTimedOut = sem_wait( semBlockQueue,timeout ); +// +// lock( mtxUnblockLock ); +// if ( 0 != (nSignalsWasLeft = nWaitersToUnblock) ) { +// if ( bTimedOut ) { // timeout (or canceled) +// if ( 0 != nWaitersBlocked ) { +// nWaitersBlocked--; +// } +// else { +// nWaitersGone++; // count spurious wakeups. +// } +// } +// if ( 0 == --nWaitersToUnblock ) { +// if ( 0 != nWaitersBlocked ) { +// sem_post( semBlockLock ); // open the gate. +// nSignalsWasLeft = 0; // do not open the gate +// // below again. +// } +// else if ( 0 != (nWaitersWasGone = nWaitersGone) ) { +// nWaitersGone = 0; +// } +// } +// } +// else if ( INT_MAX/2 == ++nWaitersGone ) { // timeout/canceled or +// // spurious semaphore :-) +// sem_wait( semBlockLock ); +// nWaitersBlocked -= nWaitersGone; // something is going on here +// // - test of timeouts? :-) +// sem_post( semBlockLock ); +// nWaitersGone = 0; +// } +// unlock( mtxUnblockLock ); +// +// if ( 1 == nSignalsWasLeft ) { +// if ( 0 != nWaitersWasGone ) { +// // sem_adjust( semBlockQueue,-nWaitersWasGone ); +// while ( nWaitersWasGone-- ) { +// sem_wait( semBlockQueue ); // better now than spurious later +// } +// } sem_post( semBlockLock ); // open the gate +// } +// +// lock( mtxExternal ); +// +// return ( bTimedOut ) ? ETIMEOUT : 0; +// } +// +// signal(bAll) { +// +// [auto: register int result ] +// [auto: register int nSignalsToIssue] +// +// lock( mtxUnblockLock ); +// +// if ( 0 != nWaitersToUnblock ) { // the gate is closed!!! +// if ( 0 == nWaitersBlocked ) { // NO-OP +// return unlock( mtxUnblockLock ); +// } +// if (bAll) { +// nWaitersToUnblock += nSignalsToIssue=nWaitersBlocked; +// nWaitersBlocked = 0; +// } +// else { +// nSignalsToIssue = 1; +// nWaitersToUnblock++; +// nWaitersBlocked--; +// } +// } +// else if ( nWaitersBlocked > nWaitersGone ) { // HARMLESS RACE CONDITION! +// sem_wait( semBlockLock ); // close the gate +// if ( 0 != nWaitersGone ) { +// nWaitersBlocked -= nWaitersGone; +// nWaitersGone = 0; +// } +// if (bAll) { +// nSignalsToIssue = nWaitersToUnblock = nWaitersBlocked; +// nWaitersBlocked = 0; +// } +// else { +// nSignalsToIssue = nWaitersToUnblock = 1; +// nWaitersBlocked--; +// } +// } +// else { // NO-OP +// return unlock( mtxUnblockLock ); +// } +// +// unlock( mtxUnblockLock ); +// sem_post( semBlockQueue,nSignalsToIssue ); +// return result; +// } +//////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////// + + +// Required interface for ConditionMembers +// class ConditionMembers +// { +// typedef implementation_defined semaphore_type; +// typedef implementation_defined mutex_type; +// typedef implementation_defined integer_type; +// +// integer_type &get_nwaiters_blocked() +// integer_type &get_nwaiters_gone() +// integer_type &get_nwaiters_to_unblock() +// semaphore_type &get_sem_block_queue() +// semaphore_type &get_sem_block_lock() +// mutex_type &get_mtx_unblock_lock() +// }; +// +template<class ConditionMembers> +class condition_algorithm_8a +{ + private: + condition_algorithm_8a(); + ~condition_algorithm_8a(); + condition_algorithm_8a(const condition_algorithm_8a &); + condition_algorithm_8a &operator=(const condition_algorithm_8a &); + + typedef typename ConditionMembers::semaphore_type semaphore_type; + typedef typename ConditionMembers::mutex_type mutex_type; + typedef typename ConditionMembers::integer_type integer_type; + + // nwaiters_blocked == 0 + // nwaiters_gone() == 0 + // nwaiters_to_unblock == 0 + // sem_block_queue() == initial count 0 + // sem_block_lock() == initial count 1 + // mtx_unblock_lock (unlocked) + + public: + template<class InterprocessMutex> + static bool wait (ConditionMembers &data, bool timeout_enabled, const boost::posix_time::ptime &abs_time, InterprocessMutex &mut); + static void signal(ConditionMembers &data, bool broadcast); +}; + +template<class ConditionMembers> +inline void condition_algorithm_8a<ConditionMembers>::signal(ConditionMembers &data, bool broadcast) +{ + integer_type nsignals_to_issue; + + { + scoped_lock<mutex_type> locker(data.get_mtx_unblock_lock()); + + if ( 0 != data.get_nwaiters_to_unblock() ) { // the gate is closed!!! + if ( 0 == data.get_nwaiters_blocked() ) { // NO-OP + //locker's destructor triggers data.get_mtx_unblock_lock().unlock() + return; + } + if (broadcast) { + data.get_nwaiters_to_unblock() += nsignals_to_issue = data.get_nwaiters_blocked(); + data.get_nwaiters_blocked() = 0; + } + else { + nsignals_to_issue = 1; + data.get_nwaiters_to_unblock()++; + data.get_nwaiters_blocked()--; + } + } + else if ( data.get_nwaiters_blocked() > data.get_nwaiters_gone() ) { // HARMLESS RACE CONDITION! + data.get_sem_block_lock().wait(); // close the gate + if ( 0 != data.get_nwaiters_gone() ) { + data.get_nwaiters_blocked() -= data.get_nwaiters_gone(); + data.get_nwaiters_gone() = 0; + } + if (broadcast) { + nsignals_to_issue = data.get_nwaiters_to_unblock() = data.get_nwaiters_blocked(); + data.get_nwaiters_blocked() = 0; + } + else { + nsignals_to_issue = data.get_nwaiters_to_unblock() = 1; + data.get_nwaiters_blocked()--; + } + } + else { // NO-OP + //locker's destructor triggers data.get_mtx_unblock_lock().unlock() + return; + } + //locker's destructor triggers data.get_mtx_unblock_lock().unlock() + } + data.get_sem_block_queue().post(nsignals_to_issue); +} + +template<class ConditionMembers> +template<class InterprocessMutex> +inline bool condition_algorithm_8a<ConditionMembers>::wait + (ConditionMembers &data, bool tout_enabled, const boost::posix_time::ptime &abs_time, InterprocessMutex &mtxExternal) +{ + //Initialize to avoid warnings + integer_type nsignals_was_left = 0; + integer_type nwaiters_was_gone = 0; + + data.get_sem_block_lock().wait(); + ++data.get_nwaiters_blocked(); + data.get_sem_block_lock().post(); + + struct scoped_unlock + { + InterprocessMutex & mut; + scoped_unlock(InterprocessMutex & m) + : mut(m) + { m.unlock(); } + + ~scoped_unlock() + { mut.lock(); } + } unlocker(mtxExternal); + + + bool bTimedOut = tout_enabled ? !data.get_sem_block_queue().timed_wait(abs_time) : (data.get_sem_block_queue().wait(), false); + + { + scoped_lock<mutex_type> locker(data.get_mtx_unblock_lock()); + if ( 0 != (nsignals_was_left = data.get_nwaiters_to_unblock()) ) { + if ( bTimedOut ) { // timeout (or canceled) + if ( 0 != data.get_nwaiters_blocked() ) { + data.get_nwaiters_blocked()--; + } + else { + data.get_nwaiters_gone()++; // count spurious wakeups. + } + } + if ( 0 == --data.get_nwaiters_to_unblock() ) { + if ( 0 != data.get_nwaiters_blocked() ) { + data.get_sem_block_lock().post(); // open the gate. + nsignals_was_left = 0; // do not open the gate below again. + } + else if ( 0 != (nwaiters_was_gone = data.get_nwaiters_gone()) ) { + data.get_nwaiters_gone() = 0; + } + } + } + else if ( (std::numeric_limits<integer_type>::max)()/2 + == ++data.get_nwaiters_gone() ) { // timeout/canceled or spurious semaphore :-) + data.get_sem_block_lock().wait(); + data.get_nwaiters_blocked() -= data.get_nwaiters_gone(); // something is going on here - test of timeouts? :-) + data.get_sem_block_lock().post(); + data.get_nwaiters_gone() = 0; + } + //locker's destructor triggers data.get_mtx_unblock_lock().unlock() + } + + if ( 1 == nsignals_was_left ) { + if ( 0 != nwaiters_was_gone ) { + // sem_adjust( data.get_sem_block_queue(),-nwaiters_was_gone ); + while ( nwaiters_was_gone-- ) { + data.get_sem_block_queue().wait(); // better now than spurious later + } + } + data.get_sem_block_lock().post(); // open the gate + } + + //mtxExternal.lock(); called from unlocker + + return ( bTimedOut ) ? false : true; +} + +} //namespace ipcdetail +} //namespace interprocess +} //namespace boost + +#include <boost/interprocess/detail/config_end.hpp> + +#endif //BOOST_INTERPROCESS_DETAIL_CONDITION_ALGORITHM_8A_HPP |