Refactor pipeline mode details into pipeline classes. (#1776)
authorJustin Anderson <jander-msft@users.noreply.github.com>
Mon, 30 Nov 2020 23:47:02 +0000 (15:47 -0800)
committerGitHub <noreply@github.com>
Mon, 30 Nov 2020 23:47:02 +0000 (15:47 -0800)
src/Microsoft.Diagnostics.Monitoring.EventPipe/Counters/EventCounterPipeline.cs
src/Microsoft.Diagnostics.Monitoring.EventPipe/DiagnosticsEventPipeProcessor.cs
src/Microsoft.Diagnostics.Monitoring.EventPipe/DiagnosticsMonitor.cs [deleted file]
src/Microsoft.Diagnostics.Monitoring.EventPipe/EventPipeStreamProvider.cs [new file with mode: 0644]
src/Microsoft.Diagnostics.Monitoring.EventPipe/EventSourcePipeline.cs
src/Microsoft.Diagnostics.Monitoring.EventPipe/GCDump/EventGCDumpPipeline.cs
src/Microsoft.Diagnostics.Monitoring.EventPipe/Logs/EventLogsPipeline.cs
src/Microsoft.Diagnostics.Monitoring.EventPipe/PipeMode.cs [deleted file]
src/Microsoft.Diagnostics.Monitoring.EventPipe/ProcessInfo/EventProcessInfoPipeline.cs
src/Microsoft.Diagnostics.Monitoring.EventPipe/Trace/EventTracePipeline.cs
src/Microsoft.Diagnostics.Monitoring.RestServer/MetricsService.cs

index 3904afd0cb8bdb9ba520b81b36380baea9433893..a778091235a41a9e26c891067d0520c0d6d019b7 100644 (file)
@@ -3,10 +3,9 @@
 // See the LICENSE file in the project root for more information.
 
 using Microsoft.Diagnostics.NETCore.Client;
+using Microsoft.Diagnostics.Tracing;
 using System;
 using System.Collections.Generic;
-using System.Linq;
-using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -14,14 +13,14 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
 {
     internal class EventCounterPipeline : EventSourcePipeline<EventPipeCounterPipelineSettings>
     {
-        private readonly IEnumerable<ICountersLogger> _metricsLogger;
+        private readonly IEnumerable<ICountersLogger> _loggers;
         private readonly CounterFilter _filter;
 
         public EventCounterPipeline(DiagnosticsClient client,
             EventPipeCounterPipelineSettings settings,
-            IEnumerable<ICountersLogger> metricsLogger) : base(client, settings)
+            IEnumerable<ICountersLogger> loggers) : base(client, settings)
         {
-            _metricsLogger = metricsLogger ?? throw new ArgumentNullException(nameof(metricsLogger));
+            _loggers = loggers ?? throw new ArgumentNullException(nameof(loggers));
 
             if (settings.CounterGroups.Length > 0)
             {
@@ -37,10 +36,113 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
             }
         }
 
-        internal override DiagnosticsEventPipeProcessor CreateProcessor()
+        protected override MonitoringSourceConfiguration CreateConfiguration()
         {
-            return new DiagnosticsEventPipeProcessor(PipeMode.Metrics, metricLoggers: _metricsLogger, metricIntervalSeconds: (int)Settings.RefreshInterval.TotalSeconds,
-                metricFilter: _filter);
+            return new MetricSourceConfiguration(CounterIntervalSeconds, _filter.GetProviders());
         }
+
+        protected override async Task OnEventSourceAvailable(EventPipeEventSource eventSource, Func<Task> stopSessionAsync, CancellationToken token)
+        {
+            ExecuteCounterLoggerAction((metricLogger) => metricLogger.PipelineStarted());
+
+            eventSource.Dynamic.All += traceEvent =>
+            {
+                try
+                {
+                    // Metrics
+                    if (traceEvent.EventName.Equals("EventCounters"))
+                    {
+                        IDictionary<string, object> payloadVal = (IDictionary<string, object>)(traceEvent.PayloadValue(0));
+                        IDictionary<string, object> payloadFields = (IDictionary<string, object>)(payloadVal["Payload"]);
+
+                        //Make sure we are part of the requested series. If multiple clients request metrics, all of them get the metrics.
+                        string series = payloadFields["Series"].ToString();
+                        if (GetInterval(series) != CounterIntervalSeconds * 1000)
+                        {
+                            return;
+                        }
+
+                        string counterName = payloadFields["Name"].ToString();
+                        if (!_filter.IsIncluded(traceEvent.ProviderName, counterName))
+                        {
+                            return;
+                        }
+
+                        float intervalSec = (float)payloadFields["IntervalSec"];
+                        string displayName = payloadFields["DisplayName"].ToString();
+                        string displayUnits = payloadFields["DisplayUnits"].ToString();
+                        double value = 0;
+                        CounterType counterType = CounterType.Metric;
+
+                        if (payloadFields["CounterType"].Equals("Mean"))
+                        {
+                            value = (double)payloadFields["Mean"];
+                        }
+                        else if (payloadFields["CounterType"].Equals("Sum"))
+                        {
+                            counterType = CounterType.Rate;
+                            value = (double)payloadFields["Increment"];
+                            if (string.IsNullOrEmpty(displayUnits))
+                            {
+                                displayUnits = "count";
+                            }
+                            //TODO Should we make these /sec like the dotnet-counters tool?
+                        }
+
+                        // Note that dimensional data such as pod and namespace are automatically added in prometheus and azure monitor scenarios.
+                        // We no longer added it here.
+                        var counterPayload = new CounterPayload(traceEvent.TimeStamp,
+                            traceEvent.ProviderName,
+                            counterName, displayName,
+                            displayUnits,
+                            value,
+                            counterType,
+                            intervalSec);
+
+                        ExecuteCounterLoggerAction((metricLogger) => metricLogger.Log(counterPayload));
+                    }
+                }
+                catch (Exception)
+                {
+                }
+            };
+
+            using var sourceCompletedTaskSource = new EventTaskSource<Action>(
+                taskComplete => taskComplete,
+                handler => eventSource.Completed += handler,
+                handler => eventSource.Completed -= handler,
+                token);
+
+            await sourceCompletedTaskSource.Task;
+
+            ExecuteCounterLoggerAction((metricLogger) => metricLogger.PipelineStopped());
+        }
+
+        private static int GetInterval(string series)
+        {
+            const string comparison = "Interval=";
+            int interval = 0;
+            if (series.StartsWith(comparison, StringComparison.OrdinalIgnoreCase))
+            {
+                int.TryParse(series.Substring(comparison.Length), out interval);
+            }
+            return interval;
+        }
+
+        private void ExecuteCounterLoggerAction(Action<ICountersLogger> action)
+        {
+            foreach (ICountersLogger logger in _loggers)
+            {
+                try
+                {
+                    action(logger);
+                }
+                catch (ObjectDisposedException)
+                {
+                }
+            }
+        }
+
+        private int CounterIntervalSeconds => (int)Settings.RefreshInterval.TotalSeconds;
     }
 }
