Add dsrouter client-client mode. (#2385)
authorJohan Lorensson <lateralusx.github@gmail.com>
Tue, 6 Jul 2021 16:53:46 +0000 (18:53 +0200)
committerGitHub <noreply@github.com>
Tue, 6 Jul 2021 16:53:46 +0000 (09:53 -0700)
* Add dsrouter client-client mode.

* Make retry explicit parameter in ConnectTcpStreamAsync.

src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsServerRouter/DiagnosticsServerRouterFactory.cs
src/Microsoft.Diagnostics.NETCore.Client/DiagnosticsServerRouter/DiagnosticsServerRouterRunner.cs
src/Tools/dotnet-dsrouter/DiagnosticsServerRouterCommands.cs
src/Tools/dotnet-dsrouter/Program.cs
src/Tools/dotnet-dsrouter/USBMuxTcpClientRouterFactory.cs

index c92f607b157f8972cd8adc3c8e3b329bdc67dd75..98c3febb1e77767df9fee58eb1089d04d3058618 100644 (file)
@@ -44,7 +44,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
 
         public virtual ILogger Logger { get; }
 
-        public virtual void Start()
+        public virtual Task Start(CancellationToken token)
         {
             throw new NotImplementedException();
         }
@@ -319,12 +319,29 @@ namespace Microsoft.Diagnostics.NETCore.Client
         }
 
         public virtual async Task<Stream> ConnectTcpStreamAsync(CancellationToken token)
+        {
+            return await ConnectTcpStreamAsyncInternal(token, _auto_shutdown).ConfigureAwait(false);
+        }
+
+        public virtual async Task<Stream> ConnectTcpStreamAsync(CancellationToken token, bool retry)
+        {
+            return await ConnectTcpStreamAsyncInternal(token, retry).ConfigureAwait(false);
+        }
+
+        public virtual void Start()
+        {
+        }
+
+        public virtual void Stop()
+        {
+        }
+
+        async Task<Stream> ConnectTcpStreamAsyncInternal(CancellationToken token, bool retry)
         {
             Stream tcpClientStream = null;
 
             _logger?.LogDebug($"Connecting new tcp endpoint \"{_tcpClientAddress}\".");
 
-            bool retry = false;
             IpcTcpSocketEndPoint clientTcpEndPoint = new IpcTcpSocketEndPoint(_tcpClientAddress);
             Socket clientSocket = null;
 
@@ -339,7 +356,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
 
                 try
                 {
-                    await ConnectAsyncInternal(clientSocket, clientTcpEndPoint, token).ConfigureAwait(false);
+                    await ConnectAsyncInternal(clientSocket, clientTcpEndPoint, connectTokenSource.Token).ConfigureAwait(false);
                     retry = false;
                 }
                 catch (Exception)
@@ -356,10 +373,10 @@ namespace Microsoft.Diagnostics.NETCore.Client
                         throw new TimeoutException();
                     }
 
-                    // If we are not doing auto shutdown when runtime is unavailable, fail right away, this will
+                    // If we are not doing retries when runtime is unavailable, fail right away, this will
                     // break any accepted IPC connections, making sure client is notified and could reconnect.
-                    // If we do have auto shutdown enabled, retry until succeed or time out.
-                    if (!_auto_shutdown)
+                    // If not, retry until succeed or time out.
+                    if (!retry)
                     {
                         _logger?.LogTrace($"Failed connecting {_tcpClientAddress}.");
                         throw;
@@ -370,8 +387,6 @@ namespace Microsoft.Diagnostics.NETCore.Client
                     // If we get an error (without hitting timeout above), most likely due to unavailable listener.
                     // Delay execution to prevent to rapid retry attempts.
                     await Task.Delay(TcpClientRetryTimeoutMs, token).ConfigureAwait(false);
-
-                    retry = true;
                 }
             }
             while (retry);
