Synchronize Metrics Events (#53204)
authorTarek Mahmoud Sayed <tarekms@microsoft.com>
Wed, 26 May 2021 05:09:43 +0000 (22:09 -0700)
committerGitHub <noreply@github.com>
Wed, 26 May 2021 05:09:43 +0000 (22:09 -0700)
src/libraries/System.Diagnostics.DiagnosticSource/src/System/Diagnostics/Metrics/Instrument.cs
src/libraries/System.Diagnostics.DiagnosticSource/src/System/Diagnostics/Metrics/Meter.cs
src/libraries/System.Diagnostics.DiagnosticSource/src/System/Diagnostics/Metrics/MeterListener.cs
src/libraries/System.Diagnostics.DiagnosticSource/tests/MetricsTests.cs

index ec2112f..8851f30 100644 (file)
@@ -19,6 +19,16 @@ namespace System.Diagnostics.Metrics
         internal static KeyValuePair<string, object?>[] EmptyTags { get; } = Array.Empty<KeyValuePair<string, object?>>();
 #endif // NO_ARRAY_EMPTY_SUPPORT
 
+        // The SyncObject is used to synchronize the following operations:
+        //  - Instrument.Publish()
+        //  - Meter constructor
+        //  - Meter.Dispose
+        //  - MeterListener.EnableMeasurementEvents
+        //  - MeterListener.DisableMeasurementEvents
+        //  - MeterListener.Start
+        //  - MeterListener.Dispose
+        internal static object SyncObject { get; } = new object();
+
         // We use LikedList here so we don't have to take any lock while iterating over the list as we always hold on a node which be either valid or null.
         // LinkedList is thread safe for Add and Remove operations.
         internal LinkedList<ListenerSubscription> _subscriptions = new LinkedList<ListenerSubscription>();
@@ -54,8 +64,24 @@ namespace System.Diagnostics.Metrics
         /// </summary>
         protected void Publish()
         {
-            Meter.AddInstrument(this);
-            MeterListener.NotifyForPublishedInstrument(this);
+            List<MeterListener>? allListeners = null;
+            lock (Instrument.SyncObject)
+            {
+                if (Meter.Disposed || !Meter.AddInstrument(this))
+                {
+                    return;
+                }
+
+                allListeners = MeterListener.GetAllListeners();
+            }
+
+            if (allListeners is not null)
+            {
+                foreach (MeterListener listener in allListeners)
+                {
+                    listener.InstrumentPublished?.Invoke(this, listener);
+                }
+            }
         }
 
         /// <summary>
@@ -112,16 +138,19 @@ namespace System.Diagnostics.Metrics
         }
 
         // Called from MeterListener.EnableMeasurementEvents
-        internal void EnableMeasurement(ListenerSubscription subscription)
+        internal object? EnableMeasurement(ListenerSubscription subscription, out bool oldStateStored)
         {
-            while (!_subscriptions.AddIfNotExist(subscription, (s1, s2) => object.ReferenceEquals(s1.Listener, s2.Listener)))
+            oldStateStored = false;
+
+            if (!_subscriptions.AddIfNotExist(subscription, (s1, s2) => object.ReferenceEquals(s1.Listener, s2.Listener)))
             {
                 ListenerSubscription oldSubscription = _subscriptions.Remove(subscription, (s1, s2) => object.ReferenceEquals(s1.Listener, s2.Listener));
-                if (object.ReferenceEquals(oldSubscription.Listener, subscription.Listener))
-                {
-                    oldSubscription.Listener.MeasurementsCompleted?.Invoke(this, oldSubscription.State);
-                }
+                _subscriptions.AddIfNotExist(subscription, (s1, s2) => object.ReferenceEquals(s1.Listener, s2.Listener));
+                oldStateStored = object.ReferenceEquals(oldSubscription.Listener, subscription.Listener);
+                return oldSubscription.State;
             }
+
+            return false;
         }
 
         // Called from MeterListener.DisableMeasurementEvents