index 84bcde29f8d91e2dd9243d863d3b8d559830b312..6a24eb1501ba793c2029a01fc296f7036f70f3e4 100644 (file)
@@ -2,67 +2,34 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
+using Microsoft.Diagnostics.NETCore.Client;
+using Microsoft.Diagnostics.Tracing;
 using System;
-using System.Collections.Generic;
-using System.Globalization;
 using System.IO;
-using System.Linq;
-using System.Text.Json;
 using System.Threading;
 using System.Threading.Tasks;
-using Graphs;
-using Microsoft.Diagnostics.NETCore.Client;
-using Microsoft.Diagnostics.Tracing;
-using Microsoft.Diagnostics.Tracing.Extensions;
-using Microsoft.Diagnostics.Tracing.Parsers.Clr;
-using Microsoft.Extensions.Logging;
 
 namespace Microsoft.Diagnostics.Monitoring.EventPipe
 {
     internal partial class DiagnosticsEventPipeProcessor : IAsyncDisposable
     {
-        private readonly MemoryGraph _gcGraph;
-        private readonly ILoggerFactory _loggerFactory;
-        private readonly IEnumerable<ICountersLogger> _metricLoggers;
-        private readonly PipeMode _mode;
-        private readonly int _metricIntervalSeconds;
-        private readonly CounterFilter _counterFilter;
-        private readonly LogLevel _logsLevel;
-        private readonly Func<string, CancellationToken, Task> _processInfoCallback;
-        private readonly MonitoringSourceConfiguration _userConfig;
-        private readonly Func<Stream, CancellationToken, Task> _onStreamAvailable;
+        private readonly MonitoringSourceConfiguration _configuration;
+        private readonly Func<EventPipeEventSource, Func<Task>, CancellationToken, Task> _onEventSourceAvailable;
 
         private readonly object _lock = new object();
 
         private TaskCompletionSource<bool> _sessionStarted;
-        private EventPipeEventSource _eventPipeSession;
+        private EventPipeEventSource _eventSource;
         private Func<Task> _stopFunc;
         private bool _disposed;
 
         public DiagnosticsEventPipeProcessor(
-            PipeMode mode,
-            ILoggerFactory loggerFactory = null,              // PipeMode = Logs
-            LogLevel logsLevel = LogLevel.Debug,              // PipeMode = Logs
-            IEnumerable<ICountersLogger> metricLoggers = null, // PipeMode = Metrics
-            int metricIntervalSeconds = 10,                   // PipeMode = Metrics
-            CounterFilter metricFilter = null,                // PipeMode = Metrics
-            MemoryGraph gcGraph = null,                       // PipeMode = GCDump
-            MonitoringSourceConfiguration configuration = null, // PipeMode = Nettrace
-            Func<Stream, CancellationToken, Task> onStreamAvailable = null, // PipeMode = Nettrace
-            Func<string, CancellationToken, Task> processInfoCallback = null     // PipeMode = ProcessInfo
+            MonitoringSourceConfiguration configuration,
+            Func<EventPipeEventSource, Func<Task>, CancellationToken, Task> onEventSourceAvailable
             )
         {
-            _metricLoggers = metricLoggers ?? Enumerable.Empty<ICountersLogger>();
-            _mode = mode;
-            _loggerFactory = loggerFactory;
-            _gcGraph = gcGraph;
-            _metricIntervalSeconds = metricIntervalSeconds;
-            _logsLevel = logsLevel;
-            _processInfoCallback = processInfoCallback;
-            _userConfig = configuration;
-            _onStreamAvailable = onStreamAvailable;
-            _processInfoCallback = processInfoCallback;
-            _counterFilter = metricFilter;
+            _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
+            _onEventSourceAvailable = onEventSourceAvailable ?? throw new ArgumentNullException(nameof(onEventSourceAvailable));
 
             _sessionStarted = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
         }
@@ -74,83 +41,23 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
             await await Task.Factory.StartNew(async () =>
             {
                 EventPipeEventSource source = null;
-                DiagnosticsMonitor monitor = null;
+                EventPipeStreamProvider streamProvider = null;
                 Task handleEventsTask = Task.CompletedTask;
                 try
                 {
-                    MonitoringSourceConfiguration config = null;
-                    if (_mode == PipeMode.Logs)
-                    {
-                        config = new LoggingSourceConfiguration(_logsLevel);
-                    }
-                    else if (_mode == PipeMode.Metrics)
-                    {
-                        config = new MetricSourceConfiguration(_metricIntervalSeconds, _counterFilter.GetProviders());
-                    }
-                    else if (_mode == PipeMode.GCDump)
-                    {
-                        config = new GCDumpSourceConfiguration();
-                    }
-                    else if (_mode == PipeMode.ProcessInfo)
-                    {
-                        config = new SampleProfilerConfiguration();
-                    }
-                    else if (_mode == PipeMode.Nettrace)
-                    {
-                        config = _userConfig;
-                    }
-
-                    monitor = new DiagnosticsMonitor(config);
+                    streamProvider = new EventPipeStreamProvider(_configuration);
                     // Allows the event handling routines to stop processing before the duration expires.
-                    Func<Task> stopFunc = () => Task.Run(() => { monitor.StopProcessing(); });
-
-                    Stream sessionStream = await monitor.ProcessEvents(client, duration, token);
+                    Func<Task> stopFunc = () => Task.Run(() => { streamProvider.StopProcessing(); });
 
-                    if (_mode == PipeMode.Nettrace)
-                    {
-                        if (!_sessionStarted.TrySetResult(true))
-                        {
-                            token.ThrowIfCancellationRequested();
-                        }
-
-                        lock (_lock)
-                        {
-                            //Save the stop function for later, so that we can stop a trace later.
-                            _stopFunc = stopFunc;
-                        }
-
-                        await _onStreamAvailable(sessionStream, token);
-                        return;
-                    }
+                    Stream sessionStream = await streamProvider.ProcessEvents(client, duration, token);
 
                     source = new EventPipeEventSource(sessionStream);
 
+                    handleEventsTask = _onEventSourceAvailable(source, stopFunc, token);
 
-                    if (_mode == PipeMode.Metrics)
-                    {
-                        // Metrics
-                        HandleEventCounters(source);
-                    }
-                    else if (_mode == PipeMode.Logs)
-                    {
-                        // Logging
-                        HandleLoggingEvents(source);
-                    }
-                    else if (_mode == PipeMode.GCDump)
-                    {
-                        // GC
-                        handleEventsTask = HandleGCEvents(source, stopFunc, token);
-                    }
-
-                    else if (_mode == PipeMode.ProcessInfo)
-                    {
-                        // ProcessInfo
-                        handleEventsTask = HandleProcessInfo(source, stopFunc, token);
-                    }
-
-                    lock(_lock)
+                    lock (_lock)
                     {
-                        _eventPipeSession = source;
+                        _eventSource = source;
                         _stopFunc = stopFunc;
                     }
                     registration.Dispose();
@@ -168,20 +75,18 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
                 }
                 finally
                 {
-                    ExecuteCounterLoggerAction((metricLogger) => metricLogger.PipelineStopped());
-
                     registration.Dispose();
-                    EventPipeEventSource session = null;
+                    EventPipeEventSource eventSource = null;
                     lock (_lock)
                     {
-                        session = _eventPipeSession;
-                        _eventPipeSession = null;
+                        eventSource = _eventSource;
+                        _eventSource = null;
                     }
 
-                    session?.Dispose();
-                    if (monitor != null)
+                    eventSource?.Dispose();
+                    if (streamProvider != null)
                     {
-                        await monitor.DisposeAsync();
+                        await streamProvider.DisposeAsync();
                     }
                 }
 
@@ -202,7 +107,7 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
             Func<Task> stopFunc = null;
             lock (_lock)
             {
-                session = _eventPipeSession;
+                session = _eventSource;
                 stopFunc = _stopFunc;
             }
             if (session != null)
@@ -216,366 +121,6 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
             }
         }
 
-        private void HandleLoggingEvents(EventPipeEventSource source)
-        {
-            string lastFormattedMessage = string.Empty;
-
-            var logActivities = new Dictionary<Guid, LogActivityItem>();
-            var stack = new Stack<Guid>();
-
-            source.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "ActivityJson/Start", (traceEvent) =>
-            {
-                var factoryId = (int)traceEvent.PayloadByName("FactoryID");
-                var categoryName = (string)traceEvent.PayloadByName("LoggerName");
-                var argsJson = (string)traceEvent.PayloadByName("ArgumentsJson");
-
-                // TODO: Store this information by logger factory id
-                var item = new LogActivityItem
-                {
-                    ActivityID = traceEvent.ActivityID,
-                    ScopedObject = new LogObject(JsonDocument.Parse(argsJson).RootElement),
-                };
-
-                if (stack.Count > 0)
-                {
-                    Guid parentId = stack.Peek();
-                    if (logActivities.TryGetValue(parentId, out var parentItem))
-                    {
-                        item.Parent = parentItem;
-                    }
-                }
-
-                stack.Push(traceEvent.ActivityID);
-                logActivities[traceEvent.ActivityID] = item;
-            });
-
-            source.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "ActivityJson/Stop", (traceEvent) =>
-            {
-                var factoryId = (int)traceEvent.PayloadByName("FactoryID");
-                var categoryName = (string)traceEvent.PayloadByName("LoggerName");
-
-                stack.Pop();
-                logActivities.Remove(traceEvent.ActivityID);
-            });
-
-            source.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "MessageJson", (traceEvent) =>
-            {
-                // Level, FactoryID, LoggerName, EventID, EventName, ExceptionJson, ArgumentsJson
-                var logLevel = (LogLevel)traceEvent.PayloadByName("Level");
-                var factoryId = (int)traceEvent.PayloadByName("FactoryID");
-                var categoryName = (string)traceEvent.PayloadByName("LoggerName");
-                var eventId = (int)traceEvent.PayloadByName("EventId");
-                var eventName = (string)traceEvent.PayloadByName("EventName");
-                var exceptionJson = (string)traceEvent.PayloadByName("ExceptionJson");
-                var argsJson = (string)traceEvent.PayloadByName("ArgumentsJson");
-
-                // There's a bug that causes some of the columns to get mixed up
-                if (eventName.StartsWith("{"))
-                {
-                    argsJson = exceptionJson;
-                    exceptionJson = eventName;
-                    eventName = null;
-                }
-
-                if (string.IsNullOrEmpty(argsJson))
-                {
-                    return;
-                }
-
-                Exception exception = null;
-
-                ILogger logger = _loggerFactory.CreateLogger(categoryName);
-                var scopes = new List<IDisposable>();
-
-                if (logActivities.TryGetValue(traceEvent.ActivityID, out var logActivityItem))
-                {
-                    // REVIEW: Does order matter here? We're combining everything anyways.
-                    while (logActivityItem != null)
-                    {
-                        scopes.Add(logger.BeginScope(logActivityItem.ScopedObject));
-
-                        logActivityItem = logActivityItem.Parent;
-                    }
-                }
-
-                try
-                {
-                    if (exceptionJson != "{}")
-                    {
-                        var exceptionMessage = JsonSerializer.Deserialize<JsonElement>(exceptionJson);
-                        exception = new LoggerException(exceptionMessage);
-                    }
-
-                    var message = JsonSerializer.Deserialize<JsonElement>(argsJson);
-                    if (message.TryGetProperty("{OriginalFormat}", out var formatElement))
-                    {
-                        var formatString = formatElement.GetString();
-                        var formatter = new LogValuesFormatter(formatString);
-                        object[] args = new object[formatter.ValueNames.Count];
-                        for (int i = 0; i < args.Length; i++)
-                        {
-                            args[i] = message.GetProperty(formatter.ValueNames[i]).GetString();
-                        }
-
-                        logger.Log(logLevel, new EventId(eventId, eventName), exception, formatString, args);
-                    }
-                    else
-                    {
-                        var obj = new LogObject(message, lastFormattedMessage);
-                        logger.Log(logLevel, new EventId(eventId, eventName), obj, exception, LogObject.Callback);
-                    }
-                }
-                catch (Exception)
-                {
-                }
-                finally
-                {
-                    scopes.ForEach(d => d.Dispose());
-                }
-            });
-
-            source.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "FormattedMessage", (traceEvent) =>
-            {
-                // Level, FactoryID, LoggerName, EventID, EventName, FormattedMessage
-                var logLevel = (LogLevel)traceEvent.PayloadByName("Level");
-                var factoryId = (int)traceEvent.PayloadByName("FactoryID");
-                var categoryName = (string)traceEvent.PayloadByName("LoggerName");
-                var eventId = (int)traceEvent.PayloadByName("EventId");
-                var eventName = (string)traceEvent.PayloadByName("EventName");
-                var formattedMessage = (string)traceEvent.PayloadByName("FormattedMessage");
-
-                if (string.IsNullOrEmpty(formattedMessage))
-                {
-                    formattedMessage = eventName;
-                    eventName = string.Empty;
-                }
-
-                lastFormattedMessage = formattedMessage;
-            });
-        }
-
-        private void HandleEventCounters(EventPipeEventSource source)
-        {
-            ExecuteCounterLoggerAction((metricLogger) => metricLogger.PipelineStarted());
-
-            source.Dynamic.All += traceEvent =>
-            {
-                try
-                {
-                    // Metrics
-                    if (traceEvent.EventName.Equals("EventCounters"))
-                    {
-                        IDictionary<string, object> payloadVal = (IDictionary<string, object>)(traceEvent.PayloadValue(0));
-                        IDictionary<string, object> payloadFields = (IDictionary<string, object>)(payloadVal["Payload"]);
-
-                        //Make sure we are part of the requested series. If multiple clients request metrics, all of them get the metrics.
-                        string series = payloadFields["Series"].ToString();
-                        if (GetInterval(series) != _metricIntervalSeconds * 1000)
-                        {
-                            return;
-                        }
-
-                        string counterName = payloadFields["Name"].ToString();
-                        if (!_counterFilter.IsIncluded(traceEvent.ProviderName, counterName))
-                        {
-                            return;
-                        }
-
-                        float intervalSec = (float)payloadFields["IntervalSec"];
-                        string displayName = payloadFields["DisplayName"].ToString();
-                        string displayUnits = payloadFields["DisplayUnits"].ToString();
-                        double value = 0;
-                        CounterType counterType = CounterType.Metric;
-
-                        if (payloadFields["CounterType"].Equals("Mean"))
-                        {
-                            value = (double)payloadFields["Mean"];
-                        }
-                        else if (payloadFields["CounterType"].Equals("Sum"))
-                        {
-                            counterType = CounterType.Rate;
-                            value = (double)payloadFields["Increment"];
-                            if (string.IsNullOrEmpty(displayUnits))
-                            {
-                                displayUnits = "count";
-                            }
-                            //TODO Should we make these /sec like the dotnet-counters tool?
-                        }
-
-                        // Note that dimensional data such as pod and namespace are automatically added in prometheus and azure monitor scenarios.
-                        // We no longer added it here.
-                        var counterPayload = new CounterPayload(traceEvent.TimeStamp,
-                            traceEvent.ProviderName,
-                            counterName, displayName,
-                            displayUnits,
-                            value,
-                            counterType,
-                            intervalSec);
-
-                        ExecuteCounterLoggerAction((metricLogger) => metricLogger.Log(counterPayload));
-                    }
-                }
-                catch (Exception)
-                {
-                }
-            };
-        }
-
-        private static int GetInterval(string series)
-        {
-            const string comparison = "Interval=";
-            int interval = 0;
-            if (series.StartsWith(comparison, StringComparison.OrdinalIgnoreCase))
-            {
-                int.TryParse(series.Substring(comparison.Length), out interval);
-            }
-            return interval;
-        }
-
-        private void ExecuteCounterLoggerAction(Action<ICountersLogger> action)
-        {
-            foreach (ICountersLogger metricLogger in _metricLoggers)
-            {
-                try
-                {
-                    action(metricLogger);
-                }
-                catch (ObjectDisposedException)
-                {
-                }
-            }
-        }
-
-        private async Task HandleGCEvents(EventPipeEventSource source, Func<Task> stopFunc, CancellationToken token)
-        {
-            int gcNum = -1;
-
-            Action<GCStartTraceData, Action> gcStartHandler = (GCStartTraceData data, Action taskComplete) =>
-            {
-                taskComplete();
-
-                if (gcNum < 0 && data.Depth == 2 && data.Type != GCType.BackgroundGC)
-                {
-                    gcNum = data.Count;
-                }
-            };
-
-            Action<GCBulkNodeTraceData, Action> gcBulkNodeHandler = (GCBulkNodeTraceData data, Action taskComplete) =>
-            {
-                taskComplete();
-            };
-
-            Action<GCEndTraceData, Action> gcEndHandler = (GCEndTraceData data, Action taskComplete) =>
-            {
-                if (data.Count == gcNum)
-                {
-                    taskComplete();
-                }
-            };
-
-            // Register event handlers on the event source and represent their completion as tasks
-            using var gcStartTaskSource = new EventTaskSource<Action<GCStartTraceData>>(
-                taskComplete => data => gcStartHandler(data, taskComplete),
-                handler => source.Clr.GCStart += handler,
-                handler => source.Clr.GCStart -= handler,
-                token);
-            using var gcBulkNodeTaskSource = new EventTaskSource<Action<GCBulkNodeTraceData>>(
-                taskComplete => data => gcBulkNodeHandler(data, taskComplete),
-                handler => source.Clr.GCBulkNode += handler,
-                handler => source.Clr.GCBulkNode -= handler,
-                token);
-            using var gcStopTaskSource = new EventTaskSource<Action<GCEndTraceData>>(
-                taskComplete => data => gcEndHandler(data, taskComplete),
-                handler => source.Clr.GCStop += handler,
-                handler => source.Clr.GCStop -= handler,
-                token);
-            using var sourceCompletedTaskSource = new EventTaskSource<Action>(
-                taskComplete => taskComplete,
-                handler => source.Completed += handler,
-                handler => source.Completed -= handler,
-                token);
-
-            // A task that is completed whenever GC data is received
-            Task gcDataTask = Task.WhenAny(gcStartTaskSource.Task, gcBulkNodeTaskSource.Task);
-            Task gcStopTask = gcStopTaskSource.Task;
-
-            DotNetHeapDumpGraphReader dumper = new DotNetHeapDumpGraphReader(TextWriter.Null)
-            {
-                DotNetHeapInfo = new DotNetHeapInfo()
-            };
-            dumper.SetupCallbacks(_gcGraph, source);
-
-            // The event source will not always provide the GC events when it starts listening. However,
-            // they will be provided when the event source is told to stop processing events. Give the
-            // event source some time to produce the events, but if it doesn't start producing them within
-            // a short amount of time (5 seconds), then stop processing events to allow them to be flushed.
-            Task eventsTimeoutTask = Task.Delay(TimeSpan.FromSeconds(5), token);
-            Task completedTask = await Task.WhenAny(gcDataTask, eventsTimeoutTask);
-
-            token.ThrowIfCancellationRequested();
-
-            // If started receiving GC events, wait for the GC Stop event.
-            if (completedTask == gcDataTask)
-            {
-                await gcStopTask;
-            }
-
-            // Stop receiving events; if haven't received events yet, this will force flushing of events.
-            await stopFunc();
-
-            // Wait for all received events to be processed.
-            await sourceCompletedTaskSource.Task;
-
-            // Check that GC data and stop events were received. This is done by checking that the
-            // associated tasks have ran to completion. If one of them has not reached the completion state, then
-            // fail the GC dump operation.
-            if (gcDataTask.Status != TaskStatus.RanToCompletion ||
-                gcStopTask.Status != TaskStatus.RanToCompletion)
-            {
-                throw new InvalidOperationException("Unable to create GC dump due to incomplete GC data.");
-            }
-
-            dumper.ConvertHeapDataToGraph();
-
-            _gcGraph.AllowReading();
-        }
-
-        private async Task HandleProcessInfo(EventPipeEventSource source, Func<Task> stopFunc, CancellationToken token)
-        {
-            string commandLine = null;
-            Action<TraceEvent, Action> processInfoHandler = (TraceEvent traceEvent, Action taskComplete) =>
-            {
-                commandLine = (string)traceEvent.PayloadByName("CommandLine");
-                taskComplete();
-            };
-
-            // Completed when the ProcessInfo event of the Microsoft-DotNETCore-EventPipe event provider is handled
-            using var processInfoTaskSource = new EventTaskSource<Action<TraceEvent>>(
-                taskComplete => traceEvent => processInfoHandler(traceEvent, taskComplete),
-                handler => source.Dynamic.AddCallbackForProviderEvent(MonitoringSourceConfiguration.EventPipeProviderName, "ProcessInfo", handler),
-                handler => source.Dynamic.RemoveCallback(handler),
-                token);
-
-            // Completed when any trace event is handled
-            using var anyEventTaskSource = new EventTaskSource<Action<TraceEvent>>(
-                taskComplete => traceEvent => taskComplete(),
-                handler => source.Dynamic.All += handler,
-                handler => source.Dynamic.All -= handler,
-                token);
-
-            // Wait for any trace event to be processed
-            await anyEventTaskSource.Task;
-
-            // Stop the event pipe session
-            await stopFunc();
-
-            // Wait for the ProcessInfo event to be processed
-            await processInfoTaskSource.Task;
-
-            // Notify of command line information
-            await _processInfoCallback(commandLine, token);
-        }
-
         public async ValueTask DisposeAsync()
         {
             lock (_lock)
@@ -596,16 +141,7 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
             {
             }
 
-            _eventPipeSession?.Dispose();
-        }
-
-        private class LogActivityItem
-        {
-            public Guid ActivityID { get; set; }
-
-            public LogObject ScopedObject { get; set; }
-
-            public LogActivityItem Parent { get; set; }
+            _eventSource?.Dispose();
         }
     }
 }
diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/DiagnosticsMonitor.cs b/src/Microsoft.Diagnostics.Monitoring.EventPipe/DiagnosticsMonitor.cs
deleted file mode 100644 (file)
index 172b4ad..0000000
+++ /dev/null
@@ -1,110 +0,0 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-// See the LICENSE file in the project root for more information.
-
-using System;
-using System.IO;
-using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.Diagnostics.NETCore.Client;
-
-namespace Microsoft.Diagnostics.Monitoring.EventPipe
-{
-    partial class DiagnosticsEventPipeProcessor
-    {
-        private sealed class DiagnosticsMonitor : IAsyncDisposable
-        {
-            private readonly MonitoringSourceConfiguration _sourceConfig;
-            private readonly TaskCompletionSource<object> _stopProcessingSource;
-            private Task _currentTask;
-
-            public DiagnosticsMonitor(MonitoringSourceConfiguration sourceConfig)
-            {
-                _sourceConfig = sourceConfig;
-                _stopProcessingSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
-            }
-
-            public Task<Stream> ProcessEvents(DiagnosticsClient client, TimeSpan duration, CancellationToken cancellationToken)
-            {
-                cancellationToken.ThrowIfCancellationRequested();
-
-                EventPipeSession session = null;
-                try
-                {
-                    session = client.StartEventPipeSession(_sourceConfig.GetProviders(), _sourceConfig.RequestRundown, _sourceConfig.BufferSizeInMB);
-                }
-                catch (EndOfStreamException e)
-                {
-                    throw new InvalidOperationException("End of stream", e);
-                }
-                catch (Exception ex) when (!(ex is OperationCanceledException))
-                {
-                    throw new InvalidOperationException("Failed to start the event pipe session", ex);
-                }
-
-                _currentTask = Task.Run(async () =>
-                {
-                    using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
-                    linkedSource.CancelAfter(duration);
-                    using var _ = linkedSource.Token.Register(() => _stopProcessingSource.TrySetResult(null));
-
-                    // Use TaskCompletionSource instead of Task.Delay with cancellation to avoid
-                    // using exceptions for normal termination of event stream.
-                    await _stopProcessingSource.Task.ConfigureAwait(false);
-                    StopSession(session);
-                });
-
-                return Task.FromResult(session.EventStream);
-            }
-
-            public void StopProcessing()
-            {
-                _stopProcessingSource.TrySetResult(null);
-            }
-
-            private static void StopSession(EventPipeSession session)
-            {
-                try
-                {
-                    session.Stop();
-                }
-                catch (EndOfStreamException)
-                {
-                    // If the app we're monitoring exits abruptly, this may throw in which case we just swallow the exception and exit gracefully.
-                }
-                // We may time out if the process ended before we sent StopTracing command. We can just exit in that case.
-                catch (TimeoutException)
-                {
-                }
-                // On Unix platforms, we may actually get a PNSE since the pipe is gone with the process, and Runtime Client Library
-                // does not know how to distinguish a situation where there is no pipe to begin with, or where the process has exited
-                // before collection started and got rid of a pipe that once existed.
-                // Since we are catching this at the end of a session we know that the pipe once existed (otherwise the exception would've 
-                // been thrown at the beginning directly)
-                catch (PlatformNotSupportedException)
-                {
-                }
-                // On non-abrupt exits, the socket may be already closed by the runtime and we won't be able to send a stop request through it. 
-                catch (ServerNotAvailableException)
-                {
-                }
-            }
-
-            public async ValueTask DisposeAsync()
-            {
-                Task currentTask = _currentTask;
-                _stopProcessingSource.TrySetResult(null);
-                if (currentTask != null)
-                {
-                    try
-                    {
-                        await currentTask.ConfigureAwait(false);
-                    }
-                    catch (OperationCanceledException)
-                    {
-                    }
-                }
-            }
-        }
-    }
-}
\ No newline at end of file
diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/EventPipeStreamProvider.cs b/src/Microsoft.Diagnostics.Monitoring.EventPipe/EventPipeStreamProvider.cs
new file mode 100644 (file)
index 0000000..545d92a
--- /dev/null
@@ -0,0 +1,107 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Diagnostics.NETCore.Client;
+
+namespace Microsoft.Diagnostics.Monitoring.EventPipe
+{
+    internal sealed class EventPipeStreamProvider : IAsyncDisposable
+    {
+        private readonly MonitoringSourceConfiguration _sourceConfig;
+        private readonly TaskCompletionSource<object> _stopProcessingSource;
+        private Task _currentTask;
+
+        public EventPipeStreamProvider(MonitoringSourceConfiguration sourceConfig)
+        {
+            _sourceConfig = sourceConfig;
+            _stopProcessingSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+        }
+
+        public Task<Stream> ProcessEvents(DiagnosticsClient client, TimeSpan duration, CancellationToken cancellationToken)
+        {
+            cancellationToken.ThrowIfCancellationRequested();
+
+            EventPipeSession session = null;
+            try
+            {
+                session = client.StartEventPipeSession(_sourceConfig.GetProviders(), _sourceConfig.RequestRundown, _sourceConfig.BufferSizeInMB);
+            }
+            catch (EndOfStreamException e)
+            {
+                throw new InvalidOperationException("End of stream", e);
+            }
+            catch (Exception ex) when (!(ex is OperationCanceledException))
+            {
+                throw new InvalidOperationException("Failed to start the event pipe session", ex);
+            }
+
+            _currentTask = Task.Run(async () =>
+            {
+                using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+                linkedSource.CancelAfter(duration);
+                using var _ = linkedSource.Token.Register(() => _stopProcessingSource.TrySetResult(null));
+
+                // Use TaskCompletionSource instead of Task.Delay with cancellation to avoid
+                // using exceptions for normal termination of event stream.
+                await _stopProcessingSource.Task.ConfigureAwait(false);
+                StopSession(session);
+            });
+
+            return Task.FromResult(session.EventStream);
+        }
+
+        public void StopProcessing()
+        {
+            _stopProcessingSource.TrySetResult(null);
+        }
+
+        private static void StopSession(EventPipeSession session)
+        {
+            try
+            {
+                session.Stop();
+            }
+            catch (EndOfStreamException)
+            {
+                // If the app we're monitoring exits abruptly, this may throw in which case we just swallow the exception and exit gracefully.
+            }
+            // We may time out if the process ended before we sent StopTracing command. We can just exit in that case.
+            catch (TimeoutException)
+            {
+            }
+            // On Unix platforms, we may actually get a PNSE since the pipe is gone with the process, and Runtime Client Library
+            // does not know how to distinguish a situation where there is no pipe to begin with, or where the process has exited
+            // before collection started and got rid of a pipe that once existed.
+            // Since we are catching this at the end of a session we know that the pipe once existed (otherwise the exception would've 
+            // been thrown at the beginning directly)
+            catch (PlatformNotSupportedException)
+            {
+            }
+            // On non-abrupt exits, the socket may be already closed by the runtime and we won't be able to send a stop request through it. 
+            catch (ServerNotAvailableException)
+            {
+            }
+        }
+
+        public async ValueTask DisposeAsync()
+        {
+            Task currentTask = _currentTask;
+            _stopProcessingSource.TrySetResult(null);
+            if (currentTask != null)
+            {
+                try
+                {
+                    await currentTask.ConfigureAwait(false);
+                }
+                catch (OperationCanceledException)
+                {
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
index 85275d93141639ebc4162d0c9e0b1d2543fea10c..d05a9778a450b40f5fe394e46f6312a9ed106e72 100644 (file)
@@ -2,13 +2,9 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
-using Graphs;
 using Microsoft.Diagnostics.NETCore.Client;
-using Microsoft.Extensions.Logging;
+using Microsoft.Diagnostics.Tracing;
 using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -27,7 +23,14 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
             _processor = new Lazy<DiagnosticsEventPipeProcessor>(CreateProcessor);
         }
 
-        internal abstract DiagnosticsEventPipeProcessor CreateProcessor();
+        protected abstract MonitoringSourceConfiguration CreateConfiguration();
+
+        private DiagnosticsEventPipeProcessor CreateProcessor()
+        {
+            return new DiagnosticsEventPipeProcessor(
+                configuration: CreateConfiguration(),
+                onEventSourceAvailable: OnEventSourceAvailable);
+        }
 
         protected override Task OnRun(CancellationToken token)
         {
@@ -61,5 +64,10 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
                 await Task.WhenAny(stoppingTask, taskCompletionSource.Task).Unwrap();
             }
         }
+
+        protected virtual Task OnEventSourceAvailable(EventPipeEventSource eventSource, Func<Task> stopSessionAsync, CancellationToken token)
+        {
+            return Task.CompletedTask;
+        }
     }
 }
index ef4d8aeb4ded417d21378c9ee8ee6b04bb1fea8b..af615fecf998e877489a98cc5525433df78ded9a 100644 (file)
@@ -4,10 +4,10 @@
 
 using Graphs;
 using Microsoft.Diagnostics.NETCore.Client;
+using Microsoft.Diagnostics.Tracing;
 using Microsoft.Diagnostics.Tracing.Parsers.Clr;
 using System;
-using System.Collections.Generic;
-using System.Text;
+using System.IO;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -22,9 +22,103 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
             _gcGraph = gcGraph ?? throw new ArgumentNullException(nameof(gcGraph));
         }
 
