lockless buffer reservation/release in EventPipe (#50797)
authorJohn Salem <josalem@microsoft.com>
Thu, 15 Apr 2021 23:20:30 +0000 (16:20 -0700)
committerGitHub <noreply@github.com>
Thu, 15 Apr 2021 23:20:30 +0000 (16:20 -0700)
src/coreclr/vm/eventing/eventpipe/ep-rt-coreclr.h
src/mono/mono/eventpipe/ep-rt-mono.h
src/native/eventpipe/ep-buffer-manager.c
src/native/eventpipe/ep-buffer-manager.h
src/native/eventpipe/ep-rt.h
src/native/eventpipe/ep-thread.c
src/native/eventpipe/ep-thread.h

index c7edf06..abc13dc 100644 (file)
@@ -1219,6 +1219,15 @@ ep_rt_atomic_dec_int64_t (volatile int64_t *value)
        return static_cast<int64_t>(InterlockedDecrement64 ((volatile LONG64 *)(value)));
 }
 
+// Atomically compare-and-swap a size_t: if *target == expected, store value.
+// Returns the previous contents of *target (== expected on success),
+// matching the CAS-returns-old-value contract used by the lockless
+// buffer-budget loops in ep-buffer-manager.c.
+static
+inline
+size_t
+ep_rt_atomic_compare_exchange_size_t (volatile size_t *target, size_t expected, size_t value)
+{
+	STATIC_CONTRACT_NOTHROW;
+	// Note the argument order: InterlockedCompareExchangeT takes
+	// (destination, exchange, comparand), so `value` precedes `expected`.
+	return static_cast<size_t>(InterlockedCompareExchangeT<size_t> (target, value, expected));
+}
+
 /*
  * EventPipe.
  */
index 2cf41b4..e8590d9 100644 (file)
@@ -783,6 +783,14 @@ ep_rt_atomic_dec_int64_t (volatile int64_t *value)
        return (int64_t)mono_atomic_dec_i64 ((volatile gint64 *)value);
 }
 
+static
+inline
+size_t
+ep_rt_atomic_compare_exchange_size_t (volatile size_t *target, size_t expected, size_t value)
+{
+       return (size_t)(mono_atomic_cas_i32 ((volatile gint32 *)(target), (gint32)(value), (gint32)(expected)));
+}
+
 /*
  * EventPipe.
  */
index 2355d13..bcab43e 100644 (file)
@@ -69,6 +69,20 @@ buffer_manager_deallocate_buffer (
        EventPipeBufferManager *buffer_manager,
        EventPipeBuffer *buffer);
 
+// Attempt to reserve space for a buffer
+static
+bool
+buffer_manager_try_reserve_buffer(
+       EventPipeBufferManager *buffer_manager,
+       uint32_t request_size);
+
+// Release a reserved buffer budget
+static
+void
+buffer_manager_release_buffer(
+       EventPipeBufferManager *buffer_manager,
+       uint32_t size);
+
 // An iterator that can enumerate all the events which have been written into this buffer manager.
 // Initially the iterator starts uninitialized and get_current_event () returns NULL. Calling move_next_xxx ()
 // attempts to advance the cursor to the next event. If there is no event prior to stop_timestamp then
@@ -248,6 +262,44 @@ ep_buffer_list_get_and_remove_head (EventPipeBufferList *buffer_list)
        return ret_buffer;
 }
 
