Refactor IPC communication to allow for async and cancellation. (#2350)
authorJustin Anderson <jander-msft@users.noreply.github.com>
Tue, 31 Aug 2021 05:18:16 +0000 (22:18 -0700)
committerGitHub <noreply@github.com>
Tue, 31 Aug 2021 05:18:16 +0000 (22:18 -0700)
* Refactor IPC communication to allow for async and cancellation.
Refactor tests to flex both non-async and async methods.

18 files changed:
src/Microsoft.Diagnostics.Monitoring.EventPipe/EventPipeStreamProvider.cs
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsClient/DiagnosticsClient.cs
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsClient/EventPipeSession.cs
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcClient.cs
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcCommands.cs
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcHeader.cs
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcMessage.cs
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcResponse.cs [new file with mode: 0644]
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/ProcessEnvironment.cs
src/Microsoft.Diagnostics.NETCore.Client/Microsoft.Diagnostics.NETCore.Client.csproj
src/Microsoft.Diagnostics.NETCore.Client/StreamExtensions.cs [new file with mode: 0644]
src/tests/Microsoft.Diagnostics.Monitoring.EventPipe/EventLogsPipelineUnitTests.cs
src/tests/Microsoft.Diagnostics.NETCore.Client/DiagnosticsClientApiShim.cs [new file with mode: 0644]
src/tests/Microsoft.Diagnostics.NETCore.Client/DiagnosticsClientApiShimExtensions.cs [new file with mode: 0644]
src/tests/Microsoft.Diagnostics.NETCore.Client/EventPipeSessionTests.cs
src/tests/Microsoft.Diagnostics.NETCore.Client/GetProcessEnvironmentTests.cs
src/tests/Microsoft.Diagnostics.NETCore.Client/GetProcessInfoTests.cs
src/tests/Microsoft.Diagnostics.NETCore.Client/ReversedServerTests.cs

index 545d92a14282b1b847e68f808147430305cc3c1d..7f9681ddfbe74a128dfff3b8b22e3af6bcb5021a 100644 (file)
@@ -22,14 +22,14 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
             _stopProcessingSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
         }
 
-        public Task<Stream> ProcessEvents(DiagnosticsClient client, TimeSpan duration, CancellationToken cancellationToken)
+        public async Task<Stream> ProcessEvents(DiagnosticsClient client, TimeSpan duration, CancellationToken cancellationToken)
         {
             cancellationToken.ThrowIfCancellationRequested();
 
             EventPipeSession session = null;
             try
             {
-                session = client.StartEventPipeSession(_sourceConfig.GetProviders(), _sourceConfig.RequestRundown, _sourceConfig.BufferSizeInMB);
+                session = await client.StartEventPipeSessionAsync(_sourceConfig.GetProviders(), _sourceConfig.RequestRundown, _sourceConfig.BufferSizeInMB, cancellationToken).ConfigureAwait(false);
             }
             catch (EndOfStreamException e)
             {
@@ -49,10 +49,11 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
                 // Use TaskCompletionSource instead of Task.Delay with cancellation to avoid
                 // using exceptions for normal termination of event stream.
                 await _stopProcessingSource.Task.ConfigureAwait(false);
-                StopSession(session);
+
+                await StopSessionAsync(session).ConfigureAwait(false);
             });
 
-            return Task.FromResult(session.EventStream);
+            return session.EventStream;
         }
 
         public void StopProcessing()
@@ -60,11 +61,13 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
             _stopProcessingSource.TrySetResult(null);
         }
 
