// See the LICENSE file in the project root for more information.
using Microsoft.Diagnostics.NETCore.Client;
+using Microsoft.Diagnostics.Tracing;
using System;
using System.Collections.Generic;
-using System.Linq;
-using System.Text;
using System.Threading;
using System.Threading.Tasks;
{
internal class EventCounterPipeline : EventSourcePipeline<EventPipeCounterPipelineSettings>
{
- private readonly IEnumerable<ICountersLogger> _metricsLogger;
+ private readonly IEnumerable<ICountersLogger> _loggers;
private readonly CounterFilter _filter;
public EventCounterPipeline(DiagnosticsClient client,
EventPipeCounterPipelineSettings settings,
- IEnumerable<ICountersLogger> metricsLogger) : base(client, settings)
+ IEnumerable<ICountersLogger> loggers) : base(client, settings)
{
- _metricsLogger = metricsLogger ?? throw new ArgumentNullException(nameof(metricsLogger));
+ _loggers = loggers ?? throw new ArgumentNullException(nameof(loggers));
if (settings.CounterGroups.Length > 0)
{
}
}
- internal override DiagnosticsEventPipeProcessor CreateProcessor()
+ protected override MonitoringSourceConfiguration CreateConfiguration()
{
- return new DiagnosticsEventPipeProcessor(PipeMode.Metrics, metricLoggers: _metricsLogger, metricIntervalSeconds: (int)Settings.RefreshInterval.TotalSeconds,
- metricFilter: _filter);
+ return new MetricSourceConfiguration(CounterIntervalSeconds, _filter.GetProviders());
}
+
+ protected override async Task OnEventSourceAvailable(EventPipeEventSource eventSource, Func<Task> stopSessionAsync, CancellationToken token)
+ {
+ ExecuteCounterLoggerAction((metricLogger) => metricLogger.PipelineStarted());
+
+ eventSource.Dynamic.All += traceEvent =>
+ {
+ try
+ {
+ // Metrics
+ if (traceEvent.EventName.Equals("EventCounters"))
+ {
+ IDictionary<string, object> payloadVal = (IDictionary<string, object>)(traceEvent.PayloadValue(0));
+ IDictionary<string, object> payloadFields = (IDictionary<string, object>)(payloadVal["Payload"]);
+
+ //Make sure we are part of the requested series. If multiple clients request metrics, all of them get the metrics.
+ string series = payloadFields["Series"].ToString();
+ if (GetInterval(series) != CounterIntervalSeconds * 1000)
+ {
+ return;
+ }
+
+ string counterName = payloadFields["Name"].ToString();
+ if (!_filter.IsIncluded(traceEvent.ProviderName, counterName))
+ {
+ return;
+ }
+
+ float intervalSec = (float)payloadFields["IntervalSec"];
+ string displayName = payloadFields["DisplayName"].ToString();
+ string displayUnits = payloadFields["DisplayUnits"].ToString();
+ double value = 0;
+ CounterType counterType = CounterType.Metric;
+
+ if (payloadFields["CounterType"].Equals("Mean"))
+ {
+ value = (double)payloadFields["Mean"];
+ }
+ else if (payloadFields["CounterType"].Equals("Sum"))
+ {
+ counterType = CounterType.Rate;
+ value = (double)payloadFields["Increment"];
+ if (string.IsNullOrEmpty(displayUnits))
+ {
+ displayUnits = "count";
+ }
+ //TODO Should we make these /sec like the dotnet-counters tool?
+ }
+
+ // Note that dimensional data such as pod and namespace are automatically added in prometheus and azure monitor scenarios.
+ // We no longer added it here.
+ var counterPayload = new CounterPayload(traceEvent.TimeStamp,
+ traceEvent.ProviderName,
+ counterName, displayName,
+ displayUnits,
+ value,
+ counterType,
+ intervalSec);
+
+ ExecuteCounterLoggerAction((metricLogger) => metricLogger.Log(counterPayload));
+ }
+ }
+ catch (Exception)
+ {
+ }
+ };
+
+ using var sourceCompletedTaskSource = new EventTaskSource<Action>(
+ taskComplete => taskComplete,
+ handler => eventSource.Completed += handler,
+ handler => eventSource.Completed -= handler,
+ token);
+
+ await sourceCompletedTaskSource.Task;
+
+ ExecuteCounterLoggerAction((metricLogger) => metricLogger.PipelineStopped());
+ }
+
+ private static int GetInterval(string series)
+ {
+ const string comparison = "Interval=";
+ int interval = 0;
+ if (series.StartsWith(comparison, StringComparison.OrdinalIgnoreCase))
+ {
+ int.TryParse(series.Substring(comparison.Length), out interval);
+ }
+ return interval;
+ }
+
+ private void ExecuteCounterLoggerAction(Action<ICountersLogger> action)
+ {
+ foreach (ICountersLogger logger in _loggers)
+ {
+ try
+ {
+ action(logger);
+ }
+ catch (ObjectDisposedException)
+ {
+ }
+ }
+ }
+
+ private int CounterIntervalSeconds => (int)Settings.RefreshInterval.TotalSeconds;
}
}
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
+using Microsoft.Diagnostics.NETCore.Client;
+using Microsoft.Diagnostics.Tracing;
using System;
-using System.Collections.Generic;
-using System.Globalization;
using System.IO;
-using System.Linq;
-using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
-using Graphs;
-using Microsoft.Diagnostics.NETCore.Client;
-using Microsoft.Diagnostics.Tracing;
-using Microsoft.Diagnostics.Tracing.Extensions;
-using Microsoft.Diagnostics.Tracing.Parsers.Clr;
-using Microsoft.Extensions.Logging;
namespace Microsoft.Diagnostics.Monitoring.EventPipe
{
internal partial class DiagnosticsEventPipeProcessor : IAsyncDisposable
{
- private readonly MemoryGraph _gcGraph;
- private readonly ILoggerFactory _loggerFactory;
- private readonly IEnumerable<ICountersLogger> _metricLoggers;
- private readonly PipeMode _mode;
- private readonly int _metricIntervalSeconds;
- private readonly CounterFilter _counterFilter;
- private readonly LogLevel _logsLevel;
- private readonly Func<string, CancellationToken, Task> _processInfoCallback;
- private readonly MonitoringSourceConfiguration _userConfig;
- private readonly Func<Stream, CancellationToken, Task> _onStreamAvailable;
+ private readonly MonitoringSourceConfiguration _configuration;
+ private readonly Func<EventPipeEventSource, Func<Task>, CancellationToken, Task> _onEventSourceAvailable;
private readonly object _lock = new object();
private TaskCompletionSource<bool> _sessionStarted;
- private EventPipeEventSource _eventPipeSession;
+ private EventPipeEventSource _eventSource;
private Func<Task> _stopFunc;
private bool _disposed;
public DiagnosticsEventPipeProcessor(
- PipeMode mode,
- ILoggerFactory loggerFactory = null, // PipeMode = Logs
- LogLevel logsLevel = LogLevel.Debug, // PipeMode = Logs
- IEnumerable<ICountersLogger> metricLoggers = null, // PipeMode = Metrics
- int metricIntervalSeconds = 10, // PipeMode = Metrics
- CounterFilter metricFilter = null, // PipeMode = Metrics
- MemoryGraph gcGraph = null, // PipeMode = GCDump
- MonitoringSourceConfiguration configuration = null, // PipeMode = Nettrace
- Func<Stream, CancellationToken, Task> onStreamAvailable = null, // PipeMode = Nettrace
- Func<string, CancellationToken, Task> processInfoCallback = null // PipeMode = ProcessInfo
+ MonitoringSourceConfiguration configuration,
+ Func<EventPipeEventSource, Func<Task>, CancellationToken, Task> onEventSourceAvailable
)
{
- _metricLoggers = metricLoggers ?? Enumerable.Empty<ICountersLogger>();
- _mode = mode;
- _loggerFactory = loggerFactory;
- _gcGraph = gcGraph;
- _metricIntervalSeconds = metricIntervalSeconds;
- _logsLevel = logsLevel;
- _processInfoCallback = processInfoCallback;
- _userConfig = configuration;
- _onStreamAvailable = onStreamAvailable;
- _processInfoCallback = processInfoCallback;
- _counterFilter = metricFilter;
+ _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
+ _onEventSourceAvailable = onEventSourceAvailable ?? throw new ArgumentNullException(nameof(onEventSourceAvailable));
_sessionStarted = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}
await await Task.Factory.StartNew(async () =>
{
EventPipeEventSource source = null;
- DiagnosticsMonitor monitor = null;
+ EventPipeStreamProvider streamProvider = null;
Task handleEventsTask = Task.CompletedTask;
try
{
- MonitoringSourceConfiguration config = null;
- if (_mode == PipeMode.Logs)
- {
- config = new LoggingSourceConfiguration(_logsLevel);
- }
- else if (_mode == PipeMode.Metrics)
- {
- config = new MetricSourceConfiguration(_metricIntervalSeconds, _counterFilter.GetProviders());
- }
- else if (_mode == PipeMode.GCDump)
- {
- config = new GCDumpSourceConfiguration();
- }
- else if (_mode == PipeMode.ProcessInfo)
- {
- config = new SampleProfilerConfiguration();
- }
- else if (_mode == PipeMode.Nettrace)
- {
- config = _userConfig;
- }
-
- monitor = new DiagnosticsMonitor(config);
+ streamProvider = new EventPipeStreamProvider(_configuration);
// Allows the event handling routines to stop processing before the duration expires.
- Func<Task> stopFunc = () => Task.Run(() => { monitor.StopProcessing(); });
-
- Stream sessionStream = await monitor.ProcessEvents(client, duration, token);
+ Func<Task> stopFunc = () => Task.Run(() => { streamProvider.StopProcessing(); });
- if (_mode == PipeMode.Nettrace)
- {
- if (!_sessionStarted.TrySetResult(true))
- {
- token.ThrowIfCancellationRequested();
- }
-
- lock (_lock)
- {
- //Save the stop function for later, so that we can stop a trace later.
- _stopFunc = stopFunc;
- }
-
- await _onStreamAvailable(sessionStream, token);
- return;
- }
+ Stream sessionStream = await streamProvider.ProcessEvents(client, duration, token);
source = new EventPipeEventSource(sessionStream);
+ handleEventsTask = _onEventSourceAvailable(source, stopFunc, token);
- if (_mode == PipeMode.Metrics)
- {
- // Metrics
- HandleEventCounters(source);
- }
- else if (_mode == PipeMode.Logs)
- {
- // Logging
- HandleLoggingEvents(source);
- }
- else if (_mode == PipeMode.GCDump)
- {
- // GC
- handleEventsTask = HandleGCEvents(source, stopFunc, token);
- }
-
- else if (_mode == PipeMode.ProcessInfo)
- {
- // ProcessInfo
- handleEventsTask = HandleProcessInfo(source, stopFunc, token);
- }
-
- lock(_lock)
+ lock (_lock)
{
- _eventPipeSession = source;
+ _eventSource = source;
_stopFunc = stopFunc;
}
registration.Dispose();
}
finally
{
- ExecuteCounterLoggerAction((metricLogger) => metricLogger.PipelineStopped());
-
registration.Dispose();
- EventPipeEventSource session = null;
+ EventPipeEventSource eventSource = null;
lock (_lock)
{
- session = _eventPipeSession;
- _eventPipeSession = null;
+ eventSource = _eventSource;
+ _eventSource = null;
}
- session?.Dispose();
- if (monitor != null)
+ eventSource?.Dispose();
+ if (streamProvider != null)
{
- await monitor.DisposeAsync();
+ await streamProvider.DisposeAsync();
}
}
Func<Task> stopFunc = null;
lock (_lock)
{
- session = _eventPipeSession;
+ session = _eventSource;
stopFunc = _stopFunc;
}
if (session != null)
}
}
- private void HandleLoggingEvents(EventPipeEventSource source)
- {
- string lastFormattedMessage = string.Empty;
-
- var logActivities = new Dictionary<Guid, LogActivityItem>();
- var stack = new Stack<Guid>();
-
- source.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "ActivityJson/Start", (traceEvent) =>
- {
- var factoryId = (int)traceEvent.PayloadByName("FactoryID");
- var categoryName = (string)traceEvent.PayloadByName("LoggerName");
- var argsJson = (string)traceEvent.PayloadByName("ArgumentsJson");
-
- // TODO: Store this information by logger factory id
- var item = new LogActivityItem
- {
- ActivityID = traceEvent.ActivityID,
- ScopedObject = new LogObject(JsonDocument.Parse(argsJson).RootElement),
- };
-
- if (stack.Count > 0)
- {
- Guid parentId = stack.Peek();
- if (logActivities.TryGetValue(parentId, out var parentItem))
- {
- item.Parent = parentItem;
- }
- }
-
- stack.Push(traceEvent.ActivityID);
- logActivities[traceEvent.ActivityID] = item;
- });
-
- source.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "ActivityJson/Stop", (traceEvent) =>
- {
- var factoryId = (int)traceEvent.PayloadByName("FactoryID");
- var categoryName = (string)traceEvent.PayloadByName("LoggerName");
-
- stack.Pop();
- logActivities.Remove(traceEvent.ActivityID);
- });
-
- source.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "MessageJson", (traceEvent) =>
- {
- // Level, FactoryID, LoggerName, EventID, EventName, ExceptionJson, ArgumentsJson
- var logLevel = (LogLevel)traceEvent.PayloadByName("Level");
- var factoryId = (int)traceEvent.PayloadByName("FactoryID");
- var categoryName = (string)traceEvent.PayloadByName("LoggerName");
- var eventId = (int)traceEvent.PayloadByName("EventId");
- var eventName = (string)traceEvent.PayloadByName("EventName");
- var exceptionJson = (string)traceEvent.PayloadByName("ExceptionJson");
- var argsJson = (string)traceEvent.PayloadByName("ArgumentsJson");
-
- // There's a bug that causes some of the columns to get mixed up
- if (eventName.StartsWith("{"))
- {
- argsJson = exceptionJson;
- exceptionJson = eventName;
- eventName = null;
- }
-
- if (string.IsNullOrEmpty(argsJson))
- {
- return;
- }
-
- Exception exception = null;
-
- ILogger logger = _loggerFactory.CreateLogger(categoryName);
- var scopes = new List<IDisposable>();
-
- if (logActivities.TryGetValue(traceEvent.ActivityID, out var logActivityItem))
- {
- // REVIEW: Does order matter here? We're combining everything anyways.
- while (logActivityItem != null)
- {
- scopes.Add(logger.BeginScope(logActivityItem.ScopedObject));
-
- logActivityItem = logActivityItem.Parent;
- }
- }
-
- try
- {
- if (exceptionJson != "{}")
- {
- var exceptionMessage = JsonSerializer.Deserialize<JsonElement>(exceptionJson);
- exception = new LoggerException(exceptionMessage);
- }
-
- var message = JsonSerializer.Deserialize<JsonElement>(argsJson);
- if (message.TryGetProperty("{OriginalFormat}", out var formatElement))
- {
- var formatString = formatElement.GetString();
- var formatter = new LogValuesFormatter(formatString);
- object[] args = new object[formatter.ValueNames.Count];
- for (int i = 0; i < args.Length; i++)
- {
- args[i] = message.GetProperty(formatter.ValueNames[i]).GetString();
- }
-
- logger.Log(logLevel, new EventId(eventId, eventName), exception, formatString, args);
- }
- else
- {
- var obj = new LogObject(message, lastFormattedMessage);
- logger.Log(logLevel, new EventId(eventId, eventName), obj, exception, LogObject.Callback);
- }
- }
- catch (Exception)
- {
- }
- finally
- {
- scopes.ForEach(d => d.Dispose());
- }
- });
-
- source.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "FormattedMessage", (traceEvent) =>
- {
- // Level, FactoryID, LoggerName, EventID, EventName, FormattedMessage
- var logLevel = (LogLevel)traceEvent.PayloadByName("Level");
- var factoryId = (int)traceEvent.PayloadByName("FactoryID");
- var categoryName = (string)traceEvent.PayloadByName("LoggerName");
- var eventId = (int)traceEvent.PayloadByName("EventId");
- var eventName = (string)traceEvent.PayloadByName("EventName");
- var formattedMessage = (string)traceEvent.PayloadByName("FormattedMessage");
-
- if (string.IsNullOrEmpty(formattedMessage))
- {
- formattedMessage = eventName;
- eventName = string.Empty;
- }
-
- lastFormattedMessage = formattedMessage;
- });
- }
-
- private void HandleEventCounters(EventPipeEventSource source)
- {
- ExecuteCounterLoggerAction((metricLogger) => metricLogger.PipelineStarted());
-
- source.Dynamic.All += traceEvent =>
- {
- try
- {
- // Metrics
- if (traceEvent.EventName.Equals("EventCounters"))
- {
- IDictionary<string, object> payloadVal = (IDictionary<string, object>)(traceEvent.PayloadValue(0));
- IDictionary<string, object> payloadFields = (IDictionary<string, object>)(payloadVal["Payload"]);
-
- //Make sure we are part of the requested series. If multiple clients request metrics, all of them get the metrics.
- string series = payloadFields["Series"].ToString();
- if (GetInterval(series) != _metricIntervalSeconds * 1000)
- {
- return;
- }
-
- string counterName = payloadFields["Name"].ToString();
- if (!_counterFilter.IsIncluded(traceEvent.ProviderName, counterName))
- {
- return;
- }
-
- float intervalSec = (float)payloadFields["IntervalSec"];
- string displayName = payloadFields["DisplayName"].ToString();
- string displayUnits = payloadFields["DisplayUnits"].ToString();
- double value = 0;
- CounterType counterType = CounterType.Metric;
-
- if (payloadFields["CounterType"].Equals("Mean"))
- {
- value = (double)payloadFields["Mean"];
- }
- else if (payloadFields["CounterType"].Equals("Sum"))
- {
- counterType = CounterType.Rate;
- value = (double)payloadFields["Increment"];
- if (string.IsNullOrEmpty(displayUnits))
- {
- displayUnits = "count";
- }
- //TODO Should we make these /sec like the dotnet-counters tool?
- }
-
- // Note that dimensional data such as pod and namespace are automatically added in prometheus and azure monitor scenarios.
- // We no longer added it here.
- var counterPayload = new CounterPayload(traceEvent.TimeStamp,
- traceEvent.ProviderName,
- counterName, displayName,
- displayUnits,
- value,
- counterType,
- intervalSec);
-
- ExecuteCounterLoggerAction((metricLogger) => metricLogger.Log(counterPayload));
- }
- }
- catch (Exception)
- {
- }
- };
- }
-
- private static int GetInterval(string series)
- {
- const string comparison = "Interval=";
- int interval = 0;
- if (series.StartsWith(comparison, StringComparison.OrdinalIgnoreCase))
- {
- int.TryParse(series.Substring(comparison.Length), out interval);
- }
- return interval;
- }
-
- private void ExecuteCounterLoggerAction(Action<ICountersLogger> action)
- {
- foreach (ICountersLogger metricLogger in _metricLoggers)
- {
- try
- {
- action(metricLogger);
- }
- catch (ObjectDisposedException)
- {
- }
- }
- }
-
- private async Task HandleGCEvents(EventPipeEventSource source, Func<Task> stopFunc, CancellationToken token)
- {
- int gcNum = -1;
-
- Action<GCStartTraceData, Action> gcStartHandler = (GCStartTraceData data, Action taskComplete) =>
- {
- taskComplete();
-
- if (gcNum < 0 && data.Depth == 2 && data.Type != GCType.BackgroundGC)
- {
- gcNum = data.Count;
- }
- };
-
- Action<GCBulkNodeTraceData, Action> gcBulkNodeHandler = (GCBulkNodeTraceData data, Action taskComplete) =>
- {
- taskComplete();
- };
-
- Action<GCEndTraceData, Action> gcEndHandler = (GCEndTraceData data, Action taskComplete) =>
- {
- if (data.Count == gcNum)
- {
- taskComplete();
- }
- };
-
- // Register event handlers on the event source and represent their completion as tasks
- using var gcStartTaskSource = new EventTaskSource<Action<GCStartTraceData>>(
- taskComplete => data => gcStartHandler(data, taskComplete),
- handler => source.Clr.GCStart += handler,
- handler => source.Clr.GCStart -= handler,
- token);
- using var gcBulkNodeTaskSource = new EventTaskSource<Action<GCBulkNodeTraceData>>(
- taskComplete => data => gcBulkNodeHandler(data, taskComplete),
- handler => source.Clr.GCBulkNode += handler,
- handler => source.Clr.GCBulkNode -= handler,
- token);
- using var gcStopTaskSource = new EventTaskSource<Action<GCEndTraceData>>(
- taskComplete => data => gcEndHandler(data, taskComplete),
- handler => source.Clr.GCStop += handler,
- handler => source.Clr.GCStop -= handler,
- token);
- using var sourceCompletedTaskSource = new EventTaskSource<Action>(
- taskComplete => taskComplete,
- handler => source.Completed += handler,
- handler => source.Completed -= handler,
- token);
-
- // A task that is completed whenever GC data is received
- Task gcDataTask = Task.WhenAny(gcStartTaskSource.Task, gcBulkNodeTaskSource.Task);
- Task gcStopTask = gcStopTaskSource.Task;
-
- DotNetHeapDumpGraphReader dumper = new DotNetHeapDumpGraphReader(TextWriter.Null)
- {
- DotNetHeapInfo = new DotNetHeapInfo()
- };
- dumper.SetupCallbacks(_gcGraph, source);
-
- // The event source will not always provide the GC events when it starts listening. However,
- // they will be provided when the event source is told to stop processing events. Give the
- // event source some time to produce the events, but if it doesn't start producing them within
- // a short amount of time (5 seconds), then stop processing events to allow them to be flushed.
- Task eventsTimeoutTask = Task.Delay(TimeSpan.FromSeconds(5), token);
- Task completedTask = await Task.WhenAny(gcDataTask, eventsTimeoutTask);
-
- token.ThrowIfCancellationRequested();
-
- // If started receiving GC events, wait for the GC Stop event.
- if (completedTask == gcDataTask)
- {
- await gcStopTask;
- }
-
- // Stop receiving events; if haven't received events yet, this will force flushing of events.
- await stopFunc();
-
- // Wait for all received events to be processed.
- await sourceCompletedTaskSource.Task;
-
- // Check that GC data and stop events were received. This is done by checking that the
- // associated tasks have ran to completion. If one of them has not reached the completion state, then
- // fail the GC dump operation.
- if (gcDataTask.Status != TaskStatus.RanToCompletion ||
- gcStopTask.Status != TaskStatus.RanToCompletion)
- {
- throw new InvalidOperationException("Unable to create GC dump due to incomplete GC data.");
- }
-
- dumper.ConvertHeapDataToGraph();
-
- _gcGraph.AllowReading();
- }
-
- private async Task HandleProcessInfo(EventPipeEventSource source, Func<Task> stopFunc, CancellationToken token)
- {
- string commandLine = null;
- Action<TraceEvent, Action> processInfoHandler = (TraceEvent traceEvent, Action taskComplete) =>
- {
- commandLine = (string)traceEvent.PayloadByName("CommandLine");
- taskComplete();
- };
-
- // Completed when the ProcessInfo event of the Microsoft-DotNETCore-EventPipe event provider is handled
- using var processInfoTaskSource = new EventTaskSource<Action<TraceEvent>>(
- taskComplete => traceEvent => processInfoHandler(traceEvent, taskComplete),
- handler => source.Dynamic.AddCallbackForProviderEvent(MonitoringSourceConfiguration.EventPipeProviderName, "ProcessInfo", handler),
- handler => source.Dynamic.RemoveCallback(handler),
- token);
-
- // Completed when any trace event is handled
- using var anyEventTaskSource = new EventTaskSource<Action<TraceEvent>>(
- taskComplete => traceEvent => taskComplete(),
- handler => source.Dynamic.All += handler,
- handler => source.Dynamic.All -= handler,
- token);
-
- // Wait for any trace event to be processed
- await anyEventTaskSource.Task;
-
- // Stop the event pipe session
- await stopFunc();
-
- // Wait for the ProcessInfo event to be processed
- await processInfoTaskSource.Task;
-
- // Notify of command line information
- await _processInfoCallback(commandLine, token);
- }
-
public async ValueTask DisposeAsync()
{
lock (_lock)
{
}
- _eventPipeSession?.Dispose();
- }
-
- private class LogActivityItem
- {
- public Guid ActivityID { get; set; }
-
- public LogObject ScopedObject { get; set; }
-
- public LogActivityItem Parent { get; set; }
+ _eventSource?.Dispose();
}
}
}
+++ /dev/null
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-// See the LICENSE file in the project root for more information.
-
-using System;
-using System.IO;
-using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.Diagnostics.NETCore.Client;
-
-namespace Microsoft.Diagnostics.Monitoring.EventPipe
-{
- partial class DiagnosticsEventPipeProcessor
- {
- private sealed class DiagnosticsMonitor : IAsyncDisposable
- {
- private readonly MonitoringSourceConfiguration _sourceConfig;
- private readonly TaskCompletionSource<object> _stopProcessingSource;
- private Task _currentTask;
-
- public DiagnosticsMonitor(MonitoringSourceConfiguration sourceConfig)
- {
- _sourceConfig = sourceConfig;
- _stopProcessingSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
- }
-
- public Task<Stream> ProcessEvents(DiagnosticsClient client, TimeSpan duration, CancellationToken cancellationToken)
- {
- cancellationToken.ThrowIfCancellationRequested();
-
- EventPipeSession session = null;
- try
- {
- session = client.StartEventPipeSession(_sourceConfig.GetProviders(), _sourceConfig.RequestRundown, _sourceConfig.BufferSizeInMB);
- }
- catch (EndOfStreamException e)
- {
- throw new InvalidOperationException("End of stream", e);
- }
- catch (Exception ex) when (!(ex is OperationCanceledException))
- {
- throw new InvalidOperationException("Failed to start the event pipe session", ex);
- }
-
- _currentTask = Task.Run(async () =>
- {
- using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
- linkedSource.CancelAfter(duration);
- using var _ = linkedSource.Token.Register(() => _stopProcessingSource.TrySetResult(null));
-
- // Use TaskCompletionSource instead of Task.Delay with cancellation to avoid
- // using exceptions for normal termination of event stream.
- await _stopProcessingSource.Task.ConfigureAwait(false);
- StopSession(session);
- });
-
- return Task.FromResult(session.EventStream);
- }
-
- public void StopProcessing()
- {
- _stopProcessingSource.TrySetResult(null);
- }
-
- private static void StopSession(EventPipeSession session)
- {
- try
- {
- session.Stop();
- }
- catch (EndOfStreamException)
- {
- // If the app we're monitoring exits abruptly, this may throw in which case we just swallow the exception and exit gracefully.
- }
- // We may time out if the process ended before we sent StopTracing command. We can just exit in that case.
- catch (TimeoutException)
- {
- }
- // On Unix platforms, we may actually get a PNSE since the pipe is gone with the process, and Runtime Client Library
- // does not know how to distinguish a situation where there is no pipe to begin with, or where the process has exited
- // before collection started and got rid of a pipe that once existed.
- // Since we are catching this at the end of a session we know that the pipe once existed (otherwise the exception would've
- // been thrown at the beginning directly)
- catch (PlatformNotSupportedException)
- {
- }
- // On non-abrupt exits, the socket may be already closed by the runtime and we won't be able to send a stop request through it.
- catch (ServerNotAvailableException)
- {
- }
- }
-
- public async ValueTask DisposeAsync()
- {
- Task currentTask = _currentTask;
- _stopProcessingSource.TrySetResult(null);
- if (currentTask != null)
- {
- try
- {
- await currentTask.ConfigureAwait(false);
- }
- catch (OperationCanceledException)
- {
- }
- }
- }
- }
- }
-}
\ No newline at end of file
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Diagnostics.NETCore.Client;
+
+namespace Microsoft.Diagnostics.Monitoring.EventPipe
+{
+ internal sealed class EventPipeStreamProvider : IAsyncDisposable
+ {
+ private readonly MonitoringSourceConfiguration _sourceConfig;
+ private readonly TaskCompletionSource<object> _stopProcessingSource;
+ private Task _currentTask;
+
+ public EventPipeStreamProvider(MonitoringSourceConfiguration sourceConfig)
+ {
+ _sourceConfig = sourceConfig;
+ _stopProcessingSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+ }
+
+ public Task<Stream> ProcessEvents(DiagnosticsClient client, TimeSpan duration, CancellationToken cancellationToken)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+
+ EventPipeSession session = null;
+ try
+ {
+ session = client.StartEventPipeSession(_sourceConfig.GetProviders(), _sourceConfig.RequestRundown, _sourceConfig.BufferSizeInMB);
+ }
+ catch (EndOfStreamException e)
+ {
+ throw new InvalidOperationException("End of stream", e);
+ }
+ catch (Exception ex) when (!(ex is OperationCanceledException))
+ {
+ throw new InvalidOperationException("Failed to start the event pipe session", ex);
+ }
+
+ _currentTask = Task.Run(async () =>
+ {
+ using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ linkedSource.CancelAfter(duration);
+ using var _ = linkedSource.Token.Register(() => _stopProcessingSource.TrySetResult(null));
+
+ // Use TaskCompletionSource instead of Task.Delay with cancellation to avoid
+ // using exceptions for normal termination of event stream.
+ await _stopProcessingSource.Task.ConfigureAwait(false);
+ StopSession(session);
+ });
+
+ return Task.FromResult(session.EventStream);
+ }
+
+ public void StopProcessing()
+ {
+ _stopProcessingSource.TrySetResult(null);
+ }
+
+ private static void StopSession(EventPipeSession session)
+ {
+ try
+ {
+ session.Stop();
+ }
+ catch (EndOfStreamException)
+ {
+ // If the app we're monitoring exits abruptly, this may throw in which case we just swallow the exception and exit gracefully.
+ }
+ // We may time out if the process ended before we sent StopTracing command. We can just exit in that case.
+ catch (TimeoutException)
+ {
+ }
+ // On Unix platforms, we may actually get a PNSE since the pipe is gone with the process, and Runtime Client Library
+ // does not know how to distinguish a situation where there is no pipe to begin with, or where the process has exited
+ // before collection started and got rid of a pipe that once existed.
+ // Since we are catching this at the end of a session we know that the pipe once existed (otherwise the exception would've
+ // been thrown at the beginning directly)
+ catch (PlatformNotSupportedException)
+ {
+ }
+ // On non-abrupt exits, the socket may be already closed by the runtime and we won't be able to send a stop request through it.
+ catch (ServerNotAvailableException)
+ {
+ }
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ Task currentTask = _currentTask;
+ _stopProcessingSource.TrySetResult(null);
+ if (currentTask != null)
+ {
+ try
+ {
+ await currentTask.ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
-using Graphs;
using Microsoft.Diagnostics.NETCore.Client;
-using Microsoft.Extensions.Logging;
+using Microsoft.Diagnostics.Tracing;
using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
using System.Threading;
using System.Threading.Tasks;
_processor = new Lazy<DiagnosticsEventPipeProcessor>(CreateProcessor);
}
- internal abstract DiagnosticsEventPipeProcessor CreateProcessor();
+ protected abstract MonitoringSourceConfiguration CreateConfiguration();
+
+ private DiagnosticsEventPipeProcessor CreateProcessor()
+ {
+ return new DiagnosticsEventPipeProcessor(
+ configuration: CreateConfiguration(),
+ onEventSourceAvailable: OnEventSourceAvailable);
+ }
protected override Task OnRun(CancellationToken token)
{
await Task.WhenAny(stoppingTask, taskCompletionSource.Task).Unwrap();
}
}
+
+ protected virtual Task OnEventSourceAvailable(EventPipeEventSource eventSource, Func<Task> stopSessionAsync, CancellationToken token)
+ {
+ return Task.CompletedTask;
+ }
}
}
using Graphs;
using Microsoft.Diagnostics.NETCore.Client;
+using Microsoft.Diagnostics.Tracing;
using Microsoft.Diagnostics.Tracing.Parsers.Clr;
using System;
-using System.Collections.Generic;
-using System.Text;
+using System.IO;
using System.Threading;
using System.Threading.Tasks;
_gcGraph = gcGraph ?? throw new ArgumentNullException(nameof(gcGraph));
}
- internal override DiagnosticsEventPipeProcessor CreateProcessor()
+ protected override MonitoringSourceConfiguration CreateConfiguration()
{
- return new DiagnosticsEventPipeProcessor(PipeMode.GCDump, gcGraph: _gcGraph);
+ return new GCDumpSourceConfiguration();
+ }
+
+ protected override async Task OnEventSourceAvailable(EventPipeEventSource eventSource, Func<Task> stopSessionAsync, CancellationToken token)
+ {
+ int gcNum = -1;
+
+ Action<GCStartTraceData, Action> gcStartHandler = (GCStartTraceData data, Action taskComplete) =>
+ {
+ taskComplete();
+
+ if (gcNum < 0 && data.Depth == 2 && data.Type != GCType.BackgroundGC)
+ {
+ gcNum = data.Count;
+ }
+ };
+
+ Action<GCBulkNodeTraceData, Action> gcBulkNodeHandler = (GCBulkNodeTraceData data, Action taskComplete) =>
+ {
+ taskComplete();
+ };
+
+ Action<GCEndTraceData, Action> gcEndHandler = (GCEndTraceData data, Action taskComplete) =>
+ {
+ if (data.Count == gcNum)
+ {
+ taskComplete();
+ }
+ };
+
+ // Register event handlers on the event source and represent their completion as tasks
+ using var gcStartTaskSource = new EventTaskSource<Action<GCStartTraceData>>(
+ taskComplete => data => gcStartHandler(data, taskComplete),
+ handler => eventSource.Clr.GCStart += handler,
+ handler => eventSource.Clr.GCStart -= handler,
+ token);
+ using var gcBulkNodeTaskSource = new EventTaskSource<Action<GCBulkNodeTraceData>>(
+ taskComplete => data => gcBulkNodeHandler(data, taskComplete),
+ handler => eventSource.Clr.GCBulkNode += handler,
+ handler => eventSource.Clr.GCBulkNode -= handler,
+ token);
+ using var gcStopTaskSource = new EventTaskSource<Action<GCEndTraceData>>(
+ taskComplete => data => gcEndHandler(data, taskComplete),
+ handler => eventSource.Clr.GCStop += handler,
+ handler => eventSource.Clr.GCStop -= handler,
+ token);
+ using var sourceCompletedTaskSource = new EventTaskSource<Action>(
+ taskComplete => taskComplete,
+ handler => eventSource.Completed += handler,
+ handler => eventSource.Completed -= handler,
+ token);
+
+ // A task that is completed whenever GC data is received
+ Task gcDataTask = Task.WhenAny(gcStartTaskSource.Task, gcBulkNodeTaskSource.Task);
+ Task gcStopTask = gcStopTaskSource.Task;
+
+ DotNetHeapDumpGraphReader dumper = new DotNetHeapDumpGraphReader(TextWriter.Null)
+ {
+ DotNetHeapInfo = new DotNetHeapInfo()
+ };
+ dumper.SetupCallbacks(_gcGraph, eventSource);
+
+ // The event source will not always provide the GC events when it starts listening. However,
+ // they will be provided when the event source is told to stop processing events. Give the
+ // event source some time to produce the events, but if it doesn't start producing them within
+ // a short amount of time (5 seconds), then stop processing events to allow them to be flushed.
+ Task eventsTimeoutTask = Task.Delay(TimeSpan.FromSeconds(5), token);
+ Task completedTask = await Task.WhenAny(gcDataTask, eventsTimeoutTask);
+
+ token.ThrowIfCancellationRequested();
+
+ // If started receiving GC events, wait for the GC Stop event.
+ if (completedTask == gcDataTask)
+ {
+ await gcStopTask;
+ }
+
+ // Stop receiving events; if haven't received events yet, this will force flushing of events.
+ await stopSessionAsync();
+
+ // Wait for all received events to be processed.
+ await sourceCompletedTaskSource.Task;
+
+ // Check that GC data and stop events were received. This is done by checking that the
+ // associated tasks have ran to completion. If one of them has not reached the completion state, then
+ // fail the GC dump operation.
+ if (gcDataTask.Status != TaskStatus.RanToCompletion ||
+ gcStopTask.Status != TaskStatus.RanToCompletion)
+ {
+ throw new InvalidOperationException("Unable to create GC dump due to incomplete GC data.");
+ }
+
+ dumper.ConvertHeapDataToGraph();
+
+ _gcGraph.AllowReading();
}
}
}
// See the LICENSE file in the project root for more information.
using Microsoft.Diagnostics.NETCore.Client;
+using Microsoft.Diagnostics.Tracing;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
-using System.Text;
+using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
_factory = factory ?? throw new ArgumentNullException(nameof(factory));
}
- internal override DiagnosticsEventPipeProcessor CreateProcessor()
+ protected override MonitoringSourceConfiguration CreateConfiguration()
{
- return new DiagnosticsEventPipeProcessor(PipeMode.Logs, loggerFactory: _factory, logsLevel: Settings.LogLevel);
+ return new LoggingSourceConfiguration(Settings.LogLevel);
+ }
+
+ protected override Task OnEventSourceAvailable(EventPipeEventSource eventSource, Func<Task> stopSessionAsync, CancellationToken token)
+ {
+ string lastFormattedMessage = string.Empty;
+
+ var logActivities = new Dictionary<Guid, LogActivityItem>();
+ var stack = new Stack<Guid>();
+
+ eventSource.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "ActivityJson/Start", (traceEvent) =>
+ {
+ var factoryId = (int)traceEvent.PayloadByName("FactoryID");
+ var categoryName = (string)traceEvent.PayloadByName("LoggerName");
+ var argsJson = (string)traceEvent.PayloadByName("ArgumentsJson");
+
+ // TODO: Store this information by logger factory id
+ var item = new LogActivityItem
+ {
+ ActivityID = traceEvent.ActivityID,
+ ScopedObject = new LogObject(JsonDocument.Parse(argsJson).RootElement),
+ };
+
+ if (stack.Count > 0)
+ {
+ Guid parentId = stack.Peek();
+ if (logActivities.TryGetValue(parentId, out var parentItem))
+ {
+ item.Parent = parentItem;
+ }
+ }
+
+ stack.Push(traceEvent.ActivityID);
+ logActivities[traceEvent.ActivityID] = item;
+ });
+
+ eventSource.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "ActivityJson/Stop", (traceEvent) =>
+ {
+ var factoryId = (int)traceEvent.PayloadByName("FactoryID");
+ var categoryName = (string)traceEvent.PayloadByName("LoggerName");
+
+ stack.Pop();
+ logActivities.Remove(traceEvent.ActivityID);
+ });
+
+ eventSource.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "MessageJson", (traceEvent) =>
+ {
+ // Level, FactoryID, LoggerName, EventID, EventName, ExceptionJson, ArgumentsJson
+ var logLevel = (LogLevel)traceEvent.PayloadByName("Level");
+ var factoryId = (int)traceEvent.PayloadByName("FactoryID");
+ var categoryName = (string)traceEvent.PayloadByName("LoggerName");
+ var eventId = (int)traceEvent.PayloadByName("EventId");
+ var eventName = (string)traceEvent.PayloadByName("EventName");
+ var exceptionJson = (string)traceEvent.PayloadByName("ExceptionJson");
+ var argsJson = (string)traceEvent.PayloadByName("ArgumentsJson");
+
+ // There's a bug that causes some of the columns to get mixed up
+ if (eventName.StartsWith("{"))
+ {
+ argsJson = exceptionJson;
+ exceptionJson = eventName;
+ eventName = null;
+ }
+
+ if (string.IsNullOrEmpty(argsJson))
+ {
+ return;
+ }
+
+ Exception exception = null;
+
+ ILogger logger = _factory.CreateLogger(categoryName);
+ var scopes = new List<IDisposable>();
+
+ if (logActivities.TryGetValue(traceEvent.ActivityID, out var logActivityItem))
+ {
+ // REVIEW: Does order matter here? We're combining everything anyways.
+ while (logActivityItem != null)
+ {
+ scopes.Add(logger.BeginScope(logActivityItem.ScopedObject));
+
+ logActivityItem = logActivityItem.Parent;
+ }
+ }
+
+ try
+ {
+ if (exceptionJson != "{}")
+ {
+ var exceptionMessage = JsonSerializer.Deserialize<JsonElement>(exceptionJson);
+ exception = new LoggerException(exceptionMessage);
+ }
+
+ var message = JsonSerializer.Deserialize<JsonElement>(argsJson);
+ if (message.TryGetProperty("{OriginalFormat}", out var formatElement))
+ {
+ var formatString = formatElement.GetString();
+ var formatter = new LogValuesFormatter(formatString);
+ object[] args = new object[formatter.ValueNames.Count];
+ for (int i = 0; i < args.Length; i++)
+ {
+ args[i] = message.GetProperty(formatter.ValueNames[i]).GetString();
+ }
+
+ logger.Log(logLevel, new EventId(eventId, eventName), exception, formatString, args);
+ }
+ else
+ {
+ var obj = new LogObject(message, lastFormattedMessage);
+ logger.Log(logLevel, new EventId(eventId, eventName), obj, exception, LogObject.Callback);
+ }
+ }
+ catch (Exception)
+ {
+ }
+ finally
+ {
+ scopes.ForEach(d => d.Dispose());
+ }
+ });
+
+ eventSource.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "FormattedMessage", (traceEvent) =>
+ {
+ // Level, FactoryID, LoggerName, EventID, EventName, FormattedMessage
+ var logLevel = (LogLevel)traceEvent.PayloadByName("Level");
+ var factoryId = (int)traceEvent.PayloadByName("FactoryID");
+ var categoryName = (string)traceEvent.PayloadByName("LoggerName");
+ var eventId = (int)traceEvent.PayloadByName("EventId");
+ var eventName = (string)traceEvent.PayloadByName("EventName");
+ var formattedMessage = (string)traceEvent.PayloadByName("FormattedMessage");
+
+ if (string.IsNullOrEmpty(formattedMessage))
+ {
+ formattedMessage = eventName;
+ eventName = string.Empty;
+ }
+
+ lastFormattedMessage = formattedMessage;
+ });
+
+ return Task.CompletedTask;
+ }
+
+ private class LogActivityItem
+ {
+ public Guid ActivityID { get; set; }
+
+ public LogObject ScopedObject { get; set; }
+
+ public LogActivityItem Parent { get; set; }
}
}
}
+++ /dev/null
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-// See the LICENSE file in the project root for more information.
-
-namespace Microsoft.Diagnostics.Monitoring.EventPipe
-{
- internal enum PipeMode
- {
- Logs = 1,
- Metrics,
- GCDump,
- ProcessInfo,
- Nettrace,
- }
-}
_onCommandLine = onCommandLine ?? throw new ArgumentNullException(nameof(onCommandLine));
}
- internal override DiagnosticsEventPipeProcessor CreateProcessor() =>
- new DiagnosticsEventPipeProcessor(
- PipeMode.ProcessInfo,
- processInfoCallback: _onCommandLine);
+ protected override MonitoringSourceConfiguration CreateConfiguration()
+ {
+ return new SampleProfilerConfiguration();
+ }
+
+ protected override async Task OnEventSourceAvailable(EventPipeEventSource eventSource, Func<Task> stopSessionAsync, CancellationToken token)
+ {
+ string commandLine = null;
+ Action<TraceEvent, Action> processInfoHandler = (TraceEvent traceEvent, Action taskComplete) =>
+ {
+ commandLine = (string)traceEvent.PayloadByName("CommandLine");
+ taskComplete();
+ };
+
+ // Completed when the ProcessInfo event of the Microsoft-DotNETCore-EventPipe event provider is handled
+ using var processInfoTaskSource = new EventTaskSource<Action<TraceEvent>>(
+ taskComplete => traceEvent => processInfoHandler(traceEvent, taskComplete),
+ handler => eventSource.Dynamic.AddCallbackForProviderEvent(MonitoringSourceConfiguration.EventPipeProviderName, "ProcessInfo", handler),
+ handler => eventSource.Dynamic.RemoveCallback(handler),
+ token);
+
+ // Completed when any trace event is handled
+ using var anyEventTaskSource = new EventTaskSource<Action<TraceEvent>>(
+ taskComplete => traceEvent => taskComplete(),
+ handler => eventSource.Dynamic.All += handler,
+ handler => eventSource.Dynamic.All -= handler,
+ token);
+
+ // Wait for any trace event to be processed
+ await anyEventTaskSource.Task;
+
+ // Stop the event pipe session
+ await stopSessionAsync();
+
+ // Wait for the ProcessInfo event to be processed
+ await processInfoTaskSource.Task;
+
+ // Notify of command line information
+ await _onCommandLine(commandLine, token);
+ }
}
}
\ No newline at end of file
using Microsoft.Diagnostics.NETCore.Client;
using System;
-using System.Collections.Generic;
using System.IO;
-using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.Diagnostics.Monitoring.EventPipe
{
- internal class EventTracePipeline : EventSourcePipeline<EventTracePipelineSettings>
+ internal class EventTracePipeline : Pipeline
{
- private readonly Func<Stream, CancellationToken, Task> _streamAvailable;
- public EventTracePipeline(DiagnosticsClient client, EventTracePipelineSettings settings, Func<Stream, CancellationToken, Task> streamAvailable)
- : base(client, settings)
+ private readonly Lazy<EventPipeStreamProvider> _provider;
+ private readonly Func<Stream, CancellationToken, Task> _onStreamAvailable;
+
+ public DiagnosticsClient Client { get; }
+ public EventTracePipelineSettings Settings { get; }
+
+ public EventTracePipeline(DiagnosticsClient client, EventTracePipelineSettings settings, Func<Stream, CancellationToken, Task> onStreamAvailable)
+ {
+ Client = client ?? throw new ArgumentNullException(nameof(client));
+ Settings = settings ?? throw new ArgumentNullException(nameof(settings));
+ _onStreamAvailable = onStreamAvailable ?? throw new ArgumentNullException(nameof(onStreamAvailable));
+ _provider = new Lazy<EventPipeStreamProvider>(CreateProvider);
+ }
+
+ protected override async Task OnRun(CancellationToken token)
+ {
+ try
+ {
+ Stream eventStream = await _provider.Value.ProcessEvents(Client, Settings.Duration, token);
+
+ await _onStreamAvailable(eventStream, token);
+ }
+ catch (InvalidOperationException e)
+ {
+ throw new PipelineException(e.Message, e);
+ }
+ }
+
+ protected override async Task OnCleanup()
+ {
+ if (_provider.IsValueCreated)
+ {
+ await _provider.Value.DisposeAsync();
+ }
+ await base.OnCleanup();
+ }
+
+ protected override Task OnStop(CancellationToken token)
{
- _streamAvailable = streamAvailable ?? throw new ArgumentNullException(nameof(streamAvailable));
+ if (_provider.IsValueCreated)
+ {
+ _provider.Value.StopProcessing();
+ }
+ return Task.CompletedTask;
}
- internal override DiagnosticsEventPipeProcessor CreateProcessor()
+ private EventPipeStreamProvider CreateProvider()
{
- return new DiagnosticsEventPipeProcessor(PipeMode.Nettrace, configuration: Settings.Configuration, onStreamAvailable: _streamAvailable);
+ return new EventPipeStreamProvider(Settings.Configuration);
}
}
}
CounterGroups = counterGroups,
Duration = Timeout.InfiniteTimeSpan,
RefreshInterval = TimeSpan.FromSeconds(options.UpdateIntervalSeconds)
- }, metricsLogger: new[] { new MetricsLogger(_store.MetricsStore) });
+ }, loggers: new[] { new MetricsLogger(_store.MetricsStore) });
using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, optionsTokenSource.Token);
await _counterPipeline.RunAsync(linkedTokenSource.Token);