private readonly object _lock = new object();
+ private TaskCompletionSource<bool> _initialized;
private TaskCompletionSource<bool> _sessionStarted;
private EventPipeEventSource _eventSource;
private Func<Task> _stopFunc;
private bool _disposed;
+ // Allows tests to know when the event pipe session has started so that the
+ // target application can start producing events.
+ internal Task SessionStarted => _sessionStarted.Task;
+
public DiagnosticsEventPipeProcessor(
MonitoringSourceConfiguration configuration,
Func<EventPipeEventSource, Func<Task>, CancellationToken, Task> onEventSourceAvailable
_configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
_onEventSourceAvailable = onEventSourceAvailable ?? throw new ArgumentNullException(nameof(onEventSourceAvailable));
+ _initialized = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_sessionStarted = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}
public async Task Process(DiagnosticsClient client, TimeSpan duration, CancellationToken token)
{
//No need to guard against reentrancy here, since the calling pipeline does this already.
- IDisposable registration = token.Register(() => _sessionStarted.TrySetCanceled());
+ IDisposable registration = token.Register(() => _initialized.TrySetCanceled());
await await Task.Factory.StartNew(async () =>
{
EventPipeEventSource source = null;
Stream sessionStream = await streamProvider.ProcessEvents(client, duration, token);
+ if (!_sessionStarted.TrySetResult(true))
+ {
+ token.ThrowIfCancellationRequested();
+ }
+
source = new EventPipeEventSource(sessionStream);
handleEventsTask = _onEventSourceAvailable(source, stopFunc, token);
_stopFunc = stopFunc;
}
registration.Dispose();
- if (!_sessionStarted.TrySetResult(true))
+ if (!_initialized.TrySetResult(true))
{
token.ThrowIfCancellationRequested();
}
public async Task StopProcessing(CancellationToken token)
{
- await _sessionStarted.Task;
+ await _initialized.Task;
EventPipeEventSource session = null;
Func<Task> stopFunc = null;
_disposed = true;
}
- _sessionStarted.TrySetCanceled();
+ _initialized.TrySetCanceled();
try
{
- await _sessionStarted.Task;
+ await _initialized.Task;
}
catch
{
}
+ _sessionStarted.TrySetCanceled();
+
_eventSource?.Dispose();
}
}
namespace Microsoft.Diagnostics.Monitoring.EventPipe
{
- internal abstract class EventSourcePipeline<T> : Pipeline where T : EventSourcePipelineSettings
+ internal abstract class EventSourcePipeline<T> : Pipeline, IEventSourcePipelineInternal where T : EventSourcePipelineSettings
{
private readonly Lazy<DiagnosticsEventPipeProcessor> _processor;
public DiagnosticsClient Client { get; }
public T Settings { get; }
+ Task IEventSourcePipelineInternal.SessionStarted => _processor.Value.SessionStarted;
+
protected EventSourcePipeline(DiagnosticsClient client, T settings)
{
Client = client ?? throw new ArgumentNullException(nameof(client));
return Task.CompletedTask;
}
}
+
+ internal interface IEventSourcePipelineInternal
+ {
+ // Allows tests to know when the event pipe session has started so that the
+ // target application can start producing events.
+ Task SessionStarted { get; }
+ }
}
-using Microsoft.Diagnostics.NETCore.Client;
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
using Microsoft.Diagnostics.NETCore.Client.UnitTests;
-using System;
-using System.Collections.Generic;
-using System.Text;
using System.Threading;
using System.Threading.Tasks;
{
Task processingTask = pipeline.RunAsync(token);
+ // Wait for event session to be established before telling target app to produce events.
+ if (pipeline is IEventSourcePipelineInternal eventSourcePipeline)
+ {
+ await eventSourcePipeline.SessionStarted;
+ }
+
//Begin event production
testExecution.SendSignal();