From: Justin Anderson Date: Thu, 6 Aug 2020 18:46:34 +0000 (-0700) Subject: Add reversed diagnostics server, endpoints, and endpoint sources. (#1303) X-Git-Tag: submit/tizen/20210909.063632~17^2~518 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=455c8914b2831bfd1eee54b80263084d1e5de03e;p=platform%2Fcore%2Fdotnet%2Fdiagnostics.git Add reversed diagnostics server, endpoints, and endpoint sources. (#1303) Add reversed diagnostics server and endpoint information. Add diagnostics endpoint source for unifying server and client connections into same contract. Consume endpoint source in dotnet-monitor. Add unit tests for reversed server, endpoints, and endpoint source concepts. --- diff --git a/eng/Versions.props b/eng/Versions.props index 282408aea..91f5875cb 100644 --- a/eng/Versions.props +++ b/eng/Versions.props @@ -42,6 +42,7 @@ 2.1.1 2.1.1 2.1.1 + 2.1.1 2.1.1 2.1.1 diff --git a/src/Microsoft.Diagnostics.Monitoring.RestServer/Controllers/DiagController.cs b/src/Microsoft.Diagnostics.Monitoring.RestServer/Controllers/DiagController.cs index 85c350d03..f02388630 100644 --- a/src/Microsoft.Diagnostics.Monitoring.RestServer/Controllers/DiagController.cs +++ b/src/Microsoft.Diagnostics.Monitoring.RestServer/Controllers/DiagController.cs @@ -44,14 +44,14 @@ namespace Microsoft.Diagnostics.Monitoring.RestServer.Controllers } [HttpGet("processes")] - public ActionResult> GetProcesses() + public Task>> GetProcesses() { - return this.InvokeService(() => + return this.InvokeService(async () => { IList processes = new List(); - foreach (int pid in _diagnosticServices.GetProcesses()) + foreach (IProcessInfo p in await _diagnosticServices.GetProcessesAsync(HttpContext.RequestAborted)) { - processes.Add(new ProcessModel() { Pid = pid }); + processes.Add(ProcessModel.FromProcessInfo(p)); } return new ActionResult>(processes); }); @@ -62,8 +62,8 @@ namespace Microsoft.Diagnostics.Monitoring.RestServer.Controllers { return this.InvokeService(async () => { - int pidValue = _diagnosticServices.ResolveProcess(pid); - Stream result = await _diagnosticServices.GetDump(pidValue, type); + int pidValue = await _diagnosticServices.ResolveProcessAsync(pid, HttpContext.RequestAborted); + Stream result = await _diagnosticServices.GetDump(pidValue, type, HttpContext.RequestAborted); string dumpFileName = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? FormattableString.Invariant($"dump_{GetFileNameTimeStampUtcNow()}.dmp") : @@ -80,7 +80,7 @@ namespace Microsoft.Diagnostics.Monitoring.RestServer.Controllers { return this.InvokeService(async () => { - int pidValue = _diagnosticServices.ResolveProcess(pid); + int pidValue = await _diagnosticServices.ResolveProcessAsync(pid, HttpContext.RequestAborted); Stream result = await _diagnosticServices.GetGcDump(pidValue, this.HttpContext.RequestAborted); return File(result, "application/octet-stream", FormattableString.Invariant($"{GetFileNameTimeStampUtcNow()}_{pidValue}.gcdump")); }); @@ -159,12 +159,12 @@ namespace Microsoft.Diagnostics.Monitoring.RestServer.Controllers [HttpGet("logs/{pid?}")] [Produces(ContentTypeEventStream, ContentTypeNdJson, ContentTypeJson)] - public ActionResult Logs(int? pid, [FromQuery][Range(-1, int.MaxValue)] int durationSeconds = 30, [FromQuery] LogLevel level = LogLevel.Debug) + public Task Logs(int? pid, [FromQuery][Range(-1, int.MaxValue)] int durationSeconds = 30, [FromQuery] LogLevel level = LogLevel.Debug) { TimeSpan duration = ConvertSecondsToTimeSpan(durationSeconds); - return this.InvokeService(() => + return this.InvokeService(async () => { - int pidValue = _diagnosticServices.ResolveProcess(pid); + int pidValue = await _diagnosticServices.ResolveProcessAsync(pid, HttpContext.RequestAborted); LogFormat format = ComputeLogFormat(Request.GetTypedHeaders().Accept); if (format == LogFormat.None) @@ -184,7 +184,7 @@ namespace Microsoft.Diagnostics.Monitoring.RestServer.Controllers private async Task StartTrace(int? pid, MonitoringSourceConfiguration configuration, TimeSpan duration) { - int pidValue = _diagnosticServices.ResolveProcess(pid); + int pidValue = await _diagnosticServices.ResolveProcessAsync(pid, HttpContext.RequestAborted); IStreamWithCleanup result = await _diagnosticServices.StartTrace(pidValue, configuration, duration, this.HttpContext.RequestAborted); return new StreamWithCleanupResult(result, "application/octet-stream", FormattableString.Invariant($"{GetFileNameTimeStampUtcNow()}_{pidValue}.nettrace")); } diff --git a/src/Microsoft.Diagnostics.Monitoring.RestServer/MetricsService.cs b/src/Microsoft.Diagnostics.Monitoring.RestServer/MetricsService.cs index fa5b9d48a..7d3b7333a 100644 --- a/src/Microsoft.Diagnostics.Monitoring.RestServer/MetricsService.cs +++ b/src/Microsoft.Diagnostics.Monitoring.RestServer/MetricsService.cs @@ -2,15 +2,12 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using Microsoft.Diagnostics.Tracing.Analysis; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Options; using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; using System.Threading; using System.Threading.Tasks; +using Microsoft.Diagnostics.NETCore.Client; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Options; namespace Microsoft.Diagnostics.Monitoring.RestServer { @@ -46,8 +43,9 @@ namespace Microsoft.Diagnostics.Monitoring.RestServer { //TODO In multi-process scenarios, how do we decide which process to choose? //One possibility is to enable metrics after a request to begin polling for metrics - int pid = _services.ResolveProcess(pid: null); - await _pipeProcessor.Process(pid, Timeout.InfiniteTimeSpan, stoppingToken); + int pid = await _services.ResolveProcessAsync(pid: null, stoppingToken); + var client = new DiagnosticsClient(pid); + await _pipeProcessor.Process(client, pid, Timeout.InfiniteTimeSpan, stoppingToken); } catch(Exception e) when (!(e is OperationCanceledException)) { diff --git a/src/Microsoft.Diagnostics.Monitoring.RestServer/Models/ProcessModel.cs b/src/Microsoft.Diagnostics.Monitoring.RestServer/Models/ProcessModel.cs index 37bd13452..e127a4e8e 100644 --- a/src/Microsoft.Diagnostics.Monitoring.RestServer/Models/ProcessModel.cs +++ b/src/Microsoft.Diagnostics.Monitoring.RestServer/Models/ProcessModel.cs @@ -1,4 +1,5 @@ -using System.Runtime.Serialization; +using System; +using System.Runtime.Serialization; namespace Microsoft.Diagnostics.Monitoring.RestServer.Models { @@ -7,5 +8,13 @@ namespace Microsoft.Diagnostics.Monitoring.RestServer.Models { [DataMember(Name = "pid")] public int Pid { get; set; } + + [DataMember(Name = "uid")] + public Guid Uid { get; set; } + + public static ProcessModel FromProcessInfo(IProcessInfo processInfo) + { + return new ProcessModel() { Pid = processInfo.Pid, Uid = processInfo.Uid }; + } } } \ No newline at end of file diff --git a/src/Microsoft.Diagnostics.Monitoring/ClientEndpointInfoSource.cs b/src/Microsoft.Diagnostics.Monitoring/ClientEndpointInfoSource.cs new file mode 100644 index 000000000..7abee42fd --- /dev/null +++ b/src/Microsoft.Diagnostics.Monitoring/ClientEndpointInfoSource.cs @@ -0,0 +1,44 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Diagnostics.NETCore.Client; + +namespace Microsoft.Diagnostics.Monitoring +{ + internal sealed class ClientEndpointInfoSource : IEndpointInfoSourceInternal + { + public Task> GetEndpointInfoAsync(CancellationToken token) + { + List endpointInfos = new List(); + foreach (int pid in DiagnosticsClient.GetPublishedProcesses()) + { + // CONSIDER: Generate a "runtime instance identifier" based on the pipe name + // e.g. pid + disambiguator in GUID form. + endpointInfos.Add(new EndpointInfo(pid)); + } + + return Task.FromResult(endpointInfos.AsEnumerable()); + } + + private class EndpointInfo : IEndpointInfo + { + public EndpointInfo(int processId) + { + Endpoint = new PidIpcEndpoint(processId); + ProcessId = processId; + } + + public IpcEndpoint Endpoint { get; } + + public int ProcessId { get; } + + public Guid RuntimeInstanceCookie => Guid.Empty; + } + } +} diff --git a/src/Microsoft.Diagnostics.Monitoring/Configuration/CpuProfileConfiguration.cs b/src/Microsoft.Diagnostics.Monitoring/Configuration/CpuProfileConfiguration.cs index 77e5c9822..5735f6450 100644 --- a/src/Microsoft.Diagnostics.Monitoring/Configuration/CpuProfileConfiguration.cs +++ b/src/Microsoft.Diagnostics.Monitoring/Configuration/CpuProfileConfiguration.cs @@ -14,7 +14,7 @@ namespace Microsoft.Diagnostics.Monitoring public override IList GetProviders() => new EventPipeProvider[] { - new EventPipeProvider("Microsoft-DotNETCore-SampleProfiler", System.Diagnostics.Tracing.EventLevel.Informational), + new EventPipeProvider(SampleProfilerProviderName, System.Diagnostics.Tracing.EventLevel.Informational), new EventPipeProvider("Microsoft-Windows-DotNETRuntime", System.Diagnostics.Tracing.EventLevel.Informational, (long) Tracing.Parsers.ClrTraceEventParser.Keywords.Default) }; } diff --git a/src/Microsoft.Diagnostics.Monitoring/Configuration/MonitoringSourceConfiguration.cs b/src/Microsoft.Diagnostics.Monitoring/Configuration/MonitoringSourceConfiguration.cs index 50febb255..d95e65737 100644 --- a/src/Microsoft.Diagnostics.Monitoring/Configuration/MonitoringSourceConfiguration.cs +++ b/src/Microsoft.Diagnostics.Monitoring/Configuration/MonitoringSourceConfiguration.cs @@ -15,6 +15,8 @@ namespace Microsoft.Diagnostics.Monitoring public const string GrpcAspNetCoreServer = "Grpc.AspNetCore.Server"; public const string DiagnosticSourceEventSource = "Microsoft-Diagnostics-DiagnosticSource"; public const string TplEventSource = "System.Threading.Tasks.TplEventSource"; + public const string SampleProfilerProviderName = "Microsoft-DotNETCore-SampleProfiler"; + public const string EventPipeProviderName = "Microsoft-DotNETCore-EventPipe"; public abstract IList GetProviders(); diff --git a/src/Microsoft.Diagnostics.Monitoring/Configuration/SampleProfilerConfiguration.cs b/src/Microsoft.Diagnostics.Monitoring/Configuration/SampleProfilerConfiguration.cs new file mode 100644 index 000000000..de7d73da3 --- /dev/null +++ b/src/Microsoft.Diagnostics.Monitoring/Configuration/SampleProfilerConfiguration.cs @@ -0,0 +1,23 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using Microsoft.Diagnostics.NETCore.Client; +using System.Collections.Generic; +using System.Diagnostics.Tracing; + +namespace Microsoft.Diagnostics.Monitoring +{ + public sealed class SampleProfilerConfiguration : MonitoringSourceConfiguration + { + public override IList GetProviders() => + new EventPipeProvider[] + { + new EventPipeProvider(SampleProfilerProviderName, EventLevel.Informational) + }; + + public override int BufferSizeInMB => 1; + + public override bool RequestRundown => false; + } +} diff --git a/src/Microsoft.Diagnostics.Monitoring/Contracts/IDiagnosticServices.cs b/src/Microsoft.Diagnostics.Monitoring/Contracts/IDiagnosticServices.cs index ea1762a36..d118a65e5 100644 --- a/src/Microsoft.Diagnostics.Monitoring/Contracts/IDiagnosticServices.cs +++ b/src/Microsoft.Diagnostics.Monitoring/Contracts/IDiagnosticServices.cs @@ -17,11 +17,11 @@ namespace Microsoft.Diagnostics.Monitoring /// public interface IDiagnosticServices : IDisposable { - IEnumerable GetProcesses(); + Task> GetProcessesAsync(CancellationToken token); - int ResolveProcess(int? pid); + Task ResolveProcessAsync(int? pid, CancellationToken token); - Task GetDump(int pid, DumpType mode); + Task GetDump(int pid, DumpType mode, CancellationToken token); Task GetGcDump(int pid, CancellationToken token); @@ -35,6 +35,13 @@ namespace Microsoft.Diagnostics.Monitoring Stream Stream { get; } } + public interface IProcessInfo + { + int Pid { get; } + + Guid Uid { get; } + } + public enum DumpType { Full = 1, diff --git a/src/Microsoft.Diagnostics.Monitoring/Contracts/IEndpointInfoSource.cs b/src/Microsoft.Diagnostics.Monitoring/Contracts/IEndpointInfoSource.cs new file mode 100644 index 000000000..d98ec2dae --- /dev/null +++ b/src/Microsoft.Diagnostics.Monitoring/Contracts/IEndpointInfoSource.cs @@ -0,0 +1,30 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Diagnostics.NETCore.Client; + +namespace Microsoft.Diagnostics.Monitoring +{ + internal interface IEndpointInfo + { + IpcEndpoint Endpoint { get; } + + int ProcessId { get; } + + Guid RuntimeInstanceCookie { get; } + } + + public interface IEndpointInfoSource + { + } + + internal interface IEndpointInfoSourceInternal : IEndpointInfoSource + { + Task> GetEndpointInfoAsync(CancellationToken token); + } +} diff --git a/src/Microsoft.Diagnostics.Monitoring/DiagnosticServices.cs b/src/Microsoft.Diagnostics.Monitoring/DiagnosticServices.cs index 8c9d534ff..45099f793 100644 --- a/src/Microsoft.Diagnostics.Monitoring/DiagnosticServices.cs +++ b/src/Microsoft.Diagnostics.Monitoring/DiagnosticServices.cs @@ -21,15 +21,25 @@ namespace Microsoft.Diagnostics.Monitoring { private const int DockerEntrypointProcessId = 1; + // The amount of time to wait when checking if the docker entrypoint process is a .NET process + // with a diagnostics transport connection. + private static readonly TimeSpan DockerEntrypointWaitTimeout = TimeSpan.FromMilliseconds(250); + + private readonly IEndpointInfoSourceInternal _endpointInfoSource; private readonly CancellationTokenSource _tokenSource = new CancellationTokenSource(); - public IEnumerable GetProcesses() + public DiagnosticServices(IEndpointInfoSource endpointInfoSource) + { + _endpointInfoSource = (IEndpointInfoSourceInternal)endpointInfoSource; + } + + public async Task> GetProcessesAsync(CancellationToken token) { try { - //TODO This won't work properly with multi-container scenarios that don't share the process space. - //TODO We will need to use DiagnosticsAgent if we are the server. - return DiagnosticsClient.GetPublishedProcesses(); + var endpointInfos = await _endpointInfoSource.GetEndpointInfoAsync(token); + + return endpointInfos.Select(c => new ProcessInfo(c.RuntimeInstanceCookie, c.ProcessId)); } catch (UnauthorizedAccessException) { @@ -37,7 +47,7 @@ namespace Microsoft.Diagnostics.Monitoring } } - public async Task GetDump(int pid, DumpType mode) + public async Task GetDump(int pid, DumpType mode, CancellationToken token) { string dumpFilePath = Path.Combine(Path.GetTempPath(), FormattableString.Invariant($"{Guid.NewGuid()}_{pid}")); NETCore.Client.DumpType dumpType = MapDumpType(mode); @@ -50,9 +60,9 @@ namespace Microsoft.Diagnostics.Monitoring } else { + var client = await GetClientAsync(pid, CancellationToken.None); await Task.Run(() => { - var client = new DiagnosticsClient(pid); client.WriteDump(dumpType, dumpFilePath); }); } @@ -60,13 +70,15 @@ namespace Microsoft.Diagnostics.Monitoring return new AutoDeleteFileStream(dumpFilePath); } - public async Task GetGcDump(int pid, CancellationToken cancellationToken) + public async Task GetGcDump(int pid, CancellationToken token) { var graph = new MemoryGraph(50_000); - await using var processor = new DiagnosticsEventPipeProcessor(PipeMode.GCDump, + await using var processor = new DiagnosticsEventPipeProcessor( + PipeMode.GCDump, gcGraph: graph); - await processor.Process(pid, Timeout.InfiniteTimeSpan, cancellationToken); + var client = await GetClientAsync(pid, token); + await processor.Process(client, pid, Timeout.InfiniteTimeSpan, token); var dumper = new GCHeapDump(graph); dumper.CreationTool = "dotnet-monitor"; @@ -82,7 +94,8 @@ namespace Microsoft.Diagnostics.Monitoring public async Task StartTrace(int pid, MonitoringSourceConfiguration configuration, TimeSpan duration, CancellationToken token) { DiagnosticsMonitor monitor = new DiagnosticsMonitor(configuration); - Stream stream = await monitor.ProcessEvents(pid, duration, token); + var client = await GetClientAsync(pid, token); + Stream stream = await monitor.ProcessEvents(client, duration, token); return new StreamWithCleanup(monitor, stream); } @@ -92,11 +105,13 @@ namespace Microsoft.Diagnostics.Monitoring loggerFactory.AddProvider(new StreamingLoggerProvider(outputStream, format, level)); - await using var processor = new DiagnosticsEventPipeProcessor(PipeMode.Logs, + await using var processor = new DiagnosticsEventPipeProcessor( + PipeMode.Logs, loggerFactory: loggerFactory, logsLevel: level); - await processor.Process(pid, duration, token); + var client = await GetClientAsync(pid, token); + await processor.Process(client, pid, duration, token); } private static NETCore.Client.DumpType MapDumpType(DumpType dumpType) @@ -116,7 +131,7 @@ namespace Microsoft.Diagnostics.Monitoring } } - public int ResolveProcess(int? pid) + public async Task ResolveProcessAsync(int? pid, CancellationToken token) { if (pid.HasValue) { @@ -126,24 +141,32 @@ namespace Microsoft.Diagnostics.Monitoring // Short-circuit for when running in a Docker container. if (RuntimeInfo.IsInDockerContainer) { - var client = new DiagnosticsClient(DockerEntrypointProcessId); - if (client.CheckTransport()) + try { + var client = await GetClientAsync(DockerEntrypointProcessId, token); + using var timeoutSource = new CancellationTokenSource(DockerEntrypointWaitTimeout); + + await client.WaitForConnectionAsync(timeoutSource.Token); + return DockerEntrypointProcessId; } + catch + { + // Process ID 1 doesn't exist or didn't advertise in the reverse pipe configuration. + } } // Only return a process ID if there is exactly one discoverable process. - int[] pids = GetProcesses().ToArray(); - switch (pids.Length) + IProcessInfo[] processes = (await GetProcessesAsync(token)).ToArray(); + switch (processes.Length) { case 0: throw new ArgumentException("Unable to discover a target process."); case 1: - return pids[0]; + return processes[0].Pid; default: #if DEBUG - Process process = pids.Select(pid => Process.GetProcessById(pid)).FirstOrDefault(p => string.Equals(p.ProcessName, "iisexpress", StringComparison.OrdinalIgnoreCase)); + Process process = processes.Select(p => Process.GetProcessById(p.Pid)).FirstOrDefault(p => string.Equals(p.ProcessName, "iisexpress", StringComparison.OrdinalIgnoreCase)); if (process != null) { return process.Id; @@ -153,6 +176,19 @@ namespace Microsoft.Diagnostics.Monitoring } } + private async Task GetClientAsync(int processId, CancellationToken token) + { + var endpointInfos = await _endpointInfoSource.GetEndpointInfoAsync(token); + var endpointInfo = endpointInfos.FirstOrDefault(c => c.ProcessId == processId); + + if (null == endpointInfo) + { + throw new InvalidOperationException($"Diagnostics client for process ID {processId} does not exist."); + } + + return new DiagnosticsClient(endpointInfo.Endpoint); + } + public void Dispose() { _tokenSource.Cancel(); @@ -202,5 +238,18 @@ namespace Microsoft.Diagnostics.Monitoring } } } + + private sealed class ProcessInfo : IProcessInfo + { + public ProcessInfo(Guid uid, int pid) + { + Pid = pid; + Uid = uid; + } + + public int Pid { get; } + + public Guid Uid { get; } + } } } diff --git a/src/Microsoft.Diagnostics.Monitoring/DiagnosticsEventPipeProcessor.cs b/src/Microsoft.Diagnostics.Monitoring/DiagnosticsEventPipeProcessor.cs index 9ce3e1531..a784bb4e8 100644 --- a/src/Microsoft.Diagnostics.Monitoring/DiagnosticsEventPipeProcessor.cs +++ b/src/Microsoft.Diagnostics.Monitoring/DiagnosticsEventPipeProcessor.cs @@ -2,11 +2,6 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using Graphs; -using Microsoft.Diagnostics.NETCore.Client; -using Microsoft.Diagnostics.Tracing; -using Microsoft.Diagnostics.Tracing.Parsers.Clr; -using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Globalization; @@ -15,6 +10,11 @@ using System.Linq; using System.Text.Json; using System.Threading; using System.Threading.Tasks; +using Graphs; +using Microsoft.Diagnostics.NETCore.Client; +using Microsoft.Diagnostics.Tracing; +using Microsoft.Diagnostics.Tracing.Parsers.Clr; +using Microsoft.Extensions.Logging; namespace Microsoft.Diagnostics.Monitoring { @@ -22,7 +22,8 @@ namespace Microsoft.Diagnostics.Monitoring { Logs = 1, Metrics, - GCDump + GCDump, + ProcessInfo } public class DiagnosticsEventPipeProcessor : IAsyncDisposable @@ -33,14 +34,17 @@ namespace Microsoft.Diagnostics.Monitoring private readonly PipeMode _mode; private readonly int _metricIntervalSeconds; private readonly LogLevel _logsLevel; + private readonly Action _processInfoCallback; public DiagnosticsEventPipeProcessor( PipeMode mode, - ILoggerFactory loggerFactory = null, - IEnumerable metricLoggers = null, - int metricIntervalSeconds = 10, - MemoryGraph gcGraph = null, - LogLevel logsLevel = LogLevel.Debug) + ILoggerFactory loggerFactory = null, // PipeMode = Logs + LogLevel logsLevel = LogLevel.Debug, // PipeMode = Logs + IEnumerable metricLoggers = null, // PipeMode = Metrics + int metricIntervalSeconds = 10, // PipeMode = Metrics + MemoryGraph gcGraph = null, // PipeMode = GCDump + Action processInfoCallback = null // PipeMode = ProcessInfo + ) { _metricLoggers = metricLoggers ?? Enumerable.Empty(); _mode = mode; @@ -48,9 +52,10 @@ namespace Microsoft.Diagnostics.Monitoring _gcGraph = gcGraph; _metricIntervalSeconds = metricIntervalSeconds; _logsLevel = logsLevel; + _processInfoCallback = processInfoCallback; } - public async Task Process(int pid, TimeSpan duration, CancellationToken token) + public async Task Process(DiagnosticsClient client, int pid, TimeSpan duration, CancellationToken token) { await await Task.Factory.StartNew(async () => { @@ -72,9 +77,13 @@ namespace Microsoft.Diagnostics.Monitoring { config = new GCDumpSourceConfiguration(); } + if (_mode == PipeMode.ProcessInfo) + { + config = new SampleProfilerConfiguration(); + } monitor = new DiagnosticsMonitor(config); - Stream sessionStream = await monitor.ProcessEvents(pid, duration, token); + Stream sessionStream = await monitor.ProcessEvents(client, duration, token); source = new EventPipeEventSource(sessionStream); // Allows the event handling routines to stop processing before the duration expires. @@ -98,6 +107,12 @@ namespace Microsoft.Diagnostics.Monitoring handleEventsTask = HandleGCEvents(source, pid, stopFunc, token); } + if (_mode == PipeMode.ProcessInfo) + { + // ProcessInfo + HandleProcessInfo(source, stopFunc, token); + } + source.Process(); token.ThrowIfCancellationRequested(); @@ -455,6 +470,19 @@ namespace Microsoft.Diagnostics.Monitoring _gcGraph.AllowReading(); } + private void HandleProcessInfo(EventPipeEventSource source, Func stopFunc, CancellationToken token) + { + source.Dynamic.AddCallbackForProviderEvent(MonitoringSourceConfiguration.EventPipeProviderName, "ProcessInfo", traceEvent => + { + _processInfoCallback?.Invoke((string)traceEvent.PayloadByName("CommandLine")); + }); + + source.Dynamic.All += traceEvent => + { + stopFunc(); + }; + } + public async ValueTask DisposeAsync() { foreach (IMetricsLogger logger in _metricLoggers) diff --git a/src/Microsoft.Diagnostics.Monitoring/DiagnosticsMonitor.cs b/src/Microsoft.Diagnostics.Monitoring/DiagnosticsMonitor.cs index 8e3b97417..960f3c18d 100644 --- a/src/Microsoft.Diagnostics.Monitoring/DiagnosticsMonitor.cs +++ b/src/Microsoft.Diagnostics.Monitoring/DiagnosticsMonitor.cs @@ -2,21 +2,18 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using Microsoft.Diagnostics.NETCore.Client; using System; -using System.Collections.Generic; using System.IO; -using System.Linq; -using System.Text.Json; using System.Threading; using System.Threading.Tasks; +using Microsoft.Diagnostics.NETCore.Client; namespace Microsoft.Diagnostics.Monitoring { public sealed class DiagnosticsMonitor : IAsyncDisposable { private readonly MonitoringSourceConfiguration _sourceConfig; - private readonly CancellationTokenSource _stopProcessingSource; + private readonly TaskCompletionSource _stopProcessingSource; private readonly object _lock = new object(); private Task _currentTask; private bool _disposed; @@ -24,12 +21,12 @@ namespace Microsoft.Diagnostics.Monitoring public DiagnosticsMonitor(MonitoringSourceConfiguration sourceConfig) { _sourceConfig = sourceConfig; - _stopProcessingSource = new CancellationTokenSource(); + _stopProcessingSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); } public Task CurrentProcessingTask => _currentTask; - public Task ProcessEvents(int processId, TimeSpan duration, CancellationToken cancellationToken) + public Task ProcessEvents(DiagnosticsClient client, TimeSpan duration, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); @@ -46,8 +43,6 @@ namespace Microsoft.Diagnostics.Monitoring } EventPipeSession session = null; - var client = new DiagnosticsClient(processId); - try { session = client.StartEventPipeSession(_sourceConfig.GetProviders(), _sourceConfig.RequestRundown, _sourceConfig.BufferSizeInMB); @@ -61,19 +56,17 @@ namespace Microsoft.Diagnostics.Monitoring throw new InvalidOperationException("Failed to start the event pipe session", ex); } - CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(_stopProcessingSource.Token, cancellationToken); - _currentTask = Task.Run( async () => { - try - { - await Task.Delay(duration, linkedSource.Token); - } - finally - { - linkedSource.Dispose(); - StopSession(session); - } + using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + linkedSource.CancelAfter(duration); + using var _ = linkedSource.Token.Register(() => _stopProcessingSource.TrySetResult(null)); + + // Use TaskCompletionSource instead of Task.Delay with cancellation to avoid + // using exceptions for normal termination of event stream. + await _stopProcessingSource.Task.ConfigureAwait(false); + + StopSession(session); }); return Task.FromResult(session.EventStream); @@ -82,7 +75,7 @@ namespace Microsoft.Diagnostics.Monitoring public void StopProcessing() { - _stopProcessingSource.Cancel(); + _stopProcessingSource.TrySetResult(null); } private static void StopSession(EventPipeSession session) @@ -127,18 +120,17 @@ namespace Microsoft.Diagnostics.Monitoring _currentTask = null; _disposed = true; } - _stopProcessingSource.Cancel(); + _stopProcessingSource.TrySetResult(null); if (currentTask != null) { try { - await currentTask; + await currentTask.ConfigureAwait(false); } catch (OperationCanceledException) { } } - _stopProcessingSource?.Dispose(); } } } \ No newline at end of file diff --git a/src/Microsoft.Diagnostics.Monitoring/Microsoft.Diagnostics.Monitoring.csproj b/src/Microsoft.Diagnostics.Monitoring/Microsoft.Diagnostics.Monitoring.csproj index 2b81867ec..c94be1221 100644 --- a/src/Microsoft.Diagnostics.Monitoring/Microsoft.Diagnostics.Monitoring.csproj +++ b/src/Microsoft.Diagnostics.Monitoring/Microsoft.Diagnostics.Monitoring.csproj @@ -27,6 +27,7 @@ + @@ -38,4 +39,9 @@ + + + + + diff --git a/src/Microsoft.Diagnostics.Monitoring/ServerEndpointInfoSource.cs b/src/Microsoft.Diagnostics.Monitoring/ServerEndpointInfoSource.cs new file mode 100644 index 000000000..6a9d22e71 --- /dev/null +++ b/src/Microsoft.Diagnostics.Monitoring/ServerEndpointInfoSource.cs @@ -0,0 +1,248 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Diagnostics.NETCore.Client; + +namespace Microsoft.Diagnostics.Monitoring +{ + /// + /// Aggregates diagnostic endpoints that are established at a transport path via a reversed server. + /// + internal class ServerEndpointInfoSource : IEndpointInfoSourceInternal, IAsyncDisposable + { + // The amount of time to wait when checking if the a endpoint info should be + // pruned from the list of endpoint infos. If the runtime doesn't have a viable connection within + // this time, it will be pruned from the list. + private static readonly TimeSpan PruneWaitForConnectionTimeout = TimeSpan.FromMilliseconds(250); + + private readonly CancellationTokenSource _cancellation = new CancellationTokenSource(); + private readonly IList _endpointInfos = new List(); + private readonly SemaphoreSlim _endpointInfosSemaphore = new SemaphoreSlim(1); + private readonly string _transportPath; + + private Task _listenTask; + private bool _disposed = false; + private ReversedDiagnosticsServer _server; + + /// + /// Constructs a that aggreates diagnostic endpoints + /// from a reversed diagnostics server at path specified by . + /// + /// + /// The path of the server endpoint. + /// On Windows, this can be a full pipe path or the name without the "\\.\pipe\" prefix. + /// On all other systems, this must be the full file path of the socket. + /// + public ServerEndpointInfoSource(string transportPath) + { + _transportPath = transportPath; + } + + public async ValueTask DisposeAsync() + { + if (!_disposed) + { + _cancellation.Cancel(); + + if (null != _listenTask) + { + await _listenTask.ConfigureAwait(false); + } + + _server?.Dispose(); + + _endpointInfosSemaphore.Dispose(); + + _cancellation.Dispose(); + + _disposed = true; + } + } + + /// + /// Starts listening to the reversed diagnostics server for new connections. + /// + public void Listen() + { + Listen(ReversedDiagnosticsServer.MaxAllowedConnections); + } + + /// + /// Starts listening to the reversed diagnostics server for new connections. + /// + /// The maximum number of connections the server will support. + public void Listen(int maxConnections) + { + VerifyNotDisposed(); + + if (null != _server || null != _listenTask) + { + throw new InvalidOperationException(nameof(ServerEndpointInfoSource.Listen) + " method can only be called once."); + } + + _server = new ReversedDiagnosticsServer(_transportPath, maxConnections); + + _listenTask = ListenAsync(_cancellation.Token); + } + + /// + /// Gets the list of served from the reversed diagnostics server. + /// + /// The token to monitor for cancellation requests. + /// A list of active instances. + public async Task> GetEndpointInfoAsync(CancellationToken token) + { + VerifyNotDisposed(); + + using CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, _cancellation.Token); + CancellationToken linkedToken = linkedSource.Token; + + // Prune connections that no longer have an active runtime instance before + // returning the list of connections. + await _endpointInfosSemaphore.WaitAsync(linkedToken).ConfigureAwait(false); + try + { + // Check the transport for each endpoint info and remove it if the check fails. + var endpointInfos = _endpointInfos.ToList(); + + var pruneTasks = new List(); + foreach (IpcEndpointInfo info in endpointInfos) + { + pruneTasks.Add(Task.Run(() => PruneIfNotViable(info, linkedToken), linkedToken)); + } + + await Task.WhenAll(pruneTasks).ConfigureAwait(false); + + return _endpointInfos.Select(c => new EndpointInfo(c)); + } + finally + { + _endpointInfosSemaphore.Release(); + } + } + + private async Task PruneIfNotViable(IpcEndpointInfo info, CancellationToken token) + { + using var timeoutSource = new CancellationTokenSource(); + using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, timeoutSource.Token); + + try + { + timeoutSource.CancelAfter(PruneWaitForConnectionTimeout); + + await info.Endpoint.WaitForConnectionAsync(linkedSource.Token).ConfigureAwait(false); + } + catch + { + // Only remove the endpoint info if due to some exception + // other than cancelling the pruning operation. + if (!token.IsCancellationRequested) + { + _endpointInfos.Remove(info); + OnRemovedEndpointInfo(info); + _server.RemoveConnection(info.RuntimeInstanceCookie); + } + } + } + + /// + /// Accepts endpoint infos from the reversed diagnostics server. + /// + /// The token to monitor for cancellation requests. + private async Task ListenAsync(CancellationToken token) + { + // Continuously accept endpoint infos from the reversed diagnostics server so + // that + // is always awaited in order to to handle new runtime instance connections + // as well as existing runtime instance reconnections. + while (!token.IsCancellationRequested) + { + try + { + IpcEndpointInfo info = await _server.AcceptAsync(token).ConfigureAwait(false); + + _ = Task.Run(() => ResumeAndQueueEndpointInfo(info, token), token); + } + catch (OperationCanceledException) + { + } + } + } + + private async Task ResumeAndQueueEndpointInfo(IpcEndpointInfo info, CancellationToken token) + { + try + { + // Send ResumeRuntime message for runtime instances that connect to the server. This will allow + // those instances that are configured to pause on start to resume after the diagnostics + // connection has been made. Instances that are not configured to pause on startup will ignore + // the command and return success. + var client = new DiagnosticsClient(info.Endpoint); + try + { + client.ResumeRuntime(); + } + catch (ServerErrorException) + { + // The runtime likely doesn't understand the ResumeRuntime command. + } + + await _endpointInfosSemaphore.WaitAsync(token).ConfigureAwait(false); + try + { + _endpointInfos.Add(info); + + OnAddedEndpointInfo(info); + } + finally + { + _endpointInfosSemaphore.Release(); + } + } + catch (Exception) + { + _server.RemoveConnection(info.RuntimeInstanceCookie); + + throw; + } + } + + internal virtual void OnAddedEndpointInfo(IpcEndpointInfo info) + { + } + + internal virtual void OnRemovedEndpointInfo(IpcEndpointInfo info) + { + } + + private void VerifyNotDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(ServerEndpointInfoSource)); + } + } + + private class EndpointInfo : IEndpointInfo + { + private readonly IpcEndpointInfo _info; + + public EndpointInfo(IpcEndpointInfo info) + { + _info = info; + } + + public IpcEndpoint Endpoint => _info.Endpoint; + + public int ProcessId => _info.ProcessId; + + public Guid RuntimeInstanceCookie => _info.RuntimeInstanceCookie; + } + } +} diff --git a/src/Microsoft.Diagnostics.Monitoring/ServiceCollectionExtensions.cs b/src/Microsoft.Diagnostics.Monitoring/ServiceCollectionExtensions.cs new file mode 100644 index 000000000..06e16cbe0 --- /dev/null +++ b/src/Microsoft.Diagnostics.Monitoring/ServiceCollectionExtensions.cs @@ -0,0 +1,29 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using Microsoft.Diagnostics.NETCore.Client; +using Microsoft.Extensions.DependencyInjection; + +namespace Microsoft.Diagnostics.Monitoring +{ + public static class ServiceCollectionExtensions + { + public static IServiceCollection AddEndpointInfoSource(this IServiceCollection services, string reversedServerAddress, int? maxConnections = null) + { + if (string.IsNullOrWhiteSpace(reversedServerAddress)) + { + return services.AddSingleton(); + } + else + { + // Construct the source now rather than delayed construction + // in order to be able to accept diagnostics connections immediately. + var serverSource = new ServerEndpointInfoSource(reversedServerAddress); + serverSource.Listen(maxConnections.GetValueOrDefault(ReversedDiagnosticsServer.MaxAllowedConnections)); + + return services.AddSingleton(serverSource); + } + } + } +} diff --git a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsClient/DiagnosticsClient.cs b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsClient/DiagnosticsClient.cs index 480f1e889..89a4d4240 100644 --- a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsClient/DiagnosticsClient.cs +++ b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsClient/DiagnosticsClient.cs @@ -9,6 +9,8 @@ using System.IO; using System.Linq; using System.Runtime.InteropServices; using System.Text.RegularExpressions; +using System.Threading; +using System.Threading.Tasks; namespace Microsoft.Diagnostics.NETCore.Client { @@ -17,22 +19,28 @@ namespace Microsoft.Diagnostics.NETCore.Client /// public sealed class DiagnosticsClient { - private readonly int _processId; + private readonly IpcEndpoint _endpoint; - public DiagnosticsClient(int processId) + public DiagnosticsClient(int processId) : + this(new PidIpcEndpoint(processId)) { - _processId = processId; + } + + internal DiagnosticsClient(IpcEndpoint endpoint) + { + _endpoint = endpoint; } /// - /// Checks that the client is able to communicate with target process over diagnostic transport. + /// Wait for an available diagnostic endpoint to the runtime instance. /// + /// The token to monitor for cancellation requests. /// - /// True if client is able to communicate with target process; otherwise, false. + /// A task the completes when a diagnostic endpoint to the runtime instance becomes available. /// - public bool CheckTransport() + internal Task WaitForConnectionAsync(CancellationToken token) { - return IpcClient.CheckTransport(_processId); + return _endpoint.WaitForConnectionAsync(token); } /// @@ -46,7 +54,7 @@ namespace Microsoft.Diagnostics.NETCore.Client /// public EventPipeSession StartEventPipeSession(IEnumerable providers, bool requestRundown=true, int circularBufferMB=256) { - return new EventPipeSession(_processId, providers, requestRundown, circularBufferMB); + return new EventPipeSession(_endpoint, providers, requestRundown, circularBufferMB); } /// @@ -60,7 +68,7 @@ namespace Microsoft.Diagnostics.NETCore.Client /// public EventPipeSession StartEventPipeSession(EventPipeProvider provider, bool requestRundown=true, int circularBufferMB=256) { - return new EventPipeSession(_processId, new[] { provider }, requestRundown, circularBufferMB); + return new EventPipeSession(_endpoint, new[] { provider }, requestRundown, circularBufferMB); } /// @@ -76,16 +84,16 @@ namespace Microsoft.Diagnostics.NETCore.Client byte[] payload = SerializeCoreDump(dumpPath, dumpType, logDumpGeneration); IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Dump, (byte)DumpCommandId.GenerateCoreDump, payload); - IpcMessage response = IpcClient.SendMessage(_processId, message); - switch ((DiagnosticsServerCommandId)response.Header.CommandId) + IpcMessage response = IpcClient.SendMessage(_endpoint, message); + switch ((DiagnosticsServerResponseId)response.Header.CommandId) { - case DiagnosticsServerCommandId.Error: + case DiagnosticsServerResponseId.Error: uint hr = BitConverter.ToUInt32(response.Payload, 0); if (hr == (uint)DiagnosticsIpcError.UnknownCommand) { throw new PlatformNotSupportedException($"Unsupported operating system: {RuntimeInformation.OSDescription}"); } throw new ServerErrorException($"Writing dump failed (HRESULT: 0x{hr:X8})"); - case DiagnosticsServerCommandId.OK: + case DiagnosticsServerResponseId.OK: return; default: throw new ServerErrorException($"Writing dump failed - server responded with unknown command"); @@ -113,13 +121,13 @@ namespace Microsoft.Diagnostics.NETCore.Client byte[] serializedConfiguration = SerializeProfilerAttach((uint)attachTimeout.TotalSeconds, profilerGuid, profilerPath, additionalData); var message = new IpcMessage(DiagnosticsServerCommandSet.Profiler, (byte)ProfilerCommandId.AttachProfiler, serializedConfiguration); - var response = IpcClient.SendMessage(_processId, message); - switch ((DiagnosticsServerCommandId)response.Header.CommandId) + var response = IpcClient.SendMessage(_endpoint, message); + switch ((DiagnosticsServerResponseId)response.Header.CommandId) { - case DiagnosticsServerCommandId.Error: + case DiagnosticsServerResponseId.Error: var hr = BitConverter.ToInt32(response.Payload, 0); throw new ServerErrorException($"Profiler attach failed (HRESULT: 0x{hr:X8})"); - case DiagnosticsServerCommandId.OK: + case DiagnosticsServerResponseId.OK: return; default: throw new ServerErrorException($"Profiler attach failed - server responded with unknown command"); @@ -130,6 +138,42 @@ namespace Microsoft.Diagnostics.NETCore.Client // runtime timeout or respect attachTimeout as one total duration. } + internal void ResumeRuntime() + { + IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.ResumeRuntime); + var response = IpcClient.SendMessage(_endpoint, message); + switch ((DiagnosticsServerResponseId)response.Header.CommandId) + { + case DiagnosticsServerResponseId.Error: + // Try fallback for Preview 7 and Preview 8 + ResumeRuntimeFallback(); + //var hr = BitConverter.ToInt32(response.Payload, 0); + //throw new ServerErrorException($"Resume runtime failed (HRESULT: 0x{hr:X8})"); + return; + case DiagnosticsServerResponseId.OK: + return; + default: + throw new ServerErrorException($"Resume runtime failed - server responded with unknown command"); + } + } + + // Fallback command for .NET 5 Preview 7 and Preview 8 + internal void ResumeRuntimeFallback() + { + IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Server, (byte)DiagnosticServerCommandId.ResumeRuntime); + var response = IpcClient.SendMessage(_endpoint, message); + switch ((DiagnosticsServerResponseId)response.Header.CommandId) + { + case DiagnosticsServerResponseId.Error: + var hr = BitConverter.ToInt32(response.Payload, 0); + throw new ServerErrorException($"Resume runtime failed (HRESULT: 0x{hr:X8})"); + case DiagnosticsServerResponseId.OK: + return; + default: + throw new ServerErrorException($"Resume runtime failed - server responded with unknown command"); + } + } + /// /// Get all the active processes that can be attached to. /// @@ -138,10 +182,10 @@ namespace Microsoft.Diagnostics.NETCore.Client /// public static IEnumerable GetPublishedProcesses() { - return Directory.GetFiles(IpcClient.IpcRootPath) + return Directory.GetFiles(PidIpcEndpoint.IpcRootPath) .Select(namedPipe => (new FileInfo(namedPipe)).Name) - .Where(input => Regex.IsMatch(input, IpcClient.DiagnosticsPortPattern)) - .Select(input => int.Parse(Regex.Match(input, IpcClient.DiagnosticsPortPattern).Groups[1].Value, NumberStyles.Integer)) + .Where(input => Regex.IsMatch(input, PidIpcEndpoint.DiagnosticsPortPattern)) + .Select(input => int.Parse(Regex.Match(input, PidIpcEndpoint.DiagnosticsPortPattern).Groups[1].Value, NumberStyles.Integer)) .Distinct(); } diff --git a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsClient/EventPipeSession.cs b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsClient/EventPipeSession.cs index fc6fa847d..d62b7e616 100644 --- a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsClient/EventPipeSession.cs +++ b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsClient/EventPipeSession.cs @@ -15,25 +15,25 @@ namespace Microsoft.Diagnostics.NETCore.Client private bool _requestRundown; private int _circularBufferMB; private long _sessionId; - private int _processId; + private IpcEndpoint _endpoint; private bool disposedValue = false; // To detect redundant calls - internal EventPipeSession(int processId, IEnumerable providers, bool requestRundown, int circularBufferMB) + internal EventPipeSession(IpcEndpoint endpoint, IEnumerable providers, bool requestRundown, int circularBufferMB) { - _processId = processId; + _endpoint = endpoint; _providers = providers; _requestRundown = requestRundown; _circularBufferMB = circularBufferMB; var config = new EventPipeSessionConfiguration(circularBufferMB, EventPipeSerializationFormat.NetTrace, providers, requestRundown); var message = new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.CollectTracing2, config.SerializeV2()); - EventStream = IpcClient.SendMessage(processId, message, out var response); - switch ((DiagnosticsServerCommandId)response.Header.CommandId) + EventStream = IpcClient.SendMessage(endpoint, message, out var response); + switch ((DiagnosticsServerResponseId)response.Header.CommandId) { - case DiagnosticsServerCommandId.OK: + case DiagnosticsServerResponseId.OK: _sessionId = BitConverter.ToInt64(response.Payload, 0); break; - case DiagnosticsServerCommandId.Error: + case DiagnosticsServerResponseId.Error: var hr = BitConverter.ToInt32(response.Payload, 0); throw new ServerErrorException($"EventPipe session start failed (HRESULT: 0x{hr:X8})"); default: @@ -51,13 +51,13 @@ namespace Microsoft.Diagnostics.NETCore.Client Debug.Assert(_sessionId > 0); byte[] payload = BitConverter.GetBytes(_sessionId); - var response = IpcClient.SendMessage(_processId, new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.StopTracing, payload)); + var response = IpcClient.SendMessage(_endpoint, new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.StopTracing, payload)); - switch ((DiagnosticsServerCommandId)response.Header.CommandId) + switch ((DiagnosticsServerResponseId)response.Header.CommandId) { - case DiagnosticsServerCommandId.OK: + case DiagnosticsServerResponseId.OK: return; - case DiagnosticsServerCommandId.Error: + case DiagnosticsServerResponseId.Error: var hr = BitConverter.ToInt32(response.Payload, 0); throw new ServerErrorException($"EventPipe session stop failed (HRESULT: 0x{hr:X8})"); default: diff --git a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/ExposedSocketNetworkStream.cs b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/ExposedSocketNetworkStream.cs new file mode 100644 index 000000000..dbf6e3e58 --- /dev/null +++ b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/ExposedSocketNetworkStream.cs @@ -0,0 +1,19 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.Net.Sockets; + +namespace Microsoft.Diagnostics.NETCore.Client +{ + internal sealed class ExposedSocketNetworkStream : + NetworkStream + { + public ExposedSocketNetworkStream(Socket socket, bool ownsSocket) + : base(socket, ownsSocket) + { + } + + public new Socket Socket => base.Socket; + } +} diff --git a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcAdvertise.cs b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcAdvertise.cs new file mode 100644 index 000000000..fbf11dbb4 --- /dev/null +++ b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcAdvertise.cs @@ -0,0 +1,91 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Linq; +using System.IO; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Diagnostics.NETCore.Client +{ + /** + * ==ADVERTISE PROTOCOL== + * Before standard IPC Protocol communication can occur on a client-mode connection + * the runtime must advertise itself over the connection. ALL SUBSEQUENT COMMUNICATION + * IS STANDARD DIAGNOSTICS IPC PROTOCOL COMMUNICATION. + * + * The flow for Advertise is a one-way burst of 34 bytes consisting of + * 8 bytes - "ADVR_V1\0" (ASCII chars + null byte) + * 16 bytes - CLR Instance Cookie (little-endian) + * 8 bytes - PID (little-endian) + * 2 bytes - future + */ + + internal sealed class IpcAdvertise + { + private static byte[] Magic_V1 => Encoding.ASCII.GetBytes("ADVR_V1" + '\0'); + private static readonly int IpcAdvertiseV1SizeInBytes = Magic_V1.Length + 16 + 8 + 2; // 34 bytes + + private IpcAdvertise(byte[] magic, Guid cookie, UInt64 pid, UInt16 future) + { + Future = future; + Magic = magic; + ProcessId = pid; + RuntimeInstanceCookie = cookie; + } + + public static async Task ParseAsync(Stream stream, CancellationToken token) + { + byte[] buffer = new byte[IpcAdvertiseV1SizeInBytes]; + + int totalRead = 0; + do + { + int read = await stream.ReadAsync(buffer, totalRead, buffer.Length - totalRead, token).ConfigureAwait(false); + if (0 == read) + { + throw new EndOfStreamException(); + } + totalRead += read; + } + while (totalRead < buffer.Length); + + int index = 0; + byte[] magic = new byte[Magic_V1.Length]; + Array.Copy(buffer, magic, Magic_V1.Length); + index += Magic_V1.Length; + + if (!Magic_V1.SequenceEqual(magic)) + { + throw new Exception("Invalid advertise message from client connection"); + } + + byte[] cookieBuffer = new byte[16]; + Array.Copy(buffer, index, cookieBuffer, 0, 16); + Guid cookie = new Guid(cookieBuffer); + index += 16; + + UInt64 pid = BitConverter.ToUInt64(buffer, index); + index += 8; + + UInt16 future = BitConverter.ToUInt16(buffer, index); + index += 2; + + // FUTURE: switch on incoming magic and change if version ever increments + return new IpcAdvertise(magic, cookie, pid, future); + } + + public override string ToString() + { + return $"{{ Magic={Magic}; ClrInstanceId={RuntimeInstanceCookie}; ProcessId={ProcessId}; Future={Future} }}"; + } + + private UInt16 Future { get; } = 0; + public byte[] Magic { get; } = Magic_V1; + public UInt64 ProcessId { get; } = 0; + public Guid RuntimeInstanceCookie { get; } = Guid.Empty; + } +} diff --git a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcClient.cs b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcClient.cs index c84395460..9dd0a2fec 100644 --- a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcClient.cs +++ b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcClient.cs @@ -3,106 +3,25 @@ // See the LICENSE file in the project root for more information. using System; -using System.Diagnostics; using System.IO; -using System.IO.Pipes; -using System.Linq; -using System.Net; -using System.Net.Sockets; -using System.Runtime.InteropServices; -using System.Security.Principal; +using System.Threading; namespace Microsoft.Diagnostics.NETCore.Client { internal class IpcClient { - public static string IpcRootPath { get; } = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? @"\\.\pipe\" : Path.GetTempPath(); - public static string DiagnosticsPortPattern { get; } = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? @"^dotnet-diagnostic-(\d+)$" : @"^dotnet-diagnostic-(\d+)-(\d+)-socket$"; - - private static double ConnectTimeoutMilliseconds { get; } = TimeSpan.FromSeconds(3).TotalMilliseconds; - - /// - /// Get the OS Transport to be used for communicating with a dotnet process. - /// - /// The PID of the dotnet process to get the transport for - /// A System.IO.Stream wrapper around the transport - private static Stream GetTransport(int processId) - { - try - { - var process = Process.GetProcessById(processId); - } - catch (System.ArgumentException) - { - throw new ServerNotAvailableException($"Process {processId} is not running."); - } - catch (System.InvalidOperationException) - { - throw new ServerNotAvailableException($"Process {processId} seems to be elevated."); - } - - if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - { - string pipeName = $"dotnet-diagnostic-{processId}"; - var namedPipe = new NamedPipeClientStream( - ".", pipeName, PipeDirection.InOut, PipeOptions.None, TokenImpersonationLevel.Impersonation); - namedPipe.Connect((int)ConnectTimeoutMilliseconds); - return namedPipe; - } - else - { - string ipcPort; - try - { - ipcPort = Directory.GetFiles(IpcRootPath, $"dotnet-diagnostic-{processId}-*-socket") // Try best match. - .OrderByDescending(f => new FileInfo(f).LastWriteTime) - .FirstOrDefault(); - if (ipcPort == null) - { - throw new ServerNotAvailableException($"Process {processId} not running compatible .NET Core runtime."); - } - } - catch (InvalidOperationException) - { - throw new ServerNotAvailableException($"Process {processId} not running compatible .NET Core runtime."); - } - string path = Path.Combine(IpcRootPath, ipcPort); - var remoteEP = CreateUnixDomainSocketEndPoint(path); - - var socket = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified); - socket.Connect(remoteEP); - return new NetworkStream(socket, ownsSocket: true); - } - } - - /// - /// Checks that the client is able to communicate with target process over diagnostic transport. - /// - /// - /// True if client is able to communicate with target process; otherwise, false. - /// - public static bool CheckTransport(int processId) - { - try - { - using var stream = GetTransport(processId); - return null != stream; - } - catch (Exception) - { - return false; - } - } + // The amount of time to wait for a stream to be available for consumption by the Connect method. + private static readonly TimeSpan ConnectTimeout = TimeSpan.FromSeconds(3); /// /// Sends a single DiagnosticsIpc Message to the dotnet process with PID processId. /// - /// The PID of the dotnet process + /// An endpoint that provides a diagnostics connection to a runtime instance. /// The DiagnosticsIpc Message to be sent /// The response DiagnosticsIpc Message from the dotnet process - public static IpcMessage SendMessage(int processId, IpcMessage message) + public static IpcMessage SendMessage(IpcEndpoint endpoint, IpcMessage message) { - using (var stream = GetTransport(processId)) + using (var stream = endpoint.Connect(ConnectTimeout)) { Write(stream, message); return Read(stream); @@ -113,13 +32,13 @@ namespace Microsoft.Diagnostics.NETCore.Client /// Sends a single DiagnosticsIpc Message to the dotnet process with PID processId /// and returns the Stream for reuse in Optional Continuations. /// - /// The PID of the dotnet process + /// An endpoint that provides a diagnostics connection to a runtime instance. /// The DiagnosticsIpc Message to be sent /// out var for response message /// The response DiagnosticsIpc Message from the dotnet process - public static Stream SendMessage(int processId, IpcMessage message, out IpcMessage response) + public static Stream SendMessage(IpcEndpoint endpoint, IpcMessage message, out IpcMessage response) { - var stream = GetTransport(processId); + var stream = endpoint.Connect(ConnectTimeout); Write(stream, message); response = Read(stream); return stream; @@ -139,22 +58,5 @@ namespace Microsoft.Diagnostics.NETCore.Client { return IpcMessage.Parse(stream); } - - private static EndPoint CreateUnixDomainSocketEndPoint(string path) - { -#if NETCOREAPP - return new UnixDomainSocketEndPoint(path); -#elif NETSTANDARD2_0 - // UnixDomainSocketEndPoint is not part of .NET Standard 2.0 - var type = typeof(Socket).Assembly.GetType("System.Net.Sockets.UnixDomainSocketEndPoint") - ?? Type.GetType("System.Net.Sockets.UnixDomainSocketEndPoint, System.Core"); - if (type == null) - { - throw new PlatformNotSupportedException("Current process is not running a compatible .NET Core runtime."); - } - var ctor = type.GetConstructor(new[] { typeof(string) }); - return (EndPoint)ctor.Invoke(new object[] { path }); -#endif - } } } diff --git a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcCommands.cs b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcCommands.cs index 7c870e3c1..40db46de6 100644 --- a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcCommands.cs +++ b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcCommands.cs @@ -13,14 +13,27 @@ namespace Microsoft.Diagnostics.NETCore.Client Dump = 0x01, EventPipe = 0x02, Profiler = 0x03, + Process = 0x04, Server = 0xFF, } - internal enum DiagnosticsServerCommandId : byte + // For .NET 5 Preview 7 and Preview 8, use this with the + // DiagnosticsServerCommandSet.Server command set. + // For .NET 5 RC and later, use ProcessCommandId.ResumeRuntime with + // the DiagnosticsServerCommandSet.Process command set. + internal enum DiagnosticServerCommandId : byte { - OK = 0x00, - Error = 0xFF, + // 0x00 used in DiagnosticServerResponseId + ResumeRuntime = 0x01, + // 0xFF used DiagnosticServerResponseId + }; + + internal enum DiagnosticsServerResponseId : byte + { + OK = 0x00, + // future + Error = 0xFF, } internal enum EventPipeCommandId : byte @@ -39,4 +52,10 @@ namespace Microsoft.Diagnostics.NETCore.Client { AttachProfiler = 0x01, } + + internal enum ProcessCommandId : byte + { + GetProcessInfo = 0x00, + ResumeRuntime = 0x01 + } } diff --git a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcMessage.cs b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcMessage.cs index 9f03a66c6..b015673e1 100644 --- a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcMessage.cs +++ b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcMessage.cs @@ -72,9 +72,9 @@ namespace Microsoft.Diagnostics.NETCore.Client public IpcMessage() { } - public IpcMessage(IpcHeader header, byte[] payload) + public IpcMessage(IpcHeader header, byte[] payload = null) { - Payload = payload; + Payload = payload ?? Array.Empty(); Header = header; } @@ -90,7 +90,7 @@ namespace Microsoft.Diagnostics.NETCore.Client { byte[] serializedData = null; // Verify things will fit in the size capacity - Header.Size = checked((UInt16)(IpcHeader.HeaderSizeInBytes + Payload.Length)); ; + Header.Size = checked((UInt16)(IpcHeader.HeaderSizeInBytes + Payload.Length)); byte[] headerBytes = Header.Serialize(); using (var stream = new MemoryStream()) diff --git a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcServerTransport.cs b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcServerTransport.cs new file mode 100644 index 000000000..3881ac26e --- /dev/null +++ b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcServerTransport.cs @@ -0,0 +1,201 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.IO; +using System.IO.Pipes; +using System.Net.Sockets; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Diagnostics.NETCore.Client +{ + internal abstract class IpcServerTransport : IDisposable + { + private bool _disposed; + + public static IpcServerTransport Create(string transportPath, int maxConnections) + { + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + return new WindowsPipeServerTransport(transportPath, maxConnections); + } + else + { + return new UnixDomainSocketServerTransport(transportPath, maxConnections); + } + } + + public void Dispose() + { + if (!_disposed) + { + Dispose(disposing: true); + + _disposed = true; + } + } + + protected virtual void Dispose(bool disposing) + { + } + + public abstract Task AcceptAsync(CancellationToken token); + + public static int MaxAllowedConnections + { + get + { + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + return NamedPipeServerStream.MaxAllowedServerInstances; + } + else + { + return (int)SocketOptionName.MaxConnections; + } + } + } + + protected void VerifyNotDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(this.GetType().Name); + } + } + } + + internal sealed class WindowsPipeServerTransport : IpcServerTransport + { + private const string PipePrefix = @"\\.\pipe\"; + + private NamedPipeServerStream _stream; + + private readonly CancellationTokenSource _cancellation = new CancellationTokenSource(); + private readonly string _pipeName; + private readonly int _maxInstances; + + public WindowsPipeServerTransport(string pipeName, int maxInstances) + { + _maxInstances = maxInstances; + _pipeName = pipeName.StartsWith(PipePrefix) ? pipeName.Substring(PipePrefix.Length) : pipeName; + CreateNewPipeServer(); + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _cancellation.Cancel(); + + _stream.Dispose(); + + _cancellation.Dispose(); + } + } + + public override async Task AcceptAsync(CancellationToken token) + { + VerifyNotDisposed(); + + using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, _cancellation.Token); + + NamedPipeServerStream connectedStream; + try + { + await _stream.WaitForConnectionAsync(linkedSource.Token).ConfigureAwait(false); + + connectedStream = _stream; + } + finally + { + if (!_cancellation.IsCancellationRequested) + { + CreateNewPipeServer(); + } + } + return connectedStream; + } + + private void CreateNewPipeServer() + { + _stream = new NamedPipeServerStream( + _pipeName, + PipeDirection.InOut, + _maxInstances, + PipeTransmissionMode.Byte, + PipeOptions.Asynchronous); + } + } + + internal sealed class UnixDomainSocketServerTransport : IpcServerTransport + { + private readonly CancellationTokenSource _cancellation = new CancellationTokenSource(); + private readonly int _backlog; + private readonly string _path; + + private UnixDomainSocket _socket; + + public UnixDomainSocketServerTransport(string path, int backlog) + { + _backlog = backlog; + _path = path; + + CreateNewSocketServer(); + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _cancellation.Cancel(); + + try + { + _socket.Shutdown(SocketShutdown.Both); + } + catch { } + finally + { + _socket.Close(0); + } + _socket.Dispose(); + + _cancellation.Dispose(); + } + } + + public override async Task AcceptAsync(CancellationToken token) + { + VerifyNotDisposed(); + + using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, _cancellation.Token); + try + { + Socket socket = await _socket.AcceptAsync(linkedSource.Token).ConfigureAwait(false); + + return new ExposedSocketNetworkStream(socket, ownsSocket: true); + } + catch (Exception) + { + // Recreate socket if transport is not disposed. + if (!_cancellation.IsCancellationRequested) + { + CreateNewSocketServer(); + } + throw; + } + } + + private void CreateNewSocketServer() + { + _socket = new UnixDomainSocket(); + _socket.Bind(_path); + _socket.Listen(_backlog); + _socket.LingerState.Enabled = false; + } + } +} diff --git a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcTransport.cs b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcTransport.cs new file mode 100644 index 000000000..f742f912f --- /dev/null +++ b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcTransport.cs @@ -0,0 +1,207 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Diagnostics; +using System.IO; +using System.IO.Pipes; +using System.Linq; +using System.Runtime.InteropServices; +using System.Security.Principal; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Diagnostics.NETCore.Client +{ + internal abstract class IpcEndpoint + { + /// + /// Connects to the underlying IPC transport and opens a read/write-able Stream + /// + /// The amount of time to block attempting to connect + /// A Stream for writing and reading data to and from the target .NET process + public abstract Stream Connect(TimeSpan timeout); + + /// + /// Wait for an available diagnostic endpoint to the runtime instance. + /// + /// The token to monitor for cancellation requests. + /// + /// A task the completes when a diagnostic endpoint to the runtime instance becomes available. + /// + public abstract Task WaitForConnectionAsync(CancellationToken token); + } + + internal class ServerIpcEndpoint : IpcEndpoint + { + private readonly Guid _runtimeId; + private readonly ReversedDiagnosticsServer _server; + + public ServerIpcEndpoint(ReversedDiagnosticsServer server, Guid runtimeId) + { + _runtimeId = runtimeId; + _server = server; + } + + /// + /// This will block until the diagnostic stream is provided. This block can happen if + /// the stream is acquired previously and the runtime instance has not yet reconnected + /// to the reversed diagnostics server. + /// + public override Stream Connect(TimeSpan timeout) + { + return _server.Connect(_runtimeId, timeout); + } + + public override async Task WaitForConnectionAsync(CancellationToken token) + { + await _server.WaitForConnectionAsync(_runtimeId, token).ConfigureAwait(false); + } + + public override bool Equals(object obj) + { + return Equals(obj as ServerIpcEndpoint); + } + + public bool Equals(ServerIpcEndpoint other) + { + return other != null && other._runtimeId == _runtimeId && other._server == _server; + } + + public override int GetHashCode() + { + return _runtimeId.GetHashCode() ^ _server.GetHashCode(); + } + } + + internal class PidIpcEndpoint : IpcEndpoint + { + public static string IpcRootPath { get; } = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? @"\\.\pipe\" : Path.GetTempPath(); + public static string DiagnosticsPortPattern { get; } = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? @"^dotnet-diagnostic-(\d+)$" : @"^dotnet-diagnostic-(\d+)-(\d+)-socket$"; + + private int _pid; + + /// + /// Creates a reference to a .NET process's IPC Transport + /// using the default rules for a given pid + /// + /// The pid of the target process + /// A reference to the IPC Transport + public PidIpcEndpoint(int pid) + { + _pid = pid; + } + + public override Stream Connect(TimeSpan timeout) + { + string address = GetDefaultAddress(); + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + var namedPipe = new NamedPipeClientStream( + ".", + address, + PipeDirection.InOut, + PipeOptions.None, + TokenImpersonationLevel.Impersonation); + namedPipe.Connect((int)timeout.TotalMilliseconds); + return namedPipe; + } + else + { + var socket = new UnixDomainSocket(); + socket.Connect(Path.Combine(IpcRootPath, address), timeout); + return new ExposedSocketNetworkStream(socket, ownsSocket: true); + } + } + + public override async Task WaitForConnectionAsync(CancellationToken token) + { + using var _ = await ConnectStreamAsync(token).ConfigureAwait(false); + } + + async Task ConnectStreamAsync(CancellationToken token) + { + string address = GetDefaultAddress(); + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + var namedPipe = new NamedPipeClientStream( + ".", + address, + PipeDirection.InOut, + PipeOptions.None, + TokenImpersonationLevel.Impersonation); + await namedPipe.ConnectAsync(token).ConfigureAwait(false); + return namedPipe; + } + else + { + var socket = new UnixDomainSocket(); + await socket.ConnectAsync(Path.Combine(IpcRootPath, address), token).ConfigureAwait(false); + return new ExposedSocketNetworkStream(socket, ownsSocket: true); + } + } + + private string GetDefaultAddress() + { + try + { + var process = Process.GetProcessById(_pid); + } + catch (ArgumentException) + { + throw new ServerNotAvailableException($"Process {_pid} is not running."); + } + catch (InvalidOperationException) + { + throw new ServerNotAvailableException($"Process {_pid} seems to be elevated."); + } + + if (!TryGetDefaultAddress(_pid, out string transportName)) + { + throw new ServerNotAvailableException($"Process {_pid} not running compatible .NET runtime."); + } + + return transportName; + } + + private static bool TryGetDefaultAddress(int pid, out string defaultAddress) + { + defaultAddress = null; + + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + defaultAddress = $"dotnet-diagnostic-{pid}"; + } + else + { + try + { + defaultAddress = Directory.GetFiles(IpcRootPath, $"dotnet-diagnostic-{pid}-*-socket") // Try best match. + .OrderByDescending(f => new FileInfo(f).LastWriteTime) + .FirstOrDefault(); + } + catch (InvalidOperationException) + { + } + } + + return !string.IsNullOrEmpty(defaultAddress); + } + + public override bool Equals(object obj) + { + return Equals(obj as PidIpcEndpoint); + } + + public bool Equals(PidIpcEndpoint other) + { + return other != null && other._pid == _pid; + } + + public override int GetHashCode() + { + return _pid.GetHashCode(); + } + } +} \ No newline at end of file diff --git a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/UnixDomainSocket.cs b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/UnixDomainSocket.cs new file mode 100644 index 000000000..4cf78a6fd --- /dev/null +++ b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/UnixDomainSocket.cs @@ -0,0 +1,124 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Diagnostics; +using System.IO; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Diagnostics.NETCore.Client +{ + internal sealed class UnixDomainSocket : Socket + { + private bool _ownsSocketFile; + private string _path; + + public UnixDomainSocket() : + base(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified) + { + } + + public async Task AcceptAsync(CancellationToken token) + { + using (token.Register(() => Close(0))) + { + try + { + return await Task.Factory.FromAsync(BeginAccept, EndAccept, this).ConfigureAwait(false); + } + // When the socket is closed, the FromAsync logic will try to call EndAccept on the socket, + // but that will throw an ObjectDisposedException. Only catch the exception if due to cancellation. + catch (ObjectDisposedException) when (token.IsCancellationRequested) + { + // First check if the cancellation token caused the closing of the socket, + // then rethrow the exception if it did not. + token.ThrowIfCancellationRequested(); + + Debug.Fail("Token should have thrown cancellation exception."); + return null; + } + } + } + + public void Bind(string path) + { + Bind(CreateUnixDomainSocketEndPoint(path)); + + _ownsSocketFile = true; + _path = path; + } + + public void Connect(string path, TimeSpan timeout) + { + IAsyncResult result = BeginConnect(CreateUnixDomainSocketEndPoint(path), null, null); + + if (result.AsyncWaitHandle.WaitOne(timeout)) + { + EndConnect(result); + + _ownsSocketFile = false; + _path = path; + } + else + { + Close(0); + throw new TimeoutException(); + } + } + + public async Task ConnectAsync(string path, CancellationToken token) + { + using (token.Register(() => Close(0))) + { + try + { + Func beginConnect = (callback, state) => + { + return BeginConnect(CreateUnixDomainSocketEndPoint(path), callback, state); + }; + await Task.Factory.FromAsync(beginConnect, EndConnect, this).ConfigureAwait(false); + } + // When the socket is closed, the FromAsync logic will try to call EndAccept on the socket, + // but that will throw an ObjectDisposedException. Only catch the exception if due to cancellation. + catch (ObjectDisposedException) when (token.IsCancellationRequested) + { + // First check if the cancellation token caused the closing of the socket, + // then rethrow the exception if it did not. + token.ThrowIfCancellationRequested(); + } + } + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + if (_ownsSocketFile && !string.IsNullOrEmpty(_path) && File.Exists(_path)) + { + File.Delete(_path); + } + } + base.Dispose(disposing); + } + + private static EndPoint CreateUnixDomainSocketEndPoint(string path) + { +#if NETCOREAPP + return new UnixDomainSocketEndPoint(path); +#elif NETSTANDARD2_0 + // UnixDomainSocketEndPoint is not part of .NET Standard 2.0 + var type = typeof(Socket).Assembly.GetType("System.Net.Sockets.UnixDomainSocketEndPoint"); + if (type == null) + { + throw new PlatformNotSupportedException("Current process is not running a compatible .NET runtime."); + } + var ctor = type.GetConstructor(new[] { typeof(string) }); + return (EndPoint)ctor.Invoke(new object[] { path }); +#endif + } + } +} diff --git a/src/Microsoft.Diagnostics.NETCore.Client/Microsoft.Diagnostics.NETCore.Client.csproj b/src/Microsoft.Diagnostics.NETCore.Client/Microsoft.Diagnostics.NETCore.Client.csproj index b9fb1b55e..b80780a73 100644 --- a/src/Microsoft.Diagnostics.NETCore.Client/Microsoft.Diagnostics.NETCore.Client.csproj +++ b/src/Microsoft.Diagnostics.NETCore.Client/Microsoft.Diagnostics.NETCore.Client.csproj @@ -12,4 +12,10 @@ true true + + + + + + diff --git a/src/Microsoft.Diagnostics.NETCore.Client/NativeMethods.cs b/src/Microsoft.Diagnostics.NETCore.Client/NativeMethods.cs new file mode 100644 index 000000000..3962bb400 --- /dev/null +++ b/src/Microsoft.Diagnostics.NETCore.Client/NativeMethods.cs @@ -0,0 +1,24 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Runtime.InteropServices; +using Microsoft.Win32.SafeHandles; + +namespace Microsoft.Diagnostics.NETCore.Client +{ + internal class NativeMethods + { + [DllImport("kernel32.dll", SetLastError = true)] + [return: MarshalAs(UnmanagedType.Bool)] + internal static extern bool PeekNamedPipe( + SafePipeHandle hNamedPipe, + byte[] lpBuffer, + int bufferSize, + IntPtr lpBytesRead, + IntPtr lpTotalBytesAvail, + IntPtr lpBytesLeftThisMessage + ); + } +} diff --git a/src/Microsoft.Diagnostics.NETCore.Client/ReversedServer/IpcEndpointInfo.cs b/src/Microsoft.Diagnostics.NETCore.Client/ReversedServer/IpcEndpointInfo.cs new file mode 100644 index 000000000..55b9ea9ff --- /dev/null +++ b/src/Microsoft.Diagnostics.NETCore.Client/ReversedServer/IpcEndpointInfo.cs @@ -0,0 +1,38 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Diagnostics; + +namespace Microsoft.Diagnostics.NETCore.Client +{ + /// + /// Represents a runtine instance connection to a reversed diagnostics server. + /// + [DebuggerDisplay("PID={ProcessId}, Cookie={RuntimeInstanceCookie}")] + internal struct IpcEndpointInfo + { + internal IpcEndpointInfo(IpcEndpoint endpoint, int processId, Guid runtimeInstanceCookie) + { + Endpoint = endpoint; + ProcessId = processId; + RuntimeInstanceCookie = runtimeInstanceCookie; + } + + /// + /// An endpoint used to retrieve diagnostic information from the associated runtime instance. + /// + public IpcEndpoint Endpoint { get; } + + /// + /// The identifier of the process that is unique within its process namespace. + /// + public int ProcessId { get; } + + /// + /// The unique identifier of the runtime instance. + /// + public Guid RuntimeInstanceCookie { get; } + } +} diff --git a/src/Microsoft.Diagnostics.NETCore.Client/ReversedServer/ReversedDiagnosticsServer.cs b/src/Microsoft.Diagnostics.NETCore.Client/ReversedServer/ReversedDiagnosticsServer.cs new file mode 100644 index 000000000..842986979 --- /dev/null +++ b/src/Microsoft.Diagnostics.NETCore.Client/ReversedServer/ReversedDiagnosticsServer.cs @@ -0,0 +1,418 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.IO.Pipes; +using System.Net.Sockets; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Diagnostics.NETCore.Client +{ + /// + /// Establishes server endpoint for runtime instances to connect when + /// configured to provide diagnostic endpoints in reverse mode. + /// + internal sealed class ReversedDiagnosticsServer : IDisposable + { + // Returns true if the handler is complete and should be removed from the list + delegate bool StreamHandler(Guid runtimeId, ref Stream stream); + + // The amount of time to allow parsing of the advertise data before cancelling. This allows the server to + // remain responsive in case the advertise data is incomplete and the stream is not closed. + private static readonly TimeSpan ParseAdvertiseTimeout = TimeSpan.FromMilliseconds(250); + + private readonly Dictionary _cachedEndpoints = new Dictionary(); + private readonly Dictionary _cachedStreams = new Dictionary(); + private readonly CancellationTokenSource _disposalSource = new CancellationTokenSource(); + private readonly List _handlers = new List(); + private readonly object _lock = new object(); + private readonly IpcServerTransport _transport; + + private bool _disposed = false; + + /// + /// Constructs the instance with an endpoint bound + /// to the location specified by . + /// + /// + /// The path of the server endpoint. + /// On Windows, this can be a full pipe path or the name without the "\\.\pipe\" prefix. + /// On all other systems, this must be the full file path of the socket. + /// + public ReversedDiagnosticsServer(string transportPath) + : this(transportPath, MaxAllowedConnections) + { + } + + /// + /// Constructs the instance with an endpoint bound + /// to the location specified by . + /// + /// + /// The path of the server endpoint. + /// On Windows, this can be a full pipe path or the name without the "\\.\pipe\" prefix. + /// On all other systems, this must be the full file path of the socket. + /// + /// The maximum number of connections the server will support. + public ReversedDiagnosticsServer(string transportPath, int maxConnections) + { + _transport = IpcServerTransport.Create(transportPath, maxConnections); + } + + public void Dispose() + { + if (!_disposed) + { + _disposalSource.Cancel(); + + lock (_lock) + { + _cachedEndpoints.Clear(); + + foreach (Stream stream in _cachedStreams.Values) + { + stream?.Dispose(); + } + _cachedStreams.Clear(); + } + + _transport.Dispose(); + + _disposalSource.Dispose(); + + _disposed = true; + } + } + + /// + /// Provides endpoint information when a new runtime instance connects to the server. + /// + /// The token to monitor for cancellation requests. + /// A that contains information about the new runtime instance connection. + /// + /// This will only provide endpoint information on the first time a runtime connects to the server. + /// If a connection is removed using and the same runtime instance, + /// reconnects after this call, then a new will be produced. + /// + public async Task AcceptAsync(CancellationToken token) + { + VerifyNotDisposed(); + + while (true) + { + Stream stream = null; + IpcAdvertise advertise = null; + try + { + stream = await _transport.AcceptAsync(token).ConfigureAwait(false); + } + catch (Exception ex) when (!(ex is OperationCanceledException)) + { + // The advertise data could be incomplete if the runtime shuts down before completely writing + // the information. Catch the exception and continue waiting for a new connection. + } + + token.ThrowIfCancellationRequested(); + + if (null != stream) + { + // Cancel parsing of advertise data after timeout period to + // mitigate runtimes that write partial data and do not close the stream (avoid waiting forever). + using var parseCancellationSource = new CancellationTokenSource(); + using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, parseCancellationSource.Token); + try + { + parseCancellationSource.CancelAfter(ParseAdvertiseTimeout); + + advertise = await IpcAdvertise.ParseAsync(stream, linkedSource.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) when (parseCancellationSource.IsCancellationRequested) + { + // Only handle cancellation if it was due to the parse timeout. + } + catch (Exception ex) when (!(ex is OperationCanceledException)) + { + // Catch all other exceptions and continue waiting for a new connection. + } + } + + token.ThrowIfCancellationRequested(); + + if (null != advertise) + { + Guid runtimeCookie = advertise.RuntimeInstanceCookie; + int pid = unchecked((int)advertise.ProcessId); + + lock (_lock) + { + ProvideStream(runtimeCookie, stream); + // Consumers should hold onto the endpoint info and use it for diagnostic communication, + // regardless of the number of times the same runtime instance connects. This requires consumers + // to continuously invoke the AcceptAsync method in order to handle runtime instance reconnects, + // even if the consumer only wants to handle a single endpoint. + if (!_cachedEndpoints.ContainsKey(runtimeCookie)) + { + ServerIpcEndpoint endpoint = new ServerIpcEndpoint(this, runtimeCookie); + _cachedEndpoints.Add(runtimeCookie, endpoint); + return new IpcEndpointInfo(endpoint, pid, runtimeCookie); + } + } + } + + token.ThrowIfCancellationRequested(); + } + } + + /// + /// Removes endpoint information from the server so that it is no longer tracked. + /// + /// The runtime instance cookie that corresponds to the endpoint to be removed. + /// True if the endpoint existed and was removed; otherwise false. + public bool RemoveConnection(Guid runtimeCookie) + { + VerifyNotDisposed(); + + bool endpointExisted = false; + Stream previousStream = null; + + lock (_lock) + { + endpointExisted = _cachedEndpoints.Remove(runtimeCookie); + if (endpointExisted) + { + if (_cachedStreams.TryGetValue(runtimeCookie, out previousStream)) + { + _cachedStreams.Remove(runtimeCookie); + } + } + } + + previousStream?.Dispose(); + + return endpointExisted; + } + + private void VerifyNotDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(ReversedDiagnosticsServer)); + } + } + + /// + /// This will block until the diagnostic stream is provided. This block can happen if + /// the stream is acquired previously and the runtime instance has not yet reconnected + /// to the reversed diagnostics server. + /// + internal Stream Connect(Guid runtimeId, TimeSpan timeout) + { + VerifyNotDisposed(); + + const int StreamStatePending = 0; + const int StreamStateComplete = 1; + const int StreamStateCancelled = 2; + const int StreamStateDisposed = 3; + + // CancellationTokenSource is used to trigger the timeout path in order to avoid inadvertently consuming + // the stream via the handler while processing the timeout after failing to wait for the stream event + // to be signaled within the timeout period. The source of truth of whether the stream was consumed or + // whether the timeout occurred is captured by the streamState variable. + Stream stream = null; + int streamState = StreamStatePending; + using var streamEvent = new ManualResetEvent(false); + var cancellationSource = new CancellationTokenSource(); + + bool TrySetStream(int state, Stream value) + { + if (StreamStatePending == Interlocked.CompareExchange(ref streamState, state, 0)) + { + stream = value; + streamEvent.Set(); + return true; + } + return false; + } + + using var methodRegistration = cancellationSource.Token.Register(() => TrySetStream(StreamStateCancelled, value: null)); + using var disposalRegistration = _disposalSource.Token.Register(() => TrySetStream(StreamStateDisposed, value: null)); + + RegisterHandler(runtimeId, (Guid id, ref Stream cachedStream) => + { + if (id != runtimeId) + { + return false; + } + + if (TrySetStream(StreamStateComplete, cachedStream)) + { + cachedStream = null; + } + + // Regardless of the registrant previously waiting or cancelled, + // the handler should be removed from consideration. + return true; + }); + + cancellationSource.CancelAfter(timeout); + streamEvent.WaitOne(); + + if (StreamStateCancelled == streamState) + { + throw new TimeoutException(); + } + + if (StreamStateDisposed == streamState) + { + throw new ObjectDisposedException(nameof(ReversedDiagnosticsServer)); + } + + return stream; + } + + internal async Task WaitForConnectionAsync(Guid runtimeId, CancellationToken token) + { + VerifyNotDisposed(); + + var hasConnectedStreamSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + using var methodRegistration = token.Register(() => hasConnectedStreamSource.TrySetCanceled(token)); + using var disposalRegistration = _disposalSource.Token.Register( + () => hasConnectedStreamSource.TrySetException(new ObjectDisposedException(nameof(ReversedDiagnosticsServer)))); + + RegisterHandler(runtimeId, (Guid id, ref Stream cachedStream) => + { + if (runtimeId != id) + { + return false; + } + + // Check if the registrant was already finished. + if (hasConnectedStreamSource.Task.IsCompleted) + { + return true; + } + + if (!TestStream(cachedStream)) + { + cachedStream.Dispose(); + cachedStream = null; + return false; + } + + // Found a stream that is valid; signal completion if possible. + hasConnectedStreamSource.TrySetResult(true); + + // Regardless of the registrant previously waiting or cancelled, + // the handler should be removed from consideration. + return true; + }); + + // Wait for the handler to verify we have a connected stream + await hasConnectedStreamSource.Task.ConfigureAwait(false); + } + + private void ProvideStream(Guid runtimeId, Stream stream) + { + Debug.Assert(Monitor.IsEntered(_lock)); + + // Get the previous stream in order to dispose it later + _cachedStreams.TryGetValue(runtimeId, out Stream previousStream); + + RunStreamHandlers(runtimeId, stream); + + // Dispose the previous stream if there was one. + previousStream?.Dispose(); + } + + private void RunStreamHandlers(Guid runtimeId, Stream stream) + { + Debug.Assert(Monitor.IsEntered(_lock)); + + // If there are any handlers waiting for a stream, provide + // it to the first handler in the queue. + for (int i = 0; (i < _handlers.Count) && (null != stream); i++) + { + StreamHandler handler = _handlers[i]; + if (handler(runtimeId, ref stream)) + { + _handlers.RemoveAt(i); + i--; + } + } + + // Store the stream for when a handler registers later. If + // a handler already captured the stream, this will be null, thus + // representing that no existing stream is waiting to be consumed. + _cachedStreams[runtimeId] = stream; + } + + private bool TestStream(Stream stream) + { + if (null == stream) + { + throw new ArgumentNullException(nameof(stream)); + } + + if (stream is ExposedSocketNetworkStream networkStream) + { + // Update Connected state of socket by sending non-blocking zero-byte data. + Socket socket = networkStream.Socket; + bool blocking = socket.Blocking; + try + { + socket.Blocking = false; + socket.Send(Array.Empty(), 0, SocketFlags.None); + } + catch (Exception) + { + } + finally + { + socket.Blocking = blocking; + } + return socket.Connected; + } + else if (stream is PipeStream pipeStream) + { + Debug.Assert(RuntimeInformation.IsOSPlatform(OSPlatform.Windows), "Pipe stream should only be used on Windows."); + + // PeekNamedPipe will return false if the pipe is disconnected/broken. + return NativeMethods.PeekNamedPipe( + pipeStream.SafePipeHandle, + null, + 0, + IntPtr.Zero, + IntPtr.Zero, + IntPtr.Zero); + } + + return false; + } + + private void RegisterHandler(Guid runtimeId, StreamHandler handler) + { + lock (_lock) + { + if (!_cachedStreams.TryGetValue(runtimeId, out Stream stream)) + { + throw new InvalidOperationException($"Runtime instance with identifier '{runtimeId}' is not registered."); + } + + _handlers.Add(handler); + + if (stream != null) + { + RunStreamHandlers(runtimeId, stream); + } + } + } + + public static int MaxAllowedConnections = IpcServerTransport.MaxAllowedConnections; + } +} diff --git a/src/Tools/dotnet-monitor/DiagnosticsMonitorCommandHandler.cs b/src/Tools/dotnet-monitor/DiagnosticsMonitorCommandHandler.cs index d4a691807..d2a90e896 100644 --- a/src/Tools/dotnet-monitor/DiagnosticsMonitorCommandHandler.cs +++ b/src/Tools/dotnet-monitor/DiagnosticsMonitorCommandHandler.cs @@ -2,21 +2,18 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using Microsoft.AspNetCore; -using Microsoft.AspNetCore.Hosting; -using Microsoft.Diagnostics.Monitoring; -using Microsoft.Diagnostics.Monitoring.RestServer; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.CommandLine; -using System.Globalization; -using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; +using Microsoft.AspNetCore; +using Microsoft.AspNetCore.Hosting; +using Microsoft.Diagnostics.Monitoring; +using Microsoft.Diagnostics.Monitoring.RestServer; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; namespace Microsoft.Diagnostics.Tools.Monitor { @@ -25,15 +22,15 @@ namespace Microsoft.Diagnostics.Tools.Monitor private const string ConfigPrefix = "DotnetMonitor_"; private const string ConfigPath = "/etc/dotnet-monitor"; - public async Task Start(CancellationToken token, IConsole console, string[] urls, string[] metricUrls, bool metrics) + public async Task Start(CancellationToken token, IConsole console, string[] urls, string[] metricUrls, bool metrics, string reversedServerAddress) { //CONSIDER The console logger uses the standard AddConsole, and therefore disregards IConsole. - using IWebHost host = CreateWebHostBuilder(console, urls, metricUrls, metrics).Build(); + using IWebHost host = CreateWebHostBuilder(console, urls, metricUrls, metrics, reversedServerAddress).Build(); await host.RunAsync(token); return 0; } - public IWebHostBuilder CreateWebHostBuilder(IConsole console, string[] urls, string[] metricUrls, bool metrics) + public IWebHostBuilder CreateWebHostBuilder(IConsole console, string[] urls, string[] metricUrls, bool metrics, string reversedServerAddress) { if (metrics) { @@ -53,6 +50,7 @@ namespace Microsoft.Diagnostics.Tools.Monitor }) .ConfigureServices((WebHostBuilderContext context, IServiceCollection services) => { + services.AddEndpointInfoSource(reversedServerAddress); //TODO Many of these service additions should be done through extension methods services.AddSingleton(); if (metrics) diff --git a/src/Tools/dotnet-monitor/Program.cs b/src/Tools/dotnet-monitor/Program.cs index 527674386..eac45a3a1 100644 --- a/src/Tools/dotnet-monitor/Program.cs +++ b/src/Tools/dotnet-monitor/Program.cs @@ -2,16 +2,14 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using Microsoft.Diagnostics.Monitoring; -using Microsoft.Tools.Common; using System; -using System.Collections.Generic; using System.CommandLine; using System.CommandLine.Builder; using System.CommandLine.Invocation; -using System.IO; using System.Threading; using System.Threading.Tasks; +using Microsoft.Diagnostics.Monitoring; +using Microsoft.Tools.Common; namespace Microsoft.Diagnostics.Tools.Monitor { @@ -31,8 +29,8 @@ namespace Microsoft.Diagnostics.Tools.Monitor description: "Monitor logs and metrics in a .NET application send the results to a chosen destination.") { // Handler - CommandHandler.Create(new DiagnosticsMonitorCommandHandler().Start), - Urls(), MetricUrls(), ProvideMetrics() + CommandHandler.Create(new DiagnosticsMonitorCommandHandler().Start), + Urls(), MetricUrls(), ProvideMetrics(), ReversedServerAddress() }; private static Option Urls() => @@ -59,6 +57,14 @@ namespace Microsoft.Diagnostics.Tools.Monitor Argument = new Argument(name: "metrics", defaultValue: true ) }; + private static Option ReversedServerAddress() => + new Option( + alias: "--reversed-server-address", + description: "A fully qualified path and filename for the OS transport to communicate over.") + { + Argument = new Argument(name: "reversedServerAddress") + }; + private static string GetDefaultMetricsEndpoint() { string endpoint = "http://localhost:52325"; diff --git a/src/tests/Microsoft.Diagnostics.NETCore.Client/CommonHelper.cs b/src/tests/Microsoft.Diagnostics.NETCore.Client/CommonHelper.cs index 85f34f150..33e5533e2 100644 --- a/src/tests/Microsoft.Diagnostics.NETCore.Client/CommonHelper.cs +++ b/src/tests/Microsoft.Diagnostics.NETCore.Client/CommonHelper.cs @@ -15,11 +15,13 @@ namespace Microsoft.Diagnostics.NETCore.Client "..\\..\\..\\..\\..\\.dotnet\\dotnet.exe") : "../../../../../.dotnet/dotnet"; - public static string GetTraceePath(string traceeName = "Tracee") + public static string GetTraceePath(string traceeName = "Tracee", string targetFramework = "netcoreapp3.1") { var curPath = Directory.GetCurrentDirectory(); ; - var traceePath = curPath.Replace(System.Reflection.Assembly.GetCallingAssembly().GetName().Name, traceeName); + var traceePath = curPath + .Replace(System.Reflection.Assembly.GetCallingAssembly().GetName().Name, traceeName) + .Replace("netcoreapp3.1", targetFramework); return Path.Combine(traceePath, Path.ChangeExtension(traceeName, ".dll")); } diff --git a/src/tests/Microsoft.Diagnostics.NETCore.Client/GetPublishedProcessesTests.cs b/src/tests/Microsoft.Diagnostics.NETCore.Client/GetPublishedProcessesTests.cs index f538ed411..f44cf0da6 100644 --- a/src/tests/Microsoft.Diagnostics.NETCore.Client/GetPublishedProcessesTests.cs +++ b/src/tests/Microsoft.Diagnostics.NETCore.Client/GetPublishedProcessesTests.cs @@ -6,6 +6,7 @@ using System; using System.Collections.Generic; using System.Runtime.InteropServices; using System.Threading; +using System.Threading.Tasks; using Xunit; using Xunit.Abstractions; @@ -79,7 +80,7 @@ namespace Microsoft.Diagnostics.NETCore.Client } [Fact] - public void CheckSpecificProcessTest() + public async Task WaitForConnectionTest() { TestRunner runner = new TestRunner(CommonHelper.GetTraceePath(), output); runner.Start(3000); @@ -89,9 +90,15 @@ namespace Microsoft.Diagnostics.NETCore.Client } var client = new DiagnosticsClient(runner.Pid); - Assert.True(client.CheckTransport(), $"Unable to verify diagnostics transport for test process {runner.Pid}."); - - runner.Stop(); + using var timeoutSource = new CancellationTokenSource(TimeSpan.FromMilliseconds(250)); + try + { + await client.WaitForConnectionAsync(timeoutSource.Token); + } + finally + { + runner.Stop(); + } } } } diff --git a/src/tests/Microsoft.Diagnostics.NETCore.Client/Microsoft.Diagnostics.NETCore.Client.UnitTests.csproj b/src/tests/Microsoft.Diagnostics.NETCore.Client/Microsoft.Diagnostics.NETCore.Client.UnitTests.csproj index 4d449f713..ab5762b7b 100644 --- a/src/tests/Microsoft.Diagnostics.NETCore.Client/Microsoft.Diagnostics.NETCore.Client.UnitTests.csproj +++ b/src/tests/Microsoft.Diagnostics.NETCore.Client/Microsoft.Diagnostics.NETCore.Client.UnitTests.csproj @@ -8,6 +8,11 @@ - + + + + + + diff --git a/src/tests/Microsoft.Diagnostics.NETCore.Client/ReversedServerHelper.cs b/src/tests/Microsoft.Diagnostics.NETCore.Client/ReversedServerHelper.cs new file mode 100644 index 000000000..063ce34d1 --- /dev/null +++ b/src/tests/Microsoft.Diagnostics.NETCore.Client/ReversedServerHelper.cs @@ -0,0 +1,51 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.IO; +using System.Runtime.InteropServices; +using Xunit.Abstractions; + +namespace Microsoft.Diagnostics.NETCore.Client +{ + internal static class ReversedServerHelper + { + /// + /// Creates a unique server name to avoid collisions from simultaneous running tests + /// or potentially abandoned socket files. + /// + public static string CreateServerTransportName() + { + string transportName = "DOTNET_DIAGSERVER_TESTS_" + Path.GetRandomFileName(); + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + return transportName; + } + else + { + return Path.Combine(Path.GetTempPath(), transportName); + } + } + + /// + /// Starts the Tracee executable while enabling connection to reverse diagnostics server. + /// + public static TestRunner StartTracee(ITestOutputHelper _outputHelper, string transportName) + { + var runner = new TestRunner(CommonHelper.GetTraceePath(targetFramework: "net5.0"), _outputHelper); + runner.AddReversedServer(transportName); + runner.Start(); + return runner; + } + + public static void AddReversedServer(this TestRunner runner, string transportName) + { + runner.AddEnvVar("DOTNET_DiagnosticsMonitorAddress", transportName); + } + + public static string ToTestString(this IpcEndpointInfo info) + { + return $"PID={info.ProcessId}, COOKIE={info.RuntimeInstanceCookie}"; + } + } +} diff --git a/src/tests/Microsoft.Diagnostics.NETCore.Client/ReversedServerTests.cs b/src/tests/Microsoft.Diagnostics.NETCore.Client/ReversedServerTests.cs new file mode 100644 index 000000000..27ab0f6b1 --- /dev/null +++ b/src/tests/Microsoft.Diagnostics.NETCore.Client/ReversedServerTests.cs @@ -0,0 +1,448 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Generic; +using System.Diagnostics.Tracing; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Diagnostics.Tracing; +using Xunit; +using Xunit.Abstractions; + +namespace Microsoft.Diagnostics.NETCore.Client +{ + public class ReversedServerTests + { + private readonly ITestOutputHelper _outputHelper; + + public ReversedServerTests(ITestOutputHelper outputHelper) + { + _outputHelper = outputHelper; + } + + /// + /// Tests that server throws appropriate exceptions when disposed. + /// + [Fact] + public async Task ReversedServerDisposeTest() + { + var server = StartReversedServer(out string transportName); + + using CancellationTokenSource cancellation = new CancellationTokenSource(TimeSpan.FromSeconds(1)); + Task acceptTask = server.AcceptAsync(cancellation.Token); + + // Validate server surface throws after disposal + server.Dispose(); + + // Pending tasks should be cancelled and throw TaskCanceledException + await Assert.ThrowsAnyAsync(() => acceptTask); + Assert.True(acceptTask.IsCanceled); + + // Calls after dispose should throw ObjectDisposedException + await Assert.ThrowsAsync( + () => server.AcceptAsync(cancellation.Token)); + + Assert.Throws( + () => server.RemoveConnection(Guid.Empty)); + } + + /// + /// Tests that does not complete + /// when no connections are available and that cancellation will move the returned task to the cancelled state. + /// + [Fact] + public async Task ReversedServerAcceptAsyncYieldsTest() + { + using var server = StartReversedServer(out string transportName); + + using var cancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(1)); + + _outputHelper.WriteLine("Waiting for connection from server."); + Task acceptTask = server.AcceptAsync(cancellationSource.Token); + + await Assert.ThrowsAnyAsync(() => acceptTask); + Assert.True(acceptTask.IsCanceled); + } + + /// + /// Tests that invoking server methods with non-existing runtime identifier appropriately fail. + /// + [Fact] + public async Task ReversedServerNonExistingRuntimeIdentifierTest() + { + using var server = StartReversedServer(out string transportName); + + Guid nonExistingRuntimeId = Guid.NewGuid(); + + using CancellationTokenSource cancellation = new CancellationTokenSource(TimeSpan.FromSeconds(1)); + + _outputHelper.WriteLine($"Testing {nameof(ReversedDiagnosticsServer.WaitForConnectionAsync)}"); + await Assert.ThrowsAsync( + () => server.WaitForConnectionAsync(nonExistingRuntimeId, cancellation.Token)); + + _outputHelper.WriteLine($"Testing {nameof(ReversedDiagnosticsServer.Connect)}"); + Assert.Throws( + () => server.Connect(nonExistingRuntimeId, TimeSpan.FromSeconds(1))); + + _outputHelper.WriteLine($"Testing {nameof(ReversedDiagnosticsServer.RemoveConnection)}"); + Assert.False(server.RemoveConnection(nonExistingRuntimeId), "Removal of nonexisting connection should fail."); + } + + /// + /// Tests that a single client can connect to server, diagnostics can occur, + /// and multiple use of a single DiagnosticsClient is allowed. + /// + /// + /// The multiple use of a single client is important in the reverse scenario + /// because of how the endpoint is updated with new stream information each + /// time the target process reconnects to the server. + /// + [Fact] + public async Task ReversedServerSingleTargetMultipleUseClientTest() + { + using var server = StartReversedServer(out string transportName); + await using var accepter = new EndpointInfoAccepter(server, _outputHelper); + + TestRunner runner = null; + IpcEndpointInfo info; + try + { + // Start client pointing to diagnostics server + runner = StartTracee(transportName); + + info = await AcceptAsync(accepter); + + await VerifyEndpointInfo(runner, info); + + // There should not be any new endpoint infos + await VerifyNoNewEndpointInfos(accepter); + + ResumeRuntime(info); + + await VerifySingleSession(info); + } + finally + { + _outputHelper.WriteLine("Stopping tracee."); + runner?.Stop(); + } + + // Wait some time for the process to exit + await Task.Delay(TimeSpan.FromSeconds(1)); + + // Process exited so the endpoint should not have a valid transport anymore. + await VerifyWaitForConnection(info, expectValid: false); + + Assert.True(server.RemoveConnection(info.RuntimeInstanceCookie), "Expected to be able to remove connection from server."); + + // There should not be any more endpoint infos + await VerifyNoNewEndpointInfos(accepter); + } + + /// + /// Tests that a DiagnosticsClient is not viable after target exists. + /// + [Fact] + public async Task ReversedServerSingleTargetExitsClientInviableTest() + { + using var server = StartReversedServer(out string transportName); + await using var accepter = new EndpointInfoAccepter(server, _outputHelper); + + TestRunner runner = null; + IpcEndpointInfo info; + try + { + // Start client pointing to diagnostics server + runner = StartTracee(transportName); + + // Get client connection + info = await AcceptAsync(accepter); + + await VerifyEndpointInfo(runner, info); + + // There should not be any new endpoint infos + await VerifyNoNewEndpointInfos(accepter); + + ResumeRuntime(info); + + await VerifyWaitForConnection(info); + } + finally + { + _outputHelper.WriteLine("Stopping tracee."); + runner?.Stop(); + } + + // Wait some time for the process to exit + await Task.Delay(TimeSpan.FromSeconds(1)); + + // Process exited so the endpoint should not have a valid transport anymore. + await VerifyWaitForConnection(info, expectValid: false); + + Assert.True(server.RemoveConnection(info.RuntimeInstanceCookie), "Expected to be able to remove connection from server."); + + // There should not be any more endpoint infos + await VerifyNoNewEndpointInfos(accepter); + } + + private ReversedDiagnosticsServer StartReversedServer(out string transportName) + { + transportName = ReversedServerHelper.CreateServerTransportName(); + _outputHelper.WriteLine("Starting reversed server at '" + transportName + "'."); + return new ReversedDiagnosticsServer(transportName); + } + + private async Task AcceptAsync(EndpointInfoAccepter accepter) + { + using (var cancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(3))) + { + return await accepter.AcceptAsync(cancellationSource.Token); + } + } + + private TestRunner StartTracee(string transportName) + { + _outputHelper.WriteLine("Starting tracee."); + return ReversedServerHelper.StartTracee(_outputHelper, transportName); + } + + private static EventPipeProvider CreateProvider(string name) + { + return new EventPipeProvider(name, EventLevel.Verbose, (long)EventKeywords.All); + } + + private async Task VerifyWaitForConnection(IpcEndpointInfo info, bool expectValid = true) + { + using var connectionCancellation = new CancellationTokenSource(TimeSpan.FromSeconds(1)); + if (expectValid) + { + await info.Endpoint.WaitForConnectionAsync(connectionCancellation.Token); + } + else + { + await Assert.ThrowsAsync( + () => info.Endpoint.WaitForConnectionAsync(connectionCancellation.Token)); + } + } + + /// + /// Checks that the accepter does not provide a new endpoint info. + /// + private async Task VerifyNoNewEndpointInfos(EndpointInfoAccepter accepter) + { + _outputHelper.WriteLine("Verifying there are no more connections."); + + using var cancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(1)); + + Task acceptTask = accepter.AcceptAsync(cancellationSource.Token); + await Assert.ThrowsAsync(() => acceptTask); + Assert.True(acceptTask.IsCanceled); + + _outputHelper.WriteLine("Verified there are no more connections."); + } + + /// + /// Verifies basic information on the endpoint info and that it matches the target process from the runner. + /// + private async Task VerifyEndpointInfo(TestRunner runner, IpcEndpointInfo info, bool expectValid = true) + { + _outputHelper.WriteLine($"Verifying connection information for process ID {runner.Pid}."); + Assert.NotNull(runner); + Assert.Equal(runner.Pid, info.ProcessId); + Assert.NotEqual(Guid.Empty, info.RuntimeInstanceCookie); + Assert.NotNull(info.Endpoint); + + await VerifyWaitForConnection(info, expectValid); + + _outputHelper.WriteLine($"Connection: {info.ToTestString()}"); + } + + private void ResumeRuntime(IpcEndpointInfo info) + { + var client = new DiagnosticsClient(info.Endpoint); + + _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Resuming runtime instance."); + try + { + client.ResumeRuntime(); + _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Resumed successfully."); + } + catch (ServerErrorException ex) + { + // Runtime likely does not understand the ResumeRuntime command. + _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: {ex.Message}"); + } + } + + /// + /// Verifies that a client can handle multiple operations simultaneously. + /// + private async Task VerifySingleSession(IpcEndpointInfo info) + { + await VerifyWaitForConnection(info); + + var client = new DiagnosticsClient(info.Endpoint); + + _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Creating session #1."); + var providers = new List(); + providers.Add(new EventPipeProvider( + "System.Runtime", + EventLevel.Informational, + 0, + new Dictionary() { + { "EventCounterIntervalSec", "1" } + })); + using var session = client.StartEventPipeSession(providers); + + _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Verifying session produces events."); + await VerifyEventStreamProvidesEventsAsync(info, session, 1); + + _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session verification complete."); + } + + /// + /// Verifies that an event stream does provide events. + /// + private Task VerifyEventStreamProvidesEventsAsync(IpcEndpointInfo info, EventPipeSession session, int sessionNumber) + { + Assert.NotNull(session); + Assert.NotNull(session.EventStream); + + return Task.Run(async () => + { + _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Creating event source."); + + // This blocks for a while due to this bug: https://github.com/microsoft/perfview/issues/1172 + using var eventSource = new EventPipeEventSource(session.EventStream); + + _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Setup event handlers."); + + // Create task completion source that is completed when any events are provided; cancel it if cancellation is requested + var receivedEventsSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + using var cancellation = new CancellationTokenSource(TimeSpan.FromMinutes(1)); + using var _ = cancellation.Token.Register(() => + { + if (receivedEventsSource.TrySetCanceled()) + { + _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Cancelled event processing."); + } + }); + + // Create continuation task that stops the session (which immediately stops event processing). + Task stoppedProcessingTask = receivedEventsSource.Task + .ContinueWith(_ => + { + _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Stopping session."); + session.Stop(); + }); + + // Signal task source when an event is received. + Action allEventsHandler = _ => + { + if (receivedEventsSource.TrySetResult(null)) + { + _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Received an event and set result on completion source."); + } + }; + + _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Start processing events."); + eventSource.Dynamic.All += allEventsHandler; + eventSource.Process(); + eventSource.Dynamic.All -= allEventsHandler; + _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Stopped processing events."); + + // Wait on the task source to verify if it ran to completion or was cancelled. + await receivedEventsSource.Task; + + _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Waiting for session to stop."); + await stoppedProcessingTask; + }); + } + + /// + /// Helper class for consuming endpoint infos from the reverse diagnostics server. + /// + /// + /// The diagnostics server requires that something is continuously attempting to accept endpoint infos + /// in order to process incoming connections. This helps facilitate that continuous accepting of + /// endpoint infos so the individual tests don't have to know about the behavior. + /// + private class EndpointInfoAccepter : IAsyncDisposable + { + private readonly CancellationTokenSource _cancellation = new CancellationTokenSource(); + private readonly Queue _connections = new Queue(); + private readonly SemaphoreSlim _connectionsSemaphore = new SemaphoreSlim(0); + private readonly Task _listenTask; + private readonly ITestOutputHelper _outputHelper; + private readonly ReversedDiagnosticsServer _server; + + private int _acceptedCount; + private bool _disposed; + + public EndpointInfoAccepter(ReversedDiagnosticsServer server, ITestOutputHelper outputHelper) + { + _server = server; + _outputHelper = outputHelper; + + _listenTask = ListenAsync(_cancellation.Token); + } + + public async ValueTask DisposeAsync() + { + if (!_disposed) + { + _cancellation.Cancel(); + + await _listenTask; + + _cancellation.Dispose(); + + _disposed = true; + } + } + + public async Task AcceptAsync(CancellationToken token) + { + using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, _cancellation.Token); + + _outputHelper.WriteLine("Waiting for connection from accepter."); + await _connectionsSemaphore.WaitAsync(linkedSource.Token).ConfigureAwait(false); + _outputHelper.WriteLine("Received connection from accepter."); + + return _connections.Dequeue(); + } + + /// + /// Continuously accept endpoint infos from the reversed diagnostics server so + /// that + /// is always awaited in order to to handle new runtime instance connections + /// as well as existing runtime instance reconnections. + /// + private async Task ListenAsync(CancellationToken token) + { + while (!token.IsCancellationRequested) + { + IpcEndpointInfo info; + try + { + _outputHelper.WriteLine("Waiting for connection from server."); + info = await _server.AcceptAsync(token).ConfigureAwait(false); + + _acceptedCount++; + _outputHelper.WriteLine($"Accepted connection #{_acceptedCount} from server: {info.ToTestString()}"); + } + catch (OperationCanceledException) + { + break; + } + + _connections.Enqueue(info); + _connectionsSemaphore.Release(); + } + } + } + } +} diff --git a/src/tests/Microsoft.Diagnostics.NETCore.Client/TestRunner.cs b/src/tests/Microsoft.Diagnostics.NETCore.Client/TestRunner.cs index 4291750f1..ba4c0c982 100644 --- a/src/tests/Microsoft.Diagnostics.NETCore.Client/TestRunner.cs +++ b/src/tests/Microsoft.Diagnostics.NETCore.Client/TestRunner.cs @@ -3,18 +3,13 @@ // See the LICENSE file in the project root for more information. -using Microsoft.Diagnostics.NETCore.Client; -using Microsoft.Diagnostics.TestHelpers; using System; -using System.Collections.Generic; using System.ComponentModel; using System.Diagnostics; using System.IO; -using System.Linq; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; -using Xunit; using Xunit.Abstractions; namespace Microsoft.Diagnostics.NETCore.Client @@ -50,9 +45,11 @@ namespace Microsoft.Diagnostics.NETCore.Client if (outputHelper != null) outputHelper.WriteLine($"[{DateTime.Now.ToString()}] Launching test: " + startInfo.FileName); - testProcess = Process.Start(startInfo); + testProcess = new Process(); + testProcess.StartInfo = startInfo; + testProcess.EnableRaisingEvents = true; - if (testProcess == null) + if (!testProcess.Start()) { outputHelper.WriteLine($"Could not start process: " + startInfo.FileName); } @@ -125,5 +122,26 @@ namespace Microsoft.Diagnostics.NETCore.Client outputHelper.WriteLine($"Process {testProcess.Id} status: Running"); } } + + public async Task WaitForExitAsync(CancellationToken token) + { + TaskCompletionSource exitedSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + EventHandler exitedHandler = (s, e) => exitedSource.TrySetResult(null); + + testProcess.Exited += exitedHandler; + try + { + if (!testProcess.HasExited) + { + using var _ = token.Register(() => exitedSource.TrySetCanceled(token)); + + await exitedSource.Task; + } + } + finally + { + testProcess.Exited -= exitedHandler; + } + } } } diff --git a/src/tests/Tracee/Program.cs b/src/tests/Tracee/Program.cs index c6143cf4d..8250ea7c8 100644 --- a/src/tests/Tracee/Program.cs +++ b/src/tests/Tracee/Program.cs @@ -9,13 +9,21 @@ namespace Tracee { class Program { + private const int LoopCount = 30; + static void Main(string[] args) { + Console.WriteLine("Sleep in loop for {0} seconds.", LoopCount); + // Runs for max of 30 sec - for(var i = 0; i < 30; i++) + for (var i = 0; i < LoopCount; i++) { + Console.WriteLine("Iteration #{0}", i); Thread.Sleep(1000); } + + Console.WriteLine("Press any key to exit."); + Console.ReadKey(); } } } diff --git a/src/tests/dotnet-monitor/DiagnosticsMonitorTests.cs b/src/tests/dotnet-monitor/DiagnosticsMonitorTests.cs index 134bf4d64..494ad20f4 100644 --- a/src/tests/dotnet-monitor/DiagnosticsMonitorTests.cs +++ b/src/tests/dotnet-monitor/DiagnosticsMonitorTests.cs @@ -2,22 +2,16 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using Microsoft.Diagnostics.Monitoring; -using Microsoft.Diagnostics.NETCore.Client; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Microsoft.VisualStudio.TestPlatform.ObjectModel; -using Microsoft.VisualStudio.TestPlatform.ObjectModel.DataCollection; using System; using System.Collections.Generic; -using System.Diagnostics; using System.IO; -using System.Linq; using System.Runtime.InteropServices; using System.Text.Json; using System.Threading; using System.Threading.Tasks; +using Microsoft.Diagnostics.Monitoring; +using Microsoft.Diagnostics.NETCore.Client; +using Microsoft.Extensions.Logging; using Xunit; using Xunit.Abstractions; using Xunit.Extensions; @@ -43,7 +37,7 @@ namespace DotnetMonitor.UnitTests var outputStream = new MemoryStream(); - await using (var testExecution = RemoteTestExecution.StartRemoteProcess("LoggerRemoteTest", _output)) + await using (var testExecution = StartTraceeProcess("LoggerRemoteTest")) { //TestRunner should account for start delay to make sure that the diagnostic pipe is available. @@ -51,10 +45,10 @@ namespace DotnetMonitor.UnitTests DiagnosticsEventPipeProcessor diagnosticsEventPipeProcessor = new DiagnosticsEventPipeProcessor( PipeMode.Logs, - loggerFactory, - Enumerable.Empty()); + loggerFactory); - var processingTask = diagnosticsEventPipeProcessor.Process(testExecution.TestRunner.Pid, TimeSpan.FromSeconds(10), CancellationToken.None); + var client = new DiagnosticsClient(testExecution.TestRunner.Pid); + var processingTask = diagnosticsEventPipeProcessor.Process(client, testExecution.TestRunner.Pid, TimeSpan.FromSeconds(10), CancellationToken.None); //Add a small delay to make sure diagnostic processor had a chance to initialize await Task.Delay(1000); @@ -108,6 +102,11 @@ namespace DotnetMonitor.UnitTests } } + private RemoteTestExecution StartTraceeProcess(string loggerCategory) + { + return RemoteTestExecution.StartProcess(CommonHelper.GetTraceePath("EventPipeTracee") + " " + loggerCategory, _output); + } + private sealed class LoggerTestResult { public string Category { get; set; } diff --git a/src/tests/dotnet-monitor/EndpointInfoSourceTests.cs b/src/tests/dotnet-monitor/EndpointInfoSourceTests.cs new file mode 100644 index 000000000..9d973aab3 --- /dev/null +++ b/src/tests/dotnet-monitor/EndpointInfoSourceTests.cs @@ -0,0 +1,223 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Diagnostics.Monitoring; +using Microsoft.Diagnostics.NETCore.Client; +using Xunit; +using Xunit.Abstractions; + +namespace DotnetMonitor.UnitTests +{ + public class EndpointInfoSourceTests + { + private readonly ITestOutputHelper _outputHelper; + + public EndpointInfoSourceTests(ITestOutputHelper outputHelper) + { + _outputHelper = outputHelper; + } + + /// + /// Tests that the server endpoint info source has no connections + /// if is not called. + /// + [Fact] + public async Task ServerSourceNoListenTest() + { + await using var source = CreateServerSource(out string transportName); + // Intentionally do not call Listen + + await using (var execution1 = StartTraceeProcess("LoggerRemoteTest", transportName)) + { + execution1.Start(); + + await Task.Delay(TimeSpan.FromSeconds(1)); + + var endpointInfos = await GetEndpointInfoAsync(source); + + Assert.Empty(endpointInfos); + + _outputHelper.WriteLine("Stopping tracee."); + } + } + + /// + /// Tests that the server endpoint info source has not connections if no processes connect to it. + /// + [Fact] + public async Task ServerSourceNoConnectionsTest() + { + await using var source = CreateServerSource(out _); + source.Listen(); + + var endpointInfos = await GetEndpointInfoAsync(source); + Assert.Empty(endpointInfos); + } + + /// + /// Tests that server endpoint info source should throw ObjectDisposedException + /// from API surface after being disposed. + /// + [Fact] + public async Task ServerSourceThrowsWhenDisposedTest() + { + var source = CreateServerSource(out _); + source.Listen(); + + await source.DisposeAsync(); + + // Validate source surface throws after disposal + Assert.Throws( + () => source.Listen()); + + Assert.Throws( + () => source.Listen(1)); + + await Assert.ThrowsAsync( + () => source.GetEndpointInfoAsync(CancellationToken.None)); + } + + /// + /// Tests that server endpoint info source should throw an exception from + /// and + /// after listening was already started. + /// + [Fact] + public async Task ServerSourceThrowsWhenMultipleListenTest() + { + await using var source = CreateServerSource(out _); + source.Listen(); + + Assert.Throws( + () => source.Listen()); + + Assert.Throws( + () => source.Listen(1)); + } + + /// + /// Tests that the server endpoint info source can properly enumerate endpoint infos when a single + /// target connects to it and "disconnects" from it. + /// + [Fact] + public async Task ServerSourceAddRemoveSingleConnectionTest() + { + await using var source = CreateServerSource(out string transportName); + source.Listen(); + + var endpointInfos = await GetEndpointInfoAsync(source); + Assert.Empty(endpointInfos); + + Task newEndpointInfoTask = source.WaitForNewEndpointInfoAsync(TimeSpan.FromSeconds(5)); + + await using (var execution1 = StartTraceeProcess("LoggerRemoteTest", transportName)) + { + await newEndpointInfoTask; + + execution1.Start(); + + endpointInfos = await GetEndpointInfoAsync(source); + + var endpointInfo = Assert.Single(endpointInfos); + VerifyConnection(execution1.TestRunner, endpointInfo); + + _outputHelper.WriteLine("Stopping tracee."); + } + + await Task.Delay(TimeSpan.FromSeconds(1)); + + endpointInfos = await GetEndpointInfoAsync(source); + + Assert.Empty(endpointInfos); + } + + private TestServerEndpointInfoSource CreateServerSource(out string transportName) + { + transportName = ReversedServerHelper.CreateServerTransportName(); + _outputHelper.WriteLine("Starting server endpoint info source at '" + transportName + "'."); + return new TestServerEndpointInfoSource(transportName, _outputHelper); + } + + private RemoteTestExecution StartTraceeProcess(string loggerCategory, string transportName = null) + { + _outputHelper.WriteLine("Starting tracee."); + string exePath = CommonHelper.GetTraceePath("EventPipeTracee", targetFramework: "net5.0"); + return RemoteTestExecution.StartProcess(exePath + " " + loggerCategory, _outputHelper, transportName); + } + + private async Task> GetEndpointInfoAsync(ServerEndpointInfoSource source) + { + _outputHelper.WriteLine("Getting endpoint infos."); + using CancellationTokenSource cancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + return await source.GetEndpointInfoAsync(cancellationSource.Token); + } + + /// + /// Verifies basic information on the connection and that it matches the target process from the runner. + /// + private static void VerifyConnection(TestRunner runner, IEndpointInfo endpointInfo) + { + Assert.NotNull(runner); + Assert.NotNull(endpointInfo); + Assert.Equal(runner.Pid, endpointInfo.ProcessId); + Assert.NotEqual(Guid.Empty, endpointInfo.RuntimeInstanceCookie); + Assert.NotNull(endpointInfo.Endpoint); + } + + private sealed class TestServerEndpointInfoSource : ServerEndpointInfoSource + { + private readonly ITestOutputHelper _outputHelper; + private readonly List> _addedEndpointInfoSources = new List>(); + + public TestServerEndpointInfoSource(string transportPath, ITestOutputHelper outputHelper) + : base(transportPath) + { + _outputHelper = outputHelper; + } + + public async Task WaitForNewEndpointInfoAsync(TimeSpan timeout) + { + TaskCompletionSource addedEndpointInfoSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + using var timeoutCancellation = new CancellationTokenSource(); + var token = timeoutCancellation.Token; + using var _ = token.Register(() => addedEndpointInfoSource.TrySetCanceled(token)); + + lock (_addedEndpointInfoSources) + { + _addedEndpointInfoSources.Add(addedEndpointInfoSource); + } + + _outputHelper.WriteLine("Waiting for new endpoint info."); + timeoutCancellation.CancelAfter(timeout); + IpcEndpointInfo endpointInfo = await addedEndpointInfoSource.Task; + _outputHelper.WriteLine("Notified of new endpoint info."); + + return endpointInfo; + } + + internal override void OnAddedEndpointInfo(IpcEndpointInfo info) + { + _outputHelper.WriteLine($"Added endpoint info to collection: {info.ToTestString()}"); + + lock (_addedEndpointInfoSources) + { + foreach (var source in _addedEndpointInfoSources) + { + source.TrySetResult(info); + } + _addedEndpointInfoSources.Clear(); + } + } + + internal override void OnRemovedEndpointInfo(IpcEndpointInfo info) + { + _outputHelper.WriteLine($"Removed endpoint info from collection: {info.ToTestString()}"); + } + } + } +} diff --git a/src/tests/dotnet-monitor/RemoteTestExecution.cs b/src/tests/dotnet-monitor/RemoteTestExecution.cs index 832945aa9..0db40967e 100644 --- a/src/tests/dotnet-monitor/RemoteTestExecution.cs +++ b/src/tests/dotnet-monitor/RemoteTestExecution.cs @@ -2,16 +2,11 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. -using Microsoft.Diagnostics.NETCore.Client; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; using System; -using System.Collections.Generic; -using System.Diagnostics; using System.IO; -using System.Text; using System.Threading; using System.Threading.Tasks; +using Microsoft.Diagnostics.NETCore.Client; using Xunit.Abstractions; namespace DotnetMonitor.UnitTests @@ -23,14 +18,17 @@ namespace DotnetMonitor.UnitTests { private Task IoReadingTask { get; } - private RemoteTestExecution(TestRunner runner, Task ioReadingTask) + private ITestOutputHelper OutputHelper { get; } + + public TestRunner TestRunner { get; } + + private RemoteTestExecution(TestRunner runner, Task ioReadingTask, ITestOutputHelper outputHelper) { TestRunner = runner; IoReadingTask = ioReadingTask; + OutputHelper = outputHelper; } - public TestRunner TestRunner { get; } - public void Start() { SendSignal(); @@ -44,15 +42,18 @@ namespace DotnetMonitor.UnitTests TestRunner.StandardInput.Flush(); } - public static RemoteTestExecution StartRemoteProcess(string loggerCategory, ITestOutputHelper outputHelper) + public static RemoteTestExecution StartProcess(string commandLine, ITestOutputHelper outputHelper, string reversedServerTransportName = null) { - TestRunner runner = new TestRunner(CommonHelper.GetTraceePath("EventPipeTracee") + " " + loggerCategory, - outputHelper, redirectError: true, redirectInput: true); + TestRunner runner = new TestRunner(commandLine, outputHelper, redirectError: true, redirectInput: true); + if (!string.IsNullOrEmpty(reversedServerTransportName)) + { + runner.AddReversedServer(reversedServerTransportName); + } runner.Start(); Task readingTask = ReadAllOutput(runner.StandardOutput, runner.StandardError, outputHelper); - return new RemoteTestExecution(runner, readingTask); + return new RemoteTestExecution(runner, readingTask, outputHelper); } private static Task ReadAllOutput(StreamReader output, StreamReader error, ITestOutputHelper outputHelper) @@ -95,6 +96,18 @@ namespace DotnetMonitor.UnitTests public async ValueTask DisposeAsync() { SendSignal(); + + using var timeoutSource = new CancellationTokenSource(TimeSpan.FromSeconds(1)); + try + { + await TestRunner.WaitForExitAsync(timeoutSource.Token); + } + catch (OperationCanceledException) + { + OutputHelper.WriteLine("Remote process did not exit within timeout period. Forcefully stopping process."); + TestRunner.Stop(); + } + await IoReadingTask; } }