[retry only] Additional HTTP/2 connections created when active streams limit is reach...
authorAlexander Nikolaev <55398552+alnikola@users.noreply.github.com>
Tue, 28 Jul 2020 10:43:39 +0000 (12:43 +0200)
committerGitHub <noreply@github.com>
Tue, 28 Jul 2020 10:43:39 +0000 (12:43 +0200)
HTTP/2 standard commands clients to not open more than one HTTP/2 connection to the same server. At the same time, server has right to limit the maximum number of active streams per that HTTP/2 connection. These two directives combined impose limit on the number of requests concurrently send to the server. This limitation is justified in client to server scenarios, but become a bottleneck in server to server cases like gRPC. This PR introduces a new SocketsHttpHandler API enabling establishing additional HTTP/2 connections to the same server when the maximum stream limit is reached on the existing ones.

**Note**. This algorithm version uses only retries to make request choose another  connection when all stream slots are occupied. It does not implement stream credit management in `HttpConnectionPool` and therefore exhibit a sub-optimal request scheduling behavior in "request burst" and "infinite requests" scenarios.

Fixes #35088

src/libraries/Common/tests/System/Net/Http/Http2LoopbackConnection.cs
src/libraries/Common/tests/System/Net/Http/Http2LoopbackServer.cs
src/libraries/System.Net.Http/ref/System.Net.Http.cs
src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpHandler.cs
src/libraries/System.Net.Http/src/System/Net/Http/RequestRetryType.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/CreditManager.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionSettings.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpHandler.cs
src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.cs

index 3750a36..3bc2097 100644 (file)
@@ -21,18 +21,25 @@ namespace System.Net.Test.Common
         private Stream _connectionStream;
         private TaskCompletionSource<bool> _ignoredSettingsAckPromise;
         private bool _ignoreWindowUpdates;
-        public static TimeSpan Timeout => Http2LoopbackServer.Timeout;
+        private readonly TimeSpan _timeout;
         private int _lastStreamId;
 
         private readonly byte[] _prefix;
         public string PrefixString => Encoding.UTF8.GetString(_prefix, 0, _prefix.Length);
         public bool IsInvalid => _connectionSocket == null;
         public Stream Stream => _connectionStream;
+        public Task<bool> SettingAckWaiter => _ignoredSettingsAckPromise?.Task;
 
         public Http2LoopbackConnection(Socket socket, Http2Options httpOptions)
