From: John Salem Date: Mon, 4 Jan 2021 19:51:58 +0000 (-0800) Subject: Add EventPipeStress framework (#1843) X-Git-Tag: submit/tizen/20210909.063632~17^2~220 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=26a371c8c4627bf35c1ff028a5ede1d5ace0c5be;p=platform%2Fcore%2Fdotnet%2Fdiagnostics.git Add EventPipeStress framework (#1843) --- diff --git a/src/tests/EventPipeStress/Common/BurstPattern.cs b/src/tests/EventPipeStress/Common/BurstPattern.cs new file mode 100644 index 000000000..c27432f49 --- /dev/null +++ b/src/tests/EventPipeStress/Common/BurstPattern.cs @@ -0,0 +1,116 @@ +using System; +using System.Diagnostics; +using System.Threading; + +namespace Common +{ + // Each pattern sends N events per second, but varies the frequency + // across that second to simulate various burst patterns. + public enum BurstPattern : int + { + DRIP = 0, // (default) to send N events per second, sleep 1000/N ms between sends + BOLUS = 1, // each second send N events then stop + HEAVY_DRIP = 2, // each second send M bursts of N/M events + NONE = -1 // no burst pattern + } + + public static class BurstPatternMethods + { + public static BurstPattern ToBurstPattern(this int n) + { + return n > 2 || n < -1 ? BurstPattern.NONE : (BurstPattern)n; + } + + public static BurstPattern ToBurstPattern(this string str) + { + if (Int32.TryParse(str, out int result)) + return result.ToBurstPattern(); + else + { + return str.ToLowerInvariant() switch + { + "drip" => BurstPattern.DRIP, + "bolus" => BurstPattern.BOLUS, + "heavy_drip" => BurstPattern.HEAVY_DRIP, + "none" => BurstPattern.NONE, + _ => BurstPattern.NONE + }; + } + } + + public static string ToString(this BurstPattern burstPattern) => burstPattern switch + { + BurstPattern.DRIP => "DRIP", + BurstPattern.BOLUS => "BOLUS", + BurstPattern.HEAVY_DRIP => "HEAVY_DRIP", + BurstPattern.NONE => "NONE", + _ => "UNKOWN" + }; + + public static void DefaultSleepAction(int duration) => Thread.Sleep(duration); + public static void BusySleepAction(int duration) + { + string busyString = "0"; + DateTime start = DateTime.Now; + while (DateTime.Now.Subtract(start).TotalMilliseconds < duration) + { + busyString += "0"; + } + } + + /// + /// Invoke times in 1 second using the provided + /// + public static Func Burst(BurstPattern pattern, int rate, Action method, Action sleepAction = null) + { + if (rate == 0) + throw new ArgumentException("Rate cannot be 0"); + if (sleepAction == null) + throw new ArgumentException("sleep action cannot be null"); + + switch (pattern) + { + case BurstPattern.DRIP: + { + int sleepInMs = (int)Math.Floor(1000.0/rate); + return () => { method(); sleepAction?.Invoke(sleepInMs); return 1; }; + } + case BurstPattern.BOLUS: + { + return () => + { + Stopwatch sw = new Stopwatch(); + sw.Start(); + for (int i = 0; i < rate; i++) { method(); } + sw.Stop(); + if (sw.Elapsed.TotalSeconds < 1) + sleepAction?.Invoke(1000 - (int)Math.Floor((double)sw.ElapsedMilliseconds)); + return rate; + }; + } + case BurstPattern.HEAVY_DRIP: + { + int nDrips = 4; + int nEventsPerDrip = (int)Math.Floor((double)rate / nDrips); + int sleepInMs = (int)Math.Floor((1000.0 / rate) / nDrips); + return () => + { + for (int i = 0; i < nDrips; i++) + { + for (int j = 0; j < nEventsPerDrip; i++) + method(); + sleepAction?.Invoke(sleepInMs); + } + return nEventsPerDrip * nDrips; + }; + } + case BurstPattern.NONE: + { + return () => { method(); return 1; }; + } + default: + throw new ArgumentException("Unkown burst pattern"); + } + } + } +} \ No newline at end of file diff --git a/src/tests/EventPipeStress/Common/Common.csproj b/src/tests/EventPipeStress/Common/Common.csproj new file mode 100644 index 000000000..614763693 --- /dev/null +++ b/src/tests/EventPipeStress/Common/Common.csproj @@ -0,0 +1,11 @@ + + + + netstandard2.1 + + + + + + + \ No newline at end of file diff --git a/src/tests/EventPipeStress/Common/Options.cs b/src/tests/EventPipeStress/Common/Options.cs new file mode 100644 index 000000000..aa2c25756 --- /dev/null +++ b/src/tests/EventPipeStress/Common/Options.cs @@ -0,0 +1,129 @@ +using System; +using System.CommandLine; +using System.CommandLine.Parsing; + +namespace Common +{ + public static class CommandLineOptions + { + static public ValidateSymbol GreaterThanZeroValidator = (OptionResult result) => + { + if (result.GetValueOrDefault() <= 0) + return $"{result.Option.Name} must be greater than or equal to 0"; + return null; + }; + + static public ValidateSymbol GreaterThanOrEqualZeroValidator = (OptionResult result) => + { + if (result.GetValueOrDefault() < 0) + return $"{result.Option.Name} must be greater than 0"; + return null; + }; + + static public ValidateSymbol MustBeNegOneOrPositiveValidator = (OptionResult result) => + { + int val = result.GetValueOrDefault(); + if (val < -1 || val == 0) + return $"{result.Option.Name} must be -1 or greater than 0"; + return null; + }; + + static private Option _eventSizeOption = null; + static public Option EventSizeOption + { + get + { + if (_eventSizeOption != null) + return _eventSizeOption; + + _eventSizeOption = new Option( + alias: "--event-size", + getDefaultValue: () => 100, + description: "The size of the event payload. The payload is a string, so the actual size will be eventSize * sizeof(char) where sizeof(char) is 2 Bytes due to Unicode in C#."); + _eventSizeOption.AddValidator(GreaterThanZeroValidator); + return _eventSizeOption; + } + private set {} + } + + static private Option _eventRateOption = null; + static public Option EventRateOption + { + get + { + if (_eventRateOption != null) + return _eventRateOption; + + _eventRateOption = new Option( + alias: "--event-rate", + getDefaultValue: () => -1, + description: "The rate of events in events/sec. -1 means 'as fast as possible'."); + _eventRateOption.AddValidator(MustBeNegOneOrPositiveValidator); + return _eventRateOption; + } + private set {} + } + + static public Option BurstPatternOption = + new Option( + alias: "--burst-pattern", + getDefaultValue: () => BurstPattern.NONE, + description: "The burst pattern to send events in."); + + static private Option _durationOption = null; + static public Option DurationOption + { + get + { + if (_durationOption != null) + return _durationOption; + + _durationOption = new Option( + alias: "--duration", + getDefaultValue: () => 60, + description: "The number of seconds to send events for."); + _durationOption.AddValidator(GreaterThanZeroValidator); + return _durationOption; + } + private set {} + } + + + static private Option _threadsOption = null; + static public Option ThreadsOption + { + get + { + if (_threadsOption != null) + return _threadsOption; + + _threadsOption = new Option( + alias: "--threads", + getDefaultValue: () => 1, + description: "The number of threads writing events."); + _threadsOption.AddValidator(GreaterThanZeroValidator); + return _threadsOption; + } + private set {} + } + + + static private Option _eventCountOption = null; + static public Option EventCountOption + { + get + { + if (_eventCountOption != null) + return _eventCountOption; + + _eventCountOption = new Option( + alias: "--event-count", + getDefaultValue: () => -1, + description: "The total number of events to write per thread. -1 means no limit"); + _eventCountOption.AddValidator(MustBeNegOneOrPositiveValidator); + return _eventCountOption; + } + private set {} + } + } +} \ No newline at end of file diff --git a/src/tests/EventPipeStress/Orchestrator/CommandLine.cs b/src/tests/EventPipeStress/Orchestrator/CommandLine.cs new file mode 100644 index 000000000..994977066 --- /dev/null +++ b/src/tests/EventPipeStress/Orchestrator/CommandLine.cs @@ -0,0 +1,108 @@ +using Common; +using System; +using System.CommandLine; +using System.CommandLine.Parsing; + +namespace Orchestrator +{ + public static class OrchestrateCommandLine + { + static public Option ReaderTypeOption = + new Option( + alias: "--reader-type", + getDefaultValue: () => ReaderType.Stream, + description: "The method to read the stream of events."); + + static public Option PauseOption = + new Option( + alias: "--pause", + getDefaultValue: () => false, + description: "Should the orchestrator pause before starting each test phase for a debugger to attach?"); + + static public Option RundownOption = + new Option( + alias: "--rundown", + getDefaultValue: () => true, + description: "Should the EventPipe session request rundown events?"); + + static private Option _bufferSizeOption = null; + static public Option BufferSizeOption + { + get + { + if (_bufferSizeOption != null) + return _bufferSizeOption; + + _bufferSizeOption = new Option( + alias: "--buffer-size", + getDefaultValue: () => 256, + description: "The size of the buffer requested in the EventPipe session"); + _bufferSizeOption.AddValidator(CommandLineOptions.GreaterThanZeroValidator); + return _bufferSizeOption; + } + private set {} + } + + static private Option _slowReaderOption = null; + static public Option SlowReaderOption + { + get + { + if (_slowReaderOption != null) + return _slowReaderOption; + + _slowReaderOption = new Option( + alias: "--slow-reader", + getDefaultValue: () => 0, + description: " Delay every read by this many milliseconds."); + _slowReaderOption.AddValidator(CommandLineOptions.GreaterThanOrEqualZeroValidator); + return _slowReaderOption; + } + private set {} + } + + static private Option _coresOption = null; + static public Option CoresOption + { + get + { + if (_coresOption != null) + return _coresOption; + + _coresOption = new Option( + alias: "--cores", + getDefaultValue: () => Environment.ProcessorCount, + description: "The number of logical cores to restrict the writing process to."); + _coresOption.AddValidator(CoreValueMustBeFeasibleValidator); + return _coresOption; + } + private set {} + } + + static private Option _iterationsOption = null; + static public Option IterationsOption + { + get + { + if (_iterationsOption != null) + return _iterationsOption; + + _iterationsOption = new Option( + alias: "--iterations", + getDefaultValue: () => 1, + description: "The number of times to run the test."); + _iterationsOption.AddValidator(CommandLineOptions.GreaterThanZeroValidator); + return _iterationsOption; + } + private set {} + } + + static public ValidateSymbol CoreValueMustBeFeasibleValidator = (OptionResult result) => + { + int val = result.GetValueOrDefault(); + if (val < 1 || val > Environment.ProcessorCount) + return $"Core count must be between 1 and {Environment.ProcessorCount}"; + return null; + }; + } +} \ No newline at end of file diff --git a/src/tests/EventPipeStress/Orchestrator/Orchestrator.csproj b/src/tests/EventPipeStress/Orchestrator/Orchestrator.csproj new file mode 100644 index 000000000..063a1c1ce --- /dev/null +++ b/src/tests/EventPipeStress/Orchestrator/Orchestrator.csproj @@ -0,0 +1,19 @@ + + + + Exe + net5.0 + + + + + + + + + + + + + + diff --git a/src/tests/EventPipeStress/Orchestrator/Program.cs b/src/tests/EventPipeStress/Orchestrator/Program.cs new file mode 100644 index 000000000..34e867916 --- /dev/null +++ b/src/tests/EventPipeStress/Orchestrator/Program.cs @@ -0,0 +1,297 @@ +using System; +using System.Diagnostics; +using System.Diagnostics.Tracing; +using System.IO; +using System.Security.Principal; +using System.Threading; +using System.Threading.Tasks; +using System.Runtime.InteropServices; +using System.CommandLine; +using System.CommandLine.Invocation; +using System.CommandLine.Builder; +using System.CommandLine.Parsing; + +using Common; +using Microsoft.Diagnostics.NETCore.Client; +using Microsoft.Diagnostics.Tracing; + +using Process = System.Diagnostics.Process; + +namespace Orchestrator +{ + public enum ReaderType + { + Stream, + EventPipeEventSource + } + + class Program + { + delegate Task RootCommandHandler( + IConsole console, + CancellationToken ct, + FileInfo stressPath, + int eventSize, + int eventRate, + BurstPattern burstPattern, + ReaderType readerType, + int slowReader, + int duration, + int cores, + int threads, + int eventCount, + bool rundown, + int bufferSize, + int iterations, + bool pause); + + // TODO: Collect CPU % of reader and writer while running test and add to stats + // TODO: Standardize and clean up logging from orchestrator and corescaletest + // TODO: Improve error handling + + static async Task Main(string[] args) + { + + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + // Try to run in admin mode in Windows + bool isElevated; + using (WindowsIdentity identity = WindowsIdentity.GetCurrent()) + { + WindowsPrincipal principal = new WindowsPrincipal(identity); + isElevated = principal.IsInRole(WindowsBuiltInRole.Administrator); + } + if (!isElevated) + { + Console.WriteLine("Must run in root/admin mode"); + return -1; + } + } + + return await BuildCommandLine() + .UseDefaults() + .Build() + .InvokeAsync(args); + } + + static CommandLineBuilder BuildCommandLine() + { + var rootCommand = new RootCommand("EventPipe Stress Tester - Orchestrator") + { + new Argument( + name: "stress-path", + description: "The location of the Stress executable." + ), + CommandLineOptions.EventSizeOption, + CommandLineOptions.EventRateOption, + CommandLineOptions.BurstPatternOption, + OrchestrateCommandLine.ReaderTypeOption, + OrchestrateCommandLine.SlowReaderOption, + CommandLineOptions.DurationOption, + OrchestrateCommandLine.CoresOption, + CommandLineOptions.ThreadsOption, + CommandLineOptions.EventCountOption, + OrchestrateCommandLine.RundownOption, + OrchestrateCommandLine.BufferSizeOption, + OrchestrateCommandLine.IterationsOption, + OrchestrateCommandLine.PauseOption + }; + + + rootCommand.Handler = CommandHandler.Create((RootCommandHandler)Orchestrate); + return new CommandLineBuilder(rootCommand); + } + + private static EventPipeSession GetSession(int pid, bool rundown, int bufferSize) + { + DiagnosticsClient client = new DiagnosticsClient(pid); + while (!client.CheckTransport()) + { + Console.WriteLine("still unable to talk"); + Thread.Sleep(50); + } + return client.StartEventPipeSession( + new EventPipeProvider("MySource", EventLevel.Verbose), + requestRundown: rundown, + circularBufferMB: bufferSize); + } + + /// + /// This uses EventPipeEventSource's Stream constructor to parse the events real-time. + /// It then returns the number of events read. + /// + private static Func UseEPES(bool rundown, int bufferSize, int slowReader) + { + return (int pid) => + { + int eventsRead = 0; + var slowReadSw = new Stopwatch(); + var totalTimeSw = new Stopwatch(); + var interval = TimeSpan.FromSeconds(0.75); + + EventPipeSession session = GetSession(pid, rundown, bufferSize); + Console.WriteLine("Session created."); + + EventPipeEventSource epes = new EventPipeEventSource(session.EventStream); + epes.Dynamic.All += (TraceEvent data) => { + eventsRead += 1; + if (slowReader > 0) + { + if (slowReadSw.Elapsed > interval) + { + Thread.Sleep(slowReader); + slowReadSw.Reset(); + } + } + }; + if (slowReader > 0) + slowReadSw.Start(); + totalTimeSw.Start(); + epes.Process(); + totalTimeSw.Stop(); + if (slowReader > 0) + slowReadSw.Stop(); + Console.WriteLine("Read total: " + eventsRead.ToString()); + Console.WriteLine("Dropped total: " + epes.EventsLost.ToString()); + + return new TestResult(eventsRead, epes.EventsLost, totalTimeSw.Elapsed); + }; + } + + /// + /// This uses CopyTo to copy the trace into a filesystem first, and then uses EventPipeEventSource + /// on the file to post-process it and return the total # of events read. + /// + static Func UseFS(bool rundown, int bufferSize) + { + return (int pid) => + { + int eventsRead = 0; + var totalTimeSw = new Stopwatch(); + const string fileName = "./temp.nettrace"; + + EventPipeSession session = GetSession(pid, rundown, bufferSize); + Console.WriteLine("Session created."); + + using (FileStream fs = new FileStream(fileName, FileMode.Create, FileAccess.Write)) + { + totalTimeSw.Start(); + session.EventStream.CopyTo(fs); + totalTimeSw.Stop(); + } + EventPipeEventSource epes = new EventPipeEventSource(fileName); + epes.Dynamic.All += (TraceEvent data) => { + eventsRead += 1; + }; + epes.Process(); + Console.WriteLine("Read total: " + eventsRead.ToString()); + Console.WriteLine("Dropped total: " + epes.EventsLost.ToString()); + + return new TestResult(eventsRead, epes.EventsLost, totalTimeSw.Elapsed); + }; + } + + static async Task Orchestrate( + IConsole console, + CancellationToken ct, + FileInfo stressPath, + int eventSize, + int eventRate, + BurstPattern burstPattern, + ReaderType readerType, + int slowReader, + int duration, + int cores, + int threads, + int eventCount, + bool rundown, + int bufferSize, + int iterations, + bool pause) + { + if (!stressPath.Exists) + { + Console.WriteLine($""); + return -1; + } + + string readerTypeString = readerType switch + { + ReaderType.Stream => "Stream", + ReaderType.EventPipeEventSource => "EventPipeEventSource", + _ => "Stream" + }; + + var durationTimeSpan = TimeSpan.FromSeconds(duration); + var testResults = new TestResults(eventSize); + + Func threadProc = readerType switch + { + ReaderType.Stream => UseFS(rundown, bufferSize), + ReaderType.EventPipeEventSource => UseEPES(rundown, bufferSize, slowReader), + _ => throw new ArgumentException("Invalid reader type") + }; + + if (eventRate == -1 && burstPattern != BurstPattern.NONE) + throw new ArgumentException("Must have burst pattern of NONE if rate is -1"); + + Console.WriteLine($"Configuration: event_size={eventSize}, event_rate={eventRate}, cores={cores}, num_threads={threads}, reader={readerType}, event_rate={(eventRate == -1 ? -1 : eventRate * threads)}, burst_pattern={burstPattern.ToString()}, slow_reader={slowReader}, duration={duration}"); + + for (int iteration = 0; iteration < iterations; iteration++) + { + Console.WriteLine("========================================================"); + Console.WriteLine($"Starting iteration {iteration + 1}"); + + Process eventWritingProc = new Process(); + eventWritingProc.StartInfo.FileName = stressPath.FullName; + eventWritingProc.StartInfo.Arguments = $"--threads {(threads == -1 ? cores.ToString() : threads.ToString())} --event-count {eventCount} --event-size {eventSize} --event-rate {eventRate} --burst-pattern {burstPattern} --duration {(int)durationTimeSpan.TotalSeconds}"; + eventWritingProc.StartInfo.UseShellExecute = false; + eventWritingProc.StartInfo.RedirectStandardInput = true; + eventWritingProc.StartInfo.Environment["COMPlus_StressLog"] = "1"; + eventWritingProc.StartInfo.Environment["COMPlus_LogFacility"] = "2000"; + eventWritingProc.StartInfo.Environment["COMPlus_LogLevel"] = "8"; + eventWritingProc.StartInfo.Environment["COMPlus_StressLogSize"] = "0x1000000"; + eventWritingProc.Start(); + + Console.WriteLine($"Executing: {eventWritingProc.StartInfo.FileName} {eventWritingProc.StartInfo.Arguments}"); + if (!RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) + { + // Set affinity and priority + ulong affinityMask = 0; + for (int j = 0; j < cores; j++) + { + affinityMask |= ((ulong)1 << j); + } + eventWritingProc.ProcessorAffinity = (IntPtr)((ulong)eventWritingProc.ProcessorAffinity & affinityMask); + eventWritingProc.PriorityClass = ProcessPriorityClass.RealTime; // Set the process priority to highest possible + } + + // Start listening to the event. + Task listenerTask = Task.Run(() => threadProc(eventWritingProc.Id), ct); + + if (pause) + { + Console.WriteLine("Press to start test"); + Console.ReadLine(); + } + + // start the target process + StreamWriter writer = eventWritingProc.StandardInput; + writer.WriteLine("\r\n"); + eventWritingProc.WaitForExit(); + + var resultTuple = await listenerTask; + testResults.Add(resultTuple); + + Console.WriteLine($"Done with iteration {iteration + 1}"); + Console.WriteLine("========================================================"); + + } + + Console.WriteLine(testResults.GenerateSummary()); + Console.WriteLine(testResults.GenerateStatisticsTable()); + + return 0; + } + } +} diff --git a/src/tests/EventPipeStress/Orchestrator/TestResult.cs b/src/tests/EventPipeStress/Orchestrator/TestResult.cs new file mode 100644 index 000000000..28f086be4 --- /dev/null +++ b/src/tests/EventPipeStress/Orchestrator/TestResult.cs @@ -0,0 +1,106 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Orchestrator +{ + public record TestResult(long EventsRead, long EventsDropped, TimeSpan Duration) + { + public double ThroughputEfficiency => 100 * ((double)EventsRead / ((double)TotalEvents)); + public double EventThroughput => (double)EventsRead / Duration.TotalSeconds; + public long TotalEvents => EventsRead + EventsDropped; + } + + public record Stats(string Name, double Min, double Max, double Avg, double Std, string Units = null, int Resolution = 2) + { + public string FormattedName = $"{Name}{(Units != null ? $" ({Units})" : "")}"; + public string TableRow => $"|{" " + FormattedName,-35}|{Min.ToString($"N{Resolution}"),20}|{Max.ToString($"N{Resolution}"),20}|{Avg.ToString($"N{Resolution}"),20}|{Std.ToString($"N{Resolution}"),20}|"; + public static string Separator => $"|{new string('-',35),-35}|{new string('-',20),-20}|{new string('-',20),-20}|{new string('-',20),-20}|{new string('-',20),-20}|"; + public static string Header => $"|{" stat",-35}|{" Min",-20}|{" Max",-20}|{" Average",-20}|{" Standard Deviation",-20}|"; + } + + public record StatGenerator(string Name, Func Calculation, string Units = null, int Resolution = 2) + { + public Stats GetStats(IEnumerable results) => results.GetStats(Calculation, Name, Units, Resolution); + } + + public class TestResults : IEnumerable + { + private List _results = new List(); + private int _eventSize; + + public TestResults(int eventSize) + { + _eventSize = eventSize; + } + + public void Add(TestResult result) => + _results.Add(result); + + public string GenerateSummary() + { + var sb = new StringBuilder(); + sb.AppendLine("**** Summary ****"); + int i = 0; + foreach (var result in _results) + { + sb.AppendLine($"iteration {i++ +1}: {result.EventsRead:N2} events collected, {result.EventsDropped:N2} events dropped in {result.Duration.TotalSeconds:N6} seconds - ({100 * ((double)result.EventsRead / (double)((long)result.EventsRead + result.EventsDropped)):N2}% throughput)"); + sb.AppendLine($"\t({(double)result.EventsRead / result.Duration.TotalSeconds:N2} events/s) ({((double)result.EventsRead * _eventSize * sizeof(char)) / result.Duration.TotalSeconds:N2} bytes/s)"); + } + return sb.ToString(); + } + + public string GenerateStatisticsTable() + { + var sb = new StringBuilder(); + + sb.AppendLine(); + sb.AppendLine(Stats.Separator); + sb.AppendLine(Stats.Header); + sb.AppendLine(Stats.Separator); + foreach (var generator in StatGenerators) + sb.AppendLine(generator.GetStats(_results).TableRow); + sb.AppendLine(Stats.Separator); + return sb.ToString(); + } + + // Add statistics you want rendered in the results table here + private IEnumerable StatGenerators + { + get + { + yield return new StatGenerator("Events Read", result => result.EventsRead); + yield return new StatGenerator("Events Dropped", result => result.EventsDropped); + yield return new StatGenerator("Throughput Efficiency", result => result.ThroughputEfficiency, "%"); + yield return new StatGenerator("Event Throughput", result => result.EventThroughput, "events/sec"); + yield return new StatGenerator("Data Throughput", result => result.EventThroughput * sizeof(char) * _eventSize, "Bytes/sec"); + yield return new StatGenerator("Duration", result => result.Duration.TotalSeconds, "seconds", 6); + } + } + + public IEnumerator GetEnumerator() + { + return _results.GetEnumerator(); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return _results.GetEnumerator(); + } + } + + public static class Extensions + { + public static Stats GetStats(this IEnumerable results, Func calculation, string name, string units = null, int resolution = 2) + { + double min = results.Min(calculation); + double max = results.Max(calculation); + double avg = results.Average(calculation); + double std = Math.Sqrt(results.Average(result => Math.Pow(calculation(result) - avg, 2))); + + return new Stats(name, min, max, avg, std, units, resolution); + } + } +} \ No newline at end of file diff --git a/src/tests/EventPipeStress/README.md b/src/tests/EventPipeStress/README.md new file mode 100644 index 000000000..57e91ff29 --- /dev/null +++ b/src/tests/EventPipeStress/README.md @@ -0,0 +1,111 @@ +# EventPipe Stress + +You can use `Orchestrator` and `Stress` to run several stress test scenarios for EventPipe. + +These tools are meant for developers working on EventPipe in dotnet/runtime. + +## Orchestrator + +Help text: + +``` +$ Orchestrator -h +Orchestrator: + EventPipe Stress Tester - Orchestrator + +Usage: + Orchestrator [options] + +Arguments: + The location of the Stress executable. + +Options: + --event-size The size of the event payload. The payload is a string, so the actual size will be eventSize * sizeof(char) where sizeof(char) is 2 Bytes due to Unicode in C#. [default: 100] + --event-rate The rate of events in events/sec. -1 means 'as fast as possible'. [default: -1] + --burst-pattern The burst pattern to send events in. [default: NONE] + --reader-type The method to read the stream of events. [default: Stream] + --slow-reader Delay every read by this many milliseconds. [default: 0] + --duration The number of seconds to send events for. [default: 60] + --cores The number of logical cores to restrict the writing process to. [default: 8] + --threads The number of threads writing events. [default: 1] + --event-count The total number of events to write per thread. -1 means no limit [default: -1] + --rundown Should the EventPipe session request rundown events? [default: True] + --buffer-size The size of the buffer requested in the EventPipe session [default: 256] + --iterations The number of times to run the test. [default: 1] + --pause Should the orchestrator pause before starting each test phase for a debugger to attach? [default: False] + --version Show version information + -?, -h, --help Show help and usage information +``` + +## Stress + +Help text: + +``` +$ Stress -h +Stress: + EventPipe Stress Tester - Stress + +Usage: + Stress [options] + +Options: + --event-size The size of the event payload. The payload is a string, so the actual size will be eventSize * sizeof(char) where sizeof(char) is 2 Bytes due to Unicode in C#. [default: 100] + --event-rate The rate of events in events/sec. -1 means 'as fast as possible'. [default: -1] + --burst-pattern The burst pattern to send events in. [default: NONE] + --duration The number of seconds to send events for. [default: 60] + --threads The number of threads writing events. [default: 1] + --event-count The total number of events to write per thread. -1 means no limit [default: -1] + --version Show version information + -?, -h, --help Show help and usage information +``` + +## Usage + +### Prerequisites + +1. Build `Orchestrator` +2. Publish `Stress` as a self contained application (`dotnet publish -r --self-contained`) +3. (optional) Copy over the runtime bits in the `Stress` publish location to test private runtime builds + +### Basic Scenarios + +1. Send as many events as possible in `N` seconds + +`Orchestrate --duration --iterations 100` + +2. Send `N` events as fast as possible + +`Orchestrate --event-count --iterations 100` + +3. Send `N` events in a burst pattern of `M` events/sec + +`Orchestrate --event-count --event-rate --burst-pattern bolus --iterations 100` + +### Sample Output + +``` +**** Summary **** +iteration 1: 102,678.00 events collected, 0.00 events dropped in 0.283581 seconds - (100.00% throughput) + (362,076.44 events/s) (181,038,221.88 bytes/s) +iteration 2: 102,678.00 events collected, 0.00 events dropped in 0.634398 seconds - (100.00% throughput) + (161,851.05 events/s) (80,925,526.10 bytes/s) +iteration 3: 102,678.00 events collected, 0.00 events dropped in 0.652566 seconds - (100.00% throughput) + (157,344.96 events/s) (78,672,477.98 bytes/s) +iteration 4: 102,678.00 events collected, 0.00 events dropped in 0.661910 seconds - (100.00% throughput) + (155,123.83 events/s) (77,561,915.90 bytes/s) +iteration 5: 102,678.00 events collected, 0.00 events dropped in 0.632966 seconds - (100.00% throughput) + (162,217.35 events/s) (81,108,673.20 bytes/s) + + +|-----------------------------------|--------------------|--------------------|--------------------|--------------------| +| stat | Min | Max | Average | Standard Deviation | +|-----------------------------------|--------------------|--------------------|--------------------|--------------------| +| Events Read | 102,678.00| 102,678.00| 102,678.00| 0.00| +| Events Dropped | 0.00| 0.00| 0.00| 0.00| +| Throughput Efficiency (%) | 100.00| 100.00| 100.00| 0.00| +| Event Throughput (events/sec) | 155,123.83| 362,076.44| 199,722.73| 81,221.41| +| Data Throughput (Bytes/sec) | 77,561,915.90| 181,038,221.88| 99,861,363.01| 40,610,702.78| +| Duration (seconds) | 0.283581| 0.661910| 0.573084| 0.145165| +|-----------------------------------|--------------------|--------------------|--------------------|--------------------| +``` \ No newline at end of file diff --git a/src/tests/EventPipeStress/Stress/Program.cs b/src/tests/EventPipeStress/Stress/Program.cs new file mode 100644 index 000000000..d5289f016 --- /dev/null +++ b/src/tests/EventPipeStress/Stress/Program.cs @@ -0,0 +1,114 @@ +using System; +using System.CommandLine; +using System.CommandLine.Builder; +using System.CommandLine.Invocation; +using System.CommandLine.Parsing; +using System.Diagnostics.Tracing; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Common; + +namespace Stress +{ + class MySource : EventSource + { + public static MySource Log = new MySource(); + public static string s_SmallPayload = new String('a', 100); + public static string s_BigPayload = new String('a', 10000); + public static string s_Payload = new String('a', 100); + + public void FireSmallEvent() { WriteEvent(1, s_SmallPayload); } + public void FireBigEvent() { WriteEvent(1, s_BigPayload); } + public void FireEvent() => WriteEvent(1, s_Payload); + } + + class Program + { + private static bool finished = false; + private static int eventRate = -1; + private static BurstPattern burstPattern = BurstPattern.NONE; + private static Action threadProc = null; + private static Action makeThreadProc(int eventCount) + { + Func burst = BurstPatternMethods.Burst(burstPattern, eventRate, MySource.Log.FireEvent, BurstPatternMethods.BusySleepAction); + if (eventCount != -1) + { + return () => { + long messagesSent = 0; + while (!finished && messagesSent < eventCount) + messagesSent += burst(); + }; + } + else + return () => { while (!finished) { burst(); } }; + } + + private delegate Task RootCommandHandler(IConsole console, CancellationToken ct, int eventSize, int eventRate, BurstPattern burstPattern, int threads, int duration, int eventCount); + + private static CommandLineBuilder BuildCommandLine() + { + var rootCommand = new RootCommand("EventPipe Stress Tester - Stress") + { + CommandLineOptions.EventSizeOption, + CommandLineOptions.EventRateOption, + CommandLineOptions.BurstPatternOption, + CommandLineOptions.DurationOption, + CommandLineOptions.ThreadsOption, + CommandLineOptions.EventCountOption + }; + + + rootCommand.Handler = CommandHandler.Create((RootCommandHandler)Run); + return new CommandLineBuilder(rootCommand); + } + + static async Task Main(string[] args) + { + return await BuildCommandLine() + .UseDefaults() + .Build() + .InvokeAsync(args); + } + + private static async Task Run(IConsole console, CancellationToken ct, int eventSize, int eventRate, BurstPattern burstPattern, int threads, int duration, int eventCount) + { + TimeSpan durationTimeSpan = TimeSpan.FromSeconds(duration); + + MySource.s_Payload = new String('a', eventSize); + + threadProc = makeThreadProc(eventCount); + + Thread[] threadArray = new Thread[threads]; + TaskCompletionSource[] tcsArray = new TaskCompletionSource[threads]; + + for (int i = 0; i < threads; i++) + { + var tcs = new TaskCompletionSource(); + threadArray[i] = new Thread(() => { threadProc(); tcs.TrySetResult(true); }); + tcsArray[i] = tcs; + } + + Console.WriteLine($"SUBPROCESSS :: Running - Threads: {threads}, EventSize: {eventSize * sizeof(char):N} bytes, EventCount: {(eventCount == -1 ? -1 : eventCount * threads)}, EventRate: {(eventRate == -1 ? -1 : eventRate * threads)} events/sec, duration: {durationTimeSpan.TotalSeconds}s"); + Console.ReadLine(); + + for (int i = 0; i < threads; i++) + { + threadArray[i].Start(); + } + + if (eventCount != -1) + Console.WriteLine($"SUBPROCESSS :: Sleeping for {durationTimeSpan.TotalSeconds} seconds or until {eventCount} events have been sent on each thread, whichever happens first"); + else + Console.WriteLine($"SUBPROCESSS :: Sleeping for {durationTimeSpan.TotalSeconds} seconds"); + + Task threadCompletionTask = Task.WhenAll(tcsArray.Select(tcs => tcs.Task)); + Task result = await Task.WhenAny(Task.Delay(durationTimeSpan), threadCompletionTask); + finished = true; + + await threadCompletionTask; + Console.WriteLine("SUBPROCESSS :: Done. Goodbye!"); + return 0; + } + } +} diff --git a/src/tests/EventPipeStress/Stress/Stress.csproj b/src/tests/EventPipeStress/Stress/Stress.csproj new file mode 100644 index 000000000..1f797977b --- /dev/null +++ b/src/tests/EventPipeStress/Stress/Stress.csproj @@ -0,0 +1,13 @@ + + + + Exe + net5.0 + + + + + + + +