index edbf867..501f2af 100644 (file)
@@ -15,8 +15,9 @@ namespace System.Diagnostics.Metrics
 #endif
     public class Meter : IDisposable
     {
-        private static LinkedList<Meter> s_allMeters = new LinkedList<Meter>();
-        private LinkedList<Instrument>? _instruments;
+        private static List<Meter> s_allMeters = new List<Meter>();
+        private List<Instrument> _instruments = new List<Instrument>();
+        internal bool Disposed { get; private set; }
 
         /// <summary>
         /// Initializes a new instance of the Meter using the meter name.
@@ -39,7 +40,10 @@ namespace System.Diagnostics.Metrics
             Name = name;
             Version = version;
 
-            s_allMeters.Add(this);
+            lock (Instrument.SyncObject)
+            {
+                s_allMeters.Add(this);
+            }
         }
 
         /// <summary>
@@ -150,55 +154,60 @@ namespace System.Diagnostics.Metrics
         /// </summary>
         public void Dispose()
         {
-            s_allMeters.Remove(this, (meter1, meter2) => object.ReferenceEquals(meter1, meter2));
+            List<Instrument>? instruments = null;
 
-            if (_instruments is not null)
+            lock (Instrument.SyncObject)
             {
-                LinkedListNode<Instrument>? current = _instruments.First;
-
-                while (current is not null)
+                if (Disposed)
                 {
-                    current.Value.NotifyForUnpublishedInstrument();
-                    current = current.Next;
+                    return;
                 }
+                Disposed = true;
 
-                _instruments.Clear();
+                s_allMeters.Remove(this);
+                instruments = _instruments;
+                _instruments = new List<Instrument>();
+            }
+
+            if (instruments is not null)
+            {
+                foreach (Instrument instrument in instruments)
+                {
+                    instrument.NotifyForUnpublishedInstrument();
+                }
             }
         }
 
         // AddInstrument will be called when publishing the instrument (i.e. calling Instrument.Publish()).
-        internal void AddInstrument(Instrument instrument)
+        internal bool AddInstrument(Instrument instrument)
         {
-            if (_instruments is null)
+            if (!_instruments.Contains(instrument))
             {
-                Interlocked.CompareExchange(ref _instruments, new LinkedList<Instrument>(), null);
+                _instruments.Add(instrument);
+                return true;
             }
-
-            Debug.Assert(_instruments is not null);
-
-            _instruments.AddIfNotExist(instrument, (instrument1, instrument2) => object.ReferenceEquals(instrument1, instrument2));
+            return false;
         }
 
         // Called from MeterListener.Start
-        internal static void NotifyListenerWithAllPublishedInstruments(MeterListener listener)
+        internal static List<Instrument>? GetPublishedInstruments()
         {
-            Action<Instrument, MeterListener>? instrumentPublished = listener.InstrumentPublished;
-            if (instrumentPublished is null)
-            {
-                return;
-            }
+            List<Instrument>? instruments = null;
 
-            LinkedListNode<Meter>? current = s_allMeters.First;
-            while (current is not null)
+            if (s_allMeters.Count > 0)
             {
-                LinkedListNode<Instrument>? currentInstrument = current.Value._instruments?.First;
-                while (currentInstrument is not null)
+                instruments = new List<Instrument>();
+
+                foreach (Meter meter in s_allMeters)
                 {
-                    instrumentPublished.Invoke(currentInstrument.Value, listener);
-                    currentInstrument = currentInstrument.Next;
+                    foreach (Instrument instrument in meter._instruments)
+                    {
+                        instruments.Add(instrument);
+                    }
                 }
-                current = current.Next;
             }
+
+            return instruments;
         }
     }
 }
