From d6236fa477bdfd251e5e8bbdb43a733bbb05aac7 Mon Sep 17 00:00:00 2001 From: Johan Lorensson Date: Tue, 6 Jul 2021 18:53:46 +0200 Subject: [PATCH] Add dsrouter client-client mode. (#2385) * Add dsrouter client-client mode. * Make retry explicit parameter in ConnectTcpStreamAsync. --- .../DiagnosticsServerRouterFactory.cs | 223 ++++++++++++++++-- .../DiagnosticsServerRouterRunner.cs | 10 +- .../DiagnosticsServerRouterCommands.cs | 60 +++++ src/Tools/dotnet-dsrouter/Program.cs | 15 ++ .../USBMuxTcpClientRouterFactory.cs | 43 ++-- 5 files changed, 312 insertions(+), 39 deletions(-) diff --git a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsServerRouter/DiagnosticsServerRouterFactory.cs b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsServerRouter/DiagnosticsServerRouterFactory.cs index c92f607b1..98c3febb1 100644 --- a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsServerRouter/DiagnosticsServerRouterFactory.cs +++ b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsServerRouter/DiagnosticsServerRouterFactory.cs @@ -44,7 +44,7 @@ namespace Microsoft.Diagnostics.NETCore.Client public virtual ILogger Logger { get; } - public virtual void Start() + public virtual Task Start(CancellationToken token) { throw new NotImplementedException(); } @@ -319,12 +319,29 @@ namespace Microsoft.Diagnostics.NETCore.Client } public virtual async Task ConnectTcpStreamAsync(CancellationToken token) + { + return await ConnectTcpStreamAsyncInternal(token, _auto_shutdown).ConfigureAwait(false); + } + + public virtual async Task ConnectTcpStreamAsync(CancellationToken token, bool retry) + { + return await ConnectTcpStreamAsyncInternal(token, retry).ConfigureAwait(false); + } + + public virtual void Start() + { + } + + public virtual void Stop() + { + } + + async Task ConnectTcpStreamAsyncInternal(CancellationToken token, bool retry) { Stream tcpClientStream = null; _logger?.LogDebug($"Connecting new tcp endpoint \"{_tcpClientAddress}\"."); - bool retry = false; IpcTcpSocketEndPoint clientTcpEndPoint = new IpcTcpSocketEndPoint(_tcpClientAddress); Socket clientSocket = null; @@ -339,7 +356,7 @@ namespace Microsoft.Diagnostics.NETCore.Client try { - await ConnectAsyncInternal(clientSocket, clientTcpEndPoint, token).ConfigureAwait(false); + await ConnectAsyncInternal(clientSocket, clientTcpEndPoint, connectTokenSource.Token).ConfigureAwait(false); retry = false; } catch (Exception) @@ -356,10 +373,10 @@ namespace Microsoft.Diagnostics.NETCore.Client throw new TimeoutException(); } - // If we are not doing auto shutdown when runtime is unavailable, fail right away, this will + // If we are not doing retries when runtime is unavailable, fail right away, this will // break any accepted IPC connections, making sure client is notified and could reconnect. - // If we do have auto shutdown enabled, retry until succeed or time out. - if (!_auto_shutdown) + // If not, retry until succeed or time out. + if (!retry) { _logger?.LogTrace($"Failed connecting {_tcpClientAddress}."); throw; @@ -370,8 +387,6 @@ namespace Microsoft.Diagnostics.NETCore.Client // If we get an error (without hitting timeout above), most likely due to unavailable listener. // Delay execution to prevent to rapid retry attempts. await Task.Delay(TcpClientRetryTimeoutMs, token).ConfigureAwait(false); - - retry = true; } } while (retry); @@ -382,14 +397,6 @@ namespace Microsoft.Diagnostics.NETCore.Client return tcpClientStream; } - public virtual void Start() - { - } - - public virtual void Stop() - { - } - async Task ConnectAsyncInternal(Socket clientSocket, EndPoint remoteEP, CancellationToken token) { using (token.Register(() => clientSocket.Close(0))) @@ -650,12 +657,14 @@ namespace Microsoft.Diagnostics.NETCore.Client } } - public override void Start() + public override Task Start(CancellationToken token) { _tcpServerRouterFactory.Start(); _ipcServerRouterFactory.Start(); _logger?.LogInformation($"Starting IPC server ({_ipcServerRouterFactory.IpcServerPath}) <--> TCP server ({_tcpServerRouterFactory.TcpServerAddress}) router."); + + return Task.CompletedTask; } public override Task Stop() @@ -840,11 +849,13 @@ namespace Microsoft.Diagnostics.NETCore.Client } } - public override void Start() + public override Task Start(CancellationToken token) { _ipcServerRouterFactory.Start(); _tcpClientRouterFactory.Start(); _logger?.LogInformation($"Starting IPC server ({_ipcServerRouterFactory.IpcServerPath}) <--> TCP client ({_tcpClientRouterFactory.TcpClientAddress}) router."); + + return Task.CompletedTask; } public override Task Stop() @@ -962,10 +973,12 @@ namespace Microsoft.Diagnostics.NETCore.Client } } - public override void Start() + public override Task Start(CancellationToken token) { _tcpServerRouterFactory.Start(); _logger?.LogInformation($"Starting IPC client ({_ipcClientRouterFactory.IpcClientPath}) <--> TCP server ({_tcpServerRouterFactory.TcpServerAddress}) router."); + + return Task.CompletedTask; } public override Task Stop() @@ -1056,6 +1069,178 @@ namespace Microsoft.Diagnostics.NETCore.Client } } + /// + /// This class creates IPC Client - TCP Client router instances. + /// Supports NamedPipes/UnixDomainSocket client and TCP/IP client. + /// + internal class IpcClientTcpClientRouterFactory : DiagnosticsServerRouterFactory + { + Guid _runtimeInstanceId; + ulong _runtimeProcessId; + ILogger _logger; + IpcClientRouterFactory _ipcClientRouterFactory; + TcpClientRouterFactory _tcpClientRouterFactory; + + public IpcClientTcpClientRouterFactory(string ipcClient, string tcpClient, int runtimeTimeoutMs, TcpClientRouterFactory.CreateInstanceDelegate factory, ILogger logger) + { + _runtimeInstanceId = Guid.Empty; + _runtimeProcessId = 0; + _logger = logger; + _ipcClientRouterFactory = new IpcClientRouterFactory(ipcClient, logger); + _tcpClientRouterFactory = factory(tcpClient, runtimeTimeoutMs, logger); + } + + public override string IpcAddress { + get + { + return _ipcClientRouterFactory.IpcClientPath; + } + } + + public override string TcpAddress { + get + { + return _tcpClientRouterFactory.TcpClientAddress; + } + } + + public override ILogger Logger { + get + { + return _logger; + } + } + + public override Task Start(CancellationToken token) + { + _tcpClientRouterFactory.Start(); + _logger?.LogInformation($"Starting IPC client ({_ipcClientRouterFactory.IpcClientPath}) <--> TCP client ({_tcpClientRouterFactory.TcpClientAddress}) router."); + + var requestRuntimeInfo = new Task(() => + { + try + { + _logger?.LogDebug($"Requesting runtime process information."); + + // Get new tcp client endpoint. + using var tcpClientStream = _tcpClientRouterFactory.ConnectTcpStreamAsync(token).Result; + + // Request process info. + IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.GetProcessInfo); + + byte[] buffer = message.Serialize(); + tcpClientStream.Write(buffer, 0, buffer.Length); + + var response = IpcMessage.Parse(tcpClientStream); + if ((DiagnosticsServerResponseId)response.Header.CommandId == DiagnosticsServerResponseId.OK) + { + var info = ProcessInfo.Parse(response.Payload); + + _runtimeProcessId = info.ProcessId; + _runtimeInstanceId = info.RuntimeInstanceCookie; + + _logger?.LogDebug($"Retrieved runtime process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}."); + } + else + { + throw new ServerErrorException("Failed to retrieve runtime process info."); + } + } + catch (Exception) + { + _runtimeProcessId = (ulong)Process.GetCurrentProcess().Id; + _runtimeInstanceId = Guid.NewGuid(); + _logger?.LogWarning($"Failed to retrieve runtime process info, fallback to current process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}."); + } + }); + + requestRuntimeInfo.Start(); + return requestRuntimeInfo; + } + + public override Task Stop() + { + _logger?.LogInformation($"Stopping IPC client ({_ipcClientRouterFactory.IpcClientPath}) <--> TCP client ({_tcpClientRouterFactory.TcpClientAddress}) router."); + _tcpClientRouterFactory.Stop(); + return Task.CompletedTask; + } + + public override async Task CreateRouterAsync(CancellationToken token) + { + Stream tcpClientStream = null; + Stream ipcClientStream = null; + + _logger?.LogDebug("Trying to create a new router instance."); + + try + { + using CancellationTokenSource cancelRouter = CancellationTokenSource.CreateLinkedTokenSource(token); + + // Get new tcp client endpoint. + tcpClientStream = await _tcpClientRouterFactory.ConnectTcpStreamAsync(cancelRouter.Token, true).ConfigureAwait(false); + + // Get new ipc client endpoint. + using var ipcClientStreamTask = _ipcClientRouterFactory.ConnectIpcStreamAsync(cancelRouter.Token); + + // We have a valid tcp stream and a pending ipc stream. Wait for completion + // or disconnect of tcp stream. + using var checkTcpStreamTask = IsStreamConnectedAsync(tcpClientStream, cancelRouter.Token); + + // Wait for at least completion of one task. + await Task.WhenAny(ipcClientStreamTask, checkTcpStreamTask).ConfigureAwait(false); + + // Cancel out any pending tasks not yet completed. + cancelRouter.Cancel(); + + try + { + await Task.WhenAll(ipcClientStreamTask, checkTcpStreamTask).ConfigureAwait(false); + } + catch (Exception) + { + // Check if we have an accepted ipc stream. + if (IsCompletedSuccessfully(ipcClientStreamTask)) + ipcClientStreamTask.Result?.Dispose(); + + if (checkTcpStreamTask.IsFaulted) + { + _logger?.LogInformation("Broken tcp connection detected, aborting ipc connection."); + checkTcpStreamTask.GetAwaiter().GetResult(); + } + + throw; + } + + ipcClientStream = ipcClientStreamTask.Result; + + try + { + await IpcAdvertise.SerializeAsync(ipcClientStream, _runtimeInstanceId, _runtimeProcessId, token).ConfigureAwait(false); + } + catch (Exception) + { + _logger?.LogDebug("Failed sending advertise message."); + throw; + } + } + catch (Exception) + { + _logger?.LogDebug("Failed creating new router instance."); + + // Cleanup and rethrow. + tcpClientStream?.Dispose(); + ipcClientStream?.Dispose(); + + throw; + } + + // Create new router. + _logger?.LogDebug("New router instance successfully created."); + + return new Router(ipcClientStream, tcpClientStream, _logger, (ulong)IpcAdvertise.V1SizeInBytes); + } + } + internal class Router : IDisposable { readonly ILogger _logger; diff --git a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsServerRouter/DiagnosticsServerRouterRunner.cs b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsServerRouter/DiagnosticsServerRouterRunner.cs index 91d087475..24546fe2c 100644 --- a/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsServerRouter/DiagnosticsServerRouterRunner.cs +++ b/src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsServerRouter/DiagnosticsServerRouterRunner.cs @@ -37,6 +37,11 @@ namespace Microsoft.Diagnostics.NETCore.Client return await runRouter(token, new IpcServerTcpClientRouterFactory(ipcServer, tcpClient, runtimeTimeoutMs, tcpClientRouterFactory, logger), callbacks).ConfigureAwait(false); } + public static async Task runIpcClientTcpClientRouter(CancellationToken token, string ipcClient, string tcpClient, int runtimeTimeoutMs, TcpClientRouterFactory.CreateInstanceDelegate tcpClientRouterFactory, ILogger logger, Callbacks callbacks) + { + return await runRouter(token, new IpcClientTcpClientRouterFactory(ipcClient, tcpClient, runtimeTimeoutMs, tcpClientRouterFactory, logger), callbacks).ConfigureAwait(false); + } + public static bool isLoopbackOnly(string address) { bool isLooback = false; @@ -58,8 +63,9 @@ namespace Microsoft.Diagnostics.NETCore.Client try { - routerFactory.Start(); - callbacks?.OnRouterStarted(routerFactory.TcpAddress); + await routerFactory.Start(token); + if (!token.IsCancellationRequested) + callbacks?.OnRouterStarted(routerFactory.TcpAddress); while (!token.IsCancellationRequested) { diff --git a/src/Tools/dotnet-dsrouter/DiagnosticsServerRouterCommands.cs b/src/Tools/dotnet-dsrouter/DiagnosticsServerRouterCommands.cs index f79b5cbcc..164544f63 100644 --- a/src/Tools/dotnet-dsrouter/DiagnosticsServerRouterCommands.cs +++ b/src/Tools/dotnet-dsrouter/DiagnosticsServerRouterCommands.cs @@ -224,6 +224,66 @@ namespace Microsoft.Diagnostics.Tools.DiagnosticsServerRouter return routerTask.Result; } + public async Task RunIpcClientTcpClientRouter(CancellationToken token, string ipcClient, string tcpClient, int runtimeTimeout, string verbose, string forwardPort) + { + using CancellationTokenSource cancelRouterTask = new CancellationTokenSource(); + using CancellationTokenSource linkedCancelToken = CancellationTokenSource.CreateLinkedTokenSource(token, cancelRouterTask.Token); + + LogLevel logLevel = LogLevel.Information; + if (string.Compare(verbose, "debug", StringComparison.OrdinalIgnoreCase) == 0) + logLevel = LogLevel.Debug; + else if (string.Compare(verbose, "trace", StringComparison.OrdinalIgnoreCase) == 0) + logLevel = LogLevel.Trace; + + using var factory = new LoggerFactory(); + factory.AddConsole(logLevel, false); + + Launcher.SuspendProcess = true; + Launcher.ConnectMode = false; + Launcher.Verbose = logLevel != LogLevel.Information; + Launcher.CommandToken = token; + + var logger = factory.CreateLogger("dotnet-dsrouter"); + + TcpClientRouterFactory.CreateInstanceDelegate tcpClientRouterFactory = TcpClientRouterFactory.CreateDefaultInstance; + if (!string.IsNullOrEmpty(forwardPort)) + { + if (string.Compare(forwardPort, "android", StringComparison.OrdinalIgnoreCase) == 0) + { + tcpClientRouterFactory = ADBTcpClientRouterFactory.CreateADBInstance; + } + else if (string.Compare(forwardPort, "ios", StringComparison.OrdinalIgnoreCase) == 0) + { + tcpClientRouterFactory = USBMuxTcpClientRouterFactory.CreateUSBMuxInstance; + } + else + { + logger.LogError($"Unknown port forwarding argument, {forwardPort}. Ignoring --forward-port argument."); + } + } + + var routerTask = DiagnosticsServerRouterRunner.runIpcClientTcpClientRouter(linkedCancelToken.Token, ipcClient, tcpClient, runtimeTimeout == Timeout.Infinite ? runtimeTimeout : runtimeTimeout * 1000, tcpClientRouterFactory, logger, Launcher); + + while (!linkedCancelToken.IsCancellationRequested) + { + await Task.WhenAny(routerTask, Task.Delay(250)).ConfigureAwait(false); + if (routerTask.IsCompleted) + break; + + if (!Console.IsInputRedirected && Console.KeyAvailable) + { + ConsoleKey cmd = Console.ReadKey(true).Key; + if (cmd == ConsoleKey.Q) + { + cancelRouterTask.Cancel(); + break; + } + } + } + + return routerTask.Result; + } + static void checkLoopbackOnly(string tcpServer) { if (!string.IsNullOrEmpty(tcpServer) && !DiagnosticsServerRouterRunner.isLoopbackOnly(tcpServer)) diff --git a/src/Tools/dotnet-dsrouter/Program.cs b/src/Tools/dotnet-dsrouter/Program.cs index 52ab8165a..7b9e08a8e 100644 --- a/src/Tools/dotnet-dsrouter/Program.cs +++ b/src/Tools/dotnet-dsrouter/Program.cs @@ -20,6 +20,7 @@ namespace Microsoft.Diagnostics.Tools.DiagnosticsServerRouter delegate Task DiagnosticsServerIpcClientTcpServerRouterDelegate(CancellationToken ct, string ipcClient, string tcpServer, int runtimeTimeoutS, string verbose, string forwardPort); delegate Task DiagnosticsServerIpcServerTcpServerRouterDelegate(CancellationToken ct, string ipcServer, string tcpServer, int runtimeTimeoutS, string verbose, string forwardPort); delegate Task DiagnosticsServerIpcServerTcpClientRouterDelegate(CancellationToken ct, string ipcServer, string tcpClient, int runtimeTimeoutS, string verbose, string forwardPort); + delegate Task DiagnosticsServerIpcClientTcpClientRouterDelegate(CancellationToken ct, string ipcClient, string tcpClient, int runtimeTimeoutS, string verbose, string forwardPort); private static Command IpcClientTcpServerRouterCommand() => new Command( @@ -60,6 +61,19 @@ namespace Microsoft.Diagnostics.Tools.DiagnosticsServerRouter IpcServerAddressOption(), TcpClientAddressOption(), RuntimeTimeoutOption(), VerboseOption(), ForwardPortOption() }; + private static Command IpcClientTcpClientRouterCommand() => + new Command( + name: "client-client", + description: "Start a .NET application Diagnostics Server routing local IPC server <--> remote TCP server. " + + "Router is configured using an IPC client (connecting diagnostic tool IPC server) " + + "and a TCP/IP client (connecting runtime TCP server).") + { + // Handler + HandlerDescriptor.FromDelegate((DiagnosticsServerIpcServerTcpClientRouterDelegate)new DiagnosticsServerRouterCommands().RunIpcClientTcpClientRouter).GetCommandHandler(), + // Options + IpcClientAddressOption(), TcpClientAddressOption(), RuntimeTimeoutOption(), VerboseOption(), ForwardPortOption() + }; + private static Option IpcClientAddressOption() => new Option( aliases: new[] { "--ipc-client", "-ipcc" }, @@ -139,6 +153,7 @@ namespace Microsoft.Diagnostics.Tools.DiagnosticsServerRouter .AddCommand(IpcClientTcpServerRouterCommand()) .AddCommand(IpcServerTcpServerRouterCommand()) .AddCommand(IpcServerTcpClientRouterCommand()) + .AddCommand(IpcClientTcpClientRouterCommand()) .UseDefaults() .Build(); diff --git a/src/Tools/dotnet-dsrouter/USBMuxTcpClientRouterFactory.cs b/src/Tools/dotnet-dsrouter/USBMuxTcpClientRouterFactory.cs index 29ca466c5..870008b16 100644 --- a/src/Tools/dotnet-dsrouter/USBMuxTcpClientRouterFactory.cs +++ b/src/Tools/dotnet-dsrouter/USBMuxTcpClientRouterFactory.cs @@ -260,7 +260,28 @@ namespace Microsoft.Diagnostics.Tools.DiagnosticsServerRouter public override async Task ConnectTcpStreamAsync(CancellationToken token) { - bool retry = false; + return await ConnectTcpStreamAsyncInternal(token, _auto_shutdown).ConfigureAwait(false); + } + + public override async Task ConnectTcpStreamAsync(CancellationToken token, bool retry) + { + return await ConnectTcpStreamAsyncInternal(token, retry).ConfigureAwait(false); + } + + public override void Start() + { + // Start device subscription thread. + StartNotificationSubscribeThread(); + } + + public override void Stop() + { + // Stop device subscription thread. + StopNotificationSubscribeThread(); + } + + async Task ConnectTcpStreamAsyncInternal(CancellationToken token, bool retry) + { int handle = -1; ushort networkPort = (ushort)IPAddress.HostToNetworkOrder(unchecked((short)_port)); @@ -290,10 +311,10 @@ namespace Microsoft.Diagnostics.Tools.DiagnosticsServerRouter throw new TimeoutException(); } - // If we are not doing auto shutdown when runtime is unavailable, fail right away, this will + // If we are not doing retries when runtime is unavailable, fail right away, this will // break any accepted IPC connections, making sure client is notified and could reconnect. - // If we do have auto shutdown enabled, retry until succeed or time out. - if (!_auto_shutdown) + // If not, retry until succeed or time out. + if (!retry) { _logger?.LogTrace($"Failed connecting {_port} over usbmux."); throw; @@ -304,8 +325,6 @@ namespace Microsoft.Diagnostics.Tools.DiagnosticsServerRouter // If we get an error (without hitting timeout above), most likely due to unavailable device/listener. // Delay execution to prevent to rapid retry attempts. await Task.Delay(TcpClientRetryTimeoutMs, token).ConfigureAwait(false); - - retry = true; } } while (retry); @@ -313,18 +332,6 @@ namespace Microsoft.Diagnostics.Tools.DiagnosticsServerRouter return new USBMuxStream(handle); } - public override void Start() - { - // Start device subscription thread. - StartNotificationSubscribeThread(); - } - - public override void Stop() - { - // Stop device subscription thread. - StopNotificationSubscribeThread(); - } - int ConnectTcpClientOverUSBMux() { uint result = 0; -- 2.34.1