+            : this(socket, httpOptions, Http2LoopbackServer.Timeout)
+        {
+        }
+
+        public Http2LoopbackConnection(Socket socket, Http2Options httpOptions, TimeSpan timeout)
         {
             _connectionSocket = socket;
             _connectionStream = new NetworkStream(_connectionSocket, true);
+            _timeout = timeout;
 
             if (httpOptions.UseSsl)
             {
@@ -81,12 +88,12 @@ namespace System.Net.Test.Common
             await WriteFrameAsync(emptySettings).ConfigureAwait(false);
 
             // Receive and ACK the client settings frame.
-            Frame clientSettings = await ReadFrameAsync(Timeout).ConfigureAwait(false);
+            Frame clientSettings = await ReadFrameAsync(_timeout).ConfigureAwait(false);
             clientSettings.Flags = clientSettings.Flags | FrameFlags.Ack;
             await WriteFrameAsync(clientSettings).ConfigureAwait(false);
 
             // Receive the client ACK of the server settings frame.
-            clientSettings = await ReadFrameAsync(Timeout).ConfigureAwait(false);
+            clientSettings = await ReadFrameAsync(_timeout).ConfigureAwait(false);
         }
 
         public async Task WriteFrameAsync(Frame frame)
@@ -225,7 +232,7 @@ namespace System.Net.Test.Common
 
         public async Task ReadRstStreamAsync(int streamId)
         {
-            Frame frame = await ReadFrameAsync(Timeout);
+            Frame frame = await ReadFrameAsync(_timeout);
 
             if (frame == null)
             {
@@ -248,7 +255,7 @@ namespace System.Net.Test.Common
         {
             IgnoreWindowUpdates();
 
-            Frame frame = await ReadFrameAsync(Timeout).ConfigureAwait(false);
+            Frame frame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
             if (frame != null)
             {
                 if (!ignoreUnexpectedFrames)
@@ -310,7 +317,7 @@ namespace System.Net.Test.Common
         public async Task<HeadersFrame> ReadRequestHeaderFrameAsync()
         {
             // Receive HEADERS frame for request.
-            Frame frame = await ReadFrameAsync(Timeout).ConfigureAwait(false);
+            Frame frame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
             if (frame == null)
             {
                 throw new IOException("Failed to read Headers frame.");
@@ -476,7 +483,7 @@ namespace System.Net.Test.Common
 
             do
             {
-                frame = await ReadFrameAsync(Timeout).ConfigureAwait(false);
+                frame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
                 if (frame == null && expectEndOfStream)
                 {
                     break;
@@ -516,7 +523,7 @@ namespace System.Net.Test.Common
             HttpRequestData requestData = new HttpRequestData();
 
             // Receive HEADERS frame for request.
-            Frame frame = await ReadFrameAsync(Timeout).ConfigureAwait(false);
+            Frame frame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
             if (frame == null)
             {
                 throw new IOException("Failed to read Headers frame.");
@@ -567,7 +574,7 @@ namespace System.Net.Test.Common
             byte[] pingData = new byte[8] { 1, 2, 3, 4, 50, 60, 70, 80 };
             PingFrame ping = new PingFrame(pingData, FrameFlags.None, 0);
             await WriteFrameAsync(ping).ConfigureAwait(false);
-            PingFrame pingAck = (PingFrame)await ReadFrameAsync(Timeout).ConfigureAwait(false);
+            PingFrame pingAck = (PingFrame)await ReadFrameAsync(_timeout).ConfigureAwait(false);
             if (pingAck == null || pingAck.Type != FrameType.Ping || !pingAck.AckFlag)
             {
                 throw new Exception("Expected PING ACK");
index a3ff123..8516140 100644 (file)
@@ -74,7 +74,12 @@ namespace System.Net.Test.Common
             _connections.RemoveAll((c) => c.IsInvalid);
         }
 
-        public async Task<Http2LoopbackConnection> AcceptConnectionAsync()
+        public Task<Http2LoopbackConnection> AcceptConnectionAsync()
+        {
+            return AcceptConnectionAsync(null);
+        }
+
+        public async Task<Http2LoopbackConnection> AcceptConnectionAsync(TimeSpan? timeout)
         {
             RemoveInvalidConnections();
 
@@ -85,7 +90,7 @@ namespace System.Net.Test.Common
 
             Socket connectionSocket = await _listenSocket.AcceptAsync().ConfigureAwait(false);
 
-            Http2LoopbackConnection connection = new Http2LoopbackConnection(connectionSocket, _options);
+            Http2LoopbackConnection connection = timeout != null ? new Http2LoopbackConnection(connectionSocket, _options, timeout.Value) : new Http2LoopbackConnection(connectionSocket, _options);
             _connections.Add(connection);
 
             return connection;
@@ -96,15 +101,25 @@ namespace System.Net.Test.Common
             return await EstablishConnectionAsync();
         }
 
-        public async Task<Http2LoopbackConnection> EstablishConnectionAsync(params SettingsEntry[] settingsEntries)
+        public Task<Http2LoopbackConnection> EstablishConnectionAsync(params SettingsEntry[] settingsEntries)
         {
-            (Http2LoopbackConnection connection, _) = await EstablishConnectionGetSettingsAsync(settingsEntries).ConfigureAwait(false);
+            return EstablishConnectionAsync(null, null, settingsEntries);
+        }
+
+        public async Task<Http2LoopbackConnection> EstablishConnectionAsync(TimeSpan? timeout, TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries)
+        {
+            (Http2LoopbackConnection connection, _) = await EstablishConnectionGetSettingsAsync(timeout, ackTimeout, settingsEntries).ConfigureAwait(false);
             return connection;
         }
 
-        public async Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(params SettingsEntry[] settingsEntries)
+        public Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(params SettingsEntry[] settingsEntries)
+        {
+            return EstablishConnectionGetSettingsAsync(null, null, settingsEntries);
+        }
+
+        public async Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(TimeSpan? timeout, TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries)
         {
-            Http2LoopbackConnection connection = await AcceptConnectionAsync().ConfigureAwait(false);
+            Http2LoopbackConnection connection = await AcceptConnectionAsync(timeout).ConfigureAwait(false);
 
             // Receive the initial client settings frame.
             Frame receivedFrame = await connection.ReadFrameAsync(Timeout).ConfigureAwait(false);
@@ -129,7 +144,7 @@ namespace System.Net.Test.Common
             await connection.WriteFrameAsync(settingsAck).ConfigureAwait(false);
 
             // The client will send us a SETTINGS ACK eventually, but not necessarily right away.
-            await connection.ExpectSettingsAckAsync();
+            await connection.ExpectSettingsAckAsync((int) (ackTimeout?.TotalMilliseconds ?? 5000));
 
             return (connection, clientSettingsFrame);
         }
index dc1e77f..3386d32 100644 (file)
@@ -310,6 +310,7 @@ namespace System.Net.Http
         protected override void Dispose(bool disposing) { }
         protected internal override System.Net.Http.HttpResponseMessage Send(System.Net.Http.HttpRequestMessage request, System.Threading.CancellationToken cancellationToken) { throw null; }
         protected internal override System.Threading.Tasks.Task<System.Net.Http.HttpResponseMessage> SendAsync(System.Net.Http.HttpRequestMessage request, System.Threading.CancellationToken cancellationToken) { throw null; }
+        public bool EnableMultipleHttp2Connections { get { throw null; } set { } }
     }
     public partial class StreamContent : System.Net.Http.HttpContent
     {
index 6f47475..4644b8b 100644 (file)
@@ -131,5 +131,11 @@ namespace System.Net.Http
 
         protected internal override Task<HttpResponseMessage> SendAsync(
             HttpRequestMessage request, CancellationToken cancellationToken) => throw new PlatformNotSupportedException();
+
+        public bool EnableMultipleHttp2Connections
+        {
+            get => throw new PlatformNotSupportedException();
+            set => throw new PlatformNotSupportedException();
+        }
     }
 }
index 4456e4f..f2e29c4 100644 (file)
@@ -26,6 +26,10 @@ namespace System.Net.Http
         /// <summary>
         /// The proxy failed, so the request should be retried on the next proxy.
         /// </summary>
-        RetryOnNextProxy
+        RetryOnNextProxy,
+
+        /// The HTTP/2 connection reached the maximum number of streams and
+        /// another HTTP/2 connection must be created or found to serve the request.
+        RetryOnNextConnection
     }
 }
index 0439b51..f6d37ae 100644 (file)
@@ -28,6 +28,8 @@ namespace System.Net.Http
             _current = initialCredit;
         }
 
+        public bool IsCreditAvailable => Volatile.Read(ref _current) > 0;
+
         private object SyncObject
         {
             // Generally locking on "this" is considered poor form, but this type is internal,
@@ -35,23 +37,23 @@ namespace System.Net.Http
             get => this;
         }
 
-        public ValueTask<int> RequestCreditAsync(int amount, CancellationToken cancellationToken)
+        public bool TryRequestCreditNoWait(int amount)
         {
             lock (SyncObject)
             {
-                if (_disposed)
-                {
-                    throw new ObjectDisposedException($"{nameof(CreditManager)}:{_owner.GetType().Name}:{_name}");
-                }
+                return TryRequestCreditNoLock(amount) > 0;
+            }
+        }
 
+        public ValueTask<int> RequestCreditAsync(int amount, CancellationToken cancellationToken)
+        {
+            lock (SyncObject)
+            {
                 // If we can satisfy the request with credit already available, do so synchronously.
-                if (_current > 0)
-                {
-                    Debug.Assert(_waitersTail is null, "Shouldn't have waiters when credit is available");
+                int granted = TryRequestCreditNoLock(amount);
 
-                    int granted = Math.Min(amount, _current);
-                    if (NetEventSource.Log.IsEnabled()) _owner.Trace($"{_name}. requested={amount}, current={_current}, granted={granted}");
-                    _current -= granted;
+                if (granted > 0)
+                {
                     return new ValueTask<int>(granted);
                 }
 
@@ -155,5 +157,26 @@ namespace System.Net.Http
                 }
             }
         }
+
+        private int TryRequestCreditNoLock(int amount)
+        {
+            Debug.Assert(Monitor.IsEntered(SyncObject), "Shouldn't be called outside lock.");
+
+            if (_disposed)
+            {
+                throw new ObjectDisposedException($"{nameof(CreditManager)}:{_owner.GetType().Name}:{_name}");
+            }
+
+            if (_current > 0)
+            {
+                Debug.Assert(_waitersTail is null, "Shouldn't have waiters when credit is available");
+
+                int granted = Math.Min(amount, _current);
+                if (NetEventSource.Log.IsEnabled()) _owner.Trace($"{_name}. requested={amount}, current={_current}, granted={granted}");
+                _current -= granted;
+                return granted;
+            }
+            return 0;
+        }
     }
 }
index 5432df7..83f7953 100644 (file)
@@ -114,12 +114,15 @@ namespace System.Net.Http
             _initialWindowSize = DefaultInitialWindowSize;
             _maxConcurrentStreams = int.MaxValue;
             _pendingWindowUpdate = 0;
+            _idleSinceTickCount = Environment.TickCount64;
 
             if (NetEventSource.Log.IsEnabled()) TraceConnection(stream);
         }
 
         private object SyncObject => _httpStreams;
 
+        public bool CanAddNewStream => _concurrentStreams.IsCreditAvailable;
+
         public async ValueTask SetupAsync()
         {
             _outgoingBuffer.EnsureAvailableSpace(s_http2ConnectionPreface.Length +
@@ -1203,7 +1206,17 @@ namespace System.Net.Http
             // in order to avoid consuming resources in potentially many requests waiting for access.
             try
             {
-                await _concurrentStreams.RequestCreditAsync(1, cancellationToken).ConfigureAwait(false);
+                if (_pool.EnableMultipleHttp2Connections)
+                {
+                    if (!_concurrentStreams.TryRequestCreditNoWait(1))
+                    {
+                        throw new HttpRequestException(null, null, RequestRetryType.RetryOnNextConnection);
+                    }
+                }
+                else
+                {
+                    await _concurrentStreams.RequestCreditAsync(1, cancellationToken).ConfigureAwait(false);
+                }
             }
             catch (ObjectDisposedException)
             {
index a363430..072c9c2 100644 (file)
@@ -66,7 +66,8 @@ namespace System.Net.Http
         private readonly int _maxConnections;
 
         private bool _http2Enabled;
-        private Http2Connection? _http2Connection;
+        // This array must be treated as immutable. It can only be replaced with a new value in AddHttp2Connection method.
+        private volatile Http2Connection[]? _http2Connections;
         private SemaphoreSlim? _http2ConnectionCreateLock;
         private byte[]? _http2AltSvcOriginUri;
         internal readonly byte[]? _http2EncodedAuthorityHostHeader;
@@ -329,6 +330,8 @@ namespace System.Net.Http
             }
         }
 
+        public bool EnableMultipleHttp2Connections => _poolManager.Settings.EnableMultipleHttp2Connections;
+
         /// <summary>Object used to synchronize access to state in the pool.</summary>
         private object SyncObj => _idleConnections;
 
@@ -483,24 +486,14 @@ namespace System.Net.Http
             Debug.Assert(_kind == HttpConnectionKind.Https || _kind == HttpConnectionKind.SslProxyTunnel || _kind == HttpConnectionKind.Http);
 
             // See if we have an HTTP2 connection
-            Http2Connection? http2Connection = _http2Connection;
+            Http2Connection? http2Connection = GetExistingHttp2Connection();
 
             if (http2Connection != null)
             {
-                TimeSpan pooledConnectionLifetime = _poolManager.Settings._pooledConnectionLifetime;
-                if (http2Connection.LifetimeExpired(Environment.TickCount64, pooledConnectionLifetime))
-                {
-                    // Connection expired.
-                    http2Connection.Dispose();
-                    InvalidateHttp2Connection(http2Connection);
-                }
-                else
-                {
-                    // Connection exist and it is still good to use.
-                    if (NetEventSource.Log.IsEnabled()) Trace("Using existing HTTP2 connection.");
-                    _usedSinceLastCleanup = true;
-                    return (http2Connection, false, null);
-                }
+                // Connection exists and it is still good to use.
+                if (NetEventSource.Log.IsEnabled()) Trace("Using existing HTTP2 connection.");
+                _usedSinceLastCleanup = true;
+                return (http2Connection, false, null);
             }
 
             // Ensure that the connection creation semaphore is created
@@ -524,16 +517,10 @@ namespace System.Net.Http
             await _http2ConnectionCreateLock.WaitAsync(cancellationToken).ConfigureAwait(false);
             try
             {
-                if (_http2Connection != null)
+                http2Connection = GetExistingHttp2Connection();
+                if (http2Connection != null)
                 {
-                    // Someone beat us to it
-
-                    if (NetEventSource.Log.IsEnabled())
-                    {
-                        Trace("Using existing HTTP2 connection.");
-                    }
-
-                    return (_http2Connection, false, null);
+                    return (http2Connection, false, null);
                 }
 
                 // Recheck if HTTP2 has been disabled by a previous attempt.
@@ -558,15 +545,14 @@ namespace System.Net.Http
                         http2Connection = new Http2Connection(this, stream!);
                         await http2Connection.SetupAsync().ConfigureAwait(false);
 
-                        Debug.Assert(_http2Connection == null);
-                        _http2Connection = http2Connection;
+                        AddHttp2Connection(http2Connection);
 
                         if (NetEventSource.Log.IsEnabled())
                         {
                             Trace("New unencrypted HTTP2 connection established.");
                         }
 
-                        return (_http2Connection, true, null);
+                        return (http2Connection, true, null);
                     }
 
                     sslStream = (SslStream)stream!;
@@ -582,15 +568,14 @@ namespace System.Net.Http
                         http2Connection = new Http2Connection(this, sslStream);
                         await http2Connection.SetupAsync().ConfigureAwait(false);
 
-                        Debug.Assert(_http2Connection == null);
-                        _http2Connection = http2Connection;
+                        AddHttp2Connection(http2Connection);
 
                         if (NetEventSource.Log.IsEnabled())
                         {
                             Trace("New HTTP2 connection established.");
                         }
 
-                        return (_http2Connection, true, null);
+                        return (http2Connection, true, null);
                     }
                 }
             }
