saea.SetBuffer(MemoryMarshal.AsMemory(buffer));
saea.SocketFlags = socketFlags;
saea.WrapExceptionsInIOExceptions = true;
- return saea.SendAsyncForNetworkStream(this);
+ return saea.SendAsyncForNetworkStream(this, cancellationToken);
}
else
{
new ValueTask<int>(Task.FromException<int>(CreateException(error)));
}
- public ValueTask SendAsyncForNetworkStream(Socket socket)
+ public ValueTask SendAsyncForNetworkStream(Socket socket, CancellationToken cancellationToken)
{
Debug.Assert(Volatile.Read(ref _continuation) == null, $"Expected null continuation to indicate reserved for use");
- if (socket.SendAsync(this))
+ if (socket.SendAsync(this, cancellationToken))
{
+ _cancellationToken = cancellationToken;
return new ValueTask(this, _token);
}
});
}
+ [Fact]
+ public async Task WriteAsync_CancelPendingWrite_SucceedsOrThrowsOperationCanceled()
+ {
+ await RunWithConnectedNetworkStreamsAsync(async (server, client) =>
+ {
+ await Assert.ThrowsAnyAsync<OperationCanceledException>(() => client.WriteAsync(new byte[1], 0, 1, new CancellationToken(true)));
+ await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => { await client.WriteAsync(new Memory<byte>(new byte[1]), new CancellationToken(true)); });
+
+ byte[] hugeBuffer = new byte[100_000_000];
+ Exception e;
+
+ var cts = new CancellationTokenSource();
+ Task t = client.WriteAsync(hugeBuffer, 0, hugeBuffer.Length, cts.Token);
+ cts.Cancel();
+ e = await Record.ExceptionAsync(async () => await t);
+ if (e != null)
+ {
+ Assert.IsAssignableFrom<OperationCanceledException>(e);
+ }
+
+ cts = new CancellationTokenSource();
+ ValueTask vt = client.WriteAsync(new Memory<byte>(hugeBuffer), cts.Token);
+ cts.Cancel();
+ e = await Record.ExceptionAsync(async () => await vt);
+ if (e != null)
+ {
+ Assert.IsAssignableFrom<OperationCanceledException>(e);
+ }
+ });
+ }
+
private sealed class CustomSynchronizationContext : SynchronizationContext
{
public override void Post(SendOrPostCallback d, object state)