using System.Diagnostics;
using Microsoft.Extensions.Logging;
using System.Net;
+using System.Net.Sockets;
namespace Microsoft.Diagnostics.NETCore.Client
{
/// </summary>
internal class DiagnosticsServerRouterFactory
{
- protected readonly ILogger _logger;
+ int IsStreamConnectedTimeoutMs { get; set; } = 500;
- public DiagnosticsServerRouterFactory(ILogger logger)
- {
- _logger = logger;
- }
+ public virtual string IpcAddress { get; }
- public ILogger Logger
- {
- get { return _logger; }
- }
+ public virtual string TcpAddress { get; }
+
+ public virtual ILogger Logger { get; }
public virtual void Start()
{
{
throw new NotImplementedException();
}
+
+ protected bool IsStreamConnected(Stream stream, CancellationToken token)
+ {
+ bool connected = true;
+
+ if (stream is NamedPipeServerStream || stream is NamedPipeClientStream)
+ {
+ PipeStream pipeStream = stream as PipeStream;
+
+ // PeekNamedPipe will return false if the pipe is disconnected/broken.
+ connected = NativeMethods.PeekNamedPipe(
+ pipeStream.SafePipeHandle,
+ null,
+ 0,
+ IntPtr.Zero,
+ IntPtr.Zero,
+ IntPtr.Zero);
+ }
+ else if (stream is ExposedSocketNetworkStream networkStream)
+ {
+ bool blockingState = networkStream.Socket.Blocking;
+ try
+ {
+ // Check connection read state by peek one byte. Will return 0 in case connection is closed.
+ // A closed connection could also raise exception, but then socket connected state should
+ // be set to false.
+ networkStream.Socket.Blocking = false;
+ if (networkStream.Socket.Receive(new byte[1], 0, 1, System.Net.Sockets.SocketFlags.Peek) == 0)
+ connected = false;
+
+ // Check connection write state by sending non-blocking zero-byte data.
+ // A closed connection should raise exception, but then socket connected state should
+ // be set to false.
+ if (connected)
+ networkStream.Socket.Send(Array.Empty<byte>(), 0, System.Net.Sockets.SocketFlags.None);
+ }
+ catch (Exception)
+ {
+ connected = networkStream.Socket.Connected;
+ }
+ finally
+ {
+ networkStream.Socket.Blocking = blockingState;
+ }
+ }
+ else
+ {
+ connected = false;
+ }
+
+ return connected;
+ }
+
+ protected async Task IsStreamConnectedAsync(Stream stream, CancellationToken token)
+ {
+ while (!token.IsCancellationRequested)
+ {
+ // Check if tcp stream connection is still available.
+ if (!IsStreamConnected(stream, token))
+ {
+ throw new EndOfStreamException();
+ }
+
+ try
+ {
+ // Wait before rechecking connection.
+ await Task.Delay(IsStreamConnectedTimeoutMs, token).ConfigureAwait(false);
+ }
+ catch { }
+ }
+ }
+
+ protected bool IsCompletedSuccessfully(Task t)
+ {
+#if NETCOREAPP2_0_OR_GREATER
+ return t.IsCompletedSuccessfully;
+#else
+ return t.IsCompleted && !t.IsCanceled && !t.IsFaulted;
+#endif
+ }
}
/// <summary>
/// This class represent a TCP/IP server endpoint used when building up router instances.
/// </summary>
- internal class TcpServerRouterFactory : DiagnosticsServerRouterFactory
+ internal class TcpServerRouterFactory : IIpcServerTransportCallbackInternal
{
- protected string _tcpServerAddress;
+ readonly ILogger _logger;
+
+ string _tcpServerAddress;
- protected ReversedDiagnosticsServer _tcpServer;
- protected IpcEndpointInfo _tcpServerEndpointInfo;
+ ReversedDiagnosticsServer _tcpServer;
+ IpcEndpointInfo _tcpServerEndpointInfo;
- protected bool _auto_shutdown;
+ bool _auto_shutdown;
- protected int RuntimeTimeoutMs { get; set; } = 60000;
- protected int TcpServerTimeoutMs { get; set; } = 5000;
- protected int IsStreamConnectedTimeoutMs { get; set; } = 500;
+ int RuntimeTimeoutMs { get; set; } = 60000;
+ int TcpServerTimeoutMs { get; set; } = 5000;
public Guid RuntimeInstanceId
{
get { return _tcpServerAddress; }
}
- protected TcpServerRouterFactory(string tcpServer, int runtimeTimeoutMs, ILogger logger)
- : base(logger)
+ public TcpServerRouterFactory(string tcpServer, int runtimeTimeoutMs, ILogger logger)
{
+ _logger = logger;
+
_tcpServerAddress = IpcTcpSocketEndPoint.NormalizeTcpIpEndPoint(string.IsNullOrEmpty(tcpServer) ? "127.0.0.1:0" : tcpServer);
_auto_shutdown = runtimeTimeoutMs != Timeout.Infinite;
_tcpServer = new ReversedDiagnosticsServer(_tcpServerAddress, enableTcpIpProtocol : true);
_tcpServerEndpointInfo = new IpcEndpointInfo();
+ _tcpServer.TransportCallback = this;
}
- public override void Start()
+ public void Start()
{
_tcpServer.Start();
}
- public override async Task Stop()
+ public async Task Stop()
{
await _tcpServer.DisposeAsync().ConfigureAwait(false);
}
- public override void Reset()
+ public void Reset()
{
if (_tcpServerEndpointInfo.Endpoint != null)
{
}
}
- protected async Task<Stream> AcceptTcpStreamAsync(CancellationToken token)
+ public async Task<Stream> AcceptTcpStreamAsync(CancellationToken token)
{
Stream tcpServerStream;
- Logger.LogDebug($"Waiting for a new tcp connection at endpoint \"{_tcpServerAddress}\".");
+ _logger?.LogDebug($"Waiting for a new tcp connection at endpoint \"{_tcpServerAddress}\".");
if (_tcpServerEndpointInfo.Endpoint == null)
{
{
if (acceptTimeoutTokenSource.IsCancellationRequested)
{
- Logger.LogDebug("No runtime instance connected before timeout.");
+ _logger?.LogDebug("No runtime instance connected before timeout.");
if (_auto_shutdown)
throw new RuntimeTimeoutException(RuntimeTimeoutMs);
{
if (connectTimeoutTokenSource.IsCancellationRequested)
{
- Logger.LogDebug("No tcp stream connected before timeout.");
+ _logger?.LogDebug("No tcp stream connected before timeout.");
throw new BackendStreamTimeoutException(TcpServerTimeoutMs);
}
}
if (tcpServerStream != null)
- Logger.LogDebug($"Successfully connected tcp stream, runtime id={RuntimeInstanceId}, runtime pid={RuntimeProcessId}.");
+ _logger?.LogDebug($"Successfully connected tcp stream, runtime id={RuntimeInstanceId}, runtime pid={RuntimeProcessId}.");
return tcpServerStream;
}
- protected bool IsStreamConnected(Stream stream, CancellationToken token)
+ public void CreatedNewServer(EndPoint localEP)
{
- bool connected = true;
+ if (localEP is IPEndPoint ipEP)
+ _tcpServerAddress = _tcpServerAddress.Replace(":0", string.Format(":{0}", ipEP.Port));
+ }
+ }
- if (stream is NamedPipeServerStream || stream is NamedPipeClientStream)
+ /// <summary>
+ /// This class represent a TCP/IP client endpoint used when building up router instances.
+ /// </summary>
+ internal class TcpClientRouterFactory
+ {
+ readonly ILogger _logger;
+
+ readonly string _tcpClientAddress;
+
+ bool _auto_shutdown;
+
+ int TcpClientTimeoutMs { get; set; } = Timeout.Infinite;
+
+ int TcpClientRetryTimeoutMs { get; set; } = 500;
+
+ public string TcpClientAddress {
+ get { return _tcpClientAddress; }
+ }
+
+ public TcpClientRouterFactory(string tcpClient, int runtimeTimeoutMs, ILogger logger)
+ {
+ _logger = logger;
+ _tcpClientAddress = IpcTcpSocketEndPoint.NormalizeTcpIpEndPoint(string.IsNullOrEmpty(tcpClient) ? "127.0.0.1:" + string.Format("{0}", 56000 + (Process.GetCurrentProcess().Id % 1000)) : tcpClient);
+ _auto_shutdown = runtimeTimeoutMs != Timeout.Infinite;
+ if (runtimeTimeoutMs != Timeout.Infinite)
+ TcpClientTimeoutMs = runtimeTimeoutMs;
+ }
+
+ public async Task<Stream> ConnectTcpStreamAsync(CancellationToken token)
+ {
+ Stream tcpClientStream = null;
+
+ _logger?.LogDebug($"Connecting new tcp endpoint \"{_tcpClientAddress}\".");
+
+ bool retry = false;
+ IpcTcpSocketEndPoint clientTcpEndPoint = new IpcTcpSocketEndPoint(_tcpClientAddress);
+ Socket clientSocket = null;
+
+ using var connectTimeoutTokenSource = new CancellationTokenSource();
+ using var connectTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, connectTimeoutTokenSource.Token);
+
+ connectTimeoutTokenSource.CancelAfter(TcpClientTimeoutMs);
+
+ do
{
- PipeStream pipeStream = stream as PipeStream;
+ clientSocket = new Socket(SocketType.Stream, ProtocolType.Tcp);
- // PeekNamedPipe will return false if the pipe is disconnected/broken.
- connected = NativeMethods.PeekNamedPipe(
- pipeStream.SafePipeHandle,
- null,
- 0,
- IntPtr.Zero,
- IntPtr.Zero,
- IntPtr.Zero);
+ try
+ {
+ await ConnectAsyncInternal(clientSocket, clientTcpEndPoint, token).ConfigureAwait(false);
+ retry = false;
+ }
+ catch (Exception)
+ {
+ clientSocket?.Dispose();
+
+ if (connectTimeoutTokenSource.IsCancellationRequested)
+ {
+ _logger?.LogDebug("No tcp stream connected, timing out.");
+
+ if (_auto_shutdown)
+ throw new RuntimeTimeoutException(TcpClientTimeoutMs);
+
+ throw new TimeoutException();
+ }
+
+ // If we are not doing auto shutdown when runtime is unavailable, fail right away, this will
+ // break any accepted IPC connections, making sure client is notified and could reconnect.
+ // If we do have auto shutdown enabled, retry until succeed or time out.
+ if (!_auto_shutdown)
+ {
+ _logger?.LogTrace($"Failed connecting {_tcpClientAddress}.");
+ throw;
+ }
+
+ _logger?.LogTrace($"Failed connecting {_tcpClientAddress}, wait {TcpClientRetryTimeoutMs} ms before retrying.");
+
+ // If we get an error (without hitting timeout above), most likely due to unavailable listener.
+ // Delay execution to prevent to rapid retry attempts.
+ await Task.Delay(TcpClientRetryTimeoutMs, token).ConfigureAwait(false);
+
+ retry = true;
+ }
}
- else if (stream is ExposedSocketNetworkStream networkStream)
+ while (retry);
+
+ tcpClientStream = new ExposedSocketNetworkStream(clientSocket, ownsSocket: true);
+ _logger?.LogDebug("Successfully connected tcp stream.");
+
+ return tcpClientStream;
+ }
+
+ async Task ConnectAsyncInternal(Socket clientSocket, EndPoint remoteEP, CancellationToken token)
+ {
+ using (token.Register(() => clientSocket.Close(0)))
{
- bool blockingState = networkStream.Socket.Blocking;
try
{
- // Check connection read state by peek one byte. Will return 0 in case connection is closed.
- // A closed connection could also raise exception, but then socket connected state should
- // be set to false.
- networkStream.Socket.Blocking = false;
- if (networkStream.Socket.Receive(new byte[1], 0, 1, System.Net.Sockets.SocketFlags.Peek) == 0)
- connected = false;
-
- // Check connection write state by sending non-blocking zero-byte data.
- // A closed connection should raise exception, but then socket connected state should
- // be set to false.
- if (connected)
- networkStream.Socket.Send(Array.Empty<byte>(), 0, System.Net.Sockets.SocketFlags.None);
+ Func<AsyncCallback, object, IAsyncResult> beginConnect = (callback, state) =>
+ {
+ return clientSocket.BeginConnect(remoteEP, callback, state);
+ };
+ await Task.Factory.FromAsync(beginConnect, clientSocket.EndConnect, this).ConfigureAwait(false);
}
- catch (Exception)
+ // When the socket is closed, the FromAsync logic will try to call EndAccept on the socket,
+ // but that will throw an ObjectDisposedException. Only catch the exception if due to cancellation.
+ catch (ObjectDisposedException) when (token.IsCancellationRequested)
{
- connected = networkStream.Socket.Connected;
+ // First check if the cancellation token caused the closing of the socket,
+ // then rethrow the exception if it did not.
+ token.ThrowIfCancellationRequested();
}
- finally
+ }
+ }
+ }
+
+ /// <summary>
+ /// This class represent a IPC server endpoint used when building up router instances.
+ /// </summary>
+ internal class IpcServerRouterFactory
+ {
+ readonly ILogger _logger;
+
+ readonly string _ipcServerPath;
+
+ IpcServerTransport _ipcServer;
+
+ int IpcServerTimeoutMs { get; set; } = Timeout.Infinite;
+
+ public string IpcServerPath {
+ get { return _ipcServerPath; }
+ }
+
+ public IpcServerRouterFactory(string ipcServer, ILogger logger)
+ {
+ _logger = logger;
+ _ipcServerPath = ipcServer;
+ if (string.IsNullOrEmpty(_ipcServerPath))
+ _ipcServerPath = GetDefaultIpcServerPath();
+
+ _ipcServer = IpcServerTransport.Create(_ipcServerPath, IpcServerTransport.MaxAllowedConnections, false);
+ }
+
+ public void Start()
+ {
+ }
+
+ public void Stop()
+ {
+ _ipcServer?.Dispose();
+ }
+
+ public async Task<Stream> AcceptIpcStreamAsync(CancellationToken token)
+ {
+ Stream ipcServerStream = null;
+
+ _logger?.LogDebug($"Waiting for new ipc connection at endpoint \"{_ipcServerPath}\".");
+
+
+ using var connectTimeoutTokenSource = new CancellationTokenSource();
+ using var connectTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, connectTimeoutTokenSource.Token);
+
+ try
+ {
+ connectTimeoutTokenSource.CancelAfter(IpcServerTimeoutMs);
+ ipcServerStream = await _ipcServer.AcceptAsync(connectTokenSource.Token).ConfigureAwait(false);
+ }
+ catch (Exception)
+ {
+ ipcServerStream?.Dispose();
+
+ if (connectTimeoutTokenSource.IsCancellationRequested)
{
- networkStream.Socket.Blocking = blockingState;
+ _logger?.LogDebug("No ipc stream connected, timing out.");
+ throw new TimeoutException();
}
+
+ throw;
+ }
+
+ if (ipcServerStream != null)
+ _logger?.LogDebug("Successfully connected ipc stream.");
+
+ return ipcServerStream;
+ }
+
+ static string GetDefaultIpcServerPath()
+ {
+ int processId = Process.GetCurrentProcess().Id;
+ if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+ {
+ return Path.Combine(PidIpcEndpoint.IpcRootPath, $"dotnet-diagnostic-{processId}");
}
else
{
- connected = false;
+ DateTime unixEpoch;
+#if NETCOREAPP2_1_OR_GREATER
+ unixEpoch = DateTime.UnixEpoch;
+#else
+ unixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);
+#endif
+ TimeSpan diff = Process.GetCurrentProcess().StartTime.ToUniversalTime() - unixEpoch;
+ return Path.Combine(PidIpcEndpoint.IpcRootPath, $"dotnet-diagnostic-{processId}-{(long)diff.TotalSeconds}-socket");
}
+ }
+ }
- return connected;
+ /// <summary>
+ /// This class represent a IPC client endpoint used when building up router instances.
+ /// </summary>
+ internal class IpcClientRouterFactory
+ {
+ readonly ILogger _logger;
+
+ readonly string _ipcClientPath;
+
+ int IpcClientTimeoutMs { get; set; } = Timeout.Infinite;
+
+ int IpcClientRetryTimeoutMs { get; set; } = 500;
+
+ public string IpcClientPath {
+ get { return _ipcClientPath; }
}
- protected async Task IsStreamConnectedAsync(Stream stream, CancellationToken token)
+ public IpcClientRouterFactory(string ipcClient, ILogger logger)
{
- while (!token.IsCancellationRequested)
+ _logger = logger;
+ _ipcClientPath = ipcClient;
+ }
+
+ public async Task<Stream> ConnectIpcStreamAsync(CancellationToken token)
+ {
+ Stream ipcClientStream = null;
+
+ _logger?.LogDebug($"Connecting new ipc endpoint \"{_ipcClientPath}\".");
+
+ if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
- // Check if tcp stream connection is still available.
- if (!IsStreamConnected(stream, token))
+ var namedPipe = new NamedPipeClientStream(
+ ".",
+ _ipcClientPath,
+ PipeDirection.InOut,
+ PipeOptions.Asynchronous,
+ TokenImpersonationLevel.Impersonation);
+
+ try
{
- throw new EndOfStreamException();
+ await namedPipe.ConnectAsync(IpcClientTimeoutMs, token).ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ namedPipe?.Dispose();
+
+ if (ex is TimeoutException)
+ _logger?.LogDebug("No ipc stream connected, timing out.");
+
+ throw;
+ }
+
+ ipcClientStream = namedPipe;
+ }
+ else
+ {
+ bool retry = false;
+ IpcUnixDomainSocket unixDomainSocket;
+
+ using var connectTimeoutTokenSource = new CancellationTokenSource();
+ using var connectTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, connectTimeoutTokenSource.Token);
+
+ connectTimeoutTokenSource.CancelAfter(IpcClientTimeoutMs);
+
+ do
+ {
+ unixDomainSocket = new IpcUnixDomainSocket();
+
+ try
+ {
+ await unixDomainSocket.ConnectAsync(new IpcUnixDomainSocketEndPoint(_ipcClientPath), token).ConfigureAwait(false);
+ retry = false;
+ }
+ catch (Exception)
+ {
+ unixDomainSocket?.Dispose();
+
+ if (connectTimeoutTokenSource.IsCancellationRequested)
+ {
+ _logger?.LogDebug("No ipc stream connected, timing out.");
+ throw new TimeoutException();
+ }
+
+ _logger?.LogTrace($"Failed connecting {_ipcClientPath}, wait {IpcClientRetryTimeoutMs} ms before retrying.");
+
+ // If we get an error (without hitting timeout above), most likely due to unavailable listener.
+ // Delay execution to prevent to rapid retry attempts.
+ await Task.Delay(IpcClientRetryTimeoutMs, token).ConfigureAwait(false);
+
+ retry = true;
+ }
}
+ while (retry);
- try
- {
- // Wait before rechecking connection.
- await Task.Delay(IsStreamConnectedTimeoutMs, token).ConfigureAwait(false);
- }
- catch { }
+ ipcClientStream = new ExposedSocketNetworkStream(unixDomainSocket, ownsSocket: true);
}
- }
- protected bool IsCompletedSuccessfully(Task t)
- {
-#if NETCOREAPP2_0_OR_GREATER
- return t.IsCompletedSuccessfully;
-#else
- return t.IsCompleted && !t.IsCanceled && !t.IsFaulted;
-#endif
+ if (ipcClientStream != null)
+ _logger?.LogDebug("Successfully connected ipc stream.");
+
+ return ipcClientStream;
}
}
/// This class creates IPC Server - TCP Server router instances.
/// Supports NamedPipes/UnixDomainSocket server and TCP/IP server.
/// </summary>
- internal class IpcServerTcpServerRouterFactory : TcpServerRouterFactory, IIpcServerTransportCallbackInternal
+ internal class IpcServerTcpServerRouterFactory : DiagnosticsServerRouterFactory
{
- readonly string _ipcServerPath;
-
- IpcServerTransport _ipcServer;
-
- protected int IpcServerTimeoutMs { get; set; } = Timeout.Infinite;
+ ILogger _logger;
+ TcpServerRouterFactory _tcpServerRouterFactory;
+ IpcServerRouterFactory _ipcServerRouterFactory;
public IpcServerTcpServerRouterFactory(string ipcServer, string tcpServer, int runtimeTimeoutMs, ILogger logger)
- : base(tcpServer, runtimeTimeoutMs, logger)
{
- _ipcServerPath = ipcServer;
- if (string.IsNullOrEmpty(_ipcServerPath))
- _ipcServerPath = GetDefaultIpcServerPath();
+ _logger = logger;
+ _tcpServerRouterFactory = new TcpServerRouterFactory(tcpServer, runtimeTimeoutMs, logger);
+ _ipcServerRouterFactory = new IpcServerRouterFactory(ipcServer, logger);
+ }
- _ipcServer = IpcServerTransport.Create(_ipcServerPath, IpcServerTransport.MaxAllowedConnections, false);
- _tcpServer.TransportCallback = this;
+ public override string IpcAddress
+ {
+ get
+ {
+ return _ipcServerRouterFactory.IpcServerPath;
+ }
}
- public static string GetDefaultIpcServerPath()
+ public override string TcpAddress
{
- int processId = Process.GetCurrentProcess().Id;
- if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+ get
{
- return Path.Combine(PidIpcEndpoint.IpcRootPath, $"dotnet-diagnostic-{processId}");
+ return _tcpServerRouterFactory.TcpServerAddress;
}
- else
+ }
+
+ public override ILogger Logger
+ {
+ get
{
- DateTime unixEpoch;
-#if NETCOREAPP2_1_OR_GREATER
- unixEpoch = DateTime.UnixEpoch;
-#else
- unixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);
-#endif
- TimeSpan diff = Process.GetCurrentProcess().StartTime.ToUniversalTime() - unixEpoch;
- return Path.Combine(PidIpcEndpoint.IpcRootPath, $"dotnet-diagnostic-{processId}-{(long)diff.TotalSeconds}-socket");
+ return _logger;
}
}
public override void Start()
{
- base.Start();
- _logger.LogInformation($"Starting IPC server ({_ipcServerPath}) <--> TCP server ({_tcpServerAddress}) router.");
+ _tcpServerRouterFactory.Start();
+ _ipcServerRouterFactory.Start();
+
+ _logger?.LogInformation($"Starting IPC server ({_ipcServerRouterFactory.IpcServerPath}) <--> TCP server ({_tcpServerRouterFactory.TcpServerAddress}) router.");
}
public override Task Stop()
{
- _ipcServer?.Dispose();
- return base.Stop();
+ _ipcServerRouterFactory.Stop();
+ return _tcpServerRouterFactory.Stop();
+ }
+
+ public override void Reset()
+ {
+ _tcpServerRouterFactory.Reset();
}
public override async Task<Router> CreateRouterAsync(CancellationToken token)
Stream tcpServerStream = null;
Stream ipcServerStream = null;
- Logger.LogDebug($"Trying to create new router instance.");
+ _logger?.LogDebug($"Trying to create new router instance.");
try
{
using CancellationTokenSource cancelRouter = CancellationTokenSource.CreateLinkedTokenSource(token);
// Get new tcp server endpoint.
- using var tcpServerStreamTask = AcceptTcpStreamAsync(cancelRouter.Token);
+ using var tcpServerStreamTask = _tcpServerRouterFactory.AcceptTcpStreamAsync(cancelRouter.Token);
// Get new ipc server endpoint.
- using var ipcServerStreamTask = AcceptIpcStreamAsync(cancelRouter.Token);
+ using var ipcServerStreamTask = _ipcServerRouterFactory.AcceptIpcStreamAsync(cancelRouter.Token);
await Task.WhenAny(ipcServerStreamTask, tcpServerStreamTask).ConfigureAwait(false);
if (checkIpcStreamTask.IsFaulted)
{
- Logger.LogInformation("Broken ipc connection detected, aborting tcp connection.");
+ _logger?.LogInformation("Broken ipc connection detected, aborting tcp connection.");
checkIpcStreamTask.GetAwaiter().GetResult();
}
if (checkTcpStreamTask.IsFaulted)
{
- Logger.LogInformation("Broken tcp connection detected, aborting ipc connection.");
+ _logger?.LogInformation("Broken tcp connection detected, aborting ipc connection.");
checkTcpStreamTask.GetAwaiter().GetResult();
}
}
catch (Exception)
{
- Logger.LogDebug("Failed creating new router instance.");
+ _logger?.LogDebug("Failed creating new router instance.");
// Cleanup and rethrow.
ipcServerStream?.Dispose();
}
// Create new router.
- Logger.LogDebug("New router instance successfully created.");
+ _logger?.LogDebug("New router instance successfully created.");
- return new Router(ipcServerStream, tcpServerStream, Logger);
+ return new Router(ipcServerStream, tcpServerStream, _logger);
}
+ }
+
+ /// <summary>
+ /// This class creates IPC Server - TCP Client router instances.
+ /// Supports NamedPipes/UnixDomainSocket server and TCP/IP client.
+ /// </summary>
+ internal class IpcServerTcpClientRouterFactory : DiagnosticsServerRouterFactory
+ {
+ ILogger _logger;
+ IpcServerRouterFactory _ipcServerRouterFactory;
+ TcpClientRouterFactory _tcpClientRouterFactory;
- protected async Task<Stream> AcceptIpcStreamAsync(CancellationToken token)
+ public IpcServerTcpClientRouterFactory(string ipcServer, string tcpClient, int runtimeTimeoutMs, ILogger logger)
{
- Stream ipcServerStream = null;
+ _logger = logger;
+ _ipcServerRouterFactory = new IpcServerRouterFactory(ipcServer, logger);
+ _tcpClientRouterFactory = new TcpClientRouterFactory(tcpClient, runtimeTimeoutMs, logger);
+ }
- Logger.LogDebug($"Waiting for new ipc connection at endpoint \"{_ipcServerPath}\".");
+ public override string IpcAddress
+ {
+ get
+ {
+ return _ipcServerRouterFactory.IpcServerPath;
+ }
+ }
+ public override string TcpAddress
+ {
+ get
+ {
+ return _tcpClientRouterFactory.TcpClientAddress;
+ }
+ }
- using var connectTimeoutTokenSource = new CancellationTokenSource();
- using var connectTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, connectTimeoutTokenSource.Token);
+ public override ILogger Logger
+ {
+ get
+ {
+ return _logger;
+ }
+ }
+
+ public override void Start()
+ {
+ _ipcServerRouterFactory.Start();
+ _logger?.LogInformation($"Starting IPC server ({_ipcServerRouterFactory.IpcServerPath}) <--> TCP client ({_tcpClientRouterFactory.TcpClientAddress}) router.");
+ }
+
+ public override Task Stop()
+ {
+ _ipcServerRouterFactory.Stop();
+ return Task.CompletedTask;
+ }
+
+ public override async Task<Router> CreateRouterAsync(CancellationToken token)
+ {
+ Stream tcpClientStream = null;
+ Stream ipcServerStream = null;
+
+ _logger?.LogDebug("Trying to create a new router instance.");
try
{
- connectTimeoutTokenSource.CancelAfter(IpcServerTimeoutMs);
- ipcServerStream = await _ipcServer.AcceptAsync(connectTokenSource.Token).ConfigureAwait(false);
+ using CancellationTokenSource cancelRouter = CancellationTokenSource.CreateLinkedTokenSource(token);
+
+ // Get new server endpoint.
+ ipcServerStream = await _ipcServerRouterFactory.AcceptIpcStreamAsync(cancelRouter.Token).ConfigureAwait(false);
+
+ // Get new client endpoint.
+ using var tcpClientStreamTask = _tcpClientRouterFactory.ConnectTcpStreamAsync(cancelRouter.Token);
+
+ // We have a valid ipc stream and a pending tcp stream. Wait for completion
+ // or disconnect of ipc stream.
+ using var checkIpcStreamTask = IsStreamConnectedAsync(ipcServerStream, cancelRouter.Token);
+
+ // Wait for at least completion of one task.
+ await Task.WhenAny(tcpClientStreamTask, checkIpcStreamTask).ConfigureAwait(false);
+
+ // Cancel out any pending tasks not yet completed.
+ cancelRouter.Cancel();
+
+ try
+ {
+ await Task.WhenAll(tcpClientStreamTask, checkIpcStreamTask).ConfigureAwait(false);
+ }
+ catch (Exception)
+ {
+ // Check if we have an accepted tcp stream.
+ if (IsCompletedSuccessfully(tcpClientStreamTask))
+ tcpClientStreamTask.Result?.Dispose();
+
+ if (checkIpcStreamTask.IsFaulted)
+ {
+ _logger?.LogInformation("Broken ipc connection detected, aborting tcp connection.");
+ checkIpcStreamTask.GetAwaiter().GetResult();
+ }
+
+ throw;
+ }
+
+ tcpClientStream = tcpClientStreamTask.Result;
}
catch (Exception)
{
- ipcServerStream?.Dispose();
+ _logger?.LogDebug("Failed creating new router instance.");
- if (connectTimeoutTokenSource.IsCancellationRequested)
- {
- Logger.LogDebug("No ipc stream connected, timing out.");
- throw new TimeoutException();
- }
+ // Cleanup and rethrow.
+ ipcServerStream?.Dispose();
+ tcpClientStream?.Dispose();
throw;
}
- if (ipcServerStream != null)
- Logger.LogDebug("Successfully connected ipc stream.");
-
- return ipcServerStream;
- }
+ // Create new router.
+ _logger?.LogDebug("New router instance successfully created.");
- public void CreatedNewServer(EndPoint localEP)
- {
- if (localEP is IPEndPoint ipEP)
- _tcpServerAddress = _tcpServerAddress.Replace(":0", string.Format(":{0}", ipEP.Port));
+ return new Router(ipcServerStream, tcpClientStream, _logger);
}
}
/// This class creates IPC Client - TCP Server router instances.
/// Supports NamedPipes/UnixDomainSocket client and TCP/IP server.
/// </summary>
- internal class IpcClientTcpServerRouterFactory : TcpServerRouterFactory, IIpcServerTransportCallbackInternal
+ internal class IpcClientTcpServerRouterFactory : DiagnosticsServerRouterFactory
{
- readonly string _ipcClientPath;
+ ILogger _logger;
+ IpcClientRouterFactory _ipcClientRouterFactory;
+ TcpServerRouterFactory _tcpServerRouterFactory;
+
+ public IpcClientTcpServerRouterFactory(string ipcClient, string tcpServer, int runtimeTimeoutMs, ILogger logger)
+ {
+ _logger = logger;
+ _ipcClientRouterFactory = new IpcClientRouterFactory(ipcClient, logger);
+ _tcpServerRouterFactory = new TcpServerRouterFactory(tcpServer, runtimeTimeoutMs, logger);
+ }
- protected int IpcClientTimeoutMs { get; set; } = Timeout.Infinite;
+ public override string IpcAddress
+ {
+ get
+ {
+ return _ipcClientRouterFactory.IpcClientPath;
+ }
+ }
- protected int IpcClientRetryTimeoutMs { get; set; } = 500;
+ public override string TcpAddress
+ {
+ get
+ {
+ return _tcpServerRouterFactory.TcpServerAddress;
+ }
+ }
- public IpcClientTcpServerRouterFactory(string ipcClient, string tcpServer, int runtimeTimeoutMs, ILogger logger)
- : base(tcpServer, runtimeTimeoutMs, logger)
+ public override ILogger Logger
{
- _ipcClientPath = ipcClient;
- _tcpServer.TransportCallback = this;
+ get
+ {
+ return _logger;
+ }
}
public override void Start()
{
- base.Start();
- _logger.LogInformation($"Starting IPC client ({_ipcClientPath}) <--> TCP server ({_tcpServerAddress}) router.");
+ _tcpServerRouterFactory.Start();
+ _logger?.LogInformation($"Starting IPC client ({_ipcClientRouterFactory.IpcClientPath}) <--> TCP server ({_tcpServerRouterFactory.TcpServerAddress}) router.");
+ }
+
+ public override Task Stop()
+ {
+ return _tcpServerRouterFactory.Stop();
+ }
+
+ public override void Reset()
+ {
+ _tcpServerRouterFactory.Reset();
}
public override async Task<Router> CreateRouterAsync(CancellationToken token)
Stream tcpServerStream = null;
Stream ipcClientStream = null;
- Logger.LogDebug("Trying to create a new router instance.");
+ _logger?.LogDebug("Trying to create a new router instance.");
try
{
using CancellationTokenSource cancelRouter = CancellationTokenSource.CreateLinkedTokenSource(token);
// Get new server endpoint.
- tcpServerStream = await AcceptTcpStreamAsync(cancelRouter.Token).ConfigureAwait(false);
+ tcpServerStream = await _tcpServerRouterFactory.AcceptTcpStreamAsync(cancelRouter.Token).ConfigureAwait(false);
// Get new client endpoint.
- using var ipcClientStreamTask = ConnectIpcStreamAsync(cancelRouter.Token);
+ using var ipcClientStreamTask = _ipcClientRouterFactory.ConnectIpcStreamAsync(cancelRouter.Token);
// We have a valid tcp stream and a pending ipc stream. Wait for completion
// or disconnect of tcp stream.
if (checkTcpStreamTask.IsFaulted)
{
- Logger.LogInformation("Broken tcp connection detected, aborting ipc connection.");
+ _logger?.LogInformation("Broken tcp connection detected, aborting ipc connection.");
checkTcpStreamTask.GetAwaiter().GetResult();
}
}
ipcClientStream = ipcClientStreamTask.Result;
- }
- catch (Exception)
- {
- Logger.LogDebug("Failed creating new router instance.");
-
- // Cleanup and rethrow.
- tcpServerStream?.Dispose();
- ipcClientStream?.Dispose();
-
- throw;
- }
-
- // Create new router.
- Logger.LogDebug("New router instance successfully created.");
-
- return new Router(ipcClientStream, tcpServerStream, Logger, (ulong)IpcAdvertise.V1SizeInBytes);
- }
-
- protected async Task<Stream> ConnectIpcStreamAsync(CancellationToken token)
- {
- Stream ipcClientStream = null;
-
- Logger.LogDebug($"Connecting new ipc endpoint \"{_ipcClientPath}\".");
-
- if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
- {
- var namedPipe = new NamedPipeClientStream(
- ".",
- _ipcClientPath,
- PipeDirection.InOut,
- PipeOptions.Asynchronous,
- TokenImpersonationLevel.Impersonation);
try
{
- await namedPipe.ConnectAsync(IpcClientTimeoutMs, token).ConfigureAwait(false);
+ // TcpServer consumes advertise message, needs to be replayed back to ipc client stream. Use router process ID as representation.
+ await IpcAdvertise.SerializeAsync(ipcClientStream, _tcpServerRouterFactory.RuntimeInstanceId, (ulong)Process.GetCurrentProcess().Id, token).ConfigureAwait(false);
}
- catch (Exception ex)
+ catch (Exception)
{
- namedPipe?.Dispose();
-
- if (ex is TimeoutException)
- Logger.LogDebug("No ipc stream connected, timing out.");
-
+ _logger?.LogDebug("Failed sending advertise message.");
throw;
}
-
- ipcClientStream = namedPipe;
- }
- else
- {
- bool retry = false;
- IpcUnixDomainSocket unixDomainSocket;
- do
- {
- unixDomainSocket = new IpcUnixDomainSocket();
-
- using var connectTimeoutTokenSource = new CancellationTokenSource();
- using var connectTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, connectTimeoutTokenSource.Token);
-
- try
- {
- connectTimeoutTokenSource.CancelAfter(IpcClientTimeoutMs);
- await unixDomainSocket.ConnectAsync(new IpcUnixDomainSocketEndPoint(_ipcClientPath), token).ConfigureAwait(false);
- retry = false;
- }
- catch (Exception)
- {
- unixDomainSocket?.Dispose();
-
- if (connectTimeoutTokenSource.IsCancellationRequested)
- {
- Logger.LogDebug("No ipc stream connected, timing out.");
- throw new TimeoutException();
- }
-
- Logger.LogTrace($"Failed connecting {_ipcClientPath}, wait {IpcClientRetryTimeoutMs} ms before retrying.");
-
- // If we get an error (without hitting timeout above), most likely due to unavailable listener.
- // Delay execution to prevent to rapid retry attempts.
- await Task.Delay(IpcClientRetryTimeoutMs, token).ConfigureAwait(false);
-
- if (IpcClientTimeoutMs != Timeout.Infinite)
- throw;
-
- retry = true;
- }
- }
- while (retry);
-
- ipcClientStream = new ExposedSocketNetworkStream(unixDomainSocket, ownsSocket: true);
- }
-
- try
- {
- // ReversedDiagnosticsServer consumes advertise message, needs to be replayed back to ipc client stream. Use router process ID as representation.
- await IpcAdvertise.SerializeAsync(ipcClientStream, RuntimeInstanceId, (ulong)Process.GetCurrentProcess().Id, token).ConfigureAwait(false);
}
catch (Exception)
{
- Logger.LogDebug("Failed sending advertise message.");
+ _logger?.LogDebug("Failed creating new router instance.");
+ // Cleanup and rethrow.
+ tcpServerStream?.Dispose();
ipcClientStream?.Dispose();
+
throw;
}
- if (ipcClientStream != null)
- Logger.LogDebug("Successfully connected ipc stream.");
-
- return ipcClientStream;
- }
+ // Create new router.
+ _logger?.LogDebug("New router instance successfully created.");
- public void CreatedNewServer(EndPoint localEP)
- {
- if (localEP is IPEndPoint ipEP)
- _tcpServerAddress = _tcpServerAddress.Replace(":0", string.Format(":{0}", ipEP.Port));
+ return new Router(ipcClientStream, tcpServerStream, _logger, (ulong)IpcAdvertise.V1SizeInBytes);
}
}
Interlocked.Decrement(ref s_routerInstanceCount);
- _logger.LogTrace($"Diposed stats: Backend->Frontend {_backendToFrontendByteTransfer} bytes, Frontend->Backend {_frontendToBackendByteTransfer} bytes.");
- _logger.LogTrace($"Active instances: {s_routerInstanceCount}");
+ _logger?.LogTrace($"Diposed stats: Backend->Frontend {_backendToFrontendByteTransfer} bytes, Frontend->Backend {_frontendToBackendByteTransfer} bytes.");
+ _logger?.LogTrace($"Active instances: {s_routerInstanceCount}");
}
}
byte[] buffer = new byte[1024];
while (!token.IsCancellationRequested)
{
- _logger.LogTrace("Start reading bytes from backend.");
+ _logger?.LogTrace("Start reading bytes from backend.");
int bytesRead = await _backendStream.ReadAsync(buffer, 0, buffer.Length, token).ConfigureAwait(false);
- _logger.LogTrace($"Read {bytesRead} bytes from backend.");
+ _logger?.LogTrace($"Read {bytesRead} bytes from backend.");
// Check for end of stream indicating that remote end hung-up.
if (bytesRead == 0)
{
- _logger.LogTrace("Backend hung up.");
+ _logger?.LogTrace("Backend hung up.");
break;
}
_backendToFrontendByteTransfer += (ulong)bytesRead;
- _logger.LogTrace($"Start writing {bytesRead} bytes to frontend.");
+ _logger?.LogTrace($"Start writing {bytesRead} bytes to frontend.");
await _frontendStream.WriteAsync(buffer, 0, bytesRead, token).ConfigureAwait(false);
await _frontendStream.FlushAsync().ConfigureAwait(false);
- _logger.LogTrace($"Wrote {bytesRead} bytes to frontend.");
+ _logger?.LogTrace($"Wrote {bytesRead} bytes to frontend.");
}
}
catch (Exception)
// Completing task will trigger dispose of instance and cleanup.
// Faliure mainly consists of closed/disposed streams and cancelation requests.
// Just make sure task gets complete, nothing more needs to be in response to these exceptions.
- _logger.LogTrace("Failed stream operation. Completing task.");
+ _logger?.LogTrace("Failed stream operation. Completing task.");
}
RouterTaskCompleted?.TrySetResult(true);
byte[] buffer = new byte[1024];
while (!token.IsCancellationRequested)
{
- _logger.LogTrace("Start reading bytes from frotend.");
+ _logger?.LogTrace("Start reading bytes from frotend.");
int bytesRead = await _frontendStream.ReadAsync(buffer, 0, buffer.Length, token).ConfigureAwait(false);
- _logger.LogTrace($"Read {bytesRead} bytes from frontend.");
+ _logger?.LogTrace($"Read {bytesRead} bytes from frontend.");
// Check for end of stream indicating that remote end hung-up.
if (bytesRead == 0)
{
- _logger.LogTrace("Frontend hung up.");
+ _logger?.LogTrace("Frontend hung up.");
break;
}
_frontendToBackendByteTransfer += (ulong)bytesRead;
- _logger.LogTrace($"Start writing {bytesRead} bytes to backend.");
+ _logger?.LogTrace($"Start writing {bytesRead} bytes to backend.");
await _backendStream.WriteAsync(buffer, 0, bytesRead, token).ConfigureAwait(false);
await _backendStream.FlushAsync().ConfigureAwait(false);
- _logger.LogTrace($"Wrote {bytesRead} bytes to backend.");
+ _logger?.LogTrace($"Wrote {bytesRead} bytes to backend.");
}
}
catch (Exception)
// Completing task will trigger dispose of instance and cleanup.
// Faliure mainly consists of closed/disposed streams and cancelation requests.
// Just make sure task gets complete, nothing more needs to be in response to these exceptions.
- _logger.LogTrace("Failed stream operation. Completing task.");
+ _logger?.LogTrace("Failed stream operation. Completing task.");
}
RouterTaskCompleted?.TrySetResult(true);