@@ -648,6 +633,53 @@ namespace System.Net.Http
             return await GetHttpConnectionAsync(request, async, cancellationToken).ConfigureAwait(false);
         }
 
+        private Http2Connection? GetExistingHttp2Connection()
+        {
+            Http2Connection[]? localConnections = _http2Connections;
+
+            if (localConnections == null)
+            {
+                return null;
+            }
+
+            for (int i = 0; i < localConnections.Length; i++)
+            {
+                Http2Connection http2Connection = localConnections[i];
+
+                TimeSpan pooledConnectionLifetime = _poolManager.Settings._pooledConnectionLifetime;
+                if (http2Connection.LifetimeExpired(Environment.TickCount64, pooledConnectionLifetime))
+                {
+                    // Connection expired.
+                    http2Connection.Dispose();
+                    InvalidateHttp2Connection(http2Connection);
+                }
+                else if (!EnableMultipleHttp2Connections || http2Connection.CanAddNewStream)
+                {
+                    return http2Connection;
+                }
+            }
+
+            return null;
+        }
+
+        private void AddHttp2Connection(Http2Connection newConnection)
+        {
+            lock (SyncObj)
+            {
+                Http2Connection[]? localHttp2Connections = _http2Connections;
+                int newCollectionSize = localHttp2Connections == null ? 1 : localHttp2Connections.Length + 1;
+                Http2Connection[] newHttp2Connections = new Http2Connection[newCollectionSize];
+                newHttp2Connections[0] = newConnection;
+
+                if (localHttp2Connections != null)
+                {
+                    Array.Copy(localHttp2Connections, 0, newHttp2Connections, 1, localHttp2Connections.Length);
+                }
+
+                _http2Connections = newHttp2Connections;
+            }
+        }
+
         private async ValueTask<(HttpConnectionBase? connection, bool isNewConnection, HttpResponseMessage? failureResponse)>
             GetHttp3ConnectionAsync(HttpRequestMessage request, HttpAuthority authority, CancellationToken cancellationToken)
         {
@@ -794,6 +826,16 @@ namespace System.Net.Http
                     // Eat exception and try again.
                     continue;
                 }