-        private static void StopSession(EventPipeSession session)
+        private static async Task StopSessionAsync(EventPipeSession session)
         {
+            // Cancel after a generous amount of time if process ended before command is sent.
+            using CancellationTokenSource cancellationSource = new(IpcClient.ConnectTimeout);
             try
             {
-                session.Stop();
+                await session.StopAsync(cancellationSource.Token).ConfigureAwait(false);
             }
             catch (EndOfStreamException)
             {
@@ -74,6 +77,10 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe
             catch (TimeoutException)
             {
             }
+            // We may time out if the process ended before we sent StopTracing command. We can just exit in that case.
+            catch (OperationCanceledException)
+            {
+            }
             // On Unix platforms, we may actually get a PNSE since the pipe is gone with the process, and Runtime Client Library
             // does not know how to distinguish a situation where there is no pipe to begin with, or where the process has exited
             // before collection started and got rid of a pipe that once existed.
index 62391f376a95f3a9e1ac22eac0d7a787c5967383..3a420f9df1c150cd99f140f293f1044ee688625d 100644 (file)
@@ -7,7 +7,6 @@ using System.Collections.Generic;
 using System.Globalization;
 using System.IO;
 using System.Linq;
-using System.Runtime.InteropServices;
 using System.Text.RegularExpressions;
 using System.Threading;
 using System.Threading.Tasks;
@@ -68,7 +67,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
         /// </returns> 
         public EventPipeSession StartEventPipeSession(IEnumerable<EventPipeProvider> providers, bool requestRundown = true, int circularBufferMB = 256)
         {
-            return new EventPipeSession(_endpoint, providers, requestRundown, circularBufferMB);
+            return EventPipeSession.Start(_endpoint, providers, requestRundown, circularBufferMB);
         }
 
         /// <summary>
@@ -82,7 +81,37 @@ namespace Microsoft.Diagnostics.NETCore.Client
         /// </returns> 
         public EventPipeSession StartEventPipeSession(EventPipeProvider provider, bool requestRundown = true, int circularBufferMB = 256)
         {
-            return new EventPipeSession(_endpoint, new[] { provider }, requestRundown, circularBufferMB);
+            return EventPipeSession.Start(_endpoint, new[] { provider }, requestRundown, circularBufferMB);
+        }
+
+        /// <summary>
+        /// Start tracing the application and return an EventPipeSession object
+        /// </summary>
+        /// <param name="providers">An IEnumerable containing the list of Providers to turn on.</param>
+        /// <param name="requestRundown">If true, request rundown events from the runtime</param>
+        /// <param name="circularBufferMB">The size of the runtime's buffer for collecting events in MB</param>
+        /// <param name="token">The token to monitor for cancellation requests.</param>
+        /// <returns>
+        /// An EventPipeSession object representing the EventPipe session that just started.
+        /// </returns> 
+        internal Task<EventPipeSession> StartEventPipeSessionAsync(IEnumerable<EventPipeProvider> providers, bool requestRundown, int circularBufferMB, CancellationToken token)
+        {
+            return EventPipeSession.StartAsync(_endpoint, providers, requestRundown, circularBufferMB, token);
+        }
+
+        /// <summary>
+        /// Start tracing the application and return an EventPipeSession object
+        /// </summary>
+        /// <param name="provider">An EventPipeProvider to turn on.</param>
+        /// <param name="requestRundown">If true, request rundown events from the runtime</param>
+        /// <param name="circularBufferMB">The size of the runtime's buffer for collecting events in MB</param>
+        /// <param name="token">The token to monitor for cancellation requests.</param>
+        /// <returns>
+        /// An EventPipeSession object representing the EventPipe session that just started.
+        /// </returns>
+        internal Task<EventPipeSession> StartEventPipeSessionAsync(EventPipeProvider provider, bool requestRundown, int circularBufferMB, CancellationToken token)
+        {
+            return EventPipeSession.StartAsync(_endpoint, new[] { provider }, requestRundown, circularBufferMB, token);
         }
 
         /// <summary>
@@ -93,26 +122,23 @@ namespace Microsoft.Diagnostics.NETCore.Client
         /// <param name="logDumpGeneration">When set to true, display the dump generation debug log to the console.</param>
         public void WriteDump(DumpType dumpType, string dumpPath, bool logDumpGeneration = false)
         {
-            if (string.IsNullOrEmpty(dumpPath))
-                throw new ArgumentNullException($"{nameof(dumpPath)} required");
+            IpcMessage request = CreateWriteDumpMessage(dumpType, dumpPath, logDumpGeneration);
+            IpcMessage response = IpcClient.SendMessage(_endpoint, request);
+            ValidateResponseMessage(response, nameof(WriteDump));
+        }
 
-            byte[] payload = SerializePayload(dumpPath, (uint)dumpType, logDumpGeneration);
-            IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Dump, (byte)DumpCommandId.GenerateCoreDump, payload);
-            IpcMessage response = IpcClient.SendMessage(_endpoint, message);
-            switch ((DiagnosticsServerResponseId)response.Header.CommandId)
-            {
-                case DiagnosticsServerResponseId.Error:
-                    uint hr = BitConverter.ToUInt32(response.Payload, 0);
-                    if (hr == (uint)DiagnosticsIpcError.UnknownCommand)
-                    {
-                        throw new UnsupportedCommandException($"Unsupported operating system: {RuntimeInformation.OSDescription}");
-                    }
-                    throw new ServerErrorException($"Writing dump failed (HRESULT: 0x{hr:X8})");
-                case DiagnosticsServerResponseId.OK:
-                    return;
-                default:
-                    throw new ServerErrorException($"Writing dump failed - server responded with unknown command");
-            }
+        /// <summary>
+        /// Trigger a core dump generation.
+        /// </summary> 
+        /// <param name="dumpType">Type of the dump to be generated</param>
+        /// <param name="dumpPath">Full path to the dump to be generated. By default it is /tmp/coredump.{pid}</param>
+        /// <param name="logDumpGeneration">When set to true, display the dump generation debug log to the console.</param>
+        /// <param name="token">The token to monitor for cancellation requests.</param>
+        internal async Task WriteDumpAsync(DumpType dumpType, string dumpPath, bool logDumpGeneration, CancellationToken token)
+        {
+            IpcMessage request = CreateWriteDumpMessage(dumpType, dumpPath, logDumpGeneration);
+            IpcMessage response = await IpcClient.SendMessageAsync(_endpoint, request, token).ConfigureAwait(false);
+            ValidateResponseMessage(response, nameof(WriteDumpAsync));
         }
 
         /// <summary>
@@ -124,43 +150,22 @@ namespace Microsoft.Diagnostics.NETCore.Client
         /// <param name="additionalData">Additional data to be passed to the profiler</param>
         public void AttachProfiler(TimeSpan attachTimeout, Guid profilerGuid, string profilerPath, byte[] additionalData = null)
         {
-            if (profilerGuid == null || profilerGuid == Guid.Empty)
-            {
-                throw new ArgumentException($"{nameof(profilerGuid)} must be a valid Guid");
-            }
-
-            if (String.IsNullOrEmpty(profilerPath))
-            {
-                throw new ArgumentException($"{nameof(profilerPath)} must be non-null");
-            }
-
-            byte[] serializedConfiguration = SerializePayload((uint)attachTimeout.TotalSeconds, profilerGuid, profilerPath, additionalData);
-            var message = new IpcMessage(DiagnosticsServerCommandSet.Profiler, (byte)ProfilerCommandId.AttachProfiler, serializedConfiguration);
-            var response = IpcClient.SendMessage(_endpoint, message);
-            switch ((DiagnosticsServerResponseId)response.Header.CommandId)
-            {
-                case DiagnosticsServerResponseId.Error:
-                    uint hr = BitConverter.ToUInt32(response.Payload, 0);
-                    if (hr == (uint)DiagnosticsIpcError.UnknownCommand)
-                    {
-                        throw new UnsupportedCommandException("The target runtime does not support profiler attach");
-                    }
-                    if (hr == (uint)DiagnosticsIpcError.ProfilerAlreadyActive)
-                    {
-                        throw new ProfilerAlreadyActiveException("The request to attach a profiler was denied because a profiler is already loaded");
-                    }
-                    throw new ServerErrorException($"Profiler attach failed (HRESULT: 0x{hr:X8})");
-                case DiagnosticsServerResponseId.OK:
-                    return;
-                default:
-                    throw new ServerErrorException($"Profiler attach failed - server responded with unknown command");
-            }
+            IpcMessage request = CreateAttachProfilerMessage(attachTimeout, profilerGuid, profilerPath, additionalData);
+            IpcMessage response = IpcClient.SendMessage(_endpoint, request);
+            ValidateResponseMessage(response, nameof(AttachProfiler));
 
             // The call to set up the pipe and send the message operates on a different timeout than attachTimeout, which is for the runtime.
             // We should eventually have a configurable timeout for the message passing, potentially either separately from the 
             // runtime timeout or respect attachTimeout as one total duration.
         }
 
+        internal async Task AttachProfilerAsync(TimeSpan attachTimeout, Guid profilerGuid, string profilerPath, byte[] additionalData, CancellationToken token)
+        {
+            IpcMessage request = CreateAttachProfilerMessage(attachTimeout, profilerGuid, profilerPath, additionalData);
+            IpcMessage response = await IpcClient.SendMessageAsync(_endpoint, request, token).ConfigureAwait(false);
+            ValidateResponseMessage(response, nameof(AttachProfilerAsync));
+        }
+
         /// <summary>
         /// Set a profiler as the startup profiler. It is only valid to issue this command
         /// while the runtime is paused at startup.
@@ -169,38 +174,16 @@ namespace Microsoft.Diagnostics.NETCore.Client
         /// <param name="profilerPath">Path to the profiler to be attached</param>
         public void SetStartupProfiler(Guid profilerGuid, string profilerPath)
         {
-            if (profilerGuid == null || profilerGuid == Guid.Empty)
-            {
-                throw new ArgumentException($"{nameof(profilerGuid)} must be a valid Guid");
-            }
-
-            if (String.IsNullOrEmpty(profilerPath))
-            {
-                throw new ArgumentException($"{nameof(profilerPath)} must be non-null");
-            }
-
-            byte[] serializedConfiguration = SerializePayload(profilerGuid, profilerPath);
-            var message = new IpcMessage(DiagnosticsServerCommandSet.Profiler, (byte)ProfilerCommandId.StartupProfiler, serializedConfiguration);
-            var response = IpcClient.SendMessage(_endpoint, message);
-            switch ((DiagnosticsServerResponseId)response.Header.CommandId)
-            {
-                case DiagnosticsServerResponseId.Error:
-                    uint hr = BitConverter.ToUInt32(response.Payload, 0);
-                    if (hr == (uint)DiagnosticsIpcError.UnknownCommand)
-                    {
-                        throw new UnsupportedCommandException("The target runtime does not support the ProfilerStartup command.");
-                    }
-                    else if (hr == (uint)DiagnosticsIpcError.InvalidArgument)
-                    {
-                        throw new ServerErrorException("The runtime must be suspended to issue the SetStartupProfiler command.");
-                    }
+            IpcMessage request = CreateSetStartupProfilerMessage(profilerGuid, profilerPath);
+            IpcMessage response = IpcClient.SendMessage(_endpoint, request);
+            ValidateResponseMessage(response, nameof(SetStartupProfiler), ValidateResponseOptions.InvalidArgumentIsRequiresSuspension);
+        }
 
-                    throw new ServerErrorException($"Profiler startup failed (HRESULT: 0x{hr:X8})");
-                case DiagnosticsServerResponseId.OK:
-                    return;
-                default:
-                    throw new ServerErrorException($"Profiler startup failed - server responded with unknown command");
-            }
+        internal async Task SetStartupProfilerAsync(Guid profilerGuid, string profilerPath, CancellationToken token)
+        {
+            IpcMessage request = CreateSetStartupProfilerMessage(profilerGuid, profilerPath);
+            IpcMessage response = await IpcClient.SendMessageAsync(_endpoint, request, token).ConfigureAwait(false);
+            ValidateResponseMessage(response, nameof(SetStartupProfilerAsync), ValidateResponseOptions.InvalidArgumentIsRequiresSuspension);
         }
 
         /// <summary>
@@ -208,19 +191,16 @@ namespace Microsoft.Diagnostics.NETCore.Client
         /// </summary>
         public void ResumeRuntime()
         {
-            IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.ResumeRuntime);
-            var response = IpcClient.SendMessage(_endpoint, message);
-            switch ((DiagnosticsServerResponseId)response.Header.CommandId)
-            {
-                case DiagnosticsServerResponseId.Error:
-                    // Try fallback for Preview 7 and Preview 8
-                    ResumeRuntimeFallback();
-                    return;
-                case DiagnosticsServerResponseId.OK:
-                    return;
-                default:
-                    throw new ServerErrorException($"Resume runtime failed - server responded with unknown command");
-            }
+            IpcMessage request = CreateResumeRuntimeMessage();
+            IpcMessage response = IpcClient.SendMessage(_endpoint, request);
+            ValidateResponseMessage(response, nameof(ResumeRuntime));
+        }
+
+        internal async Task ResumeRuntimeAsync(CancellationToken token)
+        {
+            IpcMessage request = CreateResumeRuntimeMessage();
+            IpcMessage response = await IpcClient.SendMessageAsync(_endpoint, request, token).ConfigureAwait(false);
+            ValidateResponseMessage(response, nameof(ResumeRuntimeAsync));
         }
 
         /// <summary>
@@ -230,29 +210,16 @@ namespace Microsoft.Diagnostics.NETCore.Client
         /// <param name="value">The value of the environment variable to set.</param>
         public void SetEnvironmentVariable(string name, string value)
         {
-            if (String.IsNullOrEmpty(name))
-            {
-                throw new ArgumentException($"{nameof(name)} must be non-null.");
-            }
-
-            byte[] serializedConfiguration = SerializePayload(name, value);
-            var message = new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.SetEnvironmentVariable, serializedConfiguration);
-            var response = IpcClient.SendMessage(_endpoint, message);
-            switch ((DiagnosticsServerResponseId)response.Header.CommandId)
-            {
-                case DiagnosticsServerResponseId.Error:
-                    uint hr = BitConverter.ToUInt32(response.Payload, 0);
-                    if (hr == (uint)DiagnosticsIpcError.UnknownCommand)
-                    {
-                        throw new UnsupportedCommandException("The target runtime does not support the SetEnvironmentVariable command.");
-                    }
+            IpcMessage request = CreateSetEnvironmentVariableMessage(name, value);
+            IpcMessage response = IpcClient.SendMessage(_endpoint, request);
+            ValidateResponseMessage(response, nameof(SetEnvironmentVariable));
+        }
 
-                    throw new ServerErrorException($"SetEnvironmentVariable failed (HRESULT: 0x{hr:X8})");
-                case DiagnosticsServerResponseId.OK:
-                    return;
-                default:
-                    throw new ServerErrorException($"SetEnvironmentVariable failed - server responded with unknown command");
-            }
+        internal async Task SetEnvironmentVariableAsync(string name, string value, CancellationToken token)
+        {
+            IpcMessage request = CreateSetEnvironmentVariableMessage(name, value);
+            IpcMessage response = await IpcClient.SendMessageAsync(_endpoint, request, token).ConfigureAwait(false);
+            ValidateResponseMessage(response, nameof(SetEnvironmentVariableAsync));
         }
 
         /// <summary>
