Initial (partially-reviewed API) System.Net.Connections. (#39524)
authorCory Nelson <phrosty@gmail.com>
Tue, 28 Jul 2020 15:06:57 +0000 (08:06 -0700)
committerGitHub <noreply@github.com>
Tue, 28 Jul 2020 15:06:57 +0000 (08:06 -0700)
55 files changed:
src/libraries/Common/tests/System/Net/Http/GenericLoopbackServer.cs
src/libraries/Common/tests/System/Net/Http/Http2LoopbackConnection.cs
src/libraries/Common/tests/System/Net/Http/Http2LoopbackServer.cs
src/libraries/Common/tests/System/Net/Http/Http3LoopbackConnection.cs
src/libraries/Common/tests/System/Net/Http/Http3LoopbackServer.cs
src/libraries/Common/tests/System/Net/Http/LoopbackServer.cs
src/libraries/Common/tests/System/Net/VirtualNetwork/VirtualNetwork.cs
src/libraries/Common/tests/System/Net/VirtualNetwork/VirtualNetworkConnectionListenerFactory.cs [new file with mode: 0644]
src/libraries/Common/tests/System/Net/VirtualNetwork/VirtualNetworkStream.cs
src/libraries/Common/tests/System/Threading/Tasks/TaskTimeoutExtensions.cs
src/libraries/NetCoreAppLibrary.props
src/libraries/System.IO.Pipelines/pkg/System.IO.Pipelines.pkgproj
src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.csproj
src/libraries/System.Net.Connections/Directory.Build.props [new file with mode: 0644]
src/libraries/System.Net.Connections/System.Net.Connections.sln [new file with mode: 0644]
src/libraries/System.Net.Connections/ref/System.Net.Connections.cs [new file with mode: 0644]
src/libraries/System.Net.Connections/ref/System.Net.Connections.csproj [new file with mode: 0644]
src/libraries/System.Net.Connections/src/Resources/Strings.resx [new file with mode: 0644]
src/libraries/System.Net.Connections/src/System.Net.Connections.csproj [new file with mode: 0644]
src/libraries/System.Net.Connections/src/System/Net/Connections/Connection.cs [new file with mode: 0644]
src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionBase.cs [new file with mode: 0644]
src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionCloseMethod.cs [new file with mode: 0644]
src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionExtensions.cs [new file with mode: 0644]
src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionFactory.cs [new file with mode: 0644]
src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionListener.cs [new file with mode: 0644]
src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionListenerFactory.cs [new file with mode: 0644]
src/libraries/System.Net.Connections/src/System/Net/Connections/DuplexPipeStream.cs [new file with mode: 0644]
src/libraries/System.Net.Connections/src/System/Net/Connections/IConnectionProperties.cs [new file with mode: 0644]
src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionBaseTest.cs [new file with mode: 0644]
src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionTest.cs [new file with mode: 0644]
src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionWithoutStreamOrPipe.cs [new file with mode: 0644]
src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockConnection.cs [new file with mode: 0644]
src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipe.cs [new file with mode: 0644]
src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipeReader.cs [new file with mode: 0644]
src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipeWriter.cs [new file with mode: 0644]
src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockStream.cs [new file with mode: 0644]
src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/System.Net.Connections.Tests.csproj [new file with mode: 0644]
src/libraries/System.Net.Http/ref/System.Net.Http.cs
src/libraries/System.Net.Http/ref/System.Net.Http.csproj
src/libraries/System.Net.Http/src/System.Net.Http.csproj
src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpConnectionFactory.cs [new file with mode: 0644]
src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpHandler.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/ConnectHelper.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Connections/SocketConnection.cs [new file with mode: 0644]
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Connections/TaskSocketAsyncEventArgs.cs [new file with mode: 0644]
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/DnsEndPointWithProperties.cs [new file with mode: 0644]
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionSettings.cs
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpConnectionFactory.cs [new file with mode: 0644]
src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpHandler.cs
src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.cs
src/libraries/System.Net.Http/tests/FunctionalTests/System.Net.Http.Functional.Tests.csproj
src/libraries/pkg/baseline/packageIndex.json

index fd12e23..d15ab88 100644 (file)
@@ -6,6 +6,8 @@ using System.Linq;
 using System.Text;
 using System.Threading.Tasks;
 using System.Security.Authentication;
+using System.IO;
+using System.Net.Sockets;
 
 namespace System.Net.Test.Common
 {
@@ -17,6 +19,8 @@ namespace System.Net.Test.Common
         public abstract GenericLoopbackServer CreateServer(GenericLoopbackOptions options = null);
         public abstract Task CreateServerAsync(Func<GenericLoopbackServer, Uri, Task> funcAsync, int millisecondsTimeout = 60_000, GenericLoopbackOptions options = null);
 
+        public abstract Task<GenericLoopbackConnection> CreateConnectionAsync(Socket socket, Stream stream, GenericLoopbackOptions options = null);
+
         public abstract Version Version { get; }
 
         // Common helper methods
@@ -58,6 +62,8 @@ namespace System.Net.Test.Common
     {
         public abstract void Dispose();
 
+        public abstract Task InitializeConnectionAsync();
+
         /// <summary>Read request Headers and optionally request body as well.</summary>
         public abstract Task<HttpRequestData> ReadRequestDataAsync(bool readBody = true);
         /// <summary>Read complete request body if not done by ReadRequestData.</summary>
index 3bc2097..ad60410 100644 (file)
@@ -7,6 +7,7 @@ using System.Linq;
 using System.Net.Http.Functional.Tests;
 using System.Net.Security;
 using System.Net.Sockets;
+using System.Security.Cryptography.X509Certificates;
 using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
@@ -24,28 +25,31 @@ namespace System.Net.Test.Common
         private readonly TimeSpan _timeout;
         private int _lastStreamId;
 
-        private readonly byte[] _prefix;
+        private readonly byte[] _prefix = new byte[24];
         public string PrefixString => Encoding.UTF8.GetString(_prefix, 0, _prefix.Length);
         public bool IsInvalid => _connectionSocket == null;
         public Stream Stream => _connectionStream;
         public Task<bool> SettingAckWaiter => _ignoredSettingsAckPromise?.Task;
 
-        public Http2LoopbackConnection(Socket socket, Http2Options httpOptions)
-            : this(socket, httpOptions, Http2LoopbackServer.Timeout)
+        private Http2LoopbackConnection(Socket socket, Stream stream, TimeSpan timeout)
         {
+            _connectionSocket = socket;
+            _connectionStream = stream;
+            _timeout = timeout;
         }
 
-        public Http2LoopbackConnection(Socket socket, Http2Options httpOptions, TimeSpan timeout)
+        public static Task<Http2LoopbackConnection> CreateAsync(Socket socket, Stream stream, Http2Options httpOptions)
         {
-            _connectionSocket = socket;
-            _connectionStream = new NetworkStream(_connectionSocket, true);
-            _timeout = timeout;
+            return CreateAsync(socket, stream, httpOptions, Http2LoopbackServer.Timeout);
+        }
 
+        public static async Task<Http2LoopbackConnection> CreateAsync(Socket socket, Stream stream, Http2Options httpOptions, TimeSpan timeout)
+        {
             if (httpOptions.UseSsl)
             {
-                var sslStream = new SslStream(_connectionStream, false, delegate { return true; });
+                var sslStream = new SslStream(stream, false, delegate { return true; });
 
-                using (var cert = Configuration.Certificates.GetServerCertificate())
+                using (X509Certificate2 cert = Configuration.Certificates.GetServerCertificate())
                 {
 #if !NETFRAMEWORK
                     SslServerAuthenticationOptions options = new SslServerAuthenticationOptions();
@@ -61,21 +65,29 @@ namespace System.Net.Test.Common
 
                     options.ClientCertificateRequired = httpOptions.ClientCertificateRequired;
 
-                    sslStream.AuthenticateAsServerAsync(options, CancellationToken.None).Wait();
+                    await sslStream.AuthenticateAsServerAsync(options, CancellationToken.None).ConfigureAwait(false);
 #else
-                    sslStream.AuthenticateAsServerAsync(cert, httpOptions.ClientCertificateRequired, httpOptions.SslProtocols, checkCertificateRevocation: false).Wait();
+                    await sslStream.AuthenticateAsServerAsync(cert, httpOptions.ClientCertificateRequired, httpOptions.SslProtocols, checkCertificateRevocation: false).ConfigureAwait(false);
 #endif
                 }
 
-                _connectionStream = sslStream;
+                stream = sslStream;
             }
 
-            _prefix = new byte[24];
-            if (!FillBufferAsync(_prefix).Result)
+            var con = new Http2LoopbackConnection(socket, stream, timeout);
+            await con.ReadPrefixAsync().ConfigureAwait(false);
+
+            return con;
+        }
+
+        private async Task ReadPrefixAsync()
+        {
+            if (!await FillBufferAsync(_prefix))
             {
                 throw new Exception("Connection stream closed while attempting to read connection preface.");
             }
-            else if (Text.Encoding.ASCII.GetString(_prefix).Contains("HTTP/1.1"))
+
+            if (Text.Encoding.ASCII.GetString(_prefix).Contains("HTTP/1.1"))
             {
                 throw new Exception("HTTP 1.1 request received.");
             }
@@ -275,7 +287,7 @@ namespace System.Net.Test.Common
 
         public void ShutdownSend()
         {
-            _connectionSocket.Shutdown(SocketShutdown.Send);
+            _connectionSocket?.Shutdown(SocketShutdown.Send);
         }
 
         // This will cause a server-initiated shutdown of the connection.
@@ -563,6 +575,41 @@ namespace System.Net.Test.Common
             return (streamId, requestData);
         }
 
+        public override Task InitializeConnectionAsync()
+        {
+            return ReadAndSendSettingsAsync(ackTimeout: null);
+        }
+
+        public async Task<SettingsFrame> ReadAndSendSettingsAsync(TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries)
+        {
+            // Receive the initial client settings frame.
+            Frame receivedFrame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
+            Assert.Equal(FrameType.Settings, receivedFrame.Type);
+            Assert.Equal(FrameFlags.None, receivedFrame.Flags);
+            Assert.Equal(0, receivedFrame.StreamId);
+
+            var clientSettingsFrame = (SettingsFrame)receivedFrame;
+
+            // Receive the initial client window update frame.
+            receivedFrame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
+            Assert.Equal(FrameType.WindowUpdate, receivedFrame.Type);
+            Assert.Equal(FrameFlags.None, receivedFrame.Flags);
+            Assert.Equal(0, receivedFrame.StreamId);
+
+            // Send the initial server settings frame.
+            SettingsFrame settingsFrame = new SettingsFrame(settingsEntries);
+            await WriteFrameAsync(settingsFrame).ConfigureAwait(false);
+
+            // Send the client settings frame ACK.
+            Frame settingsAck = new Frame(0, FrameType.Settings, FrameFlags.Ack, 0);
+            await WriteFrameAsync(settingsAck).ConfigureAwait(false);
+
+            // The client will send us a SETTINGS ACK eventually, but not necessarily right away.
+            await ExpectSettingsAckAsync((int?)ackTimeout?.TotalMilliseconds ?? 5000);
+
+            return clientSettingsFrame;
+        }
+
         public async Task SendGoAway(int lastStreamId, ProtocolErrors errorCode = ProtocolErrors.NO_ERROR)
         {
             GoAwayFrame frame = new GoAwayFrame(lastStreamId, (int)errorCode, new byte[] { }, 0);
index 8516140..e890812 100644 (file)
@@ -90,7 +90,10 @@ namespace System.Net.Test.Common
 
             Socket connectionSocket = await _listenSocket.AcceptAsync().ConfigureAwait(false);
 
-            Http2LoopbackConnection connection = timeout != null ? new Http2LoopbackConnection(connectionSocket, _options, timeout.Value) : new Http2LoopbackConnection(connectionSocket, _options);
+            var stream = new NetworkStream(connectionSocket, ownsSocket: true);
+            Http2LoopbackConnection connection =
+                timeout != null ? await Http2LoopbackConnection.CreateAsync(connectionSocket, stream, _options, timeout.Value).ConfigureAwait(false) :
+                await Http2LoopbackConnection.CreateAsync(connectionSocket, stream, _options).ConfigureAwait(false);
             _connections.Add(connection);
 
             return connection;
@@ -103,7 +106,7 @@ namespace System.Net.Test.Common
 
         public Task<Http2LoopbackConnection> EstablishConnectionAsync(params SettingsEntry[] settingsEntries)
         {
-            return EstablishConnectionAsync(null, null, settingsEntries);
+            return EstablishConnectionAsync(timeout: null, ackTimeout: null, settingsEntries);
         }
 
         public async Task<Http2LoopbackConnection> EstablishConnectionAsync(TimeSpan? timeout, TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries)
@@ -114,37 +117,13 @@ namespace System.Net.Test.Common
 
         public Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(params SettingsEntry[] settingsEntries)
         {
-            return EstablishConnectionGetSettingsAsync(null, null, settingsEntries);
+            return EstablishConnectionGetSettingsAsync(timeout: null, ackTimeout: null, settingsEntries);
         }
 
         public async Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(TimeSpan? timeout, TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries)
         {
             Http2LoopbackConnection connection = await AcceptConnectionAsync(timeout).ConfigureAwait(false);
-
-            // Receive the initial client settings frame.
-            Frame receivedFrame = await connection.ReadFrameAsync(Timeout).ConfigureAwait(false);
-            Assert.Equal(FrameType.Settings, receivedFrame.Type);
-            Assert.Equal(FrameFlags.None, receivedFrame.Flags);
-            Assert.Equal(0, receivedFrame.StreamId);
-
-            var clientSettingsFrame = (SettingsFrame)receivedFrame;
-
-            // Receive the initial client window update frame.
-            receivedFrame = await connection.ReadFrameAsync(Timeout).ConfigureAwait(false);
-            Assert.Equal(FrameType.WindowUpdate, receivedFrame.Type);
-            Assert.Equal(FrameFlags.None, receivedFrame.Flags);
-            Assert.Equal(0, receivedFrame.StreamId);
-
-            // Send the initial server settings frame.
-            SettingsFrame settingsFrame = new SettingsFrame(settingsEntries);
-            await connection.WriteFrameAsync(settingsFrame).ConfigureAwait(false);
-
-            // Send the client settings frame ACK.
-            Frame settingsAck = new Frame(0, FrameType.Settings, FrameFlags.Ack, 0);
-            await connection.WriteFrameAsync(settingsAck).ConfigureAwait(false);
-
-            // The client will send us a SETTINGS ACK eventually, but not necessarily right away.
-            await connection.ExpectSettingsAckAsync((int) (ackTimeout?.TotalMilliseconds ?? 5000));
+            SettingsFrame clientSettingsFrame = await connection.ReadAndSendSettingsAsync(ackTimeout, settingsEntries).ConfigureAwait(false);
 
             return (connection, clientSettingsFrame);
         }
@@ -220,7 +199,6 @@ namespace System.Net.Test.Common
 
         public Http2Options()
         {
-            UseSsl = PlatformDetection.SupportsAlpn && !Capability.Http2ForceUnencryptedLoopback();
             SslProtocols = SslProtocols.Tls12;
         }
     }
@@ -239,6 +217,16 @@ namespace System.Net.Test.Common
 
         public override GenericLoopbackServer CreateServer(GenericLoopbackOptions options = null)
         {
+            return Http2LoopbackServer.CreateServer(CreateOptions(options));
+        }
+
+        public override async Task<GenericLoopbackConnection> CreateConnectionAsync(Socket socket, Stream stream, GenericLoopbackOptions options = null)
+        {
+            return await Http2LoopbackConnection.CreateAsync(socket, stream, CreateOptions(options)).ConfigureAwait(false);
+        }
+
+        private static Http2Options CreateOptions(GenericLoopbackOptions options)
+        {
             Http2Options http2Options = new Http2Options();
             if (options != null)
             {
@@ -246,8 +234,7 @@ namespace System.Net.Test.Common
                 http2Options.UseSsl = options.UseSsl;
                 http2Options.SslProtocols = options.SslProtocols;
             }
-
-            return Http2LoopbackServer.CreateServer(http2Options);
+            return http2Options;
         }
 
         public override async Task CreateServerAsync(Func<GenericLoopbackServer, Uri, Task> funcAsync, int millisecondsTimeout = 60_000, GenericLoopbackOptions options = null)
index 5d6f3e7..cfa912a 100644 (file)
@@ -8,6 +8,7 @@ using System.Net.Quic;
 using System.Text;
 using System.Threading.Tasks;
 using System.Linq;
+using System.Net.Http.Functional.Tests;
 
 namespace System.Net.Test.Common
 {
@@ -84,6 +85,11 @@ namespace System.Net.Test.Common
             return requestId == 0 ? _currentStream : _openStreams[requestId - 1];
         }
 
+        public override Task InitializeConnectionAsync()
+        {
+            throw new NotImplementedException();
+        }
+
         public async Task<Http3LoopbackStream> AcceptStreamAsync()
         {
             QuicStream quicStream = await _connection.AcceptStreamAsync().ConfigureAwait(false);
index 4115570..49c3c86 100644 (file)
@@ -3,8 +3,10 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.IO;
 using System.Net.Quic;
 using System.Net.Security;
+using System.Net.Sockets;
 using System.Security.Cryptography.X509Certificates;
 using System.Threading.Tasks;
 
@@ -80,5 +82,12 @@ namespace System.Net.Test.Common
             using GenericLoopbackServer server = CreateServer(options);
             await funcAsync(server, server.Address).TimeoutAfter(millisecondsTimeout).ConfigureAwait(false);
         }
+
+        public override Task<GenericLoopbackConnection> CreateConnectionAsync(Socket socket, Stream stream, GenericLoopbackOptions options = null)
+        {
+            // TODO: make a new overload that takes a MultiplexedConnection.
+            // This method is always unacceptable to call for HTTP/3.
+            throw new NotImplementedException("HTTP/3 does not operate over a Socket.");
+        }
     }
 }
index 208f9d7..76c8459 100644 (file)
@@ -594,13 +594,18 @@ namespace System.Net.Test.Common
                     // This seems to help avoid connection reset issues caused by buffered data
                     // that has not been sent/acked when the graceful shutdown timeout expires.
                     // This may throw if the socket was already closed, so eat any exception.
-                    _socket.Shutdown(SocketShutdown.Send);
+                    _socket?.Shutdown(SocketShutdown.Send);
                 }
                 catch (Exception) { }
 
                 _writer.Dispose();
                 _stream.Dispose();
