Let EventPipe threads sleep when no events are available (#25601)
authorDavid Mason <davmason@microsoft.com>
Wed, 10 Jul 2019 05:33:24 +0000 (22:33 -0700)
committerGitHub <noreply@github.com>
Wed, 10 Jul 2019 05:33:24 +0000 (22:33 -0700)
 Don't spin forever in EventListener when listening for EventPipe data

src/System.Private.CoreLib/src/System/Diagnostics/Eventing/EventPipe.cs
src/System.Private.CoreLib/src/System/Diagnostics/Eventing/EventPipeEventDispatcher.cs
src/vm/ecalllist.h
src/vm/eventpipe.cpp
src/vm/eventpipe.h
src/vm/eventpipebuffermanager.cpp
src/vm/eventpipebuffermanager.h
src/vm/eventpipeinternal.cpp
src/vm/eventpipeinternal.h
src/vm/eventpipesession.cpp
src/vm/eventpipesession.h

index f3f48b4..b50fdad 100644 (file)
@@ -4,6 +4,7 @@
 using System.Collections.Generic;
 using System.Runtime.CompilerServices;
 using System.Runtime.InteropServices;
+using System.Threading;
 
 #if FEATURE_PERFTRACING
 
@@ -85,6 +86,11 @@ namespace System.Diagnostics.Tracing
         NetTrace
     }
 
+    internal sealed class EventPipeWaitHandle : WaitHandle
+    {
+
+    }
+    
     internal sealed class EventPipeConfiguration
     {
         private string m_outputFile;
@@ -251,6 +257,9 @@ namespace System.Diagnostics.Tracing
 
         [DllImport(JitHelpers.QCall, CharSet = CharSet.Unicode)]
         internal static extern unsafe bool GetNextEvent(UInt64 sessionID, EventPipeEventInstanceData* pInstance);
+
+        [DllImport(JitHelpers.QCall, CharSet = CharSet.Unicode)]
+        internal static extern unsafe IntPtr GetWaitHandle(UInt64 sessionID);
     }
 }
 
index 4360283..b4ff7d1 100644 (file)
@@ -4,6 +4,7 @@
 using System.Collections.Generic;
 using System.Threading;
 using System.Threading.Tasks;
