Add reversed diagnostics server, endpoints, and endpoint sources. (#1303)
authorJustin Anderson <jander-msft@users.noreply.github.com>
Thu, 6 Aug 2020 18:46:34 +0000 (11:46 -0700)
committerGitHub <noreply@github.com>
Thu, 6 Aug 2020 18:46:34 +0000 (11:46 -0700)
Add reversed diagnostics server and endpoint information.
Add diagnostics endpoint source for unifying server and client connections into same contract.
Consume endpoint source in dotnet-monitor.
Add unit tests for reversed server, endpoints, and endpoint source concepts.

42 files changed:
eng/Versions.props
src/Microsoft.Diagnostics.Monitoring.RestServer/Controllers/DiagController.cs
src/Microsoft.Diagnostics.Monitoring.RestServer/MetricsService.cs
src/Microsoft.Diagnostics.Monitoring.RestServer/Models/ProcessModel.cs
src/Microsoft.Diagnostics.Monitoring/ClientEndpointInfoSource.cs [new file with mode: 0644]
src/Microsoft.Diagnostics.Monitoring/Configuration/CpuProfileConfiguration.cs
src/Microsoft.Diagnostics.Monitoring/Configuration/MonitoringSourceConfiguration.cs
src/Microsoft.Diagnostics.Monitoring/Configuration/SampleProfilerConfiguration.cs [new file with mode: 0644]
src/Microsoft.Diagnostics.Monitoring/Contracts/IDiagnosticServices.cs
src/Microsoft.Diagnostics.Monitoring/Contracts/IEndpointInfoSource.cs [new file with mode: 0644]
src/Microsoft.Diagnostics.Monitoring/DiagnosticServices.cs
src/Microsoft.Diagnostics.Monitoring/DiagnosticsEventPipeProcessor.cs
src/Microsoft.Diagnostics.Monitoring/DiagnosticsMonitor.cs
src/Microsoft.Diagnostics.Monitoring/Microsoft.Diagnostics.Monitoring.csproj
src/Microsoft.Diagnostics.Monitoring/ServerEndpointInfoSource.cs [new file with mode: 0644]
src/Microsoft.Diagnostics.Monitoring/ServiceCollectionExtensions.cs [new file with mode: 0644]
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsClient/DiagnosticsClient.cs
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsClient/EventPipeSession.cs
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/ExposedSocketNetworkStream.cs [new file with mode: 0644]
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcAdvertise.cs [new file with mode: 0644]
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcClient.cs
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcCommands.cs
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcMessage.cs
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcServerTransport.cs [new file with mode: 0644]
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcTransport.cs [new file with mode: 0644]
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/UnixDomainSocket.cs [new file with mode: 0644]
src/Microsoft.Diagnostics.NETCore.Client/Microsoft.Diagnostics.NETCore.Client.csproj
src/Microsoft.Diagnostics.NETCore.Client/NativeMethods.cs [new file with mode: 0644]
src/Microsoft.Diagnostics.NETCore.Client/ReversedServer/IpcEndpointInfo.cs [new file with mode: 0644]
src/Microsoft.Diagnostics.NETCore.Client/ReversedServer/ReversedDiagnosticsServer.cs [new file with mode: 0644]
src/Tools/dotnet-monitor/DiagnosticsMonitorCommandHandler.cs
src/Tools/dotnet-monitor/Program.cs
src/tests/Microsoft.Diagnostics.NETCore.Client/CommonHelper.cs
src/tests/Microsoft.Diagnostics.NETCore.Client/GetPublishedProcessesTests.cs
src/tests/Microsoft.Diagnostics.NETCore.Client/Microsoft.Diagnostics.NETCore.Client.UnitTests.csproj
src/tests/Microsoft.Diagnostics.NETCore.Client/ReversedServerHelper.cs [new file with mode: 0644]
src/tests/Microsoft.Diagnostics.NETCore.Client/ReversedServerTests.cs [new file with mode: 0644]
src/tests/Microsoft.Diagnostics.NETCore.Client/TestRunner.cs
src/tests/Tracee/Program.cs
src/tests/dotnet-monitor/DiagnosticsMonitorTests.cs
src/tests/dotnet-monitor/EndpointInfoSourceTests.cs [new file with mode: 0644]
src/tests/dotnet-monitor/RemoteTestExecution.cs

index 282408aea94e998109b6cdea5ddd3ce451f1c4a9..91f5875cbf8e4073f47122b3134355efac98b5f9 100644 (file)
@@ -42,6 +42,7 @@
     <MicrosoftExtensionsConfigurationJsonVersion>2.1.1</MicrosoftExtensionsConfigurationJsonVersion>
     <MicrosoftExtensionsConfigurationKeyPerFileVersion>2.1.1</MicrosoftExtensionsConfigurationKeyPerFileVersion>
     <MicrosoftExtensionsDependencyInjectionVersion>2.1.1</MicrosoftExtensionsDependencyInjectionVersion>
+    <MicrosoftExtensionsHostingAbstractionsVersion>2.1.1</MicrosoftExtensionsHostingAbstractionsVersion>
     <MicrosoftExtensionsLoggingVersion>2.1.1</MicrosoftExtensionsLoggingVersion>
     <MicrosoftExtensionsLoggingConsoleVersion>2.1.1</MicrosoftExtensionsLoggingConsoleVersion>
     <!-- We use a newer version of LoggingEventSource due to a bug in an older version-->
index 85c350d03e2bcd76c79cb451fa5aa3ace1c3a80a..f0238863060e6b2bbff65cbdf85b7cfde71a76e7 100644 (file)
@@ -44,14 +44,14 @@ namespace Microsoft.Diagnostics.Monitoring.RestServer.Controllers
         }
 
         [HttpGet("processes")]