index 0202b11..58af5a9 100644 (file)
@@ -21,7 +21,7 @@ namespace System.Diagnostics.Metrics
     {
         // We use LikedList here so we don't have to take any lock while iterating over the list as we always hold on a node which be either valid or null.
         // LinkedList is thread safe for Add, Remove, and Clear operations.
-        private static LinkedList<MeterListener> s_allStartedListeners = new LinkedList<MeterListener>();
+        private static List<MeterListener> s_allStartedListeners = new List<MeterListener>();
 
         // List of the instruments which the current listener is listening to.
         private LinkedList<Instrument> _enabledMeasurementInstruments = new LinkedList<Instrument>();
@@ -59,13 +59,33 @@ namespace System.Diagnostics.Metrics
         /// <param name="state">A state object which will be passed back to the callback getting measurements events.</param>
         public void EnableMeasurementEvents(Instrument instrument, object? state = null)
         {
-            if (instrument is null || _disposed)
+            bool oldStateStored = false;
+            bool enabled = false;
+            object? oldState = null;
+
+            lock (Instrument.SyncObject)
             {
-                return;
+                if (instrument is not null && !_disposed && !instrument.Meter.Disposed)
+                {
+                    _enabledMeasurementInstruments.AddIfNotExist(instrument, (instrument1, instrument2) => object.ReferenceEquals(instrument1, instrument2));
+                    oldState = instrument.EnableMeasurement(new ListenerSubscription(this, state), out oldStateStored);
+                    enabled = true;
+                }
             }
 
-            _enabledMeasurementInstruments.AddIfNotExist(instrument, (instrument1, instrument2) => object.ReferenceEquals(instrument1, instrument2));
-            instrument.EnableMeasurement(new ListenerSubscription(this, state));
+            if (enabled)
+            {
+                if (oldStateStored && MeasurementsCompleted is not null)
+                {
+                    MeasurementsCompleted?.Invoke(instrument!, oldState);
+                }
+            }
+            else
+            {
+                // The caller trying to enable the measurements but it didn't happen because the meter or the listener is disposed.
+                // We need to send MeasurementsCompleted notification telling this instrument is not enabled for measuring.
+                MeasurementsCompleted?.Invoke(instrument!, state);
+            }
         }
 
         /// <summary>
@@ -75,14 +95,18 @@ namespace System.Diagnostics.Metrics
         /// <returns>The state object originally passed to <see cref="EnableMeasurementEvents" /> method.</returns>
         public object? DisableMeasurementEvents(Instrument instrument)
         {
-            if (instrument is null)
+            object? state =  null;
+            lock (Instrument.SyncObject)
             {
-                return default;
+                if (instrument is null || _enabledMeasurementInstruments.Remove(instrument, (instrument1, instrument2) => object.ReferenceEquals(instrument1, instrument2)) == default)
+                {
+                    return default;
+                }
+
+                state =  instrument.DisableMeasurements(this);
             }
 
-            _enabledMeasurementInstruments.Remove(instrument, (instrument1, instrument2) => object.ReferenceEquals(instrument1, instrument2));
-            object? state =  instrument.DisableMeasurements(this);
-            MeasurementsCompleted?.Invoke(instrument!, state);
+            MeasurementsCompleted?.Invoke(instrument, state);
             return state;
         }
 
@@ -132,14 +156,27 @@ namespace System.Diagnostics.Metrics
         /// </summary>
         public void Start()
         {
-            if (_disposed)
+            List<Instrument>? publishedInstruments = null;
+            lock (Instrument.SyncObject)
             {
-                return;
+                if (_disposed)
+                {
+                    return;
+                }
+
+                if (!s_allStartedListeners.Contains(this))
+                {
+                    s_allStartedListeners.Add(this);
+                    publishedInstruments = Meter.GetPublishedInstruments();
+                }
             }
 
-            if (s_allStartedListeners.AddIfNotExist(this, (listener1, listener2) => object.ReferenceEquals(listener1, listener2)))
+            if (publishedInstruments is not null)
             {
-                Meter.NotifyListenerWithAllPublishedInstruments(this);
+                foreach (Instrument instrument in publishedInstruments)
+                {
+                    InstrumentPublished?.Invoke(instrument, this);
+                }
             }
         }
 
@@ -165,36 +202,46 @@ namespace System.Diagnostics.Metrics
         /// </summary>
         public void Dispose()
         {
-            if (_disposed)
+            Dictionary<Instrument, object?>? callbacksArguments = null;
+            Action<Instrument, object?>? measurementsCompleted = MeasurementsCompleted;
+
+            lock (Instrument.SyncObject)
             {
-                return;
-            }
-            _disposed = true;
-            s_allStartedListeners.Remove(this, (listener1, listener2) => object.ReferenceEquals(listener1, listener2));
+                if (_disposed)
+                {
+                    return;
+                }
+                _disposed = true;
+                s_allStartedListeners.Remove(this);
 
-            LinkedListNode<Instrument>? current = _enabledMeasurementInstruments.First;
+                LinkedListNode<Instrument>? current = _enabledMeasurementInstruments.First;
+                if (current is not null && measurementsCompleted is not null)
+                {
+                    callbacksArguments = new Dictionary<Instrument, object?>();
 
-            while (current is not null)
-            {
-                object? state = current.Value.DisableMeasurements(this);
-                MeasurementsCompleted?.Invoke(current.Value, state);
-                current = current.Next;
-            }
+                    do
+                    {
+                        object? state = current.Value.DisableMeasurements(this);
+                        callbacksArguments.Add(current.Value, state);
+                        current = current.Next;
+                    } while (current is not null);
 
-            _enabledMeasurementInstruments.Clear();
-        }
+                    _enabledMeasurementInstruments.Clear();
+                }
+            }
 
