From 6cee2edabe83ebb4899c38859f6e36eb461a3759 Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Fri, 22 Sep 2017 00:17:16 -0700 Subject: =?UTF-8?q?=EF=BB=BFImprove=20thread=20pool=20worker=20thread's=20?= =?UTF-8?q?spinning=20for=20work=20(#13921)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Improve thread pool worker thread's spinning for work Closes https://github.com/dotnet/coreclr/issues/5928 Replaced UnfairSemaphore with a new implementation in CLRLifoSemaphore - UnfairSemaphore had a some benefits: - It tracked the number of spinners and avoids waking up waiters as long as the signal count can be satisfied by spinners - Since spinners get priority over waiters, that's the main "unfair" part of it that allows hot threads to remain hot and cold threads to remain cold. However, waiters are still released in FIFO order. - Spinning helps with throughput when incoming work is bursty - All of the above benefits were retained in CLRLifoSemaphore and some were improved: - Similarly to UnfairSemaphore, the number of spinners are tracked and preferenced to avoid waking up waiters - For waiting, on Windows, a I/O completion port is used since it releases waiters in LIFO order. For Unix, added a prioritized wait function to the PAL to register waiters in reverse order for LIFO release behavior. This allows cold waiters to time out more easily since they will be used less frequently. - Similarly to SemaphoreSlim, the number of waiters that were signaled to wake but have not yet woken is tracked to help avoid waking up an excessive number of waiters - Added some YieldProcessorNormalized() calls to the spin loop. This avoids thrashing on Sleep(0) by adding a delay to the spin loop to allow it to be more effective when there are no threads to switch to, or the only other threads to switch to are other similar spinners. - Removed the processor count multiplier on the max spin count and retuned the default max spin count. The processor count multiplier was causing excessive CPU usage on machines with many processors. --- src/inc/clrconfigvalues.h | 7 +- src/pal/inc/pal.h | 7 + src/pal/src/include/pal/corunix.hpp | 3 +- src/pal/src/include/pal/synchobjects.hpp | 3 +- src/pal/src/synchmgr/synchcontrollers.cpp | 152 ++++++++--- src/pal/src/synchmgr/synchmanager.cpp | 2 +- src/pal/src/synchmgr/synchmanager.hpp | 7 +- src/pal/src/synchmgr/wait.cpp | 35 ++- src/vm/synch.cpp | 412 ++++++++++++++++++++++++++++++ src/vm/synch.h | 93 +++++++ src/vm/win32threadpool.cpp | 119 +++++---- src/vm/win32threadpool.h | 244 +----------------- 12 files changed, 731 insertions(+), 353 deletions(-) diff --git a/src/inc/clrconfigvalues.h b/src/inc/clrconfigvalues.h index 4da57f51d6..7b096b438f 100644 --- a/src/inc/clrconfigvalues.h +++ b/src/inc/clrconfigvalues.h @@ -943,7 +943,12 @@ RETAIL_CONFIG_DWORD_INFO(INTERNAL_ThreadPool_ForceMaxWorkerThreads, W("ThreadPoo RETAIL_CONFIG_DWORD_INFO(INTERNAL_ThreadPool_DisableStarvationDetection, W("ThreadPool_DisableStarvationDetection"), 0, "Disables the ThreadPool feature that forces new threads to be added when workitems run for too long") RETAIL_CONFIG_DWORD_INFO(INTERNAL_ThreadPool_DebugBreakOnWorkerStarvation, W("ThreadPool_DebugBreakOnWorkerStarvation"), 0, "Breaks into the debugger if the ThreadPool detects work queue starvation") RETAIL_CONFIG_DWORD_INFO(INTERNAL_ThreadPool_EnableWorkerTracking, W("ThreadPool_EnableWorkerTracking"), 0, "Enables extra expensive tracking of how many workers threads are working simultaneously") -RETAIL_CONFIG_DWORD_INFO(INTERNAL_ThreadPool_UnfairSemaphoreSpinLimit, W("ThreadPool_UnfairSemaphoreSpinLimit"), 50, "Per processor limit used when calculating spin duration in UnfairSemaphore::Wait") +#ifdef _TARGET_ARM64_ +// Spinning scheme is currently different on ARM64, see CLRLifoSemaphore::Wait(DWORD, UINT32, UINT32) +RETAIL_CONFIG_DWORD_INFO(INTERNAL_ThreadPool_UnfairSemaphoreSpinLimit, W("ThreadPool_UnfairSemaphoreSpinLimit"), 0x32, "Maximum number of spins per processor a thread pool worker thread performs before waiting for work") +#else // !_TARGET_ARM64_ +RETAIL_CONFIG_DWORD_INFO(INTERNAL_ThreadPool_UnfairSemaphoreSpinLimit, W("ThreadPool_UnfairSemaphoreSpinLimit"), 0x46, "Maximum number of spins a thread pool worker thread performs before waiting for work") +#endif // _TARGET_ARM64_ RETAIL_CONFIG_DWORD_INFO(EXTERNAL_Thread_UseAllCpuGroups, W("Thread_UseAllCpuGroups"), 0, "Specifies if to automatically distribute thread across CPU Groups") CONFIG_DWORD_INFO(INTERNAL_ThreadpoolTickCountAdjustment, W("ThreadpoolTickCountAdjustment"), 0, "") diff --git a/src/pal/inc/pal.h b/src/pal/inc/pal.h index 4ac91db0cf..4ae2187b69 100644 --- a/src/pal/inc/pal.h +++ b/src/pal/inc/pal.h @@ -1464,6 +1464,13 @@ WaitForSingleObject( IN HANDLE hHandle, IN DWORD dwMilliseconds); +PALIMPORT +DWORD +PALAPI +PAL_WaitForSingleObjectPrioritized( + IN HANDLE hHandle, + IN DWORD dwMilliseconds); + PALIMPORT DWORD PALAPI diff --git a/src/pal/src/include/pal/corunix.hpp b/src/pal/src/include/pal/corunix.hpp index bfdfb6c167..e35c5b056e 100644 --- a/src/pal/src/include/pal/corunix.hpp +++ b/src/pal/src/include/pal/corunix.hpp @@ -773,7 +773,8 @@ namespace CorUnix RegisterWaitingThread( WaitType eWaitType, DWORD dwIndex, - bool fAltertable + bool fAltertable, + bool fPrioritize ) = 0; // diff --git a/src/pal/src/include/pal/synchobjects.hpp b/src/pal/src/include/pal/synchobjects.hpp index 62f4017492..1ee4f1c57b 100644 --- a/src/pal/src/include/pal/synchobjects.hpp +++ b/src/pal/src/include/pal/synchobjects.hpp @@ -40,7 +40,8 @@ namespace CorUnix CONST HANDLE *lpHandles, BOOL bWaitAll, DWORD dwMilliseconds, - BOOL bAlertable); + BOOL bAlertable, + BOOL bPrioritize = FALSE); PAL_ERROR InternalSleepEx( CPalThread * pthrCurrent, diff --git a/src/pal/src/synchmgr/synchcontrollers.cpp b/src/pal/src/synchmgr/synchcontrollers.cpp index 68fe429462..6eae9187d9 100644 --- a/src/pal/src/synchmgr/synchcontrollers.cpp +++ b/src/pal/src/synchmgr/synchcontrollers.cpp @@ -262,7 +262,8 @@ namespace CorUnix PAL_ERROR CSynchWaitController::RegisterWaitingThread( WaitType wtWaitType, DWORD dwIndex, - bool fAlertable) + bool fAlertable, + bool fPrioritize) { VALIDATEOBJECT(m_psdSynchData); @@ -421,12 +422,12 @@ namespace CorUnix // Add new node to queue if (fSharedObject) { - m_psdSynchData->SharedWaiterEnqueue(shridNewNode); + m_psdSynchData->SharedWaiterEnqueue(shridNewNode, fPrioritize); ptwiWaitInfo->lSharedObjCount += 1; } else { - m_psdSynchData->WaiterEnqueue(pwtlnNewNode); + m_psdSynchData->WaiterEnqueue(pwtlnNewNode, fPrioritize); } // Succeeded: update object count @@ -1821,7 +1822,7 @@ namespace CorUnix Note: this method must be called while holding the local process synchronization lock. --*/ - void CSynchData::WaiterEnqueue(WaitingThreadsListNode * pwtlnNewNode) + void CSynchData::WaiterEnqueue(WaitingThreadsListNode * pwtlnNewNode, bool fPrioritize) { VALIDATEOBJECT(this); VALIDATEOBJECT(pwtlnNewNode); @@ -1833,26 +1834,55 @@ namespace CorUnix "Trying to add a WaitingThreadsListNode marked as shared " "as it was a local one\n"); - WaitingThreadsListNode * pwtlnCurrLast = m_ptrWTLTail.ptr; - - pwtlnNewNode->ptrNext.ptr = NULL; - if (NULL == pwtlnCurrLast) + if (!fPrioritize) { - _ASSERT_MSG(NULL == m_ptrWTLHead.ptr, - "Corrupted waiting list on local CSynchData @ %p\n", - this); + // Enqueue normally to the end of the queue + WaitingThreadsListNode * pwtlnCurrLast = m_ptrWTLTail.ptr; + + pwtlnNewNode->ptrNext.ptr = NULL; + if (NULL == pwtlnCurrLast) + { + _ASSERT_MSG(NULL == m_ptrWTLHead.ptr, + "Corrupted waiting list on local CSynchData @ %p\n", + this); - pwtlnNewNode->ptrPrev.ptr = NULL; - m_ptrWTLHead.ptr = pwtlnNewNode; - m_ptrWTLTail.ptr = pwtlnNewNode; + pwtlnNewNode->ptrPrev.ptr = NULL; + m_ptrWTLHead.ptr = pwtlnNewNode; + m_ptrWTLTail.ptr = pwtlnNewNode; + } + else + { + VALIDATEOBJECT(pwtlnCurrLast); + + pwtlnNewNode->ptrPrev.ptr = pwtlnCurrLast; + pwtlnCurrLast->ptrNext.ptr = pwtlnNewNode; + m_ptrWTLTail.ptr = pwtlnNewNode; + } } else { - VALIDATEOBJECT(pwtlnCurrLast); + // The wait is prioritized, enqueue to the beginning of the queue + WaitingThreadsListNode * pwtlnCurrFirst = m_ptrWTLHead.ptr; + + pwtlnNewNode->ptrPrev.ptr = NULL; + if (NULL == pwtlnCurrFirst) + { + _ASSERT_MSG(NULL == m_ptrWTLTail.ptr, + "Corrupted waiting list on local CSynchData @ %p\n", + this); + + pwtlnNewNode->ptrNext.ptr = NULL; + m_ptrWTLHead.ptr = pwtlnNewNode; + m_ptrWTLTail.ptr = pwtlnNewNode; + } + else + { + VALIDATEOBJECT(pwtlnCurrFirst); - pwtlnNewNode->ptrPrev.ptr = pwtlnCurrLast; - pwtlnCurrLast->ptrNext.ptr = pwtlnNewNode; - m_ptrWTLTail.ptr = pwtlnNewNode; + pwtlnNewNode->ptrNext.ptr = pwtlnCurrFirst; + pwtlnCurrFirst->ptrPrev.ptr = pwtlnNewNode; + m_ptrWTLHead.ptr = pwtlnNewNode; + } } m_ulcWaitingThreads += 1; @@ -1872,7 +1902,7 @@ namespace CorUnix Note: this method must be called while holding both local and shared synchronization locks. --*/ - void CSynchData::SharedWaiterEnqueue(SharedID shridNewNode) + void CSynchData::SharedWaiterEnqueue(SharedID shridNewNode, bool fPrioritize) { VALIDATEOBJECT(this); @@ -1880,37 +1910,77 @@ namespace CorUnix "Trying to enqueue a WaitingThreadsListNode as shared " "on a local object\n"); - SharedID shridCurrLast; - WaitingThreadsListNode * pwtlnCurrLast, * pwtlnNewNode; + if (!fPrioritize) + { + // Enqueue normally to the end of the queue + SharedID shridCurrLast; + WaitingThreadsListNode * pwtlnCurrLast, * pwtlnNewNode; - shridCurrLast = m_ptrWTLTail.shrid; - pwtlnCurrLast = SharedIDToTypePointer(WaitingThreadsListNode, shridCurrLast); - pwtlnNewNode = SharedIDToTypePointer(WaitingThreadsListNode, shridNewNode); + shridCurrLast = m_ptrWTLTail.shrid; + pwtlnCurrLast = SharedIDToTypePointer(WaitingThreadsListNode, shridCurrLast); + pwtlnNewNode = SharedIDToTypePointer(WaitingThreadsListNode, shridNewNode); - _ASSERT_MSG(1 == (WTLN_FLAG_OWNER_OBJECT_IS_SHARED & pwtlnNewNode->dwFlags), - "Trying to add a WaitingThreadsListNode marked as local " - "as it was a shared one\n"); + _ASSERT_MSG(1 == (WTLN_FLAG_OWNER_OBJECT_IS_SHARED & pwtlnNewNode->dwFlags), + "Trying to add a WaitingThreadsListNode marked as local " + "as it was a shared one\n"); - VALIDATEOBJECT(pwtlnNewNode); + VALIDATEOBJECT(pwtlnNewNode); - pwtlnNewNode->ptrNext.shrid = NULL; - if (NULL == pwtlnCurrLast) - { - _ASSERT_MSG(NULL == m_ptrWTLHead.shrid, - "Corrupted waiting list on shared CSynchData at " - "{shrid=%p, p=%p}\n", m_shridThis, this); + pwtlnNewNode->ptrNext.shrid = NULL; + if (NULL == pwtlnCurrLast) + { + _ASSERT_MSG(NULL == m_ptrWTLHead.shrid, + "Corrupted waiting list on shared CSynchData at " + "{shrid=%p, p=%p}\n", m_shridThis, this); - pwtlnNewNode->ptrPrev.shrid = NULL; - m_ptrWTLHead.shrid = shridNewNode; - m_ptrWTLTail.shrid = shridNewNode; + pwtlnNewNode->ptrPrev.shrid = NULL; + m_ptrWTLHead.shrid = shridNewNode; + m_ptrWTLTail.shrid = shridNewNode; + } + else + { + VALIDATEOBJECT(pwtlnCurrLast); + + pwtlnNewNode->ptrPrev.shrid = shridCurrLast; + pwtlnCurrLast->ptrNext.shrid = shridNewNode; + m_ptrWTLTail.shrid = shridNewNode; + } } else { - VALIDATEOBJECT(pwtlnCurrLast); + // The wait is prioritized, enqueue to the beginning of the queue + SharedID shridCurrFirst; + WaitingThreadsListNode * pwtlnCurrFirst, * pwtlnNewNode; + + shridCurrFirst = m_ptrWTLHead.shrid; + pwtlnCurrFirst = SharedIDToTypePointer(WaitingThreadsListNode, shridCurrFirst); + pwtlnNewNode = SharedIDToTypePointer(WaitingThreadsListNode, shridNewNode); + + _ASSERT_MSG(1 == (WTLN_FLAG_OWNER_OBJECT_IS_SHARED & pwtlnNewNode->dwFlags), + "Trying to add a WaitingThreadsListNode marked as local " + "as it was a shared one\n"); + + VALIDATEOBJECT(pwtlnNewNode); + + pwtlnNewNode->ptrPrev.shrid = NULL; + if (NULL == pwtlnCurrFirst) + { + _ASSERT_MSG(NULL == m_ptrWTLTail.shrid, + "Corrupted waiting list on shared CSynchData at " + "{shrid=%p, p=%p}\n", m_shridThis, this); - pwtlnNewNode->ptrPrev.shrid = shridCurrLast; - pwtlnCurrLast->ptrNext.shrid = shridNewNode; - m_ptrWTLTail.shrid = shridNewNode; + pwtlnNewNode->ptrNext.shrid = NULL; + m_ptrWTLHead.shrid = shridNewNode; + m_ptrWTLTail.shrid = shridNewNode; + } + else + { + VALIDATEOBJECT(pwtlnCurrFirst); + + pwtlnNewNode->ptrNext.shrid = shridCurrFirst; + pwtlnCurrFirst->ptrPrev.shrid = shridNewNode; + m_ptrWTLHead.shrid = shridNewNode; + } } m_ulcWaitingThreads += 1; diff --git a/src/pal/src/synchmgr/synchmanager.cpp b/src/pal/src/synchmgr/synchmanager.cpp index a683255a3e..048ea3ee7d 100644 --- a/src/pal/src/synchmgr/synchmanager.cpp +++ b/src/pal/src/synchmgr/synchmanager.cpp @@ -3867,7 +3867,7 @@ namespace CorUnix pwtlnNew->shridWaitingState = pwtlnOld->shridWaitingState; pwtlnNew->ptwiWaitInfo = pwtlnOld->ptwiWaitInfo; - psdShared->SharedWaiterEnqueue(rgshridWTLNodes[i]); + psdShared->SharedWaiterEnqueue(rgshridWTLNodes[i], false); psdShared->AddRef(); _ASSERTE(pwtlnOld = pwtlnOld->ptwiWaitInfo->rgpWTLNodes[pwtlnOld->dwObjIndex]); diff --git a/src/pal/src/synchmgr/synchmanager.hpp b/src/pal/src/synchmgr/synchmanager.hpp index b0cc2e7622..89e1d13568 100644 --- a/src/pal/src/synchmgr/synchmanager.hpp +++ b/src/pal/src/synchmgr/synchmanager.hpp @@ -206,8 +206,8 @@ namespace CorUnix CPalThread * pthrCurrent, CPalThread * pthrTarget); - void WaiterEnqueue(WaitingThreadsListNode * pwtlnNewNode); - void SharedWaiterEnqueue(SharedID shridNewNode); + void WaiterEnqueue(WaitingThreadsListNode * pwtlnNewNode, bool fPrioritize); + void SharedWaiterEnqueue(SharedID shridNewNode, bool fPrioritize); // Object Domain accessor methods ObjectDomain GetObjectDomain(void) @@ -464,7 +464,8 @@ namespace CorUnix virtual PAL_ERROR RegisterWaitingThread( WaitType wtWaitType, DWORD dwIndex, - bool fAlertable); + bool fAlertable, + bool fPrioritize); virtual void ReleaseController(void); diff --git a/src/pal/src/synchmgr/wait.cpp b/src/pal/src/synchmgr/wait.cpp index 8ef65aaa01..fc5bb674db 100644 --- a/src/pal/src/synchmgr/wait.cpp +++ b/src/pal/src/synchmgr/wait.cpp @@ -73,6 +73,35 @@ WaitForSingleObject(IN HANDLE hHandle, } +/*++ +Function: + WaitForSingleObjectPrioritized + +Similar to WaitForSingleObject, except uses a LIFO release policy for waiting threads by prioritizing new waiters (registering +them at the beginning of the wait queue rather than at the end). +--*/ +DWORD +PALAPI +PAL_WaitForSingleObjectPrioritized(IN HANDLE hHandle, + IN DWORD dwMilliseconds) +{ + DWORD dwRet; + + PERF_ENTRY(PAL_WaitForSingleObjectPrioritized); + ENTRY("PAL_WaitForSingleObjectPrioritized(hHandle=%p, dwMilliseconds=%u)\n", + hHandle, dwMilliseconds); + + CPalThread * pThread = InternalGetCurrentThread(); + + dwRet = InternalWaitForMultipleObjectsEx(pThread, 1, &hHandle, FALSE, + dwMilliseconds, FALSE, TRUE /* bPrioritize */); + + LOGEXIT("PAL_WaitForSingleObjectPrioritized returns DWORD %u\n", dwRet); + PERF_EXIT(PAL_WaitForSingleObjectPrioritized); + return dwRet; +} + + /*++ Function: WaitForSingleObjectEx @@ -285,7 +314,8 @@ DWORD CorUnix::InternalWaitForMultipleObjectsEx( CONST HANDLE *lpHandles, BOOL bWaitAll, DWORD dwMilliseconds, - BOOL bAlertable) + BOOL bAlertable, + BOOL bPrioritize) { DWORD dwRet = WAIT_FAILED; PAL_ERROR palErr = NO_ERROR; @@ -530,7 +560,8 @@ DWORD CorUnix::InternalWaitForMultipleObjectsEx( palErr = ppISyncWaitCtrlrs[i]->RegisterWaitingThread( wtWaitType, i, - (TRUE == bAlertable)); + (TRUE == bAlertable), + bPrioritize != FALSE); if (NO_ERROR != palErr) { ERROR("RegisterWaitingThread() failed for %d-th object " diff --git a/src/vm/synch.cpp b/src/vm/synch.cpp index e159b7813a..e4fae65855 100644 --- a/src/vm/synch.cpp +++ b/src/vm/synch.cpp @@ -590,6 +590,418 @@ DWORD CLRSemaphore::Wait(DWORD dwMilliseconds, BOOL alertable) } } +void CLRLifoSemaphore::Create(INT32 initialSignalCount, INT32 maximumSignalCount) +{ + CONTRACTL + { + THROWS; + GC_NOTRIGGER; + SO_TOLERANT; + } + CONTRACTL_END; + + _ASSERTE(maximumSignalCount > 0); + _ASSERTE(initialSignalCount <= maximumSignalCount); + _ASSERTE(m_handle == nullptr); + +#ifdef FEATURE_PAL + HANDLE h = UnsafeCreateSemaphore(nullptr, initialSignalCount, maximumSignalCount, nullptr); +#else // !FEATURE_PAL + HANDLE h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, maximumSignalCount); +#endif // FEATURE_PAL + if (h == nullptr) + { + ThrowOutOfMemory(); + } + + m_handle = h; + m_counts.signalCount = initialSignalCount; + INDEBUG(m_maximumSignalCount = maximumSignalCount); +} + +void CLRLifoSemaphore::Close() +{ + LIMITED_METHOD_CONTRACT; + + if (m_handle == nullptr) + { + return; + } + + CloseHandle(m_handle); + m_handle = nullptr; +} + +bool CLRLifoSemaphore::WaitForSignal(DWORD timeoutMs) +{ + CONTRACTL + { + NOTHROW; + GC_NOTRIGGER; + SO_TOLERANT; + } + CONTRACTL_END; + + _ASSERTE(timeoutMs != 0); + _ASSERTE(m_handle != nullptr); + _ASSERTE(m_counts.waiterCount != (UINT16)0); + + while (true) + { + // Wait for a signal + BOOL waitSuccessful; + { +#ifdef FEATURE_PAL + // Do a prioritized wait to get LIFO waiter release order + DWORD waitResult = PAL_WaitForSingleObjectPrioritized(m_handle, timeoutMs); + _ASSERTE(waitResult == WAIT_OBJECT_0 || waitResult == WAIT_TIMEOUT); + waitSuccessful = waitResult == WAIT_OBJECT_0; +#else // !FEATURE_PAL + // I/O completion ports release waiters in LIFO order, see + // https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx + DWORD numberOfBytes; + ULONG_PTR completionKey; + LPOVERLAPPED overlapped; + waitSuccessful = GetQueuedCompletionStatus(m_handle, &numberOfBytes, &completionKey, &overlapped, timeoutMs); + _ASSERTE(waitSuccessful || GetLastError() == WAIT_TIMEOUT); + _ASSERTE(overlapped == nullptr); +#endif // FEATURE_PAL + } + + // Unregister the waiter if this thread will not be waiting anymore, and try to acquire the semaphore + Counts counts = m_counts.VolatileLoad(); + while (true) + { + _ASSERTE(counts.waiterCount != (UINT16)0); + Counts newCounts = counts; + if (counts.signalCount != 0) + { + --newCounts.signalCount; + --newCounts.waiterCount; + } + else if (!waitSuccessful) + { + --newCounts.waiterCount; + } + + // This waiter has woken up and this needs to be reflected in the count of waiters signaled to wake. Since we don't + // have thread-specific signal state, there is not enough information to tell whether this thread woke up because it + // was signaled. For instance, this thread may have timed out and then we don't know whether this thread also got + // signaled. So in any woken case, decrement the count if possible. As such, timeouts could cause more waiters to + // wake than necessary. + if (counts.countOfWaitersSignaledToWake != (UINT8)0) + { + --newCounts.countOfWaitersSignaledToWake; + } + + Counts countsBeforeUpdate = m_counts.CompareExchange(newCounts, counts); + if (countsBeforeUpdate == counts) + { + if (counts.signalCount != 0) + { + return true; + } + break; + } + + counts = countsBeforeUpdate; + } + + if (!waitSuccessful) + { + return false; + } + } +} + +bool CLRLifoSemaphore::Wait(DWORD timeoutMs) +{ + WRAPPER_NO_CONTRACT; + + _ASSERTE(m_handle != nullptr); + + // Acquire the semaphore or register as a waiter + Counts counts = m_counts.VolatileLoad(); + while (true) + { + _ASSERTE(counts.signalCount <= m_maximumSignalCount); + Counts newCounts = counts; + if (counts.signalCount != 0) + { + --newCounts.signalCount; + } + else if (timeoutMs != 0) + { + ++newCounts.waiterCount; + _ASSERTE(newCounts.waiterCount != (UINT16)0); // overflow check, this many waiters is currently not supported + } + + Counts countsBeforeUpdate = m_counts.CompareExchange(newCounts, counts); + if (countsBeforeUpdate == counts) + { + return counts.signalCount != 0 || (timeoutMs != 0 && WaitForSignal(timeoutMs)); + } + + counts = countsBeforeUpdate; + } +} + +bool CLRLifoSemaphore::Wait(DWORD timeoutMs, UINT32 spinCount, UINT32 processorCount) +{ + CONTRACTL + { + NOTHROW; + GC_NOTRIGGER; + SO_TOLERANT; + } + CONTRACTL_END; + + _ASSERTE(m_handle != nullptr); + + if (timeoutMs == 0 || spinCount == 0) + { + return Wait(timeoutMs); + } + + // Try to acquire the semaphore or register as a spinner + Counts counts = m_counts.VolatileLoad(); + while (true) + { + Counts newCounts = counts; + if (counts.signalCount != 0) + { + --newCounts.signalCount; + } + else + { + ++newCounts.spinnerCount; + if (newCounts.spinnerCount == (UINT8)0) + { + // Maximum number of spinners reached, register as a waiter instead + --newCounts.spinnerCount; + ++newCounts.waiterCount; + _ASSERTE(newCounts.waiterCount != (UINT16)0); // overflow check, this many waiters is currently not supported + } + } + + Counts countsBeforeUpdate = m_counts.CompareExchange(newCounts, counts); + if (countsBeforeUpdate == counts) + { + if (counts.signalCount != 0) + { + return true; + } + if (newCounts.waiterCount != counts.waiterCount) + { + return WaitForSignal(timeoutMs); + } + break; + } + + counts = countsBeforeUpdate; + } + +#ifdef _TARGET_ARM64_ + // For now, the spinning changes are disabled on ARM64. The spin loop below replicates how UnfairSemaphore used to spin. + // Once more tuning is done on ARM64, it should be possible to come up with a spinning scheme that works well everywhere. + int spinCountPerProcessor = spinCount; + for (UINT32 i = 1; ; ++i) + { + // Wait + ClrSleepEx(0, false); + + // Try to acquire the semaphore and unregister as a spinner + counts = m_counts.VolatileLoad(); + while (true) + { + _ASSERTE(counts.spinnerCount != (UINT8)0); + if (counts.signalCount == 0) + { + break; + } + + Counts newCounts = counts; + --newCounts.signalCount; + --newCounts.spinnerCount; + + Counts countsBeforeUpdate = m_counts.CompareExchange(newCounts, counts); + if (countsBeforeUpdate == counts) + { + return true; + } + + counts = countsBeforeUpdate; + } + + // Determine whether to spin further + double spinnersPerProcessor = (double)counts.spinnerCount / processorCount; + UINT32 spinLimit = (UINT32)(spinCountPerProcessor / spinnersPerProcessor + 0.5); + if (i >= spinLimit) + { + break; + } + } +#else // !_TARGET_ARM64_ + const UINT32 Sleep0Threshold = 10; + YieldProcessorWithBackOffNormalizationInfo normalizationInfo; +#ifdef FEATURE_PAL + // The PAL's wait subsystem is quite slow, spin more to compensate for the more expensive wait + spinCount *= 2; +#endif // FEATURE_PAL + for (UINT32 i = 0; i < spinCount; ++i) + { + // Wait + // + // (i - Sleep0Threshold) % 2 != 0: The purpose of this check is to interleave Thread.Yield/Sleep(0) with + // Thread.SpinWait. Otherwise, the following issues occur: + // - When there are no threads to switch to, Yield and Sleep(0) become no-op and it turns the spin loop into a + // busy-spin that may quickly reach the max spin count and cause the thread to enter a wait state. Completing the + // spin loop too early can cause excessive context switcing from the wait. + // - If there are multiple threads doing Yield and Sleep(0) (typically from the same spin loop due to contention), + // they may switch between one another, delaying work that can make progress. + if (i < Sleep0Threshold || (i - Sleep0Threshold) % 2 != 0) + { + YieldProcessorWithBackOffNormalized(normalizationInfo, i); + } + else + { + // Not doing SwitchToThread(), it does not seem to have any benefit over Sleep(0) + ClrSleepEx(0, false); + } + + // Try to acquire the semaphore and unregister as a spinner + counts = m_counts.VolatileLoad(); + while (true) + { + _ASSERTE(counts.spinnerCount != (UINT8)0); + if (counts.signalCount == 0) + { + break; + } + + Counts newCounts = counts; + --newCounts.signalCount; + --newCounts.spinnerCount; + + Counts countsBeforeUpdate = m_counts.CompareExchange(newCounts, counts); + if (countsBeforeUpdate == counts) + { + return true; + } + + counts = countsBeforeUpdate; + } + } +#endif // _TARGET_ARM64_ + + // Unregister as a spinner, and acquire the semaphore or register as a waiter + counts = m_counts.VolatileLoad(); + while (true) + { + _ASSERTE(counts.spinnerCount != (UINT8)0); + Counts newCounts = counts; + --newCounts.spinnerCount; + if (counts.signalCount != 0) + { + --newCounts.signalCount; + } + else + { + ++newCounts.waiterCount; + _ASSERTE(newCounts.waiterCount != (UINT16)0); // overflow check, this many waiters is currently not supported + } + + Counts countsBeforeUpdate = m_counts.CompareExchange(newCounts, counts); + if (countsBeforeUpdate == counts) + { + return counts.signalCount != 0 || WaitForSignal(timeoutMs); + } + + counts = countsBeforeUpdate; + } +} + +void CLRLifoSemaphore::Release(INT32 releaseCount) +{ + CONTRACTL + { + NOTHROW; + GC_NOTRIGGER; + SO_TOLERANT; + } + CONTRACTL_END; + + _ASSERTE(releaseCount > 0); + _ASSERTE((UINT32)releaseCount <= m_maximumSignalCount); + _ASSERTE(m_handle != INVALID_HANDLE_VALUE); + + INT32 countOfWaitersToWake; + Counts counts = m_counts.VolatileLoad(); + while (true) + { + Counts newCounts = counts; + + // Increase the signal count. The addition doesn't overflow because of the limit on the max signal count in Create. + newCounts.signalCount += releaseCount; + _ASSERTE(newCounts.signalCount > counts.signalCount); + + // Determine how many waiters to wake, taking into account how many spinners and waiters there are and how many waiters + // have previously been signaled to wake but have not yet woken + countOfWaitersToWake = + (INT32)min(newCounts.signalCount, (UINT32)newCounts.waiterCount + newCounts.spinnerCount) - + newCounts.spinnerCount - + newCounts.countOfWaitersSignaledToWake; + if (countOfWaitersToWake > 0) + { + // Ideally, limiting to a maximum of releaseCount would not be necessary and could be an assert instead, but since + // WaitForSignal() does not have enough information to tell whether a woken thread was signaled, and due to the cap + // below, it's possible for countOfWaitersSignaledToWake to be less than the number of threads that have actually + // been signaled to wake. + if (countOfWaitersToWake > releaseCount) + { + countOfWaitersToWake = releaseCount; + } + + // Cap countOfWaitersSignaledToWake to its max value. It's ok to ignore some woken threads in this count, it just + // means some more threads will be woken next time. Typically, it won't reach the max anyway. + newCounts.countOfWaitersSignaledToWake += (UINT8)min(countOfWaitersToWake, (INT32)UINT8_MAX); + if (newCounts.countOfWaitersSignaledToWake <= counts.countOfWaitersSignaledToWake) + { + newCounts.countOfWaitersSignaledToWake = UINT8_MAX; + } + } + + Counts countsBeforeUpdate = m_counts.CompareExchange(newCounts, counts); + if (countsBeforeUpdate == counts) + { + _ASSERTE((UINT32)releaseCount <= m_maximumSignalCount - counts.signalCount); + _ASSERTE(newCounts.countOfWaitersSignaledToWake <= newCounts.waiterCount); + if (countOfWaitersToWake <= 0) + { + return; + } + break; + } + + counts = countsBeforeUpdate; + } + + // Wake waiters +#ifdef FEATURE_PAL + BOOL released = UnsafeReleaseSemaphore(m_handle, countOfWaitersToWake, nullptr); + _ASSERTE(released); +#else // !FEATURE_PAL + while (--countOfWaitersToWake >= 0) + { + while (!PostQueuedCompletionStatus(m_handle, 0, 0, nullptr)) + { + // Probably out of memory. It's not valid to stop and throw here, so try again after a delay. + ClrSleepEx(1, false); + } + } +#endif // FEATURE_PAL +} + void CLRMutex::Create(LPSECURITY_ATTRIBUTES lpMutexAttributes, BOOL bInitialOwner, LPCTSTR lpName) { CONTRACTL diff --git a/src/vm/synch.h b/src/vm/synch.h index d88ec46342..c8e9baf481 100644 --- a/src/vm/synch.h +++ b/src/vm/synch.h @@ -177,6 +177,99 @@ private: HANDLE m_handle; }; +class CLRLifoSemaphore +{ +private: + struct Counts + { + union + { + struct + { + UINT32 signalCount; + UINT16 waiterCount; + UINT8 spinnerCount; + UINT8 countOfWaitersSignaledToWake; + }; + UINT64 data; + }; + + Counts(UINT64 data = 0) : data(data) + { + LIMITED_METHOD_CONTRACT; + } + + operator UINT64() const + { + LIMITED_METHOD_CONTRACT; + return data; + } + + Counts &operator =(UINT64 data) + { + LIMITED_METHOD_CONTRACT; + + this->data = data; + return *this; + } + + Counts VolatileLoad() const + { + LIMITED_METHOD_CONTRACT; + return ::VolatileLoad(&data); + } + + Counts CompareExchange(Counts toCounts, Counts fromCounts) + { + LIMITED_METHOD_CONTRACT; + return (UINT64)InterlockedCompareExchange64((LONG64 *)&data, (LONG64)toCounts, (LONG64)fromCounts); + } + }; + +public: + CLRLifoSemaphore() : m_handle(nullptr) + { + LIMITED_METHOD_CONTRACT; + } + + ~CLRLifoSemaphore() + { + WRAPPER_NO_CONTRACT; + Close(); + } + +public: + void Create(INT32 initialSignalCount, INT32 maximumSignalCount); + void Close(); + +public: + BOOL IsValid() const + { + LIMITED_METHOD_CONTRACT; + return m_handle != nullptr; + } + +private: + bool WaitForSignal(DWORD timeoutMs); +public: + bool Wait(DWORD timeoutMs); + bool Wait(DWORD timeoutMs, UINT32 spinCount, UINT32 processorCount); + void Release(INT32 releaseCount); + +private: + BYTE __padding1[MAX_CACHE_LINE_SIZE]; // padding to ensure that m_counts gets its own cache line + Counts m_counts; + BYTE __padding2[MAX_CACHE_LINE_SIZE]; // padding to ensure that m_counts gets its own cache line + +#if defined(DEBUG) + UINT32 m_maximumSignalCount; +#endif // _DEBUG && !FEATURE_PAL + + // When FEATURE_PAL is defined, this is a handle to an instance of the PAL's LIFO semaphore. When FEATURE_PAL is not + // defined, this is a handle to an I/O completion port. + HANDLE m_handle; +}; + class CLRMutex { public: CLRMutex() diff --git a/src/vm/win32threadpool.cpp b/src/vm/win32threadpool.cpp index eabbcb93ae..97c020a4b6 100644 --- a/src/vm/win32threadpool.cpp +++ b/src/vm/win32threadpool.cpp @@ -103,6 +103,7 @@ DWORD ThreadpoolMgr::NextCompletedWorkRequestsTime; LARGE_INTEGER ThreadpoolMgr::CurrentSampleStartTime; +unsigned int ThreadpoolMgr::WorkerThreadSpinLimit; int ThreadpoolMgr::ThreadAdjustmentInterval; #define INVALID_HANDLE ((HANDLE) -1) @@ -136,8 +137,8 @@ CLREvent * ThreadpoolMgr::RetiredCPWakeupEvent; // wakeup event for comple CrstStatic ThreadpoolMgr::WaitThreadsCriticalSection; ThreadpoolMgr::LIST_ENTRY ThreadpoolMgr::WaitThreadsHead; -ThreadpoolMgr::UnfairSemaphore* ThreadpoolMgr::WorkerSemaphore; -CLRSemaphore* ThreadpoolMgr::RetiredWorkerSemaphore; +CLRLifoSemaphore* ThreadpoolMgr::WorkerSemaphore; +CLRLifoSemaphore* ThreadpoolMgr::RetiredWorkerSemaphore; CrstStatic ThreadpoolMgr::TimerQueueCriticalSection; HANDLE ThreadpoolMgr::TimerThread=NULL; @@ -353,6 +354,7 @@ BOOL ThreadpoolMgr::Initialize() EX_TRY { + WorkerThreadSpinLimit = CLRConfig::GetConfigValue(CLRConfig::INTERNAL_ThreadPool_UnfairSemaphoreSpinLimit); ThreadAdjustmentInterval = CLRConfig::GetConfigValue(CLRConfig::INTERNAL_HillClimbing_SampleIntervalLow); pADTPCount->InitResources(); @@ -370,26 +372,26 @@ BOOL ThreadpoolMgr::Initialize() RetiredCPWakeupEvent->CreateAutoEvent(FALSE); _ASSERTE(RetiredCPWakeupEvent->IsValid()); - int spinLimitPerProcessor = CLRConfig::GetConfigValue(CLRConfig::INTERNAL_ThreadPool_UnfairSemaphoreSpinLimit); - WorkerSemaphore = new UnfairSemaphore(ThreadCounter::MaxPossibleCount, spinLimitPerProcessor); + WorkerSemaphore = new CLRLifoSemaphore(); + WorkerSemaphore->Create(0, ThreadCounter::MaxPossibleCount); - RetiredWorkerSemaphore = new CLRSemaphore(); + RetiredWorkerSemaphore = new CLRLifoSemaphore(); RetiredWorkerSemaphore->Create(0, ThreadCounter::MaxPossibleCount); - //ThreadPool_CPUGroup - if (CPUGroupInfo::CanEnableGCCPUGroups() && CPUGroupInfo::CanEnableThreadUseAllCpuGroups()) + //ThreadPool_CPUGroup + if (CPUGroupInfo::CanEnableGCCPUGroups() && CPUGroupInfo::CanEnableThreadUseAllCpuGroups()) RecycledLists.Initialize( CPUGroupInfo::GetNumActiveProcessors() ); else - RecycledLists.Initialize( g_SystemInfo.dwNumberOfProcessors ); - /* - { - SYSTEM_INFO sysInfo; + RecycledLists.Initialize( g_SystemInfo.dwNumberOfProcessors ); + /* + { + SYSTEM_INFO sysInfo; - ::GetSystemInfo( &sysInfo ); + ::GetSystemInfo( &sysInfo ); - RecycledLists.Initialize( sysInfo.dwNumberOfProcessors ); - } - */ + RecycledLists.Initialize( sysInfo.dwNumberOfProcessors ); + } + */ } EX_CATCH { @@ -1034,9 +1036,7 @@ void ThreadpoolMgr::MaybeAddWorkingWorker() if (toUnretire > 0) { - LONG previousCount; - INDEBUG(BOOL success =) RetiredWorkerSemaphore->Release((LONG)toUnretire, &previousCount); - _ASSERTE(success); + RetiredWorkerSemaphore->Release(toUnretire); } if (toRelease > 0) @@ -2055,10 +2055,7 @@ Retire: while (true) { RetryRetire: - DWORD result = RetiredWorkerSemaphore->Wait(AppX::IsAppXProcess() ? WorkerTimeoutAppX : WorkerTimeout, FALSE); - _ASSERTE(WAIT_OBJECT_0 == result || WAIT_TIMEOUT == result); - - if (WAIT_OBJECT_0 == result) + if (RetiredWorkerSemaphore->Wait(AppX::IsAppXProcess() ? WorkerTimeoutAppX : WorkerTimeout)) { foundWork = true; @@ -2134,59 +2131,57 @@ WaitForWork: FireEtwThreadPoolWorkerThreadWait(counts.NumActive, counts.NumRetired, GetClrInstanceId()); RetryWaitForWork: - if (!WorkerSemaphore->Wait(AppX::IsAppXProcess() ? WorkerTimeoutAppX : WorkerTimeout)) + if (WorkerSemaphore->Wait(AppX::IsAppXProcess() ? WorkerTimeoutAppX : WorkerTimeout, WorkerThreadSpinLimit, NumberOfProcessors)) { - if (!IsIoPending()) - { - // - // We timed out, and are about to exit. This puts us in a very similar situation to the - // retirement case above - someone may think we're still waiting, and go ahead and: - // - // 1) Increment NumWorking - // 2) Signal WorkerSemaphore - // - // The solution is much like retirement; when we're decrementing NumActive, we need to make - // sure it doesn't drop below NumWorking. If it would, then we need to go back and wait - // again. - // + foundWork = true; + goto Work; + } - DangerousNonHostedSpinLockHolder tal(&ThreadAdjustmentLock); + if (!IsIoPending()) + { + // + // We timed out, and are about to exit. This puts us in a very similar situation to the + // retirement case above - someone may think we're still waiting, and go ahead and: + // + // 1) Increment NumWorking + // 2) Signal WorkerSemaphore + // + // The solution is much like retirement; when we're decrementing NumActive, we need to make + // sure it doesn't drop below NumWorking. If it would, then we need to go back and wait + // again. + // - // counts volatile read paired with CompareExchangeCounts loop set - counts = WorkerCounter.DangerousGetDirtyCounts(); - while (true) - { - if (counts.NumActive == counts.NumWorking) - { - goto RetryWaitForWork; - } + DangerousNonHostedSpinLockHolder tal(&ThreadAdjustmentLock); - newCounts = counts; - newCounts.NumActive--; + // counts volatile read paired with CompareExchangeCounts loop set + counts = WorkerCounter.DangerousGetDirtyCounts(); + while (true) + { + if (counts.NumActive == counts.NumWorking) + { + goto RetryWaitForWork; + } - // if we timed out while active, then Hill Climbing needs to be told that we need fewer threads - newCounts.MaxWorking = max(MinLimitTotalWorkerThreads, min(newCounts.NumActive, newCounts.MaxWorking)); + newCounts = counts; + newCounts.NumActive--; - oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts); + // if we timed out while active, then Hill Climbing needs to be told that we need fewer threads + newCounts.MaxWorking = max(MinLimitTotalWorkerThreads, min(newCounts.NumActive, newCounts.MaxWorking)); - if (oldCounts == counts) - { - HillClimbingInstance.ForceChange(newCounts.MaxWorking, ThreadTimedOut); - goto Exit; - } + oldCounts = WorkerCounter.CompareExchangeCounts(newCounts, counts); - counts = oldCounts; + if (oldCounts == counts) + { + HillClimbingInstance.ForceChange(newCounts.MaxWorking, ThreadTimedOut); + goto Exit; } - } - else - { - goto RetryWaitForWork; + + counts = oldCounts; } } else { - foundWork = true; - goto Work; + goto RetryWaitForWork; } Exit: diff --git a/src/vm/win32threadpool.h b/src/vm/win32threadpool.h index fc5742b494..764c65efdc 100644 --- a/src/vm/win32threadpool.h +++ b/src/vm/win32threadpool.h @@ -105,245 +105,6 @@ class ThreadpoolMgr friend class HillClimbing; friend struct _DacGlobals; - // - // UnfairSemaphore is a more scalable semaphore than CLRSemaphore. It prefers to release threads that have more recently begun waiting, - // to preserve locality. Additionally, very recently-waiting threads can be released without an addition kernel transition to unblock - // them, which reduces latency. - // - // UnfairSemaphore is only appropriate in scenarios where the order of unblocking threads is not important, and where threads frequently - // need to be woken. This is true of the ThreadPool's "worker semaphore", but not, for example, of the "retired worker semaphore" which is - // only rarely signalled. - // - // A further optimization that could be done here would be to replace CLRSemaphore with a Win32 IO Completion Port. Completion ports - // unblock threads in LIFO order, unlike the roughly-FIFO ordering of ordinary semaphores, and that would help to keep the "warm" threads warm. - // We did not do this in CLR 4.0 because hosts currently have no way of intercepting calls to IO Completion Ports (other than THE completion port - // behind the I/O thread pool), and we did not have time to explore the implications of this. Also, completion ports are not available on the Mac, - // though Snow Leopard has something roughly similar (and a regular Semaphore would do on the Mac in a pinch). - // - class UnfairSemaphore - { - private: - // padding to ensure we get our own cache line - BYTE padding1[MAX_CACHE_LINE_SIZE]; - - // - // We track everything we care about in a single 64-bit struct to allow us to - // do CompareExchanges on this for atomic updates. - // - union Counts - { - struct - { - int spinners : 16; //how many threads are currently spin-waiting for this semaphore? - int countForSpinners : 16; //how much of the semaphore's count is availble to spinners? - int waiters : 16; //how many threads are blocked in the OS waiting for this semaphore? - int countForWaiters : 16; //how much count is available to waiters? - }; - - LONGLONG asLongLong; - - } m_counts; - - private: - // padding to ensure we get our own cache line - BYTE padding2[MAX_CACHE_LINE_SIZE]; - - const int m_spinLimitPerProcessor; //used when calculating max spin duration - CLRSemaphore m_sem; //waiters wait on this - - INDEBUG(int m_maxCount;) - - bool UpdateCounts(Counts newCounts, Counts currentCounts) - { - LIMITED_METHOD_CONTRACT; - Counts oldCounts; - oldCounts.asLongLong = FastInterlockCompareExchangeLong(&m_counts.asLongLong, newCounts.asLongLong, currentCounts.asLongLong); - if (oldCounts.asLongLong == currentCounts.asLongLong) - { - // we succesfully updated the counts. Now validate what we put in. - // Note: we can't validate these unless the CompareExchange succeeds, because - // on x86 a VolatileLoad of m_counts is not atomic; we could end up getting inconsistent - // values. It's not until we've successfully stored the new values that we know for sure - // that the old values were correct (because if they were not, the CompareExchange would have - // failed. - _ASSERTE(newCounts.spinners >= 0); - _ASSERTE(newCounts.countForSpinners >= 0); - _ASSERTE(newCounts.waiters >= 0); - _ASSERTE(newCounts.countForWaiters >= 0); - _ASSERTE(newCounts.countForSpinners + newCounts.countForWaiters <= m_maxCount); - - return true; - } - else - { - // we lost a race with some other thread, and will need to try again. - return false; - } - } - - public: - - UnfairSemaphore(int maxCount, int spinLimitPerProcessor) - : m_spinLimitPerProcessor(spinLimitPerProcessor) - { - CONTRACTL - { - THROWS; - GC_NOTRIGGER; - SO_TOLERANT; - MODE_ANY; - } - CONTRACTL_END; - _ASSERTE(maxCount <= 0x7fff); //counts need to fit in signed 16-bit ints - INDEBUG(m_maxCount = maxCount;) - - m_counts.asLongLong = 0; - m_sem.Create(0, maxCount); - } - - // - // no destructor - CLRSemaphore will close itself in its own destructor. - // - //~UnfairSemaphore() - //{ - //} - - - void Release(int countToRelease) - { - while (true) - { - Counts currentCounts, newCounts; - currentCounts.asLongLong = VolatileLoad(&m_counts.asLongLong); - newCounts = currentCounts; - - int remainingCount = countToRelease; - - // First, prefer to release existing spinners, - // because a) they're hot, and b) we don't need a kernel - // transition to release them. - int spinnersToRelease = max(0, min(remainingCount, currentCounts.spinners - currentCounts.countForSpinners)); - newCounts.countForSpinners += spinnersToRelease; - remainingCount -= spinnersToRelease; - - // Next, prefer to release existing waiters - int waitersToRelease = max(0, min(remainingCount, currentCounts.waiters - currentCounts.countForWaiters)); - newCounts.countForWaiters += waitersToRelease; - remainingCount -= waitersToRelease; - - // Finally, release any future spinners that might come our way - newCounts.countForSpinners += remainingCount; - - // Try to commit the transaction - if (UpdateCounts(newCounts, currentCounts)) - { - // Now we need to release the waiters we promised to release - if (waitersToRelease > 0) - { - LONG previousCount; - INDEBUG(BOOL success =) m_sem.Release((LONG)waitersToRelease, &previousCount); - _ASSERTE(success); - } - break; - } - } - } - - - bool Wait(DWORD timeout) - { - while (true) - { - Counts currentCounts, newCounts; - currentCounts.asLongLong = VolatileLoad(&m_counts.asLongLong); - newCounts = currentCounts; - - // First, just try to grab some count. - if (currentCounts.countForSpinners > 0) - { - newCounts.countForSpinners--; - if (UpdateCounts(newCounts, currentCounts)) - return true; - } - else - { - // No count available, become a spinner - newCounts.spinners++; - if (UpdateCounts(newCounts, currentCounts)) - break; - } - } - - // - // Now we're a spinner. - // - int numSpins = 0; - while (true) - { - Counts currentCounts, newCounts; - - currentCounts.asLongLong = VolatileLoad(&m_counts.asLongLong); - newCounts = currentCounts; - - if (currentCounts.countForSpinners > 0) - { - newCounts.countForSpinners--; - newCounts.spinners--; - if (UpdateCounts(newCounts, currentCounts)) - return true; - } - else - { - double spinnersPerProcessor = (double)currentCounts.spinners / ThreadpoolMgr::NumberOfProcessors; - int spinLimit = (int)((m_spinLimitPerProcessor / spinnersPerProcessor) + 0.5); - if (numSpins >= spinLimit) - { - newCounts.spinners--; - newCounts.waiters++; - if (UpdateCounts(newCounts, currentCounts)) - break; - } - else - { - // - // We yield to other threads using SleepEx rather than the more traditional SwitchToThread. - // This is because SwitchToThread does not yield to threads currently scheduled to run on other - // processors. On a 4-core machine, for example, this means that SwitchToThread is only ~25% likely - // to yield to the correct thread in some scenarios. - // SleepEx has the disadvantage of not yielding to lower-priority threads. However, this is ok because - // once we've called this a few times we'll become a "waiter" and wait on the CLRSemaphore, and that will - // yield to anything that is runnable. - // - ClrSleepEx(0, FALSE); - numSpins++; - } - } - } - - // - // Now we're a waiter - // - DWORD result = m_sem.Wait(timeout, FALSE); - _ASSERTE(WAIT_OBJECT_0 == result || WAIT_TIMEOUT == result); - - while (true) - { - Counts currentCounts, newCounts; - - currentCounts.asLongLong = VolatileLoad(&m_counts.asLongLong); - newCounts = currentCounts; - - newCounts.waiters--; - - if (result == WAIT_OBJECT_0) - newCounts.countForWaiters--; - - if (UpdateCounts(newCounts, currentCounts)) - return (result == WAIT_OBJECT_0); - } - } - }; - public: struct ThreadCounter { @@ -1258,6 +1019,7 @@ private: static LARGE_INTEGER CurrentSampleStartTime; + static unsigned int WorkerThreadSpinLimit; static int ThreadAdjustmentInterval; SPTR_DECL(WorkRequest,WorkRequestHead); // Head of work request queue @@ -1286,7 +1048,7 @@ private: // 2) There is no functional reason why any particular thread should be preferred when waking workers. This only impacts performance, // and un-fairness helps performance in this case. // - static UnfairSemaphore* WorkerSemaphore; + static CLRLifoSemaphore* WorkerSemaphore; // // RetiredWorkerSemaphore is a regular CLRSemaphore, not an UnfairSemaphore, because if a thread waits on this semaphore is it almost certainly @@ -1295,7 +1057,7 @@ private: // down, by constantly re-using the same small set of retired workers rather than round-robining between all of them as CLRSemaphore will do. // If we go that route, we should add a "no-spin" option to UnfairSemaphore.Wait to avoid wasting CPU. // - static CLRSemaphore* RetiredWorkerSemaphore; + static CLRLifoSemaphore* RetiredWorkerSemaphore; static CLREvent * RetiredCPWakeupEvent; -- cgit v1.2.3