path: root/src/vm/win32threadpool.h
diff options
Diffstat (limited to 'src/vm/win32threadpool.h')
1 files changed, 1400 insertions, 0 deletions
diff --git a/src/vm/win32threadpool.h b/src/vm/win32threadpool.h
new file mode 100644
index 0000000000..f712ef983d
--- /dev/null
+++ b/src/vm/win32threadpool.h
@@ -0,0 +1,1400 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+Module Name:
+ Win32ThreadPool.h
+ This module is the header file for thread pools using Win32 APIs.
+Revision History:
+#include "delegateinfo.h"
+#include "util.hpp"
+#include "nativeoverlapped.h"
+#include "hillclimbing.h"
+#define MAX_CACHED_EVENTS 40 // upper limit on number of wait events cached
+#define WAIT_REGISTERED 0x01
+#define WAIT_ACTIVE 0x02
+#define WAIT_DELETE 0x04
+#define TIMER_REGISTERED 0x01
+#define TIMER_ACTIVE 0x02
+#define TIMER_DELETE 0x04
+#define WAIT_SINGLE_EXECUTION 0x00000001
+#define WAIT_FREE_CONTEXT 0x00000002
+#define WAIT_INTERNAL_COMPLETION 0x00000004
+#define QUEUE_ONLY 0x00000000 // do not attempt to call on the thread
+#define CALL_OR_QUEUE 0x00000001 // call on the same thread if not too busy, else queue
+const int MaxLimitThreadsPerCPU=250; // upper limit on number of cp threads per CPU
+const int MaxFreeCPThreadsPerCPU=2; // upper limit on number of free cp threads per CPU
+const int CpuUtilizationHigh=95; // remove threads when above this
+const int CpuUtilizationLow =80; // inject more threads if below this
+#ifndef FEATURE_PAL
+extern HANDLE (WINAPI *g_pufnCreateIoCompletionPort)(HANDLE FileHandle,
+ HANDLE ExistingCompletionPort,
+ ULONG_PTR CompletionKey,
+ DWORD NumberOfConcurrentThreads);
+extern int (WINAPI *g_pufnNtQueryInformationThread) (HANDLE ThreadHandle,
+ THREADINFOCLASS ThreadInformationClass,
+ PVOID ThreadInformation,
+ ULONG ThreadInformationLength,
+ PULONG ReturnLength);
+extern int (WINAPI * g_pufnNtQuerySystemInformation) (SYSTEM_INFORMATION_CLASS SystemInformationClass,
+ PVOID SystemInformation,
+ ULONG SystemInformationLength,
+ PULONG ReturnLength OPTIONAL);
+#endif // !FEATURE_PAL
+#define FILETIME_TO_INT64(t) (*(__int64*)&(t))
+#define MILLI_TO_100NANO(x) (x * 10000) // convert from milliseond to 100 nanosecond unit
+ * This type is supposed to be private to ThreadpoolMgr.
+ * It's at global scope because Strike needs to be able to access its
+ * definition.
+ */
+struct WorkRequest {
+ WorkRequest* next;
+ PVOID Context;
+typedef struct _IOCompletionContext
+ DWORD ErrorCode;
+ DWORD numBytesTransferred;
+ LPOVERLAPPED lpOverlapped;
+ size_t key;
+} IOCompletionContext, *PIOCompletionContext;
+typedef DPTR(WorkRequest) PTR_WorkRequest;
+class ThreadpoolMgr
+ friend class ClrDataAccess;
+ friend struct DelegateInfo;
+ friend class ThreadPoolNative;
+ friend class TimerNative;
+ friend class UnManagedPerAppDomainTPCount;
+ friend class ManagedPerAppDomainTPCount;
+ friend class PerAppDomainTPCountList;
+ friend class HillClimbing;
+ friend struct _DacGlobals;
+ //
+ // UnfairSemaphore is a more scalable semaphore than CLRSemaphore. It prefers to release threads that have more recently begun waiting,
+ // to preserve locality. Additionally, very recently-waiting threads can be released without an addition kernel transition to unblock
+ // them, which reduces latency.
+ //
+ // UnfairSemaphore is only appropriate in scenarios where the order of unblocking threads is not important, and where threads frequently
+ // need to be woken. This is true of the ThreadPool's "worker semaphore", but not, for example, of the "retired worker semaphore" which is
+ // only rarely signalled.
+ //
+ // A further optimization that could be done here would be to replace CLRSemaphore with a Win32 IO Completion Port. Completion ports
+ // unblock threads in LIFO order, unlike the roughly-FIFO ordering of ordinary semaphores, and that would help to keep the "warm" threads warm.
+ // We did not do this in CLR 4.0 because hosts currently have no way of intercepting calls to IO Completion Ports (other than THE completion port
+ // behind the I/O thread pool), and we did not have time to explore the implications of this. Also, completion ports are not available on the Mac,
+ // though Snow Leopard has something roughly similar (and a regular Semaphore would do on the Mac in a pinch).
+ //
+ class UnfairSemaphore
+ {
+ private:
+ // padding to ensure we get our own cache line
+ BYTE padding1[64];
+ //
+ // We track everything we care about in a single 64-bit struct to allow us to
+ // do CompareExchanges on this for atomic updates.
+ //
+ union Counts
+ {
+ struct
+ {
+ int spinners : 16; //how many threads are currently spin-waiting for this semaphore?
+ int countForSpinners : 16; //how much of the semaphore's count is availble to spinners?
+ int waiters : 16; //how many threads are blocked in the OS waiting for this semaphore?
+ int countForWaiters : 16; //how much count is available to waiters?
+ };
+ LONGLONG asLongLong;
+ } m_counts;
+ private:
+ const int m_spinLimitPerProcessor; //used when calculating max spin duration
+ CLRSemaphore m_sem; //waiters wait on this
+ // padding to ensure we get our own cache line
+ BYTE padding2[64];
+ INDEBUG(int m_maxCount;)
+ bool UpdateCounts(Counts newCounts, Counts currentCounts)
+ {
+ Counts oldCounts;
+ oldCounts.asLongLong = FastInterlockCompareExchangeLong(&m_counts.asLongLong, newCounts.asLongLong, currentCounts.asLongLong);
+ if (oldCounts.asLongLong == currentCounts.asLongLong)
+ {
+ // we succesfully updated the counts. Now validate what we put in.
+ // Note: we can't validate these unless the CompareExchange succeeds, because
+ // on x86 a VolatileLoad of m_counts is not atomic; we could end up getting inconsistent
+ // values. It's not until we've successfully stored the new values that we know for sure
+ // that the old values were correct (because if they were not, the CompareExchange would have
+ // failed.
+ _ASSERTE(newCounts.spinners >= 0);
+ _ASSERTE(newCounts.countForSpinners >= 0);
+ _ASSERTE(newCounts.waiters >= 0);
+ _ASSERTE(newCounts.countForWaiters >= 0);
+ _ASSERTE(newCounts.countForSpinners + newCounts.countForWaiters <= m_maxCount);
+ return true;
+ }
+ else
+ {
+ // we lost a race with some other thread, and will need to try again.
+ return false;
+ }
+ }
+ public:
+ UnfairSemaphore(int maxCount, int spinLimitPerProcessor)
+ : m_spinLimitPerProcessor(spinLimitPerProcessor)
+ {
+ {
+ }
+ _ASSERTE(maxCount <= 0x7fff); //counts need to fit in signed 16-bit ints
+ INDEBUG(m_maxCount = maxCount;)
+ m_counts.asLongLong = 0;
+ m_sem.Create(0, maxCount);
+ }
+ //
+ // no destructor - CLRSemaphore will close itself in its own destructor.
+ //
+ //~UnfairSemaphore()
+ //{
+ //}
+ void Release(int countToRelease)
+ {
+ while (true)
+ {
+ Counts currentCounts, newCounts;
+ currentCounts.asLongLong = VolatileLoad(&m_counts.asLongLong);
+ newCounts = currentCounts;
+ int remainingCount = countToRelease;
+ // First, prefer to release existing spinners,
+ // because a) they're hot, and b) we don't need a kernel
+ // transition to release them.
+ int spinnersToRelease = max(0, min(remainingCount, currentCounts.spinners - currentCounts.countForSpinners));
+ newCounts.countForSpinners += spinnersToRelease;
+ remainingCount -= spinnersToRelease;
+ // Next, prefer to release existing waiters
+ int waitersToRelease = max(0, min(remainingCount, currentCounts.waiters - currentCounts.countForWaiters));
+ newCounts.countForWaiters += waitersToRelease;
+ remainingCount -= waitersToRelease;
+ // Finally, release any future spinners that might come our way
+ newCounts.countForSpinners += remainingCount;
+ // Try to commit the transaction
+ if (UpdateCounts(newCounts, currentCounts))
+ {
+ // Now we need to release the waiters we promised to release
+ if (waitersToRelease > 0)
+ {
+ LONG previousCount;
+ INDEBUG(BOOL success =) m_sem.Release((LONG)waitersToRelease, &previousCount);
+ _ASSERTE(success);
+ }
+ break;
+ }
+ }
+ }
+ bool Wait(DWORD timeout)
+ {
+ while (true)
+ {
+ Counts currentCounts, newCounts;
+ currentCounts.asLongLong = VolatileLoad(&m_counts.asLongLong);
+ newCounts = currentCounts;
+ // First, just try to grab some count.
+ if (currentCounts.countForSpinners > 0)
+ {
+ newCounts.countForSpinners--;
+ if (UpdateCounts(newCounts, currentCounts))
+ return true;
+ }
+ else
+ {
+ // No count available, become a spinner
+ newCounts.spinners++;
+ if (UpdateCounts(newCounts, currentCounts))
+ break;
+ }
+ }
+ //
+ // Now we're a spinner.
+ //
+ int numSpins = 0;
+ while (true)
+ {
+ Counts currentCounts, newCounts;
+ currentCounts.asLongLong = VolatileLoad(&m_counts.asLongLong);
+ newCounts = currentCounts;
+ if (currentCounts.countForSpinners > 0)
+ {
+ newCounts.countForSpinners--;
+ newCounts.spinners--;
+ if (UpdateCounts(newCounts, currentCounts))
+ return true;
+ }
+ else
+ {
+ double spinnersPerProcessor = (double)currentCounts.spinners / ThreadpoolMgr::NumberOfProcessors;
+ int spinLimit = (int)((m_spinLimitPerProcessor / spinnersPerProcessor) + 0.5);
+ if (numSpins >= spinLimit)
+ {
+ newCounts.spinners--;
+ newCounts.waiters++;
+ if (UpdateCounts(newCounts, currentCounts))
+ break;
+ }
+ else
+ {
+ //
+ // We yield to other threads using SleepEx rather than the more traditional SwitchToThread.
+ // This is because SwitchToThread does not yield to threads currently scheduled to run on other
+ // processors. On a 4-core machine, for example, this means that SwitchToThread is only ~25% likely
+ // to yield to the correct thread in some scenarios.
+ // SleepEx has the disadvantage of not yielding to lower-priority threads. However, this is ok because
+ // once we've called this a few times we'll become a "waiter" and wait on the CLRSemaphore, and that will
+ // yield to anything that is runnable.
+ //
+ ClrSleepEx(0, FALSE);
+ numSpins++;
+ }
+ }
+ }
+ //
+ // Now we're a waiter
+ //
+ DWORD result = m_sem.Wait(timeout, FALSE);
+ _ASSERTE(WAIT_OBJECT_0 == result || WAIT_TIMEOUT == result);
+ while (true)
+ {
+ Counts currentCounts, newCounts;
+ currentCounts.asLongLong = VolatileLoad(&m_counts.asLongLong);
+ newCounts = currentCounts;
+ newCounts.waiters--;
+ if (result == WAIT_OBJECT_0)
+ newCounts.countForWaiters--;
+ if (UpdateCounts(newCounts, currentCounts))
+ return (result == WAIT_OBJECT_0);
+ }
+ }
+ };
+ struct ThreadCounter
+ {
+ static const int MaxPossibleCount = 0x7fff;
+ union Counts
+ {
+ struct
+ {
+ //
+ // Note: these are signed rather than unsigned to allow us to detect under/overflow.
+ //
+ int MaxWorking : 16; //Determined by HillClimbing; adjusted elsewhere for timeouts, etc.
+ int NumActive : 16; //Active means working or waiting on WorkerSemaphore. These are "warm/hot" threads.
+ int NumWorking : 16; //Trying to get work from various queues. Not waiting on either semaphore.
+ int NumRetired : 16; //Not trying to get work; waiting on RetiredWorkerSemaphore. These are "cold" threads.
+ // Note: the only reason we need "retired" threads at all is that it allows some threads to eventually time out
+ // even if other threads are getting work. If we ever make WorkerSemaphore a true LIFO semaphore, we will no longer
+ // need the concept of "retirement" - instead, the very "coldest" threads will naturally be the first to time out.
+ };
+ LONGLONG AsLongLong;
+ bool operator==(Counts other) {LIMITED_METHOD_CONTRACT; return AsLongLong == other.AsLongLong;}
+ } counts;
+ // padding to ensure we get our own cache line
+ BYTE padding[64];
+ Counts GetCleanCounts()
+ {
+#ifdef _WIN64
+ // VolatileLoad x64 bit read is atomic
+ return DangerousGetDirtyCounts();
+#else // !_WIN64
+ // VolatileLoad may result in torn read
+ Counts result;
+ result.AsLongLong = FastInterlockCompareExchangeLong(&counts.AsLongLong, 0, 0);
+ ValidateCounts(result);
+ result.AsLongLong = 0; //prevents prefast warning for DAC builds
+ return result;
+#endif // !_WIN64
+ }
+ //
+ // This does a non-atomic read of the counts. The returned value is suitable only
+ // for use inside of a read-compare-exchange loop, where the compare-exhcange must succeed
+ // before any action is taken. Use GetCleanWorkerCounts for other needs, but keep in mind
+ // it's much slower.
+ //
+ Counts DangerousGetDirtyCounts()
+ {
+ Counts result;
+ result.AsLongLong = VolatileLoad(&counts.AsLongLong);
+ result.AsLongLong = 0; //prevents prefast warning for DAC builds
+ return result;
+ }
+ Counts CompareExchangeCounts(Counts newCounts, Counts oldCounts)
+ {
+ Counts result;
+ result.AsLongLong = FastInterlockCompareExchangeLong(&counts.AsLongLong, newCounts.AsLongLong, oldCounts.AsLongLong);
+ if (result == oldCounts)
+ {
+ // can only do validation on success; if we failed, it may have been due to a previous
+ // dirty read, which may contain invalid values.
+ ValidateCounts(result);
+ ValidateCounts(newCounts);
+ }
+ result.AsLongLong = 0; //prevents prefast warning for DAC builds
+ return result;
+ }
+ private:
+ static void ValidateCounts(Counts counts)
+ {
+ _ASSERTE(counts.MaxWorking > 0);
+ _ASSERTE(counts.NumActive >= 0);
+ _ASSERTE(counts.NumWorking >= 0);
+ _ASSERTE(counts.NumRetired >= 0);
+ _ASSERTE(counts.NumWorking <= counts.NumActive);
+ }
+ };
+ static void ReportThreadStatus(bool isWorking);
+ // enumeration of different kinds of memory blocks that are recycled
+ enum MemType
+ {
+ MEMTYPE_AsyncCallback = 0,
+ MEMTYPE_DelegateInfo = 1,
+ MEMTYPE_WorkRequest = 2,
+ MEMTYPE_PostRequest = 3,
+ };
+ static BOOL Initialize();
+ static BOOL SetMaxThreadsHelper(DWORD MaxWorkerThreads,
+ DWORD MaxIOCompletionThreads);
+ static BOOL SetMaxThreads(DWORD MaxWorkerThreads,
+ DWORD MaxIOCompletionThreads);
+ static BOOL GetMaxThreads(DWORD* MaxWorkerThreads,
+ DWORD* MaxIOCompletionThreads);
+ static BOOL SetMinThreads(DWORD MinWorkerThreads,
+ DWORD MinIOCompletionThreads);
+ static BOOL GetMinThreads(DWORD* MinWorkerThreads,
+ DWORD* MinIOCompletionThreads);
+ static BOOL GetAvailableThreads(DWORD* AvailableWorkerThreads,
+ DWORD* AvailableIOCompletionThreads);
+ static BOOL QueueUserWorkItem(LPTHREAD_START_ROUTINE Function,
+ PVOID Context,
+ ULONG Flags,
+ BOOL UnmanagedTPRequest=TRUE);
+ static BOOL PostQueuedCompletionStatus(LPOVERLAPPED lpOverlapped,
+ inline static BOOL IsCompletionPortInitialized()
+ {
+ return GlobalCompletionPort != NULL;
+ }
+ static BOOL DrainCompletionPortQueue();
+ static BOOL RegisterWaitForSingleObject(PHANDLE phNewWaitObject,
+ HANDLE hWaitObject,
+ PVOID Context,
+ ULONG timeout,
+ DWORD dwFlag);
+ static BOOL UnregisterWaitEx(HANDLE hWaitObject,HANDLE CompletionEvent);
+ static void WaitHandleCleanup(HANDLE hWaitObject);
+ static BOOL BindIoCompletionCallback(HANDLE FileHandle,
+ ULONG Flags,
+ DWORD& errorCode);
+ static void WaitIOCompletionCallback(DWORD dwErrorCode,
+ DWORD numBytesTransferred,
+ LPOVERLAPPED lpOverlapped);
+ static VOID CallbackForInitiateDrainageOfCompletionPortQueue(
+ DWORD dwErrorCode,
+ DWORD dwNumberOfBytesTransfered,
+ LPOVERLAPPED lpOverlapped
+ );
+ static VOID CallbackForContinueDrainageOfCompletionPortQueue(
+ DWORD dwErrorCode,
+ DWORD dwNumberOfBytesTransfered,
+ LPOVERLAPPED lpOverlapped
+ );
+ static BOOL SetAppDomainRequestsActive(BOOL UnmanagedTP = FALSE);
+ static void ClearAppDomainRequestsActive(BOOL UnmanagedTP = FALSE, BOOL AdUnloading = FALSE, LONG index = -1);
+ static inline void UpdateLastDequeueTime()
+ {
+ VolatileStore(&LastDequeueTime, (unsigned int)GetTickCount());
+ }
+ static BOOL CreateTimerQueueTimer(PHANDLE phNewTimer,
+ PVOID Parameter,
+ DWORD DueTime,
+ DWORD Period,
+ ULONG Flags);
+ static BOOL ChangeTimerQueueTimer(HANDLE Timer,
+ ULONG DueTime,
+ ULONG Period);
+ static BOOL DeleteTimerQueueTimer(HANDLE Timer,
+ HANDLE CompletionEvent);
+ static void RecycleMemory(LPVOID mem, enum MemType memType);
+ static void FlushQueueOfTimerInfos();
+ static BOOL HaveTimerInfosToFlush() { return TimerInfosToBeRecycled != NULL; }
+ inline static BOOL IsThreadPoolHosted()
+ {
+ IHostThreadpoolManager *provider = CorHost2::GetHostThreadpoolManager();
+ if (provider)
+ return TRUE;
+ else
+ return FALSE;
+ }
+#ifndef FEATURE_PAL
+ static LPOVERLAPPED CompletionPortDispatchWorkWithinAppDomain(Thread* pThread, DWORD* pErrorCode, DWORD* pNumBytes, size_t* pKey, DWORD adid);
+ static void StoreOverlappedInfoInThread(Thread* pThread, DWORD dwErrorCode, DWORD dwNumBytes, size_t key, LPOVERLAPPED lpOverlapped);
+#endif // !FEATURE_PAL
+ // Enable filtering of correlation ETW events for cases handled at a higher abstraction level
+ static FORCEINLINE BOOL AreEtwQueueEventsSpeciallyHandled(LPTHREAD_START_ROUTINE Function)
+ {
+ // Timer events are handled at a higher abstraction level: in the managed Timer class
+ return (Function == ThreadpoolMgr::AsyncTimerCallbackCompletion);
+ }
+ {
+ // We ignore drainage events b/c they are uninteresting
+ // We handle registered waits at a higher abstraction level
+ return (Function == ThreadpoolMgr::CallbackForInitiateDrainageOfCompletionPortQueue
+ || Function == ThreadpoolMgr::CallbackForContinueDrainageOfCompletionPortQueue
+ || Function == ThreadpoolMgr::WaitIOCompletionCallback);
+ }
+ inline static void FreeWorkRequest(WorkRequest* workRequest)
+ {
+ RecycleMemory( workRequest, MEMTYPE_WorkRequest ); //delete workRequest;
+ }
+ inline static WorkRequest* MakeWorkRequest(LPTHREAD_START_ROUTINE function, PVOID context)
+ {
+ {
+ }
+ WorkRequest* wr = (WorkRequest*) GetRecycledMemory(MEMTYPE_WorkRequest);
+ _ASSERTE(wr);
+ if (NULL == wr)
+ return NULL;
+ wr->Function = function;
+ wr->Context = context;
+ wr->next = NULL;
+ return wr;
+ }
+ struct PostRequest {
+ DWORD errorCode;
+ DWORD numBytesTransferred;
+ LPOVERLAPPED lpOverlapped;
+ };
+ inline static PostRequest* MakePostRequest(LPOVERLAPPED_COMPLETION_ROUTINE function, LPOVERLAPPED overlapped)
+ {
+ {
+ }
+ PostRequest* pr = (PostRequest*) GetRecycledMemory(MEMTYPE_PostRequest);
+ _ASSERTE(pr);
+ if (NULL == pr)
+ return NULL;
+ pr->Function = function;
+ pr->errorCode = 0;
+ pr->numBytesTransferred = 0;
+ pr->lpOverlapped = overlapped;
+ return pr;
+ }
+ inline static void ReleasePostRequest(PostRequest *postRequest)
+ {
+ ThreadpoolMgr::RecycleMemory(postRequest, MEMTYPE_PostRequest);
+ }
+ typedef Wrapper< PostRequest *, DoNothing<PostRequest *>, ThreadpoolMgr::ReleasePostRequest > PostRequestHolder;
+#endif // #ifndef DACCESS_COMPILE
+ typedef struct {
+ DWORD numBytes;
+ ULONG_PTR *key;
+ LPOVERLAPPED pOverlapped;
+ DWORD errorCode;
+ } QueuedStatus;
+ typedef DPTR(struct _LIST_ENTRY) PTR_LIST_ENTRY;
+ typedef struct _LIST_ENTRY {
+ struct _LIST_ENTRY *Flink;
+ struct _LIST_ENTRY *Blink;
+ struct WaitInfo;
+ typedef struct {
+ HANDLE threadHandle;
+ DWORD threadId;
+ CLREvent startEvent;
+ LONG NumWaitHandles; // number of wait objects registered to the thread <=64
+ LONG NumActiveWaits; // number of objects, thread is actually waiting on (this may be less than
+ // NumWaitHandles since the thread may not have activated some waits
+ HANDLE waitHandle[MAX_WAITHANDLES]; // array of wait handles (copied from waitInfo since
+ // we need them to be contiguous)
+ LIST_ENTRY waitPointer[MAX_WAITHANDLES]; // array of doubly linked list of corresponding waitinfo
+ } ThreadCB;
+ typedef struct {
+ ULONG startTime; // time at which wait was started
+ // endTime = startTime+timeout
+ ULONG remainingTime; // endTime - currentTime
+ } WaitTimerInfo;
+ struct WaitInfo {
+ LIST_ENTRY link; // Win9x does not allow duplicate waithandles, so we need to
+ // group all waits on a single waithandle using this linked list
+ HANDLE waitHandle;
+ PVOID Context;
+ ULONG timeout;
+ WaitTimerInfo timer;
+ DWORD flag;
+ DWORD state;
+ ThreadCB* threadCB;
+ LONG refCount; // when this reaches 0, the waitInfo can be safely deleted
+ CLREvent PartialCompletionEvent; // used to synchronize deactivation of a wait
+ CLREvent InternalCompletionEvent; // only one of InternalCompletion or ExternalCompletion is used
+ // but I cant make a union since CLREvent has a non-default constructor
+ HANDLE ExternalCompletionEvent; // they are signalled when all callbacks have completed (refCount=0)
+ ADID handleOwningAD;
+ OBJECTHANDLE ExternalEventSafeHandle;
+ } ;
+ // structure used to maintain global information about wait threads. Protected by WaitThreadsCriticalSection
+ typedef struct WaitThreadTag {
+ LIST_ENTRY link;
+ ThreadCB* threadCB;
+ } WaitThreadInfo;
+ struct AsyncCallback{
+ WaitInfo* wait;
+ BOOL waitTimedOut;
+ } ;
+ static VOID
+ AcquireAsyncCallback(AsyncCallback *pAsyncCB)
+ {
+ }
+ static VOID
+ ReleaseAsyncCallback(AsyncCallback *pAsyncCB)
+ {
+ {
+ }
+ WaitInfo *waitInfo = pAsyncCB->wait;
+ ThreadpoolMgr::RecycleMemory((LPVOID*)pAsyncCB, ThreadpoolMgr::MEMTYPE_AsyncCallback);
+ // if this was a single execution, we now need to stop rooting registeredWaitHandle
+ // in a GC handle. This will cause the finalizer to pick it up and call the cleanup
+ // routine.
+ if ( (waitInfo->flag & WAIT_SINGLE_EXECUTION) && (waitInfo->flag & WAIT_FREE_CONTEXT))
+ {
+ DelegateInfo* pDelegate = (DelegateInfo*) waitInfo->Context;
+ _ASSERTE(pDelegate->m_registeredWaitHandle);
+ {
+ AppDomainFromIDHolder ad(pDelegate->m_appDomainId, TRUE);
+ if (!ad.IsUnloaded())
+ // if no domain then handle already gone or about to go.
+ StoreObjectInHandle(pDelegate->m_registeredWaitHandle, NULL);
+ }
+ }
+ if (InterlockedDecrement(&waitInfo->refCount) == 0)
+ ThreadpoolMgr::DeleteWait(waitInfo);
+ }
+ typedef Holder<AsyncCallback *, ThreadpoolMgr::AcquireAsyncCallback, ThreadpoolMgr::ReleaseAsyncCallback> AsyncCallbackHolder;
+ inline static AsyncCallback* MakeAsyncCallback()
+ {
+ return (AsyncCallback*) GetRecycledMemory(MEMTYPE_AsyncCallback);
+ }
+ static VOID ReleaseInfo(OBJECTHANDLE& hndSafeHandle,
+ ADID& owningAD,
+ HANDLE hndNativeHandle)
+ {
+ {
+ }
+// Use of EX_TRY, GCPROTECT etc in the same function is causing prefast to complain about local variables with
+// same name masking each other (#246). The error could not be suppressed with "#pragma PREFAST_SUPPRESS"
+#ifndef _PREFAST_
+ if (hndSafeHandle != NULL)
+ {
+ {
+ {
+ {
+ // Read the GC handle
+ refSH = (SAFEHANDLEREF) ObjectToOBJECTREF(ObjectFromHandle(hndSafeHandle));
+ // Destroy the GC handle
+ DestroyHandle(hndSafeHandle);
+ if (refSH != NULL)
+ {
+ SafeHandleHolder h(&refSH);
+ HANDLE hEvent = refSH->GetHandle();
+ {
+ UnsafeSetEvent(hEvent);
+ }
+ }
+ }
+ }
+ {
+ }
+ EX_END_CATCH(SwallowAllExceptions);
+ }
+ hndSafeHandle = NULL;
+ owningAD = (ADID) 0;
+ }
+ }
+#endif // #ifndef DACCESS_COMPILE
+ typedef struct {
+ LIST_ENTRY link;
+ HANDLE Handle;
+ } WaitEvent ;
+ // Timer
+ typedef struct {
+ LIST_ENTRY link; // doubly linked list of timers
+ ULONG FiringTime; // TickCount of when to fire next
+ WAITORTIMERCALLBACK Function; // Function to call when timer fires
+ PVOID Context; // Context to pass to function when timer fires
+ ULONG Period;
+ DWORD flag; // How do we deal with the context
+ DWORD state;
+ LONG refCount;
+ HANDLE ExternalCompletionEvent; // only one of this is used, but cant do a union since CLREvent has a non-default constructor
+ CLREvent InternalCompletionEvent; // flags indicates which one is being used
+ OBJECTHANDLE ExternalEventSafeHandle;
+ ADID handleOwningAD;
+ } TimerInfo;
+ static VOID AcquireWaitInfo(WaitInfo *pInfo)
+ {
+ }
+ static VOID ReleaseWaitInfo(WaitInfo *pInfo)
+ {
+ ReleaseInfo(pInfo->ExternalEventSafeHandle,
+ pInfo->handleOwningAD,
+ pInfo->ExternalCompletionEvent);
+ }
+ static VOID AcquireTimerInfo(TimerInfo *pInfo)
+ {
+ }
+ static VOID ReleaseTimerInfo(TimerInfo *pInfo)
+ {
+ ReleaseInfo(pInfo->ExternalEventSafeHandle,
+ pInfo->handleOwningAD,
+ pInfo->ExternalCompletionEvent);
+ }
+ typedef Holder<WaitInfo *, ThreadpoolMgr::AcquireWaitInfo, ThreadpoolMgr::ReleaseWaitInfo> WaitInfoHolder;
+ typedef Holder<TimerInfo *, ThreadpoolMgr::AcquireTimerInfo, ThreadpoolMgr::ReleaseTimerInfo> TimerInfoHolder;
+ typedef struct {
+ TimerInfo* Timer; // timer to be updated
+ ULONG DueTime ; // new due time
+ ULONG Period ; // new period
+ } TimerUpdateInfo;
+ // Definitions and data structures to support recycling of high-frequency
+ // memory blocks. We use a spin-lock to access the list
+ class RecycledListInfo
+ {
+ static const unsigned int MaxCachedEntries = 40;
+ struct Entry
+ {
+ Entry* next;
+ };
+ Volatile<LONG> lock; // this is the spin lock
+ DWORD count; // count of number of elements in the list
+ Entry* root; // ptr to first element of recycled list
+#ifndef _WIN64
+ DWORD filler; // Pad the structure to a multiple of the 16.
+ //--//
+ RecycledListInfo()
+ {
+ lock = 0;
+ root = NULL;
+ count = 0;
+ }
+ FORCEINLINE bool CanInsert()
+ {
+ return count < MaxCachedEntries;
+ }
+ {
+ if(root == NULL) return NULL; // No need for acquiring the lock, there's nothing to remove.
+ AcquireLock();
+ Entry* ret = (Entry*)root;
+ if(ret)
+ {
+ root = ret->next;
+ count -= 1;
+ }
+ ReleaseLock();
+ return ret;
+ }
+ FORCEINLINE void Insert( LPVOID mem )
+ {
+ AcquireLock();
+ Entry* entry = (Entry*)mem;
+ entry->next = root;
+ root = entry;
+ count += 1;
+ ReleaseLock();
+ }
+ private:
+ FORCEINLINE void AcquireLock()
+ {
+ unsigned int rounds = 0;
+ DWORD dwSwitchCount = 0;
+ while(lock != 0 || FastInterlockExchange( &lock, 1 ) != 0)
+ {
+ YieldProcessor(); // indicate to the processor that we are spinning
+ rounds++;
+ if((rounds % 32) == 0)
+ {
+ __SwitchToThread( 0, ++dwSwitchCount );
+ }
+ }
+ }
+ FORCEINLINE void ReleaseLock()
+ {
+ lock = 0;
+ }
+ };
+ //
+ // It's critical that we ensure these pointers are allocated by the linker away from
+ // variables that are modified a lot at runtime.
+ //
+ // The use of the CacheGuard is a temporary solution,
+ // the thread pool has to be refactor away from static variable and
+ // toward a single global structure, where we can control the locality of variables.
+ //
+ class RecycledListsWrapper
+ {
+ DWORD CacheGuardPre[64/sizeof(DWORD)];
+ RecycledListInfo (*pRecycledListPerProcessor)[MEMTYPE_COUNT]; // RecycledListInfo [numProc][MEMTYPE_COUNT]
+ DWORD CacheGuardPost[64/sizeof(DWORD)];
+ public:
+ void Initialize( unsigned int numProcs );
+ FORCEINLINE bool IsInitialized()
+ {
+ return pRecycledListPerProcessor != NULL;
+ }
+ FORCEINLINE RecycledListInfo& GetRecycleMemoryInfo( enum MemType memType )
+ {
+ if (CPUGroupInfo::CanEnableGCCPUGroups() && CPUGroupInfo::CanEnableThreadUseAllCpuGroups())
+ return pRecycledListPerProcessor[CPUGroupInfo::CalculateCurrentProcessorNumber()][memType];
+ else
+ // Turns out GetCurrentProcessorNumber can return a value greater than the number of processors reported by
+ // GetSystemInfo, if we're running in WOW64 on a machine with >32 processors.
+ return pRecycledListPerProcessor[GetCurrentProcessorNumber()%NumberOfProcessors][memType];
+ }
+ };
+#define GATE_THREAD_STATUS_NOT_RUNNING 0 // There is no gate thread
+#define GATE_THREAD_STATUS_REQUESTED 1 // There is a gate thread, and someone has asked it to stick around recently
+#define GATE_THREAD_STATUS_WAITING_FOR_REQUEST 2 // There is a gate thread, but nobody has asked it to stay. It may die soon
+ // Private methods
+ static DWORD __stdcall intermediateThreadProc(PVOID arg);
+ typedef struct {
+ PVOID lpArg;
+ } intermediateThreadParam;
+ static Thread* CreateUnimpersonatedThread(LPTHREAD_START_ROUTINE lpStartAddress, LPVOID lpArgs, BOOL *pIsCLRThread);
+ static BOOL CreateWorkerThread();
+ static void EnqueueWorkRequest(WorkRequest* wr);
+ static WorkRequest* DequeueWorkRequest();
+ static void ExecuteWorkRequest(bool* foundWork, bool* wasNotRecalled);
+ static DWORD WINAPI ExecuteHostRequest(PVOID pArg);
+ inline static void AppendWorkRequest(WorkRequest* entry)
+ {
+ {
+ }
+ if (WorkRequestTail)
+ {
+ _ASSERTE(WorkRequestHead != NULL);
+ WorkRequestTail->next = entry;
+ }
+ else
+ {
+ _ASSERTE(WorkRequestHead == NULL);
+ WorkRequestHead = entry;
+ }
+ WorkRequestTail = entry;
+ _ASSERTE(WorkRequestTail->next == NULL);
+ }
+ inline static WorkRequest* RemoveWorkRequest()
+ {
+ {
+ }
+ WorkRequest* entry = NULL;
+ if (WorkRequestHead)
+ {
+ entry = WorkRequestHead;
+ WorkRequestHead = entry->next;
+ if (WorkRequestHead == NULL)
+ WorkRequestTail = NULL;
+ }
+ return entry;
+ }
+ static void EnsureInitialized();
+ static void InitPlatformVariables();
+ inline static BOOL IsInitialized()
+ {
+ return Initialization == -1;
+ }
+ static void MaybeAddWorkingWorker();
+ static void NotifyWorkItemCompleted()
+ {
+ if (!CLRThreadpoolHosted())
+ {
+ Thread::IncrementThreadPoolCompletionCount();
+ UpdateLastDequeueTime();
+ }
+ }
+ static bool ShouldAdjustMaxWorkersActive()
+ {
+ if (CLRThreadpoolHosted())
+ return false;
+ DWORD priorTime = PriorCompletedWorkRequestsTime;
+ MemoryBarrier(); // read fresh value for NextCompletedWorkRequestsTime below
+ DWORD requiredInterval = NextCompletedWorkRequestsTime - priorTime;
+ DWORD elapsedInterval = GetTickCount() - priorTime;
+ if (elapsedInterval >= requiredInterval)
+ {
+ ThreadCounter::Counts counts = WorkerCounter.GetCleanCounts();
+ if (counts.NumActive <= counts.MaxWorking)
+ return true;
+ }
+ return false;
+ }
+ static void AdjustMaxWorkersActive();
+ static bool ShouldWorkerKeepRunning();
+ static BOOL SuspendProcessing();
+ static DWORD SafeWait(CLREvent * ev, DWORD sleepTime, BOOL alertable);
+ static DWORD __stdcall WorkerThreadStart(LPVOID lpArgs);
+ static BOOL AddWaitRequest(HANDLE waitHandle, WaitInfo* waitInfo);
+ static ThreadCB* FindWaitThread(); // returns a wait thread that can accomodate another wait request
+ static BOOL CreateWaitThread();
+ static void __stdcall InsertNewWaitForSelf(WaitInfo* pArg);
+ static int FindWaitIndex(const ThreadCB* threadCB, const HANDLE waitHandle);
+ static DWORD MinimumRemainingWait(LIST_ENTRY* waitInfo, unsigned int numWaits);
+ static void ProcessWaitCompletion( WaitInfo* waitInfo,
+ unsigned index, // array index
+ BOOL waitTimedOut);
+ static DWORD __stdcall WaitThreadStart(LPVOID lpArgs);
+ static DWORD __stdcall AsyncCallbackCompletion(PVOID pArgs);
+ static void QueueTimerInfoForRelease(TimerInfo *pTimerInfo);
+ static DWORD __stdcall QUWIPostCompletion(PVOID pArgs);
+ static void DeactivateWait(WaitInfo* waitInfo);
+ static void DeactivateNthWait(WaitInfo* waitInfo, DWORD index);
+ static void DeleteWait(WaitInfo* waitInfo);
+ inline static void ShiftWaitArray( ThreadCB* threadCB,
+ ULONG SrcIndex,
+ ULONG DestIndex,
+ ULONG count)
+ {
+ memmove(&threadCB->waitHandle[DestIndex],
+ &threadCB->waitHandle[SrcIndex],
+ count * sizeof(HANDLE));
+ memmove(&threadCB->waitPointer[DestIndex],
+ &threadCB->waitPointer[SrcIndex],
+ count * sizeof(LIST_ENTRY));
+ }
+ static void __stdcall DeregisterWait(WaitInfo* pArgs);
+#ifndef FEATURE_PAL
+ // holds the aggregate of system cpu usage of all processors
+ {
+ LARGE_INTEGER kernelTime;
+ DWORD_PTR affinityMask;
+ int numberOfProcessors;
+ int usageBufferSize;
+ static int GetCPUBusyTime_NT(PROCESS_CPU_INFORMATION* pOldInfo);
+ static BOOL CreateCompletionPortThread(LPVOID lpArgs);
+ static DWORD __stdcall CompletionPortThreadStart(LPVOID lpArgs);
+ inline static bool HaveNativeWork()
+ {
+ return WorkRequestHead != NULL;
+ }
+ static void GrowCompletionPortThreadpoolIfNeeded();
+ static BOOL ShouldGrowCompletionPortThreadpool(ThreadCounter::Counts counts);
+ static int GetCPUBusyTime_NT(PAL_IOCP_CPU_INFORMATION* pOldInfo);
+#endif // !FEATURE_PAL
+ static BOOL IsIoPending();
+ static BOOL CreateGateThread();
+ static void EnsureGateThreadRunning();
+ static bool ShouldGateThreadKeepRunning();
+ static DWORD __stdcall GateThreadStart(LPVOID lpArgs);
+ static BOOL SufficientDelaySinceLastSample(unsigned int LastThreadCreationTime,
+ unsigned NumThreads, // total number of threads of that type (worker or CP)
+ double throttleRate=0.0 // the delay is increased by this percentage for each extra thread
+ );
+ static BOOL SufficientDelaySinceLastDequeue();
+ static LPVOID GetRecycledMemory(enum MemType memType);
+ static DWORD __stdcall TimerThreadStart(LPVOID args);
+ static void TimerThreadFire(); // helper method used by TimerThreadStart
+ static void __stdcall InsertNewTimer(TimerInfo* pArg);
+ static DWORD FireTimers();
+ static DWORD __stdcall AsyncTimerCallbackCompletion(PVOID pArgs);
+ static void DeactivateTimer(TimerInfo* timerInfo);
+ static DWORD __stdcall AsyncDeleteTimer(PVOID pArgs);
+ static void DeleteTimer(TimerInfo* timerInfo);
+ static void __stdcall UpdateTimer(TimerUpdateInfo* pArgs);
+ static void __stdcall DeregisterTimer(TimerInfo* pArgs);
+ inline static DWORD QueueDeregisterWait(HANDLE waitThread, WaitInfo* waitInfo)
+ {
+ {
+ }
+ DWORD result = QueueUserAPC(reinterpret_cast<PAPCFUNC>(DeregisterWait), waitThread, reinterpret_cast<ULONG_PTR>(waitInfo));
+ SetWaitThreadAPCPending();
+ return result;
+ }
+ inline static void SetWaitThreadAPCPending() {IsApcPendingOnWaitThread = TRUE;}
+ inline static void ResetWaitThreadAPCPending() {IsApcPendingOnWaitThread = FALSE;}
+ inline static BOOL IsWaitThreadAPCPending() {return IsApcPendingOnWaitThread;}
+#ifdef _DEBUG
+ inline static DWORD GetTickCount()
+ {
+ return ::GetTickCount() + TickCountAdjustment;
+ }
+#endif // #ifndef DACCESS_COMPILE
+ // Private variables
+ static LONG Initialization; // indicator of whether the threadpool is initialized.
+ SVAL_DECL(LONG,MinLimitTotalWorkerThreads); // same as MinLimitTotalCPThreads
+ SVAL_DECL(LONG,MaxLimitTotalWorkerThreads); // same as MaxLimitTotalCPThreads
+ DECLSPEC_ALIGN(64) static unsigned int LastDequeueTime; // used to determine if work items are getting thread starved
+ static HillClimbing HillClimbingInstance;
+ DECLSPEC_ALIGN(64) static LONG PriorCompletedWorkRequests;
+ static DWORD PriorCompletedWorkRequestsTime;
+ static DWORD NextCompletedWorkRequestsTime;
+ static LARGE_INTEGER CurrentSampleStartTime;
+ static int ThreadAdjustmentInterval;
+ SPTR_DECL(WorkRequest,WorkRequestHead); // Head of work request queue
+ SPTR_DECL(WorkRequest,WorkRequestTail); // Head of work request queue
+ static unsigned int LastCPThreadCreation; // last time a completion port thread was created
+ static unsigned int NumberOfProcessors; // = NumberOfWorkerThreads - no. of blocked threads
+ static BOOL IsApcPendingOnWaitThread; // Indicates if an APC is pending on the wait thread
+ // This needs to be non-hosted, because worker threads can run prior to EE startup.
+ static DangerousNonHostedSpinLock ThreadAdjustmentLock;
+ static CrstStatic WorkerCriticalSection;
+ static const DWORD WorkerTimeout = 20 * 1000;
+ static const DWORD WorkerTimeoutAppX = 5 * 1000; // shorter timeout to allow threads to exit prior to app suspension
+ DECLSPEC_ALIGN(64) SVAL_DECL(ThreadCounter,WorkerCounter);
+ //
+ // WorkerSemaphore is an UnfairSemaphore because:
+ // 1) Threads enter and exit this semaphore very frequently, and thus benefit greatly from the spinning done by UnfairSemaphore
+ // 2) There is no functional reason why any particular thread should be preferred when waking workers. This only impacts performance,
+ // and un-fairness helps performance in this case.
+ //
+ static UnfairSemaphore* WorkerSemaphore;
+ //
+ // RetiredWorkerSemaphore is a regular CLRSemaphore, not an UnfairSemaphore, because if a thread waits on this semaphore is it almost certainly
+ // NOT going to be released soon, so the spinning done in UnfairSemaphore only burns valuable CPU time. However, if UnfairSemaphore is ever
+ // implemented in terms of a Win32 IO Completion Port, we should reconsider this. The IOCP's LIFO unblocking behavior could help keep working set
+ // down, by constantly re-using the same small set of retired workers rather than round-robining between all of them as CLRSemaphore will do.
+ // If we go that route, we should add a "no-spin" option to UnfairSemaphore.Wait to avoid wasting CPU.
+ //
+ static CLRSemaphore* RetiredWorkerSemaphore;
+ static CLREvent * RetiredCPWakeupEvent;
+ static CrstStatic WaitThreadsCriticalSection;
+ static LIST_ENTRY WaitThreadsHead; // queue of wait threads, each thread can handle upto 64 waits
+ static TimerInfo *TimerInfosToBeRecycled; // list of delegate infos associated with deleted timers
+ static CrstStatic TimerQueueCriticalSection; // critical section to synchronize timer queue access
+ SVAL_DECL(LIST_ENTRY,TimerQueue); // queue of timers
+ static HANDLE TimerThread; // Currently we only have one timer thread
+ static Thread* pTimerThread;
+ DECLSPEC_ALIGN(64) static DWORD LastTickCount; // the count just before timer thread goes to sleep
+ static BOOL InitCompletionPortThreadpool; // flag indicating whether completion port threadpool has been initialized
+ static HANDLE GlobalCompletionPort; // used for binding io completions on file handles
+ SVAL_DECL(ThreadCounter,CPThreadCounter);
+ SVAL_DECL(LONG,MaxLimitTotalCPThreads); // = MaxLimitCPThreadsPerCPU * number of CPUS
+ SVAL_DECL(LONG,MinLimitTotalCPThreads);
+ SVAL_DECL(LONG,MaxFreeCPThreads); // = MaxFreeCPThreadsPerCPU * Number of CPUS
+ DECLSPEC_ALIGN(64) static LONG GateThreadStatus; // See GateThreadStatus enumeration
+ static Volatile<LONG> NumCPInfrastructureThreads; // number of threads currently busy handling draining cycle
+ SVAL_DECL(LONG,cpuUtilization);
+ static LONG cpuUtilizationAverage;
+ DECLSPEC_ALIGN(64) static RecycledListsWrapper RecycledLists;
+#ifdef _DEBUG
+ static DWORD TickCountAdjustment; // add this value to value returned by GetTickCount
+ DECLSPEC_ALIGN(64) static int offset_counter;
+ static const int offset_multiplier = 128;
+#endif // _WIN32THREADPOOL_H