-                _socket.Dispose();
+                _socket?.Dispose();
+            }
+
+            public override Task InitializeConnectionAsync()
+            {
+                return Task.CompletedTask;
             }
 
             public async Task<List<string>> ReadRequestHeaderAsync()
@@ -896,6 +901,11 @@ namespace System.Net.Test.Common
             return LoopbackServer.CreateServerAsync((server, uri) => funcAsync(server, uri), options: CreateOptions(options));
         }
 
+        public override Task<GenericLoopbackConnection> CreateConnectionAsync(Socket socket, Stream stream, GenericLoopbackOptions options = null)
+        {
+            return Task.FromResult<GenericLoopbackConnection>(new LoopbackServer.Connection(socket, stream));
+        }
+
         private static LoopbackServer.Options CreateOptions(GenericLoopbackOptions options)
         {
             LoopbackServer.Options newOptions = new LoopbackServer.Options();
index 77a4695..141fe66 100644 (file)
@@ -14,7 +14,7 @@ namespace System.Net.Test.Common
             public VirtualNetworkConnectionBroken() : base("Connection broken") { }
         }
 
-        private readonly int WaitForReadDataTimeoutMilliseconds = 30 * 1000;
+        private readonly int WaitForReadDataTimeoutMilliseconds = 60 * 1000;
 
         private readonly ConcurrentQueue<byte[]> _clientWriteQueue = new ConcurrentQueue<byte[]>();
         private readonly ConcurrentQueue<byte[]> _serverWriteQueue = new ConcurrentQueue<byte[]>();
@@ -22,8 +22,11 @@ namespace System.Net.Test.Common
         private readonly SemaphoreSlim _clientDataAvailable = new SemaphoreSlim(0);
         private readonly SemaphoreSlim _serverDataAvailable = new SemaphoreSlim(0);
 
+        private volatile bool _clientWriteShutdown = false;
+        private volatile bool _serverWriteShutdown = false;
+
         public bool DisableConnectionBreaking { get; set; } = false;
-        private bool _connectionBroken = false;
+        private volatile bool _connectionBroken = false;
 
         public byte[] ReadFrame(bool server) =>
             ReadFrameCoreAsync(server, sync: true, cancellationToken: default).GetAwaiter().GetResult();
@@ -75,6 +78,11 @@ namespace System.Net.Test.Common
                     return buffer;
                 }
 
+                if ((server && _clientWriteShutdown) || (!server && _serverWriteShutdown))
+                {
+                    return Array.Empty<byte>();
+                }
+
                 remainingTries--;
                 backOffDelayMilliseconds *= backOffDelayMilliseconds;
                 if (sync)
@@ -98,6 +106,16 @@ namespace System.Net.Test.Common
                 throw new VirtualNetworkConnectionBroken();
             }
 
+            if ((server && _serverWriteShutdown) || (!server && _clientWriteShutdown))
+            {
+                throw new InvalidOperationException("Writing to a shutdown side.");
+            }
+
+            if (buffer.Length == 0)
+            {
+                return;
+            }
+
             SemaphoreSlim semaphore;
             ConcurrentQueue<byte[]> packetQueue;
 
@@ -116,11 +134,25 @@ namespace System.Net.Test.Common
             semaphore.Release();
         }
 
+        public void GracefulShutdown(bool server)
+        {
+            if (server)
+            {
+                _serverWriteShutdown = true;
+                _serverDataAvailable.Release(1_000_000);
+            }
+            else
+            {
+                _clientWriteShutdown = true;
+                _clientDataAvailable.Release(1_000_000);
+            }
+        }
+
         public void BreakConnection()
         {
             if (!DisableConnectionBreaking)
             {
-                _connectionBroken = true;
+                _connectionBroken = !_serverWriteShutdown || !_clientWriteShutdown;
                 _serverDataAvailable.Release(1_000_000);
                 _clientDataAvailable.Release(1_000_000);
             }
diff --git a/src/libraries/Common/tests/System/Net/VirtualNetwork/VirtualNetworkConnectionListenerFactory.cs b/src/libraries/Common/tests/System/Net/VirtualNetwork/VirtualNetworkConnectionListenerFactory.cs
new file mode 100644 (file)
index 0000000..0a47a25
--- /dev/null
@@ -0,0 +1,187 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Diagnostics;
+using System.IO;
+using System.Net.Connections;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+
+namespace System.Net.Test.Common
+{
+
+    internal sealed class VirtualNetworkConnectionListenerFactory : ConnectionListenerFactory
+    {
+        public static ConnectionFactory GetConnectionFactory(ConnectionListener listener)
+        {
+            bool hasFactory = listener.ListenerProperties.TryGet(out VirtualConnectionFactory factory);
+            Debug.Assert(hasFactory);
+            return factory;
+        }
+
+        public override ValueTask<ConnectionListener> BindAsync(EndPoint endPoint, IConnectionProperties options = null, CancellationToken cancellationToken = default)
+        {
+            if (cancellationToken.IsCancellationRequested) return ValueTask.FromCanceled<ConnectionListener>(cancellationToken);
+            return new ValueTask<ConnectionListener>(new VirtualConnectionListener(endPoint));
+        }
+
+        protected override void Dispose(bool disposing)
+        {
+        }
+
+        protected override ValueTask DisposeAsyncCore()
+        {
+            return default;
+        }
+
+        private sealed class VirtualConnectionListener : ConnectionListener, IConnectionProperties
+        {
+            private readonly Channel<TaskCompletionSource<Connection>> _pendingConnects;
+            private readonly VirtualConnectionFactory _connectionFactory;
+            private readonly CancellationTokenSource _cts = new CancellationTokenSource();
+
+            public override IConnectionProperties ListenerProperties => this;
+
+            public override EndPoint LocalEndPoint { get; }
+
+            public VirtualConnectionListener(EndPoint localEndPoint)
+            {
+                LocalEndPoint = localEndPoint;
+
+                _pendingConnects = Channel.CreateUnbounded<TaskCompletionSource<Connection>>();
+                _connectionFactory = new VirtualConnectionFactory(this);
+            }
+
+            public override async ValueTask<Connection> AcceptAsync(IConnectionProperties options = null, CancellationToken cancellationToken = default)
+            {
+                using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, cancellationToken);
+
+                var network = new VirtualNetwork();
+                var serverConnection = new VirtualConnection(network, isServer: true);
+                var clientConnection = new VirtualConnection(network, isServer: false);
+
+                while (true)
+                {
+                    TaskCompletionSource<Connection> tcs = await _pendingConnects.Reader.ReadAsync(cancellationToken);
+                    if (tcs.TrySetResult(clientConnection))
+                    {
+                        return serverConnection;
+                    }
+                }
+            }
+
+            internal async ValueTask<Connection> ConnectAsync(EndPoint endPoint, IConnectionProperties options = null, CancellationToken cancellationToken = default)
+            {
+                var tcs = new TaskCompletionSource<Connection>();
+                await _pendingConnects.Writer.WriteAsync(tcs, cancellationToken).ConfigureAwait(false);
+
+                using (CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, cancellationToken))
+                using (cts.Token.UnsafeRegister(o => ((TaskCompletionSource<Connection>)o).TrySetCanceled(), tcs))
+                {
+                    return await tcs.Task.ConfigureAwait(false);
+                }
+            }
+
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
+                {
+                    _cts.Cancel();
+                }
+            }
+
+            protected override ValueTask DisposeAsyncCore()
+            {
+                Dispose(true);
+                return default;
+            }
+
+            bool IConnectionProperties.TryGet(Type propertyKey, out object property)
+            {
+                if (propertyKey == typeof(VirtualConnectionFactory))
+                {
+                    property = _connectionFactory;
+                    return true;
+                }
+
+                property = null;
+                return false;
+            }
+        }
+
+        private sealed class VirtualConnectionFactory : ConnectionFactory
+        {
+            private readonly VirtualConnectionListener _listener;
+
+            public VirtualConnectionFactory(VirtualConnectionListener listener)
+            {
+                _listener = listener;
+            }
+
+            public override ValueTask<Connection> ConnectAsync(EndPoint endPoint, IConnectionProperties options = null, CancellationToken cancellationToken = default)
+            {
+                return _listener.ConnectAsync(endPoint, options, cancellationToken);
+            }
+
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
+                {
+                    _listener.Dispose();
+                }
+            }
+
+            protected override ValueTask DisposeAsyncCore()
+            {
+                return _listener.DisposeAsync();
+            }
+        }
+
+        private sealed class VirtualConnection : Connection, IConnectionProperties
+        {
+            private readonly VirtualNetwork _network;
+            private bool _isServer;
+
+            public override IConnectionProperties ConnectionProperties => this;
+
+            public override EndPoint LocalEndPoint => null;
+
+            public override EndPoint RemoteEndPoint => null;
+
+            public VirtualConnection(VirtualNetwork network, bool isServer)
+            {
+                _network = network;
+                _isServer = isServer;
+            }
+
+            protected override Stream CreateStream()
+            {
+                return new VirtualNetworkStream(_network, _isServer, gracefulShutdown: true);
+            }
+
+            protected override ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken)
+            {
+                if (cancellationToken.IsCancellationRequested) return ValueTask.FromCanceled(cancellationToken);
+
+                if (method == ConnectionCloseMethod.GracefulShutdown)
+                {
+                    _network.GracefulShutdown(_isServer);
+                }
+                else
+                {
+                    _network.BreakConnection();
+                }
+
+                return default;
+            }
+
+            bool IConnectionProperties.TryGet(Type propertyKey, out object property)
+            {
+                property = null;
+                return false;
+            }
+        }
+    }
+
+}
index 5770553..aa0fb2e 100644 (file)
@@ -12,13 +12,15 @@ namespace System.Net.Test.Common
         private readonly VirtualNetwork _network;
         private MemoryStream _readStream;
         private readonly bool _isServer;
+        private readonly bool _gracefulShutdown;
         private SemaphoreSlim _readStreamLock = new SemaphoreSlim(1, 1);
         private TaskCompletionSource _flushTcs;
 
-        public VirtualNetworkStream(VirtualNetwork network, bool isServer)
+        public VirtualNetworkStream(VirtualNetwork network, bool isServer, bool gracefulShutdown = false)
         {
             _network = network;
             _isServer = isServer;
+            _gracefulShutdown = gracefulShutdown;
         }
 
         public int DelayMilliseconds { get; set; }
@@ -79,7 +81,10 @@ namespace System.Net.Test.Common
             {
                 if (_readStream == null || (_readStream.Position >= _readStream.Length))
                 {
-                    _readStream = new MemoryStream(_network.ReadFrame(_isServer));
+                    byte[] frame = _network.ReadFrame(_isServer);
+                    if (frame.Length == 0) return 0;
+
+                    _readStream = new MemoryStream(frame);
                 }
 
                 return _readStream.Read(buffer, offset, count);
@@ -102,7 +107,10 @@ namespace System.Net.Test.Common
 
                 if (_readStream == null || (_readStream.Position >= _readStream.Length))
                 {
-                    _readStream = new MemoryStream(await _network.ReadFrameAsync(_isServer, cancellationToken).ConfigureAwait(false));
+                    byte[] frame = await _network.ReadFrameAsync(_isServer, cancellationToken).ConfigureAwait(false);
+                    if (frame.Length == 0) return 0;
+
+                    _readStream = new MemoryStream(frame);
                 }
 
                 return await _readStream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
@@ -147,10 +155,22 @@ namespace System.Net.Test.Common
             if (disposing)
             {
                 Disposed = true;
-                _network.BreakConnection();
+                if (_gracefulShutdown)
+                {
+                    GracefulShutdown();
+                }
+                else
+                {
+                    _network.BreakConnection();
+                }
             }
 
             base.Dispose(disposing);
         }
+
+        public void GracefulShutdown()
+        {
+            _network.GracefulShutdown(_isServer);
+        }
     }
 }
index 033cdd7..a2a6c5e 100644 (file)
@@ -60,6 +60,20 @@ namespace System.Threading.Tasks
             }
         }
 