@@ -261,21 +228,22 @@ namespace Microsoft.Diagnostics.NETCore.Client
         /// <returns>A dictionary containing all of the environment variables defined in the target process.</returns>
         public Dictionary<string, string> GetProcessEnvironment()
         {
-            var message = new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.GetProcessEnvironment);
-            Stream continuation = IpcClient.SendMessage(_endpoint, message, out IpcMessage response);
-            switch ((DiagnosticsServerResponseId)response.Header.CommandId)
-            {
-                case DiagnosticsServerResponseId.Error:
-                    int hr = BitConverter.ToInt32(response.Payload, 0);
-                    throw new ServerErrorException($"Get process environment failed (HRESULT: 0x{hr:X8})");
-                case DiagnosticsServerResponseId.OK:
-                    ProcessEnvironmentHelper helper = ProcessEnvironmentHelper.Parse(response.Payload);
-                    Task<Dictionary<string, string>> envTask = helper.ReadEnvironmentAsync(continuation);
-                    envTask.Wait();
-                    return envTask.Result;
-                default:
-                    throw new ServerErrorException($"Get process environment failed - server responded with unknown command");
-            }
+            IpcMessage message = CreateProcessEnvironmentMessage();
+            using IpcResponse response = IpcClient.SendMessageGetContinuation(_endpoint, message);
+            ValidateResponseMessage(response.Message, nameof(GetProcessEnvironmentAsync));
+
+            ProcessEnvironmentHelper helper = ProcessEnvironmentHelper.Parse(response.Message.Payload);
+            return helper.ReadEnvironment(response.Continuation);
+        }
+
+        internal async Task<Dictionary<string, string>> GetProcessEnvironmentAsync(CancellationToken token)
+        {
+            IpcMessage message = CreateProcessEnvironmentMessage();
+            using IpcResponse response = await IpcClient.SendMessageGetContinuationAsync(_endpoint, message, token).ConfigureAwait(false);
+            ValidateResponseMessage(response.Message, nameof(GetProcessEnvironmentAsync));
+
+            ProcessEnvironmentHelper helper = ProcessEnvironmentHelper.Parse(response.Message.Payload);
+            return await helper.ReadEnvironmentAsync(response.Continuation, token).ConfigureAwait(false);
         }
 
         /// <summary>
@@ -304,76 +272,54 @@ namespace Microsoft.Diagnostics.NETCore.Client
             return GetAllPublishedProcesses().Distinct();
         }
 
-
-        // Fallback command for .NET 5 Preview 7 and Preview 8
-        internal void ResumeRuntimeFallback()
+        internal ProcessInfo GetProcessInfo()
         {
-            IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Server, (byte)DiagnosticServerCommandId.ResumeRuntime);
-            var response = IpcClient.SendMessage(_endpoint, message);
-            switch ((DiagnosticsServerResponseId)response.Header.CommandId)
-            {
-                case DiagnosticsServerResponseId.Error:
-                    var hr = BitConverter.ToUInt32(response.Payload, 0);
-                    if (hr == (uint)DiagnosticsIpcError.UnknownCommand)
-                    {
-                        throw new UnsupportedCommandException($"Resume runtime command is unknown by target runtime.");
-                    }
-                    throw new ServerErrorException($"Resume runtime failed (HRESULT: 0x{hr:X8})");
-                case DiagnosticsServerResponseId.OK:
-                    return;
-                default:
-                    throw new ServerErrorException($"Resume runtime failed - server responded with unknown command");
-            }
+            // RE: https://github.com/dotnet/runtime/issues/54083
+            // If the GetProcessInfo2 command is sent too early, it will crash the runtime instance.
+            // Disable the usage of the command until that issue is fixed.
+
+            // Attempt to get ProcessInfo v2
+            //ProcessInfo processInfo = TryGetProcessInfo2();
+            //if (null != processInfo)
+            //{
+            //    return processInfo;
+            //}
+
+            IpcMessage request = CreateProcessInfoMessage();
+            using IpcResponse response = IpcClient.SendMessageGetContinuation(_endpoint, request);
+            return GetProcessInfoFromResponse(response, nameof(GetProcessInfo));
         }
 
-        internal ProcessInfo GetProcessInfo()
+        internal async Task<ProcessInfo> GetProcessInfoAsync(CancellationToken token)
         {
             // RE: https://github.com/dotnet/runtime/issues/54083
             // If the GetProcessInfo2 command is sent too early, it will crash the runtime instance.
             // Disable the usage of the command until that issue is fixed.
 
             // Attempt to get ProcessInfo v2
-            //ProcessInfo processInfo = GetProcessInfo2();
+            //ProcessInfo processInfo = await TryGetProcessInfo2Async(token);
             //if (null != processInfo)
             //{
             //    return processInfo;
             //}
 
-            // Attempt to get ProcessInfo v1
-            IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.GetProcessInfo);
-            var response = IpcClient.SendMessage(_endpoint, message);
-            switch ((DiagnosticsServerResponseId)response.Header.CommandId)
-            {
-                case DiagnosticsServerResponseId.Error:
-                    var hr = BitConverter.ToInt32(response.Payload, 0);
-                    throw new ServerErrorException($"Get process info failed (HRESULT: 0x{hr:X8})");
-                case DiagnosticsServerResponseId.OK:
-                    return ProcessInfo.ParseV1(response.Payload);
-                default:
-                    throw new ServerErrorException($"Get process info failed - server responded with unknown command");
-            }
+            IpcMessage request = CreateProcessInfoMessage();
+            using IpcResponse response = await IpcClient.SendMessageGetContinuationAsync(_endpoint, request, token).ConfigureAwait(false);
+            return GetProcessInfoFromResponse(response, nameof(GetProcessInfoAsync));
         }
 
-        private ProcessInfo GetProcessInfo2()
+        private ProcessInfo TryGetProcessInfo2()
         {
-            IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.GetProcessInfo2);
-            var response = IpcClient.SendMessage(_endpoint, message);
-            switch ((DiagnosticsServerResponseId)response.Header.CommandId)
-            {
-                case DiagnosticsServerResponseId.Error:
-                    uint hr = BitConverter.ToUInt32(response.Payload, 0);
-                    // In the case that the runtime doesn't understand the GetProcessInfo2 command,
-                    // just break to allow fallback to try to get ProcessInfo v1.
-                    if (hr == (uint)DiagnosticsIpcError.UnknownCommand)
-                    {
-                        return null;
-                    }
-                    throw new ServerErrorException($"GetProcessInfo2 failed (HRESULT: 0x{hr:X8})");
-                case DiagnosticsServerResponseId.OK:
-                    return ProcessInfo.ParseV2(response.Payload);
-                default:
-                    throw new ServerErrorException($"Get process info failed - server responded with unknown command");
-            }
+            IpcMessage request = CreateProcessInfo2Message();
+            using IpcResponse response2 = IpcClient.SendMessageGetContinuation(_endpoint, request);
+            return TryGetProcessInfo2FromResponse(response2, nameof(GetProcessInfo));
+        }
+
+        private async Task<ProcessInfo> TryGetProcessInfo2Async(CancellationToken token)
+        {
+            IpcMessage request = CreateProcessInfo2Message();
+            using IpcResponse response2 = await IpcClient.SendMessageGetContinuationAsync(_endpoint, request, token).ConfigureAwait(false);
+            return TryGetProcessInfo2FromResponse(response2, nameof(GetProcessInfoAsync));
         }
 
         private static byte[] SerializePayload<T>(T arg)
@@ -455,5 +401,133 @@ namespace Microsoft.Diagnostics.NETCore.Client
                 throw new ArgumentException($"Type {obj.GetType()} is not supported in SerializePayloadArgument, please add it.");
             }
         }