-        public ActionResult<IEnumerable<ProcessModel>> GetProcesses()
+        public Task<ActionResult<IEnumerable<ProcessModel>>> GetProcesses()
         {
-            return this.InvokeService(() =>
+            return this.InvokeService(async () =>
             {
                 IList<ProcessModel> processes = new List<ProcessModel>();
-                foreach (int pid in _diagnosticServices.GetProcesses())
+                foreach (IProcessInfo p in await _diagnosticServices.GetProcessesAsync(HttpContext.RequestAborted))
                 {
-                    processes.Add(new ProcessModel() { Pid = pid });
+                    processes.Add(ProcessModel.FromProcessInfo(p));
                 }
                 return new ActionResult<IEnumerable<ProcessModel>>(processes);
             });
@@ -62,8 +62,8 @@ namespace Microsoft.Diagnostics.Monitoring.RestServer.Controllers
         {
             return this.InvokeService(async () =>
             {
-                int pidValue = _diagnosticServices.ResolveProcess(pid);
-                Stream result = await _diagnosticServices.GetDump(pidValue, type);
+                int pidValue = await _diagnosticServices.ResolveProcessAsync(pid, HttpContext.RequestAborted);
+                Stream result = await _diagnosticServices.GetDump(pidValue, type, HttpContext.RequestAborted);
 
                 string dumpFileName = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ?
                     FormattableString.Invariant($"dump_{GetFileNameTimeStampUtcNow()}.dmp") :
@@ -80,7 +80,7 @@ namespace Microsoft.Diagnostics.Monitoring.RestServer.Controllers
         {
             return this.InvokeService(async () =>
             {
-                int pidValue = _diagnosticServices.ResolveProcess(pid);
+                int pidValue = await _diagnosticServices.ResolveProcessAsync(pid, HttpContext.RequestAborted);
                 Stream result = await _diagnosticServices.GetGcDump(pidValue, this.HttpContext.RequestAborted);
                 return File(result, "application/octet-stream", FormattableString.Invariant($"{GetFileNameTimeStampUtcNow()}_{pidValue}.gcdump"));
             });
@@ -159,12 +159,12 @@ namespace Microsoft.Diagnostics.Monitoring.RestServer.Controllers
 
         [HttpGet("logs/{pid?}")]
         [Produces(ContentTypeEventStream, ContentTypeNdJson, ContentTypeJson)]
-        public ActionResult Logs(int? pid, [FromQuery][Range(-1, int.MaxValue)] int durationSeconds = 30, [FromQuery] LogLevel level = LogLevel.Debug)
+        public Task<ActionResult> Logs(int? pid, [FromQuery][Range(-1, int.MaxValue)] int durationSeconds = 30, [FromQuery] LogLevel level = LogLevel.Debug)
         {
             TimeSpan duration = ConvertSecondsToTimeSpan(durationSeconds);
-            return this.InvokeService(() =>
+            return this.InvokeService(async () =>
             {
-                int pidValue = _diagnosticServices.ResolveProcess(pid);
+                int pidValue = await _diagnosticServices.ResolveProcessAsync(pid, HttpContext.RequestAborted);
 
                 LogFormat format = ComputeLogFormat(Request.GetTypedHeaders().Accept);
                 if (format == LogFormat.None)
@@ -184,7 +184,7 @@ namespace Microsoft.Diagnostics.Monitoring.RestServer.Controllers
 
         private async Task<StreamWithCleanupResult> StartTrace(int? pid, MonitoringSourceConfiguration configuration, TimeSpan duration)
         {
-            int pidValue = _diagnosticServices.ResolveProcess(pid);
+            int pidValue = await _diagnosticServices.ResolveProcessAsync(pid, HttpContext.RequestAborted);
             IStreamWithCleanup result = await _diagnosticServices.StartTrace(pidValue, configuration, duration, this.HttpContext.RequestAborted);
             return new StreamWithCleanupResult(result, "application/octet-stream", FormattableString.Invariant($"{GetFileNameTimeStampUtcNow()}_{pidValue}.nettrace"));
         }
index fa5b9d48ab9eab309ab16f4ee6cbde832a910f35..7d3b7333af5b204c546dcb917bd9992889623363 100644 (file)
@@ -2,15 +2,12 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
-using Microsoft.Diagnostics.Tracing.Analysis;
-using Microsoft.Extensions.Hosting;
-using Microsoft.Extensions.Options;
 using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
+using Microsoft.Diagnostics.NETCore.Client;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Options;
 
 namespace Microsoft.Diagnostics.Monitoring.RestServer
 {
@@ -46,8 +43,9 @@ namespace Microsoft.Diagnostics.Monitoring.RestServer
                     {
                         //TODO In multi-process scenarios, how do we decide which process to choose?
                         //One possibility is to enable metrics after a request to begin polling for metrics
-                        int pid = _services.ResolveProcess(pid: null);
-                        await _pipeProcessor.Process(pid, Timeout.InfiniteTimeSpan, stoppingToken);
+                        int pid = await _services.ResolveProcessAsync(pid: null, stoppingToken);
+                        var client = new DiagnosticsClient(pid);
+                        await _pipeProcessor.Process(client, pid, Timeout.InfiniteTimeSpan, stoppingToken);
                     }
                     catch(Exception e) when (!(e is OperationCanceledException))
                     {
index 37bd13452d6b0496b391645f0639338aeb1015ef..e127a4e8e003083ff29059e2b81bdd633c814e7d 100644 (file)
@@ -1,4 +1,5 @@
-using System.Runtime.Serialization;
+using System;
+using System.Runtime.Serialization;
 
 namespace Microsoft.Diagnostics.Monitoring.RestServer.Models
 {
@@ -7,5 +8,13 @@ namespace Microsoft.Diagnostics.Monitoring.RestServer.Models
     {
         [DataMember(Name = "pid")]
         public int Pid { get; set; }
+
+        [DataMember(Name = "uid")]
+        public Guid Uid { get; set; }
+
+        public static ProcessModel FromProcessInfo(IProcessInfo processInfo)
+        {
+            return new ProcessModel() { Pid = processInfo.Pid, Uid = processInfo.Uid };
+        }
     }
 }
\ No newline at end of file
diff --git a/src/Microsoft.Diagnostics.Monitoring/ClientEndpointInfoSource.cs b/src/Microsoft.Diagnostics.Monitoring/ClientEndpointInfoSource.cs
new file mode 100644 (file)
index 0000000..7abee42
--- /dev/null
@@ -0,0 +1,44 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Diagnostics.NETCore.Client;
+
+namespace Microsoft.Diagnostics.Monitoring
+{
+    internal sealed class ClientEndpointInfoSource : IEndpointInfoSourceInternal
+    {
+        public Task<IEnumerable<IEndpointInfo>> GetEndpointInfoAsync(CancellationToken token)
+        {
+            List<IEndpointInfo> endpointInfos = new List<IEndpointInfo>();
+            foreach (int pid in DiagnosticsClient.GetPublishedProcesses())
+            {
+                // CONSIDER: Generate a "runtime instance identifier" based on the pipe name
+                // e.g. pid + disambiguator in GUID form.
+                endpointInfos.Add(new EndpointInfo(pid));
+            }
+
+            return Task.FromResult(endpointInfos.AsEnumerable());
+        }
+
+        private class EndpointInfo : IEndpointInfo
+        {
+            public EndpointInfo(int processId)
+            {
+                Endpoint = new PidIpcEndpoint(processId);
+                ProcessId = processId;
+            }
+
+            public IpcEndpoint Endpoint { get; }
+
+            public int ProcessId { get; }
+
+            public Guid RuntimeInstanceCookie => Guid.Empty;
+        }
+    }
+}
index 77e5c9822515767bba1ded944fbd5b9105fdce81..5735f645079f72ff33e236724e300f19c841670b 100644 (file)
@@ -14,7 +14,7 @@ namespace Microsoft.Diagnostics.Monitoring
         public override IList<EventPipeProvider> GetProviders() =>
             new EventPipeProvider[]
             {
-                new EventPipeProvider("Microsoft-DotNETCore-SampleProfiler", System.Diagnostics.Tracing.EventLevel.Informational),
+                new EventPipeProvider(SampleProfilerProviderName, System.Diagnostics.Tracing.EventLevel.Informational),
                 new EventPipeProvider("Microsoft-Windows-DotNETRuntime", System.Diagnostics.Tracing.EventLevel.Informational, (long) Tracing.Parsers.ClrTraceEventParser.Keywords.Default)
             };
     }
index 50febb2553420127d864da9a3adc6e7b1df0d61a..d95e657374a337df4d0a4cb1e42e0f5ca8c42dd0 100644 (file)
@@ -15,6 +15,8 @@ namespace Microsoft.Diagnostics.Monitoring
         public const string GrpcAspNetCoreServer = "Grpc.AspNetCore.Server";
         public const string DiagnosticSourceEventSource = "Microsoft-Diagnostics-DiagnosticSource";
         public const string TplEventSource = "System.Threading.Tasks.TplEventSource";
+        public const string SampleProfilerProviderName = "Microsoft-DotNETCore-SampleProfiler";
+        public const string EventPipeProviderName = "Microsoft-DotNETCore-EventPipe";
 
         public abstract IList<EventPipeProvider> GetProviders();
 
diff --git a/src/Microsoft.Diagnostics.Monitoring/Configuration/SampleProfilerConfiguration.cs b/src/Microsoft.Diagnostics.Monitoring/Configuration/SampleProfilerConfiguration.cs
new file mode 100644 (file)
index 0000000..de7d73d
--- /dev/null
@@ -0,0 +1,23 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using Microsoft.Diagnostics.NETCore.Client;
+using System.Collections.Generic;
+using System.Diagnostics.Tracing;
+
+namespace Microsoft.Diagnostics.Monitoring
+{
+    public sealed class SampleProfilerConfiguration : MonitoringSourceConfiguration
+    {
+        public override IList<EventPipeProvider> GetProviders() =>
+            new EventPipeProvider[]
+            {
+                new EventPipeProvider(SampleProfilerProviderName, EventLevel.Informational)
+            };
+
+        public override int BufferSizeInMB => 1;
+
+        public override bool RequestRundown => false;
+    }
+}
index ea1762a3670d3d33dd0ca36949fc73ceccadc43f..d118a65e521b628105e0f54f60e330c4a5eb67e2 100644 (file)
@@ -17,11 +17,11 @@ namespace Microsoft.Diagnostics.Monitoring
     /// </summary>
     public interface IDiagnosticServices : IDisposable
     {
-        IEnumerable<int> GetProcesses();
+        Task<IEnumerable<IProcessInfo>> GetProcessesAsync(CancellationToken token);
 
-        int ResolveProcess(int? pid);
+        Task<int> ResolveProcessAsync(int? pid, CancellationToken token);
 
-        Task<Stream> GetDump(int pid, DumpType mode);
+        Task<Stream> GetDump(int pid, DumpType mode, CancellationToken token);
 
         Task<Stream> GetGcDump(int pid, CancellationToken token);
 
@@ -35,6 +35,13 @@ namespace Microsoft.Diagnostics.Monitoring
         Stream Stream { get; }
     }
 
+    public interface IProcessInfo
+    {
+        int Pid { get; }
+
+        Guid Uid { get; }
+    }
+
     public enum DumpType
     {
         Full = 1,
diff --git a/src/Microsoft.Diagnostics.Monitoring/Contracts/IEndpointInfoSource.cs b/src/Microsoft.Diagnostics.Monitoring/Contracts/IEndpointInfoSource.cs
new file mode 100644 (file)
index 0000000..d98ec2d
--- /dev/null
@@ -0,0 +1,30 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Diagnostics.NETCore.Client;
+
+namespace Microsoft.Diagnostics.Monitoring
+{
+    internal interface IEndpointInfo
+    {
+        IpcEndpoint Endpoint { get; }
+
+        int ProcessId { get; }
+
+        Guid RuntimeInstanceCookie { get; }
+    }
+
+    public interface IEndpointInfoSource
+    {
+    }
+
+    internal interface IEndpointInfoSourceInternal : IEndpointInfoSource
+    {
+        Task<IEnumerable<IEndpointInfo>> GetEndpointInfoAsync(CancellationToken token);
+    }
+}
index 8c9d534ff12c4cb4975849f9e72aa463b905528c..45099f793965c7f5a2f8965956b1fd3495ec61cc 100644 (file)
@@ -21,15 +21,25 @@ namespace Microsoft.Diagnostics.Monitoring
     {
         private const int DockerEntrypointProcessId = 1;
 
+        // The amount of time to wait when checking if the docker entrypoint process is a .NET process
+        // with a diagnostics transport connection.
+        private static readonly TimeSpan DockerEntrypointWaitTimeout = TimeSpan.FromMilliseconds(250);
+
+        private readonly IEndpointInfoSourceInternal _endpointInfoSource;
         private readonly CancellationTokenSource _tokenSource = new CancellationTokenSource();
 
-        public IEnumerable<int> GetProcesses()
+        public DiagnosticServices(IEndpointInfoSource endpointInfoSource)
+        {
+            _endpointInfoSource = (IEndpointInfoSourceInternal)endpointInfoSource;
+        }
+
+        public async Task<IEnumerable<IProcessInfo>> GetProcessesAsync(CancellationToken token)
         {
             try
             {
-                //TODO This won't work properly with multi-container scenarios that don't share the process space.
-                //TODO We will need to use DiagnosticsAgent if we are the server.
-                return DiagnosticsClient.GetPublishedProcesses();
+                var endpointInfos = await _endpointInfoSource.GetEndpointInfoAsync(token);
+
+                return endpointInfos.Select(c => new ProcessInfo(c.RuntimeInstanceCookie, c.ProcessId));
             }
             catch (UnauthorizedAccessException)
             {
@@ -37,7 +47,7 @@ namespace Microsoft.Diagnostics.Monitoring
             }
         }
 
-        public async Task<Stream> GetDump(int pid, DumpType mode)
+        public async Task<Stream> GetDump(int pid, DumpType mode, CancellationToken token)
         {
             string dumpFilePath = Path.Combine(Path.GetTempPath(), FormattableString.Invariant($"{Guid.NewGuid()}_{pid}"));
             NETCore.Client.DumpType dumpType = MapDumpType(mode);
@@ -50,9 +60,9 @@ namespace Microsoft.Diagnostics.Monitoring
             }
             else
             {
+                var client = await GetClientAsync(pid, CancellationToken.None);
                 await Task.Run(() =>
                 {
-                    var client = new DiagnosticsClient(pid);
                     client.WriteDump(dumpType, dumpFilePath);
                 });
             }
@@ -60,13 +70,15 @@ namespace Microsoft.Diagnostics.Monitoring
             return new AutoDeleteFileStream(dumpFilePath);
         }
 
-        public async Task<Stream> GetGcDump(int pid, CancellationToken cancellationToken)
+        public async Task<Stream> GetGcDump(int pid, CancellationToken token)
         {
             var graph = new MemoryGraph(50_000);
-            await using var processor = new DiagnosticsEventPipeProcessor(PipeMode.GCDump,
+            await using var processor = new DiagnosticsEventPipeProcessor(
+                PipeMode.GCDump,
                 gcGraph: graph);
 
-            await processor.Process(pid, Timeout.InfiniteTimeSpan, cancellationToken);
+            var client = await GetClientAsync(pid, token);
+            await processor.Process(client, pid, Timeout.InfiniteTimeSpan, token);
 
             var dumper = new GCHeapDump(graph);
             dumper.CreationTool = "dotnet-monitor";
@@ -82,7 +94,8 @@ namespace Microsoft.Diagnostics.Monitoring
         public async Task<IStreamWithCleanup> StartTrace(int pid, MonitoringSourceConfiguration configuration, TimeSpan duration, CancellationToken token)
         {
             DiagnosticsMonitor monitor = new DiagnosticsMonitor(configuration);
-            Stream stream = await monitor.ProcessEvents(pid, duration, token);
+            var client = await GetClientAsync(pid, token);
+            Stream stream = await monitor.ProcessEvents(client, duration, token);
             return new StreamWithCleanup(monitor, stream);
         }
 
@@ -92,11 +105,13 @@ namespace Microsoft.Diagnostics.Monitoring
 
             loggerFactory.AddProvider(new StreamingLoggerProvider(outputStream, format, level));
 
-            await using var processor = new DiagnosticsEventPipeProcessor(PipeMode.Logs,
+            await using var processor = new DiagnosticsEventPipeProcessor(
+                PipeMode.Logs,
                 loggerFactory: loggerFactory,
                 logsLevel: level);
 
-            await processor.Process(pid, duration, token);
+            var client = await GetClientAsync(pid, token);
+            await processor.Process(client, pid, duration, token);
         }
 
         private static NETCore.Client.DumpType MapDumpType(DumpType dumpType)
@@ -116,7 +131,7 @@ namespace Microsoft.Diagnostics.Monitoring
             }
         }
 
-        public int ResolveProcess(int? pid)
+        public async Task<int> ResolveProcessAsync(int? pid, CancellationToken token)
         {
             if (pid.HasValue)
             {
@@ -126,24 +141,32 @@ namespace Microsoft.Diagnostics.Monitoring
             // Short-circuit for when running in a Docker container.
             if (RuntimeInfo.IsInDockerContainer)
             {
-                var client = new DiagnosticsClient(DockerEntrypointProcessId);
-                if (client.CheckTransport())
+                try
                 {
+                    var client = await GetClientAsync(DockerEntrypointProcessId, token);
+                    using var timeoutSource = new CancellationTokenSource(DockerEntrypointWaitTimeout);
+                    
+                    await client.WaitForConnectionAsync(timeoutSource.Token);
+
                     return DockerEntrypointProcessId;
                 }
+                catch
+                {
+                    // Process ID 1 doesn't exist or didn't advertise in the reverse pipe configuration.
+                }
             }
 
             // Only return a process ID if there is exactly one discoverable process.
-            int[] pids = GetProcesses().ToArray();
-            switch (pids.Length)
+            IProcessInfo[] processes = (await GetProcessesAsync(token)).ToArray();
+            switch (processes.Length)
             {
                 case 0:
                     throw new ArgumentException("Unable to discover a target process.");
                 case 1:
-                    return pids[0];
+                    return processes[0].Pid;
                 default:
 #if DEBUG
-                    Process process = pids.Select(pid => Process.GetProcessById(pid)).FirstOrDefault(p => string.Equals(p.ProcessName, "iisexpress", StringComparison.OrdinalIgnoreCase));
+                    Process process = processes.Select(p => Process.GetProcessById(p.Pid)).FirstOrDefault(p => string.Equals(p.ProcessName, "iisexpress", StringComparison.OrdinalIgnoreCase));
                     if (process != null)
                     {
                         return process.Id;
@@ -153,6 +176,19 @@ namespace Microsoft.Diagnostics.Monitoring
             }
         }
 
+        private async Task<DiagnosticsClient> GetClientAsync(int processId, CancellationToken token)
+        {
+            var endpointInfos = await _endpointInfoSource.GetEndpointInfoAsync(token);
+            var endpointInfo = endpointInfos.FirstOrDefault(c => c.ProcessId == processId);
+
+            if (null == endpointInfo)
+            {
+                throw new InvalidOperationException($"Diagnostics client for process ID {processId} does not exist.");
+            }
+
+            return new DiagnosticsClient(endpointInfo.Endpoint);
+        }
+
         public void Dispose()
         {
             _tokenSource.Cancel();
@@ -202,5 +238,18 @@ namespace Microsoft.Diagnostics.Monitoring
                 }
             }
         }
+
+        private sealed class ProcessInfo : IProcessInfo
+        {
+            public ProcessInfo(Guid uid, int pid)
+            {
+                Pid = pid;
+                Uid = uid;
+            }
+
+            public int Pid { get; }
+
+            public Guid Uid { get; }
+        }
     }
 }
index 9ce3e15319bcc99ce5c451d3df6a3cb0d096da69..a784bb4e8ff24b2c2a15a5438dd346528b31a0cc 100644 (file)
@@ -2,11 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
-using Graphs;
-using Microsoft.Diagnostics.NETCore.Client;
-using Microsoft.Diagnostics.Tracing;
-using Microsoft.Diagnostics.Tracing.Parsers.Clr;
-using Microsoft.Extensions.Logging;
 using System;
 using System.Collections.Generic;
 using System.Globalization;
@@ -15,6 +10,11 @@ using System.Linq;
 using System.Text.Json;
 using System.Threading;
 using System.Threading.Tasks;
+using Graphs;
+using Microsoft.Diagnostics.NETCore.Client;
+using Microsoft.Diagnostics.Tracing;
+using Microsoft.Diagnostics.Tracing.Parsers.Clr;
+using Microsoft.Extensions.Logging;
 
 namespace Microsoft.Diagnostics.Monitoring
 {
@@ -22,7 +22,8 @@ namespace Microsoft.Diagnostics.Monitoring
     {
         Logs = 1,
         Metrics,
-        GCDump
+        GCDump,
+        ProcessInfo
     }
 
     public class DiagnosticsEventPipeProcessor : IAsyncDisposable
@@ -33,14 +34,17 @@ namespace Microsoft.Diagnostics.Monitoring
         private readonly PipeMode _mode;
         private readonly int _metricIntervalSeconds;
         private readonly LogLevel _logsLevel;
+        private readonly Action<string> _processInfoCallback;
 
         public DiagnosticsEventPipeProcessor(
             PipeMode mode,
-            ILoggerFactory loggerFactory = null,
-            IEnumerable<IMetricsLogger> metricLoggers = null,
-            int metricIntervalSeconds = 10,
-            MemoryGraph gcGraph = null,
-            LogLevel logsLevel = LogLevel.Debug)
+            ILoggerFactory loggerFactory = null,              // PipeMode = Logs
+            LogLevel logsLevel = LogLevel.Debug,              // PipeMode = Logs
+            IEnumerable<IMetricsLogger> metricLoggers = null, // PipeMode = Metrics
+            int metricIntervalSeconds = 10,                   // PipeMode = Metrics
+            MemoryGraph gcGraph = null,                       // PipeMode = GCDump
+            Action<string> processInfoCallback = null         // PipeMode = ProcessInfo
+            )
         {
             _metricLoggers = metricLoggers ?? Enumerable.Empty<IMetricsLogger>();
             _mode = mode;
@@ -48,9 +52,10 @@ namespace Microsoft.Diagnostics.Monitoring
             _gcGraph = gcGraph;
             _metricIntervalSeconds = metricIntervalSeconds;
             _logsLevel = logsLevel;
+            _processInfoCallback = processInfoCallback;
         }
 
-        public async Task Process(int pid, TimeSpan duration, CancellationToken token)
+        public async Task Process(DiagnosticsClient client, int pid, TimeSpan duration, CancellationToken token)
         {
             await await Task.Factory.StartNew(async () =>
             {
@@ -72,9 +77,13 @@ namespace Microsoft.Diagnostics.Monitoring
                     {
                         config = new GCDumpSourceConfiguration();
                     }
+                    if (_mode == PipeMode.ProcessInfo)
+                    {
+                        config = new SampleProfilerConfiguration();
+                    }
 
                     monitor = new DiagnosticsMonitor(config);
-                    Stream sessionStream = await monitor.ProcessEvents(pid, duration, token);
+                    Stream sessionStream = await monitor.ProcessEvents(client, duration, token);
                     source = new EventPipeEventSource(sessionStream);
 
                     // Allows the event handling routines to stop processing before the duration expires.
@@ -98,6 +107,12 @@ namespace Microsoft.Diagnostics.Monitoring
                         handleEventsTask = HandleGCEvents(source, pid, stopFunc, token);
                     }
 
+                    if (_mode == PipeMode.ProcessInfo)
+                    {
+                        // ProcessInfo
+                        HandleProcessInfo(source, stopFunc, token);
+                    }
+
                     source.Process();
 
                     token.ThrowIfCancellationRequested();
@@ -455,6 +470,19 @@ namespace Microsoft.Diagnostics.Monitoring
             _gcGraph.AllowReading();
         }
 
+        private void HandleProcessInfo(EventPipeEventSource source, Func<Task> stopFunc, CancellationToken token)
+        {
+            source.Dynamic.AddCallbackForProviderEvent(MonitoringSourceConfiguration.EventPipeProviderName, "ProcessInfo", traceEvent =>
+            {
+                _processInfoCallback?.Invoke((string)traceEvent.PayloadByName("CommandLine"));
+            });
+
+            source.Dynamic.All += traceEvent =>
+            {
+                stopFunc();
+            };
+        }
+
         public async ValueTask DisposeAsync()
         {
             foreach (IMetricsLogger logger in _metricLoggers)
index 8e3b9741774e7fa78a9e632ab5e66e9a7249db7e..960f3c18d5f1cdb3a5bf008cfd050985ed205d7b 100644 (file)
@@ -2,21 +2,18 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
-using Microsoft.Diagnostics.NETCore.Client;
 using System;
-using System.Collections.Generic;
 using System.IO;
-using System.Linq;
-using System.Text.Json;
 using System.Threading;
 using System.Threading.Tasks;
+using Microsoft.Diagnostics.NETCore.Client;
 
 namespace Microsoft.Diagnostics.Monitoring
 {
     public sealed class DiagnosticsMonitor : IAsyncDisposable
     {
         private readonly MonitoringSourceConfiguration _sourceConfig;
-        private readonly CancellationTokenSource _stopProcessingSource;
+        private readonly TaskCompletionSource<object> _stopProcessingSource;
         private readonly object _lock = new object();
         private Task _currentTask;
         private bool _disposed;
@@ -24,12 +21,12 @@ namespace Microsoft.Diagnostics.Monitoring
         public DiagnosticsMonitor(MonitoringSourceConfiguration sourceConfig)
         {
             _sourceConfig = sourceConfig;
-            _stopProcessingSource = new CancellationTokenSource();
+            _stopProcessingSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
         }
 
         public Task CurrentProcessingTask => _currentTask;
 
-        public Task<Stream> ProcessEvents(int processId, TimeSpan duration, CancellationToken cancellationToken)
+        public Task<Stream> ProcessEvents(DiagnosticsClient client, TimeSpan duration, CancellationToken cancellationToken)
         {
             cancellationToken.ThrowIfCancellationRequested();
 
@@ -46,8 +43,6 @@ namespace Microsoft.Diagnostics.Monitoring
                 }
 
                 EventPipeSession session = null;
-                var client = new DiagnosticsClient(processId);
-
                 try
                 {
                     session = client.StartEventPipeSession(_sourceConfig.GetProviders(), _sourceConfig.RequestRundown, _sourceConfig.BufferSizeInMB);
@@ -61,19 +56,17 @@ namespace Microsoft.Diagnostics.Monitoring
                     throw new InvalidOperationException("Failed to start the event pipe session", ex);
                 }
 
-                CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(_stopProcessingSource.Token, cancellationToken);
-
                 _currentTask = Task.Run( async () =>
                 {
-                    try
-                    {
-                        await Task.Delay(duration, linkedSource.Token);
-                    }
-                    finally
-                    {
-                        linkedSource.Dispose();
-                        StopSession(session);
-                    }
+                    using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+                    linkedSource.CancelAfter(duration);
+                    using var _ = linkedSource.Token.Register(() => _stopProcessingSource.TrySetResult(null));
+
+                    // Use TaskCompletionSource instead of Task.Delay with cancellation to avoid
+                    // using exceptions for normal termination of event stream.
+                    await _stopProcessingSource.Task.ConfigureAwait(false);
+                    
+                    StopSession(session);
                 });
 
                 return Task.FromResult(session.EventStream);
@@ -82,7 +75,7 @@ namespace Microsoft.Diagnostics.Monitoring
 
         public void StopProcessing()
         {
-            _stopProcessingSource.Cancel();
+            _stopProcessingSource.TrySetResult(null);
         }
 
         private static void StopSession(EventPipeSession session)
@@ -127,18 +120,17 @@ namespace Microsoft.Diagnostics.Monitoring
                 _currentTask = null;
                 _disposed = true;
             }
-            _stopProcessingSource.Cancel();
+            _stopProcessingSource.TrySetResult(null);
             if (currentTask != null)
             {
                 try
                 {
-                    await currentTask;
+                    await currentTask.ConfigureAwait(false);
                 }
                 catch (OperationCanceledException)
                 {
                 }
             }
-            _stopProcessingSource?.Dispose();
         }
     }
 }
\ No newline at end of file
index 2b81867ec3f446b118566ead749d57116ef454e9..c94be12217a9024d2859424748bcaf855f37dd1b 100644 (file)
@@ -27,6 +27,7 @@
     <PackageReference Include="Microsoft.Bcl.HashCode" Version="$(MicrosoftBclHashCodeVersion)" />
     <PackageReference Include="Microsoft.Diagnostics.Tracing.TraceEvent" Version="$(MicrosoftDiagnosticsTracingTraceEventVersion)" />
     <PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="$(MicrosoftBclAsyncInterfacesVersion)" />
+    <PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="$(MicrosoftExtensionsHostingAbstractionsVersion)" />
     <PackageReference Include="Microsoft.Extensions.Logging" Version="$(MicrosoftExtensionsLoggingVersion)" />
     <PackageReference Include="System.Text.Json" Version="$(SystemTextJsonVersion)" />
   </ItemGroup>
@@ -38,4 +39,9 @@
   <ItemGroup>
     <None Include="..\Tools\dotnet-gcdump\DotNetHeapDump\README.md" Link="DotNetHeapDump\README.md" />
   </ItemGroup>
+
+  <ItemGroup>
+    <InternalsVisibleTo Include="dotnet-monitor" />
+    <InternalsVisibleTo Include="DotnetMonitor.UnitTests" />
+  </ItemGroup>
 </Project>
diff --git a/src/Microsoft.Diagnostics.Monitoring/ServerEndpointInfoSource.cs b/src/Microsoft.Diagnostics.Monitoring/ServerEndpointInfoSource.cs
new file mode 100644 (file)
index 0000000..6a9d22e
--- /dev/null
@@ -0,0 +1,248 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Diagnostics.NETCore.Client;
+
+namespace Microsoft.Diagnostics.Monitoring
+{
+    /// <summary>
+    /// Aggregates diagnostic endpoints that are established at a transport path via a reversed server.
+    /// </summary>
+    internal class ServerEndpointInfoSource : IEndpointInfoSourceInternal, IAsyncDisposable
+    {
+        // The amount of time to wait when checking if the a endpoint info should be
+        // pruned from the list of endpoint infos. If the runtime doesn't have a viable connection within
+        // this time, it will be pruned from the list.
+        private static readonly TimeSpan PruneWaitForConnectionTimeout = TimeSpan.FromMilliseconds(250);
+
+        private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
+        private readonly IList<IpcEndpointInfo> _endpointInfos = new List<IpcEndpointInfo>();
+        private readonly SemaphoreSlim _endpointInfosSemaphore = new SemaphoreSlim(1);
+        private readonly string _transportPath;
+
+        private Task _listenTask;
+        private bool _disposed = false;
+        private ReversedDiagnosticsServer _server;
+
+        /// <summary>
+        /// Constructs a <see cref="ServerEndpointInfoSource"/> that aggreates diagnostic endpoints
+        /// from a reversed diagnostics server at path specified by <paramref name="transportPath"/>.
+        /// </summary>
+        /// <param name="transportPath">
+        /// The path of the server endpoint.
+        /// On Windows, this can be a full pipe path or the name without the "\\.\pipe\" prefix.
+        /// On all other systems, this must be the full file path of the socket.
+        /// </param>
+        public ServerEndpointInfoSource(string transportPath)
+        {
+            _transportPath = transportPath;
+        }
+
+        public async ValueTask DisposeAsync()
+        {
+            if (!_disposed)
+            {
+                _cancellation.Cancel();
+
+                if (null != _listenTask)
+                {
+                    await _listenTask.ConfigureAwait(false);
+                }
+
+                _server?.Dispose();
+
+                _endpointInfosSemaphore.Dispose();
+
+                _cancellation.Dispose();
+
+                _disposed = true;
+            }
+        }
+
+        /// <summary>
+        /// Starts listening to the reversed diagnostics server for new connections.
+        /// </summary>
+        public void Listen()
+        {
+            Listen(ReversedDiagnosticsServer.MaxAllowedConnections);
+        }
+
+        /// <summary>
+        /// Starts listening to the reversed diagnostics server for new connections.
+        /// </summary>
+        /// <param name="maxConnections">The maximum number of connections the server will support.</param>
+        public void Listen(int maxConnections)
+        {
+            VerifyNotDisposed();
+
+            if (null != _server || null != _listenTask)
+            {
+                throw new InvalidOperationException(nameof(ServerEndpointInfoSource.Listen) + " method can only be called once.");
+            }
+
+            _server = new ReversedDiagnosticsServer(_transportPath, maxConnections);
+
+            _listenTask = ListenAsync(_cancellation.Token);
+        }
+
+        /// <summary>
+        /// Gets the list of <see cref="IpcEndpointInfo"/> served from the reversed diagnostics server.
+        /// </summary>
+        /// <param name="token">The token to monitor for cancellation requests.</param>
+        /// <returns>A list of active <see cref="IEndpointInfo"/> instances.</returns>
+        public async Task<IEnumerable<IEndpointInfo>> GetEndpointInfoAsync(CancellationToken token)
+        {
+            VerifyNotDisposed();
+
+            using CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, _cancellation.Token);
+            CancellationToken linkedToken = linkedSource.Token;
+
+            // Prune connections that no longer have an active runtime instance before
+            // returning the list of connections.
+            await _endpointInfosSemaphore.WaitAsync(linkedToken).ConfigureAwait(false);
+            try
+            {
+                // Check the transport for each endpoint info and remove it if the check fails.
+                var endpointInfos = _endpointInfos.ToList();
+
+                var pruneTasks = new List<Task>();
+                foreach (IpcEndpointInfo info in endpointInfos)
+                {
+                    pruneTasks.Add(Task.Run(() => PruneIfNotViable(info, linkedToken), linkedToken));
+                }
+
+                await Task.WhenAll(pruneTasks).ConfigureAwait(false);
+
+                return _endpointInfos.Select(c => new EndpointInfo(c));
+            }
+            finally
+            {
+                _endpointInfosSemaphore.Release();
+            }
+        }
+
+        private async Task PruneIfNotViable(IpcEndpointInfo info, CancellationToken token)
+        {
+            using var timeoutSource = new CancellationTokenSource();
+            using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, timeoutSource.Token);
+
+            try
+            {
+                timeoutSource.CancelAfter(PruneWaitForConnectionTimeout);
+
+                await info.Endpoint.WaitForConnectionAsync(linkedSource.Token).ConfigureAwait(false);
+            }
+            catch
+            {
+                // Only remove the endpoint info if due to some exception
+                // other than cancelling the pruning operation.
+                if (!token.IsCancellationRequested)
+                {
+                    _endpointInfos.Remove(info);
+                    OnRemovedEndpointInfo(info);
+                    _server.RemoveConnection(info.RuntimeInstanceCookie);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Accepts endpoint infos from the reversed diagnostics server.
+        /// </summary>
+        /// <param name="token">The token to monitor for cancellation requests.</param>
+        private async Task ListenAsync(CancellationToken token)
+        {
+            // Continuously accept endpoint infos from the reversed diagnostics server so
+            // that <see cref="ReversedDiagnosticsServer.AcceptAsync(CancellationToken)"/>
+            // is always awaited in order to to handle new runtime instance connections
+            // as well as existing runtime instance reconnections.
+            while (!token.IsCancellationRequested)
+            {
+                try
+                {
+                    IpcEndpointInfo info = await _server.AcceptAsync(token).ConfigureAwait(false);
+
+                    _ = Task.Run(() => ResumeAndQueueEndpointInfo(info, token), token);
+                }
+                catch (OperationCanceledException)
+                {
+                }
+            }
+        }
+
+        private async Task ResumeAndQueueEndpointInfo(IpcEndpointInfo info, CancellationToken token)
+        {
+            try
+            {
+                // Send ResumeRuntime message for runtime instances that connect to the server. This will allow
+                // those instances that are configured to pause on start to resume after the diagnostics
+                // connection has been made. Instances that are not configured to pause on startup will ignore
+                // the command and return success.
+                var client = new DiagnosticsClient(info.Endpoint);
+                try
+                {
+                    client.ResumeRuntime();
+                }
+                catch (ServerErrorException)
+                {
+                    // The runtime likely doesn't understand the ResumeRuntime command.
+                }
+
+                await _endpointInfosSemaphore.WaitAsync(token).ConfigureAwait(false);
+                try
+                {
+                    _endpointInfos.Add(info);
+
+                    OnAddedEndpointInfo(info);
+                }
+                finally
+                {
+                    _endpointInfosSemaphore.Release();
+                }
+            }
+            catch (Exception)
+            {
+                _server.RemoveConnection(info.RuntimeInstanceCookie);
+
+                throw;
+            }
+        }
+
+        internal virtual void OnAddedEndpointInfo(IpcEndpointInfo info)
+        {
+        }
+
+        internal virtual void OnRemovedEndpointInfo(IpcEndpointInfo info)
+        {
+        }
+
+        private void VerifyNotDisposed()
+        {
+            if (_disposed)
+            {
+                throw new ObjectDisposedException(nameof(ServerEndpointInfoSource));
+            }
+        }
+
+        private class EndpointInfo : IEndpointInfo
+        {
+            private readonly IpcEndpointInfo _info;
+
+            public EndpointInfo(IpcEndpointInfo info)
+            {
+                _info = info;
+            }
+
+            public IpcEndpoint Endpoint => _info.Endpoint;
+
+            public int ProcessId => _info.ProcessId;
+
+            public Guid RuntimeInstanceCookie => _info.RuntimeInstanceCookie;
+        }
+    }
+}
diff --git a/src/Microsoft.Diagnostics.Monitoring/ServiceCollectionExtensions.cs b/src/Microsoft.Diagnostics.Monitoring/ServiceCollectionExtensions.cs
new file mode 100644 (file)
index 0000000..06e16cb
--- /dev/null
@@ -0,0 +1,29 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using Microsoft.Diagnostics.NETCore.Client;
+using Microsoft.Extensions.DependencyInjection;
+
+namespace Microsoft.Diagnostics.Monitoring
+{
+    public static class ServiceCollectionExtensions
+    {
+        public static IServiceCollection AddEndpointInfoSource(this IServiceCollection services, string reversedServerAddress, int? maxConnections = null)
+        {
+            if (string.IsNullOrWhiteSpace(reversedServerAddress))
+            {
+                return services.AddSingleton<IEndpointInfoSource, ClientEndpointInfoSource>();
+            }
+            else
+            {
+                // Construct the source now rather than delayed construction
+                // in order to be able to accept diagnostics connections immediately.
+                var serverSource = new ServerEndpointInfoSource(reversedServerAddress);
+                serverSource.Listen(maxConnections.GetValueOrDefault(ReversedDiagnosticsServer.MaxAllowedConnections));
+
+                return services.AddSingleton<IEndpointInfoSource>(serverSource);
+            }
+        }
+    }
+}
index 480f1e88928436e6dd19de405b9a86e3cb676733..89a4d4240bbf1a5851a9367a117613d513d57e52 100644 (file)
@@ -9,6 +9,8 @@ using System.IO;
 using System.Linq;
 using System.Runtime.InteropServices;
 using System.Text.RegularExpressions;
+using System.Threading;
+using System.Threading.Tasks;
 
 namespace Microsoft.Diagnostics.NETCore.Client
 {
@@ -17,22 +19,28 @@ namespace Microsoft.Diagnostics.NETCore.Client
     /// </summary>
     public sealed class DiagnosticsClient
     {
-        private readonly int _processId;
+        private readonly IpcEndpoint _endpoint;
 
-        public DiagnosticsClient(int processId)
+        public DiagnosticsClient(int processId) :
+            this(new PidIpcEndpoint(processId))
         {
-            _processId = processId;
+        }
+
+        internal DiagnosticsClient(IpcEndpoint endpoint)
+        {
+            _endpoint = endpoint;
         }
 
         /// <summary>
-        /// Checks that the client is able to communicate with target process over diagnostic transport.
+        /// Wait for an available diagnostic endpoint to the runtime instance.
         /// </summary>
+        /// <param name="token">The token to monitor for cancellation requests.</param>
         /// <returns>
-        /// True if client is able to communicate with target process; otherwise, false.
+        /// A task the completes when a diagnostic endpoint to the runtime instance becomes available.
         /// </returns>
-        public bool CheckTransport()
+        internal Task WaitForConnectionAsync(CancellationToken token)
         {
-            return IpcClient.CheckTransport(_processId);
+            return _endpoint.WaitForConnectionAsync(token);
         }
 
         /// <summary>
@@ -46,7 +54,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
         /// </returns> 
         public EventPipeSession StartEventPipeSession(IEnumerable<EventPipeProvider> providers, bool requestRundown=true, int circularBufferMB=256)
         {
-            return new EventPipeSession(_processId, providers, requestRundown, circularBufferMB);
+            return new EventPipeSession(_endpoint, providers, requestRundown, circularBufferMB);
         }
 
         /// <summary>
@@ -60,7 +68,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
         /// </returns> 
         public EventPipeSession StartEventPipeSession(EventPipeProvider provider, bool requestRundown=true, int circularBufferMB=256)
         {
-            return new EventPipeSession(_processId, new[] { provider }, requestRundown, circularBufferMB);
+            return new EventPipeSession(_endpoint, new[] { provider }, requestRundown, circularBufferMB);
         }
 
         /// <summary>
@@ -76,16 +84,16 @@ namespace Microsoft.Diagnostics.NETCore.Client
 
             byte[] payload = SerializeCoreDump(dumpPath, dumpType, logDumpGeneration);
             IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Dump, (byte)DumpCommandId.GenerateCoreDump, payload);
-            IpcMessage response = IpcClient.SendMessage(_processId, message);
-            switch ((DiagnosticsServerCommandId)response.Header.CommandId)
+            IpcMessage response = IpcClient.SendMessage(_endpoint, message);
+            switch ((DiagnosticsServerResponseId)response.Header.CommandId)
             {
-                case DiagnosticsServerCommandId.Error:
+                case DiagnosticsServerResponseId.Error:
                     uint hr = BitConverter.ToUInt32(response.Payload, 0);
                     if (hr == (uint)DiagnosticsIpcError.UnknownCommand) {
                         throw new PlatformNotSupportedException($"Unsupported operating system: {RuntimeInformation.OSDescription}");
                     }
                     throw new ServerErrorException($"Writing dump failed (HRESULT: 0x{hr:X8})");
-                case DiagnosticsServerCommandId.OK:
+                case DiagnosticsServerResponseId.OK:
                     return;
                 default:
                     throw new ServerErrorException($"Writing dump failed - server responded with unknown command");
@@ -113,13 +121,13 @@ namespace Microsoft.Diagnostics.NETCore.Client
 
             byte[] serializedConfiguration = SerializeProfilerAttach((uint)attachTimeout.TotalSeconds, profilerGuid, profilerPath, additionalData);
             var message = new IpcMessage(DiagnosticsServerCommandSet.Profiler, (byte)ProfilerCommandId.AttachProfiler, serializedConfiguration);
-            var response = IpcClient.SendMessage(_processId, message);
-            switch ((DiagnosticsServerCommandId)response.Header.CommandId)
+            var response = IpcClient.SendMessage(_endpoint, message);
+            switch ((DiagnosticsServerResponseId)response.Header.CommandId)
             {
-                case DiagnosticsServerCommandId.Error:
+                case DiagnosticsServerResponseId.Error:
                     var hr = BitConverter.ToInt32(response.Payload, 0);
                     throw new ServerErrorException($"Profiler attach failed (HRESULT: 0x{hr:X8})");
-                case DiagnosticsServerCommandId.OK:
+                case DiagnosticsServerResponseId.OK:
                     return;
                 default:
                     throw new ServerErrorException($"Profiler attach failed - server responded with unknown command");
@@ -130,6 +138,42 @@ namespace Microsoft.Diagnostics.NETCore.Client
             // runtime timeout or respect attachTimeout as one total duration.
         }
 
+        internal void ResumeRuntime()
+        {
+            IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.ResumeRuntime);
+            var response = IpcClient.SendMessage(_endpoint, message);
+            switch ((DiagnosticsServerResponseId)response.Header.CommandId)
+            {
+                case DiagnosticsServerResponseId.Error:
+                    // Try fallback for Preview 7 and Preview 8
+                    ResumeRuntimeFallback();
+                    //var hr = BitConverter.ToInt32(response.Payload, 0);
+                    //throw new ServerErrorException($"Resume runtime failed (HRESULT: 0x{hr:X8})");
+                    return;
+                case DiagnosticsServerResponseId.OK:
+                    return;
+                default:
+                    throw new ServerErrorException($"Resume runtime failed - server responded with unknown command");
+            }
+        }
+
+        // Fallback command for .NET 5 Preview 7 and Preview 8
+        internal void ResumeRuntimeFallback()
+        {
+            IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Server, (byte)DiagnosticServerCommandId.ResumeRuntime);
+            var response = IpcClient.SendMessage(_endpoint, message);
+            switch ((DiagnosticsServerResponseId)response.Header.CommandId)
+            {
+                case DiagnosticsServerResponseId.Error:
+                    var hr = BitConverter.ToInt32(response.Payload, 0);
+                    throw new ServerErrorException($"Resume runtime failed (HRESULT: 0x{hr:X8})");
+                case DiagnosticsServerResponseId.OK:
+                    return;
+                default:
+                    throw new ServerErrorException($"Resume runtime failed - server responded with unknown command");
+            }
+        }
+
         /// <summary>
         /// Get all the active processes that can be attached to.
         /// </summary>
@@ -138,10 +182,10 @@ namespace Microsoft.Diagnostics.NETCore.Client
         /// </returns>
         public static IEnumerable<int> GetPublishedProcesses()
         {
-            return Directory.GetFiles(IpcClient.IpcRootPath)
+            return Directory.GetFiles(PidIpcEndpoint.IpcRootPath)
                 .Select(namedPipe => (new FileInfo(namedPipe)).Name)
-                .Where(input => Regex.IsMatch(input, IpcClient.DiagnosticsPortPattern))
-                .Select(input => int.Parse(Regex.Match(input, IpcClient.DiagnosticsPortPattern).Groups[1].Value, NumberStyles.Integer))
+                .Where(input => Regex.IsMatch(input, PidIpcEndpoint.DiagnosticsPortPattern))
+                .Select(input => int.Parse(Regex.Match(input, PidIpcEndpoint.DiagnosticsPortPattern).Groups[1].Value, NumberStyles.Integer))
                 .Distinct();
         }
 
index fc6fa847d46f9e5114fa369b6627a04155ae8916..d62b7e616952aee440bc6c964763e49482d9fc82 100644 (file)
@@ -15,25 +15,25 @@ namespace Microsoft.Diagnostics.NETCore.Client
         private bool _requestRundown;
         private int _circularBufferMB;
         private long _sessionId;
-        private int _processId;
+        private IpcEndpoint _endpoint;
         private bool disposedValue = false; // To detect redundant calls
 
-        internal EventPipeSession(int processId, IEnumerable<EventPipeProvider> providers, bool requestRundown, int circularBufferMB)
+        internal EventPipeSession(IpcEndpoint endpoint, IEnumerable<EventPipeProvider> providers, bool requestRundown, int circularBufferMB)
         {
-            _processId = processId;
+            _endpoint = endpoint;
             _providers = providers;
             _requestRundown = requestRundown;
             _circularBufferMB = circularBufferMB;
             
             var config = new EventPipeSessionConfiguration(circularBufferMB, EventPipeSerializationFormat.NetTrace, providers, requestRundown);
             var message = new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.CollectTracing2, config.SerializeV2());
-            EventStream = IpcClient.SendMessage(processId, message, out var response);
-            switch ((DiagnosticsServerCommandId)response.Header.CommandId)
+            EventStream = IpcClient.SendMessage(endpoint, message, out var response);
+            switch ((DiagnosticsServerResponseId)response.Header.CommandId)
             {
-                case DiagnosticsServerCommandId.OK:
+                case DiagnosticsServerResponseId.OK:
                     _sessionId = BitConverter.ToInt64(response.Payload, 0);
                     break;
-                case DiagnosticsServerCommandId.Error:
+                case DiagnosticsServerResponseId.Error:
                     var hr = BitConverter.ToInt32(response.Payload, 0);
                     throw new ServerErrorException($"EventPipe session start failed (HRESULT: 0x{hr:X8})");
                 default:
@@ -51,13 +51,13 @@ namespace Microsoft.Diagnostics.NETCore.Client
             Debug.Assert(_sessionId > 0);
 
             byte[] payload = BitConverter.GetBytes(_sessionId);
-            var response = IpcClient.SendMessage(_processId, new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.StopTracing, payload));
+            var response = IpcClient.SendMessage(_endpoint, new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.StopTracing, payload));
 
-            switch ((DiagnosticsServerCommandId)response.Header.CommandId)
+            switch ((DiagnosticsServerResponseId)response.Header.CommandId)
             {
-                case DiagnosticsServerCommandId.OK:
+                case DiagnosticsServerResponseId.OK:
                     return;
-                case DiagnosticsServerCommandId.Error:
+                case DiagnosticsServerResponseId.Error:
                     var hr = BitConverter.ToInt32(response.Payload, 0);
                     throw new ServerErrorException($"EventPipe session stop failed (HRESULT: 0x{hr:X8})");
                 default:
diff --git a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/ExposedSocketNetworkStream.cs b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/ExposedSocketNetworkStream.cs
new file mode 100644 (file)
index 0000000..dbf6e3e
--- /dev/null
@@ -0,0 +1,19 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System.Net.Sockets;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+    internal sealed class ExposedSocketNetworkStream :
+        NetworkStream
+    {
+        public ExposedSocketNetworkStream(Socket socket, bool ownsSocket)
+            : base(socket, ownsSocket)
+        {
+        }
+
+        public new Socket Socket => base.Socket;
+    }
+}
diff --git a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcAdvertise.cs b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcAdvertise.cs
new file mode 100644 (file)
index 0000000..fbf11db
--- /dev/null
@@ -0,0 +1,91 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Linq;
+using System.IO;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+    /**
+     * ==ADVERTISE PROTOCOL==
+     * Before standard IPC Protocol communication can occur on a client-mode connection
+     * the runtime must advertise itself over the connection. ALL SUBSEQUENT COMMUNICATION 
+     * IS STANDARD DIAGNOSTICS IPC PROTOCOL COMMUNICATION.
+     * 
+     * The flow for Advertise is a one-way burst of 34 bytes consisting of
+     * 8 bytes  - "ADVR_V1\0" (ASCII chars + null byte)
+     * 16 bytes - CLR Instance Cookie (little-endian)
+     * 8 bytes  - PID (little-endian)
+     * 2 bytes  - future
+     */
+
+    internal sealed class IpcAdvertise
+    {
+        private static byte[] Magic_V1 => Encoding.ASCII.GetBytes("ADVR_V1" + '\0');
+        private static readonly int IpcAdvertiseV1SizeInBytes = Magic_V1.Length + 16 + 8 + 2; // 34 bytes
+
+        private IpcAdvertise(byte[] magic, Guid cookie, UInt64 pid, UInt16 future)
+        {
+            Future = future;
+            Magic = magic;
+            ProcessId = pid;
+            RuntimeInstanceCookie = cookie;
+        }
+
+        public static async Task<IpcAdvertise> ParseAsync(Stream stream, CancellationToken token)
+        {
+            byte[] buffer = new byte[IpcAdvertiseV1SizeInBytes];
+
+            int totalRead = 0;
+            do
+            {
+                int read = await stream.ReadAsync(buffer, totalRead, buffer.Length - totalRead, token).ConfigureAwait(false);
+                if (0 == read)
+                {
+                    throw new EndOfStreamException();
+                }
+                totalRead += read;
+            }
+            while (totalRead < buffer.Length);
+
+            int index = 0;
+            byte[] magic = new byte[Magic_V1.Length];
+            Array.Copy(buffer, magic, Magic_V1.Length);
+            index += Magic_V1.Length;
+
+            if (!Magic_V1.SequenceEqual(magic))
+            {
+                throw new Exception("Invalid advertise message from client connection");
+            }
+
+            byte[] cookieBuffer = new byte[16];
+            Array.Copy(buffer, index, cookieBuffer, 0, 16);
+            Guid cookie = new Guid(cookieBuffer);
+            index += 16;
+
+            UInt64 pid = BitConverter.ToUInt64(buffer, index);
+            index += 8;
+
+            UInt16 future = BitConverter.ToUInt16(buffer, index);
+            index += 2;
+
+            // FUTURE: switch on incoming magic and change if version ever increments
+            return new IpcAdvertise(magic, cookie, pid, future);
+        }
+
+        public override string ToString()
+        {
+            return $"{{ Magic={Magic}; ClrInstanceId={RuntimeInstanceCookie}; ProcessId={ProcessId}; Future={Future} }}";
+        }
+
+        private UInt16 Future { get; } = 0;
+        public byte[] Magic { get; } = Magic_V1;
+        public UInt64 ProcessId { get; } = 0;
+        public Guid RuntimeInstanceCookie { get; } = Guid.Empty;
+    }
+}
index c84395460f2df9d189a1174e257c6bb72f98aafd..9dd0a2fecf87fbce3a113269a6947f7180240a6e 100644 (file)
 // See the LICENSE file in the project root for more information.
 
 using System;
-using System.Diagnostics;
 using System.IO;
-using System.IO.Pipes;
-using System.Linq;
-using System.Net;
-using System.Net.Sockets;
-using System.Runtime.InteropServices;
-using System.Security.Principal;
+using System.Threading;
 
 namespace Microsoft.Diagnostics.NETCore.Client
 {
     internal class IpcClient
     {
-        public static string IpcRootPath { get; } = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? @"\\.\pipe\" : Path.GetTempPath();
-        public static string DiagnosticsPortPattern { get; } = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? @"^dotnet-diagnostic-(\d+)$" : @"^dotnet-diagnostic-(\d+)-(\d+)-socket$";
-
-        private static double ConnectTimeoutMilliseconds { get; } = TimeSpan.FromSeconds(3).TotalMilliseconds;
-
-        /// <summary>
-        /// Get the OS Transport to be used for communicating with a dotnet process.
-        /// </summary>
-        /// <param name="processId">The PID of the dotnet process to get the transport for</param>
-        /// <returns>A System.IO.Stream wrapper around the transport</returns>
-        private static Stream GetTransport(int processId)
-        {
-            try 
-            {
-                var process = Process.GetProcessById(processId);
-            }
-            catch (System.ArgumentException)
-            {
-                throw new ServerNotAvailableException($"Process {processId} is not running.");
-            }
-            catch (System.InvalidOperationException)
-            {
-                throw new ServerNotAvailableException($"Process {processId} seems to be elevated.");
-            }
-            if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
-            {
-                string pipeName = $"dotnet-diagnostic-{processId}";
-                var namedPipe = new NamedPipeClientStream(
-                    ".", pipeName, PipeDirection.InOut, PipeOptions.None, TokenImpersonationLevel.Impersonation);
-                namedPipe.Connect((int)ConnectTimeoutMilliseconds);
-                return namedPipe;
-            }
-            else
-            {
-                string ipcPort;
-                try
-                {
-                    ipcPort = Directory.GetFiles(IpcRootPath, $"dotnet-diagnostic-{processId}-*-socket") // Try best match.
-                                .OrderByDescending(f => new FileInfo(f).LastWriteTime)
-                                .FirstOrDefault();
-                    if (ipcPort == null)
-                    {
-                        throw new ServerNotAvailableException($"Process {processId} not running compatible .NET Core runtime.");
-                    }
-                }
-                catch (InvalidOperationException)
-                {
-                    throw new ServerNotAvailableException($"Process {processId} not running compatible .NET Core runtime.");
-                }
-                string path = Path.Combine(IpcRootPath, ipcPort);
-                var remoteEP = CreateUnixDomainSocketEndPoint(path);
-
-                var socket = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified);
-                socket.Connect(remoteEP);
-                return new NetworkStream(socket, ownsSocket: true);
-            }
-        }
-
-        /// <summary>
-        /// Checks that the client is able to communicate with target process over diagnostic transport.
-        /// </summary>
-        /// <returns>
-        /// True if client is able to communicate with target process; otherwise, false.
-        /// </returns>
-        public static bool CheckTransport(int processId)
-        {
-            try
-            {
-                using var stream = GetTransport(processId);
-                return null != stream;
-            }
-            catch (Exception)
-            {
-                return false;
-            }
-        }
+        // The amount of time to wait for a stream to be available for consumption by the Connect method.
+        private static readonly TimeSpan ConnectTimeout = TimeSpan.FromSeconds(3);
 
         /// <summary>
         /// Sends a single DiagnosticsIpc Message to the dotnet process with PID processId.
         /// </summary>
-        /// <param name="processId">The PID of the dotnet process</param>
+        /// <param name="endpoint">An endpoint that provides a diagnostics connection to a runtime instance.</param>
         /// <param name="message">The DiagnosticsIpc Message to be sent</param>
         /// <returns>The response DiagnosticsIpc Message from the dotnet process</returns>
-        public static IpcMessage SendMessage(int processId, IpcMessage message)
+        public static IpcMessage SendMessage(IpcEndpoint endpoint, IpcMessage message)
         {
-            using (var stream = GetTransport(processId))
+            using (var stream = endpoint.Connect(ConnectTimeout))
             {
                 Write(stream, message);
                 return Read(stream);
@@ -113,13 +32,13 @@ namespace Microsoft.Diagnostics.NETCore.Client
         /// Sends a single DiagnosticsIpc Message to the dotnet process with PID processId
         /// and returns the Stream for reuse in Optional Continuations.
         /// </summary>
-        /// <param name="processId">The PID of the dotnet process</param>
+        /// <param name="endpoint">An endpoint that provides a diagnostics connection to a runtime instance.</param>
         /// <param name="message">The DiagnosticsIpc Message to be sent</param>
         /// <param name="response">out var for response message</param>
         /// <returns>The response DiagnosticsIpc Message from the dotnet process</returns>
-        public static Stream SendMessage(int processId, IpcMessage message, out IpcMessage response)
+        public static Stream SendMessage(IpcEndpoint endpoint, IpcMessage message, out IpcMessage response)
         {
-            var stream = GetTransport(processId);
+            var stream = endpoint.Connect(ConnectTimeout);
             Write(stream, message);
             response = Read(stream);
             return stream;
@@ -139,22 +58,5 @@ namespace Microsoft.Diagnostics.NETCore.Client
         {
             return IpcMessage.Parse(stream);
         }
-
-        private static EndPoint CreateUnixDomainSocketEndPoint(string path)
-        {
-#if NETCOREAPP
-            return new UnixDomainSocketEndPoint(path);
-#elif NETSTANDARD2_0
-            // UnixDomainSocketEndPoint is not part of .NET Standard 2.0
-            var type = typeof(Socket).Assembly.GetType("System.Net.Sockets.UnixDomainSocketEndPoint")
-                       ?? Type.GetType("System.Net.Sockets.UnixDomainSocketEndPoint, System.Core");
-            if (type == null)
-            {
-                throw new PlatformNotSupportedException("Current process is not running a compatible .NET Core runtime.");
-            }
-            var ctor = type.GetConstructor(new[] { typeof(string) });
-            return (EndPoint)ctor.Invoke(new object[] { path });
-#endif
-        }
     }
 }
index 7c870e3c1fbf36e3fc0e1b3ca17dc393e74bc15d..40db46de664a9ae2491eef35feea6fa356c9ba22 100644 (file)
@@ -13,14 +13,27 @@ namespace Microsoft.Diagnostics.NETCore.Client
         Dump           = 0x01,
         EventPipe      = 0x02,
         Profiler       = 0x03,
+        Process        = 0x04,
 
         Server         = 0xFF,
     }
 
-    internal enum DiagnosticsServerCommandId : byte
+    // For .NET 5 Preview 7 and Preview 8, use this with the
+    // DiagnosticsServerCommandSet.Server command set.
+    // For .NET 5 RC and later, use ProcessCommandId.ResumeRuntime with
+    // the DiagnosticsServerCommandSet.Process command set.
+    internal enum DiagnosticServerCommandId : byte
     {
-        OK    = 0x00,
-        Error = 0xFF,
+        // 0x00 used in DiagnosticServerResponseId
+        ResumeRuntime = 0x01,
+        // 0xFF used DiagnosticServerResponseId
+    };
+
+    internal enum DiagnosticsServerResponseId : byte
+    {
+        OK            = 0x00,
+        // future
+        Error         = 0xFF,
     }
 
     internal enum EventPipeCommandId : byte
@@ -39,4 +52,10 @@ namespace Microsoft.Diagnostics.NETCore.Client
     {
         AttachProfiler = 0x01,
     }
+
+    internal enum ProcessCommandId : byte
+    {
+        GetProcessInfo = 0x00,
+        ResumeRuntime  = 0x01
+    }
 }
index 9f03a66c6657d31cd95e9622bf25264086d8e111..b015673e1d875ab834817eefaa32824007288791 100644 (file)
@@ -72,9 +72,9 @@ namespace Microsoft.Diagnostics.NETCore.Client
         public IpcMessage()
         { }
 
-        public IpcMessage(IpcHeader header, byte[] payload)
+        public IpcMessage(IpcHeader header, byte[] payload = null)
         {
-            Payload = payload;
+            Payload = payload ?? Array.Empty<byte>();
             Header = header;
         }
 
@@ -90,7 +90,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
         { 
             byte[] serializedData = null;
             // Verify things will fit in the size capacity
-            Header.Size = checked((UInt16)(IpcHeader.HeaderSizeInBytes + Payload.Length)); ;
+            Header.Size = checked((UInt16)(IpcHeader.HeaderSizeInBytes + Payload.Length));
             byte[] headerBytes = Header.Serialize();
 
             using (var stream = new MemoryStream())
diff --git a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcServerTransport.cs b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcServerTransport.cs
new file mode 100644 (file)
index 0000000..3881ac2
--- /dev/null
@@ -0,0 +1,201 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.IO;
+using System.IO.Pipes;
+using System.Net.Sockets;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+    internal abstract class IpcServerTransport : IDisposable
+    {
+        private bool _disposed;
+
+        public static IpcServerTransport Create(string transportPath, int maxConnections)
+        {
+            if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+            {
+                return new WindowsPipeServerTransport(transportPath, maxConnections);
+            }
+            else
+            {
+                return new UnixDomainSocketServerTransport(transportPath, maxConnections);
+            }
+        }
+
+        public void Dispose()
+        {
+            if (!_disposed)
+            {
+                Dispose(disposing: true);
+
+                _disposed = true;
+            }
+        }
+
+        protected virtual void Dispose(bool disposing)
+        {
+        }
+
+        public abstract Task<Stream> AcceptAsync(CancellationToken token);
+
+        public static int MaxAllowedConnections
+        {
+            get
+            {
+                if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+                {
+                    return NamedPipeServerStream.MaxAllowedServerInstances;
+                }
+                else
+                {
+                    return (int)SocketOptionName.MaxConnections;
+                }
+            }
+        }
+
+        protected void VerifyNotDisposed()
+        {
+            if (_disposed)
+            {
+                throw new ObjectDisposedException(this.GetType().Name);
+            }
+        }
+    }
+
+    internal sealed class WindowsPipeServerTransport : IpcServerTransport
+    {
+        private const string PipePrefix = @"\\.\pipe\";
+
+        private NamedPipeServerStream _stream;
+
+        private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
+        private readonly string _pipeName;
+        private readonly int _maxInstances;
+
+        public WindowsPipeServerTransport(string pipeName, int maxInstances)
+        {
+            _maxInstances = maxInstances;
+            _pipeName = pipeName.StartsWith(PipePrefix) ? pipeName.Substring(PipePrefix.Length) : pipeName;
+            CreateNewPipeServer();
+        }
+
+        protected override void Dispose(bool disposing)
+        {
+            if (disposing)
+            {
+                _cancellation.Cancel();
+
+                _stream.Dispose();
+
+                _cancellation.Dispose();
+            }
+        }
+
+        public override async Task<Stream> AcceptAsync(CancellationToken token)
+        {
+            VerifyNotDisposed();
+
+            using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, _cancellation.Token);
+
+            NamedPipeServerStream connectedStream;
+            try
+            {
+                await _stream.WaitForConnectionAsync(linkedSource.Token).ConfigureAwait(false);
+
+                connectedStream = _stream;
+            }
+            finally
+            {
+                if (!_cancellation.IsCancellationRequested)
+                {
+                    CreateNewPipeServer();
+                }
+            }
+            return connectedStream;
+        }
+
+        private void CreateNewPipeServer()
+        {
+            _stream = new NamedPipeServerStream(
+                _pipeName,
+                PipeDirection.InOut,
+                _maxInstances,
+                PipeTransmissionMode.Byte,
+                PipeOptions.Asynchronous);
+        }
+    }
+
+    internal sealed class UnixDomainSocketServerTransport : IpcServerTransport
+    {
+        private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
+        private readonly int _backlog;
+        private readonly string _path;
+
+        private UnixDomainSocket _socket;
+
+        public UnixDomainSocketServerTransport(string path, int backlog)
+        {
+            _backlog = backlog;
+            _path = path;
+
+            CreateNewSocketServer();
+        }
+
+        protected override void Dispose(bool disposing)
+        {
+            if (disposing)
+            {
+                _cancellation.Cancel();
+
+                try
+                {
+                    _socket.Shutdown(SocketShutdown.Both);
+                }
+                catch { }
+                finally
+                {
+                    _socket.Close(0);
+                }
+                _socket.Dispose();
+
+                _cancellation.Dispose();
+            }
+        }
+
+        public override async Task<Stream> AcceptAsync(CancellationToken token)
+        {
+            VerifyNotDisposed();
+
+            using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, _cancellation.Token);
+            try
+            {
+                Socket socket = await _socket.AcceptAsync(linkedSource.Token).ConfigureAwait(false);
+
+                return new ExposedSocketNetworkStream(socket, ownsSocket: true);
+            }
+            catch (Exception)
+            {
+                // Recreate socket if transport is not disposed.
+                if (!_cancellation.IsCancellationRequested)
+                {
+                    CreateNewSocketServer();
+                }
+                throw;
+            }
+        }
+
+        private void CreateNewSocketServer()
+        {
+            _socket = new UnixDomainSocket();
+            _socket.Bind(_path);
+            _socket.Listen(_backlog);
+            _socket.LingerState.Enabled = false;
+        }
+    }
+}
diff --git a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcTransport.cs b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcTransport.cs
new file mode 100644 (file)
index 0000000..f742f91
--- /dev/null
@@ -0,0 +1,207 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Diagnostics;
+using System.IO;
+using System.IO.Pipes;
+using System.Linq;
+using System.Runtime.InteropServices;
+using System.Security.Principal;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+    internal abstract class IpcEndpoint
+    {
+        /// <summary>
+        /// Connects to the underlying IPC transport and opens a read/write-able Stream
+        /// </summary>
+        /// <param name="timeout">The amount of time to block attempting to connect</param>
+        /// <returns>A Stream for writing and reading data to and from the target .NET process</returns>
+        public abstract Stream Connect(TimeSpan timeout);
+
+        /// <summary>
+        /// Wait for an available diagnostic endpoint to the runtime instance.
+        /// </summary>
+        /// <param name="token">The token to monitor for cancellation requests.</param>
+        /// <returns>
+        /// A task the completes when a diagnostic endpoint to the runtime instance becomes available.
+        /// </returns>
+        public abstract Task WaitForConnectionAsync(CancellationToken token);
+    }
+
+    internal class ServerIpcEndpoint : IpcEndpoint
+    {
+        private readonly Guid _runtimeId;
+        private readonly ReversedDiagnosticsServer _server;
+
+        public ServerIpcEndpoint(ReversedDiagnosticsServer server, Guid runtimeId)
+        {
+            _runtimeId = runtimeId;
+            _server = server;
+        }
+
+        /// <remarks>
+        /// This will block until the diagnostic stream is provided. This block can happen if
+        /// the stream is acquired previously and the runtime instance has not yet reconnected
+        /// to the reversed diagnostics server.
+        /// </remarks>
+        public override Stream Connect(TimeSpan timeout)
+        {
+            return _server.Connect(_runtimeId, timeout);
+        }
+
+        public override async Task WaitForConnectionAsync(CancellationToken token)
+        {
+            await _server.WaitForConnectionAsync(_runtimeId, token).ConfigureAwait(false);
+        }
+
+        public override bool Equals(object obj)
+        {
+            return Equals(obj as ServerIpcEndpoint);
+        }
+
+        public bool Equals(ServerIpcEndpoint other)
+        {
+            return other != null && other._runtimeId == _runtimeId && other._server == _server;
+        }
+
+        public override int GetHashCode()
+        {
+            return _runtimeId.GetHashCode() ^ _server.GetHashCode();
+        }
+    }
+
+    internal class PidIpcEndpoint : IpcEndpoint
+    {
+        public static string IpcRootPath { get; } = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? @"\\.\pipe\" : Path.GetTempPath();
+        public static string DiagnosticsPortPattern { get; } = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? @"^dotnet-diagnostic-(\d+)$" : @"^dotnet-diagnostic-(\d+)-(\d+)-socket$";
+
+        private int _pid;
+
+        /// <summary>
+        /// Creates a reference to a .NET process's IPC Transport
+        /// using the default rules for a given pid
+        /// </summary>
+        /// <param name="pid">The pid of the target process</param>
+        /// <returns>A reference to the IPC Transport</returns>
+        public PidIpcEndpoint(int pid)
+        {
+            _pid = pid;
+        }
+
+        public override Stream Connect(TimeSpan timeout)
+        {
+            string address = GetDefaultAddress();
+            if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+            {
+                var namedPipe = new NamedPipeClientStream(
+                    ".",
+                    address,
+                    PipeDirection.InOut,
+                    PipeOptions.None,
+                    TokenImpersonationLevel.Impersonation);
+                namedPipe.Connect((int)timeout.TotalMilliseconds);
+                return namedPipe;
+            }
+            else
+            {
+                var socket = new UnixDomainSocket();
+                socket.Connect(Path.Combine(IpcRootPath, address), timeout);
+                return new ExposedSocketNetworkStream(socket, ownsSocket: true);
+            }
+        }
+
+        public override async Task WaitForConnectionAsync(CancellationToken token)
+        {
+            using var _ = await ConnectStreamAsync(token).ConfigureAwait(false);
+        }
+
+        async Task<Stream> ConnectStreamAsync(CancellationToken token)
+        {
+            string address = GetDefaultAddress();
+            if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+            {
+                var namedPipe = new NamedPipeClientStream(
+                    ".",
+                    address,
+                    PipeDirection.InOut,
+                    PipeOptions.None,
+                    TokenImpersonationLevel.Impersonation);
+                await namedPipe.ConnectAsync(token).ConfigureAwait(false);
+                return namedPipe;
+            }
+            else
+            {
+                var socket = new UnixDomainSocket();
+                await socket.ConnectAsync(Path.Combine(IpcRootPath, address), token).ConfigureAwait(false);
+                return new ExposedSocketNetworkStream(socket, ownsSocket: true);
+            }
+        }
+
+        private string GetDefaultAddress()
+        {
+            try
+            {
+                var process = Process.GetProcessById(_pid);
+            }
+            catch (ArgumentException)
+            {
+                throw new ServerNotAvailableException($"Process {_pid} is not running.");
+            }
+            catch (InvalidOperationException)
+            {
+                throw new ServerNotAvailableException($"Process {_pid} seems to be elevated.");
+            }
+
+            if (!TryGetDefaultAddress(_pid, out string transportName))
+            {
+                throw new ServerNotAvailableException($"Process {_pid} not running compatible .NET runtime.");
+            }
+
+            return transportName;
+        }
+
+        private static bool TryGetDefaultAddress(int pid, out string defaultAddress)
+        {
+            defaultAddress = null;
+
+            if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+            {
+                defaultAddress = $"dotnet-diagnostic-{pid}";
+            }
+            else
+            {
+                try
+                {
+                    defaultAddress = Directory.GetFiles(IpcRootPath, $"dotnet-diagnostic-{pid}-*-socket") // Try best match.
+                        .OrderByDescending(f => new FileInfo(f).LastWriteTime)
+                        .FirstOrDefault();
+                }
+                catch (InvalidOperationException)
+                {
+                }
+            }
+
+            return !string.IsNullOrEmpty(defaultAddress);
+        }
+
+        public override bool Equals(object obj)
+        {
+            return Equals(obj as PidIpcEndpoint);
+        }
+
+        public bool Equals(PidIpcEndpoint other)
+        {
+            return other != null && other._pid == _pid;
+        }
+
+        public override int GetHashCode()
+        {
+            return _pid.GetHashCode();
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/UnixDomainSocket.cs b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/UnixDomainSocket.cs
new file mode 100644 (file)
index 0000000..4cf78a6
--- /dev/null
@@ -0,0 +1,124 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Diagnostics;
+using System.IO;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+    internal sealed class UnixDomainSocket : Socket
+    {
+        private bool _ownsSocketFile;
+        private string _path;
+
+        public UnixDomainSocket() :
+            base(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified)
+        {
+        }
+
+        public async Task<Socket> AcceptAsync(CancellationToken token)
+        {
+            using (token.Register(() => Close(0)))
+            {
+                try
+                {
+                    return await Task.Factory.FromAsync(BeginAccept, EndAccept, this).ConfigureAwait(false);
+                }
+                // When the socket is closed, the FromAsync logic will try to call EndAccept on the socket,
+                // but that will throw an ObjectDisposedException. Only catch the exception if due to cancellation.
+                catch (ObjectDisposedException) when (token.IsCancellationRequested)
+                {
+                    // First check if the cancellation token caused the closing of the socket,
+                    // then rethrow the exception if it did not.
+                    token.ThrowIfCancellationRequested();
+
+                    Debug.Fail("Token should have thrown cancellation exception.");
+                    return null;
+                }
+            }
+        }
+
+        public void Bind(string path)
+        {
+            Bind(CreateUnixDomainSocketEndPoint(path));
+
+            _ownsSocketFile = true;
+            _path = path;
+        }
+
+        public void Connect(string path, TimeSpan timeout)
+        {
+            IAsyncResult result = BeginConnect(CreateUnixDomainSocketEndPoint(path), null, null);
+
+            if (result.AsyncWaitHandle.WaitOne(timeout))
+            {
+                EndConnect(result);
+
+                _ownsSocketFile = false;
+                _path = path;
+            }
+            else
+            {
+                Close(0);
+                throw new TimeoutException();
+            }
+        }
+
+        public async Task ConnectAsync(string path, CancellationToken token)
+        {
+            using (token.Register(() => Close(0)))
+            {
+                try
+                {
+                    Func<AsyncCallback, object, IAsyncResult> beginConnect = (callback, state) =>
+                    {
+                        return BeginConnect(CreateUnixDomainSocketEndPoint(path), callback, state);
+                    };
+                    await Task.Factory.FromAsync(beginConnect, EndConnect, this).ConfigureAwait(false);
+                }
+                // When the socket is closed, the FromAsync logic will try to call EndAccept on the socket,
+                // but that will throw an ObjectDisposedException. Only catch the exception if due to cancellation.
+                catch (ObjectDisposedException) when (token.IsCancellationRequested)
+                {
+                    // First check if the cancellation token caused the closing of the socket,
+                    // then rethrow the exception if it did not.
+                    token.ThrowIfCancellationRequested();
+                }
+            }
+        }
+
+        protected override void Dispose(bool disposing)
+        {
+            if (disposing)
+            {
+                if (_ownsSocketFile && !string.IsNullOrEmpty(_path) && File.Exists(_path))
+                {
+                    File.Delete(_path);
+                }
+            }
+            base.Dispose(disposing);
+        }
+
+        private static EndPoint CreateUnixDomainSocketEndPoint(string path)
+        {
+#if NETCOREAPP
+            return new UnixDomainSocketEndPoint(path);
+#elif NETSTANDARD2_0
+            // UnixDomainSocketEndPoint is not part of .NET Standard 2.0
+            var type = typeof(Socket).Assembly.GetType("System.Net.Sockets.UnixDomainSocketEndPoint");
+            if (type == null)
+            {
+                throw new PlatformNotSupportedException("Current process is not running a compatible .NET runtime.");
+            }
+            var ctor = type.GetConstructor(new[] { typeof(string) });
+            return (EndPoint)ctor.Invoke(new object[] { path });
+#endif
+        }
+    }
+}
index b9fb1b55e099fdbe640d9b8cf55e4b9e71f64e99..b80780a73ef35866c0792e83220d5c3636e1e629 100644 (file)
     <IncludeSymbols>true</IncludeSymbols>
     <IsShipping>true</IsShipping>
   </PropertyGroup>
+
+  <ItemGroup>
+    <InternalsVisibleTo Include="DotnetMonitor.UnitTests" />
+    <InternalsVisibleTo Include="Microsoft.Diagnostics.Monitoring" />
+    <InternalsVisibleTo Include="Microsoft.Diagnostics.NETCore.Client.UnitTests" />
+  </ItemGroup>
 </Project>
diff --git a/src/Microsoft.Diagnostics.NETCore.Client/NativeMethods.cs b/src/Microsoft.Diagnostics.NETCore.Client/NativeMethods.cs
new file mode 100644 (file)
index 0000000..3962bb4
--- /dev/null
@@ -0,0 +1,24 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Runtime.InteropServices;
+using Microsoft.Win32.SafeHandles;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+    internal class NativeMethods
+    {
+        [DllImport("kernel32.dll", SetLastError = true)]
+        [return: MarshalAs(UnmanagedType.Bool)]
+        internal static extern bool PeekNamedPipe(
+            SafePipeHandle hNamedPipe,
+            byte[] lpBuffer,
+            int bufferSize,
+            IntPtr lpBytesRead,
+            IntPtr lpTotalBytesAvail,
+            IntPtr lpBytesLeftThisMessage
+            );
+    }
+}
diff --git a/src/Microsoft.Diagnostics.NETCore.Client/ReversedServer/IpcEndpointInfo.cs b/src/Microsoft.Diagnostics.NETCore.Client/ReversedServer/IpcEndpointInfo.cs
new file mode 100644 (file)
index 0000000..55b9ea9
--- /dev/null
@@ -0,0 +1,38 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Diagnostics;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+    /// <summary>
+    /// Represents a runtine instance connection to a reversed diagnostics server.
+    /// </summary>
+    [DebuggerDisplay("PID={ProcessId}, Cookie={RuntimeInstanceCookie}")]
+    internal struct IpcEndpointInfo
+    {
+        internal IpcEndpointInfo(IpcEndpoint endpoint, int processId, Guid runtimeInstanceCookie)
+        {
+            Endpoint = endpoint;
+            ProcessId = processId;
+            RuntimeInstanceCookie = runtimeInstanceCookie;
+        }
+
+        /// <summary>
+        /// An endpoint used to retrieve diagnostic information from the associated runtime instance.
+        /// </summary>
+        public IpcEndpoint Endpoint { get; }
+
+        /// <summary>
+        /// The identifier of the process that is unique within its process namespace.
+        /// </summary>
+        public int ProcessId { get; }
+
+        /// <summary>
+        /// The unique identifier of the runtime instance.
+        /// </summary>
+        public Guid RuntimeInstanceCookie { get; }
+    }
+}
diff --git a/src/Microsoft.Diagnostics.NETCore.Client/ReversedServer/ReversedDiagnosticsServer.cs b/src/Microsoft.Diagnostics.NETCore.Client/ReversedServer/ReversedDiagnosticsServer.cs
new file mode 100644 (file)
index 0000000..8429869
--- /dev/null
@@ -0,0 +1,418 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.IO.Pipes;
+using System.Net.Sockets;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+    /// <summary>
+    /// Establishes server endpoint for runtime instances to connect when
+    /// configured to provide diagnostic endpoints in reverse mode.
+    /// </summary>
+    internal sealed class ReversedDiagnosticsServer : IDisposable
+    {
+        // Returns true if the handler is complete and should be removed from the list
+        delegate bool StreamHandler(Guid runtimeId, ref Stream stream);
+
+        // The amount of time to allow parsing of the advertise data before cancelling. This allows the server to
+        // remain responsive in case the advertise data is incomplete and the stream is not closed.
+        private static readonly TimeSpan ParseAdvertiseTimeout = TimeSpan.FromMilliseconds(250);
+
+        private readonly Dictionary<Guid, ServerIpcEndpoint> _cachedEndpoints = new Dictionary<Guid, ServerIpcEndpoint>();
+        private readonly Dictionary<Guid, Stream> _cachedStreams = new Dictionary<Guid, Stream>();
+        private readonly CancellationTokenSource _disposalSource = new CancellationTokenSource();
+        private readonly List<StreamHandler> _handlers = new List<StreamHandler>();
+        private readonly object _lock = new object();
+        private readonly IpcServerTransport _transport;
+
+        private bool _disposed = false;
+
+        /// <summary>
+        /// Constructs the <see cref="ReversedDiagnosticsServer"/> instance with an endpoint bound
+        /// to the location specified by <paramref name="transportPath"/>.
+        /// </summary>
+        /// <param name="transportPath">
+        /// The path of the server endpoint.
+        /// On Windows, this can be a full pipe path or the name without the "\\.\pipe\" prefix.
+        /// On all other systems, this must be the full file path of the socket.
+        /// </param>
+        public ReversedDiagnosticsServer(string transportPath)
+            : this(transportPath, MaxAllowedConnections)
+        {
+        }
+
+        /// <summary>
+        /// Constructs the <see cref="ReversedDiagnosticsServer"/> instance with an endpoint bound
+        /// to the location specified by <paramref name="transportPath"/>.
+        /// </summary>
+        /// <param name="transportPath">
+        /// The path of the server endpoint.
+        /// On Windows, this can be a full pipe path or the name without the "\\.\pipe\" prefix.
+        /// On all other systems, this must be the full file path of the socket.
+        /// </param>
+        /// <param name="maxConnections">The maximum number of connections the server will support.</param>
+        public ReversedDiagnosticsServer(string transportPath, int maxConnections)
+        {
+            _transport = IpcServerTransport.Create(transportPath, maxConnections);
+        }
+
+        public void Dispose()
+        {
+            if (!_disposed)
+            {
+                _disposalSource.Cancel();
+
+                lock (_lock)
+                {
+                    _cachedEndpoints.Clear();
+
+                    foreach (Stream stream in _cachedStreams.Values)
+                    {
+                        stream?.Dispose();
+                    }
+                    _cachedStreams.Clear();
+                }
+
+                _transport.Dispose();
+
+                _disposalSource.Dispose();
+
+                _disposed = true;
+            }
+        }
+
+        /// <summary>
+        /// Provides endpoint information when a new runtime instance connects to the server.
+        /// </summary>
+        /// <param name="token">The token to monitor for cancellation requests.</param>
+        /// <returns>A <see cref="IpcEndpointInfo"/> that contains information about the new runtime instance connection.</returns>
+        /// <remarks>
+        /// This will only provide endpoint information on the first time a runtime connects to the server.
+        /// If a connection is removed using <see cref="RemoveConnection(Guid)"/> and the same runtime instance,
+        /// reconnects after this call, then a new <see cref="IpcEndpointInfo"/> will be produced.
+        /// </remarks>
+        public async Task<IpcEndpointInfo> AcceptAsync(CancellationToken token)
+        {
+            VerifyNotDisposed();
+
+            while (true)
+            {
+                Stream stream = null;
+                IpcAdvertise advertise = null;
+                try
+                {
+                    stream = await _transport.AcceptAsync(token).ConfigureAwait(false);
+                }
+                catch (Exception ex) when (!(ex is OperationCanceledException))
+                {
+                    // The advertise data could be incomplete if the runtime shuts down before completely writing
+                    // the information. Catch the exception and continue waiting for a new connection.
+                }
+
+                token.ThrowIfCancellationRequested();
+
+                if (null != stream)
+                {
+                    // Cancel parsing of advertise data after timeout period to
+                    // mitigate runtimes that write partial data and do not close the stream (avoid waiting forever).
+                    using var parseCancellationSource = new CancellationTokenSource();
+                    using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, parseCancellationSource.Token);
+                    try
+                    {
+                        parseCancellationSource.CancelAfter(ParseAdvertiseTimeout);
+
+                        advertise = await IpcAdvertise.ParseAsync(stream, linkedSource.Token).ConfigureAwait(false);
+                    }
+                    catch (OperationCanceledException) when (parseCancellationSource.IsCancellationRequested)
+                    {
+                        // Only handle cancellation if it was due to the parse timeout.
+                    }
+                    catch (Exception ex) when (!(ex is OperationCanceledException))
+                    {
+                        // Catch all other exceptions and continue waiting for a new connection.
+                    }
+                }
+
+                token.ThrowIfCancellationRequested();
+
+                if (null != advertise)
+                {
+                    Guid runtimeCookie = advertise.RuntimeInstanceCookie;
+                    int pid = unchecked((int)advertise.ProcessId);
+
+                    lock (_lock)
+                    {
+                        ProvideStream(runtimeCookie, stream);
+                        // Consumers should hold onto the endpoint info and use it for diagnostic communication,
+                        // regardless of the number of times the same runtime instance connects. This requires consumers
+                        // to continuously invoke the AcceptAsync method in order to handle runtime instance reconnects,
+                        // even if the consumer only wants to handle a single endpoint.
+                        if (!_cachedEndpoints.ContainsKey(runtimeCookie))
+                        {
+                            ServerIpcEndpoint endpoint = new ServerIpcEndpoint(this, runtimeCookie);
+                            _cachedEndpoints.Add(runtimeCookie, endpoint);
+                            return new IpcEndpointInfo(endpoint, pid, runtimeCookie);
+                        }
+                    }
+                }
+
+                token.ThrowIfCancellationRequested();
+            }
+        }
+
+        /// <summary>
+        /// Removes endpoint information from the server so that it is no longer tracked.
+        /// </summary>
+        /// <param name="runtimeCookie">The runtime instance cookie that corresponds to the endpoint to be removed.</param>
+        /// <returns>True if the endpoint existed and was removed; otherwise false.</returns>
+        public bool RemoveConnection(Guid runtimeCookie)
+        {
+            VerifyNotDisposed();
+
+            bool endpointExisted = false;
+            Stream previousStream = null;
+
+            lock (_lock)
+            {
+                endpointExisted = _cachedEndpoints.Remove(runtimeCookie);
+                if (endpointExisted)
+                {
+                    if (_cachedStreams.TryGetValue(runtimeCookie, out previousStream))
+                    {
+                        _cachedStreams.Remove(runtimeCookie);
+                    }
+                }
+            }
+
+            previousStream?.Dispose();
+
+            return endpointExisted;
+        }
+
+        private void VerifyNotDisposed()
+        {
+            if (_disposed)
+            {
+                throw new ObjectDisposedException(nameof(ReversedDiagnosticsServer));
+            }
+        }
+
+        /// <remarks>
+        /// This will block until the diagnostic stream is provided. This block can happen if
+        /// the stream is acquired previously and the runtime instance has not yet reconnected
+        /// to the reversed diagnostics server.
+        /// </remarks>
+        internal Stream Connect(Guid runtimeId, TimeSpan timeout)
+        {
+            VerifyNotDisposed();
+
+            const int StreamStatePending = 0;
+            const int StreamStateComplete = 1;
+            const int StreamStateCancelled = 2;
+            const int StreamStateDisposed = 3;
+
+            // CancellationTokenSource is used to trigger the timeout path in order to avoid inadvertently consuming
+            // the stream via the handler while processing the timeout after failing to wait for the stream event
+            // to be signaled within the timeout period. The source of truth of whether the stream was consumed or
+            // whether the timeout occurred is captured by the streamState variable.
+            Stream stream = null;
+            int streamState = StreamStatePending;
+            using var streamEvent = new ManualResetEvent(false);
+            var cancellationSource = new CancellationTokenSource();
+
+            bool TrySetStream(int state, Stream value)
+            {
+                if (StreamStatePending == Interlocked.CompareExchange(ref streamState, state, 0))
+                {
+                    stream = value;
+                    streamEvent.Set();
+                    return true;
+                }
+                return false;
+            }
+
+            using var methodRegistration = cancellationSource.Token.Register(() => TrySetStream(StreamStateCancelled, value: null));
+            using var disposalRegistration = _disposalSource.Token.Register(() => TrySetStream(StreamStateDisposed, value: null));
+
+            RegisterHandler(runtimeId, (Guid id, ref Stream cachedStream) =>
+            {
+                if (id != runtimeId)
+                {
+                    return false;
+                }
+
+                if (TrySetStream(StreamStateComplete, cachedStream))
+                {
+                    cachedStream = null;
+                }
+
+                // Regardless of the registrant previously waiting or cancelled,
+                // the handler should be removed from consideration.
+                return true;
+            });
+
+            cancellationSource.CancelAfter(timeout);
+            streamEvent.WaitOne();
+
+            if (StreamStateCancelled == streamState)
+            {
+                throw new TimeoutException();
+            }
+
+            if (StreamStateDisposed == streamState)
+            {
+                throw new ObjectDisposedException(nameof(ReversedDiagnosticsServer));
+            }
+
+            return stream;
+        }
+
+        internal async Task WaitForConnectionAsync(Guid runtimeId, CancellationToken token)
+        {
+            VerifyNotDisposed();
+
+            var hasConnectedStreamSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
+            using var methodRegistration = token.Register(() => hasConnectedStreamSource.TrySetCanceled(token));
+            using var disposalRegistration = _disposalSource.Token.Register(
+                () => hasConnectedStreamSource.TrySetException(new ObjectDisposedException(nameof(ReversedDiagnosticsServer))));
+
+            RegisterHandler(runtimeId, (Guid id, ref Stream cachedStream) =>
+            {
+                if (runtimeId != id)
+                {
+                    return false;
+                }
+
+                // Check if the registrant was already finished.
+                if (hasConnectedStreamSource.Task.IsCompleted)
+                {
+                    return true;
+                }
+
+                if (!TestStream(cachedStream))
+                {
+                    cachedStream.Dispose();
+                    cachedStream = null;
+                    return false;
+                }
+
+                // Found a stream that is valid; signal completion if possible.
+                hasConnectedStreamSource.TrySetResult(true);
+
+                // Regardless of the registrant previously waiting or cancelled,
+                // the handler should be removed from consideration.
+                return true;
+            });
+           
+            // Wait for the handler to verify we have a connected stream
+            await hasConnectedStreamSource.Task.ConfigureAwait(false);
+        }
+
+        private void ProvideStream(Guid runtimeId, Stream stream)
+        {
+            Debug.Assert(Monitor.IsEntered(_lock));
+
+            // Get the previous stream in order to dispose it later
+            _cachedStreams.TryGetValue(runtimeId, out Stream previousStream);
+
+            RunStreamHandlers(runtimeId, stream);
+
+            // Dispose the previous stream if there was one.
+            previousStream?.Dispose();
+        }
+
+        private void RunStreamHandlers(Guid runtimeId, Stream stream)
+        {
+            Debug.Assert(Monitor.IsEntered(_lock));
+
+            // If there are any handlers waiting for a stream, provide
+            // it to the first handler in the queue.
+            for (int i = 0; (i < _handlers.Count) && (null != stream); i++)
+            {
+                StreamHandler handler = _handlers[i];
+                if (handler(runtimeId, ref stream))
+                {
+                    _handlers.RemoveAt(i);
+                    i--;
+                }
+            }
+
+            // Store the stream for when a handler registers later. If
+            // a handler already captured the stream, this will be null, thus
+            // representing that no existing stream is waiting to be consumed.
+            _cachedStreams[runtimeId] = stream;
+        }
+
+        private bool TestStream(Stream stream)
+        {
+            if (null == stream)
+            {
+                throw new ArgumentNullException(nameof(stream));
+            }
+
+            if (stream is ExposedSocketNetworkStream networkStream)
+            {
+                // Update Connected state of socket by sending non-blocking zero-byte data.
+                Socket socket = networkStream.Socket;
+                bool blocking = socket.Blocking;
+                try
+                {
+                    socket.Blocking = false;
+                    socket.Send(Array.Empty<byte>(), 0, SocketFlags.None);
+                }
+                catch (Exception)
+                {
+                }
+                finally
+                {
+                    socket.Blocking = blocking;
+                }
+                return socket.Connected;
+            }
+            else if (stream is PipeStream pipeStream)
+            {
+                Debug.Assert(RuntimeInformation.IsOSPlatform(OSPlatform.Windows), "Pipe stream should only be used on Windows.");
+
+                // PeekNamedPipe will return false if the pipe is disconnected/broken.
+                return NativeMethods.PeekNamedPipe(
+                    pipeStream.SafePipeHandle,
+                    null,
+                    0,
+                    IntPtr.Zero,
+                    IntPtr.Zero,
+                    IntPtr.Zero);
+            }
+
+            return false;
+        }
+
+        private void RegisterHandler(Guid runtimeId, StreamHandler handler)
+        {
+            lock (_lock)
+            {
+                if (!_cachedStreams.TryGetValue(runtimeId, out Stream stream))
+                {
+                    throw new InvalidOperationException($"Runtime instance with identifier '{runtimeId}' is not registered.");
+                }
+
+                _handlers.Add(handler);
+
+                if (stream != null)
+                {
+                    RunStreamHandlers(runtimeId, stream);
+                }
+            }
+        }
+
+        public static int MaxAllowedConnections = IpcServerTransport.MaxAllowedConnections;
+    }
+}
index d4a69180762fe41bc0fcbe1e7a02e659e9e6b0f9..d2a90e896becae0492ae7d2ab5cdf73c11467a6e 100644 (file)
@@ -2,21 +2,18 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
-using Microsoft.AspNetCore;
-using Microsoft.AspNetCore.Hosting;
-using Microsoft.Diagnostics.Monitoring;
-using Microsoft.Diagnostics.Monitoring.RestServer;
-using Microsoft.Extensions.Configuration;
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Logging;
 using System;
 using System.Collections.Generic;
 using System.CommandLine;
-using System.Globalization;
-using System.IO;
 using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
+using Microsoft.AspNetCore;
+using Microsoft.AspNetCore.Hosting;
+using Microsoft.Diagnostics.Monitoring;
+using Microsoft.Diagnostics.Monitoring.RestServer;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
 
 namespace Microsoft.Diagnostics.Tools.Monitor
 {
@@ -25,15 +22,15 @@ namespace Microsoft.Diagnostics.Tools.Monitor
         private const string ConfigPrefix = "DotnetMonitor_";
         private const string ConfigPath = "/etc/dotnet-monitor";
 
-        public async Task<int> Start(CancellationToken token, IConsole console, string[] urls, string[] metricUrls, bool metrics)
+        public async Task<int> Start(CancellationToken token, IConsole console, string[] urls, string[] metricUrls, bool metrics, string reversedServerAddress)
         {
             //CONSIDER The console logger uses the standard AddConsole, and therefore disregards IConsole.
-            using IWebHost host = CreateWebHostBuilder(console, urls, metricUrls, metrics).Build();
+            using IWebHost host = CreateWebHostBuilder(console, urls, metricUrls, metrics, reversedServerAddress).Build();
             await host.RunAsync(token);
             return 0;
         }
 
-        public IWebHostBuilder CreateWebHostBuilder(IConsole console, string[] urls, string[] metricUrls, bool metrics)
+        public IWebHostBuilder CreateWebHostBuilder(IConsole console, string[] urls, string[] metricUrls, bool metrics, string reversedServerAddress)
         {
             if (metrics)
             {
@@ -53,6 +50,7 @@ namespace Microsoft.Diagnostics.Tools.Monitor
                 })
                 .ConfigureServices((WebHostBuilderContext context, IServiceCollection services) =>
                 {
+                    services.AddEndpointInfoSource(reversedServerAddress);
                     //TODO Many of these service additions should be done through extension methods
                     services.AddSingleton<IDiagnosticServices, DiagnosticServices>();
                     if (metrics)
index 527674386542d6b400510d8d95f5fd2227d5383b..eac45a3a175ebdde91ba66fb5177e2e8949156da 100644 (file)
@@ -2,16 +2,14 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
-using Microsoft.Diagnostics.Monitoring;
-using Microsoft.Tools.Common;
 using System;
-using System.Collections.Generic;
 using System.CommandLine;
 using System.CommandLine.Builder;
 using System.CommandLine.Invocation;
-using System.IO;
 using System.Threading;
 using System.Threading.Tasks;
+using Microsoft.Diagnostics.Monitoring;
+using Microsoft.Tools.Common;
 
 namespace Microsoft.Diagnostics.Tools.Monitor
 {
@@ -31,8 +29,8 @@ namespace Microsoft.Diagnostics.Tools.Monitor
                   description: "Monitor logs and metrics in a .NET application send the results to a chosen destination.")
               {
                 // Handler
-                CommandHandler.Create<CancellationToken, IConsole, string[], string[], bool>(new DiagnosticsMonitorCommandHandler().Start),
-                Urls(), MetricUrls(), ProvideMetrics()
+                CommandHandler.Create<CancellationToken, IConsole, string[], string[], bool, string>(new DiagnosticsMonitorCommandHandler().Start),
+                Urls(), MetricUrls(), ProvideMetrics(), ReversedServerAddress()
               };
 
         private static Option Urls() =>
@@ -59,6 +57,14 @@ namespace Microsoft.Diagnostics.Tools.Monitor
                 Argument = new Argument<bool>(name: "metrics", defaultValue: true )
             };
 
+        private static Option ReversedServerAddress() =>
+            new Option(
+                alias: "--reversed-server-address",
+                description: "A fully qualified path and filename for the OS transport to communicate over.")
+            {
+                Argument = new Argument<string>(name: "reversedServerAddress")
+            };
+
         private static string GetDefaultMetricsEndpoint()
         {
             string endpoint = "http://localhost:52325";
index 85f34f150832d179c62cc7ac94efeffcf9c6fce4..33e5533e22b686017a5c3f44b39d41f063bde2a4 100644 (file)
@@ -15,11 +15,13 @@ namespace Microsoft.Diagnostics.NETCore.Client
                 "..\\..\\..\\..\\..\\.dotnet\\dotnet.exe") : 
             "../../../../../.dotnet/dotnet";
         
-        public static string GetTraceePath(string traceeName = "Tracee")
+        public static string GetTraceePath(string traceeName = "Tracee", string targetFramework = "netcoreapp3.1")
         {
             var curPath = Directory.GetCurrentDirectory();
 ;
-            var traceePath = curPath.Replace(System.Reflection.Assembly.GetCallingAssembly().GetName().Name, traceeName);
+            var traceePath = curPath
+                .Replace(System.Reflection.Assembly.GetCallingAssembly().GetName().Name, traceeName)
+                .Replace("netcoreapp3.1", targetFramework);
 
             return Path.Combine(traceePath, Path.ChangeExtension(traceeName, ".dll"));
         }
index f538ed41140d78f7a0e7f82d1a261e0d22a74752..f44cf0da61f04a315e607562cfaced81d680df8f 100644 (file)
@@ -6,6 +6,7 @@ using System;
 using System.Collections.Generic;
 using System.Runtime.InteropServices;
 using System.Threading;
+using System.Threading.Tasks;
 using Xunit;
 using Xunit.Abstractions;
 
@@ -79,7 +80,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
         }
 
         [Fact]
-        public void CheckSpecificProcessTest()
+        public async Task WaitForConnectionTest()
         {
             TestRunner runner = new TestRunner(CommonHelper.GetTraceePath(), output);
             runner.Start(3000);
@@ -89,9 +90,15 @@ namespace Microsoft.Diagnostics.NETCore.Client
             }
 
             var client = new DiagnosticsClient(runner.Pid);
-            Assert.True(client.CheckTransport(), $"Unable to verify diagnostics transport for test process {runner.Pid}.");
-
-            runner.Stop();
+            using var timeoutSource = new CancellationTokenSource(TimeSpan.FromMilliseconds(250));
+            try
+            {
+                await client.WaitForConnectionAsync(timeoutSource.Token);
+            }
+            finally
+            {
+                runner.Stop();
+            }
         }
     }
 }
index 4d449f7131424cff617e1950b170a92f6a444d4a..ab5762b7b8241de31956bb05d9d90a5beb29e4ee 100644 (file)
@@ -8,6 +8,11 @@
     <ProjectReference Include="../../Microsoft.Diagnostics.NETCore.Client/Microsoft.Diagnostics.NETCore.Client.csproj" />
     <ProjectReference Include="../../Microsoft.Diagnostics.TestHelpers/Microsoft.Diagnostics.TestHelpers.csproj" />
     <ProjectReference Include="../Tracee/Tracee.csproj" PrivateAssets="all" />
-    <PackageReference Include="Microsoft.Diagnostics.Tracing.TraceEvent" Version="2.0.47" />
+  </ItemGroup>
+  <ItemGroup>
+    <PackageReference Include="Microsoft.Diagnostics.Tracing.TraceEvent" Version="$(MicrosoftDiagnosticsTracingTraceEventVersion)" />
+  </ItemGroup>
+  <ItemGroup>
+    <InternalsVisibleTo Include="DotnetMonitor.UnitTests" />
   </ItemGroup>
 </Project>
diff --git a/src/tests/Microsoft.Diagnostics.NETCore.Client/ReversedServerHelper.cs b/src/tests/Microsoft.Diagnostics.NETCore.Client/ReversedServerHelper.cs
new file mode 100644 (file)
index 0000000..063ce34
--- /dev/null
@@ -0,0 +1,51 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System.IO;
+using System.Runtime.InteropServices;
+using Xunit.Abstractions;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+    internal static class ReversedServerHelper
+    {
+        /// <summary>
+        /// Creates a unique server name to avoid collisions from simultaneous running tests
+        /// or potentially abandoned socket files.
+        /// </summary>
+        public static string CreateServerTransportName()
+        {
+            string transportName = "DOTNET_DIAGSERVER_TESTS_" + Path.GetRandomFileName();
+            if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+            {
+                return transportName;
+            }
+            else
+            {
+                return Path.Combine(Path.GetTempPath(), transportName);
+            }
+        }
+
+        /// <summary>
+        /// Starts the Tracee executable while enabling connection to reverse diagnostics server.
+        /// </summary>
+        public static TestRunner StartTracee(ITestOutputHelper _outputHelper, string transportName)
+        {
+            var runner = new TestRunner(CommonHelper.GetTraceePath(targetFramework: "net5.0"), _outputHelper);
+            runner.AddReversedServer(transportName);
+            runner.Start();
+            return runner;
+        }
+
+        public static void AddReversedServer(this TestRunner runner, string transportName)
+        {
+            runner.AddEnvVar("DOTNET_DiagnosticsMonitorAddress", transportName);
+        }
+
+        public static string ToTestString(this IpcEndpointInfo info)
+        {
+            return $"PID={info.ProcessId}, COOKIE={info.RuntimeInstanceCookie}";
+        }
+    }
+}
diff --git a/src/tests/Microsoft.Diagnostics.NETCore.Client/ReversedServerTests.cs b/src/tests/Microsoft.Diagnostics.NETCore.Client/ReversedServerTests.cs
new file mode 100644 (file)
index 0000000..27ab0f6
--- /dev/null
@@ -0,0 +1,448 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.Tracing;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Diagnostics.Tracing;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+    public class ReversedServerTests
+    {
+        private readonly ITestOutputHelper _outputHelper;
+
+        public ReversedServerTests(ITestOutputHelper outputHelper)
+        {
+            _outputHelper = outputHelper;
+        }
+
+        /// <summary>
+        /// Tests that server throws appropriate exceptions when disposed.
+        /// </summary>
+        [Fact]
+        public async Task ReversedServerDisposeTest()
+        {
+            var server = StartReversedServer(out string transportName);
+
+            using CancellationTokenSource cancellation = new CancellationTokenSource(TimeSpan.FromSeconds(1));
+            Task acceptTask = server.AcceptAsync(cancellation.Token);
+
+            // Validate server surface throws after disposal
+            server.Dispose();
+
+            // Pending tasks should be cancelled and throw TaskCanceledException
+            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => acceptTask);
+            Assert.True(acceptTask.IsCanceled);
+
+            // Calls after dispose should throw ObjectDisposedException
+            await Assert.ThrowsAsync<ObjectDisposedException>(
+                () => server.AcceptAsync(cancellation.Token));
+
+            Assert.Throws<ObjectDisposedException>(
+                () => server.RemoveConnection(Guid.Empty));
+        }
+
+        /// <summary>
+        /// Tests that <see cref="ReversedDiagnosticsServer.AcceptAsync(CancellationToken)"/> does not complete
+        /// when no connections are available and that cancellation will move the returned task to the cancelled state.
+        /// </summary>
+        [Fact]
+        public async Task ReversedServerAcceptAsyncYieldsTest()
+        {
+            using var server = StartReversedServer(out string transportName);
+
+            using var cancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(1));
+
+            _outputHelper.WriteLine("Waiting for connection from server.");
+            Task acceptTask = server.AcceptAsync(cancellationSource.Token);
+
+            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => acceptTask);
+            Assert.True(acceptTask.IsCanceled);
+        }
+
+        /// <summary>
+        /// Tests that invoking server methods with non-existing runtime identifier appropriately fail.
+        /// </summary>
+        [Fact]
+        public async Task ReversedServerNonExistingRuntimeIdentifierTest()
+        {
+            using var server = StartReversedServer(out string transportName);
+
+            Guid nonExistingRuntimeId = Guid.NewGuid();
+
+            using CancellationTokenSource cancellation = new CancellationTokenSource(TimeSpan.FromSeconds(1));
+
+            _outputHelper.WriteLine($"Testing {nameof(ReversedDiagnosticsServer.WaitForConnectionAsync)}");
+            await Assert.ThrowsAsync<InvalidOperationException>(
+                () => server.WaitForConnectionAsync(nonExistingRuntimeId, cancellation.Token));
+
+            _outputHelper.WriteLine($"Testing {nameof(ReversedDiagnosticsServer.Connect)}");
+            Assert.Throws<InvalidOperationException>(
+                () => server.Connect(nonExistingRuntimeId, TimeSpan.FromSeconds(1)));
+
+            _outputHelper.WriteLine($"Testing {nameof(ReversedDiagnosticsServer.RemoveConnection)}");
+            Assert.False(server.RemoveConnection(nonExistingRuntimeId), "Removal of nonexisting connection should fail.");
+        }
+
+        /// <summary>
+        /// Tests that a single client can connect to server, diagnostics can occur,
+        /// and multiple use of a single DiagnosticsClient is allowed.
+        /// </summary>
+        /// <remarks>
+        /// The multiple use of a single client is important in the reverse scenario
+        /// because of how the endpoint is updated with new stream information each
+        /// time the target process reconnects to the server.
+        /// </remarks>
+        [Fact]
+        public async Task ReversedServerSingleTargetMultipleUseClientTest()
+        {
+            using var server = StartReversedServer(out string transportName);
+            await using var accepter = new EndpointInfoAccepter(server, _outputHelper);
+
+            TestRunner runner = null;
+            IpcEndpointInfo info;
+            try
+            {
+                // Start client pointing to diagnostics server
+                runner = StartTracee(transportName);
+
+                info = await AcceptAsync(accepter);
+
+                await VerifyEndpointInfo(runner, info);
+
+                // There should not be any new endpoint infos
+                await VerifyNoNewEndpointInfos(accepter);
+
+                ResumeRuntime(info);
+
+                await VerifySingleSession(info);
+            }
+            finally
+            {
+                _outputHelper.WriteLine("Stopping tracee.");
+                runner?.Stop();
+            }
+
+            // Wait some time for the process to exit
+            await Task.Delay(TimeSpan.FromSeconds(1));
+
+            // Process exited so the endpoint should not have a valid transport anymore.
+            await VerifyWaitForConnection(info, expectValid: false);
+
+            Assert.True(server.RemoveConnection(info.RuntimeInstanceCookie), "Expected to be able to remove connection from server.");
+
+            // There should not be any more endpoint infos
+            await VerifyNoNewEndpointInfos(accepter);
+        }
+
+        /// <summary>
+        /// Tests that a DiagnosticsClient is not viable after target exists.
+        /// </summary>
+        [Fact]
+        public async Task ReversedServerSingleTargetExitsClientInviableTest()
+        {
+            using var server = StartReversedServer(out string transportName);
+            await using var accepter = new EndpointInfoAccepter(server, _outputHelper);
+
+            TestRunner runner = null;
+            IpcEndpointInfo info;
+            try
+            {
+                // Start client pointing to diagnostics server
+                runner = StartTracee(transportName);
+
+                // Get client connection
+                info = await AcceptAsync(accepter);
+
+                await VerifyEndpointInfo(runner, info);
+
+                // There should not be any new endpoint infos
+                await VerifyNoNewEndpointInfos(accepter);
+
+                ResumeRuntime(info);
+
+                await VerifyWaitForConnection(info);
+            }
+            finally
+            {
+                _outputHelper.WriteLine("Stopping tracee.");
+                runner?.Stop();
+            }
+
+            // Wait some time for the process to exit
+            await Task.Delay(TimeSpan.FromSeconds(1));
+
+            // Process exited so the endpoint should not have a valid transport anymore.
+            await VerifyWaitForConnection(info, expectValid: false);
+
+            Assert.True(server.RemoveConnection(info.RuntimeInstanceCookie), "Expected to be able to remove connection from server.");
+
+            // There should not be any more endpoint infos
+            await VerifyNoNewEndpointInfos(accepter);
+        }
+
+        private ReversedDiagnosticsServer StartReversedServer(out string transportName)
+        {
+            transportName = ReversedServerHelper.CreateServerTransportName();
+            _outputHelper.WriteLine("Starting reversed server at '" + transportName + "'.");
+            return new ReversedDiagnosticsServer(transportName);
+        }
+
+        private async Task<IpcEndpointInfo> AcceptAsync(EndpointInfoAccepter accepter)
+        {
+            using (var cancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(3)))
+            {
+                return await accepter.AcceptAsync(cancellationSource.Token);
+            }
+        }
+
+        private TestRunner StartTracee(string transportName)
+        {
+            _outputHelper.WriteLine("Starting tracee.");
+            return ReversedServerHelper.StartTracee(_outputHelper, transportName);
+        }
+
+        private static EventPipeProvider CreateProvider(string name)
+        {
+            return new EventPipeProvider(name, EventLevel.Verbose, (long)EventKeywords.All);
+        }
+
+        private async Task VerifyWaitForConnection(IpcEndpointInfo info, bool expectValid = true)
+        {
+            using var connectionCancellation = new CancellationTokenSource(TimeSpan.FromSeconds(1));
+            if (expectValid)
+            {
+                await info.Endpoint.WaitForConnectionAsync(connectionCancellation.Token);
+            }
+            else
+            {
+                await Assert.ThrowsAsync<TaskCanceledException>(
+                    () => info.Endpoint.WaitForConnectionAsync(connectionCancellation.Token));
+            }
+        }
+
+        /// <summary>
+        /// Checks that the accepter does not provide a new endpoint info.
+        /// </summary>
+        private async Task VerifyNoNewEndpointInfos(EndpointInfoAccepter accepter)
+        {
+            _outputHelper.WriteLine("Verifying there are no more connections.");
+
+            using var cancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(1));
+
+            Task acceptTask = accepter.AcceptAsync(cancellationSource.Token);
+            await Assert.ThrowsAsync<OperationCanceledException>(() => acceptTask);
+            Assert.True(acceptTask.IsCanceled);
+
+            _outputHelper.WriteLine("Verified there are no more connections.");
+        }
+
+        /// <summary>
+        /// Verifies basic information on the endpoint info and that it matches the target process from the runner.
+        /// </summary>
+        private async Task VerifyEndpointInfo(TestRunner runner, IpcEndpointInfo info, bool expectValid = true)
+        {
+            _outputHelper.WriteLine($"Verifying connection information for process ID {runner.Pid}.");
+            Assert.NotNull(runner);
+            Assert.Equal(runner.Pid, info.ProcessId);
+            Assert.NotEqual(Guid.Empty, info.RuntimeInstanceCookie);
+            Assert.NotNull(info.Endpoint);
+
+            await VerifyWaitForConnection(info, expectValid);
+
+            _outputHelper.WriteLine($"Connection: {info.ToTestString()}");
+        }
+
+        private void ResumeRuntime(IpcEndpointInfo info)
+        {
+            var client = new DiagnosticsClient(info.Endpoint);
+
+            _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Resuming runtime instance.");
+            try
+            {
+                client.ResumeRuntime();
+                _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Resumed successfully.");
+            }
+            catch (ServerErrorException ex)
+            {
+                // Runtime likely does not understand the ResumeRuntime command.
+                _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: {ex.Message}");
+            }
+        }
+
+        /// <summary>
+        /// Verifies that a client can handle multiple operations simultaneously.
+        /// </summary>
+        private async Task VerifySingleSession(IpcEndpointInfo info)
+        {
+            await VerifyWaitForConnection(info);
+
+            var client = new DiagnosticsClient(info.Endpoint);
+
+            _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Creating session #1.");
+            var providers = new List<EventPipeProvider>();
+            providers.Add(new EventPipeProvider(
+                "System.Runtime",
+                EventLevel.Informational,
+                0,
+                new Dictionary<string, string>() {
+                    { "EventCounterIntervalSec", "1" }
+                }));
+            using var session = client.StartEventPipeSession(providers);
+
+            _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Verifying session produces events.");
+            await VerifyEventStreamProvidesEventsAsync(info, session, 1);
+
+            _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session verification complete.");
+        }
+
+        /// <summary>
+        /// Verifies that an event stream does provide events.
+        /// </summary>
+        private Task VerifyEventStreamProvidesEventsAsync(IpcEndpointInfo info, EventPipeSession session, int sessionNumber)
+        {
+            Assert.NotNull(session);
+            Assert.NotNull(session.EventStream);
+
+            return Task.Run(async () =>
+            {
+                _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Creating event source.");
+
+                // This blocks for a while due to this bug: https://github.com/microsoft/perfview/issues/1172
+                using var eventSource = new EventPipeEventSource(session.EventStream);
+
+                _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Setup event handlers.");
+
+                // Create task completion source that is completed when any events are provided; cancel it if cancellation is requested
+                var receivedEventsSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+                using var cancellation = new CancellationTokenSource(TimeSpan.FromMinutes(1));
+                using var _ = cancellation.Token.Register(() =>
+                {
+                    if (receivedEventsSource.TrySetCanceled())
+                    {
+                        _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Cancelled event processing.");
+                    }
+                });
+
+                // Create continuation task that stops the session (which immediately stops event processing).
+                Task stoppedProcessingTask = receivedEventsSource.Task
+                    .ContinueWith(_ =>
+                    {
+                        _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Stopping session.");
+                        session.Stop();
+                    });
+
+                // Signal task source when an event is received.
+                Action<TraceEvent> allEventsHandler = _ =>
+                {
+                    if (receivedEventsSource.TrySetResult(null))
+                    {
+                        _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Received an event and set result on completion source.");
+                    }
+                };
+
+                _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Start processing events.");
+                eventSource.Dynamic.All += allEventsHandler;
+                eventSource.Process();
+                eventSource.Dynamic.All -= allEventsHandler;
+                _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Stopped processing events.");
+
+                // Wait on the task source to verify if it ran to completion or was cancelled.
+                await receivedEventsSource.Task;
+
+                _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Waiting for session to stop.");
+                await stoppedProcessingTask;
+            });
+        }
+
+        /// <summary>
+        /// Helper class for consuming endpoint infos from the reverse diagnostics server.
+        /// </summary>
+        /// <remarks>
+        /// The diagnostics server requires that something is continuously attempting to accept endpoint infos
+        /// in order to process incoming connections. This helps facilitate that continuous accepting of
+        /// endpoint infos so the individual tests don't have to know about the behavior. 
+        /// </remarks>
+        private class EndpointInfoAccepter : IAsyncDisposable
+        {
+            private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
+            private readonly Queue<IpcEndpointInfo> _connections = new Queue<IpcEndpointInfo>();
+            private readonly SemaphoreSlim _connectionsSemaphore = new SemaphoreSlim(0);
+            private readonly Task _listenTask;
+            private readonly ITestOutputHelper _outputHelper;
+            private readonly ReversedDiagnosticsServer _server;
+
+            private int _acceptedCount;
+            private bool _disposed;
+
+            public EndpointInfoAccepter(ReversedDiagnosticsServer server, ITestOutputHelper outputHelper)
+            {
+                _server = server;
+                _outputHelper = outputHelper;
+
+                _listenTask = ListenAsync(_cancellation.Token);
+            }
+
+            public async ValueTask DisposeAsync()
+            {
+                if (!_disposed)
+                {
+                    _cancellation.Cancel();
+
+                    await _listenTask;
+
+                    _cancellation.Dispose();
+
+                    _disposed = true;
+                }
+            }
+
+            public async Task<IpcEndpointInfo> AcceptAsync(CancellationToken token)
+            {
+                using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, _cancellation.Token);
+
+                _outputHelper.WriteLine("Waiting for connection from accepter.");
+                await _connectionsSemaphore.WaitAsync(linkedSource.Token).ConfigureAwait(false);
+                _outputHelper.WriteLine("Received connection from accepter.");
+
+                return _connections.Dequeue();
+            }
+
+            /// <summary>
+            /// Continuously accept endpoint infos from the reversed diagnostics server so
+            /// that <see cref="ReversedDiagnosticsServer.AcceptAsync(CancellationToken)"/>
+            /// is always awaited in order to to handle new runtime instance connections
+            /// as well as existing runtime instance reconnections.
+            /// </summary>
+            private async Task ListenAsync(CancellationToken token)
+            {
+                while (!token.IsCancellationRequested)
+                {
+                    IpcEndpointInfo info;
+                    try
+                    {
+                        _outputHelper.WriteLine("Waiting for connection from server.");
+                        info = await _server.AcceptAsync(token).ConfigureAwait(false);
+
+                        _acceptedCount++;
+                        _outputHelper.WriteLine($"Accepted connection #{_acceptedCount} from server: {info.ToTestString()}");
+                    }
+                    catch (OperationCanceledException)
+                    {
+                        break;
+                    }
+
+                    _connections.Enqueue(info);
+                    _connectionsSemaphore.Release();
+                }
+            }
+        }
+    }
+}
index 4291750f1e0895fe7d3474a2b44c6d132ee54c0e..ba4c0c982913fce5d72757101770ae5d760eab43 100644 (file)
@@ -3,18 +3,13 @@
 // See the LICENSE file in the project root for more information.
 
 