+#if !NETFRAMEWORK
+        public static Task TimeoutAfter(this ValueTask task, int millisecondsTimeout)
+            => task.AsTask().TimeoutAfter(TimeSpan.FromMilliseconds(millisecondsTimeout));
+
+        public static Task TimeoutAfter(this ValueTask task, TimeSpan timeout)
+            => task.AsTask().TimeoutAfter(timeout);
+
+        public static Task<TResult> TimeoutAfter<TResult>(this ValueTask<TResult> task, int millisecondsTimeout)
+            => task.AsTask().TimeoutAfter(TimeSpan.FromMilliseconds(millisecondsTimeout));
+
+        public static Task<TResult> TimeoutAfter<TResult>(this ValueTask<TResult> task, TimeSpan timeout)
+            => task.AsTask().TimeoutAfter(timeout);
+#endif
+
         public static async Task WhenAllOrAnyFailed(this Task[] tasks, int millisecondsTimeout)
         {
             var cts = new CancellationTokenSource();
index 710e7b1..9b52e36 100644 (file)
@@ -48,6 +48,7 @@
       System.IO.FileSystem.Watcher;
       System.IO.IsolatedStorage;
       System.IO.MemoryMappedFiles;
+      System.IO.Pipelines;
       System.IO.Pipes;
       System.IO.Pipes.AccessControl;
       System.IO.UnmanagedMemoryStream;
@@ -56,6 +57,7 @@
       System.Linq.Parallel;
       System.Linq.Queryable;
       System.Memory;
+      System.Net.Connections;
       System.Net.Http;
       System.Net.Http.Json;
       System.Net.HttpListener;
index da675e5..517bc69 100644 (file)
@@ -11,6 +11,7 @@
     </ProjectReference>
     <ProjectReference Include="..\src\System.IO.Pipelines.csproj" />
     <HarvestIncludePaths Include="lib/netstandard1.3" />
+    <InboxOnTargetFramework Include="net5.0" />
   </ItemGroup>
   <Import Project="$([MSBuild]::GetPathOfFileAbove(Directory.Build.targets))" />
 </Project>
index a038d35..7cb2868 100644 (file)
@@ -1,16 +1,21 @@
 <Project Sdk="Microsoft.NET.Sdk">
   <PropertyGroup>
-    <TargetFrameworks>netstandard2.0;net461</TargetFrameworks>
+    <TargetFrameworks>$(NetCoreAppCurrent);netstandard2.0;net461</TargetFrameworks>
     <Nullable>enable</Nullable>
     <ExcludeFromPackage Condition="'$(TargetFramework)' == 'net461'">true</ExcludeFromPackage>
     <!-- We only plan to use this ref in netcoreapp. For all other netstandard compatible frameworks
     we should use the lib asset instead. -->
     <PackageTargetFramework>netcoreapp2.0</PackageTargetFramework>
+    <ExcludeCurrentNetCoreAppFromPackage>true</ExcludeCurrentNetCoreAppFromPackage>
   </PropertyGroup>
   <ItemGroup>
     <Compile Include="System.IO.Pipelines.cs" />
   </ItemGroup>
-  <ItemGroup>
+  <ItemGroup Condition="'$(TargetFramework)' == '$(NetCoreAppCurrent)'">
+    <ProjectReference Include="..\..\System.Runtime\ref\System.Runtime.csproj" />
+    <ProjectReference Include="..\..\System.Memory\ref\System.Memory.csproj" />
+  </ItemGroup>
+  <ItemGroup Condition="'$(TargetFramework)' != '$(NetCoreAppCurrent)'">
     <PackageReference Include="System.Memory" Version="$(SystemMemoryVersion)" />
     <PackageReference Include="System.Buffers" Version="$(SystemBuffersVersion)" />
     <PackageReference Include="System.Threading.Tasks.Extensions" Version="$(SystemThreadingTasksExtensionsVersion)" />
diff --git a/src/libraries/System.Net.Connections/Directory.Build.props b/src/libraries/System.Net.Connections/Directory.Build.props
new file mode 100644 (file)
index 0000000..63f02a0
--- /dev/null
@@ -0,0 +1,6 @@
+<Project>
+  <Import Project="..\Directory.Build.props" />
+  <PropertyGroup>
+    <StrongNameKeyId>Microsoft</StrongNameKeyId>
+  </PropertyGroup>
+</Project>
\ No newline at end of file
diff --git a/src/libraries/System.Net.Connections/System.Net.Connections.sln b/src/libraries/System.Net.Connections/System.Net.Connections.sln
new file mode 100644 (file)
index 0000000..a325eb2
--- /dev/null
@@ -0,0 +1,50 @@
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 16
+VisualStudioVersion = 16.0.30310.162
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.Net.Connections", "src\System.Net.Connections.csproj", "{1D422B1D-D7C4-41B9-862D-EB3D98DF37DE}"
+       ProjectSection(ProjectDependencies) = postProject
+               {132BF813-FC40-4D39-8B6F-E55D7633F0ED} = {132BF813-FC40-4D39-8B6F-E55D7633F0ED}
+       EndProjectSection
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.Net.Connections", "ref\System.Net.Connections.csproj", "{132BF813-FC40-4D39-8B6F-E55D7633F0ED}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{E107E9C1-E893-4E87-987E-04EF0DCEAEFD}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ref", "ref", "{2E666815-2EDB-464B-9DF6-380BF4789AD4}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{E0983BCC-F93B-4FFF-A4E4-EA874908783F}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "System.Net.Connections.Tests", "tests\System.Net.Connections.Tests\System.Net.Connections.Tests.csproj", "{A4DB505A-FFD8-4A7B-B31A-12AD623E7AD9}"
+EndProject
+Global
+       GlobalSection(SolutionConfigurationPlatforms) = preSolution
+               Debug|Any CPU = Debug|Any CPU
+               Release|Any CPU = Release|Any CPU
+       EndGlobalSection
+       GlobalSection(ProjectConfigurationPlatforms) = postSolution
+               {1D422B1D-D7C4-41B9-862D-EB3D98DF37DE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+               {1D422B1D-D7C4-41B9-862D-EB3D98DF37DE}.Debug|Any CPU.Build.0 = Debug|Any CPU
+               {1D422B1D-D7C4-41B9-862D-EB3D98DF37DE}.Release|Any CPU.ActiveCfg = Release|Any CPU
+               {1D422B1D-D7C4-41B9-862D-EB3D98DF37DE}.Release|Any CPU.Build.0 = Release|Any CPU
+               {132BF813-FC40-4D39-8B6F-E55D7633F0ED}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+               {132BF813-FC40-4D39-8B6F-E55D7633F0ED}.Debug|Any CPU.Build.0 = Debug|Any CPU
+               {132BF813-FC40-4D39-8B6F-E55D7633F0ED}.Release|Any CPU.ActiveCfg = Release|Any CPU
+               {132BF813-FC40-4D39-8B6F-E55D7633F0ED}.Release|Any CPU.Build.0 = Release|Any CPU
+               {A4DB505A-FFD8-4A7B-B31A-12AD623E7AD9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+               {A4DB505A-FFD8-4A7B-B31A-12AD623E7AD9}.Debug|Any CPU.Build.0 = Debug|Any CPU
+               {A4DB505A-FFD8-4A7B-B31A-12AD623E7AD9}.Release|Any CPU.ActiveCfg = Release|Any CPU
+               {A4DB505A-FFD8-4A7B-B31A-12AD623E7AD9}.Release|Any CPU.Build.0 = Release|Any CPU
+       EndGlobalSection
+       GlobalSection(SolutionProperties) = preSolution
+               HideSolutionNode = FALSE
+       EndGlobalSection
+       GlobalSection(NestedProjects) = preSolution
+               {1D422B1D-D7C4-41B9-862D-EB3D98DF37DE} = {E107E9C1-E893-4E87-987E-04EF0DCEAEFD}
+               {132BF813-FC40-4D39-8B6F-E55D7633F0ED} = {2E666815-2EDB-464B-9DF6-380BF4789AD4}
+               {A4DB505A-FFD8-4A7B-B31A-12AD623E7AD9} = {E0983BCC-F93B-4FFF-A4E4-EA874908783F}
+       EndGlobalSection
+       GlobalSection(ExtensibilityGlobals) = postSolution
+               SolutionGuid = {5100F629-0FAB-4C6F-9A54-95AE9565EE0D}
+       EndGlobalSection
+EndGlobal
diff --git a/src/libraries/System.Net.Connections/ref/System.Net.Connections.cs b/src/libraries/System.Net.Connections/ref/System.Net.Connections.cs
new file mode 100644 (file)
index 0000000..c82551e
--- /dev/null
@@ -0,0 +1,74 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// ------------------------------------------------------------------------------
+// Changes to this file must follow the https://aka.ms/api-review process.
+// ------------------------------------------------------------------------------
+
+namespace System.Net.Connections
+{
+    public abstract partial class Connection : System.Net.Connections.ConnectionBase
+    {
+        protected Connection() { }
+        public System.IO.Pipelines.IDuplexPipe Pipe { get { throw null; } }
+        public System.IO.Stream Stream { get { throw null; } }
+        protected virtual System.IO.Pipelines.IDuplexPipe CreatePipe() { throw null; }
+        protected virtual System.IO.Stream CreateStream() { throw null; }
+        public static System.Net.Connections.Connection FromPipe(System.IO.Pipelines.IDuplexPipe pipe, bool leaveOpen = false, System.Net.Connections.IConnectionProperties? properties = null, System.Net.EndPoint? localEndPoint = null, System.Net.EndPoint? remoteEndPoint = null) { throw null; }
+        public static System.Net.Connections.Connection FromStream(System.IO.Stream stream, bool leaveOpen = false, System.Net.Connections.IConnectionProperties? properties = null, System.Net.EndPoint? localEndPoint = null, System.Net.EndPoint? remoteEndPoint = null) { throw null; }
+    }
+    public abstract partial class ConnectionBase : System.IAsyncDisposable, System.IDisposable
+    {
+        protected ConnectionBase() { }
+        public abstract System.Net.Connections.IConnectionProperties ConnectionProperties { get; }
+        public abstract System.Net.EndPoint? LocalEndPoint { get; }
+        public abstract System.Net.EndPoint? RemoteEndPoint { get; }
+        public System.Threading.Tasks.ValueTask CloseAsync(System.Net.Connections.ConnectionCloseMethod method = System.Net.Connections.ConnectionCloseMethod.GracefulShutdown, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
+        protected abstract System.Threading.Tasks.ValueTask CloseAsyncCore(System.Net.Connections.ConnectionCloseMethod method, System.Threading.CancellationToken cancellationToken);
+        public void Dispose() { }
+        public System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
+    }
+    public enum ConnectionCloseMethod
+    {
+        GracefulShutdown = 0,
+        Abort = 1,
+        Immediate = 2,
+    }
+    public static partial class ConnectionExtensions
+    {
+        public static System.Net.Connections.ConnectionFactory Filter(this System.Net.Connections.ConnectionFactory factory, System.Func<System.Net.Connections.Connection, System.Net.Connections.IConnectionProperties?, System.Threading.CancellationToken, System.Threading.Tasks.ValueTask<System.Net.Connections.Connection>> filter) { throw null; }
+        public static bool TryGet<T>(this System.Net.Connections.IConnectionProperties properties, [System.Diagnostics.CodeAnalysis.MaybeNullWhenAttribute(false)] out T property) { throw null; }
+    }
+    public abstract partial class ConnectionFactory : System.IAsyncDisposable, System.IDisposable
+    {
+        protected ConnectionFactory() { }
+        public abstract System.Threading.Tasks.ValueTask<System.Net.Connections.Connection> ConnectAsync(System.Net.EndPoint? endPoint, System.Net.Connections.IConnectionProperties? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
+        public void Dispose() { }
+        protected virtual void Dispose(bool disposing) { }
+        public System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
+        protected virtual System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; }
+    }
+    public abstract partial class ConnectionListener : System.IAsyncDisposable, System.IDisposable
+    {
+        protected ConnectionListener() { }
+        public abstract System.Net.Connections.IConnectionProperties ListenerProperties { get; }
+        public abstract System.Net.EndPoint? LocalEndPoint { get; }
+        public abstract System.Threading.Tasks.ValueTask<System.Net.Connections.Connection> AcceptAsync(System.Net.Connections.IConnectionProperties? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
+        public void Dispose() { }
+        protected virtual void Dispose(bool disposing) { }
+        public System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
+        protected virtual System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; }
+    }
+    public abstract partial class ConnectionListenerFactory : System.IAsyncDisposable, System.IDisposable
+    {
+        protected ConnectionListenerFactory() { }
+        public abstract System.Threading.Tasks.ValueTask<System.Net.Connections.ConnectionListener> BindAsync(System.Net.EndPoint? endPoint, System.Net.Connections.IConnectionProperties? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
+        public void Dispose() { }
+        protected virtual void Dispose(bool disposing) { }
+        public System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
+        protected virtual System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; }
+    }
+    public partial interface IConnectionProperties
+    {
+        bool TryGet(System.Type propertyKey, [System.Diagnostics.CodeAnalysis.NotNullWhenAttribute(true)] out object? property);
+    }
+}
diff --git a/src/libraries/System.Net.Connections/ref/System.Net.Connections.csproj b/src/libraries/System.Net.Connections/ref/System.Net.Connections.csproj
new file mode 100644 (file)
index 0000000..b239637
--- /dev/null
@@ -0,0 +1,14 @@
+<Project Sdk="Microsoft.NET.Sdk">
+  <PropertyGroup>
+    <TargetFrameworks>$(NetCoreAppCurrent)</TargetFrameworks>
+    <Nullable>enable</Nullable>
+  </PropertyGroup>
+  <ItemGroup>
+    <Compile Include="System.Net.Connections.cs" />
+  </ItemGroup>
+  <ItemGroup>
+    <ProjectReference Include="..\..\System.Runtime\ref\System.Runtime.csproj" />
+    <ProjectReference Include="..\..\System.Net.Primitives\ref\System.Net.Primitives.csproj" />
+    <ProjectReference Include="..\..\System.IO.Pipelines\ref\System.IO.Pipelines.csproj" />
+  </ItemGroup>
+</Project>
diff --git a/src/libraries/System.Net.Connections/src/Resources/Strings.resx b/src/libraries/System.Net.Connections/src/Resources/Strings.resx
new file mode 100644 (file)
index 0000000..004c0da
--- /dev/null
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="utf-8"?>
+<root>
+  <!-- 
+    Microsoft ResX Schema 
+    
+    Version 2.0
+    
+    The primary goals of this format is to allow a simple XML format 
+    that is mostly human readable. The generation and parsing of the 
+    various data types are done through the TypeConverter classes 
+    associated with the data types.
+    
+    Example:
+    
+    ... ado.net/XML headers & schema ...
+    <resheader name="resmimetype">text/microsoft-resx</resheader>
+    <resheader name="version">2.0</resheader>
+    <resheader name="reader">System.Resources.ResXResourceReader, System.Windows.Forms, ...</resheader>
+    <resheader name="writer">System.Resources.ResXResourceWriter, System.Windows.Forms, ...</resheader>
+    <data name="Name1"><value>this is my long string</value><comment>this is a comment</comment></data>
+    <data name="Color1" type="System.Drawing.Color, System.Drawing">Blue</data>
+    <data name="Bitmap1" mimetype="application/x-microsoft.net.object.binary.base64">
+        <value>[base64 mime encoded serialized .NET Framework object]</value>
+    </data>
+    <data name="Icon1" type="System.Drawing.Icon, System.Drawing" mimetype="application/x-microsoft.net.object.bytearray.base64">
+        <value>[base64 mime encoded string representing a byte array form of the .NET Framework object]</value>
+        <comment>This is a comment</comment>
+    </data>
+                
+    There are any number of "resheader" rows that contain simple 
+    name/value pairs.
+    
+    Each data row contains a name, and value. The row also contains a 
+    type or mimetype. Type corresponds to a .NET class that support 
+    text/value conversion through the TypeConverter architecture. 
+    Classes that don't support this are serialized and stored with the 
+    mimetype set.
+    
+    The mimetype is used for serialized objects, and tells the 
+    ResXResourceReader how to depersist the object. This is currently not 
+    extensible. For a given mimetype the value must be set accordingly:
+    
+    Note - application/x-microsoft.net.object.binary.base64 is the format 
+    that the ResXResourceWriter will generate, however the reader can 
+    read any of the formats listed below.
+    
+    mimetype: application/x-microsoft.net.object.binary.base64
+    value   : The object must be serialized with 
+            : System.Runtime.Serialization.Formatters.Binary.BinaryFormatter
+            : and then encoded with base64 encoding.
+    
+    mimetype: application/x-microsoft.net.object.soap.base64
+    value   : The object must be serialized with 
+            : System.Runtime.Serialization.Formatters.Soap.SoapFormatter
+            : and then encoded with base64 encoding.
+
+    mimetype: application/x-microsoft.net.object.bytearray.base64
+    value   : The object must be serialized into a byte array 
+            : using a System.ComponentModel.TypeConverter
+            : and then encoded with base64 encoding.
+    -->
+  <xsd:schema id="root" xmlns="" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:msdata="urn:schemas-microsoft-com:xml-msdata">
+    <xsd:import namespace="http://www.w3.org/XML/1998/namespace" />
+    <xsd:element name="root" msdata:IsDataSet="true">
+      <xsd:complexType>
+        <xsd:choice maxOccurs="unbounded">
+          <xsd:element name="metadata">
+            <xsd:complexType>
+              <xsd:sequence>
+                <xsd:element name="value" type="xsd:string" minOccurs="0" />
+              </xsd:sequence>
+              <xsd:attribute name="name" use="required" type="xsd:string" />
+              <xsd:attribute name="type" type="xsd:string" />
+              <xsd:attribute name="mimetype" type="xsd:string" />
+              <xsd:attribute ref="xml:space" />
+            </xsd:complexType>
+          </xsd:element>
+          <xsd:element name="assembly">
+            <xsd:complexType>
+              <xsd:attribute name="alias" type="xsd:string" />
+              <xsd:attribute name="name" type="xsd:string" />
+            </xsd:complexType>
+          </xsd:element>
+          <xsd:element name="data">
+            <xsd:complexType>
+              <xsd:sequence>
+                <xsd:element name="value" type="xsd:string" minOccurs="0" msdata:Ordinal="1" />
+                <xsd:element name="comment" type="xsd:string" minOccurs="0" msdata:Ordinal="2" />
+              </xsd:sequence>
+              <xsd:attribute name="name" type="xsd:string" use="required" msdata:Ordinal="1" />
+              <xsd:attribute name="type" type="xsd:string" msdata:Ordinal="3" />
+              <xsd:attribute name="mimetype" type="xsd:string" msdata:Ordinal="4" />
+              <xsd:attribute ref="xml:space" />
+            </xsd:complexType>
+          </xsd:element>
+          <xsd:element name="resheader">
+            <xsd:complexType>
+              <xsd:sequence>
+                <xsd:element name="value" type="xsd:string" minOccurs="0" msdata:Ordinal="1" />
+              </xsd:sequence>
+              <xsd:attribute name="name" type="xsd:string" use="required" />
+            </xsd:complexType>
+          </xsd:element>
+        </xsd:choice>
+      </xsd:complexType>
+    </xsd:element>
+  </xsd:schema>
+  <resheader name="resmimetype">
+    <value>text/microsoft-resx</value>
+  </resheader>
+  <resheader name="version">
+    <value>2.0</value>
+  </resheader>
+  <resheader name="reader">
+    <value>System.Resources.ResXResourceReader, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089</value>
+  </resheader>
+  <resheader name="writer">
+    <value>System.Resources.ResXResourceWriter, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089</value>
+  </resheader>
+  <data name="net_connections_createpipe_null" xml:space="preserve">
+    <value>The CreatePipe implementation returned null; a valid reference was expected.</value>
+  </data>
+  <data name="net_connections_createstream_null" xml:space="preserve">
+    <value>The CreateStream implementation returned null; a valid reference was expected.</value>
+  </data>
+  <data name="net_connections_no_create_overrides" xml:space="preserve">
+    <value>One of CreatePipe or CreateStream must be implemented</value>
+  </data>
+  <data name="net_connections_pipe_use_after_stream" xml:space="preserve">
+    <value>The Connection's Pipe may not be accessed after Stream has been accessed.</value>
+  </data>
+  <data name="net_connections_stream_use_after_pipe" xml:space="preserve">
+    <value>The Connection's Stream may not be accessed after Pipe has been accessed.</value>
+  </data>
+  <data name="net_connections_zero_byte_pipe_read" xml:space="preserve">
+    <value>The PipeReader returned a zero-length  read.</value>
+  </data>
+</root>
\ No newline at end of file
diff --git a/src/libraries/System.Net.Connections/src/System.Net.Connections.csproj b/src/libraries/System.Net.Connections/src/System.Net.Connections.csproj
new file mode 100644 (file)
index 0000000..c53e559
--- /dev/null
@@ -0,0 +1,26 @@
+<Project Sdk="Microsoft.NET.Sdk">
+  <PropertyGroup>
+    <TargetFrameworks>$(NetCoreAppCurrent)</TargetFrameworks>
+    <Nullable>enable</Nullable>
+  </PropertyGroup>
+  <ItemGroup>
+    <Compile Include="$(CommonPath)System\Threading\Tasks\TaskToApm.cs" Link="Common\System\Threading\Tasks\TaskToApm.cs" />
+    <Compile Include="System\Net\Connections\ConnectionBase.cs" />
+    <Compile Include="System\Net\Connections\ConnectionCloseMethod.cs" />
+    <Compile Include="System\Net\Connections\ConnectionExtensions.cs" />
+    <Compile Include="System\Net\Connections\ConnectionListenerFactory.cs" />
+    <Compile Include="System\Net\Connections\Connection.cs" />
+    <Compile Include="System\Net\Connections\DuplexPipeStream.cs" />
+    <Compile Include="System\Net\Connections\ConnectionFactory.cs" />
+    <Compile Include="System\Net\Connections\ConnectionListener.cs" />
+    <Compile Include="System\Net\Connections\IConnectionProperties.cs" />
+  </ItemGroup>
+  <ItemGroup>
+    <Reference Include="System.Runtime" />
+    <Reference Include="System.Memory" />
+    <Reference Include="System.Net.Primitives" />
+    <Reference Include="System.Threading" />
+    <Reference Include="System.Threading.Tasks" />
+    <Reference Include="System.IO.Pipelines" />
+  </ItemGroup>
+</Project>
diff --git a/src/libraries/System.Net.Connections/src/System/Net/Connections/Connection.cs b/src/libraries/System.Net.Connections/src/System/Net/Connections/Connection.cs
new file mode 100644 (file)
index 0000000..8f7d479
--- /dev/null
@@ -0,0 +1,265 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
+using System.IO;
+using System.IO.Pipelines;
+using System.Runtime.ExceptionServices;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections
+{
+    /// <summary>
+    /// A connection.
+    /// </summary>
+    public abstract class Connection : ConnectionBase
+    {
+        private Stream? _stream;
+        private IDuplexPipe? _pipe;
+        private bool _initializing;
+
+        /// <summary>
+        /// The connection's <see cref="Stream"/>.
+        /// </summary>
+        public Stream Stream =>
+            _stream != null ? _stream :
+            _pipe != null ? throw new InvalidOperationException(SR.net_connections_stream_use_after_pipe) :
+            (_stream = CreateStream() ?? throw new InvalidOperationException(SR.net_connections_createstream_null));
+
+        /// <summary>
+        /// The connection's <see cref="IDuplexPipe"/>.
+        /// </summary>
+        public IDuplexPipe Pipe =>
+            _pipe != null ? _pipe :
+            _stream != null ? throw new InvalidOperationException(SR.net_connections_pipe_use_after_stream) :
+            (_pipe = CreatePipe() ?? throw new InvalidOperationException(SR.net_connections_createpipe_null));
+
+        /// <summary>
+        /// Initializes the <see cref="Stream"/> for the <see cref="Connection"/>.
+        /// </summary>
+        /// <returns>A <see cref="Stream"/>.</returns>
+        /// <remarks>
+        /// At least one of <see cref="CreateStream"/> and <see cref="CreatePipe"/> must be overridden.
+        /// If only <see cref="CreateStream"/> is overridden, a user accessing <see cref="Pipe"/> will get a <see cref="IDuplexPipe"/> wrapping the <see cref="Stream"/>.
+        /// </remarks>
+        protected virtual Stream CreateStream()
+        {
+            if (_initializing) throw new InvalidOperationException(SR.net_connections_no_create_overrides);
+
+            try
+            {
+                _initializing = true;
+
+                IDuplexPipe pipe = CreatePipe();
+                if (pipe == null) throw new InvalidOperationException(SR.net_connections_createpipe_null);
+
+                return new DuplexPipeStream(pipe);
+            }
+            finally
+            {
+                _initializing = false;
+            }
+        }
+
+        /// <summary>
+        /// Initializes the <see cref="Pipe"/> for the <see cref="Connection"/>.
+        /// </summary>
+        /// <returns>An <see cref="IDuplexPipe"/>.</returns>
+        /// <remarks>
+        /// At least one of <see cref="CreateStream"/> and <see cref="CreatePipe"/> must be overridden.
+        /// If only <see cref="CreatePipe"/> is overridden, a user accessing <see cref="Stream"/> will get a <see cref="Stream"/> wrapping the <see cref="Pipe"/>.
+        /// </remarks>
+        protected virtual IDuplexPipe CreatePipe()
+        {
+            if (_initializing) throw new InvalidOperationException(SR.net_connections_no_create_overrides);
+
+            try
+            {
+                _initializing = true;
+
+                Stream stream = CreateStream();
+                if (stream == null) throw new InvalidOperationException(SR.net_connections_createstream_null);
+
+                return new DuplexStreamPipe(stream);
+            }
+            finally
+            {
+                _initializing = false;
+            }
+        }
+
+        private sealed class DuplexStreamPipe : IDuplexPipe
+        {
+            private static readonly StreamPipeReaderOptions s_readerOpts = new StreamPipeReaderOptions(leaveOpen: true);
+            private static readonly StreamPipeWriterOptions s_writerOpts = new StreamPipeWriterOptions(leaveOpen: true);
+
+            public DuplexStreamPipe(Stream stream)
+            {
+                Input = PipeReader.Create(stream, s_readerOpts);
+                Output = PipeWriter.Create(stream, s_writerOpts);
+            }
+
+            public PipeReader Input { get; }
+
+            public PipeWriter Output { get; }
+        }
+
+        /// <summary>
+        /// Creates a connection for a <see cref="Stream"/>.
+        /// </summary>
+        /// <param name="stream">The connection's <see cref="Connection.Stream"/>.</param>
+        /// <param name="leaveOpen">If false, the <paramref name="stream"/> will be disposed of once the connection has been closed.</param>
+        /// <param name="properties">The connection's <see cref="ConnectionBase.ConnectionProperties"/>.</param>
+        /// <param name="localEndPoint">The connection's <see cref="ConnectionBase.LocalEndPoint"/>.</param>
+        /// <param name="remoteEndPoint">The connection's <see cref="ConnectionBase.RemoteEndPoint"/>.</param>
+        /// <returns>A new <see cref="Connection"/>.</returns>
+        public static Connection FromStream(Stream stream, bool leaveOpen = false, IConnectionProperties? properties = null, EndPoint? localEndPoint = null, EndPoint? remoteEndPoint = null)
+        {
+            if (stream == null) throw new ArgumentNullException(nameof(stream));
+            return new ConnectionFromStream(stream, leaveOpen, properties, localEndPoint, remoteEndPoint);
+        }
+
+        /// <summary>
+        /// Creates a connection for an <see cref="IDuplexPipe"/>.
+        /// </summary>
+        /// <param name="pipe">The connection's <see cref="Connection.Pipe"/>.</param>
+        /// <param name="leaveOpen">If false and the <paramref name="pipe"/> implements <see cref="IAsyncDisposable"/> or <see cref="IDisposable"/>, it will be disposed of once the connection has been closed.</param>
+        /// <param name="properties">The connection's <see cref="ConnectionBase.ConnectionProperties"/>.</param>
+        /// <param name="localEndPoint">The connection's <see cref="ConnectionBase.LocalEndPoint"/>.</param>
+        /// <param name="remoteEndPoint">The connection's <see cref="ConnectionBase.RemoteEndPoint"/>.</param>
+        /// <returns>A new <see cref="Connection"/>.</returns>
+        public static Connection FromPipe(IDuplexPipe pipe, bool leaveOpen = false, IConnectionProperties? properties = null, EndPoint? localEndPoint = null, EndPoint? remoteEndPoint = null)
+        {
+            if (pipe == null) throw new ArgumentNullException(nameof(pipe));
+            return new ConnectionFromPipe(pipe, leaveOpen, properties, localEndPoint, remoteEndPoint);
+        }
+
+        private sealed class ConnectionFromStream : Connection, IConnectionProperties
+        {
+            private Stream? _originalStream;
+            private IConnectionProperties? _properties;
+            private readonly bool _leaveOpen;
+
+            public override IConnectionProperties ConnectionProperties => _properties ?? this;
+
+            public override EndPoint? LocalEndPoint { get; }
+
+            public override EndPoint? RemoteEndPoint { get; }
+
+            public ConnectionFromStream(Stream stream, bool leaveOpen, IConnectionProperties? properties, EndPoint? localEndPoint, EndPoint? remoteEndPoint)
+            {
+                _originalStream = stream;
+                _leaveOpen = leaveOpen;
+                _properties = properties;
+                LocalEndPoint = localEndPoint;
+                RemoteEndPoint = remoteEndPoint;
+            }
+
+            protected override Stream CreateStream() => _originalStream ?? throw new ObjectDisposedException(nameof(Connection));
+
+            protected override async ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken)
+            {
+                if (_originalStream == null)
+                {
+                    return;
+                }
+
+                if (method == ConnectionCloseMethod.GracefulShutdown)
+                {
+                    await _originalStream.FlushAsync(cancellationToken).ConfigureAwait(false);
+                }
+
+                if (!_leaveOpen)
+                {
+                    await _originalStream.DisposeAsync().ConfigureAwait(false);
+                }
+
+                _originalStream = null;
+            }
+
+            bool IConnectionProperties.TryGet(Type propertyKey, [NotNullWhen(true)] out object? property)
+            {
+                property = null;
+                return false;
+            }
+        }
+
+        private sealed class ConnectionFromPipe : Connection, IConnectionProperties
+        {
+            private IDuplexPipe? _originalPipe;
+            private IConnectionProperties? _properties;
+            private readonly bool _leaveOpen;
+
+            public override IConnectionProperties ConnectionProperties => _properties ?? this;
+
+            public override EndPoint? LocalEndPoint { get; }
+
+            public override EndPoint? RemoteEndPoint { get; }
+
+            public ConnectionFromPipe(IDuplexPipe pipe, bool leaveOpen, IConnectionProperties? properties, EndPoint? localEndPoint, EndPoint? remoteEndPoint)
+            {
+                _originalPipe = pipe;
+                _leaveOpen = leaveOpen;
+                _properties = properties;
+                LocalEndPoint = localEndPoint;
+                RemoteEndPoint = remoteEndPoint;
+            }
+
+            protected override IDuplexPipe CreatePipe() => _originalPipe ?? throw new ObjectDisposedException(nameof(Connection));
+
+            protected override async ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken)
+            {
+                if (_originalPipe == null)
+                {
+                    return;
+                }
+
+                Exception? inputException, outputException;
+
+                if (method == ConnectionCloseMethod.GracefulShutdown)
+                {
+                    // Flush happens implicitly from CompleteAsync(null), so only flush here if we need cancellation.
+                    if (cancellationToken.CanBeCanceled)
+                    {
+                        FlushResult r = await _originalPipe.Output.FlushAsync(cancellationToken).ConfigureAwait(false);
+                        if (r.IsCanceled) cancellationToken.ThrowIfCancellationRequested();
+                    }
+
+                    inputException = null;
+                    outputException = null;
+                }
+                else
+                {
+                    inputException = ExceptionDispatchInfo.SetCurrentStackTrace(new ObjectDisposedException(nameof(Connection)));
+                    outputException = ExceptionDispatchInfo.SetCurrentStackTrace(new ObjectDisposedException(nameof(Connection)));
+                }
+
+                await _originalPipe.Input.CompleteAsync(inputException).ConfigureAwait(false);
+                await _originalPipe.Output.CompleteAsync(outputException).ConfigureAwait(false);
+
+                if (!_leaveOpen)
+                {
+                    switch (_originalPipe)
+                    {
+                        case IAsyncDisposable d:
+                            await d.DisposeAsync().ConfigureAwait(false);
+                            break;
+                        case IDisposable d:
+                            d.Dispose();
+                            break;
+                    }
+                }
+
+                _originalPipe = null;
+            }
+
+            bool IConnectionProperties.TryGet(Type propertyKey, [NotNullWhen(true)] out object? property)
+            {
+                property = null;
+                return false;
+            }
+        }
+    }
+}
diff --git a/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionBase.cs b/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionBase.cs
new file mode 100644 (file)
index 0000000..027431b
--- /dev/null
@@ -0,0 +1,80 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections
+{
+    /// <summary>
+    /// Provides base functionality shared between singular (e.g. TCP) and multiplexed (e.g. QUIC) connections.
+    /// </summary>
+    public abstract class ConnectionBase : IDisposable, IAsyncDisposable
+    {
+        private bool _disposed;
+
+        /// <summary>
+        /// Properties exposed by this connection.
+        /// </summary>
+        public abstract IConnectionProperties ConnectionProperties { get; }
+
+        /// <summary>
+        /// The local endpoint of this connection, if any.
+        /// </summary>
+        public abstract EndPoint? LocalEndPoint { get; }
+
+        /// <summary>
+        /// The remote endpoint of this connection, if any.
+        /// </summary>
+        public abstract EndPoint? RemoteEndPoint { get; }
+
+        /// <summary>
+        /// Closes the connection.
+        /// </summary>
+        /// <param name="method">The method to use when closing the connection.</param>
+        /// <param name="cancellationToken">A cancellation token for the asynchronous operation.</param>
+        /// <returns>A <see cref="ValueTask"/> for the asynchronous operation.</returns>
+        public async ValueTask CloseAsync(ConnectionCloseMethod method = ConnectionCloseMethod.GracefulShutdown, CancellationToken cancellationToken = default)
+        {
+            if (!_disposed)
+            {
+                await CloseAsyncCore(method, cancellationToken).ConfigureAwait(false);
+                _disposed = true;
+            }
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Closes the connection.
+        /// </summary>
+        /// <param name="method">The method to use when closing the connection.</param>
+        /// <param name="cancellationToken">A cancellation token for the asynchronous operation.</param>
+        /// <returns>A <see cref="ValueTask"/> for the asynchronous operation.</returns>
+        protected abstract ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken);
+
+        /// <summary>
+        /// Disposes of the connection.
+        /// </summary>
+        /// <remarks>
+        /// This is equivalent to calling <see cref="CloseAsync(ConnectionCloseMethod, CancellationToken)"/> with the method <see cref="ConnectionCloseMethod.GracefulShutdown"/>, and calling GetAwaiter().GetResult() on the resulting task.
+        /// To increase likelihood of synchronous completion, call <see cref="CloseAsync(ConnectionCloseMethod, CancellationToken)"/> directly with the method <see cref="ConnectionCloseMethod.Immediate"/>.
+        /// </remarks>
+        public void Dispose()
+        {
+            ValueTask t = CloseAsync(ConnectionCloseMethod.GracefulShutdown, CancellationToken.None);
+
+            if (t.IsCompleted) t.GetAwaiter().GetResult();
+            else t.AsTask().GetAwaiter().GetResult();
+        }
+
+        /// <summary>
+        /// Disposes of the connection.
+        /// </summary>
+        /// <returns>A <see cref="ValueTask"/> for the asynchronous operation.</returns>
+        /// <remarks>This is equivalent to calling <see cref="CloseAsync(ConnectionCloseMethod, CancellationToken)"/> with the method <see cref="ConnectionCloseMethod.GracefulShutdown"/>.</remarks>
+        public ValueTask DisposeAsync()
+        {
+            return CloseAsync(ConnectionCloseMethod.GracefulShutdown, CancellationToken.None);
+        }
+    }
+}
diff --git a/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionCloseMethod.cs b/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionCloseMethod.cs
new file mode 100644 (file)
index 0000000..6b5a079
--- /dev/null
@@ -0,0 +1,26 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+namespace System.Net.Connections
+{
+    /// <summary>
+    /// Methods for closing a connection.
+    /// </summary>
+    public enum ConnectionCloseMethod
+    {
+        /// <summary>
+        /// The connection should be flushed and closed.
+        /// </summary>
+        GracefulShutdown,
+
+        /// <summary>
+        /// The connection should be aborted gracefully, performing any I/O needed to notify the other side of the connection that it has been aborted.
+        /// </summary>
+        Abort,
+
+        /// <summary>
+        /// The connection should be aborted immediately, avoiding any I/O needed to notify the other side of the connection that it has been aborted.
+        /// </summary>
+        Immediate
+    }
+}
diff --git a/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionExtensions.cs b/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionExtensions.cs
new file mode 100644 (file)
index 0000000..6850247
--- /dev/null
@@ -0,0 +1,87 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Diagnostics.CodeAnalysis;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections
+{
+    /// <summary>
+    /// Extension methods for working with the System.Net.Connections types.
+    /// </summary>
+    public static class ConnectionExtensions
+    {
+        /// <summary>
+        /// Retrieves a Type-based property from an <see cref="IConnectionProperties"/>, if it exists.
+        /// </summary>
+        /// <typeparam name="T">The type of the property to retrieve.</typeparam>
+        /// <param name="properties">The connection properties to retrieve a property from.</param>
+        /// <param name="property">If <paramref name="properties"/> contains a property of type <typeparamref name="T"/>, receives the property. Otherwise, default.</param>
+        /// <returns>If <paramref name="properties"/> contains a property of type <typeparamref name="T"/>, true. Otherwise, false.</returns>
+        public static bool TryGet<T>(this IConnectionProperties properties, [MaybeNullWhen(false)] out T property)
+        {
+            if (properties == null) throw new ArgumentNullException(nameof(properties));
+
+            if (properties.TryGet(typeof(T), out object? obj) && obj is T propertyValue)
+            {
+                property = propertyValue;
+                return true;
+            }
+            else
+            {
+                property = default;
+                return false;
+            }
+        }
+
+        /// <summary>
+        /// Creates a connection-level filter on top of a <see cref="ConnectionFactory"/>.
+        /// </summary>
+        /// <param name="factory">The factory to be filtered.</param>
+        /// <param name="filter">The connection-level filter to apply on top of <paramref name="factory"/>.</param>
+        /// <returns>A new filtered <see cref="ConnectionFactory"/>.</returns>
+        public static ConnectionFactory Filter(this ConnectionFactory factory, Func<Connection, IConnectionProperties?, CancellationToken, ValueTask<Connection>> filter)
+        {
+            if (factory == null) throw new ArgumentNullException(nameof(factory));
+            if (filter == null) throw new ArgumentNullException(nameof(filter));
+            return new ConnectionFilteringFactory(factory, filter);
+        }
+
+        private sealed class ConnectionFilteringFactory : ConnectionFactory
+        {
+            private readonly ConnectionFactory _baseFactory;
+            private readonly Func<Connection, IConnectionProperties?, CancellationToken, ValueTask<Connection>> _filter;
+
+            public ConnectionFilteringFactory(ConnectionFactory baseFactory, Func<Connection, IConnectionProperties?, CancellationToken, ValueTask<Connection>> filter)
+            {
+                _baseFactory = baseFactory;
+                _filter = filter;
+            }
+
+            public override async ValueTask<Connection> ConnectAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default)
+            {
+                Connection con = await _baseFactory.ConnectAsync(endPoint, options, cancellationToken).ConfigureAwait(false);
+                try
+                {
+                    return await _filter(con, options, cancellationToken).ConfigureAwait(false);
+                }
+                catch
+                {
+                    await con.CloseAsync(ConnectionCloseMethod.Abort, cancellationToken).ConfigureAwait(false);
+                    throw;
+                }
+            }
+
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing) _baseFactory.Dispose();
+            }
+
+            protected override ValueTask DisposeAsyncCore()
+            {
+                return _baseFactory.DisposeAsync();
+            }
+        }
+    }
+}
diff --git a/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionFactory.cs b/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionFactory.cs
new file mode 100644 (file)
index 0000000..812f78a
--- /dev/null
@@ -0,0 +1,53 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections
+{
+    /// <summary>
+    /// A factory for opening outgoing connections.
+    /// </summary>
+    public abstract class ConnectionFactory : IAsyncDisposable, IDisposable
+    {
+        /// <summary>
+        /// Opens a new <see cref="Connection"/>.
+        /// </summary>
+        /// <param name="endPoint">The <see cref="EndPoint"/> to connect to, if any.</param>
+        /// <param name="options">Options used to create the connection, if any.</param>
+        /// <param name="cancellationToken">A token used to cancel the asynchronous operation.</param>
+        /// <returns>A <see cref="ValueTask{TResult}"/> for the <see cref="Connection"/>.</returns>
+        public abstract ValueTask<Connection> ConnectAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default);
+
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        public async ValueTask DisposeAsync()
+        {
+            await DisposeAsyncCore().ConfigureAwait(false);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Disposes the <see cref="ConnectionFactory"/>.
+        /// </summary>
+        /// <param name="disposing">If true, the <see cref="ConnectionFactory"/> is being disposed. If false, the <see cref="ConnectionFactory"/> is being finalized.</param>
+        protected virtual void Dispose(bool disposing)
+        {
+        }
+
+        /// <summary>
+        /// Asynchronously disposes the <see cref="ConnectionFactory"/>.
+        /// </summary>
+        /// <returns>A <see cref="ValueTask"/> representing the asynchronous dispose operation.</returns>
+        protected virtual ValueTask DisposeAsyncCore()
+        {
+            Dispose(true);
+            return default;
+        }
+    }
+}
diff --git a/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionListener.cs b/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionListener.cs
new file mode 100644 (file)
index 0000000..299ba0d
--- /dev/null
@@ -0,0 +1,62 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections
+{
+    /// <summary>
+    /// A listener to accept incoming connections.
+    /// </summary>
+    public abstract class ConnectionListener : IAsyncDisposable, IDisposable
+    {
+        /// <summary>
+        /// Properties exposed by this listener.
+        /// </summary>
+        public abstract IConnectionProperties ListenerProperties { get; }
+
+        /// <summary>
+        /// The local endpoint of this connection, if any.
+        /// </summary>
+        public abstract EndPoint? LocalEndPoint { get; }
+
+        /// <summary>
+        /// Accepts an incoming connection.
+        /// </summary>
+        /// <param name="options">Options used to create the connection, if any.</param>
+        /// <param name="cancellationToken">A token used to cancel the asynchronous operation.</param>
+        /// <returns>A <see cref="ValueTask{TResult}"/> for the <see cref="Connection"/>.</returns>
+        public abstract ValueTask<Connection> AcceptAsync(IConnectionProperties? options = null, CancellationToken cancellationToken = default);
+
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        public async ValueTask DisposeAsync()
+        {
+            await DisposeAsyncCore().ConfigureAwait(false);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Disposes the <see cref="ConnectionFactory"/>.
+        /// </summary>
+        /// <param name="disposing">If true, the <see cref="ConnectionFactory"/> is being disposed. If false, the <see cref="ConnectionFactory"/> is being finalized.</param>
+        protected virtual void Dispose(bool disposing)
+        {
+        }
+
+        /// <summary>
+        /// Asynchronously disposes the <see cref="ConnectionFactory"/>.
+        /// </summary>
+        /// <returns>A <see cref="ValueTask"/> representing the asynchronous dispose operation.</returns>
+        protected virtual ValueTask DisposeAsyncCore()
+        {
+            Dispose(true);
+            return default;
+        }
+    }
+}
diff --git a/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionListenerFactory.cs b/src/libraries/System.Net.Connections/src/System/Net/Connections/ConnectionListenerFactory.cs
new file mode 100644 (file)
index 0000000..d208f05
--- /dev/null
@@ -0,0 +1,46 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections
+{
+    /// <summary>
+    /// A factory for creating connection listeners, to accept incoming connections.
+    /// </summary>
+    public abstract class ConnectionListenerFactory : IAsyncDisposable, IDisposable
+    {
+        public abstract ValueTask<ConnectionListener> BindAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default);
+
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        public async ValueTask DisposeAsync()
+        {
+            await DisposeAsyncCore().ConfigureAwait(false);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Disposes the <see cref="ConnectionFactory"/>.
+        /// </summary>
+        /// <param name="disposing">If true, the <see cref="ConnectionFactory"/> is being disposed. If false, the <see cref="ConnectionFactory"/> is being finalized.</param>
+        protected virtual void Dispose(bool disposing)
+        {
+        }
+
+        /// <summary>
+        /// Asynchronously disposes the <see cref="ConnectionFactory"/>.
+        /// </summary>
+        /// <returns>A <see cref="ValueTask"/> representing the asynchronous dispose operation.</returns>
+        protected virtual ValueTask DisposeAsyncCore()
+        {
+            Dispose(true);
+            return default;
+        }
+    }
+}
diff --git a/src/libraries/System.Net.Connections/src/System/Net/Connections/DuplexPipeStream.cs b/src/libraries/System.Net.Connections/src/System/Net/Connections/DuplexPipeStream.cs
new file mode 100644 (file)
index 0000000..3319c97
--- /dev/null
@@ -0,0 +1,166 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System;
+using System.Buffers;
+using System.IO;
+using System.IO.Pipelines;
+using System.Runtime.ExceptionServices;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections
+{
+    internal sealed class DuplexPipeStream : Stream
+    {
+        private readonly PipeReader _reader;
+        private readonly PipeWriter _writer;
+
+        public override bool CanRead => true;
+        public override bool CanSeek => false;
+        public override bool CanWrite => true;
+        public override long Length => throw new NotSupportedException();
+        public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
+
+        public DuplexPipeStream(IDuplexPipe pipe)
+        {
+            _reader = pipe.Input;
+            _writer = pipe.Output;
+        }
+
+        protected override void Dispose(bool disposing)
+        {
+            if (disposing)
+            {
+                _reader.Complete();
+                _writer.Complete();
+            }
+            base.Dispose(disposing);
+        }
+
+        public override async ValueTask DisposeAsync()
+        {
+            await _reader.CompleteAsync().ConfigureAwait(false);
+            await _writer.CompleteAsync().ConfigureAwait(false);
+        }
+
+        public override void Flush()
+        {
+            FlushAsync().GetAwaiter().GetResult();
+        }
+
+        public override async Task FlushAsync(CancellationToken cancellationToken)
+        {
+            FlushResult r = await _writer.FlushAsync(cancellationToken).ConfigureAwait(false);
+            if (r.IsCanceled) throw new OperationCanceledException(cancellationToken);
+        }
+
+        public override int Read(byte[] buffer, int offset, int count)
+        {
+            if (buffer == null) throw new ArgumentNullException(nameof(buffer));
+
+            ValueTask<int> t = ReadAsync(buffer.AsMemory(offset, count));
+            return
+                t.IsCompleted ? t.GetAwaiter().GetResult() :
+                t.AsTask().GetAwaiter().GetResult();
+        }
+
+        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+        {
+            if (buffer == null) return Task.FromException<int>(ExceptionDispatchInfo.SetCurrentStackTrace(new ArgumentNullException(nameof(buffer))));
+            return ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
+        }
+
+        public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+        {
+            ReadResult result = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
+
+            if (result.IsCanceled)
+            {
+                throw new OperationCanceledException();
+            }
+
+            ReadOnlySequence<byte> sequence = result.Buffer;
+            long bufferLength = sequence.Length;
+            SequencePosition consumed = sequence.Start;
+
+            try
+            {
+                if (bufferLength != 0)
+                {
+                    int actual = (int)Math.Min(bufferLength, buffer.Length);
+
+                    ReadOnlySequence<byte> slice = actual == bufferLength ? sequence : sequence.Slice(0, actual);
+                    consumed = slice.End;
+                    slice.CopyTo(buffer.Span);
+
+                    return actual;
+                }
+
+                if (result.IsCompleted)
+                {
+                    return 0;
+                }
+            }
+            finally
+            {
+                _reader.AdvanceTo(consumed);
+            }
+
+            // This is a buggy PipeReader implementation that returns 0 byte reads even though the PipeReader
+            // isn't completed or canceled.
+            throw new InvalidOperationException(SR.net_connections_zero_byte_pipe_read);
+        }
+
+        public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
+        {
+            return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);
+        }
+
+        public override int EndRead(IAsyncResult asyncResult)
+        {
+            return TaskToApm.End<int>(asyncResult);
+        }
+
+        public override long Seek(long offset, SeekOrigin origin)
+        {
+            throw new NotSupportedException();
+        }
+
+        public override void SetLength(long value)
+        {
+            throw new NotSupportedException();
+        }
+
+        public override void Write(byte[] buffer, int offset, int count)
+        {
+            WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
+        }
+
+        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+        {
+            return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
+        }
+
+        public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+        {
+            FlushResult r = await _writer.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);
+            if (r.IsCanceled) throw new OperationCanceledException(cancellationToken);
+        }
+
+        public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
+        {
+            return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);
+        }
+
+        public override void EndWrite(IAsyncResult asyncResult)
+        {
+            TaskToApm.End(asyncResult);
+        }
+
+        public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
+        {
+            return _reader.CopyToAsync(destination, cancellationToken);
+        }
+    }
+}
diff --git a/src/libraries/System.Net.Connections/src/System/Net/Connections/IConnectionProperties.cs b/src/libraries/System.Net.Connections/src/System/Net/Connections/IConnectionProperties.cs
new file mode 100644 (file)
index 0000000..ce89dd9
--- /dev/null
@@ -0,0 +1,21 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Diagnostics.CodeAnalysis;
+
+namespace System.Net.Connections
+{
+    /// <summary>
+    /// A container for connection properties.
+    /// </summary>
+    public interface IConnectionProperties
+    {
+        /// <summary>
+        /// Retrieves a connection property, if it exists.
+        /// </summary>
+        /// <param name="propertyKey">The key of the property to retrieve.</param>
+        /// <param name="property">If the property was found, retrieves the property. Otherwise, null.</param>
+        /// <returns>If the property was found, true. Otherwise, false.</returns>
+        bool TryGet(Type propertyKey, [NotNullWhen(true)] out object? property);
+    }
+}
diff --git a/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionBaseTest.cs b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionBaseTest.cs
new file mode 100644 (file)
index 0000000..66cc4a2
--- /dev/null
@@ -0,0 +1,81 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.Net.Connections.Tests
+{
+    public class ConnectionBaseTest
+    {
+        [Fact]
+        public void Dispose_CallsClose_Success()
+        {
+            ConnectionCloseMethod? method = null;
+
+            var con = new MockConnection();
+            con.OnCloseAsyncCore = (m, t) =>
+            {
+                method = m;
+                return default(ValueTask);
+            };
+
+            con.Dispose();
+
+            Assert.Equal(ConnectionCloseMethod.GracefulShutdown, method);
+        }
+
+        [Fact]
+        public async Task DisposeAsync_CallsClose_Success()
+        {
+            ConnectionCloseMethod? method = null;
+
+            var con = new MockConnection();
+            con.OnCloseAsyncCore = (m, t) =>
+            {
+                method = m;
+                return default(ValueTask);
+            };
+
+            await con.DisposeAsync();
+
+            Assert.Equal(ConnectionCloseMethod.GracefulShutdown, method);
+        }
+
+        [Fact]
+        public void Dispose_CalledOnce_Success()
+        {
+            int callCount = 0;
+
+            var con = new MockConnection();
+            con.OnCloseAsyncCore = delegate
+            {
+                ++callCount;
+                return default(ValueTask);
+            };
+
+            con.Dispose();
+            con.Dispose();
+
+            Assert.Equal(1, callCount);
+        }
+
+        [Fact]
+        public async Task DisposeAsync_CalledOnce_Success()
+        {
+            int callCount = 0;
+
+            var con = new MockConnection();
+            con.OnCloseAsyncCore = delegate
+            {
+                ++callCount;
+                return default(ValueTask);
+            };
+
+            await con.DisposeAsync();
+            await con.DisposeAsync();
+
+            Assert.Equal(1, callCount);
+        }
+    }
+}
diff --git a/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionTest.cs b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionTest.cs
new file mode 100644 (file)
index 0000000..ad16e45
--- /dev/null
@@ -0,0 +1,264 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Buffers;
+using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
+using System.IO;
+using System.IO.Pipelines;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.Net.Connections.Tests
+{
+    public class ConnectionTest
+    {
+        [Fact]
+        public void CreateStream_CalledOnce_Success()
+        {
+            int callCount = 0;
+
+            var con = new MockConnection();
+            con.OnCreateStream = () =>
+            {
+                ++callCount;
+                return new MemoryStream();
+            };
+
+            _ = con.Stream;
+            _ = con.Stream;
+
+            Assert.Equal(1, callCount);
+        }
+
+        [Fact]
+        public void CreatePipe_CalledOnce_Success()
+        {
+            int callCount = 0;
+
+            var con = new MockConnection();
+            con.OnCreatePipe = () =>
+            {
+                ++callCount;
+                return new MockPipe();
+            };
+
+            _ = con.Pipe;
+            _ = con.Pipe;
+
+            Assert.Equal(1, callCount);
+        }
+
+        [Fact]
+        public void AccessStream_AccessPipe_Fail()
+        {
+            var con = new MockConnection();
+            con.OnCreateStream = () => new MemoryStream();
+
+            _ = con.Stream;
+            Assert.Throws<InvalidOperationException>(() => _ = con.Pipe);
+        }
+
+        [Fact]
+        public void AccessPipe_AccessStream_Fail()
+        {
+            var con = new MockConnection();
+            con.OnCreatePipe = () => new MockPipe();
+
+            _ = con.Pipe;
+            Assert.Throws<InvalidOperationException>(() => _ = con.Stream);
+        }
+
+        [Fact]
+        public void AccessStream_NoOverloads_Fail()
+        {
+            var con = new ConnectionWithoutStreamOrPipe();
+            Assert.Throws<InvalidOperationException>(() => _ = con.Stream);
+        }
+
+        [Fact]
+        public void AccessPipe_NoOverloads_Fail()
+        {
+            var con = new ConnectionWithoutStreamOrPipe();
+            Assert.Throws<InvalidOperationException>(() => _ = con.Pipe);
+        }
+
+        [Fact]
+        public async Task WrappedStream_Success()
+        {
+            var bytesA = Encoding.ASCII.GetBytes("foo");
+            var bytesB = Encoding.ASCII.GetBytes("bar");
+
+            var stream = new MemoryStream();
+            stream.Write(bytesA);
+            stream.Position = 0;
+
+            var con = new MockConnection();
+            con.OnCreateStream = () => stream;
+
+            IDuplexPipe pipe = con.Pipe;
+
+            ReadResult res = await pipe.Input.ReadAsync();
+            Assert.Equal(bytesA, res.Buffer.ToArray());
+
+            await pipe.Output.WriteAsync(bytesB);
+            Assert.Equal(bytesA.Concat(bytesB).ToArray(), stream.ToArray());
+        }
+
+        [Fact]
+        public async Task WrappedPipe_Success()
+        {
+            var bytesA = Encoding.ASCII.GetBytes("foo");
+            var bytesB = Encoding.ASCII.GetBytes("bar");
+
+            var stream = new MemoryStream();
+            stream.Write(bytesA);
+            stream.Position = 0;
+
+            var pipe = new MockPipe
+            {
+                Input = PipeReader.Create(stream),
+                Output = PipeWriter.Create(stream)
+            };
+
+            var con = new MockConnection();
+            con.OnCreatePipe = () => pipe;
+
+            Stream s = con.Stream;
+
+            var readBuffer = new byte[4];
+            int len = await s.ReadAsync(readBuffer);
+            Assert.Equal(3, len);
+            Assert.Equal(bytesA, readBuffer.AsSpan(0, len).ToArray());
+
+            await s.WriteAsync(bytesB);
+            Assert.Equal(bytesA.Concat(bytesB).ToArray(), stream.ToArray());
+        }
+
+        [Theory]
+        [InlineData(ConnectionCloseMethod.GracefulShutdown, true)]
+        [InlineData(ConnectionCloseMethod.Abort, false)]
+        [InlineData(ConnectionCloseMethod.Immediate, false)]
+        public async Task FromStream_CloseMethod_Flushed(ConnectionCloseMethod method, bool shouldFlush)
+        {
+            bool streamFlushed = false;
+
+            var stream = new MockStream
+            {
+                OnFlushAsync = _ => { streamFlushed = true; return Task.CompletedTask; }
+            };
+
+            var con = Connection.FromStream(stream, leaveOpen: true);
+
+            await con.CloseAsync(method);
+            Assert.Equal(shouldFlush, streamFlushed);
+        }
+
+        [Theory]
+        [InlineData(ConnectionCloseMethod.GracefulShutdown, true)]
+        [InlineData(ConnectionCloseMethod.Abort, false)]
+        [InlineData(ConnectionCloseMethod.Immediate, false)]
+        public async Task FromPipe_CloseMethod_Flushed(ConnectionCloseMethod method, bool shouldFlush)
+        {
+            bool pipeFlushed = false;
+
+            var pipe = new MockPipe
+            {
+                Input = new MockPipeReader()
+                {
+                    OnCompleteAsync = _ => default
+                },
+                Output = new MockPipeWriter()
+                {
+                    OnFlushAsync = _ => { pipeFlushed = true; return default; },
+                    OnCompleteAsync = _ => default
+                }
+            };
+
+            var con = Connection.FromPipe(pipe);
+
+            await con.CloseAsync(method, new CancellationTokenSource().Token);
+            Assert.Equal(shouldFlush, pipeFlushed);
+        }
+
+        [Theory]
+        [InlineData(true, false)]
+        [InlineData(false, true)]
+        public async Task FromStream_LeaveOpen_StreamDisposed(bool leaveOpen, bool shouldDispose)
+        {
+            bool streamDisposed = false;
+
+            var stream = new MockStream();
+            stream.OnDisposeAsync = delegate { streamDisposed = true; return default; };
+
+            var con = Connection.FromStream(stream, leaveOpen);
+
+            await con.CloseAsync(ConnectionCloseMethod.Immediate);
+            Assert.Equal(shouldDispose, streamDisposed);
+        }
+
+        [Theory]
+        [InlineData(true, false)]
+        [InlineData(false, true)]
+        public async Task FromPipe_LeaveOpen_PipeDisposed(bool leaveOpen, bool shouldDispose)
+        {
+            bool pipeDisposed = false;
+
+            var pipe = new MockPipe
+            {
+                OnDisposeAsync = () => { pipeDisposed = true; return default; },
+                Input = new MockPipeReader()
+                {
+                    OnCompleteAsync = _ => default
+                },
+                Output = new MockPipeWriter()
+                {
+                    OnFlushAsync = _ => default,
+                    OnCompleteAsync = _ => default
+                }
+            };
+
+            var con = Connection.FromPipe(pipe, leaveOpen);
+
+            await con.CloseAsync(ConnectionCloseMethod.Immediate);
+            Assert.Equal(shouldDispose, pipeDisposed);
+        }
+
+        [Fact]
+        public void FromStream_PropertiesInitialized()
+        {
+            var properties = new DummyConnectionProperties();
+            var localEndPoint = new IPEndPoint(IPAddress.Any, 1);
+            var remoteEndPoint = new IPEndPoint(IPAddress.Any, 2);
+
+            Connection c = Connection.FromStream(new MockStream(), leaveOpen: false, properties, localEndPoint, remoteEndPoint);
+            Assert.Same(properties, c.ConnectionProperties);
+            Assert.Same(localEndPoint, c.LocalEndPoint);
+            Assert.Same(remoteEndPoint, c.RemoteEndPoint);
+        }
+
+        [Fact]
+        public void FromPipe_PropertiesInitialized()
+        {
+            var properties = new DummyConnectionProperties();
+            var localEndPoint = new IPEndPoint(IPAddress.Any, 1);
+            var remoteEndPoint = new IPEndPoint(IPAddress.Any, 2);
+
+            Connection c = Connection.FromPipe(new MockPipe(), leaveOpen: false, properties, localEndPoint, remoteEndPoint);
+            Assert.Same(properties, c.ConnectionProperties);
+            Assert.Same(localEndPoint, c.LocalEndPoint);
+            Assert.Same(remoteEndPoint, c.RemoteEndPoint);
+        }
+
+        private sealed class DummyConnectionProperties : IConnectionProperties
+        {
+            public bool TryGet(Type propertyKey, [NotNullWhen(true)] out object property)
+            {
+                throw new NotImplementedException();
+            }
+        }
+    }
+}
diff --git a/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionWithoutStreamOrPipe.cs b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/ConnectionWithoutStreamOrPipe.cs
new file mode 100644 (file)
index 0000000..16e5238
--- /dev/null
@@ -0,0 +1,22 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections.Tests
+{
+    internal class ConnectionWithoutStreamOrPipe : Connection
+    {
+        public override IConnectionProperties ConnectionProperties => throw new NotImplementedException();
+
+        public override EndPoint LocalEndPoint => throw new NotImplementedException();
+
+        public override EndPoint RemoteEndPoint => throw new NotImplementedException();
+
+        protected override ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken)
+        {
+            throw new NotImplementedException();
+        }
+    }
+}
diff --git a/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockConnection.cs b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockConnection.cs
new file mode 100644 (file)
index 0000000..f1acf75
--- /dev/null
@@ -0,0 +1,33 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.IO;
+using System.IO.Pipelines;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections.Tests
+{
+    internal class MockConnection : Connection
+    {
+        public Func<IConnectionProperties> OnConnectionProperties { get; set; }
+        public Func<EndPoint> OnLocalEndPoint { get; set; }
+        public Func<EndPoint> OnRemoteEndPoint { get; set; }
+        public Func<ConnectionCloseMethod, CancellationToken, ValueTask> OnCloseAsyncCore { get; set; }
+        public Func<IDuplexPipe> OnCreatePipe { get; set; }
+        public Func<Stream> OnCreateStream { get; set; }
+
+        public override IConnectionProperties ConnectionProperties => OnConnectionProperties();
+
+        public override EndPoint LocalEndPoint => OnLocalEndPoint();
+
+        public override EndPoint RemoteEndPoint => OnRemoteEndPoint();
+
+        protected override ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken) =>
+            OnCloseAsyncCore(method, cancellationToken);
+
+        protected override IDuplexPipe CreatePipe() => OnCreatePipe != null ? OnCreatePipe() : base.CreatePipe();
+
+        protected override Stream CreateStream() => OnCreateStream != null ? OnCreateStream() : base.CreateStream();
+    }
+}
diff --git a/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipe.cs b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipe.cs
new file mode 100644 (file)
index 0000000..6abbb9a
--- /dev/null
@@ -0,0 +1,21 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.IO.Pipelines;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections.Tests
+{
+    internal class MockPipe : IDuplexPipe, IAsyncDisposable
+    {
+        public Func<ValueTask> OnDisposeAsync { get; set; }
+        public PipeReader Input { get; set; }
+
+        public PipeWriter Output { get; set; }
+
+        public ValueTask DisposeAsync()
+        {
+            return OnDisposeAsync?.Invoke() ?? default;
+        }
+    }
+}
diff --git a/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipeReader.cs b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipeReader.cs
new file mode 100644 (file)
index 0000000..2413bf1
--- /dev/null
@@ -0,0 +1,44 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.IO.Pipelines;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections.Tests
+{
+    internal class MockPipeReader : PipeReader
+    {
+        public Action<SequencePosition, SequencePosition> OnAdvanceTo { get; set; }
+        public Action OnCancelPendingRead { get; set; }
+        public Action<Exception> OnComplete { get; set; }
+        public Func<Exception,ValueTask> OnCompleteAsync { get; set; }
+        public Func<CancellationToken, ValueTask<ReadResult>> OnReadAsync { get; set; }
+        public Func<ReadResult?> OnTryRead { get; set; }
+
+        public override void AdvanceTo(SequencePosition consumed)
+            => OnAdvanceTo(consumed, consumed);
+
+        public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
+            => OnAdvanceTo(consumed, examined);
+
+        public override void CancelPendingRead()
+            => OnCancelPendingRead();
+
+        public override void Complete(Exception exception = null)
+            => OnComplete(exception);
+
+        public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
+            => OnReadAsync(cancellationToken);
+
+        public override bool TryRead(out ReadResult result)
+        {
+            ReadResult? r = OnTryRead();
+            result = r.GetValueOrDefault();
+            return r.HasValue;
+        }
+
+        public override ValueTask CompleteAsync(Exception exception = null)
+            => OnCompleteAsync(exception);
+    }
+}
diff --git a/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipeWriter.cs b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockPipeWriter.cs
new file mode 100644 (file)
index 0000000..7c35e19
--- /dev/null
@@ -0,0 +1,40 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.IO.Pipelines;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections.Tests
+{
+    internal class MockPipeWriter : PipeWriter
+    {
+        public Action<int> OnAdvance { get; set; }
+        public Action OnCancelPendingFlush { get; set; }
+        public Action<Exception> OnComplete { get; set; }
+        public Func<Exception,ValueTask> OnCompleteAsync { get; set; }
+        public Func<CancellationToken,ValueTask<FlushResult>> OnFlushAsync { get; set; }
+        public Func<int, Memory<byte>> OnGetMemory { get; set; }
+
+        public override void Advance(int bytes)
+            => OnAdvance(bytes);
+
+        public override void CancelPendingFlush()
+            => OnCancelPendingFlush();
+
+        public override void Complete(Exception exception = null)
+            => OnComplete(exception);
+
+        public override ValueTask CompleteAsync(Exception exception = null)
+            => OnCompleteAsync(exception);
+
+        public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
+            => OnFlushAsync(cancellationToken);
+
+        public override Memory<byte> GetMemory(int sizeHint = 0)
+            => OnGetMemory(sizeHint);
+
+        public override Span<byte> GetSpan(int sizeHint = 0)
+            => GetMemory(sizeHint).Span;
+    }
+}
diff --git a/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockStream.cs b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/MockStream.cs
new file mode 100644 (file)
index 0000000..50dbc31
--- /dev/null
@@ -0,0 +1,107 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections.Tests
+{
+    internal class MockStream : Stream
+    {
+        public Func<Memory<byte>, CancellationToken, ValueTask<int>> OnReadAsync { get; set; }
+        public Func<ReadOnlyMemory<byte>, CancellationToken, ValueTask> OnWriteAsync { get; set; }
+        public Func<CancellationToken, Task> OnFlushAsync { get; set; }
+        public Func<ValueTask> OnDisposeAsync { get; set; }
+
+        public override bool CanRead => true;
+
+        public override bool CanSeek => false;
+
+        public override bool CanWrite => true;
+
+        public override long Length => throw new NotImplementedException();
+
+        public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
+
+        protected override void Dispose(bool disposing)
+        {
+            if (disposing) DisposeAsync().AsTask().GetAwaiter().GetResult();
+        }
+
+        public override ValueTask DisposeAsync()
+        {
+            return OnDisposeAsync();
+        }
+
+        public override void Flush()
+        {
+            FlushAsync().GetAwaiter().GetResult();
+        }
+
+        public override Task FlushAsync(CancellationToken cancellationToken)
+        {
+            return OnFlushAsync(cancellationToken);
+        }
+
+        public override int Read(byte[] buffer, int offset, int count)
+        {
+            return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
+        }
+
+        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+        {
+            return ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
+        }
+
+        public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+        {
+            return OnReadAsync(buffer, cancellationToken);
+        }
+
+        public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
+        {
+            return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);
+        }
+
+        public override int EndRead(IAsyncResult asyncResult)
+        {
+            return TaskToApm.End<int>(asyncResult);
+        }
+
+        public override long Seek(long offset, SeekOrigin origin)
+        {
+            throw new NotImplementedException();
+        }
+
+        public override void SetLength(long value)
+        {
+            throw new NotImplementedException();
+        }
+
+        public override void Write(byte[] buffer, int offset, int count)
+        {
+            WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
+        }
+
+        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+        {
+            return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
+        }
+
+        public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+        {
+            return OnWriteAsync(buffer, cancellationToken);
+        }
+
+        public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
+        {
+            return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);
+        }
+
+        public override void EndWrite(IAsyncResult asyncResult)
+        {
+            TaskToApm.End(asyncResult);
+        }
+    }
+}
diff --git a/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/System.Net.Connections.Tests.csproj b/src/libraries/System.Net.Connections/tests/System.Net.Connections.Tests/System.Net.Connections.Tests.csproj
new file mode 100644 (file)
index 0000000..9aa0112
--- /dev/null
@@ -0,0 +1,19 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <TargetFrameworks>$(NetCoreAppCurrent)</TargetFrameworks>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <Compile Include="$(CommonPath)System\Threading\Tasks\TaskToApm.cs" Link="Common\System\Threading\Tasks\TaskToApm.cs" />
+    <Compile Include="ConnectionBaseTest.cs" />
+    <Compile Include="ConnectionTest.cs" />
+    <Compile Include="ConnectionWithoutStreamOrPipe.cs" />
+    <Compile Include="MockConnection.cs" />
+    <Compile Include="MockPipe.cs" />
+    <Compile Include="MockPipeReader.cs" />
+    <Compile Include="MockPipeWriter.cs" />
+    <Compile Include="MockStream.cs" />
+  </ItemGroup>
+
+</Project>
index 3386d32..31db0e5 100644 (file)
@@ -282,11 +282,21 @@ namespace System.Net.Http
         protected override System.Threading.Tasks.Task SerializeToStreamAsync(System.IO.Stream stream, System.Net.TransportContext? context, System.Threading.CancellationToken cancellationToken) { throw null; }
         protected internal override bool TryComputeLength(out long length) { throw null; }
     }