+
+        private static IpcMessage CreateAttachProfilerMessage(TimeSpan attachTimeout, Guid profilerGuid, string profilerPath, byte[] additionalData)
+        {
+            if (profilerGuid == null || profilerGuid == Guid.Empty)
+            {
+                throw new ArgumentException($"{nameof(profilerGuid)} must be a valid Guid");
+            }
+
+            if (String.IsNullOrEmpty(profilerPath))
+            {
+                throw new ArgumentException($"{nameof(profilerPath)} must be non-null");
+            }
+
+            byte[] serializedConfiguration = SerializePayload((uint)attachTimeout.TotalSeconds, profilerGuid, profilerPath, additionalData);
+            return new IpcMessage(DiagnosticsServerCommandSet.Profiler, (byte)ProfilerCommandId.AttachProfiler, serializedConfiguration);
+        }
+
+        private static IpcMessage CreateProcessEnvironmentMessage()
+        {
+            return new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.GetProcessEnvironment);
+        }
+
+        private static IpcMessage CreateProcessInfoMessage()
+        {
+            return new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.GetProcessInfo);
+        }
+
+        private static IpcMessage CreateProcessInfo2Message()
+        {
+            return new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.GetProcessInfo2);
+        }
+
+        private static IpcMessage CreateResumeRuntimeMessage()
+        {
+            return new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.ResumeRuntime);
+        }
+
+        private static IpcMessage CreateSetEnvironmentVariableMessage(string name, string value)
+        {
+            if (String.IsNullOrEmpty(name))
+            {
+                throw new ArgumentException($"{nameof(name)} must be non-null.");
+            }
+
+            byte[] serializedConfiguration = SerializePayload(name, value);
+            return new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.SetEnvironmentVariable, serializedConfiguration);
+        }
+
+        private static IpcMessage CreateSetStartupProfilerMessage(Guid profilerGuid, string profilerPath)
+        {
+            if (profilerGuid == null || profilerGuid == Guid.Empty)
+            {
+                throw new ArgumentException($"{nameof(profilerGuid)} must be a valid Guid");
+            }
+
+            if (String.IsNullOrEmpty(profilerPath))
+            {
+                throw new ArgumentException($"{nameof(profilerPath)} must be non-null");
+            }
+
+            byte[] serializedConfiguration = SerializePayload(profilerGuid, profilerPath);
+            return new IpcMessage(DiagnosticsServerCommandSet.Profiler, (byte)ProfilerCommandId.StartupProfiler, serializedConfiguration);
+        }
+
+        private static IpcMessage CreateWriteDumpMessage(DumpType dumpType, string dumpPath, bool logDumpGeneration)
+        {
+            if (string.IsNullOrEmpty(dumpPath))
+                throw new ArgumentNullException($"{nameof(dumpPath)} required");
+
+            byte[] payload = SerializePayload(dumpPath, (uint)dumpType, logDumpGeneration);
+            return new IpcMessage(DiagnosticsServerCommandSet.Dump, (byte)DumpCommandId.GenerateCoreDump, payload);
+        }
+
+        private static ProcessInfo GetProcessInfoFromResponse(IpcResponse response, string operationName)
+        {
+            ValidateResponseMessage(response.Message, operationName);
+
+            return ProcessInfo.ParseV1(response.Message.Payload);
+        }
+
+        private static ProcessInfo TryGetProcessInfo2FromResponse(IpcResponse response, string operationName)
+        {
+            if (!ValidateResponseMessage(response.Message, operationName, ValidateResponseOptions.UnknownCommandReturnsFalse))
+            {
+                return null;
+            }
+
+            return ProcessInfo.ParseV2(response.Message.Payload);
+        }
+
+        internal static bool ValidateResponseMessage(IpcMessage responseMessage, string operationName, ValidateResponseOptions options = ValidateResponseOptions.None)
+        {
+            switch ((DiagnosticsServerResponseId)responseMessage.Header.CommandId)
+            {
+                case DiagnosticsServerResponseId.Error:
+                    uint hr = BitConverter.ToUInt32(responseMessage.Payload, 0);
+                    switch (hr)
+                    {
+                        case (uint)DiagnosticsIpcError.UnknownCommand:
+                            if (options.HasFlag(ValidateResponseOptions.UnknownCommandReturnsFalse))
+                            {
+                                return false;
+                            }
+                            throw new UnsupportedCommandException($"{operationName} failed - Command is not supported.");
+                        case (uint)DiagnosticsIpcError.ProfilerAlreadyActive:
+                            throw new ProfilerAlreadyActiveException($"{operationName} failed - A profiler is already loaded.");
+                        case (uint)DiagnosticsIpcError.InvalidArgument:
+                            if (options.HasFlag(ValidateResponseOptions.InvalidArgumentIsRequiresSuspension))
+                            {
+                                throw new ServerErrorException($"{operationName} failed - The runtime must be suspended for this command.");
+                            }
+                            throw new UnsupportedCommandException($"{operationName} failed - Invalid command argument.");
+                    }
+                    throw new ServerErrorException($"{operationName} failed - HRESULT: 0x{hr:X8}");
+                case DiagnosticsServerResponseId.OK:
+                    return true;
+                default:
+                    throw new ServerErrorException($"{operationName} failed - Server responded with unknown response.");
+            }
+        }
+
+        [Flags]
+        internal enum ValidateResponseOptions
+        {
+            None = 0x0,
+            UnknownCommandReturnsFalse = 0x1,
+            InvalidArgumentIsRequiresSuspension = 0x2,
+        }
     }
 }
index 9f82f794183f3a4acba71beeaf081f3da9983949..e277b9a65f6833ee4cb665632ad3b15886535092 100644 (file)
@@ -6,83 +6,125 @@ using System;
 using System.Collections.Generic;
 using System.Diagnostics;
 using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
 
 namespace Microsoft.Diagnostics.NETCore.Client
 {
     public class EventPipeSession : IDisposable
     {
-        private IEnumerable<EventPipeProvider> _providers;
-        private bool _requestRundown;
-        private int _circularBufferMB;
         private long _sessionId;
         private IpcEndpoint _endpoint;
         private bool _disposedValue = false; // To detect redundant calls
         private bool _stopped = false; // To detect redundant calls
+        private readonly IpcResponse _response;
 
-        internal EventPipeSession(IpcEndpoint endpoint, IEnumerable<EventPipeProvider> providers, bool requestRundown, int circularBufferMB)
+        private EventPipeSession(IpcEndpoint endpoint, IpcResponse response, long sessionId)
         {
             _endpoint = endpoint;
-            _providers = providers;
-            _requestRundown = requestRundown;
-            _circularBufferMB = circularBufferMB;
-            
-            var config = new EventPipeSessionConfiguration(circularBufferMB, EventPipeSerializationFormat.NetTrace, providers, requestRundown);
-            var message = new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.CollectTracing2, config.SerializeV2());
-            EventStream = IpcClient.SendMessage(endpoint, message, out var response);
-            switch ((DiagnosticsServerResponseId)response.Header.CommandId)
-            {
-                case DiagnosticsServerResponseId.OK:
-                    _sessionId = BitConverter.ToInt64(response.Payload, 0);
-                    break;
-                case DiagnosticsServerResponseId.Error:
-                    var hr = BitConverter.ToInt32(response.Payload, 0);
-                    throw new ServerErrorException($"EventPipe session start failed (HRESULT: 0x{hr:X8})");
-                default:
-                    throw new ServerErrorException($"EventPipe session start failed - Server responded with unknown command");
-            }
+            _response = response;
+            _sessionId = sessionId;
         }
 
-        public Stream EventStream { get; }
+        public Stream EventStream => _response.Continuation;
+
+        internal static EventPipeSession Start(IpcEndpoint endpoint, IEnumerable<EventPipeProvider> providers, bool requestRundown, int circularBufferMB)
+        {
+            IpcMessage requestMessage = CreateStartMessage(providers, requestRundown, circularBufferMB);
+            IpcResponse? response = IpcClient.SendMessageGetContinuation(endpoint, requestMessage);
+            return CreateSessionFromResponse(endpoint, ref response, nameof(Start));
+        }
+
+        internal static async Task<EventPipeSession> StartAsync(IpcEndpoint endpoint, IEnumerable<EventPipeProvider> providers, bool requestRundown, int circularBufferMB, CancellationToken cancellationToken)
+        {
+            IpcMessage requestMessage = CreateStartMessage(providers, requestRundown, circularBufferMB);
+            IpcResponse? response = await IpcClient.SendMessageGetContinuationAsync(endpoint, requestMessage, cancellationToken).ConfigureAwait(false);
+            return CreateSessionFromResponse(endpoint, ref response, nameof(StartAsync));
+        }
 
         ///<summary>
         /// Stops the given session
         ///</summary>
         public void Stop()
         {
-            Debug.Assert(_sessionId > 0);
-            
-            // Do not issue another Stop command if it has already been issued for this session instance.
-            if (_stopped)
+            if (TryCreateStopMessage(out IpcMessage requestMessage))
             {
-                return;
+                try
+                {
+                    IpcMessage response = IpcClient.SendMessage(_endpoint, requestMessage);
+
+                    DiagnosticsClient.ValidateResponseMessage(response, nameof(Stop));
+                }
+                // On non-abrupt exits (i.e. the target process has already exited and pipe is gone, sending Stop command will fail).
+                catch (IOException)
+                {
+                    throw new ServerNotAvailableException("Could not send Stop command. The target process may have exited.");
+                }
             }
-            else
+        }
+
+        public async Task StopAsync(CancellationToken cancellationToken)
+        {
+            if (TryCreateStopMessage(out IpcMessage requestMessage))
             {
-                _stopped = true;
+                try
+                {
+                    IpcMessage response = await IpcClient.SendMessageAsync(_endpoint, requestMessage, cancellationToken).ConfigureAwait(false);
+
+                    DiagnosticsClient.ValidateResponseMessage(response, nameof(StopAsync));
+                }
+                // On non-abrupt exits (i.e. the target process has already exited and pipe is gone, sending Stop command will fail).
+                catch (IOException)
+                {
+                    throw new ServerNotAvailableException("Could not send Stop command. The target process may have exited.");
+                }
             }
+        }
 
-            byte[] payload = BitConverter.GetBytes(_sessionId);
-            IpcMessage response;
+        private static IpcMessage CreateStartMessage(IEnumerable<EventPipeProvider> providers, bool requestRundown, int circularBufferMB)
+        {
+            var config = new EventPipeSessionConfiguration(circularBufferMB, EventPipeSerializationFormat.NetTrace, providers, requestRundown);
+            return new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.CollectTracing2, config.SerializeV2());
+        }
+
+        private static EventPipeSession CreateSessionFromResponse(IpcEndpoint endpoint, ref IpcResponse? response, string operationName)
+        {
             try
             {
-                response = IpcClient.SendMessage(_endpoint, new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.StopTracing, payload));
+                DiagnosticsClient.ValidateResponseMessage(response.Value.Message, operationName);
+
+                long sessionId = BitConverter.ToInt64(response.Value.Message.Payload, 0);
+
+                var session = new EventPipeSession(endpoint, response.Value, sessionId);
+                response = null;
+                return session;
             }
