HTTP/2 standard commands clients to not open more than one HTTP/2 connection to the same server. At the same time, server has right to limit the maximum number of active streams per that HTTP/2 connection. These two directives combined impose limit on the number of requests concurrently send to the server. This limitation is justified in client to server scenarios, but become a bottleneck in server to server cases like gRPC. This PR introduces a new SocketsHttpHandler API enabling establishing additional HTTP/2 connections to the same server when the maximum stream limit is reached on the existing ones.
**Note**. This algorithm version uses only retries to make request choose another connection when all stream slots are occupied. It does not implement stream credit management in `HttpConnectionPool` and therefore exhibit a sub-optimal request scheduling behavior in "request burst" and "infinite requests" scenarios.
Fixes #35088
private Stream _connectionStream;
private TaskCompletionSource<bool> _ignoredSettingsAckPromise;
private bool _ignoreWindowUpdates;
- public static TimeSpan Timeout => Http2LoopbackServer.Timeout;
+ private readonly TimeSpan _timeout;
private int _lastStreamId;
private readonly byte[] _prefix;
public string PrefixString => Encoding.UTF8.GetString(_prefix, 0, _prefix.Length);
public bool IsInvalid => _connectionSocket == null;
public Stream Stream => _connectionStream;
+ public Task<bool> SettingAckWaiter => _ignoredSettingsAckPromise?.Task;
public Http2LoopbackConnection(Socket socket, Http2Options httpOptions)
+ : this(socket, httpOptions, Http2LoopbackServer.Timeout)
+ {
+ }
+
+ public Http2LoopbackConnection(Socket socket, Http2Options httpOptions, TimeSpan timeout)
{
_connectionSocket = socket;
_connectionStream = new NetworkStream(_connectionSocket, true);
+ _timeout = timeout;
if (httpOptions.UseSsl)
{
await WriteFrameAsync(emptySettings).ConfigureAwait(false);
// Receive and ACK the client settings frame.
- Frame clientSettings = await ReadFrameAsync(Timeout).ConfigureAwait(false);
+ Frame clientSettings = await ReadFrameAsync(_timeout).ConfigureAwait(false);
clientSettings.Flags = clientSettings.Flags | FrameFlags.Ack;
await WriteFrameAsync(clientSettings).ConfigureAwait(false);
// Receive the client ACK of the server settings frame.
- clientSettings = await ReadFrameAsync(Timeout).ConfigureAwait(false);
+ clientSettings = await ReadFrameAsync(_timeout).ConfigureAwait(false);
}
public async Task WriteFrameAsync(Frame frame)
public async Task ReadRstStreamAsync(int streamId)
{
- Frame frame = await ReadFrameAsync(Timeout);
+ Frame frame = await ReadFrameAsync(_timeout);
if (frame == null)
{
{
IgnoreWindowUpdates();
- Frame frame = await ReadFrameAsync(Timeout).ConfigureAwait(false);
+ Frame frame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
if (frame != null)
{
if (!ignoreUnexpectedFrames)
public async Task<HeadersFrame> ReadRequestHeaderFrameAsync()
{
// Receive HEADERS frame for request.
- Frame frame = await ReadFrameAsync(Timeout).ConfigureAwait(false);
+ Frame frame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
if (frame == null)
{
throw new IOException("Failed to read Headers frame.");
do
{
- frame = await ReadFrameAsync(Timeout).ConfigureAwait(false);
+ frame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
if (frame == null && expectEndOfStream)
{
break;
HttpRequestData requestData = new HttpRequestData();
// Receive HEADERS frame for request.
- Frame frame = await ReadFrameAsync(Timeout).ConfigureAwait(false);
+ Frame frame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
if (frame == null)
{
throw new IOException("Failed to read Headers frame.");
byte[] pingData = new byte[8] { 1, 2, 3, 4, 50, 60, 70, 80 };
PingFrame ping = new PingFrame(pingData, FrameFlags.None, 0);
await WriteFrameAsync(ping).ConfigureAwait(false);
- PingFrame pingAck = (PingFrame)await ReadFrameAsync(Timeout).ConfigureAwait(false);
+ PingFrame pingAck = (PingFrame)await ReadFrameAsync(_timeout).ConfigureAwait(false);
if (pingAck == null || pingAck.Type != FrameType.Ping || !pingAck.AckFlag)
{
throw new Exception("Expected PING ACK");
_connections.RemoveAll((c) => c.IsInvalid);
}
- public async Task<Http2LoopbackConnection> AcceptConnectionAsync()
+ public Task<Http2LoopbackConnection> AcceptConnectionAsync()
+ {
+ return AcceptConnectionAsync(null);
+ }
+
+ public async Task<Http2LoopbackConnection> AcceptConnectionAsync(TimeSpan? timeout)
{
RemoveInvalidConnections();
Socket connectionSocket = await _listenSocket.AcceptAsync().ConfigureAwait(false);
- Http2LoopbackConnection connection = new Http2LoopbackConnection(connectionSocket, _options);
+ Http2LoopbackConnection connection = timeout != null ? new Http2LoopbackConnection(connectionSocket, _options, timeout.Value) : new Http2LoopbackConnection(connectionSocket, _options);
_connections.Add(connection);
return connection;
return await EstablishConnectionAsync();
}
- public async Task<Http2LoopbackConnection> EstablishConnectionAsync(params SettingsEntry[] settingsEntries)
+ public Task<Http2LoopbackConnection> EstablishConnectionAsync(params SettingsEntry[] settingsEntries)
{
- (Http2LoopbackConnection connection, _) = await EstablishConnectionGetSettingsAsync(settingsEntries).ConfigureAwait(false);
+ return EstablishConnectionAsync(null, null, settingsEntries);
+ }
+
+ public async Task<Http2LoopbackConnection> EstablishConnectionAsync(TimeSpan? timeout, TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries)
+ {
+ (Http2LoopbackConnection connection, _) = await EstablishConnectionGetSettingsAsync(timeout, ackTimeout, settingsEntries).ConfigureAwait(false);
return connection;
}
- public async Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(params SettingsEntry[] settingsEntries)
+ public Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(params SettingsEntry[] settingsEntries)
+ {
+ return EstablishConnectionGetSettingsAsync(null, null, settingsEntries);
+ }
+
+ public async Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(TimeSpan? timeout, TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries)
{
- Http2LoopbackConnection connection = await AcceptConnectionAsync().ConfigureAwait(false);
+ Http2LoopbackConnection connection = await AcceptConnectionAsync(timeout).ConfigureAwait(false);
// Receive the initial client settings frame.
Frame receivedFrame = await connection.ReadFrameAsync(Timeout).ConfigureAwait(false);
await connection.WriteFrameAsync(settingsAck).ConfigureAwait(false);
// The client will send us a SETTINGS ACK eventually, but not necessarily right away.
- await connection.ExpectSettingsAckAsync();
+ await connection.ExpectSettingsAckAsync((int) (ackTimeout?.TotalMilliseconds ?? 5000));
return (connection, clientSettingsFrame);
}
protected override void Dispose(bool disposing) { }
protected internal override System.Net.Http.HttpResponseMessage Send(System.Net.Http.HttpRequestMessage request, System.Threading.CancellationToken cancellationToken) { throw null; }
protected internal override System.Threading.Tasks.Task<System.Net.Http.HttpResponseMessage> SendAsync(System.Net.Http.HttpRequestMessage request, System.Threading.CancellationToken cancellationToken) { throw null; }
+ public bool EnableMultipleHttp2Connections { get { throw null; } set { } }
}
public partial class StreamContent : System.Net.Http.HttpContent
{
protected internal override Task<HttpResponseMessage> SendAsync(
HttpRequestMessage request, CancellationToken cancellationToken) => throw new PlatformNotSupportedException();
+
+ public bool EnableMultipleHttp2Connections
+ {
+ get => throw new PlatformNotSupportedException();
+ set => throw new PlatformNotSupportedException();
+ }
}
}
/// <summary>
/// The proxy failed, so the request should be retried on the next proxy.
/// </summary>
- RetryOnNextProxy
+ RetryOnNextProxy,
+
+ /// The HTTP/2 connection reached the maximum number of streams and
+ /// another HTTP/2 connection must be created or found to serve the request.
+ RetryOnNextConnection
}
}
_current = initialCredit;
}
+ public bool IsCreditAvailable => Volatile.Read(ref _current) > 0;
+
private object SyncObject
{
// Generally locking on "this" is considered poor form, but this type is internal,
get => this;
}
- public ValueTask<int> RequestCreditAsync(int amount, CancellationToken cancellationToken)
+ public bool TryRequestCreditNoWait(int amount)
{
lock (SyncObject)
{
- if (_disposed)
- {
- throw new ObjectDisposedException($"{nameof(CreditManager)}:{_owner.GetType().Name}:{_name}");
- }
+ return TryRequestCreditNoLock(amount) > 0;
+ }
+ }
+ public ValueTask<int> RequestCreditAsync(int amount, CancellationToken cancellationToken)
+ {
+ lock (SyncObject)
+ {
// If we can satisfy the request with credit already available, do so synchronously.
- if (_current > 0)
- {
- Debug.Assert(_waitersTail is null, "Shouldn't have waiters when credit is available");
+ int granted = TryRequestCreditNoLock(amount);
- int granted = Math.Min(amount, _current);
- if (NetEventSource.Log.IsEnabled()) _owner.Trace($"{_name}. requested={amount}, current={_current}, granted={granted}");
- _current -= granted;
+ if (granted > 0)
+ {
return new ValueTask<int>(granted);
}
}
}
}
+
+ private int TryRequestCreditNoLock(int amount)
+ {
+ Debug.Assert(Monitor.IsEntered(SyncObject), "Shouldn't be called outside lock.");
+
+ if (_disposed)
+ {
+ throw new ObjectDisposedException($"{nameof(CreditManager)}:{_owner.GetType().Name}:{_name}");
+ }
+
+ if (_current > 0)
+ {
+ Debug.Assert(_waitersTail is null, "Shouldn't have waiters when credit is available");
+
+ int granted = Math.Min(amount, _current);
+ if (NetEventSource.Log.IsEnabled()) _owner.Trace($"{_name}. requested={amount}, current={_current}, granted={granted}");
+ _current -= granted;
+ return granted;
+ }
+ return 0;
+ }
}
}
_initialWindowSize = DefaultInitialWindowSize;
_maxConcurrentStreams = int.MaxValue;
_pendingWindowUpdate = 0;
+ _idleSinceTickCount = Environment.TickCount64;
if (NetEventSource.Log.IsEnabled()) TraceConnection(stream);
}
private object SyncObject => _httpStreams;
+ public bool CanAddNewStream => _concurrentStreams.IsCreditAvailable;
+
public async ValueTask SetupAsync()
{
_outgoingBuffer.EnsureAvailableSpace(s_http2ConnectionPreface.Length +
// in order to avoid consuming resources in potentially many requests waiting for access.
try
{
- await _concurrentStreams.RequestCreditAsync(1, cancellationToken).ConfigureAwait(false);
+ if (_pool.EnableMultipleHttp2Connections)
+ {
+ if (!_concurrentStreams.TryRequestCreditNoWait(1))
+ {
+ throw new HttpRequestException(null, null, RequestRetryType.RetryOnNextConnection);
+ }
+ }
+ else
+ {
+ await _concurrentStreams.RequestCreditAsync(1, cancellationToken).ConfigureAwait(false);
+ }
}
catch (ObjectDisposedException)
{
private readonly int _maxConnections;
private bool _http2Enabled;
- private Http2Connection? _http2Connection;
+ // This array must be treated as immutable. It can only be replaced with a new value in AddHttp2Connection method.
+ private volatile Http2Connection[]? _http2Connections;
private SemaphoreSlim? _http2ConnectionCreateLock;
private byte[]? _http2AltSvcOriginUri;
internal readonly byte[]? _http2EncodedAuthorityHostHeader;
}
}
+ public bool EnableMultipleHttp2Connections => _poolManager.Settings.EnableMultipleHttp2Connections;
+
/// <summary>Object used to synchronize access to state in the pool.</summary>
private object SyncObj => _idleConnections;
Debug.Assert(_kind == HttpConnectionKind.Https || _kind == HttpConnectionKind.SslProxyTunnel || _kind == HttpConnectionKind.Http);
// See if we have an HTTP2 connection
- Http2Connection? http2Connection = _http2Connection;
+ Http2Connection? http2Connection = GetExistingHttp2Connection();
if (http2Connection != null)
{
- TimeSpan pooledConnectionLifetime = _poolManager.Settings._pooledConnectionLifetime;
- if (http2Connection.LifetimeExpired(Environment.TickCount64, pooledConnectionLifetime))
- {
- // Connection expired.
- http2Connection.Dispose();
- InvalidateHttp2Connection(http2Connection);
- }
- else
- {
- // Connection exist and it is still good to use.
- if (NetEventSource.Log.IsEnabled()) Trace("Using existing HTTP2 connection.");
- _usedSinceLastCleanup = true;
- return (http2Connection, false, null);
- }
+ // Connection exists and it is still good to use.
+ if (NetEventSource.Log.IsEnabled()) Trace("Using existing HTTP2 connection.");
+ _usedSinceLastCleanup = true;
+ return (http2Connection, false, null);
}
// Ensure that the connection creation semaphore is created
await _http2ConnectionCreateLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
- if (_http2Connection != null)
+ http2Connection = GetExistingHttp2Connection();
+ if (http2Connection != null)
{
- // Someone beat us to it
-
- if (NetEventSource.Log.IsEnabled())
- {
- Trace("Using existing HTTP2 connection.");
- }
-
- return (_http2Connection, false, null);
+ return (http2Connection, false, null);
}
// Recheck if HTTP2 has been disabled by a previous attempt.
http2Connection = new Http2Connection(this, stream!);
await http2Connection.SetupAsync().ConfigureAwait(false);
- Debug.Assert(_http2Connection == null);
- _http2Connection = http2Connection;
+ AddHttp2Connection(http2Connection);
if (NetEventSource.Log.IsEnabled())
{
Trace("New unencrypted HTTP2 connection established.");
}
- return (_http2Connection, true, null);
+ return (http2Connection, true, null);
}
sslStream = (SslStream)stream!;
http2Connection = new Http2Connection(this, sslStream);
await http2Connection.SetupAsync().ConfigureAwait(false);
- Debug.Assert(_http2Connection == null);
- _http2Connection = http2Connection;
+ AddHttp2Connection(http2Connection);
if (NetEventSource.Log.IsEnabled())
{
Trace("New HTTP2 connection established.");
}
- return (_http2Connection, true, null);
+ return (http2Connection, true, null);
}
}
}
return await GetHttpConnectionAsync(request, async, cancellationToken).ConfigureAwait(false);
}
+ private Http2Connection? GetExistingHttp2Connection()
+ {
+ Http2Connection[]? localConnections = _http2Connections;
+
+ if (localConnections == null)
+ {
+ return null;
+ }
+
+ for (int i = 0; i < localConnections.Length; i++)
+ {
+ Http2Connection http2Connection = localConnections[i];
+
+ TimeSpan pooledConnectionLifetime = _poolManager.Settings._pooledConnectionLifetime;
+ if (http2Connection.LifetimeExpired(Environment.TickCount64, pooledConnectionLifetime))
+ {
+ // Connection expired.
+ http2Connection.Dispose();
+ InvalidateHttp2Connection(http2Connection);
+ }
+ else if (!EnableMultipleHttp2Connections || http2Connection.CanAddNewStream)
+ {
+ return http2Connection;
+ }
+ }
+
+ return null;
+ }
+
+ private void AddHttp2Connection(Http2Connection newConnection)
+ {
+ lock (SyncObj)
+ {
+ Http2Connection[]? localHttp2Connections = _http2Connections;
+ int newCollectionSize = localHttp2Connections == null ? 1 : localHttp2Connections.Length + 1;
+ Http2Connection[] newHttp2Connections = new Http2Connection[newCollectionSize];
+ newHttp2Connections[0] = newConnection;
+
+ if (localHttp2Connections != null)
+ {
+ Array.Copy(localHttp2Connections, 0, newHttp2Connections, 1, localHttp2Connections.Length);
+ }
+
+ _http2Connections = newHttp2Connections;
+ }
+ }
+
private async ValueTask<(HttpConnectionBase? connection, bool isNewConnection, HttpResponseMessage? failureResponse)>
GetHttp3ConnectionAsync(HttpRequestMessage request, HttpAuthority authority, CancellationToken cancellationToken)
{
// Eat exception and try again.
continue;
}
+ catch (HttpRequestException e) when (e.AllowRetry == RequestRetryType.RetryOnNextConnection)
+ {
+ if (NetEventSource.Log.IsEnabled())
+ {
+ Trace($"Retrying request on another HTTP/2 connection after active streams limit is reached on existing one: {e}");
+ }
+
+ // Eat exception and try again.
+ continue;
+ }
// Check for the Alt-Svc header, to upgrade to HTTP/3.
if (_altSvcEnabled && response.Headers.TryGetValues(KnownHeaders.AltSvc.Descriptor, out IEnumerable<string>? altSvcHeaderValues))
{
lock (SyncObj)
{
- if (_http2Connection == connection)
+ Http2Connection[]? localHttp2Connections = _http2Connections;
+
+ if (localHttp2Connections == null)
+ {
+ return;
+ }
+
+ if (localHttp2Connections.Length == 1)
+ {
+ // Fast shortcut for the most common case.
+ if (localHttp2Connections[0] == connection)
+ {
+ _http2Connections = null;
+ }
+ return;
+ }
+
+ int invalidatedIndex = Array.IndexOf(localHttp2Connections, connection);
+ if (invalidatedIndex >= 0)
{
- _http2Connection = null;
+ Http2Connection[] newHttp2Connections = new Http2Connection[localHttp2Connections.Length - 1];
+
+ if (invalidatedIndex > 0)
+ {
+ Array.Copy(localHttp2Connections, newHttp2Connections, invalidatedIndex);
+ }
+
+ if (invalidatedIndex < localHttp2Connections.Length - 1)
+ {
+ Array.Copy(localHttp2Connections, invalidatedIndex + 1, newHttp2Connections, invalidatedIndex, newHttp2Connections.Length - invalidatedIndex);
+ }
+
+ _http2Connections = newHttp2Connections;
}
}
}
list.ForEach(c => c._connection.Dispose());
list.Clear();
- if (_http2Connection != null)
+ if (_http2Connections != null)
{
- _http2Connection.Dispose();
- _http2Connection = null;
+ for (int i = 0; i < _http2Connections.Length; i++)
+ {
+ _http2Connections[i].Dispose();
+ }
+ _http2Connections = null;
}
if (_authorityExpireTimer != null)
// Get the current time. This is compared against each connection's last returned
// time to determine whether a connection is too old and should be closed.
long nowTicks = Environment.TickCount64;
- Http2Connection? http2Connection = _http2Connection;
+ // Copy the reference to a local variable to simplify the removal logic below.
+ Http2Connection[]? localHttp2Connections = _http2Connections;
- if (http2Connection != null)
+ if (localHttp2Connections != null)
{
- if (http2Connection.IsExpired(nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout))
+ Http2Connection[]? newHttp2Connections = null;
+ int newIndex = 0;
+ for (int i = 0; i < localHttp2Connections.Length; i++)
+ {
+ Http2Connection http2Connection = localHttp2Connections[i];
+ if (http2Connection.IsExpired(nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout))
+ {
+ http2Connection.Dispose();
+
+ if (newHttp2Connections == null)
+ {
+ newHttp2Connections = new Http2Connection[localHttp2Connections.Length];
+ if (i > 0)
+ {
+ // Copy valid connections residing at the beggining of the current collection.
+ Array.Copy(localHttp2Connections, newHttp2Connections, i);
+ newIndex = i;
+ }
+ }
+ }
+ else if (newHttp2Connections != null)
+ {
+ newHttp2Connections[newIndex] = localHttp2Connections[i];
+ newIndex++;
+ }
+ }
+
+ if (newHttp2Connections != null)
{
- http2Connection.Dispose();
- // We can set _http2Connection directly while holding lock instead of calling InvalidateHttp2Connection().
- _http2Connection = null;
+ //Some connections have been removed, so _http2Connections must be replaced.
+ if (newIndex > 0)
+ {
+ Array.Resize(ref newHttp2Connections, newIndex);
+ _http2Connections = newHttp2Connections;
+ }
+ else
+ {
+ // All connections expired.
+ _http2Connections = null;
+ }
}
}
// if a pool was used since the last time we cleaned up, give it another chance. New pools
// start out saying they've recently been used, to give them a bit of breathing room and time
// for the initial collection to be added to it.
- if (_associatedConnectionCount == 0 && !_usedSinceLastCleanup && _http2Connection == null)
+ if (_associatedConnectionCount == 0 && !_usedSinceLastCleanup && _http2Connections == null)
{
Debug.Assert(list.Count == 0, $"Expected {nameof(list)}.{nameof(list.Count)} == 0");
_disposed = true;
internal SslClientAuthenticationOptions? _sslOptions;
+ internal bool _enableMultipleHttp2Connections;
+
internal IDictionary<string, object?>? _properties;
public HttpConnectionSettings()
_useCookies = _useCookies,
_useProxy = _useProxy,
_allowUnencryptedHttp2 = _allowUnencryptedHttp2,
- _assumePrenegotiatedHttp3ForTesting = _assumePrenegotiatedHttp3ForTesting
+ _assumePrenegotiatedHttp3ForTesting = _assumePrenegotiatedHttp3ForTesting,
+ _enableMultipleHttp2Connections = _enableMultipleHttp2Connections
};
}
}
}
+ public bool EnableMultipleHttp2Connections => _enableMultipleHttp2Connections;
+
private byte[]? _http3SettingsFrame;
internal byte[] Http3SettingsFrame => _http3SettingsFrame ??= Http3Connection.BuildSettingsFrame(this);
}
}
}
+ public bool EnableMultipleHttp2Connections
+ {
+ get => _settings._enableMultipleHttp2Connections;
+ set
+ {
+ CheckDisposedOrStarted();
+
+ _settings._enableMultipleHttp2Connections = value;
+ }
+ }
+
internal bool SupportsAutomaticDecompression => true;
internal bool SupportsProxy => true;
internal bool SupportsRedirectConfiguration => true;
public sealed class SocketsHttpHandlerTest_Http2 : HttpClientHandlerTest_Http2
{
public SocketsHttpHandlerTest_Http2(ITestOutputHelper output) : base(output) { }
+
+ [ConditionalFact(nameof(SupportsAlpn))]
+ public async Task Http2_MultipleConnectionsEnabled_ConnectionLimitNotReached_ConcurrentRequestsSuccessfullyHandled()
+ {
+ const int MaxConcurrentStreams = 2;
+ using Http2LoopbackServer server = Http2LoopbackServer.CreateServer();
+ using SocketsHttpHandler handler = CreateHandler();
+ using (HttpClient client = CreateHttpClient(handler))
+ {
+ server.AllowMultipleConnections = true;
+ List<Task<HttpResponseMessage>> sendTasks = new List<Task<HttpResponseMessage>>();
+ List<Http2LoopbackConnection> connections = new List<Http2LoopbackConnection>();
+ List<int> acceptedStreams = new List<int>();
+ for (int i = 0; i < 3; i++)
+ {
+ Http2LoopbackConnection connection = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false);
+ AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+ connections.Add(connection);
+ int prevAcceptedStreamCount = acceptedStreams.Count;
+ acceptedStreams.AddRange(await AcceptRequests(connection, MaxConcurrentStreams).ConfigureAwait(false));
+ Assert.Equal(prevAcceptedStreamCount + MaxConcurrentStreams, acceptedStreams.Count);
+ }
+
+ int responseIndex = 0;
+ List<Task> responseTasks = new List<Task>();
+ foreach (Http2LoopbackConnection connection in connections)
+ {
+ for (int i = 0; i < MaxConcurrentStreams; i++)
+ {
+ int streamId = acceptedStreams[responseIndex++];
+ responseTasks.Add(connection.SendDefaultResponseAsync(streamId));
+ }
+ }
+
+ await Task.WhenAll(responseTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+ await Task.WhenAll(sendTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+
+ await VerifySendTasks(sendTasks).ConfigureAwait(false);
+ }
+ }
+
+ [ConditionalFact(nameof(SupportsAlpn))]
+ public async Task Http2_MultipleConnectionsEnabled_InfiniteRequestsCompletelyBlockOneConnection_RemaningRequestsAreHandledByNewConnection()
+ {
+ const int MaxConcurrentStreams = 2;
+ using Http2LoopbackServer server = Http2LoopbackServer.CreateServer();
+ using SocketsHttpHandler handler = CreateHandler();
+ using (HttpClient client = CreateHttpClient(handler))
+ {
+ server.AllowMultipleConnections = true;
+ List<Task<HttpResponseMessage>> sendTasks = new List<Task<HttpResponseMessage>>();
+ Http2LoopbackConnection connection0 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false);
+ AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+
+ // Block the first connection on infinite requests.
+ List<int> blockedStreamIds = await AcceptRequests(connection0, MaxConcurrentStreams).ConfigureAwait(false);
+ Assert.Equal(MaxConcurrentStreams, blockedStreamIds.Count);
+
+ Http2LoopbackConnection connection1 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false);
+ AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+
+ int handledRequestCount = (await HandleAllPendingRequests(connection1, MaxConcurrentStreams).ConfigureAwait(false)).Count;
+
+ Assert.Equal(MaxConcurrentStreams, handledRequestCount);
+
+ //Complete inifinite requests.
+ handledRequestCount = await SendResponses(connection0, blockedStreamIds);
+
+ Assert.Equal(MaxConcurrentStreams, handledRequestCount);
+
+ await Task.WhenAll(sendTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+
+ await VerifySendTasks(sendTasks).ConfigureAwait(false);
+ }
+ }
+
+ [ConditionalFact(nameof(SupportsAlpn))]
+ public async Task Http2_MultipleConnectionsEnabled_OpenAndCloseMultipleConnections_Success()
+ {
+ const int MaxConcurrentStreams = 2;
+ using Http2LoopbackServer server = Http2LoopbackServer.CreateServer();
+ using SocketsHttpHandler handler = CreateHandler();
+ using (HttpClient client = CreateHttpClient(handler))
+ {
+ server.AllowMultipleConnections = true;
+ List<Task<HttpResponseMessage>> sendTasks = new List<Task<HttpResponseMessage>>();
+ Http2LoopbackConnection connection0 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false);
+ AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+ Http2LoopbackConnection connection1 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false);
+ AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+ Http2LoopbackConnection connection2 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false);
+ AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+
+ Task<(int Count, int LastStreamId)>[] handleRequestTasks = new[] {
+ HandleAllPendingRequests(connection0, sendTasks.Count),
+ HandleAllPendingRequests(connection1, sendTasks.Count),
+ HandleAllPendingRequests(connection2, sendTasks.Count)
+ };
+
+ await Task.WhenAll(handleRequestTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+
+ Assert.Equal(handleRequestTasks[0].Result.Count, MaxConcurrentStreams);
+ Assert.Equal(handleRequestTasks[1].Result.Count, MaxConcurrentStreams);
+ Assert.Equal(handleRequestTasks[2].Result.Count, MaxConcurrentStreams);
+
+ await connection0.ShutdownIgnoringErrorsAsync(handleRequestTasks[0].Result.LastStreamId).ConfigureAwait(false);
+ await connection2.ShutdownIgnoringErrorsAsync(handleRequestTasks[2].Result.LastStreamId).ConfigureAwait(false);
+
+ //Fill all connection1's stream slots
+ AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+
+ Http2LoopbackConnection connection3 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false);
+ AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+ Http2LoopbackConnection connection4 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false);
+ AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+
+ Task<(int Count, int LastStreamId)>[] finalHandleTasks = new[] {
+ HandleAllPendingRequests(connection1, sendTasks.Count),
+ HandleAllPendingRequests(connection3, sendTasks.Count),
+ HandleAllPendingRequests(connection4, sendTasks.Count)
+ };
+
+ await Task.WhenAll(finalHandleTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+
+ Assert.Equal(finalHandleTasks[0].Result.Count, MaxConcurrentStreams);
+ Assert.Equal(finalHandleTasks[1].Result.Count, MaxConcurrentStreams);
+ Assert.Equal(finalHandleTasks[2].Result.Count, MaxConcurrentStreams);
+
+ await Task.WhenAll(sendTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+
+ await VerifySendTasks(sendTasks).ConfigureAwait(false);
+ }
+ }
+
+ [ConditionalFact(nameof(SupportsAlpn))]
+ public async Task Http2_MultipleConnectionsEnabled_IdleConnectionTimeoutExpired_ConnectionRemovedAndNewCreated()
+ {
+ const int MaxConcurrentStreams = 2;
+ using Http2LoopbackServer server = Http2LoopbackServer.CreateServer();
+ using SocketsHttpHandler handler = CreateHandler();
+ handler.PooledConnectionIdleTimeout = TimeSpan.FromSeconds(5);
+ using (HttpClient client = CreateHttpClient(handler))
+ {
+ server.AllowMultipleConnections = true;
+ List<Task<HttpResponseMessage>> sendTasks = new List<Task<HttpResponseMessage>>();
+ Http2LoopbackConnection connection0 = await PrepareConnection(server, client, MaxConcurrentStreams).ConfigureAwait(false);
+ AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+ List<int> acceptedStreamIds = await AcceptRequests(connection0, MaxConcurrentStreams).ConfigureAwait(false);
+ Assert.Equal(MaxConcurrentStreams, acceptedStreamIds.Count);
+
+ List<Task<HttpResponseMessage>> connection1SendTasks = new List<Task<HttpResponseMessage>>();
+ Http2LoopbackConnection connection1 = await PrepareConnection(server, client, MaxConcurrentStreams, readTimeout: 10).ConfigureAwait(false);
+ AcquireAllStreamSlots(server, client, connection1SendTasks, MaxConcurrentStreams);
+ int handledRequests1 = (await HandleAllPendingRequests(connection1, MaxConcurrentStreams).ConfigureAwait(false)).Count;
+
+ Assert.Equal(MaxConcurrentStreams, handledRequests1);
+
+ // Complete all the requests.
+ await Task.WhenAll(connection1SendTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+ await VerifySendTasks(connection1SendTasks).ConfigureAwait(false);
+ connection1SendTasks.ForEach(t => t.Result.Dispose());
+
+ // Wait until the idle connection timeout expires.
+ await connection1.WaitForClientDisconnectAsync(false).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+ // Client connection might be still alive, so send an extra request which will either land on the shutting down connection or on a new one.
+ try
+ {
+ await client.GetAsync(server.Address).TimeoutAfter(handler.PooledConnectionIdleTimeout).ConfigureAwait(false);
+ }
+ catch (Exception)
+ {
+ // Suppress all exceptions.
+ }
+
+ Assert.True(connection1.IsInvalid);
+ Assert.False(connection0.IsInvalid);
+
+ Http2LoopbackConnection connection2 = await PrepareConnection(server, client, MaxConcurrentStreams, readTimeout: 5, expectedWarpUpTasks:2).ConfigureAwait(false);
+
+ AcquireAllStreamSlots(server, client, sendTasks, MaxConcurrentStreams);
+
+ int handledRequests2 = (await HandleAllPendingRequests(connection2, MaxConcurrentStreams).ConfigureAwait(false)).Count;
+ Assert.Equal(MaxConcurrentStreams, handledRequests2);
+
+ //Make sure connection0 is still alive.
+ int handledRequests0 = await SendResponses(connection0, acceptedStreamIds).ConfigureAwait(false);
+ Assert.Equal(MaxConcurrentStreams, handledRequests0);
+
+ await Task.WhenAll(sendTasks).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+
+ await VerifySendTasks(sendTasks).ConfigureAwait(false);
+ }
+ }
+
+ private async Task VerifySendTasks(IReadOnlyList<Task<HttpResponseMessage>> sendTasks)
+ {
+ foreach (Task<HttpResponseMessage> sendTask in sendTasks)
+ {
+ HttpResponseMessage response = await sendTask.ConfigureAwait(false);
+ Assert.Equal(HttpStatusCode.OK, response.StatusCode);
+ }
+ }
+
+ private static SocketsHttpHandler CreateHandler() => new SocketsHttpHandler
+ {
+ EnableMultipleHttp2Connections = true,
+ PooledConnectionIdleTimeout = TimeSpan.FromHours(1),
+ PooledConnectionLifetime = TimeSpan.FromHours(1),
+ SslOptions = { RemoteCertificateValidationCallback = delegate { return true; } }
+ };
+
+ private async Task<Http2LoopbackConnection> PrepareConnection(Http2LoopbackServer server, HttpClient client, uint maxConcurrentStreams, int readTimeout = 3, int expectedWarpUpTasks = 1)
+ {
+ Task<HttpResponseMessage> warmUpTask = client.GetAsync(server.Address);
+ Http2LoopbackConnection connection = await GetConnection(server, maxConcurrentStreams, readTimeout).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+ // Wait until the client confirms MaxConcurrentStreams setting took into effect.
+ Task settingAckReceived = connection.SettingAckWaiter;
+ while (true)
+ {
+ Task handleRequestTask = HandleAllPendingRequests(connection, expectedWarpUpTasks);
+ await Task.WhenAll(warmUpTask, handleRequestTask).TimeoutAfter(TestHelper.PassingTestTimeoutMilliseconds).ConfigureAwait(false);
+ Assert.True(warmUpTask.Result.IsSuccessStatusCode);
+ warmUpTask.Result.Dispose();
+ if (settingAckReceived.IsCompleted)
+ {
+ break;
+ }
+
+ warmUpTask = client.GetAsync(server.Address);
+ }
+ return connection;
+ }
+
+ private static void AcquireAllStreamSlots(Http2LoopbackServer server, HttpClient client, List<Task<HttpResponseMessage>> sendTasks, uint maxConcurrentStreams)
+ {
+ for (int i = 0; i < maxConcurrentStreams; i++)
+ {
+ sendTasks.Add(client.GetAsync(server.Address));
+ }
+ }
+
+ private static async Task<Http2LoopbackConnection> GetConnection(Http2LoopbackServer server, uint maxConcurrentStreams, int readTimeout) =>
+ await server.EstablishConnectionAsync(TimeSpan.FromSeconds(readTimeout), TimeSpan.FromSeconds(10), new SettingsEntry { SettingId = SettingId.MaxConcurrentStreams, Value = maxConcurrentStreams }).ConfigureAwait(false);
+
+ private async Task<(int Count, int LastStreamId)> HandleAllPendingRequests(Http2LoopbackConnection connection, int totalRequestCount)
+ {
+ int streamId = -1;
+ for (int i = 0; i < totalRequestCount; i++)
+ {
+ try
+ {
+ // Exact number of requests sent over the given connection is unknown,
+ // so we keep reading headers and sending response while there are available requests.
+ streamId = await connection.ReadRequestHeaderAsync().ConfigureAwait(false);
+ await connection.SendDefaultResponseAsync(streamId).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ return (i, streamId);
+ }
+ }
+
+ return (totalRequestCount, streamId);
+ }
+
+ private async Task<List<int>> AcceptRequests(Http2LoopbackConnection connection, int maxRequests = int.MaxValue)
+ {
+ List<int> streamIds = new List<int>();
+ for (int i = 0; i < maxRequests; i++)
+ {
+ try
+ {
+ streamIds.Add(await connection.ReadRequestHeaderAsync().ConfigureAwait(false));
+ }
+ catch (OperationCanceledException)
+ {
+ return streamIds;
+ }
+ }
+
+ return streamIds;
+ }
+
+ private async Task<int> SendResponses(Http2LoopbackConnection connection, IEnumerable<int> streamIds)
+ {
+ int count = 0;
+ foreach (int streamId in streamIds)
+ {
+ count++;
+ await connection.SendDefaultResponseAsync(streamId).ConfigureAwait(false);
+ }
+
+ return count;
+ }
}
[ConditionalClass(typeof(PlatformDetection), nameof(PlatformDetection.SupportsAlpn))]