+    public partial class SocketsHttpConnectionFactory : System.Net.Connections.ConnectionFactory
+    {
+        public SocketsHttpConnectionFactory() { }
+        public sealed override System.Threading.Tasks.ValueTask<System.Net.Connections.Connection> ConnectAsync(System.Net.EndPoint? endPoint, System.Net.Connections.IConnectionProperties? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
+        public virtual System.Net.Sockets.Socket CreateSocket(System.Net.Http.HttpRequestMessage message, System.Net.EndPoint? endPoint, System.Net.Connections.IConnectionProperties options) { throw null; }
+        protected override void Dispose(bool disposing) { }
+        protected override System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; }
+        public virtual System.Threading.Tasks.ValueTask<System.Net.Connections.Connection> EstablishConnectionAsync(System.Net.Http.HttpRequestMessage message, System.Net.EndPoint? endPoint, System.Net.Connections.IConnectionProperties options, System.Threading.CancellationToken cancellationToken) { throw null; }
+    }
     public sealed partial class SocketsHttpHandler : System.Net.Http.HttpMessageHandler
     {
         public SocketsHttpHandler() { }
         public bool AllowAutoRedirect { get { throw null; } set { } }
         public System.Net.DecompressionMethods AutomaticDecompression { get { throw null; } set { } }
+        public System.Net.Connections.ConnectionFactory? ConnectionFactory { get { throw null; } set { } }
         public System.TimeSpan ConnectTimeout { get { throw null; } set { } }
         [System.Diagnostics.CodeAnalysis.AllowNullAttribute]
         public System.Net.CookieContainer CookieContainer { get { throw null; } set { } }
@@ -297,6 +307,7 @@ namespace System.Net.Http
         public int MaxConnectionsPerServer { get { throw null; } set { } }
         public int MaxResponseDrainSize { get { throw null; } set { } }
         public int MaxResponseHeadersLength { get { throw null; } set { } }
+        public System.Func<System.Net.Http.HttpRequestMessage, System.Net.Connections.Connection, System.Threading.CancellationToken, System.Threading.Tasks.ValueTask<System.Net.Connections.Connection>>? PlaintextFilter { get { throw null; } set { } }
         public System.TimeSpan PooledConnectionIdleTimeout { get { throw null; } set { } }
         public System.TimeSpan PooledConnectionLifetime { get { throw null; } set { } }
         public bool PreAuthenticate { get { throw null; } set { } }
index 9f96fca..296b6c0 100644 (file)
@@ -9,6 +9,8 @@
   <ItemGroup>
     <ProjectReference Include="..\..\System.Runtime\ref\System.Runtime.csproj" />
     <ProjectReference Include="..\..\System.Net.Primitives\ref\System.Net.Primitives.csproj" />
+    <ProjectReference Include="..\..\System.Net.Sockets\ref\System.Net.Sockets.csproj" />
+    <ProjectReference Include="..\..\System.Net.Connections\ref\System.Net.Connections.csproj" />
     <ProjectReference Include="..\..\System.Net.Security\ref\System.Net.Security.csproj" />
     <ProjectReference Include="..\..\System.Security.Cryptography.X509Certificates\ref\System.Security.Cryptography.X509Certificates.csproj" />
     <ProjectReference Include="..\..\System.Text.Encoding\ref\System.Text.Encoding.csproj" />
index 62a0334..5732676 100644 (file)
     <Compile Include="System\Net\Http\SocketsHttpHandler\RedirectHandler.cs" />
     <Compile Include="System\Net\Http\SocketsHttpHandler\SocketsHttpHandler.cs" />
     <Compile Include="System\Net\Http\SocketsHttpHandler\SystemProxyInfo.cs" />
+    <Compile Include="System\Net\Http\SocketsHttpHandler\Connections\SocketConnection.cs" />
+    <Compile Include="System\Net\Http\SocketsHttpHandler\Connections\TaskSocketAsyncEventArgs.cs" />
+    <Compile Include="System\Net\Http\SocketsHttpHandler\DnsEndPointWithProperties.cs" />
+    <Compile Include="System\Net\Http\SocketsHttpHandler\SocketsHttpConnectionFactory.cs" />
     <Compile Include="$(CommonPath)System\Net\NTAuthentication.Common.cs"
              Link="Common\System\Net\NTAuthentication.Common.cs" />
     <Compile Include="$(CommonPath)System\Net\ContextFlagsPal.cs"
     <Compile Include="System\Net\Http\SocketsHttpHandler\HttpNoProxy.cs" />
     <Compile Include="System\Net\Http\BrowserHttpHandler\SystemProxyInfo.Browser.cs" />
     <Compile Include="System\Net\Http\BrowserHttpHandler\SocketsHttpHandler.cs" />
+    <Compile Include="System\Net\Http\BrowserHttpHandler\SocketsHttpConnectionFactory.cs" />
     <Compile Include="System\Net\Http\BrowserHttpHandler\BrowserHttpHandler.cs" />
   </ItemGroup>
   <ItemGroup>
     <Reference Include="System.Diagnostics.DiagnosticSource" />
     <Reference Include="System.Diagnostics.Tracing" />
     <Reference Include="System.IO.Compression" />
+    <Reference Include="System.IO.Pipelines" />
     <Reference Include="System.Memory" />
+    <Reference Include="System.Net.Connections" />
     <Reference Include="System.Net.NameResolution" />
     <Reference Include="System.Net.NetworkInformation" />
     <Reference Include="System.Net.Primitives" />
diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpConnectionFactory.cs b/src/libraries/System.Net.Http/src/System/Net/Http/BrowserHttpHandler/SocketsHttpConnectionFactory.cs
new file mode 100644 (file)
index 0000000..664de02
--- /dev/null
@@ -0,0 +1,19 @@
+using System.Threading;
+using System.Threading.Tasks;
+using System.Net.Connections;
+using System.Net.Sockets;
+
+namespace System.Net.Http
+{
+    public class SocketsHttpConnectionFactory : ConnectionFactory
+    {
+        public sealed override ValueTask<Connection> ConnectAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default)
+            => throw new NotImplementedException();
+
+        public virtual Socket CreateSocket(HttpRequestMessage message, EndPoint? endPoint, IConnectionProperties options)
+            => throw new NotImplementedException();
+
+        public virtual ValueTask<Connection> EstablishConnectionAsync(HttpRequestMessage message, EndPoint? endPoint, IConnectionProperties options, CancellationToken cancellationToken)
+            => throw new NotImplementedException();
+    }
+}
index 4644b8b..657d003 100644 (file)
@@ -6,6 +6,7 @@ using System.Net.Security;
 using System.Threading;
 using System.Threading.Tasks;
 using System.Diagnostics.CodeAnalysis;