-using Microsoft.Diagnostics.NETCore.Client;
-using Microsoft.Diagnostics.TestHelpers;
 using System;
-using System.Collections.Generic;
 using System.ComponentModel;
 using System.Diagnostics;
 using System.IO;
-using System.Linq;
 using System.Runtime.InteropServices;
 using System.Threading;
 using System.Threading.Tasks;
-using Xunit;
 using Xunit.Abstractions;
 
 namespace Microsoft.Diagnostics.NETCore.Client
@@ -50,9 +45,11 @@ namespace Microsoft.Diagnostics.NETCore.Client
             if (outputHelper != null)
                 outputHelper.WriteLine($"[{DateTime.Now.ToString()}] Launching test: " + startInfo.FileName);
 
-            testProcess = Process.Start(startInfo);
+            testProcess = new Process();
+            testProcess.StartInfo = startInfo;
+            testProcess.EnableRaisingEvents = true;
 
-            if (testProcess == null)
+            if (!testProcess.Start())
             {
                 outputHelper.WriteLine($"Could not start process: " + startInfo.FileName);
             }
@@ -125,5 +122,26 @@ namespace Microsoft.Diagnostics.NETCore.Client
                 outputHelper.WriteLine($"Process {testProcess.Id} status: Running");
             }
         }
