private readonly string _targetSourceName;
private readonly Guid _targetSourceGuid;
private readonly EventLevel _level;
+ private readonly double? _eventCounterInterval;
private Action<EventWrittenEventArgs> _eventWritten;
private List<EventSource> _tmpEventSourceList = new List<EventSource>();
- public TestEventListener(string targetSourceName, EventLevel level)
+ public TestEventListener(string targetSourceName, EventLevel level, double? eventCounterInterval = null)
{
// Store the arguments
_targetSourceName = targetSourceName;
_level = level;
+ _eventCounterInterval = eventCounterInterval;
LoadSourceList();
}
- public TestEventListener(Guid targetSourceGuid, EventLevel level)
+ public TestEventListener(Guid targetSourceGuid, EventLevel level, double? eventCounterInterval = null)
{
// Store the arguments
_targetSourceGuid = targetSourceGuid;
_level = level;
+ _eventCounterInterval = eventCounterInterval;
LoadSourceList();
}
if (source.Name.Equals(_targetSourceName) ||
source.Guid.Equals(_targetSourceGuid))
{
- EnableEvents(source, _level);
+ if (_eventCounterInterval != null)
+ {
+ var args = new Dictionary<string, string> { { "EventCounterIntervalSec", _eventCounterInterval?.ToString() } };
+ EnableEvents(source, _level, EventKeywords.All, args);
+ }
+ else
+ {
+ EnableEvents(source, _level);
+ }
}
}
IPEndPointExtensions.Serialize(_rightEndPoint) :
new Internals.SocketAddress(_addressFamily, SocketPal.MaximumAddressSize); // may be different size.
+ if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStart(socketAddress);
+
// This may throw ObjectDisposedException.
SafeSocketHandle acceptedSocketHandle;
SocketError errorCode = SocketPal.Accept(
// Throw an appropriate SocketException if the native call fails.
if (errorCode != SocketError.Success)
{
+ if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptFailedAndStop(errorCode, null);
+
Debug.Assert(acceptedSocketHandle.IsInvalid);
UpdateAcceptSocketErrorForDisposed(ref errorCode);
UpdateStatusAfterSocketErrorAndThrowException(errorCode);
}
+ else
+ {
+ if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStop();
+ }
Debug.Assert(!acceptedSocketHandle.IsInvalid);
// Update the internal state of this socket according to the error before throwing.
UpdateStatusAfterSocketError(errorCode);
if (NetEventSource.Log.IsEnabled()) NetEventSource.Error(this, new SocketException((int)errorCode));
+ // Don't log transfered byte count in case of a failure.
return 0;
}
+ if (SocketsTelemetry.Log.IsEnabled())
+ {
+ SocketsTelemetry.Log.BytesSent(bytesTransferred);
+ if (SocketType == SocketType.Dgram) SocketsTelemetry.Log.DatagramSent();
+ }
+
return bytesTransferred;
}
return 0;
}
+ if (SocketsTelemetry.Log.IsEnabled())
+ {
+ SocketsTelemetry.Log.BytesSent(bytesTransferred);
+ if (SocketType == SocketType.Dgram) SocketsTelemetry.Log.DatagramSent();
+ }
+
if (NetEventSource.Log.IsEnabled())
{
NetEventSource.Info(this, $"Send returns:{bytesTransferred}");
if (NetEventSource.Log.IsEnabled()) NetEventSource.Error(this, new SocketException((int)errorCode));
bytesTransferred = 0;
}
+ else
+ {
+ if (SocketsTelemetry.Log.IsEnabled())
+ {
+ SocketsTelemetry.Log.BytesSent(bytesTransferred);
+ if (SocketType == SocketType.Dgram) SocketsTelemetry.Log.DatagramSent();
+ }
+ }
return bytesTransferred;
}
UpdateStatusAfterSocketErrorAndThrowException(errorCode);
}
+ else
+ {
+ if (SocketsTelemetry.Log.IsEnabled())
+ {
+ SocketsTelemetry.Log.BytesSent(bytesTransferred);
+ if (SocketType == SocketType.Dgram) SocketsTelemetry.Log.DatagramSent();
+ }
+ }
if (_rightEndPoint == null)
{
int bytesTransferred;
errorCode = SocketPal.Receive(_handle, buffer, offset, size, socketFlags, out bytesTransferred);
+ if (SocketsTelemetry.Log.IsEnabled())
+ {
+ SocketsTelemetry.Log.BytesReceived(bytesTransferred);
+ if (SocketType == SocketType.Dgram) SocketsTelemetry.Log.DatagramReceived();
+ }
UpdateReceiveSocketErrorForDisposed(ref errorCode, bytesTransferred);
int bytesTransferred;
errorCode = SocketPal.Receive(_handle, buffer, socketFlags, out bytesTransferred);
+ if (SocketsTelemetry.Log.IsEnabled())
+ {
+ SocketsTelemetry.Log.BytesReceived(bytesTransferred);
+ if (SocketType == SocketType.Dgram) SocketsTelemetry.Log.DatagramReceived();
+ }
UpdateReceiveSocketErrorForDisposed(ref errorCode, bytesTransferred);
int bytesTransferred;
errorCode = SocketPal.Receive(_handle, buffers, socketFlags, out bytesTransferred);
+ if (SocketsTelemetry.Log.IsEnabled())
+ {
+ SocketsTelemetry.Log.BytesReceived(bytesTransferred);
+ if (SocketType == SocketType.Dgram) SocketsTelemetry.Log.DatagramReceived();
+ }
+
UpdateReceiveSocketErrorForDisposed(ref errorCode, bytesTransferred);
if (errorCode != SocketError.Success)
Internals.SocketAddress receiveAddress;
int bytesTransferred;
SocketError errorCode = SocketPal.ReceiveMessageFrom(this, _handle, buffer, offset, size, ref socketFlags, socketAddress, out receiveAddress, out ipPacketInformation, out bytesTransferred);
+ if (SocketsTelemetry.Log.IsEnabled())
+ {
+ SocketsTelemetry.Log.BytesReceived(bytesTransferred);
+ if (SocketType == SocketType.Dgram) SocketsTelemetry.Log.DatagramReceived();
+ }
UpdateReceiveSocketErrorForDisposed(ref errorCode, bytesTransferred);
int bytesTransferred;
SocketError errorCode = SocketPal.ReceiveFrom(_handle, buffer, offset, size, socketFlags, socketAddress.Buffer, ref socketAddress.InternalSize, out bytesTransferred);
+ if (SocketsTelemetry.Log.IsEnabled())
+ {
+ SocketsTelemetry.Log.BytesReceived(bytesTransferred);
+ if (SocketType == SocketType.Dgram) SocketsTelemetry.Log.DatagramReceived();
+ }
UpdateReceiveSocketErrorForDisposed(ref errorCode, bytesTransferred);
return 0;
}
+ if (SocketsTelemetry.Log.IsEnabled())
+ {
+ SocketsTelemetry.Log.BytesSent(bytesTransferred);
+ if (SocketType == SocketType.Dgram) SocketsTelemetry.Log.DatagramSent();
+ }
+
return bytesTransferred;
}
UpdateSendSocketErrorForDisposed(ref errorCode);
UpdateStatusAfterSocketErrorAndThrowException(errorCode);
}
+ else
+ {
+ if (SocketsTelemetry.Log.IsEnabled())
+ {
+ SocketsTelemetry.Log.BytesSent(bytesTransferred);
+ if (SocketType == SocketType.Dgram) SocketsTelemetry.Log.DatagramSent();
+ }
+ }
return bytesTransferred;
}
int bytesTransferred = castedAsyncResult.InternalWaitForCompletionInt32Result();
castedAsyncResult.EndCalled = true;
+ if (SocketsTelemetry.Log.IsEnabled())
+ {
+ SocketsTelemetry.Log.BytesReceived(bytesTransferred);
+ if (SocketType == SocketType.Dgram) SocketsTelemetry.Log.DatagramReceived();
+ }
// Throw an appropriate SocketException if the native call failed asynchronously.
errorCode = (SocketError)castedAsyncResult.ErrorCode;
int bytesTransferred = castedAsyncResult.InternalWaitForCompletionInt32Result();
castedAsyncResult.EndCalled = true;
+ if (SocketsTelemetry.Log.IsEnabled())
+ {
+ SocketsTelemetry.Log.BytesReceived(bytesTransferred);
+ if (SocketType == SocketType.Dgram) SocketsTelemetry.Log.DatagramReceived();
+ }
// Update socket address size.
castedAsyncResult.SocketAddress!.InternalSize = castedAsyncResult.GetSocketAddressSize();
int bytesTransferred = castedAsyncResult.InternalWaitForCompletionInt32Result();
castedAsyncResult.EndCalled = true;
+ if (SocketsTelemetry.Log.IsEnabled())
+ {
+ SocketsTelemetry.Log.BytesReceived(bytesTransferred);
+ if (SocketType == SocketType.Dgram) SocketsTelemetry.Log.DatagramReceived();
+ }
// Update socket address size.
castedAsyncResult.SocketAddress!.InternalSize = castedAsyncResult.GetSocketAddressSize();
asyncResult.AcceptSocket = GetOrCreateAcceptSocket(acceptSocket, false, nameof(acceptSocket), out acceptHandle);
if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(this, $"AcceptSocket:{acceptSocket}");
+ if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStart(_rightEndPoint);
int socketAddressSize = GetAddressSize(_rightEndPoint);
SocketError errorCode = SocketPal.AcceptAsync(this, _handle, acceptHandle, receiveSize, socketAddressSize, asyncResult);
// Throw an appropriate SocketException if the native call fails synchronously.
if (!CheckErrorAndUpdateStatus(errorCode))
{
+ if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptFailedAndStop(errorCode, null);
+
UpdateAcceptSocketErrorForDisposed(ref errorCode);
throw new SocketException((int)errorCode);
}
+
+ if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStop();
}
// Routine Description:
SocketError errorCode = (SocketError)castedAsyncResult.ErrorCode;
if (errorCode != SocketError.Success)
{
+ if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptFailedAndStop(errorCode, null);
+
UpdateAcceptSocketErrorForDisposed(ref errorCode);
UpdateStatusAfterSocketErrorAndThrowException(errorCode);
}
+ else
+ {
+ if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStop();
+ }
if (NetEventSource.Log.IsEnabled()) NetEventSource.Accepted(socket, socket.RemoteEndPoint, socket.LocalEndPoint);
return socket;
_acceptBuffer = new byte[_acceptAddressBufferCount];
}
}
+
+ if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStart(_currentSocket!._rightEndPoint!);
}
internal void StartOperationConnect(MultipleConnectAsync? multipleConnect, bool userSocket)
{
SetResults(socketError, bytesTransferred, flags);
- if (SocketsTelemetry.Log.IsEnabled() && _multipleConnect == null && _completedOperation == SocketAsyncOperation.Connect)
+ if (SocketsTelemetry.Log.IsEnabled())
{
- SocketsTelemetry.Log.ConnectFailedAndStop(socketError, null);
+ if (_multipleConnect == null && _completedOperation == SocketAsyncOperation.Connect) SocketsTelemetry.Log.ConnectFailedAndStop(socketError, null);
+ if (_completedOperation == SocketAsyncOperation.Accept) SocketsTelemetry.Log.AcceptFailedAndStop(socketError, null);
}
// This will be null if we're doing a static ConnectAsync to a DnsEndPoint with AddressFamily.Unspecified;
break;
}
+ // Don't log transfered byte count in case of a failure.
+
Complete();
}
_currentSocket = connectSocket;
_connectSocket = connectSocket;
+ if (SocketsTelemetry.Log.IsEnabled()) LogBytesTransferEvents(connectSocket?.SocketType, SocketAsyncOperation.Connect, bytesTransferred);
+
// Complete the operation and raise the event.
ExecutionContext? context = _context; // store context before it's cleared as part of completing the operation
Complete();
}
catch (ObjectDisposedException) { }
}
+
+ if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStop();
}
else
{
+ if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptFailedAndStop(socketError, null);
+
SetResults(socketError, bytesTransferred, flags);
_acceptSocket = null;
_currentSocket.UpdateStatusAfterSocketError(socketError);
break;
}
+ if (SocketsTelemetry.Log.IsEnabled()) LogBytesTransferEvents(_currentSocket?.SocketType, _completedOperation, bytesTransferred);
+
Complete();
}
FinishOperationSyncFailure(socketError, bytesTransferred, flags);
}
}
+
+ private static void LogBytesTransferEvents(SocketType? socketType, SocketAsyncOperation operation, int bytesTransferred)
+ {
+ switch (operation)
+ {
+ case SocketAsyncOperation.Receive:
+ case SocketAsyncOperation.ReceiveFrom:
+ case SocketAsyncOperation.ReceiveMessageFrom:
+ case SocketAsyncOperation.Accept:
+ SocketsTelemetry.Log.BytesReceived(bytesTransferred);
+ if (socketType == SocketType.Dgram) SocketsTelemetry.Log.DatagramReceived();
+ break;
+ case SocketAsyncOperation.Send:
+ case SocketAsyncOperation.SendTo:
+ case SocketAsyncOperation.SendPackets:
+ case SocketAsyncOperation.Connect:
+ SocketsTelemetry.Log.BytesSent(bytesTransferred);
+ if (socketType == SocketType.Dgram) SocketsTelemetry.Log.DatagramSent();
+ break;
+ }
+ }
}
}
// The .NET Foundation licenses this file to you under the MIT license.
using System.Diagnostics.Tracing;
+using System.Threading;
namespace System.Net.Sockets
{
{
public static readonly SocketsTelemetry Log = new SocketsTelemetry();
+ private PollingCounter? _outgoingConnectionsEstablishedCounter;
+ private PollingCounter? _incomingConnectionsEstablishedCounter;
+ private PollingCounter? _bytesReceivedCounter;
+ private PollingCounter? _bytesSentCounter;
+ private PollingCounter? _datagramsReceivedCounter;
+ private PollingCounter? _datagramsSentCounter;
+
+ private long _outgoingConnectionsEstablished;
+ private long _incomingConnectionsEstablished;
+ private long _bytesReceived;
+ private long _bytesSent;
+ private long _datagramsReceived;
+ private long _datagramsSent;
+
[Event(1, Level = EventLevel.Informational)]
public void ConnectStart(string? address)
{
+ Interlocked.Increment(ref _outgoingConnectionsEstablished);
if (IsEnabled(EventLevel.Informational, EventKeywords.All))
{
WriteEvent(eventId: 1, address ?? "");
}
}
+ [Event(5, Level = EventLevel.Informational)]
+ public void AcceptStart(string? address)
+ {
+ Interlocked.Increment(ref _incomingConnectionsEstablished);
+ if (IsEnabled(EventLevel.Informational, EventKeywords.All))
+ {
+ WriteEvent(eventId: 5, address ?? "");
+ }
+ }
+
+ [Event(6, Level = EventLevel.Informational)]
+ public void AcceptStop()
+ {
+ if (IsEnabled(EventLevel.Informational, EventKeywords.All))
+ {
+ WriteEvent(eventId: 6);
+ }
+ }
+
+ [Event(7, Level = EventLevel.Error)]
+ public void AcceptFailed(SocketError error, string? exceptionMessage)
+ {
+ if (IsEnabled(EventLevel.Error, EventKeywords.All))
+ {
+ WriteEvent(eventId: 7, (int)error, exceptionMessage ?? string.Empty);
+ }
+ }
+
[NonEvent]
public void ConnectStart(Internals.SocketAddress address)
{
ConnectFailed(error, exceptionMessage);
ConnectStop();
}
+
+ [NonEvent]
+ public void AcceptStart(Internals.SocketAddress address)
+ {
+ AcceptStart(address.ToString());
+ }
+
+ [NonEvent]
+ public void AcceptStart(EndPoint address)
+ {
+ AcceptStart(address.ToString());
+ }
+
+ [NonEvent]
+ public void AcceptFailedAndStop(SocketError error, string? exceptionMessage)
+ {
+ AcceptFailed(error, exceptionMessage);
+ AcceptStop();
+ }
+
+ [NonEvent]
+ public void BytesReceived(int count)
+ {
+ Interlocked.Add(ref _bytesReceived, count);
+ }
+
+ [NonEvent]
+ public void BytesSent(int count)
+ {
+ Interlocked.Add(ref _bytesSent, count);
+ }
+
+ [NonEvent]
+ public void DatagramReceived()
+ {
+ Interlocked.Increment(ref _datagramsReceived);
+ }
+
+ [NonEvent]
+ public void DatagramSent()
+ {
+ Interlocked.Increment(ref _datagramsSent);
+ }
+
+ protected override void OnEventCommand(EventCommandEventArgs command)
+ {
+ if (command.Command == EventCommand.Enable)
+ {
+ // This is the convention for initializing counters in the RuntimeEventSource (lazily on the first enable command).
+
+ _outgoingConnectionsEstablishedCounter ??= new PollingCounter("outgoing-connections-established", this, () => Interlocked.Read(ref _outgoingConnectionsEstablished))
+ {
+ DisplayName = "Outgoing Connections Established",
+ };
+ _incomingConnectionsEstablishedCounter ??= new PollingCounter("incoming-connections-established", this, () => Interlocked.Read(ref _incomingConnectionsEstablished))
+ {
+ DisplayName = "Incoming Connections Established",
+ };
+ _bytesReceivedCounter ??= new PollingCounter("bytes-received", this, () => Interlocked.Read(ref _bytesReceived))
+ {
+ DisplayName = "Bytes Received",
+ };
+ _bytesSentCounter ??= new PollingCounter("bytes-sent", this, () => Interlocked.Read(ref _bytesSent))
+ {
+ DisplayName = "Bytes Sent",
+ };
+ _datagramsReceivedCounter ??= new PollingCounter("datagrams-received", this, () => Interlocked.Read(ref _datagramsReceived))
+ {
+ DisplayName = "Datagrams Received",
+ };
+ _datagramsSentCounter ??= new PollingCounter("datagrams-sent", this, () => Interlocked.Read(ref _datagramsSent))
+ {
+ DisplayName = "Datagrams Sent",
+ };
+ }
+ }
}
}
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
+using System.Collections;
using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
using System.Diagnostics.Tracing;
+using System.Linq;
+using System.Threading.Tasks;
using Microsoft.DotNet.RemoteExecutor;
using Xunit;
using Xunit.Abstractions;
{
RemoteExecutor.Invoke(() =>
{
- using (var listener = new TestEventListener("System.Net.Sockets", EventLevel.Verbose))
+ using (var listener = new TestEventListener("System.Net.Sockets", EventLevel.Verbose, 0.1))
{
var events = new ConcurrentQueue<EventWrittenEventArgs>();
- listener.RunWithCallback(events.Enqueue, () =>
+ listener.RunWithCallbackAsync(events.Enqueue, async () =>
{
// Invoke several tests to execute code paths while tracing is enabled
- new SendReceiveSync(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).GetAwaiter();
- new SendReceiveSync(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).GetAwaiter();
+ await new SendReceiveSync(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).ConfigureAwait(false);
+ await new SendReceiveSync(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).ConfigureAwait(false);
- new SendReceiveTask(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).GetAwaiter();
- new SendReceiveTask(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).GetAwaiter();
+ await new SendReceiveTask(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).ConfigureAwait(false);
+ await new SendReceiveTask(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).ConfigureAwait(false);
- new SendReceiveEap(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).GetAwaiter();
- new SendReceiveEap(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).GetAwaiter();
+ await new SendReceiveEap(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).ConfigureAwait(false);
+ await new SendReceiveEap(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).ConfigureAwait(false);
- new SendReceiveApm(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).GetAwaiter();
- new SendReceiveApm(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).GetAwaiter();
+ await new SendReceiveApm(null).SendRecv_Stream_TCP(IPAddress.Loopback, false).ConfigureAwait(false);
+ await new SendReceiveApm(null).SendRecv_Stream_TCP(IPAddress.Loopback, true).ConfigureAwait(false);
- new NetworkStreamTest().CopyToAsync_AllDataCopied(4096, true).GetAwaiter().GetResult();
- new NetworkStreamTest().Timeout_ValidData_Roundtrips().GetAwaiter().GetResult();
- });
+ await new SendReceiveUdpClient().SendToRecvFromAsync_Datagram_UDP_UdpClient(IPAddress.Loopback).ConfigureAwait(false);
+ await new SendReceiveUdpClient().SendToRecvFromAsync_Datagram_UDP_UdpClient(IPAddress.Loopback).ConfigureAwait(false);
+
+ await new NetworkStreamTest().CopyToAsync_AllDataCopied(4096, true).ConfigureAwait(false);
+ await new NetworkStreamTest().Timeout_ValidData_Roundtrips().ConfigureAwait(false);
+ await Task.Delay(300).ConfigureAwait(false);
+ }).Wait();
Assert.DoesNotContain(events, ev => ev.EventId == 0); // errors from the EventSource itself
- Assert.InRange(events.Count, 1, int.MaxValue);
+ VerifyEvents(events, "ConnectStart", 10);
+ VerifyEvents(events, "ConnectStop", 10);
+
+ Dictionary<string, double> eventCounters = events.Where(e => e.EventName == "EventCounters").Select(e => (IDictionary<string, object>) e.Payload.Single())
+ .GroupBy(d => (string)d["Name"], d => (double)d["Mean"], (k, v) => new { Name = k, Value = v.Sum() })
+ .ToDictionary(p => p.Name, p => p.Value);
+
+ VerifyEventCounter("incoming-connections-established", eventCounters);
+ VerifyEventCounter("outgoing-connections-established", eventCounters);
+ VerifyEventCounter("bytes-received", eventCounters);
+ VerifyEventCounter("bytes-sent", eventCounters);
+ VerifyEventCounter("datagrams-received", eventCounters);
+ VerifyEventCounter("datagrams-sent", eventCounters);
}
}).Dispose();
}
+
+ private static void VerifyEvents(IEnumerable<EventWrittenEventArgs> events, string eventName, int expectedCount)
+ {
+ EventWrittenEventArgs[] starts = events.Where(e => e.EventName == eventName).ToArray();
+ Assert.Equal(expectedCount, starts.Length);
+ }
+
+ private static void VerifyEventCounter(string name, Dictionary<string, double> eventCounters)
+ {
+ Assert.True(eventCounters.ContainsKey(name));
+ Assert.True(eventCounters[name] > 0);
+ }
}
}