{
}
- async Task<Stream> ConnectTcpStreamAsyncInternal(CancellationToken token, bool retry)
+ private async Task<Stream> ConnectTcpStreamAsyncInternal(CancellationToken token, bool retry)
{
Stream tcpClientStream = null;
return tcpClientStream;
}
- async Task ConnectAsyncInternal(Socket clientSocket, EndPoint remoteEP, CancellationToken token)
+ private async Task ConnectAsyncInternal(Socket clientSocket, EndPoint remoteEP, CancellationToken token)
{
using (token.Register(() => clientSocket.Close(0)))
{
/// </summary>
internal class IpcClientTcpClientRouterFactory : DiagnosticsServerRouterFactory
{
+ bool _updateRuntimeInfo;
Guid _runtimeInstanceId;
ulong _runtimeProcessId;
ILogger _logger;
public IpcClientTcpClientRouterFactory(string ipcClient, string tcpClient, int runtimeTimeoutMs, TcpClientRouterFactory.CreateInstanceDelegate factory, ILogger logger)
{
+ _updateRuntimeInfo = true;
_runtimeInstanceId = Guid.Empty;
_runtimeProcessId = 0;
_logger = logger;
{
_tcpClientRouterFactory.Start();
_logger?.LogInformation($"Starting IPC client ({_ipcClientRouterFactory.IpcClientPath}) <--> TCP client ({_tcpClientRouterFactory.TcpClientAddress}) router.");
-
- var requestRuntimeInfo = new Task(() =>
- {
- try
- {
- _logger?.LogDebug($"Requesting runtime process information.");
-
- // Get new tcp client endpoint.
- using var tcpClientStream = _tcpClientRouterFactory.ConnectTcpStreamAsync(token).Result;
-
- // Request process info.
- IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.GetProcessInfo);
-
- byte[] buffer = message.Serialize();
- tcpClientStream.Write(buffer, 0, buffer.Length);
-
- var response = IpcMessage.Parse(tcpClientStream);
- if ((DiagnosticsServerResponseId)response.Header.CommandId == DiagnosticsServerResponseId.OK)
- {
- var info = ProcessInfo.ParseV1(response.Payload);
-
- _runtimeProcessId = info.ProcessId;
- _runtimeInstanceId = info.RuntimeInstanceCookie;
-
- _logger?.LogDebug($"Retrieved runtime process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}.");
- }
- else
- {
- throw new ServerErrorException("Failed to retrieve runtime process info.");
- }
- }
- catch (Exception)
- {
- _runtimeProcessId = (ulong)Process.GetCurrentProcess().Id;
- _runtimeInstanceId = Guid.NewGuid();
- _logger?.LogWarning($"Failed to retrieve runtime process info, fallback to current process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}.");
- }
- });
-
- requestRuntimeInfo.Start();
- return requestRuntimeInfo;
+ return Task.CompletedTask;
}
public override Task Stop()
Stream tcpClientStream = null;
Stream ipcClientStream = null;
+ int initFrontendToBackendByteTransfer = 0;
+ int initBackendToFrontendByteTransfer = 0;
+
+ await UpdateRuntimeInfo(token).ConfigureAwait(false);
+
_logger?.LogDebug("Trying to create a new router instance.");
try
{
_logger?.LogInformation("Broken tcp connection detected, aborting ipc connection.");
checkTcpStreamTask.GetAwaiter().GetResult();
+ _updateRuntimeInfo = true;
}
throw;
try
{
await IpcAdvertise.SerializeAsync(ipcClientStream, _runtimeInstanceId, _runtimeProcessId, token).ConfigureAwait(false);
+ initBackendToFrontendByteTransfer = IpcAdvertise.V1SizeInBytes;
}
catch (Exception)
{
_logger?.LogDebug("Failed sending advertise message.");
throw;
}
+
+ // Router needs to emulate backend behavior when running in client-client mode.
+ // A new router instance can not be complete until frontend starts to
+ // write data to backend or a new router instance will connect against frontend
+ // that in turn will disconnects previous accepted but pending connections, triggering
+ // frequent connects/disconnects.
+ initFrontendToBackendByteTransfer = await InitFrontendReadBackendWrite(ipcClientStream, tcpClientStream, token).ConfigureAwait(false);
}
catch (Exception)
{
_logger?.LogDebug("Failed creating new router instance.");
+ if (tcpClientStream == null || (tcpClientStream != null && ipcClientStream == null))
+ _updateRuntimeInfo = true;
+
// Cleanup and rethrow.
tcpClientStream?.Dispose();
ipcClientStream?.Dispose();
// Create new router.
_logger?.LogDebug("New router instance successfully created.");
- return new Router(ipcClientStream, tcpClientStream, _logger, (ulong)IpcAdvertise.V1SizeInBytes);
+ return new Router(ipcClientStream, tcpClientStream, _logger, (ulong)initBackendToFrontendByteTransfer, (ulong)initFrontendToBackendByteTransfer);
+ }
+
+ private async Task<int> InitFrontendReadBackendWrite(Stream ipcClientStream, Stream tcpClientStream, CancellationToken token)
+ {
+ using CancellationTokenSource cancelReadConnect = CancellationTokenSource.CreateLinkedTokenSource(token);
+
+ byte[] buffer = new byte[1024];
+ using var readTask = ipcClientStream.ReadAsync(buffer, 0, buffer.Length, cancelReadConnect.Token);
+
+ // Check tcp client connection while waiting on ipc client.
+ using var checkTcpStreamTask = IsStreamConnectedAsync(tcpClientStream, cancelReadConnect.Token);
+
+ // Wait for completion of at least one task.
+ await Task.WhenAny(readTask, checkTcpStreamTask).ConfigureAwait(false);
+
+ // Cancel out any pending tasks not yet completed.
+ cancelReadConnect.Cancel();
+
+ try
+ {
+ await Task.WhenAll(readTask, checkTcpStreamTask).ConfigureAwait(false);
+ }
+ catch (Exception)
+ {
+ if (readTask.IsFaulted)
+ _logger?.LogInformation("Broken ipc connection detected.");
+
+ if (checkTcpStreamTask.IsFaulted)
+ {
+ _logger?.LogInformation("Broken tcp connection detected.");
+ _updateRuntimeInfo = true;
+ }
+
+ throw;
+ }
+
+ var bytesRead = readTask.Result;
+ if (bytesRead == 0)
+ {
+ _logger?.LogDebug("ReverseDiagnosticServer disconnected ipc connection.");
+ throw new DiagnosticsClientException("ReverseDiagnosticServer disconnect detected.");
+ }
+
+ await tcpClientStream.WriteAsync(buffer, 0, bytesRead, token).ConfigureAwait(false);
+
+ return bytesRead;
+ }
+
+ private async Task UpdateRuntimeInfo(CancellationToken token)
+ {
+ if (!_updateRuntimeInfo)
+ return;
+
+ try
+ {
+ _logger?.LogDebug($"Requesting runtime process information.");
+
+ // Get new tcp client endpoint.
+ using var tcpClientStream = await _tcpClientRouterFactory.ConnectTcpStreamAsync(token, true).ConfigureAwait(false);
+
+ // Request process info.
+ IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.GetProcessInfo);
+
+ byte[] buffer = message.Serialize();
+ await tcpClientStream.WriteAsync(buffer, 0, buffer.Length, token).ConfigureAwait(false);
+
+ var response = IpcMessage.Parse(tcpClientStream);
+ if ((DiagnosticsServerResponseId)response.Header.CommandId == DiagnosticsServerResponseId.OK)
+ {
+ var info = ProcessInfo.ParseV1(response.Payload);
+
+ _runtimeProcessId = info.ProcessId;
+ _runtimeInstanceId = info.RuntimeInstanceCookie;
+
+ _logger?.LogDebug($"Retrieved runtime process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}.");
+ }
+ else
+ {
+ throw new ServerErrorException("Failed to retrieve runtime process info.");
+ }
+ }
+ catch (Exception)
+ {
+ _runtimeProcessId = (ulong)Process.GetCurrentProcess().Id;
+ _runtimeInstanceId = Guid.NewGuid();
+ _logger?.LogWarning($"Failed to retrieve runtime process info, fallback to current process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}.");
+ }
+ _updateRuntimeInfo = false;
}
}
Interlocked.Decrement(ref s_routerInstanceCount);
- _logger?.LogTrace($"Diposed stats: Back End->Front End {_backendToFrontendByteTransfer} bytes, Front End->Back End {_frontendToBackendByteTransfer} bytes.");
- _logger?.LogTrace($"Active instances: {s_routerInstanceCount}");
+ _logger?.LogDebug($"Diposed stats: Back End->Front End {_backendToFrontendByteTransfer} bytes, Front End->Back End {_frontendToBackendByteTransfer} bytes.");
+ _logger?.LogDebug($"Active instances: {s_routerInstanceCount}");
}
}
- async Task BackendReadFrontendWrite(CancellationToken token)
+ private async Task BackendReadFrontendWrite(CancellationToken token)
{
try
{
RouterTaskCompleted?.TrySetResult(true);
}
- async Task FrontendReadBackendWrite(CancellationToken token)
+ private async Task FrontendReadBackendWrite(CancellationToken token)
{
try
{