+                catch (HttpRequestException e) when (e.AllowRetry == RequestRetryType.RetryOnNextConnection)
+                {
+                    if (NetEventSource.Log.IsEnabled())
+                    {
+                        Trace($"Retrying request on another HTTP/2 connection after active streams limit is reached on existing one: {e}");
+                    }
+
+                    // Eat exception and try again.
+                    continue;
+                }
 
                 // Check for the Alt-Svc header, to upgrade to HTTP/3.
                 if (_altSvcEnabled && response.Headers.TryGetValues(KnownHeaders.AltSvc.Descriptor, out IEnumerable<string>? altSvcHeaderValues))
@@ -1355,9 +1397,39 @@ namespace System.Net.Http
         {
             lock (SyncObj)
             {
-                if (_http2Connection == connection)
+                Http2Connection[]? localHttp2Connections = _http2Connections;
+
+                if (localHttp2Connections == null)
+                {
+                    return;
+                }
+
+                if (localHttp2Connections.Length == 1)
+                {
+                    // Fast shortcut for the most common case.
+                    if (localHttp2Connections[0] == connection)
+                    {
+                        _http2Connections = null;
+                    }
+                    return;
+                }
+
+                int invalidatedIndex = Array.IndexOf(localHttp2Connections, connection);
+                if (invalidatedIndex >= 0)
                 {
-                    _http2Connection = null;
+                    Http2Connection[] newHttp2Connections = new Http2Connection[localHttp2Connections.Length - 1];
+
+                    if (invalidatedIndex > 0)
+                    {
+                        Array.Copy(localHttp2Connections, newHttp2Connections, invalidatedIndex);
+                    }
+
+                    if (invalidatedIndex < localHttp2Connections.Length - 1)
+                    {
+                        Array.Copy(localHttp2Connections, invalidatedIndex + 1, newHttp2Connections, invalidatedIndex, newHttp2Connections.Length - invalidatedIndex);
+                    }
+
+                    _http2Connections = newHttp2Connections;
                 }
             }
         }