@@ -382,14 +397,6 @@ namespace Microsoft.Diagnostics.NETCore.Client
             return tcpClientStream;
         }
 
-        public virtual void Start()
-        {
-        }
-
-        public virtual void Stop()
-        {
-        }
-
         async Task ConnectAsyncInternal(Socket clientSocket, EndPoint remoteEP, CancellationToken token)
         {
             using (token.Register(() => clientSocket.Close(0)))
@@ -650,12 +657,14 @@ namespace Microsoft.Diagnostics.NETCore.Client
             }
         }
 
-        public override void Start()
+        public override Task Start(CancellationToken token)
         {
             _tcpServerRouterFactory.Start();
             _ipcServerRouterFactory.Start();
 
             _logger?.LogInformation($"Starting IPC server ({_ipcServerRouterFactory.IpcServerPath}) <--> TCP server ({_tcpServerRouterFactory.TcpServerAddress}) router.");
+
+            return Task.CompletedTask;
         }
 
         public override Task Stop()
@@ -840,11 +849,13 @@ namespace Microsoft.Diagnostics.NETCore.Client
             }
         }
 
-        public override void Start()
+        public override Task Start(CancellationToken token)
         {
             _ipcServerRouterFactory.Start();
             _tcpClientRouterFactory.Start();
             _logger?.LogInformation($"Starting IPC server ({_ipcServerRouterFactory.IpcServerPath}) <--> TCP client ({_tcpClientRouterFactory.TcpClientAddress}) router.");
+
+            return Task.CompletedTask;
         }
 
         public override Task Stop()
@@ -962,10 +973,12 @@ namespace Microsoft.Diagnostics.NETCore.Client
             }
         }
 
-        public override void Start()
+        public override Task Start(CancellationToken token)
         {
             _tcpServerRouterFactory.Start();
             _logger?.LogInformation($"Starting IPC client ({_ipcClientRouterFactory.IpcClientPath}) <--> TCP server ({_tcpServerRouterFactory.TcpServerAddress}) router.");
+
+            return Task.CompletedTask;
         }
 
         public override Task Stop()
@@ -1056,6 +1069,178 @@ namespace Microsoft.Diagnostics.NETCore.Client
         }
     }
 