+
+        public async Task WaitForExitAsync(CancellationToken token)
+        {
+            TaskCompletionSource<object> exitedSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+            EventHandler exitedHandler = (s, e) => exitedSource.TrySetResult(null);
+
+            testProcess.Exited += exitedHandler;
+            try
+            {
+                if (!testProcess.HasExited)
+                {
+                    using var _ = token.Register(() => exitedSource.TrySetCanceled(token));
+
+                    await exitedSource.Task;
+                }
+            }
+            finally
+            {
+                testProcess.Exited -= exitedHandler;
+            }
+        }
     }
 }
index c6143cf4d1f8ebe20ce33df506bc670d0112f7b0..8250ea7c8ecfaee10d4e3dd9f1d148778854c4f8 100644 (file)
@@ -9,13 +9,21 @@ namespace Tracee
 {
     class Program
     {
+        private const int LoopCount = 30;
+
         static void Main(string[] args)
         {
+            Console.WriteLine("Sleep in loop for {0} seconds.", LoopCount);
+
             // Runs for max of 30 sec
-            for(var i = 0; i < 30; i++)
+            for (var i = 0; i < LoopCount; i++)
             {
+                Console.WriteLine("Iteration #{0}", i);
                 Thread.Sleep(1000);
             }
+
+            Console.WriteLine("Press any key to exit.");
+            Console.ReadKey();
         }
     }
 }
