* Stop pipelines in tests after receiving expected events.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
+using Microsoft.Diagnostics.NETCore.Client;
+using Microsoft.Diagnostics.NETCore.Client.UnitTests;
using System;
using System.Collections.Generic;
-using System.IO;
using System.Linq;
-using System.Runtime.InteropServices;
-using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
-using Microsoft.Diagnostics.NETCore.Client;
-using Microsoft.Diagnostics.NETCore.Client.UnitTests;
-using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Abstractions;
-using Xunit.Extensions;
namespace Microsoft.Diagnostics.Monitoring.EventPipe.UnitTests
{
private sealed class TestMetricsLogger : ICountersLogger
{
- private readonly ITestOutputHelper _output;
+ private readonly List<string> _expectedCounters = new List<string>();
private Dictionary<string, ICounterPayload> _metrics = new Dictionary<string, ICounterPayload>();
+ private readonly TaskCompletionSource<object> _foundExpectedCountersSource;
- public TestMetricsLogger(ITestOutputHelper output)
+ public TestMetricsLogger(IDictionary<string, IEnumerable<string>> expectedCounters, TaskCompletionSource<object> foundExpectedCountersSource)
{
- _output = output;
+ _foundExpectedCountersSource = foundExpectedCountersSource;
+
+ if (expectedCounters.Count > 0)
+ {
+ foreach (string providerName in expectedCounters.Keys)
+ {
+ foreach (string counterName in expectedCounters[providerName])
+ {
+ _expectedCounters.Add(CreateKey(providerName, counterName));
+ }
+ }
+ }
+ else
+ {
+ foundExpectedCountersSource.SetResult(null);
+ }
}
public IEnumerable<ICounterPayload> Metrics => _metrics.Values;
public void Log(ICounterPayload metric)
{
- _metrics[string.Concat(metric.Provider, "_", metric.Name)] = metric;
+ string key = CreateKey(metric);
+
+ _metrics[key] = metric;
+
+ // Complete the task source if the last expected key was removed.
+ if (_expectedCounters.Remove(key) && _expectedCounters.Count == 0)
+ {
+ _foundExpectedCountersSource.TrySetResult(null);
+ }
}
public void PipelineStarted()
public void PipelineStopped()
{
}
+
+ private static string CreateKey(ICounterPayload payload)
+ {
+ return CreateKey(payload.Provider, payload.Name);
+ }
+
+ private static string CreateKey(string providerName, string counterName)
+ {
+ return $"{providerName}_{counterName}";
+ }
}
[Fact]
public async Task TestCounterEventPipeline()
{
- var logger = new TestMetricsLogger(_output);
var expectedCounters = new[] { "cpu-usage", "working-set" };
string expectedProvider = "System.Runtime";
+ IDictionary<string, IEnumerable<string>> expectedMap = new Dictionary<string, IEnumerable<string>>();
+ expectedMap.Add(expectedProvider, expectedCounters);
+
+ var foundExpectedCountersSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ var logger = new TestMetricsLogger(expectedMap, foundExpectedCountersSource);
+
await using (var testExecution = StartTraceeProcess("CounterRemoteTest"))
{
//TestRunner should account for start delay to make sure that the diagnostic pipe is available.
RefreshInterval = TimeSpan.FromSeconds(1)
}, new[] { logger });
- await PipelineTestUtilities.ExecutePipelineWithDebugee(pipeline, testExecution);
+ await PipelineTestUtilities.ExecutePipelineWithDebugee(
+ _output,
+ pipeline,
+ testExecution,
+ foundExpectedCountersSource);
}
Assert.True(logger.Metrics.Any());
}
await using var pipeline = new EventLogsPipeline(client, logSettings, loggerFactory);
- await PipelineTestUtilities.ExecutePipelineWithDebugee(pipeline, testExecution);
+ await PipelineTestUtilities.ExecutePipelineWithDebugee(_output, pipeline, testExecution);
}
outputStream.Position = 0L;
using System;
using System.Collections.Generic;
+using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Runtime.InteropServices;
[Fact]
public async Task TestTraceStopAsync()
{
- using var buffer = new MemoryStream();
Stream eventStream = null;
await using (var testExecution = StartTraceeProcess("TraceStopTest"))
{
Configuration = new CpuProfileConfiguration()
};
+ var foundProviderSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+
await using var pipeline = new EventTracePipeline(client, settings, async (s, token) =>
{
- await s.CopyToAsync(buffer);
eventStream = s;
- });
- await PipelineTestUtilities.ExecutePipelineWithDebugee(pipeline, testExecution);
- }
+ using var eventSource = new EventPipeEventSource(s);
+
+ // Dispose event source when cancelled.
+ using var _ = token.Register(() => eventSource.Dispose());
- //Validate that the stream is only valid for the lifetime of the callback in the trace pipeline.
- Assert.Throws<ObjectDisposedException>(() => eventStream.Read(new byte[4], 0, 4));
+ eventSource.Dynamic.All += (TraceEvent obj) =>
+ {
+ if (string.Equals(obj.ProviderName, MonitoringSourceConfiguration.SampleProfilerProviderName, StringComparison.OrdinalIgnoreCase))
+ {
+ foundProviderSource.TrySetResult(null);
+ }
+ };
- Assert.True(buffer.Length > 0);
+ await Task.Run(() => Assert.True(eventSource.Process()), token);
+ });
- var eventSource = new EventPipeEventSource(buffer);
- bool foundCpuProvider = false;
+ await PipelineTestUtilities.ExecutePipelineWithDebugee(
+ _output,
+ pipeline,
+ testExecution,
+ foundProviderSource);
+ }
- eventSource.Dynamic.All += (TraceEvent obj) =>
- {
- if (string.Equals(obj.ProviderName, MonitoringSourceConfiguration.SampleProfilerProviderName, StringComparison.OrdinalIgnoreCase))
- {
- foundCpuProvider = true;
- }
- };
- Assert.True(eventSource.Process());
- Assert.True(foundCpuProvider);
+ //Validate that the stream is only valid for the lifetime of the callback in the trace pipeline.
+ Assert.Throws<ObjectDisposedException>(() => eventStream.Read(new byte[4], 0, 4));
}
[SkippableFact]
return Task.CompletedTask;
});
- await Assert.ThrowsAsync<OperationCanceledException>(async () => await PipelineTestUtilities.ExecutePipelineWithDebugee(pipeline, testExecution, cancellationTokenSource.Token));
+ await Assert.ThrowsAsync<OperationCanceledException>(
+ async () => await PipelineTestUtilities.ExecutePipelineWithDebugee(
+ _output,
+ pipeline,
+ testExecution,
+ cancellationTokenSource.Token));
}
//Validate that the stream is only valid for the lifetime of the callback in the trace pipeline.
// See the LICENSE file in the project root for more information.
using Microsoft.Diagnostics.NETCore.Client.UnitTests;
+using System;
using System.Threading;
using System.Threading.Tasks;
+using Xunit.Abstractions;
namespace Microsoft.Diagnostics.Monitoring.EventPipe.UnitTests
{
internal static class PipelineTestUtilities
{
- public static async Task ExecutePipelineWithDebugee(Pipeline pipeline, RemoteTestExecution testExecution, CancellationToken token = default)
+ public static async Task ExecutePipelineWithDebugee(ITestOutputHelper outputHelper, Pipeline pipeline, RemoteTestExecution testExecution, TaskCompletionSource<object> waitTaskSource = null)
+ {
+ using var cancellation = new CancellationTokenSource(TimeSpan.FromMinutes(1));
+
+ await ExecutePipelineWithDebugee(outputHelper, pipeline, testExecution, cancellation.Token, waitTaskSource);
+ }
+
+ public static async Task ExecutePipelineWithDebugee(ITestOutputHelper outputHelper, Pipeline pipeline, RemoteTestExecution testExecution, CancellationToken token, TaskCompletionSource<object> waitTaskSource = null)
{
Task processingTask = pipeline.RunAsync(token);
try
{
+ // Optionally wait on caller before allowing the pipeline to stop.
+ if (null != waitTaskSource)
+ {
+ using var _ = token.Register(() =>
+ {
+ outputHelper.WriteLine("Did not receive completion signal before cancellation.");
+ waitTaskSource.TrySetCanceled(token);
+ });
+
+ await waitTaskSource.Task;
+ }
+
//Signal for the pipeline to stop
await pipeline.StopAsync(token);