-        internal override DiagnosticsEventPipeProcessor CreateProcessor()
+        protected override MonitoringSourceConfiguration CreateConfiguration()
         {
-            return new DiagnosticsEventPipeProcessor(PipeMode.GCDump, gcGraph: _gcGraph);
+            return new GCDumpSourceConfiguration();
+        }
+
+        protected override async Task OnEventSourceAvailable(EventPipeEventSource eventSource, Func<Task> stopSessionAsync, CancellationToken token)
+        {
+            int gcNum = -1;
+
+            Action<GCStartTraceData, Action> gcStartHandler = (GCStartTraceData data, Action taskComplete) =>
+            {
+                taskComplete();
+
+                if (gcNum < 0 && data.Depth == 2 && data.Type != GCType.BackgroundGC)
+                {
+                    gcNum = data.Count;
+                }
+            };
+
+            Action<GCBulkNodeTraceData, Action> gcBulkNodeHandler = (GCBulkNodeTraceData data, Action taskComplete) =>
+            {
+                taskComplete();
+            };
+
+            Action<GCEndTraceData, Action> gcEndHandler = (GCEndTraceData data, Action taskComplete) =>
+            {
+                if (data.Count == gcNum)
+                {
+                    taskComplete();
+                }
+            };
+
+            // Register event handlers on the event source and represent their completion as tasks
+            using var gcStartTaskSource = new EventTaskSource<Action<GCStartTraceData>>(
+                taskComplete => data => gcStartHandler(data, taskComplete),
+                handler => eventSource.Clr.GCStart += handler,
+                handler => eventSource.Clr.GCStart -= handler,
+                token);
+            using var gcBulkNodeTaskSource = new EventTaskSource<Action<GCBulkNodeTraceData>>(
+                taskComplete => data => gcBulkNodeHandler(data, taskComplete),
+                handler => eventSource.Clr.GCBulkNode += handler,
+                handler => eventSource.Clr.GCBulkNode -= handler,
+                token);
+            using var gcStopTaskSource = new EventTaskSource<Action<GCEndTraceData>>(
+                taskComplete => data => gcEndHandler(data, taskComplete),
+                handler => eventSource.Clr.GCStop += handler,
+                handler => eventSource.Clr.GCStop -= handler,
+                token);
+            using var sourceCompletedTaskSource = new EventTaskSource<Action>(
+                taskComplete => taskComplete,
+                handler => eventSource.Completed += handler,
+                handler => eventSource.Completed -= handler,
+                token);
+
+            // A task that is completed whenever GC data is received
+            Task gcDataTask = Task.WhenAny(gcStartTaskSource.Task, gcBulkNodeTaskSource.Task);
+            Task gcStopTask = gcStopTaskSource.Task;
+
+            DotNetHeapDumpGraphReader dumper = new DotNetHeapDumpGraphReader(TextWriter.Null)
+            {
+                DotNetHeapInfo = new DotNetHeapInfo()
+            };
+            dumper.SetupCallbacks(_gcGraph, eventSource);
+
+            // The event source will not always provide the GC events when it starts listening. However,
+            // they will be provided when the event source is told to stop processing events. Give the
+            // event source some time to produce the events, but if it doesn't start producing them within
+            // a short amount of time (5 seconds), then stop processing events to allow them to be flushed.
+            Task eventsTimeoutTask = Task.Delay(TimeSpan.FromSeconds(5), token);
+            Task completedTask = await Task.WhenAny(gcDataTask, eventsTimeoutTask);
+
+            token.ThrowIfCancellationRequested();
+
+            // If started receiving GC events, wait for the GC Stop event.
+            if (completedTask == gcDataTask)
+            {
+                await gcStopTask;
+            }
+
+            // Stop receiving events; if haven't received events yet, this will force flushing of events.
+            await stopSessionAsync();
+
+            // Wait for all received events to be processed.
+            await sourceCompletedTaskSource.Task;
+
+            // Check that GC data and stop events were received. This is done by checking that the
+            // associated tasks have ran to completion. If one of them has not reached the completion state, then
+            // fail the GC dump operation.
+            if (gcDataTask.Status != TaskStatus.RanToCompletion ||
+                gcStopTask.Status != TaskStatus.RanToCompletion)
+            {
+                throw new InvalidOperationException("Unable to create GC dump due to incomplete GC data.");
+            }
+
+            dumper.ConvertHeapDataToGraph();
+
+            _gcGraph.AllowReading();
         }
     }
 }