+using System.Net.Connections;
 
 namespace System.Net.Http
 {
@@ -127,6 +128,18 @@ namespace System.Net.Http
             set => throw new PlatformNotSupportedException();
         }
 
+        public ConnectionFactory? ConnectionFactory
+        {
+            get => throw new PlatformNotSupportedException();
+            set => throw new PlatformNotSupportedException();
+        }
+
+        public Func<HttpRequestMessage, Connection, CancellationToken, ValueTask<Connection>>? PlaintextFilter
+        {
+            get => throw new PlatformNotSupportedException();
+            set => throw new PlatformNotSupportedException();
+        }
+
         public IDictionary<string, object?> Properties => throw new PlatformNotSupportedException();
 
         protected internal override Task<HttpResponseMessage> SendAsync(
index 48d530c..467e813 100644 (file)
@@ -3,11 +3,10 @@
 
 using System.Diagnostics;
 using System.IO;
+using System.Net.Connections;
 using System.Net.Quic;
 using System.Net.Security;
 using System.Net.Sockets;
-using System.Runtime.CompilerServices;
-using System.Runtime.ExceptionServices;
 using System.Security.Cryptography.X509Certificates;
 using System.Threading;
 using System.Threading.Tasks;
@@ -33,58 +32,23 @@ namespace System.Net.Http
             }
         }
 