@@ -1389,10 +1461,13 @@ namespace System.Net.Http
                     list.ForEach(c => c._connection.Dispose());
                     list.Clear();
 
-                    if (_http2Connection != null)
+                    if (_http2Connections != null)
                     {
-                        _http2Connection.Dispose();
-                        _http2Connection = null;
+                        for (int i = 0; i < _http2Connections.Length; i++)
+                        {
+                            _http2Connections[i].Dispose();
+                        }
+                        _http2Connections = null;
                     }
 
                     if (_authorityExpireTimer != null)
@@ -1436,15 +1511,51 @@ namespace System.Net.Http
                 // Get the current time.  This is compared against each connection's last returned
                 // time to determine whether a connection is too old and should be closed.
                 long nowTicks = Environment.TickCount64;
-                Http2Connection? http2Connection = _http2Connection;
+                // Copy the reference to a local variable to simplify the removal logic below.
+                Http2Connection[]? localHttp2Connections = _http2Connections;
 
-                if (http2Connection != null)
+                if (localHttp2Connections != null)
                 {
-                    if (http2Connection.IsExpired(nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout))
+                    Http2Connection[]? newHttp2Connections = null;
+                    int newIndex = 0;
+                    for (int i = 0; i < localHttp2Connections.Length; i++)
+                    {
+                        Http2Connection http2Connection = localHttp2Connections[i];
+                        if (http2Connection.IsExpired(nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout))
+                        {
+                            http2Connection.Dispose();
+
+                            if (newHttp2Connections == null)
+                            {
+                                newHttp2Connections = new Http2Connection[localHttp2Connections.Length];
+                                if (i > 0)
+                                {
+                                    // Copy valid connections residing at the beggining of the current collection.
+                                    Array.Copy(localHttp2Connections, newHttp2Connections, i);
+                                    newIndex = i;
+                                }
+                            }
+                        }
+                        else if (newHttp2Connections != null)
+                        {
+                            newHttp2Connections[newIndex] = localHttp2Connections[i];
+                            newIndex++;
+                        }
+                    }
+
+                    if (newHttp2Connections != null)
                     {
-                        http2Connection.Dispose();
-                        // We can set _http2Connection directly while holding lock instead of calling InvalidateHttp2Connection().
-                        _http2Connection = null;
+                        //Some connections have been removed, so _http2Connections must be replaced.
+                        if (newIndex > 0)
+                        {
+                            Array.Resize(ref newHttp2Connections, newIndex);
+                            _http2Connections = newHttp2Connections;
+                        }
+                        else
+                        {
+                            // All connections expired.
+                            _http2Connections = null;
+                        }
                     }
                 }
 
@@ -1493,7 +1604,7 @@ namespace System.Net.Http
                     // if a pool was used since the last time we cleaned up, give it another chance. New pools
                     // start out saying they've recently been used, to give them a bit of breathing room and time
                     // for the initial collection to be added to it.
