Add reversed diagnostics server and endpoint information.
Add diagnostics endpoint source for unifying server and client connections into same contract.
Consume endpoint source in dotnet-monitor.
Add unit tests for reversed server, endpoints, and endpoint source concepts.
<MicrosoftExtensionsConfigurationJsonVersion>2.1.1</MicrosoftExtensionsConfigurationJsonVersion>
<MicrosoftExtensionsConfigurationKeyPerFileVersion>2.1.1</MicrosoftExtensionsConfigurationKeyPerFileVersion>
<MicrosoftExtensionsDependencyInjectionVersion>2.1.1</MicrosoftExtensionsDependencyInjectionVersion>
+ <MicrosoftExtensionsHostingAbstractionsVersion>2.1.1</MicrosoftExtensionsHostingAbstractionsVersion>
<MicrosoftExtensionsLoggingVersion>2.1.1</MicrosoftExtensionsLoggingVersion>
<MicrosoftExtensionsLoggingConsoleVersion>2.1.1</MicrosoftExtensionsLoggingConsoleVersion>
<!-- We use a newer version of LoggingEventSource due to a bug in an older version-->
}
[HttpGet("processes")]
- public ActionResult<IEnumerable<ProcessModel>> GetProcesses()
+ public Task<ActionResult<IEnumerable<ProcessModel>>> GetProcesses()
{
- return this.InvokeService(() =>
+ return this.InvokeService(async () =>
{
IList<ProcessModel> processes = new List<ProcessModel>();
- foreach (int pid in _diagnosticServices.GetProcesses())
+ foreach (IProcessInfo p in await _diagnosticServices.GetProcessesAsync(HttpContext.RequestAborted))
{
- processes.Add(new ProcessModel() { Pid = pid });
+ processes.Add(ProcessModel.FromProcessInfo(p));
}
return new ActionResult<IEnumerable<ProcessModel>>(processes);
});
{
return this.InvokeService(async () =>
{
- int pidValue = _diagnosticServices.ResolveProcess(pid);
- Stream result = await _diagnosticServices.GetDump(pidValue, type);
+ int pidValue = await _diagnosticServices.ResolveProcessAsync(pid, HttpContext.RequestAborted);
+ Stream result = await _diagnosticServices.GetDump(pidValue, type, HttpContext.RequestAborted);
string dumpFileName = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ?
FormattableString.Invariant($"dump_{GetFileNameTimeStampUtcNow()}.dmp") :
{
return this.InvokeService(async () =>
{
- int pidValue = _diagnosticServices.ResolveProcess(pid);
+ int pidValue = await _diagnosticServices.ResolveProcessAsync(pid, HttpContext.RequestAborted);
Stream result = await _diagnosticServices.GetGcDump(pidValue, this.HttpContext.RequestAborted);
return File(result, "application/octet-stream", FormattableString.Invariant($"{GetFileNameTimeStampUtcNow()}_{pidValue}.gcdump"));
});
[HttpGet("logs/{pid?}")]
[Produces(ContentTypeEventStream, ContentTypeNdJson, ContentTypeJson)]
- public ActionResult Logs(int? pid, [FromQuery][Range(-1, int.MaxValue)] int durationSeconds = 30, [FromQuery] LogLevel level = LogLevel.Debug)
+ public Task<ActionResult> Logs(int? pid, [FromQuery][Range(-1, int.MaxValue)] int durationSeconds = 30, [FromQuery] LogLevel level = LogLevel.Debug)
{
TimeSpan duration = ConvertSecondsToTimeSpan(durationSeconds);
- return this.InvokeService(() =>
+ return this.InvokeService(async () =>
{
- int pidValue = _diagnosticServices.ResolveProcess(pid);
+ int pidValue = await _diagnosticServices.ResolveProcessAsync(pid, HttpContext.RequestAborted);
LogFormat format = ComputeLogFormat(Request.GetTypedHeaders().Accept);
if (format == LogFormat.None)
private async Task<StreamWithCleanupResult> StartTrace(int? pid, MonitoringSourceConfiguration configuration, TimeSpan duration)
{
- int pidValue = _diagnosticServices.ResolveProcess(pid);
+ int pidValue = await _diagnosticServices.ResolveProcessAsync(pid, HttpContext.RequestAborted);
IStreamWithCleanup result = await _diagnosticServices.StartTrace(pidValue, configuration, duration, this.HttpContext.RequestAborted);
return new StreamWithCleanupResult(result, "application/octet-stream", FormattableString.Invariant($"{GetFileNameTimeStampUtcNow()}_{pidValue}.nettrace"));
}
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
-using Microsoft.Diagnostics.Tracing.Analysis;
-using Microsoft.Extensions.Hosting;
-using Microsoft.Extensions.Options;
using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
using System.Threading;
using System.Threading.Tasks;
+using Microsoft.Diagnostics.NETCore.Client;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Options;
namespace Microsoft.Diagnostics.Monitoring.RestServer
{
{
//TODO In multi-process scenarios, how do we decide which process to choose?
//One possibility is to enable metrics after a request to begin polling for metrics
- int pid = _services.ResolveProcess(pid: null);
- await _pipeProcessor.Process(pid, Timeout.InfiniteTimeSpan, stoppingToken);
+ int pid = await _services.ResolveProcessAsync(pid: null, stoppingToken);
+ var client = new DiagnosticsClient(pid);
+ await _pipeProcessor.Process(client, pid, Timeout.InfiniteTimeSpan, stoppingToken);
}
catch(Exception e) when (!(e is OperationCanceledException))
{
-using System.Runtime.Serialization;
+using System;
+using System.Runtime.Serialization;
namespace Microsoft.Diagnostics.Monitoring.RestServer.Models
{
{
[DataMember(Name = "pid")]
public int Pid { get; set; }
+
+ [DataMember(Name = "uid")]
+ public Guid Uid { get; set; }
+
+ public static ProcessModel FromProcessInfo(IProcessInfo processInfo)
+ {
+ return new ProcessModel() { Pid = processInfo.Pid, Uid = processInfo.Uid };
+ }
}
}
\ No newline at end of file
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Diagnostics.NETCore.Client;
+
+namespace Microsoft.Diagnostics.Monitoring
+{
+ internal sealed class ClientEndpointInfoSource : IEndpointInfoSourceInternal
+ {
+ public Task<IEnumerable<IEndpointInfo>> GetEndpointInfoAsync(CancellationToken token)
+ {
+ List<IEndpointInfo> endpointInfos = new List<IEndpointInfo>();
+ foreach (int pid in DiagnosticsClient.GetPublishedProcesses())
+ {
+ // CONSIDER: Generate a "runtime instance identifier" based on the pipe name
+ // e.g. pid + disambiguator in GUID form.
+ endpointInfos.Add(new EndpointInfo(pid));
+ }
+
+ return Task.FromResult(endpointInfos.AsEnumerable());
+ }
+
+ private class EndpointInfo : IEndpointInfo
+ {
+ public EndpointInfo(int processId)
+ {
+ Endpoint = new PidIpcEndpoint(processId);
+ ProcessId = processId;
+ }
+
+ public IpcEndpoint Endpoint { get; }
+
+ public int ProcessId { get; }
+
+ public Guid RuntimeInstanceCookie => Guid.Empty;
+ }
+ }
+}
public override IList<EventPipeProvider> GetProviders() =>
new EventPipeProvider[]
{
- new EventPipeProvider("Microsoft-DotNETCore-SampleProfiler", System.Diagnostics.Tracing.EventLevel.Informational),
+ new EventPipeProvider(SampleProfilerProviderName, System.Diagnostics.Tracing.EventLevel.Informational),
new EventPipeProvider("Microsoft-Windows-DotNETRuntime", System.Diagnostics.Tracing.EventLevel.Informational, (long) Tracing.Parsers.ClrTraceEventParser.Keywords.Default)
};
}
public const string GrpcAspNetCoreServer = "Grpc.AspNetCore.Server";
public const string DiagnosticSourceEventSource = "Microsoft-Diagnostics-DiagnosticSource";
public const string TplEventSource = "System.Threading.Tasks.TplEventSource";
+ public const string SampleProfilerProviderName = "Microsoft-DotNETCore-SampleProfiler";
+ public const string EventPipeProviderName = "Microsoft-DotNETCore-EventPipe";
public abstract IList<EventPipeProvider> GetProviders();
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using Microsoft.Diagnostics.NETCore.Client;
+using System.Collections.Generic;
+using System.Diagnostics.Tracing;
+
+namespace Microsoft.Diagnostics.Monitoring
+{
+ public sealed class SampleProfilerConfiguration : MonitoringSourceConfiguration
+ {
+ public override IList<EventPipeProvider> GetProviders() =>
+ new EventPipeProvider[]
+ {
+ new EventPipeProvider(SampleProfilerProviderName, EventLevel.Informational)
+ };
+
+ public override int BufferSizeInMB => 1;
+
+ public override bool RequestRundown => false;
+ }
+}
/// </summary>
public interface IDiagnosticServices : IDisposable
{
- IEnumerable<int> GetProcesses();
+ Task<IEnumerable<IProcessInfo>> GetProcessesAsync(CancellationToken token);
- int ResolveProcess(int? pid);
+ Task<int> ResolveProcessAsync(int? pid, CancellationToken token);
- Task<Stream> GetDump(int pid, DumpType mode);
+ Task<Stream> GetDump(int pid, DumpType mode, CancellationToken token);
Task<Stream> GetGcDump(int pid, CancellationToken token);
Stream Stream { get; }
}
+ public interface IProcessInfo
+ {
+ int Pid { get; }
+
+ Guid Uid { get; }
+ }
+
public enum DumpType
{
Full = 1,
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Diagnostics.NETCore.Client;
+
+namespace Microsoft.Diagnostics.Monitoring
+{
+ internal interface IEndpointInfo
+ {
+ IpcEndpoint Endpoint { get; }
+
+ int ProcessId { get; }
+
+ Guid RuntimeInstanceCookie { get; }
+ }
+
+ public interface IEndpointInfoSource
+ {
+ }
+
+ internal interface IEndpointInfoSourceInternal : IEndpointInfoSource
+ {
+ Task<IEnumerable<IEndpointInfo>> GetEndpointInfoAsync(CancellationToken token);
+ }
+}
{
private const int DockerEntrypointProcessId = 1;
+ // The amount of time to wait when checking if the docker entrypoint process is a .NET process
+ // with a diagnostics transport connection.
+ private static readonly TimeSpan DockerEntrypointWaitTimeout = TimeSpan.FromMilliseconds(250);
+
+ private readonly IEndpointInfoSourceInternal _endpointInfoSource;
private readonly CancellationTokenSource _tokenSource = new CancellationTokenSource();
- public IEnumerable<int> GetProcesses()
+ public DiagnosticServices(IEndpointInfoSource endpointInfoSource)
+ {
+ _endpointInfoSource = (IEndpointInfoSourceInternal)endpointInfoSource;
+ }
+
+ public async Task<IEnumerable<IProcessInfo>> GetProcessesAsync(CancellationToken token)
{
try
{
- //TODO This won't work properly with multi-container scenarios that don't share the process space.
- //TODO We will need to use DiagnosticsAgent if we are the server.
- return DiagnosticsClient.GetPublishedProcesses();
+ var endpointInfos = await _endpointInfoSource.GetEndpointInfoAsync(token);
+
+ return endpointInfos.Select(c => new ProcessInfo(c.RuntimeInstanceCookie, c.ProcessId));
}
catch (UnauthorizedAccessException)
{
}
}
- public async Task<Stream> GetDump(int pid, DumpType mode)
+ public async Task<Stream> GetDump(int pid, DumpType mode, CancellationToken token)
{
string dumpFilePath = Path.Combine(Path.GetTempPath(), FormattableString.Invariant($"{Guid.NewGuid()}_{pid}"));
NETCore.Client.DumpType dumpType = MapDumpType(mode);
}
else
{
+ var client = await GetClientAsync(pid, CancellationToken.None);
await Task.Run(() =>
{
- var client = new DiagnosticsClient(pid);
client.WriteDump(dumpType, dumpFilePath);
});
}
return new AutoDeleteFileStream(dumpFilePath);
}
- public async Task<Stream> GetGcDump(int pid, CancellationToken cancellationToken)
+ public async Task<Stream> GetGcDump(int pid, CancellationToken token)
{
var graph = new MemoryGraph(50_000);
- await using var processor = new DiagnosticsEventPipeProcessor(PipeMode.GCDump,
+ await using var processor = new DiagnosticsEventPipeProcessor(
+ PipeMode.GCDump,
gcGraph: graph);
- await processor.Process(pid, Timeout.InfiniteTimeSpan, cancellationToken);
+ var client = await GetClientAsync(pid, token);
+ await processor.Process(client, pid, Timeout.InfiniteTimeSpan, token);
var dumper = new GCHeapDump(graph);
dumper.CreationTool = "dotnet-monitor";
public async Task<IStreamWithCleanup> StartTrace(int pid, MonitoringSourceConfiguration configuration, TimeSpan duration, CancellationToken token)
{
DiagnosticsMonitor monitor = new DiagnosticsMonitor(configuration);
- Stream stream = await monitor.ProcessEvents(pid, duration, token);
+ var client = await GetClientAsync(pid, token);
+ Stream stream = await monitor.ProcessEvents(client, duration, token);
return new StreamWithCleanup(monitor, stream);
}
loggerFactory.AddProvider(new StreamingLoggerProvider(outputStream, format, level));
- await using var processor = new DiagnosticsEventPipeProcessor(PipeMode.Logs,
+ await using var processor = new DiagnosticsEventPipeProcessor(
+ PipeMode.Logs,
loggerFactory: loggerFactory,
logsLevel: level);
- await processor.Process(pid, duration, token);
+ var client = await GetClientAsync(pid, token);
+ await processor.Process(client, pid, duration, token);
}
private static NETCore.Client.DumpType MapDumpType(DumpType dumpType)
}
}
- public int ResolveProcess(int? pid)
+ public async Task<int> ResolveProcessAsync(int? pid, CancellationToken token)
{
if (pid.HasValue)
{
// Short-circuit for when running in a Docker container.
if (RuntimeInfo.IsInDockerContainer)
{
- var client = new DiagnosticsClient(DockerEntrypointProcessId);
- if (client.CheckTransport())
+ try
{
+ var client = await GetClientAsync(DockerEntrypointProcessId, token);
+ using var timeoutSource = new CancellationTokenSource(DockerEntrypointWaitTimeout);
+
+ await client.WaitForConnectionAsync(timeoutSource.Token);
+
return DockerEntrypointProcessId;
}
+ catch
+ {
+ // Process ID 1 doesn't exist or didn't advertise in the reverse pipe configuration.
+ }
}
// Only return a process ID if there is exactly one discoverable process.
- int[] pids = GetProcesses().ToArray();
- switch (pids.Length)
+ IProcessInfo[] processes = (await GetProcessesAsync(token)).ToArray();
+ switch (processes.Length)
{
case 0:
throw new ArgumentException("Unable to discover a target process.");
case 1:
- return pids[0];
+ return processes[0].Pid;
default:
#if DEBUG
- Process process = pids.Select(pid => Process.GetProcessById(pid)).FirstOrDefault(p => string.Equals(p.ProcessName, "iisexpress", StringComparison.OrdinalIgnoreCase));
+ Process process = processes.Select(p => Process.GetProcessById(p.Pid)).FirstOrDefault(p => string.Equals(p.ProcessName, "iisexpress", StringComparison.OrdinalIgnoreCase));
if (process != null)
{
return process.Id;
}
}
+ private async Task<DiagnosticsClient> GetClientAsync(int processId, CancellationToken token)
+ {
+ var endpointInfos = await _endpointInfoSource.GetEndpointInfoAsync(token);
+ var endpointInfo = endpointInfos.FirstOrDefault(c => c.ProcessId == processId);
+
+ if (null == endpointInfo)
+ {
+ throw new InvalidOperationException($"Diagnostics client for process ID {processId} does not exist.");
+ }
+
+ return new DiagnosticsClient(endpointInfo.Endpoint);
+ }
+
public void Dispose()
{
_tokenSource.Cancel();
}
}
}
+
+ private sealed class ProcessInfo : IProcessInfo
+ {
+ public ProcessInfo(Guid uid, int pid)
+ {
+ Pid = pid;
+ Uid = uid;
+ }
+
+ public int Pid { get; }
+
+ public Guid Uid { get; }
+ }
}
}
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
-using Graphs;
-using Microsoft.Diagnostics.NETCore.Client;
-using Microsoft.Diagnostics.Tracing;
-using Microsoft.Diagnostics.Tracing.Parsers.Clr;
-using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
+using Graphs;
+using Microsoft.Diagnostics.NETCore.Client;
+using Microsoft.Diagnostics.Tracing;
+using Microsoft.Diagnostics.Tracing.Parsers.Clr;
+using Microsoft.Extensions.Logging;
namespace Microsoft.Diagnostics.Monitoring
{
{
Logs = 1,
Metrics,
- GCDump
+ GCDump,
+ ProcessInfo
}
public class DiagnosticsEventPipeProcessor : IAsyncDisposable
private readonly PipeMode _mode;
private readonly int _metricIntervalSeconds;
private readonly LogLevel _logsLevel;
+ private readonly Action<string> _processInfoCallback;
public DiagnosticsEventPipeProcessor(
PipeMode mode,
- ILoggerFactory loggerFactory = null,
- IEnumerable<IMetricsLogger> metricLoggers = null,
- int metricIntervalSeconds = 10,
- MemoryGraph gcGraph = null,
- LogLevel logsLevel = LogLevel.Debug)
+ ILoggerFactory loggerFactory = null, // PipeMode = Logs
+ LogLevel logsLevel = LogLevel.Debug, // PipeMode = Logs
+ IEnumerable<IMetricsLogger> metricLoggers = null, // PipeMode = Metrics
+ int metricIntervalSeconds = 10, // PipeMode = Metrics
+ MemoryGraph gcGraph = null, // PipeMode = GCDump
+ Action<string> processInfoCallback = null // PipeMode = ProcessInfo
+ )
{
_metricLoggers = metricLoggers ?? Enumerable.Empty<IMetricsLogger>();
_mode = mode;
_gcGraph = gcGraph;
_metricIntervalSeconds = metricIntervalSeconds;
_logsLevel = logsLevel;
+ _processInfoCallback = processInfoCallback;
}
- public async Task Process(int pid, TimeSpan duration, CancellationToken token)
+ public async Task Process(DiagnosticsClient client, int pid, TimeSpan duration, CancellationToken token)
{
await await Task.Factory.StartNew(async () =>
{
{
config = new GCDumpSourceConfiguration();
}
+ if (_mode == PipeMode.ProcessInfo)
+ {
+ config = new SampleProfilerConfiguration();
+ }
monitor = new DiagnosticsMonitor(config);
- Stream sessionStream = await monitor.ProcessEvents(pid, duration, token);
+ Stream sessionStream = await monitor.ProcessEvents(client, duration, token);
source = new EventPipeEventSource(sessionStream);
// Allows the event handling routines to stop processing before the duration expires.
handleEventsTask = HandleGCEvents(source, pid, stopFunc, token);
}
+ if (_mode == PipeMode.ProcessInfo)
+ {
+ // ProcessInfo
+ HandleProcessInfo(source, stopFunc, token);
+ }
+
source.Process();
token.ThrowIfCancellationRequested();
_gcGraph.AllowReading();
}
+ private void HandleProcessInfo(EventPipeEventSource source, Func<Task> stopFunc, CancellationToken token)
+ {
+ source.Dynamic.AddCallbackForProviderEvent(MonitoringSourceConfiguration.EventPipeProviderName, "ProcessInfo", traceEvent =>
+ {
+ _processInfoCallback?.Invoke((string)traceEvent.PayloadByName("CommandLine"));
+ });
+
+ source.Dynamic.All += traceEvent =>
+ {
+ stopFunc();
+ };
+ }
+
public async ValueTask DisposeAsync()
{
foreach (IMetricsLogger logger in _metricLoggers)
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
-using Microsoft.Diagnostics.NETCore.Client;
using System;
-using System.Collections.Generic;
using System.IO;
-using System.Linq;
-using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
+using Microsoft.Diagnostics.NETCore.Client;
namespace Microsoft.Diagnostics.Monitoring
{
public sealed class DiagnosticsMonitor : IAsyncDisposable
{
private readonly MonitoringSourceConfiguration _sourceConfig;
- private readonly CancellationTokenSource _stopProcessingSource;
+ private readonly TaskCompletionSource<object> _stopProcessingSource;
private readonly object _lock = new object();
private Task _currentTask;
private bool _disposed;
public DiagnosticsMonitor(MonitoringSourceConfiguration sourceConfig)
{
_sourceConfig = sourceConfig;
- _stopProcessingSource = new CancellationTokenSource();
+ _stopProcessingSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
}
public Task CurrentProcessingTask => _currentTask;
- public Task<Stream> ProcessEvents(int processId, TimeSpan duration, CancellationToken cancellationToken)
+ public Task<Stream> ProcessEvents(DiagnosticsClient client, TimeSpan duration, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
}
EventPipeSession session = null;
- var client = new DiagnosticsClient(processId);
-
try
{
session = client.StartEventPipeSession(_sourceConfig.GetProviders(), _sourceConfig.RequestRundown, _sourceConfig.BufferSizeInMB);
throw new InvalidOperationException("Failed to start the event pipe session", ex);
}
- CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(_stopProcessingSource.Token, cancellationToken);
-
_currentTask = Task.Run( async () =>
{
- try
- {
- await Task.Delay(duration, linkedSource.Token);
- }
- finally
- {
- linkedSource.Dispose();
- StopSession(session);
- }
+ using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ linkedSource.CancelAfter(duration);
+ using var _ = linkedSource.Token.Register(() => _stopProcessingSource.TrySetResult(null));
+
+ // Use TaskCompletionSource instead of Task.Delay with cancellation to avoid
+ // using exceptions for normal termination of event stream.
+ await _stopProcessingSource.Task.ConfigureAwait(false);
+
+ StopSession(session);
});
return Task.FromResult(session.EventStream);
public void StopProcessing()
{
- _stopProcessingSource.Cancel();
+ _stopProcessingSource.TrySetResult(null);
}
private static void StopSession(EventPipeSession session)
_currentTask = null;
_disposed = true;
}
- _stopProcessingSource.Cancel();
+ _stopProcessingSource.TrySetResult(null);
if (currentTask != null)
{
try
{
- await currentTask;
+ await currentTask.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
}
- _stopProcessingSource?.Dispose();
}
}
}
\ No newline at end of file
<PackageReference Include="Microsoft.Bcl.HashCode" Version="$(MicrosoftBclHashCodeVersion)" />
<PackageReference Include="Microsoft.Diagnostics.Tracing.TraceEvent" Version="$(MicrosoftDiagnosticsTracingTraceEventVersion)" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="$(MicrosoftBclAsyncInterfacesVersion)" />
+ <PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="$(MicrosoftExtensionsHostingAbstractionsVersion)" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="$(MicrosoftExtensionsLoggingVersion)" />
<PackageReference Include="System.Text.Json" Version="$(SystemTextJsonVersion)" />
</ItemGroup>
<ItemGroup>
<None Include="..\Tools\dotnet-gcdump\DotNetHeapDump\README.md" Link="DotNetHeapDump\README.md" />
</ItemGroup>
+
+ <ItemGroup>
+ <InternalsVisibleTo Include="dotnet-monitor" />
+ <InternalsVisibleTo Include="DotnetMonitor.UnitTests" />
+ </ItemGroup>
</Project>
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Diagnostics.NETCore.Client;
+
+namespace Microsoft.Diagnostics.Monitoring
+{
+ /// <summary>
+ /// Aggregates diagnostic endpoints that are established at a transport path via a reversed server.
+ /// </summary>
+ internal class ServerEndpointInfoSource : IEndpointInfoSourceInternal, IAsyncDisposable
+ {
+ // The amount of time to wait when checking if the a endpoint info should be
+ // pruned from the list of endpoint infos. If the runtime doesn't have a viable connection within
+ // this time, it will be pruned from the list.
+ private static readonly TimeSpan PruneWaitForConnectionTimeout = TimeSpan.FromMilliseconds(250);
+
+ private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
+ private readonly IList<IpcEndpointInfo> _endpointInfos = new List<IpcEndpointInfo>();
+ private readonly SemaphoreSlim _endpointInfosSemaphore = new SemaphoreSlim(1);
+ private readonly string _transportPath;
+
+ private Task _listenTask;
+ private bool _disposed = false;
+ private ReversedDiagnosticsServer _server;
+
+ /// <summary>
+ /// Constructs a <see cref="ServerEndpointInfoSource"/> that aggreates diagnostic endpoints
+ /// from a reversed diagnostics server at path specified by <paramref name="transportPath"/>.
+ /// </summary>
+ /// <param name="transportPath">
+ /// The path of the server endpoint.
+ /// On Windows, this can be a full pipe path or the name without the "\\.\pipe\" prefix.
+ /// On all other systems, this must be the full file path of the socket.
+ /// </param>
+ public ServerEndpointInfoSource(string transportPath)
+ {
+ _transportPath = transportPath;
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ if (!_disposed)
+ {
+ _cancellation.Cancel();
+
+ if (null != _listenTask)
+ {
+ await _listenTask.ConfigureAwait(false);
+ }
+
+ _server?.Dispose();
+
+ _endpointInfosSemaphore.Dispose();
+
+ _cancellation.Dispose();
+
+ _disposed = true;
+ }
+ }
+
+ /// <summary>
+ /// Starts listening to the reversed diagnostics server for new connections.
+ /// </summary>
+ public void Listen()
+ {
+ Listen(ReversedDiagnosticsServer.MaxAllowedConnections);
+ }
+
+ /// <summary>
+ /// Starts listening to the reversed diagnostics server for new connections.
+ /// </summary>
+ /// <param name="maxConnections">The maximum number of connections the server will support.</param>
+ public void Listen(int maxConnections)
+ {
+ VerifyNotDisposed();
+
+ if (null != _server || null != _listenTask)
+ {
+ throw new InvalidOperationException(nameof(ServerEndpointInfoSource.Listen) + " method can only be called once.");
+ }
+
+ _server = new ReversedDiagnosticsServer(_transportPath, maxConnections);
+
+ _listenTask = ListenAsync(_cancellation.Token);
+ }
+
+ /// <summary>
+ /// Gets the list of <see cref="IpcEndpointInfo"/> served from the reversed diagnostics server.
+ /// </summary>
+ /// <param name="token">The token to monitor for cancellation requests.</param>
+ /// <returns>A list of active <see cref="IEndpointInfo"/> instances.</returns>
+ public async Task<IEnumerable<IEndpointInfo>> GetEndpointInfoAsync(CancellationToken token)
+ {
+ VerifyNotDisposed();
+
+ using CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, _cancellation.Token);
+ CancellationToken linkedToken = linkedSource.Token;
+
+ // Prune connections that no longer have an active runtime instance before
+ // returning the list of connections.
+ await _endpointInfosSemaphore.WaitAsync(linkedToken).ConfigureAwait(false);
+ try
+ {
+ // Check the transport for each endpoint info and remove it if the check fails.
+ var endpointInfos = _endpointInfos.ToList();
+
+ var pruneTasks = new List<Task>();
+ foreach (IpcEndpointInfo info in endpointInfos)
+ {
+ pruneTasks.Add(Task.Run(() => PruneIfNotViable(info, linkedToken), linkedToken));
+ }
+
+ await Task.WhenAll(pruneTasks).ConfigureAwait(false);
+
+ return _endpointInfos.Select(c => new EndpointInfo(c));
+ }
+ finally
+ {
+ _endpointInfosSemaphore.Release();
+ }
+ }
+
+ private async Task PruneIfNotViable(IpcEndpointInfo info, CancellationToken token)
+ {
+ using var timeoutSource = new CancellationTokenSource();
+ using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, timeoutSource.Token);
+
+ try
+ {
+ timeoutSource.CancelAfter(PruneWaitForConnectionTimeout);
+
+ await info.Endpoint.WaitForConnectionAsync(linkedSource.Token).ConfigureAwait(false);
+ }
+ catch
+ {
+ // Only remove the endpoint info if due to some exception
+ // other than cancelling the pruning operation.
+ if (!token.IsCancellationRequested)
+ {
+ _endpointInfos.Remove(info);
+ OnRemovedEndpointInfo(info);
+ _server.RemoveConnection(info.RuntimeInstanceCookie);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Accepts endpoint infos from the reversed diagnostics server.
+ /// </summary>
+ /// <param name="token">The token to monitor for cancellation requests.</param>
+ private async Task ListenAsync(CancellationToken token)
+ {
+ // Continuously accept endpoint infos from the reversed diagnostics server so
+ // that <see cref="ReversedDiagnosticsServer.AcceptAsync(CancellationToken)"/>
+ // is always awaited in order to to handle new runtime instance connections
+ // as well as existing runtime instance reconnections.
+ while (!token.IsCancellationRequested)
+ {
+ try
+ {
+ IpcEndpointInfo info = await _server.AcceptAsync(token).ConfigureAwait(false);
+
+ _ = Task.Run(() => ResumeAndQueueEndpointInfo(info, token), token);
+ }
+ catch (OperationCanceledException)
+ {
+ }
+ }
+ }
+
+ private async Task ResumeAndQueueEndpointInfo(IpcEndpointInfo info, CancellationToken token)
+ {
+ try
+ {
+ // Send ResumeRuntime message for runtime instances that connect to the server. This will allow
+ // those instances that are configured to pause on start to resume after the diagnostics
+ // connection has been made. Instances that are not configured to pause on startup will ignore
+ // the command and return success.
+ var client = new DiagnosticsClient(info.Endpoint);
+ try
+ {
+ client.ResumeRuntime();
+ }
+ catch (ServerErrorException)
+ {
+ // The runtime likely doesn't understand the ResumeRuntime command.
+ }
+
+ await _endpointInfosSemaphore.WaitAsync(token).ConfigureAwait(false);
+ try
+ {
+ _endpointInfos.Add(info);
+
+ OnAddedEndpointInfo(info);
+ }
+ finally
+ {
+ _endpointInfosSemaphore.Release();
+ }
+ }
+ catch (Exception)
+ {
+ _server.RemoveConnection(info.RuntimeInstanceCookie);
+
+ throw;
+ }
+ }
+
+ internal virtual void OnAddedEndpointInfo(IpcEndpointInfo info)
+ {
+ }
+
+ internal virtual void OnRemovedEndpointInfo(IpcEndpointInfo info)
+ {
+ }
+
+ private void VerifyNotDisposed()
+ {
+ if (_disposed)
+ {
+ throw new ObjectDisposedException(nameof(ServerEndpointInfoSource));
+ }
+ }
+
+ private class EndpointInfo : IEndpointInfo
+ {
+ private readonly IpcEndpointInfo _info;
+
+ public EndpointInfo(IpcEndpointInfo info)
+ {
+ _info = info;
+ }
+
+ public IpcEndpoint Endpoint => _info.Endpoint;
+
+ public int ProcessId => _info.ProcessId;
+
+ public Guid RuntimeInstanceCookie => _info.RuntimeInstanceCookie;
+ }
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using Microsoft.Diagnostics.NETCore.Client;
+using Microsoft.Extensions.DependencyInjection;
+
+namespace Microsoft.Diagnostics.Monitoring
+{
+ public static class ServiceCollectionExtensions
+ {
+ public static IServiceCollection AddEndpointInfoSource(this IServiceCollection services, string reversedServerAddress, int? maxConnections = null)
+ {
+ if (string.IsNullOrWhiteSpace(reversedServerAddress))
+ {
+ return services.AddSingleton<IEndpointInfoSource, ClientEndpointInfoSource>();
+ }
+ else
+ {
+ // Construct the source now rather than delayed construction
+ // in order to be able to accept diagnostics connections immediately.
+ var serverSource = new ServerEndpointInfoSource(reversedServerAddress);
+ serverSource.Listen(maxConnections.GetValueOrDefault(ReversedDiagnosticsServer.MaxAllowedConnections));
+
+ return services.AddSingleton<IEndpointInfoSource>(serverSource);
+ }
+ }
+ }
+}
using System.Linq;
using System.Runtime.InteropServices;
using System.Text.RegularExpressions;
+using System.Threading;
+using System.Threading.Tasks;
namespace Microsoft.Diagnostics.NETCore.Client
{
/// </summary>
public sealed class DiagnosticsClient
{
- private readonly int _processId;
+ private readonly IpcEndpoint _endpoint;
- public DiagnosticsClient(int processId)
+ public DiagnosticsClient(int processId) :
+ this(new PidIpcEndpoint(processId))
{
- _processId = processId;
+ }
+
+ internal DiagnosticsClient(IpcEndpoint endpoint)
+ {
+ _endpoint = endpoint;
}
/// <summary>
- /// Checks that the client is able to communicate with target process over diagnostic transport.
+ /// Wait for an available diagnostic endpoint to the runtime instance.
/// </summary>
+ /// <param name="token">The token to monitor for cancellation requests.</param>
/// <returns>
- /// True if client is able to communicate with target process; otherwise, false.
+ /// A task the completes when a diagnostic endpoint to the runtime instance becomes available.
/// </returns>
- public bool CheckTransport()
+ internal Task WaitForConnectionAsync(CancellationToken token)
{
- return IpcClient.CheckTransport(_processId);
+ return _endpoint.WaitForConnectionAsync(token);
}
/// <summary>
/// </returns>
public EventPipeSession StartEventPipeSession(IEnumerable<EventPipeProvider> providers, bool requestRundown=true, int circularBufferMB=256)
{
- return new EventPipeSession(_processId, providers, requestRundown, circularBufferMB);
+ return new EventPipeSession(_endpoint, providers, requestRundown, circularBufferMB);
}
/// <summary>
/// </returns>
public EventPipeSession StartEventPipeSession(EventPipeProvider provider, bool requestRundown=true, int circularBufferMB=256)
{
- return new EventPipeSession(_processId, new[] { provider }, requestRundown, circularBufferMB);
+ return new EventPipeSession(_endpoint, new[] { provider }, requestRundown, circularBufferMB);
}
/// <summary>
byte[] payload = SerializeCoreDump(dumpPath, dumpType, logDumpGeneration);
IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Dump, (byte)DumpCommandId.GenerateCoreDump, payload);
- IpcMessage response = IpcClient.SendMessage(_processId, message);
- switch ((DiagnosticsServerCommandId)response.Header.CommandId)
+ IpcMessage response = IpcClient.SendMessage(_endpoint, message);
+ switch ((DiagnosticsServerResponseId)response.Header.CommandId)
{
- case DiagnosticsServerCommandId.Error:
+ case DiagnosticsServerResponseId.Error:
uint hr = BitConverter.ToUInt32(response.Payload, 0);
if (hr == (uint)DiagnosticsIpcError.UnknownCommand) {
throw new PlatformNotSupportedException($"Unsupported operating system: {RuntimeInformation.OSDescription}");
}
throw new ServerErrorException($"Writing dump failed (HRESULT: 0x{hr:X8})");
- case DiagnosticsServerCommandId.OK:
+ case DiagnosticsServerResponseId.OK:
return;
default:
throw new ServerErrorException($"Writing dump failed - server responded with unknown command");
byte[] serializedConfiguration = SerializeProfilerAttach((uint)attachTimeout.TotalSeconds, profilerGuid, profilerPath, additionalData);
var message = new IpcMessage(DiagnosticsServerCommandSet.Profiler, (byte)ProfilerCommandId.AttachProfiler, serializedConfiguration);
- var response = IpcClient.SendMessage(_processId, message);
- switch ((DiagnosticsServerCommandId)response.Header.CommandId)
+ var response = IpcClient.SendMessage(_endpoint, message);
+ switch ((DiagnosticsServerResponseId)response.Header.CommandId)
{
- case DiagnosticsServerCommandId.Error:
+ case DiagnosticsServerResponseId.Error:
var hr = BitConverter.ToInt32(response.Payload, 0);
throw new ServerErrorException($"Profiler attach failed (HRESULT: 0x{hr:X8})");
- case DiagnosticsServerCommandId.OK:
+ case DiagnosticsServerResponseId.OK:
return;
default:
throw new ServerErrorException($"Profiler attach failed - server responded with unknown command");
// runtime timeout or respect attachTimeout as one total duration.
}
+ internal void ResumeRuntime()
+ {
+ IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.ResumeRuntime);
+ var response = IpcClient.SendMessage(_endpoint, message);
+ switch ((DiagnosticsServerResponseId)response.Header.CommandId)
+ {
+ case DiagnosticsServerResponseId.Error:
+ // Try fallback for Preview 7 and Preview 8
+ ResumeRuntimeFallback();
+ //var hr = BitConverter.ToInt32(response.Payload, 0);
+ //throw new ServerErrorException($"Resume runtime failed (HRESULT: 0x{hr:X8})");
+ return;
+ case DiagnosticsServerResponseId.OK:
+ return;
+ default:
+ throw new ServerErrorException($"Resume runtime failed - server responded with unknown command");
+ }
+ }
+
+ // Fallback command for .NET 5 Preview 7 and Preview 8
+ internal void ResumeRuntimeFallback()
+ {
+ IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Server, (byte)DiagnosticServerCommandId.ResumeRuntime);
+ var response = IpcClient.SendMessage(_endpoint, message);
+ switch ((DiagnosticsServerResponseId)response.Header.CommandId)
+ {
+ case DiagnosticsServerResponseId.Error:
+ var hr = BitConverter.ToInt32(response.Payload, 0);
+ throw new ServerErrorException($"Resume runtime failed (HRESULT: 0x{hr:X8})");
+ case DiagnosticsServerResponseId.OK:
+ return;
+ default:
+ throw new ServerErrorException($"Resume runtime failed - server responded with unknown command");
+ }
+ }
+
/// <summary>
/// Get all the active processes that can be attached to.
/// </summary>
/// </returns>
public static IEnumerable<int> GetPublishedProcesses()
{
- return Directory.GetFiles(IpcClient.IpcRootPath)
+ return Directory.GetFiles(PidIpcEndpoint.IpcRootPath)
.Select(namedPipe => (new FileInfo(namedPipe)).Name)
- .Where(input => Regex.IsMatch(input, IpcClient.DiagnosticsPortPattern))
- .Select(input => int.Parse(Regex.Match(input, IpcClient.DiagnosticsPortPattern).Groups[1].Value, NumberStyles.Integer))
+ .Where(input => Regex.IsMatch(input, PidIpcEndpoint.DiagnosticsPortPattern))
+ .Select(input => int.Parse(Regex.Match(input, PidIpcEndpoint.DiagnosticsPortPattern).Groups[1].Value, NumberStyles.Integer))
.Distinct();
}
private bool _requestRundown;
private int _circularBufferMB;
private long _sessionId;
- private int _processId;
+ private IpcEndpoint _endpoint;
private bool disposedValue = false; // To detect redundant calls
- internal EventPipeSession(int processId, IEnumerable<EventPipeProvider> providers, bool requestRundown, int circularBufferMB)
+ internal EventPipeSession(IpcEndpoint endpoint, IEnumerable<EventPipeProvider> providers, bool requestRundown, int circularBufferMB)
{
- _processId = processId;
+ _endpoint = endpoint;
_providers = providers;
_requestRundown = requestRundown;
_circularBufferMB = circularBufferMB;
var config = new EventPipeSessionConfiguration(circularBufferMB, EventPipeSerializationFormat.NetTrace, providers, requestRundown);
var message = new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.CollectTracing2, config.SerializeV2());
- EventStream = IpcClient.SendMessage(processId, message, out var response);
- switch ((DiagnosticsServerCommandId)response.Header.CommandId)
+ EventStream = IpcClient.SendMessage(endpoint, message, out var response);
+ switch ((DiagnosticsServerResponseId)response.Header.CommandId)
{
- case DiagnosticsServerCommandId.OK:
+ case DiagnosticsServerResponseId.OK:
_sessionId = BitConverter.ToInt64(response.Payload, 0);
break;
- case DiagnosticsServerCommandId.Error:
+ case DiagnosticsServerResponseId.Error:
var hr = BitConverter.ToInt32(response.Payload, 0);
throw new ServerErrorException($"EventPipe session start failed (HRESULT: 0x{hr:X8})");
default:
Debug.Assert(_sessionId > 0);
byte[] payload = BitConverter.GetBytes(_sessionId);
- var response = IpcClient.SendMessage(_processId, new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.StopTracing, payload));
+ var response = IpcClient.SendMessage(_endpoint, new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.StopTracing, payload));
- switch ((DiagnosticsServerCommandId)response.Header.CommandId)
+ switch ((DiagnosticsServerResponseId)response.Header.CommandId)
{
- case DiagnosticsServerCommandId.OK:
+ case DiagnosticsServerResponseId.OK:
return;
- case DiagnosticsServerCommandId.Error:
+ case DiagnosticsServerResponseId.Error:
var hr = BitConverter.ToInt32(response.Payload, 0);
throw new ServerErrorException($"EventPipe session stop failed (HRESULT: 0x{hr:X8})");
default:
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System.Net.Sockets;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+ internal sealed class ExposedSocketNetworkStream :
+ NetworkStream
+ {
+ public ExposedSocketNetworkStream(Socket socket, bool ownsSocket)
+ : base(socket, ownsSocket)
+ {
+ }
+
+ public new Socket Socket => base.Socket;
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Linq;
+using System.IO;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+ /**
+ * ==ADVERTISE PROTOCOL==
+ * Before standard IPC Protocol communication can occur on a client-mode connection
+ * the runtime must advertise itself over the connection. ALL SUBSEQUENT COMMUNICATION
+ * IS STANDARD DIAGNOSTICS IPC PROTOCOL COMMUNICATION.
+ *
+ * The flow for Advertise is a one-way burst of 34 bytes consisting of
+ * 8 bytes - "ADVR_V1\0" (ASCII chars + null byte)
+ * 16 bytes - CLR Instance Cookie (little-endian)
+ * 8 bytes - PID (little-endian)
+ * 2 bytes - future
+ */
+
+ internal sealed class IpcAdvertise
+ {
+ private static byte[] Magic_V1 => Encoding.ASCII.GetBytes("ADVR_V1" + '\0');
+ private static readonly int IpcAdvertiseV1SizeInBytes = Magic_V1.Length + 16 + 8 + 2; // 34 bytes
+
+ private IpcAdvertise(byte[] magic, Guid cookie, UInt64 pid, UInt16 future)
+ {
+ Future = future;
+ Magic = magic;
+ ProcessId = pid;
+ RuntimeInstanceCookie = cookie;
+ }
+
+ public static async Task<IpcAdvertise> ParseAsync(Stream stream, CancellationToken token)
+ {
+ byte[] buffer = new byte[IpcAdvertiseV1SizeInBytes];
+
+ int totalRead = 0;
+ do
+ {
+ int read = await stream.ReadAsync(buffer, totalRead, buffer.Length - totalRead, token).ConfigureAwait(false);
+ if (0 == read)
+ {
+ throw new EndOfStreamException();
+ }
+ totalRead += read;
+ }
+ while (totalRead < buffer.Length);
+
+ int index = 0;
+ byte[] magic = new byte[Magic_V1.Length];
+ Array.Copy(buffer, magic, Magic_V1.Length);
+ index += Magic_V1.Length;
+
+ if (!Magic_V1.SequenceEqual(magic))
+ {
+ throw new Exception("Invalid advertise message from client connection");
+ }
+
+ byte[] cookieBuffer = new byte[16];
+ Array.Copy(buffer, index, cookieBuffer, 0, 16);
+ Guid cookie = new Guid(cookieBuffer);
+ index += 16;
+
+ UInt64 pid = BitConverter.ToUInt64(buffer, index);
+ index += 8;
+
+ UInt16 future = BitConverter.ToUInt16(buffer, index);
+ index += 2;
+
+ // FUTURE: switch on incoming magic and change if version ever increments
+ return new IpcAdvertise(magic, cookie, pid, future);
+ }
+
+ public override string ToString()
+ {
+ return $"{{ Magic={Magic}; ClrInstanceId={RuntimeInstanceCookie}; ProcessId={ProcessId}; Future={Future} }}";
+ }
+
+ private UInt16 Future { get; } = 0;
+ public byte[] Magic { get; } = Magic_V1;
+ public UInt64 ProcessId { get; } = 0;
+ public Guid RuntimeInstanceCookie { get; } = Guid.Empty;
+ }
+}
// See the LICENSE file in the project root for more information.
using System;
-using System.Diagnostics;
using System.IO;
-using System.IO.Pipes;
-using System.Linq;
-using System.Net;
-using System.Net.Sockets;
-using System.Runtime.InteropServices;
-using System.Security.Principal;
+using System.Threading;
namespace Microsoft.Diagnostics.NETCore.Client
{
internal class IpcClient
{
- public static string IpcRootPath { get; } = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? @"\\.\pipe\" : Path.GetTempPath();
- public static string DiagnosticsPortPattern { get; } = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? @"^dotnet-diagnostic-(\d+)$" : @"^dotnet-diagnostic-(\d+)-(\d+)-socket$";
-
- private static double ConnectTimeoutMilliseconds { get; } = TimeSpan.FromSeconds(3).TotalMilliseconds;
-
- /// <summary>
- /// Get the OS Transport to be used for communicating with a dotnet process.
- /// </summary>
- /// <param name="processId">The PID of the dotnet process to get the transport for</param>
- /// <returns>A System.IO.Stream wrapper around the transport</returns>
- private static Stream GetTransport(int processId)
- {
- try
- {
- var process = Process.GetProcessById(processId);
- }
- catch (System.ArgumentException)
- {
- throw new ServerNotAvailableException($"Process {processId} is not running.");
- }
- catch (System.InvalidOperationException)
- {
- throw new ServerNotAvailableException($"Process {processId} seems to be elevated.");
- }
-
- if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
- {
- string pipeName = $"dotnet-diagnostic-{processId}";
- var namedPipe = new NamedPipeClientStream(
- ".", pipeName, PipeDirection.InOut, PipeOptions.None, TokenImpersonationLevel.Impersonation);
- namedPipe.Connect((int)ConnectTimeoutMilliseconds);
- return namedPipe;
- }
- else
- {
- string ipcPort;
- try
- {
- ipcPort = Directory.GetFiles(IpcRootPath, $"dotnet-diagnostic-{processId}-*-socket") // Try best match.
- .OrderByDescending(f => new FileInfo(f).LastWriteTime)
- .FirstOrDefault();
- if (ipcPort == null)
- {
- throw new ServerNotAvailableException($"Process {processId} not running compatible .NET Core runtime.");
- }
- }
- catch (InvalidOperationException)
- {
- throw new ServerNotAvailableException($"Process {processId} not running compatible .NET Core runtime.");
- }
- string path = Path.Combine(IpcRootPath, ipcPort);
- var remoteEP = CreateUnixDomainSocketEndPoint(path);
-
- var socket = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified);
- socket.Connect(remoteEP);
- return new NetworkStream(socket, ownsSocket: true);
- }
- }
-
- /// <summary>
- /// Checks that the client is able to communicate with target process over diagnostic transport.
- /// </summary>
- /// <returns>
- /// True if client is able to communicate with target process; otherwise, false.
- /// </returns>
- public static bool CheckTransport(int processId)
- {
- try
- {
- using var stream = GetTransport(processId);
- return null != stream;
- }
- catch (Exception)
- {
- return false;
- }
- }
+ // The amount of time to wait for a stream to be available for consumption by the Connect method.
+ private static readonly TimeSpan ConnectTimeout = TimeSpan.FromSeconds(3);
/// <summary>
/// Sends a single DiagnosticsIpc Message to the dotnet process with PID processId.
/// </summary>
- /// <param name="processId">The PID of the dotnet process</param>
+ /// <param name="endpoint">An endpoint that provides a diagnostics connection to a runtime instance.</param>
/// <param name="message">The DiagnosticsIpc Message to be sent</param>
/// <returns>The response DiagnosticsIpc Message from the dotnet process</returns>
- public static IpcMessage SendMessage(int processId, IpcMessage message)
+ public static IpcMessage SendMessage(IpcEndpoint endpoint, IpcMessage message)
{
- using (var stream = GetTransport(processId))
+ using (var stream = endpoint.Connect(ConnectTimeout))
{
Write(stream, message);
return Read(stream);
/// Sends a single DiagnosticsIpc Message to the dotnet process with PID processId
/// and returns the Stream for reuse in Optional Continuations.
/// </summary>
- /// <param name="processId">The PID of the dotnet process</param>
+ /// <param name="endpoint">An endpoint that provides a diagnostics connection to a runtime instance.</param>
/// <param name="message">The DiagnosticsIpc Message to be sent</param>
/// <param name="response">out var for response message</param>
/// <returns>The response DiagnosticsIpc Message from the dotnet process</returns>
- public static Stream SendMessage(int processId, IpcMessage message, out IpcMessage response)
+ public static Stream SendMessage(IpcEndpoint endpoint, IpcMessage message, out IpcMessage response)
{
- var stream = GetTransport(processId);
+ var stream = endpoint.Connect(ConnectTimeout);
Write(stream, message);
response = Read(stream);
return stream;
{
return IpcMessage.Parse(stream);
}
-
- private static EndPoint CreateUnixDomainSocketEndPoint(string path)
- {
-#if NETCOREAPP
- return new UnixDomainSocketEndPoint(path);
-#elif NETSTANDARD2_0
- // UnixDomainSocketEndPoint is not part of .NET Standard 2.0
- var type = typeof(Socket).Assembly.GetType("System.Net.Sockets.UnixDomainSocketEndPoint")
- ?? Type.GetType("System.Net.Sockets.UnixDomainSocketEndPoint, System.Core");
- if (type == null)
- {
- throw new PlatformNotSupportedException("Current process is not running a compatible .NET Core runtime.");
- }
- var ctor = type.GetConstructor(new[] { typeof(string) });
- return (EndPoint)ctor.Invoke(new object[] { path });
-#endif
- }
}
}
Dump = 0x01,
EventPipe = 0x02,
Profiler = 0x03,
+ Process = 0x04,
Server = 0xFF,
}
- internal enum DiagnosticsServerCommandId : byte
+ // For .NET 5 Preview 7 and Preview 8, use this with the
+ // DiagnosticsServerCommandSet.Server command set.
+ // For .NET 5 RC and later, use ProcessCommandId.ResumeRuntime with
+ // the DiagnosticsServerCommandSet.Process command set.
+ internal enum DiagnosticServerCommandId : byte
{
- OK = 0x00,
- Error = 0xFF,
+ // 0x00 used in DiagnosticServerResponseId
+ ResumeRuntime = 0x01,
+ // 0xFF used DiagnosticServerResponseId
+ };
+
+ internal enum DiagnosticsServerResponseId : byte
+ {
+ OK = 0x00,
+ // future
+ Error = 0xFF,
}
internal enum EventPipeCommandId : byte
{
AttachProfiler = 0x01,
}
+
+ internal enum ProcessCommandId : byte
+ {
+ GetProcessInfo = 0x00,
+ ResumeRuntime = 0x01
+ }
}
public IpcMessage()
{ }
- public IpcMessage(IpcHeader header, byte[] payload)
+ public IpcMessage(IpcHeader header, byte[] payload = null)
{
- Payload = payload;
+ Payload = payload ?? Array.Empty<byte>();
Header = header;
}
{
byte[] serializedData = null;
// Verify things will fit in the size capacity
- Header.Size = checked((UInt16)(IpcHeader.HeaderSizeInBytes + Payload.Length)); ;
+ Header.Size = checked((UInt16)(IpcHeader.HeaderSizeInBytes + Payload.Length));
byte[] headerBytes = Header.Serialize();
using (var stream = new MemoryStream())
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.IO;
+using System.IO.Pipes;
+using System.Net.Sockets;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+ internal abstract class IpcServerTransport : IDisposable
+ {
+ private bool _disposed;
+
+ public static IpcServerTransport Create(string transportPath, int maxConnections)
+ {
+ if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+ {
+ return new WindowsPipeServerTransport(transportPath, maxConnections);
+ }
+ else
+ {
+ return new UnixDomainSocketServerTransport(transportPath, maxConnections);
+ }
+ }
+
+ public void Dispose()
+ {
+ if (!_disposed)
+ {
+ Dispose(disposing: true);
+
+ _disposed = true;
+ }
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ }
+
+ public abstract Task<Stream> AcceptAsync(CancellationToken token);
+
+ public static int MaxAllowedConnections
+ {
+ get
+ {
+ if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+ {
+ return NamedPipeServerStream.MaxAllowedServerInstances;
+ }
+ else
+ {
+ return (int)SocketOptionName.MaxConnections;
+ }
+ }
+ }
+
+ protected void VerifyNotDisposed()
+ {
+ if (_disposed)
+ {
+ throw new ObjectDisposedException(this.GetType().Name);
+ }
+ }
+ }
+
+ internal sealed class WindowsPipeServerTransport : IpcServerTransport
+ {
+ private const string PipePrefix = @"\\.\pipe\";
+
+ private NamedPipeServerStream _stream;
+
+ private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
+ private readonly string _pipeName;
+ private readonly int _maxInstances;
+
+ public WindowsPipeServerTransport(string pipeName, int maxInstances)
+ {
+ _maxInstances = maxInstances;
+ _pipeName = pipeName.StartsWith(PipePrefix) ? pipeName.Substring(PipePrefix.Length) : pipeName;
+ CreateNewPipeServer();
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ _cancellation.Cancel();
+
+ _stream.Dispose();
+
+ _cancellation.Dispose();
+ }
+ }
+
+ public override async Task<Stream> AcceptAsync(CancellationToken token)
+ {
+ VerifyNotDisposed();
+
+ using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, _cancellation.Token);
+
+ NamedPipeServerStream connectedStream;
+ try
+ {
+ await _stream.WaitForConnectionAsync(linkedSource.Token).ConfigureAwait(false);
+
+ connectedStream = _stream;
+ }
+ finally
+ {
+ if (!_cancellation.IsCancellationRequested)
+ {
+ CreateNewPipeServer();
+ }
+ }
+ return connectedStream;
+ }
+
+ private void CreateNewPipeServer()
+ {
+ _stream = new NamedPipeServerStream(
+ _pipeName,
+ PipeDirection.InOut,
+ _maxInstances,
+ PipeTransmissionMode.Byte,
+ PipeOptions.Asynchronous);
+ }
+ }
+
+ internal sealed class UnixDomainSocketServerTransport : IpcServerTransport
+ {
+ private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
+ private readonly int _backlog;
+ private readonly string _path;
+
+ private UnixDomainSocket _socket;
+
+ public UnixDomainSocketServerTransport(string path, int backlog)
+ {
+ _backlog = backlog;
+ _path = path;
+
+ CreateNewSocketServer();
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ _cancellation.Cancel();
+
+ try
+ {
+ _socket.Shutdown(SocketShutdown.Both);
+ }
+ catch { }
+ finally
+ {
+ _socket.Close(0);
+ }
+ _socket.Dispose();
+
+ _cancellation.Dispose();
+ }
+ }
+
+ public override async Task<Stream> AcceptAsync(CancellationToken token)
+ {
+ VerifyNotDisposed();
+
+ using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, _cancellation.Token);
+ try
+ {
+ Socket socket = await _socket.AcceptAsync(linkedSource.Token).ConfigureAwait(false);
+
+ return new ExposedSocketNetworkStream(socket, ownsSocket: true);
+ }
+ catch (Exception)
+ {
+ // Recreate socket if transport is not disposed.
+ if (!_cancellation.IsCancellationRequested)
+ {
+ CreateNewSocketServer();
+ }
+ throw;
+ }
+ }
+
+ private void CreateNewSocketServer()
+ {
+ _socket = new UnixDomainSocket();
+ _socket.Bind(_path);
+ _socket.Listen(_backlog);
+ _socket.LingerState.Enabled = false;
+ }
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Diagnostics;
+using System.IO;
+using System.IO.Pipes;
+using System.Linq;
+using System.Runtime.InteropServices;
+using System.Security.Principal;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+ internal abstract class IpcEndpoint
+ {
+ /// <summary>
+ /// Connects to the underlying IPC transport and opens a read/write-able Stream
+ /// </summary>
+ /// <param name="timeout">The amount of time to block attempting to connect</param>
+ /// <returns>A Stream for writing and reading data to and from the target .NET process</returns>
+ public abstract Stream Connect(TimeSpan timeout);
+
+ /// <summary>
+ /// Wait for an available diagnostic endpoint to the runtime instance.
+ /// </summary>
+ /// <param name="token">The token to monitor for cancellation requests.</param>
+ /// <returns>
+ /// A task the completes when a diagnostic endpoint to the runtime instance becomes available.
+ /// </returns>
+ public abstract Task WaitForConnectionAsync(CancellationToken token);
+ }
+
+ internal class ServerIpcEndpoint : IpcEndpoint
+ {
+ private readonly Guid _runtimeId;
+ private readonly ReversedDiagnosticsServer _server;
+
+ public ServerIpcEndpoint(ReversedDiagnosticsServer server, Guid runtimeId)
+ {
+ _runtimeId = runtimeId;
+ _server = server;
+ }
+
+ /// <remarks>
+ /// This will block until the diagnostic stream is provided. This block can happen if
+ /// the stream is acquired previously and the runtime instance has not yet reconnected
+ /// to the reversed diagnostics server.
+ /// </remarks>
+ public override Stream Connect(TimeSpan timeout)
+ {
+ return _server.Connect(_runtimeId, timeout);
+ }
+
+ public override async Task WaitForConnectionAsync(CancellationToken token)
+ {
+ await _server.WaitForConnectionAsync(_runtimeId, token).ConfigureAwait(false);
+ }
+
+ public override bool Equals(object obj)
+ {
+ return Equals(obj as ServerIpcEndpoint);
+ }
+
+ public bool Equals(ServerIpcEndpoint other)
+ {
+ return other != null && other._runtimeId == _runtimeId && other._server == _server;
+ }
+
+ public override int GetHashCode()
+ {
+ return _runtimeId.GetHashCode() ^ _server.GetHashCode();
+ }
+ }
+
+ internal class PidIpcEndpoint : IpcEndpoint
+ {
+ public static string IpcRootPath { get; } = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? @"\\.\pipe\" : Path.GetTempPath();
+ public static string DiagnosticsPortPattern { get; } = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ? @"^dotnet-diagnostic-(\d+)$" : @"^dotnet-diagnostic-(\d+)-(\d+)-socket$";
+
+ private int _pid;
+
+ /// <summary>
+ /// Creates a reference to a .NET process's IPC Transport
+ /// using the default rules for a given pid
+ /// </summary>
+ /// <param name="pid">The pid of the target process</param>
+ /// <returns>A reference to the IPC Transport</returns>
+ public PidIpcEndpoint(int pid)
+ {
+ _pid = pid;
+ }
+
+ public override Stream Connect(TimeSpan timeout)
+ {
+ string address = GetDefaultAddress();
+ if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+ {
+ var namedPipe = new NamedPipeClientStream(
+ ".",
+ address,
+ PipeDirection.InOut,
+ PipeOptions.None,
+ TokenImpersonationLevel.Impersonation);
+ namedPipe.Connect((int)timeout.TotalMilliseconds);
+ return namedPipe;
+ }
+ else
+ {
+ var socket = new UnixDomainSocket();
+ socket.Connect(Path.Combine(IpcRootPath, address), timeout);
+ return new ExposedSocketNetworkStream(socket, ownsSocket: true);
+ }
+ }
+
+ public override async Task WaitForConnectionAsync(CancellationToken token)
+ {
+ using var _ = await ConnectStreamAsync(token).ConfigureAwait(false);
+ }
+
+ async Task<Stream> ConnectStreamAsync(CancellationToken token)
+ {
+ string address = GetDefaultAddress();
+ if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+ {
+ var namedPipe = new NamedPipeClientStream(
+ ".",
+ address,
+ PipeDirection.InOut,
+ PipeOptions.None,
+ TokenImpersonationLevel.Impersonation);
+ await namedPipe.ConnectAsync(token).ConfigureAwait(false);
+ return namedPipe;
+ }
+ else
+ {
+ var socket = new UnixDomainSocket();
+ await socket.ConnectAsync(Path.Combine(IpcRootPath, address), token).ConfigureAwait(false);
+ return new ExposedSocketNetworkStream(socket, ownsSocket: true);
+ }
+ }
+
+ private string GetDefaultAddress()
+ {
+ try
+ {
+ var process = Process.GetProcessById(_pid);
+ }
+ catch (ArgumentException)
+ {
+ throw new ServerNotAvailableException($"Process {_pid} is not running.");
+ }
+ catch (InvalidOperationException)
+ {
+ throw new ServerNotAvailableException($"Process {_pid} seems to be elevated.");
+ }
+
+ if (!TryGetDefaultAddress(_pid, out string transportName))
+ {
+ throw new ServerNotAvailableException($"Process {_pid} not running compatible .NET runtime.");
+ }
+
+ return transportName;
+ }
+
+ private static bool TryGetDefaultAddress(int pid, out string defaultAddress)
+ {
+ defaultAddress = null;
+
+ if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+ {
+ defaultAddress = $"dotnet-diagnostic-{pid}";
+ }
+ else
+ {
+ try
+ {
+ defaultAddress = Directory.GetFiles(IpcRootPath, $"dotnet-diagnostic-{pid}-*-socket") // Try best match.
+ .OrderByDescending(f => new FileInfo(f).LastWriteTime)
+ .FirstOrDefault();
+ }
+ catch (InvalidOperationException)
+ {
+ }
+ }
+
+ return !string.IsNullOrEmpty(defaultAddress);
+ }
+
+ public override bool Equals(object obj)
+ {
+ return Equals(obj as PidIpcEndpoint);
+ }
+
+ public bool Equals(PidIpcEndpoint other)
+ {
+ return other != null && other._pid == _pid;
+ }
+
+ public override int GetHashCode()
+ {
+ return _pid.GetHashCode();
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Diagnostics;
+using System.IO;
+using System.Net;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+ internal sealed class UnixDomainSocket : Socket
+ {
+ private bool _ownsSocketFile;
+ private string _path;
+
+ public UnixDomainSocket() :
+ base(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified)
+ {
+ }
+
+ public async Task<Socket> AcceptAsync(CancellationToken token)
+ {
+ using (token.Register(() => Close(0)))
+ {
+ try
+ {
+ return await Task.Factory.FromAsync(BeginAccept, EndAccept, this).ConfigureAwait(false);
+ }
+ // When the socket is closed, the FromAsync logic will try to call EndAccept on the socket,
+ // but that will throw an ObjectDisposedException. Only catch the exception if due to cancellation.
+ catch (ObjectDisposedException) when (token.IsCancellationRequested)
+ {
+ // First check if the cancellation token caused the closing of the socket,
+ // then rethrow the exception if it did not.
+ token.ThrowIfCancellationRequested();
+
+ Debug.Fail("Token should have thrown cancellation exception.");
+ return null;
+ }
+ }
+ }
+
+ public void Bind(string path)
+ {
+ Bind(CreateUnixDomainSocketEndPoint(path));
+
+ _ownsSocketFile = true;
+ _path = path;
+ }
+
+ public void Connect(string path, TimeSpan timeout)
+ {
+ IAsyncResult result = BeginConnect(CreateUnixDomainSocketEndPoint(path), null, null);
+
+ if (result.AsyncWaitHandle.WaitOne(timeout))
+ {
+ EndConnect(result);
+
+ _ownsSocketFile = false;
+ _path = path;
+ }
+ else
+ {
+ Close(0);
+ throw new TimeoutException();
+ }
+ }
+
+ public async Task ConnectAsync(string path, CancellationToken token)
+ {
+ using (token.Register(() => Close(0)))
+ {
+ try
+ {
+ Func<AsyncCallback, object, IAsyncResult> beginConnect = (callback, state) =>
+ {
+ return BeginConnect(CreateUnixDomainSocketEndPoint(path), callback, state);
+ };
+ await Task.Factory.FromAsync(beginConnect, EndConnect, this).ConfigureAwait(false);
+ }
+ // When the socket is closed, the FromAsync logic will try to call EndAccept on the socket,
+ // but that will throw an ObjectDisposedException. Only catch the exception if due to cancellation.
+ catch (ObjectDisposedException) when (token.IsCancellationRequested)
+ {
+ // First check if the cancellation token caused the closing of the socket,
+ // then rethrow the exception if it did not.
+ token.ThrowIfCancellationRequested();
+ }
+ }
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ if (_ownsSocketFile && !string.IsNullOrEmpty(_path) && File.Exists(_path))
+ {
+ File.Delete(_path);
+ }
+ }
+ base.Dispose(disposing);
+ }
+
+ private static EndPoint CreateUnixDomainSocketEndPoint(string path)
+ {
+#if NETCOREAPP
+ return new UnixDomainSocketEndPoint(path);
+#elif NETSTANDARD2_0
+ // UnixDomainSocketEndPoint is not part of .NET Standard 2.0
+ var type = typeof(Socket).Assembly.GetType("System.Net.Sockets.UnixDomainSocketEndPoint");
+ if (type == null)
+ {
+ throw new PlatformNotSupportedException("Current process is not running a compatible .NET runtime.");
+ }
+ var ctor = type.GetConstructor(new[] { typeof(string) });
+ return (EndPoint)ctor.Invoke(new object[] { path });
+#endif
+ }
+ }
+}
<IncludeSymbols>true</IncludeSymbols>
<IsShipping>true</IsShipping>
</PropertyGroup>
+
+ <ItemGroup>
+ <InternalsVisibleTo Include="DotnetMonitor.UnitTests" />
+ <InternalsVisibleTo Include="Microsoft.Diagnostics.Monitoring" />
+ <InternalsVisibleTo Include="Microsoft.Diagnostics.NETCore.Client.UnitTests" />
+ </ItemGroup>
</Project>
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Runtime.InteropServices;
+using Microsoft.Win32.SafeHandles;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+ internal class NativeMethods
+ {
+ [DllImport("kernel32.dll", SetLastError = true)]
+ [return: MarshalAs(UnmanagedType.Bool)]
+ internal static extern bool PeekNamedPipe(
+ SafePipeHandle hNamedPipe,
+ byte[] lpBuffer,
+ int bufferSize,
+ IntPtr lpBytesRead,
+ IntPtr lpTotalBytesAvail,
+ IntPtr lpBytesLeftThisMessage
+ );
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Diagnostics;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+ /// <summary>
+ /// Represents a runtine instance connection to a reversed diagnostics server.
+ /// </summary>
+ [DebuggerDisplay("PID={ProcessId}, Cookie={RuntimeInstanceCookie}")]
+ internal struct IpcEndpointInfo
+ {
+ internal IpcEndpointInfo(IpcEndpoint endpoint, int processId, Guid runtimeInstanceCookie)
+ {
+ Endpoint = endpoint;
+ ProcessId = processId;
+ RuntimeInstanceCookie = runtimeInstanceCookie;
+ }
+
+ /// <summary>
+ /// An endpoint used to retrieve diagnostic information from the associated runtime instance.
+ /// </summary>
+ public IpcEndpoint Endpoint { get; }
+
+ /// <summary>
+ /// The identifier of the process that is unique within its process namespace.
+ /// </summary>
+ public int ProcessId { get; }
+
+ /// <summary>
+ /// The unique identifier of the runtime instance.
+ /// </summary>
+ public Guid RuntimeInstanceCookie { get; }
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.IO.Pipes;
+using System.Net.Sockets;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+ /// <summary>
+ /// Establishes server endpoint for runtime instances to connect when
+ /// configured to provide diagnostic endpoints in reverse mode.
+ /// </summary>
+ internal sealed class ReversedDiagnosticsServer : IDisposable
+ {
+ // Returns true if the handler is complete and should be removed from the list
+ delegate bool StreamHandler(Guid runtimeId, ref Stream stream);
+
+ // The amount of time to allow parsing of the advertise data before cancelling. This allows the server to
+ // remain responsive in case the advertise data is incomplete and the stream is not closed.
+ private static readonly TimeSpan ParseAdvertiseTimeout = TimeSpan.FromMilliseconds(250);
+
+ private readonly Dictionary<Guid, ServerIpcEndpoint> _cachedEndpoints = new Dictionary<Guid, ServerIpcEndpoint>();
+ private readonly Dictionary<Guid, Stream> _cachedStreams = new Dictionary<Guid, Stream>();
+ private readonly CancellationTokenSource _disposalSource = new CancellationTokenSource();
+ private readonly List<StreamHandler> _handlers = new List<StreamHandler>();
+ private readonly object _lock = new object();
+ private readonly IpcServerTransport _transport;
+
+ private bool _disposed = false;
+
+ /// <summary>
+ /// Constructs the <see cref="ReversedDiagnosticsServer"/> instance with an endpoint bound
+ /// to the location specified by <paramref name="transportPath"/>.
+ /// </summary>
+ /// <param name="transportPath">
+ /// The path of the server endpoint.
+ /// On Windows, this can be a full pipe path or the name without the "\\.\pipe\" prefix.
+ /// On all other systems, this must be the full file path of the socket.
+ /// </param>
+ public ReversedDiagnosticsServer(string transportPath)
+ : this(transportPath, MaxAllowedConnections)
+ {
+ }
+
+ /// <summary>
+ /// Constructs the <see cref="ReversedDiagnosticsServer"/> instance with an endpoint bound
+ /// to the location specified by <paramref name="transportPath"/>.
+ /// </summary>
+ /// <param name="transportPath">
+ /// The path of the server endpoint.
+ /// On Windows, this can be a full pipe path or the name without the "\\.\pipe\" prefix.
+ /// On all other systems, this must be the full file path of the socket.
+ /// </param>
+ /// <param name="maxConnections">The maximum number of connections the server will support.</param>
+ public ReversedDiagnosticsServer(string transportPath, int maxConnections)
+ {
+ _transport = IpcServerTransport.Create(transportPath, maxConnections);
+ }
+
+ public void Dispose()
+ {
+ if (!_disposed)
+ {
+ _disposalSource.Cancel();
+
+ lock (_lock)
+ {
+ _cachedEndpoints.Clear();
+
+ foreach (Stream stream in _cachedStreams.Values)
+ {
+ stream?.Dispose();
+ }
+ _cachedStreams.Clear();
+ }
+
+ _transport.Dispose();
+
+ _disposalSource.Dispose();
+
+ _disposed = true;
+ }
+ }
+
+ /// <summary>
+ /// Provides endpoint information when a new runtime instance connects to the server.
+ /// </summary>
+ /// <param name="token">The token to monitor for cancellation requests.</param>
+ /// <returns>A <see cref="IpcEndpointInfo"/> that contains information about the new runtime instance connection.</returns>
+ /// <remarks>
+ /// This will only provide endpoint information on the first time a runtime connects to the server.
+ /// If a connection is removed using <see cref="RemoveConnection(Guid)"/> and the same runtime instance,
+ /// reconnects after this call, then a new <see cref="IpcEndpointInfo"/> will be produced.
+ /// </remarks>
+ public async Task<IpcEndpointInfo> AcceptAsync(CancellationToken token)
+ {
+ VerifyNotDisposed();
+
+ while (true)
+ {
+ Stream stream = null;
+ IpcAdvertise advertise = null;
+ try
+ {
+ stream = await _transport.AcceptAsync(token).ConfigureAwait(false);
+ }
+ catch (Exception ex) when (!(ex is OperationCanceledException))
+ {
+ // The advertise data could be incomplete if the runtime shuts down before completely writing
+ // the information. Catch the exception and continue waiting for a new connection.
+ }
+
+ token.ThrowIfCancellationRequested();
+
+ if (null != stream)
+ {
+ // Cancel parsing of advertise data after timeout period to
+ // mitigate runtimes that write partial data and do not close the stream (avoid waiting forever).
+ using var parseCancellationSource = new CancellationTokenSource();
+ using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, parseCancellationSource.Token);
+ try
+ {
+ parseCancellationSource.CancelAfter(ParseAdvertiseTimeout);
+
+ advertise = await IpcAdvertise.ParseAsync(stream, linkedSource.Token).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException) when (parseCancellationSource.IsCancellationRequested)
+ {
+ // Only handle cancellation if it was due to the parse timeout.
+ }
+ catch (Exception ex) when (!(ex is OperationCanceledException))
+ {
+ // Catch all other exceptions and continue waiting for a new connection.
+ }
+ }
+
+ token.ThrowIfCancellationRequested();
+
+ if (null != advertise)
+ {
+ Guid runtimeCookie = advertise.RuntimeInstanceCookie;
+ int pid = unchecked((int)advertise.ProcessId);
+
+ lock (_lock)
+ {
+ ProvideStream(runtimeCookie, stream);
+ // Consumers should hold onto the endpoint info and use it for diagnostic communication,
+ // regardless of the number of times the same runtime instance connects. This requires consumers
+ // to continuously invoke the AcceptAsync method in order to handle runtime instance reconnects,
+ // even if the consumer only wants to handle a single endpoint.
+ if (!_cachedEndpoints.ContainsKey(runtimeCookie))
+ {
+ ServerIpcEndpoint endpoint = new ServerIpcEndpoint(this, runtimeCookie);
+ _cachedEndpoints.Add(runtimeCookie, endpoint);
+ return new IpcEndpointInfo(endpoint, pid, runtimeCookie);
+ }
+ }
+ }
+
+ token.ThrowIfCancellationRequested();
+ }
+ }
+
+ /// <summary>
+ /// Removes endpoint information from the server so that it is no longer tracked.
+ /// </summary>
+ /// <param name="runtimeCookie">The runtime instance cookie that corresponds to the endpoint to be removed.</param>
+ /// <returns>True if the endpoint existed and was removed; otherwise false.</returns>
+ public bool RemoveConnection(Guid runtimeCookie)
+ {
+ VerifyNotDisposed();
+
+ bool endpointExisted = false;
+ Stream previousStream = null;
+
+ lock (_lock)
+ {
+ endpointExisted = _cachedEndpoints.Remove(runtimeCookie);
+ if (endpointExisted)
+ {
+ if (_cachedStreams.TryGetValue(runtimeCookie, out previousStream))
+ {
+ _cachedStreams.Remove(runtimeCookie);
+ }
+ }
+ }
+
+ previousStream?.Dispose();
+
+ return endpointExisted;
+ }
+
+ private void VerifyNotDisposed()
+ {
+ if (_disposed)
+ {
+ throw new ObjectDisposedException(nameof(ReversedDiagnosticsServer));
+ }
+ }
+
+ /// <remarks>
+ /// This will block until the diagnostic stream is provided. This block can happen if
+ /// the stream is acquired previously and the runtime instance has not yet reconnected
+ /// to the reversed diagnostics server.
+ /// </remarks>
+ internal Stream Connect(Guid runtimeId, TimeSpan timeout)
+ {
+ VerifyNotDisposed();
+
+ const int StreamStatePending = 0;
+ const int StreamStateComplete = 1;
+ const int StreamStateCancelled = 2;
+ const int StreamStateDisposed = 3;
+
+ // CancellationTokenSource is used to trigger the timeout path in order to avoid inadvertently consuming
+ // the stream via the handler while processing the timeout after failing to wait for the stream event
+ // to be signaled within the timeout period. The source of truth of whether the stream was consumed or
+ // whether the timeout occurred is captured by the streamState variable.
+ Stream stream = null;
+ int streamState = StreamStatePending;
+ using var streamEvent = new ManualResetEvent(false);
+ var cancellationSource = new CancellationTokenSource();
+
+ bool TrySetStream(int state, Stream value)
+ {
+ if (StreamStatePending == Interlocked.CompareExchange(ref streamState, state, 0))
+ {
+ stream = value;
+ streamEvent.Set();
+ return true;
+ }
+ return false;
+ }
+
+ using var methodRegistration = cancellationSource.Token.Register(() => TrySetStream(StreamStateCancelled, value: null));
+ using var disposalRegistration = _disposalSource.Token.Register(() => TrySetStream(StreamStateDisposed, value: null));
+
+ RegisterHandler(runtimeId, (Guid id, ref Stream cachedStream) =>
+ {
+ if (id != runtimeId)
+ {
+ return false;
+ }
+
+ if (TrySetStream(StreamStateComplete, cachedStream))
+ {
+ cachedStream = null;
+ }
+
+ // Regardless of the registrant previously waiting or cancelled,
+ // the handler should be removed from consideration.
+ return true;
+ });
+
+ cancellationSource.CancelAfter(timeout);
+ streamEvent.WaitOne();
+
+ if (StreamStateCancelled == streamState)
+ {
+ throw new TimeoutException();
+ }
+
+ if (StreamStateDisposed == streamState)
+ {
+ throw new ObjectDisposedException(nameof(ReversedDiagnosticsServer));
+ }
+
+ return stream;
+ }
+
+ internal async Task WaitForConnectionAsync(Guid runtimeId, CancellationToken token)
+ {
+ VerifyNotDisposed();
+
+ var hasConnectedStreamSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
+ using var methodRegistration = token.Register(() => hasConnectedStreamSource.TrySetCanceled(token));
+ using var disposalRegistration = _disposalSource.Token.Register(
+ () => hasConnectedStreamSource.TrySetException(new ObjectDisposedException(nameof(ReversedDiagnosticsServer))));
+
+ RegisterHandler(runtimeId, (Guid id, ref Stream cachedStream) =>
+ {
+ if (runtimeId != id)
+ {
+ return false;
+ }
+
+ // Check if the registrant was already finished.
+ if (hasConnectedStreamSource.Task.IsCompleted)
+ {
+ return true;
+ }
+
+ if (!TestStream(cachedStream))
+ {
+ cachedStream.Dispose();
+ cachedStream = null;
+ return false;
+ }
+
+ // Found a stream that is valid; signal completion if possible.
+ hasConnectedStreamSource.TrySetResult(true);
+
+ // Regardless of the registrant previously waiting or cancelled,
+ // the handler should be removed from consideration.
+ return true;
+ });
+
+ // Wait for the handler to verify we have a connected stream
+ await hasConnectedStreamSource.Task.ConfigureAwait(false);
+ }
+
+ private void ProvideStream(Guid runtimeId, Stream stream)
+ {
+ Debug.Assert(Monitor.IsEntered(_lock));
+
+ // Get the previous stream in order to dispose it later
+ _cachedStreams.TryGetValue(runtimeId, out Stream previousStream);
+
+ RunStreamHandlers(runtimeId, stream);
+
+ // Dispose the previous stream if there was one.
+ previousStream?.Dispose();
+ }
+
+ private void RunStreamHandlers(Guid runtimeId, Stream stream)
+ {
+ Debug.Assert(Monitor.IsEntered(_lock));
+
+ // If there are any handlers waiting for a stream, provide
+ // it to the first handler in the queue.
+ for (int i = 0; (i < _handlers.Count) && (null != stream); i++)
+ {
+ StreamHandler handler = _handlers[i];
+ if (handler(runtimeId, ref stream))
+ {
+ _handlers.RemoveAt(i);
+ i--;
+ }
+ }
+
+ // Store the stream for when a handler registers later. If
+ // a handler already captured the stream, this will be null, thus
+ // representing that no existing stream is waiting to be consumed.
+ _cachedStreams[runtimeId] = stream;
+ }
+
+ private bool TestStream(Stream stream)
+ {
+ if (null == stream)
+ {
+ throw new ArgumentNullException(nameof(stream));
+ }
+
+ if (stream is ExposedSocketNetworkStream networkStream)
+ {
+ // Update Connected state of socket by sending non-blocking zero-byte data.
+ Socket socket = networkStream.Socket;
+ bool blocking = socket.Blocking;
+ try
+ {
+ socket.Blocking = false;
+ socket.Send(Array.Empty<byte>(), 0, SocketFlags.None);
+ }
+ catch (Exception)
+ {
+ }
+ finally
+ {
+ socket.Blocking = blocking;
+ }
+ return socket.Connected;
+ }
+ else if (stream is PipeStream pipeStream)
+ {
+ Debug.Assert(RuntimeInformation.IsOSPlatform(OSPlatform.Windows), "Pipe stream should only be used on Windows.");
+
+ // PeekNamedPipe will return false if the pipe is disconnected/broken.
+ return NativeMethods.PeekNamedPipe(
+ pipeStream.SafePipeHandle,
+ null,
+ 0,
+ IntPtr.Zero,
+ IntPtr.Zero,
+ IntPtr.Zero);
+ }
+
+ return false;
+ }
+
+ private void RegisterHandler(Guid runtimeId, StreamHandler handler)
+ {
+ lock (_lock)
+ {
+ if (!_cachedStreams.TryGetValue(runtimeId, out Stream stream))
+ {
+ throw new InvalidOperationException($"Runtime instance with identifier '{runtimeId}' is not registered.");
+ }
+
+ _handlers.Add(handler);
+
+ if (stream != null)
+ {
+ RunStreamHandlers(runtimeId, stream);
+ }
+ }
+ }
+
+ public static int MaxAllowedConnections = IpcServerTransport.MaxAllowedConnections;
+ }
+}
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
-using Microsoft.AspNetCore;
-using Microsoft.AspNetCore.Hosting;
-using Microsoft.Diagnostics.Monitoring;
-using Microsoft.Diagnostics.Monitoring.RestServer;
-using Microsoft.Extensions.Configuration;
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.CommandLine;
-using System.Globalization;
-using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
+using Microsoft.AspNetCore;
+using Microsoft.AspNetCore.Hosting;
+using Microsoft.Diagnostics.Monitoring;
+using Microsoft.Diagnostics.Monitoring.RestServer;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
namespace Microsoft.Diagnostics.Tools.Monitor
{
private const string ConfigPrefix = "DotnetMonitor_";
private const string ConfigPath = "/etc/dotnet-monitor";
- public async Task<int> Start(CancellationToken token, IConsole console, string[] urls, string[] metricUrls, bool metrics)
+ public async Task<int> Start(CancellationToken token, IConsole console, string[] urls, string[] metricUrls, bool metrics, string reversedServerAddress)
{
//CONSIDER The console logger uses the standard AddConsole, and therefore disregards IConsole.
- using IWebHost host = CreateWebHostBuilder(console, urls, metricUrls, metrics).Build();
+ using IWebHost host = CreateWebHostBuilder(console, urls, metricUrls, metrics, reversedServerAddress).Build();
await host.RunAsync(token);
return 0;
}
- public IWebHostBuilder CreateWebHostBuilder(IConsole console, string[] urls, string[] metricUrls, bool metrics)
+ public IWebHostBuilder CreateWebHostBuilder(IConsole console, string[] urls, string[] metricUrls, bool metrics, string reversedServerAddress)
{
if (metrics)
{
})
.ConfigureServices((WebHostBuilderContext context, IServiceCollection services) =>
{
+ services.AddEndpointInfoSource(reversedServerAddress);
//TODO Many of these service additions should be done through extension methods
services.AddSingleton<IDiagnosticServices, DiagnosticServices>();
if (metrics)
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
-using Microsoft.Diagnostics.Monitoring;
-using Microsoft.Tools.Common;
using System;
-using System.Collections.Generic;
using System.CommandLine;
using System.CommandLine.Builder;
using System.CommandLine.Invocation;
-using System.IO;
using System.Threading;
using System.Threading.Tasks;
+using Microsoft.Diagnostics.Monitoring;
+using Microsoft.Tools.Common;
namespace Microsoft.Diagnostics.Tools.Monitor
{
description: "Monitor logs and metrics in a .NET application send the results to a chosen destination.")
{
// Handler
- CommandHandler.Create<CancellationToken, IConsole, string[], string[], bool>(new DiagnosticsMonitorCommandHandler().Start),
- Urls(), MetricUrls(), ProvideMetrics()
+ CommandHandler.Create<CancellationToken, IConsole, string[], string[], bool, string>(new DiagnosticsMonitorCommandHandler().Start),
+ Urls(), MetricUrls(), ProvideMetrics(), ReversedServerAddress()
};
private static Option Urls() =>
Argument = new Argument<bool>(name: "metrics", defaultValue: true )
};
+ private static Option ReversedServerAddress() =>
+ new Option(
+ alias: "--reversed-server-address",
+ description: "A fully qualified path and filename for the OS transport to communicate over.")
+ {
+ Argument = new Argument<string>(name: "reversedServerAddress")
+ };
+
private static string GetDefaultMetricsEndpoint()
{
string endpoint = "http://localhost:52325";
"..\\..\\..\\..\\..\\.dotnet\\dotnet.exe") :
"../../../../../.dotnet/dotnet";
- public static string GetTraceePath(string traceeName = "Tracee")
+ public static string GetTraceePath(string traceeName = "Tracee", string targetFramework = "netcoreapp3.1")
{
var curPath = Directory.GetCurrentDirectory();
;
- var traceePath = curPath.Replace(System.Reflection.Assembly.GetCallingAssembly().GetName().Name, traceeName);
+ var traceePath = curPath
+ .Replace(System.Reflection.Assembly.GetCallingAssembly().GetName().Name, traceeName)
+ .Replace("netcoreapp3.1", targetFramework);
return Path.Combine(traceePath, Path.ChangeExtension(traceeName, ".dll"));
}
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading;
+using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
}
[Fact]
- public void CheckSpecificProcessTest()
+ public async Task WaitForConnectionTest()
{
TestRunner runner = new TestRunner(CommonHelper.GetTraceePath(), output);
runner.Start(3000);
}
var client = new DiagnosticsClient(runner.Pid);
- Assert.True(client.CheckTransport(), $"Unable to verify diagnostics transport for test process {runner.Pid}.");
-
- runner.Stop();
+ using var timeoutSource = new CancellationTokenSource(TimeSpan.FromMilliseconds(250));
+ try
+ {
+ await client.WaitForConnectionAsync(timeoutSource.Token);
+ }
+ finally
+ {
+ runner.Stop();
+ }
}
}
}
<ProjectReference Include="../../Microsoft.Diagnostics.NETCore.Client/Microsoft.Diagnostics.NETCore.Client.csproj" />
<ProjectReference Include="../../Microsoft.Diagnostics.TestHelpers/Microsoft.Diagnostics.TestHelpers.csproj" />
<ProjectReference Include="../Tracee/Tracee.csproj" PrivateAssets="all" />
- <PackageReference Include="Microsoft.Diagnostics.Tracing.TraceEvent" Version="2.0.47" />
+ </ItemGroup>
+ <ItemGroup>
+ <PackageReference Include="Microsoft.Diagnostics.Tracing.TraceEvent" Version="$(MicrosoftDiagnosticsTracingTraceEventVersion)" />
+ </ItemGroup>
+ <ItemGroup>
+ <InternalsVisibleTo Include="DotnetMonitor.UnitTests" />
</ItemGroup>
</Project>
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System.IO;
+using System.Runtime.InteropServices;
+using Xunit.Abstractions;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+ internal static class ReversedServerHelper
+ {
+ /// <summary>
+ /// Creates a unique server name to avoid collisions from simultaneous running tests
+ /// or potentially abandoned socket files.
+ /// </summary>
+ public static string CreateServerTransportName()
+ {
+ string transportName = "DOTNET_DIAGSERVER_TESTS_" + Path.GetRandomFileName();
+ if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+ {
+ return transportName;
+ }
+ else
+ {
+ return Path.Combine(Path.GetTempPath(), transportName);
+ }
+ }
+
+ /// <summary>
+ /// Starts the Tracee executable while enabling connection to reverse diagnostics server.
+ /// </summary>
+ public static TestRunner StartTracee(ITestOutputHelper _outputHelper, string transportName)
+ {
+ var runner = new TestRunner(CommonHelper.GetTraceePath(targetFramework: "net5.0"), _outputHelper);
+ runner.AddReversedServer(transportName);
+ runner.Start();
+ return runner;
+ }
+
+ public static void AddReversedServer(this TestRunner runner, string transportName)
+ {
+ runner.AddEnvVar("DOTNET_DiagnosticsMonitorAddress", transportName);
+ }
+
+ public static string ToTestString(this IpcEndpointInfo info)
+ {
+ return $"PID={info.ProcessId}, COOKIE={info.RuntimeInstanceCookie}";
+ }
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.Tracing;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Diagnostics.Tracing;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+ public class ReversedServerTests
+ {
+ private readonly ITestOutputHelper _outputHelper;
+
+ public ReversedServerTests(ITestOutputHelper outputHelper)
+ {
+ _outputHelper = outputHelper;
+ }
+
+ /// <summary>
+ /// Tests that server throws appropriate exceptions when disposed.
+ /// </summary>
+ [Fact]
+ public async Task ReversedServerDisposeTest()
+ {
+ var server = StartReversedServer(out string transportName);
+
+ using CancellationTokenSource cancellation = new CancellationTokenSource(TimeSpan.FromSeconds(1));
+ Task acceptTask = server.AcceptAsync(cancellation.Token);
+
+ // Validate server surface throws after disposal
+ server.Dispose();
+
+ // Pending tasks should be cancelled and throw TaskCanceledException
+ await Assert.ThrowsAnyAsync<OperationCanceledException>(() => acceptTask);
+ Assert.True(acceptTask.IsCanceled);
+
+ // Calls after dispose should throw ObjectDisposedException
+ await Assert.ThrowsAsync<ObjectDisposedException>(
+ () => server.AcceptAsync(cancellation.Token));
+
+ Assert.Throws<ObjectDisposedException>(
+ () => server.RemoveConnection(Guid.Empty));
+ }
+
+ /// <summary>
+ /// Tests that <see cref="ReversedDiagnosticsServer.AcceptAsync(CancellationToken)"/> does not complete
+ /// when no connections are available and that cancellation will move the returned task to the cancelled state.
+ /// </summary>
+ [Fact]
+ public async Task ReversedServerAcceptAsyncYieldsTest()
+ {
+ using var server = StartReversedServer(out string transportName);
+
+ using var cancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(1));
+
+ _outputHelper.WriteLine("Waiting for connection from server.");
+ Task acceptTask = server.AcceptAsync(cancellationSource.Token);
+
+ await Assert.ThrowsAnyAsync<OperationCanceledException>(() => acceptTask);
+ Assert.True(acceptTask.IsCanceled);
+ }
+
+ /// <summary>
+ /// Tests that invoking server methods with non-existing runtime identifier appropriately fail.
+ /// </summary>
+ [Fact]
+ public async Task ReversedServerNonExistingRuntimeIdentifierTest()
+ {
+ using var server = StartReversedServer(out string transportName);
+
+ Guid nonExistingRuntimeId = Guid.NewGuid();
+
+ using CancellationTokenSource cancellation = new CancellationTokenSource(TimeSpan.FromSeconds(1));
+
+ _outputHelper.WriteLine($"Testing {nameof(ReversedDiagnosticsServer.WaitForConnectionAsync)}");
+ await Assert.ThrowsAsync<InvalidOperationException>(
+ () => server.WaitForConnectionAsync(nonExistingRuntimeId, cancellation.Token));
+
+ _outputHelper.WriteLine($"Testing {nameof(ReversedDiagnosticsServer.Connect)}");
+ Assert.Throws<InvalidOperationException>(
+ () => server.Connect(nonExistingRuntimeId, TimeSpan.FromSeconds(1)));
+
+ _outputHelper.WriteLine($"Testing {nameof(ReversedDiagnosticsServer.RemoveConnection)}");
+ Assert.False(server.RemoveConnection(nonExistingRuntimeId), "Removal of nonexisting connection should fail.");
+ }
+
+ /// <summary>
+ /// Tests that a single client can connect to server, diagnostics can occur,
+ /// and multiple use of a single DiagnosticsClient is allowed.
+ /// </summary>
+ /// <remarks>
+ /// The multiple use of a single client is important in the reverse scenario
+ /// because of how the endpoint is updated with new stream information each
+ /// time the target process reconnects to the server.
+ /// </remarks>
+ [Fact]
+ public async Task ReversedServerSingleTargetMultipleUseClientTest()
+ {
+ using var server = StartReversedServer(out string transportName);
+ await using var accepter = new EndpointInfoAccepter(server, _outputHelper);
+
+ TestRunner runner = null;
+ IpcEndpointInfo info;
+ try
+ {
+ // Start client pointing to diagnostics server
+ runner = StartTracee(transportName);
+
+ info = await AcceptAsync(accepter);
+
+ await VerifyEndpointInfo(runner, info);
+
+ // There should not be any new endpoint infos
+ await VerifyNoNewEndpointInfos(accepter);
+
+ ResumeRuntime(info);
+
+ await VerifySingleSession(info);
+ }
+ finally
+ {
+ _outputHelper.WriteLine("Stopping tracee.");
+ runner?.Stop();
+ }
+
+ // Wait some time for the process to exit
+ await Task.Delay(TimeSpan.FromSeconds(1));
+
+ // Process exited so the endpoint should not have a valid transport anymore.
+ await VerifyWaitForConnection(info, expectValid: false);
+
+ Assert.True(server.RemoveConnection(info.RuntimeInstanceCookie), "Expected to be able to remove connection from server.");
+
+ // There should not be any more endpoint infos
+ await VerifyNoNewEndpointInfos(accepter);
+ }
+
+ /// <summary>
+ /// Tests that a DiagnosticsClient is not viable after target exists.
+ /// </summary>
+ [Fact]
+ public async Task ReversedServerSingleTargetExitsClientInviableTest()
+ {
+ using var server = StartReversedServer(out string transportName);
+ await using var accepter = new EndpointInfoAccepter(server, _outputHelper);
+
+ TestRunner runner = null;
+ IpcEndpointInfo info;
+ try
+ {
+ // Start client pointing to diagnostics server
+ runner = StartTracee(transportName);
+
+ // Get client connection
+ info = await AcceptAsync(accepter);
+
+ await VerifyEndpointInfo(runner, info);
+
+ // There should not be any new endpoint infos
+ await VerifyNoNewEndpointInfos(accepter);
+
+ ResumeRuntime(info);
+
+ await VerifyWaitForConnection(info);
+ }
+ finally
+ {
+ _outputHelper.WriteLine("Stopping tracee.");
+ runner?.Stop();
+ }
+
+ // Wait some time for the process to exit
+ await Task.Delay(TimeSpan.FromSeconds(1));
+
+ // Process exited so the endpoint should not have a valid transport anymore.
+ await VerifyWaitForConnection(info, expectValid: false);
+
+ Assert.True(server.RemoveConnection(info.RuntimeInstanceCookie), "Expected to be able to remove connection from server.");
+
+ // There should not be any more endpoint infos
+ await VerifyNoNewEndpointInfos(accepter);
+ }
+
+ private ReversedDiagnosticsServer StartReversedServer(out string transportName)
+ {
+ transportName = ReversedServerHelper.CreateServerTransportName();
+ _outputHelper.WriteLine("Starting reversed server at '" + transportName + "'.");
+ return new ReversedDiagnosticsServer(transportName);
+ }
+
+ private async Task<IpcEndpointInfo> AcceptAsync(EndpointInfoAccepter accepter)
+ {
+ using (var cancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(3)))
+ {
+ return await accepter.AcceptAsync(cancellationSource.Token);
+ }
+ }
+
+ private TestRunner StartTracee(string transportName)
+ {
+ _outputHelper.WriteLine("Starting tracee.");
+ return ReversedServerHelper.StartTracee(_outputHelper, transportName);
+ }
+
+ private static EventPipeProvider CreateProvider(string name)
+ {
+ return new EventPipeProvider(name, EventLevel.Verbose, (long)EventKeywords.All);
+ }
+
+ private async Task VerifyWaitForConnection(IpcEndpointInfo info, bool expectValid = true)
+ {
+ using var connectionCancellation = new CancellationTokenSource(TimeSpan.FromSeconds(1));
+ if (expectValid)
+ {
+ await info.Endpoint.WaitForConnectionAsync(connectionCancellation.Token);
+ }
+ else
+ {
+ await Assert.ThrowsAsync<TaskCanceledException>(
+ () => info.Endpoint.WaitForConnectionAsync(connectionCancellation.Token));
+ }
+ }
+
+ /// <summary>
+ /// Checks that the accepter does not provide a new endpoint info.
+ /// </summary>
+ private async Task VerifyNoNewEndpointInfos(EndpointInfoAccepter accepter)
+ {
+ _outputHelper.WriteLine("Verifying there are no more connections.");
+
+ using var cancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(1));
+
+ Task acceptTask = accepter.AcceptAsync(cancellationSource.Token);
+ await Assert.ThrowsAsync<OperationCanceledException>(() => acceptTask);
+ Assert.True(acceptTask.IsCanceled);
+
+ _outputHelper.WriteLine("Verified there are no more connections.");
+ }
+
+ /// <summary>
+ /// Verifies basic information on the endpoint info and that it matches the target process from the runner.
+ /// </summary>
+ private async Task VerifyEndpointInfo(TestRunner runner, IpcEndpointInfo info, bool expectValid = true)
+ {
+ _outputHelper.WriteLine($"Verifying connection information for process ID {runner.Pid}.");
+ Assert.NotNull(runner);
+ Assert.Equal(runner.Pid, info.ProcessId);
+ Assert.NotEqual(Guid.Empty, info.RuntimeInstanceCookie);
+ Assert.NotNull(info.Endpoint);
+
+ await VerifyWaitForConnection(info, expectValid);
+
+ _outputHelper.WriteLine($"Connection: {info.ToTestString()}");
+ }
+
+ private void ResumeRuntime(IpcEndpointInfo info)
+ {
+ var client = new DiagnosticsClient(info.Endpoint);
+
+ _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Resuming runtime instance.");
+ try
+ {
+ client.ResumeRuntime();
+ _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Resumed successfully.");
+ }
+ catch (ServerErrorException ex)
+ {
+ // Runtime likely does not understand the ResumeRuntime command.
+ _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: {ex.Message}");
+ }
+ }
+
+ /// <summary>
+ /// Verifies that a client can handle multiple operations simultaneously.
+ /// </summary>
+ private async Task VerifySingleSession(IpcEndpointInfo info)
+ {
+ await VerifyWaitForConnection(info);
+
+ var client = new DiagnosticsClient(info.Endpoint);
+
+ _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Creating session #1.");
+ var providers = new List<EventPipeProvider>();
+ providers.Add(new EventPipeProvider(
+ "System.Runtime",
+ EventLevel.Informational,
+ 0,
+ new Dictionary<string, string>() {
+ { "EventCounterIntervalSec", "1" }
+ }));
+ using var session = client.StartEventPipeSession(providers);
+
+ _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Verifying session produces events.");
+ await VerifyEventStreamProvidesEventsAsync(info, session, 1);
+
+ _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session verification complete.");
+ }
+
+ /// <summary>
+ /// Verifies that an event stream does provide events.
+ /// </summary>
+ private Task VerifyEventStreamProvidesEventsAsync(IpcEndpointInfo info, EventPipeSession session, int sessionNumber)
+ {
+ Assert.NotNull(session);
+ Assert.NotNull(session.EventStream);
+
+ return Task.Run(async () =>
+ {
+ _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Creating event source.");
+
+ // This blocks for a while due to this bug: https://github.com/microsoft/perfview/issues/1172
+ using var eventSource = new EventPipeEventSource(session.EventStream);
+
+ _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Setup event handlers.");
+
+ // Create task completion source that is completed when any events are provided; cancel it if cancellation is requested
+ var receivedEventsSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ using var cancellation = new CancellationTokenSource(TimeSpan.FromMinutes(1));
+ using var _ = cancellation.Token.Register(() =>
+ {
+ if (receivedEventsSource.TrySetCanceled())
+ {
+ _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Cancelled event processing.");
+ }
+ });
+
+ // Create continuation task that stops the session (which immediately stops event processing).
+ Task stoppedProcessingTask = receivedEventsSource.Task
+ .ContinueWith(_ =>
+ {
+ _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Stopping session.");
+ session.Stop();
+ });
+
+ // Signal task source when an event is received.
+ Action<TraceEvent> allEventsHandler = _ =>
+ {
+ if (receivedEventsSource.TrySetResult(null))
+ {
+ _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Received an event and set result on completion source.");
+ }
+ };
+
+ _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Start processing events.");
+ eventSource.Dynamic.All += allEventsHandler;
+ eventSource.Process();
+ eventSource.Dynamic.All -= allEventsHandler;
+ _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Stopped processing events.");
+
+ // Wait on the task source to verify if it ran to completion or was cancelled.
+ await receivedEventsSource.Task;
+
+ _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Session #{sessionNumber} - Waiting for session to stop.");
+ await stoppedProcessingTask;
+ });
+ }
+
+ /// <summary>
+ /// Helper class for consuming endpoint infos from the reverse diagnostics server.
+ /// </summary>
+ /// <remarks>
+ /// The diagnostics server requires that something is continuously attempting to accept endpoint infos
+ /// in order to process incoming connections. This helps facilitate that continuous accepting of
+ /// endpoint infos so the individual tests don't have to know about the behavior.
+ /// </remarks>
+ private class EndpointInfoAccepter : IAsyncDisposable
+ {
+ private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
+ private readonly Queue<IpcEndpointInfo> _connections = new Queue<IpcEndpointInfo>();
+ private readonly SemaphoreSlim _connectionsSemaphore = new SemaphoreSlim(0);
+ private readonly Task _listenTask;
+ private readonly ITestOutputHelper _outputHelper;
+ private readonly ReversedDiagnosticsServer _server;
+
+ private int _acceptedCount;
+ private bool _disposed;
+
+ public EndpointInfoAccepter(ReversedDiagnosticsServer server, ITestOutputHelper outputHelper)
+ {
+ _server = server;
+ _outputHelper = outputHelper;
+
+ _listenTask = ListenAsync(_cancellation.Token);
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ if (!_disposed)
+ {
+ _cancellation.Cancel();
+
+ await _listenTask;
+
+ _cancellation.Dispose();
+
+ _disposed = true;
+ }
+ }
+
+ public async Task<IpcEndpointInfo> AcceptAsync(CancellationToken token)
+ {
+ using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, _cancellation.Token);
+
+ _outputHelper.WriteLine("Waiting for connection from accepter.");
+ await _connectionsSemaphore.WaitAsync(linkedSource.Token).ConfigureAwait(false);
+ _outputHelper.WriteLine("Received connection from accepter.");
+
+ return _connections.Dequeue();
+ }
+
+ /// <summary>
+ /// Continuously accept endpoint infos from the reversed diagnostics server so
+ /// that <see cref="ReversedDiagnosticsServer.AcceptAsync(CancellationToken)"/>
+ /// is always awaited in order to to handle new runtime instance connections
+ /// as well as existing runtime instance reconnections.
+ /// </summary>
+ private async Task ListenAsync(CancellationToken token)
+ {
+ while (!token.IsCancellationRequested)
+ {
+ IpcEndpointInfo info;
+ try
+ {
+ _outputHelper.WriteLine("Waiting for connection from server.");
+ info = await _server.AcceptAsync(token).ConfigureAwait(false);
+
+ _acceptedCount++;
+ _outputHelper.WriteLine($"Accepted connection #{_acceptedCount} from server: {info.ToTestString()}");
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+
+ _connections.Enqueue(info);
+ _connectionsSemaphore.Release();
+ }
+ }
+ }
+ }
+}
// See the LICENSE file in the project root for more information.
-using Microsoft.Diagnostics.NETCore.Client;
-using Microsoft.Diagnostics.TestHelpers;
using System;
-using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.IO;
-using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
-using Xunit;
using Xunit.Abstractions;
namespace Microsoft.Diagnostics.NETCore.Client
if (outputHelper != null)
outputHelper.WriteLine($"[{DateTime.Now.ToString()}] Launching test: " + startInfo.FileName);
- testProcess = Process.Start(startInfo);
+ testProcess = new Process();
+ testProcess.StartInfo = startInfo;
+ testProcess.EnableRaisingEvents = true;
- if (testProcess == null)
+ if (!testProcess.Start())
{
outputHelper.WriteLine($"Could not start process: " + startInfo.FileName);
}
outputHelper.WriteLine($"Process {testProcess.Id} status: Running");
}
}
+
+ public async Task WaitForExitAsync(CancellationToken token)
+ {
+ TaskCompletionSource<object> exitedSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+ EventHandler exitedHandler = (s, e) => exitedSource.TrySetResult(null);
+
+ testProcess.Exited += exitedHandler;
+ try
+ {
+ if (!testProcess.HasExited)
+ {
+ using var _ = token.Register(() => exitedSource.TrySetCanceled(token));
+
+ await exitedSource.Task;
+ }
+ }
+ finally
+ {
+ testProcess.Exited -= exitedHandler;
+ }
+ }
}
}
{
class Program
{
+ private const int LoopCount = 30;
+
static void Main(string[] args)
{
+ Console.WriteLine("Sleep in loop for {0} seconds.", LoopCount);
+
// Runs for max of 30 sec
- for(var i = 0; i < 30; i++)
+ for (var i = 0; i < LoopCount; i++)
{
+ Console.WriteLine("Iteration #{0}", i);
Thread.Sleep(1000);
}
+
+ Console.WriteLine("Press any key to exit.");
+ Console.ReadKey();
}
}
}
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
-using Microsoft.Diagnostics.Monitoring;
-using Microsoft.Diagnostics.NETCore.Client;
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Logging;
-using Microsoft.Extensions.Options;
-using Microsoft.VisualStudio.TestPlatform.ObjectModel;
-using Microsoft.VisualStudio.TestPlatform.ObjectModel.DataCollection;
using System;
using System.Collections.Generic;
-using System.Diagnostics;
using System.IO;
-using System.Linq;
using System.Runtime.InteropServices;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
+using Microsoft.Diagnostics.Monitoring;
+using Microsoft.Diagnostics.NETCore.Client;
+using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Abstractions;
using Xunit.Extensions;
var outputStream = new MemoryStream();
- await using (var testExecution = RemoteTestExecution.StartRemoteProcess("LoggerRemoteTest", _output))
+ await using (var testExecution = StartTraceeProcess("LoggerRemoteTest"))
{
//TestRunner should account for start delay to make sure that the diagnostic pipe is available.
DiagnosticsEventPipeProcessor diagnosticsEventPipeProcessor = new DiagnosticsEventPipeProcessor(
PipeMode.Logs,
- loggerFactory,
- Enumerable.Empty<IMetricsLogger>());
+ loggerFactory);
- var processingTask = diagnosticsEventPipeProcessor.Process(testExecution.TestRunner.Pid, TimeSpan.FromSeconds(10), CancellationToken.None);
+ var client = new DiagnosticsClient(testExecution.TestRunner.Pid);
+ var processingTask = diagnosticsEventPipeProcessor.Process(client, testExecution.TestRunner.Pid, TimeSpan.FromSeconds(10), CancellationToken.None);
//Add a small delay to make sure diagnostic processor had a chance to initialize
await Task.Delay(1000);
}
}
+ private RemoteTestExecution StartTraceeProcess(string loggerCategory)
+ {
+ return RemoteTestExecution.StartProcess(CommonHelper.GetTraceePath("EventPipeTracee") + " " + loggerCategory, _output);
+ }
+
private sealed class LoggerTestResult
{
public string Category { get; set; }
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Diagnostics.Monitoring;
+using Microsoft.Diagnostics.NETCore.Client;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace DotnetMonitor.UnitTests
+{
+ public class EndpointInfoSourceTests
+ {
+ private readonly ITestOutputHelper _outputHelper;
+
+ public EndpointInfoSourceTests(ITestOutputHelper outputHelper)
+ {
+ _outputHelper = outputHelper;
+ }
+
+ /// <summary>
+ /// Tests that the server endpoint info source has no connections
+ /// if <see cref="ServerEndpointInfoSource.Listen"/> is not called.
+ /// </summary>
+ [Fact]
+ public async Task ServerSourceNoListenTest()
+ {
+ await using var source = CreateServerSource(out string transportName);
+ // Intentionally do not call Listen
+
+ await using (var execution1 = StartTraceeProcess("LoggerRemoteTest", transportName))
+ {
+ execution1.Start();
+
+ await Task.Delay(TimeSpan.FromSeconds(1));
+
+ var endpointInfos = await GetEndpointInfoAsync(source);
+
+ Assert.Empty(endpointInfos);
+
+ _outputHelper.WriteLine("Stopping tracee.");
+ }
+ }
+
+ /// <summary>
+ /// Tests that the server endpoint info source has not connections if no processes connect to it.
+ /// </summary>
+ [Fact]
+ public async Task ServerSourceNoConnectionsTest()
+ {
+ await using var source = CreateServerSource(out _);
+ source.Listen();
+
+ var endpointInfos = await GetEndpointInfoAsync(source);
+ Assert.Empty(endpointInfos);
+ }
+
+ /// <summary>
+ /// Tests that server endpoint info source should throw ObjectDisposedException
+ /// from API surface after being disposed.
+ /// </summary>
+ [Fact]
+ public async Task ServerSourceThrowsWhenDisposedTest()
+ {
+ var source = CreateServerSource(out _);
+ source.Listen();
+
+ await source.DisposeAsync();
+
+ // Validate source surface throws after disposal
+ Assert.Throws<ObjectDisposedException>(
+ () => source.Listen());
+
+ Assert.Throws<ObjectDisposedException>(
+ () => source.Listen(1));
+
+ await Assert.ThrowsAsync<ObjectDisposedException>(
+ () => source.GetEndpointInfoAsync(CancellationToken.None));
+ }
+
+ /// <summary>
+ /// Tests that server endpoint info source should throw an exception from
+ /// <see cref="ServerEndpointInfoSource.Listen"/> and
+ /// <see cref="ServerEndpointInfoSource.Listen(int)"/> after listening was already started.
+ /// </summary>
+ [Fact]
+ public async Task ServerSourceThrowsWhenMultipleListenTest()
+ {
+ await using var source = CreateServerSource(out _);
+ source.Listen();
+
+ Assert.Throws<InvalidOperationException>(
+ () => source.Listen());
+
+ Assert.Throws<InvalidOperationException>(
+ () => source.Listen(1));
+ }
+
+ /// <summary>
+ /// Tests that the server endpoint info source can properly enumerate endpoint infos when a single
+ /// target connects to it and "disconnects" from it.
+ /// </summary>
+ [Fact]
+ public async Task ServerSourceAddRemoveSingleConnectionTest()
+ {
+ await using var source = CreateServerSource(out string transportName);
+ source.Listen();
+
+ var endpointInfos = await GetEndpointInfoAsync(source);
+ Assert.Empty(endpointInfos);
+
+ Task newEndpointInfoTask = source.WaitForNewEndpointInfoAsync(TimeSpan.FromSeconds(5));
+
+ await using (var execution1 = StartTraceeProcess("LoggerRemoteTest", transportName))
+ {
+ await newEndpointInfoTask;
+
+ execution1.Start();
+
+ endpointInfos = await GetEndpointInfoAsync(source);
+
+ var endpointInfo = Assert.Single(endpointInfos);
+ VerifyConnection(execution1.TestRunner, endpointInfo);
+
+ _outputHelper.WriteLine("Stopping tracee.");
+ }
+
+ await Task.Delay(TimeSpan.FromSeconds(1));
+
+ endpointInfos = await GetEndpointInfoAsync(source);
+
+ Assert.Empty(endpointInfos);
+ }
+
+ private TestServerEndpointInfoSource CreateServerSource(out string transportName)
+ {
+ transportName = ReversedServerHelper.CreateServerTransportName();
+ _outputHelper.WriteLine("Starting server endpoint info source at '" + transportName + "'.");
+ return new TestServerEndpointInfoSource(transportName, _outputHelper);
+ }
+
+ private RemoteTestExecution StartTraceeProcess(string loggerCategory, string transportName = null)
+ {
+ _outputHelper.WriteLine("Starting tracee.");
+ string exePath = CommonHelper.GetTraceePath("EventPipeTracee", targetFramework: "net5.0");
+ return RemoteTestExecution.StartProcess(exePath + " " + loggerCategory, _outputHelper, transportName);
+ }
+
+ private async Task<IEnumerable<IEndpointInfo>> GetEndpointInfoAsync(ServerEndpointInfoSource source)
+ {
+ _outputHelper.WriteLine("Getting endpoint infos.");
+ using CancellationTokenSource cancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+ return await source.GetEndpointInfoAsync(cancellationSource.Token);
+ }
+
+ /// <summary>
+ /// Verifies basic information on the connection and that it matches the target process from the runner.
+ /// </summary>
+ private static void VerifyConnection(TestRunner runner, IEndpointInfo endpointInfo)
+ {
+ Assert.NotNull(runner);
+ Assert.NotNull(endpointInfo);
+ Assert.Equal(runner.Pid, endpointInfo.ProcessId);
+ Assert.NotEqual(Guid.Empty, endpointInfo.RuntimeInstanceCookie);
+ Assert.NotNull(endpointInfo.Endpoint);
+ }
+
+ private sealed class TestServerEndpointInfoSource : ServerEndpointInfoSource
+ {
+ private readonly ITestOutputHelper _outputHelper;
+ private readonly List<TaskCompletionSource<IpcEndpointInfo>> _addedEndpointInfoSources = new List<TaskCompletionSource<IpcEndpointInfo>>();
+
+ public TestServerEndpointInfoSource(string transportPath, ITestOutputHelper outputHelper)
+ : base(transportPath)
+ {
+ _outputHelper = outputHelper;
+ }
+
+ public async Task<IpcEndpointInfo> WaitForNewEndpointInfoAsync(TimeSpan timeout)
+ {
+ TaskCompletionSource<IpcEndpointInfo> addedEndpointInfoSource = new TaskCompletionSource<IpcEndpointInfo>(TaskCreationOptions.RunContinuationsAsynchronously);
+ using var timeoutCancellation = new CancellationTokenSource();
+ var token = timeoutCancellation.Token;
+ using var _ = token.Register(() => addedEndpointInfoSource.TrySetCanceled(token));
+
+ lock (_addedEndpointInfoSources)
+ {
+ _addedEndpointInfoSources.Add(addedEndpointInfoSource);
+ }
+
+ _outputHelper.WriteLine("Waiting for new endpoint info.");
+ timeoutCancellation.CancelAfter(timeout);
+ IpcEndpointInfo endpointInfo = await addedEndpointInfoSource.Task;
+ _outputHelper.WriteLine("Notified of new endpoint info.");
+
+ return endpointInfo;
+ }
+
+ internal override void OnAddedEndpointInfo(IpcEndpointInfo info)
+ {
+ _outputHelper.WriteLine($"Added endpoint info to collection: {info.ToTestString()}");
+
+ lock (_addedEndpointInfoSources)
+ {
+ foreach (var source in _addedEndpointInfoSources)
+ {
+ source.TrySetResult(info);
+ }
+ _addedEndpointInfoSources.Clear();
+ }
+ }
+
+ internal override void OnRemovedEndpointInfo(IpcEndpointInfo info)
+ {
+ _outputHelper.WriteLine($"Removed endpoint info from collection: {info.ToTestString()}");
+ }
+ }
+ }
+}
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
-using Microsoft.Diagnostics.NETCore.Client;
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Logging;
using System;
-using System.Collections.Generic;
-using System.Diagnostics;
using System.IO;
-using System.Text;
using System.Threading;
using System.Threading.Tasks;
+using Microsoft.Diagnostics.NETCore.Client;
using Xunit.Abstractions;
namespace DotnetMonitor.UnitTests
{
private Task IoReadingTask { get; }
- private RemoteTestExecution(TestRunner runner, Task ioReadingTask)
+ private ITestOutputHelper OutputHelper { get; }
+
+ public TestRunner TestRunner { get; }
+
+ private RemoteTestExecution(TestRunner runner, Task ioReadingTask, ITestOutputHelper outputHelper)
{
TestRunner = runner;
IoReadingTask = ioReadingTask;
+ OutputHelper = outputHelper;
}
- public TestRunner TestRunner { get; }
-
public void Start()
{
SendSignal();
TestRunner.StandardInput.Flush();
}
- public static RemoteTestExecution StartRemoteProcess(string loggerCategory, ITestOutputHelper outputHelper)
+ public static RemoteTestExecution StartProcess(string commandLine, ITestOutputHelper outputHelper, string reversedServerTransportName = null)
{
- TestRunner runner = new TestRunner(CommonHelper.GetTraceePath("EventPipeTracee") + " " + loggerCategory,
- outputHelper, redirectError: true, redirectInput: true);
+ TestRunner runner = new TestRunner(commandLine, outputHelper, redirectError: true, redirectInput: true);
+ if (!string.IsNullOrEmpty(reversedServerTransportName))
+ {
+ runner.AddReversedServer(reversedServerTransportName);
+ }
runner.Start();
Task readingTask = ReadAllOutput(runner.StandardOutput, runner.StandardError, outputHelper);
- return new RemoteTestExecution(runner, readingTask);
+ return new RemoteTestExecution(runner, readingTask, outputHelper);
}
private static Task ReadAllOutput(StreamReader output, StreamReader error, ITestOutputHelper outputHelper)
public async ValueTask DisposeAsync()
{
SendSignal();
+
+ using var timeoutSource = new CancellationTokenSource(TimeSpan.FromSeconds(1));
+ try
+ {
+ await TestRunner.WaitForExitAsync(timeoutSource.Token);
+ }
+ catch (OperationCanceledException)
+ {
+ OutputHelper.WriteLine("Remote process did not exit within timeout period. Forcefully stopping process.");
+ TestRunner.Stop();
+ }
+
await IoReadingTask;
}
}