-        public static ValueTask<Stream> ConnectAsync(string host, int port, bool async, CancellationToken cancellationToken)
+        public static async ValueTask<Connection> ConnectAsync(ConnectionFactory factory, DnsEndPoint endPoint, IConnectionProperties? options, CancellationToken cancellationToken)
         {
-            return async ? ConnectAsync(host, port, cancellationToken) : new ValueTask<Stream>(Connect(host, port, cancellationToken));
-        }
-
-        private static async ValueTask<Stream> ConnectAsync(string host, int port, CancellationToken cancellationToken)
-        {
-            // Rather than creating a new Socket and calling ConnectAsync on it, we use the static
-            // Socket.ConnectAsync with a SocketAsyncEventArgs, as we can then use Socket.CancelConnectAsync
-            // to cancel it if needed.
-            var saea = new ConnectEventArgs();
             try
             {
-                saea.Initialize(cancellationToken);
-
-                // Configure which server to which to connect.
-                saea.RemoteEndPoint = new DnsEndPoint(host, port);
-
-                // Initiate the connection.
-                if (Socket.ConnectAsync(SocketType.Stream, ProtocolType.Tcp, saea))
-                {
-                    // Connect completing asynchronously. Enable it to be canceled and wait for it.
-                    using (cancellationToken.UnsafeRegister(static s => Socket.CancelConnectAsync((SocketAsyncEventArgs)s!), saea))
-                    {
-                        await saea.Builder.Task.ConfigureAwait(false);
-                    }
-                }
-                else if (saea.SocketError != SocketError.Success)
-                {
-                    // Connect completed synchronously but unsuccessfully.
-                    throw new SocketException((int)saea.SocketError);
-                }
-
-                Debug.Assert(saea.SocketError == SocketError.Success, $"Expected Success, got {saea.SocketError}.");
-                Debug.Assert(saea.ConnectSocket != null, "Expected non-null socket");
-
-                // Configure the socket and return a stream for it.
-                Socket socket = saea.ConnectSocket;
-                socket.NoDelay = true;
-                return new NetworkStream(socket, ownsSocket: true);
+                return await factory.ConnectAsync(endPoint, options, cancellationToken).ConfigureAwait(false);
             }
-            catch (Exception error) when (!(error is OperationCanceledException))
+            catch (OperationCanceledException ex) when (ex.CancellationToken == cancellationToken)
             {
-                throw CreateWrappedException(error, host, port, cancellationToken);
+                throw CancellationHelper.CreateOperationCanceledException(innerException: null, cancellationToken);
             }
-            finally
+            catch (Exception ex)
             {
-                saea.Dispose();
+                throw CreateWrappedException(ex, endPoint.Host, endPoint.Port, cancellationToken);
             }
         }
 