-                    if (_associatedConnectionCount == 0 && !_usedSinceLastCleanup && _http2Connection == null)
+                    if (_associatedConnectionCount == 0 && !_usedSinceLastCleanup && _http2Connections == null)
                     {
                         Debug.Assert(list.Count == 0, $"Expected {nameof(list)}.{nameof(list.Count)} == 0");
                         _disposed = true;
index 959f82b..4ba7696 100644 (file)
@@ -52,6 +52,8 @@ namespace System.Net.Http
 
         internal SslClientAuthenticationOptions? _sslOptions;
 
+        internal bool _enableMultipleHttp2Connections;
+
         internal IDictionary<string, object?>? _properties;
 
         public HttpConnectionSettings()
@@ -101,7 +103,8 @@ namespace System.Net.Http
                 _useCookies = _useCookies,
                 _useProxy = _useProxy,
                 _allowUnencryptedHttp2 = _allowUnencryptedHttp2,
-                _assumePrenegotiatedHttp3ForTesting = _assumePrenegotiatedHttp3ForTesting
+                _assumePrenegotiatedHttp3ForTesting = _assumePrenegotiatedHttp3ForTesting,
+                _enableMultipleHttp2Connections = _enableMultipleHttp2Connections
             };
         }
 
@@ -183,6 +186,8 @@ namespace System.Net.Http
             }
         }
 
+        public bool EnableMultipleHttp2Connections => _enableMultipleHttp2Connections;
+
         private byte[]? _http3SettingsFrame;
         internal byte[] Http3SettingsFrame => _http3SettingsFrame ??= Http3Connection.BuildSettingsFrame(this);
     }
index 9b122a6..1874723 100644 (file)
@@ -273,6 +273,17 @@ namespace System.Net.Http
             }
         }
 
+        public bool EnableMultipleHttp2Connections
+        {
+            get => _settings._enableMultipleHttp2Connections;
+            set
+            {
+                CheckDisposedOrStarted();
+
+                _settings._enableMultipleHttp2Connections = value;
+            }
+        }
+
         internal bool SupportsAutomaticDecompression => true;
         internal bool SupportsProxy => true;
         internal bool SupportsRedirectConfiguration => true;
