From 85d4a375ab897cd9ab312c9f6bad2fe28d31b434 Mon Sep 17 00:00:00 2001 From: Johan Lorensson Date: Fri, 21 Oct 2022 11:33:51 +0200 Subject: [PATCH] Enhance client-client dotnet-dsrouter mode. (#3455) * Enhance dsrouter client-client mode. * Fix comment. Co-authored-by: Juan Hoyos * Review feedback. Co-authored-by: Juan Hoyos --- .../DiagnosticsServerRouterFactory.cs | 163 ++++++++++++------ 1 file changed, 115 insertions(+), 48 deletions(-) diff --git a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsServerRouter/DiagnosticsServerRouterFactory.cs b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsServerRouter/DiagnosticsServerRouterFactory.cs index c93262450..fbef8756e 100644 --- a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsServerRouter/DiagnosticsServerRouterFactory.cs +++ b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsServerRouter/DiagnosticsServerRouterFactory.cs @@ -335,7 +335,7 @@ namespace Microsoft.Diagnostics.NETCore.Client { } - async Task ConnectTcpStreamAsyncInternal(CancellationToken token, bool retry) + private async Task ConnectTcpStreamAsyncInternal(CancellationToken token, bool retry) { Stream tcpClientStream = null; @@ -396,7 +396,7 @@ namespace Microsoft.Diagnostics.NETCore.Client return tcpClientStream; } - async Task ConnectAsyncInternal(Socket clientSocket, EndPoint remoteEP, CancellationToken token) + private async Task ConnectAsyncInternal(Socket clientSocket, EndPoint remoteEP, CancellationToken token) { using (token.Register(() => clientSocket.Close(0))) { @@ -1059,6 +1059,7 @@ namespace Microsoft.Diagnostics.NETCore.Client /// internal class IpcClientTcpClientRouterFactory : DiagnosticsServerRouterFactory { + bool _updateRuntimeInfo; Guid _runtimeInstanceId; ulong _runtimeProcessId; ILogger _logger; @@ -1067,6 +1068,7 @@ namespace Microsoft.Diagnostics.NETCore.Client public IpcClientTcpClientRouterFactory(string ipcClient, string tcpClient, int runtimeTimeoutMs, TcpClientRouterFactory.CreateInstanceDelegate factory, ILogger logger) { + _updateRuntimeInfo = true; _runtimeInstanceId = Guid.Empty; _runtimeProcessId = 0; _logger = logger; @@ -1099,47 +1101,7 @@ namespace Microsoft.Diagnostics.NETCore.Client { _tcpClientRouterFactory.Start(); _logger?.LogInformation($"Starting IPC client ({_ipcClientRouterFactory.IpcClientPath}) <--> TCP client ({_tcpClientRouterFactory.TcpClientAddress}) router."); - - var requestRuntimeInfo = new Task(() => - { - try - { - _logger?.LogDebug($"Requesting runtime process information."); - - // Get new tcp client endpoint. - using var tcpClientStream = _tcpClientRouterFactory.ConnectTcpStreamAsync(token).Result; - - // Request process info. - IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.GetProcessInfo); - - byte[] buffer = message.Serialize(); - tcpClientStream.Write(buffer, 0, buffer.Length); - - var response = IpcMessage.Parse(tcpClientStream); - if ((DiagnosticsServerResponseId)response.Header.CommandId == DiagnosticsServerResponseId.OK) - { - var info = ProcessInfo.ParseV1(response.Payload); - - _runtimeProcessId = info.ProcessId; - _runtimeInstanceId = info.RuntimeInstanceCookie; - - _logger?.LogDebug($"Retrieved runtime process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}."); - } - else - { - throw new ServerErrorException("Failed to retrieve runtime process info."); - } - } - catch (Exception) - { - _runtimeProcessId = (ulong)Process.GetCurrentProcess().Id; - _runtimeInstanceId = Guid.NewGuid(); - _logger?.LogWarning($"Failed to retrieve runtime process info, fallback to current process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}."); - } - }); - - requestRuntimeInfo.Start(); - return requestRuntimeInfo; + return Task.CompletedTask; } public override Task Stop() @@ -1154,6 +1116,11 @@ namespace Microsoft.Diagnostics.NETCore.Client Stream tcpClientStream = null; Stream ipcClientStream = null; + int initFrontendToBackendByteTransfer = 0; + int initBackendToFrontendByteTransfer = 0; + + await UpdateRuntimeInfo(token).ConfigureAwait(false); + _logger?.LogDebug("Trying to create a new router instance."); try @@ -1190,6 +1157,7 @@ namespace Microsoft.Diagnostics.NETCore.Client { _logger?.LogInformation("Broken tcp connection detected, aborting ipc connection."); checkTcpStreamTask.GetAwaiter().GetResult(); + _updateRuntimeInfo = true; } throw; @@ -1200,17 +1168,28 @@ namespace Microsoft.Diagnostics.NETCore.Client try { await IpcAdvertise.SerializeAsync(ipcClientStream, _runtimeInstanceId, _runtimeProcessId, token).ConfigureAwait(false); + initBackendToFrontendByteTransfer = IpcAdvertise.V1SizeInBytes; } catch (Exception) { _logger?.LogDebug("Failed sending advertise message."); throw; } + + // Router needs to emulate backend behavior when running in client-client mode. + // A new router instance can not be complete until frontend starts to + // write data to backend or a new router instance will connect against frontend + // that in turn will disconnects previous accepted but pending connections, triggering + // frequent connects/disconnects. + initFrontendToBackendByteTransfer = await InitFrontendReadBackendWrite(ipcClientStream, tcpClientStream, token).ConfigureAwait(false); } catch (Exception) { _logger?.LogDebug("Failed creating new router instance."); + if (tcpClientStream == null || (tcpClientStream != null && ipcClientStream == null)) + _updateRuntimeInfo = true; + // Cleanup and rethrow. tcpClientStream?.Dispose(); ipcClientStream?.Dispose(); @@ -1221,7 +1200,95 @@ namespace Microsoft.Diagnostics.NETCore.Client // Create new router. _logger?.LogDebug("New router instance successfully created."); - return new Router(ipcClientStream, tcpClientStream, _logger, (ulong)IpcAdvertise.V1SizeInBytes); + return new Router(ipcClientStream, tcpClientStream, _logger, (ulong)initBackendToFrontendByteTransfer, (ulong)initFrontendToBackendByteTransfer); + } + + private async Task InitFrontendReadBackendWrite(Stream ipcClientStream, Stream tcpClientStream, CancellationToken token) + { + using CancellationTokenSource cancelReadConnect = CancellationTokenSource.CreateLinkedTokenSource(token); + + byte[] buffer = new byte[1024]; + using var readTask = ipcClientStream.ReadAsync(buffer, 0, buffer.Length, cancelReadConnect.Token); + + // Check tcp client connection while waiting on ipc client. + using var checkTcpStreamTask = IsStreamConnectedAsync(tcpClientStream, cancelReadConnect.Token); + + // Wait for completion of at least one task. + await Task.WhenAny(readTask, checkTcpStreamTask).ConfigureAwait(false); + + // Cancel out any pending tasks not yet completed. + cancelReadConnect.Cancel(); + + try + { + await Task.WhenAll(readTask, checkTcpStreamTask).ConfigureAwait(false); + } + catch (Exception) + { + if (readTask.IsFaulted) + _logger?.LogInformation("Broken ipc connection detected."); + + if (checkTcpStreamTask.IsFaulted) + { + _logger?.LogInformation("Broken tcp connection detected."); + _updateRuntimeInfo = true; + } + + throw; + } + + var bytesRead = readTask.Result; + if (bytesRead == 0) + { + _logger?.LogDebug("ReverseDiagnosticServer disconnected ipc connection."); + throw new DiagnosticsClientException("ReverseDiagnosticServer disconnect detected."); + } + + await tcpClientStream.WriteAsync(buffer, 0, bytesRead, token).ConfigureAwait(false); + + return bytesRead; + } + + private async Task UpdateRuntimeInfo(CancellationToken token) + { + if (!_updateRuntimeInfo) + return; + + try + { + _logger?.LogDebug($"Requesting runtime process information."); + + // Get new tcp client endpoint. + using var tcpClientStream = await _tcpClientRouterFactory.ConnectTcpStreamAsync(token, true).ConfigureAwait(false); + + // Request process info. + IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.GetProcessInfo); + + byte[] buffer = message.Serialize(); + await tcpClientStream.WriteAsync(buffer, 0, buffer.Length, token).ConfigureAwait(false); + + var response = IpcMessage.Parse(tcpClientStream); + if ((DiagnosticsServerResponseId)response.Header.CommandId == DiagnosticsServerResponseId.OK) + { + var info = ProcessInfo.ParseV1(response.Payload); + + _runtimeProcessId = info.ProcessId; + _runtimeInstanceId = info.RuntimeInstanceCookie; + + _logger?.LogDebug($"Retrieved runtime process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}."); + } + else + { + throw new ServerErrorException("Failed to retrieve runtime process info."); + } + } + catch (Exception) + { + _runtimeProcessId = (ulong)Process.GetCurrentProcess().Id; + _runtimeInstanceId = Guid.NewGuid(); + _logger?.LogWarning($"Failed to retrieve runtime process info, fallback to current process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}."); + } + _updateRuntimeInfo = false; } } @@ -1322,12 +1389,12 @@ namespace Microsoft.Diagnostics.NETCore.Client Interlocked.Decrement(ref s_routerInstanceCount); - _logger?.LogTrace($"Diposed stats: Back End->Front End {_backendToFrontendByteTransfer} bytes, Front End->Back End {_frontendToBackendByteTransfer} bytes."); - _logger?.LogTrace($"Active instances: {s_routerInstanceCount}"); + _logger?.LogDebug($"Diposed stats: Back End->Front End {_backendToFrontendByteTransfer} bytes, Front End->Back End {_frontendToBackendByteTransfer} bytes."); + _logger?.LogDebug($"Active instances: {s_routerInstanceCount}"); } } - async Task BackendReadFrontendWrite(CancellationToken token) + private async Task BackendReadFrontendWrite(CancellationToken token) { try { @@ -1368,7 +1435,7 @@ namespace Microsoft.Diagnostics.NETCore.Client RouterTaskCompleted?.TrySetResult(true); } - async Task FrontendReadBackendWrite(CancellationToken token) + private async Task FrontendReadBackendWrite(CancellationToken token) { try { -- 2.34.1