dotnet-monitor: Port stop-on-event infrastructure (#3472)
authorJoe Schmitt <joschmit@microsoft.com>
Tue, 25 Oct 2022 20:29:02 +0000 (13:29 -0700)
committerGitHub <noreply@github.com>
Tue, 25 Oct 2022 20:29:02 +0000 (13:29 -0700)
src/Microsoft.Diagnostics.Monitoring.EventPipe/EventMonitor.cs [new file with mode: 0644]
src/Microsoft.Diagnostics.Monitoring.EventPipe/Microsoft.Diagnostics.Monitoring.EventPipe.csproj
src/Microsoft.Diagnostics.Monitoring.EventPipe/PassthroughStream.cs [new file with mode: 0644]
src/Microsoft.Diagnostics.Monitoring.EventPipe/StreamLeaveOpenWrapper.cs [new file with mode: 0644]

diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/EventMonitor.cs b/src/Microsoft.Diagnostics.Monitoring.EventPipe/EventMonitor.cs
new file mode 100644 (file)
index 0000000..3ade330
--- /dev/null
@@ -0,0 +1,192 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using Microsoft.Diagnostics.Tracing;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.Monitoring.EventPipe
+{
+    /// <summary>
+    /// A stream that can monitor an event stream which is compatible with <see cref="EventPipeEventSource"/> for a specific event.
+    /// </summary>
+    internal sealed class EventMonitor : IAsyncDisposable
+    {
+        private readonly Action<TraceEvent> _onPayloadFilterMismatch;
+        private readonly Action<TraceEvent> _onEvent;
+        private readonly bool _callOnEventOnlyOnce;
+
+        private readonly Stream _eventStream;
+        private readonly bool _leaveEventStreamOpen;
+        private EventPipeEventSource _eventSource;
+
+        private readonly string _providerName;
+        private readonly string _eventName;
+
+        // The original payload filter of fieldName->fieldValue specified by the user. It will only be used to hydrate _payloadFilterIndexCache.
+        private readonly IDictionary<string, string> _payloadFilter;
+
+        // This tracks the exact indices into the provided event's payload to check for the expected values instead
+        // of repeatedly searching the payload for the field names in _payloadFilter.
+        private Dictionary<int, string> _payloadFilterIndexCache;
+
+        /// <summary>
+        /// A stream that can monitor an event stream which is compatible with <see cref="EventPipeEventSource"/> for a specific event.
+        /// </summary>
+        /// <param name="providerName">The event provider name.</param>
+        /// <param name="eventName">The event name, which is the concatenation of the task name and opcode name, if set. <see cref="TraceEvent.EventName"/> for more information about the format.</param>
+        /// <param name="payloadFilter">A mapping of the event payload field names to their expected values. A subset of the payload fields may be specified.</param>
+        /// <param name="onEvent">A callback that will be invoked each time the requested event has been observed.</param>
+        /// <param name="onPayloadFilterMismatch">A callback that will be invoked if the field names specified in <paramref name="payloadFilter"/> do not match those in the event's manifest.</param>
+        /// <param name="eventStream">The source event stream which is compatible with <see cref="EventPipeEventSource"/>.</param>
+        /// <param name="callOnEventOnlyOnce">If true, the provided <paramref name="onEvent"/> will only be called for the first matching event.</param>
+        /// <param name="leaveEventStreamOpen">If true, the provided <paramref name="eventStream"/> will not be automatically closed when this object is disposed.</param>
+        public EventMonitor(
+            string providerName,
+            string eventName,
+            IDictionary<string, string> payloadFilter,
+            Action<TraceEvent> onEvent,
+            Action<TraceEvent> onPayloadFilterMismatch,
+            Stream eventStream,
+            bool callOnEventOnlyOnce,
+            bool leaveEventStreamOpen = false) : base()
+        {
+            _providerName = providerName;
+            _eventName = eventName;
+            _onEvent = onEvent;
+            _onPayloadFilterMismatch = onPayloadFilterMismatch;
+            _eventStream = eventStream;
+            _payloadFilter = payloadFilter;
+            _callOnEventOnlyOnce = callOnEventOnlyOnce;
+            _leaveEventStreamOpen = leaveEventStreamOpen;
+        }
+
+        /// <summary>
+        /// Start processing the event stream, monitoring it for the requested event.
+        /// This will continue to run until the event stream is complete or a stop is requested, regardless of if the specified event has been observed.
+        /// </summary>
+        /// <param name="token">The cancellation token.</param>
+        /// <returns></returns>
+        public Task ProcessAsync(CancellationToken token)
+        {
+            return Task.Run(() =>
+            {
+                _eventSource = new EventPipeEventSource(_eventStream);
+                token.ThrowIfCancellationRequested();
+                using IDisposable registration = token.Register(() => _eventSource.Dispose());
+
+                _eventSource.Dynamic.AddCallbackForProviderEvent(_providerName, _eventName, TraceEventCallback);
+
+                _eventSource.Process();
+                token.ThrowIfCancellationRequested();
+            }, token);
+        }
+
+        /// <summary>
+        /// Stops monitoring for the specified event.
+        /// The event stream will continue to be processed until it is complete or <see cref="DisposeAsync"/> is called.
+        /// </summary>
+        private void StopMonitoringForEvent()
+        {
+            _eventSource?.Dynamic.RemoveCallback<TraceEvent>(TraceEventCallback);
+        }
+
+        private void TraceEventCallback(TraceEvent obj)
+        {
+            if (_payloadFilterIndexCache == null && !HydratePayloadFilterCache(obj))
+            {
+                // The payload filter doesn't map onto the actual data so we will never match the event.
+                StopMonitoringForEvent();
+                _onPayloadFilterMismatch(obj);
+                return;
+            }
+
+            if (!DoesPayloadMatch(obj))
+            {
+                return;
+            }
+
+            if (_callOnEventOnlyOnce)
+            {
+                StopMonitoringForEvent();
+            }
+
+            _onEvent(obj);
+        }
+
+        /// <summary>
+        /// Hydrates the payload filter cache.
+        /// </summary>
+        /// <param name="obj">An instance of the specified event (matching provider, task name, and opcode), but without checking the payload yet.</param>
+        /// <returns></returns>
+        private bool HydratePayloadFilterCache(TraceEvent obj)
+        {
+            if (_payloadFilterIndexCache != null)
+            {
+                return true;
+            }
+
+            // If there's no payload filter, there's nothing to do.
+            if (_payloadFilter == null || _payloadFilter.Count == 0)
+            {
+                _payloadFilterIndexCache = new Dictionary<int, string>(capacity: 0);
+                return true;
+            }
+
+            // If the payload has fewer fields than the requested filter, we can never match it.
+            // NOTE: this function will only ever be called with an instance of the specified event
+            // (matching provider, task name, and opcode) but without checking the payload yet.
+            if (obj.PayloadNames.Length < _payloadFilter.Count)
+            {
+                return false;
+            }
+
+            Dictionary<int, string> payloadFilterCache = new(capacity: _payloadFilter.Count);
+            for (int i = 0; (i < obj.PayloadNames.Length) && (payloadFilterCache.Count < _payloadFilter.Count); i++)
+            {
+                if (_payloadFilter.TryGetValue(obj.PayloadNames[i], out string expectedPayloadValue))
+                {
+                    payloadFilterCache.Add(i, expectedPayloadValue);
+                }
+            }
+
+            // Check if one or more of the requested filter field names did not exist on the actual payload.
+            if (_payloadFilter.Count != payloadFilterCache.Count)
+            {
+                return false;
+            }
+
+            _payloadFilterIndexCache = payloadFilterCache;
+
+            return true;
+        }
+
+        private bool DoesPayloadMatch(TraceEvent obj)
+        {
+            foreach (var (fieldIndex, expectedValue) in _payloadFilterIndexCache)
+            {
+                string fieldValue = Convert.ToString(obj.PayloadValue(fieldIndex), CultureInfo.InvariantCulture) ?? string.Empty;
+                if (!string.Equals(fieldValue, expectedValue, StringComparison.Ordinal))
+                {
+                    return false;
+                }
+            }
+
+            return true;
+        }
+
+        public async ValueTask DisposeAsync()
+        {
+            _eventSource?.Dispose();
+            if (!_leaveEventStreamOpen)
+            {
+                await _eventStream.DisposeAsync();
+            }
+        }
+    }
+}
index 782d3f3b4e75662c6e23bb74b32ea8fd49b45a70..19fce8c29629a0470d9ce89be0311c5cac96e6ba 100644 (file)
@@ -1,7 +1,7 @@
 <Project Sdk="Microsoft.NET.Sdk">
 
   <PropertyGroup>
-    <TargetFramework>netstandard2.0</TargetFramework>
+    <TargetFramework>netstandard2.1</TargetFramework>
     <NoWarn>;1591;1701</NoWarn>
     <Description>EventPipe package for collecting logs, metrics, and gcdumps.</Description>
     <!-- Tentatively create package so other teams can tentatively consume. -->
diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/PassthroughStream.cs b/src/Microsoft.Diagnostics.Monitoring.EventPipe/PassthroughStream.cs
new file mode 100644 (file)
index 0000000..b37b11e
--- /dev/null
@@ -0,0 +1,103 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.Monitoring.EventPipe
+{
+    /// <summary>
+    /// A read-only stream that passes data as it is read to another stream.
+    /// </summary>
+    internal sealed class PassthroughStream : Stream
+    {
+        private readonly Stream _sourceStream;
+        private readonly Stream _destinationStream;
+
+        /// <summary>
+        /// A read-only stream that passes data as it is read to another stream.
+        /// </summary>
+        /// <param name="sourceStream">The source stream that data will be read from.</param>
+        /// <param name="destinationStream">The destination stream to pass read data to. It must either be full duplex or be write-only.</param>
+        /// <param name="bufferSize">The size of the buffer to use when writing to the <paramref name="destinationStream"/>.</param>
+        /// <param name="leaveDestinationStreamOpen">If true, the provided <paramref name="destinationStream"/> will not be automatically closed when this object is disposed.</param>
+        public PassthroughStream(
+            Stream sourceStream,
+            Stream destinationStream,
+            int bufferSize,
+            bool leaveDestinationStreamOpen = false) : base()
+        {
+            _sourceStream = sourceStream;
+
+            // Wrap a buffered stream around the destination stream to avoid
+            // slowing down the data passthrough unless there is significant pressure.
+            _destinationStream = new BufferedStream(
+                leaveDestinationStreamOpen
+                    ? new StreamLeaveOpenWrapper(destinationStream)
+                    : destinationStream,
+                bufferSize);
+        }
+
+        public override int Read(byte[] buffer, int offset, int count)
+        {
+            return Read(buffer.AsSpan(offset, count));
+        }
+
+        public override int Read(Span<byte> buffer)
+        {
+            int bytesRead = _sourceStream.Read(buffer);
+            if (bytesRead != 0)
+            {
+                _destinationStream.Write(buffer[..bytesRead]);
+            }
+
+            return bytesRead;
+        }
+
+        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+        {
+            return ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
+        }
+
+        public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+        {
+            int bytesRead = await _sourceStream.ReadAsync(buffer, cancellationToken);
+            if (bytesRead != 0)
+            {
+                await _destinationStream.WriteAsync(buffer[..bytesRead], cancellationToken);
+            }
+
+            return bytesRead;
+        }
+
+        public override bool CanRead => true;
+        public override bool CanSeek => false;
+        public override bool CanWrite => false;
+
+        public override bool CanTimeout => _sourceStream.CanTimeout;
+        public override long Length => _sourceStream.Length;
+
+        public override long Position { get => _sourceStream.Position; set => throw new NotSupportedException(); }
+        public override int ReadTimeout { get => _sourceStream.ReadTimeout; set => _sourceStream.ReadTimeout = value; }
+
+        public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
+        public override void SetLength(long value) => throw new NotSupportedException();
+        public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
+
+        public override void CopyTo(Stream destination, int bufferSize) => throw new NotSupportedException();
+        public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) => throw new NotSupportedException();
+
+        public override void Flush() => _destinationStream.Flush();
+        public override Task FlushAsync(CancellationToken cancellationToken) => _destinationStream.FlushAsync(cancellationToken);
+
+        public override async ValueTask DisposeAsync()
+        {
+            await _sourceStream.DisposeAsync();
+            await _destinationStream.DisposeAsync();
+            await base.DisposeAsync();
+        }
+    }
+}
diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/StreamLeaveOpenWrapper.cs b/src/Microsoft.Diagnostics.Monitoring.EventPipe/StreamLeaveOpenWrapper.cs
new file mode 100644 (file)
index 0000000..b9e10af
--- /dev/null
@@ -0,0 +1,74 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.Monitoring.EventPipe
+{
+    /// <summary>
+    /// Wraps a given stream but leaves it open on Dispose.
+    /// </summary>
+    internal sealed class StreamLeaveOpenWrapper : Stream
+    {
+        private readonly Stream _baseStream;
+
+        public StreamLeaveOpenWrapper(Stream baseStream) : base()
+        {
+            _baseStream = baseStream;
+        }
+
+        public override bool CanSeek => _baseStream.CanSeek;
+
+        public override bool CanTimeout => _baseStream.CanTimeout;
+
+        public override bool CanRead => _baseStream.CanRead;
+
+        public override bool CanWrite => _baseStream.CanWrite;
+
+        public override long Length => _baseStream.Length;
+
+        public override long Position { get => _baseStream.Position; set => _baseStream.Position = value; }
+
+        public override int ReadTimeout { get => _baseStream.ReadTimeout; set => _baseStream.ReadTimeout = value; }
+
+        public override int WriteTimeout { get => _baseStream.WriteTimeout; set => _baseStream.WriteTimeout = value; }
+
+        public override long Seek(long offset, SeekOrigin origin) => _baseStream.Seek(offset, origin);
+
+        public override int Read(Span<byte> buffer) => _baseStream.Read(buffer);
+
+        public override int Read(byte[] buffer, int offset, int count) => _baseStream.Read(buffer, offset, count);
+
+        public override int ReadByte() => _baseStream.ReadByte();
+
+        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => _baseStream.ReadAsync(buffer, offset, count, cancellationToken);
+
+        public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) => _baseStream.ReadAsync(buffer, cancellationToken);
+
+        public override void Flush() => _baseStream.Flush();
+
+        public override void SetLength(long value) => _baseStream.SetLength(value);
+
+        public override void Write(byte[] buffer, int offset, int count) => _baseStream.Write(buffer, offset, count);
+
+        public override void Write(ReadOnlySpan<byte> buffer) => _baseStream.Write(buffer);
+
+        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => _baseStream.WriteAsync(buffer, offset, count, cancellationToken);
+
+        public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) => _baseStream.WriteAsync(buffer, cancellationToken);
+
+        public override void WriteByte(byte value) => _baseStream.WriteByte(value);
+
+        public override Task FlushAsync(CancellationToken cancellationToken) => _baseStream.FlushAsync(cancellationToken);
+
+        public override void CopyTo(Stream destination, int bufferSize) => _baseStream.CopyTo(destination, bufferSize);
+
+        public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) => _baseStream.CopyToAsync(destination, bufferSize, cancellationToken);
+
+        public override async ValueTask DisposeAsync() => await base.DisposeAsync();
+    }
+}