index 7a0ac11f246ff568da3210124956d39d0033c992..62ddc75f0195bcbe204515a7593d550f921ef27b 100644 (file)
@@ -3,10 +3,11 @@
 // See the LICENSE file in the project root for more information.
 
 using Microsoft.Diagnostics.NETCore.Client;
+using Microsoft.Diagnostics.Tracing;
 using Microsoft.Extensions.Logging;
 using System;
 using System.Collections.Generic;
-using System.Text;
+using System.Text.Json;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -21,9 +22,158 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
             _factory = factory ?? throw new ArgumentNullException(nameof(factory));
         }
 
-        internal override DiagnosticsEventPipeProcessor CreateProcessor()
+        protected override MonitoringSourceConfiguration CreateConfiguration()
         {
-            return new DiagnosticsEventPipeProcessor(PipeMode.Logs, loggerFactory: _factory, logsLevel: Settings.LogLevel);
+            return new LoggingSourceConfiguration(Settings.LogLevel);
+        }
+
+        protected override Task OnEventSourceAvailable(EventPipeEventSource eventSource, Func<Task> stopSessionAsync, CancellationToken token)
+        {
+            string lastFormattedMessage = string.Empty;
+
+            var logActivities = new Dictionary<Guid, LogActivityItem>();
+            var stack = new Stack<Guid>();
+
+            eventSource.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "ActivityJson/Start", (traceEvent) =>
+            {
+                var factoryId = (int)traceEvent.PayloadByName("FactoryID");
+                var categoryName = (string)traceEvent.PayloadByName("LoggerName");
+                var argsJson = (string)traceEvent.PayloadByName("ArgumentsJson");
+
+                // TODO: Store this information by logger factory id
+                var item = new LogActivityItem
+                {
+                    ActivityID = traceEvent.ActivityID,
+                    ScopedObject = new LogObject(JsonDocument.Parse(argsJson).RootElement),
+                };
+
+                if (stack.Count > 0)
+                {
+                    Guid parentId = stack.Peek();
+                    if (logActivities.TryGetValue(parentId, out var parentItem))
+                    {
+                        item.Parent = parentItem;
+                    }
+                }
+
+                stack.Push(traceEvent.ActivityID);
+                logActivities[traceEvent.ActivityID] = item;
+            });
+
+            eventSource.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "ActivityJson/Stop", (traceEvent) =>
+            {
+                var factoryId = (int)traceEvent.PayloadByName("FactoryID");
+                var categoryName = (string)traceEvent.PayloadByName("LoggerName");
+
+                stack.Pop();
+                logActivities.Remove(traceEvent.ActivityID);
+            });
+
+            eventSource.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "MessageJson", (traceEvent) =>
+            {
+                // Level, FactoryID, LoggerName, EventID, EventName, ExceptionJson, ArgumentsJson
+                var logLevel = (LogLevel)traceEvent.PayloadByName("Level");
+                var factoryId = (int)traceEvent.PayloadByName("FactoryID");
+                var categoryName = (string)traceEvent.PayloadByName("LoggerName");
+                var eventId = (int)traceEvent.PayloadByName("EventId");
+                var eventName = (string)traceEvent.PayloadByName("EventName");
+                var exceptionJson = (string)traceEvent.PayloadByName("ExceptionJson");
+                var argsJson = (string)traceEvent.PayloadByName("ArgumentsJson");
+
+                // There's a bug that causes some of the columns to get mixed up
+                if (eventName.StartsWith("{"))
+                {
+                    argsJson = exceptionJson;
+                    exceptionJson = eventName;
+                    eventName = null;
+                }
+
+                if (string.IsNullOrEmpty(argsJson))
+                {
+                    return;
+                }
+
+                Exception exception = null;
+
+                ILogger logger = _factory.CreateLogger(categoryName);
+                var scopes = new List<IDisposable>();
+
+                if (logActivities.TryGetValue(traceEvent.ActivityID, out var logActivityItem))
+                {
+                    // REVIEW: Does order matter here? We're combining everything anyways.
+                    while (logActivityItem != null)
+                    {
+                        scopes.Add(logger.BeginScope(logActivityItem.ScopedObject));
+
+                        logActivityItem = logActivityItem.Parent;
+                    }
+                }
+
+                try
+                {
+                    if (exceptionJson != "{}")
+                    {
+                        var exceptionMessage = JsonSerializer.Deserialize<JsonElement>(exceptionJson);
+                        exception = new LoggerException(exceptionMessage);
+                    }
+
+                    var message = JsonSerializer.Deserialize<JsonElement>(argsJson);
+                    if (message.TryGetProperty("{OriginalFormat}", out var formatElement))
+                    {
+                        var formatString = formatElement.GetString();
+                        var formatter = new LogValuesFormatter(formatString);
+                        object[] args = new object[formatter.ValueNames.Count];
+                        for (int i = 0; i < args.Length; i++)
+                        {
+                            args[i] = message.GetProperty(formatter.ValueNames[i]).GetString();
+                        }
+
+                        logger.Log(logLevel, new EventId(eventId, eventName), exception, formatString, args);
+                    }
+                    else
+                    {
+                        var obj = new LogObject(message, lastFormattedMessage);
+                        logger.Log(logLevel, new EventId(eventId, eventName), obj, exception, LogObject.Callback);
+                    }
+                }
+                catch (Exception)
+                {
+                }
+                finally
+                {
+                    scopes.ForEach(d => d.Dispose());
+                }
+            });
+
+            eventSource.Dynamic.AddCallbackForProviderEvent(LoggingSourceConfiguration.MicrosoftExtensionsLoggingProviderName, "FormattedMessage", (traceEvent) =>
+            {
+                // Level, FactoryID, LoggerName, EventID, EventName, FormattedMessage
+                var logLevel = (LogLevel)traceEvent.PayloadByName("Level");
+                var factoryId = (int)traceEvent.PayloadByName("FactoryID");
+                var categoryName = (string)traceEvent.PayloadByName("LoggerName");
+                var eventId = (int)traceEvent.PayloadByName("EventId");
+                var eventName = (string)traceEvent.PayloadByName("EventName");
+                var formattedMessage = (string)traceEvent.PayloadByName("FormattedMessage");
+
+                if (string.IsNullOrEmpty(formattedMessage))
+                {
+                    formattedMessage = eventName;
+                    eventName = string.Empty;
+                }
+
+                lastFormattedMessage = formattedMessage;
+            });
+
+            return Task.CompletedTask;
+        }
+
+        private class LogActivityItem
+        {
+            public Guid ActivityID { get; set; }
+
+            public LogObject ScopedObject { get; set; }
+
+            public LogActivityItem Parent { get; set; }
         }
     }
 }
diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/PipeMode.cs b/src/Microsoft.Diagnostics.Monitoring.EventPipe/PipeMode.cs
deleted file mode 100644 (file)
index ca2a4c0..0000000
+++ /dev/null
@@ -1,15 +0,0 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-// See the LICENSE file in the project root for more information.
-
-namespace Microsoft.Diagnostics.Monitoring.EventPipe
-{
-    internal enum PipeMode
-    {
-        Logs = 1,
-        Metrics,
-        GCDump,
-        ProcessInfo,
-        Nettrace,
-    }
-}
index ca7b4230f96c17e2871b4685b88ab68b1aaf4490..bc4f72b4361e358177fc2b1007b863825d4846b9 100644 (file)
@@ -20,9 +20,45 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
             _onCommandLine = onCommandLine ?? throw new ArgumentNullException(nameof(onCommandLine));
         }
 
-        internal override DiagnosticsEventPipeProcessor CreateProcessor() =>
-            new DiagnosticsEventPipeProcessor(
-                PipeMode.ProcessInfo,
-                processInfoCallback: _onCommandLine);
+        protected override MonitoringSourceConfiguration CreateConfiguration()
+        {
+            return new SampleProfilerConfiguration();
+        }
+
+        protected override async Task OnEventSourceAvailable(EventPipeEventSource eventSource, Func<Task> stopSessionAsync, CancellationToken token)
+        {
+            string commandLine = null;
+            Action<TraceEvent, Action> processInfoHandler = (TraceEvent traceEvent, Action taskComplete) =>
+            {
+                commandLine = (string)traceEvent.PayloadByName("CommandLine");
+                taskComplete();
+            };
+
+            // Completed when the ProcessInfo event of the Microsoft-DotNETCore-EventPipe event provider is handled
+            using var processInfoTaskSource = new EventTaskSource<Action<TraceEvent>>(
+                taskComplete => traceEvent => processInfoHandler(traceEvent, taskComplete),
+                handler => eventSource.Dynamic.AddCallbackForProviderEvent(MonitoringSourceConfiguration.EventPipeProviderName, "ProcessInfo", handler),
+                handler => eventSource.Dynamic.RemoveCallback(handler),
+                token);
+
+            // Completed when any trace event is handled
+            using var anyEventTaskSource = new EventTaskSource<Action<TraceEvent>>(
+                taskComplete => traceEvent => taskComplete(),
+                handler => eventSource.Dynamic.All += handler,
+                handler => eventSource.Dynamic.All -= handler,
+                token);
+
+            // Wait for any trace event to be processed
+            await anyEventTaskSource.Task;
+
+            // Stop the event pipe session
+            await stopSessionAsync();
+
+            // Wait for the ProcessInfo event to be processed
+            await processInfoTaskSource.Task;
+
+            // Notify of command line information
+            await _onCommandLine(commandLine, token);
+        }
     }
 }