-        private static Stream Connect(string host, int port, CancellationToken cancellationToken)
+        public static Connection Connect(string host, int port, CancellationToken cancellationToken)
         {
             // For synchronous connections, we can just create a socket and make the connection.
             cancellationToken.ThrowIfCancellationRequested();
@@ -96,59 +60,14 @@ namespace System.Net.Http
                 {
                     socket.Connect(new DnsEndPoint(host, port));
                 }
-
-                return new NetworkStream(socket, ownsSocket: true);
             }
             catch (Exception e)
             {
                 socket.Dispose();
                 throw CreateWrappedException(e, host, port, cancellationToken);
             }
-        }
-
-        /// <summary>SocketAsyncEventArgs that carries with it additional state for a Task builder and a CancellationToken.</summary>
-        private sealed class ConnectEventArgs : SocketAsyncEventArgs
-        {
-            internal ConnectEventArgs() :
-                // The OnCompleted callback serves just to complete a task that's awaited in ConnectAsync,
-                // so we don't need to also flow ExecutionContext again into the OnCompleted callback.
-                base(unsafeSuppressExecutionContextFlow: true)
-            {
-            }
-
-            public AsyncTaskMethodBuilder Builder { get; private set; }
-            public CancellationToken CancellationToken { get; private set; }
 
-            public void Initialize(CancellationToken cancellationToken)
-            {
-                CancellationToken = cancellationToken;
-                AsyncTaskMethodBuilder b = default;
-                _ = b.Task; // force initialization
-                Builder = b;
-            }
-
-            protected override void OnCompleted(SocketAsyncEventArgs _)
-            {
-                switch (SocketError)
-                {
-                    case SocketError.Success:
-                        Builder.SetResult();
-                        break;
-
-                    case SocketError.OperationAborted:
-                    case SocketError.ConnectionAborted:
-                        if (CancellationToken.IsCancellationRequested)
-                        {
-                            Builder.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(CancellationHelper.CreateOperationCanceledException(null, CancellationToken)));
-                            break;
-                        }
-                        goto default;
-
-                    default:
-                        Builder.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(new SocketException((int)SocketError)));
-                        break;
-                }
-            }
+            return new SocketConnection(socket);
         }
 
         public static ValueTask<SslStream> EstablishSslConnectionAsync(SslClientAuthenticationOptions sslOptions, HttpRequestMessage request, bool async, Stream stream, CancellationToken cancellationToken)
diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Connections/SocketConnection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Connections/SocketConnection.cs
new file mode 100644 (file)
index 0000000..37b6f83
--- /dev/null
@@ -0,0 +1,101 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Diagnostics.CodeAnalysis;
+using System.IO;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections
+{
+    internal sealed class SocketConnection : Connection, IConnectionProperties
+    {
+        private readonly SocketConnectionNetworkStream _stream;
+
+        public override EndPoint? RemoteEndPoint => _stream.Socket.RemoteEndPoint;
+        public override EndPoint? LocalEndPoint => _stream.Socket.LocalEndPoint;
+        public override IConnectionProperties ConnectionProperties => this;
+
+        public SocketConnection(Socket socket)
+        {
+            _stream = new SocketConnectionNetworkStream(socket, this);
+        }
+
+        protected override ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken)
+        {
+            if (cancellationToken.IsCancellationRequested)
+            {
+                return ValueTask.FromCanceled(cancellationToken);
+            }
+
+            try
+            {
+                if (method != ConnectionCloseMethod.GracefulShutdown)
+                {
+                    // Dispose must be called first in order to cause a connection reset,
+                    // as NetworkStream.Dispose() will call Shutdown(Both).
+                    _stream.Socket.Dispose();
+                }
+
+                _stream.DisposeWithoutClosingConnection();
+            }
+            catch (Exception ex)
+            {
+                return ValueTask.FromException(ex);
+            }
+
+            return default;
+        }
+
+        protected override Stream CreateStream() => _stream;
+
+        bool IConnectionProperties.TryGet(Type propertyKey, [NotNullWhen(true)] out object? property)
+        {
+            if (propertyKey == typeof(Socket))
+            {
+                property = _stream.Socket;
+                return true;
+            }
+
+            property = null;
+            return false;
+        }
+
+        // This is done to couple disposal of the SocketConnection and the NetworkStream.
+        private sealed class SocketConnectionNetworkStream : NetworkStream
+        {
+            private readonly SocketConnection _connection;
+
+            public SocketConnectionNetworkStream(Socket socket, SocketConnection connection) : base(socket, ownsSocket: true)
+            {
+                _connection = connection;
+            }
+
+            public void DisposeWithoutClosingConnection()
+            {
+                base.Dispose(true);
+            }
+
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
+                {
+                    // This will call base.Dispose().
+                    _connection.Dispose();
+                }
+                else
+                {
+                    base.Dispose(disposing);
+                }
+            }
+
+            public override ValueTask DisposeAsync()
+            {
+                // This will call base.Dispose().
+                Dispose(true);
+                return default;
+            }
+        }
+    }
+}
diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Connections/TaskSocketAsyncEventArgs.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Connections/TaskSocketAsyncEventArgs.cs
new file mode 100644 (file)
index 0000000..7bf8947
--- /dev/null
@@ -0,0 +1,32 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Net.Sockets;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Sources;
+
+namespace System.Net.Connections
+{
+    internal sealed class TaskSocketAsyncEventArgs : SocketAsyncEventArgs, IValueTaskSource
+    {
+        private ManualResetValueTaskSourceCore<int> _valueTaskSource;
+
+        public void ResetTask() => _valueTaskSource.Reset();
+        public ValueTask Task => new ValueTask(this, _valueTaskSource.Version);
+
+        public void GetResult(short token) => _valueTaskSource.GetResult(token);
+        public ValueTaskSourceStatus GetStatus(short token) => _valueTaskSource.GetStatus(token);
+        public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _valueTaskSource.OnCompleted(continuation, state, token, flags);
+        public void Complete() => _valueTaskSource.SetResult(0);
+
+        public TaskSocketAsyncEventArgs()
+            : base(unsafeSuppressExecutionContextFlow: true)
+        {
+        }
+
+        protected override void OnCompleted(SocketAsyncEventArgs e)
+        {
+            _valueTaskSource.SetResult(0);
+        }
+    }
+}
diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/DnsEndPointWithProperties.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/DnsEndPointWithProperties.cs
new file mode 100644 (file)
index 0000000..b99801c
--- /dev/null
@@ -0,0 +1,31 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Diagnostics.CodeAnalysis;
+using System.Net.Connections;
+
+namespace System.Net.Http
+{
+    // Passed to a connection factory, merges allocations for the DnsEndPoint and connection properties.
+    internal sealed class DnsEndPointWithProperties : DnsEndPoint, IConnectionProperties
+    {
+        public HttpRequestMessage InitialRequest { get; }
+
+        public DnsEndPointWithProperties(string host, int port, HttpRequestMessage initialRequest) : base(host, port)
+        {
+            InitialRequest = initialRequest;
+        }
+
+        bool IConnectionProperties.TryGet(Type propertyKey, [NotNullWhen(true)] out object? property)
+        {
+            if (propertyKey == typeof(DnsEndPointWithProperties))
+            {
+                property = this;
+                return true;
+            }
+
+            property = null;
+            return false;
+        }
+    }
+}
index b21fd4a..1bd39f4 100644 (file)
@@ -6,6 +6,7 @@ using System.Collections.Generic;
 using System.Diagnostics;
 using System.Diagnostics.CodeAnalysis;
 using System.IO;
+using System.Net.Connections;
 using System.Net.Http.Headers;
 using System.Net.Http.HPack;
 using System.Net.Security;
@@ -21,6 +22,7 @@ namespace System.Net.Http
     {
         private readonly HttpConnectionPool _pool;
         private readonly Stream _stream;
+        private readonly Connection _connection;
 
         // NOTE: These are mutable structs; do not make these readonly.
         private ArrayBuffer _incomingBuffer;
@@ -94,10 +96,11 @@ namespace System.Net.Http
         // Channel options for creating _writeChannel
         private static readonly UnboundedChannelOptions s_channelOptions = new UnboundedChannelOptions() { SingleReader = true };
 
-        public Http2Connection(HttpConnectionPool pool, Stream stream)
+        public Http2Connection(HttpConnectionPool pool, Connection connection)
         {
             _pool = pool;
-            _stream = stream;
+            _stream = connection.Stream;
+            _connection = connection;
             _incomingBuffer = new ArrayBuffer(InitialConnectionBufferSize);
             _outgoingBuffer = new ArrayBuffer(InitialConnectionBufferSize);
 
@@ -116,7 +119,7 @@ namespace System.Net.Http
             _pendingWindowUpdate = 0;
             _idleSinceTickCount = Environment.TickCount64;
 
-            if (NetEventSource.Log.IsEnabled()) TraceConnection(stream);
+            if (NetEventSource.Log.IsEnabled()) TraceConnection(_stream);
         }
 
         private object SyncObject => _httpStreams;
@@ -1585,7 +1588,7 @@ namespace System.Net.Http
             }
 
             // Do shutdown.
-            _stream.Close();
+            _connection.Dispose();
 
             _connectionWindow.Dispose();
             _concurrentStreams.Dispose();
index 780212d..33383a1 100644 (file)
@@ -10,6 +10,7 @@ using System.IO;
 using System.Net.Http.Headers;
 using System.Net.Security;
 using System.Net.Sockets;
+using System.Net.Connections;
 using System.Runtime.CompilerServices;
 using System.Text;
 using System.Threading;
@@ -45,6 +46,7 @@ namespace System.Net.Http
         private readonly HttpConnectionPool _pool;
         private readonly Socket? _socket; // used for polling; _stream should be used for all reading/writing. _stream owns disposal.
         private readonly Stream _stream;
+        private readonly Connection _connection;
         private readonly TransportContext? _transportContext;
         private readonly WeakReference<HttpConnection> _weakThisRef;
 
@@ -68,16 +70,16 @@ namespace System.Net.Http
 
         public HttpConnection(
             HttpConnectionPool pool,
-            Socket? socket,
-            Stream stream,
+            Connection connection,
             TransportContext? transportContext)
         {
             Debug.Assert(pool != null);
-            Debug.Assert(stream != null);
+            Debug.Assert(connection != null);
 
             _pool = pool;
-            _socket = socket; // may be null in cases where we couldn't easily get the underlying socket
-            _stream = stream;
+            connection.ConnectionProperties.TryGet(out _socket); // may be null in cases where we couldn't easily get the underlying socket
+            _stream = connection.Stream;
+            _connection = connection;
             _transportContext = transportContext;
 
             _writeBuffer = new byte[InitialWriteBufferSize];
@@ -101,7 +103,7 @@ namespace System.Net.Http
                 if (disposing)
                 {
                     GC.SuppressFinalize(this);
-                    _stream.Dispose();
+                    _connection.Dispose();
 
                     // Eat any exceptions from the read-ahead task.  We don't need to log, as we expect
                     // failures from this task due to closing the connection while a read is in progress.
@@ -1917,7 +1919,7 @@ namespace System.Net.Http
 
     internal sealed class HttpConnectionWithFinalizer : HttpConnection
     {
-        public HttpConnectionWithFinalizer(HttpConnectionPool pool, Socket? socket, Stream stream, TransportContext? transportContext) : base(pool, socket, stream, transportContext) { }
+        public HttpConnectionWithFinalizer(HttpConnectionPool pool, Connection connection, TransportContext? transportContext) : base(pool, connection, transportContext) { }
 
         // This class is separated from HttpConnection so we only pay the price of having a finalizer
         // when it's actually needed, e.g. when MaxConnectionsPerServer is enabled.
index 072c9c2..af9e2cd 100644 (file)
@@ -6,6 +6,7 @@ using System.Collections.Generic;
 using System.Diagnostics;
 using System.Globalization;
 using System.IO;
+using System.Net.Connections;
 using System.Net.Http.Headers;
 using System.Net.Http.HPack;
 using System.Net.Http.QPack;
@@ -509,7 +510,7 @@ namespace System.Net.Http
             }
 
             // Try to establish an HTTP2 connection
-            Socket? socket = null;
+            Connection? connection = null;
             SslStream? sslStream = null;
             TransportContext? transportContext = null;
 
@@ -531,18 +532,28 @@ namespace System.Net.Http
                         Trace("Attempting new HTTP2 connection.");
                     }
 
-                    Stream? stream;
                     HttpResponseMessage? failureResponse;
-                    (socket, stream, transportContext, failureResponse) =
+
+                    (connection, transportContext, failureResponse) =
                         await ConnectAsync(request, async, true, cancellationToken).ConfigureAwait(false);
+
                     if (failureResponse != null)
                     {
                         return (null, true, failureResponse);
                     }
 
+                    Debug.Assert(connection != null);
+
+                    sslStream = connection.Stream as SslStream;
+
+                    if (Settings._plaintextFilter != null)
+                    {
+                        connection = await Settings._plaintextFilter(request, connection, cancellationToken).ConfigureAwait(false);
+                    }
+
                     if (_kind == HttpConnectionKind.Http)
                     {
-                        http2Connection = new Http2Connection(this, stream!);
+                        http2Connection = new Http2Connection(this, connection);
                         await http2Connection.SetupAsync().ConfigureAwait(false);
 
                         AddHttp2Connection(http2Connection);
@@ -555,7 +566,8 @@ namespace System.Net.Http
                         return (http2Connection, true, null);
                     }
 
-                    sslStream = (SslStream)stream!;
+                    Debug.Assert(sslStream != null);
+
                     if (sslStream.NegotiatedApplicationProtocol == SslApplicationProtocol.Http2)
                     {
                         // The server accepted our request for HTTP2.
@@ -565,7 +577,7 @@ namespace System.Net.Http
                             throw new HttpRequestException(SR.Format(SR.net_ssl_http2_requires_tls12, sslStream.SslProtocol));
                         }
 
-                        http2Connection = new Http2Connection(this, sslStream);
+                        http2Connection = new Http2Connection(this, connection);
                         await http2Connection.SetupAsync().ConfigureAwait(false);
 
                         AddHttp2Connection(http2Connection);
@@ -616,7 +628,7 @@ namespace System.Net.Http
 
                 if (canUse)
                 {
-                    return (ConstructHttp11Connection(socket, sslStream, transportContext), true, null);
+                    return (ConstructHttp11Connection(connection!, transportContext), true, null);
                 }
                 else
                 {
@@ -625,7 +637,7 @@ namespace System.Net.Http
                         Trace("Discarding downgraded HTTP/1.1 connection because connection limit is exceeded");
                     }
 
-                    sslStream.Close();
+                    await connection!.CloseAsync(ConnectionCloseMethod.GracefulShutdown, cancellationToken).ConfigureAwait(false);
                 }
             }
 
@@ -1124,7 +1136,7 @@ namespace System.Net.Http
             return SendWithProxyAuthAsync(request, async, doRequestAuth, cancellationToken);
         }
 
