Enhance client-client dotnet-dsrouter mode. (#3455)
authorJohan Lorensson <lateralusx.github@gmail.com>
Fri, 21 Oct 2022 09:33:51 +0000 (11:33 +0200)
committerGitHub <noreply@github.com>
Fri, 21 Oct 2022 09:33:51 +0000 (09:33 +0000)
* Enhance dsrouter client-client mode.

* Fix comment.

Co-authored-by: Juan Hoyos <juan.hoyos@microsoft.com>
* Review feedback.

Co-authored-by: Juan Hoyos <juan.hoyos@microsoft.com>
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsServerRouter/DiagnosticsServerRouterFactory.cs

index c9326245071efb55664eb1fb9e29608b3a5acc5f..fbef8756efd5545e0ab34d87cda489135fb088c9 100644 (file)
@@ -335,7 +335,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
         {
         }
 
-        async Task<Stream> ConnectTcpStreamAsyncInternal(CancellationToken token, bool retry)
+        private async Task<Stream> ConnectTcpStreamAsyncInternal(CancellationToken token, bool retry)
         {
             Stream tcpClientStream = null;
 
@@ -396,7 +396,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
             return tcpClientStream;
         }
 
-        async Task ConnectAsyncInternal(Socket clientSocket, EndPoint remoteEP, CancellationToken token)
+        private async Task ConnectAsyncInternal(Socket clientSocket, EndPoint remoteEP, CancellationToken token)
         {
             using (token.Register(() => clientSocket.Close(0)))
             {
@@ -1059,6 +1059,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
     /// </summary>
     internal class IpcClientTcpClientRouterFactory : DiagnosticsServerRouterFactory
     {
+        bool _updateRuntimeInfo;
         Guid _runtimeInstanceId;
         ulong _runtimeProcessId;
         ILogger _logger;
@@ -1067,6 +1068,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
 
         public IpcClientTcpClientRouterFactory(string ipcClient, string tcpClient, int runtimeTimeoutMs, TcpClientRouterFactory.CreateInstanceDelegate factory, ILogger logger)
         {
+            _updateRuntimeInfo = true;
             _runtimeInstanceId = Guid.Empty;
             _runtimeProcessId = 0;
             _logger = logger;
@@ -1099,47 +1101,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
         {
             _tcpClientRouterFactory.Start();
             _logger?.LogInformation($"Starting IPC client ({_ipcClientRouterFactory.IpcClientPath}) <--> TCP client ({_tcpClientRouterFactory.TcpClientAddress}) router.");
-
-            var requestRuntimeInfo = new Task(() =>
-            {
-                try
-                {
-                    _logger?.LogDebug($"Requesting runtime process information.");
-
-                    // Get new tcp client endpoint.
-                    using var tcpClientStream = _tcpClientRouterFactory.ConnectTcpStreamAsync(token).Result;
-
-                    // Request process info.
-                    IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.GetProcessInfo);
-                
-                    byte[] buffer = message.Serialize();
-                    tcpClientStream.Write(buffer, 0, buffer.Length);
-
-                    var response = IpcMessage.Parse(tcpClientStream);
-                    if ((DiagnosticsServerResponseId)response.Header.CommandId == DiagnosticsServerResponseId.OK)
-                    {
-                        var info = ProcessInfo.ParseV1(response.Payload);
-
-                        _runtimeProcessId = info.ProcessId;
-                        _runtimeInstanceId = info.RuntimeInstanceCookie;
-
-                        _logger?.LogDebug($"Retrieved runtime process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}.");
-                    }
-                    else
-                    {
-                        throw new ServerErrorException("Failed to retrieve runtime process info.");
-                    }
-                }
-                catch (Exception)
-                {
-                    _runtimeProcessId = (ulong)Process.GetCurrentProcess().Id;
-                    _runtimeInstanceId = Guid.NewGuid();
-                    _logger?.LogWarning($"Failed to retrieve runtime process info, fallback to current process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}.");
-                }
-            });
-
-            requestRuntimeInfo.Start();
-            return requestRuntimeInfo;
+            return Task.CompletedTask;
         }
 
         public override Task Stop()
@@ -1154,6 +1116,11 @@ namespace Microsoft.Diagnostics.NETCore.Client
             Stream tcpClientStream = null;
             Stream ipcClientStream = null;
 
+            int initFrontendToBackendByteTransfer = 0;
+            int initBackendToFrontendByteTransfer = 0;
+
+            await UpdateRuntimeInfo(token).ConfigureAwait(false);
+
             _logger?.LogDebug("Trying to create a new router instance.");
 
             try
@@ -1190,6 +1157,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
                     {
                         _logger?.LogInformation("Broken tcp connection detected, aborting ipc connection.");
                         checkTcpStreamTask.GetAwaiter().GetResult();
+                        _updateRuntimeInfo = true;
                     }
 
                     throw;
@@ -1200,17 +1168,28 @@ namespace Microsoft.Diagnostics.NETCore.Client
                 try
                 {
                     await IpcAdvertise.SerializeAsync(ipcClientStream, _runtimeInstanceId, _runtimeProcessId, token).ConfigureAwait(false);
+                    initBackendToFrontendByteTransfer = IpcAdvertise.V1SizeInBytes;
                 }
                 catch (Exception)
                 {
                     _logger?.LogDebug("Failed sending advertise message.");
                     throw;
                 }
+
+                // Router needs to emulate backend behavior when running in client-client mode.
+                // A new router instance can not be complete until frontend starts to
+                // write data to backend or a new router instance will connect against frontend
+                // that in turn will disconnects previous accepted but pending connections, triggering
+                // frequent connects/disconnects.
+                initFrontendToBackendByteTransfer = await InitFrontendReadBackendWrite(ipcClientStream, tcpClientStream, token).ConfigureAwait(false);
             }
             catch (Exception)
             {
                 _logger?.LogDebug("Failed creating new router instance.");
 
+                if (tcpClientStream == null || (tcpClientStream != null && ipcClientStream == null))
+                    _updateRuntimeInfo = true;
+
                 // Cleanup and rethrow.
                 tcpClientStream?.Dispose();
                 ipcClientStream?.Dispose();
@@ -1221,7 +1200,95 @@ namespace Microsoft.Diagnostics.NETCore.Client
             // Create new router.
             _logger?.LogDebug("New router instance successfully created.");
 
-            return new Router(ipcClientStream, tcpClientStream, _logger, (ulong)IpcAdvertise.V1SizeInBytes);
+            return new Router(ipcClientStream, tcpClientStream, _logger, (ulong)initBackendToFrontendByteTransfer, (ulong)initFrontendToBackendByteTransfer);
+        }
+
+        private async Task<int> InitFrontendReadBackendWrite(Stream ipcClientStream, Stream tcpClientStream, CancellationToken token)
+        {
+            using CancellationTokenSource cancelReadConnect = CancellationTokenSource.CreateLinkedTokenSource(token);
+
+            byte[] buffer = new byte[1024];
+            using var readTask = ipcClientStream.ReadAsync(buffer, 0, buffer.Length, cancelReadConnect.Token);
+
+            // Check tcp client connection while waiting on ipc client.
+            using var checkTcpStreamTask = IsStreamConnectedAsync(tcpClientStream, cancelReadConnect.Token);
+
+            // Wait for completion of at least one task.
+            await Task.WhenAny(readTask, checkTcpStreamTask).ConfigureAwait(false);
+
+            // Cancel out any pending tasks not yet completed.
+            cancelReadConnect.Cancel();
+
+            try
+            {
+                await Task.WhenAll(readTask, checkTcpStreamTask).ConfigureAwait(false);
+            }
+            catch (Exception)
+            {
+                if (readTask.IsFaulted)
+                    _logger?.LogInformation("Broken ipc connection detected.");
+
+                if (checkTcpStreamTask.IsFaulted)
+                {
+                    _logger?.LogInformation("Broken tcp connection detected.");
+                    _updateRuntimeInfo = true;
+                }
+
+                throw;
+            }
+
+            var bytesRead = readTask.Result;
+            if (bytesRead == 0)
+            {
+                _logger?.LogDebug("ReverseDiagnosticServer disconnected ipc connection.");
+                throw new DiagnosticsClientException("ReverseDiagnosticServer disconnect detected.");
+            }
+
+            await tcpClientStream.WriteAsync(buffer, 0, bytesRead, token).ConfigureAwait(false);
+
+            return bytesRead;
+        }
+
+        private async Task UpdateRuntimeInfo(CancellationToken token)
+        {
+            if (!_updateRuntimeInfo)
+                return;
+
+            try
+            {
+                _logger?.LogDebug($"Requesting runtime process information.");
+
+                // Get new tcp client endpoint.
+                using var tcpClientStream = await _tcpClientRouterFactory.ConnectTcpStreamAsync(token, true).ConfigureAwait(false);
+
+                // Request process info.
+                IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.GetProcessInfo);
+
+                byte[] buffer = message.Serialize();
+                await tcpClientStream.WriteAsync(buffer, 0, buffer.Length, token).ConfigureAwait(false);
+
+                var response = IpcMessage.Parse(tcpClientStream);
+                if ((DiagnosticsServerResponseId)response.Header.CommandId == DiagnosticsServerResponseId.OK)
+                {
+                    var info = ProcessInfo.ParseV1(response.Payload);
+
+                    _runtimeProcessId = info.ProcessId;
+                    _runtimeInstanceId = info.RuntimeInstanceCookie;
+
+                    _logger?.LogDebug($"Retrieved runtime process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}.");
+                }
+                else
+                {
+                    throw new ServerErrorException("Failed to retrieve runtime process info.");
+                }
+            }
+            catch (Exception)
+            {
+                _runtimeProcessId = (ulong)Process.GetCurrentProcess().Id;
+                _runtimeInstanceId = Guid.NewGuid();
+                _logger?.LogWarning($"Failed to retrieve runtime process info, fallback to current process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}.");
+            }
+            _updateRuntimeInfo = false;
         }
     }
 
@@ -1322,12 +1389,12 @@ namespace Microsoft.Diagnostics.NETCore.Client
 
                 Interlocked.Decrement(ref s_routerInstanceCount);
 
-                _logger?.LogTrace($"Diposed stats: Back End->Front End {_backendToFrontendByteTransfer} bytes, Front End->Back End {_frontendToBackendByteTransfer} bytes.");
-                _logger?.LogTrace($"Active instances: {s_routerInstanceCount}");
+                _logger?.LogDebug($"Diposed stats: Back End->Front End {_backendToFrontendByteTransfer} bytes, Front End->Back End {_frontendToBackendByteTransfer} bytes.");
+                _logger?.LogDebug($"Active instances: {s_routerInstanceCount}");
             }
         }
 
-        async Task BackendReadFrontendWrite(CancellationToken token)
+        private async Task BackendReadFrontendWrite(CancellationToken token)
         {
             try
             {
@@ -1368,7 +1435,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
             RouterTaskCompleted?.TrySetResult(true);
         }
 
-        async Task FrontendReadBackendWrite(CancellationToken token)
+        private async Task FrontendReadBackendWrite(CancellationToken token)
         {
             try
             {