From 4b8c5fe2b4757f77f5699d49a072fcb123d2e33b Mon Sep 17 00:00:00 2001 From: Cory Nelson Date: Tue, 28 Jul 2020 08:06:57 -0700 Subject: [PATCH] Initial (partially-reviewed API) System.Net.Connections. (#39524) --- .../tests/System/Net/Http/GenericLoopbackServer.cs | 6 + .../System/Net/Http/Http2LoopbackConnection.cs | 79 ++++-- .../tests/System/Net/Http/Http2LoopbackServer.cs | 49 ++-- .../System/Net/Http/Http3LoopbackConnection.cs | 6 + .../tests/System/Net/Http/Http3LoopbackServer.cs | 9 + .../Common/tests/System/Net/Http/LoopbackServer.cs | 14 +- .../System/Net/VirtualNetwork/VirtualNetwork.cs | 38 ++- .../VirtualNetworkConnectionListenerFactory.cs | 187 +++++++++++++++ .../Net/VirtualNetwork/VirtualNetworkStream.cs | 28 ++- .../Threading/Tasks/TaskTimeoutExtensions.cs | 14 ++ src/libraries/NetCoreAppLibrary.props | 2 + .../pkg/System.IO.Pipelines.pkgproj | 1 + .../ref/System.IO.Pipelines.csproj | 9 +- .../System.Net.Connections/Directory.Build.props | 6 + .../System.Net.Connections.sln | 50 ++++ .../ref/System.Net.Connections.cs | 74 ++++++ .../ref/System.Net.Connections.csproj | 14 ++ .../src/Resources/Strings.resx | 138 +++++++++++ .../src/System.Net.Connections.csproj | 26 ++ .../src/System/Net/Connections/Connection.cs | 265 +++++++++++++++++++++ .../src/System/Net/Connections/ConnectionBase.cs | 80 +++++++ .../Net/Connections/ConnectionCloseMethod.cs | 26 ++ .../System/Net/Connections/ConnectionExtensions.cs | 87 +++++++ .../System/Net/Connections/ConnectionFactory.cs | 53 +++++ .../System/Net/Connections/ConnectionListener.cs | 62 +++++ .../Net/Connections/ConnectionListenerFactory.cs | 46 ++++ .../src/System/Net/Connections/DuplexPipeStream.cs | 166 +++++++++++++ .../Net/Connections/IConnectionProperties.cs | 21 ++ .../ConnectionBaseTest.cs | 81 +++++++ .../System.Net.Connections.Tests/ConnectionTest.cs | 264 ++++++++++++++++++++ .../ConnectionWithoutStreamOrPipe.cs | 22 ++ .../System.Net.Connections.Tests/MockConnection.cs | 33 +++ .../tests/System.Net.Connections.Tests/MockPipe.cs | 21 ++ .../System.Net.Connections.Tests/MockPipeReader.cs | 44 ++++ .../System.Net.Connections.Tests/MockPipeWriter.cs | 40 ++++ .../System.Net.Connections.Tests/MockStream.cs | 107 +++++++++ .../System.Net.Connections.Tests.csproj | 19 ++ .../System.Net.Http/ref/System.Net.Http.cs | 11 + .../System.Net.Http/ref/System.Net.Http.csproj | 2 + .../System.Net.Http/src/System.Net.Http.csproj | 7 + .../SocketsHttpConnectionFactory.cs | 19 ++ .../Http/BrowserHttpHandler/SocketsHttpHandler.cs | 13 + .../Net/Http/SocketsHttpHandler/ConnectHelper.cs | 99 +------- .../Connections/SocketConnection.cs | 101 ++++++++ .../Connections/TaskSocketAsyncEventArgs.cs | 32 +++ .../DnsEndPointWithProperties.cs | 31 +++ .../Net/Http/SocketsHttpHandler/Http2Connection.cs | 11 +- .../Net/Http/SocketsHttpHandler/HttpConnection.cs | 16 +- .../Http/SocketsHttpHandler/HttpConnectionPool.cs | 95 ++++++-- .../SocketsHttpHandler/HttpConnectionSettings.cs | 10 +- .../SocketsHttpConnectionFactory.cs | 89 +++++++ .../Http/SocketsHttpHandler/SocketsHttpHandler.cs | 28 +++ .../FunctionalTests/SocketsHttpHandlerTest.cs | 66 +++++ .../System.Net.Http.Functional.Tests.csproj | 3 + src/libraries/pkg/baseline/packageIndex.json | 4 +- 55 files changed, 2638 insertions(+), 186 deletions(-) create mode 100644 src/libraries/Common/tests/System/Net/VirtualNetwork/VirtualNetworkConnectionListenerFactory.cs create mode 100644 src/libraries/System.Net.Connections/Directory.Build.props create mode 100644 src/libraries/System.Net.Connections/System.Net.Connections.sln create mode 100644 src/libraries/System.Net.Connections/ref/System.Net.Connections.cs create mode 100644 src/libraries/System.Net.Connections/ref/System.Net.Connections.csproj create mode 100644 src/libraries/System.Net.Connections/src/Resources/Strings.resx create mode 100644 src/libraries/System.Net.Connections/src/System.Net.Connections.csproj create mode 100644 src/libraries/System.Net.Connections/src/System/Net/Connections/Connection.cs create mode 100644 src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionBase.cs create mode 100644 src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionCloseMethod.cs create mode 100644 src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionExtensions.cs create mode 100644 src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionFactory.cs create mode 100644 src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionListener.cs create mode 100644 src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionListenerFactory.cs create mode 100644 src/libraries/System.Net.Connections/src/System/Net/Connections/DuplexPipeStream.cs create mode 100644 src/libraries/System.Net.Connections/src/System/Net/Connections/IConnectionProperties.cs create mode 100644 src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionBaseTest.cs create mode 100644 src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionTest.cs create mode 100644 src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionWithoutStreamOrPipe.cs create mode 100644 src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockConnection.cs create mode 100644 src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipe.cs create mode 100644 src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipeReader.cs create mode 100644 src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipeWriter.cs create mode 100644 src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockStream.cs create mode 100644 src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/System.Net.Connections.Tests.csproj create mode 100644 src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpConnectionFactory.cs create mode 100644 src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Connections/SocketConnection.cs create mode 100644 src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Connections/TaskSocketAsyncEventArgs.cs create mode 100644 src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/DnsEndPointWithProperties.cs create mode 100644 src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpConnectionFactory.cs diff --git a/src/libraries/Common/tests/System/Net/Http/GenericLoopbackServer.cs b/src/libraries/Common/tests/System/Net/Http/GenericLoopbackServer.cs index fd12e23..d15ab88 100644 --- a/src/libraries/Common/tests/System/Net/Http/GenericLoopbackServer.cs +++ b/src/libraries/Common/tests/System/Net/Http/GenericLoopbackServer.cs @@ -6,6 +6,8 @@ using System.Linq; using System.Text; using System.Threading.Tasks; using System.Security.Authentication; +using System.IO; +using System.Net.Sockets; namespace System.Net.Test.Common { @@ -17,6 +19,8 @@ namespace System.Net.Test.Common public abstract GenericLoopbackServer CreateServer(GenericLoopbackOptions options = null); public abstract Task CreateServerAsync(Func funcAsync, int millisecondsTimeout = 60_000, GenericLoopbackOptions options = null); + public abstract Task CreateConnectionAsync(Socket socket, Stream stream, GenericLoopbackOptions options = null); + public abstract Version Version { get; } // Common helper methods @@ -58,6 +62,8 @@ namespace System.Net.Test.Common { public abstract void Dispose(); + public abstract Task InitializeConnectionAsync(); + /// Read request Headers and optionally request body as well. public abstract Task ReadRequestDataAsync(bool readBody = true); /// Read complete request body if not done by ReadRequestData. diff --git a/src/libraries/Common/tests/System/Net/Http/Http2LoopbackConnection.cs b/src/libraries/Common/tests/System/Net/Http/Http2LoopbackConnection.cs index 3bc2097..ad60410 100644 --- a/src/libraries/Common/tests/System/Net/Http/Http2LoopbackConnection.cs +++ b/src/libraries/Common/tests/System/Net/Http/Http2LoopbackConnection.cs @@ -7,6 +7,7 @@ using System.Linq; using System.Net.Http.Functional.Tests; using System.Net.Security; using System.Net.Sockets; +using System.Security.Cryptography.X509Certificates; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -24,28 +25,31 @@ namespace System.Net.Test.Common private readonly TimeSpan _timeout; private int _lastStreamId; - private readonly byte[] _prefix; + private readonly byte[] _prefix = new byte[24]; public string PrefixString => Encoding.UTF8.GetString(_prefix, 0, _prefix.Length); public bool IsInvalid => _connectionSocket == null; public Stream Stream => _connectionStream; public Task SettingAckWaiter => _ignoredSettingsAckPromise?.Task; - public Http2LoopbackConnection(Socket socket, Http2Options httpOptions) - : this(socket, httpOptions, Http2LoopbackServer.Timeout) + private Http2LoopbackConnection(Socket socket, Stream stream, TimeSpan timeout) { + _connectionSocket = socket; + _connectionStream = stream; + _timeout = timeout; } - public Http2LoopbackConnection(Socket socket, Http2Options httpOptions, TimeSpan timeout) + public static Task CreateAsync(Socket socket, Stream stream, Http2Options httpOptions) { - _connectionSocket = socket; - _connectionStream = new NetworkStream(_connectionSocket, true); - _timeout = timeout; + return CreateAsync(socket, stream, httpOptions, Http2LoopbackServer.Timeout); + } + public static async Task CreateAsync(Socket socket, Stream stream, Http2Options httpOptions, TimeSpan timeout) + { if (httpOptions.UseSsl) { - var sslStream = new SslStream(_connectionStream, false, delegate { return true; }); + var sslStream = new SslStream(stream, false, delegate { return true; }); - using (var cert = Configuration.Certificates.GetServerCertificate()) + using (X509Certificate2 cert = Configuration.Certificates.GetServerCertificate()) { #if !NETFRAMEWORK SslServerAuthenticationOptions options = new SslServerAuthenticationOptions(); @@ -61,21 +65,29 @@ namespace System.Net.Test.Common options.ClientCertificateRequired = httpOptions.ClientCertificateRequired; - sslStream.AuthenticateAsServerAsync(options, CancellationToken.None).Wait(); + await sslStream.AuthenticateAsServerAsync(options, CancellationToken.None).ConfigureAwait(false); #else - sslStream.AuthenticateAsServerAsync(cert, httpOptions.ClientCertificateRequired, httpOptions.SslProtocols, checkCertificateRevocation: false).Wait(); + await sslStream.AuthenticateAsServerAsync(cert, httpOptions.ClientCertificateRequired, httpOptions.SslProtocols, checkCertificateRevocation: false).ConfigureAwait(false); #endif } - _connectionStream = sslStream; + stream = sslStream; } - _prefix = new byte[24]; - if (!FillBufferAsync(_prefix).Result) + var con = new Http2LoopbackConnection(socket, stream, timeout); + await con.ReadPrefixAsync().ConfigureAwait(false); + + return con; + } + + private async Task ReadPrefixAsync() + { + if (!await FillBufferAsync(_prefix)) { throw new Exception("Connection stream closed while attempting to read connection preface."); } - else if (Text.Encoding.ASCII.GetString(_prefix).Contains("HTTP/1.1")) + + if (Text.Encoding.ASCII.GetString(_prefix).Contains("HTTP/1.1")) { throw new Exception("HTTP 1.1 request received."); } @@ -275,7 +287,7 @@ namespace System.Net.Test.Common public void ShutdownSend() { - _connectionSocket.Shutdown(SocketShutdown.Send); + _connectionSocket?.Shutdown(SocketShutdown.Send); } // This will cause a server-initiated shutdown of the connection. @@ -563,6 +575,41 @@ namespace System.Net.Test.Common return (streamId, requestData); } + public override Task InitializeConnectionAsync() + { + return ReadAndSendSettingsAsync(ackTimeout: null); + } + + public async Task ReadAndSendSettingsAsync(TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries) + { + // Receive the initial client settings frame. + Frame receivedFrame = await ReadFrameAsync(_timeout).ConfigureAwait(false); + Assert.Equal(FrameType.Settings, receivedFrame.Type); + Assert.Equal(FrameFlags.None, receivedFrame.Flags); + Assert.Equal(0, receivedFrame.StreamId); + + var clientSettingsFrame = (SettingsFrame)receivedFrame; + + // Receive the initial client window update frame. + receivedFrame = await ReadFrameAsync(_timeout).ConfigureAwait(false); + Assert.Equal(FrameType.WindowUpdate, receivedFrame.Type); + Assert.Equal(FrameFlags.None, receivedFrame.Flags); + Assert.Equal(0, receivedFrame.StreamId); + + // Send the initial server settings frame. + SettingsFrame settingsFrame = new SettingsFrame(settingsEntries); + await WriteFrameAsync(settingsFrame).ConfigureAwait(false); + + // Send the client settings frame ACK. + Frame settingsAck = new Frame(0, FrameType.Settings, FrameFlags.Ack, 0); + await WriteFrameAsync(settingsAck).ConfigureAwait(false); + + // The client will send us a SETTINGS ACK eventually, but not necessarily right away. + await ExpectSettingsAckAsync((int?)ackTimeout?.TotalMilliseconds ?? 5000); + + return clientSettingsFrame; + } + public async Task SendGoAway(int lastStreamId, ProtocolErrors errorCode = ProtocolErrors.NO_ERROR) { GoAwayFrame frame = new GoAwayFrame(lastStreamId, (int)errorCode, new byte[] { }, 0); diff --git a/src/libraries/Common/tests/System/Net/Http/Http2LoopbackServer.cs b/src/libraries/Common/tests/System/Net/Http/Http2LoopbackServer.cs index 8516140..e890812 100644 --- a/src/libraries/Common/tests/System/Net/Http/Http2LoopbackServer.cs +++ b/src/libraries/Common/tests/System/Net/Http/Http2LoopbackServer.cs @@ -90,7 +90,10 @@ namespace System.Net.Test.Common Socket connectionSocket = await _listenSocket.AcceptAsync().ConfigureAwait(false); - Http2LoopbackConnection connection = timeout != null ? new Http2LoopbackConnection(connectionSocket, _options, timeout.Value) : new Http2LoopbackConnection(connectionSocket, _options); + var stream = new NetworkStream(connectionSocket, ownsSocket: true); + Http2LoopbackConnection connection = + timeout != null ? await Http2LoopbackConnection.CreateAsync(connectionSocket, stream, _options, timeout.Value).ConfigureAwait(false) : + await Http2LoopbackConnection.CreateAsync(connectionSocket, stream, _options).ConfigureAwait(false); _connections.Add(connection); return connection; @@ -103,7 +106,7 @@ namespace System.Net.Test.Common public Task EstablishConnectionAsync(params SettingsEntry[] settingsEntries) { - return EstablishConnectionAsync(null, null, settingsEntries); + return EstablishConnectionAsync(timeout: null, ackTimeout: null, settingsEntries); } public async Task EstablishConnectionAsync(TimeSpan? timeout, TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries) @@ -114,37 +117,13 @@ namespace System.Net.Test.Common public Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(params SettingsEntry[] settingsEntries) { - return EstablishConnectionGetSettingsAsync(null, null, settingsEntries); + return EstablishConnectionGetSettingsAsync(timeout: null, ackTimeout: null, settingsEntries); } public async Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(TimeSpan? timeout, TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries) { Http2LoopbackConnection connection = await AcceptConnectionAsync(timeout).ConfigureAwait(false); - - // Receive the initial client settings frame. - Frame receivedFrame = await connection.ReadFrameAsync(Timeout).ConfigureAwait(false); - Assert.Equal(FrameType.Settings, receivedFrame.Type); - Assert.Equal(FrameFlags.None, receivedFrame.Flags); - Assert.Equal(0, receivedFrame.StreamId); - - var clientSettingsFrame = (SettingsFrame)receivedFrame; - - // Receive the initial client window update frame. - receivedFrame = await connection.ReadFrameAsync(Timeout).ConfigureAwait(false); - Assert.Equal(FrameType.WindowUpdate, receivedFrame.Type); - Assert.Equal(FrameFlags.None, receivedFrame.Flags); - Assert.Equal(0, receivedFrame.StreamId); - - // Send the initial server settings frame. - SettingsFrame settingsFrame = new SettingsFrame(settingsEntries); - await connection.WriteFrameAsync(settingsFrame).ConfigureAwait(false); - - // Send the client settings frame ACK. - Frame settingsAck = new Frame(0, FrameType.Settings, FrameFlags.Ack, 0); - await connection.WriteFrameAsync(settingsAck).ConfigureAwait(false); - - // The client will send us a SETTINGS ACK eventually, but not necessarily right away. - await connection.ExpectSettingsAckAsync((int) (ackTimeout?.TotalMilliseconds ?? 5000)); + SettingsFrame clientSettingsFrame = await connection.ReadAndSendSettingsAsync(ackTimeout, settingsEntries).ConfigureAwait(false); return (connection, clientSettingsFrame); } @@ -220,7 +199,6 @@ namespace System.Net.Test.Common public Http2Options() { - UseSsl = PlatformDetection.SupportsAlpn && !Capability.Http2ForceUnencryptedLoopback(); SslProtocols = SslProtocols.Tls12; } } @@ -239,6 +217,16 @@ namespace System.Net.Test.Common public override GenericLoopbackServer CreateServer(GenericLoopbackOptions options = null) { + return Http2LoopbackServer.CreateServer(CreateOptions(options)); + } + + public override async Task CreateConnectionAsync(Socket socket, Stream stream, GenericLoopbackOptions options = null) + { + return await Http2LoopbackConnection.CreateAsync(socket, stream, CreateOptions(options)).ConfigureAwait(false); + } + + private static Http2Options CreateOptions(GenericLoopbackOptions options) + { Http2Options http2Options = new Http2Options(); if (options != null) { @@ -246,8 +234,7 @@ namespace System.Net.Test.Common http2Options.UseSsl = options.UseSsl; http2Options.SslProtocols = options.SslProtocols; } - - return Http2LoopbackServer.CreateServer(http2Options); + return http2Options; } public override async Task CreateServerAsync(Func funcAsync, int millisecondsTimeout = 60_000, GenericLoopbackOptions options = null) diff --git a/src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs b/src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs index 5d6f3e7..cfa912a 100644 --- a/src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs +++ b/src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs @@ -8,6 +8,7 @@ using System.Net.Quic; using System.Text; using System.Threading.Tasks; using System.Linq; +using System.Net.Http.Functional.Tests; namespace System.Net.Test.Common { @@ -84,6 +85,11 @@ namespace System.Net.Test.Common return requestId == 0 ? _currentStream : _openStreams[requestId - 1]; } + public override Task InitializeConnectionAsync() + { + throw new NotImplementedException(); + } + public async Task AcceptStreamAsync() { QuicStream quicStream = await _connection.AcceptStreamAsync().ConfigureAwait(false); diff --git a/src/libraries/Common/tests/System/Net/Http/Http3LoopbackServer.cs b/src/libraries/Common/tests/System/Net/Http/Http3LoopbackServer.cs index 4115570..49c3c86 100644 --- a/src/libraries/Common/tests/System/Net/Http/Http3LoopbackServer.cs +++ b/src/libraries/Common/tests/System/Net/Http/Http3LoopbackServer.cs @@ -3,8 +3,10 @@ using System.Collections.Generic; using System.Diagnostics; +using System.IO; using System.Net.Quic; using System.Net.Security; +using System.Net.Sockets; using System.Security.Cryptography.X509Certificates; using System.Threading.Tasks; @@ -80,5 +82,12 @@ namespace System.Net.Test.Common using GenericLoopbackServer server = CreateServer(options); await funcAsync(server, server.Address).TimeoutAfter(millisecondsTimeout).ConfigureAwait(false); } + + public override Task CreateConnectionAsync(Socket socket, Stream stream, GenericLoopbackOptions options = null) + { + // TODO: make a new overload that takes a MultiplexedConnection. + // This method is always unacceptable to call for HTTP/3. + throw new NotImplementedException("HTTP/3 does not operate over a Socket."); + } } } diff --git a/src/libraries/Common/tests/System/Net/Http/LoopbackServer.cs b/src/libraries/Common/tests/System/Net/Http/LoopbackServer.cs index 208f9d7f..76c8459 100644 --- a/src/libraries/Common/tests/System/Net/Http/LoopbackServer.cs +++ b/src/libraries/Common/tests/System/Net/Http/LoopbackServer.cs @@ -594,13 +594,18 @@ namespace System.Net.Test.Common // This seems to help avoid connection reset issues caused by buffered data // that has not been sent/acked when the graceful shutdown timeout expires. // This may throw if the socket was already closed, so eat any exception. - _socket.Shutdown(SocketShutdown.Send); + _socket?.Shutdown(SocketShutdown.Send); } catch (Exception) { } _writer.Dispose(); _stream.Dispose(); - _socket.Dispose(); + _socket?.Dispose(); + } + + public override Task InitializeConnectionAsync() + { + return Task.CompletedTask; } public async Task> ReadRequestHeaderAsync() @@ -896,6 +901,11 @@ namespace System.Net.Test.Common return LoopbackServer.CreateServerAsync((server, uri) => funcAsync(server, uri), options: CreateOptions(options)); } + public override Task CreateConnectionAsync(Socket socket, Stream stream, GenericLoopbackOptions options = null) + { + return Task.FromResult(new LoopbackServer.Connection(socket, stream)); + } + private static LoopbackServer.Options CreateOptions(GenericLoopbackOptions options) { LoopbackServer.Options newOptions = new LoopbackServer.Options(); diff --git a/src/libraries/Common/tests/System/Net/VirtualNetwork/VirtualNetwork.cs b/src/libraries/Common/tests/System/Net/VirtualNetwork/VirtualNetwork.cs index 77a4695..141fe66 100644 --- a/src/libraries/Common/tests/System/Net/VirtualNetwork/VirtualNetwork.cs +++ b/src/libraries/Common/tests/System/Net/VirtualNetwork/VirtualNetwork.cs @@ -14,7 +14,7 @@ namespace System.Net.Test.Common public VirtualNetworkConnectionBroken() : base("Connection broken") { } } - private readonly int WaitForReadDataTimeoutMilliseconds = 30 * 1000; + private readonly int WaitForReadDataTimeoutMilliseconds = 60 * 1000; private readonly ConcurrentQueue _clientWriteQueue = new ConcurrentQueue(); private readonly ConcurrentQueue _serverWriteQueue = new ConcurrentQueue(); @@ -22,8 +22,11 @@ namespace System.Net.Test.Common private readonly SemaphoreSlim _clientDataAvailable = new SemaphoreSlim(0); private readonly SemaphoreSlim _serverDataAvailable = new SemaphoreSlim(0); + private volatile bool _clientWriteShutdown = false; + private volatile bool _serverWriteShutdown = false; + public bool DisableConnectionBreaking { get; set; } = false; - private bool _connectionBroken = false; + private volatile bool _connectionBroken = false; public byte[] ReadFrame(bool server) => ReadFrameCoreAsync(server, sync: true, cancellationToken: default).GetAwaiter().GetResult(); @@ -75,6 +78,11 @@ namespace System.Net.Test.Common return buffer; } + if ((server && _clientWriteShutdown) || (!server && _serverWriteShutdown)) + { + return Array.Empty(); + } + remainingTries--; backOffDelayMilliseconds *= backOffDelayMilliseconds; if (sync) @@ -98,6 +106,16 @@ namespace System.Net.Test.Common throw new VirtualNetworkConnectionBroken(); } + if ((server && _serverWriteShutdown) || (!server && _clientWriteShutdown)) + { + throw new InvalidOperationException("Writing to a shutdown side."); + } + + if (buffer.Length == 0) + { + return; + } + SemaphoreSlim semaphore; ConcurrentQueue packetQueue; @@ -116,11 +134,25 @@ namespace System.Net.Test.Common semaphore.Release(); } + public void GracefulShutdown(bool server) + { + if (server) + { + _serverWriteShutdown = true; + _serverDataAvailable.Release(1_000_000); + } + else + { + _clientWriteShutdown = true; + _clientDataAvailable.Release(1_000_000); + } + } + public void BreakConnection() { if (!DisableConnectionBreaking) { - _connectionBroken = true; + _connectionBroken = !_serverWriteShutdown || !_clientWriteShutdown; _serverDataAvailable.Release(1_000_000); _clientDataAvailable.Release(1_000_000); } diff --git a/src/libraries/Common/tests/System/Net/VirtualNetwork/VirtualNetworkConnectionListenerFactory.cs b/src/libraries/Common/tests/System/Net/VirtualNetwork/VirtualNetworkConnectionListenerFactory.cs new file mode 100644 index 0000000..0a47a25 --- /dev/null +++ b/src/libraries/Common/tests/System/Net/VirtualNetwork/VirtualNetworkConnectionListenerFactory.cs @@ -0,0 +1,187 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics; +using System.IO; +using System.Net.Connections; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace System.Net.Test.Common +{ + + internal sealed class VirtualNetworkConnectionListenerFactory : ConnectionListenerFactory + { + public static ConnectionFactory GetConnectionFactory(ConnectionListener listener) + { + bool hasFactory = listener.ListenerProperties.TryGet(out VirtualConnectionFactory factory); + Debug.Assert(hasFactory); + return factory; + } + + public override ValueTask BindAsync(EndPoint endPoint, IConnectionProperties options = null, CancellationToken cancellationToken = default) + { + if (cancellationToken.IsCancellationRequested) return ValueTask.FromCanceled(cancellationToken); + return new ValueTask(new VirtualConnectionListener(endPoint)); + } + + protected override void Dispose(bool disposing) + { + } + + protected override ValueTask DisposeAsyncCore() + { + return default; + } + + private sealed class VirtualConnectionListener : ConnectionListener, IConnectionProperties + { + private readonly Channel> _pendingConnects; + private readonly VirtualConnectionFactory _connectionFactory; + private readonly CancellationTokenSource _cts = new CancellationTokenSource(); + + public override IConnectionProperties ListenerProperties => this; + + public override EndPoint LocalEndPoint { get; } + + public VirtualConnectionListener(EndPoint localEndPoint) + { + LocalEndPoint = localEndPoint; + + _pendingConnects = Channel.CreateUnbounded>(); + _connectionFactory = new VirtualConnectionFactory(this); + } + + public override async ValueTask AcceptAsync(IConnectionProperties options = null, CancellationToken cancellationToken = default) + { + using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, cancellationToken); + + var network = new VirtualNetwork(); + var serverConnection = new VirtualConnection(network, isServer: true); + var clientConnection = new VirtualConnection(network, isServer: false); + + while (true) + { + TaskCompletionSource tcs = await _pendingConnects.Reader.ReadAsync(cancellationToken); + if (tcs.TrySetResult(clientConnection)) + { + return serverConnection; + } + } + } + + internal async ValueTask ConnectAsync(EndPoint endPoint, IConnectionProperties options = null, CancellationToken cancellationToken = default) + { + var tcs = new TaskCompletionSource(); + await _pendingConnects.Writer.WriteAsync(tcs, cancellationToken).ConfigureAwait(false); + + using (CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, cancellationToken)) + using (cts.Token.UnsafeRegister(o => ((TaskCompletionSource)o).TrySetCanceled(), tcs)) + { + return await tcs.Task.ConfigureAwait(false); + } + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _cts.Cancel(); + } + } + + protected override ValueTask DisposeAsyncCore() + { + Dispose(true); + return default; + } + + bool IConnectionProperties.TryGet(Type propertyKey, out object property) + { + if (propertyKey == typeof(VirtualConnectionFactory)) + { + property = _connectionFactory; + return true; + } + + property = null; + return false; + } + } + + private sealed class VirtualConnectionFactory : ConnectionFactory + { + private readonly VirtualConnectionListener _listener; + + public VirtualConnectionFactory(VirtualConnectionListener listener) + { + _listener = listener; + } + + public override ValueTask ConnectAsync(EndPoint endPoint, IConnectionProperties options = null, CancellationToken cancellationToken = default) + { + return _listener.ConnectAsync(endPoint, options, cancellationToken); + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _listener.Dispose(); + } + } + + protected override ValueTask DisposeAsyncCore() + { + return _listener.DisposeAsync(); + } + } + + private sealed class VirtualConnection : Connection, IConnectionProperties + { + private readonly VirtualNetwork _network; + private bool _isServer; + + public override IConnectionProperties ConnectionProperties => this; + + public override EndPoint LocalEndPoint => null; + + public override EndPoint RemoteEndPoint => null; + + public VirtualConnection(VirtualNetwork network, bool isServer) + { + _network = network; + _isServer = isServer; + } + + protected override Stream CreateStream() + { + return new VirtualNetworkStream(_network, _isServer, gracefulShutdown: true); + } + + protected override ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) return ValueTask.FromCanceled(cancellationToken); + + if (method == ConnectionCloseMethod.GracefulShutdown) + { + _network.GracefulShutdown(_isServer); + } + else + { + _network.BreakConnection(); + } + + return default; + } + + bool IConnectionProperties.TryGet(Type propertyKey, out object property) + { + property = null; + return false; + } + } + } + +} diff --git a/src/libraries/Common/tests/System/Net/VirtualNetwork/VirtualNetworkStream.cs b/src/libraries/Common/tests/System/Net/VirtualNetwork/VirtualNetworkStream.cs index 5770553..aa0fb2e 100644 --- a/src/libraries/Common/tests/System/Net/VirtualNetwork/VirtualNetworkStream.cs +++ b/src/libraries/Common/tests/System/Net/VirtualNetwork/VirtualNetworkStream.cs @@ -12,13 +12,15 @@ namespace System.Net.Test.Common private readonly VirtualNetwork _network; private MemoryStream _readStream; private readonly bool _isServer; + private readonly bool _gracefulShutdown; private SemaphoreSlim _readStreamLock = new SemaphoreSlim(1, 1); private TaskCompletionSource _flushTcs; - public VirtualNetworkStream(VirtualNetwork network, bool isServer) + public VirtualNetworkStream(VirtualNetwork network, bool isServer, bool gracefulShutdown = false) { _network = network; _isServer = isServer; + _gracefulShutdown = gracefulShutdown; } public int DelayMilliseconds { get; set; } @@ -79,7 +81,10 @@ namespace System.Net.Test.Common { if (_readStream == null || (_readStream.Position >= _readStream.Length)) { - _readStream = new MemoryStream(_network.ReadFrame(_isServer)); + byte[] frame = _network.ReadFrame(_isServer); + if (frame.Length == 0) return 0; + + _readStream = new MemoryStream(frame); } return _readStream.Read(buffer, offset, count); @@ -102,7 +107,10 @@ namespace System.Net.Test.Common if (_readStream == null || (_readStream.Position >= _readStream.Length)) { - _readStream = new MemoryStream(await _network.ReadFrameAsync(_isServer, cancellationToken).ConfigureAwait(false)); + byte[] frame = await _network.ReadFrameAsync(_isServer, cancellationToken).ConfigureAwait(false); + if (frame.Length == 0) return 0; + + _readStream = new MemoryStream(frame); } return await _readStream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); @@ -147,10 +155,22 @@ namespace System.Net.Test.Common if (disposing) { Disposed = true; - _network.BreakConnection(); + if (_gracefulShutdown) + { + GracefulShutdown(); + } + else + { + _network.BreakConnection(); + } } base.Dispose(disposing); } + + public void GracefulShutdown() + { + _network.GracefulShutdown(_isServer); + } } } diff --git a/src/libraries/Common/tests/System/Threading/Tasks/TaskTimeoutExtensions.cs b/src/libraries/Common/tests/System/Threading/Tasks/TaskTimeoutExtensions.cs index 033cdd7..a2a6c5e 100644 --- a/src/libraries/Common/tests/System/Threading/Tasks/TaskTimeoutExtensions.cs +++ b/src/libraries/Common/tests/System/Threading/Tasks/TaskTimeoutExtensions.cs @@ -60,6 +60,20 @@ namespace System.Threading.Tasks } } +#if !NETFRAMEWORK + public static Task TimeoutAfter(this ValueTask task, int millisecondsTimeout) + => task.AsTask().TimeoutAfter(TimeSpan.FromMilliseconds(millisecondsTimeout)); + + public static Task TimeoutAfter(this ValueTask task, TimeSpan timeout) + => task.AsTask().TimeoutAfter(timeout); + + public static Task TimeoutAfter(this ValueTask task, int millisecondsTimeout) + => task.AsTask().TimeoutAfter(TimeSpan.FromMilliseconds(millisecondsTimeout)); + + public static Task TimeoutAfter(this ValueTask task, TimeSpan timeout) + => task.AsTask().TimeoutAfter(timeout); +#endif + public static async Task WhenAllOrAnyFailed(this Task[] tasks, int millisecondsTimeout) { var cts = new CancellationTokenSource(); diff --git a/src/libraries/NetCoreAppLibrary.props b/src/libraries/NetCoreAppLibrary.props index 710e7b1..9b52e36 100644 --- a/src/libraries/NetCoreAppLibrary.props +++ b/src/libraries/NetCoreAppLibrary.props @@ -48,6 +48,7 @@ System.IO.FileSystem.Watcher; System.IO.IsolatedStorage; System.IO.MemoryMappedFiles; + System.IO.Pipelines; System.IO.Pipes; System.IO.Pipes.AccessControl; System.IO.UnmanagedMemoryStream; @@ -56,6 +57,7 @@ System.Linq.Parallel; System.Linq.Queryable; System.Memory; + System.Net.Connections; System.Net.Http; System.Net.Http.Json; System.Net.HttpListener; diff --git a/src/libraries/System.IO.Pipelines/pkg/System.IO.Pipelines.pkgproj b/src/libraries/System.IO.Pipelines/pkg/System.IO.Pipelines.pkgproj index da675e5..517bc69 100644 --- a/src/libraries/System.IO.Pipelines/pkg/System.IO.Pipelines.pkgproj +++ b/src/libraries/System.IO.Pipelines/pkg/System.IO.Pipelines.pkgproj @@ -11,6 +11,7 @@ + diff --git a/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.csproj b/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.csproj index a038d35..7cb2868 100644 --- a/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.csproj +++ b/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.csproj @@ -1,16 +1,21 @@ - netstandard2.0;net461 + $(NetCoreAppCurrent);netstandard2.0;net461 enable true netcoreapp2.0 + true - + + + + + diff --git a/src/libraries/System.Net.Connections/Directory.Build.props b/src/libraries/System.Net.Connections/Directory.Build.props new file mode 100644 index 0000000..63f02a0 --- /dev/null +++ b/src/libraries/System.Net.Connections/Directory.Build.props @@ -0,0 +1,6 @@ + + + + Microsoft + + \ No newline at end of file diff --git a/src/libraries/System.Net.Connections/System.Net.Connections.sln b/src/libraries/System.Net.Connections/System.Net.Connections.sln new file mode 100644 index 0000000..a325eb2 --- /dev/null +++ b/src/libraries/System.Net.Connections/System.Net.Connections.sln @@ -0,0 +1,50 @@ +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 16 +VisualStudioVersion = 16.0.30310.162 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.Net.Connections", "src\System.Net.Connections.csproj", "{1D422B1D-D7C4-41B9-862D-EB3D98DF37DE}" + ProjectSection(ProjectDependencies) = postProject + {132BF813-FC40-4D39-8B6F-E55D7633F0ED} = {132BF813-FC40-4D39-8B6F-E55D7633F0ED} + EndProjectSection +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.Net.Connections", "ref\System.Net.Connections.csproj", "{132BF813-FC40-4D39-8B6F-E55D7633F0ED}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{E107E9C1-E893-4E87-987E-04EF0DCEAEFD}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ref", "ref", "{2E666815-2EDB-464B-9DF6-380BF4789AD4}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{E0983BCC-F93B-4FFF-A4E4-EA874908783F}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "System.Net.Connections.Tests", "tests\System.Net.Connections.Tests\System.Net.Connections.Tests.csproj", "{A4DB505A-FFD8-4A7B-B31A-12AD623E7AD9}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {1D422B1D-D7C4-41B9-862D-EB3D98DF37DE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1D422B1D-D7C4-41B9-862D-EB3D98DF37DE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1D422B1D-D7C4-41B9-862D-EB3D98DF37DE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1D422B1D-D7C4-41B9-862D-EB3D98DF37DE}.Release|Any CPU.Build.0 = Release|Any CPU + {132BF813-FC40-4D39-8B6F-E55D7633F0ED}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {132BF813-FC40-4D39-8B6F-E55D7633F0ED}.Debug|Any CPU.Build.0 = Debug|Any CPU + {132BF813-FC40-4D39-8B6F-E55D7633F0ED}.Release|Any CPU.ActiveCfg = Release|Any CPU + {132BF813-FC40-4D39-8B6F-E55D7633F0ED}.Release|Any CPU.Build.0 = Release|Any CPU + {A4DB505A-FFD8-4A7B-B31A-12AD623E7AD9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A4DB505A-FFD8-4A7B-B31A-12AD623E7AD9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A4DB505A-FFD8-4A7B-B31A-12AD623E7AD9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A4DB505A-FFD8-4A7B-B31A-12AD623E7AD9}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {1D422B1D-D7C4-41B9-862D-EB3D98DF37DE} = {E107E9C1-E893-4E87-987E-04EF0DCEAEFD} + {132BF813-FC40-4D39-8B6F-E55D7633F0ED} = {2E666815-2EDB-464B-9DF6-380BF4789AD4} + {A4DB505A-FFD8-4A7B-B31A-12AD623E7AD9} = {E0983BCC-F93B-4FFF-A4E4-EA874908783F} + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {5100F629-0FAB-4C6F-9A54-95AE9565EE0D} + EndGlobalSection +EndGlobal diff --git a/src/libraries/System.Net.Connections/ref/System.Net.Connections.cs b/src/libraries/System.Net.Connections/ref/System.Net.Connections.cs new file mode 100644 index 0000000..c82551e --- /dev/null +++ b/src/libraries/System.Net.Connections/ref/System.Net.Connections.cs @@ -0,0 +1,74 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// ------------------------------------------------------------------------------ +// Changes to this file must follow the https://aka.ms/api-review process. +// ------------------------------------------------------------------------------ + +namespace System.Net.Connections +{ + public abstract partial class Connection : System.Net.Connections.ConnectionBase + { + protected Connection() { } + public System.IO.Pipelines.IDuplexPipe Pipe { get { throw null; } } + public System.IO.Stream Stream { get { throw null; } } + protected virtual System.IO.Pipelines.IDuplexPipe CreatePipe() { throw null; } + protected virtual System.IO.Stream CreateStream() { throw null; } + public static System.Net.Connections.Connection FromPipe(System.IO.Pipelines.IDuplexPipe pipe, bool leaveOpen = false, System.Net.Connections.IConnectionProperties? properties = null, System.Net.EndPoint? localEndPoint = null, System.Net.EndPoint? remoteEndPoint = null) { throw null; } + public static System.Net.Connections.Connection FromStream(System.IO.Stream stream, bool leaveOpen = false, System.Net.Connections.IConnectionProperties? properties = null, System.Net.EndPoint? localEndPoint = null, System.Net.EndPoint? remoteEndPoint = null) { throw null; } + } + public abstract partial class ConnectionBase : System.IAsyncDisposable, System.IDisposable + { + protected ConnectionBase() { } + public abstract System.Net.Connections.IConnectionProperties ConnectionProperties { get; } + public abstract System.Net.EndPoint? LocalEndPoint { get; } + public abstract System.Net.EndPoint? RemoteEndPoint { get; } + public System.Threading.Tasks.ValueTask CloseAsync(System.Net.Connections.ConnectionCloseMethod method = System.Net.Connections.ConnectionCloseMethod.GracefulShutdown, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + protected abstract System.Threading.Tasks.ValueTask CloseAsyncCore(System.Net.Connections.ConnectionCloseMethod method, System.Threading.CancellationToken cancellationToken); + public void Dispose() { } + public System.Threading.Tasks.ValueTask DisposeAsync() { throw null; } + } + public enum ConnectionCloseMethod + { + GracefulShutdown = 0, + Abort = 1, + Immediate = 2, + } + public static partial class ConnectionExtensions + { + public static System.Net.Connections.ConnectionFactory Filter(this System.Net.Connections.ConnectionFactory factory, System.Func> filter) { throw null; } + public static bool TryGet(this System.Net.Connections.IConnectionProperties properties, [System.Diagnostics.CodeAnalysis.MaybeNullWhenAttribute(false)] out T property) { throw null; } + } + public abstract partial class ConnectionFactory : System.IAsyncDisposable, System.IDisposable + { + protected ConnectionFactory() { } + public abstract System.Threading.Tasks.ValueTask ConnectAsync(System.Net.EndPoint? endPoint, System.Net.Connections.IConnectionProperties? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); + public void Dispose() { } + protected virtual void Dispose(bool disposing) { } + public System.Threading.Tasks.ValueTask DisposeAsync() { throw null; } + protected virtual System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; } + } + public abstract partial class ConnectionListener : System.IAsyncDisposable, System.IDisposable + { + protected ConnectionListener() { } + public abstract System.Net.Connections.IConnectionProperties ListenerProperties { get; } + public abstract System.Net.EndPoint? LocalEndPoint { get; } + public abstract System.Threading.Tasks.ValueTask AcceptAsync(System.Net.Connections.IConnectionProperties? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); + public void Dispose() { } + protected virtual void Dispose(bool disposing) { } + public System.Threading.Tasks.ValueTask DisposeAsync() { throw null; } + protected virtual System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; } + } + public abstract partial class ConnectionListenerFactory : System.IAsyncDisposable, System.IDisposable + { + protected ConnectionListenerFactory() { } + public abstract System.Threading.Tasks.ValueTask BindAsync(System.Net.EndPoint? endPoint, System.Net.Connections.IConnectionProperties? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); + public void Dispose() { } + protected virtual void Dispose(bool disposing) { } + public System.Threading.Tasks.ValueTask DisposeAsync() { throw null; } + protected virtual System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; } + } + public partial interface IConnectionProperties + { + bool TryGet(System.Type propertyKey, [System.Diagnostics.CodeAnalysis.NotNullWhenAttribute(true)] out object? property); + } +} diff --git a/src/libraries/System.Net.Connections/ref/System.Net.Connections.csproj b/src/libraries/System.Net.Connections/ref/System.Net.Connections.csproj new file mode 100644 index 0000000..b239637 --- /dev/null +++ b/src/libraries/System.Net.Connections/ref/System.Net.Connections.csproj @@ -0,0 +1,14 @@ + + + $(NetCoreAppCurrent) + enable + + + + + + + + + + diff --git a/src/libraries/System.Net.Connections/src/Resources/Strings.resx b/src/libraries/System.Net.Connections/src/Resources/Strings.resx new file mode 100644 index 0000000..004c0da --- /dev/null +++ b/src/libraries/System.Net.Connections/src/Resources/Strings.resx @@ -0,0 +1,138 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + text/microsoft-resx + + + 2.0 + + + System.Resources.ResXResourceReader, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 + + + System.Resources.ResXResourceWriter, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089 + + + The CreatePipe implementation returned null; a valid reference was expected. + + + The CreateStream implementation returned null; a valid reference was expected. + + + One of CreatePipe or CreateStream must be implemented + + + The Connection's Pipe may not be accessed after Stream has been accessed. + + + The Connection's Stream may not be accessed after Pipe has been accessed. + + + The PipeReader returned a zero-length read. + + \ No newline at end of file diff --git a/src/libraries/System.Net.Connections/src/System.Net.Connections.csproj b/src/libraries/System.Net.Connections/src/System.Net.Connections.csproj new file mode 100644 index 0000000..c53e559 --- /dev/null +++ b/src/libraries/System.Net.Connections/src/System.Net.Connections.csproj @@ -0,0 +1,26 @@ + + + $(NetCoreAppCurrent) + enable + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/libraries/System.Net.Connections/src/System/Net/Connections/Connection.cs b/src/libraries/System.Net.Connections/src/System/Net/Connections/Connection.cs new file mode 100644 index 0000000..8f7d479 --- /dev/null +++ b/src/libraries/System.Net.Connections/src/System/Net/Connections/Connection.cs @@ -0,0 +1,265 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.IO; +using System.IO.Pipelines; +using System.Runtime.ExceptionServices; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.Connections +{ + /// + /// A connection. + /// + public abstract class Connection : ConnectionBase + { + private Stream? _stream; + private IDuplexPipe? _pipe; + private bool _initializing; + + /// + /// The connection's . + /// + public Stream Stream => + _stream != null ? _stream : + _pipe != null ? throw new InvalidOperationException(SR.net_connections_stream_use_after_pipe) : + (_stream = CreateStream() ?? throw new InvalidOperationException(SR.net_connections_createstream_null)); + + /// + /// The connection's . + /// + public IDuplexPipe Pipe => + _pipe != null ? _pipe : + _stream != null ? throw new InvalidOperationException(SR.net_connections_pipe_use_after_stream) : + (_pipe = CreatePipe() ?? throw new InvalidOperationException(SR.net_connections_createpipe_null)); + + /// + /// Initializes the for the . + /// + /// A . + /// + /// At least one of and must be overridden. + /// If only is overridden, a user accessing will get a wrapping the . + /// + protected virtual Stream CreateStream() + { + if (_initializing) throw new InvalidOperationException(SR.net_connections_no_create_overrides); + + try + { + _initializing = true; + + IDuplexPipe pipe = CreatePipe(); + if (pipe == null) throw new InvalidOperationException(SR.net_connections_createpipe_null); + + return new DuplexPipeStream(pipe); + } + finally + { + _initializing = false; + } + } + + /// + /// Initializes the for the . + /// + /// An . + /// + /// At least one of and must be overridden. + /// If only is overridden, a user accessing will get a wrapping the . + /// + protected virtual IDuplexPipe CreatePipe() + { + if (_initializing) throw new InvalidOperationException(SR.net_connections_no_create_overrides); + + try + { + _initializing = true; + + Stream stream = CreateStream(); + if (stream == null) throw new InvalidOperationException(SR.net_connections_createstream_null); + + return new DuplexStreamPipe(stream); + } + finally + { + _initializing = false; + } + } + + private sealed class DuplexStreamPipe : IDuplexPipe + { + private static readonly StreamPipeReaderOptions s_readerOpts = new StreamPipeReaderOptions(leaveOpen: true); + private static readonly StreamPipeWriterOptions s_writerOpts = new StreamPipeWriterOptions(leaveOpen: true); + + public DuplexStreamPipe(Stream stream) + { + Input = PipeReader.Create(stream, s_readerOpts); + Output = PipeWriter.Create(stream, s_writerOpts); + } + + public PipeReader Input { get; } + + public PipeWriter Output { get; } + } + + /// + /// Creates a connection for a . + /// + /// The connection's . + /// If false, the will be disposed of once the connection has been closed. + /// The connection's . + /// The connection's . + /// The connection's . + /// A new . + public static Connection FromStream(Stream stream, bool leaveOpen = false, IConnectionProperties? properties = null, EndPoint? localEndPoint = null, EndPoint? remoteEndPoint = null) + { + if (stream == null) throw new ArgumentNullException(nameof(stream)); + return new ConnectionFromStream(stream, leaveOpen, properties, localEndPoint, remoteEndPoint); + } + + /// + /// Creates a connection for an . + /// + /// The connection's . + /// If false and the implements or , it will be disposed of once the connection has been closed. + /// The connection's . + /// The connection's . + /// The connection's . + /// A new . + public static Connection FromPipe(IDuplexPipe pipe, bool leaveOpen = false, IConnectionProperties? properties = null, EndPoint? localEndPoint = null, EndPoint? remoteEndPoint = null) + { + if (pipe == null) throw new ArgumentNullException(nameof(pipe)); + return new ConnectionFromPipe(pipe, leaveOpen, properties, localEndPoint, remoteEndPoint); + } + + private sealed class ConnectionFromStream : Connection, IConnectionProperties + { + private Stream? _originalStream; + private IConnectionProperties? _properties; + private readonly bool _leaveOpen; + + public override IConnectionProperties ConnectionProperties => _properties ?? this; + + public override EndPoint? LocalEndPoint { get; } + + public override EndPoint? RemoteEndPoint { get; } + + public ConnectionFromStream(Stream stream, bool leaveOpen, IConnectionProperties? properties, EndPoint? localEndPoint, EndPoint? remoteEndPoint) + { + _originalStream = stream; + _leaveOpen = leaveOpen; + _properties = properties; + LocalEndPoint = localEndPoint; + RemoteEndPoint = remoteEndPoint; + } + + protected override Stream CreateStream() => _originalStream ?? throw new ObjectDisposedException(nameof(Connection)); + + protected override async ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken) + { + if (_originalStream == null) + { + return; + } + + if (method == ConnectionCloseMethod.GracefulShutdown) + { + await _originalStream.FlushAsync(cancellationToken).ConfigureAwait(false); + } + + if (!_leaveOpen) + { + await _originalStream.DisposeAsync().ConfigureAwait(false); + } + + _originalStream = null; + } + + bool IConnectionProperties.TryGet(Type propertyKey, [NotNullWhen(true)] out object? property) + { + property = null; + return false; + } + } + + private sealed class ConnectionFromPipe : Connection, IConnectionProperties + { + private IDuplexPipe? _originalPipe; + private IConnectionProperties? _properties; + private readonly bool _leaveOpen; + + public override IConnectionProperties ConnectionProperties => _properties ?? this; + + public override EndPoint? LocalEndPoint { get; } + + public override EndPoint? RemoteEndPoint { get; } + + public ConnectionFromPipe(IDuplexPipe pipe, bool leaveOpen, IConnectionProperties? properties, EndPoint? localEndPoint, EndPoint? remoteEndPoint) + { + _originalPipe = pipe; + _leaveOpen = leaveOpen; + _properties = properties; + LocalEndPoint = localEndPoint; + RemoteEndPoint = remoteEndPoint; + } + + protected override IDuplexPipe CreatePipe() => _originalPipe ?? throw new ObjectDisposedException(nameof(Connection)); + + protected override async ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken) + { + if (_originalPipe == null) + { + return; + } + + Exception? inputException, outputException; + + if (method == ConnectionCloseMethod.GracefulShutdown) + { + // Flush happens implicitly from CompleteAsync(null), so only flush here if we need cancellation. + if (cancellationToken.CanBeCanceled) + { + FlushResult r = await _originalPipe.Output.FlushAsync(cancellationToken).ConfigureAwait(false); + if (r.IsCanceled) cancellationToken.ThrowIfCancellationRequested(); + } + + inputException = null; + outputException = null; + } + else + { + inputException = ExceptionDispatchInfo.SetCurrentStackTrace(new ObjectDisposedException(nameof(Connection))); + outputException = ExceptionDispatchInfo.SetCurrentStackTrace(new ObjectDisposedException(nameof(Connection))); + } + + await _originalPipe.Input.CompleteAsync(inputException).ConfigureAwait(false); + await _originalPipe.Output.CompleteAsync(outputException).ConfigureAwait(false); + + if (!_leaveOpen) + { + switch (_originalPipe) + { + case IAsyncDisposable d: + await d.DisposeAsync().ConfigureAwait(false); + break; + case IDisposable d: + d.Dispose(); + break; + } + } + + _originalPipe = null; + } + + bool IConnectionProperties.TryGet(Type propertyKey, [NotNullWhen(true)] out object? property) + { + property = null; + return false; + } + } + } +} diff --git a/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionBase.cs b/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionBase.cs new file mode 100644 index 0000000..027431b --- /dev/null +++ b/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionBase.cs @@ -0,0 +1,80 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.Connections +{ + /// + /// Provides base functionality shared between singular (e.g. TCP) and multiplexed (e.g. QUIC) connections. + /// + public abstract class ConnectionBase : IDisposable, IAsyncDisposable + { + private bool _disposed; + + /// + /// Properties exposed by this connection. + /// + public abstract IConnectionProperties ConnectionProperties { get; } + + /// + /// The local endpoint of this connection, if any. + /// + public abstract EndPoint? LocalEndPoint { get; } + + /// + /// The remote endpoint of this connection, if any. + /// + public abstract EndPoint? RemoteEndPoint { get; } + + /// + /// Closes the connection. + /// + /// The method to use when closing the connection. + /// A cancellation token for the asynchronous operation. + /// A for the asynchronous operation. + public async ValueTask CloseAsync(ConnectionCloseMethod method = ConnectionCloseMethod.GracefulShutdown, CancellationToken cancellationToken = default) + { + if (!_disposed) + { + await CloseAsyncCore(method, cancellationToken).ConfigureAwait(false); + _disposed = true; + } + GC.SuppressFinalize(this); + } + + /// + /// Closes the connection. + /// + /// The method to use when closing the connection. + /// A cancellation token for the asynchronous operation. + /// A for the asynchronous operation. + protected abstract ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken); + + /// + /// Disposes of the connection. + /// + /// + /// This is equivalent to calling with the method , and calling GetAwaiter().GetResult() on the resulting task. + /// To increase likelihood of synchronous completion, call directly with the method . + /// + public void Dispose() + { + ValueTask t = CloseAsync(ConnectionCloseMethod.GracefulShutdown, CancellationToken.None); + + if (t.IsCompleted) t.GetAwaiter().GetResult(); + else t.AsTask().GetAwaiter().GetResult(); + } + + /// + /// Disposes of the connection. + /// + /// A for the asynchronous operation. + /// This is equivalent to calling with the method . + public ValueTask DisposeAsync() + { + return CloseAsync(ConnectionCloseMethod.GracefulShutdown, CancellationToken.None); + } + } +} diff --git a/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionCloseMethod.cs b/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionCloseMethod.cs new file mode 100644 index 0000000..6b5a079 --- /dev/null +++ b/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionCloseMethod.cs @@ -0,0 +1,26 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace System.Net.Connections +{ + /// + /// Methods for closing a connection. + /// + public enum ConnectionCloseMethod + { + /// + /// The connection should be flushed and closed. + /// + GracefulShutdown, + + /// + /// The connection should be aborted gracefully, performing any I/O needed to notify the other side of the connection that it has been aborted. + /// + Abort, + + /// + /// The connection should be aborted immediately, avoiding any I/O needed to notify the other side of the connection that it has been aborted. + /// + Immediate + } +} diff --git a/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionExtensions.cs b/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionExtensions.cs new file mode 100644 index 0000000..6850247 --- /dev/null +++ b/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionExtensions.cs @@ -0,0 +1,87 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics.CodeAnalysis; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.Connections +{ + /// + /// Extension methods for working with the System.Net.Connections types. + /// + public static class ConnectionExtensions + { + /// + /// Retrieves a Type-based property from an , if it exists. + /// + /// The type of the property to retrieve. + /// The connection properties to retrieve a property from. + /// If contains a property of type , receives the property. Otherwise, default. + /// If contains a property of type , true. Otherwise, false. + public static bool TryGet(this IConnectionProperties properties, [MaybeNullWhen(false)] out T property) + { + if (properties == null) throw new ArgumentNullException(nameof(properties)); + + if (properties.TryGet(typeof(T), out object? obj) && obj is T propertyValue) + { + property = propertyValue; + return true; + } + else + { + property = default; + return false; + } + } + + /// + /// Creates a connection-level filter on top of a . + /// + /// The factory to be filtered. + /// The connection-level filter to apply on top of . + /// A new filtered . + public static ConnectionFactory Filter(this ConnectionFactory factory, Func> filter) + { + if (factory == null) throw new ArgumentNullException(nameof(factory)); + if (filter == null) throw new ArgumentNullException(nameof(filter)); + return new ConnectionFilteringFactory(factory, filter); + } + + private sealed class ConnectionFilteringFactory : ConnectionFactory + { + private readonly ConnectionFactory _baseFactory; + private readonly Func> _filter; + + public ConnectionFilteringFactory(ConnectionFactory baseFactory, Func> filter) + { + _baseFactory = baseFactory; + _filter = filter; + } + + public override async ValueTask ConnectAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default) + { + Connection con = await _baseFactory.ConnectAsync(endPoint, options, cancellationToken).ConfigureAwait(false); + try + { + return await _filter(con, options, cancellationToken).ConfigureAwait(false); + } + catch + { + await con.CloseAsync(ConnectionCloseMethod.Abort, cancellationToken).ConfigureAwait(false); + throw; + } + } + + protected override void Dispose(bool disposing) + { + if (disposing) _baseFactory.Dispose(); + } + + protected override ValueTask DisposeAsyncCore() + { + return _baseFactory.DisposeAsync(); + } + } + } +} diff --git a/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionFactory.cs b/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionFactory.cs new file mode 100644 index 0000000..812f78a --- /dev/null +++ b/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionFactory.cs @@ -0,0 +1,53 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.Connections +{ + /// + /// A factory for opening outgoing connections. + /// + public abstract class ConnectionFactory : IAsyncDisposable, IDisposable + { + /// + /// Opens a new . + /// + /// The to connect to, if any. + /// Options used to create the connection, if any. + /// A token used to cancel the asynchronous operation. + /// A for the . + public abstract ValueTask ConnectAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default); + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + public async ValueTask DisposeAsync() + { + await DisposeAsyncCore().ConfigureAwait(false); + GC.SuppressFinalize(this); + } + + /// + /// Disposes the . + /// + /// If true, the is being disposed. If false, the is being finalized. + protected virtual void Dispose(bool disposing) + { + } + + /// + /// Asynchronously disposes the . + /// + /// A representing the asynchronous dispose operation. + protected virtual ValueTask DisposeAsyncCore() + { + Dispose(true); + return default; + } + } +} diff --git a/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionListener.cs b/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionListener.cs new file mode 100644 index 0000000..299ba0d --- /dev/null +++ b/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionListener.cs @@ -0,0 +1,62 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.Connections +{ + /// + /// A listener to accept incoming connections. + /// + public abstract class ConnectionListener : IAsyncDisposable, IDisposable + { + /// + /// Properties exposed by this listener. + /// + public abstract IConnectionProperties ListenerProperties { get; } + + /// + /// The local endpoint of this connection, if any. + /// + public abstract EndPoint? LocalEndPoint { get; } + + /// + /// Accepts an incoming connection. + /// + /// Options used to create the connection, if any. + /// A token used to cancel the asynchronous operation. + /// A for the . + public abstract ValueTask AcceptAsync(IConnectionProperties? options = null, CancellationToken cancellationToken = default); + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + public async ValueTask DisposeAsync() + { + await DisposeAsyncCore().ConfigureAwait(false); + GC.SuppressFinalize(this); + } + + /// + /// Disposes the . + /// + /// If true, the is being disposed. If false, the is being finalized. + protected virtual void Dispose(bool disposing) + { + } + + /// + /// Asynchronously disposes the . + /// + /// A representing the asynchronous dispose operation. + protected virtual ValueTask DisposeAsyncCore() + { + Dispose(true); + return default; + } + } +} diff --git a/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionListenerFactory.cs b/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionListenerFactory.cs new file mode 100644 index 0000000..d208f05 --- /dev/null +++ b/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionListenerFactory.cs @@ -0,0 +1,46 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.Connections +{ + /// + /// A factory for creating connection listeners, to accept incoming connections. + /// + public abstract class ConnectionListenerFactory : IAsyncDisposable, IDisposable + { + public abstract ValueTask BindAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default); + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + public async ValueTask DisposeAsync() + { + await DisposeAsyncCore().ConfigureAwait(false); + GC.SuppressFinalize(this); + } + + /// + /// Disposes the . + /// + /// If true, the is being disposed. If false, the is being finalized. + protected virtual void Dispose(bool disposing) + { + } + + /// + /// Asynchronously disposes the . + /// + /// A representing the asynchronous dispose operation. + protected virtual ValueTask DisposeAsyncCore() + { + Dispose(true); + return default; + } + } +} diff --git a/src/libraries/System.Net.Connections/src/System/Net/Connections/DuplexPipeStream.cs b/src/libraries/System.Net.Connections/src/System/Net/Connections/DuplexPipeStream.cs new file mode 100644 index 0000000..3319c97 --- /dev/null +++ b/src/libraries/System.Net.Connections/src/System/Net/Connections/DuplexPipeStream.cs @@ -0,0 +1,166 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Buffers; +using System.IO; +using System.IO.Pipelines; +using System.Runtime.ExceptionServices; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.Connections +{ + internal sealed class DuplexPipeStream : Stream + { + private readonly PipeReader _reader; + private readonly PipeWriter _writer; + + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => true; + public override long Length => throw new NotSupportedException(); + public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); } + + public DuplexPipeStream(IDuplexPipe pipe) + { + _reader = pipe.Input; + _writer = pipe.Output; + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _reader.Complete(); + _writer.Complete(); + } + base.Dispose(disposing); + } + + public override async ValueTask DisposeAsync() + { + await _reader.CompleteAsync().ConfigureAwait(false); + await _writer.CompleteAsync().ConfigureAwait(false); + } + + public override void Flush() + { + FlushAsync().GetAwaiter().GetResult(); + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + FlushResult r = await _writer.FlushAsync(cancellationToken).ConfigureAwait(false); + if (r.IsCanceled) throw new OperationCanceledException(cancellationToken); + } + + public override int Read(byte[] buffer, int offset, int count) + { + if (buffer == null) throw new ArgumentNullException(nameof(buffer)); + + ValueTask t = ReadAsync(buffer.AsMemory(offset, count)); + return + t.IsCompleted ? t.GetAwaiter().GetResult() : + t.AsTask().GetAwaiter().GetResult(); + } + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + if (buffer == null) return Task.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(new ArgumentNullException(nameof(buffer)))); + return ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask(); + } + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + ReadResult result = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false); + + if (result.IsCanceled) + { + throw new OperationCanceledException(); + } + + ReadOnlySequence sequence = result.Buffer; + long bufferLength = sequence.Length; + SequencePosition consumed = sequence.Start; + + try + { + if (bufferLength != 0) + { + int actual = (int)Math.Min(bufferLength, buffer.Length); + + ReadOnlySequence slice = actual == bufferLength ? sequence : sequence.Slice(0, actual); + consumed = slice.End; + slice.CopyTo(buffer.Span); + + return actual; + } + + if (result.IsCompleted) + { + return 0; + } + } + finally + { + _reader.AdvanceTo(consumed); + } + + // This is a buggy PipeReader implementation that returns 0 byte reads even though the PipeReader + // isn't completed or canceled. + throw new InvalidOperationException(SR.net_connections_zero_byte_pipe_read); + } + + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + { + return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state); + } + + public override int EndRead(IAsyncResult asyncResult) + { + return TaskToApm.End(asyncResult); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult(); + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask(); + } + + public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + FlushResult r = await _writer.WriteAsync(buffer, cancellationToken).ConfigureAwait(false); + if (r.IsCanceled) throw new OperationCanceledException(cancellationToken); + } + + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + { + return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state); + } + + public override void EndWrite(IAsyncResult asyncResult) + { + TaskToApm.End(asyncResult); + } + + public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) + { + return _reader.CopyToAsync(destination, cancellationToken); + } + } +} diff --git a/src/libraries/System.Net.Connections/src/System/Net/Connections/IConnectionProperties.cs b/src/libraries/System.Net.Connections/src/System/Net/Connections/IConnectionProperties.cs new file mode 100644 index 0000000..ce89dd9 --- /dev/null +++ b/src/libraries/System.Net.Connections/src/System/Net/Connections/IConnectionProperties.cs @@ -0,0 +1,21 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics.CodeAnalysis; + +namespace System.Net.Connections +{ + /// + /// A container for connection properties. + /// + public interface IConnectionProperties + { + /// + /// Retrieves a connection property, if it exists. + /// + /// The key of the property to retrieve. + /// If the property was found, retrieves the property. Otherwise, null. + /// If the property was found, true. Otherwise, false. + bool TryGet(Type propertyKey, [NotNullWhen(true)] out object? property); + } +} diff --git a/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionBaseTest.cs b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionBaseTest.cs new file mode 100644 index 0000000..66cc4a2 --- /dev/null +++ b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionBaseTest.cs @@ -0,0 +1,81 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Threading.Tasks; +using Xunit; + +namespace System.Net.Connections.Tests +{ + public class ConnectionBaseTest + { + [Fact] + public void Dispose_CallsClose_Success() + { + ConnectionCloseMethod? method = null; + + var con = new MockConnection(); + con.OnCloseAsyncCore = (m, t) => + { + method = m; + return default(ValueTask); + }; + + con.Dispose(); + + Assert.Equal(ConnectionCloseMethod.GracefulShutdown, method); + } + + [Fact] + public async Task DisposeAsync_CallsClose_Success() + { + ConnectionCloseMethod? method = null; + + var con = new MockConnection(); + con.OnCloseAsyncCore = (m, t) => + { + method = m; + return default(ValueTask); + }; + + await con.DisposeAsync(); + + Assert.Equal(ConnectionCloseMethod.GracefulShutdown, method); + } + + [Fact] + public void Dispose_CalledOnce_Success() + { + int callCount = 0; + + var con = new MockConnection(); + con.OnCloseAsyncCore = delegate + { + ++callCount; + return default(ValueTask); + }; + + con.Dispose(); + con.Dispose(); + + Assert.Equal(1, callCount); + } + + [Fact] + public async Task DisposeAsync_CalledOnce_Success() + { + int callCount = 0; + + var con = new MockConnection(); + con.OnCloseAsyncCore = delegate + { + ++callCount; + return default(ValueTask); + }; + + await con.DisposeAsync(); + await con.DisposeAsync(); + + Assert.Equal(1, callCount); + } + } +} diff --git a/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionTest.cs b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionTest.cs new file mode 100644 index 0000000..ad16e45 --- /dev/null +++ b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionTest.cs @@ -0,0 +1,264 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Buffers; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.IO; +using System.IO.Pipelines; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace System.Net.Connections.Tests +{ + public class ConnectionTest + { + [Fact] + public void CreateStream_CalledOnce_Success() + { + int callCount = 0; + + var con = new MockConnection(); + con.OnCreateStream = () => + { + ++callCount; + return new MemoryStream(); + }; + + _ = con.Stream; + _ = con.Stream; + + Assert.Equal(1, callCount); + } + + [Fact] + public void CreatePipe_CalledOnce_Success() + { + int callCount = 0; + + var con = new MockConnection(); + con.OnCreatePipe = () => + { + ++callCount; + return new MockPipe(); + }; + + _ = con.Pipe; + _ = con.Pipe; + + Assert.Equal(1, callCount); + } + + [Fact] + public void AccessStream_AccessPipe_Fail() + { + var con = new MockConnection(); + con.OnCreateStream = () => new MemoryStream(); + + _ = con.Stream; + Assert.Throws(() => _ = con.Pipe); + } + + [Fact] + public void AccessPipe_AccessStream_Fail() + { + var con = new MockConnection(); + con.OnCreatePipe = () => new MockPipe(); + + _ = con.Pipe; + Assert.Throws(() => _ = con.Stream); + } + + [Fact] + public void AccessStream_NoOverloads_Fail() + { + var con = new ConnectionWithoutStreamOrPipe(); + Assert.Throws(() => _ = con.Stream); + } + + [Fact] + public void AccessPipe_NoOverloads_Fail() + { + var con = new ConnectionWithoutStreamOrPipe(); + Assert.Throws(() => _ = con.Pipe); + } + + [Fact] + public async Task WrappedStream_Success() + { + var bytesA = Encoding.ASCII.GetBytes("foo"); + var bytesB = Encoding.ASCII.GetBytes("bar"); + + var stream = new MemoryStream(); + stream.Write(bytesA); + stream.Position = 0; + + var con = new MockConnection(); + con.OnCreateStream = () => stream; + + IDuplexPipe pipe = con.Pipe; + + ReadResult res = await pipe.Input.ReadAsync(); + Assert.Equal(bytesA, res.Buffer.ToArray()); + + await pipe.Output.WriteAsync(bytesB); + Assert.Equal(bytesA.Concat(bytesB).ToArray(), stream.ToArray()); + } + + [Fact] + public async Task WrappedPipe_Success() + { + var bytesA = Encoding.ASCII.GetBytes("foo"); + var bytesB = Encoding.ASCII.GetBytes("bar"); + + var stream = new MemoryStream(); + stream.Write(bytesA); + stream.Position = 0; + + var pipe = new MockPipe + { + Input = PipeReader.Create(stream), + Output = PipeWriter.Create(stream) + }; + + var con = new MockConnection(); + con.OnCreatePipe = () => pipe; + + Stream s = con.Stream; + + var readBuffer = new byte[4]; + int len = await s.ReadAsync(readBuffer); + Assert.Equal(3, len); + Assert.Equal(bytesA, readBuffer.AsSpan(0, len).ToArray()); + + await s.WriteAsync(bytesB); + Assert.Equal(bytesA.Concat(bytesB).ToArray(), stream.ToArray()); + } + + [Theory] + [InlineData(ConnectionCloseMethod.GracefulShutdown, true)] + [InlineData(ConnectionCloseMethod.Abort, false)] + [InlineData(ConnectionCloseMethod.Immediate, false)] + public async Task FromStream_CloseMethod_Flushed(ConnectionCloseMethod method, bool shouldFlush) + { + bool streamFlushed = false; + + var stream = new MockStream + { + OnFlushAsync = _ => { streamFlushed = true; return Task.CompletedTask; } + }; + + var con = Connection.FromStream(stream, leaveOpen: true); + + await con.CloseAsync(method); + Assert.Equal(shouldFlush, streamFlushed); + } + + [Theory] + [InlineData(ConnectionCloseMethod.GracefulShutdown, true)] + [InlineData(ConnectionCloseMethod.Abort, false)] + [InlineData(ConnectionCloseMethod.Immediate, false)] + public async Task FromPipe_CloseMethod_Flushed(ConnectionCloseMethod method, bool shouldFlush) + { + bool pipeFlushed = false; + + var pipe = new MockPipe + { + Input = new MockPipeReader() + { + OnCompleteAsync = _ => default + }, + Output = new MockPipeWriter() + { + OnFlushAsync = _ => { pipeFlushed = true; return default; }, + OnCompleteAsync = _ => default + } + }; + + var con = Connection.FromPipe(pipe); + + await con.CloseAsync(method, new CancellationTokenSource().Token); + Assert.Equal(shouldFlush, pipeFlushed); + } + + [Theory] + [InlineData(true, false)] + [InlineData(false, true)] + public async Task FromStream_LeaveOpen_StreamDisposed(bool leaveOpen, bool shouldDispose) + { + bool streamDisposed = false; + + var stream = new MockStream(); + stream.OnDisposeAsync = delegate { streamDisposed = true; return default; }; + + var con = Connection.FromStream(stream, leaveOpen); + + await con.CloseAsync(ConnectionCloseMethod.Immediate); + Assert.Equal(shouldDispose, streamDisposed); + } + + [Theory] + [InlineData(true, false)] + [InlineData(false, true)] + public async Task FromPipe_LeaveOpen_PipeDisposed(bool leaveOpen, bool shouldDispose) + { + bool pipeDisposed = false; + + var pipe = new MockPipe + { + OnDisposeAsync = () => { pipeDisposed = true; return default; }, + Input = new MockPipeReader() + { + OnCompleteAsync = _ => default + }, + Output = new MockPipeWriter() + { + OnFlushAsync = _ => default, + OnCompleteAsync = _ => default + } + }; + + var con = Connection.FromPipe(pipe, leaveOpen); + + await con.CloseAsync(ConnectionCloseMethod.Immediate); + Assert.Equal(shouldDispose, pipeDisposed); + } + + [Fact] + public void FromStream_PropertiesInitialized() + { + var properties = new DummyConnectionProperties(); + var localEndPoint = new IPEndPoint(IPAddress.Any, 1); + var remoteEndPoint = new IPEndPoint(IPAddress.Any, 2); + + Connection c = Connection.FromStream(new MockStream(), leaveOpen: false, properties, localEndPoint, remoteEndPoint); + Assert.Same(properties, c.ConnectionProperties); + Assert.Same(localEndPoint, c.LocalEndPoint); + Assert.Same(remoteEndPoint, c.RemoteEndPoint); + } + + [Fact] + public void FromPipe_PropertiesInitialized() + { + var properties = new DummyConnectionProperties(); + var localEndPoint = new IPEndPoint(IPAddress.Any, 1); + var remoteEndPoint = new IPEndPoint(IPAddress.Any, 2); + + Connection c = Connection.FromPipe(new MockPipe(), leaveOpen: false, properties, localEndPoint, remoteEndPoint); + Assert.Same(properties, c.ConnectionProperties); + Assert.Same(localEndPoint, c.LocalEndPoint); + Assert.Same(remoteEndPoint, c.RemoteEndPoint); + } + + private sealed class DummyConnectionProperties : IConnectionProperties + { + public bool TryGet(Type propertyKey, [NotNullWhen(true)] out object property) + { + throw new NotImplementedException(); + } + } + } +} diff --git a/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionWithoutStreamOrPipe.cs b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionWithoutStreamOrPipe.cs new file mode 100644 index 0000000..16e5238 --- /dev/null +++ b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionWithoutStreamOrPipe.cs @@ -0,0 +1,22 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.Connections.Tests +{ + internal class ConnectionWithoutStreamOrPipe : Connection + { + public override IConnectionProperties ConnectionProperties => throw new NotImplementedException(); + + public override EndPoint LocalEndPoint => throw new NotImplementedException(); + + public override EndPoint RemoteEndPoint => throw new NotImplementedException(); + + protected override ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken) + { + throw new NotImplementedException(); + } + } +} diff --git a/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockConnection.cs b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockConnection.cs new file mode 100644 index 0000000..f1acf75 --- /dev/null +++ b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockConnection.cs @@ -0,0 +1,33 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.IO; +using System.IO.Pipelines; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.Connections.Tests +{ + internal class MockConnection : Connection + { + public Func OnConnectionProperties { get; set; } + public Func OnLocalEndPoint { get; set; } + public Func OnRemoteEndPoint { get; set; } + public Func OnCloseAsyncCore { get; set; } + public Func OnCreatePipe { get; set; } + public Func OnCreateStream { get; set; } + + public override IConnectionProperties ConnectionProperties => OnConnectionProperties(); + + public override EndPoint LocalEndPoint => OnLocalEndPoint(); + + public override EndPoint RemoteEndPoint => OnRemoteEndPoint(); + + protected override ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken) => + OnCloseAsyncCore(method, cancellationToken); + + protected override IDuplexPipe CreatePipe() => OnCreatePipe != null ? OnCreatePipe() : base.CreatePipe(); + + protected override Stream CreateStream() => OnCreateStream != null ? OnCreateStream() : base.CreateStream(); + } +} diff --git a/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipe.cs b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipe.cs new file mode 100644 index 0000000..6abbb9a --- /dev/null +++ b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipe.cs @@ -0,0 +1,21 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.IO.Pipelines; +using System.Threading.Tasks; + +namespace System.Net.Connections.Tests +{ + internal class MockPipe : IDuplexPipe, IAsyncDisposable + { + public Func OnDisposeAsync { get; set; } + public PipeReader Input { get; set; } + + public PipeWriter Output { get; set; } + + public ValueTask DisposeAsync() + { + return OnDisposeAsync?.Invoke() ?? default; + } + } +} diff --git a/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipeReader.cs b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipeReader.cs new file mode 100644 index 0000000..2413bf1 --- /dev/null +++ b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipeReader.cs @@ -0,0 +1,44 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.IO.Pipelines; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.Connections.Tests +{ + internal class MockPipeReader : PipeReader + { + public Action OnAdvanceTo { get; set; } + public Action OnCancelPendingRead { get; set; } + public Action OnComplete { get; set; } + public Func OnCompleteAsync { get; set; } + public Func> OnReadAsync { get; set; } + public Func OnTryRead { get; set; } + + public override void AdvanceTo(SequencePosition consumed) + => OnAdvanceTo(consumed, consumed); + + public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) + => OnAdvanceTo(consumed, examined); + + public override void CancelPendingRead() + => OnCancelPendingRead(); + + public override void Complete(Exception exception = null) + => OnComplete(exception); + + public override ValueTask ReadAsync(CancellationToken cancellationToken = default) + => OnReadAsync(cancellationToken); + + public override bool TryRead(out ReadResult result) + { + ReadResult? r = OnTryRead(); + result = r.GetValueOrDefault(); + return r.HasValue; + } + + public override ValueTask CompleteAsync(Exception exception = null) + => OnCompleteAsync(exception); + } +} diff --git a/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipeWriter.cs b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipeWriter.cs new file mode 100644 index 0000000..7c35e19 --- /dev/null +++ b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipeWriter.cs @@ -0,0 +1,40 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.IO.Pipelines; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.Connections.Tests +{ + internal class MockPipeWriter : PipeWriter + { + public Action OnAdvance { get; set; } + public Action OnCancelPendingFlush { get; set; } + public Action OnComplete { get; set; } + public Func OnCompleteAsync { get; set; } + public Func> OnFlushAsync { get; set; } + public Func> OnGetMemory { get; set; } + + public override void Advance(int bytes) + => OnAdvance(bytes); + + public override void CancelPendingFlush() + => OnCancelPendingFlush(); + + public override void Complete(Exception exception = null) + => OnComplete(exception); + + public override ValueTask CompleteAsync(Exception exception = null) + => OnCompleteAsync(exception); + + public override ValueTask FlushAsync(CancellationToken cancellationToken = default) + => OnFlushAsync(cancellationToken); + + public override Memory GetMemory(int sizeHint = 0) + => OnGetMemory(sizeHint); + + public override Span GetSpan(int sizeHint = 0) + => GetMemory(sizeHint).Span; + } +} diff --git a/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockStream.cs b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockStream.cs new file mode 100644 index 0000000..50dbc31 --- /dev/null +++ b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockStream.cs @@ -0,0 +1,107 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.Connections.Tests +{ + internal class MockStream : Stream + { + public Func, CancellationToken, ValueTask> OnReadAsync { get; set; } + public Func, CancellationToken, ValueTask> OnWriteAsync { get; set; } + public Func OnFlushAsync { get; set; } + public Func OnDisposeAsync { get; set; } + + public override bool CanRead => true; + + public override bool CanSeek => false; + + public override bool CanWrite => true; + + public override long Length => throw new NotImplementedException(); + + public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } + + protected override void Dispose(bool disposing) + { + if (disposing) DisposeAsync().AsTask().GetAwaiter().GetResult(); + } + + public override ValueTask DisposeAsync() + { + return OnDisposeAsync(); + } + + public override void Flush() + { + FlushAsync().GetAwaiter().GetResult(); + } + + public override Task FlushAsync(CancellationToken cancellationToken) + { + return OnFlushAsync(cancellationToken); + } + + public override int Read(byte[] buffer, int offset, int count) + { + return ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); + } + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask(); + } + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + return OnReadAsync(buffer, cancellationToken); + } + + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state); + } + + public override int EndRead(IAsyncResult asyncResult) + { + return TaskToApm.End(asyncResult); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotImplementedException(); + } + + public override void SetLength(long value) + { + throw new NotImplementedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask(); + } + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + return OnWriteAsync(buffer, cancellationToken); + } + + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state); + } + + public override void EndWrite(IAsyncResult asyncResult) + { + TaskToApm.End(asyncResult); + } + } +} diff --git a/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/System.Net.Connections.Tests.csproj b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/System.Net.Connections.Tests.csproj new file mode 100644 index 0000000..9aa0112 --- /dev/null +++ b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/System.Net.Connections.Tests.csproj @@ -0,0 +1,19 @@ + + + + $(NetCoreAppCurrent) + + + + + + + + + + + + + + + diff --git a/src/libraries/System.Net.Http/ref/System.Net.Http.cs b/src/libraries/System.Net.Http/ref/System.Net.Http.cs index 3386d32..31db0e5 100644 --- a/src/libraries/System.Net.Http/ref/System.Net.Http.cs +++ b/src/libraries/System.Net.Http/ref/System.Net.Http.cs @@ -282,11 +282,21 @@ namespace System.Net.Http protected override System.Threading.Tasks.Task SerializeToStreamAsync(System.IO.Stream stream, System.Net.TransportContext? context, System.Threading.CancellationToken cancellationToken) { throw null; } protected internal override bool TryComputeLength(out long length) { throw null; } } + public partial class SocketsHttpConnectionFactory : System.Net.Connections.ConnectionFactory + { + public SocketsHttpConnectionFactory() { } + public sealed override System.Threading.Tasks.ValueTask ConnectAsync(System.Net.EndPoint? endPoint, System.Net.Connections.IConnectionProperties? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual System.Net.Sockets.Socket CreateSocket(System.Net.Http.HttpRequestMessage message, System.Net.EndPoint? endPoint, System.Net.Connections.IConnectionProperties options) { throw null; } + protected override void Dispose(bool disposing) { } + protected override System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; } + public virtual System.Threading.Tasks.ValueTask EstablishConnectionAsync(System.Net.Http.HttpRequestMessage message, System.Net.EndPoint? endPoint, System.Net.Connections.IConnectionProperties options, System.Threading.CancellationToken cancellationToken) { throw null; } + } public sealed partial class SocketsHttpHandler : System.Net.Http.HttpMessageHandler { public SocketsHttpHandler() { } public bool AllowAutoRedirect { get { throw null; } set { } } public System.Net.DecompressionMethods AutomaticDecompression { get { throw null; } set { } } + public System.Net.Connections.ConnectionFactory? ConnectionFactory { get { throw null; } set { } } public System.TimeSpan ConnectTimeout { get { throw null; } set { } } [System.Diagnostics.CodeAnalysis.AllowNullAttribute] public System.Net.CookieContainer CookieContainer { get { throw null; } set { } } @@ -297,6 +307,7 @@ namespace System.Net.Http public int MaxConnectionsPerServer { get { throw null; } set { } } public int MaxResponseDrainSize { get { throw null; } set { } } public int MaxResponseHeadersLength { get { throw null; } set { } } + public System.Func>? PlaintextFilter { get { throw null; } set { } } public System.TimeSpan PooledConnectionIdleTimeout { get { throw null; } set { } } public System.TimeSpan PooledConnectionLifetime { get { throw null; } set { } } public bool PreAuthenticate { get { throw null; } set { } } diff --git a/src/libraries/System.Net.Http/ref/System.Net.Http.csproj b/src/libraries/System.Net.Http/ref/System.Net.Http.csproj index 9f96fca..296b6c0 100644 --- a/src/libraries/System.Net.Http/ref/System.Net.Http.csproj +++ b/src/libraries/System.Net.Http/ref/System.Net.Http.csproj @@ -9,6 +9,8 @@ + + diff --git a/src/libraries/System.Net.Http/src/System.Net.Http.csproj b/src/libraries/System.Net.Http/src/System.Net.Http.csproj index 62a0334..5732676 100644 --- a/src/libraries/System.Net.Http/src/System.Net.Http.csproj +++ b/src/libraries/System.Net.Http/src/System.Net.Http.csproj @@ -169,6 +169,10 @@ + + + + + @@ -672,7 +677,9 @@ + + diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpConnectionFactory.cs b/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpConnectionFactory.cs new file mode 100644 index 0000000..664de02 --- /dev/null +++ b/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpConnectionFactory.cs @@ -0,0 +1,19 @@ +using System.Threading; +using System.Threading.Tasks; +using System.Net.Connections; +using System.Net.Sockets; + +namespace System.Net.Http +{ + public class SocketsHttpConnectionFactory : ConnectionFactory + { + public sealed override ValueTask ConnectAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default) + => throw new NotImplementedException(); + + public virtual Socket CreateSocket(HttpRequestMessage message, EndPoint? endPoint, IConnectionProperties options) + => throw new NotImplementedException(); + + public virtual ValueTask EstablishConnectionAsync(HttpRequestMessage message, EndPoint? endPoint, IConnectionProperties options, CancellationToken cancellationToken) + => throw new NotImplementedException(); + } +} diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpHandler.cs b/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpHandler.cs index 4644b8b..657d003 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpHandler.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpHandler.cs @@ -6,6 +6,7 @@ using System.Net.Security; using System.Threading; using System.Threading.Tasks; using System.Diagnostics.CodeAnalysis; +using System.Net.Connections; namespace System.Net.Http { @@ -127,6 +128,18 @@ namespace System.Net.Http set => throw new PlatformNotSupportedException(); } + public ConnectionFactory? ConnectionFactory + { + get => throw new PlatformNotSupportedException(); + set => throw new PlatformNotSupportedException(); + } + + public Func>? PlaintextFilter + { + get => throw new PlatformNotSupportedException(); + set => throw new PlatformNotSupportedException(); + } + public IDictionary Properties => throw new PlatformNotSupportedException(); protected internal override Task SendAsync( diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectHelper.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectHelper.cs index 48d530c..467e813 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectHelper.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectHelper.cs @@ -3,11 +3,10 @@ using System.Diagnostics; using System.IO; +using System.Net.Connections; using System.Net.Quic; using System.Net.Security; using System.Net.Sockets; -using System.Runtime.CompilerServices; -using System.Runtime.ExceptionServices; using System.Security.Cryptography.X509Certificates; using System.Threading; using System.Threading.Tasks; @@ -33,58 +32,23 @@ namespace System.Net.Http } } - public static ValueTask ConnectAsync(string host, int port, bool async, CancellationToken cancellationToken) + public static async ValueTask ConnectAsync(ConnectionFactory factory, DnsEndPoint endPoint, IConnectionProperties? options, CancellationToken cancellationToken) { - return async ? ConnectAsync(host, port, cancellationToken) : new ValueTask(Connect(host, port, cancellationToken)); - } - - private static async ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken) - { - // Rather than creating a new Socket and calling ConnectAsync on it, we use the static - // Socket.ConnectAsync with a SocketAsyncEventArgs, as we can then use Socket.CancelConnectAsync - // to cancel it if needed. - var saea = new ConnectEventArgs(); try { - saea.Initialize(cancellationToken); - - // Configure which server to which to connect. - saea.RemoteEndPoint = new DnsEndPoint(host, port); - - // Initiate the connection. - if (Socket.ConnectAsync(SocketType.Stream, ProtocolType.Tcp, saea)) - { - // Connect completing asynchronously. Enable it to be canceled and wait for it. - using (cancellationToken.UnsafeRegister(static s => Socket.CancelConnectAsync((SocketAsyncEventArgs)s!), saea)) - { - await saea.Builder.Task.ConfigureAwait(false); - } - } - else if (saea.SocketError != SocketError.Success) - { - // Connect completed synchronously but unsuccessfully. - throw new SocketException((int)saea.SocketError); - } - - Debug.Assert(saea.SocketError == SocketError.Success, $"Expected Success, got {saea.SocketError}."); - Debug.Assert(saea.ConnectSocket != null, "Expected non-null socket"); - - // Configure the socket and return a stream for it. - Socket socket = saea.ConnectSocket; - socket.NoDelay = true; - return new NetworkStream(socket, ownsSocket: true); + return await factory.ConnectAsync(endPoint, options, cancellationToken).ConfigureAwait(false); } - catch (Exception error) when (!(error is OperationCanceledException)) + catch (OperationCanceledException ex) when (ex.CancellationToken == cancellationToken) { - throw CreateWrappedException(error, host, port, cancellationToken); + throw CancellationHelper.CreateOperationCanceledException(innerException: null, cancellationToken); } - finally + catch (Exception ex) { - saea.Dispose(); + throw CreateWrappedException(ex, endPoint.Host, endPoint.Port, cancellationToken); } } - private static Stream Connect(string host, int port, CancellationToken cancellationToken) + public static Connection Connect(string host, int port, CancellationToken cancellationToken) { // For synchronous connections, we can just create a socket and make the connection. cancellationToken.ThrowIfCancellationRequested(); @@ -96,59 +60,14 @@ namespace System.Net.Http { socket.Connect(new DnsEndPoint(host, port)); } - - return new NetworkStream(socket, ownsSocket: true); } catch (Exception e) { socket.Dispose(); throw CreateWrappedException(e, host, port, cancellationToken); } - } - - /// SocketAsyncEventArgs that carries with it additional state for a Task builder and a CancellationToken. - private sealed class ConnectEventArgs : SocketAsyncEventArgs - { - internal ConnectEventArgs() : - // The OnCompleted callback serves just to complete a task that's awaited in ConnectAsync, - // so we don't need to also flow ExecutionContext again into the OnCompleted callback. - base(unsafeSuppressExecutionContextFlow: true) - { - } - - public AsyncTaskMethodBuilder Builder { get; private set; } - public CancellationToken CancellationToken { get; private set; } - public void Initialize(CancellationToken cancellationToken) - { - CancellationToken = cancellationToken; - AsyncTaskMethodBuilder b = default; - _ = b.Task; // force initialization - Builder = b; - } - - protected override void OnCompleted(SocketAsyncEventArgs _) - { - switch (SocketError) - { - case SocketError.Success: - Builder.SetResult(); - break; - - case SocketError.OperationAborted: - case SocketError.ConnectionAborted: - if (CancellationToken.IsCancellationRequested) - { - Builder.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(CancellationHelper.CreateOperationCanceledException(null, CancellationToken))); - break; - } - goto default; - - default: - Builder.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(new SocketException((int)SocketError))); - break; - } - } + return new SocketConnection(socket); } public static ValueTask EstablishSslConnectionAsync(SslClientAuthenticationOptions sslOptions, HttpRequestMessage request, bool async, Stream stream, CancellationToken cancellationToken) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Connections/SocketConnection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Connections/SocketConnection.cs new file mode 100644 index 0000000..37b6f83 --- /dev/null +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Connections/SocketConnection.cs @@ -0,0 +1,101 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics.CodeAnalysis; +using System.IO; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.Connections +{ + internal sealed class SocketConnection : Connection, IConnectionProperties + { + private readonly SocketConnectionNetworkStream _stream; + + public override EndPoint? RemoteEndPoint => _stream.Socket.RemoteEndPoint; + public override EndPoint? LocalEndPoint => _stream.Socket.LocalEndPoint; + public override IConnectionProperties ConnectionProperties => this; + + public SocketConnection(Socket socket) + { + _stream = new SocketConnectionNetworkStream(socket, this); + } + + protected override ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return ValueTask.FromCanceled(cancellationToken); + } + + try + { + if (method != ConnectionCloseMethod.GracefulShutdown) + { + // Dispose must be called first in order to cause a connection reset, + // as NetworkStream.Dispose() will call Shutdown(Both). + _stream.Socket.Dispose(); + } + + _stream.DisposeWithoutClosingConnection(); + } + catch (Exception ex) + { + return ValueTask.FromException(ex); + } + + return default; + } + + protected override Stream CreateStream() => _stream; + + bool IConnectionProperties.TryGet(Type propertyKey, [NotNullWhen(true)] out object? property) + { + if (propertyKey == typeof(Socket)) + { + property = _stream.Socket; + return true; + } + + property = null; + return false; + } + + // This is done to couple disposal of the SocketConnection and the NetworkStream. + private sealed class SocketConnectionNetworkStream : NetworkStream + { + private readonly SocketConnection _connection; + + public SocketConnectionNetworkStream(Socket socket, SocketConnection connection) : base(socket, ownsSocket: true) + { + _connection = connection; + } + + public void DisposeWithoutClosingConnection() + { + base.Dispose(true); + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + // This will call base.Dispose(). + _connection.Dispose(); + } + else + { + base.Dispose(disposing); + } + } + + public override ValueTask DisposeAsync() + { + // This will call base.Dispose(). + Dispose(true); + return default; + } + } + } +} diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Connections/TaskSocketAsyncEventArgs.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Connections/TaskSocketAsyncEventArgs.cs new file mode 100644 index 0000000..7bf8947 --- /dev/null +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Connections/TaskSocketAsyncEventArgs.cs @@ -0,0 +1,32 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Net.Sockets; +using System.Threading.Tasks; +using System.Threading.Tasks.Sources; + +namespace System.Net.Connections +{ + internal sealed class TaskSocketAsyncEventArgs : SocketAsyncEventArgs, IValueTaskSource + { + private ManualResetValueTaskSourceCore _valueTaskSource; + + public void ResetTask() => _valueTaskSource.Reset(); + public ValueTask Task => new ValueTask(this, _valueTaskSource.Version); + + public void GetResult(short token) => _valueTaskSource.GetResult(token); + public ValueTaskSourceStatus GetStatus(short token) => _valueTaskSource.GetStatus(token); + public void OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _valueTaskSource.OnCompleted(continuation, state, token, flags); + public void Complete() => _valueTaskSource.SetResult(0); + + public TaskSocketAsyncEventArgs() + : base(unsafeSuppressExecutionContextFlow: true) + { + } + + protected override void OnCompleted(SocketAsyncEventArgs e) + { + _valueTaskSource.SetResult(0); + } + } +} diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/DnsEndPointWithProperties.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/DnsEndPointWithProperties.cs new file mode 100644 index 0000000..b99801c --- /dev/null +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/DnsEndPointWithProperties.cs @@ -0,0 +1,31 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics.CodeAnalysis; +using System.Net.Connections; + +namespace System.Net.Http +{ + // Passed to a connection factory, merges allocations for the DnsEndPoint and connection properties. + internal sealed class DnsEndPointWithProperties : DnsEndPoint, IConnectionProperties + { + public HttpRequestMessage InitialRequest { get; } + + public DnsEndPointWithProperties(string host, int port, HttpRequestMessage initialRequest) : base(host, port) + { + InitialRequest = initialRequest; + } + + bool IConnectionProperties.TryGet(Type propertyKey, [NotNullWhen(true)] out object? property) + { + if (propertyKey == typeof(DnsEndPointWithProperties)) + { + property = this; + return true; + } + + property = null; + return false; + } + } +} diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs index b21fd4a..1bd39f4 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs @@ -6,6 +6,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.IO; +using System.Net.Connections; using System.Net.Http.Headers; using System.Net.Http.HPack; using System.Net.Security; @@ -21,6 +22,7 @@ namespace System.Net.Http { private readonly HttpConnectionPool _pool; private readonly Stream _stream; + private readonly Connection _connection; // NOTE: These are mutable structs; do not make these readonly. private ArrayBuffer _incomingBuffer; @@ -94,10 +96,11 @@ namespace System.Net.Http // Channel options for creating _writeChannel private static readonly UnboundedChannelOptions s_channelOptions = new UnboundedChannelOptions() { SingleReader = true }; - public Http2Connection(HttpConnectionPool pool, Stream stream) + public Http2Connection(HttpConnectionPool pool, Connection connection) { _pool = pool; - _stream = stream; + _stream = connection.Stream; + _connection = connection; _incomingBuffer = new ArrayBuffer(InitialConnectionBufferSize); _outgoingBuffer = new ArrayBuffer(InitialConnectionBufferSize); @@ -116,7 +119,7 @@ namespace System.Net.Http _pendingWindowUpdate = 0; _idleSinceTickCount = Environment.TickCount64; - if (NetEventSource.Log.IsEnabled()) TraceConnection(stream); + if (NetEventSource.Log.IsEnabled()) TraceConnection(_stream); } private object SyncObject => _httpStreams; @@ -1585,7 +1588,7 @@ namespace System.Net.Http } // Do shutdown. - _stream.Close(); + _connection.Dispose(); _connectionWindow.Dispose(); _concurrentStreams.Dispose(); diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs index 780212d..33383a1 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs @@ -10,6 +10,7 @@ using System.IO; using System.Net.Http.Headers; using System.Net.Security; using System.Net.Sockets; +using System.Net.Connections; using System.Runtime.CompilerServices; using System.Text; using System.Threading; @@ -45,6 +46,7 @@ namespace System.Net.Http private readonly HttpConnectionPool _pool; private readonly Socket? _socket; // used for polling; _stream should be used for all reading/writing. _stream owns disposal. private readonly Stream _stream; + private readonly Connection _connection; private readonly TransportContext? _transportContext; private readonly WeakReference _weakThisRef; @@ -68,16 +70,16 @@ namespace System.Net.Http public HttpConnection( HttpConnectionPool pool, - Socket? socket, - Stream stream, + Connection connection, TransportContext? transportContext) { Debug.Assert(pool != null); - Debug.Assert(stream != null); + Debug.Assert(connection != null); _pool = pool; - _socket = socket; // may be null in cases where we couldn't easily get the underlying socket - _stream = stream; + connection.ConnectionProperties.TryGet(out _socket); // may be null in cases where we couldn't easily get the underlying socket + _stream = connection.Stream; + _connection = connection; _transportContext = transportContext; _writeBuffer = new byte[InitialWriteBufferSize]; @@ -101,7 +103,7 @@ namespace System.Net.Http if (disposing) { GC.SuppressFinalize(this); - _stream.Dispose(); + _connection.Dispose(); // Eat any exceptions from the read-ahead task. We don't need to log, as we expect // failures from this task due to closing the connection while a read is in progress. @@ -1917,7 +1919,7 @@ namespace System.Net.Http internal sealed class HttpConnectionWithFinalizer : HttpConnection { - public HttpConnectionWithFinalizer(HttpConnectionPool pool, Socket? socket, Stream stream, TransportContext? transportContext) : base(pool, socket, stream, transportContext) { } + public HttpConnectionWithFinalizer(HttpConnectionPool pool, Connection connection, TransportContext? transportContext) : base(pool, connection, transportContext) { } // This class is separated from HttpConnection so we only pay the price of having a finalizer // when it's actually needed, e.g. when MaxConnectionsPerServer is enabled. diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs index 072c9c2..af9e2cd 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs @@ -6,6 +6,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Globalization; using System.IO; +using System.Net.Connections; using System.Net.Http.Headers; using System.Net.Http.HPack; using System.Net.Http.QPack; @@ -509,7 +510,7 @@ namespace System.Net.Http } // Try to establish an HTTP2 connection - Socket? socket = null; + Connection? connection = null; SslStream? sslStream = null; TransportContext? transportContext = null; @@ -531,18 +532,28 @@ namespace System.Net.Http Trace("Attempting new HTTP2 connection."); } - Stream? stream; HttpResponseMessage? failureResponse; - (socket, stream, transportContext, failureResponse) = + + (connection, transportContext, failureResponse) = await ConnectAsync(request, async, true, cancellationToken).ConfigureAwait(false); + if (failureResponse != null) { return (null, true, failureResponse); } + Debug.Assert(connection != null); + + sslStream = connection.Stream as SslStream; + + if (Settings._plaintextFilter != null) + { + connection = await Settings._plaintextFilter(request, connection, cancellationToken).ConfigureAwait(false); + } + if (_kind == HttpConnectionKind.Http) { - http2Connection = new Http2Connection(this, stream!); + http2Connection = new Http2Connection(this, connection); await http2Connection.SetupAsync().ConfigureAwait(false); AddHttp2Connection(http2Connection); @@ -555,7 +566,8 @@ namespace System.Net.Http return (http2Connection, true, null); } - sslStream = (SslStream)stream!; + Debug.Assert(sslStream != null); + if (sslStream.NegotiatedApplicationProtocol == SslApplicationProtocol.Http2) { // The server accepted our request for HTTP2. @@ -565,7 +577,7 @@ namespace System.Net.Http throw new HttpRequestException(SR.Format(SR.net_ssl_http2_requires_tls12, sslStream.SslProtocol)); } - http2Connection = new Http2Connection(this, sslStream); + http2Connection = new Http2Connection(this, connection); await http2Connection.SetupAsync().ConfigureAwait(false); AddHttp2Connection(http2Connection); @@ -616,7 +628,7 @@ namespace System.Net.Http if (canUse) { - return (ConstructHttp11Connection(socket, sslStream, transportContext), true, null); + return (ConstructHttp11Connection(connection!, transportContext), true, null); } else { @@ -625,7 +637,7 @@ namespace System.Net.Http Trace("Discarding downgraded HTTP/1.1 connection because connection limit is exceeded"); } - sslStream.Close(); + await connection!.CloseAsync(ConnectionCloseMethod.GracefulShutdown, cancellationToken).ConfigureAwait(false); } } @@ -1124,7 +1136,7 @@ namespace System.Net.Http return SendWithProxyAuthAsync(request, async, doRequestAuth, cancellationToken); } - private async ValueTask<(Socket?, Stream?, TransportContext?, HttpResponseMessage?)> ConnectAsync(HttpRequestMessage request, bool async, bool allowHttp2, CancellationToken cancellationToken) + private async ValueTask<(Connection?, TransportContext?, HttpResponseMessage?)> ConnectAsync(HttpRequestMessage request, bool async, bool allowHttp2, CancellationToken cancellationToken) { // If a non-infinite connect timeout has been set, create and use a new CancellationToken that will be canceled // when either the original token is canceled or a connect timeout occurs. @@ -1138,44 +1150,44 @@ namespace System.Net.Http try { - Stream? stream = null; + Connection? connection = null; switch (_kind) { case HttpConnectionKind.Http: case HttpConnectionKind.Https: case HttpConnectionKind.ProxyConnect: Debug.Assert(_originAuthority != null); - stream = await ConnectHelper.ConnectAsync(_originAuthority.IdnHost, _originAuthority.Port, async, cancellationToken).ConfigureAwait(false); + connection = await ConnectToTcpHostAsync(_originAuthority.IdnHost, _originAuthority.Port, request, async, cancellationToken).ConfigureAwait(false); break; case HttpConnectionKind.Proxy: - stream = await ConnectHelper.ConnectAsync(_proxyUri!.IdnHost, _proxyUri.Port, async, cancellationToken).ConfigureAwait(false); + connection = await ConnectToTcpHostAsync(_proxyUri!.IdnHost, _proxyUri.Port, request, async, cancellationToken).ConfigureAwait(false); break; case HttpConnectionKind.ProxyTunnel: case HttpConnectionKind.SslProxyTunnel: HttpResponseMessage? response; - (stream, response) = await EstablishProxyTunnel(async, request.HasHeaders ? request.Headers : null, cancellationToken).ConfigureAwait(false); + (connection, response) = await EstablishProxyTunnel(async, request.HasHeaders ? request.Headers : null, cancellationToken).ConfigureAwait(false); if (response != null) { // Return non-success response from proxy. response.RequestMessage = request; - return (null, null, null, response); + return (null, null, response); } break; } - Socket? socket = (stream as NetworkStream)?.Socket; + Debug.Assert(connection != null); TransportContext? transportContext = null; if (_kind == HttpConnectionKind.Https || _kind == HttpConnectionKind.SslProxyTunnel) { - SslStream sslStream = await ConnectHelper.EstablishSslConnectionAsync(allowHttp2 ? _sslOptionsHttp2! : _sslOptionsHttp11!, request, async, stream!, cancellationToken).ConfigureAwait(false); - stream = sslStream; + SslStream sslStream = await ConnectHelper.EstablishSslConnectionAsync(allowHttp2 ? _sslOptionsHttp2! : _sslOptionsHttp11!, request, async, connection.Stream, cancellationToken).ConfigureAwait(false); + connection = Connection.FromStream(sslStream, leaveOpen: false, connection.ConnectionProperties, connection.LocalEndPoint, connection.RemoteEndPoint); transportContext = sslStream.TransportContext; } - return (socket, stream, transportContext, null); + return (connection, transportContext, null); } finally { @@ -1183,9 +1195,37 @@ namespace System.Net.Http } } + private ValueTask ConnectToTcpHostAsync(string host, int port, HttpRequestMessage initialRequest, bool async, CancellationToken cancellationToken) + { + if (async) + { + ConnectionFactory connectionFactory = Settings._connectionFactory ?? SocketsHttpConnectionFactory.Default; + + var endPoint = new DnsEndPointWithProperties(host, port, initialRequest); + return ConnectHelper.ConnectAsync(connectionFactory, endPoint, endPoint, cancellationToken); + } + + // Synchronous path. + + if (Settings._connectionFactory != null) + { + // connection factories only support async. + throw new HttpRequestException(); + } + + try + { + return new ValueTask(ConnectHelper.Connect(host, port, cancellationToken)); + } + catch (Exception ex) + { + return ValueTask.FromException(ex); + } + } + internal async ValueTask<(HttpConnection?, HttpResponseMessage?)> CreateHttp11ConnectionAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken) { - (Socket? socket, Stream? stream, TransportContext? transportContext, HttpResponseMessage? failureResponse) = + (Connection? connection, TransportContext? transportContext, HttpResponseMessage? failureResponse) = await ConnectAsync(request, async, false, cancellationToken).ConfigureAwait(false); if (failureResponse != null) @@ -1193,18 +1233,18 @@ namespace System.Net.Http return (null, failureResponse); } - return (ConstructHttp11Connection(socket, stream!, transportContext), null); + return (ConstructHttp11Connection(connection!, transportContext), null); } - private HttpConnection ConstructHttp11Connection(Socket? socket, Stream stream, TransportContext? transportContext) + private HttpConnection ConstructHttp11Connection(Connection connection, TransportContext? transportContext) { return _maxConnections == int.MaxValue ? - new HttpConnection(this, socket, stream, transportContext) : - new HttpConnectionWithFinalizer(this, socket, stream, transportContext); // finalizer needed to signal the pool when a connection is dropped + new HttpConnection(this, connection, transportContext) : + new HttpConnectionWithFinalizer(this, connection, transportContext); // finalizer needed to signal the pool when a connection is dropped } // Returns the established stream or an HttpResponseMessage from the proxy indicating failure. - private async ValueTask<(Stream?, HttpResponseMessage?)> EstablishProxyTunnel(bool async, HttpRequestHeaders? headers, CancellationToken cancellationToken) + private async ValueTask<(Connection?, HttpResponseMessage?)> EstablishProxyTunnel(bool async, HttpRequestHeaders? headers, CancellationToken cancellationToken) { Debug.Assert(_originAuthority != null); // Send a CONNECT request to the proxy server to establish a tunnel. @@ -1223,7 +1263,12 @@ namespace System.Net.Http return (null, tunnelResponse); } - return (tunnelResponse.Content!.ReadAsStream(cancellationToken), null); + Stream stream = tunnelResponse.Content.ReadAsStream(cancellationToken); + EndPoint remoteEndPoint = new DnsEndPoint(_originAuthority.IdnHost, _originAuthority.Port); + + // TODO: the Socket from the response can be funneled into a connection property here. + + return (Connection.FromStream(stream, remoteEndPoint: remoteEndPoint), null); } /// Enqueues a waiter to the waiters list. diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionSettings.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionSettings.cs index 4ba7696..558e6cf 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionSettings.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionSettings.cs @@ -2,7 +2,10 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Collections.Generic; +using System.Net.Connections; using System.Net.Security; +using System.Threading; +using System.Threading.Tasks; namespace System.Net.Http { @@ -54,6 +57,9 @@ namespace System.Net.Http internal bool _enableMultipleHttp2Connections; + internal ConnectionFactory? _connectionFactory; + internal Func>? _plaintextFilter; + internal IDictionary? _properties; public HttpConnectionSettings() @@ -104,7 +110,9 @@ namespace System.Net.Http _useProxy = _useProxy, _allowUnencryptedHttp2 = _allowUnencryptedHttp2, _assumePrenegotiatedHttp3ForTesting = _assumePrenegotiatedHttp3ForTesting, - _enableMultipleHttp2Connections = _enableMultipleHttp2Connections + _enableMultipleHttp2Connections = _enableMultipleHttp2Connections, + _connectionFactory = _connectionFactory, + _plaintextFilter = _plaintextFilter }; } diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpConnectionFactory.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpConnectionFactory.cs new file mode 100644 index 0000000..e224616 --- /dev/null +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpConnectionFactory.cs @@ -0,0 +1,89 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Net.Connections; +using System.Net.Sockets; +using System.Runtime.ExceptionServices; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Net.Http +{ + /// + /// The default connection factory used by , opening TCP connections. + /// + public class SocketsHttpConnectionFactory : ConnectionFactory + { + internal static SocketsHttpConnectionFactory Default { get; } = new SocketsHttpConnectionFactory(); + + /// + public sealed override ValueTask ConnectAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default) + { + if (options == null || !options.TryGet(out DnsEndPointWithProperties? httpOptions)) + { + return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(new HttpRequestException($"{nameof(SocketsHttpConnectionFactory)} requires a {nameof(DnsEndPointWithProperties)} property."))); + } + + return EstablishConnectionAsync(httpOptions!.InitialRequest, endPoint, options, cancellationToken); + } + + /// + /// Creates the socket to be used for a request. + /// + /// The request causing this socket to be opened. Once opened, it may be reused for many subsequent requests. + /// The EndPoint this socket will be connected to. + /// Properties, if any, that might change how the socket is initialized. + /// A new unconnected socket. + public virtual Socket CreateSocket(HttpRequestMessage message, EndPoint? endPoint, IConnectionProperties options) + { + return new Socket(SocketType.Stream, ProtocolType.Tcp); + } + + /// + /// Establishes a new connection for a request. + /// + /// The request causing this connection to be established. Once connected, it may be reused for many subsequent requests. + /// The EndPoint to connect to. + /// Properties, if any, that might change how the connection is made. + /// A cancellation token for the asynchronous operation. + /// A new open connection. + public virtual async ValueTask EstablishConnectionAsync(HttpRequestMessage message, EndPoint? endPoint, IConnectionProperties options, CancellationToken cancellationToken) + { + if (message == null) throw new ArgumentNullException(nameof(message)); + if (endPoint == null) throw new ArgumentNullException(nameof(endPoint)); + + Socket socket = CreateSocket(message, endPoint, options); + + try + { + using var args = new TaskSocketAsyncEventArgs(); + args.RemoteEndPoint = endPoint; + + if (socket.ConnectAsync(args)) + { + using (cancellationToken.UnsafeRegister(o => Socket.CancelConnectAsync((SocketAsyncEventArgs)o!), args)) + { + await args.Task.ConfigureAwait(false); + } + } + + if (args.SocketError != SocketError.Success) + { + Exception ex = args.SocketError == SocketError.OperationAborted && cancellationToken.IsCancellationRequested + ? (Exception)new OperationCanceledException(cancellationToken) + : new SocketException((int)args.SocketError); + + throw ex; + } + + socket.NoDelay = true; + return new SocketConnection(socket); + } + catch + { + socket.Dispose(); + throw; + } + } + } +} diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpHandler.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpHandler.cs index 1874723..de391e9 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpHandler.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpHandler.cs @@ -7,6 +7,7 @@ using System.Net.Security; using System.Threading; using System.Threading.Tasks; using System.Diagnostics.CodeAnalysis; +using System.Net.Connections; namespace System.Net.Http { @@ -288,6 +289,33 @@ namespace System.Net.Http internal bool SupportsProxy => true; internal bool SupportsRedirectConfiguration => true; + /// + /// When non-null, a custom factory used to open new TCP connections. + /// When null, a will be used. + /// + public ConnectionFactory? ConnectionFactory + { + get => _settings._connectionFactory; + set + { + CheckDisposedOrStarted(); + _settings._connectionFactory = value; + } + } + + /// + /// When non-null, a connection filter that is applied prior to any TLS encryption. + /// + public Func>? PlaintextFilter + { + get => _settings._plaintextFilter; + set + { + CheckDisposedOrStarted(); + _settings._plaintextFilter = value; + } + } + public IDictionary Properties => _settings._properties ?? (_settings._properties = new Dictionary()); diff --git a/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.cs b/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.cs index 3a4aef6..4328b0a 100644 --- a/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.cs +++ b/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; +using System.Net.Connections; using System.Net.Quic; using System.Net.Security; using System.Net.Sockets; @@ -105,6 +106,71 @@ namespace System.Net.Http.Functional.Tests } } + public class SocketsHttpHandler_ConnectionFactoryTest : HttpClientHandlerTestBase + { + public SocketsHttpHandler_ConnectionFactoryTest(ITestOutputHelper output) : base(output) { } + + [Fact] + public async Task CustomConnectionFactory_AsyncRequest_Success() + { + await using ConnectionListenerFactory listenerFactory = new VirtualNetworkConnectionListenerFactory(); + await using ConnectionListener listener = await listenerFactory.BindAsync(endPoint: null); + await using ConnectionFactory connectionFactory = VirtualNetworkConnectionListenerFactory.GetConnectionFactory(listener); + + // TODO: if GenericLoopbackOptions actually worked for HTTP/1 LoopbackServer we could just use that and pass in to CreateConnectionAsync. + // Making that work causes other tests to fail, so for now... + bool useHttps = UseVersion.Major >= 2 && new GenericLoopbackOptions().UseSsl; + + Task serverTask = Task.Run(async () => + { + await using Connection serverConnection = await listener.AcceptAsync(); + using GenericLoopbackConnection loopbackConnection = await LoopbackServerFactory.CreateConnectionAsync(socket: null, serverConnection.Stream); + + await loopbackConnection.InitializeConnectionAsync(); + + HttpRequestData requestData = await loopbackConnection.ReadRequestDataAsync(); + await loopbackConnection.SendResponseAsync(content: "foo"); + + Assert.Equal("/foo", requestData.Path); + }); + + Task clientTask = Task.Run(async () => + { + using HttpClientHandler handler = CreateHttpClientHandler(); + + var socketsHandler = (SocketsHttpHandler)GetUnderlyingSocketsHttpHandler(handler); + socketsHandler.ConnectionFactory = connectionFactory; + + using HttpClient client = CreateHttpClient(handler); + + string response = await client.GetStringAsync($"{(useHttps ? "https" : "http")}://{Guid.NewGuid():N}.com/foo"); + Assert.Equal("foo", response); + }); + + await new[] { serverTask, clientTask }.WhenAllOrAnyFailed(60_000); + } + + [Fact] + public async Task CustomConnectionFactory_SyncRequest_Fails() + { + await using ConnectionFactory connectionFactory = new SocketsHttpConnectionFactory(); + using SocketsHttpHandler handler = new SocketsHttpHandler + { + ConnectionFactory = connectionFactory + }; + + using HttpClient client = CreateHttpClient(handler); + + await Assert.ThrowsAnyAsync(() => client.GetStringAsync($"http://{Guid.NewGuid():N}.com/foo")); + } + } + + public sealed class SocketsHttpHandler_ConnectionFactoryTest_Http2 : SocketsHttpHandler_ConnectionFactoryTest + { + public SocketsHttpHandler_ConnectionFactoryTest_Http2(ITestOutputHelper output) : base(output) { } + protected override Version UseVersion => HttpVersion.Version20; + } + public sealed class SocketsHttpHandler_HttpProtocolTests : HttpProtocolTests { public SocketsHttpHandler_HttpProtocolTests(ITestOutputHelper output) : base(output) { } diff --git a/src/libraries/System.Net.Http/tests/FunctionalTests/System.Net.Http.Functional.Tests.csproj b/src/libraries/System.Net.Http/tests/FunctionalTests/System.Net.Http.Functional.Tests.csproj index 2e00a89..bdf9887 100644 --- a/src/libraries/System.Net.Http/tests/FunctionalTests/System.Net.Http.Functional.Tests.csproj +++ b/src/libraries/System.Net.Http/tests/FunctionalTests/System.Net.Http.Functional.Tests.csproj @@ -230,6 +230,9 @@ Link="Common\System\Net\Http\ThrowingContent.cs" /> + + + diff --git a/src/libraries/pkg/baseline/packageIndex.json b/src/libraries/pkg/baseline/packageIndex.json index 2c2fd22..5ab3eb9 100644 --- a/src/libraries/pkg/baseline/packageIndex.json +++ b/src/libraries/pkg/baseline/packageIndex.json @@ -3353,7 +3353,9 @@ "4.6.0" ], "BaselineVersion": "5.0.0", - "InboxOn": {}, + "InboxOn": { + "net5.0": "5.0.0.0" + }, "AssemblyVersionInPackageVersion": { "4.0.0.0": "4.5.0", "4.0.0.1": "4.5.2", -- 2.7.4