-        private async ValueTask<(Socket?, Stream?, TransportContext?, HttpResponseMessage?)> ConnectAsync(HttpRequestMessage request, bool async, bool allowHttp2, CancellationToken cancellationToken)
+        private async ValueTask<(Connection?, TransportContext?, HttpResponseMessage?)> ConnectAsync(HttpRequestMessage request, bool async, bool allowHttp2, CancellationToken cancellationToken)
         {
             // If a non-infinite connect timeout has been set, create and use a new CancellationToken that will be canceled
             // when either the original token is canceled or a connect timeout occurs.
@@ -1138,44 +1150,44 @@ namespace System.Net.Http
 
             try
             {
-                Stream? stream = null;
+                Connection? connection = null;
                 switch (_kind)
                 {
                     case HttpConnectionKind.Http:
                     case HttpConnectionKind.Https:
                     case HttpConnectionKind.ProxyConnect:
                         Debug.Assert(_originAuthority != null);
-                        stream = await ConnectHelper.ConnectAsync(_originAuthority.IdnHost, _originAuthority.Port, async, cancellationToken).ConfigureAwait(false);
+                        connection = await ConnectToTcpHostAsync(_originAuthority.IdnHost, _originAuthority.Port, request, async, cancellationToken).ConfigureAwait(false);
                         break;
 
                     case HttpConnectionKind.Proxy:
-                        stream = await ConnectHelper.ConnectAsync(_proxyUri!.IdnHost, _proxyUri.Port, async, cancellationToken).ConfigureAwait(false);
+                        connection = await ConnectToTcpHostAsync(_proxyUri!.IdnHost, _proxyUri.Port, request, async, cancellationToken).ConfigureAwait(false);
                         break;
 
                     case HttpConnectionKind.ProxyTunnel:
                     case HttpConnectionKind.SslProxyTunnel:
                         HttpResponseMessage? response;
-                        (stream, response) = await EstablishProxyTunnel(async, request.HasHeaders ? request.Headers : null, cancellationToken).ConfigureAwait(false);
+                        (connection, response) = await EstablishProxyTunnel(async, request.HasHeaders ? request.Headers : null, cancellationToken).ConfigureAwait(false);
                         if (response != null)
                         {
                             // Return non-success response from proxy.
                             response.RequestMessage = request;
-                            return (null, null, null, response);
+                            return (null, null, response);
                         }
                         break;
                 }
 
-                Socket? socket = (stream as NetworkStream)?.Socket;
+                Debug.Assert(connection != null);
 
                 TransportContext? transportContext = null;
                 if (_kind == HttpConnectionKind.Https || _kind == HttpConnectionKind.SslProxyTunnel)
                 {
-                    SslStream sslStream = await ConnectHelper.EstablishSslConnectionAsync(allowHttp2 ? _sslOptionsHttp2! : _sslOptionsHttp11!, request, async, stream!, cancellationToken).ConfigureAwait(false);
-                    stream = sslStream;
+                    SslStream sslStream = await ConnectHelper.EstablishSslConnectionAsync(allowHttp2 ? _sslOptionsHttp2! : _sslOptionsHttp11!, request, async, connection.Stream, cancellationToken).ConfigureAwait(false);
+                    connection = Connection.FromStream(sslStream, leaveOpen: false, connection.ConnectionProperties, connection.LocalEndPoint, connection.RemoteEndPoint);
                     transportContext = sslStream.TransportContext;
                 }
 
-                return (socket, stream, transportContext, null);
+                return (connection, transportContext, null);
             }
             finally
             {
@@ -1183,9 +1195,37 @@ namespace System.Net.Http
             }
         }
 
+        private ValueTask<Connection> ConnectToTcpHostAsync(string host, int port, HttpRequestMessage initialRequest, bool async, CancellationToken cancellationToken)
+        {
+            if (async)
+            {
+                ConnectionFactory connectionFactory = Settings._connectionFactory ?? SocketsHttpConnectionFactory.Default;
+
+                var endPoint = new DnsEndPointWithProperties(host, port, initialRequest);
+                return ConnectHelper.ConnectAsync(connectionFactory, endPoint, endPoint, cancellationToken);
+            }
+
+            // Synchronous path.
+
+            if (Settings._connectionFactory != null)
+            {
+                // connection factories only support async.
+                throw new HttpRequestException();
+            }
+
+            try
+            {
+                return new ValueTask<Connection>(ConnectHelper.Connect(host, port, cancellationToken));
+            }
+            catch (Exception ex)
+            {
+                return ValueTask.FromException<Connection>(ex);
+            }
+        }
+
         internal async ValueTask<(HttpConnection?, HttpResponseMessage?)> CreateHttp11ConnectionAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken)
         {
-            (Socket? socket, Stream? stream, TransportContext? transportContext, HttpResponseMessage? failureResponse) =
+            (Connection? connection, TransportContext? transportContext, HttpResponseMessage? failureResponse) =
                 await ConnectAsync(request, async, false, cancellationToken).ConfigureAwait(false);
 
             if (failureResponse != null)
@@ -1193,18 +1233,18 @@ namespace System.Net.Http
                 return (null, failureResponse);
             }
 
-            return (ConstructHttp11Connection(socket, stream!, transportContext), null);
+            return (ConstructHttp11Connection(connection!, transportContext), null);
         }
 
-        private HttpConnection ConstructHttp11Connection(Socket? socket, Stream stream, TransportContext? transportContext)
+        private HttpConnection ConstructHttp11Connection(Connection connection, TransportContext? transportContext)
         {
             return _maxConnections == int.MaxValue ?
-                new HttpConnection(this, socket, stream, transportContext) :
-                new HttpConnectionWithFinalizer(this, socket, stream, transportContext); // finalizer needed to signal the pool when a connection is dropped
+                new HttpConnection(this, connection, transportContext) :
+                new HttpConnectionWithFinalizer(this, connection, transportContext); // finalizer needed to signal the pool when a connection is dropped
         }
 
         // Returns the established stream or an HttpResponseMessage from the proxy indicating failure.
-        private async ValueTask<(Stream?, HttpResponseMessage?)> EstablishProxyTunnel(bool async, HttpRequestHeaders? headers, CancellationToken cancellationToken)
+        private async ValueTask<(Connection?, HttpResponseMessage?)> EstablishProxyTunnel(bool async, HttpRequestHeaders? headers, CancellationToken cancellationToken)
         {
             Debug.Assert(_originAuthority != null);
             // Send a CONNECT request to the proxy server to establish a tunnel.
@@ -1223,7 +1263,12 @@ namespace System.Net.Http
                 return (null, tunnelResponse);
             }
 
-            return (tunnelResponse.Content!.ReadAsStream(cancellationToken), null);
+            Stream stream = tunnelResponse.Content.ReadAsStream(cancellationToken);
+            EndPoint remoteEndPoint = new DnsEndPoint(_originAuthority.IdnHost, _originAuthority.Port);
+
+            // TODO: the Socket from the response can be funneled into a connection property here.
+
+            return (Connection.FromStream(stream, remoteEndPoint: remoteEndPoint), null);
         }
 
         /// <summary>Enqueues a waiter to the waiters list.</summary>
index 4ba7696..558e6cf 100644 (file)
@@ -2,7 +2,10 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 
 using System.Collections.Generic;
+using System.Net.Connections;
 using System.Net.Security;
+using System.Threading;
+using System.Threading.Tasks;
 
 namespace System.Net.Http
 {
@@ -54,6 +57,9 @@ namespace System.Net.Http
 
         internal bool _enableMultipleHttp2Connections;
 
+        internal ConnectionFactory? _connectionFactory;
+        internal Func<HttpRequestMessage, Connection, CancellationToken, ValueTask<Connection>>? _plaintextFilter;
+
         internal IDictionary<string, object?>? _properties;
 
         public HttpConnectionSettings()
@@ -104,7 +110,9 @@ namespace System.Net.Http
                 _useProxy = _useProxy,
                 _allowUnencryptedHttp2 = _allowUnencryptedHttp2,
                 _assumePrenegotiatedHttp3ForTesting = _assumePrenegotiatedHttp3ForTesting,
-                _enableMultipleHttp2Connections = _enableMultipleHttp2Connections
+                _enableMultipleHttp2Connections = _enableMultipleHttp2Connections,
+                _connectionFactory = _connectionFactory,
+                _plaintextFilter = _plaintextFilter
             };
         }
 
diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpConnectionFactory.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/SocketsHttpConnectionFactory.cs
new file mode 100644 (file)
index 0000000..e224616
--- /dev/null
@@ -0,0 +1,89 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Net.Connections;
+using System.Net.Sockets;
+using System.Runtime.ExceptionServices;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Http
+{
+    /// <summary>
+    /// The default connection factory used by <see cref="SocketsHttpHandler"/>, opening TCP connections.
+    /// </summary>
+    public class SocketsHttpConnectionFactory : ConnectionFactory
+    {
+        internal static SocketsHttpConnectionFactory Default { get; } = new SocketsHttpConnectionFactory();
+
+        /// <inheritdoc/>
+        public sealed override ValueTask<Connection> ConnectAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default)
+        {
+            if (options == null || !options.TryGet(out DnsEndPointWithProperties? httpOptions))
+            {
+                return ValueTask.FromException<Connection>(ExceptionDispatchInfo.SetCurrentStackTrace(new HttpRequestException($"{nameof(SocketsHttpConnectionFactory)} requires a {nameof(DnsEndPointWithProperties)} property.")));
+            }
+
+            return EstablishConnectionAsync(httpOptions!.InitialRequest, endPoint, options, cancellationToken);
+        }
+
+        /// <summary>
+        /// Creates the socket to be used for a request.
+        /// </summary>
+        /// <param name="message">The request causing this socket to be opened. Once opened, it may be reused for many subsequent requests.</param>
+        /// <param name="endPoint">The EndPoint this socket will be connected to.</param>
+        /// <param name="options">Properties, if any, that might change how the socket is initialized.</param>
+        /// <returns>A new unconnected socket.</returns>
+        public virtual Socket CreateSocket(HttpRequestMessage message, EndPoint? endPoint, IConnectionProperties options)
+        {
+            return new Socket(SocketType.Stream, ProtocolType.Tcp);
+        }
+
+        /// <summary>
+        /// Establishes a new connection for a request.
+        /// </summary>
+        /// <param name="message">The request causing this connection to be established. Once connected, it may be reused for many subsequent requests.</param>
+        /// <param name="endPoint">The EndPoint to connect to.</param>
+        /// <param name="options">Properties, if any, that might change how the connection is made.</param>
+        /// <param name="cancellationToken">A cancellation token for the asynchronous operation.</param>
+        /// <returns>A new open connection.</returns>
+        public virtual async ValueTask<Connection> EstablishConnectionAsync(HttpRequestMessage message, EndPoint? endPoint, IConnectionProperties options, CancellationToken cancellationToken)
+        {
+            if (message == null) throw new ArgumentNullException(nameof(message));
+            if (endPoint == null) throw new ArgumentNullException(nameof(endPoint));
+
+            Socket socket = CreateSocket(message, endPoint, options);
+
+            try
+            {
+                using var args = new TaskSocketAsyncEventArgs();
+                args.RemoteEndPoint = endPoint;
+
+                if (socket.ConnectAsync(args))
+                {
+                    using (cancellationToken.UnsafeRegister(o => Socket.CancelConnectAsync((SocketAsyncEventArgs)o!), args))
+                    {
+                        await args.Task.ConfigureAwait(false);
+                    }
+                }
+
+                if (args.SocketError != SocketError.Success)
+                {
+                    Exception ex = args.SocketError == SocketError.OperationAborted && cancellationToken.IsCancellationRequested
+                        ? (Exception)new OperationCanceledException(cancellationToken)
+                        : new SocketException((int)args.SocketError);
+
+                    throw ex;
+                }
+
+                socket.NoDelay = true;
+                return new SocketConnection(socket);
+            }
+            catch
+            {
+                socket.Dispose();
+                throw;
+            }
+        }
+    }
+}
index 1874723..de391e9 100644 (file)
@@ -7,6 +7,7 @@ using System.Net.Security;
 using System.Threading;
 using System.Threading.Tasks;
 using System.Diagnostics.CodeAnalysis;
+using System.Net.Connections;
 
 namespace System.Net.Http
 {
@@ -288,6 +289,33 @@ namespace System.Net.Http
         internal bool SupportsProxy => true;
         internal bool SupportsRedirectConfiguration => true;
 
+        /// <summary>
+        /// When non-null, a custom factory used to open new TCP connections.
+        /// When null, a <see cref="SocketsHttpConnectionFactory"/> will be used.
+        /// </summary>
+        public ConnectionFactory? ConnectionFactory
+        {
+            get => _settings._connectionFactory;
+            set
+            {
+                CheckDisposedOrStarted();
+                _settings._connectionFactory = value;
+            }
+        }
+
+        /// <summary>
+        /// When non-null, a connection filter that is applied prior to any TLS encryption.
+        /// </summary>
+        public Func<HttpRequestMessage, Connection, CancellationToken, ValueTask<Connection>>? PlaintextFilter
+        {
+            get => _settings._plaintextFilter;
+            set
+            {
+                CheckDisposedOrStarted();
+                _settings._plaintextFilter = value;
+            }
+        }
+
         public IDictionary<string, object?> Properties =>
             _settings._properties ?? (_settings._properties = new Dictionary<string, object?>());
 
index 3a4aef6..4328b0a 100644 (file)
@@ -5,6 +5,7 @@ using System.Collections.Generic;
 using System.Diagnostics;
 using System.IO;
 using System.Linq;
+using System.Net.Connections;
 using System.Net.Quic;
 using System.Net.Security;
 using System.Net.Sockets;
@@ -105,6 +106,71 @@ namespace System.Net.Http.Functional.Tests
         }
     }
 
+    public class SocketsHttpHandler_ConnectionFactoryTest : HttpClientHandlerTestBase
+    {
+        public SocketsHttpHandler_ConnectionFactoryTest(ITestOutputHelper output) : base(output) { }
+
+        [Fact]
+        public async Task CustomConnectionFactory_AsyncRequest_Success()
+        {
+            await using ConnectionListenerFactory listenerFactory = new VirtualNetworkConnectionListenerFactory();
+            await using ConnectionListener listener = await listenerFactory.BindAsync(endPoint: null);
+            await using ConnectionFactory connectionFactory = VirtualNetworkConnectionListenerFactory.GetConnectionFactory(listener);
+
+            // TODO: if GenericLoopbackOptions actually worked for HTTP/1 LoopbackServer we could just use that and pass in to CreateConnectionAsync.
+            // Making that work causes other tests to fail, so for now...
+            bool useHttps = UseVersion.Major >= 2 && new GenericLoopbackOptions().UseSsl;
+
+            Task serverTask = Task.Run(async () =>
+            {
+                await using Connection serverConnection = await listener.AcceptAsync();
+                using GenericLoopbackConnection loopbackConnection = await LoopbackServerFactory.CreateConnectionAsync(socket: null, serverConnection.Stream);
+
+                await loopbackConnection.InitializeConnectionAsync();
+
+                HttpRequestData requestData = await loopbackConnection.ReadRequestDataAsync();
+                await loopbackConnection.SendResponseAsync(content: "foo");
+
+                Assert.Equal("/foo", requestData.Path);
+            });
+
+            Task clientTask = Task.Run(async () =>
+            {
+                using HttpClientHandler handler = CreateHttpClientHandler();
+
+                var socketsHandler = (SocketsHttpHandler)GetUnderlyingSocketsHttpHandler(handler);
+                socketsHandler.ConnectionFactory = connectionFactory;
+
+                using HttpClient client = CreateHttpClient(handler);
+
+                string response = await client.GetStringAsync($"{(useHttps ? "https" : "http")}://{Guid.NewGuid():N}.com/foo");
+                Assert.Equal("foo", response);
+            });
+
+            await new[] { serverTask, clientTask }.WhenAllOrAnyFailed(60_000);
+        }
+
+        [Fact]
+        public async Task CustomConnectionFactory_SyncRequest_Fails()
+        {
+            await using ConnectionFactory connectionFactory = new SocketsHttpConnectionFactory();
+            using SocketsHttpHandler handler = new SocketsHttpHandler
+            {
+                ConnectionFactory = connectionFactory
+            };
+
+            using HttpClient client = CreateHttpClient(handler);
+
+            await Assert.ThrowsAnyAsync<HttpRequestException>(() => client.GetStringAsync($"http://{Guid.NewGuid():N}.com/foo"));
+        }
+    }
+
+    public sealed class SocketsHttpHandler_ConnectionFactoryTest_Http2 : SocketsHttpHandler_ConnectionFactoryTest
+    {
+        public SocketsHttpHandler_ConnectionFactoryTest_Http2(ITestOutputHelper output) : base(output) { }
+        protected override Version UseVersion => HttpVersion.Version20;
+    }
+
     public sealed class SocketsHttpHandler_HttpProtocolTests : HttpProtocolTests
     {
         public SocketsHttpHandler_HttpProtocolTests(ITestOutputHelper output) : base(output) { }
index 2e00a89..bdf9887 100644 (file)
              Link="Common\System\Net\Http\ThrowingContent.cs" />
     <Compile Include="ThrowingContent.netcore.cs" />
     <Compile Include="Watchdog.cs" />
+    <Compile Include="$(CommonTestPath)System\Net\VirtualNetwork\VirtualNetwork.cs" Link="Common\System\Net\VirtualNetwork\VirtualNetwork.cs" />
+    <Compile Include="$(CommonTestPath)System\Net\VirtualNetwork\VirtualNetworkStream.cs" Link="Common\System\Net\VirtualNetwork\VirtualNetworkStream.cs" />
+    <Compile Include="$(CommonTestPath)System\Net\VirtualNetwork\VirtualNetworkConnectionListenerFactory.cs" Link="Common\System\Net\VirtualNetwork\VirtualNetworkConnectionListenerFactory.cs" />
   </ItemGroup>
   <!-- Windows specific files -->
   <ItemGroup Condition=" '$(TargetsWindows)' == 'true'">
index 2c2fd22..5ab3eb9 100644 (file)
         "4.6.0"
       ],
       "BaselineVersion": "5.0.0",
-      "InboxOn": {},
+      "InboxOn": {
+        "net5.0": "5.0.0.0"
+      },
       "AssemblyVersionInPackageVersion": {
         "4.0.0.0": "4.5.0",
         "4.0.0.1": "4.5.2",