public Task? CurrentSend { get { return current_send; } }
public WasmDebuggerConnection Connection { get; init; }
+ public string Id => Connection.Id;
public DevToolsQueue(WasmDebuggerConnection conn)
{
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Channels;
+using System.Collections.Generic;
+using Microsoft.Extensions.Logging;
+
+#nullable enable
+
+namespace Microsoft.WebAssembly.Diagnostics;
+
+internal sealed class RunLoop : IDisposable
+{
+ public event EventHandler<RunLoopExitState>? RunLoopStopped;
+ public bool IsRunning => StoppedState is null;
+ public RunLoopExitState? StoppedState { get; private set; }
+
+ private TaskCompletionSource<Exception> _failRequested { get; } = new();
+ private TaskCompletionSource _shutdownRequested { get; } = new();
+ private readonly ChannelWriter<Task> _channelWriter;
+ private readonly ChannelReader<Task> _channelReader;
+ private readonly DevToolsQueue[] _queues;
+ private readonly ILogger _logger;
+
+ public RunLoop(DevToolsQueue[] queues, ILogger logger)
+ {
+ if (queues.Length == 0)
+ throw new ArgumentException($"Minimum of one queue need to run", nameof(queues));
+
+ foreach (DevToolsQueue q in queues)
+ {
+ if (q.Connection.OnReadAsync is null)
+ throw new ArgumentException($"Queue's({q.Id}) connection doesn't have a OnReadAsync handler set");
+ }
+
+ _logger = logger;
+ _queues = queues;
+
+ var channel = Channel.CreateUnbounded<Task>(new UnboundedChannelOptions { SingleReader = true });
+ _channelWriter = channel.Writer;
+ _channelReader = channel.Reader;
+ }
+
+ public Task RunAsync(CancellationTokenSource cts)
+ => Task.Run(async () =>
+ {
+ RunLoopExitState exitState;
+
+ try
+ {
+ exitState = await RunActualAsync(cts);
+ StoppedState = exitState;
+ }
+ catch (Exception ex)
+ {
+ _channelWriter.Complete(ex);
+ _logger.LogDebug($"RunLoop threw an exception: {ex}");
+ StoppedState = new(RunLoopStopReason.Exception, ex);
+ RunLoopStopped?.Invoke(this, StoppedState);
+ return;
+ }
+ finally
+ {
+ if (!cts.IsCancellationRequested)
+ cts.Cancel();
+ }
+
+ try
+ {
+ _logger.LogDebug($"RunLoop stopped, reason: {exitState}");
+ RunLoopStopped?.Invoke(this, exitState);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Invoking RunLoopStopped event ({exitState}) failed with {ex}");
+ }
+ });
+
+ private async Task<RunLoopExitState> RunActualAsync(CancellationTokenSource x)
+ {
+ List<Task> pending_ops;
+ List<Task> tmp_ops = new();
+ int numFixed;
+
+ // Fixed index tasks
+ {
+ pending_ops = new();
+
+ for (int i = 0; i < _queues.Length; i++)
+ pending_ops.Add(_queues[i].Connection.ReadOneAsync(x.Token));
+ pending_ops.Add(_failRequested.Task);
+ pending_ops.Add(_shutdownRequested.Task);
+
+ numFixed = pending_ops.Count;
+ }
+
+ Task<bool> readerTask = _channelReader.WaitToReadAsync(x.Token).AsTask();
+ pending_ops.Add(readerTask);
+
+ int numQueues = _queues.Length;
+ while (!x.IsCancellationRequested)
+ {
+ Task completedTask = await Task.WhenAny(pending_ops.ToArray()).ConfigureAwait(false);
+
+ if (_shutdownRequested.Task.IsCompleted)
+ return new(RunLoopStopReason.Shutdown, null);
+ if (_failRequested.Task.IsCompleted)
+ return new(RunLoopStopReason.Exception, await _failRequested.Task);
+
+ int completedIdx = pending_ops.IndexOf(completedTask);
+ if (completedTask.IsFaulted)
+ {
+ return (completedIdx < numQueues && !_queues[completedIdx].Connection.IsConnected)
+ ? new(RunLoopStopReason.ConnectionClosed, new Exception($"Connection id: {_queues[completedIdx].Id}", completedTask.Exception))
+ : new(RunLoopStopReason.Exception, completedTask.Exception);
+ }
+
+ if (x.IsCancellationRequested)
+ return new(RunLoopStopReason.Cancelled, null);
+
+ // Ensure the fixed slots are filled
+ for (int i = 0; i < numFixed; i++)
+ tmp_ops.Add(pending_ops[i]);
+
+ for (int queueIdx = 0; queueIdx < numQueues; queueIdx++)
+ {
+ DevToolsQueue curQueue = _queues[queueIdx];
+ if (curQueue.TryPumpIfCurrentCompleted(x.Token, out Task? tsk))
+ tmp_ops.Add(tsk);
+
+ Task queueReadTask = pending_ops[queueIdx];
+ if (!queueReadTask.IsCompleted)
+ continue;
+
+ string msg = await (Task<string>)queueReadTask;
+ tmp_ops[queueIdx] = curQueue.Connection.ReadOneAsync(x.Token);
+ if (msg != null)
+ {
+ Task? readHandlerTask = curQueue.Connection.OnReadAsync?.Invoke(msg, x.Token);
+ if (readHandlerTask != null)
+ tmp_ops.Add(readHandlerTask);
+ }
+ }
+
+ // Remaining tasks *after* the fixed ones
+ for (int pendingOpsIdx = numFixed; pendingOpsIdx < pending_ops.Count; pendingOpsIdx++)
+ {
+ Task t = pending_ops[pendingOpsIdx];
+ if (t.IsFaulted)
+ return new(RunLoopStopReason.Exception, t.Exception);
+ if (t.IsCanceled)
+ return new(RunLoopStopReason.Cancelled, null);
+
+ if (!t.IsCompleted)
+ {
+ tmp_ops.Add(t);
+ continue;
+ }
+ }
+
+ // Add any tasks that were received over the channel
+ if (readerTask.IsCompleted)
+ {
+ while (_channelReader.TryRead(out Task? newTask))
+ tmp_ops.Add(newTask);
+
+ readerTask = _channelReader.WaitToReadAsync(x.Token).AsTask();
+ tmp_ops.Add(readerTask);
+ }
+
+ pending_ops = tmp_ops;
+ tmp_ops = new(capacity: pending_ops.Count + 10);
+ }
+
+ _channelWriter.Complete();
+ if (_shutdownRequested.Task.IsCompleted)
+ return new(RunLoopStopReason.Shutdown, null);
+ return x.IsCancellationRequested
+ ? new(RunLoopStopReason.Cancelled, null)
+ : new(RunLoopStopReason.Exception,
+ new InvalidOperationException($"This shouldn't ever get thrown. Unsure why the loop stopped"));
+ }
+
+ public Task Send(byte[] payload, CancellationToken token, DevToolsQueue? queue = null)
+ {
+ queue ??= _queues[0];
+ Task? task = queue.Send(payload, token);
+ return task is null
+ ? Task.CompletedTask
+ : _channelWriter.WriteAsync(task, token).AsTask();
+ }
+
+ public void Fail(Exception exception)
+ {
+ if (_failRequested.Task.IsCompleted)
+ _logger.LogError($"Fail requested again with {exception}");
+ else
+ _failRequested.TrySetResult(exception);
+ }
+
+ // FIXME: Continue with to catch any errors in shutting down
+ public void Shutdown() => Task.Run(async () => await ShutdownAsync(CancellationToken.None));
+
+ public async Task ShutdownAsync(CancellationToken cancellationToken)
+ {
+ if (_shutdownRequested.Task.IsCompleted)
+ {
+ _logger.LogDebug($"Shutdown was already requested once. Ignoring");
+ return;
+ }
+
+ foreach (DevToolsQueue q in _queues)
+ await q.Connection.ShutdownAsync(cancellationToken);
+
+ _shutdownRequested.TrySetResult();
+ }
+
+ public void Dispose()
+ {
+ foreach (DevToolsQueue q in _queues)
+ q.Connection.Dispose();
+ }
+}
protected WasmDebuggerConnection(string id) => Id = id;
public abstract bool IsConnected { get; }
+ public Func<string, CancellationToken, Task>? OnReadAsync { get; set; }
public abstract Task<string?> ReadOneAsync(CancellationToken token);
public abstract Task SendAsync(byte[] bytes, CancellationToken token);
{
internal class DevToolsProxy
{
- protected TaskCompletionSource<Exception> side_exception = new();
- protected TaskCompletionSource shutdown_requested = new();
protected Dictionary<MessageId, TaskCompletionSource<Result>> pending_cmds = new Dictionary<MessageId, TaskCompletionSource<Result>>();
- protected WasmDebuggerConnection browser;
- protected WasmDebuggerConnection ide;
+ protected DevToolsQueue browser;
+ protected DevToolsQueue ide;
private int next_cmd_id;
- private readonly ChannelWriter<Task> _channelWriter;
- private readonly ChannelReader<Task> _channelReader;
- protected List<DevToolsQueue> queues = new List<DevToolsQueue>();
-
protected readonly ILogger logger;
+ protected RunLoop _runLoop;
private readonly string _loggerId;
public event EventHandler<RunLoopExitState> RunLoopStopped;
- public bool IsRunning => Stopped is null;
- public RunLoopExitState Stopped { get; private set; }
+ public bool IsRunning => _runLoop?.IsRunning == true;
+ public RunLoopExitState Stopped => _runLoop?.StoppedState;
public DevToolsProxy(ILogger logger, string loggerId)
{
_loggerId = loggerId;
this.logger = logger;
-
- var channel = Channel.CreateUnbounded<Task>(new UnboundedChannelOptions { SingleReader = true });
- _channelWriter = channel.Writer;
- _channelReader = channel.Reader;
}
protected int GetNewCmdId() => Interlocked.Increment(ref next_cmd_id);
return Task.FromResult(false);
}
- private DevToolsQueue GetQueueForConnection(WasmDebuggerConnection conn)
- => queues.FirstOrDefault(q => q.Connection == conn);
-
- protected DevToolsQueue GetQueueForTask(Task task)
+ protected Task Send(DevToolsQueue queue, JObject o, CancellationToken token)
{
- return queues.FirstOrDefault(q => q.CurrentSend == task);
- }
-
- protected async Task Send(WasmDebuggerConnection conn, JObject o, CancellationToken token)
- {
- logger.LogTrace($"to-{conn.Id}: {GetFromOrTo(o)} {o}");
+ logger.LogTrace($"to-{queue.Id}: {GetFromOrTo(o)} {o}");
var msg = o.ToString(Formatting.None);
var bytes = Encoding.UTF8.GetBytes(msg);
- DevToolsQueue queue = GetQueueForConnection(conn);
-
- Task task = queue.Send(bytes, token);
- if (task != null)
- await _channelWriter.WriteAsync(task, token);
+ return _runLoop.Send(bytes, token, queue);
}
protected virtual async Task OnEvent(SessionId sessionId, JObject parms, CancellationToken token)
}
catch (Exception e)
{
- side_exception.TrySetResult(e);
+ _runLoop.Fail(e);
}
}
}
catch (Exception e)
{
- side_exception.TrySetResult(e);
+ _runLoop.Fail(e);
}
}
}
catch (Exception ex)
{
- side_exception.TrySetResult(ex);
+ _runLoop.Fail(ex);
throw;
}
}
}
catch (Exception ex)
{
- side_exception.TrySetResult(ex);
+ _runLoop.Fail(ex);
throw;
}
}
using var ideConn = new DevToolsDebuggerConnection(ideSocket, "ide", logger);
using var browserConn = new DevToolsDebuggerConnection(browserSocket, "browser", logger);
- await StartRunLoop(ideConn: ideConn, browserConn: browserConn, cts);
+ await RunLoopAsync(ideConn: ideConn, browserConn: browserConn, cts);
}
catch (Exception ex)
{
}
}
- protected Task StartRunLoop(WasmDebuggerConnection ideConn, WasmDebuggerConnection browserConn, CancellationTokenSource cts)
- => Task.Run(async () =>
- {
- try
- {
- RunLoopExitState exitState;
-
- try
- {
- Stopped = await RunLoopActual(ideConn, browserConn, cts);
- exitState = Stopped;
- }
- catch (Exception ex)
- {
- logger.LogDebug($"RunLoop threw an exception: {ex}");
- Stopped = new(RunLoopStopReason.Exception, ex);
- RunLoopStopped?.Invoke(this, Stopped);
- return;
- }
-
- try
- {
- logger.LogDebug($"RunLoop stopped, reason: {exitState}");
- RunLoopStopped?.Invoke(this, Stopped);
- }
- catch (Exception ex)
- {
- logger.LogError(ex, $"Invoking RunLoopStopped event ({exitState}) failed with {ex}");
- }
- }
- finally
- {
- ideConn?.Dispose();
- browserConn?.Dispose();
- }
- });
-
-
- private async Task<RunLoopExitState> RunLoopActual(WasmDebuggerConnection ideConn,
- WasmDebuggerConnection browserConn,
- CancellationTokenSource cts)
+ protected async Task RunLoopAsync(WasmDebuggerConnection ideConn, WasmDebuggerConnection browserConn, CancellationTokenSource cts)
{
- using (ide = ideConn)
+ try
{
- queues.Add(new DevToolsQueue(ide));
- using (browser = browserConn)
- {
- queues.Add(new DevToolsQueue(browser));
- var x = cts;
-
- List<Task> pending_ops = new();
-
- pending_ops.Add(browser.ReadOneAsync(x.Token));
- pending_ops.Add(ide.ReadOneAsync(x.Token));
- pending_ops.Add(side_exception.Task);
- pending_ops.Add(shutdown_requested.Task);
- Task<bool> readerTask = _channelReader.WaitToReadAsync(x.Token).AsTask();
- pending_ops.Add(readerTask);
-
- try
- {
- while (!x.IsCancellationRequested)
- {
- Task completedTask = await Task.WhenAny(pending_ops.ToArray()).ConfigureAwait(false);
-
- if (shutdown_requested.Task.IsCompleted)
- {
- x.Cancel();
- return new(RunLoopStopReason.Shutdown, null);
- }
-
- if (side_exception.Task.IsCompleted)
- return new(RunLoopStopReason.Exception, await side_exception.Task);
-
- if (completedTask.IsFaulted)
- {
- if (completedTask == pending_ops[0] && !browser.IsConnected)
- return new(RunLoopStopReason.HostConnectionClosed, completedTask.Exception);
- else if (completedTask == pending_ops[1] && !ide.IsConnected)
- return new(RunLoopStopReason.IDEConnectionClosed, completedTask.Exception);
-
- return new(RunLoopStopReason.Exception, completedTask.Exception);
- }
-
- if (x.IsCancellationRequested)
- return new(RunLoopStopReason.Cancelled, null);
-
- // FIXME: instead of this, iterate through pending_ops, and clear it
- // out every time we wake up
- if (pending_ops.Where(t => t.IsFaulted).FirstOrDefault() is Task faultedTask)
- return new(RunLoopStopReason.Exception, faultedTask.Exception);
-
- if (readerTask.IsCompleted)
- {
- while (_channelReader.TryRead(out Task newTask))
- {
- pending_ops.Add(newTask);
- }
-
- pending_ops[4] = _channelReader.WaitToReadAsync(x.Token).AsTask();
- }
-
- // logger.LogDebug("pump {0} {1}", completedTask, pending_ops.IndexOf (completedTask));
- if (completedTask == pending_ops[0])
- {
- string msg = await (Task<string>)completedTask;
- if (msg != null)
- {
- pending_ops[0] = browser.ReadOneAsync(x.Token);
- Task newTask = ProcessBrowserMessage(msg, x.Token);
- if (newTask != null)
- pending_ops.Add(newTask);
- }
- }
- else if (completedTask == pending_ops[1])
- {
- string msg = await (Task<string>)completedTask;
- if (msg != null)
- {
- pending_ops[1] = ide.ReadOneAsync(x.Token);
- Task newTask = ProcessIdeMessage(msg, x.Token);
- if (newTask != null)
- pending_ops.Add(newTask);
- }
- }
- else if (completedTask == pending_ops[2])
- {
- throw await (Task<Exception>)completedTask;
- }
- else
- {
- //must be a background task
- pending_ops.Remove(completedTask);
- DevToolsQueue queue = GetQueueForTask(completedTask);
- if (queue != null)
- {
- if (queue.TryPumpIfCurrentCompleted(x.Token, out Task tsk))
- pending_ops.Add(tsk);
- }
- }
- }
-
- _channelWriter.Complete();
- if (shutdown_requested.Task.IsCompleted)
- return new(RunLoopStopReason.Shutdown, null);
- if (x.IsCancellationRequested)
- return new(RunLoopStopReason.Cancelled, null);
-
- return new(RunLoopStopReason.Exception, new InvalidOperationException($"This shouldn't ever get thrown. Unsure why the loop stopped"));
- }
- catch (Exception e)
- {
- _channelWriter.Complete(e);
- throw;
- }
- finally
- {
- if (!x.IsCancellationRequested)
- x.Cancel();
- foreach (Task t in pending_ops)
- logger.LogDebug($"\t{t}: {t.Status}");
- logger.LogDebug($"browser: {browser.IsConnected}, ide: {ide.IsConnected}");
-
- queues?.Clear();
- }
- }
+ this.ide = new DevToolsQueue(ideConn);
+ this.browser = new DevToolsQueue(browserConn);
+ ideConn.OnReadAsync = ProcessIdeMessage;
+ browserConn.OnReadAsync = ProcessBrowserMessage;
+ _runLoop = new(new[] { ide, browser }, logger);
+ _runLoop.RunLoopStopped += RunLoopStopped;
+ await _runLoop.RunAsync(cts);
+ }
+ finally
+ {
+ _runLoop?.Dispose();
+ _runLoop = null;
}
}
- public virtual void Shutdown()
- {
- logger.LogDebug($"Proxy.Shutdown, browser: {browser.IsConnected}, ide: {ide.IsConnected}");
- shutdown_requested.TrySetResult();
- }
-
- public void Fail(Exception exception)
- {
- if (side_exception.Task.IsCompleted)
- logger.LogError($"Fail requested again with {exception}");
- else
- side_exception.TrySetResult(exception);
- }
+ public virtual void Shutdown() => _runLoop?.Shutdown();
+ public void Fail(Exception exception) => _runLoop?.Fail(exception);
protected virtual string GetFromOrTo(JObject o) => string.Empty;
await browserClient.ConnectAsync("127.0.0.1", portBrowser);
logger.LogTrace($".. connected to the browser!");
- await StartRunLoop(ideConn, browserConn, cts);
+ await RunLoopAsync(ideConn, browserConn, cts);
if (Stopped?.reason == RunLoopStopReason.Exception)
ExceptionDispatchInfo.Capture(Stopped.exception).Throw();
}
}
catch (Exception e)
{
- side_exception.TrySetException(e);
+ _runLoop.Fail(e);
}
}
catch (Exception e)
{
logger.LogError($"OnCommand for id: {id}, {parms} failed: {e}");
- side_exception.TrySetException(e);
+ _runLoop.Fail(e);
}
}
}
catch (Exception ex)
{
- // FIXME: using `side_exception` right now because the runloop doesn't
- // immediately look at all faulted tasks
logger.LogError(ex.ToString());
- side_exception.TrySetResult(ex);
+ _runLoop.Fail(ex);
throw;
}
}
catch (Exception ex)
{
logger.LogError(ex.ToString());
- side_exception.TrySetResult(ex);
+ _runLoop.Fail(ex);
throw;
}
}
continue;
}
- Log("debug", $"frame il offset: {il_pos} method token: {method.Info.Token} assembly name: {method.Info.Assembly.Name}");
- Log("debug", $"\tmethod {method.Name} location: {location}");
+ // logger.LogTrace($"frame il offset: {il_pos} method token: {method.Info.Token} assembly name: {method.Info.Assembly.Name}");
+ // logger.LogTrace($"\tmethod {method.Name} location: {location}");
frames.Add(new Frame(method, location, frame_id));
callFrames.Add(new
Shutdown,
Cancelled,
Exception,
- ProxyConnectionClosed,
- IDEConnectionClosed,
- HostConnectionClosed
-
+ ConnectionClosed
}
{
internal class DevToolsClient : IDisposable
{
- DevToolsQueue _queue;
- protected WasmDebuggerConnection _conn;
- TaskCompletionSource _shutdownRequested = new TaskCompletionSource();
- readonly TaskCompletionSource<Exception> _failRequested = new();
- TaskCompletionSource _newSendTaskAvailable = new ();
protected readonly ILogger logger;
+ protected RunLoop _runLoop;
public event EventHandler<RunLoopExitState> RunLoopStopped;
protected virtual void Dispose(bool disposing)
{
- if (disposing)
- _conn.Dispose();
+ _runLoop?.Dispose();
+ _runLoop = null;
}
- public async Task Shutdown(CancellationToken cancellationToken)
- {
- if (_shutdownRequested.Task.IsCompleted)
- {
- logger.LogDebug($"Shutdown was already requested once. Ignoring");
- return;
- }
-
- await _conn.ShutdownAsync(cancellationToken);
- _shutdownRequested.TrySetResult();
- }
+ public Task ShutdownAsync(CancellationToken cancellationToken)
+ => _runLoop is null
+ ? Task.CompletedTask
+ : _runLoop.ShutdownAsync(cancellationToken);
- public void Fail(Exception exception)
- {
- if (_failRequested.Task.IsCompleted)
- logger.LogError($"Fail requested again with {exception}");
- else
- _failRequested.TrySetResult(exception);
- }
-
- protected void Send(byte[] bytes, CancellationToken token)
- {
- Task sendTask = _queue.Send(bytes, token);
- if (sendTask != null)
- _newSendTaskAvailable.TrySetResult();
- }
+ public void Fail(Exception exception) => _runLoop?.Fail(exception);
+ protected void Send(byte[] bytes, CancellationToken token) => _runLoop?.Send(bytes, token);
protected async Task<ClientWebSocket> ConnectToWebServer(Uri uri, CancellationToken token)
{
protected virtual Task<WasmDebuggerConnection> SetupConnection(Uri webserverUri, CancellationToken token)
=> throw new NotImplementedException();
- protected async Task ConnectWithMainLoops(
+ protected async Task ConnectAndStartRunLoopAsync(
Uri uri,
Func<string, CancellationToken, Task> receive,
CancellationTokenSource cts)
{
- CancellationToken token = cts.Token;
- _conn = await SetupConnection(uri, token);
- _queue = new DevToolsQueue(_conn);
-
- _ = Task.Run(async () =>
- {
- try
- {
- RunLoopExitState exitState;
-
- try
- {
- exitState = await RunLoop(receive, cts);
- }
- catch (Exception ex)
- {
- logger.LogDebug($"RunLoop threw an exception. (parentToken: {token.IsCancellationRequested}, linked: {cts.IsCancellationRequested}): {ex} ");
- RunLoopStopped?.Invoke(this, new(RunLoopStopReason.Exception, ex));
- return;
- }
-
- try
- {
- logger.LogDebug($"RunLoop stopped, reason: {exitState}. (parentToken: {token.IsCancellationRequested}, linked: {cts.IsCancellationRequested}): {exitState.exception?.Message}");
- RunLoopStopped?.Invoke(this, exitState);
- }
- catch (Exception ex)
- {
- logger.LogError(ex, $"Invoking RunLoopStopped event failed for ({exitState}) with {ex}");
- }
- }
- finally
- {
- cts.Cancel();
- _conn?.Dispose();
- if (_conn is DevToolsDebuggerConnection wsc)
- logger.LogDebug($"Loop ended with socket: {wsc.WebSocket.State}");
- else
- logger.LogDebug($"Loop ended");
- }
- });
- }
-
- private async Task<RunLoopExitState> RunLoop(
- Func<string, CancellationToken, Task> receive,
- CancellationTokenSource cts)
- {
- var pending_ops = new List<Task>
- {
- _conn.ReadOneAsync(cts.Token),
- _newSendTaskAvailable.Task,
- _shutdownRequested.Task,
- _failRequested.Task
- };
-
- // In case we had a Send called already
- if (_queue.TryPumpIfCurrentCompleted(cts.Token, out Task sendTask))
- pending_ops.Add(sendTask);
-
- while (!cts.IsCancellationRequested)
- {
- var task = await Task.WhenAny(pending_ops).ConfigureAwait(false);
-
- if (_shutdownRequested.Task.IsCompleted)
- return new(RunLoopStopReason.Shutdown, null);
-
- if (task.IsCanceled && cts.IsCancellationRequested)
- return new(RunLoopStopReason.Cancelled, null);
-
- if (task.IsFaulted)
- {
- if (task == pending_ops[0] && !_conn.IsConnected)
- return new(RunLoopStopReason.ProxyConnectionClosed, task.Exception);
-
- return new(RunLoopStopReason.Exception, task.Exception);
- }
- if (_failRequested.Task.IsCompleted)
- return new(RunLoopStopReason.Exception, _failRequested.Task.Result);
-
- // FIXME: instead of this, iterate through pending_ops, and clear it
- // out every time we wake up
- if (pending_ops.Where(t => t.IsFaulted).FirstOrDefault() is Task faultedTask)
- return new(RunLoopStopReason.Exception, faultedTask.Exception);
-
- if (_newSendTaskAvailable.Task.IsCompleted)
- {
- // Just needed to wake up. the new task has already
- // been added to pending_ops
- _newSendTaskAvailable = new ();
- pending_ops[1] = _newSendTaskAvailable.Task;
-
- _queue.TryPumpIfCurrentCompleted(cts.Token, out _);
- if (_queue.CurrentSend != null)
- pending_ops.Add(_queue.CurrentSend);
- }
-
- if (task == pending_ops[0])
- {
- var msg = await (Task<string>)pending_ops[0];
- pending_ops[0] = _conn.ReadOneAsync(cts.Token);
-
- if (msg != null)
- {
- Task tsk = receive(msg, cts.Token);
- if (tsk != null)
- pending_ops.Add(tsk);
- }
- }
- else
- {
- //must be a background task
- pending_ops.Remove(task);
- if (task == _queue.CurrentSend && _queue.TryPumpIfCurrentCompleted(cts.Token, out sendTask))
- pending_ops.Add(sendTask);
- }
- }
-
- if (cts.IsCancellationRequested)
- return new(RunLoopStopReason.Cancelled, null);
-
- return new(RunLoopStopReason.Exception, new InvalidOperationException($"This shouldn't ever get thrown. Unsure why the loop stopped"));
+ WasmDebuggerConnection conn = await SetupConnection(uri, cts.Token);
+ conn.OnReadAsync = receive;
+ _runLoop = new(new[] { new DevToolsQueue(conn) }, logger);
+ _runLoop.RunLoopStopped += (sender, args) => RunLoopStopped?.Invoke(sender, args);
+ _ = _runLoop.RunAsync(cts);
}
}
}
// Webserver connection is closed
// So, stop the loop here too
// _clientInitiatedClose.TrySetResult();
- await Shutdown(token);
+ await ShutdownAsync(token);
}, TaskContinuationOptions.NotOnRanToCompletion | TaskContinuationOptions.RunContinuationsAsynchronously)
.ConfigureAwait(false);
if (t != clientRunLoopStopped.Task)
Assert.Fail($"Proxy did not stop, as expected");
RunLoopExitState? state = await clientRunLoopStopped.Task;
- if (state.reason != RunLoopStopReason.ProxyConnectionClosed)
- Assert.Fail($"Client runloop did not stop with ProxyConnectionClosed. state: {state}.{Environment.NewLine}SendCommand had failed with {ex}");
+ if (state.reason != RunLoopStopReason.ConnectionClosed)
+ Assert.Fail($"Client runloop did not stop with ConnectionClosed. state: {state}.{Environment.NewLine}SendCommand had failed with {ex}");
}
}
try
{
TestHarnessProxy.ShutdownProxy(Id.ToString());
- await Client.Shutdown(_cancellationTokenSource.Token).ConfigureAwait(false);
+ await Client.ShutdownAsync(_cancellationTokenSource.Token).ConfigureAwait(false);
}
catch (Exception ex)
{
}
};
- await ConnectWithMainLoops(uri, HandleMessage, cts);
+ await ConnectAndStartRunLoopAsync(uri, HandleMessage, cts);
}
public Task<Result> SendCommand(string method, JObject args, CancellationToken token)