Synchronize pipeline tests with event pipe session start. (#2267)
authorJustin Anderson <jander-msft@users.noreply.github.com>
Tue, 11 May 2021 17:30:51 +0000 (10:30 -0700)
committerGitHub <noreply@github.com>
Tue, 11 May 2021 17:30:51 +0000 (10:30 -0700)
* Synchronize pipeline tests with event pipe session start.

src/Microsoft.Diagnostics.Monitoring.EventPipe/DiagnosticsEventPipeProcessor.cs
src/Microsoft.Diagnostics.Monitoring.EventPipe/EventSourcePipeline.cs
src/tests/Microsoft.Diagnostics.Monitoring.EventPipe/EventLogsPipelineUnitTests.cs
src/tests/Microsoft.Diagnostics.Monitoring.EventPipe/PipelineTestUtilities.cs

index 6a24eb1501ba793c2029a01fc296f7036f70f3e4..b07a1742db90e3ea0f6b68b239bf609f9043301b 100644 (file)
@@ -18,11 +18,16 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
 
         private readonly object _lock = new object();
 
+        private TaskCompletionSource<bool> _initialized;
         private TaskCompletionSource<bool> _sessionStarted;
         private EventPipeEventSource _eventSource;
         private Func<Task> _stopFunc;
         private bool _disposed;
 
+        // Allows tests to know when the event pipe session has started so that the
+        // target application can start producing events.
+        internal Task SessionStarted => _sessionStarted.Task;
+
         public DiagnosticsEventPipeProcessor(
             MonitoringSourceConfiguration configuration,
             Func<EventPipeEventSource, Func<Task>, CancellationToken, Task> onEventSourceAvailable
@@ -31,13 +36,14 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
             _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
             _onEventSourceAvailable = onEventSourceAvailable ?? throw new ArgumentNullException(nameof(onEventSourceAvailable));
 
+            _initialized = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
             _sessionStarted = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
         }
 
         public async Task Process(DiagnosticsClient client, TimeSpan duration, CancellationToken token)
         {
             //No need to guard against reentrancy here, since the calling pipeline does this already.
-            IDisposable registration = token.Register(() => _sessionStarted.TrySetCanceled());
+            IDisposable registration = token.Register(() => _initialized.TrySetCanceled());
             await await Task.Factory.StartNew(async () =>
             {
                 EventPipeEventSource source = null;
@@ -51,6 +57,11 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
 
                     Stream sessionStream = await streamProvider.ProcessEvents(client, duration, token);
 
+                    if (!_sessionStarted.TrySetResult(true))
+                    {
+                        token.ThrowIfCancellationRequested();
+                    }
+
                     source = new EventPipeEventSource(sessionStream);
 
                     handleEventsTask = _onEventSourceAvailable(source, stopFunc, token);
@@ -61,7 +72,7 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
                         _stopFunc = stopFunc;
                     }
                     registration.Dispose();
-                    if (!_sessionStarted.TrySetResult(true))
+                    if (!_initialized.TrySetResult(true))
                     {
                         token.ThrowIfCancellationRequested();
                     }
@@ -101,7 +112,7 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
 
         public async Task StopProcessing(CancellationToken token)
         {
-            await _sessionStarted.Task;
+            await _initialized.Task;
 
             EventPipeEventSource session = null;
             Func<Task> stopFunc = null;
@@ -132,15 +143,17 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
                 _disposed = true;
             }
 
-            _sessionStarted.TrySetCanceled();
+            _initialized.TrySetCanceled();
             try
             {
-                await _sessionStarted.Task;
+                await _initialized.Task;
             }
             catch
             {
             }
 
+            _sessionStarted.TrySetCanceled();
+
             _eventSource?.Dispose();
         }
     }
index d05a9778a450b40f5fe394e46f6312a9ed106e72..0123c2dab8b10c92014096df4a9f0c1de809d22a 100644 (file)
@@ -10,12 +10,14 @@ using System.Threading.Tasks;
 
 namespace Microsoft.Diagnostics.Monitoring.EventPipe
 {
-    internal abstract class EventSourcePipeline<T> : Pipeline where T : EventSourcePipelineSettings
+    internal abstract class EventSourcePipeline<T> : Pipeline, IEventSourcePipelineInternal where T : EventSourcePipelineSettings
     {
         private readonly Lazy<DiagnosticsEventPipeProcessor> _processor;
         public DiagnosticsClient Client { get; }
         public T Settings { get; }
 
+        Task IEventSourcePipelineInternal.SessionStarted => _processor.Value.SessionStarted;
+
         protected EventSourcePipeline(DiagnosticsClient client, T settings)
         {
             Client = client ?? throw new ArgumentNullException(nameof(client));
@@ -70,4 +72,11 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
             return Task.CompletedTask;
         }
     }
+
+    internal interface IEventSourcePipelineInternal
+    {
+        // Allows tests to know when the event pipe session has started so that the
+        // target application can start producing events.
+        Task SessionStarted { get; }
+    }
 }
index a72de9eeead31eeb2b61ff1099393d697b04950b..6bb7028100c4127f84046c99be629ff2fbace54c 100644 (file)
@@ -31,14 +31,9 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe.UnitTests
             _output = output;
         }
 
-        [SkippableFact]
+        [Fact]
         public async Task TestLogs()
         {
-            if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
-            {
-                throw new SkipTestException("https://github.com/dotnet/diagnostics/issues/2234");
-            }
-
             var outputStream = new MemoryStream();
 
             await using (var testExecution = StartTraceeProcess("LoggerRemoteTest"))
index 2f883bceb2595df2d339dfac239f2a4b9ed95534..168068ec362e7b5236ec3ef6a151596da0edf10c 100644 (file)
@@ -1,8 +1,8 @@
-using Microsoft.Diagnostics.NETCore.Client;
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
 using Microsoft.Diagnostics.NETCore.Client.UnitTests;
-using System;
-using System.Collections.Generic;
-using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -14,6 +14,12 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe.UnitTests
         {
             Task processingTask = pipeline.RunAsync(token);
 
+            // Wait for event session to be established before telling target app to produce events.
+            if (pipeline is IEventSourcePipelineInternal eventSourcePipeline)
+            {
+                await eventSourcePipeline.SessionStarted;
+            }
+
             //Begin event production
             testExecution.SendSignal();