using System.Text;
using System.Threading.Tasks;
using System.Security.Authentication;
+using System.IO;
+using System.Net.Sockets;
namespace System.Net.Test.Common
{
public abstract GenericLoopbackServer CreateServer(GenericLoopbackOptions options = null);
public abstract Task CreateServerAsync(Func<GenericLoopbackServer, Uri, Task> funcAsync, int millisecondsTimeout = 60_000, GenericLoopbackOptions options = null);
+ public abstract Task<GenericLoopbackConnection> CreateConnectionAsync(Socket socket, Stream stream, GenericLoopbackOptions options = null);
+
public abstract Version Version { get; }
// Common helper methods
{
public abstract void Dispose();
+ public abstract Task InitializeConnectionAsync();
+
/// <summary>Read request Headers and optionally request body as well.</summary>
public abstract Task<HttpRequestData> ReadRequestDataAsync(bool readBody = true);
/// <summary>Read complete request body if not done by ReadRequestData.</summary>
using System.Net.Http.Functional.Tests;
using System.Net.Security;
using System.Net.Sockets;
+using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
private readonly TimeSpan _timeout;
private int _lastStreamId;
- private readonly byte[] _prefix;
+ private readonly byte[] _prefix = new byte[24];
public string PrefixString => Encoding.UTF8.GetString(_prefix, 0, _prefix.Length);
public bool IsInvalid => _connectionSocket == null;
public Stream Stream => _connectionStream;
public Task<bool> SettingAckWaiter => _ignoredSettingsAckPromise?.Task;
- public Http2LoopbackConnection(Socket socket, Http2Options httpOptions)
- : this(socket, httpOptions, Http2LoopbackServer.Timeout)
+ private Http2LoopbackConnection(Socket socket, Stream stream, TimeSpan timeout)
{
+ _connectionSocket = socket;
+ _connectionStream = stream;
+ _timeout = timeout;
}
- public Http2LoopbackConnection(Socket socket, Http2Options httpOptions, TimeSpan timeout)
+ public static Task<Http2LoopbackConnection> CreateAsync(Socket socket, Stream stream, Http2Options httpOptions)
{
- _connectionSocket = socket;
- _connectionStream = new NetworkStream(_connectionSocket, true);
- _timeout = timeout;
+ return CreateAsync(socket, stream, httpOptions, Http2LoopbackServer.Timeout);
+ }
+ public static async Task<Http2LoopbackConnection> CreateAsync(Socket socket, Stream stream, Http2Options httpOptions, TimeSpan timeout)
+ {
if (httpOptions.UseSsl)
{
- var sslStream = new SslStream(_connectionStream, false, delegate { return true; });
+ var sslStream = new SslStream(stream, false, delegate { return true; });
- using (var cert = Configuration.Certificates.GetServerCertificate())
+ using (X509Certificate2 cert = Configuration.Certificates.GetServerCertificate())
{
#if !NETFRAMEWORK
SslServerAuthenticationOptions options = new SslServerAuthenticationOptions();
options.ClientCertificateRequired = httpOptions.ClientCertificateRequired;
- sslStream.AuthenticateAsServerAsync(options, CancellationToken.None).Wait();
+ await sslStream.AuthenticateAsServerAsync(options, CancellationToken.None).ConfigureAwait(false);
#else
- sslStream.AuthenticateAsServerAsync(cert, httpOptions.ClientCertificateRequired, httpOptions.SslProtocols, checkCertificateRevocation: false).Wait();
+ await sslStream.AuthenticateAsServerAsync(cert, httpOptions.ClientCertificateRequired, httpOptions.SslProtocols, checkCertificateRevocation: false).ConfigureAwait(false);
#endif
}
- _connectionStream = sslStream;
+ stream = sslStream;
}
- _prefix = new byte[24];
- if (!FillBufferAsync(_prefix).Result)
+ var con = new Http2LoopbackConnection(socket, stream, timeout);
+ await con.ReadPrefixAsync().ConfigureAwait(false);
+
+ return con;
+ }
+
+ private async Task ReadPrefixAsync()
+ {
+ if (!await FillBufferAsync(_prefix))
{
throw new Exception("Connection stream closed while attempting to read connection preface.");
}
- else if (Text.Encoding.ASCII.GetString(_prefix).Contains("HTTP/1.1"))
+
+ if (Text.Encoding.ASCII.GetString(_prefix).Contains("HTTP/1.1"))
{
throw new Exception("HTTP 1.1 request received.");
}
public void ShutdownSend()
{
- _connectionSocket.Shutdown(SocketShutdown.Send);
+ _connectionSocket?.Shutdown(SocketShutdown.Send);
}
// This will cause a server-initiated shutdown of the connection.
return (streamId, requestData);
}
+ public override Task InitializeConnectionAsync()
+ {
+ return ReadAndSendSettingsAsync(ackTimeout: null);
+ }
+
+ public async Task<SettingsFrame> ReadAndSendSettingsAsync(TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries)
+ {
+ // Receive the initial client settings frame.
+ Frame receivedFrame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
+ Assert.Equal(FrameType.Settings, receivedFrame.Type);
+ Assert.Equal(FrameFlags.None, receivedFrame.Flags);
+ Assert.Equal(0, receivedFrame.StreamId);
+
+ var clientSettingsFrame = (SettingsFrame)receivedFrame;
+
+ // Receive the initial client window update frame.
+ receivedFrame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
+ Assert.Equal(FrameType.WindowUpdate, receivedFrame.Type);
+ Assert.Equal(FrameFlags.None, receivedFrame.Flags);
+ Assert.Equal(0, receivedFrame.StreamId);
+
+ // Send the initial server settings frame.
+ SettingsFrame settingsFrame = new SettingsFrame(settingsEntries);
+ await WriteFrameAsync(settingsFrame).ConfigureAwait(false);
+
+ // Send the client settings frame ACK.
+ Frame settingsAck = new Frame(0, FrameType.Settings, FrameFlags.Ack, 0);
+ await WriteFrameAsync(settingsAck).ConfigureAwait(false);
+
+ // The client will send us a SETTINGS ACK eventually, but not necessarily right away.
+ await ExpectSettingsAckAsync((int?)ackTimeout?.TotalMilliseconds ?? 5000);
+
+ return clientSettingsFrame;
+ }
+
public async Task SendGoAway(int lastStreamId, ProtocolErrors errorCode = ProtocolErrors.NO_ERROR)
{
GoAwayFrame frame = new GoAwayFrame(lastStreamId, (int)errorCode, new byte[] { }, 0);
Socket connectionSocket = await _listenSocket.AcceptAsync().ConfigureAwait(false);
- Http2LoopbackConnection connection = timeout != null ? new Http2LoopbackConnection(connectionSocket, _options, timeout.Value) : new Http2LoopbackConnection(connectionSocket, _options);
+ var stream = new NetworkStream(connectionSocket, ownsSocket: true);
+ Http2LoopbackConnection connection =
+ timeout != null ? await Http2LoopbackConnection.CreateAsync(connectionSocket, stream, _options, timeout.Value).ConfigureAwait(false) :
+ await Http2LoopbackConnection.CreateAsync(connectionSocket, stream, _options).ConfigureAwait(false);
_connections.Add(connection);
return connection;
public Task<Http2LoopbackConnection> EstablishConnectionAsync(params SettingsEntry[] settingsEntries)
{
- return EstablishConnectionAsync(null, null, settingsEntries);
+ return EstablishConnectionAsync(timeout: null, ackTimeout: null, settingsEntries);
}
public async Task<Http2LoopbackConnection> EstablishConnectionAsync(TimeSpan? timeout, TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries)
public Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(params SettingsEntry[] settingsEntries)
{
- return EstablishConnectionGetSettingsAsync(null, null, settingsEntries);
+ return EstablishConnectionGetSettingsAsync(timeout: null, ackTimeout: null, settingsEntries);
}
public async Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(TimeSpan? timeout, TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries)
{
Http2LoopbackConnection connection = await AcceptConnectionAsync(timeout).ConfigureAwait(false);
-
- // Receive the initial client settings frame.
- Frame receivedFrame = await connection.ReadFrameAsync(Timeout).ConfigureAwait(false);
- Assert.Equal(FrameType.Settings, receivedFrame.Type);
- Assert.Equal(FrameFlags.None, receivedFrame.Flags);
- Assert.Equal(0, receivedFrame.StreamId);
-
- var clientSettingsFrame = (SettingsFrame)receivedFrame;
-
- // Receive the initial client window update frame.
- receivedFrame = await connection.ReadFrameAsync(Timeout).ConfigureAwait(false);
- Assert.Equal(FrameType.WindowUpdate, receivedFrame.Type);
- Assert.Equal(FrameFlags.None, receivedFrame.Flags);
- Assert.Equal(0, receivedFrame.StreamId);
-
- // Send the initial server settings frame.
- SettingsFrame settingsFrame = new SettingsFrame(settingsEntries);
- await connection.WriteFrameAsync(settingsFrame).ConfigureAwait(false);
-
- // Send the client settings frame ACK.
- Frame settingsAck = new Frame(0, FrameType.Settings, FrameFlags.Ack, 0);
- await connection.WriteFrameAsync(settingsAck).ConfigureAwait(false);
-
- // The client will send us a SETTINGS ACK eventually, but not necessarily right away.
- await connection.ExpectSettingsAckAsync((int) (ackTimeout?.TotalMilliseconds ?? 5000));
+ SettingsFrame clientSettingsFrame = await connection.ReadAndSendSettingsAsync(ackTimeout, settingsEntries).ConfigureAwait(false);
return (connection, clientSettingsFrame);
}
public Http2Options()
{
- UseSsl = PlatformDetection.SupportsAlpn && !Capability.Http2ForceUnencryptedLoopback();
SslProtocols = SslProtocols.Tls12;
}
}
}
public override GenericLoopbackServer CreateServer(GenericLoopbackOptions options = null)
+ {
+ return Http2LoopbackServer.CreateServer(CreateOptions(options));
+ }
+
+ public override async Task<GenericLoopbackConnection> CreateConnectionAsync(Socket socket, Stream stream, GenericLoopbackOptions options = null)
+ {
+ return await Http2LoopbackConnection.CreateAsync(socket, stream, CreateOptions(options)).ConfigureAwait(false);
+ }
+
+ private static Http2Options CreateOptions(GenericLoopbackOptions options)
{
Http2Options http2Options = new Http2Options();
if (options != null)
http2Options.UseSsl = options.UseSsl;
http2Options.SslProtocols = options.SslProtocols;
}
-
- return Http2LoopbackServer.CreateServer(http2Options);
+ return http2Options;
}
public override async Task CreateServerAsync(Func<GenericLoopbackServer, Uri, Task> funcAsync, int millisecondsTimeout = 60_000, GenericLoopbackOptions options = null)
using System.Text;
using System.Threading.Tasks;
using System.Linq;
+using System.Net.Http.Functional.Tests;
namespace System.Net.Test.Common
{
return requestId == 0 ? _currentStream : _openStreams[requestId - 1];
}
+ public override Task InitializeConnectionAsync()
+ {
+ throw new NotImplementedException();
+ }
+
public async Task<Http3LoopbackStream> AcceptStreamAsync()
{
QuicStream quicStream = await _connection.AcceptStreamAsync().ConfigureAwait(false);
using System.Collections.Generic;
using System.Diagnostics;
+using System.IO;
using System.Net.Quic;
using System.Net.Security;
+using System.Net.Sockets;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using GenericLoopbackServer server = CreateServer(options);
await funcAsync(server, server.Address).TimeoutAfter(millisecondsTimeout).ConfigureAwait(false);
}
+
+ public override Task<GenericLoopbackConnection> CreateConnectionAsync(Socket socket, Stream stream, GenericLoopbackOptions options = null)
+ {
+ // TODO: make a new overload that takes a MultiplexedConnection.
+ // This method is always unacceptable to call for HTTP/3.
+ throw new NotImplementedException("HTTP/3 does not operate over a Socket.");
+ }
}
}
// This seems to help avoid connection reset issues caused by buffered data
// that has not been sent/acked when the graceful shutdown timeout expires.
// This may throw if the socket was already closed, so eat any exception.
- _socket.Shutdown(SocketShutdown.Send);
+ _socket?.Shutdown(SocketShutdown.Send);
}
catch (Exception) { }
_writer.Dispose();
_stream.Dispose();
- _socket.Dispose();
+ _socket?.Dispose();
+ }
+
+ public override Task InitializeConnectionAsync()
+ {
+ return Task.CompletedTask;
}
public async Task<List<string>> ReadRequestHeaderAsync()
return LoopbackServer.CreateServerAsync((server, uri) => funcAsync(server, uri), options: CreateOptions(options));
}
+ public override Task<GenericLoopbackConnection> CreateConnectionAsync(Socket socket, Stream stream, GenericLoopbackOptions options = null)
+ {
+ return Task.FromResult<GenericLoopbackConnection>(new LoopbackServer.Connection(socket, stream));
+ }
+
private static LoopbackServer.Options CreateOptions(GenericLoopbackOptions options)
{
LoopbackServer.Options newOptions = new LoopbackServer.Options();
public VirtualNetworkConnectionBroken() : base("Connection broken") { }
}
- private readonly int WaitForReadDataTimeoutMilliseconds = 30 * 1000;
+ private readonly int WaitForReadDataTimeoutMilliseconds = 60 * 1000;
private readonly ConcurrentQueue<byte[]> _clientWriteQueue = new ConcurrentQueue<byte[]>();
private readonly ConcurrentQueue<byte[]> _serverWriteQueue = new ConcurrentQueue<byte[]>();
private readonly SemaphoreSlim _clientDataAvailable = new SemaphoreSlim(0);
private readonly SemaphoreSlim _serverDataAvailable = new SemaphoreSlim(0);
+ private volatile bool _clientWriteShutdown = false;
+ private volatile bool _serverWriteShutdown = false;
+
public bool DisableConnectionBreaking { get; set; } = false;
- private bool _connectionBroken = false;
+ private volatile bool _connectionBroken = false;
public byte[] ReadFrame(bool server) =>
ReadFrameCoreAsync(server, sync: true, cancellationToken: default).GetAwaiter().GetResult();
return buffer;
}
+ if ((server && _clientWriteShutdown) || (!server && _serverWriteShutdown))
+ {
+ return Array.Empty<byte>();
+ }
+
remainingTries--;
backOffDelayMilliseconds *= backOffDelayMilliseconds;
if (sync)
throw new VirtualNetworkConnectionBroken();
}
+ if ((server && _serverWriteShutdown) || (!server && _clientWriteShutdown))
+ {
+ throw new InvalidOperationException("Writing to a shutdown side.");
+ }
+
+ if (buffer.Length == 0)
+ {
+ return;
+ }
+
SemaphoreSlim semaphore;
ConcurrentQueue<byte[]> packetQueue;
semaphore.Release();
}
+ public void GracefulShutdown(bool server)
+ {
+ if (server)
+ {
+ _serverWriteShutdown = true;
+ _serverDataAvailable.Release(1_000_000);
+ }
+ else
+ {
+ _clientWriteShutdown = true;
+ _clientDataAvailable.Release(1_000_000);
+ }
+ }
+
public void BreakConnection()
{
if (!DisableConnectionBreaking)
{
- _connectionBroken = true;
+ _connectionBroken = !_serverWriteShutdown || !_clientWriteShutdown;
_serverDataAvailable.Release(1_000_000);
_clientDataAvailable.Release(1_000_000);
}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Diagnostics;
+using System.IO;
+using System.Net.Connections;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+
+namespace System.Net.Test.Common
+{
+
+ internal sealed class VirtualNetworkConnectionListenerFactory : ConnectionListenerFactory
+ {
+ public static ConnectionFactory GetConnectionFactory(ConnectionListener listener)
+ {
+ bool hasFactory = listener.ListenerProperties.TryGet(out VirtualConnectionFactory factory);
+ Debug.Assert(hasFactory);
+ return factory;
+ }
+
+ public override ValueTask<ConnectionListener> BindAsync(EndPoint endPoint, IConnectionProperties options = null, CancellationToken cancellationToken = default)
+ {
+ if (cancellationToken.IsCancellationRequested) return ValueTask.FromCanceled<ConnectionListener>(cancellationToken);
+ return new ValueTask<ConnectionListener>(new VirtualConnectionListener(endPoint));
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ }
+
+ protected override ValueTask DisposeAsyncCore()
+ {
+ return default;
+ }
+
+ private sealed class VirtualConnectionListener : ConnectionListener, IConnectionProperties
+ {
+ private readonly Channel<TaskCompletionSource<Connection>> _pendingConnects;
+ private readonly VirtualConnectionFactory _connectionFactory;
+ private readonly CancellationTokenSource _cts = new CancellationTokenSource();
+
+ public override IConnectionProperties ListenerProperties => this;
+
+ public override EndPoint LocalEndPoint { get; }
+
+ public VirtualConnectionListener(EndPoint localEndPoint)
+ {
+ LocalEndPoint = localEndPoint;
+
+ _pendingConnects = Channel.CreateUnbounded<TaskCompletionSource<Connection>>();
+ _connectionFactory = new VirtualConnectionFactory(this);
+ }
+
+ public override async ValueTask<Connection> AcceptAsync(IConnectionProperties options = null, CancellationToken cancellationToken = default)
+ {
+ using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, cancellationToken);
+
+ var network = new VirtualNetwork();
+ var serverConnection = new VirtualConnection(network, isServer: true);
+ var clientConnection = new VirtualConnection(network, isServer: false);
+
+ while (true)
+ {
+ TaskCompletionSource<Connection> tcs = await _pendingConnects.Reader.ReadAsync(cancellationToken);
+ if (tcs.TrySetResult(clientConnection))
+ {
+ return serverConnection;
+ }
+ }
+ }
+
+ internal async ValueTask<Connection> ConnectAsync(EndPoint endPoint, IConnectionProperties options = null, CancellationToken cancellationToken = default)
+ {
+ var tcs = new TaskCompletionSource<Connection>();
+ await _pendingConnects.Writer.WriteAsync(tcs, cancellationToken).ConfigureAwait(false);
+
+ using (CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, cancellationToken))
+ using (cts.Token.UnsafeRegister(o => ((TaskCompletionSource<Connection>)o).TrySetCanceled(), tcs))
+ {
+ return await tcs.Task.ConfigureAwait(false);
+ }
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ _cts.Cancel();
+ }
+ }
+
+ protected override ValueTask DisposeAsyncCore()
+ {
+ Dispose(true);
+ return default;
+ }
+
+ bool IConnectionProperties.TryGet(Type propertyKey, out object property)
+ {
+ if (propertyKey == typeof(VirtualConnectionFactory))
+ {
+ property = _connectionFactory;
+ return true;
+ }
+
+ property = null;
+ return false;
+ }
+ }
+
+ private sealed class VirtualConnectionFactory : ConnectionFactory
+ {
+ private readonly VirtualConnectionListener _listener;
+
+ public VirtualConnectionFactory(VirtualConnectionListener listener)
+ {
+ _listener = listener;
+ }
+
+ public override ValueTask<Connection> ConnectAsync(EndPoint endPoint, IConnectionProperties options = null, CancellationToken cancellationToken = default)
+ {
+ return _listener.ConnectAsync(endPoint, options, cancellationToken);
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ _listener.Dispose();
+ }
+ }
+
+ protected override ValueTask DisposeAsyncCore()
+ {
+ return _listener.DisposeAsync();
+ }
+ }
+
+ private sealed class VirtualConnection : Connection, IConnectionProperties
+ {
+ private readonly VirtualNetwork _network;
+ private bool _isServer;
+
+ public override IConnectionProperties ConnectionProperties => this;
+
+ public override EndPoint LocalEndPoint => null;
+
+ public override EndPoint RemoteEndPoint => null;
+
+ public VirtualConnection(VirtualNetwork network, bool isServer)
+ {
+ _network = network;
+ _isServer = isServer;
+ }
+
+ protected override Stream CreateStream()
+ {
+ return new VirtualNetworkStream(_network, _isServer, gracefulShutdown: true);
+ }
+
+ protected override ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken)
+ {
+ if (cancellationToken.IsCancellationRequested) return ValueTask.FromCanceled(cancellationToken);
+
+ if (method == ConnectionCloseMethod.GracefulShutdown)
+ {
+ _network.GracefulShutdown(_isServer);
+ }
+ else
+ {
+ _network.BreakConnection();
+ }
+
+ return default;
+ }
+
+ bool IConnectionProperties.TryGet(Type propertyKey, out object property)
+ {
+ property = null;
+ return false;
+ }
+ }
+ }
+
+}
private readonly VirtualNetwork _network;
private MemoryStream _readStream;
private readonly bool _isServer;
+ private readonly bool _gracefulShutdown;
private SemaphoreSlim _readStreamLock = new SemaphoreSlim(1, 1);
private TaskCompletionSource _flushTcs;
- public VirtualNetworkStream(VirtualNetwork network, bool isServer)
+ public VirtualNetworkStream(VirtualNetwork network, bool isServer, bool gracefulShutdown = false)
{
_network = network;
_isServer = isServer;
+ _gracefulShutdown = gracefulShutdown;
}
public int DelayMilliseconds { get; set; }
{
if (_readStream == null || (_readStream.Position >= _readStream.Length))
{
- _readStream = new MemoryStream(_network.ReadFrame(_isServer));
+ byte[] frame = _network.ReadFrame(_isServer);
+ if (frame.Length == 0) return 0;
+
+ _readStream = new MemoryStream(frame);
}
return _readStream.Read(buffer, offset, count);
if (_readStream == null || (_readStream.Position >= _readStream.Length))
{
- _readStream = new MemoryStream(await _network.ReadFrameAsync(_isServer, cancellationToken).ConfigureAwait(false));
+ byte[] frame = await _network.ReadFrameAsync(_isServer, cancellationToken).ConfigureAwait(false);
+ if (frame.Length == 0) return 0;
+
+ _readStream = new MemoryStream(frame);
}
return await _readStream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
if (disposing)
{
Disposed = true;
- _network.BreakConnection();
+ if (_gracefulShutdown)
+ {
+ GracefulShutdown();
+ }
+ else
+ {
+ _network.BreakConnection();
+ }
}
base.Dispose(disposing);
}
+
+ public void GracefulShutdown()
+ {
+ _network.GracefulShutdown(_isServer);
+ }
}
}
}
}
+#if !NETFRAMEWORK
+ public static Task TimeoutAfter(this ValueTask task, int millisecondsTimeout)
+ => task.AsTask().TimeoutAfter(TimeSpan.FromMilliseconds(millisecondsTimeout));
+
+ public static Task TimeoutAfter(this ValueTask task, TimeSpan timeout)
+ => task.AsTask().TimeoutAfter(timeout);
+
+ public static Task<TResult> TimeoutAfter<TResult>(this ValueTask<TResult> task, int millisecondsTimeout)
+ => task.AsTask().TimeoutAfter(TimeSpan.FromMilliseconds(millisecondsTimeout));
+
+ public static Task<TResult> TimeoutAfter<TResult>(this ValueTask<TResult> task, TimeSpan timeout)
+ => task.AsTask().TimeoutAfter(timeout);
+#endif
+
public static async Task WhenAllOrAnyFailed(this Task[] tasks, int millisecondsTimeout)
{
var cts = new CancellationTokenSource();
System.IO.FileSystem.Watcher;
System.IO.IsolatedStorage;
System.IO.MemoryMappedFiles;
+ System.IO.Pipelines;
System.IO.Pipes;
System.IO.Pipes.AccessControl;
System.IO.UnmanagedMemoryStream;
System.Linq.Parallel;
System.Linq.Queryable;
System.Memory;
+ System.Net.Connections;
System.Net.Http;
System.Net.Http.Json;
System.Net.HttpListener;
</ProjectReference>
<ProjectReference Include="..\src\System.IO.Pipelines.csproj" />
<HarvestIncludePaths Include="lib/netstandard1.3" />
+ <InboxOnTargetFramework Include="net5.0" />
</ItemGroup>
<Import Project="$([MSBuild]::GetPathOfFileAbove(Directory.Build.targets))" />
</Project>
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
- <TargetFrameworks>netstandard2.0;net461</TargetFrameworks>
+ <TargetFrameworks>$(NetCoreAppCurrent);netstandard2.0;net461</TargetFrameworks>
<Nullable>enable</Nullable>
<ExcludeFromPackage Condition="'$(TargetFramework)' == 'net461'">true</ExcludeFromPackage>
<!-- We only plan to use this ref in netcoreapp. For all other netstandard compatible frameworks
we should use the lib asset instead. -->
<PackageTargetFramework>netcoreapp2.0</PackageTargetFramework>
+ <ExcludeCurrentNetCoreAppFromPackage>true</ExcludeCurrentNetCoreAppFromPackage>
</PropertyGroup>
<ItemGroup>
<Compile Include="System.IO.Pipelines.cs" />
</ItemGroup>
- <ItemGroup>
+ <ItemGroup Condition="'$(TargetFramework)' == '$(NetCoreAppCurrent)'">
+ <ProjectReference Include="..\..\System.Runtime\ref\System.Runtime.csproj" />
+ <ProjectReference Include="..\..\System.Memory\ref\System.Memory.csproj" />
+ </ItemGroup>
+ <ItemGroup Condition="'$(TargetFramework)' != '$(NetCoreAppCurrent)'">
<PackageReference Include="System.Memory" Version="$(SystemMemoryVersion)" />
<PackageReference Include="System.Buffers" Version="$(SystemBuffersVersion)" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="$(SystemThreadingTasksExtensionsVersion)" />
--- /dev/null
+<Project>
+ <Import Project="..\Directory.Build.props" />
+ <PropertyGroup>
+ <StrongNameKeyId>Microsoft</StrongNameKeyId>
+ </PropertyGroup>
+</Project>
\ No newline at end of file
--- /dev/null
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 16
+VisualStudioVersion = 16.0.30310.162
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.Net.Connections", "src\System.Net.Connections.csproj", "{1D422B1D-D7C4-41B9-862D-EB3D98DF37DE}"
+ ProjectSection(ProjectDependencies) = postProject
+ {132BF813-FC40-4D39-8B6F-E55D7633F0ED} = {132BF813-FC40-4D39-8B6F-E55D7633F0ED}
+ EndProjectSection
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.Net.Connections", "ref\System.Net.Connections.csproj", "{132BF813-FC40-4D39-8B6F-E55D7633F0ED}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{E107E9C1-E893-4E87-987E-04EF0DCEAEFD}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ref", "ref", "{2E666815-2EDB-464B-9DF6-380BF4789AD4}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{E0983BCC-F93B-4FFF-A4E4-EA874908783F}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "System.Net.Connections.Tests", "tests\System.Net.Connections.Tests\System.Net.Connections.Tests.csproj", "{A4DB505A-FFD8-4A7B-B31A-12AD623E7AD9}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {1D422B1D-D7C4-41B9-862D-EB3D98DF37DE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {1D422B1D-D7C4-41B9-862D-EB3D98DF37DE}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {1D422B1D-D7C4-41B9-862D-EB3D98DF37DE}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {1D422B1D-D7C4-41B9-862D-EB3D98DF37DE}.Release|Any CPU.Build.0 = Release|Any CPU
+ {132BF813-FC40-4D39-8B6F-E55D7633F0ED}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {132BF813-FC40-4D39-8B6F-E55D7633F0ED}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {132BF813-FC40-4D39-8B6F-E55D7633F0ED}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {132BF813-FC40-4D39-8B6F-E55D7633F0ED}.Release|Any CPU.Build.0 = Release|Any CPU
+ {A4DB505A-FFD8-4A7B-B31A-12AD623E7AD9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {A4DB505A-FFD8-4A7B-B31A-12AD623E7AD9}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {A4DB505A-FFD8-4A7B-B31A-12AD623E7AD9}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {A4DB505A-FFD8-4A7B-B31A-12AD623E7AD9}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+ GlobalSection(NestedProjects) = preSolution
+ {1D422B1D-D7C4-41B9-862D-EB3D98DF37DE} = {E107E9C1-E893-4E87-987E-04EF0DCEAEFD}
+ {132BF813-FC40-4D39-8B6F-E55D7633F0ED} = {2E666815-2EDB-464B-9DF6-380BF4789AD4}
+ {A4DB505A-FFD8-4A7B-B31A-12AD623E7AD9} = {E0983BCC-F93B-4FFF-A4E4-EA874908783F}
+ EndGlobalSection
+ GlobalSection(ExtensibilityGlobals) = postSolution
+ SolutionGuid = {5100F629-0FAB-4C6F-9A54-95AE9565EE0D}
+ EndGlobalSection
+EndGlobal
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// ------------------------------------------------------------------------------
+// Changes to this file must follow the https://aka.ms/api-review process.
+// ------------------------------------------------------------------------------
+
+namespace System.Net.Connections
+{
+ public abstract partial class Connection : System.Net.Connections.ConnectionBase
+ {
+ protected Connection() { }
+ public System.IO.Pipelines.IDuplexPipe Pipe { get { throw null; } }
+ public System.IO.Stream Stream { get { throw null; } }
+ protected virtual System.IO.Pipelines.IDuplexPipe CreatePipe() { throw null; }
+ protected virtual System.IO.Stream CreateStream() { throw null; }
+ public static System.Net.Connections.Connection FromPipe(System.IO.Pipelines.IDuplexPipe pipe, bool leaveOpen = false, System.Net.Connections.IConnectionProperties? properties = null, System.Net.EndPoint? localEndPoint = null, System.Net.EndPoint? remoteEndPoint = null) { throw null; }
+ public static System.Net.Connections.Connection FromStream(System.IO.Stream stream, bool leaveOpen = false, System.Net.Connections.IConnectionProperties? properties = null, System.Net.EndPoint? localEndPoint = null, System.Net.EndPoint? remoteEndPoint = null) { throw null; }
+ }
+ public abstract partial class ConnectionBase : System.IAsyncDisposable, System.IDisposable
+ {
+ protected ConnectionBase() { }
+ public abstract System.Net.Connections.IConnectionProperties ConnectionProperties { get; }
+ public abstract System.Net.EndPoint? LocalEndPoint { get; }
+ public abstract System.Net.EndPoint? RemoteEndPoint { get; }
+ public System.Threading.Tasks.ValueTask CloseAsync(System.Net.Connections.ConnectionCloseMethod method = System.Net.Connections.ConnectionCloseMethod.GracefulShutdown, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
+ protected abstract System.Threading.Tasks.ValueTask CloseAsyncCore(System.Net.Connections.ConnectionCloseMethod method, System.Threading.CancellationToken cancellationToken);
+ public void Dispose() { }
+ public System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
+ }
+ public enum ConnectionCloseMethod
+ {
+ GracefulShutdown = 0,
+ Abort = 1,
+ Immediate = 2,
+ }
+ public static partial class ConnectionExtensions
+ {
+ public static System.Net.Connections.ConnectionFactory Filter(this System.Net.Connections.ConnectionFactory factory, System.Func<System.Net.Connections.Connection, System.Net.Connections.IConnectionProperties?, System.Threading.CancellationToken, System.Threading.Tasks.ValueTask<System.Net.Connections.Connection>> filter) { throw null; }
+ public static bool TryGet<T>(this System.Net.Connections.IConnectionProperties properties, [System.Diagnostics.CodeAnalysis.MaybeNullWhenAttribute(false)] out T property) { throw null; }
+ }
+ public abstract partial class ConnectionFactory : System.IAsyncDisposable, System.IDisposable
+ {
+ protected ConnectionFactory() { }
+ public abstract System.Threading.Tasks.ValueTask<System.Net.Connections.Connection> ConnectAsync(System.Net.EndPoint? endPoint, System.Net.Connections.IConnectionProperties? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
+ public void Dispose() { }
+ protected virtual void Dispose(bool disposing) { }
+ public System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
+ protected virtual System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; }
+ }
+ public abstract partial class ConnectionListener : System.IAsyncDisposable, System.IDisposable
+ {
+ protected ConnectionListener() { }
+ public abstract System.Net.Connections.IConnectionProperties ListenerProperties { get; }
+ public abstract System.Net.EndPoint? LocalEndPoint { get; }
+ public abstract System.Threading.Tasks.ValueTask<System.Net.Connections.Connection> AcceptAsync(System.Net.Connections.IConnectionProperties? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
+ public void Dispose() { }
+ protected virtual void Dispose(bool disposing) { }
+ public System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
+ protected virtual System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; }
+ }
+ public abstract partial class ConnectionListenerFactory : System.IAsyncDisposable, System.IDisposable
+ {
+ protected ConnectionListenerFactory() { }
+ public abstract System.Threading.Tasks.ValueTask<System.Net.Connections.ConnectionListener> BindAsync(System.Net.EndPoint? endPoint, System.Net.Connections.IConnectionProperties? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
+ public void Dispose() { }
+ protected virtual void Dispose(bool disposing) { }
+ public System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
+ protected virtual System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; }
+ }
+ public partial interface IConnectionProperties
+ {
+ bool TryGet(System.Type propertyKey, [System.Diagnostics.CodeAnalysis.NotNullWhenAttribute(true)] out object? property);
+ }
+}
--- /dev/null
+<Project Sdk="Microsoft.NET.Sdk">
+ <PropertyGroup>
+ <TargetFrameworks>$(NetCoreAppCurrent)</TargetFrameworks>
+ <Nullable>enable</Nullable>
+ </PropertyGroup>
+ <ItemGroup>
+ <Compile Include="System.Net.Connections.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\..\System.Runtime\ref\System.Runtime.csproj" />
+ <ProjectReference Include="..\..\System.Net.Primitives\ref\System.Net.Primitives.csproj" />
+ <ProjectReference Include="..\..\System.IO.Pipelines\ref\System.IO.Pipelines.csproj" />
+ </ItemGroup>
+</Project>
--- /dev/null
+<?xml version="1.0" encoding="utf-8"?>
+<root>
+ <!--
+ Microsoft ResX Schema
+
+ Version 2.0
+
+ The primary goals of this format is to allow a simple XML format
+ that is mostly human readable. The generation and parsing of the
+ various data types are done through the TypeConverter classes
+ associated with the data types.
+
+ Example:
+
+ ... ado.net/XML headers & schema ...
+ <resheader name="resmimetype">text/microsoft-resx</resheader>
+ <resheader name="version">2.0</resheader>
+ <resheader name="reader">System.Resources.ResXResourceReader, System.Windows.Forms, ...</resheader>
+ <resheader name="writer">System.Resources.ResXResourceWriter, System.Windows.Forms, ...</resheader>
+ <data name="Name1"><value>this is my long string</value><comment>this is a comment</comment></data>
+ <data name="Color1" type="System.Drawing.Color, System.Drawing">Blue</data>
+ <data name="Bitmap1" mimetype="application/x-microsoft.net.object.binary.base64">
+ <value>[base64 mime encoded serialized .NET Framework object]</value>
+ </data>
+ <data name="Icon1" type="System.Drawing.Icon, System.Drawing" mimetype="application/x-microsoft.net.object.bytearray.base64">
+ <value>[base64 mime encoded string representing a byte array form of the .NET Framework object]</value>
+ <comment>This is a comment</comment>
+ </data>
+
+ There are any number of "resheader" rows that contain simple
+ name/value pairs.
+
+ Each data row contains a name, and value. The row also contains a
+ type or mimetype. Type corresponds to a .NET class that support
+ text/value conversion through the TypeConverter architecture.
+ Classes that don't support this are serialized and stored with the
+ mimetype set.
+
+ The mimetype is used for serialized objects, and tells the
+ ResXResourceReader how to depersist the object. This is currently not
+ extensible. For a given mimetype the value must be set accordingly:
+
+ Note - application/x-microsoft.net.object.binary.base64 is the format
+ that the ResXResourceWriter will generate, however the reader can
+ read any of the formats listed below.
+
+ mimetype: application/x-microsoft.net.object.binary.base64
+ value : The object must be serialized with
+ : System.Runtime.Serialization.Formatters.Binary.BinaryFormatter
+ : and then encoded with base64 encoding.
+
+ mimetype: application/x-microsoft.net.object.soap.base64
+ value : The object must be serialized with
+ : System.Runtime.Serialization.Formatters.Soap.SoapFormatter
+ : and then encoded with base64 encoding.
+
+ mimetype: application/x-microsoft.net.object.bytearray.base64
+ value : The object must be serialized into a byte array
+ : using a System.ComponentModel.TypeConverter
+ : and then encoded with base64 encoding.
+ -->
+ <xsd:schema id="root" xmlns="" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:msdata="urn:schemas-microsoft-com:xml-msdata">
+ <xsd:import namespace="http://www.w3.org/XML/1998/namespace" />
+ <xsd:element name="root" msdata:IsDataSet="true">
+ <xsd:complexType>
+ <xsd:choice maxOccurs="unbounded">
+ <xsd:element name="metadata">
+ <xsd:complexType>
+ <xsd:sequence>
+ <xsd:element name="value" type="xsd:string" minOccurs="0" />
+ </xsd:sequence>
+ <xsd:attribute name="name" use="required" type="xsd:string" />
+ <xsd:attribute name="type" type="xsd:string" />
+ <xsd:attribute name="mimetype" type="xsd:string" />
+ <xsd:attribute ref="xml:space" />
+ </xsd:complexType>
+ </xsd:element>
+ <xsd:element name="assembly">
+ <xsd:complexType>
+ <xsd:attribute name="alias" type="xsd:string" />
+ <xsd:attribute name="name" type="xsd:string" />
+ </xsd:complexType>
+ </xsd:element>
+ <xsd:element name="data">
+ <xsd:complexType>
+ <xsd:sequence>
+ <xsd:element name="value" type="xsd:string" minOccurs="0" msdata:Ordinal="1" />
+ <xsd:element name="comment" type="xsd:string" minOccurs="0" msdata:Ordinal="2" />
+ </xsd:sequence>
+ <xsd:attribute name="name" type="xsd:string" use="required" msdata:Ordinal="1" />
+ <xsd:attribute name="type" type="xsd:string" msdata:Ordinal="3" />
+ <xsd:attribute name="mimetype" type="xsd:string" msdata:Ordinal="4" />
+ <xsd:attribute ref="xml:space" />
+ </xsd:complexType>
+ </xsd:element>
+ <xsd:element name="resheader">
+ <xsd:complexType>
+ <xsd:sequence>
+ <xsd:element name="value" type="xsd:string" minOccurs="0" msdata:Ordinal="1" />
+ </xsd:sequence>
+ <xsd:attribute name="name" type="xsd:string" use="required" />
+ </xsd:complexType>
+ </xsd:element>
+ </xsd:choice>
+ </xsd:complexType>
+ </xsd:element>
+ </xsd:schema>
+ <resheader name="resmimetype">
+ <value>text/microsoft-resx</value>
+ </resheader>
+ <resheader name="version">
+ <value>2.0</value>
+ </resheader>
+ <resheader name="reader">
+ <value>System.Resources.ResXResourceReader, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089</value>
+ </resheader>
+ <resheader name="writer">
+ <value>System.Resources.ResXResourceWriter, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089</value>
+ </resheader>
+ <data name="net_connections_createpipe_null" xml:space="preserve">
+ <value>The CreatePipe implementation returned null; a valid reference was expected.</value>
+ </data>
+ <data name="net_connections_createstream_null" xml:space="preserve">
+ <value>The CreateStream implementation returned null; a valid reference was expected.</value>
+ </data>
+ <data name="net_connections_no_create_overrides" xml:space="preserve">
+ <value>One of CreatePipe or CreateStream must be implemented</value>
+ </data>
+ <data name="net_connections_pipe_use_after_stream" xml:space="preserve">
+ <value>The Connection's Pipe may not be accessed after Stream has been accessed.</value>
+ </data>
+ <data name="net_connections_stream_use_after_pipe" xml:space="preserve">
+ <value>The Connection's Stream may not be accessed after Pipe has been accessed.</value>
+ </data>
+ <data name="net_connections_zero_byte_pipe_read" xml:space="preserve">
+ <value>The PipeReader returned a zero-length read.</value>
+ </data>
+</root>
\ No newline at end of file
--- /dev/null
+<Project Sdk="Microsoft.NET.Sdk">
+ <PropertyGroup>
+ <TargetFrameworks>$(NetCoreAppCurrent)</TargetFrameworks>
+ <Nullable>enable</Nullable>
+ </PropertyGroup>
+ <ItemGroup>
+ <Compile Include="$(CommonPath)System\Threading\Tasks\TaskToApm.cs" Link="Common\System\Threading\Tasks\TaskToApm.cs" />
+ <Compile Include="System\Net\Connections\ConnectionBase.cs" />
+ <Compile Include="System\Net\Connections\ConnectionCloseMethod.cs" />
+ <Compile Include="System\Net\Connections\ConnectionExtensions.cs" />
+ <Compile Include="System\Net\Connections\ConnectionListenerFactory.cs" />
+ <Compile Include="System\Net\Connections\Connection.cs" />
+ <Compile Include="System\Net\Connections\DuplexPipeStream.cs" />
+ <Compile Include="System\Net\Connections\ConnectionFactory.cs" />
+ <Compile Include="System\Net\Connections\ConnectionListener.cs" />
+ <Compile Include="System\Net\Connections\IConnectionProperties.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <Reference Include="System.Runtime" />
+ <Reference Include="System.Memory" />
+ <Reference Include="System.Net.Primitives" />
+ <Reference Include="System.Threading" />
+ <Reference Include="System.Threading.Tasks" />
+ <Reference Include="System.IO.Pipelines" />
+ </ItemGroup>
+</Project>
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
+using System.IO;
+using System.IO.Pipelines;
+using System.Runtime.ExceptionServices;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections
+{
+ /// <summary>
+ /// A connection.
+ /// </summary>
+ public abstract class Connection : ConnectionBase
+ {
+ private Stream? _stream;
+ private IDuplexPipe? _pipe;
+ private bool _initializing;
+
+ /// <summary>
+ /// The connection's <see cref="Stream"/>.
+ /// </summary>
+ public Stream Stream =>
+ _stream != null ? _stream :
+ _pipe != null ? throw new InvalidOperationException(SR.net_connections_stream_use_after_pipe) :
+ (_stream = CreateStream() ?? throw new InvalidOperationException(SR.net_connections_createstream_null));
+
+ /// <summary>
+ /// The connection's <see cref="IDuplexPipe"/>.
+ /// </summary>
+ public IDuplexPipe Pipe =>
+ _pipe != null ? _pipe :
+ _stream != null ? throw new InvalidOperationException(SR.net_connections_pipe_use_after_stream) :
+ (_pipe = CreatePipe() ?? throw new InvalidOperationException(SR.net_connections_createpipe_null));
+
+ /// <summary>
+ /// Initializes the <see cref="Stream"/> for the <see cref="Connection"/>.
+ /// </summary>
+ /// <returns>A <see cref="Stream"/>.</returns>
+ /// <remarks>
+ /// At least one of <see cref="CreateStream"/> and <see cref="CreatePipe"/> must be overridden.
+ /// If only <see cref="CreateStream"/> is overridden, a user accessing <see cref="Pipe"/> will get a <see cref="IDuplexPipe"/> wrapping the <see cref="Stream"/>.
+ /// </remarks>
+ protected virtual Stream CreateStream()
+ {
+ if (_initializing) throw new InvalidOperationException(SR.net_connections_no_create_overrides);
+
+ try
+ {
+ _initializing = true;
+
+ IDuplexPipe pipe = CreatePipe();
+ if (pipe == null) throw new InvalidOperationException(SR.net_connections_createpipe_null);
+
+ return new DuplexPipeStream(pipe);
+ }
+ finally
+ {
+ _initializing = false;
+ }
+ }
+
+ /// <summary>
+ /// Initializes the <see cref="Pipe"/> for the <see cref="Connection"/>.
+ /// </summary>
+ /// <returns>An <see cref="IDuplexPipe"/>.</returns>
+ /// <remarks>
+ /// At least one of <see cref="CreateStream"/> and <see cref="CreatePipe"/> must be overridden.
+ /// If only <see cref="CreatePipe"/> is overridden, a user accessing <see cref="Stream"/> will get a <see cref="Stream"/> wrapping the <see cref="Pipe"/>.
+ /// </remarks>
+ protected virtual IDuplexPipe CreatePipe()
+ {
+ if (_initializing) throw new InvalidOperationException(SR.net_connections_no_create_overrides);
+
+ try
+ {
+ _initializing = true;
+
+ Stream stream = CreateStream();
+ if (stream == null) throw new InvalidOperationException(SR.net_connections_createstream_null);
+
+ return new DuplexStreamPipe(stream);
+ }
+ finally
+ {
+ _initializing = false;
+ }
+ }
+
+ private sealed class DuplexStreamPipe : IDuplexPipe
+ {
+ private static readonly StreamPipeReaderOptions s_readerOpts = new StreamPipeReaderOptions(leaveOpen: true);
+ private static readonly StreamPipeWriterOptions s_writerOpts = new StreamPipeWriterOptions(leaveOpen: true);
+
+ public DuplexStreamPipe(Stream stream)
+ {
+ Input = PipeReader.Create(stream, s_readerOpts);
+ Output = PipeWriter.Create(stream, s_writerOpts);
+ }
+
+ public PipeReader Input { get; }
+
+ public PipeWriter Output { get; }
+ }
+
+ /// <summary>
+ /// Creates a connection for a <see cref="Stream"/>.
+ /// </summary>
+ /// <param name="stream">The connection's <see cref="Connection.Stream"/>.</param>
+ /// <param name="leaveOpen">If false, the <paramref name="stream"/> will be disposed of once the connection has been closed.</param>
+ /// <param name="properties">The connection's <see cref="ConnectionBase.ConnectionProperties"/>.</param>
+ /// <param name="localEndPoint">The connection's <see cref="ConnectionBase.LocalEndPoint"/>.</param>
+ /// <param name="remoteEndPoint">The connection's <see cref="ConnectionBase.RemoteEndPoint"/>.</param>
+ /// <returns>A new <see cref="Connection"/>.</returns>
+ public static Connection FromStream(Stream stream, bool leaveOpen = false, IConnectionProperties? properties = null, EndPoint? localEndPoint = null, EndPoint? remoteEndPoint = null)
+ {
+ if (stream == null) throw new ArgumentNullException(nameof(stream));
+ return new ConnectionFromStream(stream, leaveOpen, properties, localEndPoint, remoteEndPoint);
+ }
+
+ /// <summary>
+ /// Creates a connection for an <see cref="IDuplexPipe"/>.
+ /// </summary>
+ /// <param name="pipe">The connection's <see cref="Connection.Pipe"/>.</param>
+ /// <param name="leaveOpen">If false and the <paramref name="pipe"/> implements <see cref="IAsyncDisposable"/> or <see cref="IDisposable"/>, it will be disposed of once the connection has been closed.</param>
+ /// <param name="properties">The connection's <see cref="ConnectionBase.ConnectionProperties"/>.</param>
+ /// <param name="localEndPoint">The connection's <see cref="ConnectionBase.LocalEndPoint"/>.</param>
+ /// <param name="remoteEndPoint">The connection's <see cref="ConnectionBase.RemoteEndPoint"/>.</param>
+ /// <returns>A new <see cref="Connection"/>.</returns>
+ public static Connection FromPipe(IDuplexPipe pipe, bool leaveOpen = false, IConnectionProperties? properties = null, EndPoint? localEndPoint = null, EndPoint? remoteEndPoint = null)
+ {
+ if (pipe == null) throw new ArgumentNullException(nameof(pipe));
+ return new ConnectionFromPipe(pipe, leaveOpen, properties, localEndPoint, remoteEndPoint);
+ }
+
+ private sealed class ConnectionFromStream : Connection, IConnectionProperties
+ {
+ private Stream? _originalStream;
+ private IConnectionProperties? _properties;
+ private readonly bool _leaveOpen;
+
+ public override IConnectionProperties ConnectionProperties => _properties ?? this;
+
+ public override EndPoint? LocalEndPoint { get; }
+
+ public override EndPoint? RemoteEndPoint { get; }
+
+ public ConnectionFromStream(Stream stream, bool leaveOpen, IConnectionProperties? properties, EndPoint? localEndPoint, EndPoint? remoteEndPoint)
+ {
+ _originalStream = stream;
+ _leaveOpen = leaveOpen;
+ _properties = properties;
+ LocalEndPoint = localEndPoint;
+ RemoteEndPoint = remoteEndPoint;
+ }
+
+ protected override Stream CreateStream() => _originalStream ?? throw new ObjectDisposedException(nameof(Connection));
+
+ protected override async ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken)
+ {
+ if (_originalStream == null)
+ {
+ return;
+ }
+
+ if (method == ConnectionCloseMethod.GracefulShutdown)
+ {
+ await _originalStream.FlushAsync(cancellationToken).ConfigureAwait(false);
+ }
+
+ if (!_leaveOpen)
+ {
+ await _originalStream.DisposeAsync().ConfigureAwait(false);
+ }
+
+ _originalStream = null;
+ }
+
+ bool IConnectionProperties.TryGet(Type propertyKey, [NotNullWhen(true)] out object? property)
+ {
+ property = null;
+ return false;
+ }
+ }
+
+ private sealed class ConnectionFromPipe : Connection, IConnectionProperties
+ {
+ private IDuplexPipe? _originalPipe;
+ private IConnectionProperties? _properties;
+ private readonly bool _leaveOpen;
+
+ public override IConnectionProperties ConnectionProperties => _properties ?? this;
+
+ public override EndPoint? LocalEndPoint { get; }
+
+ public override EndPoint? RemoteEndPoint { get; }
+
+ public ConnectionFromPipe(IDuplexPipe pipe, bool leaveOpen, IConnectionProperties? properties, EndPoint? localEndPoint, EndPoint? remoteEndPoint)
+ {
+ _originalPipe = pipe;
+ _leaveOpen = leaveOpen;
+ _properties = properties;
+ LocalEndPoint = localEndPoint;
+ RemoteEndPoint = remoteEndPoint;
+ }
+
+ protected override IDuplexPipe CreatePipe() => _originalPipe ?? throw new ObjectDisposedException(nameof(Connection));
+
+ protected override async ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken)
+ {
+ if (_originalPipe == null)
+ {
+ return;
+ }
+
+ Exception? inputException, outputException;
+
+ if (method == ConnectionCloseMethod.GracefulShutdown)
+ {
+ // Flush happens implicitly from CompleteAsync(null), so only flush here if we need cancellation.
+ if (cancellationToken.CanBeCanceled)
+ {
+ FlushResult r = await _originalPipe.Output.FlushAsync(cancellationToken).ConfigureAwait(false);
+ if (r.IsCanceled) cancellationToken.ThrowIfCancellationRequested();
+ }
+
+ inputException = null;
+ outputException = null;
+ }
+ else
+ {
+ inputException = ExceptionDispatchInfo.SetCurrentStackTrace(new ObjectDisposedException(nameof(Connection)));
+ outputException = ExceptionDispatchInfo.SetCurrentStackTrace(new ObjectDisposedException(nameof(Connection)));
+ }
+
+ await _originalPipe.Input.CompleteAsync(inputException).ConfigureAwait(false);
+ await _originalPipe.Output.CompleteAsync(outputException).ConfigureAwait(false);
+
+ if (!_leaveOpen)
+ {
+ switch (_originalPipe)
+ {
+ case IAsyncDisposable d:
+ await d.DisposeAsync().ConfigureAwait(false);
+ break;
+ case IDisposable d:
+ d.Dispose();
+ break;
+ }
+ }
+
+ _originalPipe = null;
+ }
+
+ bool IConnectionProperties.TryGet(Type propertyKey, [NotNullWhen(true)] out object? property)
+ {
+ property = null;
+ return false;
+ }
+ }
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections
+{
+ /// <summary>
+ /// Provides base functionality shared between singular (e.g. TCP) and multiplexed (e.g. QUIC) connections.
+ /// </summary>
+ public abstract class ConnectionBase : IDisposable, IAsyncDisposable
+ {
+ private bool _disposed;
+
+ /// <summary>
+ /// Properties exposed by this connection.
+ /// </summary>
+ public abstract IConnectionProperties ConnectionProperties { get; }
+
+ /// <summary>
+ /// The local endpoint of this connection, if any.
+ /// </summary>
+ public abstract EndPoint? LocalEndPoint { get; }
+
+ /// <summary>
+ /// The remote endpoint of this connection, if any.
+ /// </summary>
+ public abstract EndPoint? RemoteEndPoint { get; }
+
+ /// <summary>
+ /// Closes the connection.
+ /// </summary>
+ /// <param name="method">The method to use when closing the connection.</param>
+ /// <param name="cancellationToken">A cancellation token for the asynchronous operation.</param>
+ /// <returns>A <see cref="ValueTask"/> for the asynchronous operation.</returns>
+ public async ValueTask CloseAsync(ConnectionCloseMethod method = ConnectionCloseMethod.GracefulShutdown, CancellationToken cancellationToken = default)
+ {
+ if (!_disposed)
+ {
+ await CloseAsyncCore(method, cancellationToken).ConfigureAwait(false);
+ _disposed = true;
+ }
+ GC.SuppressFinalize(this);
+ }
+
+ /// <summary>
+ /// Closes the connection.
+ /// </summary>
+ /// <param name="method">The method to use when closing the connection.</param>
+ /// <param name="cancellationToken">A cancellation token for the asynchronous operation.</param>
+ /// <returns>A <see cref="ValueTask"/> for the asynchronous operation.</returns>
+ protected abstract ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken);
+
+ /// <summary>
+ /// Disposes of the connection.
+ /// </summary>
+ /// <remarks>
+ /// This is equivalent to calling <see cref="CloseAsync(ConnectionCloseMethod, CancellationToken)"/> with the method <see cref="ConnectionCloseMethod.GracefulShutdown"/>, and calling GetAwaiter().GetResult() on the resulting task.
+ /// To increase likelihood of synchronous completion, call <see cref="CloseAsync(ConnectionCloseMethod, CancellationToken)"/> directly with the method <see cref="ConnectionCloseMethod.Immediate"/>.
+ /// </remarks>
+ public void Dispose()
+ {
+ ValueTask t = CloseAsync(ConnectionCloseMethod.GracefulShutdown, CancellationToken.None);
+
+ if (t.IsCompleted) t.GetAwaiter().GetResult();
+ else t.AsTask().GetAwaiter().GetResult();
+ }
+
+ /// <summary>
+ /// Disposes of the connection.
+ /// </summary>
+ /// <returns>A <see cref="ValueTask"/> for the asynchronous operation.</returns>
+ /// <remarks>This is equivalent to calling <see cref="CloseAsync(ConnectionCloseMethod, CancellationToken)"/> with the method <see cref="ConnectionCloseMethod.GracefulShutdown"/>.</remarks>
+ public ValueTask DisposeAsync()
+ {
+ return CloseAsync(ConnectionCloseMethod.GracefulShutdown, CancellationToken.None);
+ }
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+namespace System.Net.Connections
+{
+ /// <summary>
+ /// Methods for closing a connection.
+ /// </summary>
+ public enum ConnectionCloseMethod
+ {
+ /// <summary>
+ /// The connection should be flushed and closed.
+ /// </summary>
+ GracefulShutdown,
+
+ /// <summary>
+ /// The connection should be aborted gracefully, performing any I/O needed to notify the other side of the connection that it has been aborted.
+ /// </summary>
+ Abort,
+
+ /// <summary>
+ /// The connection should be aborted immediately, avoiding any I/O needed to notify the other side of the connection that it has been aborted.
+ /// </summary>
+ Immediate
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Diagnostics.CodeAnalysis;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections
+{
+ /// <summary>
+ /// Extension methods for working with the System.Net.Connections types.
+ /// </summary>
+ public static class ConnectionExtensions
+ {
+ /// <summary>
+ /// Retrieves a Type-based property from an <see cref="IConnectionProperties"/>, if it exists.
+ /// </summary>
+ /// <typeparam name="T">The type of the property to retrieve.</typeparam>
+ /// <param name="properties">The connection properties to retrieve a property from.</param>
+ /// <param name="property">If <paramref name="properties"/> contains a property of type <typeparamref name="T"/>, receives the property. Otherwise, default.</param>
+ /// <returns>If <paramref name="properties"/> contains a property of type <typeparamref name="T"/>, true. Otherwise, false.</returns>
+ public static bool TryGet<T>(this IConnectionProperties properties, [MaybeNullWhen(false)] out T property)
+ {
+ if (properties == null) throw new ArgumentNullException(nameof(properties));
+
+ if (properties.TryGet(typeof(T), out object? obj) && obj is T propertyValue)
+ {
+ property = propertyValue;
+ return true;
+ }
+ else
+ {
+ property = default;
+ return false;
+ }
+ }
+
+ /// <summary>
+ /// Creates a connection-level filter on top of a <see cref="ConnectionFactory"/>.
+ /// </summary>
+ /// <param name="factory">The factory to be filtered.</param>
+ /// <param name="filter">The connection-level filter to apply on top of <paramref name="factory"/>.</param>
+ /// <returns>A new filtered <see cref="ConnectionFactory"/>.</returns>
+ public static ConnectionFactory Filter(this ConnectionFactory factory, Func<Connection, IConnectionProperties?, CancellationToken, ValueTask<Connection>> filter)
+ {
+ if (factory == null) throw new ArgumentNullException(nameof(factory));
+ if (filter == null) throw new ArgumentNullException(nameof(filter));
+ return new ConnectionFilteringFactory(factory, filter);
+ }
+
+ private sealed class ConnectionFilteringFactory : ConnectionFactory
+ {
+ private readonly ConnectionFactory _baseFactory;
+ private readonly Func<Connection, IConnectionProperties?, CancellationToken, ValueTask<Connection>> _filter;
+
+ public ConnectionFilteringFactory(ConnectionFactory baseFactory, Func<Connection, IConnectionProperties?, CancellationToken, ValueTask<Connection>> filter)
+ {
+ _baseFactory = baseFactory;
+ _filter = filter;
+ }
+
+ public override async ValueTask<Connection> ConnectAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default)
+ {
+ Connection con = await _baseFactory.ConnectAsync(endPoint, options, cancellationToken).ConfigureAwait(false);
+ try
+ {
+ return await _filter(con, options, cancellationToken).ConfigureAwait(false);
+ }
+ catch
+ {
+ await con.CloseAsync(ConnectionCloseMethod.Abort, cancellationToken).ConfigureAwait(false);
+ throw;
+ }
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing) _baseFactory.Dispose();
+ }
+
+ protected override ValueTask DisposeAsyncCore()
+ {
+ return _baseFactory.DisposeAsync();
+ }
+ }
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections
+{
+ /// <summary>
+ /// A factory for opening outgoing connections.
+ /// </summary>
+ public abstract class ConnectionFactory : IAsyncDisposable, IDisposable
+ {
+ /// <summary>
+ /// Opens a new <see cref="Connection"/>.
+ /// </summary>
+ /// <param name="endPoint">The <see cref="EndPoint"/> to connect to, if any.</param>
+ /// <param name="options">Options used to create the connection, if any.</param>
+ /// <param name="cancellationToken">A token used to cancel the asynchronous operation.</param>
+ /// <returns>A <see cref="ValueTask{TResult}"/> for the <see cref="Connection"/>.</returns>
+ public abstract ValueTask<Connection> ConnectAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default);
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ await DisposeAsyncCore().ConfigureAwait(false);
+ GC.SuppressFinalize(this);
+ }
+
+ /// <summary>
+ /// Disposes the <see cref="ConnectionFactory"/>.
+ /// </summary>
+ /// <param name="disposing">If true, the <see cref="ConnectionFactory"/> is being disposed. If false, the <see cref="ConnectionFactory"/> is being finalized.</param>
+ protected virtual void Dispose(bool disposing)
+ {
+ }
+
+ /// <summary>
+ /// Asynchronously disposes the <see cref="ConnectionFactory"/>.
+ /// </summary>
+ /// <returns>A <see cref="ValueTask"/> representing the asynchronous dispose operation.</returns>
+ protected virtual ValueTask DisposeAsyncCore()
+ {
+ Dispose(true);
+ return default;
+ }
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections
+{
+ /// <summary>
+ /// A listener to accept incoming connections.
+ /// </summary>
+ public abstract class ConnectionListener : IAsyncDisposable, IDisposable
+ {
+ /// <summary>
+ /// Properties exposed by this listener.
+ /// </summary>
+ public abstract IConnectionProperties ListenerProperties { get; }
+
+ /// <summary>
+ /// The local endpoint of this connection, if any.
+ /// </summary>
+ public abstract EndPoint? LocalEndPoint { get; }
+
+ /// <summary>
+ /// Accepts an incoming connection.
+ /// </summary>
+ /// <param name="options">Options used to create the connection, if any.</param>
+ /// <param name="cancellationToken">A token used to cancel the asynchronous operation.</param>
+ /// <returns>A <see cref="ValueTask{TResult}"/> for the <see cref="Connection"/>.</returns>
+ public abstract ValueTask<Connection> AcceptAsync(IConnectionProperties? options = null, CancellationToken cancellationToken = default);
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ await DisposeAsyncCore().ConfigureAwait(false);
+ GC.SuppressFinalize(this);
+ }
+
+ /// <summary>
+ /// Disposes the <see cref="ConnectionFactory"/>.
+ /// </summary>
+ /// <param name="disposing">If true, the <see cref="ConnectionFactory"/> is being disposed. If false, the <see cref="ConnectionFactory"/> is being finalized.</param>
+ protected virtual void Dispose(bool disposing)
+ {
+ }
+
+ /// <summary>
+ /// Asynchronously disposes the <see cref="ConnectionFactory"/>.
+ /// </summary>
+ /// <returns>A <see cref="ValueTask"/> representing the asynchronous dispose operation.</returns>
+ protected virtual ValueTask DisposeAsyncCore()
+ {
+ Dispose(true);
+ return default;
+ }
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections
+{
+ /// <summary>
+ /// A factory for creating connection listeners, to accept incoming connections.
+ /// </summary>
+ public abstract class ConnectionListenerFactory : IAsyncDisposable, IDisposable
+ {
+ public abstract ValueTask<ConnectionListener> BindAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default);
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ await DisposeAsyncCore().ConfigureAwait(false);
+ GC.SuppressFinalize(this);
+ }
+
+ /// <summary>
+ /// Disposes the <see cref="ConnectionFactory"/>.
+ /// </summary>
+ /// <param name="disposing">If true, the <see cref="ConnectionFactory"/> is being disposed. If false, the <see cref="ConnectionFactory"/> is being finalized.</param>
+ protected virtual void Dispose(bool disposing)
+ {
+ }
+
+ /// <summary>
+ /// Asynchronously disposes the <see cref="ConnectionFactory"/>.
+ /// </summary>
+ /// <returns>A <see cref="ValueTask"/> representing the asynchronous dispose operation.</returns>
+ protected virtual ValueTask DisposeAsyncCore()
+ {
+ Dispose(true);
+ return default;
+ }
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System;
+using System.Buffers;
+using System.IO;
+using System.IO.Pipelines;
+using System.Runtime.ExceptionServices;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections
+{
+ internal sealed class DuplexPipeStream : Stream
+ {
+ private readonly PipeReader _reader;
+ private readonly PipeWriter _writer;
+
+ public override bool CanRead => true;
+ public override bool CanSeek => false;
+ public override bool CanWrite => true;
+ public override long Length => throw new NotSupportedException();
+ public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
+
+ public DuplexPipeStream(IDuplexPipe pipe)
+ {
+ _reader = pipe.Input;
+ _writer = pipe.Output;
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ _reader.Complete();
+ _writer.Complete();
+ }
+ base.Dispose(disposing);
+ }
+
+ public override async ValueTask DisposeAsync()
+ {
+ await _reader.CompleteAsync().ConfigureAwait(false);
+ await _writer.CompleteAsync().ConfigureAwait(false);
+ }
+
+ public override void Flush()
+ {
+ FlushAsync().GetAwaiter().GetResult();
+ }
+
+ public override async Task FlushAsync(CancellationToken cancellationToken)
+ {
+ FlushResult r = await _writer.FlushAsync(cancellationToken).ConfigureAwait(false);
+ if (r.IsCanceled) throw new OperationCanceledException(cancellationToken);
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ if (buffer == null) throw new ArgumentNullException(nameof(buffer));
+
+ ValueTask<int> t = ReadAsync(buffer.AsMemory(offset, count));
+ return
+ t.IsCompleted ? t.GetAwaiter().GetResult() :
+ t.AsTask().GetAwaiter().GetResult();
+ }
+
+ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ if (buffer == null) return Task.FromException<int>(ExceptionDispatchInfo.SetCurrentStackTrace(new ArgumentNullException(nameof(buffer))));
+ return ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
+ }
+
+ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+ {
+ ReadResult result = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
+
+ if (result.IsCanceled)
+ {
+ throw new OperationCanceledException();
+ }
+
+ ReadOnlySequence<byte> sequence = result.Buffer;
+ long bufferLength = sequence.Length;
+ SequencePosition consumed = sequence.Start;
+
+ try
+ {
+ if (bufferLength != 0)
+ {
+ int actual = (int)Math.Min(bufferLength, buffer.Length);
+
+ ReadOnlySequence<byte> slice = actual == bufferLength ? sequence : sequence.Slice(0, actual);
+ consumed = slice.End;
+ slice.CopyTo(buffer.Span);
+
+ return actual;
+ }
+
+ if (result.IsCompleted)
+ {
+ return 0;
+ }
+ }
+ finally
+ {
+ _reader.AdvanceTo(consumed);
+ }
+
+ // This is a buggy PipeReader implementation that returns 0 byte reads even though the PipeReader
+ // isn't completed or canceled.
+ throw new InvalidOperationException(SR.net_connections_zero_byte_pipe_read);
+ }
+
+ public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
+ {
+ return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);
+ }
+
+ public override int EndRead(IAsyncResult asyncResult)
+ {
+ return TaskToApm.End<int>(asyncResult);
+ }
+
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void SetLength(long value)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
+ }
+
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
+ }
+
+ public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+ {
+ FlushResult r = await _writer.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);
+ if (r.IsCanceled) throw new OperationCanceledException(cancellationToken);
+ }
+
+ public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
+ {
+ return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);
+ }
+
+ public override void EndWrite(IAsyncResult asyncResult)
+ {
+ TaskToApm.End(asyncResult);
+ }
+
+ public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
+ {
+ return _reader.CopyToAsync(destination, cancellationToken);
+ }
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Diagnostics.CodeAnalysis;
+
+namespace System.Net.Connections
+{
+ /// <summary>
+ /// A container for connection properties.
+ /// </summary>
+ public interface IConnectionProperties
+ {
+ /// <summary>
+ /// Retrieves a connection property, if it exists.
+ /// </summary>
+ /// <param name="propertyKey">The key of the property to retrieve.</param>
+ /// <param name="property">If the property was found, retrieves the property. Otherwise, null.</param>
+ /// <returns>If the property was found, true. Otherwise, false.</returns>
+ bool TryGet(Type propertyKey, [NotNullWhen(true)] out object? property);
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.Net.Connections.Tests
+{
+ public class ConnectionBaseTest
+ {
+ [Fact]
+ public void Dispose_CallsClose_Success()
+ {
+ ConnectionCloseMethod? method = null;
+
+ var con = new MockConnection();
+ con.OnCloseAsyncCore = (m, t) =>
+ {
+ method = m;
+ return default(ValueTask);
+ };
+
+ con.Dispose();
+
+ Assert.Equal(ConnectionCloseMethod.GracefulShutdown, method);
+ }
+
+ [Fact]
+ public async Task DisposeAsync_CallsClose_Success()
+ {
+ ConnectionCloseMethod? method = null;
+
+ var con = new MockConnection();
+ con.OnCloseAsyncCore = (m, t) =>
+ {
+ method = m;
+ return default(ValueTask);
+ };
+
+ await con.DisposeAsync();
+
+ Assert.Equal(ConnectionCloseMethod.GracefulShutdown, method);
+ }
+
+ [Fact]
+ public void Dispose_CalledOnce_Success()
+ {
+ int callCount = 0;
+
+ var con = new MockConnection();
+ con.OnCloseAsyncCore = delegate
+ {
+ ++callCount;
+ return default(ValueTask);
+ };
+
+ con.Dispose();
+ con.Dispose();
+
+ Assert.Equal(1, callCount);
+ }
+
+ [Fact]
+ public async Task DisposeAsync_CalledOnce_Success()
+ {
+ int callCount = 0;
+
+ var con = new MockConnection();
+ con.OnCloseAsyncCore = delegate
+ {
+ ++callCount;
+ return default(ValueTask);
+ };
+
+ await con.DisposeAsync();
+ await con.DisposeAsync();
+
+ Assert.Equal(1, callCount);
+ }
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Buffers;
+using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
+using System.IO;
+using System.IO.Pipelines;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.Net.Connections.Tests
+{
+ public class ConnectionTest
+ {
+ [Fact]
+ public void CreateStream_CalledOnce_Success()
+ {
+ int callCount = 0;
+
+ var con = new MockConnection();
+ con.OnCreateStream = () =>
+ {
+ ++callCount;
+ return new MemoryStream();
+ };
+
+ _ = con.Stream;
+ _ = con.Stream;
+
+ Assert.Equal(1, callCount);
+ }
+
+ [Fact]
+ public void CreatePipe_CalledOnce_Success()
+ {
+ int callCount = 0;
+
+ var con = new MockConnection();
+ con.OnCreatePipe = () =>
+ {
+ ++callCount;
+ return new MockPipe();
+ };
+
+ _ = con.Pipe;
+ _ = con.Pipe;
+
+ Assert.Equal(1, callCount);
+ }
+
+ [Fact]
+ public void AccessStream_AccessPipe_Fail()
+ {
+ var con = new MockConnection();
+ con.OnCreateStream = () => new MemoryStream();
+
+ _ = con.Stream;
+ Assert.Throws<InvalidOperationException>(() => _ = con.Pipe);
+ }
+
+ [Fact]
+ public void AccessPipe_AccessStream_Fail()
+ {
+ var con = new MockConnection();
+ con.OnCreatePipe = () => new MockPipe();
+
+ _ = con.Pipe;
+ Assert.Throws<InvalidOperationException>(() => _ = con.Stream);
+ }
+
+ [Fact]
+ public void AccessStream_NoOverloads_Fail()
+ {
+ var con = new ConnectionWithoutStreamOrPipe();
+ Assert.Throws<InvalidOperationException>(() => _ = con.Stream);
+ }
+
+ [Fact]
+ public void AccessPipe_NoOverloads_Fail()
+ {
+ var con = new ConnectionWithoutStreamOrPipe();
+ Assert.Throws<InvalidOperationException>(() => _ = con.Pipe);
+ }
+
+ [Fact]
+ public async Task WrappedStream_Success()
+ {
+ var bytesA = Encoding.ASCII.GetBytes("foo");
+ var bytesB = Encoding.ASCII.GetBytes("bar");
+
+ var stream = new MemoryStream();
+ stream.Write(bytesA);
+ stream.Position = 0;
+
+ var con = new MockConnection();
+ con.OnCreateStream = () => stream;
+
+ IDuplexPipe pipe = con.Pipe;
+
+ ReadResult res = await pipe.Input.ReadAsync();
+ Assert.Equal(bytesA, res.Buffer.ToArray());
+
+ await pipe.Output.WriteAsync(bytesB);
+ Assert.Equal(bytesA.Concat(bytesB).ToArray(), stream.ToArray());
+ }
+
+ [Fact]
+ public async Task WrappedPipe_Success()
+ {
+ var bytesA = Encoding.ASCII.GetBytes("foo");
+ var bytesB = Encoding.ASCII.GetBytes("bar");
+
+ var stream = new MemoryStream();
+ stream.Write(bytesA);
+ stream.Position = 0;
+
+ var pipe = new MockPipe
+ {
+ Input = PipeReader.Create(stream),
+ Output = PipeWriter.Create(stream)
+ };
+
+ var con = new MockConnection();
+ con.OnCreatePipe = () => pipe;
+
+ Stream s = con.Stream;
+
+ var readBuffer = new byte[4];
+ int len = await s.ReadAsync(readBuffer);
+ Assert.Equal(3, len);
+ Assert.Equal(bytesA, readBuffer.AsSpan(0, len).ToArray());
+
+ await s.WriteAsync(bytesB);
+ Assert.Equal(bytesA.Concat(bytesB).ToArray(), stream.ToArray());
+ }
+
+ [Theory]
+ [InlineData(ConnectionCloseMethod.GracefulShutdown, true)]
+ [InlineData(ConnectionCloseMethod.Abort, false)]
+ [InlineData(ConnectionCloseMethod.Immediate, false)]
+ public async Task FromStream_CloseMethod_Flushed(ConnectionCloseMethod method, bool shouldFlush)
+ {
+ bool streamFlushed = false;
+
+ var stream = new MockStream
+ {
+ OnFlushAsync = _ => { streamFlushed = true; return Task.CompletedTask; }
+ };
+
+ var con = Connection.FromStream(stream, leaveOpen: true);
+
+ await con.CloseAsync(method);
+ Assert.Equal(shouldFlush, streamFlushed);
+ }
+
+ [Theory]
+ [InlineData(ConnectionCloseMethod.GracefulShutdown, true)]
+ [InlineData(ConnectionCloseMethod.Abort, false)]
+ [InlineData(ConnectionCloseMethod.Immediate, false)]
+ public async Task FromPipe_CloseMethod_Flushed(ConnectionCloseMethod method, bool shouldFlush)
+ {
+ bool pipeFlushed = false;
+
+ var pipe = new MockPipe
+ {
+ Input = new MockPipeReader()
+ {
+ OnCompleteAsync = _ => default
+ },
+ Output = new MockPipeWriter()
+ {
+ OnFlushAsync = _ => { pipeFlushed = true; return default; },
+ OnCompleteAsync = _ => default
+ }
+ };
+
+ var con = Connection.FromPipe(pipe);
+
+ await con.CloseAsync(method, new CancellationTokenSource().Token);
+ Assert.Equal(shouldFlush, pipeFlushed);
+ }
+
+ [Theory]
+ [InlineData(true, false)]
+ [InlineData(false, true)]
+ public async Task FromStream_LeaveOpen_StreamDisposed(bool leaveOpen, bool shouldDispose)
+ {
+ bool streamDisposed = false;
+
+ var stream = new MockStream();
+ stream.OnDisposeAsync = delegate { streamDisposed = true; return default; };
+
+ var con = Connection.FromStream(stream, leaveOpen);
+
+ await con.CloseAsync(ConnectionCloseMethod.Immediate);
+ Assert.Equal(shouldDispose, streamDisposed);
+ }
+
+ [Theory]
+ [InlineData(true, false)]
+ [InlineData(false, true)]
+ public async Task FromPipe_LeaveOpen_PipeDisposed(bool leaveOpen, bool shouldDispose)
+ {
+ bool pipeDisposed = false;
+
+ var pipe = new MockPipe
+ {
+ OnDisposeAsync = () => { pipeDisposed = true; return default; },
+ Input = new MockPipeReader()
+ {
+ OnCompleteAsync = _ => default
+ },
+ Output = new MockPipeWriter()
+ {
+ OnFlushAsync = _ => default,
+ OnCompleteAsync = _ => default
+ }
+ };
+
+ var con = Connection.FromPipe(pipe, leaveOpen);
+
+ await con.CloseAsync(ConnectionCloseMethod.Immediate);
+ Assert.Equal(shouldDispose, pipeDisposed);
+ }
+
+ [Fact]
+ public void FromStream_PropertiesInitialized()
+ {
+ var properties = new DummyConnectionProperties();
+ var localEndPoint = new IPEndPoint(IPAddress.Any, 1);
+ var remoteEndPoint = new IPEndPoint(IPAddress.Any, 2);
+
+ Connection c = Connection.FromStream(new MockStream(), leaveOpen: false, properties, localEndPoint, remoteEndPoint);
+ Assert.Same(properties, c.ConnectionProperties);
+ Assert.Same(localEndPoint, c.LocalEndPoint);
+ Assert.Same(remoteEndPoint, c.RemoteEndPoint);
+ }
+
+ [Fact]
+ public void FromPipe_PropertiesInitialized()
+ {
+ var properties = new DummyConnectionProperties();
+ var localEndPoint = new IPEndPoint(IPAddress.Any, 1);
+ var remoteEndPoint = new IPEndPoint(IPAddress.Any, 2);
+
+ Connection c = Connection.FromPipe(new MockPipe(), leaveOpen: false, properties, localEndPoint, remoteEndPoint);
+ Assert.Same(properties, c.ConnectionProperties);
+ Assert.Same(localEndPoint, c.LocalEndPoint);
+ Assert.Same(remoteEndPoint, c.RemoteEndPoint);
+ }
+
+ private sealed class DummyConnectionProperties : IConnectionProperties
+ {
+ public bool TryGet(Type propertyKey, [NotNullWhen(true)] out object property)
+ {
+ throw new NotImplementedException();
+ }
+ }
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections.Tests
+{
+ internal class ConnectionWithoutStreamOrPipe : Connection
+ {
+ public override IConnectionProperties ConnectionProperties => throw new NotImplementedException();
+
+ public override EndPoint LocalEndPoint => throw new NotImplementedException();
+
+ public override EndPoint RemoteEndPoint => throw new NotImplementedException();
+
+ protected override ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken)
+ {
+ throw new NotImplementedException();
+ }
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.IO;
+using System.IO.Pipelines;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections.Tests
+{
+ internal class MockConnection : Connection
+ {
+ public Func<IConnectionProperties> OnConnectionProperties { get; set; }
+ public Func<EndPoint> OnLocalEndPoint { get; set; }
+ public Func<EndPoint> OnRemoteEndPoint { get; set; }
+ public Func<ConnectionCloseMethod, CancellationToken, ValueTask> OnCloseAsyncCore { get; set; }
+ public Func<IDuplexPipe> OnCreatePipe { get; set; }
+ public Func<Stream> OnCreateStream { get; set; }
+
+ public override IConnectionProperties ConnectionProperties => OnConnectionProperties();
+
+ public override EndPoint LocalEndPoint => OnLocalEndPoint();
+
+ public override EndPoint RemoteEndPoint => OnRemoteEndPoint();
+
+ protected override ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken) =>
+ OnCloseAsyncCore(method, cancellationToken);
+
+ protected override IDuplexPipe CreatePipe() => OnCreatePipe != null ? OnCreatePipe() : base.CreatePipe();
+
+ protected override Stream CreateStream() => OnCreateStream != null ? OnCreateStream() : base.CreateStream();
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.IO.Pipelines;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections.Tests
+{
+ internal class MockPipe : IDuplexPipe, IAsyncDisposable
+ {
+ public Func<ValueTask> OnDisposeAsync { get; set; }
+ public PipeReader Input { get; set; }
+
+ public PipeWriter Output { get; set; }
+
+ public ValueTask DisposeAsync()
+ {
+ return OnDisposeAsync?.Invoke() ?? default;
+ }
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.IO.Pipelines;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections.Tests
+{
+ internal class MockPipeReader : PipeReader
+ {
+ public Action<SequencePosition, SequencePosition> OnAdvanceTo { get; set; }
+ public Action OnCancelPendingRead { get; set; }
+ public Action<Exception> OnComplete { get; set; }
+ public Func<Exception,ValueTask> OnCompleteAsync { get; set; }
+ public Func<CancellationToken, ValueTask<ReadResult>> OnReadAsync { get; set; }
+ public Func<ReadResult?> OnTryRead { get; set; }
+
+ public override void AdvanceTo(SequencePosition consumed)
+ => OnAdvanceTo(consumed, consumed);
+
+ public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
+ => OnAdvanceTo(consumed, examined);
+
+ public override void CancelPendingRead()
+ => OnCancelPendingRead();
+
+ public override void Complete(Exception exception = null)
+ => OnComplete(exception);
+
+ public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
+ => OnReadAsync(cancellationToken);
+
+ public override bool TryRead(out ReadResult result)
+ {
+ ReadResult? r = OnTryRead();
+ result = r.GetValueOrDefault();
+ return r.HasValue;
+ }
+
+ public override ValueTask CompleteAsync(Exception exception = null)
+ => OnCompleteAsync(exception);
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.IO.Pipelines;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections.Tests
+{
+ internal class MockPipeWriter : PipeWriter
+ {
+ public Action<int> OnAdvance { get; set; }
+ public Action OnCancelPendingFlush { get; set; }
+ public Action<Exception> OnComplete { get; set; }
+ public Func<Exception,ValueTask> OnCompleteAsync { get; set; }
+ public Func<CancellationToken,ValueTask<FlushResult>> OnFlushAsync { get; set; }
+ public Func<int, Memory<byte>> OnGetMemory { get; set; }
+
+ public override void Advance(int bytes)
+ => OnAdvance(bytes);
+
+ public override void CancelPendingFlush()
+ => OnCancelPendingFlush();
+
+ public override void Complete(Exception exception = null)
+ => OnComplete(exception);
+
+ public override ValueTask CompleteAsync(Exception exception = null)
+ => OnCompleteAsync(exception);
+
+ public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
+ => OnFlushAsync(cancellationToken);
+
+ public override Memory<byte> GetMemory(int sizeHint = 0)
+ => OnGetMemory(sizeHint);
+
+ public override Span<byte> GetSpan(int sizeHint = 0)
+ => GetMemory(sizeHint).Span;
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections.Tests
+{
+ internal class MockStream : Stream
+ {
+ public Func<Memory<byte>, CancellationToken, ValueTask<int>> OnReadAsync { get; set; }
+ public Func<ReadOnlyMemory<byte>, CancellationToken, ValueTask> OnWriteAsync { get; set; }
+ public Func<CancellationToken, Task> OnFlushAsync { get; set; }
+ public Func<ValueTask> OnDisposeAsync { get; set; }
+
+ public override bool CanRead => true;
+
+ public override bool CanSeek => false;
+
+ public override bool CanWrite => true;
+
+ public override long Length => throw new NotImplementedException();
+
+ public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing) DisposeAsync().AsTask().GetAwaiter().GetResult();
+ }
+
+ public override ValueTask DisposeAsync()
+ {
+ return OnDisposeAsync();
+ }
+
+ public override void Flush()
+ {
+ FlushAsync().GetAwaiter().GetResult();
+ }
+
+ public override Task FlushAsync(CancellationToken cancellationToken)
+ {
+ return OnFlushAsync(cancellationToken);
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
+ }
+
+ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ return ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
+ }
+
+ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+ {
+ return OnReadAsync(buffer, cancellationToken);
+ }
+
+ public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
+ {
+ return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);
+ }
+
+ public override int EndRead(IAsyncResult asyncResult)
+ {
+ return TaskToApm.End<int>(asyncResult);
+ }
+
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void SetLength(long value)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
+ }
+
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ return WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
+ }
+
+ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+ {
+ return OnWriteAsync(buffer, cancellationToken);
+ }
+
+ public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
+ {
+ return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);
+ }
+
+ public override void EndWrite(IAsyncResult asyncResult)
+ {
+ TaskToApm.End(asyncResult);
+ }
+ }
+}
--- /dev/null
+<Project Sdk="Microsoft.NET.Sdk">
+
+ <PropertyGroup>
+ <TargetFrameworks>$(NetCoreAppCurrent)</TargetFrameworks>
+ </PropertyGroup>
+
+ <ItemGroup>
+ <Compile Include="$(CommonPath)System\Threading\Tasks\TaskToApm.cs" Link="Common\System\Threading\Tasks\TaskToApm.cs" />
+ <Compile Include="ConnectionBaseTest.cs" />
+ <Compile Include="ConnectionTest.cs" />
+ <Compile Include="ConnectionWithoutStreamOrPipe.cs" />
+ <Compile Include="MockConnection.cs" />
+ <Compile Include="MockPipe.cs" />
+ <Compile Include="MockPipeReader.cs" />
+ <Compile Include="MockPipeWriter.cs" />
+ <Compile Include="MockStream.cs" />
+ </ItemGroup>
+
+</Project>
protected override System.Threading.Tasks.Task SerializeToStreamAsync(System.IO.Stream stream, System.Net.TransportContext? context, System.Threading.CancellationToken cancellationToken) { throw null; }
protected internal override bool TryComputeLength(out long length) { throw null; }
}
+ public partial class SocketsHttpConnectionFactory : System.Net.Connections.ConnectionFactory
+ {
+ public SocketsHttpConnectionFactory() { }
+ public sealed override System.Threading.Tasks.ValueTask<System.Net.Connections.Connection> ConnectAsync(System.Net.EndPoint? endPoint, System.Net.Connections.IConnectionProperties? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
+ public virtual System.Net.Sockets.Socket CreateSocket(System.Net.Http.HttpRequestMessage message, System.Net.EndPoint? endPoint, System.Net.Connections.IConnectionProperties options) { throw null; }
+ protected override void Dispose(bool disposing) { }
+ protected override System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; }
+ public virtual System.Threading.Tasks.ValueTask<System.Net.Connections.Connection> EstablishConnectionAsync(System.Net.Http.HttpRequestMessage message, System.Net.EndPoint? endPoint, System.Net.Connections.IConnectionProperties options, System.Threading.CancellationToken cancellationToken) { throw null; }
+ }
public sealed partial class SocketsHttpHandler : System.Net.Http.HttpMessageHandler
{
public SocketsHttpHandler() { }
public bool AllowAutoRedirect { get { throw null; } set { } }
public System.Net.DecompressionMethods AutomaticDecompression { get { throw null; } set { } }
+ public System.Net.Connections.ConnectionFactory? ConnectionFactory { get { throw null; } set { } }
public System.TimeSpan ConnectTimeout { get { throw null; } set { } }
[System.Diagnostics.CodeAnalysis.AllowNullAttribute]
public System.Net.CookieContainer CookieContainer { get { throw null; } set { } }
public int MaxConnectionsPerServer { get { throw null; } set { } }
public int MaxResponseDrainSize { get { throw null; } set { } }
public int MaxResponseHeadersLength { get { throw null; } set { } }
+ public System.Func<System.Net.Http.HttpRequestMessage, System.Net.Connections.Connection, System.Threading.CancellationToken, System.Threading.Tasks.ValueTask<System.Net.Connections.Connection>>? PlaintextFilter { get { throw null; } set { } }
public System.TimeSpan PooledConnectionIdleTimeout { get { throw null; } set { } }
public System.TimeSpan PooledConnectionLifetime { get { throw null; } set { } }
public bool PreAuthenticate { get { throw null; } set { } }
<ItemGroup>
<ProjectReference Include="..\..\System.Runtime\ref\System.Runtime.csproj" />
<ProjectReference Include="..\..\System.Net.Primitives\ref\System.Net.Primitives.csproj" />
+ <ProjectReference Include="..\..\System.Net.Sockets\ref\System.Net.Sockets.csproj" />
+ <ProjectReference Include="..\..\System.Net.Connections\ref\System.Net.Connections.csproj" />
<ProjectReference Include="..\..\System.Net.Security\ref\System.Net.Security.csproj" />
<ProjectReference Include="..\..\System.Security.Cryptography.X509Certificates\ref\System.Security.Cryptography.X509Certificates.csproj" />
<ProjectReference Include="..\..\System.Text.Encoding\ref\System.Text.Encoding.csproj" />
<Compile Include="System\Net\Http\SocketsHttpHandler\RedirectHandler.cs" />
<Compile Include="System\Net\Http\SocketsHttpHandler\SocketsHttpHandler.cs" />
<Compile Include="System\Net\Http\SocketsHttpHandler\SystemProxyInfo.cs" />
+ <Compile Include="System\Net\Http\SocketsHttpHandler\Connections\SocketConnection.cs" />
+ <Compile Include="System\Net\Http\SocketsHttpHandler\Connections\TaskSocketAsyncEventArgs.cs" />
+ <Compile Include="System\Net\Http\SocketsHttpHandler\DnsEndPointWithProperties.cs" />
+ <Compile Include="System\Net\Http\SocketsHttpHandler\SocketsHttpConnectionFactory.cs" />
<Compile Include="$(CommonPath)System\Net\NTAuthentication.Common.cs"
Link="Common\System\Net\NTAuthentication.Common.cs" />
<Compile Include="$(CommonPath)System\Net\ContextFlagsPal.cs"
<Compile Include="System\Net\Http\SocketsHttpHandler\HttpNoProxy.cs" />
<Compile Include="System\Net\Http\BrowserHttpHandler\SystemProxyInfo.Browser.cs" />
<Compile Include="System\Net\Http\BrowserHttpHandler\SocketsHttpHandler.cs" />
+ <Compile Include="System\Net\Http\BrowserHttpHandler\SocketsHttpConnectionFactory.cs" />
<Compile Include="System\Net\Http\BrowserHttpHandler\BrowserHttpHandler.cs" />
</ItemGroup>
<ItemGroup>
<Reference Include="System.Diagnostics.DiagnosticSource" />
<Reference Include="System.Diagnostics.Tracing" />
<Reference Include="System.IO.Compression" />
+ <Reference Include="System.IO.Pipelines" />
<Reference Include="System.Memory" />
+ <Reference Include="System.Net.Connections" />
<Reference Include="System.Net.NameResolution" />
<Reference Include="System.Net.NetworkInformation" />
<Reference Include="System.Net.Primitives" />
--- /dev/null
+using System.Threading;
+using System.Threading.Tasks;
+using System.Net.Connections;
+using System.Net.Sockets;
+
+namespace System.Net.Http
+{
+ public class SocketsHttpConnectionFactory : ConnectionFactory
+ {
+ public sealed override ValueTask<Connection> ConnectAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default)
+ => throw new NotImplementedException();
+
+ public virtual Socket CreateSocket(HttpRequestMessage message, EndPoint? endPoint, IConnectionProperties options)
+ => throw new NotImplementedException();
+
+ public virtual ValueTask<Connection> EstablishConnectionAsync(HttpRequestMessage message, EndPoint? endPoint, IConnectionProperties options, CancellationToken cancellationToken)
+ => throw new NotImplementedException();
+ }
+}
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.CodeAnalysis;
+using System.Net.Connections;
namespace System.Net.Http
{
set => throw new PlatformNotSupportedException();
}
+ public ConnectionFactory? ConnectionFactory
+ {
+ get => throw new PlatformNotSupportedException();
+ set => throw new PlatformNotSupportedException();
+ }
+
+ public Func<HttpRequestMessage, Connection, CancellationToken, ValueTask<Connection>>? PlaintextFilter
+ {
+ get => throw new PlatformNotSupportedException();
+ set => throw new PlatformNotSupportedException();
+ }
+
public IDictionary<string, object?> Properties => throw new PlatformNotSupportedException();
protected internal override Task<HttpResponseMessage> SendAsync(
using System.Diagnostics;
using System.IO;
+using System.Net.Connections;
using System.Net.Quic;
using System.Net.Security;
using System.Net.Sockets;
-using System.Runtime.CompilerServices;
-using System.Runtime.ExceptionServices;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
}
}
- public static ValueTask<Stream> ConnectAsync(string host, int port, bool async, CancellationToken cancellationToken)
+ public static async ValueTask<Connection> ConnectAsync(ConnectionFactory factory, DnsEndPoint endPoint, IConnectionProperties? options, CancellationToken cancellationToken)
{
- return async ? ConnectAsync(host, port, cancellationToken) : new ValueTask<Stream>(Connect(host, port, cancellationToken));
- }
-
- private static async ValueTask<Stream> ConnectAsync(string host, int port, CancellationToken cancellationToken)
- {
- // Rather than creating a new Socket and calling ConnectAsync on it, we use the static
- // Socket.ConnectAsync with a SocketAsyncEventArgs, as we can then use Socket.CancelConnectAsync
- // to cancel it if needed.
- var saea = new ConnectEventArgs();
try
{
- saea.Initialize(cancellationToken);
-
- // Configure which server to which to connect.
- saea.RemoteEndPoint = new DnsEndPoint(host, port);
-
- // Initiate the connection.
- if (Socket.ConnectAsync(SocketType.Stream, ProtocolType.Tcp, saea))
- {
- // Connect completing asynchronously. Enable it to be canceled and wait for it.
- using (cancellationToken.UnsafeRegister(static s => Socket.CancelConnectAsync((SocketAsyncEventArgs)s!), saea))
- {
- await saea.Builder.Task.ConfigureAwait(false);
- }
- }
- else if (saea.SocketError != SocketError.Success)
- {
- // Connect completed synchronously but unsuccessfully.
- throw new SocketException((int)saea.SocketError);
- }
-
- Debug.Assert(saea.SocketError == SocketError.Success, $"Expected Success, got {saea.SocketError}.");
- Debug.Assert(saea.ConnectSocket != null, "Expected non-null socket");
-
- // Configure the socket and return a stream for it.
- Socket socket = saea.ConnectSocket;
- socket.NoDelay = true;
- return new NetworkStream(socket, ownsSocket: true);
+ return await factory.ConnectAsync(endPoint, options, cancellationToken).ConfigureAwait(false);
}
- catch (Exception error) when (!(error is OperationCanceledException))
+ catch (OperationCanceledException ex) when (ex.CancellationToken == cancellationToken)
{
- throw CreateWrappedException(error, host, port, cancellationToken);
+ throw CancellationHelper.CreateOperationCanceledException(innerException: null, cancellationToken);
}
- finally
+ catch (Exception ex)
{
- saea.Dispose();
+ throw CreateWrappedException(ex, endPoint.Host, endPoint.Port, cancellationToken);
}
}
- private static Stream Connect(string host, int port, CancellationToken cancellationToken)
+ public static Connection Connect(string host, int port, CancellationToken cancellationToken)
{
// For synchronous connections, we can just create a socket and make the connection.
cancellationToken.ThrowIfCancellationRequested();
{
socket.Connect(new DnsEndPoint(host, port));
}
-
- return new NetworkStream(socket, ownsSocket: true);
}
catch (Exception e)
{
socket.Dispose();
throw CreateWrappedException(e, host, port, cancellationToken);
}
- }
-
- /// <summary>SocketAsyncEventArgs that carries with it additional state for a Task builder and a CancellationToken.</summary>
- private sealed class ConnectEventArgs : SocketAsyncEventArgs
- {
- internal ConnectEventArgs() :
- // The OnCompleted callback serves just to complete a task that's awaited in ConnectAsync,
- // so we don't need to also flow ExecutionContext again into the OnCompleted callback.
- base(unsafeSuppressExecutionContextFlow: true)
- {
- }
-
- public AsyncTaskMethodBuilder Builder { get; private set; }
- public CancellationToken CancellationToken { get; private set; }
- public void Initialize(CancellationToken cancellationToken)
- {
- CancellationToken = cancellationToken;
- AsyncTaskMethodBuilder b = default;
- _ = b.Task; // force initialization
- Builder = b;
- }
-
- protected override void OnCompleted(SocketAsyncEventArgs _)
- {
- switch (SocketError)
- {
- case SocketError.Success:
- Builder.SetResult();
- break;
-
- case SocketError.OperationAborted:
- case SocketError.ConnectionAborted:
- if (CancellationToken.IsCancellationRequested)
- {
- Builder.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(CancellationHelper.CreateOperationCanceledException(null, CancellationToken)));
- break;
- }
- goto default;
-
- default:
- Builder.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(new SocketException((int)SocketError)));
- break;
- }
- }
+ return new SocketConnection(socket);
}
public static ValueTask<SslStream> EstablishSslConnectionAsync(SslClientAuthenticationOptions sslOptions, HttpRequestMessage request, bool async, Stream stream, CancellationToken cancellationToken)
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Diagnostics.CodeAnalysis;
+using System.IO;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Connections
+{
+ internal sealed class SocketConnection : Connection, IConnectionProperties
+ {
+ private readonly SocketConnectionNetworkStream _stream;
+
+ public override EndPoint? RemoteEndPoint => _stream.Socket.RemoteEndPoint;
+ public override EndPoint? LocalEndPoint => _stream.Socket.LocalEndPoint;
+ public override IConnectionProperties ConnectionProperties => this;
+
+ public SocketConnection(Socket socket)
+ {
+ _stream = new SocketConnectionNetworkStream(socket, this);
+ }
+
+ protected override ValueTask CloseAsyncCore(ConnectionCloseMethod method, CancellationToken cancellationToken)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return ValueTask.FromCanceled(cancellationToken);
+ }
+
+ try
+ {
+ if (method != ConnectionCloseMethod.GracefulShutdown)
+ {
+ // Dispose must be called first in order to cause a connection reset,
+ // as NetworkStream.Dispose() will call Shutdown(Both).
+ _stream.Socket.Dispose();
+ }
+
+ _stream.DisposeWithoutClosingConnection();
+ }
+ catch (Exception ex)
+ {
+ return ValueTask.FromException(ex);
+ }
+
+ return default;
+ }
+
+ protected override Stream CreateStream() => _stream;
+
+ bool IConnectionProperties.TryGet(Type propertyKey, [NotNullWhen(true)] out object? property)
+ {
+ if (propertyKey == typeof(Socket))
+ {
+ property = _stream.Socket;
+ return true;
+ }
+
+ property = null;
+ return false;
+ }
+
+ // This is done to couple disposal of the SocketConnection and the NetworkStream.
+ private sealed class SocketConnectionNetworkStream : NetworkStream
+ {
+ private readonly SocketConnection _connection;
+
+ public SocketConnectionNetworkStream(Socket socket, SocketConnection connection) : base(socket, ownsSocket: true)
+ {
+ _connection = connection;
+ }
+
+ public void DisposeWithoutClosingConnection()
+ {
+ base.Dispose(true);
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ // This will call base.Dispose().
+ _connection.Dispose();
+ }
+ else
+ {
+ base.Dispose(disposing);
+ }
+ }
+
+ public override ValueTask DisposeAsync()
+ {
+ // This will call base.Dispose().
+ Dispose(true);
+ return default;
+ }
+ }
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Net.Sockets;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Sources;
+
+namespace System.Net.Connections
+{
+ internal sealed class TaskSocketAsyncEventArgs : SocketAsyncEventArgs, IValueTaskSource
+ {
+ private ManualResetValueTaskSourceCore<int> _valueTaskSource;
+
+ public void ResetTask() => _valueTaskSource.Reset();
+ public ValueTask Task => new ValueTask(this, _valueTaskSource.Version);
+
+ public void GetResult(short token) => _valueTaskSource.GetResult(token);
+ public ValueTaskSourceStatus GetStatus(short token) => _valueTaskSource.GetStatus(token);
+ public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _valueTaskSource.OnCompleted(continuation, state, token, flags);
+ public void Complete() => _valueTaskSource.SetResult(0);
+
+ public TaskSocketAsyncEventArgs()
+ : base(unsafeSuppressExecutionContextFlow: true)
+ {
+ }
+
+ protected override void OnCompleted(SocketAsyncEventArgs e)
+ {
+ _valueTaskSource.SetResult(0);
+ }
+ }
+}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Diagnostics.CodeAnalysis;
+using System.Net.Connections;
+
+namespace System.Net.Http
+{
+ // Passed to a connection factory, merges allocations for the DnsEndPoint and connection properties.
+ internal sealed class DnsEndPointWithProperties : DnsEndPoint, IConnectionProperties
+ {
+ public HttpRequestMessage InitialRequest { get; }
+
+ public DnsEndPointWithProperties(string host, int port, HttpRequestMessage initialRequest) : base(host, port)
+ {
+ InitialRequest = initialRequest;
+ }
+
+ bool IConnectionProperties.TryGet(Type propertyKey, [NotNullWhen(true)] out object? property)
+ {
+ if (propertyKey == typeof(DnsEndPointWithProperties))
+ {
+ property = this;
+ return true;
+ }
+
+ property = null;
+ return false;
+ }
+ }
+}
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
+using System.Net.Connections;
using System.Net.Http.Headers;
using System.Net.Http.HPack;
using System.Net.Security;
{
private readonly HttpConnectionPool _pool;
private readonly Stream _stream;
+ private readonly Connection _connection;
// NOTE: These are mutable structs; do not make these readonly.
private ArrayBuffer _incomingBuffer;
// Channel options for creating _writeChannel
private static readonly UnboundedChannelOptions s_channelOptions = new UnboundedChannelOptions() { SingleReader = true };
- public Http2Connection(HttpConnectionPool pool, Stream stream)
+ public Http2Connection(HttpConnectionPool pool, Connection connection)
{
_pool = pool;
- _stream = stream;
+ _stream = connection.Stream;
+ _connection = connection;
_incomingBuffer = new ArrayBuffer(InitialConnectionBufferSize);
_outgoingBuffer = new ArrayBuffer(InitialConnectionBufferSize);
_pendingWindowUpdate = 0;
_idleSinceTickCount = Environment.TickCount64;
- if (NetEventSource.Log.IsEnabled()) TraceConnection(stream);
+ if (NetEventSource.Log.IsEnabled()) TraceConnection(_stream);
}
private object SyncObject => _httpStreams;
}
// Do shutdown.
- _stream.Close();
+ _connection.Dispose();
_connectionWindow.Dispose();
_concurrentStreams.Dispose();
using System.Net.Http.Headers;
using System.Net.Security;
using System.Net.Sockets;
+using System.Net.Connections;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
private readonly HttpConnectionPool _pool;
private readonly Socket? _socket; // used for polling; _stream should be used for all reading/writing. _stream owns disposal.
private readonly Stream _stream;
+ private readonly Connection _connection;
private readonly TransportContext? _transportContext;
private readonly WeakReference<HttpConnection> _weakThisRef;
public HttpConnection(
HttpConnectionPool pool,
- Socket? socket,
- Stream stream,
+ Connection connection,
TransportContext? transportContext)
{
Debug.Assert(pool != null);
- Debug.Assert(stream != null);
+ Debug.Assert(connection != null);
_pool = pool;
- _socket = socket; // may be null in cases where we couldn't easily get the underlying socket
- _stream = stream;
+ connection.ConnectionProperties.TryGet(out _socket); // may be null in cases where we couldn't easily get the underlying socket
+ _stream = connection.Stream;
+ _connection = connection;
_transportContext = transportContext;
_writeBuffer = new byte[InitialWriteBufferSize];
if (disposing)
{
GC.SuppressFinalize(this);
- _stream.Dispose();
+ _connection.Dispose();
// Eat any exceptions from the read-ahead task. We don't need to log, as we expect
// failures from this task due to closing the connection while a read is in progress.
internal sealed class HttpConnectionWithFinalizer : HttpConnection
{
- public HttpConnectionWithFinalizer(HttpConnectionPool pool, Socket? socket, Stream stream, TransportContext? transportContext) : base(pool, socket, stream, transportContext) { }
+ public HttpConnectionWithFinalizer(HttpConnectionPool pool, Connection connection, TransportContext? transportContext) : base(pool, connection, transportContext) { }
// This class is separated from HttpConnection so we only pay the price of having a finalizer
// when it's actually needed, e.g. when MaxConnectionsPerServer is enabled.
using System.Diagnostics;
using System.Globalization;
using System.IO;
+using System.Net.Connections;
using System.Net.Http.Headers;
using System.Net.Http.HPack;
using System.Net.Http.QPack;
}
// Try to establish an HTTP2 connection
- Socket? socket = null;
+ Connection? connection = null;
SslStream? sslStream = null;
TransportContext? transportContext = null;
Trace("Attempting new HTTP2 connection.");
}
- Stream? stream;
HttpResponseMessage? failureResponse;
- (socket, stream, transportContext, failureResponse) =
+
+ (connection, transportContext, failureResponse) =
await ConnectAsync(request, async, true, cancellationToken).ConfigureAwait(false);
+
if (failureResponse != null)
{
return (null, true, failureResponse);
}
+ Debug.Assert(connection != null);
+
+ sslStream = connection.Stream as SslStream;
+
+ if (Settings._plaintextFilter != null)
+ {
+ connection = await Settings._plaintextFilter(request, connection, cancellationToken).ConfigureAwait(false);
+ }
+
if (_kind == HttpConnectionKind.Http)
{
- http2Connection = new Http2Connection(this, stream!);
+ http2Connection = new Http2Connection(this, connection);
await http2Connection.SetupAsync().ConfigureAwait(false);
AddHttp2Connection(http2Connection);
return (http2Connection, true, null);
}
- sslStream = (SslStream)stream!;
+ Debug.Assert(sslStream != null);
+
if (sslStream.NegotiatedApplicationProtocol == SslApplicationProtocol.Http2)
{
// The server accepted our request for HTTP2.
throw new HttpRequestException(SR.Format(SR.net_ssl_http2_requires_tls12, sslStream.SslProtocol));
}
- http2Connection = new Http2Connection(this, sslStream);
+ http2Connection = new Http2Connection(this, connection);
await http2Connection.SetupAsync().ConfigureAwait(false);
AddHttp2Connection(http2Connection);
if (canUse)
{
- return (ConstructHttp11Connection(socket, sslStream, transportContext), true, null);
+ return (ConstructHttp11Connection(connection!, transportContext), true, null);
}
else
{
Trace("Discarding downgraded HTTP/1.1 connection because connection limit is exceeded");
}
- sslStream.Close();
+ await connection!.CloseAsync(ConnectionCloseMethod.GracefulShutdown, cancellationToken).ConfigureAwait(false);
}
}
return SendWithProxyAuthAsync(request, async, doRequestAuth, cancellationToken);
}
- private async ValueTask<(Socket?, Stream?, TransportContext?, HttpResponseMessage?)> ConnectAsync(HttpRequestMessage request, bool async, bool allowHttp2, CancellationToken cancellationToken)
+ private async ValueTask<(Connection?, TransportContext?, HttpResponseMessage?)> ConnectAsync(HttpRequestMessage request, bool async, bool allowHttp2, CancellationToken cancellationToken)
{
// If a non-infinite connect timeout has been set, create and use a new CancellationToken that will be canceled
// when either the original token is canceled or a connect timeout occurs.
try
{
- Stream? stream = null;
+ Connection? connection = null;
switch (_kind)
{
case HttpConnectionKind.Http:
case HttpConnectionKind.Https:
case HttpConnectionKind.ProxyConnect:
Debug.Assert(_originAuthority != null);
- stream = await ConnectHelper.ConnectAsync(_originAuthority.IdnHost, _originAuthority.Port, async, cancellationToken).ConfigureAwait(false);
+ connection = await ConnectToTcpHostAsync(_originAuthority.IdnHost, _originAuthority.Port, request, async, cancellationToken).ConfigureAwait(false);
break;
case HttpConnectionKind.Proxy:
- stream = await ConnectHelper.ConnectAsync(_proxyUri!.IdnHost, _proxyUri.Port, async, cancellationToken).ConfigureAwait(false);
+ connection = await ConnectToTcpHostAsync(_proxyUri!.IdnHost, _proxyUri.Port, request, async, cancellationToken).ConfigureAwait(false);
break;
case HttpConnectionKind.ProxyTunnel:
case HttpConnectionKind.SslProxyTunnel:
HttpResponseMessage? response;
- (stream, response) = await EstablishProxyTunnel(async, request.HasHeaders ? request.Headers : null, cancellationToken).ConfigureAwait(false);
+ (connection, response) = await EstablishProxyTunnel(async, request.HasHeaders ? request.Headers : null, cancellationToken).ConfigureAwait(false);
if (response != null)
{
// Return non-success response from proxy.
response.RequestMessage = request;
- return (null, null, null, response);
+ return (null, null, response);
}
break;
}
- Socket? socket = (stream as NetworkStream)?.Socket;
+ Debug.Assert(connection != null);
TransportContext? transportContext = null;
if (_kind == HttpConnectionKind.Https || _kind == HttpConnectionKind.SslProxyTunnel)
{
- SslStream sslStream = await ConnectHelper.EstablishSslConnectionAsync(allowHttp2 ? _sslOptionsHttp2! : _sslOptionsHttp11!, request, async, stream!, cancellationToken).ConfigureAwait(false);
- stream = sslStream;
+ SslStream sslStream = await ConnectHelper.EstablishSslConnectionAsync(allowHttp2 ? _sslOptionsHttp2! : _sslOptionsHttp11!, request, async, connection.Stream, cancellationToken).ConfigureAwait(false);
+ connection = Connection.FromStream(sslStream, leaveOpen: false, connection.ConnectionProperties, connection.LocalEndPoint, connection.RemoteEndPoint);
transportContext = sslStream.TransportContext;
}
- return (socket, stream, transportContext, null);
+ return (connection, transportContext, null);
}
finally
{
}
}
+ private ValueTask<Connection> ConnectToTcpHostAsync(string host, int port, HttpRequestMessage initialRequest, bool async, CancellationToken cancellationToken)
+ {
+ if (async)
+ {
+ ConnectionFactory connectionFactory = Settings._connectionFactory ?? SocketsHttpConnectionFactory.Default;
+
+ var endPoint = new DnsEndPointWithProperties(host, port, initialRequest);
+ return ConnectHelper.ConnectAsync(connectionFactory, endPoint, endPoint, cancellationToken);
+ }
+
+ // Synchronous path.
+
+ if (Settings._connectionFactory != null)
+ {
+ // connection factories only support async.
+ throw new HttpRequestException();
+ }
+
+ try
+ {
+ return new ValueTask<Connection>(ConnectHelper.Connect(host, port, cancellationToken));
+ }
+ catch (Exception ex)
+ {
+ return ValueTask.FromException<Connection>(ex);
+ }
+ }
+
internal async ValueTask<(HttpConnection?, HttpResponseMessage?)> CreateHttp11ConnectionAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken)
{
- (Socket? socket, Stream? stream, TransportContext? transportContext, HttpResponseMessage? failureResponse) =
+ (Connection? connection, TransportContext? transportContext, HttpResponseMessage? failureResponse) =
await ConnectAsync(request, async, false, cancellationToken).ConfigureAwait(false);
if (failureResponse != null)
return (null, failureResponse);
}
- return (ConstructHttp11Connection(socket, stream!, transportContext), null);
+ return (ConstructHttp11Connection(connection!, transportContext), null);
}
- private HttpConnection ConstructHttp11Connection(Socket? socket, Stream stream, TransportContext? transportContext)
+ private HttpConnection ConstructHttp11Connection(Connection connection, TransportContext? transportContext)
{
return _maxConnections == int.MaxValue ?
- new HttpConnection(this, socket, stream, transportContext) :
- new HttpConnectionWithFinalizer(this, socket, stream, transportContext); // finalizer needed to signal the pool when a connection is dropped
+ new HttpConnection(this, connection, transportContext) :
+ new HttpConnectionWithFinalizer(this, connection, transportContext); // finalizer needed to signal the pool when a connection is dropped
}
// Returns the established stream or an HttpResponseMessage from the proxy indicating failure.
- private async ValueTask<(Stream?, HttpResponseMessage?)> EstablishProxyTunnel(bool async, HttpRequestHeaders? headers, CancellationToken cancellationToken)
+ private async ValueTask<(Connection?, HttpResponseMessage?)> EstablishProxyTunnel(bool async, HttpRequestHeaders? headers, CancellationToken cancellationToken)
{
Debug.Assert(_originAuthority != null);
// Send a CONNECT request to the proxy server to establish a tunnel.
return (null, tunnelResponse);
}
- return (tunnelResponse.Content!.ReadAsStream(cancellationToken), null);
+ Stream stream = tunnelResponse.Content.ReadAsStream(cancellationToken);
+ EndPoint remoteEndPoint = new DnsEndPoint(_originAuthority.IdnHost, _originAuthority.Port);
+
+ // TODO: the Socket from the response can be funneled into a connection property here.
+
+ return (Connection.FromStream(stream, remoteEndPoint: remoteEndPoint), null);
}
/// <summary>Enqueues a waiter to the waiters list.</summary>
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Generic;
+using System.Net.Connections;
using System.Net.Security;
+using System.Threading;
+using System.Threading.Tasks;
namespace System.Net.Http
{
internal bool _enableMultipleHttp2Connections;
+ internal ConnectionFactory? _connectionFactory;
+ internal Func<HttpRequestMessage, Connection, CancellationToken, ValueTask<Connection>>? _plaintextFilter;
+
internal IDictionary<string, object?>? _properties;
public HttpConnectionSettings()
_useProxy = _useProxy,
_allowUnencryptedHttp2 = _allowUnencryptedHttp2,
_assumePrenegotiatedHttp3ForTesting = _assumePrenegotiatedHttp3ForTesting,
- _enableMultipleHttp2Connections = _enableMultipleHttp2Connections
+ _enableMultipleHttp2Connections = _enableMultipleHttp2Connections,
+ _connectionFactory = _connectionFactory,
+ _plaintextFilter = _plaintextFilter
};
}
--- /dev/null
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Net.Connections;
+using System.Net.Sockets;
+using System.Runtime.ExceptionServices;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Net.Http
+{
+ /// <summary>
+ /// The default connection factory used by <see cref="SocketsHttpHandler"/>, opening TCP connections.
+ /// </summary>
+ public class SocketsHttpConnectionFactory : ConnectionFactory
+ {
+ internal static SocketsHttpConnectionFactory Default { get; } = new SocketsHttpConnectionFactory();
+
+ /// <inheritdoc/>
+ public sealed override ValueTask<Connection> ConnectAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default)
+ {
+ if (options == null || !options.TryGet(out DnsEndPointWithProperties? httpOptions))
+ {
+ return ValueTask.FromException<Connection>(ExceptionDispatchInfo.SetCurrentStackTrace(new HttpRequestException($"{nameof(SocketsHttpConnectionFactory)} requires a {nameof(DnsEndPointWithProperties)} property.")));
+ }
+
+ return EstablishConnectionAsync(httpOptions!.InitialRequest, endPoint, options, cancellationToken);
+ }
+
+ /// <summary>
+ /// Creates the socket to be used for a request.
+ /// </summary>
+ /// <param name="message">The request causing this socket to be opened. Once opened, it may be reused for many subsequent requests.</param>
+ /// <param name="endPoint">The EndPoint this socket will be connected to.</param>
+ /// <param name="options">Properties, if any, that might change how the socket is initialized.</param>
+ /// <returns>A new unconnected socket.</returns>
+ public virtual Socket CreateSocket(HttpRequestMessage message, EndPoint? endPoint, IConnectionProperties options)
+ {
+ return new Socket(SocketType.Stream, ProtocolType.Tcp);
+ }
+
+ /// <summary>
+ /// Establishes a new connection for a request.
+ /// </summary>
+ /// <param name="message">The request causing this connection to be established. Once connected, it may be reused for many subsequent requests.</param>
+ /// <param name="endPoint">The EndPoint to connect to.</param>
+ /// <param name="options">Properties, if any, that might change how the connection is made.</param>
+ /// <param name="cancellationToken">A cancellation token for the asynchronous operation.</param>
+ /// <returns>A new open connection.</returns>
+ public virtual async ValueTask<Connection> EstablishConnectionAsync(HttpRequestMessage message, EndPoint? endPoint, IConnectionProperties options, CancellationToken cancellationToken)
+ {
+ if (message == null) throw new ArgumentNullException(nameof(message));
+ if (endPoint == null) throw new ArgumentNullException(nameof(endPoint));
+
+ Socket socket = CreateSocket(message, endPoint, options);
+
+ try
+ {
+ using var args = new TaskSocketAsyncEventArgs();
+ args.RemoteEndPoint = endPoint;
+
+ if (socket.ConnectAsync(args))
+ {
+ using (cancellationToken.UnsafeRegister(o => Socket.CancelConnectAsync((SocketAsyncEventArgs)o!), args))
+ {
+ await args.Task.ConfigureAwait(false);
+ }
+ }
+
+ if (args.SocketError != SocketError.Success)
+ {
+ Exception ex = args.SocketError == SocketError.OperationAborted && cancellationToken.IsCancellationRequested
+ ? (Exception)new OperationCanceledException(cancellationToken)
+ : new SocketException((int)args.SocketError);
+
+ throw ex;
+ }
+
+ socket.NoDelay = true;
+ return new SocketConnection(socket);
+ }
+ catch
+ {
+ socket.Dispose();
+ throw;
+ }
+ }
+ }
+}
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.CodeAnalysis;
+using System.Net.Connections;
namespace System.Net.Http
{
internal bool SupportsProxy => true;
internal bool SupportsRedirectConfiguration => true;
+ /// <summary>
+ /// When non-null, a custom factory used to open new TCP connections.
+ /// When null, a <see cref="SocketsHttpConnectionFactory"/> will be used.
+ /// </summary>
+ public ConnectionFactory? ConnectionFactory
+ {
+ get => _settings._connectionFactory;
+ set
+ {
+ CheckDisposedOrStarted();
+ _settings._connectionFactory = value;
+ }
+ }
+
+ /// <summary>
+ /// When non-null, a connection filter that is applied prior to any TLS encryption.
+ /// </summary>
+ public Func<HttpRequestMessage, Connection, CancellationToken, ValueTask<Connection>>? PlaintextFilter
+ {
+ get => _settings._plaintextFilter;
+ set
+ {
+ CheckDisposedOrStarted();
+ _settings._plaintextFilter = value;
+ }
+ }
+
public IDictionary<string, object?> Properties =>
_settings._properties ?? (_settings._properties = new Dictionary<string, object?>());
using System.Diagnostics;
using System.IO;
using System.Linq;
+using System.Net.Connections;
using System.Net.Quic;
using System.Net.Security;
using System.Net.Sockets;
}
}
+ public class SocketsHttpHandler_ConnectionFactoryTest : HttpClientHandlerTestBase
+ {
+ public SocketsHttpHandler_ConnectionFactoryTest(ITestOutputHelper output) : base(output) { }
+
+ [Fact]
+ public async Task CustomConnectionFactory_AsyncRequest_Success()
+ {
+ await using ConnectionListenerFactory listenerFactory = new VirtualNetworkConnectionListenerFactory();
+ await using ConnectionListener listener = await listenerFactory.BindAsync(endPoint: null);
+ await using ConnectionFactory connectionFactory = VirtualNetworkConnectionListenerFactory.GetConnectionFactory(listener);
+
+ // TODO: if GenericLoopbackOptions actually worked for HTTP/1 LoopbackServer we could just use that and pass in to CreateConnectionAsync.
+ // Making that work causes other tests to fail, so for now...
+ bool useHttps = UseVersion.Major >= 2 && new GenericLoopbackOptions().UseSsl;
+
+ Task serverTask = Task.Run(async () =>
+ {
+ await using Connection serverConnection = await listener.AcceptAsync();
+ using GenericLoopbackConnection loopbackConnection = await LoopbackServerFactory.CreateConnectionAsync(socket: null, serverConnection.Stream);
+
+ await loopbackConnection.InitializeConnectionAsync();
+
+ HttpRequestData requestData = await loopbackConnection.ReadRequestDataAsync();
+ await loopbackConnection.SendResponseAsync(content: "foo");
+
+ Assert.Equal("/foo", requestData.Path);
+ });
+
+ Task clientTask = Task.Run(async () =>
+ {
+ using HttpClientHandler handler = CreateHttpClientHandler();
+
+ var socketsHandler = (SocketsHttpHandler)GetUnderlyingSocketsHttpHandler(handler);
+ socketsHandler.ConnectionFactory = connectionFactory;
+
+ using HttpClient client = CreateHttpClient(handler);
+
+ string response = await client.GetStringAsync($"{(useHttps ? "https" : "http")}://{Guid.NewGuid():N}.com/foo");
+ Assert.Equal("foo", response);
+ });
+
+ await new[] { serverTask, clientTask }.WhenAllOrAnyFailed(60_000);
+ }
+
+ [Fact]
+ public async Task CustomConnectionFactory_SyncRequest_Fails()
+ {
+ await using ConnectionFactory connectionFactory = new SocketsHttpConnectionFactory();
+ using SocketsHttpHandler handler = new SocketsHttpHandler
+ {
+ ConnectionFactory = connectionFactory
+ };
+
+ using HttpClient client = CreateHttpClient(handler);
+
+ await Assert.ThrowsAnyAsync<HttpRequestException>(() => client.GetStringAsync($"http://{Guid.NewGuid():N}.com/foo"));
+ }
+ }
+
+ public sealed class SocketsHttpHandler_ConnectionFactoryTest_Http2 : SocketsHttpHandler_ConnectionFactoryTest
+ {
+ public SocketsHttpHandler_ConnectionFactoryTest_Http2(ITestOutputHelper output) : base(output) { }
+ protected override Version UseVersion => HttpVersion.Version20;
+ }
+
public sealed class SocketsHttpHandler_HttpProtocolTests : HttpProtocolTests
{
public SocketsHttpHandler_HttpProtocolTests(ITestOutputHelper output) : base(output) { }
Link="Common\System\Net\Http\ThrowingContent.cs" />
<Compile Include="ThrowingContent.netcore.cs" />
<Compile Include="Watchdog.cs" />
+ <Compile Include="$(CommonTestPath)System\Net\VirtualNetwork\VirtualNetwork.cs" Link="Common\System\Net\VirtualNetwork\VirtualNetwork.cs" />
+ <Compile Include="$(CommonTestPath)System\Net\VirtualNetwork\VirtualNetworkStream.cs" Link="Common\System\Net\VirtualNetwork\VirtualNetworkStream.cs" />
+ <Compile Include="$(CommonTestPath)System\Net\VirtualNetwork\VirtualNetworkConnectionListenerFactory.cs" Link="Common\System\Net\VirtualNetwork\VirtualNetworkConnectionListenerFactory.cs" />
</ItemGroup>
<!-- Windows specific files -->
<ItemGroup Condition=" '$(TargetsWindows)' == 'true'">
"4.6.0"
],
"BaselineVersion": "5.0.0",
- "InboxOn": {},
+ "InboxOn": {
+ "net5.0": "5.0.0.0"
+ },
"AssemblyVersionInPackageVersion": {
"4.0.0.0": "4.5.0",
"4.0.0.1": "4.5.2",