#ifdef FEATURE_PERFTRACING
+void ReleaseEventPipeThreadRef(EventPipeThread* pThread) { LIMITED_METHOD_CONTRACT; pThread->Release(); }
+void AcquireEventPipeThreadRef(EventPipeThread* pThread) { LIMITED_METHOD_CONTRACT; pThread->AddRef(); }
+
#ifndef __GNUC__
-__declspec(thread) ThreadEventBufferList ThreadEventBufferList::gCurrentThreadEventBufferList;
+__declspec(thread) EventPipeThreadHolder EventPipeThread::gCurrentEventPipeThreadHolder;;
#else // !__GNUC__
-thread_local ThreadEventBufferList ThreadEventBufferList::gCurrentThreadEventBufferList;
+thread_local EventPipeThreadHolder EventPipeThread::gCurrentEventPipeThreadHolder;
#endif // !__GNUC__
+EventPipeThread::EventPipeThread()
+{
+ CONTRACTL
+ {
+ THROWS;
+ GC_NOTRIGGER;
+ MODE_ANY;
+ }
+ CONTRACTL_END;
+ m_lock.Init(LOCK_TYPE_DEFAULT);
+ m_refCount = 0;
+}
+
+EventPipeThread::~EventPipeThread()
+{
+ _ASSERTE(m_pWriteBuffer == nullptr);
+ _ASSERTE(m_pBufferList == nullptr);
+}
+
+/*static */ EventPipeThread* EventPipeThread::Get()
+{
+ LIMITED_METHOD_CONTRACT;
+ return gCurrentEventPipeThreadHolder;
+}
+
+/*static */ void EventPipeThread::Set(EventPipeThread* pThread)
+{
+ LIMITED_METHOD_CONTRACT;
+ gCurrentEventPipeThreadHolder = pThread;
+}
+
+void EventPipeThread::AddRef()
+{
+ LIMITED_METHOD_CONTRACT;
+ FastInterlockIncrement(&m_refCount);
+}
+
+void EventPipeThread::Release()
+{
+ LIMITED_METHOD_CONTRACT;
+ if (FastInterlockDecrement(&m_refCount) == 0)
+ {
+ delete this;
+ }
+}
+
+SpinLock* EventPipeThread::GetLock()
+{
+ LIMITED_METHOD_CONTRACT;
+ return &m_lock;
+}
+
+EventPipeBuffer* EventPipeThread::GetWriteBuffer()
+{
+ LIMITED_METHOD_CONTRACT;
+ _ASSERTE(m_lock.OwnedByCurrentThread());
+ _ASSERTE(m_pWriteBuffer == nullptr || m_pWriteBuffer->GetVolatileState() == EventPipeBufferState::WRITABLE);
+ return m_pWriteBuffer;
+}
+
+void EventPipeThread::SetWriteBuffer(EventPipeBuffer* pNewBuffer)
+{
+ LIMITED_METHOD_CONTRACT;
+ _ASSERTE(m_lock.OwnedByCurrentThread());
+ _ASSERTE(m_pWriteBuffer == nullptr || m_pWriteBuffer->GetVolatileState() == EventPipeBufferState::WRITABLE);
+ _ASSERTE(pNewBuffer == nullptr || pNewBuffer->GetVolatileState() == EventPipeBufferState::WRITABLE);
+ if (m_pWriteBuffer)
+ {
+ m_pWriteBuffer->ConvertToReadOnly();
+ }
+ m_pWriteBuffer = pNewBuffer;
+}
+
+EventPipeBufferList* EventPipeThread::GetBufferList()
+{
+ LIMITED_METHOD_CONTRACT;
+ _ASSERTE(EventPipe::IsBufferManagerLockOwnedByCurrentThread());
+ return m_pBufferList;
+}
+
+void EventPipeThread::SetBufferList(EventPipeBufferList* pNewBufferList)
+{
+ LIMITED_METHOD_CONTRACT;
+ _ASSERTE(EventPipe::IsBufferManagerLockOwnedByCurrentThread());
+ m_pBufferList = pNewBufferList;
+}
+
+
EventPipeBufferManager::EventPipeBufferManager()
{
CONTRACTL
m_pPerThreadBufferList = new SList<SListElem<EventPipeBufferList*>>();
m_sizeOfAllBuffers = 0;
m_lock.Init(LOCK_TYPE_DEFAULT);
+ m_writeEventSuspending = FALSE;
#ifdef _DEBUG
m_numBuffersAllocated = 0;
}
CONTRACTL_END;
- if(m_pPerThreadBufferList != NULL)
- {
- SListElem<EventPipeBufferList*> *pElem = m_pPerThreadBufferList->GetHead();
- while(pElem != NULL)
- {
- SListElem<EventPipeBufferList*> *pCurElem = pElem;
-
- EventPipeBufferList *pThreadBufferList = pCurElem->GetValue();
- if (!pThreadBufferList->OwnedByThread())
- {
- // We don't delete buffers themself because they can be in-use
- delete(pThreadBufferList);
- }
-
- pElem = m_pPerThreadBufferList->GetNext(pElem);
- delete(pCurElem);
- }
+ // setting this true should have no practical effect other than satisfying asserts at this point.
+ m_writeEventSuspending = TRUE;
+ DeAllocateBuffers();
+}
- delete(m_pPerThreadBufferList);
- m_pPerThreadBufferList = NULL;
- }
+#ifdef DEBUG
+bool EventPipeBufferManager::IsLockOwnedByCurrentThread()
+{
+ return m_lock.OwnedByCurrentThread();
}
+#endif
-EventPipeBuffer* EventPipeBufferManager::AllocateBufferForThread(EventPipeSession &session, unsigned int requestSize)
+EventPipeBuffer* EventPipeBufferManager::AllocateBufferForThread(EventPipeSession &session, unsigned int requestSize, BOOL & writeSuspended)
{
CONTRACTL
{
// Allocating a buffer requires us to take the lock.
SpinLockHolder _slh(&m_lock);
+ // if we are deallocating then give up, see the comments in SuspendWriteEvents() for why this is important.
+ if (m_writeEventSuspending.Load())
+ {
+ writeSuspended = TRUE;
+ return NULL;
+ }
+
// Determine if the requesting thread has at least one buffer.
// If not, we guarantee that each thread gets at least one (to prevent thrashing when the circular buffer size is too small).
bool allocateNewBuffer = false;
- EventPipeBufferList *pThreadBufferList = ThreadEventBufferList::Get();
- if(pThreadBufferList == NULL)
+ EventPipeThread *pEventPipeThread = EventPipeThread::Get();
+
+ if (pEventPipeThread == NULL)
+ {
+ EX_TRY
+ {
+ pEventPipeThread = new EventPipeThread();
+ EventPipeThread::Set(pEventPipeThread);
+ }
+ EX_CATCH
+ {
+ pEventPipeThread = NULL;
+ }
+ EX_END_CATCH(SwallowAllExceptions);
+
+ if (pEventPipeThread == NULL)
+ {
+ return NULL;
+ }
+ }
+
+ EventPipeBufferList *pThreadBufferList = pEventPipeThread->GetBufferList();
+ if (pThreadBufferList == NULL)
{
- pThreadBufferList = new (nothrow) EventPipeBufferList(this);
+ pThreadBufferList = new (nothrow) EventPipeBufferList(this, pEventPipeThread);
+
if (pThreadBufferList == NULL)
{
return NULL;
}
m_pPerThreadBufferList->InsertTail(pElem);
- ThreadEventBufferList::Set(pThreadBufferList);
+ pEventPipeThread->SetBufferList(pThreadBufferList);
allocateNewBuffer = true;
}
- // Determine if policy allows us to allocate another buffer, or if we need to steal one
- // from another thread.
+ // Determine if policy allows us to allocate another buffer
if(!allocateNewBuffer)
{
EventPipeConfiguration *pConfig = EventPipe::GetConfiguration();
allocateNewBuffer = true;
}
}
-
- // Only steal buffers from other threads if the session being written to is a
- // file-based session. Streaming sessions will simply drop events.
- // TODO: Add dropped events telemetry here.
- EventPipeBuffer *pNewBuffer = NULL;
- if(!allocateNewBuffer && (session.GetSessionType() == EventPipeSessionType::File))
- {
- // We can't allocate a new buffer.
- // Find the oldest buffer, de-allocate it, and re-purpose it for this thread.
-
- // Find the thread that contains the oldest stealable buffer, and get its list of buffers.
- EventPipeBufferList *pListToStealFrom = FindThreadToStealFrom();
- if(pListToStealFrom != NULL)
- {
- // Assert that the buffer we're stealing is not the only buffer in the list.
- // This invariant is enforced by FindThreadToStealFrom.
- _ASSERTE((pListToStealFrom->GetHead() != NULL) && (pListToStealFrom->GetHead()->GetNext() != NULL));
-
- // Remove the oldest buffer from the list.
- pNewBuffer = pListToStealFrom->GetAndRemoveHead();
-
- // De-allocate the buffer. We do this because buffers are variable sized
- // based on how much volume is coming from the thread.
- DeAllocateBuffer(pNewBuffer);
- pNewBuffer = NULL;
-
- // Set that we want to allocate a new buffer.
- allocateNewBuffer = true;
-
-#ifdef _DEBUG
- m_numBuffersStolen++;
-#endif // _DEBUG
-
- }
- else
- {
- // This only happens when # of threads == # of buffers.
- // We'll allocate one more buffer, and then this won't happen again.
- allocateNewBuffer = true;
- }
- }
-
+ EventPipeBuffer* pNewBuffer = NULL;
if(allocateNewBuffer)
{
// Pick a buffer size by multiplying the base buffer size by the number of buffers already allocated for this thread.
// could throw, and cannot be easily checked
EX_TRY
{
- pNewBuffer = new EventPipeBuffer(bufferSize);
+ pNewBuffer = new EventPipeBuffer(bufferSize DEBUG_ARG(pEventPipeThread));
}
EX_CATCH
{
return NULL;
}
-EventPipeBufferList* EventPipeBufferManager::FindThreadToStealFrom()
-{
- CONTRACTL
- {
- NOTHROW;
- GC_NOTRIGGER;
- MODE_ANY;
- PRECONDITION(m_lock.OwnedByCurrentThread());
- }
- CONTRACTL_END;
-
- // Find the thread buffer list containing the buffer whose most recent event is the oldest as long as the buffer is not
- // the current buffer for the thread (e.g. it's next pointer is non-NULL).
- // This means that the thread must also have multiple buffers, so that we don't steal its only buffer.
- EventPipeBufferList *pOldestContainingList = NULL;
-
- SListElem<EventPipeBufferList*> *pElem = m_pPerThreadBufferList->GetHead();
- while(pElem != NULL)
- {
- EventPipeBufferList *pCandidate = pElem->GetValue();
-
- // The current candidate has more than one buffer (otherwise it is disqualified).
- if(pCandidate->GetHead()->GetNext() != NULL)
- {
- // If we haven't seen any candidates, this one automatically becomes the oldest candidate.
- if(pOldestContainingList == NULL)
- {
- pOldestContainingList = pCandidate;
- }
- // Otherwise, to replace the existing candidate, this candidate must have an older timestamp in its oldest buffer.
- else if((pOldestContainingList->GetHead()->GetMostRecentTimeStamp().QuadPart) >
- (pCandidate->GetHead()->GetMostRecentTimeStamp().QuadPart))
- {
- pOldestContainingList = pCandidate;
- }
- }
-
- pElem = m_pPerThreadBufferList->GetNext(pElem);
- }
-
- return pOldestContainingList;
-}
-
void EventPipeBufferManager::DeAllocateBuffer(EventPipeBuffer *pBuffer)
{
CONTRACTL
return false;
}
+ StackContents stackContents;
+ if (pStack == NULL && event.NeedStack() && !session.RundownEnabled())
+ {
+ EventPipe::WalkManagedStackForCurrentThread(stackContents);
+ pStack = &stackContents;
+ }
+
// See if the thread already has a buffer to try.
bool allocNewBuffer = false;
EventPipeBuffer *pBuffer = NULL;
- EventPipeBufferList *pThreadBufferList = ThreadEventBufferList::Get();
+ EventPipeThread *pEventPipeThread = EventPipeThread::Get();
- if(pThreadBufferList == NULL)
+ if(pEventPipeThread == NULL)
{
allocNewBuffer = true;
}
else
{
- // The thread already has a buffer list. Select the newest buffer and attempt to write into it.
- pBuffer = pThreadBufferList->GetTail();
+ SpinLockHolder _slh(pEventPipeThread->GetLock());
+ pBuffer = pEventPipeThread->GetWriteBuffer();
+
if(pBuffer == NULL)
{
- // This should never happen. If the buffer list exists, it must contain at least one entry.
- _ASSERT(!"Thread buffer list with zero entries encountered.");
- return false;
+ allocNewBuffer = true;
}
else
{
- // The event is still enabled. Mark that the thread is now writing an event.
- pThreadBufferList->SetThreadEventWriteInProgress(true);
-
// Attempt to write the event to the buffer. If this fails, we should allocate a new buffer.
allocNewBuffer = !pBuffer->WriteEvent(pEventThread, session, event, payload, pActivityId, pRelatedActivityId, pStack);
-
- // Mark that the thread is no longer writing an event.
- pThreadBufferList->SetThreadEventWriteInProgress(false);
}
}
// to switch to preemptive mode here.
unsigned int requestSize = sizeof(EventPipeEventInstance) + payload.GetSize();
- pBuffer = AllocateBufferForThread(session, requestSize);
- }
-
- // Try to write the event after we allocated (or stole) a buffer.
- // This is the first time if the thread had no buffers before the call to this function.
- // This is the second time if this thread did have one or more buffers, but they were full.
- if(allocNewBuffer && pBuffer != NULL)
- {
- // By this point, a new buffer list has been allocated so we should fetch it again before using it.
- pThreadBufferList = ThreadEventBufferList::Get();
- // The event is still enabled. Mark that the thread is now writing an event.
- pThreadBufferList->SetThreadEventWriteInProgress(true);
- allocNewBuffer = !pBuffer->WriteEvent(pEventThread, session, event, payload, pActivityId, pRelatedActivityId, pStack);
+ BOOL writeSuspended = FALSE;
+ pBuffer = AllocateBufferForThread(session, requestSize, writeSuspended);
+ if (pBuffer == NULL)
+ {
+ // We treat this as the WriteEvent() call occurring after this session stopped listening for events, effectively the
+ // same as if event.IsEnabled() test above returned false.
+ if (writeSuspended)
+ {
+ return false;
+ }
+ }
+ else
+ {
+ EventPipeThread *pEventPipeThread = EventPipeThread::Get();
+ _ASSERTE(pEventPipeThread != NULL);
+ {
+ SpinLockHolder _slh(pEventPipeThread->GetLock());
+ if (m_writeEventSuspending.Load())
+ {
+ // After leaving the manager's lock in AllocateBufferForThread some other thread decided to suspend writes.
+ // We need to immediately return the buffer we just took without storing it or writing to it.
+ // SuspendWriteEvent() is spinning waiting for this buffer to be relinquished.
+ pBuffer->ConvertToReadOnly();
+
+ // We treat this as the WriteEvent() call occurring after this session stopped listening for events, effectively the
+ // same as if event.IsEnabled() returned false.
+ return false;
+ }
+ else
+ {
+ pEventPipeThread->SetWriteBuffer(pBuffer);
- // Mark that the thread is no longer writing an event.
- pThreadBufferList->SetThreadEventWriteInProgress(false);
+ // Try to write the event after we allocated a buffer.
+ // This is the first time if the thread had no buffers before the call to this function.
+ // This is the second time if this thread did have one or more buffers, but they were full.
+ allocNewBuffer = !pBuffer->WriteEvent(pEventThread, session, event, payload, pActivityId, pRelatedActivityId, pStack);
+ }
+ }
+ }
}
// Peek the next event out of the list.
EventPipeBuffer *pContainingBuffer = NULL;
+
+ // PERF: This may be too aggressive? If this method is being called frequently enough to keep pace with the
+ // writing threads we could be in a state of high lock contention and lots of churning buffers. Each writer
+ // would take several locks, allocate a new buffer, write one event into it, then the reader would take the
+ // lock, convert the buffer to read-only and read the single event out of it. Allowing more events to accumulate
+ // in the buffers before converting between writable and read-only amortizes a lot of the overhead. One way
+ // to achieve that would be picking a stopTimeStamp that was Xms in the past. This would let Xms of events
+ // to accumulate in the write buffer before we converted it and forced the writer to allocate another. Other more
+ // sophisticated approaches would probably build a low overhead synchronization mechanism to read and write the
+ // buffer at the same time.
EventPipeEventInstance *pNext = pBufferList->PeekNextEvent(stopTimeStamp, &pContainingBuffer);
if (pNext != NULL)
{
}
}
+void EventPipeBufferManager::SuspendWriteEvent()
+{
+ CONTRACTL
+ {
+ THROWS;
+ GC_NOTRIGGER;
+ MODE_ANY;
+ }
+ CONTRACTL_END;
+
+ _ASSERTE(EnsureConsistency());
+
+ // All calls to this method must not be synchronized by our caller
+ _ASSERTE(EventPipe::IsLockOwnedByCurrentThread());
+
+ CQuickArrayList<EventPipeThread*> threadList;
+ {
+ SpinLockHolder _slh(&m_lock);
+ m_writeEventSuspending.Store(TRUE);
+ // From this point until m_writeEventSuspending is reset to FALSE it is impossible
+ // for new EventPipeBufferLists to be added to the m_pPerThreadBufferList. The only
+ // way AllocateBufferForThread is allowed to add one is by:
+ // 1) take m_lock - AllocateBufferForThread can't own it now because this thread owns it,
+ // but after this thread gives it up lower in this function it could be acquired.
+ // 2) observe m_writeEventSuspending = False - that won't happen, acquiring m_lock
+ // guarantees AllocateBufferForThread will observe all the memory changes this
+ // thread made prior to releasing m_lock and we've already set it TRUE.
+ // This ensures that we iterate over the list of threads below we've got the complete list.
+ SListElem<EventPipeBufferList*> *pElem = m_pPerThreadBufferList->GetHead();
+ while(pElem != NULL)
+ {
+ threadList.Push(pElem->GetValue()->GetThread());
+ pElem = m_pPerThreadBufferList->GetNext(pElem);
+ }
+ }
+
+ // Iterate through all the threads, forcing them to finish writes in progress inside EventPipeThread::m_lock,
+ // relinquish any buffers stored in EventPipeThread::m_pWriteBuffer and prevent storing new ones.
+ for (size_t i = 0 ; i < threadList.Size(); i++)
+ {
+ EventPipeThread* pThread = threadList[i];
+ {
+ SpinLockHolder _slh(pThread->GetLock());
+ pThread->SetWriteBuffer(nullptr);
+ // From this point until m_writeEventSuspending is reset to FALSE it is impossible
+ // for new EventPipeBufferLists to be added to the m_pPerThreadBufferList. The only
+ // way AllocateBufferForThread is allowed to add one is by:
+ // 1) take m_lock - AllocateBufferForThread can't own it now because this thread owns it,
+ // but after this thread gives it up lower in this function it could be acquired.
+ // 2) observe m_writeEventSuspending = False - that won't happen, acquiring m_lock
+ // guarantees AllocateBufferForThread will observe all the memory changes this
+ // thread made prior to releasing m_lock and we've already set it TRUE.
+ }
+ }
+
+ // Wait for any straggler WriteEvent threads that may have already allocated a buffer but
+ // hadn't yet relinquished it.
+ {
+ SpinLockHolder _slh(&m_lock);
+ SListElem<EventPipeBufferList*> *pElem = m_pPerThreadBufferList->GetHead();
+ while (pElem != NULL)
+ {
+ // Get the list and remove it from the thread.
+ EventPipeBufferList *pBufferList = pElem->GetValue();
+ for (EventPipeBuffer* pBuffer = pBufferList->GetHead(); pBuffer != nullptr; pBuffer = pBuffer->GetNext())
+ {
+ // Above we guaranteed that other threads wouldn't acquire new buffers or keep the ones they
+ // already have indefinitely, but we haven't quite guaranteed the buffer has been relinquished
+ // back to us. It's possible the WriteEvent thread allocated the buffer before we took m_lock
+ // above, but it hasn't yet acquired EventPipeThread::m_lock in order to observe that it needs
+ // to relinquish the buffer. In this state, it has a pointer to the buffer stored in registers
+ // or on the stack. If the thread is in that tiny window, all we have to do is wait for it.
+ YIELD_WHILE(pBuffer->GetVolatileState() != EventPipeBufferState::READ_ONLY);
+ }
+ pElem = m_pPerThreadBufferList->GetNext(pElem);
+ }
+ }
+}
+
void EventPipeBufferManager::DeAllocateBuffers()
{
CONTRACTL
CONTRACTL_END;
_ASSERTE(EnsureConsistency());
+ _ASSERTE(m_writeEventSuspending);
// Take the buffer manager manipulation lock
SpinLockHolder _slh(&m_lock);
{
// Get the list and determine if we can free it.
EventPipeBufferList *pBufferList = pElem->GetValue();
- if(!pBufferList->OwnedByThread())
+ EventPipeThread *pThread = pBufferList->GetThread();
+ pThread->SetBufferList(nullptr);
+
+ // Iterate over all nodes in the list and deallocate them.
+ EventPipeBuffer *pBuffer = pBufferList->GetAndRemoveHead();
+ while (pBuffer != NULL)
{
- // Iterate over all nodes in the list and de-allocate them.
- EventPipeBuffer *pBuffer = pBufferList->GetAndRemoveHead();
- while(pBuffer != NULL)
- {
- DeAllocateBuffer(pBuffer);
- pBuffer = pBufferList->GetAndRemoveHead();
- }
+ DeAllocateBuffer(pBuffer);
+ pBuffer = pBufferList->GetAndRemoveHead();
+ }
- // Remove the buffer list from the per-thread buffer list.
- pElem = m_pPerThreadBufferList->FindAndRemove(pElem);
- _ASSERTE(pElem != NULL);
+ // Remove the buffer list from the per-thread buffer list.
+ pElem = m_pPerThreadBufferList->FindAndRemove(pElem);
+ _ASSERTE(pElem != NULL);
- SListElem<EventPipeBufferList*> *pCurElem = pElem;
- pElem = m_pPerThreadBufferList->GetNext(pElem);
- delete(pCurElem);
+ SListElem<EventPipeBufferList*> *pCurElem = pElem;
+ pElem = m_pPerThreadBufferList->GetNext(pElem);
+ delete(pCurElem);
- // Now that all of the list elements have been freed, free the list itself.
- delete(pBufferList);
- pBufferList = NULL;
- }
- else
- {
- pElem = m_pPerThreadBufferList->GetNext(pElem);
- }
+ // Now that all the list elements have been freed, free the list itself.
+ delete(pBufferList);
+ pBufferList = NULL;
}
}
+void EventPipeBufferManager::ResumeWriteEvent()
+{
+ LIMITED_METHOD_CONTRACT;
+
+ // All calls to this method must be synchronized by our caller.
+
+ _ASSERTE(EventPipe::IsLockOwnedByCurrentThread());
+ _ASSERTE(EnsureConsistency());
+
+ m_writeEventSuspending.Store(FALSE);
+
+ // At this point threads are allowed to again allocate new BufferLists and Buffers. However our caller
+ // presumablyh disabled all the events and until events are re-enabled no thread is going to get past
+ // the event.IsEnabled() checks in WriteEvent() to make any of those allocations happen.
+}
#ifdef _DEBUG
bool EventPipeBufferManager::EnsureConsistency()
}
#endif // _DEBUG
-EventPipeBufferList::EventPipeBufferList(EventPipeBufferManager *pManager)
+EventPipeBufferList::EventPipeBufferList(EventPipeBufferManager *pManager, EventPipeThread* pThread)
{
LIMITED_METHOD_CONTRACT;
m_pManager = pManager;
+ m_pThread = pThread;
m_pHeadBuffer = NULL;
m_pTailBuffer = NULL;
m_bufferCount = 0;
- m_pReadBuffer = NULL;
- m_ownedByThread = true;
- m_threadEventWriteInProgress = false;
-
-#ifdef _DEBUG
- m_pCreatingThread = GetThread();
-#endif // _DEBUG
}
EventPipeBuffer* EventPipeBufferList::GetHead()
return m_bufferCount;
}
+EventPipeBuffer* EventPipeBufferList::TryGetReadBuffer(LARGE_INTEGER beforeTimeStamp, EventPipeBuffer* pNewReadBuffer)
+{
+ LIMITED_METHOD_CONTRACT;
+ _ASSERTE(EventPipe::IsBufferManagerLockOwnedByCurrentThread());
+
+ // is it possible that the read buffer has events that occur before 'beforeTimeStamp'?
+ if (pNewReadBuffer == NULL || pNewReadBuffer->GetCreationTimeStamp().QuadPart >= beforeTimeStamp.QuadPart)
+ {
+ return NULL;
+ }
+
+ // we can't read from a buffer while it is being simultaneously being written, we need to preempt the writer.
+ if (pNewReadBuffer->GetVolatileState() == EventPipeBufferState::WRITABLE)
+ {
+ {
+ SpinLockHolder _slh(m_pThread->GetLock());
+ if (m_pThread->GetWriteBuffer() == pNewReadBuffer)
+ {
+ m_pThread->SetWriteBuffer(nullptr);
+ }
+ }
+ _ASSERTE(pNewReadBuffer->GetVolatileState() == EventPipeBufferState::READ_ONLY);
+ }
+
+ return pNewReadBuffer;
+}
+
EventPipeEventInstance* EventPipeBufferList::PeekNextEvent(LARGE_INTEGER beforeTimeStamp, EventPipeBuffer **pContainingBuffer)
{
CONTRACTL
}
CONTRACTL_END;
- // Get the current read buffer.
- // If it's not set, start with the head buffer.
- if(m_pReadBuffer == NULL)
- {
- m_pReadBuffer = m_pHeadBuffer;
- }
+ EventPipeBuffer* pReadBuffer = TryGetReadBuffer(beforeTimeStamp, m_pHeadBuffer);
- // If the read buffer is still NULL, then this list contains no buffers.
- if(m_pReadBuffer == NULL)
+ // If the read buffer is still NULL, then this list contains no buffers with events in the time range we care about.
+ if(pReadBuffer == NULL)
{
return NULL;
}
// Get the next event in the buffer.
- EventPipeEventInstance *pNext = m_pReadBuffer->PeekNext(beforeTimeStamp);
+ EventPipeEventInstance *pNext = pReadBuffer->PeekNext(beforeTimeStamp);
// If the next event is NULL, then go to the next buffer.
if(pNext == NULL)
{
- m_pReadBuffer = m_pReadBuffer->GetNext();
- if(m_pReadBuffer != NULL)
+ pReadBuffer = TryGetReadBuffer(beforeTimeStamp, pReadBuffer->GetNext());
+ if(pReadBuffer != NULL)
{
- pNext = m_pReadBuffer->PeekNext(beforeTimeStamp);
+ pNext = pReadBuffer->PeekNext(beforeTimeStamp);
}
}
// Set the containing buffer.
if(pNext != NULL && pContainingBuffer != NULL)
{
- *pContainingBuffer = m_pReadBuffer;
+ *pContainingBuffer = pReadBuffer;
}
// Make sure pContainingBuffer is properly set.
- _ASSERTE((pNext == NULL) || (pNext != NULL && pContainingBuffer == NULL) || (pNext != NULL && *pContainingBuffer == m_pReadBuffer));
+ _ASSERTE((pNext == NULL) || (pNext != NULL && pContainingBuffer == NULL) || (pNext != NULL && *pContainingBuffer == pReadBuffer));
return pNext;
}
return pNext;
}
-#ifdef _DEBUG
-Thread* EventPipeBufferList::GetThread()
+EventPipeThread* EventPipeBufferList::GetThread()
{
LIMITED_METHOD_CONTRACT;
-
- return m_pCreatingThread;
+ return m_pThread;
}
+#ifdef _DEBUG
bool EventPipeBufferList::EnsureConsistency()
{
CONTRACTL
}
#endif // _DEBUG
-ThreadEventBufferList::~ThreadEventBufferList()
-{
- // Before the thread dies, mark its buffers as no longer owned
- // so that they can be cleaned up after the thread dies.
- // EventPipeBufferList *pList = GetThreadEventBufferList();
- if (m_pThreadEventBufferList != NULL)
- m_pThreadEventBufferList->SetOwnedByThread(false);
-}
#endif // FEATURE_PERFTRACING
#include "spinlock.h"
class EventPipeBufferList;
+class EventPipeThread;
-// This class is a TLS wrapper around a pointer to thread-specific EventPipeBufferList
-// The struct wrapper is present mainly because we need a way to free the EventPipeBufferList
-// when the thread that owns it dies. Placing this class as a TLS variable will call ~ThreadEventBufferList()
-// when the thread dies so we can free EventPipeBufferList in the destructor.
-class ThreadEventBufferList
+void ReleaseEventPipeThreadRef(EventPipeThread* pThread);
+void AcquireEventPipeThreadRef(EventPipeThread* pThread);
+typedef Wrapper<EventPipeThread*, AcquireEventPipeThreadRef, ReleaseEventPipeThreadRef> EventPipeThreadHolder;
+
+class EventPipeThread
{
#ifndef __GNUC__
-__declspec(thread) static ThreadEventBufferList gCurrentThreadEventBufferList;
+ __declspec(thread) static EventPipeThreadHolder gCurrentEventPipeThreadHolder;
#else // !__GNUC__
-thread_local static ThreadEventBufferList gCurrentThreadEventBufferList;
+ thread_local static EventPipeThreadHolder gCurrentEventPipeThreadHolder;
#endif // !__GNUC__
EventPipeBufferList * m_pThreadEventBufferList = NULL;
- ~ThreadEventBufferList();
+ ~EventPipeThread();
+
+ // The EventPipeThreadHolder maintains one count while the thread is alive
+ // and each session's EventPipeBufferList maintains one count while it
+ // exists
+ LONG m_refCount;
+
+ // this is the one and only buffer this thread is allowed to write to
+ // if non-null, it must match the tail of the m_bufferList
+ // this pointer is protected by m_lock
+ EventPipeBuffer *m_pWriteBuffer = NULL;
+
+ // this is a list of buffers that were written to by this thread
+ // it is protected by EventPipeBufferManager::m_lock
+ EventPipeBufferList *m_pBufferList = NULL;
+
+ // This lock is designed to have low contention. Normally it is only taken by this thread,
+ // but occasionally it may also be taken by another thread which is trying to collect and drain
+ // buffers from all threads.
+ SpinLock m_lock;
public:
- static EventPipeBufferList* Get()
- {
- return gCurrentThreadEventBufferList.m_pThreadEventBufferList;
- }
-
- static void Set(EventPipeBufferList * bl)
- {
- gCurrentThreadEventBufferList.m_pThreadEventBufferList = bl;
- }
+ static EventPipeThread* Get();
+ static void Set(EventPipeThread* pThread);
+
+ EventPipeThread();
+ void AddRef();
+ void Release();
+ SpinLock * GetLock();
+ EventPipeBuffer* GetWriteBuffer();
+ void SetWriteBuffer(EventPipeBuffer* pNewBuffer);
+ EventPipeBufferList * GetBufferList();
+ void SetBufferList(EventPipeBufferList * pBufferList);
};
class EventPipeBufferManager
// Lock to protect access to the per-thread buffer list and total allocation size.
SpinLock m_lock;
-
+ Volatile<BOOL> m_writeEventSuspending;
#ifdef _DEBUG
// For debugging purposes.
// Allocate a new buffer for the specified thread.
// This function will store the buffer in the thread's buffer list for future use and also return it here.
// A NULL return value means that a buffer could not be allocated.
- EventPipeBuffer* AllocateBufferForThread(EventPipeSession &session, unsigned int requestSize);
+ EventPipeBuffer* AllocateBufferForThread(EventPipeSession &session, unsigned int requestSize, BOOL & writeSuspended);
// Add a buffer to the thread buffer list.
void AddBufferToThreadBufferList(EventPipeBufferList *pThreadBuffers, EventPipeBuffer *pBuffer);
// Otherwise, if a stack trace is needed, one will be automatically collected.
bool WriteEvent(Thread *pThread, EventPipeSession &session, EventPipeEvent &event, EventPipeEventPayload &payload, LPCGUID pActivityId, LPCGUID pRelatedActivityId, Thread *pEventThread = NULL, StackContents *pStack = NULL);
+ // Ends the suspension period created by SuspendWriteEvent(). After this call returns WriteEvent()
+ // can again be called succesfully, new BufferLists and Buffers may be allocated.
+ // The caller is required to synchronize all calls to SuspendWriteEvent() and ResumeWriteEvent()
+ void ResumeWriteEvent();
+
+ // From the time this function returns until ResumeWriteEvent() is called a suspended state will
+ // be in effect that blocks all WriteEvent activity. All existing buffers will be in the
+ // PENDING_FLUSH state and no new EventPipeBuffers or EventPipeBufferLists can be created. Calls to
+ // WriteEvent that start during the suspension period or were in progress but hadn't yet recorded
+ // their event into a buffer before the start of the suspension period will return false and the
+ // event will not be recorded. Any events that not recorded as a result of this suspension will be
+ // treated the same as events that were not recorded due to configuration.
+ // EXPECTED USAGE: First the caller will disable all events via configuration, then call
+ // SuspendWriteEvents() to force any WriteEvent calls that may still be in progress to either
+ // finish or cancel. After that all BufferLists and Buffers can be safely drained and/or deleted.
+ // The caller is required to synchronize all calls to SuspendWriteEvent() and ResumeWriteEvent()
+ void SuspendWriteEvent();
+
// Write the contents of the managed buffers to the specified file.
// The stopTimeStamp is used to determine when tracing was stopped to ensure that we
// skip any events that might be partially written due to races when tracing is stopped.
#ifdef _DEBUG
bool EnsureConsistency();
+ bool IsLockOwnedByCurrentThread();
#endif // _DEBUG
};
// The buffer manager that owns this list.
EventPipeBufferManager *m_pManager;
+ // The thread which writes to the buffers in this list
+ EventPipeThreadHolder m_pThread;
+
// Buffers are stored in an intrusive linked-list from oldest to newest.
// Head is the oldest buffer. Tail is the newest (and currently used) buffer.
EventPipeBuffer *m_pHeadBuffer;
// The number of buffers in the list.
unsigned int m_bufferCount;
- // The current read buffer (used when processing events on tracing stop).
- EventPipeBuffer *m_pReadBuffer;
-
- // True if this thread is owned by a thread.
- // If it is false, then this buffer can be de-allocated after it is drained.
- Volatile<bool> m_ownedByThread;
-
- // True if a thread is writing something to this list
- Volatile<bool> m_threadEventWriteInProgress;
-
-#ifdef _DEBUG
- // For diagnostics, keep the thread pointer.
- Thread *m_pCreatingThread;
-#endif // _DEBUG
+ // Check pNewReadBuffer to see if it has events in the right time-range and convert it to a readable
+ // state if needed
+ EventPipeBuffer* TryGetReadBuffer(LARGE_INTEGER beforeTimeStamp, EventPipeBuffer* pNewReadBuffer);
public:
- EventPipeBufferList(EventPipeBufferManager *pManager);
+ EventPipeBufferList(EventPipeBufferManager *pManager, EventPipeThread* pThread);
// Get the head node of the list.
EventPipeBuffer* GetHead();
// Get the next event as long as it is before the specified timestamp, and also mark it as read.
EventPipeEventInstance* PopNextEvent(LARGE_INTEGER beforeTimeStamp);
- // True if a thread owns this list.
- bool OwnedByThread() const
- {
- LIMITED_METHOD_CONTRACT;
- return m_ownedByThread;
- }
-
- // Set whether or not this list is owned by a thread.
- // If it is not owned by a thread, then it can be de-allocated
- // after the buffer is drained.
- // The default value is true.
- void SetOwnedByThread(bool value)
- {
- LIMITED_METHOD_CONTRACT;
- m_ownedByThread = value;
- }
-
- bool GetThreadEventWriteInProgress() const
- {
- LIMITED_METHOD_CONTRACT;
- return m_threadEventWriteInProgress;
- }
-
- void SetThreadEventWriteInProgress(bool value)
- {
- LIMITED_METHOD_CONTRACT;
- m_threadEventWriteInProgress = value;
- }
-
-#ifdef _DEBUG
// Get the thread associated with this list.
- Thread* GetThread();
+ EventPipeThread* GetThread();
+
+#ifdef _DEBUG
// Validate the consistency of the list.
// This function will assert if the list is in an inconsistent state.
bool EnsureConsistency();