internal static KeyValuePair<string, object?>[] EmptyTags { get; } = Array.Empty<KeyValuePair<string, object?>>();
#endif // NO_ARRAY_EMPTY_SUPPORT
+ // The SyncObject is used to synchronize the following operations:
+ // - Instrument.Publish()
+ // - Meter constructor
+ // - Meter.Dispose
+ // - MeterListener.EnableMeasurementEvents
+ // - MeterListener.DisableMeasurementEvents
+ // - MeterListener.Start
+ // - MeterListener.Dispose
+ internal static object SyncObject { get; } = new object();
+
// We use LikedList here so we don't have to take any lock while iterating over the list as we always hold on a node which be either valid or null.
// LinkedList is thread safe for Add and Remove operations.
internal LinkedList<ListenerSubscription> _subscriptions = new LinkedList<ListenerSubscription>();
/// </summary>
protected void Publish()
{
- Meter.AddInstrument(this);
- MeterListener.NotifyForPublishedInstrument(this);
+ List<MeterListener>? allListeners = null;
+ lock (Instrument.SyncObject)
+ {
+ if (Meter.Disposed || !Meter.AddInstrument(this))
+ {
+ return;
+ }
+
+ allListeners = MeterListener.GetAllListeners();
+ }
+
+ if (allListeners is not null)
+ {
+ foreach (MeterListener listener in allListeners)
+ {
+ listener.InstrumentPublished?.Invoke(this, listener);
+ }
+ }
}
/// <summary>
}
// Called from MeterListener.EnableMeasurementEvents
- internal void EnableMeasurement(ListenerSubscription subscription)
+ internal object? EnableMeasurement(ListenerSubscription subscription, out bool oldStateStored)
{
- while (!_subscriptions.AddIfNotExist(subscription, (s1, s2) => object.ReferenceEquals(s1.Listener, s2.Listener)))
+ oldStateStored = false;
+
+ if (!_subscriptions.AddIfNotExist(subscription, (s1, s2) => object.ReferenceEquals(s1.Listener, s2.Listener)))
{
ListenerSubscription oldSubscription = _subscriptions.Remove(subscription, (s1, s2) => object.ReferenceEquals(s1.Listener, s2.Listener));
- if (object.ReferenceEquals(oldSubscription.Listener, subscription.Listener))
- {
- oldSubscription.Listener.MeasurementsCompleted?.Invoke(this, oldSubscription.State);
- }
+ _subscriptions.AddIfNotExist(subscription, (s1, s2) => object.ReferenceEquals(s1.Listener, s2.Listener));
+ oldStateStored = object.ReferenceEquals(oldSubscription.Listener, subscription.Listener);
+ return oldSubscription.State;
}
+
+ return false;
}
// Called from MeterListener.DisableMeasurementEvents
#endif
public class Meter : IDisposable
{
- private static LinkedList<Meter> s_allMeters = new LinkedList<Meter>();
- private LinkedList<Instrument>? _instruments;
+ private static List<Meter> s_allMeters = new List<Meter>();
+ private List<Instrument> _instruments = new List<Instrument>();
+ internal bool Disposed { get; private set; }
/// <summary>
/// Initializes a new instance of the Meter using the meter name.
Name = name;
Version = version;
- s_allMeters.Add(this);
+ lock (Instrument.SyncObject)
+ {
+ s_allMeters.Add(this);
+ }
}
/// <summary>
/// </summary>
public void Dispose()
{
- s_allMeters.Remove(this, (meter1, meter2) => object.ReferenceEquals(meter1, meter2));
+ List<Instrument>? instruments = null;
- if (_instruments is not null)
+ lock (Instrument.SyncObject)
{
- LinkedListNode<Instrument>? current = _instruments.First;
-
- while (current is not null)
+ if (Disposed)
{
- current.Value.NotifyForUnpublishedInstrument();
- current = current.Next;
+ return;
}
+ Disposed = true;
- _instruments.Clear();
+ s_allMeters.Remove(this);
+ instruments = _instruments;
+ _instruments = new List<Instrument>();
+ }
+
+ if (instruments is not null)
+ {
+ foreach (Instrument instrument in instruments)
+ {
+ instrument.NotifyForUnpublishedInstrument();
+ }
}
}
// AddInstrument will be called when publishing the instrument (i.e. calling Instrument.Publish()).
- internal void AddInstrument(Instrument instrument)
+ internal bool AddInstrument(Instrument instrument)
{
- if (_instruments is null)
+ if (!_instruments.Contains(instrument))
{
- Interlocked.CompareExchange(ref _instruments, new LinkedList<Instrument>(), null);
+ _instruments.Add(instrument);
+ return true;
}
-
- Debug.Assert(_instruments is not null);
-
- _instruments.AddIfNotExist(instrument, (instrument1, instrument2) => object.ReferenceEquals(instrument1, instrument2));
+ return false;
}
// Called from MeterListener.Start
- internal static void NotifyListenerWithAllPublishedInstruments(MeterListener listener)
+ internal static List<Instrument>? GetPublishedInstruments()
{
- Action<Instrument, MeterListener>? instrumentPublished = listener.InstrumentPublished;
- if (instrumentPublished is null)
- {
- return;
- }
+ List<Instrument>? instruments = null;
- LinkedListNode<Meter>? current = s_allMeters.First;
- while (current is not null)
+ if (s_allMeters.Count > 0)
{
- LinkedListNode<Instrument>? currentInstrument = current.Value._instruments?.First;
- while (currentInstrument is not null)
+ instruments = new List<Instrument>();
+
+ foreach (Meter meter in s_allMeters)
{
- instrumentPublished.Invoke(currentInstrument.Value, listener);
- currentInstrument = currentInstrument.Next;
+ foreach (Instrument instrument in meter._instruments)
+ {
+ instruments.Add(instrument);
+ }
}
- current = current.Next;
}
+
+ return instruments;
}
}
}
{
// We use LikedList here so we don't have to take any lock while iterating over the list as we always hold on a node which be either valid or null.
// LinkedList is thread safe for Add, Remove, and Clear operations.
- private static LinkedList<MeterListener> s_allStartedListeners = new LinkedList<MeterListener>();
+ private static List<MeterListener> s_allStartedListeners = new List<MeterListener>();
// List of the instruments which the current listener is listening to.
private LinkedList<Instrument> _enabledMeasurementInstruments = new LinkedList<Instrument>();
/// <param name="state">A state object which will be passed back to the callback getting measurements events.</param>
public void EnableMeasurementEvents(Instrument instrument, object? state = null)
{
- if (instrument is null || _disposed)
+ bool oldStateStored = false;
+ bool enabled = false;
+ object? oldState = null;
+
+ lock (Instrument.SyncObject)
{
- return;
+ if (instrument is not null && !_disposed && !instrument.Meter.Disposed)
+ {
+ _enabledMeasurementInstruments.AddIfNotExist(instrument, (instrument1, instrument2) => object.ReferenceEquals(instrument1, instrument2));
+ oldState = instrument.EnableMeasurement(new ListenerSubscription(this, state), out oldStateStored);
+ enabled = true;
+ }
}
- _enabledMeasurementInstruments.AddIfNotExist(instrument, (instrument1, instrument2) => object.ReferenceEquals(instrument1, instrument2));
- instrument.EnableMeasurement(new ListenerSubscription(this, state));
+ if (enabled)
+ {
+ if (oldStateStored && MeasurementsCompleted is not null)
+ {
+ MeasurementsCompleted?.Invoke(instrument!, oldState);
+ }
+ }
+ else
+ {
+ // The caller trying to enable the measurements but it didn't happen because the meter or the listener is disposed.
+ // We need to send MeasurementsCompleted notification telling this instrument is not enabled for measuring.
+ MeasurementsCompleted?.Invoke(instrument!, state);
+ }
}
/// <summary>
/// <returns>The state object originally passed to <see cref="EnableMeasurementEvents" /> method.</returns>
public object? DisableMeasurementEvents(Instrument instrument)
{
- if (instrument is null)
+ object? state = null;
+ lock (Instrument.SyncObject)
{
- return default;
+ if (instrument is null || _enabledMeasurementInstruments.Remove(instrument, (instrument1, instrument2) => object.ReferenceEquals(instrument1, instrument2)) == default)
+ {
+ return default;
+ }
+
+ state = instrument.DisableMeasurements(this);
}
- _enabledMeasurementInstruments.Remove(instrument, (instrument1, instrument2) => object.ReferenceEquals(instrument1, instrument2));
- object? state = instrument.DisableMeasurements(this);
- MeasurementsCompleted?.Invoke(instrument!, state);
+ MeasurementsCompleted?.Invoke(instrument, state);
return state;
}
/// </summary>
public void Start()
{
- if (_disposed)
+ List<Instrument>? publishedInstruments = null;
+ lock (Instrument.SyncObject)
{
- return;
+ if (_disposed)
+ {
+ return;
+ }
+
+ if (!s_allStartedListeners.Contains(this))
+ {
+ s_allStartedListeners.Add(this);
+ publishedInstruments = Meter.GetPublishedInstruments();
+ }
}
- if (s_allStartedListeners.AddIfNotExist(this, (listener1, listener2) => object.ReferenceEquals(listener1, listener2)))
+ if (publishedInstruments is not null)
{
- Meter.NotifyListenerWithAllPublishedInstruments(this);
+ foreach (Instrument instrument in publishedInstruments)
+ {
+ InstrumentPublished?.Invoke(instrument, this);
+ }
}
}
/// </summary>
public void Dispose()
{
- if (_disposed)
+ Dictionary<Instrument, object?>? callbacksArguments = null;
+ Action<Instrument, object?>? measurementsCompleted = MeasurementsCompleted;
+
+ lock (Instrument.SyncObject)
{
- return;
- }
- _disposed = true;
- s_allStartedListeners.Remove(this, (listener1, listener2) => object.ReferenceEquals(listener1, listener2));
+ if (_disposed)
+ {
+ return;
+ }
+ _disposed = true;
+ s_allStartedListeners.Remove(this);
- LinkedListNode<Instrument>? current = _enabledMeasurementInstruments.First;
+ LinkedListNode<Instrument>? current = _enabledMeasurementInstruments.First;
+ if (current is not null && measurementsCompleted is not null)
+ {
+ callbacksArguments = new Dictionary<Instrument, object?>();
- while (current is not null)
- {
- object? state = current.Value.DisableMeasurements(this);
- MeasurementsCompleted?.Invoke(current.Value, state);
- current = current.Next;
- }
+ do
+ {
+ object? state = current.Value.DisableMeasurements(this);
+ callbacksArguments.Add(current.Value, state);
+ current = current.Next;
+ } while (current is not null);
- _enabledMeasurementInstruments.Clear();
- }
+ _enabledMeasurementInstruments.Clear();
+ }
+ }
- // NotifyForPublishedInstrument will be called every time publishing instrument
- internal static void NotifyForPublishedInstrument(Instrument instrument)
- {
- LinkedListNode<MeterListener>? current = s_allStartedListeners.First;
- while (current is not null)
+ if (callbacksArguments is not null)
{
- current.Value.InstrumentPublished?.Invoke(instrument, current.Value);
- current = current.Next;
+ foreach (KeyValuePair<Instrument, object?> kvp in callbacksArguments)
+ {
+ measurementsCompleted?.Invoke(kvp.Key, kvp.Value);
+ }
}
}
+ // Publish is called from Instrument.Publish
+ internal static List<MeterListener>? GetAllListeners() => s_allStartedListeners.Count == 0 ? null : new List<MeterListener>(s_allStartedListeners);
+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void NotifyMeasurement<T>(Instrument instrument, T measurement, ReadOnlySpan<KeyValuePair<string, object?>> tags, object? state) where T : struct
{
Assert.Equal(loopLength * 8, totalCount);
}).Dispose();
}
+
+ [ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))]
+ public void SerializedEventsTest()
+ {
+ RemoteExecutor.Invoke(() => {
+
+ const int MaxMetersCount = 50;
+
+ Meter [] meters = new Meter[MaxMetersCount];
+ for (int i = 0; i < MaxMetersCount; i++)
+ {
+ meters[i] = new Meter("SerializedEventsTest" + i);
+ }
+
+ Dictionary<Instrument, bool> instruments = new Dictionary<Instrument, bool>();
+
+ MeterListener listener = new MeterListener()
+ {
+ InstrumentPublished = (instrument, theListener) =>
+ {
+ lock (instruments)
+ {
+ Assert.False(instruments.ContainsKey(instrument), $"{instrument.Name}, {instrument.Meter.Name} is already published before");
+ instruments.Add(instrument, true);
+ theListener.EnableMeasurementEvents(instrument, null);
+ }
+ },
+
+ MeasurementsCompleted = (instrument, state) =>
+ {
+ lock (instruments)
+ {
+ Assert.True(instruments.Remove(instrument), $"{instrument.Name}, {instrument.Meter.Name} is not published while getting completed results");
+ }
+ }
+ };
+
+ listener.Start();
+
+ int counterCounter = 0;
+ Random r = new Random();
+
+ Task [] jobs = new Task[Environment.ProcessorCount];
+ for (int i = 0; i < jobs.Length; i++)
+ {
+ jobs[i] = Task.Run(() => {
+ for (int j = 0; j < 10; j++)
+ {
+ int index = r.Next(MaxMetersCount);
+
+ for (int k = 0; k < 10; k++)
+ {
+ int c = Interlocked.Increment(ref counterCounter);
+ Counter<int> counter = meters[index].CreateCounter<int>("Counter");
+ counter.Add(1);
+ }
+
+ meters[index].Dispose();
+ }
+ });
+ }
+
+ Task.WaitAll(jobs);
+ listener.Dispose();
+ Assert.Equal(0, instruments.Count);
+ }).Dispose();
+ }
+
private void PublishCounterMeasurement<T>(Counter<T> counter, T value, KeyValuePair<string, object?>[] tags) where T : struct
{
switch (tags.Length)
}
}
-}
\ No newline at end of file
+}