+// Lock-free attempt to reserve `request_size` bytes of the buffer manager's
+// global allocation budget. Returns true and charges the budget when
+// size_of_all_buffers + request_size still fits under
+// max_size_of_all_buffers; returns false (budget unchanged) otherwise.
+// On success the caller must eventually return the bytes via
+// buffer_manager_release_buffer.
+bool
+buffer_manager_try_reserve_buffer(
+	EventPipeBufferManager *buffer_manager,
+	uint32_t request_size)
+{
+	uint64_t iters = 0;
+	size_t old_size_of_all_buffers;
+	size_t new_size_of_all_buffers;
+	do {
+		// Snapshot the current total, compute the prospective total, then try
+		// to publish it with a CAS; a failed CAS means another thread raced
+		// us, so re-read and retry with the fresh value.
+		old_size_of_all_buffers = buffer_manager->size_of_all_buffers;
+		new_size_of_all_buffers = old_size_of_all_buffers + request_size;
+		iters++;
+		if (iters % 64 == 0) {
+			ep_rt_thread_sleep (0); // yield the thread to the scheduler in case we're in high contention
+		}
+	} while (new_size_of_all_buffers <= buffer_manager->max_size_of_all_buffers && ep_rt_atomic_compare_exchange_size_t (&buffer_manager->size_of_all_buffers, old_size_of_all_buffers, new_size_of_all_buffers) != old_size_of_all_buffers);
+
+	// If the loop exited because the budget was exceeded (first operand of the
+	// && failed), nothing was published and this reports the failure; if it
+	// exited because the CAS succeeded, the reservation is charged.
+	return new_size_of_all_buffers <= buffer_manager->max_size_of_all_buffers;
+}
+
+// Lock-free return of `size` bytes to the buffer manager's global allocation
+// budget; the inverse of buffer_manager_try_reserve_buffer. Loops until the
+// CAS publishes the decremented total.
+void
+buffer_manager_release_buffer(
+	EventPipeBufferManager *buffer_manager,
+	uint32_t size)
+{
+	uint64_t iters = 0;
+	size_t old_size_of_all_buffers;
+	size_t new_size_of_all_buffers;
+	do {
+		old_size_of_all_buffers = buffer_manager->size_of_all_buffers;
+		// size_t is unsigned, so the old `new_size_of_all_buffers >= 0` loop
+		// condition was always true and never caught underflow; check the
+		// operands before subtracting instead. Releasing more than is
+		// reserved indicates unbalanced reserve/release bookkeeping.
+		EP_ASSERT (old_size_of_all_buffers >= size);
+		new_size_of_all_buffers = old_size_of_all_buffers - size;
+		iters++;
+		if (iters % 64 == 0) {
+			ep_rt_thread_sleep (0); // yield the thread to the scheduler in case we're in high contention
+		}
+	} while (ep_rt_atomic_compare_exchange_size_t (&buffer_manager->size_of_all_buffers, old_size_of_all_buffers, new_size_of_all_buffers) != old_size_of_all_buffers);
+}
+
 #ifdef EP_CHECKED_BUILD
 bool
 ep_buffer_list_ensure_consistency (EventPipeBufferList *buffer_list)
