Add streaming support for EventPipe env trace file. (#52443)
authorJohan Lorensson <lateralusx.github@gmail.com>
Wed, 12 May 2021 09:21:01 +0000 (11:21 +0200)
committerGitHub <noreply@github.com>
Wed, 12 May 2021 09:21:01 +0000 (11:21 +0200)
* Add streaming support for EventPipe env trace file.

Using EventPipeOutputPath and EventPipeConfig it is possible to setup
an EventPipe file session running during complete runtime execution.
There was however no logic that would flush this session, so if number
of events included in file exceeded max buffer all subsequent events
would be lost. Depeding on use case (like startup profiling) that could
be acceptable, unless rundown events where needed, since they would
likely be lost unless buffer size was way biffer than default limit (1MB).

Adding ability to run a streaming thread (similar to IPC) together with
file sessions would greatly increase usage of the default file session
and could then be real useful in cases where regular diagnostic
tooling won't work or increase the complexity of the profilig task.

In order to preserve old behavior, a new EventPipeSessionType has been
introduced, EP_SESSION_TYPE_FILESTREAM and that will work similar to
existing EP_SESSION_TYPE_FILE, but also start a streaming thread,
making sure buffer content is periodically flushed into file, reduce
risk of getting a flooded buffer manager triggering lost events.

For the default trace file, old behavior is kept and there is a new env
variable, COMPlus_EventPipeOutputStreaming that can be used to enable
using EP_SESSION_TYPE_FILESTREAM instead of EP_SESSION_TYPE_FILE for the
default file session.

src/coreclr/inc/clrconfigvalues.h
src/coreclr/vm/eventing/eventpipe/ep-rt-coreclr.h
src/mono/mono/eventpipe/ep-rt-mono.h
src/mono/mono/eventpipe/test/ep-session-tests.c
src/native/eventpipe/ep-rt.h
src/native/eventpipe/ep-session.c
src/native/eventpipe/ep-session.h
src/native/eventpipe/ep-types-forward.h
src/native/eventpipe/ep.c

index fec282fbe10ad9d7f6819798e0bdae08ce0530eb..62a7665b9e1d1df9018d85dea313971fb01bdbec 100644 (file)
@@ -708,6 +708,7 @@ RETAIL_CONFIG_STRING_INFO(INTERNAL_EventPipeConfig, W("EventPipeConfig"), "Confi
 RETAIL_CONFIG_DWORD_INFO(INTERNAL_EventPipeRundown, W("EventPipeRundown"), 1, "Enable/disable eventpipe rundown.")
 RETAIL_CONFIG_DWORD_INFO(INTERNAL_EventPipeCircularMB, W("EventPipeCircularMB"), 1024, "The EventPipe circular buffer size in megabytes.")
 RETAIL_CONFIG_DWORD_INFO(INTERNAL_EventPipeProcNumbers, W("EventPipeProcNumbers"), 0, "Enable/disable capturing processor numbers in EventPipe event headers")
+RETAIL_CONFIG_DWORD_INFO(INTERNAL_EventPipeOutputStreaming, W("EventPipeOutputStreaming"), 0, "Enable/disable streaming for trace file set in COMPlus_EventPipeOutputPath.  Non-zero values enable streaming.")
 
 // 
 // Generational Aware Analysis
index 22c6826d7be8e9a3d0efb22051dbb0e6bf221f84..da3ff470670eda53dbe0a1308f9c8e8ff48fd66f 100644 (file)
@@ -1598,6 +1598,15 @@ ep_rt_config_value_get_circular_mb (void)
        return CLRConfig::GetConfigValue (CLRConfig::INTERNAL_EventPipeCircularMB);
 }
 
+static
+inline
+bool
+ep_rt_config_value_get_output_streaming (void)
+{
+       STATIC_CONTRACT_NOTHROW;
+       return CLRConfig::GetConfigValue (CLRConfig::INTERNAL_EventPipeOutputStreaming) != 0;
+}
+
 static
 inline
 bool
index 47e86930f9f11297c1b56472d54d4e3dd6ef92ed..920f5b6c26b9f5c838e35da2912c408a35c4751a 100644 (file)
@@ -877,6 +877,19 @@ ep_rt_config_value_get_circular_mb (void)
        return circular_mb;
 }
 
+static
+inline
+bool
+ep_rt_config_value_get_output_streaming (void)
+{
+       bool enable = false;
+       gchar *value = g_getenv ("COMPlus_EventPipeOutputStreaming");
+       if (value && atoi (value) == 1)
+               enable = true;
+       g_free (value);
+       return enable;
+}
+
 static
 inline
 bool