+    /// <summary>
+    /// This class creates IPC Client - TCP Client router instances.
+    /// Supports NamedPipes/UnixDomainSocket client and TCP/IP client.
+    /// </summary>
+    internal class IpcClientTcpClientRouterFactory : DiagnosticsServerRouterFactory
+    {
+        Guid _runtimeInstanceId;
+        ulong _runtimeProcessId;
+        ILogger _logger;
+        IpcClientRouterFactory _ipcClientRouterFactory;
+        TcpClientRouterFactory _tcpClientRouterFactory;
+
+        public IpcClientTcpClientRouterFactory(string ipcClient, string tcpClient, int runtimeTimeoutMs, TcpClientRouterFactory.CreateInstanceDelegate factory, ILogger logger)
+        {
+            _runtimeInstanceId = Guid.Empty;
+            _runtimeProcessId = 0;
+            _logger = logger;
+            _ipcClientRouterFactory = new IpcClientRouterFactory(ipcClient, logger);
+            _tcpClientRouterFactory = factory(tcpClient, runtimeTimeoutMs, logger);
+        }
+
+        public override string IpcAddress {
+            get
+            {
+                return _ipcClientRouterFactory.IpcClientPath;
+            }
+        }
+
+        public override string TcpAddress {
+            get
+            {
+                return _tcpClientRouterFactory.TcpClientAddress;
+            }
+        }
+
+        public override ILogger Logger {
+            get
+            {
+                return _logger;
+            }
+        }
+
+        public override Task Start(CancellationToken token)
+        {
+            _tcpClientRouterFactory.Start();
+            _logger?.LogInformation($"Starting IPC client ({_ipcClientRouterFactory.IpcClientPath}) <--> TCP client ({_tcpClientRouterFactory.TcpClientAddress}) router.");
+
+            var requestRuntimeInfo = new Task(() =>
+            {
+                try
+                {
+                    _logger?.LogDebug($"Requesting runtime process information.");
+
+                    // Get new tcp client endpoint.
+                    using var tcpClientStream = _tcpClientRouterFactory.ConnectTcpStreamAsync(token).Result;
+
+                    // Request process info.
+                    IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.GetProcessInfo);
+                
+                    byte[] buffer = message.Serialize();
+                    tcpClientStream.Write(buffer, 0, buffer.Length);
+
+                    var response = IpcMessage.Parse(tcpClientStream);
+                    if ((DiagnosticsServerResponseId)response.Header.CommandId == DiagnosticsServerResponseId.OK)
+                    {
+                        var info = ProcessInfo.Parse(response.Payload);
+
+                        _runtimeProcessId = info.ProcessId;
+                        _runtimeInstanceId = info.RuntimeInstanceCookie;
+
+                        _logger?.LogDebug($"Retrieved runtime process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}.");
+                    }
+                    else
+                    {
+                        throw new ServerErrorException("Failed to retrieve runtime process info.");
+                    }
+                }
+                catch (Exception)
+                {
+                    _runtimeProcessId = (ulong)Process.GetCurrentProcess().Id;
+                    _runtimeInstanceId = Guid.NewGuid();
+                    _logger?.LogWarning($"Failed to retrieve runtime process info, fallback to current process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}.");
+                }
+            });
+
+            requestRuntimeInfo.Start();
+            return requestRuntimeInfo;
+        }
+
+        public override Task Stop()
+        {
+            _logger?.LogInformation($"Stopping IPC client ({_ipcClientRouterFactory.IpcClientPath}) <--> TCP client ({_tcpClientRouterFactory.TcpClientAddress}) router.");
+            _tcpClientRouterFactory.Stop();
+            return Task.CompletedTask;
+        }
+
+        public override async Task<Router> CreateRouterAsync(CancellationToken token)
+        {
+            Stream tcpClientStream = null;
+            Stream ipcClientStream = null;
+
+            _logger?.LogDebug("Trying to create a new router instance.");
+
+            try
+            {
+                using CancellationTokenSource cancelRouter = CancellationTokenSource.CreateLinkedTokenSource(token);
+
+                // Get new tcp client endpoint.
+                tcpClientStream = await _tcpClientRouterFactory.ConnectTcpStreamAsync(cancelRouter.Token, true).ConfigureAwait(false);
+
+                // Get new ipc client endpoint.
+                using var ipcClientStreamTask = _ipcClientRouterFactory.ConnectIpcStreamAsync(cancelRouter.Token);
+
+                // We have a valid tcp stream and a pending ipc stream. Wait for completion
+                // or disconnect of tcp stream.
+                using var checkTcpStreamTask = IsStreamConnectedAsync(tcpClientStream, cancelRouter.Token);
+
+                // Wait for at least completion of one task.
+                await Task.WhenAny(ipcClientStreamTask, checkTcpStreamTask).ConfigureAwait(false);
+
+                // Cancel out any pending tasks not yet completed.
+                cancelRouter.Cancel();
+
+                try
+                {
+                    await Task.WhenAll(ipcClientStreamTask, checkTcpStreamTask).ConfigureAwait(false);
+                }
+                catch (Exception)
+                {
+                    // Check if we have an accepted ipc stream.
+                    if (IsCompletedSuccessfully(ipcClientStreamTask))
+                        ipcClientStreamTask.Result?.Dispose();
+
+                    if (checkTcpStreamTask.IsFaulted)
+                    {
+                        _logger?.LogInformation("Broken tcp connection detected, aborting ipc connection.");
+                        checkTcpStreamTask.GetAwaiter().GetResult();
+                    }
+
+                    throw;
+                }
+
+                ipcClientStream = ipcClientStreamTask.Result;
+
+                try
+                {
+                    await IpcAdvertise.SerializeAsync(ipcClientStream, _runtimeInstanceId, _runtimeProcessId, token).ConfigureAwait(false);
+                }
+                catch (Exception)
+                {
+                    _logger?.LogDebug("Failed sending advertise message.");
+                    throw;
+                }
+            }
+            catch (Exception)
+            {
+                _logger?.LogDebug("Failed creating new router instance.");
+
+                // Cleanup and rethrow.
+                tcpClientStream?.Dispose();
+                ipcClientStream?.Dispose();
+
+                throw;
+            }
+
+            // Create new router.
+            _logger?.LogDebug("New router instance successfully created.");
+
+            return new Router(ipcClientStream, tcpClientStream, _logger, (ulong)IpcAdvertise.V1SizeInBytes);
+        }
+    }
+
     internal class Router : IDisposable
     {
         readonly ILogger _logger;
index 91d0874756c8cc4e4809af23d422cb2f5d81cffb..24546fe2c10d112df510abd2467a6a14ae0c898f 100644 (file)
@@ -37,6 +37,11 @@ namespace Microsoft.Diagnostics.NETCore.Client
             return await runRouter(token, new IpcServerTcpClientRouterFactory(ipcServer, tcpClient, runtimeTimeoutMs, tcpClientRouterFactory, logger), callbacks).ConfigureAwait(false);
         }
 