-            // On non-abrupt exits (i.e. the target process has already exited and pipe is gone, sending Stop command will fail).
-            catch (IOException)
+            finally
             {
-                throw new ServerNotAvailableException("Could not send Stop command. The target process may have exited.");
+                response?.Dispose();
             }
+        }
 
-            switch ((DiagnosticsServerResponseId)response.Header.CommandId)
+        private bool TryCreateStopMessage(out IpcMessage stopMessage)
+        {
+            Debug.Assert(_sessionId > 0);
+
+            // Do not issue another Stop command if it has already been issued for this session instance.
+            if (_stopped)
             {
-                case DiagnosticsServerResponseId.OK:
-                    return;
-                case DiagnosticsServerResponseId.Error:
-                    var hr = BitConverter.ToInt32(response.Payload, 0);
-                    throw new ServerErrorException($"EventPipe session stop failed (HRESULT: 0x{hr:X8})");
-                default:
-                    throw new ServerErrorException($"EventPipe session stop failed - Server responded with unknown command");
+                stopMessage = null;
+                return false;
             }
+            else
+            {
+                _stopped = true;
+            }
+
+            byte[] payload = BitConverter.GetBytes(_sessionId);
+
+            stopMessage = new IpcMessage(DiagnosticsServerCommandSet.EventPipe, (byte)EventPipeCommandId.StopTracing, payload);
+
+            return true;
         }
 
         protected virtual void Dispose(bool disposing)
@@ -101,7 +143,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
             {
                 if (disposing)
                 {
-                    EventStream?.Dispose();
+                    _response.Dispose();
                 }
                 _disposedValue = true;
             }
index f6364820dc7255040ae5b53b1f239ac56a6f260b..87efac955a7fade45a50d6031046da974ff947ca 100644 (file)
@@ -5,6 +5,7 @@
 using System;
 using System.IO;
 using System.Threading;
+using System.Threading.Tasks;
 
 namespace Microsoft.Diagnostics.NETCore.Client
 {
@@ -12,52 +13,111 @@ namespace Microsoft.Diagnostics.NETCore.Client
     {
         // The amount of time to wait for a stream to be available for consumption by the Connect method.
         // Normally expect the runtime to respond quickly but resource constrained machines may take longer.
-        private static readonly TimeSpan ConnectTimeout = TimeSpan.FromSeconds(30);
+        internal static readonly TimeSpan ConnectTimeout = TimeSpan.FromSeconds(30);
 
         /// <summary>
-        /// Sends a single DiagnosticsIpc Message to the dotnet process with PID processId.
+        /// Sends a single DiagnosticsIpc Message to the dotnet process associated with the <paramref name="endpoint"/>.
         /// </summary>
         /// <param name="endpoint">An endpoint that provides a diagnostics connection to a runtime instance.</param>
         /// <param name="message">The DiagnosticsIpc Message to be sent</param>
-        /// <returns>The response DiagnosticsIpc Message from the dotnet process</returns>
+        /// <returns>An <see cref="IpcMessage"/> that is the response message.</returns>
         public static IpcMessage SendMessage(IpcEndpoint endpoint, IpcMessage message)
         {
-            using (var stream = endpoint.Connect(ConnectTimeout))
+            using IpcResponse response = SendMessageGetContinuation(endpoint, message);
+            return response.Message;
+        }
+
+        /// <summary>
+        /// Sends a single DiagnosticsIpc Message to the dotnet process associated with the <paramref name="endpoint"/>.
+        /// </summary>
+        /// <param name="endpoint">An endpoint that provides a diagnostics connection to a runtime instance.</param>
+        /// <param name="message">The DiagnosticsIpc Message to be sent</param>
+        /// <returns>An <see cref="IpcResponse"/> containing the response message and continuation stream.</returns>
+        public static IpcResponse SendMessageGetContinuation(IpcEndpoint endpoint, IpcMessage message)
+        {
+            Stream stream = null;
+            try
             {
+                stream = endpoint.Connect(ConnectTimeout);
+
                 Write(stream, message);
-                return Read(stream);
+
+                IpcMessage response = Read(stream);
+
+                return new IpcResponse(response, Release(ref stream));
+            }
+            finally
+            {
+                stream?.Dispose();
             }
         }
 
         /// <summary>
-        /// Sends a single DiagnosticsIpc Message to the dotnet process with PID processId
-        /// and returns the Stream for reuse in Optional Continuations.
+        /// Sends a single DiagnosticsIpc Message to the dotnet process associated with the <paramref name="endpoint"/>.
         /// </summary>
         /// <param name="endpoint">An endpoint that provides a diagnostics connection to a runtime instance.</param>
         /// <param name="message">The DiagnosticsIpc Message to be sent</param>
-        /// <param name="response">out var for response message</param>
-        /// <returns>The response DiagnosticsIpc Message from the dotnet process</returns>
-        public static Stream SendMessage(IpcEndpoint endpoint, IpcMessage message, out IpcMessage response)
+        /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
+        /// <returns>An <see cref="IpcMessage"/> that is the response message.</returns>
+        public static async Task<IpcMessage> SendMessageAsync(IpcEndpoint endpoint, IpcMessage message, CancellationToken cancellationToken)
         {
-            var stream = endpoint.Connect(ConnectTimeout);
-            Write(stream, message);
-            response = Read(stream);
-            return stream;
+            using IpcResponse response = await SendMessageGetContinuationAsync(endpoint, message, cancellationToken).ConfigureAwait(false);
+            return response.Message;
         }
 
-        private static void Write(Stream stream, byte[] buffer)
+        /// <summary>
+        /// Sends a single DiagnosticsIpc Message to the dotnet process associated with the <paramref name="endpoint"/>.
+        /// </summary>
+        /// <param name="endpoint">An endpoint that provides a diagnostics connection to a runtime instance.</param>
+        /// <param name="message">The DiagnosticsIpc Message to be sent</param>
+        /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
+        /// <returns>An <see cref="IpcResponse"/> containing the response message and continuation stream.</returns>
+        public static async Task<IpcResponse> SendMessageGetContinuationAsync(IpcEndpoint endpoint, IpcMessage message, CancellationToken cancellationToken)
         {
-            stream.Write(buffer, 0, buffer.Length);
+            Stream stream = null;
+            try
+            {
+                stream = await endpoint.ConnectAsync(cancellationToken).ConfigureAwait(false);
+
+                await WriteAsync(stream, message, cancellationToken).ConfigureAwait(false);
+
+                IpcMessage response = await ReadAsync(stream, cancellationToken).ConfigureAwait(false);
+
+                return new IpcResponse(response, Release(ref stream));
+            }
+            finally
+            {
+                stream?.Dispose();
+            }
         }
 
         private static void Write(Stream stream, IpcMessage message)
         {
-            Write(stream, message.Serialize());
+            byte[] buffer = message.Serialize();
+            stream.Write(buffer, 0, buffer.Length);
+        }
+
+        private static Task WriteAsync(Stream stream, IpcMessage message, CancellationToken cancellationToken)
+        {
+            byte[] buffer = message.Serialize();
+            return stream.WriteAsync(buffer, 0, buffer.Length, cancellationToken);
         }
 
         private static IpcMessage Read(Stream stream)
         {
             return IpcMessage.Parse(stream);
         }
+
+        private static Task<IpcMessage> ReadAsync(Stream stream, CancellationToken cancellationToken)
+        {
+            return IpcMessage.ParseAsync(stream, cancellationToken);
+        }
+
+        private static Stream Release(ref Stream stream1)
+        {
+            Stream intermediate = stream1;
+            stream1 = null;
+            return intermediate;
+        }
     }
 }
index 7b22f00c04844291ca70bfb2eb90a730409e1be5..d76f0df0bf228362bf9cc311d0d77209eae7d2ac 100644 (file)
@@ -18,17 +18,6 @@ namespace Microsoft.Diagnostics.NETCore.Client
         Server         = 0xFF,
     }
 
-    // For .NET 5 Preview 7 and Preview 8, use this with the
-    // DiagnosticsServerCommandSet.Server command set.
-    // For .NET 5 RC and later, use ProcessCommandId.ResumeRuntime with
-    // the DiagnosticsServerCommandSet.Process command set.
-    internal enum DiagnosticServerCommandId : byte
-    {
-        // 0x00 used in DiagnosticServerResponseId
-        ResumeRuntime = 0x01,
-        // 0xFF used DiagnosticServerResponseId
-    };
-
     internal enum DiagnosticsServerResponseId : byte
     {
         OK            = 0x00,
index 3a8fe59287aeb45efdbe871644f9534dfa11a072..6950e8527bd1cf0dd4ead13f8761b5dcabe9c622 100644 (file)
@@ -3,12 +3,11 @@
 // See the LICENSE file in the project root for more information.
 
 using System;
-using System.Collections.Generic;
 using System.Diagnostics;
 using System.IO;
-using System.Net.Sockets;
-using System.Runtime.InteropServices;
 using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
 
 namespace Microsoft.Diagnostics.NETCore.Client
 {
@@ -53,7 +52,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
             }
         }
 
