add timeout handling to QuicStream (#56060)
authorTomas Weinfurt <tweinfurt@yahoo.com>
Wed, 11 Aug 2021 04:09:47 +0000 (21:09 -0700)
committerGitHub <noreply@github.com>
Wed, 11 Aug 2021 04:09:47 +0000 (21:09 -0700)
* add timeout handling to QuicStream

* feedback from review

* fix bad resolve

* feedback from review

src/libraries/System.Net.Quic/ref/System.Net.Quic.cs
src/libraries/System.Net.Quic/src/Resources/Strings.resx
src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/Mock/MockStream.cs
src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs
src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/QuicStreamProvider.cs
src/libraries/System.Net.Quic/src/System/Net/Quic/QuicStream.cs
src/libraries/System.Net.Quic/tests/FunctionalTests/QuicStreamConnectedStreamConformanceTests.cs

index 4c7e48ccda82024cf04dc37a67cb9589de7b2097..93b63fb465bd45ed9e881e4999620248b972f5cd 100644 (file)
@@ -85,6 +85,7 @@ namespace System.Net.Quic
         public override bool CanRead { get { throw null; } }
         public override bool CanSeek { get { throw null; } }
         public override bool CanWrite { get { throw null; } }
+        public override bool CanTimeout { get { throw null; } }
         public override long Length { get { throw null; } }
         public override long Position { get { throw null; } set { } }
         public long StreamId { get { throw null; } }
@@ -101,6 +102,7 @@ namespace System.Net.Quic
         public override int Read(System.Span<byte> buffer) { throw null; }
         public override System.Threading.Tasks.Task<int> ReadAsync(byte[] buffer, int offset, int count, System.Threading.CancellationToken cancellationToken) { throw null; }
         public override System.Threading.Tasks.ValueTask<int> ReadAsync(System.Memory<byte> buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
+        public override int ReadTimeout { get { throw null; } set { } }
         public override long Seek(long offset, System.IO.SeekOrigin origin) { throw null; }
         public override void SetLength(long value) { }
         public void Shutdown() { }
@@ -114,6 +116,7 @@ namespace System.Net.Quic
         public override System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory<byte> buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
         public System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory<System.ReadOnlyMemory<byte>> buffers, bool endStream, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
         public System.Threading.Tasks.ValueTask WriteAsync(System.ReadOnlyMemory<System.ReadOnlyMemory<byte>> buffers, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
+        public override int WriteTimeout { get { throw null; } set { } }
     }
     public partial class QuicStreamAbortedException : System.Net.Quic.QuicException
     {
index aeecb31ba29768a52ec1b09655348ce250152325..52a12c3c0eaf1c822f18d689cb10d9d3053f2261 100644 (file)
   <data name="net_quic_writing_notallowed" xml:space="preserve">
     <value>Writing is not allowed on stream.</value>
   </data>
+  <data name="net_quic_timeout_use_gt_zero" xml:space="preserve">
+    <value>Timeout can only be set to 'System.Threading.Timeout.Infinite' or a value &gt; 0.</value>
+  </data>
+  <data name="net_quic_timeout" xml:space="preserve">
+    <value>Connection timed out.</value>
+  </data>
   <data name="net_quic_ssl_option" xml:space="preserve">
     <value>'{0}' is not supported by System.Net.Quic.</value>
   </data>
index fde0eab97d197bd268ce5f21161b8ad135caa60a..1b58009a2fd3555910b1bd5f2452df70ba16012d 100644 (file)
@@ -42,6 +42,20 @@ namespace System.Net.Quic.Implementations.Mock
 
         private StreamBuffer? ReadStreamBuffer => _isInitiator ? _streamState._inboundStreamBuffer : _streamState._outboundStreamBuffer;
 
+        internal override bool CanTimeout => false;
+
+        internal override int ReadTimeout
+        {
+            get => throw new InvalidOperationException();
+            set => throw new InvalidOperationException();
+        }
+
+        internal override int WriteTimeout
+        {
+            get => throw new InvalidOperationException();
+            set => throw new InvalidOperationException();
+        }
+
         internal override bool CanRead => !_disposed && ReadStreamBuffer is not null;
 
         internal override int Read(Span<byte> buffer)
index aa9c981a12202acf63e2d6cb3c936752b3361867..6b70cc9b3c3b75ac5fffac07cd7c227eb1b860bc 100644 (file)
@@ -4,6 +4,7 @@
 using System.Buffers;
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.IO;
 using System.Net.Quic.Implementations.MsQuic.Internal;
 using System.Runtime.ExceptionServices;
 using System.Runtime.InteropServices;
@@ -192,6 +193,47 @@ namespace System.Net.Quic.Implementations.MsQuic
 
         internal override bool CanWrite => _disposed == 0 && _canWrite;
 
+        internal override bool CanTimeout => true;
+
+        private int _readTimeout = Timeout.Infinite;
+
+        internal override int ReadTimeout
+        {
+            get
+            {
+                ThrowIfDisposed();
+                return _readTimeout;
+            }
+            set
+            {
+                ThrowIfDisposed();
+                if (value <= 0 && value != System.Threading.Timeout.Infinite)
+                {
+                    throw new ArgumentOutOfRangeException(nameof(value), SR.net_quic_timeout_use_gt_zero);
+                }
+                _readTimeout = value;
+            }
+        }
+
+        private int _writeTimeout = Timeout.Infinite;
+        internal override int WriteTimeout
+        {
+            get
+            {
+                ThrowIfDisposed();
+                return  _writeTimeout;
+            }
+            set
+            {
+                ThrowIfDisposed();
+                if (value <= 0 && value != System.Threading.Timeout.Infinite)
+                {
+                    throw new ArgumentOutOfRangeException(nameof(value), SR.net_quic_timeout_use_gt_zero);
+                }
+                _writeTimeout = value;
+            }
+        }
+
         internal override long StreamId
         {
             get
@@ -404,7 +446,6 @@ namespace System.Net.Quic.Implementations.MsQuic
                         {
                             var state = (State)obj!;
                             bool completePendingRead;
-
                             lock (state)
                             {
                                 completePendingRead = state.ReadState == ReadState.PendingRead;
@@ -593,24 +634,54 @@ namespace System.Net.Quic.Implementations.MsQuic
         {
             ThrowIfDisposed();
             byte[] rentedBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
+            CancellationTokenSource? cts = null;
             try
             {
-                int readLength = ReadAsync(new Memory<byte>(rentedBuffer, 0, buffer.Length)).AsTask().GetAwaiter().GetResult();
+                if (_readTimeout > 0)
+                {
+                    cts = new CancellationTokenSource(_readTimeout);
+                }
+                int readLength = ReadAsync(new Memory<byte>(rentedBuffer, 0, buffer.Length), cts != null ? cts.Token : default).AsTask().GetAwaiter().GetResult();
                 rentedBuffer.AsSpan(0, readLength).CopyTo(buffer);
                 return readLength;
             }
+            catch (OperationCanceledException) when (cts != null && cts.IsCancellationRequested)
+            {
+                // sync operations do not have Cancellation
+                throw new IOException(SR.net_quic_timeout);
+            }
             finally
             {
                 ArrayPool<byte>.Shared.Return(rentedBuffer);
+                cts?.Dispose();
             }
         }
 
         internal override void Write(ReadOnlySpan<byte> buffer)
         {
             ThrowIfDisposed();
+            CancellationTokenSource? cts = null;
+
+
+            if (_writeTimeout > 0)
+            {
+                cts = new CancellationTokenSource(_writeTimeout);
+            }
 
             // TODO: optimize this.
-            WriteAsync(buffer.ToArray()).AsTask().GetAwaiter().GetResult();
+            try
+            {
+                WriteAsync(buffer.ToArray()).AsTask().GetAwaiter().GetResult();
+            }
+            catch (OperationCanceledException) when (cts != null && cts.IsCancellationRequested)
+            {
+                // sync operations do not have Cancellation
+                throw new IOException(SR.net_quic_timeout);
+            }
+            finally
+            {
+                cts?.Dispose();
+            }
         }
 
         // MsQuic doesn't support explicit flushing
index a90bcf9bb89a3ec9c26714d8b98030f36d1af04a..f011e561855ee3806cdff6babf45315df968501e 100644 (file)
@@ -11,8 +11,12 @@ namespace System.Net.Quic.Implementations
     {
         internal abstract long StreamId { get; }
 
+        internal abstract bool CanTimeout { get; }
+
         internal abstract bool CanRead { get; }
 
+        internal abstract int ReadTimeout { get; set; }
+
         internal abstract int Read(Span<byte> buffer);
 
         internal abstract ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default);
@@ -25,6 +29,8 @@ namespace System.Net.Quic.Implementations
 
         internal abstract void Write(ReadOnlySpan<byte> buffer);
 
+        internal abstract int WriteTimeout { get; set; }
+
         internal abstract ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default);
 
         internal abstract ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, bool endStream, CancellationToken cancellationToken = default);
index 778a4cd1efa2d025a6c53ff609f14f1e4e4c982f..55ba9953260f031d671f75e1cfcbb61e8854432e 100644 (file)
@@ -79,6 +79,20 @@ namespace System.Net.Quic
 
         public override void Write(ReadOnlySpan<byte> buffer) => _provider.Write(buffer);
 
+        public override bool CanTimeout => _provider.CanTimeout;
+
+        public override int ReadTimeout
+        {
+            get => _provider.ReadTimeout;
+            set => _provider.ReadTimeout = value;
+        }
+
+        public override int WriteTimeout
+        {
+            get => _provider.WriteTimeout;
+            set => _provider.WriteTimeout = value;
+        }
+
         public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) => _provider.WriteAsync(buffer, cancellationToken);
 
         public override void Flush() => _provider.Flush();
index 9ba6c5a46c807ee88325027d064fd1bc6fef3fd9..a100b2155674f23c4d7f93577300242bf84511f5 100644 (file)
@@ -26,6 +26,7 @@ namespace System.Net.Quic.Tests
         protected override QuicImplementationProvider Provider => QuicImplementationProviders.MsQuic;
         protected override bool UsableAfterCanceledReads => false;
         protected override bool BlocksOnZeroByteReads => true;
+        protected override bool CanTimeout => true;
 
         public MsQuicQuicStreamConformanceTests(ITestOutputHelper output)
         {