index 134bf4d64c41d8a42e7638b4d91c72f8c8b534fa..494ad20f4309ba9fcbb82a084bf0850507b21812 100644 (file)
@@ -2,22 +2,16 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
-using Microsoft.Diagnostics.Monitoring;
-using Microsoft.Diagnostics.NETCore.Client;
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Logging;
-using Microsoft.Extensions.Options;
-using Microsoft.VisualStudio.TestPlatform.ObjectModel;
-using Microsoft.VisualStudio.TestPlatform.ObjectModel.DataCollection;
 using System;
 using System.Collections.Generic;
-using System.Diagnostics;
 using System.IO;
-using System.Linq;
 using System.Runtime.InteropServices;
 using System.Text.Json;
 using System.Threading;
 using System.Threading.Tasks;
+using Microsoft.Diagnostics.Monitoring;
+using Microsoft.Diagnostics.NETCore.Client;
+using Microsoft.Extensions.Logging;
 using Xunit;
 using Xunit.Abstractions;
 using Xunit.Extensions;
@@ -43,7 +37,7 @@ namespace DotnetMonitor.UnitTests
 
             var outputStream = new MemoryStream();
 
-            await using (var testExecution = RemoteTestExecution.StartRemoteProcess("LoggerRemoteTest", _output))
+            await using (var testExecution = StartTraceeProcess("LoggerRemoteTest"))
             {
                 //TestRunner should account for start delay to make sure that the diagnostic pipe is available.
 
@@ -51,10 +45,10 @@ namespace DotnetMonitor.UnitTests
 
                 DiagnosticsEventPipeProcessor diagnosticsEventPipeProcessor = new DiagnosticsEventPipeProcessor(
                     PipeMode.Logs,
-                    loggerFactory,
-                    Enumerable.Empty<IMetricsLogger>());
+                    loggerFactory);
 