-        public static IpcHeader TryParse(BinaryReader reader)
+        public static IpcHeader Parse(BinaryReader reader)
         {
             IpcHeader header = new IpcHeader
             {
@@ -67,6 +66,16 @@ namespace Microsoft.Diagnostics.NETCore.Client
             return header;
         }
 
+        public static async Task<IpcHeader> ParseAsync(Stream stream, CancellationToken cancellationToken)
+        {
+            byte[] buffer = await stream.ReadBytesAsync(HeaderSizeInBytes, cancellationToken).ConfigureAwait(false);
+            using MemoryStream bufferStream = new MemoryStream(buffer);
+            using BinaryReader bufferReader = new BinaryReader(bufferStream);
+            IpcHeader header = Parse(bufferReader);
+            Debug.Assert(bufferStream.Position == bufferStream.Length);
+            return header;
+        }
+
         override public string ToString()
         {
             return $"{{ Magic={Magic}; Size={Size}; CommandSet={CommandSet}; CommandId={CommandId}; Reserved={Reserved} }}";
index 00b50980ac061292197d0b47e93d7522ffc28a4d..c1acad97cf115a2f65a785b746fd58704cd60ed0 100644 (file)
@@ -7,6 +7,8 @@ using System.Collections.Generic;
 using System.IO;
 using System.Text;
 using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
 
 namespace Microsoft.Diagnostics.NETCore.Client
 {
@@ -112,10 +114,18 @@ namespace Microsoft.Diagnostics.NETCore.Client
             IpcMessage message = new IpcMessage();
             using (var reader = new BinaryReader(stream, Encoding.UTF8, true))
             {
-                message.Header = IpcHeader.TryParse(reader);
+                message.Header = IpcHeader.Parse(reader);
                 message.Payload = reader.ReadBytes(message.Header.Size - IpcHeader.HeaderSizeInBytes);
                 return message;
             }
         }
+
+        public static async Task<IpcMessage> ParseAsync(Stream stream, CancellationToken cancellationToken)
+        {
+            IpcMessage message = new IpcMessage();
+            message.Header = await IpcHeader.ParseAsync(stream, cancellationToken).ConfigureAwait(false);
+            message.Payload = await stream.ReadBytesAsync(message.Header.Size - IpcHeader.HeaderSizeInBytes, cancellationToken).ConfigureAwait(false);
+            return message;
+        }
     }
 }
diff --git a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcResponse.cs b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsIpc/IpcResponse.cs
new file mode 100644 (file)
index 0000000..02bc098
--- /dev/null
@@ -0,0 +1,27 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.IO;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+    internal struct IpcResponse : IDisposable
+    {
+        public readonly IpcMessage Message;
+
+        public readonly Stream Continuation;
+
+        public IpcResponse(IpcMessage message, Stream continuation)
+        {
+            Message = message;
+            Continuation = continuation;
+        }
+
+        public void Dispose()
+        {
+            Continuation?.Dispose();
+        }
+    }
+}
index b3985e49c4eccc58cefcde7e6cc12e6511ff2e28..2b65bbe5fd6ed4039e68a66fcd261445a7541a33 100644 (file)
@@ -5,8 +5,6 @@
 using System;
 using System.Collections.Generic;
 using System.IO;
