// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
-using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
-using System.Threading.Tasks;
namespace System.Net.Sockets
{
private static readonly object s_lock = new object();
- // In debug builds, force there to be 2 engines. In release builds, use half the number of processors when
- // there are at least 6. The lower bound is to avoid using multiple engines on systems which aren't servers.
-#pragma warning disable CA1802 // const works for debug, but needs to be static readonly for release
- private static readonly int s_engineCount =
-#if DEBUG
- 2;
-#else
- Environment.ProcessorCount >= 6 ? Environment.ProcessorCount / 2 : 1;
-#endif
-#pragma warning restore CA1802
+    private static readonly int s_maxEngineCount = GetEngineCount();
+
+    /// <summary>
+    /// Determines how many SocketAsyncEngine instances (epoll/kqueue event loops) to create.
+    /// Returns at least 1; may be overridden via the DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT environment variable.
+    /// </summary>
+    private static int GetEngineCount()
+    {
+        // The responsibility of SocketAsyncEngine is to get notifications from epoll|kqueue
+        // and schedule corresponding work items to ThreadPool (socket reads and writes).
+        //
+        // Using TechEmpower benchmarks that generate a LOT of SMALL socket reads and writes under a VERY HIGH load
+        // we have observed that a single engine is capable of keeping busy up to thirty x64 and eight ARM64 CPU cores.
+        //
+        // The vast majority of real-life scenarios are never going to generate such a huge load (hundreds of thousands of requests per second)
+        // and having a single producer should almost always be enough.
+        //
+        // We want to be sure that we can handle extreme loads and that's why we have decided to use these values.
+        //
+        // It's impossible to predict all possible scenarios, so this value can also be configured via an environment variable.
+        // The parsed value must be positive: s_maxEngineCount is used as an array length and as a modulo divisor
+        // when round-robining sockets across engines, so 0 would cause a zero-length engine array and a
+        // DivideByZeroException. A zero or unparsable value falls through to the heuristic below.
+        if (uint.TryParse(Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT"), out uint count) && count > 0)
+        {
+            return (int)count;
+        }
+
+        // One engine per ~8 cores on ARM/ARM64, per ~30 cores elsewhere (x64), per the benchmark observations above.
+        Architecture architecture = RuntimeInformation.ProcessArchitecture;
+        int coresPerEngine = architecture == Architecture.Arm64 || architecture == Architecture.Arm
+            ? 8
+            : 30;
+
+        // Round to the nearest whole engine count, but never go below one engine.
+        return Math.Max(1, (int)Math.Round(Environment.ProcessorCount / (double)coresPerEngine));
+    }
//
// The current engines. We replace an engine when it runs out of "handle" values.
// Must be accessed under s_lock.
//
- private static readonly SocketAsyncEngine?[] s_currentEngines = new SocketAsyncEngine?[s_engineCount];
+ private static readonly SocketAsyncEngine?[] s_currentEngines = new SocketAsyncEngine?[s_maxEngineCount];
private static int s_allocateFromEngine = 0;
private readonly IntPtr _port;
//
private static readonly IntPtr MaxHandles = IntPtr.Size == 4 ? (IntPtr)int.MaxValue : (IntPtr)long.MaxValue;
#endif
- private static readonly IntPtr MinHandlesForAdditionalEngine = s_engineCount == 1 ? MaxHandles : (IntPtr)32;
+ private static readonly IntPtr MinHandlesForAdditionalEngine = s_maxEngineCount == 1 ? MaxHandles : (IntPtr)32;
//
// Sentinel handle value to identify events from the "shutdown pipe," used to signal an event loop to stop
//
// Maps handle values to SocketAsyncContext instances.
//
- private readonly ConcurrentDictionary<IntPtr, SocketAsyncContext> _handleToContextMap = new ConcurrentDictionary<IntPtr, SocketAsyncContext>();
+ private readonly ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper> _handleToContextMap = new ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper>();
//
// Queue of events generated by EventLoop() that would be processed by the thread pool
// Round-robin to the next engine once we have sufficient sockets on this one.
if (!engine.HasLowNumberOfSockets)
{
- s_allocateFromEngine = (s_allocateFromEngine + 1) % s_engineCount;
+ s_allocateFromEngine = (s_allocateFromEngine + 1) % s_maxEngineCount;
}
}
}
Debug.Assert(!IsFull, "Expected !IsFull");
IntPtr handle = _nextHandle;
- _handleToContextMap.TryAdd(handle, context);
+ Debug.Assert(handle != ShutdownHandle, "ShutdownHandle must not be added to the dictionary");
+ _handleToContextMap.TryAdd(handle, new SocketAsyncContextWrapper(context));
_nextHandle = IntPtr.Add(_nextHandle, 1);
_outstandingHandles = IntPtr.Add(_outstandingHandles, 1);
{
bool shutdown = false;
Interop.Sys.SocketEvent* buffer = _buffer;
- ConcurrentDictionary<IntPtr, SocketAsyncContext> handleToContextMap = _handleToContextMap;
+ ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper> handleToContextMap = _handleToContextMap;
ConcurrentQueue<SocketIOEvent> eventQueue = _eventQueue;
+ IntPtr shutdownHandle = ShutdownHandle;
+ SocketAsyncContext? context = null;
while (!shutdown)
{
int numEvents = EventBufferCount;
Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}");
bool enqueuedEvent = false;
- for (int i = 0; i < numEvents; i++)
+ foreach (var socketEvent in new ReadOnlySpan<Interop.Sys.SocketEvent>(buffer, numEvents))
{
- IntPtr handle = buffer[i].Data;
- if (handle == ShutdownHandle)
- {
- shutdown = true;
- }
- else
+ IntPtr handle = socketEvent.Data;
+
+ if (handleToContextMap.TryGetValue(handle, out SocketAsyncContextWrapper contextWrapper) && (context = contextWrapper.Context) != null)
{
Debug.Assert(handle.ToInt64() < MaxHandles.ToInt64(), $"Unexpected values: handle={handle}, MaxHandles={MaxHandles}");
- handleToContextMap.TryGetValue(handle, out SocketAsyncContext? context);
- if (context != null)
+
+ Interop.Sys.SocketEvents events = context.HandleSyncEventsSpeculatively(socketEvent.Events);
+ if (events != Interop.Sys.SocketEvents.None)
{
- Interop.Sys.SocketEvents events = buffer[i].Events;
- events = context.HandleSyncEventsSpeculatively(events);
- if (events != Interop.Sys.SocketEvents.None)
- {
- var ev = new SocketIOEvent(context, events);
- eventQueue.Enqueue(ev);
- enqueuedEvent = true;
-
- // This is necessary when the JIT generates unoptimized code (debug builds, live debugging,
- // quick JIT, etc.) to ensure that the context does not remain referenced by this method, as
- // such code may keep the stack location live for longer than necessary
- ev = default;
- }
+ var ev = new SocketIOEvent(context, events);
+ eventQueue.Enqueue(ev);
+ enqueuedEvent = true;
// This is necessary when the JIT generates unoptimized code (debug builds, live debugging,
// quick JIT, etc.) to ensure that the context does not remain referenced by this method, as
// such code may keep the stack location live for longer than necessary
- context = null;
+ ev = default;
}
+
+ // This is necessary when the JIT generates unoptimized code (debug builds, live debugging,
+ // quick JIT, etc.) to ensure that the context does not remain referenced by this method, as
+ // such code may keep the stack location live for longer than necessary
+ context = null;
+ contextWrapper = default;
+ }
+ else if (handle == shutdownHandle)
+ {
+ shutdown = true;
}
}
return error == Interop.Error.SUCCESS;
}
+ // Struct wrapper used to improve the performance of the epoll thread hot path by up to 3% on some TechEmpower benchmarks.
+ // Because the TValue is a (non-shared) value type, the dictionary gets a dedicated generic instantiation:
+ // System.Collections.Concurrent.ConcurrentDictionary`2[System.IntPtr,System.Net.Sockets.SocketAsyncContextWrapper]::TryGetValueInternal(!0,int32,!1&)
+ // instead of the shared reference-type instantiation:
+ // System.Collections.Concurrent.ConcurrentDictionary`2[System.IntPtr,System.__Canon]::TryGetValueInternal(!0,int32,!1&)
+ private readonly struct SocketAsyncContextWrapper
+ {
+ public SocketAsyncContextWrapper(SocketAsyncContext context) => Context = context;
+
+ internal SocketAsyncContext Context { get; }
+ }
+
private readonly struct SocketIOEvent
{
public SocketAsyncContext Context { get; }