--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using Microsoft.Diagnostics.Tracing;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.Monitoring.EventPipe
+{
+ /// <summary>
+ /// A stream that can monitor an event stream which is compatible with <see cref="EventPipeEventSource"/> for a specific event.
+ /// </summary>
+ internal sealed class EventMonitor : IAsyncDisposable
+ {
+ private readonly Action<TraceEvent> _onPayloadFilterMismatch;
+ private readonly Action<TraceEvent> _onEvent;
+ private readonly bool _callOnEventOnlyOnce;
+
+ private readonly Stream _eventStream;
+ private readonly bool _leaveEventStreamOpen;
+ private EventPipeEventSource _eventSource;
+
+ private readonly string _providerName;
+ private readonly string _eventName;
+
+ // The original payload filter of fieldName->fieldValue specified by the user. It will only be used to hydrate _payloadFilterIndexCache.
+ private readonly IDictionary<string, string> _payloadFilter;
+
+ // This tracks the exact indices into the provided event's payload to check for the expected values instead
+ // of repeatedly searching the payload for the field names in _payloadFilter.
+ private Dictionary<int, string> _payloadFilterIndexCache;
+
+ /// <summary>
+ /// A stream that can monitor an event stream which is compatible with <see cref="EventPipeEventSource"/> for a specific event.
+ /// </summary>
+ /// <param name="providerName">The event provider name.</param>
+ /// <param name="eventName">The event name, which is the concatenation of the task name and opcode name, if set. <see cref="TraceEvent.EventName"/> for more information about the format.</param>
+ /// <param name="payloadFilter">A mapping of the event payload field names to their expected values. A subset of the payload fields may be specified.</param>
+ /// <param name="onEvent">A callback that will be invoked each time the requested event has been observed.</param>
+ /// <param name="onPayloadFilterMismatch">A callback that will be invoked if the field names specified in <paramref name="payloadFilter"/> do not match those in the event's manifest.</param>
+ /// <param name="eventStream">The source event stream which is compatible with <see cref="EventPipeEventSource"/>.</param>
+ /// <param name="callOnEventOnlyOnce">If true, the provided <paramref name="onEvent"/> will only be called for the first matching event.</param>
+ /// <param name="leaveEventStreamOpen">If true, the provided <paramref name="eventStream"/> will not be automatically closed when this object is disposed.</param>
+ public EventMonitor(
+ string providerName,
+ string eventName,
+ IDictionary<string, string> payloadFilter,
+ Action<TraceEvent> onEvent,
+ Action<TraceEvent> onPayloadFilterMismatch,
+ Stream eventStream,
+ bool callOnEventOnlyOnce,
+ bool leaveEventStreamOpen = false) : base()
+ {
+ _providerName = providerName;
+ _eventName = eventName;
+ _onEvent = onEvent;
+ _onPayloadFilterMismatch = onPayloadFilterMismatch;
+ _eventStream = eventStream;
+ _payloadFilter = payloadFilter;
+ _callOnEventOnlyOnce = callOnEventOnlyOnce;
+ _leaveEventStreamOpen = leaveEventStreamOpen;
+ }
+
+ /// <summary>
+ /// Start processing the event stream, monitoring it for the requested event.
+ /// This will continue to run until the event stream is complete or a stop is requested, regardless of if the specified event has been observed.
+ /// </summary>
+ /// <param name="token">The cancellation token.</param>
+ /// <returns></returns>
+ public Task ProcessAsync(CancellationToken token)
+ {
+ return Task.Run(() =>
+ {
+ _eventSource = new EventPipeEventSource(_eventStream);
+ token.ThrowIfCancellationRequested();
+ using IDisposable registration = token.Register(() => _eventSource.Dispose());
+
+ _eventSource.Dynamic.AddCallbackForProviderEvent(_providerName, _eventName, TraceEventCallback);
+
+ _eventSource.Process();
+ token.ThrowIfCancellationRequested();
+ }, token);
+ }
+
+ /// <summary>
+ /// Stops monitoring for the specified event.
+ /// The event stream will continue to be processed until it is complete or <see cref="DisposeAsync"/> is called.
+ /// </summary>
+ private void StopMonitoringForEvent()
+ {
+ _eventSource?.Dynamic.RemoveCallback<TraceEvent>(TraceEventCallback);
+ }
+
+ private void TraceEventCallback(TraceEvent obj)
+ {
+ if (_payloadFilterIndexCache == null && !HydratePayloadFilterCache(obj))
+ {
+ // The payload filter doesn't map onto the actual data so we will never match the event.
+ StopMonitoringForEvent();
+ _onPayloadFilterMismatch(obj);
+ return;
+ }
+
+ if (!DoesPayloadMatch(obj))
+ {
+ return;
+ }
+
+ if (_callOnEventOnlyOnce)
+ {
+ StopMonitoringForEvent();
+ }
+
+ _onEvent(obj);
+ }
+
+ /// <summary>
+ /// Hydrates the payload filter cache.
+ /// </summary>
+ /// <param name="obj">An instance of the specified event (matching provider, task name, and opcode), but without checking the payload yet.</param>
+ /// <returns></returns>
+ private bool HydratePayloadFilterCache(TraceEvent obj)
+ {
+ if (_payloadFilterIndexCache != null)
+ {
+ return true;
+ }
+
+ // If there's no payload filter, there's nothing to do.
+ if (_payloadFilter == null || _payloadFilter.Count == 0)
+ {
+ _payloadFilterIndexCache = new Dictionary<int, string>(capacity: 0);
+ return true;
+ }
+
+ // If the payload has fewer fields than the requested filter, we can never match it.
+ // NOTE: this function will only ever be called with an instance of the specified event
+ // (matching provider, task name, and opcode) but without checking the payload yet.
+ if (obj.PayloadNames.Length < _payloadFilter.Count)
+ {
+ return false;
+ }
+
+ Dictionary<int, string> payloadFilterCache = new(capacity: _payloadFilter.Count);
+ for (int i = 0; (i < obj.PayloadNames.Length) && (payloadFilterCache.Count < _payloadFilter.Count); i++)
+ {
+ if (_payloadFilter.TryGetValue(obj.PayloadNames[i], out string expectedPayloadValue))
+ {
+ payloadFilterCache.Add(i, expectedPayloadValue);
+ }
+ }
+
+ // Check if one or more of the requested filter field names did not exist on the actual payload.
+ if (_payloadFilter.Count != payloadFilterCache.Count)
+ {
+ return false;
+ }
+
+ _payloadFilterIndexCache = payloadFilterCache;
+
+ return true;
+ }
+
+ private bool DoesPayloadMatch(TraceEvent obj)
+ {
+ foreach (var (fieldIndex, expectedValue) in _payloadFilterIndexCache)
+ {
+ string fieldValue = Convert.ToString(obj.PayloadValue(fieldIndex), CultureInfo.InvariantCulture) ?? string.Empty;
+ if (!string.Equals(fieldValue, expectedValue, StringComparison.Ordinal))
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ _eventSource?.Dispose();
+ if (!_leaveEventStreamOpen)
+ {
+ await _eventStream.DisposeAsync();
+ }
+ }
+ }
+}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
- <TargetFramework>netstandard2.0</TargetFramework>
+ <TargetFramework>netstandard2.1</TargetFramework>
<NoWarn>;1591;1701</NoWarn>
<Description>EventPipe package for collecting logs, metrics, and gcdumps.</Description>
<!-- Tentatively create package so other teams can tentatively consume. -->
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.Monitoring.EventPipe
+{
+ /// <summary>
+ /// A read-only stream that passes data as it is read to another stream.
+ /// </summary>
+ internal sealed class PassthroughStream : Stream
+ {
+ private readonly Stream _sourceStream;
+ private readonly Stream _destinationStream;
+
+ /// <summary>
+ /// A read-only stream that passes data as it is read to another stream.
+ /// </summary>
+ /// <param name="sourceStream">The source stream that data will be read from.</param>
+ /// <param name="destinationStream">The destination stream to pass read data to. It must either be full duplex or be write-only.</param>
+ /// <param name="bufferSize">The size of the buffer to use when writing to the <paramref name="destinationStream"/>.</param>
+ /// <param name="leaveDestinationStreamOpen">If true, the provided <paramref name="destinationStream"/> will not be automatically closed when this object is disposed.</param>
+ public PassthroughStream(
+ Stream sourceStream,
+ Stream destinationStream,
+ int bufferSize,
+ bool leaveDestinationStreamOpen = false) : base()
+ {
+ _sourceStream = sourceStream;
+
+ // Wrap a buffered stream around the destination stream to avoid
+ // slowing down the data passthrough unless there is significant pressure.
+ _destinationStream = new BufferedStream(
+ leaveDestinationStreamOpen
+ ? new StreamLeaveOpenWrapper(destinationStream)
+ : destinationStream,
+ bufferSize);
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ return Read(buffer.AsSpan(offset, count));
+ }
+
+ public override int Read(Span<byte> buffer)
+ {
+ int bytesRead = _sourceStream.Read(buffer);
+ if (bytesRead != 0)
+ {
+ _destinationStream.Write(buffer[..bytesRead]);
+ }
+
+ return bytesRead;
+ }
+
+ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ return ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
+ }
+
+ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+ {
+ int bytesRead = await _sourceStream.ReadAsync(buffer, cancellationToken);
+ if (bytesRead != 0)
+ {
+ await _destinationStream.WriteAsync(buffer[..bytesRead], cancellationToken);
+ }
+
+ return bytesRead;
+ }
+
+ public override bool CanRead => true;
+ public override bool CanSeek => false;
+ public override bool CanWrite => false;
+
+ public override bool CanTimeout => _sourceStream.CanTimeout;
+ public override long Length => _sourceStream.Length;
+
+ public override long Position { get => _sourceStream.Position; set => throw new NotSupportedException(); }
+ public override int ReadTimeout { get => _sourceStream.ReadTimeout; set => _sourceStream.ReadTimeout = value; }
+
+ public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
+ public override void SetLength(long value) => throw new NotSupportedException();
+ public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
+
+ public override void CopyTo(Stream destination, int bufferSize) => throw new NotSupportedException();
+ public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) => throw new NotSupportedException();
+
+ public override void Flush() => _destinationStream.Flush();
+ public override Task FlushAsync(CancellationToken cancellationToken) => _destinationStream.FlushAsync(cancellationToken);
+
+ public override async ValueTask DisposeAsync()
+ {
+ await _sourceStream.DisposeAsync();
+ await _destinationStream.DisposeAsync();
+ await base.DisposeAsync();
+ }
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.Monitoring.EventPipe
+{
+ /// <summary>
+ /// Wraps a given stream but leaves it open on Dispose.
+ /// </summary>
+ internal sealed class StreamLeaveOpenWrapper : Stream
+ {
+ private readonly Stream _baseStream;
+
+ public StreamLeaveOpenWrapper(Stream baseStream) : base()
+ {
+ _baseStream = baseStream;
+ }
+
+ public override bool CanSeek => _baseStream.CanSeek;
+
+ public override bool CanTimeout => _baseStream.CanTimeout;
+
+ public override bool CanRead => _baseStream.CanRead;
+
+ public override bool CanWrite => _baseStream.CanWrite;
+
+ public override long Length => _baseStream.Length;
+
+ public override long Position { get => _baseStream.Position; set => _baseStream.Position = value; }
+
+ public override int ReadTimeout { get => _baseStream.ReadTimeout; set => _baseStream.ReadTimeout = value; }
+
+ public override int WriteTimeout { get => _baseStream.WriteTimeout; set => _baseStream.WriteTimeout = value; }
+
+ public override long Seek(long offset, SeekOrigin origin) => _baseStream.Seek(offset, origin);
+
+ public override int Read(Span<byte> buffer) => _baseStream.Read(buffer);
+
+ public override int Read(byte[] buffer, int offset, int count) => _baseStream.Read(buffer, offset, count);
+
+ public override int ReadByte() => _baseStream.ReadByte();
+
+ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => _baseStream.ReadAsync(buffer, offset, count, cancellationToken);
+
+ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) => _baseStream.ReadAsync(buffer, cancellationToken);
+
+ public override void Flush() => _baseStream.Flush();
+
+ public override void SetLength(long value) => _baseStream.SetLength(value);
+
+ public override void Write(byte[] buffer, int offset, int count) => _baseStream.Write(buffer, offset, count);
+
+ public override void Write(ReadOnlySpan<byte> buffer) => _baseStream.Write(buffer);
+
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => _baseStream.WriteAsync(buffer, offset, count, cancellationToken);
+
+ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) => _baseStream.WriteAsync(buffer, cancellationToken);
+
+ public override void WriteByte(byte value) => _baseStream.WriteByte(value);
+
+ public override Task FlushAsync(CancellationToken cancellationToken) => _baseStream.FlushAsync(cancellationToken);
+
+ public override void CopyTo(Stream destination, int bufferSize) => _baseStream.CopyTo(destination, bufferSize);
+
+ public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) => _baseStream.CopyToAsync(destination, bufferSize, cancellationToken);
+
+ public override async ValueTask DisposeAsync() => await base.DisposeAsync();
+ }
+}