--- /dev/null
+using System;
+using System.Diagnostics;
+using System.Threading;
+
+namespace Common
+{
+ // Each pattern sends N events per second, but varies the frequency
+ // across that second to simulate various burst patterns.
+ public enum BurstPattern : int
+ {
+ DRIP = 0, // (default) to send N events per second, sleep 1000/N ms between sends
+ BOLUS = 1, // each second send N events then stop
+ HEAVY_DRIP = 2, // each second send M bursts of N/M events
+ NONE = -1 // no burst pattern
+ }
+
+ public static class BurstPatternMethods
+ {
+ public static BurstPattern ToBurstPattern(this int n)
+ {
+ return n > 2 || n < -1 ? BurstPattern.NONE : (BurstPattern)n;
+ }
+
+ public static BurstPattern ToBurstPattern(this string str)
+ {
+ if (Int32.TryParse(str, out int result))
+ return result.ToBurstPattern();
+ else
+ {
+ return str.ToLowerInvariant() switch
+ {
+ "drip" => BurstPattern.DRIP,
+ "bolus" => BurstPattern.BOLUS,
+ "heavy_drip" => BurstPattern.HEAVY_DRIP,
+ "none" => BurstPattern.NONE,
+ _ => BurstPattern.NONE
+ };
+ }
+ }
+
+ public static string ToString(this BurstPattern burstPattern) => burstPattern switch
+ {
+ BurstPattern.DRIP => "DRIP",
+ BurstPattern.BOLUS => "BOLUS",
+ BurstPattern.HEAVY_DRIP => "HEAVY_DRIP",
+ BurstPattern.NONE => "NONE",
+ _ => "UNKOWN"
+ };
+
+ public static void DefaultSleepAction(int duration) => Thread.Sleep(duration);
+ public static void BusySleepAction(int duration)
+ {
+ string busyString = "0";
+ DateTime start = DateTime.Now;
+ while (DateTime.Now.Subtract(start).TotalMilliseconds < duration)
+ {
+ busyString += "0";
+ }
+ }
+
+ /// <summary>
+ /// Invoke <param name="method"/> <param name="rate"/> times in 1 second using the <param name="pattern"/> provided
+ /// </summary>
+ public static Func<long> Burst(BurstPattern pattern, int rate, Action method, Action<int> sleepAction = null)
+ {
+ if (rate == 0)
+ throw new ArgumentException("Rate cannot be 0");
+ if (sleepAction == null)
+ throw new ArgumentException("sleep action cannot be null");
+
+ switch (pattern)
+ {
+ case BurstPattern.DRIP:
+ {
+ int sleepInMs = (int)Math.Floor(1000.0/rate);
+ return () => { method(); sleepAction?.Invoke(sleepInMs); return 1; };
+ }
+ case BurstPattern.BOLUS:
+ {
+ return () =>
+ {
+ Stopwatch sw = new Stopwatch();
+ sw.Start();
+ for (int i = 0; i < rate; i++) { method(); }
+ sw.Stop();
+ if (sw.Elapsed.TotalSeconds < 1)
+ sleepAction?.Invoke(1000 - (int)Math.Floor((double)sw.ElapsedMilliseconds));
+ return rate;
+ };
+ }
+ case BurstPattern.HEAVY_DRIP:
+ {
+ int nDrips = 4;
+ int nEventsPerDrip = (int)Math.Floor((double)rate / nDrips);
+ int sleepInMs = (int)Math.Floor((1000.0 / rate) / nDrips);
+ return () =>
+ {
+ for (int i = 0; i < nDrips; i++)
+ {
+ for (int j = 0; j < nEventsPerDrip; i++)
+ method();
+ sleepAction?.Invoke(sleepInMs);
+ }
+ return nEventsPerDrip * nDrips;
+ };
+ }
+ case BurstPattern.NONE:
+ {
+ return () => { method(); return 1; };
+ }
+ default:
+ throw new ArgumentException("Unkown burst pattern");
+ }
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+<Project Sdk="Microsoft.NET.Sdk">
+
+ <PropertyGroup>
+ <TargetFramework>netstandard2.1</TargetFramework>
+ </PropertyGroup>
+
+ <ItemGroup>
+ <PackageReference Include="System.CommandLine" Version="2.0.0-beta1.20574.7" />
+ </ItemGroup>
+
+</Project>
\ No newline at end of file
--- /dev/null
+using System;
+using System.CommandLine;
+using System.CommandLine.Parsing;
+
+namespace Common
+{
+ public static class CommandLineOptions
+ {
+ static public ValidateSymbol<OptionResult> GreaterThanZeroValidator = (OptionResult result) =>
+ {
+ if (result.GetValueOrDefault<int>() <= 0)
+ return $"{result.Option.Name} must be greater than or equal to 0";
+ return null;
+ };
+
+ static public ValidateSymbol<OptionResult> GreaterThanOrEqualZeroValidator = (OptionResult result) =>
+ {
+ if (result.GetValueOrDefault<int>() < 0)
+ return $"{result.Option.Name} must be greater than 0";
+ return null;
+ };
+
+ static public ValidateSymbol<OptionResult> MustBeNegOneOrPositiveValidator = (OptionResult result) =>
+ {
+ int val = result.GetValueOrDefault<int>();
+ if (val < -1 || val == 0)
+ return $"{result.Option.Name} must be -1 or greater than 0";
+ return null;
+ };
+
+ static private Option<int> _eventSizeOption = null;
+ static public Option<int> EventSizeOption
+ {
+ get
+ {
+ if (_eventSizeOption != null)
+ return _eventSizeOption;
+
+ _eventSizeOption = new Option<int>(
+ alias: "--event-size",
+ getDefaultValue: () => 100,
+ description: "The size of the event payload. The payload is a string, so the actual size will be eventSize * sizeof(char) where sizeof(char) is 2 Bytes due to Unicode in C#.");
+ _eventSizeOption.AddValidator(GreaterThanZeroValidator);
+ return _eventSizeOption;
+ }
+ private set {}
+ }
+
+ static private Option<int> _eventRateOption = null;
+ static public Option<int> EventRateOption
+ {
+ get
+ {
+ if (_eventRateOption != null)
+ return _eventRateOption;
+
+ _eventRateOption = new Option<int>(
+ alias: "--event-rate",
+ getDefaultValue: () => -1,
+ description: "The rate of events in events/sec. -1 means 'as fast as possible'.");
+ _eventRateOption.AddValidator(MustBeNegOneOrPositiveValidator);
+ return _eventRateOption;
+ }
+ private set {}
+ }
+
+ static public Option<BurstPattern> BurstPatternOption =
+ new Option<BurstPattern>(
+ alias: "--burst-pattern",
+ getDefaultValue: () => BurstPattern.NONE,
+ description: "The burst pattern to send events in.");
+
+ static private Option<int> _durationOption = null;
+ static public Option<int> DurationOption
+ {
+ get
+ {
+ if (_durationOption != null)
+ return _durationOption;
+
+ _durationOption = new Option<int>(
+ alias: "--duration",
+ getDefaultValue: () => 60,
+ description: "The number of seconds to send events for.");
+ _durationOption.AddValidator(GreaterThanZeroValidator);
+ return _durationOption;
+ }
+ private set {}
+ }
+
+
+ static private Option<int> _threadsOption = null;
+ static public Option<int> ThreadsOption
+ {
+ get
+ {
+ if (_threadsOption != null)
+ return _threadsOption;
+
+ _threadsOption = new Option<int>(
+ alias: "--threads",
+ getDefaultValue: () => 1,
+ description: "The number of threads writing events.");
+ _threadsOption.AddValidator(GreaterThanZeroValidator);
+ return _threadsOption;
+ }
+ private set {}
+ }
+
+
+ static private Option<int> _eventCountOption = null;
+ static public Option<int> EventCountOption
+ {
+ get
+ {
+ if (_eventCountOption != null)
+ return _eventCountOption;
+
+ _eventCountOption = new Option<int>(
+ alias: "--event-count",
+ getDefaultValue: () => -1,
+ description: "The total number of events to write per thread. -1 means no limit");
+ _eventCountOption.AddValidator(MustBeNegOneOrPositiveValidator);
+ return _eventCountOption;
+ }
+ private set {}
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+using Common;
+using System;
+using System.CommandLine;
+using System.CommandLine.Parsing;
+
+namespace Orchestrator
+{
+ public static class OrchestrateCommandLine
+ {
+ static public Option<ReaderType> ReaderTypeOption =
+ new Option<ReaderType>(
+ alias: "--reader-type",
+ getDefaultValue: () => ReaderType.Stream,
+ description: "The method to read the stream of events.");
+
+ static public Option<bool> PauseOption =
+ new Option<bool>(
+ alias: "--pause",
+ getDefaultValue: () => false,
+ description: "Should the orchestrator pause before starting each test phase for a debugger to attach?");
+
+ static public Option<bool> RundownOption =
+ new Option<bool>(
+ alias: "--rundown",
+ getDefaultValue: () => true,
+ description: "Should the EventPipe session request rundown events?");
+
+ static private Option<int> _bufferSizeOption = null;
+ static public Option<int> BufferSizeOption
+ {
+ get
+ {
+ if (_bufferSizeOption != null)
+ return _bufferSizeOption;
+
+ _bufferSizeOption = new Option<int>(
+ alias: "--buffer-size",
+ getDefaultValue: () => 256,
+ description: "The size of the buffer requested in the EventPipe session");
+ _bufferSizeOption.AddValidator(CommandLineOptions.GreaterThanZeroValidator);
+ return _bufferSizeOption;
+ }
+ private set {}
+ }
+
+ static private Option<int> _slowReaderOption = null;
+ static public Option<int> SlowReaderOption
+ {
+ get
+ {
+ if (_slowReaderOption != null)
+ return _slowReaderOption;
+
+ _slowReaderOption = new Option<int>(
+ alias: "--slow-reader",
+ getDefaultValue: () => 0,
+ description: "<Only valid for EventPipeEventSource reader> Delay every read by this many milliseconds.");
+ _slowReaderOption.AddValidator(CommandLineOptions.GreaterThanOrEqualZeroValidator);
+ return _slowReaderOption;
+ }
+ private set {}
+ }
+
+ static private Option<int> _coresOption = null;
+ static public Option<int> CoresOption
+ {
+ get
+ {
+ if (_coresOption != null)
+ return _coresOption;
+
+ _coresOption = new Option<int>(
+ alias: "--cores",
+ getDefaultValue: () => Environment.ProcessorCount,
+ description: "The number of logical cores to restrict the writing process to.");
+ _coresOption.AddValidator(CoreValueMustBeFeasibleValidator);
+ return _coresOption;
+ }
+ private set {}
+ }
+
+ static private Option<int> _iterationsOption = null;
+ static public Option<int> IterationsOption
+ {
+ get
+ {
+ if (_iterationsOption != null)
+ return _iterationsOption;
+
+ _iterationsOption = new Option<int>(
+ alias: "--iterations",
+ getDefaultValue: () => 1,
+ description: "The number of times to run the test.");
+ _iterationsOption.AddValidator(CommandLineOptions.GreaterThanZeroValidator);
+ return _iterationsOption;
+ }
+ private set {}
+ }
+
+ static public ValidateSymbol<OptionResult> CoreValueMustBeFeasibleValidator = (OptionResult result) =>
+ {
+ int val = result.GetValueOrDefault<int>();
+ if (val < 1 || val > Environment.ProcessorCount)
+ return $"Core count must be between 1 and {Environment.ProcessorCount}";
+ return null;
+ };
+ }
+}
\ No newline at end of file
--- /dev/null
+<Project Sdk="Microsoft.NET.Sdk">
+
+ <PropertyGroup>
+ <OutputType>Exe</OutputType>
+ <TargetFramework>net5.0</TargetFramework>
+ </PropertyGroup>
+
+ <ItemGroup>
+ <ProjectReference Include="../Common/Common.csproj" />
+ </ItemGroup>
+
+ <ItemGroup>
+ <PackageReference Include="Microsoft.Diagnostics.NETCore.Client" Version="0.2.137102" />
+ <PackageReference Include="Microsoft.Diagnostics.Tracing.TraceEvent" Version="2.0.58" />
+ <PackageReference Include="System.CommandLine" Version="2.0.0-beta1.20574.7" />
+ <PackageReference Include="System.Security.Principal.Windows" Version="4.7.0" />
+ </ItemGroup>
+
+</Project>
--- /dev/null
+using System;
+using System.Diagnostics;
+using System.Diagnostics.Tracing;
+using System.IO;
+using System.Security.Principal;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Runtime.InteropServices;
+using System.CommandLine;
+using System.CommandLine.Invocation;
+using System.CommandLine.Builder;
+using System.CommandLine.Parsing;
+
+using Common;
+using Microsoft.Diagnostics.NETCore.Client;
+using Microsoft.Diagnostics.Tracing;
+
+using Process = System.Diagnostics.Process;
+
+namespace Orchestrator
+{
+ public enum ReaderType
+ {
+ Stream,
+ EventPipeEventSource
+ }
+
+ class Program
+ {
+ delegate Task<int> RootCommandHandler(
+ IConsole console,
+ CancellationToken ct,
+ FileInfo stressPath,
+ int eventSize,
+ int eventRate,
+ BurstPattern burstPattern,
+ ReaderType readerType,
+ int slowReader,
+ int duration,
+ int cores,
+ int threads,
+ int eventCount,
+ bool rundown,
+ int bufferSize,
+ int iterations,
+ bool pause);
+
+ // TODO: Collect CPU % of reader and writer while running test and add to stats
+ // TODO: Standardize and clean up logging from orchestrator and corescaletest
+ // TODO: Improve error handling
+
+ static async Task<int> Main(string[] args)
+ {
+
+ if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+ {
+ // Try to run in admin mode in Windows
+ bool isElevated;
+ using (WindowsIdentity identity = WindowsIdentity.GetCurrent())
+ {
+ WindowsPrincipal principal = new WindowsPrincipal(identity);
+ isElevated = principal.IsInRole(WindowsBuiltInRole.Administrator);
+ }
+ if (!isElevated)
+ {
+ Console.WriteLine("Must run in root/admin mode");
+ return -1;
+ }
+ }
+
+ return await BuildCommandLine()
+ .UseDefaults()
+ .Build()
+ .InvokeAsync(args);
+ }
+
+ static CommandLineBuilder BuildCommandLine()
+ {
+ var rootCommand = new RootCommand("EventPipe Stress Tester - Orchestrator")
+ {
+ new Argument<FileInfo>(
+ name: "stress-path",
+ description: "The location of the Stress executable."
+ ),
+ CommandLineOptions.EventSizeOption,
+ CommandLineOptions.EventRateOption,
+ CommandLineOptions.BurstPatternOption,
+ OrchestrateCommandLine.ReaderTypeOption,
+ OrchestrateCommandLine.SlowReaderOption,
+ CommandLineOptions.DurationOption,
+ OrchestrateCommandLine.CoresOption,
+ CommandLineOptions.ThreadsOption,
+ CommandLineOptions.EventCountOption,
+ OrchestrateCommandLine.RundownOption,
+ OrchestrateCommandLine.BufferSizeOption,
+ OrchestrateCommandLine.IterationsOption,
+ OrchestrateCommandLine.PauseOption
+ };
+
+
+ rootCommand.Handler = CommandHandler.Create((RootCommandHandler)Orchestrate);
+ return new CommandLineBuilder(rootCommand);
+ }
+
+ private static EventPipeSession GetSession(int pid, bool rundown, int bufferSize)
+ {
+ DiagnosticsClient client = new DiagnosticsClient(pid);
+ while (!client.CheckTransport())
+ {
+ Console.WriteLine("still unable to talk");
+ Thread.Sleep(50);
+ }
+ return client.StartEventPipeSession(
+ new EventPipeProvider("MySource", EventLevel.Verbose),
+ requestRundown: rundown,
+ circularBufferMB: bufferSize);
+ }
+
+ /// <summary>
+ /// This uses EventPipeEventSource's Stream constructor to parse the events real-time.
+ /// It then returns the number of events read.
+ /// </summary>
+ private static Func<int, TestResult> UseEPES(bool rundown, int bufferSize, int slowReader)
+ {
+ return (int pid) =>
+ {
+ int eventsRead = 0;
+ var slowReadSw = new Stopwatch();
+ var totalTimeSw = new Stopwatch();
+ var interval = TimeSpan.FromSeconds(0.75);
+
+ EventPipeSession session = GetSession(pid, rundown, bufferSize);
+ Console.WriteLine("Session created.");
+
+ EventPipeEventSource epes = new EventPipeEventSource(session.EventStream);
+ epes.Dynamic.All += (TraceEvent data) => {
+ eventsRead += 1;
+ if (slowReader > 0)
+ {
+ if (slowReadSw.Elapsed > interval)
+ {
+ Thread.Sleep(slowReader);
+ slowReadSw.Reset();
+ }
+ }
+ };
+ if (slowReader > 0)
+ slowReadSw.Start();
+ totalTimeSw.Start();
+ epes.Process();
+ totalTimeSw.Stop();
+ if (slowReader > 0)
+ slowReadSw.Stop();
+ Console.WriteLine("Read total: " + eventsRead.ToString());
+ Console.WriteLine("Dropped total: " + epes.EventsLost.ToString());
+
+ return new TestResult(eventsRead, epes.EventsLost, totalTimeSw.Elapsed);
+ };
+ }
+
+ /// <summary>
+ /// This uses CopyTo to copy the trace into a filesystem first, and then uses EventPipeEventSource
+ /// on the file to post-process it and return the total # of events read.
+ /// </summary>
+ static Func<int, TestResult> UseFS(bool rundown, int bufferSize)
+ {
+ return (int pid) =>
+ {
+ int eventsRead = 0;
+ var totalTimeSw = new Stopwatch();
+ const string fileName = "./temp.nettrace";
+
+ EventPipeSession session = GetSession(pid, rundown, bufferSize);
+ Console.WriteLine("Session created.");
+
+ using (FileStream fs = new FileStream(fileName, FileMode.Create, FileAccess.Write))
+ {
+ totalTimeSw.Start();
+ session.EventStream.CopyTo(fs);
+ totalTimeSw.Stop();
+ }
+ EventPipeEventSource epes = new EventPipeEventSource(fileName);
+ epes.Dynamic.All += (TraceEvent data) => {
+ eventsRead += 1;
+ };
+ epes.Process();
+ Console.WriteLine("Read total: " + eventsRead.ToString());
+ Console.WriteLine("Dropped total: " + epes.EventsLost.ToString());
+
+ return new TestResult(eventsRead, epes.EventsLost, totalTimeSw.Elapsed);
+ };
+ }
+
+ static async Task<int> Orchestrate(
+ IConsole console,
+ CancellationToken ct,
+ FileInfo stressPath,
+ int eventSize,
+ int eventRate,
+ BurstPattern burstPattern,
+ ReaderType readerType,
+ int slowReader,
+ int duration,
+ int cores,
+ int threads,
+ int eventCount,
+ bool rundown,
+ int bufferSize,
+ int iterations,
+ bool pause)
+ {
+ if (!stressPath.Exists)
+ {
+ Console.WriteLine($"");
+ return -1;
+ }
+
+ string readerTypeString = readerType switch
+ {
+ ReaderType.Stream => "Stream",
+ ReaderType.EventPipeEventSource => "EventPipeEventSource",
+ _ => "Stream"
+ };
+
+ var durationTimeSpan = TimeSpan.FromSeconds(duration);
+ var testResults = new TestResults(eventSize);
+
+ Func<int, TestResult> threadProc = readerType switch
+ {
+ ReaderType.Stream => UseFS(rundown, bufferSize),
+ ReaderType.EventPipeEventSource => UseEPES(rundown, bufferSize, slowReader),
+ _ => throw new ArgumentException("Invalid reader type")
+ };
+
+ if (eventRate == -1 && burstPattern != BurstPattern.NONE)
+ throw new ArgumentException("Must have burst pattern of NONE if rate is -1");
+
+ Console.WriteLine($"Configuration: event_size={eventSize}, event_rate={eventRate}, cores={cores}, num_threads={threads}, reader={readerType}, event_rate={(eventRate == -1 ? -1 : eventRate * threads)}, burst_pattern={burstPattern.ToString()}, slow_reader={slowReader}, duration={duration}");
+
+ for (int iteration = 0; iteration < iterations; iteration++)
+ {
+ Console.WriteLine("========================================================");
+ Console.WriteLine($"Starting iteration {iteration + 1}");
+
+ Process eventWritingProc = new Process();
+ eventWritingProc.StartInfo.FileName = stressPath.FullName;
+ eventWritingProc.StartInfo.Arguments = $"--threads {(threads == -1 ? cores.ToString() : threads.ToString())} --event-count {eventCount} --event-size {eventSize} --event-rate {eventRate} --burst-pattern {burstPattern} --duration {(int)durationTimeSpan.TotalSeconds}";
+ eventWritingProc.StartInfo.UseShellExecute = false;
+ eventWritingProc.StartInfo.RedirectStandardInput = true;
+ eventWritingProc.StartInfo.Environment["COMPlus_StressLog"] = "1";
+ eventWritingProc.StartInfo.Environment["COMPlus_LogFacility"] = "2000";
+ eventWritingProc.StartInfo.Environment["COMPlus_LogLevel"] = "8";
+ eventWritingProc.StartInfo.Environment["COMPlus_StressLogSize"] = "0x1000000";
+ eventWritingProc.Start();
+
+ Console.WriteLine($"Executing: {eventWritingProc.StartInfo.FileName} {eventWritingProc.StartInfo.Arguments}");
+ if (!RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
+ {
+ // Set affinity and priority
+ ulong affinityMask = 0;
+ for (int j = 0; j < cores; j++)
+ {
+ affinityMask |= ((ulong)1 << j);
+ }
+ eventWritingProc.ProcessorAffinity = (IntPtr)((ulong)eventWritingProc.ProcessorAffinity & affinityMask);
+ eventWritingProc.PriorityClass = ProcessPriorityClass.RealTime; // Set the process priority to highest possible
+ }
+
+ // Start listening to the event.
+ Task<TestResult> listenerTask = Task.Run(() => threadProc(eventWritingProc.Id), ct);
+
+ if (pause)
+ {
+ Console.WriteLine("Press <enter> to start test");
+ Console.ReadLine();
+ }
+
+ // start the target process
+ StreamWriter writer = eventWritingProc.StandardInput;
+ writer.WriteLine("\r\n");
+ eventWritingProc.WaitForExit();
+
+ var resultTuple = await listenerTask;
+ testResults.Add(resultTuple);
+
+ Console.WriteLine($"Done with iteration {iteration + 1}");
+ Console.WriteLine("========================================================");
+
+ }
+
+ Console.WriteLine(testResults.GenerateSummary());
+ Console.WriteLine(testResults.GenerateStatisticsTable());
+
+ return 0;
+ }
+ }
+}
--- /dev/null
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Orchestrator
+{
+ public record TestResult(long EventsRead, long EventsDropped, TimeSpan Duration)
+ {
+ public double ThroughputEfficiency => 100 * ((double)EventsRead / ((double)TotalEvents));
+ public double EventThroughput => (double)EventsRead / Duration.TotalSeconds;
+ public long TotalEvents => EventsRead + EventsDropped;
+ }
+
+ public record Stats(string Name, double Min, double Max, double Avg, double Std, string Units = null, int Resolution = 2)
+ {
+ public string FormattedName = $"{Name}{(Units != null ? $" ({Units})" : "")}";
+ public string TableRow => $"|{" " + FormattedName,-35}|{Min.ToString($"N{Resolution}"),20}|{Max.ToString($"N{Resolution}"),20}|{Avg.ToString($"N{Resolution}"),20}|{Std.ToString($"N{Resolution}"),20}|";
+ public static string Separator => $"|{new string('-',35),-35}|{new string('-',20),-20}|{new string('-',20),-20}|{new string('-',20),-20}|{new string('-',20),-20}|";
+ public static string Header => $"|{" stat",-35}|{" Min",-20}|{" Max",-20}|{" Average",-20}|{" Standard Deviation",-20}|";
+ }
+
+ public record StatGenerator(string Name, Func<TestResult,double> Calculation, string Units = null, int Resolution = 2)
+ {
+ public Stats GetStats(IEnumerable<TestResult> results) => results.GetStats(Calculation, Name, Units, Resolution);
+ }
+
+ public class TestResults : IEnumerable<TestResult>
+ {
+ private List<TestResult> _results = new List<TestResult>();
+ private int _eventSize;
+
+ public TestResults(int eventSize)
+ {
+ _eventSize = eventSize;
+ }
+
+ public void Add(TestResult result) =>
+ _results.Add(result);
+
+ public string GenerateSummary()
+ {
+ var sb = new StringBuilder();
+ sb.AppendLine("**** Summary ****");
+ int i = 0;
+ foreach (var result in _results)
+ {
+ sb.AppendLine($"iteration {i++ +1}: {result.EventsRead:N2} events collected, {result.EventsDropped:N2} events dropped in {result.Duration.TotalSeconds:N6} seconds - ({100 * ((double)result.EventsRead / (double)((long)result.EventsRead + result.EventsDropped)):N2}% throughput)");
+ sb.AppendLine($"\t({(double)result.EventsRead / result.Duration.TotalSeconds:N2} events/s) ({((double)result.EventsRead * _eventSize * sizeof(char)) / result.Duration.TotalSeconds:N2} bytes/s)");
+ }
+ return sb.ToString();
+ }
+
+ public string GenerateStatisticsTable()
+ {
+ var sb = new StringBuilder();
+
+ sb.AppendLine();
+ sb.AppendLine(Stats.Separator);
+ sb.AppendLine(Stats.Header);
+ sb.AppendLine(Stats.Separator);
+ foreach (var generator in StatGenerators)
+ sb.AppendLine(generator.GetStats(_results).TableRow);
+ sb.AppendLine(Stats.Separator);
+ return sb.ToString();
+ }
+
+ // Add statistics you want rendered in the results table here
+ private IEnumerable<StatGenerator> StatGenerators
+ {
+ get
+ {
+ yield return new StatGenerator("Events Read", result => result.EventsRead);
+ yield return new StatGenerator("Events Dropped", result => result.EventsDropped);
+ yield return new StatGenerator("Throughput Efficiency", result => result.ThroughputEfficiency, "%");
+ yield return new StatGenerator("Event Throughput", result => result.EventThroughput, "events/sec");
+ yield return new StatGenerator("Data Throughput", result => result.EventThroughput * sizeof(char) * _eventSize, "Bytes/sec");
+ yield return new StatGenerator("Duration", result => result.Duration.TotalSeconds, "seconds", 6);
+ }
+ }
+
+ public IEnumerator<TestResult> GetEnumerator()
+ {
+ return _results.GetEnumerator();
+ }
+
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ return _results.GetEnumerator();
+ }
+ }
+
+ public static class Extensions
+ {
+ public static Stats GetStats(this IEnumerable<TestResult> results, Func<TestResult, double> calculation, string name, string units = null, int resolution = 2)
+ {
+ double min = results.Min(calculation);
+ double max = results.Max(calculation);
+ double avg = results.Average(calculation);
+ double std = Math.Sqrt(results.Average(result => Math.Pow(calculation(result) - avg, 2)));
+
+ return new Stats(name, min, max, avg, std, units, resolution);
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+# EventPipe Stress
+
+You can use `Orchestrator` and `Stress` to run several stress test scenarios for EventPipe.
+
+These tools are meant for developers working on EventPipe in dotnet/runtime.
+
+## Orchestrator
+
+Help text:
+
+```
+$ Orchestrator -h
+Orchestrator:
+ EventPipe Stress Tester - Orchestrator
+
+Usage:
+ Orchestrator [options] <stress-path>
+
+Arguments:
+ <stress-path> The location of the Stress executable.
+
+Options:
+ --event-size <event-size> The size of the event payload. The payload is a string, so the actual size will be eventSize * sizeof(char) where sizeof(char) is 2 Bytes due to Unicode in C#. [default: 100]
+ --event-rate <event-rate> The rate of events in events/sec. -1 means 'as fast as possible'. [default: -1]
+ --burst-pattern <BOLUS|DRIP|HEAVY_DRIP|NONE> The burst pattern to send events in. [default: NONE]
+ --reader-type <EventPipeEventSource|Stream> The method to read the stream of events. [default: Stream]
+ --slow-reader <slow-reader> <Only valid for EventPipeEventSource reader> Delay every read by this many milliseconds. [default: 0]
+ --duration <duration> The number of seconds to send events for. [default: 60]
+ --cores <cores> The number of logical cores to restrict the writing process to. [default: 8]
+ --threads <threads> The number of threads writing events. [default: 1]
+ --event-count <event-count> The total number of events to write per thread. -1 means no limit [default: -1]
+ --rundown Should the EventPipe session request rundown events? [default: True]
+ --buffer-size <buffer-size> The size of the buffer requested in the EventPipe session [default: 256]
+ --iterations <iterations> The number of times to run the test. [default: 1]
+ --pause Should the orchestrator pause before starting each test phase for a debugger to attach? [default: False]
+ --version Show version information
+ -?, -h, --help Show help and usage information
+```
+
+## Stress
+
+Help text:
+
+```
+$ Stress -h
+Stress:
+ EventPipe Stress Tester - Stress
+
+Usage:
+ Stress [options]
+
+Options:
+ --event-size <event-size> The size of the event payload. The payload is a string, so the actual size will be eventSize * sizeof(char) where sizeof(char) is 2 Bytes due to Unicode in C#. [default: 100]
+ --event-rate <event-rate> The rate of events in events/sec. -1 means 'as fast as possible'. [default: -1]
+ --burst-pattern <BOLUS|DRIP|HEAVY_DRIP|NONE> The burst pattern to send events in. [default: NONE]
+ --duration <duration> The number of seconds to send events for. [default: 60]
+ --threads <threads> The number of threads writing events. [default: 1]
+ --event-count <event-count> The total number of events to write per thread. -1 means no limit [default: -1]
+ --version Show version information
+ -?, -h, --help Show help and usage information
+```
+
+## Usage
+
+### Prerequisites
+
+1. Build `Orchestrator`
+2. Publish `Stress` as a self contained application (`dotnet publish -r <RID> --self-contained`)
+3. (optional) Copy over the runtime bits in the `Stress` publish location to test private runtime builds
+
+### Basic Scenarios
+
+1. Send as many events as possible in `N` seconds
+
+`Orchestrate <path-to-Stress> --duration <N> --iterations 100`
+
+2. Send `N` events as fast as possible
+
+`Orchestrate <path-to-Stress> --event-count <N> --iterations 100`
+
+3. Send `N` events in a burst pattern of `M` events/sec
+
+`Orchestrate <path-to-Stress> --event-count <N> --event-rate <M> --burst-pattern bolus --iterations 100`
+
+### Sample Output
+
+```
+**** Summary ****
+iteration 1: 102,678.00 events collected, 0.00 events dropped in 0.283581 seconds - (100.00% throughput)
+ (362,076.44 events/s) (181,038,221.88 bytes/s)
+iteration 2: 102,678.00 events collected, 0.00 events dropped in 0.634398 seconds - (100.00% throughput)
+ (161,851.05 events/s) (80,925,526.10 bytes/s)
+iteration 3: 102,678.00 events collected, 0.00 events dropped in 0.652566 seconds - (100.00% throughput)
+ (157,344.96 events/s) (78,672,477.98 bytes/s)
+iteration 4: 102,678.00 events collected, 0.00 events dropped in 0.661910 seconds - (100.00% throughput)
+ (155,123.83 events/s) (77,561,915.90 bytes/s)
+iteration 5: 102,678.00 events collected, 0.00 events dropped in 0.632966 seconds - (100.00% throughput)
+ (162,217.35 events/s) (81,108,673.20 bytes/s)
+
+
+|-----------------------------------|--------------------|--------------------|--------------------|--------------------|
+| stat | Min | Max | Average | Standard Deviation |
+|-----------------------------------|--------------------|--------------------|--------------------|--------------------|
+| Events Read | 102,678.00| 102,678.00| 102,678.00| 0.00|
+| Events Dropped | 0.00| 0.00| 0.00| 0.00|
+| Throughput Efficiency (%) | 100.00| 100.00| 100.00| 0.00|
+| Event Throughput (events/sec) | 155,123.83| 362,076.44| 199,722.73| 81,221.41|
+| Data Throughput (Bytes/sec) | 77,561,915.90| 181,038,221.88| 99,861,363.01| 40,610,702.78|
+| Duration (seconds) | 0.283581| 0.661910| 0.573084| 0.145165|
+|-----------------------------------|--------------------|--------------------|--------------------|--------------------|
+```
\ No newline at end of file
--- /dev/null
+using System;
+using System.CommandLine;
+using System.CommandLine.Builder;
+using System.CommandLine.Invocation;
+using System.CommandLine.Parsing;
+using System.Diagnostics.Tracing;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Common;
+
+namespace Stress
+{
+ class MySource : EventSource
+ {
+ public static MySource Log = new MySource();
+ public static string s_SmallPayload = new String('a', 100);
+ public static string s_BigPayload = new String('a', 10000);
+ public static string s_Payload = new String('a', 100);
+
+ public void FireSmallEvent() { WriteEvent(1, s_SmallPayload); }
+ public void FireBigEvent() { WriteEvent(1, s_BigPayload); }
+ public void FireEvent() => WriteEvent(1, s_Payload);
+ }
+
+ class Program
+ {
+ private static bool finished = false;
+ private static int eventRate = -1;
+ private static BurstPattern burstPattern = BurstPattern.NONE;
+ private static Action threadProc = null;
+ private static Action makeThreadProc(int eventCount)
+ {
+ Func<long> burst = BurstPatternMethods.Burst(burstPattern, eventRate, MySource.Log.FireEvent, BurstPatternMethods.BusySleepAction);
+ if (eventCount != -1)
+ {
+ return () => {
+ long messagesSent = 0;
+ while (!finished && messagesSent < eventCount)
+ messagesSent += burst();
+ };
+ }
+ else
+ return () => { while (!finished) { burst(); } };
+ }
+
+ private delegate Task<int> RootCommandHandler(IConsole console, CancellationToken ct, int eventSize, int eventRate, BurstPattern burstPattern, int threads, int duration, int eventCount);
+
+ private static CommandLineBuilder BuildCommandLine()
+ {
+ var rootCommand = new RootCommand("EventPipe Stress Tester - Stress")
+ {
+ CommandLineOptions.EventSizeOption,
+ CommandLineOptions.EventRateOption,
+ CommandLineOptions.BurstPatternOption,
+ CommandLineOptions.DurationOption,
+ CommandLineOptions.ThreadsOption,
+ CommandLineOptions.EventCountOption
+ };
+
+
+ rootCommand.Handler = CommandHandler.Create((RootCommandHandler)Run);
+ return new CommandLineBuilder(rootCommand);
+ }
+
+ static async Task<int> Main(string[] args)
+ {
+ return await BuildCommandLine()
+ .UseDefaults()
+ .Build()
+ .InvokeAsync(args);
+ }
+
+ private static async Task<int> Run(IConsole console, CancellationToken ct, int eventSize, int eventRate, BurstPattern burstPattern, int threads, int duration, int eventCount)
+ {
+ TimeSpan durationTimeSpan = TimeSpan.FromSeconds(duration);
+
+ MySource.s_Payload = new String('a', eventSize);
+
+ threadProc = makeThreadProc(eventCount);
+
+ Thread[] threadArray = new Thread[threads];
+ TaskCompletionSource<bool>[] tcsArray = new TaskCompletionSource<bool>[threads];
+
+ for (int i = 0; i < threads; i++)
+ {
+ var tcs = new TaskCompletionSource<bool>();
+ threadArray[i] = new Thread(() => { threadProc(); tcs.TrySetResult(true); });
+ tcsArray[i] = tcs;
+ }
+
+ Console.WriteLine($"SUBPROCESSS :: Running - Threads: {threads}, EventSize: {eventSize * sizeof(char):N} bytes, EventCount: {(eventCount == -1 ? -1 : eventCount * threads)}, EventRate: {(eventRate == -1 ? -1 : eventRate * threads)} events/sec, duration: {durationTimeSpan.TotalSeconds}s");
+ Console.ReadLine();
+
+ for (int i = 0; i < threads; i++)
+ {
+ threadArray[i].Start();
+ }
+
+ if (eventCount != -1)
+ Console.WriteLine($"SUBPROCESSS :: Sleeping for {durationTimeSpan.TotalSeconds} seconds or until {eventCount} events have been sent on each thread, whichever happens first");
+ else
+ Console.WriteLine($"SUBPROCESSS :: Sleeping for {durationTimeSpan.TotalSeconds} seconds");
+
+ Task threadCompletionTask = Task.WhenAll(tcsArray.Select(tcs => tcs.Task));
+ Task result = await Task.WhenAny(Task.Delay(durationTimeSpan), threadCompletionTask);
+ finished = true;
+
+ await threadCompletionTask;
+ Console.WriteLine("SUBPROCESSS :: Done. Goodbye!");
+ return 0;
+ }
+ }
+}
--- /dev/null
+<Project Sdk="Microsoft.NET.Sdk">
+
+ <PropertyGroup>
+ <OutputType>Exe</OutputType>
+ <TargetFramework>net5.0</TargetFramework>
+ </PropertyGroup>
+
+ <ItemGroup>
+ <ProjectReference Include="../Common/Common.csproj" />
+ <PackageReference Include="System.CommandLine" Version="2.0.0-beta1.20574.7" />
+ </ItemGroup>
+
+</Project>