\ No newline at end of file
index 68bd69cfc189beb8403687e22d38bff19bd81bde..dfea41b33e9c7874c1069349a539207020f23bcd 100644 (file)
@@ -4,26 +4,63 @@
 
 using Microsoft.Diagnostics.NETCore.Client;
 using System;
-using System.Collections.Generic;
 using System.IO;
-using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
 
 namespace Microsoft.Diagnostics.Monitoring.EventPipe
 {
-    internal class EventTracePipeline : EventSourcePipeline<EventTracePipelineSettings>
+    internal class EventTracePipeline : Pipeline
     {
-        private readonly Func<Stream, CancellationToken, Task> _streamAvailable;
-        public EventTracePipeline(DiagnosticsClient client, EventTracePipelineSettings settings, Func<Stream, CancellationToken, Task> streamAvailable)
-            : base(client, settings)
+        private readonly Lazy<EventPipeStreamProvider> _provider;
+        private readonly Func<Stream, CancellationToken, Task> _onStreamAvailable;
+
+        public DiagnosticsClient Client { get; }
+        public EventTracePipelineSettings Settings { get; }
+
+        public EventTracePipeline(DiagnosticsClient client, EventTracePipelineSettings settings, Func<Stream, CancellationToken, Task> onStreamAvailable)
+        {
+            Client = client ?? throw new ArgumentNullException(nameof(client));
+            Settings = settings ?? throw new ArgumentNullException(nameof(settings));
+            _onStreamAvailable = onStreamAvailable ?? throw new ArgumentNullException(nameof(onStreamAvailable));
+            _provider = new Lazy<EventPipeStreamProvider>(CreateProvider);
+        }
+
+        protected override async Task OnRun(CancellationToken token)
+        {
+            try
+            {
+                Stream eventStream = await _provider.Value.ProcessEvents(Client, Settings.Duration, token);
+
+                await _onStreamAvailable(eventStream, token);
+            }
+            catch (InvalidOperationException e)
+            {
+                throw new PipelineException(e.Message, e);
+            }
+        }
+
+        protected override async Task OnCleanup()
+        {
+            if (_provider.IsValueCreated)
+            {
+                await _provider.Value.DisposeAsync();
+            }
+            await base.OnCleanup();
+        }
+
+        protected override Task OnStop(CancellationToken token)
         {
-            _streamAvailable = streamAvailable ?? throw new ArgumentNullException(nameof(streamAvailable));
+            if (_provider.IsValueCreated)
+            {
+                _provider.Value.StopProcessing();
+            }
+            return Task.CompletedTask;
         }
 
-        internal override DiagnosticsEventPipeProcessor CreateProcessor()
+        private EventPipeStreamProvider CreateProvider()
         {
-            return new DiagnosticsEventPipeProcessor(PipeMode.Nettrace, configuration: Settings.Configuration, onStreamAvailable: _streamAvailable);
+            return new EventPipeStreamProvider(Settings.Configuration);
         }
     }
 }
index dce4672cca260628489fd0ffb2d9399922faa41d..303cd1ac0ad3d14972b032594dd13a90e6315699 100644 (file)
@@ -82,7 +82,7 @@ namespace Microsoft.Diagnostics.Monitoring.RestServer
                         CounterGroups = counterGroups,
                         Duration = Timeout.InfiniteTimeSpan,
                         RefreshInterval = TimeSpan.FromSeconds(options.UpdateIntervalSeconds)
-                    }, metricsLogger: new[] { new MetricsLogger(_store.MetricsStore) });
+                    }, loggers: new[] { new MetricsLogger(_store.MetricsStore) });
 
                     using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, optionsTokenSource.Token);
                     await _counterPipeline.RunAsync(linkedTokenSource.Token);