+using Microsoft.Win32.SafeHandles;
 
 namespace System.Diagnostics.Tracing
 {
@@ -32,6 +33,7 @@ namespace System.Diagnostics.Tracing
         private Int64 m_timeQPCFrequency;
 
         private bool m_stopDispatchTask;
+        private EventPipeWaitHandle m_dispatchTaskWaitHandle = new EventPipeWaitHandle();
         private Task? m_dispatchTask = null;
         private object m_dispatchControlLock = new object();
         private Dictionary<EventListener, EventListenerSubscription> m_subscriptions = new Dictionary<EventListener, EventListenerSubscription>();
@@ -42,6 +44,7 @@ namespace System.Diagnostics.Tracing
         {
             // Get the ID of the runtime provider so that it can be used as a filter when processing events.
             m_RuntimeProviderID = EventPipeInternal.GetProvider(NativeRuntimeEventSource.EventSourceName);
+            m_dispatchTaskWaitHandle.SafeWaitHandle = new SafeWaitHandle(IntPtr.Zero, false);
         }
 
         internal void SendCommand(EventListener eventListener, EventCommand command, bool enable, EventLevel level, EventKeywords matchAnyKeywords)
@@ -140,6 +143,9 @@ namespace System.Diagnostics.Tracing
             if (m_dispatchTask == null)
             {
                 m_stopDispatchTask = false;
+                // Create a SafeWaitHandle that won't release the handle when done
+                m_dispatchTaskWaitHandle.SafeWaitHandle = new SafeWaitHandle(EventPipeInternal.GetWaitHandle(m_sessionID), false);
+
                 m_dispatchTask = Task.Factory.StartNew(DispatchEventsToEventListeners, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
             }
         }
@@ -151,6 +157,8 @@ namespace System.Diagnostics.Tracing
             if(m_dispatchTask != null)
             {
                 m_stopDispatchTask = true;
+                Debug.Assert(!m_dispatchTaskWaitHandle.SafeWaitHandle.IsInvalid);
+                Interop.Kernel32.SetEvent(m_dispatchTaskWaitHandle.SafeWaitHandle);
                 m_dispatchTask.Wait();
                 m_dispatchTask = null;
             }
@@ -163,9 +171,12 @@ namespace System.Diagnostics.Tracing
 
             while (!m_stopDispatchTask)
             {
+                bool eventsReceived = false;
                 // Get the next event.
                 while (!m_stopDispatchTask && EventPipeInternal.GetNextEvent(m_sessionID, &instanceData))
                 {
+                    eventsReceived = true;
+
                     // Filter based on provider.
                     if (instanceData.ProviderID == m_RuntimeProviderID)
                     {
@@ -179,6 +190,13 @@ namespace System.Diagnostics.Tracing
                 // Wait for more events.
                 if (!m_stopDispatchTask)
                 {
+                    if (!eventsReceived)
+                    {
+                        // Future TODO: this would make more sense to handle in EventPipeSession/EventPipe native code.
+                        Debug.Assert(!m_dispatchTaskWaitHandle.SafeWaitHandle.IsInvalid);
+                        m_dispatchTaskWaitHandle.WaitOne();
+                    }
+
                     Thread.Sleep(10);
                 }
             }
index 62e2241..fca631e 100644 (file)
@@ -1103,6 +1103,7 @@ FCFuncStart(gEventPipeInternalFuncs)
     QCFuncElement("WriteEvent", EventPipeInternal::WriteEvent)
     QCFuncElement("WriteEventData", EventPipeInternal::WriteEventData)
     QCFuncElement("GetNextEvent", EventPipeInternal::GetNextEvent)
+    QCFuncElement("GetWaitHandle", EventPipeInternal::GetWaitHandle)
 FCFuncEnd()
 #endif // FEATURE_PERFTRACING
 
index acb4d39..e35d121 100644 (file)
@@ -371,7 +371,8 @@ void EventPipe::DisableInternal(EventPipeSessionID id, EventPipeProviderCallback
 
     s_allowWrite &= ~(pSession->GetMask()); 
     pSession->SuspendWriteEvent();
-    pSession->WriteAllBuffersToFile(); // Flush the buffers to the stream/file
+    bool ignored;
+    pSession->WriteAllBuffersToFile(&ignored); // Flush the buffers to the stream/file
 
     --s_numberOfSessions;
 
@@ -786,6 +787,14 @@ EventPipeEventInstance *EventPipe::GetNextEvent(EventPipeSessionID sessionID)
     return pSession ? pSession->GetNextEvent() : nullptr;
 }
 
+HANDLE EventPipe::GetWaitHandle(EventPipeSessionID sessionID)
+{
+    LIMITED_METHOD_CONTRACT;
+
+    EventPipeSession *const pSession = GetSession(sessionID);
+    return pSession ? pSession->GetWaitEvent()->GetHandleUNHOSTED() : 0;
+}
+
 void EventPipe::InvokeCallback(EventPipeProviderCallbackData eventPipeProviderCallbackData)
 {
     EventPipeProvider::InvokeCallback(eventPipeProviderCallbackData);
index 23b1acf..ac234a0 100644 (file)
@@ -108,6 +108,9 @@ public:
     // Get next event.
     static EventPipeEventInstance *GetNextEvent(EventPipeSessionID sessionID);
 
+    // Get the event handle that signals when new events are available.
+    static HANDLE GetWaitHandle(EventPipeSessionID sessionID);
+
 #ifdef DEBUG
     static bool IsLockOwnedByCurrentThread();
 #endif
index af403ff..7daf689 100644 (file)
@@ -37,6 +37,7 @@ EventPipeBufferManager::EventPipeBufferManager(EventPipeSession* pSession, size_
     m_sizeOfAllBuffers = 0;
     m_lock.Init(LOCK_TYPE_DEFAULT);
     m_writeEventSuspending = FALSE;
+    m_waitEvent.CreateAutoEvent(TRUE);
 
 #ifdef _DEBUG
     m_numBuffersAllocated = 0;
@@ -418,6 +419,10 @@ bool EventPipeBufferManager::WriteEvent(Thread *pThread, EventPipeSession &sessi
         }
     }
 
+    // allocNewBuffer is reused below to detect if overflow happened, so cache it here to see if we should 
+    // signal the reader thread
+    bool shouldSignalReaderThread = allocNewBuffer;
+    
     // Check to see if we need to allocate a new buffer, and if so, do it here.
     if (allocNewBuffer)
     {
@@ -476,6 +481,12 @@ bool EventPipeBufferManager::WriteEvent(Thread *pThread, EventPipeSession &sessi
         }
     }
 
+    if (shouldSignalReaderThread)
+    {
+        // Indicate that there is new data to be read
+        m_waitEvent.Set();
+    }
+
 #ifdef _DEBUG
     if (!allocNewBuffer)
     {
@@ -489,7 +500,7 @@ bool EventPipeBufferManager::WriteEvent(Thread *pThread, EventPipeSession &sessi
     return !allocNewBuffer;
 }
 
-void EventPipeBufferManager::WriteAllBuffersToFile(EventPipeFile *pFile, LARGE_INTEGER stopTimeStamp)
+void EventPipeBufferManager::WriteAllBuffersToFile(EventPipeFile *pFile, LARGE_INTEGER stopTimeStamp, bool *eventsWritten)
 {
     CONTRACTL
     {
@@ -505,15 +516,15 @@ void EventPipeBufferManager::WriteAllBuffersToFile(EventPipeFile *pFile, LARGE_I
     // See the comments in WriteAllBufferToFileV4 for more details
     if (pFile->GetSerializationFormat() >= EventPipeSerializationFormat::NetTraceV4)
     {
-        WriteAllBuffersToFileV4(pFile, stopTimeStamp);
+        WriteAllBuffersToFileV4(pFile, stopTimeStamp, eventsWritten);
     }
     else
     {
-        WriteAllBuffersToFileV3(pFile, stopTimeStamp);
+        WriteAllBuffersToFileV3(pFile, stopTimeStamp, eventsWritten);
     }
 }
 
-void EventPipeBufferManager::WriteAllBuffersToFileV3(EventPipeFile *pFile, LARGE_INTEGER stopTimeStamp)
+void EventPipeBufferManager::WriteAllBuffersToFileV3(EventPipeFile *pFile, LARGE_INTEGER stopTimeStamp, bool *pEventsWritten)
 {
     CONTRACTL
     {
@@ -522,20 +533,24 @@ void EventPipeBufferManager::WriteAllBuffersToFileV3(EventPipeFile *pFile, LARGE
         MODE_PREEMPTIVE;
         PRECONDITION(pFile != nullptr);
         PRECONDITION(GetCurrentEvent() == nullptr);
+        PRECONDITION(pEventsWritten != nullptr);
     }
     CONTRACTL_END;
 
+    *pEventsWritten = false;
+
     // Naively walk the circular buffer, writing the event stream in timestamp order.
     MoveNextEventAnyThread(stopTimeStamp);
     while (GetCurrentEvent() != nullptr)
     {
+        *pEventsWritten = true;
         pFile->WriteEvent(*GetCurrentEvent(), /*CaptureThreadId=*/0, /*sequenceNumber=*/0, /*IsSorted=*/TRUE);
         MoveNextEventAnyThread(stopTimeStamp);
     }
     pFile->Flush();
 }
 
-void EventPipeBufferManager::WriteAllBuffersToFileV4(EventPipeFile *pFile, LARGE_INTEGER stopTimeStamp)
+void EventPipeBufferManager::WriteAllBuffersToFileV4(EventPipeFile *pFile, LARGE_INTEGER stopTimeStamp, bool *pEventsWritten)
 {
     CONTRACTL
     {
@@ -544,6 +559,7 @@ void EventPipeBufferManager::WriteAllBuffersToFileV4(EventPipeFile *pFile, LARGE
         MODE_PREEMPTIVE;
         PRECONDITION(pFile != nullptr);
         PRECONDITION(GetCurrentEvent() == nullptr);
+        PRECONDITION(pEventsWritten != nullptr);
     }
     CONTRACTL_END;
 
@@ -551,7 +567,7 @@ void EventPipeBufferManager::WriteAllBuffersToFileV4(EventPipeFile *pFile, LARGE
     // In V3 of the format this code does a full timestamp order sort on the events which made the file easier to consume,
     // but the perf implications for emitting the file are less desirable. Imagine an application with 500 threads emitting
     // 10 events per sec per thread (granted this is a questionable number of threads to use in an app, but that isn't
-    // under our control). A nieve sort of 500 ordered lists is going to pull the oldest event from each of 500 lists,
+    // under our control). A naive sort of 500 ordered lists is going to pull the oldest event from each of 500 lists,
     // compare all the timestamps, then emit the oldest one. This could easily add a thousand CPU cycles per-event. A
     // better implementation could maintain a min-heap so that we scale O(log(N)) instead of O(N)but fundamentally sorting
     // has a cost and we didn't want a file format that forces the runtime to pay it on every event.
@@ -579,7 +595,7 @@ void EventPipeBufferManager::WriteAllBuffersToFileV4(EventPipeFile *pFile, LARGE
     // beforehand. I'm betting on these extreme cases being very rare and even something like 1GB isn't an unreasonable
     // amount of virtual memory to use on to parse an extreme trace. However if I am wrong we can control
     // both the allocation policy and the triggering instrumentation. Nothing requires us to give out 1MB buffers to
-    // 1000 threads simulatneously, nor are we prevented from observing buffer usage at finer granularity than we
+    // 1000 threads simultaneously, nor are we prevented from observing buffer usage at finer granularity than we
     // allocated.
     //
     // 2) We mark which events are the oldest ones in the stream at the time we emit them and we do this at regular
@@ -596,6 +612,8 @@ void EventPipeBufferManager::WriteAllBuffersToFileV4(EventPipeFile *pFile, LARGE
     // of each of them and needs to know when each buffer can be released. The explicit sequence point makes that
     // very easy - every sequence point all buffers can be released and no further bookkeeping is required.
 
+    *pEventsWritten = false;
+
     EventPipeSequencePoint* pSequencePoint;
     LARGE_INTEGER curTimestampBoundary;
     curTimestampBoundary.QuadPart = stopTimeStamp.QuadPart;
@@ -635,6 +653,8 @@ void EventPipeBufferManager::WriteAllBuffersToFileV4(EventPipeFile *pFile, LARGE
                 MoveNextEventSameThread(curTimestampBoundary);
             }
             pBufferList->SetLastReadSequenceNumber(sequenceNumber);
+            // Have we written events in any sequence point?
+            *pEventsWritten = eventsWritten || *pEventsWritten;
         }
 
         // This finishes any current partially filled EventPipeBlock, and flushes it to the stream
@@ -720,6 +740,14 @@ EventPipeEventInstance* EventPipeBufferManager::GetNextEvent()
     return GetCurrentEvent();
 }
 
+CLREvent *EventPipeBufferManager::GetWaitEvent()
+{
+    LIMITED_METHOD_CONTRACT;
+
+    _ASSERTE(m_waitEvent.IsValid());
+    return &m_waitEvent;
+}
+
 EventPipeEventInstance* EventPipeBufferManager::GetCurrentEvent()
 {
     LIMITED_METHOD_CONTRACT;
index 9bed01d..e515c38 100644 (file)
@@ -63,6 +63,9 @@ private:
     SpinLock m_lock;
     Volatile<BOOL> m_writeEventSuspending;
 
+    // Event for synchronizing real time reading
+    CLREvent m_waitEvent;
+
     // Iterator state for reader thread
     // These are not protected by m_lock and expected to only be used on the reader thread
     EventPipeEventInstance* m_pCurrentEvent;
@@ -174,9 +177,9 @@ public:
     // Write the contents of the managed buffers to the specified file.
     // The stopTimeStamp is used to determine when tracing was stopped to ensure that we
     // skip any events that might be partially written due to races when tracing is stopped.
-    void WriteAllBuffersToFile(EventPipeFile *pFile, LARGE_INTEGER stopTimeStamp);
-    void WriteAllBuffersToFileV3(EventPipeFile *pFastSerializableObject, LARGE_INTEGER stopTimeStamp);
-    void WriteAllBuffersToFileV4(EventPipeFile *pFastSerializableObject, LARGE_INTEGER stopTimeStamp);
+    void WriteAllBuffersToFile(EventPipeFile *pFile, LARGE_INTEGER stopTimeStamp, bool *pEventsWritten);
+    void WriteAllBuffersToFileV3(EventPipeFile *pFastSerializableObject, LARGE_INTEGER stopTimeStamp, bool *pEventsWritten);
+    void WriteAllBuffersToFileV4(EventPipeFile *pFastSerializableObject, LARGE_INTEGER stopTimeStamp, bool *pEventsWritten);
 
     // Attempt to de-allocate resources as best we can.  It is possible for some buffers to leak because
     // threads can be in the middle of a write operation and get blocked, and we may not get an opportunity
@@ -185,6 +188,8 @@ public:
 
     // Get next event.  This is used to dispatch events to EventListener.
     EventPipeEventInstance* GetNextEvent();
+    
+    CLREvent *GetWaitEvent();
 
 #ifdef _DEBUG
     bool EnsureConsistency();
index e79005d..c99c103 100644 (file)
@@ -277,4 +277,17 @@ bool QCALLTYPE EventPipeInternal::GetNextEvent(UINT64 sessionID, EventPipeEventI
     return pNextInstance != NULL;
 }
 
+HANDLE QCALLTYPE EventPipeInternal::GetWaitHandle(UINT64 sessionID)
+{
+    QCALL_CONTRACT;
+
+    HANDLE waitHandle;
+    BEGIN_QCALL;
+
+    waitHandle = EventPipe::GetWaitHandle(sessionID);
+
+    END_QCALL;
+    return waitHandle;
+}
+
 #endif // FEATURE_PERFTRACING
index ab280c3..9367501 100644 (file)
@@ -98,6 +98,10 @@ public:
     static bool QCALLTYPE GetNextEvent(
         UINT64 sessionID,
         EventPipeEventInstanceData *pInstance);
+
+    static HANDLE QCALLTYPE GetWaitHandle(
+        UINT64 sessionID);
+
 };
 
 #endif // FEATURE_PERFTRACING
index ca007dc..8d2ff65 100644 (file)
@@ -164,6 +164,7 @@ DWORD WINAPI EventPipeSession::ThreadProc(void *args)
 
     Thread *const pThisThread = pEventPipeSession->GetIpcStreamingThread();
     bool fSuccess = true;
+    CLREvent *waitEvent = pEventPipeSession->GetWaitEvent();
 
     {
         GCX_PREEMP();
@@ -171,12 +172,19 @@ DWORD WINAPI EventPipeSession::ThreadProc(void *args)
         {
             while (pEventPipeSession->IsIpcStreamingEnabled())
             {
-                if (!pEventPipeSession->WriteAllBuffersToFile())
+                bool eventsWritten = false;
+                if (!pEventPipeSession->WriteAllBuffersToFile(&eventsWritten))
                 {
                     fSuccess = false;
                     break;
                 }
 
+                if (!eventsWritten)
+                {
+                    // No events were available, sleep until more are available
+                    waitEvent->Wait(INFINITE, FALSE);
+                }
+                
                 // Wait until it's time to sample again.
                 PlatformSleep();
             }
@@ -274,7 +282,7 @@ EventPipeSessionProvider *EventPipeSession::GetSessionProvider(const EventPipePr
     return m_pProviderList->GetSessionProvider(pProvider);
 }
 
-bool EventPipeSession::WriteAllBuffersToFile()
+bool EventPipeSession::WriteAllBuffersToFile(bool *pEventsWritten)
 {
     CONTRACTL
     {
@@ -292,7 +300,7 @@ bool EventPipeSession::WriteAllBuffersToFile()
     // the current timestamp are written into the file.
     LARGE_INTEGER stopTimeStamp;
     QueryPerformanceCounter(&stopTimeStamp);
-    m_pBufferManager->WriteAllBuffersToFile(m_pFile, stopTimeStamp);
+    m_pBufferManager->WriteAllBuffersToFile(m_pFile, stopTimeStamp, pEventsWritten);
     return !m_pFile->HasErrors();
 }
 
@@ -350,6 +358,13 @@ EventPipeEventInstance *EventPipeSession::GetNextEvent()
     return m_pBufferManager->GetNextEvent();
 }
 
+CLREvent *EventPipeSession::GetWaitEvent()
+{
+    LIMITED_METHOD_CONTRACT;
+
+    return m_pBufferManager->GetWaitEvent();
+}
+
 void EventPipeSession::Enable()
 {
     CONTRACTL
@@ -430,6 +445,9 @@ void EventPipeSession::DisableIpcStreamingThread()
     // when profiling is disabled.
     m_ipcStreamingEnabled = false;
 
+    // Thread could be waiting on the event that there is new data to read.
+    m_pBufferManager->GetWaitEvent()->Set();
+
     // Wait for the sampling thread to clean itself up.
     m_threadShutdownEvent.Wait(INFINITE, FALSE /* bAlertable */);
     m_threadShutdownEvent.CloseEvent();
@@ -448,7 +466,8 @@ void EventPipeSession::Disable()
     if ((m_SessionType == EventPipeSessionType::IpcStream) && m_ipcStreamingEnabled)
         DisableIpcStreamingThread();
 
-    WriteAllBuffersToFile();
+    bool ignored;
+    WriteAllBuffersToFile(&ignored);
     m_pProviderList->Clear();
 }
 
index 288f394..31e9547 100644 (file)
@@ -189,7 +189,7 @@ public:
     // Get the session provider for the specified provider if present.
     EventPipeSessionProvider* GetSessionProvider(const EventPipeProvider *pProvider) const;
 
-    bool WriteAllBuffersToFile();
+    bool WriteAllBuffersToFile(bool *pEventsWritten);
 
     bool WriteEventBuffered(
         Thread *pThread,
@@ -205,6 +205,8 @@ public:
 
     EventPipeEventInstance *GetNextEvent();
 
+    CLREvent *GetWaitEvent();
+
     // Enable a session in the event pipe.
     void Enable();