-        // NotifyForPublishedInstrument will be called every time publishing instrument
-        internal static void NotifyForPublishedInstrument(Instrument instrument)
-        {
-            LinkedListNode<MeterListener>? current = s_allStartedListeners.First;
-            while (current is not null)
+            if (callbacksArguments is not null)
             {
-                current.Value.InstrumentPublished?.Invoke(instrument, current.Value);
-                current = current.Next;
+                foreach (KeyValuePair<Instrument, object?> kvp in callbacksArguments)
+                {
+                    measurementsCompleted?.Invoke(kvp.Key, kvp.Value);
+                }
             }
         }
 
+        // Publish is called from Instrument.Publish
+        internal static List<MeterListener>? GetAllListeners() => s_allStartedListeners.Count == 0 ? null : new List<MeterListener>(s_allStartedListeners);
+
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         internal void NotifyMeasurement<T>(Instrument instrument, T measurement, ReadOnlySpan<KeyValuePair<string, object?>> tags, object? state) where T : struct
         {
index 39fd23b..896356f 100644 (file)
@@ -695,6 +695,74 @@ namespace System.Diagnostics.Metrics.Tests
                 Assert.Equal(loopLength * 8, totalCount);
             }).Dispose();
         }
+
+        [ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))]
+        public void SerializedEventsTest()
+        {
+            RemoteExecutor.Invoke(() => {
+
+                const int MaxMetersCount = 50;
+
+                Meter [] meters = new Meter[MaxMetersCount];
+                for (int i = 0; i < MaxMetersCount; i++)
+                {
+                    meters[i] = new Meter("SerializedEventsTest" + i);
+                }
+
+                Dictionary<Instrument, bool> instruments = new Dictionary<Instrument, bool>();
+
+                MeterListener listener = new MeterListener()
+                {
+                    InstrumentPublished = (instrument, theListener) =>
+                    {
+                        lock (instruments)
+                        {
+                            Assert.False(instruments.ContainsKey(instrument), $"{instrument.Name}, {instrument.Meter.Name} is already published before");
+                            instruments.Add(instrument, true);
+                            theListener.EnableMeasurementEvents(instrument, null);
+                        }
+                    },
+
+                    MeasurementsCompleted = (instrument, state) =>
+                    {
+                        lock (instruments)
+                        {
+                            Assert.True(instruments.Remove(instrument), $"{instrument.Name}, {instrument.Meter.Name} is not published while getting completed results");
+                        }
+                    }
+                };
+
+                listener.Start();
+
+                int counterCounter = 0;
+                Random r = new Random();
+
+                Task [] jobs = new Task[Environment.ProcessorCount];
+                for (int i = 0; i < jobs.Length; i++)
+                {
+                    jobs[i] = Task.Run(() => {
+                        for (int j = 0; j < 10; j++)
+                        {
+                            int index = r.Next(MaxMetersCount);
+
+                            for (int k = 0; k < 10; k++)
+                            {
+                                int c = Interlocked.Increment(ref counterCounter);
+                                Counter<int> counter = meters[index].CreateCounter<int>("Counter");
+                                counter.Add(1);
+                            }
+
+                            meters[index].Dispose();
+                        }
+                    });
+                }
+
+                Task.WaitAll(jobs);
+                listener.Dispose();
+                Assert.Equal(0, instruments.Count);
+            }).Dispose();
+        }
+
         private void PublishCounterMeasurement<T>(Counter<T> counter, T value, KeyValuePair<string, object?>[] tags) where T : struct
         {
             switch (tags.Length)
@@ -853,4 +921,4 @@ namespace System.Diagnostics.Metrics.Tests
         }
 
     }
-}
\ No newline at end of file
+}