-using System.Linq;
-using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -14,6 +12,8 @@ namespace Microsoft.Diagnostics.NETCore.Client
 {
     internal class ProcessEnvironmentHelper
     {
+        private const int CopyBufferSize = (16 << 10) /* 16KiB */;
+
         private ProcessEnvironmentHelper() {}
         public static ProcessEnvironmentHelper Parse(byte[] payload)
         {
@@ -25,18 +25,29 @@ namespace Microsoft.Diagnostics.NETCore.Client
             return helper;
         }
 
-        public async Task<Dictionary<string,string>> ReadEnvironmentAsync(Stream continuation, CancellationToken token = default(CancellationToken))
+        public Dictionary<string, string> ReadEnvironment(Stream continuation)
         {
-            var env = new Dictionary<string,string>();
+            using var memoryStream = new MemoryStream();
+            continuation.CopyTo(memoryStream, CopyBufferSize);
+            return ReadEnvironmentCore(memoryStream);
+        }
 
+        public async Task<Dictionary<string,string>> ReadEnvironmentAsync(Stream continuation, CancellationToken token = default(CancellationToken))
+        {
             using var memoryStream = new MemoryStream();
-            await continuation.CopyToAsync(memoryStream, (16 << 10) /* 16KiB */, token);
-            memoryStream.Seek(0, SeekOrigin.Begin);
-            byte[] envBlock = memoryStream.ToArray();
+            await continuation.CopyToAsync(memoryStream, CopyBufferSize, token);
+            return ReadEnvironmentCore(memoryStream);
+        }
+
+        private Dictionary<string, string> ReadEnvironmentCore(MemoryStream stream)
+        {
+            stream.Seek(0, SeekOrigin.Begin);
+            byte[] envBlock = stream.ToArray();
 
             if (envBlock.Length != (long)ExpectedSizeInBytes)
                 throw new ApplicationException($"ProcessEnvironment continuation length did not match expected length. Expected: {ExpectedSizeInBytes} bytes, Received: {envBlock.Length} bytes");
 
+            var env = new Dictionary<string, string>();
             int cursor = 0;
             UInt32 nElements = BitConverter.ToUInt32(envBlock, cursor);
             cursor += sizeof(UInt32);
@@ -44,7 +55,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
             {
                 string pair = IpcHelpers.ReadString(envBlock, ref cursor);
                 int equalsIdx = pair.IndexOf('=');
-                env[pair.Substring(0,equalsIdx)] = equalsIdx != pair.Length - 1 ? pair.Substring(equalsIdx+1) : "";
+                env[pair.Substring(0, equalsIdx)] = equalsIdx != pair.Length - 1 ? pair.Substring(equalsIdx + 1) : "";
             }
 
             return env;
index c9171c22ef82b7f689ccd16d5dccaacae6efbc5e..ae4c0f3a16b2a47babd0dded0fb5e546f586996e 100644 (file)
@@ -24,6 +24,7 @@
     <InternalsVisibleTo Include="dotnet-monitor" />
     <InternalsVisibleTo Include="dotnet-trace" />
     <InternalsVisibleTo Include="Microsoft.Diagnostics.Monitoring" />
+    <InternalsVisibleTo Include="Microsoft.Diagnostics.Monitoring.EventPipe" />
     <!-- Temporary until Diagnostic Apis are finalized-->
     <InternalsVisibleTo Include="Microsoft.Diagnostics.Monitoring.WebApi" />
     <InternalsVisibleTo Include="Microsoft.Diagnostics.NETCore.Client.UnitTests" />
diff --git a/src/Microsoft.Diagnostics.NETCore.Client/StreamExtensions.cs b/src/Microsoft.Diagnostics.NETCore.Client/StreamExtensions.cs
new file mode 100644 (file)
index 0000000..3cf6f97
--- /dev/null
@@ -0,0 +1,34 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+    internal static class StreamExtensions
+    {
+        public static async Task<byte[]> ReadBytesAsync(this Stream stream, int length, CancellationToken cancellationToken)
+        {
+            byte[] buffer = new byte[length];
+
+            int totalRead = 0;
+            int remaining = length;
+            while (remaining > 0)
+            {
+                int read = await stream.ReadAsync(buffer, totalRead, remaining, cancellationToken);
+                if (0 == read)
+                {
+                    throw new EndOfStreamException();
+                }
+
+                remaining -= read;
+                totalRead += read;
+            }
+
+            return buffer;
+        }
+    }
+}
index 89741af175af9c896e1d450bfe0e7f995a850480..7d2a718b0361c96227e6f5c15ce62a8d3887f238 100644 (file)
@@ -8,11 +8,13 @@ using Microsoft.Extensions.Logging;
 using System;
 using System.Collections.Generic;
 using System.IO;
+using System.Runtime.InteropServices;
 using System.Text.Json;
 using System.Threading;
 using System.Threading.Tasks;
 using Xunit;
 using Xunit.Abstractions;
+using Xunit.Extensions;
 
 namespace Microsoft.Diagnostics.Monitoring.EventPipe.UnitTests
 {
@@ -79,9 +81,14 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe.UnitTests
         /// <summary>
         /// Test that log events at the default level are collected for categories without a specified level.
         /// </summary>
-        [Fact]
+        [SkippableFact]
         public async Task TestLogsAllCategoriesDefaultLevelFallback()
         {
+            if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+            {
+                throw new SkipTestException("https://github.com/dotnet/diagnostics/issues/2541");
+            }
+
             using Stream outputStream = await GetLogsAsync(settings =>
             {
                 settings.UseAppFilters = false;
@@ -144,9 +151,14 @@ namespace Microsoft.Diagnostics.Monitoring.EventPipe.UnitTests
         /// Test that log events are collected for the categories and levels specified by the application
         /// and for the categories and levels specified in the filter specs.
         /// </summary>
-        [Fact]
+        [SkippableFact]
         public async Task TestLogsUseAppFiltersAndFilterSpecs()
         {
+            if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+            {
+                throw new SkipTestException("https://github.com/dotnet/diagnostics/issues/2541");
+            }
+
             using Stream outputStream = await GetLogsAsync(settings =>
             {
                 settings.FilterSpecs = new Dictionary<string, LogLevel?>()
diff --git a/src/tests/Microsoft.Diagnostics.NETCore.Client/DiagnosticsClientApiShim.cs b/src/tests/Microsoft.Diagnostics.NETCore.Client/DiagnosticsClientApiShim.cs
new file mode 100644 (file)
index 0000000..712bd03
--- /dev/null
@@ -0,0 +1,93 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+    /// <summary>
+    /// Unifies the async and non-async methods of the DiagnosticsClient class
+    /// so that tests do not need to be duplicated for testing each version of the
+    /// same API.
+    /// </summary>
+    internal sealed class DiagnosticsClientApiShim
+    {
+        private readonly DiagnosticsClient _client;
+        private readonly bool _useAsync;
+
+        public DiagnosticsClientApiShim(DiagnosticsClient client, bool useAsync)
+        {
+            _client = client;
+            _useAsync = useAsync;
+        }
+
+        public async Task<Dictionary<string, string>> GetProcessEnvironment(TimeSpan timeout)
+        {
+            if (_useAsync)
+            {
+                using CancellationTokenSource cancellation = new CancellationTokenSource(timeout);
+                return await _client.GetProcessEnvironmentAsync(cancellation.Token).ConfigureAwait(false);
+            }
+            else
+            {
+                return _client.GetProcessEnvironment();
+            }
+        }
+
+        public async Task<ProcessInfo> GetProcessInfo(TimeSpan timeout)
+        {
+            if (_useAsync)
+            {
+                using CancellationTokenSource cancellation = new CancellationTokenSource(timeout);
+                return await _client.GetProcessInfoAsync(cancellation.Token).ConfigureAwait(false);
+            }
+            else
+            {
+                return _client.GetProcessInfo();
+            }
+        }
+
+        public async Task ResumeRuntime(TimeSpan timeout)
+        {
+            if (_useAsync)
+            {
+                using CancellationTokenSource cancellation = new CancellationTokenSource(timeout);
+                await _client.ResumeRuntimeAsync(cancellation.Token).ConfigureAwait(false);
+            }
+            else
+            {
+                _client.ResumeRuntime();
+            }
+        }
+
+        public async Task<EventPipeSession> StartEventPipeSession(IEnumerable<EventPipeProvider> providers, TimeSpan timeout)
+        {
+            if (_useAsync)
+            {
+                CancellationTokenSource cancellation = new CancellationTokenSource(timeout);
+                return await _client.StartEventPipeSessionAsync(providers, true, circularBufferMB: 256, cancellation.Token).ConfigureAwait(false);
+            }
+            else
+            {
+                return _client.StartEventPipeSession(providers);
+            }
+        }
+
+        public async Task<EventPipeSession> StartEventPipeSession(EventPipeProvider provider, TimeSpan timeout)
+        {
+            if (_useAsync)
+            {
+                CancellationTokenSource cancellation = new CancellationTokenSource(timeout);
+                return await _client.StartEventPipeSessionAsync(provider, true, circularBufferMB: 256, cancellation.Token).ConfigureAwait(false);
+            }
+            else
+            {
+                return _client.StartEventPipeSession(provider);
+            }
+        }
+    }
+}
diff --git a/src/tests/Microsoft.Diagnostics.NETCore.Client/DiagnosticsClientApiShimExtensions.cs b/src/tests/Microsoft.Diagnostics.NETCore.Client/DiagnosticsClientApiShimExtensions.cs
new file mode 100644 (file)
index 0000000..71db430
--- /dev/null
@@ -0,0 +1,41 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace Microsoft.Diagnostics.NETCore.Client
+{
+    internal static class DiagnosticsClientApiShimExtensions
+    {
+        // Generous timeout to allow APIs to respond on slower or more constrained machines
+        private static readonly TimeSpan DefaultPositiveVerificationTimeout = TimeSpan.FromSeconds(30);
+
+        public static Task<Dictionary<string, string>> GetProcessEnvironment(this DiagnosticsClientApiShim shim)
+        {
+            return shim.GetProcessEnvironment(DefaultPositiveVerificationTimeout);
+        }
+
+        public static Task<ProcessInfo> GetProcessInfo(this DiagnosticsClientApiShim shim)
+        {
+            return shim.GetProcessInfo(DefaultPositiveVerificationTimeout);
+        }
+
+        public static Task ResumeRuntime(this DiagnosticsClientApiShim shim)
+        {
+            return shim.ResumeRuntime(DefaultPositiveVerificationTimeout);
+        }
+
+        public static Task<EventPipeSession> StartEventPipeSession(this DiagnosticsClientApiShim shim, IEnumerable<EventPipeProvider> providers)
+        {
+            return shim.StartEventPipeSession(providers, DefaultPositiveVerificationTimeout);
+        }
+
+        public static Task<EventPipeSession> StartEventPipeSession(this DiagnosticsClientApiShim shim, EventPipeProvider provider)
+        {
+            return shim.StartEventPipeSession(provider, DefaultPositiveVerificationTimeout);
+        }
+    }
+}
index 25ca36ea9b1ab2e1eab95420669addbb3c29e8cd..8a761289ca0bfd5a394ac15033682767d19d4872 100644 (file)
@@ -2,22 +2,14 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
+using Microsoft.Diagnostics.Tracing;
 using System;
 using System.Collections.Generic;
-using System.Diagnostics;
 using System.Diagnostics.Tracing;
-using System.IO;
-using System.Runtime.InteropServices;
-using System.Text.RegularExpressions;
-using System.Threading;
 using System.Threading.Tasks;
 using Xunit;
 using Xunit.Abstractions;
 
-using Microsoft.Diagnostics.Tracing;
-using Microsoft.Diagnostics.TestHelpers;
-using Microsoft.Diagnostics.NETCore.Client;
-
 namespace Microsoft.Diagnostics.NETCore.Client
 {
     public class EventPipeSessionTests
@@ -29,16 +21,28 @@ namespace Microsoft.Diagnostics.NETCore.Client
             output = outputHelper;
         }
 
+        [Fact]
+        public Task BasicEventPipeSessionTest()
+        {
+            return BasicEventPipeSessionTestCore(useAsync: false);
+        }
+
+        [Fact]
+        public Task BasicEventPipeSessionTestAsync()
+        {
+            return BasicEventPipeSessionTestCore(useAsync: true);
+        }
+
         /// <summary>
         /// A simple test that checks if we can create an EventPipeSession on a child process
         /// </summary>
-        [Fact]
-        public void BasicEventPipeSessionTest()
+        private async Task BasicEventPipeSessionTestCore(bool useAsync)
+
         {
             using TestRunner runner = new TestRunner(CommonHelper.GetTraceePathWithArgs(), output);
             runner.Start(timeoutInMSPipeCreation: 15_000, testProcessTimeout: 60_000);
-            DiagnosticsClient client = new DiagnosticsClient(runner.Pid);
-            using (var session = client.StartEventPipeSession(new List<EventPipeProvider>()
+            DiagnosticsClientApiShim clientShim = new DiagnosticsClientApiShim(new DiagnosticsClient(runner.Pid), useAsync);
+            using (var session = await clientShim.StartEventPipeSession(new List<EventPipeProvider>()
             {
                 new EventPipeProvider("Microsoft-Windows-DotNETRuntime", EventLevel.Informational)
             }))
@@ -48,18 +52,29 @@ namespace Microsoft.Diagnostics.NETCore.Client
             runner.Stop();
         }
 
+        [Fact]
+        public Task EventPipeSessionStreamTest()
+        {
+            return EventPipeSessionStreamTestCore(useAsync: false);
+        }
+
+        [Fact]
+        public Task EventPipeSessionStreamTestAsync()
+        {
+            return EventPipeSessionStreamTestCore(useAsync: true);
+        }
+
         /// <summary>
         /// Checks if we can create an EventPipeSession and can get some expected events out of it.
         /// </summary>
-        [Fact]
-        public void EventPipeSessionStreamTest()
+        private async Task EventPipeSessionStreamTestCore(bool useAsync)
         {
             TestRunner runner = new TestRunner(CommonHelper.GetTraceePathWithArgs(), output);
             runner.Start(timeoutInMSPipeCreation: 15_000, testProcessTimeout: 60_000);
-            DiagnosticsClient client = new DiagnosticsClient(runner.Pid);
+            DiagnosticsClientApiShim clientShim = new DiagnosticsClientApiShim(new DiagnosticsClient(runner.Pid), useAsync);
             runner.PrintStatus();
             output.WriteLine($"[{DateTime.Now.ToString()}] Trying to start an EventPipe session on process {runner.Pid}");
-            using (var session = client.StartEventPipeSession(new List<EventPipeProvider>()
+            using (var session = await clientShim.StartEventPipeSession(new List<EventPipeProvider>()
             {
                 new EventPipeProvider("System.Runtime", EventLevel.Informational, 0, new Dictionary<string, string>() {
                     { "EventCounterIntervalSec", "1" }
@@ -96,33 +111,55 @@ namespace Microsoft.Diagnostics.NETCore.Client
             }
         }
 
+        [Fact]
+        public Task EventPipeSessionUnavailableTest()
+        {
+            return EventPipeSessionUnavailableTestCore(useAsync: false);
+        }
+
+        [Fact]
+        public Task EventPipeSessionUnavailableTestAsync()
+        {
+            return EventPipeSessionUnavailableTestCore(useAsync: true);
+        }
+
         /// <summary>
         /// Tries to start an EventPipe session on a non-existent process
         /// </summary>
-        [Fact]
-        public void EventPipeSessionUnavailableTest()
+        private async Task EventPipeSessionUnavailableTestCore(bool useAsync)
         {
             List<int> pids = new List<int>(DiagnosticsClient.GetPublishedProcesses());
             int arbitraryPid = 1;
 
-            DiagnosticsClient client = new DiagnosticsClient(arbitraryPid);
+            DiagnosticsClientApiShim clientShim = new DiagnosticsClientApiShim(new DiagnosticsClient(arbitraryPid), useAsync);
 
-            Assert.Throws<ServerNotAvailableException>(() => client.StartEventPipeSession(new List<EventPipeProvider>()
+            await Assert.ThrowsAsync<ServerNotAvailableException>(() => clientShim.StartEventPipeSession(new List<EventPipeProvider>()
             {
                 new EventPipeProvider("Microsoft-Windows-DotNETRuntime", EventLevel.Informational)
             }));
         }
 
+        [Fact]
+        public Task StartEventPipeSessionWithSingleProviderTest()
+        {
+            return StartEventPipeSessionWithSingleProviderTestCore(useAsync: false);
+        }
+
+        [Fact]
+        public Task StartEventPipeSessionWithSingleProviderTestAsync()
+        {
+            return StartEventPipeSessionWithSingleProviderTestCore(useAsync: true);
+        }
+
         /// <summary>
         /// Test for the method overload: public EventPipeSession StartEventPipeSession(EventPipeProvider provider, bool requestRundown=true, int circularBufferMB=256)
         /// </summary>
-        [Fact]
-        public void StartEventPipeSessionWithSingleProviderTest()
+        private async Task StartEventPipeSessionWithSingleProviderTestCore(bool useAsync)
         {
             using TestRunner runner = new TestRunner(CommonHelper.GetTraceePathWithArgs(), output);
             runner.Start(timeoutInMSPipeCreation: 15_000, testProcessTimeout: 60_000);
-            DiagnosticsClient client = new DiagnosticsClient(runner.Pid);
-            using (var session = client.StartEventPipeSession(new EventPipeProvider("Microsoft-Windows-DotNETRuntime", EventLevel.Informational)))
+            DiagnosticsClientApiShim clientShim = new DiagnosticsClientApiShim(new DiagnosticsClient(runner.Pid), useAsync);
+            using (var session = await clientShim.StartEventPipeSession(new EventPipeProvider("Microsoft-Windows-DotNETRuntime", EventLevel.Informational)))
             {
                 Assert.True(session.EventStream != null);
             }
index 848593cf7817a34362dec1cf52cf7e143cb98974..5a27f825fcbf524a7aee6e5fc04478f238492335 100644 (file)
@@ -4,14 +4,10 @@
 
 using System;
 using System.Collections.Generic;
+using System.Threading.Tasks;
 using Xunit;
 using Xunit.Abstractions;
 
-using Microsoft.Diagnostics.Tracing;
-using Microsoft.Diagnostics.TestHelpers;
-using Microsoft.Diagnostics.NETCore.Client;
-using Xunit.Extensions;
-
 namespace Microsoft.Diagnostics.NETCore.Client
 {
     public class ProcessEnvironmentTests
@@ -23,11 +19,22 @@ namespace Microsoft.Diagnostics.NETCore.Client
             output = outputHelper;
         }
 
+        [Fact]
+        public Task BasicEnvTest()
+        {
+            return BasicEnvTestCore(useAsync: false);
+        }
+
+        [Fact]
+        public Task BasicEnvTestAsync()
+        {
+            return BasicEnvTestCore(useAsync: true);
+        }
+
         /// <summary>
         /// A simple test that collects process environment.
         /// </summary>
-        [Fact]
-        public void BasicEnvTest()
+        private async Task BasicEnvTestCore(bool useAsync)
         {
             // as the attribute says, this test requires 5.0-rc1 or newer.  This has been tested locally on
             // an rc1 build and passes.  It is equivalent to the dotnet/runtime version of this test.
@@ -36,8 +43,8 @@ namespace Microsoft.Diagnostics.NETCore.Client
             string testVal = "BAR";
             runner.AddEnvVar(testKey, testVal);
             runner.Start(timeoutInMSPipeCreation: 3000);
-            DiagnosticsClient client = new DiagnosticsClient(runner.Pid);
-            Dictionary<string,string> env = client.GetProcessEnvironment();
+            var clientShim = new DiagnosticsClientApiShim(new DiagnosticsClient(runner.Pid), useAsync);
+            Dictionary<string,string> env = await clientShim.GetProcessEnvironment();
 
             Assert.True(env.ContainsKey(testKey) && env[testKey].Equals(testVal));
 
index da832fa6d46f3f1c01f3c707bad56e24b32db8fa..172d271657e61645eb13dab4e202c955e8d3b53f 100644 (file)
@@ -3,7 +3,7 @@
 // See the LICENSE file in the project root for more information.
 
 using System;
-using System.Threading;
+using System.Threading.Tasks;
 using Xunit;
 using Xunit.Abstractions;
 
@@ -19,16 +19,27 @@ namespace Microsoft.Diagnostics.NETCore.Client
         }
 
         [Fact]
-        public void BasicProcessInfoTest()
+        public Task BasicProcessInfoTest()
+        {
+            return BasicProcessInfoTestCore(useAsync: false);
+        }
+
+        [Fact]
+        public Task BasicProcessInfoTestAsync()
+        {
+            return BasicProcessInfoTestCore(useAsync: true);
+        }
+
+        private async Task BasicProcessInfoTestCore(bool useAsync)
         {
             using TestRunner runner = new TestRunner(CommonHelper.GetTraceePathWithArgs(targetFramework: "net5.0"), output);
             runner.Start();
 
             try
             {
-                DiagnosticsClient client = new DiagnosticsClient(runner.Pid);
+                DiagnosticsClientApiShim clientShim = new DiagnosticsClientApiShim(new DiagnosticsClient(runner.Pid), useAsync);
 
-                ProcessInfo processInfo = client.GetProcessInfo();
+                ProcessInfo processInfo = await clientShim.GetProcessInfo();
 
                 Assert.NotNull(processInfo);
                 Assert.Equal(runner.Pid, (int)processInfo.ProcessId);
index a0d931ae6c7dbc75b65aa2d221ed5eb49da2fae9..de76509a7a6e3838b32ac0d544bf3ad6002a8e4c 100644 (file)
@@ -189,7 +189,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
                 // There should not be any new endpoint infos
                 await VerifyNoNewEndpointInfos(server, useAsync);
 
-                ResumeRuntime(info);
+                await ResumeRuntime(info, useAsync);
 
                 await VerifySingleSession(info, useAsync);
             }
@@ -246,7 +246,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
                 // There should not be any new endpoint infos
                 await VerifyNoNewEndpointInfos(server, useAsync);
 
-                ResumeRuntime(info);
+                await ResumeRuntime(info, useAsync);
 
                 await VerifyWaitForConnection(info, useAsync);
             }
@@ -296,7 +296,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
                 // There should not be any new endpoint infos
                 await VerifyNoNewEndpointInfos(server, useAsync: true);
 
-                ResumeRuntime(info);
+                await ResumeRuntime(info, useAsync: true);
 
                 await VerifyWaitForConnection(info, useAsync: true);
 
@@ -372,14 +372,14 @@ namespace Microsoft.Diagnostics.NETCore.Client
             _outputHelper.WriteLine($"Connection: {info.DebuggerDisplay}");
         }
 
-        private void ResumeRuntime(IpcEndpointInfo info)
+        private async Task ResumeRuntime(IpcEndpointInfo info, bool useAsync)
         {
-            var client = new DiagnosticsClient(info.Endpoint);
+            var clientShim = new DiagnosticsClientApiShim(new DiagnosticsClient(info.Endpoint), useAsync);
 
             _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Resuming runtime instance.");
             try
             {
-                client.ResumeRuntime();
+                await clientShim.ResumeRuntime(DefaultPositiveVerificationTimeout);
                 _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Resumed successfully.");
             }
             catch (ServerErrorException ex)
@@ -396,7 +396,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
         {
             await VerifyWaitForConnection(info, useAsync);
 
-            var client = new DiagnosticsClient(info.Endpoint);
+            var clientShim = new DiagnosticsClientApiShim(new DiagnosticsClient(info.Endpoint), useAsync);
 
             _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Creating session #1.");
             var providers = new List<EventPipeProvider>();
@@ -407,7 +407,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
                 new Dictionary<string, string>() {
                     { "EventCounterIntervalSec", "1" }
                 }));
-            using var session = client.StartEventPipeSession(providers);
+            using var session = await clientShim.StartEventPipeSession(providers);
 
             _outputHelper.WriteLine($"{info.RuntimeInstanceCookie}: Verifying session produces events.");
             await VerifyEventStreamProvidesEventsAsync(info, session, 1);