From: Justin Anderson Date: Mon, 30 Nov 2020 23:47:02 +0000 (-0800) Subject: Refactor pipeline mode details into pipeline classes. (#1776) X-Git-Tag: submit/tizen/20210909.063632~17^2~293 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=b1a9da0fdbc16b06a0d14c434b09c0931a5eca67;p=platform%2Fcore%2Fdotnet%2Fdiagnostics.git Refactor pipeline mode details into pipeline classes. (#1776) --- diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/Counters/EventCounterPipeline.cs b/src/Microsoft.Diagnostics.Monitoring.EventPipe/Counters/EventCounterPipeline.cs index 3904afd0c..a77809123 100644 --- a/src/Microsoft.Diagnostics.Monitoring.EventPipe/Counters/EventCounterPipeline.cs +++ b/src/Microsoft.Diagnostics.Monitoring.EventPipe/Counters/EventCounterPipeline.cs @@ -3,10 +3,9 @@ // See the LICENSE file in the project root for more information. using Microsoft.Diagnostics.NETCore.Client; +using Microsoft.Diagnostics.Tracing; using System; using System.Collections.Generic; -using System.Linq; -using System.Text; using System.Threading; using System.Threading.Tasks; @@ -14,14 +13,14 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe { internal class EventCounterPipeline : EventSourcePipeline { - private readonly IEnumerable _metricsLogger; + private readonly IEnumerable _loggers; private readonly CounterFilter _filter; public EventCounterPipeline(DiagnosticsClient client, EventPipeCounterPipelineSettings settings, - IEnumerable metricsLogger) : base(client, settings) + IEnumerable loggers) : base(client, settings) { - _metricsLogger = metricsLogger ?? throw new ArgumentNullException(nameof(metricsLogger)); + _loggers = loggers ?? throw new ArgumentNullException(nameof(loggers)); if (settings.CounterGroups.Length > 0) { @@ -37,10 +36,113 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe } } - internal override DiagnosticsEventPipeProcessor CreateProcessor() + protected override MonitoringSourceConfiguration CreateConfiguration() { - return new DiagnosticsEventPipeProcessor(PipeMode.Metrics, metricLoggers: _metricsLogger, metricIntervalSeconds: (int)Settings.RefreshInterval.TotalSeconds, - metricFilter: _filter); + return new MetricSourceConfiguration(CounterIntervalSeconds, _filter.GetProviders()); } + + protected override async Task OnEventSourceAvailable(EventPipeEventSource eventSource, Func stopSessionAsync, CancellationToken token) + { + ExecuteCounterLoggerAction((metricLogger) => metricLogger.PipelineStarted()); + + eventSource.Dynamic.All += traceEvent => + { + try + { + // Metrics + if (traceEvent.EventName.Equals("EventCounters")) + { + IDictionary payloadVal = (IDictionary)(traceEvent.PayloadValue(0)); + IDictionary payloadFields = (IDictionary)(payloadVal["Payload"]); + + //Make sure we are part of the requested series. If multiple clients request metrics, all of them get the metrics. + string series = payloadFields["Series"].ToString(); + if (GetInterval(series) != CounterIntervalSeconds * 1000) + { + return; + } + + string counterName = payloadFields["Name"].ToString(); + if (!_filter.IsIncluded(traceEvent.ProviderName, counterName)) + { + return; + } + + float intervalSec = (float)payloadFields["IntervalSec"]; + string displayName = payloadFields["DisplayName"].ToString(); + string displayUnits = payloadFields["DisplayUnits"].ToString(); + double value = 0; + CounterType counterType = CounterType.Metric; + + if (payloadFields["CounterType"].Equals("Mean")) + { + value = (double)payloadFields["Mean"]; + } + else if (payloadFields["CounterType"].Equals("Sum")) + { + counterType = CounterType.Rate; + value = (double)payloadFields["Increment"]; + if (string.IsNullOrEmpty(displayUnits)) + { + displayUnits = "count"; + } + //TODO Should we make these /sec like the dotnet-counters tool? + } + + // Note that dimensional data such as pod and namespace are automatically added in prometheus and azure monitor scenarios. + // We no longer added it here. + var counterPayload = new CounterPayload(traceEvent.TimeStamp, + traceEvent.ProviderName, + counterName, displayName, + displayUnits, + value, + counterType, + intervalSec); + + ExecuteCounterLoggerAction((metricLogger) => metricLogger.Log(counterPayload)); + } + } + catch (Exception) + { + } + }; + + using var sourceCompletedTaskSource = new EventTaskSource( + taskComplete => taskComplete, + handler => eventSource.Completed += handler, + handler => eventSource.Completed -= handler, + token); + + await sourceCompletedTaskSource.Task; + + ExecuteCounterLoggerAction((metricLogger) => metricLogger.PipelineStopped()); + } + + private static int GetInterval(string series) + { + const string comparison = "Interval="; + int interval = 0; + if (series.StartsWith(comparison, StringComparison.OrdinalIgnoreCase)) + { + int.TryParse(series.Substring(comparison.Length), out interval); + } + return interval; + } + + private void ExecuteCounterLoggerAction(Action action) + { + foreach (ICountersLogger logger in _loggers) + { + try + { + action(logger); + } + catch (ObjectDisposedException) + { + } + } + } + + private int CounterIntervalSeconds => (int)Settings.RefreshInterval.TotalSeconds; } } diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/DiagnosticsEventPipeProcessor.cs b/src/Microsoft.Diagnostics.Monitoring.EventPipe/DiagnosticsEventPipeProcessor.cs index 84bcde29f..6a24eb150 100644 --- a/src/Microsoft.Diagnostics.Monitoring.EventPipe/DiagnosticsEventPipeProcessor.cs +++ b/src/Microsoft.Diagnostics.Monitoring.EventPipe/DiagnosticsEventPipeProcessor.cs @@ -2,67 +2,34 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using Microsoft.Diagnostics.NETCore.Client; +using Microsoft.Diagnostics.Tracing; using System; -using System.Collections.Generic; -using System.Globalization; using System.IO; -using System.Linq; -using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using Graphs; -using Microsoft.Diagnostics.NETCore.Client; -using Microsoft.Diagnostics.Tracing; -using Microsoft.Diagnostics.Tracing.Extensions; -using Microsoft.Diagnostics.Tracing.Parsers.Clr; -using Microsoft.Extensions.Logging; namespace Microsoft.Diagnostics.Monitoring.EventPipe { internal partial class DiagnosticsEventPipeProcessor : IAsyncDisposable { - private readonly MemoryGraph _gcGraph; - private readonly ILoggerFactory _loggerFactory; - private readonly IEnumerable _metricLoggers; - private readonly PipeMode _mode; - private readonly int _metricIntervalSeconds; - private readonly CounterFilter _counterFilter; - private readonly LogLevel _logsLevel; - private readonly Func _processInfoCallback; - private readonly MonitoringSourceConfiguration _userConfig; - private readonly Func _onStreamAvailable; + private readonly MonitoringSourceConfiguration _configuration; + private readonly Func, CancellationToken, Task> _onEventSourceAvailable; private readonly object _lock = new object(); private TaskCompletionSource _sessionStarted; - private EventPipeEventSource _eventPipeSession; + private EventPipeEventSource _eventSource; private Func _stopFunc; private bool _disposed; public DiagnosticsEventPipeProcessor( - PipeMode mode, - ILoggerFactory loggerFactory = null, // PipeMode = Logs - LogLevel logsLevel = LogLevel.Debug, // PipeMode = Logs - IEnumerable metricLoggers = null, // PipeMode = Metrics - int metricIntervalSeconds = 10, // PipeMode = Metrics - CounterFilter metricFilter = null, // PipeMode = Metrics - MemoryGraph gcGraph = null, // PipeMode = GCDump - MonitoringSourceConfiguration configuration = null, // PipeMode = Nettrace - Func onStreamAvailable = null, // PipeMode = Nettrace - Func processInfoCallback = null // PipeMode = ProcessInfo + MonitoringSourceConfiguration configuration, + Func, CancellationToken, Task> onEventSourceAvailable ) { - _metricLoggers = metricLoggers ?? Enumerable.Empty(); - _mode = mode; - _loggerFactory = loggerFactory; - _gcGraph = gcGraph; - _metricIntervalSeconds = metricIntervalSeconds; - _logsLevel = logsLevel; - _processInfoCallback = processInfoCallback; - _userConfig = configuration; - _onStreamAvailable = onStreamAvailable; - _processInfoCallback = processInfoCallback; - _counterFilter = metricFilter; + _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration)); + _onEventSourceAvailable = onEventSourceAvailable ?? throw new ArgumentNullException(nameof(onEventSourceAvailable)); _sessionStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); } @@ -74,83 +41,23 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe await await Task.Factory.StartNew(async () => { EventPipeEventSource source = null; - DiagnosticsMonitor monitor = null; + EventPipeStreamProvider streamProvider = null; Task handleEventsTask = Task.CompletedTask; try { - MonitoringSourceConfiguration config = null; - if (_mode == PipeMode.Logs) - { - config = new LoggingSourceConfiguration(_logsLevel); - } - else if (_mode == PipeMode.Metrics) - { - config = new MetricSourceConfiguration(_metricIntervalSeconds, _counterFilter.GetProviders()); - } - else if (_mode == PipeMode.GCDump) - { - config = new GCDumpSourceConfiguration(); - } - else if (_mode == PipeMode.ProcessInfo) - { - config = new SampleProfilerConfiguration(); - } - else if (_mode == PipeMode.Nettrace) - { - config = _userConfig; - } - - monitor = new DiagnosticsMonitor(config); + streamProvider = new EventPipeStreamProvider(_configuration); // Allows the event handling routines to stop processing before the duration expires. - Func stopFunc = () => Task.Run(() => { monitor.StopProcessing(); }); - - Stream sessionStream = await monitor.ProcessEvents(client, duration, token); + Func stopFunc = () => Task.Run(() => { streamProvider.StopProcessing(); }); - if (_mode == PipeMode.Nettrace) - { - if (!_sessionStarted.TrySetResult(true)) - { - token.ThrowIfCancellationRequested(); - } - - lock (_lock) - { - //Save the stop function for later, so that we can stop a trace later. - _stopFunc = stopFunc; - } - - await _onStreamAvailable(sessionStream, token); - return; - } + Stream sessionStream = await streamProvider.ProcessEvents(client, duration, token); source = new EventPipeEventSource(sessionStream); + handleEventsTask = _onEventSourceAvailable(source, stopFunc, token); - if (_mode == PipeMode.Metrics) - { - // Metrics - HandleEventCounters(source); - } - else if (_mode == PipeMode.Logs) - { - // Logging - HandleLoggingEvents(source); - } - else if (_mode == PipeMode.GCDump) - { - // GC - handleEventsTask = HandleGCEvents(source, stopFunc, token); - } - - else if (_mode == PipeMode.ProcessInfo) - { - // ProcessInfo - handleEventsTask = HandleProcessInfo(source, stopFunc, token); - } - - lock(_lock) + lock (_lock) { - _eventPipeSession = source; + _eventSource = source; _stopFunc = stopFunc; } registration.Dispose(); @@ -168,20 +75,18 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe } finally { - ExecuteCounterLoggerAction((metricLogger) => metricLogger.PipelineStopped()); - registration.Dispose(); - EventPipeEventSource session = null; + EventPipeEventSource eventSource = null; lock (_lock) { - session = _eventPipeSession; - _eventPipeSession = null; + eventSource = _eventSource; + _eventSource = null; } - session?.Dispose(); - if (monitor != null) + eventSource?.Dispose(); + if (streamProvider != null) { - await monitor.DisposeAsync(); + await streamProvider.DisposeAsync(); } } @@ -202,7 +107,7 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe Func stopFunc = null; lock (_lock) { - session = _eventPipeSession; + session = _eventSource; stopFunc = _stopFunc; } if (session != null) @@ -216,366 +121,6 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe } } - private void HandleLoggingEvents(EventPipeEventSource source) - { - string lastFormattedMessage = string.Empty; - - var logActivities = new Dictionary(); - var stack = new Stack(); - - source.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "ActivityJson/Start", (traceEvent) => - { - var factoryId = (int)traceEvent.PayloadByName("FactoryID"); - var categoryName = (string)traceEvent.PayloadByName("LoggerName"); - var argsJson = (string)traceEvent.PayloadByName("ArgumentsJson"); - - // TODO: Store this information by logger factory id - var item = new LogActivityItem - { - ActivityID = traceEvent.ActivityID, - ScopedObject = new LogObject(JsonDocument.Parse(argsJson).RootElement), - }; - - if (stack.Count > 0) - { - Guid parentId = stack.Peek(); - if (logActivities.TryGetValue(parentId, out var parentItem)) - { - item.Parent = parentItem; - } - } - - stack.Push(traceEvent.ActivityID); - logActivities[traceEvent.ActivityID] = item; - }); - - source.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "ActivityJson/Stop", (traceEvent) => - { - var factoryId = (int)traceEvent.PayloadByName("FactoryID"); - var categoryName = (string)traceEvent.PayloadByName("LoggerName"); - - stack.Pop(); - logActivities.Remove(traceEvent.ActivityID); - }); - - source.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "MessageJson", (traceEvent) => - { - // Level, FactoryID, LoggerName, EventID, EventName, ExceptionJson, ArgumentsJson - var logLevel = (LogLevel)traceEvent.PayloadByName("Level"); - var factoryId = (int)traceEvent.PayloadByName("FactoryID"); - var categoryName = (string)traceEvent.PayloadByName("LoggerName"); - var eventId = (int)traceEvent.PayloadByName("EventId"); - var eventName = (string)traceEvent.PayloadByName("EventName"); - var exceptionJson = (string)traceEvent.PayloadByName("ExceptionJson"); - var argsJson = (string)traceEvent.PayloadByName("ArgumentsJson"); - - // There's a bug that causes some of the columns to get mixed up - if (eventName.StartsWith("{")) - { - argsJson = exceptionJson; - exceptionJson = eventName; - eventName = null; - } - - if (string.IsNullOrEmpty(argsJson)) - { - return; - } - - Exception exception = null; - - ILogger logger = _loggerFactory.CreateLogger(categoryName); - var scopes = new List(); - - if (logActivities.TryGetValue(traceEvent.ActivityID, out var logActivityItem)) - { - // REVIEW: Does order matter here? We're combining everything anyways. - while (logActivityItem != null) - { - scopes.Add(logger.BeginScope(logActivityItem.ScopedObject)); - - logActivityItem = logActivityItem.Parent; - } - } - - try - { - if (exceptionJson != "{}") - { - var exceptionMessage = JsonSerializer.Deserialize(exceptionJson); - exception = new LoggerException(exceptionMessage); - } - - var message = JsonSerializer.Deserialize(argsJson); - if (message.TryGetProperty("{OriginalFormat}", out var formatElement)) - { - var formatString = formatElement.GetString(); - var formatter = new LogValuesFormatter(formatString); - object[] args = new object[formatter.ValueNames.Count]; - for (int i = 0; i < args.Length; i++) - { - args[i] = message.GetProperty(formatter.ValueNames[i]).GetString(); - } - - logger.Log(logLevel, new EventId(eventId, eventName), exception, formatString, args); - } - else - { - var obj = new LogObject(message, lastFormattedMessage); - logger.Log(logLevel, new EventId(eventId, eventName), obj, exception, LogObject.Callback); - } - } - catch (Exception) - { - } - finally - { - scopes.ForEach(d => d.Dispose()); - } - }); - - source.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "FormattedMessage", (traceEvent) => - { - // Level, FactoryID, LoggerName, EventID, EventName, FormattedMessage - var logLevel = (LogLevel)traceEvent.PayloadByName("Level"); - var factoryId = (int)traceEvent.PayloadByName("FactoryID"); - var categoryName = (string)traceEvent.PayloadByName("LoggerName"); - var eventId = (int)traceEvent.PayloadByName("EventId"); - var eventName = (string)traceEvent.PayloadByName("EventName"); - var formattedMessage = (string)traceEvent.PayloadByName("FormattedMessage"); - - if (string.IsNullOrEmpty(formattedMessage)) - { - formattedMessage = eventName; - eventName = string.Empty; - } - - lastFormattedMessage = formattedMessage; - }); - } - - private void HandleEventCounters(EventPipeEventSource source) - { - ExecuteCounterLoggerAction((metricLogger) => metricLogger.PipelineStarted()); - - source.Dynamic.All += traceEvent => - { - try - { - // Metrics - if (traceEvent.EventName.Equals("EventCounters")) - { - IDictionary payloadVal = (IDictionary)(traceEvent.PayloadValue(0)); - IDictionary payloadFields = (IDictionary)(payloadVal["Payload"]); - - //Make sure we are part of the requested series. If multiple clients request metrics, all of them get the metrics. - string series = payloadFields["Series"].ToString(); - if (GetInterval(series) != _metricIntervalSeconds * 1000) - { - return; - } - - string counterName = payloadFields["Name"].ToString(); - if (!_counterFilter.IsIncluded(traceEvent.ProviderName, counterName)) - { - return; - } - - float intervalSec = (float)payloadFields["IntervalSec"]; - string displayName = payloadFields["DisplayName"].ToString(); - string displayUnits = payloadFields["DisplayUnits"].ToString(); - double value = 0; - CounterType counterType = CounterType.Metric; - - if (payloadFields["CounterType"].Equals("Mean")) - { - value = (double)payloadFields["Mean"]; - } - else if (payloadFields["CounterType"].Equals("Sum")) - { - counterType = CounterType.Rate; - value = (double)payloadFields["Increment"]; - if (string.IsNullOrEmpty(displayUnits)) - { - displayUnits = "count"; - } - //TODO Should we make these /sec like the dotnet-counters tool? - } - - // Note that dimensional data such as pod and namespace are automatically added in prometheus and azure monitor scenarios. - // We no longer added it here. - var counterPayload = new CounterPayload(traceEvent.TimeStamp, - traceEvent.ProviderName, - counterName, displayName, - displayUnits, - value, - counterType, - intervalSec); - - ExecuteCounterLoggerAction((metricLogger) => metricLogger.Log(counterPayload)); - } - } - catch (Exception) - { - } - }; - } - - private static int GetInterval(string series) - { - const string comparison = "Interval="; - int interval = 0; - if (series.StartsWith(comparison, StringComparison.OrdinalIgnoreCase)) - { - int.TryParse(series.Substring(comparison.Length), out interval); - } - return interval; - } - - private void ExecuteCounterLoggerAction(Action action) - { - foreach (ICountersLogger metricLogger in _metricLoggers) - { - try - { - action(metricLogger); - } - catch (ObjectDisposedException) - { - } - } - } - - private async Task HandleGCEvents(EventPipeEventSource source, Func stopFunc, CancellationToken token) - { - int gcNum = -1; - - Action gcStartHandler = (GCStartTraceData data, Action taskComplete) => - { - taskComplete(); - - if (gcNum < 0 && data.Depth == 2 && data.Type != GCType.BackgroundGC) - { - gcNum = data.Count; - } - }; - - Action gcBulkNodeHandler = (GCBulkNodeTraceData data, Action taskComplete) => - { - taskComplete(); - }; - - Action gcEndHandler = (GCEndTraceData data, Action taskComplete) => - { - if (data.Count == gcNum) - { - taskComplete(); - } - }; - - // Register event handlers on the event source and represent their completion as tasks - using var gcStartTaskSource = new EventTaskSource>( - taskComplete => data => gcStartHandler(data, taskComplete), - handler => source.Clr.GCStart += handler, - handler => source.Clr.GCStart -= handler, - token); - using var gcBulkNodeTaskSource = new EventTaskSource>( - taskComplete => data => gcBulkNodeHandler(data, taskComplete), - handler => source.Clr.GCBulkNode += handler, - handler => source.Clr.GCBulkNode -= handler, - token); - using var gcStopTaskSource = new EventTaskSource>( - taskComplete => data => gcEndHandler(data, taskComplete), - handler => source.Clr.GCStop += handler, - handler => source.Clr.GCStop -= handler, - token); - using var sourceCompletedTaskSource = new EventTaskSource( - taskComplete => taskComplete, - handler => source.Completed += handler, - handler => source.Completed -= handler, - token); - - // A task that is completed whenever GC data is received - Task gcDataTask = Task.WhenAny(gcStartTaskSource.Task, gcBulkNodeTaskSource.Task); - Task gcStopTask = gcStopTaskSource.Task; - - DotNetHeapDumpGraphReader dumper = new DotNetHeapDumpGraphReader(TextWriter.Null) - { - DotNetHeapInfo = new DotNetHeapInfo() - }; - dumper.SetupCallbacks(_gcGraph, source); - - // The event source will not always provide the GC events when it starts listening. However, - // they will be provided when the event source is told to stop processing events. Give the - // event source some time to produce the events, but if it doesn't start producing them within - // a short amount of time (5 seconds), then stop processing events to allow them to be flushed. - Task eventsTimeoutTask = Task.Delay(TimeSpan.FromSeconds(5), token); - Task completedTask = await Task.WhenAny(gcDataTask, eventsTimeoutTask); - - token.ThrowIfCancellationRequested(); - - // If started receiving GC events, wait for the GC Stop event. - if (completedTask == gcDataTask) - { - await gcStopTask; - } - - // Stop receiving events; if haven't received events yet, this will force flushing of events. - await stopFunc(); - - // Wait for all received events to be processed. - await sourceCompletedTaskSource.Task; - - // Check that GC data and stop events were received. This is done by checking that the - // associated tasks have ran to completion. If one of them has not reached the completion state, then - // fail the GC dump operation. - if (gcDataTask.Status != TaskStatus.RanToCompletion || - gcStopTask.Status != TaskStatus.RanToCompletion) - { - throw new InvalidOperationException("Unable to create GC dump due to incomplete GC data."); - } - - dumper.ConvertHeapDataToGraph(); - - _gcGraph.AllowReading(); - } - - private async Task HandleProcessInfo(EventPipeEventSource source, Func stopFunc, CancellationToken token) - { - string commandLine = null; - Action processInfoHandler = (TraceEvent traceEvent, Action taskComplete) => - { - commandLine = (string)traceEvent.PayloadByName("CommandLine"); - taskComplete(); - }; - - // Completed when the ProcessInfo event of the Microsoft-DotNETCore-EventPipe event provider is handled - using var processInfoTaskSource = new EventTaskSource>( - taskComplete => traceEvent => processInfoHandler(traceEvent, taskComplete), - handler => source.Dynamic.AddCallbackForProviderEvent(MonitoringSourceConfiguration.EventPipeProviderName, "ProcessInfo", handler), - handler => source.Dynamic.RemoveCallback(handler), - token); - - // Completed when any trace event is handled - using var anyEventTaskSource = new EventTaskSource>( - taskComplete => traceEvent => taskComplete(), - handler => source.Dynamic.All += handler, - handler => source.Dynamic.All -= handler, - token); - - // Wait for any trace event to be processed - await anyEventTaskSource.Task; - - // Stop the event pipe session - await stopFunc(); - - // Wait for the ProcessInfo event to be processed - await processInfoTaskSource.Task; - - // Notify of command line information - await _processInfoCallback(commandLine, token); - } - public async ValueTask DisposeAsync() { lock (_lock) @@ -596,16 +141,7 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe { } - _eventPipeSession?.Dispose(); - } - - private class LogActivityItem - { - public Guid ActivityID { get; set; } - - public LogObject ScopedObject { get; set; } - - public LogActivityItem Parent { get; set; } + _eventSource?.Dispose(); } } } diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/DiagnosticsMonitor.cs b/src/Microsoft.Diagnostics.Monitoring.EventPipe/DiagnosticsMonitor.cs deleted file mode 100644 index 172b4add5..000000000 --- a/src/Microsoft.Diagnostics.Monitoring.EventPipe/DiagnosticsMonitor.cs +++ /dev/null @@ -1,110 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. -// See the LICENSE file in the project root for more information. - -using System; -using System.IO; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Diagnostics.NETCore.Client; - -namespace Microsoft.Diagnostics.Monitoring.EventPipe -{ - partial class DiagnosticsEventPipeProcessor - { - private sealed class DiagnosticsMonitor : IAsyncDisposable - { - private readonly MonitoringSourceConfiguration _sourceConfig; - private readonly TaskCompletionSource _stopProcessingSource; - private Task _currentTask; - - public DiagnosticsMonitor(MonitoringSourceConfiguration sourceConfig) - { - _sourceConfig = sourceConfig; - _stopProcessingSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - } - - public Task ProcessEvents(DiagnosticsClient client, TimeSpan duration, CancellationToken cancellationToken) - { - cancellationToken.ThrowIfCancellationRequested(); - - EventPipeSession session = null; - try - { - session = client.StartEventPipeSession(_sourceConfig.GetProviders(), _sourceConfig.RequestRundown, _sourceConfig.BufferSizeInMB); - } - catch (EndOfStreamException e) - { - throw new InvalidOperationException("End of stream", e); - } - catch (Exception ex) when (!(ex is OperationCanceledException)) - { - throw new InvalidOperationException("Failed to start the event pipe session", ex); - } - - _currentTask = Task.Run(async () => - { - using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - linkedSource.CancelAfter(duration); - using var _ = linkedSource.Token.Register(() => _stopProcessingSource.TrySetResult(null)); - - // Use TaskCompletionSource instead of Task.Delay with cancellation to avoid - // using exceptions for normal termination of event stream. - await _stopProcessingSource.Task.ConfigureAwait(false); - StopSession(session); - }); - - return Task.FromResult(session.EventStream); - } - - public void StopProcessing() - { - _stopProcessingSource.TrySetResult(null); - } - - private static void StopSession(EventPipeSession session) - { - try - { - session.Stop(); - } - catch (EndOfStreamException) - { - // If the app we're monitoring exits abruptly, this may throw in which case we just swallow the exception and exit gracefully. - } - // We may time out if the process ended before we sent StopTracing command. We can just exit in that case. - catch (TimeoutException) - { - } - // On Unix platforms, we may actually get a PNSE since the pipe is gone with the process, and Runtime Client Library - // does not know how to distinguish a situation where there is no pipe to begin with, or where the process has exited - // before collection started and got rid of a pipe that once existed. - // Since we are catching this at the end of a session we know that the pipe once existed (otherwise the exception would've - // been thrown at the beginning directly) - catch (PlatformNotSupportedException) - { - } - // On non-abrupt exits, the socket may be already closed by the runtime and we won't be able to send a stop request through it. - catch (ServerNotAvailableException) - { - } - } - - public async ValueTask DisposeAsync() - { - Task currentTask = _currentTask; - _stopProcessingSource.TrySetResult(null); - if (currentTask != null) - { - try - { - await currentTask.ConfigureAwait(false); - } - catch (OperationCanceledException) - { - } - } - } - } - } -} \ No newline at end of file diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/EventPipeStreamProvider.cs b/src/Microsoft.Diagnostics.Monitoring.EventPipe/EventPipeStreamProvider.cs new file mode 100644 index 000000000..545d92a14 --- /dev/null +++ b/src/Microsoft.Diagnostics.Monitoring.EventPipe/EventPipeStreamProvider.cs @@ -0,0 +1,107 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Diagnostics.NETCore.Client; + +namespace Microsoft.Diagnostics.Monitoring.EventPipe +{ + internal sealed class EventPipeStreamProvider : IAsyncDisposable + { + private readonly MonitoringSourceConfiguration _sourceConfig; + private readonly TaskCompletionSource _stopProcessingSource; + private Task _currentTask; + + public EventPipeStreamProvider(MonitoringSourceConfiguration sourceConfig) + { + _sourceConfig = sourceConfig; + _stopProcessingSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } + + public Task ProcessEvents(DiagnosticsClient client, TimeSpan duration, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + EventPipeSession session = null; + try + { + session = client.StartEventPipeSession(_sourceConfig.GetProviders(), _sourceConfig.RequestRundown, _sourceConfig.BufferSizeInMB); + } + catch (EndOfStreamException e) + { + throw new InvalidOperationException("End of stream", e); + } + catch (Exception ex) when (!(ex is OperationCanceledException)) + { + throw new InvalidOperationException("Failed to start the event pipe session", ex); + } + + _currentTask = Task.Run(async () => + { + using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + linkedSource.CancelAfter(duration); + using var _ = linkedSource.Token.Register(() => _stopProcessingSource.TrySetResult(null)); + + // Use TaskCompletionSource instead of Task.Delay with cancellation to avoid + // using exceptions for normal termination of event stream. + await _stopProcessingSource.Task.ConfigureAwait(false); + StopSession(session); + }); + + return Task.FromResult(session.EventStream); + } + + public void StopProcessing() + { + _stopProcessingSource.TrySetResult(null); + } + + private static void StopSession(EventPipeSession session) + { + try + { + session.Stop(); + } + catch (EndOfStreamException) + { + // If the app we're monitoring exits abruptly, this may throw in which case we just swallow the exception and exit gracefully. + } + // We may time out if the process ended before we sent StopTracing command. We can just exit in that case. + catch (TimeoutException) + { + } + // On Unix platforms, we may actually get a PNSE since the pipe is gone with the process, and Runtime Client Library + // does not know how to distinguish a situation where there is no pipe to begin with, or where the process has exited + // before collection started and got rid of a pipe that once existed. + // Since we are catching this at the end of a session we know that the pipe once existed (otherwise the exception would've + // been thrown at the beginning directly) + catch (PlatformNotSupportedException) + { + } + // On non-abrupt exits, the socket may be already closed by the runtime and we won't be able to send a stop request through it. + catch (ServerNotAvailableException) + { + } + } + + public async ValueTask DisposeAsync() + { + Task currentTask = _currentTask; + _stopProcessingSource.TrySetResult(null); + if (currentTask != null) + { + try + { + await currentTask.ConfigureAwait(false); + } + catch (OperationCanceledException) + { + } + } + } + } +} \ No newline at end of file diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/EventSourcePipeline.cs b/src/Microsoft.Diagnostics.Monitoring.EventPipe/EventSourcePipeline.cs index 85275d931..d05a9778a 100644 --- a/src/Microsoft.Diagnostics.Monitoring.EventPipe/EventSourcePipeline.cs +++ b/src/Microsoft.Diagnostics.Monitoring.EventPipe/EventSourcePipeline.cs @@ -2,13 +2,9 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using Graphs; using Microsoft.Diagnostics.NETCore.Client; -using Microsoft.Extensions.Logging; +using Microsoft.Diagnostics.Tracing; using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; using System.Threading; using System.Threading.Tasks; @@ -27,7 +23,14 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe _processor = new Lazy(CreateProcessor); } - internal abstract DiagnosticsEventPipeProcessor CreateProcessor(); + protected abstract MonitoringSourceConfiguration CreateConfiguration(); + + private DiagnosticsEventPipeProcessor CreateProcessor() + { + return new DiagnosticsEventPipeProcessor( + configuration: CreateConfiguration(), + onEventSourceAvailable: OnEventSourceAvailable); + } protected override Task OnRun(CancellationToken token) { @@ -61,5 +64,10 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe await Task.WhenAny(stoppingTask, taskCompletionSource.Task).Unwrap(); } } + + protected virtual Task OnEventSourceAvailable(EventPipeEventSource eventSource, Func stopSessionAsync, CancellationToken token) + { + return Task.CompletedTask; + } } } diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/GCDump/EventGCDumpPipeline.cs b/src/Microsoft.Diagnostics.Monitoring.EventPipe/GCDump/EventGCDumpPipeline.cs index ef4d8aeb4..af615fecf 100644 --- a/src/Microsoft.Diagnostics.Monitoring.EventPipe/GCDump/EventGCDumpPipeline.cs +++ b/src/Microsoft.Diagnostics.Monitoring.EventPipe/GCDump/EventGCDumpPipeline.cs @@ -4,10 +4,10 @@ using Graphs; using Microsoft.Diagnostics.NETCore.Client; +using Microsoft.Diagnostics.Tracing; using Microsoft.Diagnostics.Tracing.Parsers.Clr; using System; -using System.Collections.Generic; -using System.Text; +using System.IO; using System.Threading; using System.Threading.Tasks; @@ -22,9 +22,103 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe _gcGraph = gcGraph ?? throw new ArgumentNullException(nameof(gcGraph)); } - internal override DiagnosticsEventPipeProcessor CreateProcessor() + protected override MonitoringSourceConfiguration CreateConfiguration() { - return new DiagnosticsEventPipeProcessor(PipeMode.GCDump, gcGraph: _gcGraph); + return new GCDumpSourceConfiguration(); + } + + protected override async Task OnEventSourceAvailable(EventPipeEventSource eventSource, Func stopSessionAsync, CancellationToken token) + { + int gcNum = -1; + + Action gcStartHandler = (GCStartTraceData data, Action taskComplete) => + { + taskComplete(); + + if (gcNum < 0 && data.Depth == 2 && data.Type != GCType.BackgroundGC) + { + gcNum = data.Count; + } + }; + + Action gcBulkNodeHandler = (GCBulkNodeTraceData data, Action taskComplete) => + { + taskComplete(); + }; + + Action gcEndHandler = (GCEndTraceData data, Action taskComplete) => + { + if (data.Count == gcNum) + { + taskComplete(); + } + }; + + // Register event handlers on the event source and represent their completion as tasks + using var gcStartTaskSource = new EventTaskSource>( + taskComplete => data => gcStartHandler(data, taskComplete), + handler => eventSource.Clr.GCStart += handler, + handler => eventSource.Clr.GCStart -= handler, + token); + using var gcBulkNodeTaskSource = new EventTaskSource>( + taskComplete => data => gcBulkNodeHandler(data, taskComplete), + handler => eventSource.Clr.GCBulkNode += handler, + handler => eventSource.Clr.GCBulkNode -= handler, + token); + using var gcStopTaskSource = new EventTaskSource>( + taskComplete => data => gcEndHandler(data, taskComplete), + handler => eventSource.Clr.GCStop += handler, + handler => eventSource.Clr.GCStop -= handler, + token); + using var sourceCompletedTaskSource = new EventTaskSource( + taskComplete => taskComplete, + handler => eventSource.Completed += handler, + handler => eventSource.Completed -= handler, + token); + + // A task that is completed whenever GC data is received + Task gcDataTask = Task.WhenAny(gcStartTaskSource.Task, gcBulkNodeTaskSource.Task); + Task gcStopTask = gcStopTaskSource.Task; + + DotNetHeapDumpGraphReader dumper = new DotNetHeapDumpGraphReader(TextWriter.Null) + { + DotNetHeapInfo = new DotNetHeapInfo() + }; + dumper.SetupCallbacks(_gcGraph, eventSource); + + // The event source will not always provide the GC events when it starts listening. However, + // they will be provided when the event source is told to stop processing events. Give the + // event source some time to produce the events, but if it doesn't start producing them within + // a short amount of time (5 seconds), then stop processing events to allow them to be flushed. + Task eventsTimeoutTask = Task.Delay(TimeSpan.FromSeconds(5), token); + Task completedTask = await Task.WhenAny(gcDataTask, eventsTimeoutTask); + + token.ThrowIfCancellationRequested(); + + // If started receiving GC events, wait for the GC Stop event. + if (completedTask == gcDataTask) + { + await gcStopTask; + } + + // Stop receiving events; if haven't received events yet, this will force flushing of events. + await stopSessionAsync(); + + // Wait for all received events to be processed. + await sourceCompletedTaskSource.Task; + + // Check that GC data and stop events were received. This is done by checking that the + // associated tasks have ran to completion. If one of them has not reached the completion state, then + // fail the GC dump operation. + if (gcDataTask.Status != TaskStatus.RanToCompletion || + gcStopTask.Status != TaskStatus.RanToCompletion) + { + throw new InvalidOperationException("Unable to create GC dump due to incomplete GC data."); + } + + dumper.ConvertHeapDataToGraph(); + + _gcGraph.AllowReading(); } } } diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/Logs/EventLogsPipeline.cs b/src/Microsoft.Diagnostics.Monitoring.EventPipe/Logs/EventLogsPipeline.cs index 7a0ac11f2..62ddc75f0 100644 --- a/src/Microsoft.Diagnostics.Monitoring.EventPipe/Logs/EventLogsPipeline.cs +++ b/src/Microsoft.Diagnostics.Monitoring.EventPipe/Logs/EventLogsPipeline.cs @@ -3,10 +3,11 @@ // See the LICENSE file in the project root for more information. using Microsoft.Diagnostics.NETCore.Client; +using Microsoft.Diagnostics.Tracing; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; -using System.Text; +using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -21,9 +22,158 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe _factory = factory ?? throw new ArgumentNullException(nameof(factory)); } - internal override DiagnosticsEventPipeProcessor CreateProcessor() + protected override MonitoringSourceConfiguration CreateConfiguration() { - return new DiagnosticsEventPipeProcessor(PipeMode.Logs, loggerFactory: _factory, logsLevel: Settings.LogLevel); + return new LoggingSourceConfiguration(Settings.LogLevel); + } + + protected override Task OnEventSourceAvailable(EventPipeEventSource eventSource, Func stopSessionAsync, CancellationToken token) + { + string lastFormattedMessage = string.Empty; + + var logActivities = new Dictionary(); + var stack = new Stack(); + + eventSource.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "ActivityJson/Start", (traceEvent) => + { + var factoryId = (int)traceEvent.PayloadByName("FactoryID"); + var categoryName = (string)traceEvent.PayloadByName("LoggerName"); + var argsJson = (string)traceEvent.PayloadByName("ArgumentsJson"); + + // TODO: Store this information by logger factory id + var item = new LogActivityItem + { + ActivityID = traceEvent.ActivityID, + ScopedObject = new LogObject(JsonDocument.Parse(argsJson).RootElement), + }; + + if (stack.Count > 0) + { + Guid parentId = stack.Peek(); + if (logActivities.TryGetValue(parentId, out var parentItem)) + { + item.Parent = parentItem; + } + } + + stack.Push(traceEvent.ActivityID); + logActivities[traceEvent.ActivityID] = item; + }); + + eventSource.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "ActivityJson/Stop", (traceEvent) => + { + var factoryId = (int)traceEvent.PayloadByName("FactoryID"); + var categoryName = (string)traceEvent.PayloadByName("LoggerName"); + + stack.Pop(); + logActivities.Remove(traceEvent.ActivityID); + }); + + eventSource.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "MessageJson", (traceEvent) => + { + // Level, FactoryID, LoggerName, EventID, EventName, ExceptionJson, ArgumentsJson + var logLevel = (LogLevel)traceEvent.PayloadByName("Level"); + var factoryId = (int)traceEvent.PayloadByName("FactoryID"); + var categoryName = (string)traceEvent.PayloadByName("LoggerName"); + var eventId = (int)traceEvent.PayloadByName("EventId"); + var eventName = (string)traceEvent.PayloadByName("EventName"); + var exceptionJson = (string)traceEvent.PayloadByName("ExceptionJson"); + var argsJson = (string)traceEvent.PayloadByName("ArgumentsJson"); + + // There's a bug that causes some of the columns to get mixed up + if (eventName.StartsWith("{")) + { + argsJson = exceptionJson; + exceptionJson = eventName; + eventName = null; + } + + if (string.IsNullOrEmpty(argsJson)) + { + return; + } + + Exception exception = null; + + ILogger logger = _factory.CreateLogger(categoryName); + var scopes = new List(); + + if (logActivities.TryGetValue(traceEvent.ActivityID, out var logActivityItem)) + { + // REVIEW: Does order matter here? We're combining everything anyways. + while (logActivityItem != null) + { + scopes.Add(logger.BeginScope(logActivityItem.ScopedObject)); + + logActivityItem = logActivityItem.Parent; + } + } + + try + { + if (exceptionJson != "{}") + { + var exceptionMessage = JsonSerializer.Deserialize(exceptionJson); + exception = new LoggerException(exceptionMessage); + } + + var message = JsonSerializer.Deserialize(argsJson); + if (message.TryGetProperty("{OriginalFormat}", out var formatElement)) + { + var formatString = formatElement.GetString(); + var formatter = new LogValuesFormatter(formatString); + object[] args = new object[formatter.ValueNames.Count]; + for (int i = 0; i < args.Length; i++) + { + args[i] = message.GetProperty(formatter.ValueNames[i]).GetString(); + } + + logger.Log(logLevel, new EventId(eventId, eventName), exception, formatString, args); + } + else + { + var obj = new LogObject(message, lastFormattedMessage); + logger.Log(logLevel, new EventId(eventId, eventName), obj, exception, LogObject.Callback); + } + } + catch (Exception) + { + } + finally + { + scopes.ForEach(d => d.Dispose()); + } + }); + + eventSource.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "FormattedMessage", (traceEvent) => + { + // Level, FactoryID, LoggerName, EventID, EventName, FormattedMessage + var logLevel = (LogLevel)traceEvent.PayloadByName("Level"); + var factoryId = (int)traceEvent.PayloadByName("FactoryID"); + var categoryName = (string)traceEvent.PayloadByName("LoggerName"); + var eventId = (int)traceEvent.PayloadByName("EventId"); + var eventName = (string)traceEvent.PayloadByName("EventName"); + var formattedMessage = (string)traceEvent.PayloadByName("FormattedMessage"); + + if (string.IsNullOrEmpty(formattedMessage)) + { + formattedMessage = eventName; + eventName = string.Empty; + } + + lastFormattedMessage = formattedMessage; + }); + + return Task.CompletedTask; + } + + private class LogActivityItem + { + public Guid ActivityID { get; set; } + + public LogObject ScopedObject { get; set; } + + public LogActivityItem Parent { get; set; } } } } diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/PipeMode.cs b/src/Microsoft.Diagnostics.Monitoring.EventPipe/PipeMode.cs deleted file mode 100644 index ca2a4c0f4..000000000 --- a/src/Microsoft.Diagnostics.Monitoring.EventPipe/PipeMode.cs +++ /dev/null @@ -1,15 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. -// See the LICENSE file in the project root for more information. - -namespace Microsoft.Diagnostics.Monitoring.EventPipe -{ - internal enum PipeMode - { - Logs = 1, - Metrics, - GCDump, - ProcessInfo, - Nettrace, - } -} diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/ProcessInfo/EventProcessInfoPipeline.cs b/src/Microsoft.Diagnostics.Monitoring.EventPipe/ProcessInfo/EventProcessInfoPipeline.cs index ca7b4230f..bc4f72b43 100644 --- a/src/Microsoft.Diagnostics.Monitoring.EventPipe/ProcessInfo/EventProcessInfoPipeline.cs +++ b/src/Microsoft.Diagnostics.Monitoring.EventPipe/ProcessInfo/EventProcessInfoPipeline.cs @@ -20,9 +20,45 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe _onCommandLine = onCommandLine ?? throw new ArgumentNullException(nameof(onCommandLine)); } - internal override DiagnosticsEventPipeProcessor CreateProcessor() => - new DiagnosticsEventPipeProcessor( - PipeMode.ProcessInfo, - processInfoCallback: _onCommandLine); + protected override MonitoringSourceConfiguration CreateConfiguration() + { + return new SampleProfilerConfiguration(); + } + + protected override async Task OnEventSourceAvailable(EventPipeEventSource eventSource, Func stopSessionAsync, CancellationToken token) + { + string commandLine = null; + Action processInfoHandler = (TraceEvent traceEvent, Action taskComplete) => + { + commandLine = (string)traceEvent.PayloadByName("CommandLine"); + taskComplete(); + }; + + // Completed when the ProcessInfo event of the Microsoft-DotNETCore-EventPipe event provider is handled + using var processInfoTaskSource = new EventTaskSource>( + taskComplete => traceEvent => processInfoHandler(traceEvent, taskComplete), + handler => eventSource.Dynamic.AddCallbackForProviderEvent(MonitoringSourceConfiguration.EventPipeProviderName, "ProcessInfo", handler), + handler => eventSource.Dynamic.RemoveCallback(handler), + token); + + // Completed when any trace event is handled + using var anyEventTaskSource = new EventTaskSource>( + taskComplete => traceEvent => taskComplete(), + handler => eventSource.Dynamic.All += handler, + handler => eventSource.Dynamic.All -= handler, + token); + + // Wait for any trace event to be processed + await anyEventTaskSource.Task; + + // Stop the event pipe session + await stopSessionAsync(); + + // Wait for the ProcessInfo event to be processed + await processInfoTaskSource.Task; + + // Notify of command line information + await _onCommandLine(commandLine, token); + } } } \ No newline at end of file diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/Trace/EventTracePipeline.cs b/src/Microsoft.Diagnostics.Monitoring.EventPipe/Trace/EventTracePipeline.cs index 68bd69cfc..dfea41b33 100644 --- a/src/Microsoft.Diagnostics.Monitoring.EventPipe/Trace/EventTracePipeline.cs +++ b/src/Microsoft.Diagnostics.Monitoring.EventPipe/Trace/EventTracePipeline.cs @@ -4,26 +4,63 @@ using Microsoft.Diagnostics.NETCore.Client; using System; -using System.Collections.Generic; using System.IO; -using System.Text; using System.Threading; using System.Threading.Tasks; namespace Microsoft.Diagnostics.Monitoring.EventPipe { - internal class EventTracePipeline : EventSourcePipeline + internal class EventTracePipeline : Pipeline { - private readonly Func _streamAvailable; - public EventTracePipeline(DiagnosticsClient client, EventTracePipelineSettings settings, Func streamAvailable) - : base(client, settings) + private readonly Lazy _provider; + private readonly Func _onStreamAvailable; + + public DiagnosticsClient Client { get; } + public EventTracePipelineSettings Settings { get; } + + public EventTracePipeline(DiagnosticsClient client, EventTracePipelineSettings settings, Func onStreamAvailable) + { + Client = client ?? throw new ArgumentNullException(nameof(client)); + Settings = settings ?? throw new ArgumentNullException(nameof(settings)); + _onStreamAvailable = onStreamAvailable ?? throw new ArgumentNullException(nameof(onStreamAvailable)); + _provider = new Lazy(CreateProvider); + } + + protected override async Task OnRun(CancellationToken token) + { + try + { + Stream eventStream = await _provider.Value.ProcessEvents(Client, Settings.Duration, token); + + await _onStreamAvailable(eventStream, token); + } + catch (InvalidOperationException e) + { + throw new PipelineException(e.Message, e); + } + } + + protected override async Task OnCleanup() + { + if (_provider.IsValueCreated) + { + await _provider.Value.DisposeAsync(); + } + await base.OnCleanup(); + } + + protected override Task OnStop(CancellationToken token) { - _streamAvailable = streamAvailable ?? throw new ArgumentNullException(nameof(streamAvailable)); + if (_provider.IsValueCreated) + { + _provider.Value.StopProcessing(); + } + return Task.CompletedTask; } - internal override DiagnosticsEventPipeProcessor CreateProcessor() + private EventPipeStreamProvider CreateProvider() { - return new DiagnosticsEventPipeProcessor(PipeMode.Nettrace, configuration: Settings.Configuration, onStreamAvailable: _streamAvailable); + return new EventPipeStreamProvider(Settings.Configuration); } } } diff --git a/src/Microsoft.Diagnostics.Monitoring.RestServer/MetricsService.cs b/src/Microsoft.Diagnostics.Monitoring.RestServer/MetricsService.cs index dce4672cc..303cd1ac0 100644 --- a/src/Microsoft.Diagnostics.Monitoring.RestServer/MetricsService.cs +++ b/src/Microsoft.Diagnostics.Monitoring.RestServer/MetricsService.cs @@ -82,7 +82,7 @@ namespace Microsoft.Diagnostics.Monitoring.RestServer CounterGroups = counterGroups, Duration = Timeout.InfiniteTimeSpan, RefreshInterval = TimeSpan.FromSeconds(options.UpdateIntervalSeconds) - }, metricsLogger: new[] { new MetricsLogger(_store.MetricsStore) }); + }, loggers: new[] { new MetricsLogger(_store.MetricsStore) }); using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, optionsTokenSource.Token); await _counterPipeline.RunAsync(linkedTokenSource.Token);