-                var processingTask = diagnosticsEventPipeProcessor.Process(testExecution.TestRunner.Pid, TimeSpan.FromSeconds(10), CancellationToken.None);
+                var client = new DiagnosticsClient(testExecution.TestRunner.Pid);
+                var processingTask = diagnosticsEventPipeProcessor.Process(client, testExecution.TestRunner.Pid, TimeSpan.FromSeconds(10), CancellationToken.None);
 
                 //Add a small delay to make sure diagnostic processor had a chance to initialize
                 await Task.Delay(1000);
@@ -108,6 +102,11 @@ namespace DotnetMonitor.UnitTests
             }
         }
 
+        private RemoteTestExecution StartTraceeProcess(string loggerCategory)
+        {
+            return RemoteTestExecution.StartProcess(CommonHelper.GetTraceePath("EventPipeTracee") + " " + loggerCategory, _output);
+        }
+
         private sealed class LoggerTestResult
         {
             public string Category { get; set; }
diff --git a/src/tests/dotnet-monitor/EndpointInfoSourceTests.cs b/src/tests/dotnet-monitor/EndpointInfoSourceTests.cs
new file mode 100644 (file)
index 0000000..9d973aa
--- /dev/null
@@ -0,0 +1,223 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Diagnostics.Monitoring;
+using Microsoft.Diagnostics.NETCore.Client;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace DotnetMonitor.UnitTests
+{
+    public class EndpointInfoSourceTests
+    {
+        private readonly ITestOutputHelper _outputHelper;
+
+        public EndpointInfoSourceTests(ITestOutputHelper outputHelper)
+        {
+            _outputHelper = outputHelper;
+        }
+
+        /// <summary>
+        /// Tests that the server endpoint info source has no connections
+        /// if <see cref="ServerEndpointInfoSource.Listen"/> is not called.
+        /// </summary>
+        [Fact]
+        public async Task ServerSourceNoListenTest()
+        {
+            await using var source = CreateServerSource(out string transportName);
+            // Intentionally do not call Listen
+
+            await using (var execution1 = StartTraceeProcess("LoggerRemoteTest", transportName))
+            {
+                execution1.Start();
+
+                await Task.Delay(TimeSpan.FromSeconds(1));
+
+                var endpointInfos = await GetEndpointInfoAsync(source);
+
+                Assert.Empty(endpointInfos);
+
+                _outputHelper.WriteLine("Stopping tracee.");
+            }
+        }
+
+        /// <summary>
+        /// Tests that the server endpoint info source has not connections if no processes connect to it.
+        /// </summary>
+        [Fact]
+        public async Task ServerSourceNoConnectionsTest()
+        {
+            await using var source = CreateServerSource(out _);
+            source.Listen();
+
+            var endpointInfos = await GetEndpointInfoAsync(source);
+            Assert.Empty(endpointInfos);
+        }
+
+        /// <summary>
+        /// Tests that server endpoint info source should throw ObjectDisposedException
+        /// from API surface after being disposed.
+        /// </summary>
+        [Fact]
+        public async Task ServerSourceThrowsWhenDisposedTest()
+        {
+            var source = CreateServerSource(out _);
+            source.Listen();
+
+            await source.DisposeAsync();
+
+            // Validate source surface throws after disposal
+            Assert.Throws<ObjectDisposedException>(
+                () => source.Listen());
+
+            Assert.Throws<ObjectDisposedException>(
+                () => source.Listen(1));
+
+            await Assert.ThrowsAsync<ObjectDisposedException>(
+                () => source.GetEndpointInfoAsync(CancellationToken.None));
+        }
+
+        /// <summary>
+        /// Tests that server endpoint info source should throw an exception from
+        /// <see cref="ServerEndpointInfoSource.Listen"/> and
+        /// <see cref="ServerEndpointInfoSource.Listen(int)"/> after listening was already started.
+        /// </summary>
+        [Fact]
+        public async Task ServerSourceThrowsWhenMultipleListenTest()
+        {
+            await using var source = CreateServerSource(out _);
+            source.Listen();
+
+            Assert.Throws<InvalidOperationException>(
+                () => source.Listen());
+
+            Assert.Throws<InvalidOperationException>(
+                () => source.Listen(1));
+        }
+
+        /// <summary>
+        /// Tests that the server endpoint info source can properly enumerate endpoint infos when a single
+        /// target connects to it and "disconnects" from it.
+        /// </summary>
+        [Fact]
+        public async Task ServerSourceAddRemoveSingleConnectionTest()
+        {
+            await using var source = CreateServerSource(out string transportName);
+            source.Listen();
+
+            var endpointInfos = await GetEndpointInfoAsync(source);
+            Assert.Empty(endpointInfos);
+
+            Task newEndpointInfoTask = source.WaitForNewEndpointInfoAsync(TimeSpan.FromSeconds(5));
+
+            await using (var execution1 = StartTraceeProcess("LoggerRemoteTest", transportName))
+            {
+                await newEndpointInfoTask;
+
+                execution1.Start();
+
+                endpointInfos = await GetEndpointInfoAsync(source);
+
+                var endpointInfo = Assert.Single(endpointInfos);
+                VerifyConnection(execution1.TestRunner, endpointInfo);
+
+                _outputHelper.WriteLine("Stopping tracee.");
+            }
+
+            await Task.Delay(TimeSpan.FromSeconds(1));
+
+            endpointInfos = await GetEndpointInfoAsync(source);
+
+            Assert.Empty(endpointInfos);
+        }
+
+        private TestServerEndpointInfoSource CreateServerSource(out string transportName)
+        {
+            transportName = ReversedServerHelper.CreateServerTransportName();
+            _outputHelper.WriteLine("Starting server endpoint info source at '" + transportName + "'.");
+            return new TestServerEndpointInfoSource(transportName, _outputHelper);
+        }
+
+        private RemoteTestExecution StartTraceeProcess(string loggerCategory, string transportName = null)
+        {
+            _outputHelper.WriteLine("Starting tracee.");
+            string exePath = CommonHelper.GetTraceePath("EventPipeTracee", targetFramework: "net5.0");
+            return RemoteTestExecution.StartProcess(exePath + " " + loggerCategory, _outputHelper, transportName);
+        }
+
+        private async Task<IEnumerable<IEndpointInfo>> GetEndpointInfoAsync(ServerEndpointInfoSource source)
+        {
+            _outputHelper.WriteLine("Getting endpoint infos.");
+            using CancellationTokenSource cancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+            return await source.GetEndpointInfoAsync(cancellationSource.Token);
+        }
+
+        /// <summary>
+        /// Verifies basic information on the connection and that it matches the target process from the runner.
+        /// </summary>
+        private static void VerifyConnection(TestRunner runner, IEndpointInfo endpointInfo)
+        {
+            Assert.NotNull(runner);
+            Assert.NotNull(endpointInfo);
+            Assert.Equal(runner.Pid, endpointInfo.ProcessId);
+            Assert.NotEqual(Guid.Empty, endpointInfo.RuntimeInstanceCookie);
+            Assert.NotNull(endpointInfo.Endpoint);
+        }
+
+        private sealed class TestServerEndpointInfoSource : ServerEndpointInfoSource
+        {
+            private readonly ITestOutputHelper _outputHelper;
+            private readonly List<TaskCompletionSource<IpcEndpointInfo>> _addedEndpointInfoSources = new List<TaskCompletionSource<IpcEndpointInfo>>();
+
+            public TestServerEndpointInfoSource(string transportPath, ITestOutputHelper outputHelper)
+                : base(transportPath)
+            {
+                _outputHelper = outputHelper;
+            }
+
+            public async Task<IpcEndpointInfo> WaitForNewEndpointInfoAsync(TimeSpan timeout)
+            {
+                TaskCompletionSource<IpcEndpointInfo> addedEndpointInfoSource = new TaskCompletionSource<IpcEndpointInfo>(TaskCreationOptions.RunContinuationsAsynchronously);
+                using var timeoutCancellation = new CancellationTokenSource();
+                var token = timeoutCancellation.Token;
+                using var _ = token.Register(() => addedEndpointInfoSource.TrySetCanceled(token));
+
+                lock (_addedEndpointInfoSources)
+                {
+                    _addedEndpointInfoSources.Add(addedEndpointInfoSource);
+                }
+
+                _outputHelper.WriteLine("Waiting for new endpoint info.");
+                timeoutCancellation.CancelAfter(timeout);
+                IpcEndpointInfo endpointInfo = await addedEndpointInfoSource.Task;
+                _outputHelper.WriteLine("Notified of new endpoint info.");
+
+                return endpointInfo;
+            }
+
+            internal override void OnAddedEndpointInfo(IpcEndpointInfo info)
+            {
+                _outputHelper.WriteLine($"Added endpoint info to collection: {info.ToTestString()}");
+                
+                lock (_addedEndpointInfoSources)
+                {
+                    foreach (var source in _addedEndpointInfoSources)
+                    {
+                        source.TrySetResult(info);
+                    }
+                    _addedEndpointInfoSources.Clear();
+                }
+            }
+
+            internal override void OnRemovedEndpointInfo(IpcEndpointInfo info)
+            {
+                _outputHelper.WriteLine($"Removed endpoint info from collection: {info.ToTestString()}");
+            }
+        }
+    }
+}
index 832945aa9ee97a821d7117976be4a216a695b1d2..0db40967e33bb4d333d629e8e8f6453516a6ac6f 100644 (file)
@@ -2,16 +2,11 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
-using Microsoft.Diagnostics.NETCore.Client;
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Logging;
 using System;
-using System.Collections.Generic;
-using System.Diagnostics;
 using System.IO;
-using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
+using Microsoft.Diagnostics.NETCore.Client;
 using Xunit.Abstractions;
 
 namespace DotnetMonitor.UnitTests
@@ -23,14 +18,17 @@ namespace DotnetMonitor.UnitTests
     {
         private Task IoReadingTask { get; }
 
-        private RemoteTestExecution(TestRunner runner, Task ioReadingTask)
+        private ITestOutputHelper OutputHelper { get; }
+
+        public TestRunner TestRunner { get; }
+
+        private RemoteTestExecution(TestRunner runner, Task ioReadingTask, ITestOutputHelper outputHelper)
         {
             TestRunner = runner;
             IoReadingTask = ioReadingTask;
+            OutputHelper = outputHelper;
         }
 
-        public TestRunner TestRunner { get; }
-
         public void Start()
         {
             SendSignal();
@@ -44,15 +42,18 @@ namespace DotnetMonitor.UnitTests
             TestRunner.StandardInput.Flush();
         }
 
-        public static RemoteTestExecution StartRemoteProcess(string loggerCategory, ITestOutputHelper outputHelper)
+        public static RemoteTestExecution StartProcess(string commandLine, ITestOutputHelper outputHelper, string reversedServerTransportName = null)
         {
-            TestRunner runner = new TestRunner(CommonHelper.GetTraceePath("EventPipeTracee") + " " + loggerCategory,
-                outputHelper, redirectError: true, redirectInput: true);
+            TestRunner runner = new TestRunner(commandLine, outputHelper, redirectError: true, redirectInput: true);
+            if (!string.IsNullOrEmpty(reversedServerTransportName))
+            {
+                runner.AddReversedServer(reversedServerTransportName);
+            }
             runner.Start();
 
             Task readingTask = ReadAllOutput(runner.StandardOutput, runner.StandardError, outputHelper);
 
-            return new RemoteTestExecution(runner, readingTask);
+            return new RemoteTestExecution(runner, readingTask, outputHelper);
         }
 
         private static Task ReadAllOutput(StreamReader output, StreamReader error, ITestOutputHelper outputHelper)
@@ -95,6 +96,18 @@ namespace DotnetMonitor.UnitTests
         public async ValueTask DisposeAsync()
         {
             SendSignal();
+
+            using var timeoutSource = new CancellationTokenSource(TimeSpan.FromSeconds(1));
+            try
+            {
+                await TestRunner.WaitForExitAsync(timeoutSource.Token);
+            }
+            catch (OperationCanceledException)
+            {
+                OutputHelper.WriteLine("Remote process did not exit within timeout period. Forcefully stopping process.");
+                TestRunner.Stop();
+            }
+
             await IoReadingTask;
         }
     }