From: Alexander Nikolaev <55398552+alnikola@users.noreply.github.com> Date: Tue, 28 Jul 2020 10:43:39 +0000 (+0200) Subject: [retry only] Additional HTTP/2 connections created when active streams limit is reach... X-Git-Tag: submit/tizen/20210909.063632~6390 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=43670d59e25a6f9b5e6aeb8d1fd7cff1abe6d947;p=platform%2Fupstream%2Fdotnet%2Fruntime.git [retry only] Additional HTTP/2 connections created when active streams limit is reached (#39439) HTTP/2 standard commands clients to not open more than one HTTP/2 connection to the same server. At the same time, server has right to limit the maximum number of active streams per that HTTP/2 connection. These two directives combined impose limit on the number of requests concurrently send to the server. This limitation is justified in client to server scenarios, but become a bottleneck in server to server cases like gRPC. This PR introduces a new SocketsHttpHandler API enabling establishing additional HTTP/2 connections to the same server when the maximum stream limit is reached on the existing ones. **Note**. This algorithm version uses only retries to make request choose another connection when all stream slots are occupied. It does not implement stream credit management in `HttpConnectionPool` and therefore exhibit a sub-optimal request scheduling behavior in "request burst" and "infinite requests" scenarios. Fixes #35088 --- diff --git a/src/libraries/Common/tests/System/Net/Http/Http2LoopbackConnection.cs b/src/libraries/Common/tests/System/Net/Http/Http2LoopbackConnection.cs index 3750a36..3bc2097 100644 --- a/src/libraries/Common/tests/System/Net/Http/Http2LoopbackConnection.cs +++ b/src/libraries/Common/tests/System/Net/Http/Http2LoopbackConnection.cs @@ -21,18 +21,25 @@ namespace System.Net.Test.Common private Stream _connectionStream; private TaskCompletionSource _ignoredSettingsAckPromise; private bool _ignoreWindowUpdates; - public static TimeSpan Timeout => Http2LoopbackServer.Timeout; + private readonly TimeSpan _timeout; private int _lastStreamId; private readonly byte[] _prefix; public string PrefixString => Encoding.UTF8.GetString(_prefix, 0, _prefix.Length); public bool IsInvalid => _connectionSocket == null; public Stream Stream => _connectionStream; + public Task SettingAckWaiter => _ignoredSettingsAckPromise?.Task; public Http2LoopbackConnection(Socket socket, Http2Options httpOptions) + : this(socket, httpOptions, Http2LoopbackServer.Timeout) + { + } + + public Http2LoopbackConnection(Socket socket, Http2Options httpOptions, TimeSpan timeout) { _connectionSocket = socket; _connectionStream = new NetworkStream(_connectionSocket, true); + _timeout = timeout; if (httpOptions.UseSsl) { @@ -81,12 +88,12 @@ namespace System.Net.Test.Common await WriteFrameAsync(emptySettings).ConfigureAwait(false); // Receive and ACK the client settings frame. - Frame clientSettings = await ReadFrameAsync(Timeout).ConfigureAwait(false); + Frame clientSettings = await ReadFrameAsync(_timeout).ConfigureAwait(false); clientSettings.Flags = clientSettings.Flags | FrameFlags.Ack; await WriteFrameAsync(clientSettings).ConfigureAwait(false); // Receive the client ACK of the server settings frame. - clientSettings = await ReadFrameAsync(Timeout).ConfigureAwait(false); + clientSettings = await ReadFrameAsync(_timeout).ConfigureAwait(false); } public async Task WriteFrameAsync(Frame frame) @@ -225,7 +232,7 @@ namespace System.Net.Test.Common public async Task ReadRstStreamAsync(int streamId) { - Frame frame = await ReadFrameAsync(Timeout); + Frame frame = await ReadFrameAsync(_timeout); if (frame == null) { @@ -248,7 +255,7 @@ namespace System.Net.Test.Common { IgnoreWindowUpdates(); - Frame frame = await ReadFrameAsync(Timeout).ConfigureAwait(false); + Frame frame = await ReadFrameAsync(_timeout).ConfigureAwait(false); if (frame != null) { if (!ignoreUnexpectedFrames) @@ -310,7 +317,7 @@ namespace System.Net.Test.Common public async Task ReadRequestHeaderFrameAsync() { // Receive HEADERS frame for request. - Frame frame = await ReadFrameAsync(Timeout).ConfigureAwait(false); + Frame frame = await ReadFrameAsync(_timeout).ConfigureAwait(false); if (frame == null) { throw new IOException("Failed to read Headers frame."); @@ -476,7 +483,7 @@ namespace System.Net.Test.Common do { - frame = await ReadFrameAsync(Timeout).ConfigureAwait(false); + frame = await ReadFrameAsync(_timeout).ConfigureAwait(false); if (frame == null && expectEndOfStream) { break; @@ -516,7 +523,7 @@ namespace System.Net.Test.Common HttpRequestData requestData = new HttpRequestData(); // Receive HEADERS frame for request. - Frame frame = await ReadFrameAsync(Timeout).ConfigureAwait(false); + Frame frame = await ReadFrameAsync(_timeout).ConfigureAwait(false); if (frame == null) { throw new IOException("Failed to read Headers frame."); @@ -567,7 +574,7 @@ namespace System.Net.Test.Common byte[] pingData = new byte[8] { 1, 2, 3, 4, 50, 60, 70, 80 }; PingFrame ping = new PingFrame(pingData, FrameFlags.None, 0); await WriteFrameAsync(ping).ConfigureAwait(false); - PingFrame pingAck = (PingFrame)await ReadFrameAsync(Timeout).ConfigureAwait(false); + PingFrame pingAck = (PingFrame)await ReadFrameAsync(_timeout).ConfigureAwait(false); if (pingAck == null || pingAck.Type != FrameType.Ping || !pingAck.AckFlag) { throw new Exception("Expected PING ACK"); diff --git a/src/libraries/Common/tests/System/Net/Http/Http2LoopbackServer.cs b/src/libraries/Common/tests/System/Net/Http/Http2LoopbackServer.cs index a3ff123..8516140 100644 --- a/src/libraries/Common/tests/System/Net/Http/Http2LoopbackServer.cs +++ b/src/libraries/Common/tests/System/Net/Http/Http2LoopbackServer.cs @@ -74,7 +74,12 @@ namespace System.Net.Test.Common _connections.RemoveAll((c) => c.IsInvalid); } - public async Task AcceptConnectionAsync() + public Task AcceptConnectionAsync() + { + return AcceptConnectionAsync(null); + } + + public async Task AcceptConnectionAsync(TimeSpan? timeout) { RemoveInvalidConnections(); @@ -85,7 +90,7 @@ namespace System.Net.Test.Common Socket connectionSocket = await _listenSocket.AcceptAsync().ConfigureAwait(false); - Http2LoopbackConnection connection = new Http2LoopbackConnection(connectionSocket, _options); + Http2LoopbackConnection connection = timeout != null ? new Http2LoopbackConnection(connectionSocket, _options, timeout.Value) : new Http2LoopbackConnection(connectionSocket, _options); _connections.Add(connection); return connection; @@ -96,15 +101,25 @@ namespace System.Net.Test.Common return await EstablishConnectionAsync(); } - public async Task EstablishConnectionAsync(params SettingsEntry[] settingsEntries) + public Task EstablishConnectionAsync(params SettingsEntry[] settingsEntries) { - (Http2LoopbackConnection connection, _) = await EstablishConnectionGetSettingsAsync(settingsEntries).ConfigureAwait(false); + return EstablishConnectionAsync(null, null, settingsEntries); + } + + public async Task EstablishConnectionAsync(TimeSpan? timeout, TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries) + { + (Http2LoopbackConnection connection, _) = await EstablishConnectionGetSettingsAsync(timeout, ackTimeout, settingsEntries).ConfigureAwait(false); return connection; } - public async Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(params SettingsEntry[] settingsEntries) + public Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(params SettingsEntry[] settingsEntries) + { + return EstablishConnectionGetSettingsAsync(null, null, settingsEntries); + } + + public async Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(TimeSpan? timeout, TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries) { - Http2LoopbackConnection connection = await AcceptConnectionAsync().ConfigureAwait(false); + Http2LoopbackConnection connection = await AcceptConnectionAsync(timeout).ConfigureAwait(false); // Receive the initial client settings frame. Frame receivedFrame = await connection.ReadFrameAsync(Timeout).ConfigureAwait(false); @@ -129,7 +144,7 @@ namespace System.Net.Test.Common await connection.WriteFrameAsync(settingsAck).ConfigureAwait(false); // The client will send us a SETTINGS ACK eventually, but not necessarily right away. - await connection.ExpectSettingsAckAsync(); + await connection.ExpectSettingsAckAsync((int) (ackTimeout?.TotalMilliseconds ?? 5000)); return (connection, clientSettingsFrame); } diff --git a/src/libraries/System.Net.Http/ref/System.Net.Http.cs b/src/libraries/System.Net.Http/ref/System.Net.Http.cs index dc1e77f..3386d32 100644 --- a/src/libraries/System.Net.Http/ref/System.Net.Http.cs +++ b/src/libraries/System.Net.Http/ref/System.Net.Http.cs @@ -310,6 +310,7 @@ namespace System.Net.Http protected override void Dispose(bool disposing) { } protected internal override System.Net.Http.HttpResponseMessage Send(System.Net.Http.HttpRequestMessage request, System.Threading.CancellationToken cancellationToken) { throw null; } protected internal override System.Threading.Tasks.Task SendAsync(System.Net.Http.HttpRequestMessage request, System.Threading.CancellationToken cancellationToken) { throw null; } + public bool EnableMultipleHttp2Connections { get { throw null; } set { } } } public partial class StreamContent : System.Net.Http.HttpContent { diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpHandler.cs b/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpHandler.cs index 6f47475..4644b8b 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpHandler.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpHandler.cs @@ -131,5 +131,11 @@ namespace System.Net.Http protected internal override Task SendAsync( HttpRequestMessage request, CancellationToken cancellationToken) => throw new PlatformNotSupportedException(); + + public bool EnableMultipleHttp2Connections + { + get => throw new PlatformNotSupportedException(); + set => throw new PlatformNotSupportedException(); + } } } diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/RequestRetryType.cs b/src/libraries/System.Net.Http/src/System/Net/Http/RequestRetryType.cs index 4456e4f..f2e29c4 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/RequestRetryType.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/RequestRetryType.cs @@ -26,6 +26,10 @@ namespace System.Net.Http /// /// The proxy failed, so the request should be retried on the next proxy. /// - RetryOnNextProxy + RetryOnNextProxy, + + /// The HTTP/2 connection reached the maximum number of streams and + /// another HTTP/2 connection must be created or found to serve the request. + RetryOnNextConnection } } diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/CreditManager.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/CreditManager.cs index 0439b51..f6d37ae 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/CreditManager.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/CreditManager.cs @@ -28,6 +28,8 @@ namespace System.Net.Http _current = initialCredit; } + public bool IsCreditAvailable => Volatile.Read(ref _current) > 0; + private object SyncObject { // Generally locking on "this" is considered poor form, but this type is internal, @@ -35,23 +37,23 @@ namespace System.Net.Http get => this; } - public ValueTask RequestCreditAsync(int amount, CancellationToken cancellationToken) + public bool TryRequestCreditNoWait(int amount) { lock (SyncObject) { - if (_disposed) - { - throw new ObjectDisposedException($"{nameof(CreditManager)}:{_owner.GetType().Name}:{_name}"); - } + return TryRequestCreditNoLock(amount) > 0; + } + } + public ValueTask RequestCreditAsync(int amount, CancellationToken cancellationToken) + { + lock (SyncObject) + { // If we can satisfy the request with credit already available, do so synchronously. - if (_current > 0) - { - Debug.Assert(_waitersTail is null, "Shouldn't have waiters when credit is available"); + int granted = TryRequestCreditNoLock(amount); - int granted = Math.Min(amount, _current); - if (NetEventSource.Log.IsEnabled()) _owner.Trace($"{_name}. requested={amount}, current={_current}, granted={granted}"); - _current -= granted; + if (granted > 0) + { return new ValueTask(granted); } @@ -155,5 +157,26 @@ namespace System.Net.Http } } } + + private int TryRequestCreditNoLock(int amount) + { + Debug.Assert(Monitor.IsEntered(SyncObject), "Shouldn't be called outside lock."); + + if (_disposed) + { + throw new ObjectDisposedException($"{nameof(CreditManager)}:{_owner.GetType().Name}:{_name}"); + } + + if (_current > 0) + { + Debug.Assert(_waitersTail is null, "Shouldn't have waiters when credit is available"); + + int granted = Math.Min(amount, _current); + if (NetEventSource.Log.IsEnabled()) _owner.Trace($"{_name}. requested={amount}, current={_current}, granted={granted}"); + _current -= granted; + return granted; + } + return 0; + } } } diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs index 5432df7..83f7953 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs @@ -114,12 +114,15 @@ namespace System.Net.Http _initialWindowSize = DefaultInitialWindowSize; _maxConcurrentStreams = int.MaxValue; _pendingWindowUpdate = 0; + _idleSinceTickCount = Environment.TickCount64; if (NetEventSource.Log.IsEnabled()) TraceConnection(stream); } private object SyncObject => _httpStreams; + public bool CanAddNewStream => _concurrentStreams.IsCreditAvailable; + public async ValueTask SetupAsync() { _outgoingBuffer.EnsureAvailableSpace(s_http2ConnectionPreface.Length + @@ -1203,7 +1206,17 @@ namespace System.Net.Http // in order to avoid consuming resources in potentially many requests waiting for access. try { - await _concurrentStreams.RequestCreditAsync(1, cancellationToken).ConfigureAwait(false); + if (_pool.EnableMultipleHttp2Connections) + { + if (!_concurrentStreams.TryRequestCreditNoWait(1)) + { + throw new HttpRequestException(null, null, RequestRetryType.RetryOnNextConnection); + } + } + else + { + await _concurrentStreams.RequestCreditAsync(1, cancellationToken).ConfigureAwait(false); + } } catch (ObjectDisposedException) { diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs index a363430..072c9c2 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs @@ -66,7 +66,8 @@ namespace System.Net.Http private readonly int _maxConnections; private bool _http2Enabled; - private Http2Connection? _http2Connection; + // This array must be treated as immutable. It can only be replaced with a new value in AddHttp2Connection method. + private volatile Http2Connection[]? _http2Connections; private SemaphoreSlim? _http2ConnectionCreateLock; private byte[]? _http2AltSvcOriginUri; internal readonly byte[]? _http2EncodedAuthorityHostHeader; @@ -329,6 +330,8 @@ namespace System.Net.Http } } + public bool EnableMultipleHttp2Connections => _poolManager.Settings.EnableMultipleHttp2Connections; + /// Object used to synchronize access to state in the pool. private object SyncObj => _idleConnections; @@ -483,24 +486,14 @@ namespace System.Net.Http Debug.Assert(_kind == HttpConnectionKind.Https || _kind == HttpConnectionKind.SslProxyTunnel || _kind == HttpConnectionKind.Http); // See if we have an HTTP2 connection - Http2Connection? http2Connection = _http2Connection; + Http2Connection? http2Connection = GetExistingHttp2Connection(); if (http2Connection != null) { - TimeSpan pooledConnectionLifetime = _poolManager.Settings._pooledConnectionLifetime; - if (http2Connection.LifetimeExpired(Environment.TickCount64, pooledConnectionLifetime)) - { - // Connection expired. - http2Connection.Dispose(); - InvalidateHttp2Connection(http2Connection); - } - else - { - // Connection exist and it is still good to use. - if (NetEventSource.Log.IsEnabled()) Trace("Using existing HTTP2 connection."); - _usedSinceLastCleanup = true; - return (http2Connection, false, null); - } + // Connection exists and it is still good to use. + if (NetEventSource.Log.IsEnabled()) Trace("Using existing HTTP2 connection."); + _usedSinceLastCleanup = true; + return (http2Connection, false, null); } // Ensure that the connection creation semaphore is created @@ -524,16 +517,10 @@ namespace System.Net.Http await _http2ConnectionCreateLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { - if (_http2Connection != null) + http2Connection = GetExistingHttp2Connection(); + if (http2Connection != null) { - // Someone beat us to it - - if (NetEventSource.Log.IsEnabled()) - { - Trace("Using existing HTTP2 connection."); - } - - return (_http2Connection, false, null); + return (http2Connection, false, null); } // Recheck if HTTP2 has been disabled by a previous attempt. @@ -558,15 +545,14 @@ namespace System.Net.Http http2Connection = new Http2Connection(this, stream!); await http2Connection.SetupAsync().ConfigureAwait(false); - Debug.Assert(_http2Connection == null); - _http2Connection = http2Connection; + AddHttp2Connection(http2Connection); if (NetEventSource.Log.IsEnabled()) { Trace("New unencrypted HTTP2 connection established."); } - return (_http2Connection, true, null); + return (http2Connection, true, null); } sslStream = (SslStream)stream!; @@ -582,15 +568,14 @@ namespace System.Net.Http http2Connection = new Http2Connection(this, sslStream); await http2Connection.SetupAsync().ConfigureAwait(false); - Debug.Assert(_http2Connection == null); - _http2Connection = http2Connection; + AddHttp2Connection(http2Connection); if (NetEventSource.Log.IsEnabled()) { Trace("New HTTP2 connection established."); } - return (_http2Connection, true, null); + return (http2Connection, true, null); } } } @@ -648,6 +633,53 @@ namespace System.Net.Http return await GetHttpConnectionAsync(request, async, cancellationToken).ConfigureAwait(false); } + private Http2Connection? GetExistingHttp2Connection() + { + Http2Connection[]? localConnections = _http2Connections; + + if (localConnections == null) + { + return null; + } + + for (int i = 0; i < localConnections.Length; i++) + { + Http2Connection http2Connection = localConnections[i]; + + TimeSpan pooledConnectionLifetime = _poolManager.Settings._pooledConnectionLifetime; + if (http2Connection.LifetimeExpired(Environment.TickCount64, pooledConnectionLifetime)) + { + // Connection expired. + http2Connection.Dispose(); + InvalidateHttp2Connection(http2Connection); + } + else if (!EnableMultipleHttp2Connections || http2Connection.CanAddNewStream) + { + return http2Connection; + } + } + + return null; + } + + private void AddHttp2Connection(Http2Connection newConnection) + { + lock (SyncObj) + { + Http2Connection[]? localHttp2Connections = _http2Connections; + int newCollectionSize = localHttp2Connections == null ? 1 : localHttp2Connections.Length + 1; + Http2Connection[] newHttp2Connections = new Http2Connection[newCollectionSize]; + newHttp2Connections[0] = newConnection; + + if (localHttp2Connections != null) + { + Array.Copy(localHttp2Connections, 0, newHttp2Connections, 1, localHttp2Connections.Length); + } + + _http2Connections = newHttp2Connections; + } + } + private async ValueTask<(HttpConnectionBase? connection, bool isNewConnection, HttpResponseMessage? failureResponse)> GetHttp3ConnectionAsync(HttpRequestMessage request, HttpAuthority authority, CancellationToken cancellationToken) { @@ -794,6 +826,16 @@ namespace System.Net.Http // Eat exception and try again. continue; } + catch (HttpRequestException e) when (e.AllowRetry == RequestRetryType.RetryOnNextConnection) + { + if (NetEventSource.Log.IsEnabled()) + { + Trace($"Retrying request on another HTTP/2 connection after active streams limit is reached on existing one: {e}"); + } + + // Eat exception and try again. + continue; + } // Check for the Alt-Svc header, to upgrade to HTTP/3. if (_altSvcEnabled && response.Headers.TryGetValues(KnownHeaders.AltSvc.Descriptor, out IEnumerable? altSvcHeaderValues)) @@ -1355,9 +1397,39 @@ namespace System.Net.Http { lock (SyncObj) { - if (_http2Connection == connection) + Http2Connection[]? localHttp2Connections = _http2Connections; + + if (localHttp2Connections == null) + { + return; + } + + if (localHttp2Connections.Length == 1) + { + // Fast shortcut for the most common case. + if (localHttp2Connections[0] == connection) + { + _http2Connections = null; + } + return; + } + + int invalidatedIndex = Array.IndexOf(localHttp2Connections, connection); + if (invalidatedIndex >= 0) { - _http2Connection = null; + Http2Connection[] newHttp2Connections = new Http2Connection[localHttp2Connections.Length - 1]; + + if (invalidatedIndex > 0) + { + Array.Copy(localHttp2Connections, newHttp2Connections, invalidatedIndex); + } + + if (invalidatedIndex < localHttp2Connections.Length - 1) + { + Array.Copy(localHttp2Connections, invalidatedIndex + 1, newHttp2Connections, invalidatedIndex, newHttp2Connections.Length - invalidatedIndex); + } + + _http2Connections = newHttp2Connections; } } } @@ -1389,10 +1461,13 @@ namespace System.Net.Http list.ForEach(c => c._connection.Dispose()); list.Clear(); - if (_http2Connection != null) + if (_http2Connections != null) { - _http2Connection.Dispose(); - _http2Connection = null; + for (int i = 0; i < _http2Connections.Length; i++) + { + _http2Connections[i].Dispose(); + } + _http2Connections = null; } if (_authorityExpireTimer != null) @@ -1436,15 +1511,51 @@ namespace System.Net.Http // Get the current time. This is compared against each connection's last returned // time to determine whether a connection is too old and should be closed. long nowTicks = Environment.TickCount64; - Http2Connection? http2Connection = _http2Connection; + // Copy the reference to a local variable to simplify the removal logic below. + Http2Connection[]? localHttp2Connections = _http2Connections; - if (http2Connection != null) + if (localHttp2Connections != null) { - if (http2Connection.IsExpired(nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout)) + Http2Connection[]? newHttp2Connections = null; + int newIndex = 0; + for (int i = 0; i < localHttp2Connections.Length; i++) + { + Http2Connection http2Connection = localHttp2Connections[i]; + if (http2Connection.IsExpired(nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout)) + { + http2Connection.Dispose(); + + if (newHttp2Connections == null) + { + newHttp2Connections = new Http2Connection[localHttp2Connections.Length]; + if (i > 0) + { + // Copy valid connections residing at the beggining of the current collection. + Array.Copy(localHttp2Connections, newHttp2Connections, i); + newIndex = i; + } + } + } + else if (newHttp2Connections != null) + { + newHttp2Connections[newIndex] = localHttp2Connections[i]; + newIndex++; + } + } + + if (newHttp2Connections != null) { - http2Connection.Dispose(); - // We can set _http2Connection directly while holding lock instead of calling InvalidateHttp2Connection(). - _http2Connection = null; + //Some connections have been removed, so _http2Connections must be replaced. + if (newIndex > 0) + { + Array.Resize(ref newHttp2Connections, newIndex); + _http2Connections = newHttp2Connections; + } + else + { + // All connections expired. + _http2Connections = null; + } } } @@ -1493,7 +1604,7 @@ namespace System.Net.Http // if a pool was used since the last time we cleaned up, give it another chance. New pools // start out saying they've recently been used, to give them a bit of breathing room and time // for the initial collection to be added to it. - if (_associatedConnectionCount == 0 && !_usedSinceLastCleanup && _http2Connection == null) + if (_associatedConnectionCount == 0 && !_usedSinceLastCleanup && _http2Connections == null) { Debug.Assert(list.Count == 0, $"Expected {nameof(list)}.{nameof(list.Count)} == 0"); _disposed = true; diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionSettings.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionSettings.cs index 959f82b..4ba7696 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionSettings.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionSettings.cs @@ -52,6 +52,8 @@ namespace System.Net.Http internal SslClientAuthenticationOptions? _sslOptions; + internal bool _enableMultipleHttp2Connections; + internal IDictionary? _properties; public HttpConnectionSettings() @@ -101,7 +103,8 @@ namespace System.Net.Http _useCookies = _useCookies, _useProxy = _useProxy, _allowUnencryptedHttp2 = _allowUnencryptedHttp2, - _assumePrenegotiatedHttp3ForTesting = _assumePrenegotiatedHttp3ForTesting + _assumePrenegotiatedHttp3ForTesting = _assumePrenegotiatedHttp3ForTesting, + _enableMultipleHttp2Connections = _enableMultipleHttp2Connections }; } @@ -183,6 +186,8 @@ namespace System.Net.Http } } + public bool EnableMultipleHttp2Connections => _enableMultipleHttp2Connections; + private byte[]? _http3SettingsFrame; internal byte[] Http3SettingsFrame => _http3SettingsFrame ??= Http3Connection.BuildSettingsFrame(this); } diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpHandler.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpHandler.cs index 9b122a6..1874723 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpHandler.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpHandler.cs @@ -273,6 +273,17 @@ namespace System.Net.Http } } + public bool EnableMultipleHttp2Connections + { + get => _settings._enableMultipleHttp2Connections; + set + { + CheckDisposedOrStarted(); + + _settings._enableMultipleHttp2Connections = value; + } + } + internal bool SupportsAutomaticDecompression => true; internal bool SupportsProxy => true; internal bool SupportsRedirectConfiguration => true; diff --git a/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.cs b/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.cs index ae32a02..3a4aef6 100644 --- a/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.cs +++ b/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.cs @@ -1925,6 +1925,300 @@ namespace System.Net.Http.Functional.Tests public sealed class SocketsHttpHandlerTest_Http2 : HttpClientHandlerTest_Http2 { public SocketsHttpHandlerTest_Http2(ITestOutputHelper output) : base(output) { } + + [ConditionalFact(nameof(SupportsAlpn))] + public async Task Http2_MultipleConnectionsEnabled_ConnectionLimitNotReached_ConcurrentRequestsSuccessfullyHandled() + { + const int MaxConcurrentStreams = 2; + using Http2LoopbackServer server = Http2LoopbackServer.CreateServer(); + using SocketsHttpHandler handler = CreateHandler(); + using (HttpClient client = CreateHttpClient(handler)) + { + server.AllowMultipleConnections = true; + List> sendTasks = new List>(); + List connections = new List(); + List acceptedStreams = new List(); + for (int i = 0; i < 3; i++) + { + Http2LoopbackConnection connection = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false); + AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams); + connections.Add(connection); + int prevAcceptedStreamCount = acceptedStreams.Count; + acceptedStreams.AddRange(await AcceptRequests(connection, MaxConcurrentStreams).ConfigureAwait(false)); + Assert.Equal(prevAcceptedStreamCount + MaxConcurrentStreams, acceptedStreams.Count); + } + + int responseIndex = 0; + List responseTasks = new List(); + foreach (Http2LoopbackConnection connection in connections) + { + for (int i = 0; i < MaxConcurrentStreams; i++) + { + int streamId = acceptedStreams[responseIndex++]; + responseTasks.Add(connection.SendDefaultResponseAsync(streamId)); + } + } + + await Task.WhenAll(responseTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false); + await Task.WhenAll(sendTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false); + + await VerifySendTasks(sendTasks).ConfigureAwait(false); + } + } + + [ConditionalFact(nameof(SupportsAlpn))] + public async Task Http2_MultipleConnectionsEnabled_InfiniteRequestsCompletelyBlockOneConnection_RemaningRequestsAreHandledByNewConnection() + { + const int MaxConcurrentStreams = 2; + using Http2LoopbackServer server = Http2LoopbackServer.CreateServer(); + using SocketsHttpHandler handler = CreateHandler(); + using (HttpClient client = CreateHttpClient(handler)) + { + server.AllowMultipleConnections = true; + List> sendTasks = new List>(); + Http2LoopbackConnection connection0 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false); + AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams); + + // Block the first connection on infinite requests. + List blockedStreamIds = await AcceptRequests(connection0, MaxConcurrentStreams).ConfigureAwait(false); + Assert.Equal(MaxConcurrentStreams, blockedStreamIds.Count); + + Http2LoopbackConnection connection1 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false); + AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams); + + int handledRequestCount = (await HandleAllPendingRequests(connection1, MaxConcurrentStreams).ConfigureAwait(false)).Count; + + Assert.Equal(MaxConcurrentStreams, handledRequestCount); + + //Complete inifinite requests. + handledRequestCount = await SendResponses(connection0, blockedStreamIds); + + Assert.Equal(MaxConcurrentStreams, handledRequestCount); + + await Task.WhenAll(sendTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false); + + await VerifySendTasks(sendTasks).ConfigureAwait(false); + } + } + + [ConditionalFact(nameof(SupportsAlpn))] + public async Task Http2_MultipleConnectionsEnabled_OpenAndCloseMultipleConnections_Success() + { + const int MaxConcurrentStreams = 2; + using Http2LoopbackServer server = Http2LoopbackServer.CreateServer(); + using SocketsHttpHandler handler = CreateHandler(); + using (HttpClient client = CreateHttpClient(handler)) + { + server.AllowMultipleConnections = true; + List> sendTasks = new List>(); + Http2LoopbackConnection connection0 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false); + AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams); + Http2LoopbackConnection connection1 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false); + AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams); + Http2LoopbackConnection connection2 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false); + AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams); + + Task<(int Count, int LastStreamId)>[] handleRequestTasks = new[] { + HandleAllPendingRequests(connection0, sendTasks.Count), + HandleAllPendingRequests(connection1, sendTasks.Count), + HandleAllPendingRequests(connection2, sendTasks.Count) + }; + + await Task.WhenAll(handleRequestTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false); + + Assert.Equal(handleRequestTasks[0].Result.Count, MaxConcurrentStreams); + Assert.Equal(handleRequestTasks[1].Result.Count, MaxConcurrentStreams); + Assert.Equal(handleRequestTasks[2].Result.Count, MaxConcurrentStreams); + + await connection0.ShutdownIgnoringErrorsAsync(handleRequestTasks[0].Result.LastStreamId).ConfigureAwait(false); + await connection2.ShutdownIgnoringErrorsAsync(handleRequestTasks[2].Result.LastStreamId).ConfigureAwait(false); + + //Fill all connection1's stream slots + AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams); + + Http2LoopbackConnection connection3 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false); + AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams); + Http2LoopbackConnection connection4 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false); + AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams); + + Task<(int Count, int LastStreamId)>[] finalHandleTasks = new[] { + HandleAllPendingRequests(connection1, sendTasks.Count), + HandleAllPendingRequests(connection3, sendTasks.Count), + HandleAllPendingRequests(connection4, sendTasks.Count) + }; + + await Task.WhenAll(finalHandleTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false); + + Assert.Equal(finalHandleTasks[0].Result.Count, MaxConcurrentStreams); + Assert.Equal(finalHandleTasks[1].Result.Count, MaxConcurrentStreams); + Assert.Equal(finalHandleTasks[2].Result.Count, MaxConcurrentStreams); + + await Task.WhenAll(sendTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false); + + await VerifySendTasks(sendTasks).ConfigureAwait(false); + } + } + + [ConditionalFact(nameof(SupportsAlpn))] + public async Task Http2_MultipleConnectionsEnabled_IdleConnectionTimeoutExpired_ConnectionRemovedAndNewCreated() + { + const int MaxConcurrentStreams = 2; + using Http2LoopbackServer server = Http2LoopbackServer.CreateServer(); + using SocketsHttpHandler handler = CreateHandler(); + handler.PooledConnectionIdleTimeout = TimeSpan.FromSeconds(5); + using (HttpClient client = CreateHttpClient(handler)) + { + server.AllowMultipleConnections = true; + List> sendTasks = new List>(); + Http2LoopbackConnection connection0 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false); + AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams); + List acceptedStreamIds = await AcceptRequests(connection0, MaxConcurrentStreams).ConfigureAwait(false); + Assert.Equal(MaxConcurrentStreams, acceptedStreamIds.Count); + + List> connection1SendTasks = new List>(); + Http2LoopbackConnection connection1 = await PrepareConnection(server, client, MaxConcurrentStreams, readTimeout: 10).ConfigureAwait(false); + AcquireAllStreamSlots(server, client, connection1SendTasks, MaxConcurrentStreams); + int handledRequests1 = (await HandleAllPendingRequests(connection1, MaxConcurrentStreams).ConfigureAwait(false)).Count; + + Assert.Equal(MaxConcurrentStreams, handledRequests1); + + // Complete all the requests. + await Task.WhenAll(connection1SendTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false); + await VerifySendTasks(connection1SendTasks).ConfigureAwait(false); + connection1SendTasks.ForEach(t => t.Result.Dispose()); + + // Wait until the idle connection timeout expires. + await connection1.WaitForClientDisconnectAsync(false).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false); + // Client connection might be still alive, so send an extra request which will either land on the shutting down connection or on a new one. + try + { + await client.GetAsync(server.Address).TimeoutAfter(handler.PooledConnectionIdleTimeout).ConfigureAwait(false); + } + catch (Exception) + { + // Suppress all exceptions. + } + + Assert.True(connection1.IsInvalid); + Assert.False(connection0.IsInvalid); + + Http2LoopbackConnection connection2 = await PrepareConnection(server, client, MaxConcurrentStreams, readTimeout: 5, expectedWarpUpTasks:2).ConfigureAwait(false); + + AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams); + + int handledRequests2 = (await HandleAllPendingRequests(connection2, MaxConcurrentStreams).ConfigureAwait(false)).Count; + Assert.Equal(MaxConcurrentStreams, handledRequests2); + + //Make sure connection0 is still alive. + int handledRequests0 = await SendResponses(connection0, acceptedStreamIds).ConfigureAwait(false); + Assert.Equal(MaxConcurrentStreams, handledRequests0); + + await Task.WhenAll(sendTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false); + + await VerifySendTasks(sendTasks).ConfigureAwait(false); + } + } + + private async Task VerifySendTasks(IReadOnlyList> sendTasks) + { + foreach (Task sendTask in sendTasks) + { + HttpResponseMessage response = await sendTask.ConfigureAwait(false); + Assert.Equal(HttpStatusCode.OK, response.StatusCode); + } + } + + private static SocketsHttpHandler CreateHandler() => new SocketsHttpHandler + { + EnableMultipleHttp2Connections = true, + PooledConnectionIdleTimeout = TimeSpan.FromHours(1), + PooledConnectionLifetime = TimeSpan.FromHours(1), + SslOptions = { RemoteCertificateValidationCallback = delegate { return true; } } + }; + + private async Task PrepareConnection(Http2LoopbackServer server, HttpClient client, uint maxConcurrentStreams, int readTimeout = 3, int expectedWarpUpTasks = 1) + { + Task warmUpTask = client.GetAsync(server.Address); + Http2LoopbackConnection connection = await GetConnection(server, maxConcurrentStreams, readTimeout).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false); + // Wait until the client confirms MaxConcurrentStreams setting took into effect. + Task settingAckReceived = connection.SettingAckWaiter; + while (true) + { + Task handleRequestTask = HandleAllPendingRequests(connection, expectedWarpUpTasks); + await Task.WhenAll(warmUpTask, handleRequestTask).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false); + Assert.True(warmUpTask.Result.IsSuccessStatusCode); + warmUpTask.Result.Dispose(); + if (settingAckReceived.IsCompleted) + { + break; + } + + warmUpTask = client.GetAsync(server.Address); + } + return connection; + } + + private static void AcquireAllStreamSlots(Http2LoopbackServer server, HttpClient client, List> sendTasks, uint maxConcurrentStreams) + { + for (int i = 0; i < maxConcurrentStreams; i++) + { + sendTasks.Add(client.GetAsync(server.Address)); + } + } + + private static async Task GetConnection(Http2LoopbackServer server, uint maxConcurrentStreams, int readTimeout) => + await server.EstablishConnectionAsync(TimeSpan.FromSeconds(readTimeout), TimeSpan.FromSeconds(10), new SettingsEntry { SettingId = SettingId.MaxConcurrentStreams, Value = maxConcurrentStreams }).ConfigureAwait(false); + + private async Task<(int Count, int LastStreamId)> HandleAllPendingRequests(Http2LoopbackConnection connection, int totalRequestCount) + { + int streamId = -1; + for (int i = 0; i < totalRequestCount; i++) + { + try + { + // Exact number of requests sent over the given connection is unknown, + // so we keep reading headers and sending response while there are available requests. + streamId = await connection.ReadRequestHeaderAsync().ConfigureAwait(false); + await connection.SendDefaultResponseAsync(streamId).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + return (i, streamId); + } + } + + return (totalRequestCount, streamId); + } + + private async Task> AcceptRequests(Http2LoopbackConnection connection, int maxRequests = int.MaxValue) + { + List streamIds = new List(); + for (int i = 0; i < maxRequests; i++) + { + try + { + streamIds.Add(await connection.ReadRequestHeaderAsync().ConfigureAwait(false)); + } + catch (OperationCanceledException) + { + return streamIds; + } + } + + return streamIds; + } + + private async Task SendResponses(Http2LoopbackConnection connection, IEnumerable streamIds) + { + int count = 0; + foreach (int streamId in streamIds) + { + count++; + await connection.SendDefaultResponseAsync(streamId).ConfigureAwait(false); + } + + return count; + } } [ConditionalClass(typeof(PlatformDetection), nameof(PlatformDetection.SupportsAlpn))]