index ae32a02..3a4aef6 100644 (file)
@@ -1925,6 +1925,300 @@ namespace System.Net.Http.Functional.Tests
     public sealed class SocketsHttpHandlerTest_Http2 : HttpClientHandlerTest_Http2
     {
         public SocketsHttpHandlerTest_Http2(ITestOutputHelper output) : base(output) { }
+
+        [ConditionalFact(nameof(SupportsAlpn))]
+        public async Task Http2_MultipleConnectionsEnabled_ConnectionLimitNotReached_ConcurrentRequestsSuccessfullyHandled()
+        {
+            const int MaxConcurrentStreams = 2;
+            using Http2LoopbackServer server = Http2LoopbackServer.CreateServer();
+            using SocketsHttpHandler handler = CreateHandler();
+            using (HttpClient client = CreateHttpClient(handler))
+            {
+                server.AllowMultipleConnections = true;
+                List<Task<HttpResponseMessage>> sendTasks = new List<Task<HttpResponseMessage>>();
+                List<Http2LoopbackConnection> connections = new List<Http2LoopbackConnection>();
+                List<int> acceptedStreams = new List<int>();
+                for (int i = 0; i < 3; i++)
+                {
+                    Http2LoopbackConnection connection = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false);
+                    AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+                    connections.Add(connection);
+                    int prevAcceptedStreamCount = acceptedStreams.Count;
+                    acceptedStreams.AddRange(await AcceptRequests(connection, MaxConcurrentStreams).ConfigureAwait(false));
+                    Assert.Equal(prevAcceptedStreamCount + MaxConcurrentStreams, acceptedStreams.Count);
+                }
+
+                int responseIndex = 0;
+                List<Task> responseTasks = new List<Task>();
+                foreach (Http2LoopbackConnection connection in connections)
+                {
+                    for (int i = 0; i < MaxConcurrentStreams; i++)
+                    {
+                        int streamId = acceptedStreams[responseIndex++];
+                        responseTasks.Add(connection.SendDefaultResponseAsync(streamId));
+                    }
+                }
+
+                await Task.WhenAll(responseTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+                await Task.WhenAll(sendTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+
+                await VerifySendTasks(sendTasks).ConfigureAwait(false);
+            }
+        }
+
+        [ConditionalFact(nameof(SupportsAlpn))]
+        public async Task Http2_MultipleConnectionsEnabled_InfiniteRequestsCompletelyBlockOneConnection_RemaningRequestsAreHandledByNewConnection()
+        {
+            const int MaxConcurrentStreams = 2;
+            using Http2LoopbackServer server = Http2LoopbackServer.CreateServer();
+            using SocketsHttpHandler handler = CreateHandler();
+            using (HttpClient client = CreateHttpClient(handler))
+            {
+                server.AllowMultipleConnections = true;
+                List<Task<HttpResponseMessage>> sendTasks = new List<Task<HttpResponseMessage>>();
+                Http2LoopbackConnection connection0 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false);
+                AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+
+                // Block the first connection on infinite requests.
+                List<int> blockedStreamIds = await AcceptRequests(connection0, MaxConcurrentStreams).ConfigureAwait(false);
+                Assert.Equal(MaxConcurrentStreams, blockedStreamIds.Count);
+
+                Http2LoopbackConnection connection1 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false);
+                AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+
+                int handledRequestCount = (await HandleAllPendingRequests(connection1, MaxConcurrentStreams).ConfigureAwait(false)).Count;
+
+                Assert.Equal(MaxConcurrentStreams, handledRequestCount);
+
+                //Complete inifinite requests.
+                handledRequestCount = await SendResponses(connection0, blockedStreamIds);
+
+                Assert.Equal(MaxConcurrentStreams, handledRequestCount);
+
+                await Task.WhenAll(sendTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+
+                await VerifySendTasks(sendTasks).ConfigureAwait(false);
+            }
+        }
+
+        [ConditionalFact(nameof(SupportsAlpn))]
+        public async Task Http2_MultipleConnectionsEnabled_OpenAndCloseMultipleConnections_Success()
+        {
+            const int MaxConcurrentStreams = 2;
+            using Http2LoopbackServer server = Http2LoopbackServer.CreateServer();
+            using SocketsHttpHandler handler = CreateHandler();
+            using (HttpClient client = CreateHttpClient(handler))
+            {
+                server.AllowMultipleConnections = true;
+                List<Task<HttpResponseMessage>> sendTasks = new List<Task<HttpResponseMessage>>();
+                Http2LoopbackConnection connection0 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false);
+                AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+                Http2LoopbackConnection connection1 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false);
+                AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+                Http2LoopbackConnection connection2 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false);
+                AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+
+                Task<(int Count, int LastStreamId)>[] handleRequestTasks = new[] {
+                    HandleAllPendingRequests(connection0, sendTasks.Count),
+                    HandleAllPendingRequests(connection1, sendTasks.Count),
+                    HandleAllPendingRequests(connection2, sendTasks.Count)
+                };
+
+                await Task.WhenAll(handleRequestTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+
+                Assert.Equal(handleRequestTasks[0].Result.Count, MaxConcurrentStreams);
+                Assert.Equal(handleRequestTasks[1].Result.Count, MaxConcurrentStreams);
+                Assert.Equal(handleRequestTasks[2].Result.Count, MaxConcurrentStreams);
+
+                await connection0.ShutdownIgnoringErrorsAsync(handleRequestTasks[0].Result.LastStreamId).ConfigureAwait(false);
+                await connection2.ShutdownIgnoringErrorsAsync(handleRequestTasks[2].Result.LastStreamId).ConfigureAwait(false);
+
+                //Fill all connection1's stream slots
+                AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+
+                Http2LoopbackConnection connection3 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false);
+                AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+                Http2LoopbackConnection connection4 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false);
+                AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+
+                Task<(int Count, int LastStreamId)>[] finalHandleTasks = new[] {
+                    HandleAllPendingRequests(connection1, sendTasks.Count),
+                    HandleAllPendingRequests(connection3, sendTasks.Count),
+                    HandleAllPendingRequests(connection4, sendTasks.Count)
+                };
+
+                await Task.WhenAll(finalHandleTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+
+                Assert.Equal(finalHandleTasks[0].Result.Count, MaxConcurrentStreams);
+                Assert.Equal(finalHandleTasks[1].Result.Count, MaxConcurrentStreams);
+                Assert.Equal(finalHandleTasks[2].Result.Count, MaxConcurrentStreams);
+
+                await Task.WhenAll(sendTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+
+                await VerifySendTasks(sendTasks).ConfigureAwait(false);
+            }
+        }
+
+        [ConditionalFact(nameof(SupportsAlpn))]
+        public async Task Http2_MultipleConnectionsEnabled_IdleConnectionTimeoutExpired_ConnectionRemovedAndNewCreated()
+        {
+            const int MaxConcurrentStreams = 2;
+            using Http2LoopbackServer server = Http2LoopbackServer.CreateServer();
+            using SocketsHttpHandler handler = CreateHandler();
+            handler.PooledConnectionIdleTimeout = TimeSpan.FromSeconds(5);
+            using (HttpClient client = CreateHttpClient(handler))
+            {
+                server.AllowMultipleConnections = true;
+                List<Task<HttpResponseMessage>> sendTasks = new List<Task<HttpResponseMessage>>();
+                Http2LoopbackConnection connection0 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false);
+                AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+                List<int> acceptedStreamIds = await AcceptRequests(connection0, MaxConcurrentStreams).ConfigureAwait(false);
+                Assert.Equal(MaxConcurrentStreams, acceptedStreamIds.Count);
+
+                List<Task<HttpResponseMessage>> connection1SendTasks = new List<Task<HttpResponseMessage>>();
+                Http2LoopbackConnection connection1 = await PrepareConnection(server, client, MaxConcurrentStreams, readTimeout: 10).ConfigureAwait(false);
+                AcquireAllStreamSlots(server, client, connection1SendTasks, MaxConcurrentStreams);
+                int handledRequests1 = (await HandleAllPendingRequests(connection1, MaxConcurrentStreams).ConfigureAwait(false)).Count;
+
+                Assert.Equal(MaxConcurrentStreams, handledRequests1);
+
+                // Complete all the requests.
+                await Task.WhenAll(connection1SendTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+                await VerifySendTasks(connection1SendTasks).ConfigureAwait(false);
+                connection1SendTasks.ForEach(t => t.Result.Dispose());
+
+                // Wait until the idle connection timeout expires.
+                await connection1.WaitForClientDisconnectAsync(false).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+                // Client connection might be still alive, so send an extra request which will either land on the shutting down connection or on a new one.
+                try
+                {
+                    await client.GetAsync(server.Address).TimeoutAfter(handler.PooledConnectionIdleTimeout).ConfigureAwait(false);
+                }
+                catch (Exception)
+                {
+                    // Suppress all exceptions.
+                }
+
+                Assert.True(connection1.IsInvalid);
+                Assert.False(connection0.IsInvalid);
+
+                Http2LoopbackConnection connection2 = await PrepareConnection(server, client, MaxConcurrentStreams, readTimeout: 5, expectedWarpUpTasks:2).ConfigureAwait(false);
+
+                AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+
+                int handledRequests2 = (await HandleAllPendingRequests(connection2, MaxConcurrentStreams).ConfigureAwait(false)).Count;
+                Assert.Equal(MaxConcurrentStreams, handledRequests2);
+
+                //Make sure connection0 is still alive.
+                int handledRequests0 = await SendResponses(connection0, acceptedStreamIds).ConfigureAwait(false);
+                Assert.Equal(MaxConcurrentStreams, handledRequests0);
+
+                await Task.WhenAll(sendTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+
+                await VerifySendTasks(sendTasks).ConfigureAwait(false);
+            }
+        }
+
+        private async Task VerifySendTasks(IReadOnlyList<Task<HttpResponseMessage>> sendTasks)
+        {
+            foreach (Task<HttpResponseMessage> sendTask in sendTasks)
+            {
+                HttpResponseMessage response = await sendTask.ConfigureAwait(false);
+                Assert.Equal(HttpStatusCode.OK, response.StatusCode);
+            }
+        }
+
+        private static SocketsHttpHandler CreateHandler() => new SocketsHttpHandler
+        {
+            EnableMultipleHttp2Connections = true,
+            PooledConnectionIdleTimeout = TimeSpan.FromHours(1),
+            PooledConnectionLifetime = TimeSpan.FromHours(1),
+            SslOptions = { RemoteCertificateValidationCallback = delegate { return true; } }
+        };
+
+        private async Task<Http2LoopbackConnection> PrepareConnection(Http2LoopbackServer server, HttpClient client, uint maxConcurrentStreams, int readTimeout = 3, int expectedWarpUpTasks = 1)
+        {
+            Task<HttpResponseMessage> warmUpTask = client.GetAsync(server.Address);
+            Http2LoopbackConnection connection = await GetConnection(server, maxConcurrentStreams, readTimeout).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+            // Wait until the client confirms MaxConcurrentStreams setting took into effect.
+            Task settingAckReceived = connection.SettingAckWaiter;
+            while (true)
+            {
+                Task handleRequestTask = HandleAllPendingRequests(connection, expectedWarpUpTasks);
+                await Task.WhenAll(warmUpTask, handleRequestTask).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+                Assert.True(warmUpTask.Result.IsSuccessStatusCode);
+                warmUpTask.Result.Dispose();
+                if (settingAckReceived.IsCompleted)
+                {
+                    break;
+                }
+
+                warmUpTask = client.GetAsync(server.Address);
+            }
+            return connection;
+        }
+
+        private static void AcquireAllStreamSlots(Http2LoopbackServer server, HttpClient client, List<Task<HttpResponseMessage>> sendTasks, uint maxConcurrentStreams)
+        {
+            for (int i = 0; i < maxConcurrentStreams; i++)
+            {
+                sendTasks.Add(client.GetAsync(server.Address));
+            }
+        }
+
+        private static async Task<Http2LoopbackConnection> GetConnection(Http2LoopbackServer server, uint maxConcurrentStreams, int readTimeout) =>
+            await server.EstablishConnectionAsync(TimeSpan.FromSeconds(readTimeout), TimeSpan.FromSeconds(10), new SettingsEntry { SettingId = SettingId.MaxConcurrentStreams, Value = maxConcurrentStreams }).ConfigureAwait(false);
+
+        private async Task<(int Count, int LastStreamId)> HandleAllPendingRequests(Http2LoopbackConnection connection, int totalRequestCount)
+        {
+            int streamId = -1;
+            for (int i = 0; i < totalRequestCount; i++)
+            {
+                try
+                {
+                    // Exact number of requests sent over the given connection is unknown,
+                    // so we keep reading headers and sending response while there are available requests.
+                    streamId = await connection.ReadRequestHeaderAsync().ConfigureAwait(false);
+                    await connection.SendDefaultResponseAsync(streamId).ConfigureAwait(false);
+                }
+                catch (OperationCanceledException)
+                {
+                    return (i, streamId);
+                }
+            }
+
+            return (totalRequestCount, streamId);
+        }
+
+        private async Task<List<int>> AcceptRequests(Http2LoopbackConnection connection, int maxRequests = int.MaxValue)
+        {
+            List<int> streamIds = new List<int>();
+            for (int i = 0; i < maxRequests; i++)
+            {
+                try
+                {
+                    streamIds.Add(await connection.ReadRequestHeaderAsync().ConfigureAwait(false));
+                }
+                catch (OperationCanceledException)
+                {
+                    return streamIds;
+                }
+            }
+
+            return streamIds;
+        }
+
+        private async Task<int> SendResponses(Http2LoopbackConnection connection, IEnumerable<int> streamIds)
+        {
+            int count = 0;
+            foreach (int streamId in streamIds)
+            {
+                count++;
+                await connection.SendDefaultResponseAsync(streamId).ConfigureAwait(false);
+            }
+
+            return count;
+        }
     }
 
     [ConditionalClass(typeof(PlatformDetection), nameof(PlatformDetection.SupportsAlpn))]