@@ -415,7 +467,39 @@ buffer_manager_allocate_buffer_for_thread (
        EventPipeBuffer *new_buffer = NULL;
        EventPipeBufferList *thread_buffer_list = NULL;
        EventPipeSequencePoint* sequence_point = NULL;
-       bool allocate_new_buffer = false;
+       uint32_t sequence_number = 0;
+
+       // Pick a buffer size by multiplying the base buffer size by the number of buffers already allocated for this thread.
+       uint32_t size_multiplier = ep_thread_session_state_get_buffer_count_estimate(thread_session_state) + 1;
+       EP_ASSERT(size_multiplier > 0);
+
+       // Pick the base buffer size.  Checked builds have a smaller size to stress the allocate path more.
+#ifdef EP_CHECKED_BUILD
+       uint32_t base_buffer_size = 30 * 1024; // 30K
+#else
+       uint32_t base_buffer_size = 100 * 1024; // 100K
+#endif
+       uint32_t buffer_size = base_buffer_size * size_multiplier;
+       EP_ASSERT(buffer_size > 0);
+
+
+       buffer_size = EP_MAX (request_size, buffer_size);
+
+       // Don't allow the buffer size to exceed 1MB.
+       const uint32_t max_buffer_size = 1024 * 1024;
+       buffer_size = EP_MIN (buffer_size, max_buffer_size);
+
+
+       // Make sure that buffer size >= request size so that the buffer size does not
+       // determine the max event size.
+       EP_ASSERT (request_size <= buffer_size);
+
+       // Make the buffer size fit into with pagesize-aligned block, since ep_rt_valloc0 expects page-aligned sizes to be passed as arguments
+       buffer_size = (buffer_size + ep_rt_system_get_alloc_granularity () - 1) & ~(uint32_t)(ep_rt_system_get_alloc_granularity () - 1);
+
+       // Attempt to reserve the necessary buffer size
+       EP_ASSERT(buffer_size > 0);
+       ep_return_null_if_nok(buffer_manager_try_reserve_buffer(buffer_manager, buffer_size));
 
        // Allocating a buffer requires us to take the lock.
        EP_SPIN_LOCK_ENTER (&buffer_manager->rt_lock, section1)
@@ -429,62 +513,28 @@ buffer_manager_allocate_buffer_for_thread (
                        thread_buffer_list = NULL;
                }
 
-               // Determine if policy allows us to allocate another buffer
-               size_t available_buffer_size;
-               available_buffer_size = buffer_manager->max_size_of_all_buffers - buffer_manager->size_of_all_buffers;
-               if (request_size <= available_buffer_size)
-                       allocate_new_buffer = true;
-
-               if (allocate_new_buffer) {
-                       // Pick a buffer size by multiplying the base buffer size by the number of buffers already allocated for this thread.
-                       uint32_t size_multiplier = ep_thread_session_state_get_buffer_list (thread_session_state)->buffer_count + 1;
-
-                       // Pick the base buffer size based.  Checked builds have a smaller size to stress the allocate/steal path more.
-#ifdef EP_CHECKED_BUILD
-                       uint32_t base_buffer_size = 30 * 1024; // 30K
-#else
-                       uint32_t base_buffer_size = 100 * 1024; // 100K
-#endif
-                       uint32_t buffer_size = base_buffer_size * size_multiplier;
-
-                       // Make sure that buffer size >= request size so that the buffer size does not
-                       // determine the max event size.
-                       EP_ASSERT (request_size <= available_buffer_size);
-
-                       buffer_size = EP_MAX (request_size, buffer_size);
-                       buffer_size = EP_MIN ((uint32_t)buffer_size, (uint32_t)available_buffer_size);
-
-                       // Don't allow the buffer size to exceed 1MB.
-                       const uint32_t max_buffer_size = 1024 * 1024;
-                       buffer_size = EP_MIN (buffer_size, max_buffer_size);
-
-                       // Make the buffer size fit into with pagesize-aligned block, since ep_rt_valloc0 expects page-aligned sizes to be passed as arguments
-                       buffer_size = (buffer_size + ep_rt_system_get_alloc_granularity () - 1) & ~(uint32_t)(ep_rt_system_get_alloc_granularity () - 1);
-
-                       // The sequence counter is exclusively mutated on this thread so this is a thread-local read.
-                       uint32_t sequence_number = ep_thread_session_state_get_volatile_sequence_number (thread_session_state);
-                       new_buffer = ep_buffer_alloc (buffer_size, ep_thread_session_state_get_thread (thread_session_state), sequence_number);
-                       ep_raise_error_if_nok_holding_spin_lock (new_buffer != NULL, section1);
-
-                       buffer_manager->size_of_all_buffers += buffer_size;
-                       if (buffer_manager->sequence_point_alloc_budget != 0) {
-                               // sequence point bookkeeping
-                               if (buffer_size >= buffer_manager->remaining_sequence_point_alloc_budget) {
-                                       sequence_point = ep_sequence_point_alloc ();
-                                       if (sequence_point) {
-                                               buffer_manager_init_sequence_point_thread_list (buffer_manager, sequence_point);
-                                               ep_raise_error_if_nok_holding_spin_lock (buffer_manager_enqueue_sequence_point (buffer_manager, sequence_point), section1);
-                                               sequence_point = NULL;
-                                       }
-                                       buffer_manager->remaining_sequence_point_alloc_budget = buffer_manager->sequence_point_alloc_budget;
-                               } else {
-                                       buffer_manager->remaining_sequence_point_alloc_budget -= buffer_size;
+               // The sequence counter is exclusively mutated on this thread so this is a thread-local read.
+               sequence_number = ep_thread_session_state_get_volatile_sequence_number (thread_session_state);
+               new_buffer = ep_buffer_alloc (buffer_size, ep_thread_session_state_get_thread (thread_session_state), sequence_number);
+               ep_raise_error_if_nok_holding_spin_lock (new_buffer != NULL, section1);
+
+               if (buffer_manager->sequence_point_alloc_budget != 0) {
+                       // sequence point bookkeeping
+                       if (buffer_size >= buffer_manager->remaining_sequence_point_alloc_budget) {
+                               sequence_point = ep_sequence_point_alloc ();
+                               if (sequence_point) {
+                                       buffer_manager_init_sequence_point_thread_list (buffer_manager, sequence_point);
+                                       ep_raise_error_if_nok_holding_spin_lock (buffer_manager_enqueue_sequence_point (buffer_manager, sequence_point), section1);
+                                       sequence_point = NULL;
                                }
+                               buffer_manager->remaining_sequence_point_alloc_budget = buffer_manager->sequence_point_alloc_budget;
+                       } else {
+                               buffer_manager->remaining_sequence_point_alloc_budget -= buffer_size;
                        }
+               }
 #ifdef EP_CHECKED_BUILD
-                       buffer_manager->num_buffers_allocated++;
+               buffer_manager->num_buffers_allocated++;
 #endif // EP_CHECKED_BUILD
-               }
 
                // Set the buffer on the thread.
                if (new_buffer != NULL)
@@ -506,6 +556,8 @@ ep_on_error:
        ep_buffer_free (new_buffer);
        new_buffer = NULL;
 
+       buffer_manager_release_buffer(buffer_manager, buffer_size);
+
        ep_exit_error_handler ();
 }
 
@@ -518,7 +570,7 @@ buffer_manager_deallocate_buffer (
        EP_ASSERT (buffer_manager != NULL);
 
        if (buffer) {
-               buffer_manager->size_of_all_buffers -= ep_buffer_get_size (buffer);
+               buffer_manager_release_buffer(buffer_manager, ep_buffer_get_size (buffer));
                ep_buffer_free (buffer);
 #ifdef EP_CHECKED_BUILD
                buffer_manager->num_buffers_allocated--;
index 822b845..a4ec7c6 100644 (file)
@@ -114,7 +114,7 @@ struct _EventPipeBufferManager_Internal {
        EventPipeBuffer *current_buffer;
        EventPipeBufferList *current_buffer_list;
        // The total allocation size of buffers under management.
-       size_t size_of_all_buffers;
+       volatile size_t size_of_all_buffers;
        // The maximum allowable size of buffers under management.
        // Attempted allocations above this threshold result in
        // dropped events.
index 3d1d26d..54ef0bf 100644 (file)
@@ -177,6 +177,10 @@ static
 int64_t
 ep_rt_atomic_dec_int64_t (volatile int64_t *value);
 
+static
+size_t
+ep_rt_atomic_compare_exchange_size_t (volatile size_t *target, size_t expected, size_t value);
+
 /*
  * EventPipe.
  */
index 0b48677..16d6538 100644 (file)
@@ -375,6 +375,19 @@ ep_on_error:
        ep_exit_error_handler ();
 }
 
+// Best-effort count of the buffers this thread session currently owns, read
+// WITHOUT the buffer manager lock. A stale value is acceptable: callers only
+// use it to size the next buffer allocation.
+//
+// buffer_list won't become NULL after this function takes its local
+// reference; it is only set to NULL while the session is being freed,
+// at which point this code is no longer called.
+uint32_t
+ep_thread_session_state_get_buffer_count_estimate(const EventPipeThreadSessionState *thread_session_state)
+{
+	EventPipeBufferList *const list = thread_session_state->buffer_list;
+	return (list != NULL) ? list->buffer_count : 0;
+}
+
 void
 ep_thread_session_state_free (EventPipeThreadSessionState *thread_session_state)
 {
index 2c5e0a7..0480a2d 100644 (file)
@@ -263,6 +263,9 @@ struct _EventPipeThreadSessionState_Internal {
        // a buffer for this session. It is set back to null when
        // event writing is suspended during session disable.
        // protected by the buffer manager lock.
+       // This field can be read outside the lock when
+       // the buffer allocation logic is estimating how many
+       // buffers a given thread has used (see: ep_thread_session_state_get_buffer_count_estimate and its uses).
        EventPipeBufferList *buffer_list;
 #ifdef EP_CHECKED_BUILD
        // protected by the buffer manager lock.
@@ -311,6 +314,9 @@ ep_thread_session_state_free (EventPipeThreadSessionState *thread_session_state)
 EventPipeThread *
 ep_thread_session_state_get_thread (const EventPipeThreadSessionState *thread_session_state);
 
+uint32_t
+ep_thread_session_state_get_buffer_count_estimate(const EventPipeThreadSessionState *thread_session_state);
+
 // _Requires_lock_held (thread)
 EventPipeBuffer *
 ep_thread_session_state_get_write_buffer (const EventPipeThreadSessionState *thread_session_state);