+        public static async Task<int> runIpcClientTcpClientRouter(CancellationToken token, string ipcClient, string tcpClient, int runtimeTimeoutMs, TcpClientRouterFactory.CreateInstanceDelegate tcpClientRouterFactory, ILogger logger, Callbacks callbacks)
+        {
+            return await runRouter(token, new IpcClientTcpClientRouterFactory(ipcClient, tcpClient, runtimeTimeoutMs, tcpClientRouterFactory, logger), callbacks).ConfigureAwait(false);
+        }
+
         public static bool isLoopbackOnly(string address)
         {
             bool isLooback = false;
@@ -58,8 +63,9 @@ namespace Microsoft.Diagnostics.NETCore.Client
 
             try
             {
-                routerFactory.Start();
-                callbacks?.OnRouterStarted(routerFactory.TcpAddress);
+                await routerFactory.Start(token);
+                if (!token.IsCancellationRequested)
+                    callbacks?.OnRouterStarted(routerFactory.TcpAddress);
 
                 while (!token.IsCancellationRequested)
                 {
index f79b5cbcc75890ea5f90ce3d3cf629f71707a213..164544f636f04524687543de09b8d1a1b6a8f698 100644 (file)
@@ -224,6 +224,66 @@ namespace Microsoft.Diagnostics.Tools.DiagnosticsServerRouter
             return routerTask.Result;
         }
 
+        public async Task<int> RunIpcClientTcpClientRouter(CancellationToken token, string ipcClient, string tcpClient, int runtimeTimeout, string verbose, string forwardPort)
+        {
+            using CancellationTokenSource cancelRouterTask = new CancellationTokenSource();
+            using CancellationTokenSource linkedCancelToken = CancellationTokenSource.CreateLinkedTokenSource(token, cancelRouterTask.Token);
+
+            LogLevel logLevel = LogLevel.Information;
+            if (string.Compare(verbose, "debug", StringComparison.OrdinalIgnoreCase) == 0)
+                logLevel = LogLevel.Debug;
+            else if (string.Compare(verbose, "trace", StringComparison.OrdinalIgnoreCase) == 0)
+                logLevel = LogLevel.Trace;
+
+            using var factory = new LoggerFactory();
+            factory.AddConsole(logLevel, false);
+
+            Launcher.SuspendProcess = true;
+            Launcher.ConnectMode = false;
+            Launcher.Verbose = logLevel != LogLevel.Information;
+            Launcher.CommandToken = token;
+
+            var logger = factory.CreateLogger("dotnet-dsrouter");
+
+            TcpClientRouterFactory.CreateInstanceDelegate tcpClientRouterFactory = TcpClientRouterFactory.CreateDefaultInstance;
+            if (!string.IsNullOrEmpty(forwardPort))
+            {
+                if (string.Compare(forwardPort, "android", StringComparison.OrdinalIgnoreCase) == 0)
+                {
+                    tcpClientRouterFactory = ADBTcpClientRouterFactory.CreateADBInstance;
+                }
+                else if (string.Compare(forwardPort, "ios", StringComparison.OrdinalIgnoreCase) == 0)
+                {
+                    tcpClientRouterFactory = USBMuxTcpClientRouterFactory.CreateUSBMuxInstance;
+                }
+                else
+                {
+                    logger.LogError($"Unknown port forwarding argument, {forwardPort}. Ignoring --forward-port argument.");
+                }
+            }
+
+            var routerTask = DiagnosticsServerRouterRunner.runIpcClientTcpClientRouter(linkedCancelToken.Token, ipcClient, tcpClient, runtimeTimeout == Timeout.Infinite ? runtimeTimeout : runtimeTimeout * 1000, tcpClientRouterFactory, logger, Launcher);
+
+            while (!linkedCancelToken.IsCancellationRequested)
+            {
+                await Task.WhenAny(routerTask, Task.Delay(250)).ConfigureAwait(false);
+                if (routerTask.IsCompleted)
+                    break;
+
+                if (!Console.IsInputRedirected && Console.KeyAvailable)
+                {
+                    ConsoleKey cmd = Console.ReadKey(true).Key;
+                    if (cmd == ConsoleKey.Q)
+                    {
+                        cancelRouterTask.Cancel();
+                        break;
+                    }
+                }
+            }
+
+            return routerTask.Result;
+        }
+
         static void checkLoopbackOnly(string tcpServer)
         {
             if (!string.IsNullOrEmpty(tcpServer) && !DiagnosticsServerRouterRunner.isLoopbackOnly(tcpServer))
index 52ab8165a061a4565429172211fc8e9413ad5416..7b9e08a8ede5473f4e367f4d678e3f5cde528f09 100644 (file)
@@ -20,6 +20,7 @@ namespace Microsoft.Diagnostics.Tools.DiagnosticsServerRouter
         delegate Task<int> DiagnosticsServerIpcClientTcpServerRouterDelegate(CancellationToken ct, string ipcClient, string tcpServer, int runtimeTimeoutS, string verbose, string forwardPort);
         delegate Task<int> DiagnosticsServerIpcServerTcpServerRouterDelegate(CancellationToken ct, string ipcServer, string tcpServer, int runtimeTimeoutS, string verbose, string forwardPort);
         delegate Task<int> DiagnosticsServerIpcServerTcpClientRouterDelegate(CancellationToken ct, string ipcServer, string tcpClient, int runtimeTimeoutS, string verbose, string forwardPort);
+        delegate Task<int> DiagnosticsServerIpcClientTcpClientRouterDelegate(CancellationToken ct, string ipcClient, string tcpClient, int runtimeTimeoutS, string verbose, string forwardPort);
 
         private static Command IpcClientTcpServerRouterCommand() =>
             new Command(
@@ -60,6 +61,19 @@ namespace Microsoft.Diagnostics.Tools.DiagnosticsServerRouter
                 IpcServerAddressOption(), TcpClientAddressOption(), RuntimeTimeoutOption(), VerboseOption(), ForwardPortOption()
             };
 
+        private static Command IpcClientTcpClientRouterCommand() =>
+            new Command(
+                name: "client-client",
+                description: "Start a .NET application Diagnostics Server routing local IPC server <--> remote TCP server. " +
+                                "Router is configured using an IPC client (connecting diagnostic tool IPC server) " +
+                                "and a TCP/IP client (connecting runtime TCP server).")
+            {
+                // Handler
+                HandlerDescriptor.FromDelegate((DiagnosticsServerIpcServerTcpClientRouterDelegate)new DiagnosticsServerRouterCommands().RunIpcClientTcpClientRouter).GetCommandHandler(),
+                // Options
+                IpcClientAddressOption(), TcpClientAddressOption(), RuntimeTimeoutOption(), VerboseOption(), ForwardPortOption()
+            };
+
         private static Option IpcClientAddressOption() =>
             new Option(
                 aliases: new[] { "--ipc-client", "-ipcc" },
@@ -139,6 +153,7 @@ namespace Microsoft.Diagnostics.Tools.DiagnosticsServerRouter
                 .AddCommand(IpcClientTcpServerRouterCommand())
                 .AddCommand(IpcServerTcpServerRouterCommand())
                 .AddCommand(IpcServerTcpClientRouterCommand())
+                .AddCommand(IpcClientTcpClientRouterCommand())
                 .UseDefaults()
                 .Build();
 
index 29ca466c53d3a2f9eae0a4d80f51524ca20931cb..870008b163b7f51a78c1a19e074ef2043eda6133 100644 (file)
@@ -260,7 +260,28 @@ namespace Microsoft.Diagnostics.Tools.DiagnosticsServerRouter
 
         public override async Task<Stream> ConnectTcpStreamAsync(CancellationToken token)
         {
-            bool retry = false;
+            return await ConnectTcpStreamAsyncInternal(token, _auto_shutdown).ConfigureAwait(false);
+        }
+
+        public override async Task<Stream> ConnectTcpStreamAsync(CancellationToken token, bool retry)
+        {
+            return await ConnectTcpStreamAsyncInternal(token, retry).ConfigureAwait(false);
+        }
+
+        public override void Start()
+        {
+            // Start device subscription thread.
+            StartNotificationSubscribeThread();
+        }
+
+        public override void Stop()
+        {
+            // Stop device subscription thread.
+            StopNotificationSubscribeThread();
+        }
+
+        async Task<Stream> ConnectTcpStreamAsyncInternal(CancellationToken token, bool retry)
+        {
             int handle = -1;
             ushort networkPort = (ushort)IPAddress.HostToNetworkOrder(unchecked((short)_port));
 
@@ -290,10 +311,10 @@ namespace Microsoft.Diagnostics.Tools.DiagnosticsServerRouter
                         throw new TimeoutException();
                     }
 
-                    // If we are not doing auto shutdown when runtime is unavailable, fail right away, this will
+                    // If we are not doing retries when runtime is unavailable, fail right away, this will
                     // break any accepted IPC connections, making sure client is notified and could reconnect.
-                    // If we do have auto shutdown enabled, retry until succeed or time out.
-                    if (!_auto_shutdown)
+                    // If not, retry until succeed or time out.
+                    if (!retry)
                     {
                         _logger?.LogTrace($"Failed connecting {_port} over usbmux.");
                         throw;
@@ -304,8 +325,6 @@ namespace Microsoft.Diagnostics.Tools.DiagnosticsServerRouter
                     // If we get an error (without hitting timeout above), most likely due to unavailable device/listener.
                     // Delay execution to prevent to rapid retry attempts.
                     await Task.Delay(TcpClientRetryTimeoutMs, token).ConfigureAwait(false);
-
-                    retry = true;
                 }
             }
             while (retry);
@@ -313,18 +332,6 @@ namespace Microsoft.Diagnostics.Tools.DiagnosticsServerRouter
             return new USBMuxStream(handle);
         }
 
-        public override void Start()
-        {
-            // Start device subscription thread.
-            StartNotificationSubscribeThread();
-        }
-
-        public override void Stop()
-        {
-            // Stop device subscription thread.
-            StopNotificationSubscribeThread();
-        }
-
         int ConnectTcpClientOverUSBMux()
         {
             uint result = 0;