index 1edcd3649ed1334bd4fcaf03c793803c9f42563a..f40b05047302788359cc1151a8cc82fcbdd9849d 100644 (file)
@@ -196,21 +196,21 @@ test_session_special_get_set (void)
 
        test_location = 4;
 
-       if (ep_session_get_ipc_streaming_enabled (test_session)) {
+       if (ep_session_get_streaming_enabled (test_session)) {
                result = FAILED ("ep_session_get_ipc_streaming_enabled returned true, should be false");
                ep_raise_error ();
        }
 
        test_location = 5;
 
-       ep_session_set_ipc_streaming_enabled (test_session, true);
+       ep_session_set_streaming_enabled (test_session, true);
 
-       if (!ep_session_get_ipc_streaming_enabled (test_session)) {
+       if (!ep_session_get_streaming_enabled (test_session)) {
                result = FAILED ("ep_session_set_ipc_streaming_enabled returned false, should be true");
                ep_raise_error ();
        }
 
-       ep_session_set_ipc_streaming_enabled (test_session, false);
+       ep_session_set_streaming_enabled (test_session, false);
 
        test_location = 6;
 
index 073d256300f27f6d196bb6021d9cba7b7ff56d7d..dab786dbf34a4fe5c37a60b5accf9d4ecf4a1933 100644 (file)
@@ -340,6 +340,11 @@ static
 uint32_t
 ep_rt_config_value_get_circular_mb (void);
 
+static
+inline
+bool
+ep_rt_config_value_get_output_streaming (void);
+
 static
 bool
 ep_rt_config_value_get_use_portable_thread_pool (void);
index 6f7190a66a70dd192b98a8f5dcd1a869653b3d5c..cb26bc302e9c0bf0bb917a9db439fc9595ac70ad 100644 (file)
 
 static
 void
-session_disable_ipc_streaming_thread (EventPipeSession *session);
+session_disable_streaming_thread (EventPipeSession *session);
 
 // _Requires_lock_held (ep)
 static
 void
-session_create_ipc_streaming_thread (EventPipeSession *session);
+session_create_streaming_thread (EventPipeSession *session);
 
 /*
  * EventPipeSession.
@@ -39,19 +39,19 @@ EP_RT_DEFINE_THREAD_FUNC (streaming_thread)
        ep_rt_thread_params_t *thread_params = (ep_rt_thread_params_t *)data;
 
        EventPipeSession *const session = (EventPipeSession *)thread_params->thread_params;
-       if (session->session_type != EP_SESSION_TYPE_IPCSTREAM)
+       if (session->session_type != EP_SESSION_TYPE_IPCSTREAM && session->session_type != EP_SESSION_TYPE_FILESTREAM)
                return 1;
 
        if (!thread_params->thread || !ep_rt_thread_has_started (thread_params->thread))
                return 1;
 
-       session->ipc_streaming_thread = thread_params->thread;
+       session->streaming_thread = thread_params->thread;
 
        bool success = true;
        ep_rt_wait_event_handle_t *wait_event = ep_session_get_wait_event (session);
 
        EP_GCX_PREEMP_ENTER
-               while (ep_session_get_ipc_streaming_enabled (session)) {
+               while (ep_session_get_streaming_enabled (session)) {
                        bool events_written = false;
                        if (!ep_session_write_all_buffers_to_file (session, &events_written)) {
                                success = false;
@@ -74,43 +74,43 @@ EP_RT_DEFINE_THREAD_FUNC (streaming_thread)
        if (!success)
                ep_disable ((EventPipeSessionID)session);
 
-       session->ipc_streaming_thread = NULL;
+       session->streaming_thread = NULL;
 
        return (ep_rt_thread_start_func_return_t)0;
 }
 
 static
 void
-session_create_ipc_streaming_thread (EventPipeSession *session)
+session_create_streaming_thread (EventPipeSession *session)
 {
        EP_ASSERT (session != NULL);
-       EP_ASSERT (session->session_type == EP_SESSION_TYPE_IPCSTREAM);
+       EP_ASSERT (session->session_type == EP_SESSION_TYPE_IPCSTREAM || session->session_type == EP_SESSION_TYPE_FILESTREAM);
 
        ep_requires_lock_held ();
 
-       ep_session_set_ipc_streaming_enabled (session, true);
+       ep_session_set_streaming_enabled (session, true);
        ep_rt_wait_event_alloc (&session->rt_thread_shutdown_event, true, false);
        if (!ep_rt_wait_event_is_valid (&session->rt_thread_shutdown_event))
-               EP_UNREACHABLE ("Unable to create IPC stream flushing thread shutdown event.");
+               EP_UNREACHABLE ("Unable to create stream flushing thread shutdown event.");
 
        ep_rt_thread_id_t thread_id = ep_rt_uint64_t_to_thread_id_t (0);
        if (!ep_rt_thread_create ((void *)streaming_thread, (void *)session, EP_THREAD_TYPE_SESSION, &thread_id))
-               EP_UNREACHABLE ("Unable to create IPC stream flushing thread.");
+               EP_UNREACHABLE ("Unable to create stream flushing thread.");
 }
 
 static
 void
-session_disable_ipc_streaming_thread (EventPipeSession *session)
+session_disable_streaming_thread (EventPipeSession *session)
 {
-       EP_ASSERT (session->session_type == EP_SESSION_TYPE_IPCSTREAM);
-       EP_ASSERT (ep_session_get_ipc_streaming_enabled (session));
+       EP_ASSERT (session->session_type == EP_SESSION_TYPE_IPCSTREAM || session->session_type == EP_SESSION_TYPE_FILESTREAM);
+       EP_ASSERT (ep_session_get_streaming_enabled (session));
 
        EP_ASSERT (!ep_rt_process_detach ());
        EP_ASSERT (session->buffer_manager != NULL);
 
-       // The IPC streaming thread will watch this value and exit
+       // The streaming thread will watch this value and exit
        // when profiling is disabled.
-       ep_session_set_ipc_streaming_enabled (session, false);
+       ep_session_set_streaming_enabled (session, false);
 
        // Thread could be waiting on the event that there is new data to read.
        ep_rt_wait_event_set (ep_buffer_manager_get_rt_wait_event_ref (session->buffer_manager));
@@ -176,6 +176,7 @@ ep_session_alloc (
        // This is used in the EventListener case.
        switch (session_type) {
        case EP_SESSION_TYPE_FILE :
+       case EP_SESSION_TYPE_FILESTREAM :
                if (output_path) {
                        file_stream_writer = ep_file_stream_writer_alloc (output_path);
                        instance->file = ep_file_alloc (ep_file_stream_writer_get_stream_writer_ref (file_stream_writer), format);
@@ -218,7 +219,7 @@ ep_session_free (EventPipeSession *session)
 {
        ep_return_void_if_nok (session != NULL);
 
-       EP_ASSERT (!ep_session_get_ipc_streaming_enabled (session));
+       EP_ASSERT (!ep_session_get_streaming_enabled (session));
 
        ep_rt_wait_event_free (&session->rt_thread_shutdown_event);
 
@@ -375,12 +376,12 @@ ep_session_start_streaming (EventPipeSession *session)
        if (session->file != NULL)
                ep_file_initialize_file (session->file);
 
-       if (session->session_type == EP_SESSION_TYPE_IPCSTREAM)
-               session_create_ipc_streaming_thread (session);
+       if (session->session_type == EP_SESSION_TYPE_IPCSTREAM || session->session_type == EP_SESSION_TYPE_FILESTREAM)
+               session_create_streaming_thread (session);
 
        if (session->session_type == EP_SESSION_TYPE_SYNCHRONOUS) {
                EP_ASSERT (session->file == NULL);
-               EP_ASSERT (!ep_session_get_ipc_streaming_enabled (session));
+               EP_ASSERT (!ep_session_get_streaming_enabled (session));
        }
 
        ep_requires_lock_held ();
@@ -412,8 +413,8 @@ ep_session_disable (EventPipeSession *session)
 {
        EP_ASSERT (session != NULL);
 
-       if (session->session_type == EP_SESSION_TYPE_IPCSTREAM && ep_session_get_ipc_streaming_enabled (session))
-               session_disable_ipc_streaming_thread (session);
+       if ((session->session_type == EP_SESSION_TYPE_IPCSTREAM || session->session_type == EP_SESSION_TYPE_FILESTREAM) && ep_session_get_streaming_enabled (session))
+               session_disable_streaming_thread (session);
 
        bool ignored;
        ep_session_write_all_buffers_to_file (session, &ignored);
@@ -541,19 +542,19 @@ ep_session_set_rundown_enabled (
 }
 
 bool
-ep_session_get_ipc_streaming_enabled (const EventPipeSession *session)
+ep_session_get_streaming_enabled (const EventPipeSession *session)
 {
        EP_ASSERT (session != NULL);
-       return (ep_rt_volatile_load_uint32_t(&session->ipc_streaming_enabled) != 0 ? true : false);
+       return (ep_rt_volatile_load_uint32_t(&session->streaming_enabled) != 0 ? true : false);
 }
 
 void
-ep_session_set_ipc_streaming_enabled (
+ep_session_set_streaming_enabled (
        EventPipeSession *session,
        bool enabled)
 {
        EP_ASSERT (session != NULL);
-       ep_rt_volatile_store_uint32_t (&session->ipc_streaming_enabled, (enabled) ? 1 : 0);
+       ep_rt_volatile_store_uint32_t (&session->streaming_enabled, (enabled) ? 1 : 0);
 }
 
 void
index e82b8f871d0f7d1337d260f27c7b5a51345eb0c8..ebda6c64ddc021262f6fb09baf46106b799bc985 100644 (file)
@@ -23,9 +23,9 @@ struct _EventPipeSession {
 #else
 struct _EventPipeSession_Internal {
 #endif
-       // When the session is of IPC type, this becomes a reference to the streaming thread.
-       ep_rt_thread_handle_t ipc_streaming_thread;
-       // Event object used to signal Disable that the IPC streaming thread is done.
+       // When the session is of IPC or FILE stream type, this becomes a reference to the streaming thread.
+       ep_rt_thread_handle_t streaming_thread;
+       // Event object used to signal Disable that the streaming thread is done.
        ep_rt_wait_event_handle_t rt_thread_shutdown_event;
        // The set of configurations for each provider in the session.
        EventPipeSessionProviderList *providers;
@@ -42,8 +42,8 @@ struct _EventPipeSession_Internal {
        uint32_t index;
        // True if rundown is enabled.
        volatile uint32_t rundown_enabled;
-       // Data members used when an IPC streaming thread is used.
-       volatile uint32_t ipc_streaming_enabled;
+       // Data members used when an streaming thread is used.
+       volatile uint32_t streaming_enabled;
        // The type of the session.
        // This determines behavior within the system (e.g. policies around which events to drop, etc.)
        EventPipeSessionType session_type;
@@ -118,7 +118,7 @@ ep_session_write_sequence_point_unbuffered (EventPipeSession *session);
 // MUST be called AFTER sending the IPC response
 // Side effects:
 // - sends file header information for nettrace format
-// - turns on IpcStreaming thread which flushes events to stream
+// - turns on streaming thread which flushes events to stream
 // _Requires_lock_held (ep)
 void
 ep_session_start_streaming (EventPipeSession *session);
@@ -176,10 +176,10 @@ ep_session_set_rundown_enabled (
        bool enabled);
 
 bool
-ep_session_get_ipc_streaming_enabled (const EventPipeSession *session);
+ep_session_get_streaming_enabled (const EventPipeSession *session);
 
 void
-ep_session_set_ipc_streaming_enabled (
+ep_session_set_streaming_enabled (
        EventPipeSession *session,
        bool enabled);
 
index c6954d632961cd1c761b17d7f1ed4f185e238bca..0248c2266914705224ea4c4ba7771ae91be25596 100644 (file)
@@ -144,7 +144,8 @@ typedef enum {
        EP_SESSION_TYPE_FILE,
        EP_SESSION_TYPE_LISTENER,
        EP_SESSION_TYPE_IPCSTREAM,
-       EP_SESSION_TYPE_SYNCHRONOUS
+       EP_SESSION_TYPE_SYNCHRONOUS,
+       EP_SESSION_TYPE_FILESTREAM
 } EventPipeSessionType ;
 
 typedef enum {
index d13c2ac61ef2c8ece788981c5d918c8c081a0eec..7ddb08a85ae66d710a8e2694ead77a58004ee70b 100644 (file)
@@ -817,7 +817,7 @@ enable_default_session_via_env_variables (void)
                        output_path,
                        ep_circular_mb,
                        ep_config,
-                       EP_SESSION_TYPE_FILE,
+                       ep_rt_config_value_get_output_streaming () ? EP_SESSION_TYPE_FILESTREAM : EP_SESSION_TYPE_FILE,
                        EP_SERIALIZATION_FORMAT_NETTRACE_V4,
                        true,
                        NULL,
@@ -880,7 +880,7 @@ ep_enable (
        ep_requires_lock_not_held ();
 
        // If the state or arguments are invalid, bail here.
-       if (session_type == EP_SESSION_TYPE_FILE && output_path == NULL)
+       if ((session_type == EP_SESSION_TYPE_FILE || session_type == EP_SESSION_TYPE_FILESTREAM) && output_path == NULL)
                return 0;
        if (session_type == EP_SESSION_TYPE_IPCSTREAM && stream == NULL)
                return 0;