From: Joe Schmitt Date: Tue, 25 Oct 2022 20:29:02 +0000 (-0700) Subject: dotnet-monitor: Port stop-on-event infrastructure (#3472) X-Git-Tag: accepted/tizen/unified/riscv/20231226.055542~45^2^2~37 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=b229e8d0044dc5629b33add9e4ec6ee5a4018563;p=platform%2Fcore%2Fdotnet%2Fdiagnostics.git dotnet-monitor: Port stop-on-event infrastructure (#3472) --- diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/EventMonitor.cs b/src/Microsoft.Diagnostics.Monitoring.EventPipe/EventMonitor.cs new file mode 100644 index 000000000..3ade33082 --- /dev/null +++ b/src/Microsoft.Diagnostics.Monitoring.EventPipe/EventMonitor.cs @@ -0,0 +1,192 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using Microsoft.Diagnostics.Tracing; +using System; +using System.Collections.Generic; +using System.Globalization; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Diagnostics.Monitoring.EventPipe +{ + /// + /// A stream that can monitor an event stream which is compatible with for a specific event. + /// + internal sealed class EventMonitor : IAsyncDisposable + { + private readonly Action _onPayloadFilterMismatch; + private readonly Action _onEvent; + private readonly bool _callOnEventOnlyOnce; + + private readonly Stream _eventStream; + private readonly bool _leaveEventStreamOpen; + private EventPipeEventSource _eventSource; + + private readonly string _providerName; + private readonly string _eventName; + + // The original payload filter of fieldName->fieldValue specified by the user. It will only be used to hydrate _payloadFilterIndexCache. + private readonly IDictionary _payloadFilter; + + // This tracks the exact indices into the provided event's payload to check for the expected values instead + // of repeatedly searching the payload for the field names in _payloadFilter. + private Dictionary _payloadFilterIndexCache; + + /// + /// A stream that can monitor an event stream which is compatible with for a specific event. + /// + /// The event provider name. + /// The event name, which is the concatenation of the task name and opcode name, if set. for more information about the format. + /// A mapping of the event payload field names to their expected values. A subset of the payload fields may be specified. + /// A callback that will be invoked each time the requested event has been observed. + /// A callback that will be invoked if the field names specified in do not match those in the event's manifest. + /// The source event stream which is compatible with . + /// If true, the provided will only be called for the first matching event. + /// If true, the provided will not be automatically closed when this object is disposed. + public EventMonitor( + string providerName, + string eventName, + IDictionary payloadFilter, + Action onEvent, + Action onPayloadFilterMismatch, + Stream eventStream, + bool callOnEventOnlyOnce, + bool leaveEventStreamOpen = false) : base() + { + _providerName = providerName; + _eventName = eventName; + _onEvent = onEvent; + _onPayloadFilterMismatch = onPayloadFilterMismatch; + _eventStream = eventStream; + _payloadFilter = payloadFilter; + _callOnEventOnlyOnce = callOnEventOnlyOnce; + _leaveEventStreamOpen = leaveEventStreamOpen; + } + + /// + /// Start processing the event stream, monitoring it for the requested event. + /// This will continue to run until the event stream is complete or a stop is requested, regardless of if the specified event has been observed. + /// + /// The cancellation token. + /// + public Task ProcessAsync(CancellationToken token) + { + return Task.Run(() => + { + _eventSource = new EventPipeEventSource(_eventStream); + token.ThrowIfCancellationRequested(); + using IDisposable registration = token.Register(() => _eventSource.Dispose()); + + _eventSource.Dynamic.AddCallbackForProviderEvent(_providerName, _eventName, TraceEventCallback); + + _eventSource.Process(); + token.ThrowIfCancellationRequested(); + }, token); + } + + /// + /// Stops monitoring for the specified event. + /// The event stream will continue to be processed until it is complete or is called. + /// + private void StopMonitoringForEvent() + { + _eventSource?.Dynamic.RemoveCallback(TraceEventCallback); + } + + private void TraceEventCallback(TraceEvent obj) + { + if (_payloadFilterIndexCache == null && !HydratePayloadFilterCache(obj)) + { + // The payload filter doesn't map onto the actual data so we will never match the event. + StopMonitoringForEvent(); + _onPayloadFilterMismatch(obj); + return; + } + + if (!DoesPayloadMatch(obj)) + { + return; + } + + if (_callOnEventOnlyOnce) + { + StopMonitoringForEvent(); + } + + _onEvent(obj); + } + + /// + /// Hydrates the payload filter cache. + /// + /// An instance of the specified event (matching provider, task name, and opcode), but without checking the payload yet. + /// + private bool HydratePayloadFilterCache(TraceEvent obj) + { + if (_payloadFilterIndexCache != null) + { + return true; + } + + // If there's no payload filter, there's nothing to do. + if (_payloadFilter == null || _payloadFilter.Count == 0) + { + _payloadFilterIndexCache = new Dictionary(capacity: 0); + return true; + } + + // If the payload has fewer fields than the requested filter, we can never match it. + // NOTE: this function will only ever be called with an instance of the specified event + // (matching provider, task name, and opcode) but without checking the payload yet. + if (obj.PayloadNames.Length < _payloadFilter.Count) + { + return false; + } + + Dictionary payloadFilterCache = new(capacity: _payloadFilter.Count); + for (int i = 0; (i < obj.PayloadNames.Length) && (payloadFilterCache.Count < _payloadFilter.Count); i++) + { + if (_payloadFilter.TryGetValue(obj.PayloadNames[i], out string expectedPayloadValue)) + { + payloadFilterCache.Add(i, expectedPayloadValue); + } + } + + // Check if one or more of the requested filter field names did not exist on the actual payload. + if (_payloadFilter.Count != payloadFilterCache.Count) + { + return false; + } + + _payloadFilterIndexCache = payloadFilterCache; + + return true; + } + + private bool DoesPayloadMatch(TraceEvent obj) + { + foreach (var (fieldIndex, expectedValue) in _payloadFilterIndexCache) + { + string fieldValue = Convert.ToString(obj.PayloadValue(fieldIndex), CultureInfo.InvariantCulture) ?? string.Empty; + if (!string.Equals(fieldValue, expectedValue, StringComparison.Ordinal)) + { + return false; + } + } + + return true; + } + + public async ValueTask DisposeAsync() + { + _eventSource?.Dispose(); + if (!_leaveEventStreamOpen) + { + await _eventStream.DisposeAsync(); + } + } + } +} diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/Microsoft.Diagnostics.Monitoring.EventPipe.csproj b/src/Microsoft.Diagnostics.Monitoring.EventPipe/Microsoft.Diagnostics.Monitoring.EventPipe.csproj index 782d3f3b4..19fce8c29 100644 --- a/src/Microsoft.Diagnostics.Monitoring.EventPipe/Microsoft.Diagnostics.Monitoring.EventPipe.csproj +++ b/src/Microsoft.Diagnostics.Monitoring.EventPipe/Microsoft.Diagnostics.Monitoring.EventPipe.csproj @@ -1,7 +1,7 @@  - netstandard2.0 + netstandard2.1 ;1591;1701 EventPipe package for collecting logs, metrics, and gcdumps. diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/PassthroughStream.cs b/src/Microsoft.Diagnostics.Monitoring.EventPipe/PassthroughStream.cs new file mode 100644 index 000000000..b37b11ef5 --- /dev/null +++ b/src/Microsoft.Diagnostics.Monitoring.EventPipe/PassthroughStream.cs @@ -0,0 +1,103 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Diagnostics.Monitoring.EventPipe +{ + /// + /// A read-only stream that passes data as it is read to another stream. + /// + internal sealed class PassthroughStream : Stream + { + private readonly Stream _sourceStream; + private readonly Stream _destinationStream; + + /// + /// A read-only stream that passes data as it is read to another stream. + /// + /// The source stream that data will be read from. + /// The destination stream to pass read data to. It must either be full duplex or be write-only. + /// The size of the buffer to use when writing to the . + /// If true, the provided will not be automatically closed when this object is disposed. + public PassthroughStream( + Stream sourceStream, + Stream destinationStream, + int bufferSize, + bool leaveDestinationStreamOpen = false) : base() + { + _sourceStream = sourceStream; + + // Wrap a buffered stream around the destination stream to avoid + // slowing down the data passthrough unless there is significant pressure. + _destinationStream = new BufferedStream( + leaveDestinationStreamOpen + ? new StreamLeaveOpenWrapper(destinationStream) + : destinationStream, + bufferSize); + } + + public override int Read(byte[] buffer, int offset, int count) + { + return Read(buffer.AsSpan(offset, count)); + } + + public override int Read(Span buffer) + { + int bytesRead = _sourceStream.Read(buffer); + if (bytesRead != 0) + { + _destinationStream.Write(buffer[..bytesRead]); + } + + return bytesRead; + } + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask(); + } + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + int bytesRead = await _sourceStream.ReadAsync(buffer, cancellationToken); + if (bytesRead != 0) + { + await _destinationStream.WriteAsync(buffer[..bytesRead], cancellationToken); + } + + return bytesRead; + } + + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => false; + + public override bool CanTimeout => _sourceStream.CanTimeout; + public override long Length => _sourceStream.Length; + + public override long Position { get => _sourceStream.Position; set => throw new NotSupportedException(); } + public override int ReadTimeout { get => _sourceStream.ReadTimeout; set => _sourceStream.ReadTimeout = value; } + + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + public override void SetLength(long value) => throw new NotSupportedException(); + public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + + public override void CopyTo(Stream destination, int bufferSize) => throw new NotSupportedException(); + public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) => throw new NotSupportedException(); + + public override void Flush() => _destinationStream.Flush(); + public override Task FlushAsync(CancellationToken cancellationToken) => _destinationStream.FlushAsync(cancellationToken); + + public override async ValueTask DisposeAsync() + { + await _sourceStream.DisposeAsync(); + await _destinationStream.DisposeAsync(); + await base.DisposeAsync(); + } + } +} diff --git a/src/Microsoft.Diagnostics.Monitoring.EventPipe/StreamLeaveOpenWrapper.cs b/src/Microsoft.Diagnostics.Monitoring.EventPipe/StreamLeaveOpenWrapper.cs new file mode 100644 index 000000000..b9e10afcb --- /dev/null +++ b/src/Microsoft.Diagnostics.Monitoring.EventPipe/StreamLeaveOpenWrapper.cs @@ -0,0 +1,74 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Diagnostics.Monitoring.EventPipe +{ + /// + /// Wraps a given stream but leaves it open on Dispose. + /// + internal sealed class StreamLeaveOpenWrapper : Stream + { + private readonly Stream _baseStream; + + public StreamLeaveOpenWrapper(Stream baseStream) : base() + { + _baseStream = baseStream; + } + + public override bool CanSeek => _baseStream.CanSeek; + + public override bool CanTimeout => _baseStream.CanTimeout; + + public override bool CanRead => _baseStream.CanRead; + + public override bool CanWrite => _baseStream.CanWrite; + + public override long Length => _baseStream.Length; + + public override long Position { get => _baseStream.Position; set => _baseStream.Position = value; } + + public override int ReadTimeout { get => _baseStream.ReadTimeout; set => _baseStream.ReadTimeout = value; } + + public override int WriteTimeout { get => _baseStream.WriteTimeout; set => _baseStream.WriteTimeout = value; } + + public override long Seek(long offset, SeekOrigin origin) => _baseStream.Seek(offset, origin); + + public override int Read(Span buffer) => _baseStream.Read(buffer); + + public override int Read(byte[] buffer, int offset, int count) => _baseStream.Read(buffer, offset, count); + + public override int ReadByte() => _baseStream.ReadByte(); + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => _baseStream.ReadAsync(buffer, offset, count, cancellationToken); + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) => _baseStream.ReadAsync(buffer, cancellationToken); + + public override void Flush() => _baseStream.Flush(); + + public override void SetLength(long value) => _baseStream.SetLength(value); + + public override void Write(byte[] buffer, int offset, int count) => _baseStream.Write(buffer, offset, count); + + public override void Write(ReadOnlySpan buffer) => _baseStream.Write(buffer); + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => _baseStream.WriteAsync(buffer, offset, count, cancellationToken); + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) => _baseStream.WriteAsync(buffer, cancellationToken); + + public override void WriteByte(byte value) => _baseStream.WriteByte(value); + + public override Task FlushAsync(CancellationToken cancellationToken) => _baseStream.FlushAsync(cancellationToken); + + public override void CopyTo(Stream destination, int bufferSize) => _baseStream.CopyTo(destination, bufferSize); + + public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) => _baseStream.CopyToAsync(destination, bufferSize, cancellationToken); + + public override async ValueTask DisposeAsync() => await base.DisposeAsync(); + } +}