Add EventPipeStress framework (#1843)
authorJohn Salem <josalem@microsoft.com>
Mon, 4 Jan 2021 19:51:58 +0000 (11:51 -0800)
committerGitHub <noreply@github.com>
Mon, 4 Jan 2021 19:51:58 +0000 (11:51 -0800)
src/tests/EventPipeStress/Common/BurstPattern.cs [new file with mode: 0644]
src/tests/EventPipeStress/Common/Common.csproj [new file with mode: 0644]
src/tests/EventPipeStress/Common/Options.cs [new file with mode: 0644]
src/tests/EventPipeStress/Orchestrator/CommandLine.cs [new file with mode: 0644]
src/tests/EventPipeStress/Orchestrator/Orchestrator.csproj [new file with mode: 0644]
src/tests/EventPipeStress/Orchestrator/Program.cs [new file with mode: 0644]
src/tests/EventPipeStress/Orchestrator/TestResult.cs [new file with mode: 0644]
src/tests/EventPipeStress/README.md [new file with mode: 0644]
src/tests/EventPipeStress/Stress/Program.cs [new file with mode: 0644]
src/tests/EventPipeStress/Stress/Stress.csproj [new file with mode: 0644]

diff --git a/src/tests/EventPipeStress/Common/BurstPattern.cs b/src/tests/EventPipeStress/Common/BurstPattern.cs
new file mode 100644 (file)
index 0000000..c27432f
--- /dev/null
@@ -0,0 +1,116 @@
+using System;
+using System.Diagnostics;
+using System.Threading;
+
+namespace Common
+{
+    // Each pattern sends N events per second, but varies the frequency
+    // across that second to simulate various burst patterns.
+    public enum BurstPattern : int
+    {
+        DRIP  = 0, // (default) to send N events per second, sleep 1000/N ms between sends
+        BOLUS = 1, // each second send N events then stop
+        HEAVY_DRIP = 2, // each second send M bursts of N/M events
+        NONE = -1 // no burst pattern
+    }
+
+    public static class BurstPatternMethods
+    {
+        public static BurstPattern ToBurstPattern(this int n)
+        {
+            return n > 2 || n < -1 ? BurstPattern.NONE : (BurstPattern)n;
+        }
+
+        public static BurstPattern ToBurstPattern(this string str)
+        {
+            if (Int32.TryParse(str, out int result))
+                return result.ToBurstPattern();
+            else
+            {
+                return str.ToLowerInvariant() switch
+                {
+                    "drip" => BurstPattern.DRIP,
+                    "bolus" => BurstPattern.BOLUS,
+                    "heavy_drip" => BurstPattern.HEAVY_DRIP,
+                    "none" => BurstPattern.NONE,
+                    _ => BurstPattern.NONE
+                };
+            }
+        }
+
+        public static string ToString(this BurstPattern burstPattern) => burstPattern switch
+            {
+                BurstPattern.DRIP => "DRIP",
+                BurstPattern.BOLUS => "BOLUS",
+                BurstPattern.HEAVY_DRIP => "HEAVY_DRIP",
+                BurstPattern.NONE => "NONE",
+                _ => "UNKOWN"
+            };
+
+        public static void DefaultSleepAction(int duration) => Thread.Sleep(duration);
+        public static void BusySleepAction(int duration)
+        {
+            string busyString = "0";
+            DateTime start = DateTime.Now;
+            while (DateTime.Now.Subtract(start).TotalMilliseconds < duration)
+            {
+                busyString += "0";
+            }
+        }
+
+        /// <summary>
+        /// Invoke <param name="method"/> <param name="rate"/> times in 1 second using the <param name="pattern"/> provided
+        /// </summary>
+        public static Func<long> Burst(BurstPattern pattern, int rate, Action method, Action<int> sleepAction = null)
+        {
+            if (rate == 0)
+                throw new ArgumentException("Rate cannot be 0");
+            if (sleepAction == null)
+                throw new ArgumentException("sleep action cannot be null");
+
+            switch (pattern)
+            {
+                case BurstPattern.DRIP:
+                {
+                    int sleepInMs = (int)Math.Floor(1000.0/rate);
+                    return () => { method(); sleepAction?.Invoke(sleepInMs); return 1; };
+                }
+                case BurstPattern.BOLUS:
+                {
+                    return () => 
+                    {
+                        Stopwatch sw = new Stopwatch();
+                        sw.Start();
+                        for (int i = 0; i < rate; i++) { method(); } 
+                        sw.Stop();
+                        if (sw.Elapsed.TotalSeconds < 1)
+                            sleepAction?.Invoke(1000 - (int)Math.Floor((double)sw.ElapsedMilliseconds));
+                        return rate;
+                    };
+                }
+                case BurstPattern.HEAVY_DRIP:
+                {
+                    int nDrips = 4;
+                    int nEventsPerDrip = (int)Math.Floor((double)rate / nDrips);
+                    int sleepInMs = (int)Math.Floor((1000.0 / rate) / nDrips);
+                    return () =>
+                    {
+                        for (int i = 0; i < nDrips; i++)
+                        {
+                            for (int j = 0; j < nEventsPerDrip; i++)
+                                method();
+                            sleepAction?.Invoke(sleepInMs);
+                        }
+                        return nEventsPerDrip * nDrips;
+                    };
+                }
+                case BurstPattern.NONE:
+                {
+                    return () => { method(); return 1; };
+                }
+                default:
+                    throw new ArgumentException("Unkown burst pattern");
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/tests/EventPipeStress/Common/Common.csproj b/src/tests/EventPipeStress/Common/Common.csproj
new file mode 100644 (file)
index 0000000..6147636
--- /dev/null
@@ -0,0 +1,11 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <TargetFramework>netstandard2.1</TargetFramework>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="System.CommandLine" Version="2.0.0-beta1.20574.7" />
+  </ItemGroup>
+
+</Project>
\ No newline at end of file
diff --git a/src/tests/EventPipeStress/Common/Options.cs b/src/tests/EventPipeStress/Common/Options.cs
new file mode 100644 (file)
index 0000000..aa2c257
--- /dev/null
@@ -0,0 +1,129 @@
+using System;
+using System.CommandLine;
+using System.CommandLine.Parsing;
+
+namespace Common
+{
+    public static class CommandLineOptions
+    {
+        static public ValidateSymbol<OptionResult> GreaterThanZeroValidator = (OptionResult result) =>
+        {
+            if (result.GetValueOrDefault<int>() <= 0)
+                return $"{result.Option.Name} must be greater than or equal to 0";
+            return null;
+        };
+
+        static public ValidateSymbol<OptionResult> GreaterThanOrEqualZeroValidator = (OptionResult result) =>
+        {
+            if (result.GetValueOrDefault<int>() < 0)
+                return $"{result.Option.Name} must be greater than 0";
+            return null;
+        };
+
+        static public ValidateSymbol<OptionResult> MustBeNegOneOrPositiveValidator = (OptionResult result) =>
+        {
+            int val = result.GetValueOrDefault<int>();
+            if (val < -1 || val == 0)
+                return $"{result.Option.Name} must be -1 or greater than 0";
+            return null;
+        };
+
+        static private Option<int> _eventSizeOption = null;
+        static public Option<int> EventSizeOption 
+        {
+            get
+            {
+                if (_eventSizeOption != null)
+                    return _eventSizeOption;
+
+                _eventSizeOption = new Option<int>(
+                                        alias: "--event-size",
+                                        getDefaultValue: () => 100,
+                                        description: "The size of the event payload.  The payload is a string, so the actual size will be eventSize * sizeof(char) where sizeof(char) is 2 Bytes due to Unicode in C#.");
+                _eventSizeOption.AddValidator(GreaterThanZeroValidator);
+                return _eventSizeOption;
+            }
+            private set {}
+        }
+
+        static private Option<int> _eventRateOption = null;
+        static public Option<int> EventRateOption
+        {
+            get
+            {
+                if (_eventRateOption != null)
+                    return _eventRateOption;
+
+                _eventRateOption = new Option<int>(
+                                        alias: "--event-rate",
+                                        getDefaultValue: () => -1,
+                                        description: "The rate of events in events/sec.  -1 means 'as fast as possible'.");
+                _eventRateOption.AddValidator(MustBeNegOneOrPositiveValidator);
+                return _eventRateOption;
+            }
+            private set {}
+        }
+
+        static public Option<BurstPattern> BurstPatternOption =
+            new Option<BurstPattern>(
+                alias: "--burst-pattern",
+                getDefaultValue: () => BurstPattern.NONE,
+                description: "The burst pattern to send events in.");
+
+        static private Option<int> _durationOption = null;
+        static public Option<int> DurationOption
+        {
+            get
+            {
+                if (_durationOption != null)
+                    return _durationOption;
+
+                _durationOption = new Option<int>(
+                                        alias: "--duration",
+                                        getDefaultValue: () => 60,
+                                        description: "The number of seconds to send events for.");
+                _durationOption.AddValidator(GreaterThanZeroValidator);
+                return _durationOption;
+            }
+            private set {}
+        }
+
+
+        static private Option<int> _threadsOption = null;
+        static public Option<int> ThreadsOption
+        {
+            get
+            {
+                if (_threadsOption != null)
+                    return _threadsOption;
+
+                _threadsOption = new Option<int>(
+                                        alias: "--threads",
+                                        getDefaultValue: () => 1,
+                                        description: "The number of threads writing events.");
+                _threadsOption.AddValidator(GreaterThanZeroValidator);
+                return _threadsOption;
+            }
+            private set {}
+        }
+
+
+        static private Option<int> _eventCountOption = null;
+        static public Option<int> EventCountOption
+        {
+            get
+            {
+                if (_eventCountOption != null)
+                    return _eventCountOption;
+
+                _eventCountOption = new Option<int>(
+                                            alias: "--event-count",
+                                            getDefaultValue: () => -1,
+                                            description: "The total number of events to write per thread.  -1 means no limit");
+                _eventCountOption.AddValidator(MustBeNegOneOrPositiveValidator);
+                return _eventCountOption;
+            }
+            private set {}
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/tests/EventPipeStress/Orchestrator/CommandLine.cs b/src/tests/EventPipeStress/Orchestrator/CommandLine.cs
new file mode 100644 (file)
index 0000000..9949770
--- /dev/null
@@ -0,0 +1,108 @@
+using Common;
+using System;
+using System.CommandLine;
+using System.CommandLine.Parsing;
+
+namespace Orchestrator
+{
+    public static class OrchestrateCommandLine
+    {
+        static public Option<ReaderType> ReaderTypeOption = 
+            new Option<ReaderType>(
+                alias: "--reader-type",
+                getDefaultValue: () => ReaderType.Stream,
+                description: "The method to read the stream of events.");
+
+        static public Option<bool> PauseOption = 
+            new Option<bool>(
+                alias: "--pause",
+                getDefaultValue: () => false,
+                description: "Should the orchestrator pause before starting each test phase for a debugger to attach?");
+
+        static public Option<bool> RundownOption = 
+            new Option<bool>(
+                alias: "--rundown",
+                getDefaultValue: () => true,
+                description: "Should the EventPipe session request rundown events?");
+
+        static private Option<int> _bufferSizeOption = null;
+        static public Option<int> BufferSizeOption 
+        {
+            get
+            {
+                if (_bufferSizeOption != null)
+                    return _bufferSizeOption;
+
+                _bufferSizeOption = new Option<int>(
+                                            alias: "--buffer-size",
+                                            getDefaultValue: () => 256,
+                                            description: "The size of the buffer requested in the EventPipe session");
+                _bufferSizeOption.AddValidator(CommandLineOptions.GreaterThanZeroValidator);
+                return _bufferSizeOption;
+            }
+            private set {}
+        }
+
+        static private Option<int> _slowReaderOption = null;
+        static public Option<int> SlowReaderOption 
+        {
+            get
+            {
+                if (_slowReaderOption != null)
+                    return _slowReaderOption;
+
+                _slowReaderOption = new Option<int>(
+                                            alias: "--slow-reader",
+                                            getDefaultValue: () => 0,
+                                            description: "<Only valid for EventPipeEventSource reader> Delay every read by this many milliseconds.");
+                _slowReaderOption.AddValidator(CommandLineOptions.GreaterThanOrEqualZeroValidator);
+                return _slowReaderOption;
+            }
+            private set {}
+        }
+
+        static private Option<int> _coresOption = null;
+        static public Option<int> CoresOption 
+        {
+            get
+            {
+                if (_coresOption != null)
+                    return _coresOption;
+
+                _coresOption = new Option<int>(
+                                        alias: "--cores",
+                                        getDefaultValue: () => Environment.ProcessorCount,
+                                        description: "The number of logical cores to restrict the writing process to.");
+                _coresOption.AddValidator(CoreValueMustBeFeasibleValidator);
+                return _coresOption;
+            }
+            private set {}
+        }
+
+        static private Option<int> _iterationsOption = null;
+        static public Option<int> IterationsOption 
+        {
+            get
+            {
+                if (_iterationsOption != null)
+                    return _iterationsOption;
+
+                _iterationsOption = new Option<int>(
+                                        alias: "--iterations",
+                                        getDefaultValue: () => 1,
+                                        description: "The number of times to run the test.");
+                _iterationsOption.AddValidator(CommandLineOptions.GreaterThanZeroValidator);
+                return _iterationsOption;
+            }
+            private set {}
+        }
+
+        static public ValidateSymbol<OptionResult> CoreValueMustBeFeasibleValidator = (OptionResult result) =>
+        {
+            int val = result.GetValueOrDefault<int>();
+            if (val < 1 || val > Environment.ProcessorCount)
+                return $"Core count must be between 1 and {Environment.ProcessorCount}";
+            return null;
+        };
+    }
+}
\ No newline at end of file
diff --git a/src/tests/EventPipeStress/Orchestrator/Orchestrator.csproj b/src/tests/EventPipeStress/Orchestrator/Orchestrator.csproj
new file mode 100644 (file)
index 0000000..063a1c1
--- /dev/null
@@ -0,0 +1,19 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>net5.0</TargetFramework>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="../Common/Common.csproj" />
+  </ItemGroup>
+
+  <ItemGroup>
+    <PackageReference Include="Microsoft.Diagnostics.NETCore.Client" Version="0.2.137102" />
+    <PackageReference Include="Microsoft.Diagnostics.Tracing.TraceEvent" Version="2.0.58" />
+    <PackageReference Include="System.CommandLine" Version="2.0.0-beta1.20574.7" />
+    <PackageReference Include="System.Security.Principal.Windows" Version="4.7.0" />
+  </ItemGroup>
+
+</Project>
diff --git a/src/tests/EventPipeStress/Orchestrator/Program.cs b/src/tests/EventPipeStress/Orchestrator/Program.cs
new file mode 100644 (file)
index 0000000..34e8679
--- /dev/null
@@ -0,0 +1,297 @@
+using System;
+using System.Diagnostics;
+using System.Diagnostics.Tracing;
+using System.IO;
+using System.Security.Principal;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Runtime.InteropServices;
+using System.CommandLine;
+using System.CommandLine.Invocation;
+using System.CommandLine.Builder;
+using System.CommandLine.Parsing;
+
+using Common;
+using Microsoft.Diagnostics.NETCore.Client;
+using Microsoft.Diagnostics.Tracing;
+
+using Process = System.Diagnostics.Process;
+
+namespace Orchestrator
+{
+    public enum ReaderType
+    {
+        Stream,
+        EventPipeEventSource
+    }
+
+    class Program
+    {
+        delegate Task<int> RootCommandHandler(
+            IConsole console,
+            CancellationToken ct,
+            FileInfo stressPath,
+            int eventSize,
+            int eventRate,
+            BurstPattern burstPattern,
+            ReaderType readerType,
+            int slowReader,
+            int duration,
+            int cores,
+            int threads,
+            int eventCount,
+            bool rundown,
+            int bufferSize,
+            int iterations,
+            bool pause);
+
+        // TODO: Collect CPU % of reader and writer while running test and add to stats
+        // TODO: Standardize and clean up logging from orchestrator and corescaletest
+        // TODO: Improve error handling
+
+        static async Task<int> Main(string[] args)
+        {
+
+            if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+            {
+                // Try to run in admin mode in Windows
+                bool isElevated;
+                using (WindowsIdentity identity = WindowsIdentity.GetCurrent())
+                {
+                    WindowsPrincipal principal = new WindowsPrincipal(identity);
+                    isElevated = principal.IsInRole(WindowsBuiltInRole.Administrator);
+                }
+                if (!isElevated)
+                {
+                    Console.WriteLine("Must run in root/admin mode");
+                    return -1;
+                } 
+            }
+
+            return await BuildCommandLine()
+                            .UseDefaults()
+                            .Build()
+                            .InvokeAsync(args);
+        }
+
+        static CommandLineBuilder BuildCommandLine()
+        {
+            var rootCommand = new RootCommand("EventPipe Stress Tester - Orchestrator")
+            {
+                new Argument<FileInfo>(
+                    name: "stress-path",
+                    description: "The location of the Stress executable."
+                ),
+                CommandLineOptions.EventSizeOption,
+                CommandLineOptions.EventRateOption,
+                CommandLineOptions.BurstPatternOption,
+                OrchestrateCommandLine.ReaderTypeOption,
+                OrchestrateCommandLine.SlowReaderOption,
+                CommandLineOptions.DurationOption,
+                OrchestrateCommandLine.CoresOption,
+                CommandLineOptions.ThreadsOption,
+                CommandLineOptions.EventCountOption,
+                OrchestrateCommandLine.RundownOption,
+                OrchestrateCommandLine.BufferSizeOption,
+                OrchestrateCommandLine.IterationsOption,
+                OrchestrateCommandLine.PauseOption
+            };
+
+
+            rootCommand.Handler = CommandHandler.Create((RootCommandHandler)Orchestrate);
+            return new CommandLineBuilder(rootCommand);
+        }
+
+        private static EventPipeSession GetSession(int pid, bool rundown, int bufferSize)
+        {
+            DiagnosticsClient client = new DiagnosticsClient(pid);
+            while (!client.CheckTransport())
+            {
+                Console.WriteLine("still unable to talk");
+                Thread.Sleep(50);
+            }
+            return client.StartEventPipeSession(
+                new EventPipeProvider("MySource", EventLevel.Verbose), 
+                requestRundown: rundown, 
+                circularBufferMB: bufferSize);
+        }
+
+        /// <summary>
+        /// This uses EventPipeEventSource's Stream constructor to parse the events real-time.
+        /// It then returns the number of events read.
+        /// </summary>
+        private static Func<int, TestResult> UseEPES(bool rundown, int bufferSize, int slowReader)
+        {
+            return (int pid) =>
+            {
+                int eventsRead = 0;
+                var slowReadSw = new Stopwatch();
+                var totalTimeSw = new Stopwatch();
+                var interval = TimeSpan.FromSeconds(0.75);
+
+                EventPipeSession session = GetSession(pid, rundown, bufferSize);
+                Console.WriteLine("Session created.");
+
+                EventPipeEventSource epes = new EventPipeEventSource(session.EventStream);
+                epes.Dynamic.All += (TraceEvent data) => {
+                    eventsRead += 1;
+                    if (slowReader > 0)
+                    {
+                        if (slowReadSw.Elapsed > interval)
+                        {
+                            Thread.Sleep(slowReader);
+                            slowReadSw.Reset();
+                        }
+                    }
+                };
+                if (slowReader > 0)
+                    slowReadSw.Start();
+                totalTimeSw.Start();
+                epes.Process();
+                totalTimeSw.Stop();
+                if (slowReader > 0)
+                    slowReadSw.Stop();
+                Console.WriteLine("Read total: " + eventsRead.ToString());
+                Console.WriteLine("Dropped total: " + epes.EventsLost.ToString());
+
+                return new TestResult(eventsRead, epes.EventsLost, totalTimeSw.Elapsed);
+            };
+        }
+
+        /// <summary>
+        /// This uses CopyTo to copy the trace into a filesystem first, and then uses EventPipeEventSource
+        /// on the file to post-process it and return the total # of events read.
+        /// </summary>
+        static Func<int, TestResult> UseFS(bool rundown, int bufferSize)
+        {
+            return (int pid) =>
+            {
+                int eventsRead = 0;
+                var totalTimeSw = new Stopwatch();
+                const string fileName = "./temp.nettrace";
+
+                EventPipeSession session = GetSession(pid, rundown, bufferSize);
+                Console.WriteLine("Session created.");
+
+                using (FileStream fs = new FileStream(fileName, FileMode.Create, FileAccess.Write))
+                {
+                    totalTimeSw.Start();
+                    session.EventStream.CopyTo(fs);
+                    totalTimeSw.Stop();
+                }
+                EventPipeEventSource epes = new EventPipeEventSource(fileName);
+                epes.Dynamic.All += (TraceEvent data) => {
+                    eventsRead += 1;
+                };
+                epes.Process();
+                Console.WriteLine("Read total: " + eventsRead.ToString());
+                Console.WriteLine("Dropped total: " + epes.EventsLost.ToString());
+
+                return new TestResult(eventsRead, epes.EventsLost, totalTimeSw.Elapsed);
+            };
+        }
+
+        static async Task<int> Orchestrate(
+            IConsole console,
+            CancellationToken ct,
+            FileInfo stressPath,
+            int eventSize,
+            int eventRate,
+            BurstPattern burstPattern,
+            ReaderType readerType,
+            int slowReader,
+            int duration,
+            int cores,
+            int threads,
+            int eventCount,
+            bool rundown,
+            int bufferSize,
+            int iterations,
+            bool pause)
+        {
+            if (!stressPath.Exists)
+            {
+                Console.WriteLine($"");
+                return -1;
+            }
+
+            string readerTypeString = readerType switch
+            {
+                ReaderType.Stream => "Stream",
+                ReaderType.EventPipeEventSource => "EventPipeEventSource",
+                _ => "Stream"
+            };
+
+            var durationTimeSpan = TimeSpan.FromSeconds(duration); 
+            var testResults = new TestResults(eventSize);
+
+            Func<int, TestResult> threadProc = readerType switch
+            {
+                ReaderType.Stream => UseFS(rundown, bufferSize),
+                ReaderType.EventPipeEventSource => UseEPES(rundown, bufferSize, slowReader),
+                _ => throw new ArgumentException("Invalid reader type")
+            };
+
+            if (eventRate == -1 && burstPattern != BurstPattern.NONE)
+                throw new ArgumentException("Must have burst pattern of NONE if rate is -1");
+
+            Console.WriteLine($"Configuration: event_size={eventSize}, event_rate={eventRate}, cores={cores}, num_threads={threads}, reader={readerType}, event_rate={(eventRate == -1 ? -1 : eventRate * threads)}, burst_pattern={burstPattern.ToString()}, slow_reader={slowReader}, duration={duration}");
+
+            for (int iteration = 0; iteration < iterations; iteration++)
+            {
+                Console.WriteLine("========================================================");
+                Console.WriteLine($"Starting iteration {iteration + 1}");
+
+                Process eventWritingProc = new Process();
+                eventWritingProc.StartInfo.FileName = stressPath.FullName;
+                eventWritingProc.StartInfo.Arguments = $"--threads {(threads == -1 ? cores.ToString() : threads.ToString())} --event-count {eventCount} --event-size {eventSize} --event-rate {eventRate} --burst-pattern {burstPattern} --duration {(int)durationTimeSpan.TotalSeconds}";
+                eventWritingProc.StartInfo.UseShellExecute = false;
+                eventWritingProc.StartInfo.RedirectStandardInput = true;
+                eventWritingProc.StartInfo.Environment["COMPlus_StressLog"] = "1";
+                eventWritingProc.StartInfo.Environment["COMPlus_LogFacility"] = "2000";
+                eventWritingProc.StartInfo.Environment["COMPlus_LogLevel"] = "8";
+                eventWritingProc.StartInfo.Environment["COMPlus_StressLogSize"] = "0x1000000";
+                eventWritingProc.Start();
+
+                Console.WriteLine($"Executing: {eventWritingProc.StartInfo.FileName} {eventWritingProc.StartInfo.Arguments}");
+                if (!RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
+                {
+                    // Set affinity and priority
+                    ulong affinityMask = 0;
+                    for (int j = 0; j < cores; j++)
+                    {
+                        affinityMask |= ((ulong)1 << j);
+                    }
+                    eventWritingProc.ProcessorAffinity = (IntPtr)((ulong)eventWritingProc.ProcessorAffinity & affinityMask);
+                    eventWritingProc.PriorityClass = ProcessPriorityClass.RealTime; // Set the process priority to highest possible
+                }
+
+                // Start listening to the event.
+                Task<TestResult> listenerTask = Task.Run(() => threadProc(eventWritingProc.Id), ct);
+
+                if (pause)
+                {
+                    Console.WriteLine("Press <enter> to start test");
+                    Console.ReadLine();
+                }
+
+                // start the target process
+                StreamWriter writer = eventWritingProc.StandardInput;
+                writer.WriteLine("\r\n");
+                eventWritingProc.WaitForExit();
+
+                var resultTuple = await listenerTask;
+                testResults.Add(resultTuple);
+
+                Console.WriteLine($"Done with iteration {iteration + 1}");
+                Console.WriteLine("========================================================");
+
+            }
+
+            Console.WriteLine(testResults.GenerateSummary());
+            Console.WriteLine(testResults.GenerateStatisticsTable());
+
+            return 0;
+        }
+    }
+}
diff --git a/src/tests/EventPipeStress/Orchestrator/TestResult.cs b/src/tests/EventPipeStress/Orchestrator/TestResult.cs
new file mode 100644 (file)
index 0000000..28f086b
--- /dev/null
@@ -0,0 +1,106 @@
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Orchestrator
+{
+    public record TestResult(long EventsRead, long EventsDropped, TimeSpan Duration)
+    {
+        public double ThroughputEfficiency => 100 * ((double)EventsRead / ((double)TotalEvents));
+        public double EventThroughput => (double)EventsRead / Duration.TotalSeconds;
+        public long TotalEvents => EventsRead + EventsDropped;
+    }
+
+    public record Stats(string Name, double Min, double Max, double Avg, double Std, string Units = null, int Resolution = 2)
+    {
+        public string FormattedName = $"{Name}{(Units != null ? $" ({Units})" : "")}";
+        public string TableRow => $"|{" " + FormattedName,-35}|{Min.ToString($"N{Resolution}"),20}|{Max.ToString($"N{Resolution}"),20}|{Avg.ToString($"N{Resolution}"),20}|{Std.ToString($"N{Resolution}"),20}|";
+        public static string Separator => $"|{new string('-',35),-35}|{new string('-',20),-20}|{new string('-',20),-20}|{new string('-',20),-20}|{new string('-',20),-20}|";
+        public static string Header => $"|{" stat",-35}|{" Min",-20}|{" Max",-20}|{" Average",-20}|{" Standard Deviation",-20}|";
+    }
+
+    public record StatGenerator(string Name, Func<TestResult,double> Calculation, string Units = null, int Resolution = 2)
+    {
+        public Stats GetStats(IEnumerable<TestResult> results) => results.GetStats(Calculation, Name, Units, Resolution);
+    }
+
+    public class TestResults : IEnumerable<TestResult>
+    {
+        private List<TestResult> _results = new List<TestResult>();
+        private int _eventSize;
+
+        public TestResults(int eventSize)
+        {
+            _eventSize = eventSize;
+        }
+
+        public void Add(TestResult result) =>
+            _results.Add(result);
+
+        public string GenerateSummary()
+        {
+            var sb = new StringBuilder();
+            sb.AppendLine("**** Summary ****");
+            int i = 0;
+            foreach (var result in _results)
+            {
+                sb.AppendLine($"iteration {i++ +1}: {result.EventsRead:N2} events collected, {result.EventsDropped:N2} events dropped in {result.Duration.TotalSeconds:N6} seconds - ({100 * ((double)result.EventsRead / (double)((long)result.EventsRead + result.EventsDropped)):N2}% throughput)");
+                sb.AppendLine($"\t({(double)result.EventsRead / result.Duration.TotalSeconds:N2} events/s) ({((double)result.EventsRead * _eventSize * sizeof(char)) / result.Duration.TotalSeconds:N2} bytes/s)");
+            }
+            return sb.ToString();
+        }
+
+        public string GenerateStatisticsTable()
+        {
+            var sb = new StringBuilder();
+
+            sb.AppendLine();
+            sb.AppendLine(Stats.Separator);
+            sb.AppendLine(Stats.Header);
+            sb.AppendLine(Stats.Separator);
+            foreach (var generator in StatGenerators)
+                sb.AppendLine(generator.GetStats(_results).TableRow);
+            sb.AppendLine(Stats.Separator);
+            return sb.ToString();
+        }
+
+        // Add statistics you want rendered in the results table here
+        private IEnumerable<StatGenerator> StatGenerators
+        {
+            get
+            {
+                yield return new StatGenerator("Events Read", result => result.EventsRead);
+                yield return new StatGenerator("Events Dropped", result => result.EventsDropped);
+                yield return new StatGenerator("Throughput Efficiency", result => result.ThroughputEfficiency, "%");
+                yield return new StatGenerator("Event Throughput", result => result.EventThroughput, "events/sec");
+                yield return new StatGenerator("Data Throughput", result => result.EventThroughput * sizeof(char) * _eventSize, "Bytes/sec");
+                yield return new StatGenerator("Duration", result => result.Duration.TotalSeconds, "seconds", 6);
+            }
+        }
+
+        public IEnumerator<TestResult> GetEnumerator()
+        {
+            return _results.GetEnumerator();
+        }
+
+        IEnumerator IEnumerable.GetEnumerator()
+        {
+            return _results.GetEnumerator();
+        }
+    }
+
+    public static class Extensions
+    {
+        public static Stats GetStats(this IEnumerable<TestResult> results, Func<TestResult, double> calculation, string name, string units = null, int resolution = 2)
+        {
+            double min = results.Min(calculation);
+            double max = results.Max(calculation);
+            double avg = results.Average(calculation);
+            double std = Math.Sqrt(results.Average(result => Math.Pow(calculation(result) - avg, 2)));
+
+            return new Stats(name, min, max, avg, std, units, resolution);
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/tests/EventPipeStress/README.md b/src/tests/EventPipeStress/README.md
new file mode 100644 (file)
index 0000000..57e91ff
--- /dev/null
@@ -0,0 +1,111 @@
+# EventPipe Stress
+
+You can use `Orchestrator` and `Stress` to run several stress test scenarios for EventPipe.
+
+These tools are meant for developers working on EventPipe in dotnet/runtime.
+
+## Orchestrator
+
+Help text:
+
+```
+$ Orchestrator -h
+Orchestrator:
+  EventPipe Stress Tester - Orchestrator
+
+Usage:
+  Orchestrator [options] <stress-path>
+
+Arguments:
+  <stress-path>    The location of the Stress executable.
+
+Options:
+  --event-size <event-size>                       The size of the event payload. The payload is a string, so the actual size will be eventSize * sizeof(char) where sizeof(char) is 2 Bytes due to Unicode in C#. [default: 100]
+  --event-rate <event-rate>                       The rate of events in events/sec. -1 means 'as fast as possible'. [default: -1]
+  --burst-pattern <BOLUS|DRIP|HEAVY_DRIP|NONE>    The burst pattern to send events in. [default: NONE]
+  --reader-type <EventPipeEventSource|Stream>     The method to read the stream of events. [default: Stream]
+  --slow-reader <slow-reader>                     <Only valid for EventPipeEventSource reader> Delay every read by this many milliseconds. [default: 0]
+  --duration <duration>                           The number of seconds to send events for. [default: 60]
+  --cores <cores>                                 The number of logical cores to restrict the writing process to. [default: 8]
+  --threads <threads>                             The number of threads writing events. [default: 1]
+  --event-count <event-count>                     The total number of events to write per thread. -1 means no limit [default: -1]
+  --rundown                                       Should the EventPipe session request rundown events? [default: True]
+  --buffer-size <buffer-size>                     The size of the buffer requested in the EventPipe session [default: 256]
+  --iterations <iterations>                       The number of times to run the test. [default: 1]
+  --pause                                         Should the orchestrator pause before starting each test phase for a debugger to attach? [default: False]
+  --version                                       Show version information
+  -?, -h, --help                                  Show help and usage information
+```
+
+## Stress
+
+Help text:
+
+```
+$ Stress -h
+Stress:
+  EventPipe Stress Tester - Stress
+
+Usage:
+  Stress [options]
+
+Options:
+  --event-size <event-size>                       The size of the event payload. The payload is a string, so the actual size will be eventSize * sizeof(char) where sizeof(char) is 2 Bytes due to Unicode in C#. [default: 100]
+  --event-rate <event-rate>                       The rate of events in events/sec. -1 means 'as fast as possible'. [default: -1]
+  --burst-pattern <BOLUS|DRIP|HEAVY_DRIP|NONE>    The burst pattern to send events in. [default: NONE]
+  --duration <duration>                           The number of seconds to send events for. [default: 60]
+  --threads <threads>                             The number of threads writing events. [default: 1]
+  --event-count <event-count>                     The total number of events to write per thread. -1 means no limit [default: -1]
+  --version                                       Show version information
+  -?, -h, --help                                  Show help and usage information
+```
+
+## Usage
+
+### Prerequisites
+
+1. Build `Orchestrator`
+2. Publish `Stress` as a self contained application (`dotnet publish -r <RID> --self-contained`)
+3. (optional) Copy over the runtime bits in the `Stress` publish location to test private runtime builds
+
+### Basic Scenarios
+
+1. Send as many events as possible in `N` seconds
+
+`Orchestrate <path-to-Stress> --duration <N> --iterations 100`
+
+2. Send `N` events as fast as possible
+
+`Orchestrate <path-to-Stress> --event-count <N> --iterations 100`
+
+3. Send `N` events in a burst pattern of `M` events/sec
+
+`Orchestrate <path-to-Stress> --event-count <N> --event-rate <M> --burst-pattern bolus --iterations 100`
+
+### Sample Output
+
+```
+**** Summary ****
+iteration 1: 102,678.00 events collected, 0.00 events dropped in 0.283581 seconds - (100.00% throughput)
+        (362,076.44 events/s) (181,038,221.88 bytes/s)
+iteration 2: 102,678.00 events collected, 0.00 events dropped in 0.634398 seconds - (100.00% throughput)
+        (161,851.05 events/s) (80,925,526.10 bytes/s)
+iteration 3: 102,678.00 events collected, 0.00 events dropped in 0.652566 seconds - (100.00% throughput)
+        (157,344.96 events/s) (78,672,477.98 bytes/s)
+iteration 4: 102,678.00 events collected, 0.00 events dropped in 0.661910 seconds - (100.00% throughput)
+        (155,123.83 events/s) (77,561,915.90 bytes/s)
+iteration 5: 102,678.00 events collected, 0.00 events dropped in 0.632966 seconds - (100.00% throughput)
+        (162,217.35 events/s) (81,108,673.20 bytes/s)
+
+
+|-----------------------------------|--------------------|--------------------|--------------------|--------------------|
+| stat                              | Min                | Max                | Average            | Standard Deviation |
+|-----------------------------------|--------------------|--------------------|--------------------|--------------------|
+| Events Read                       |          102,678.00|          102,678.00|          102,678.00|                0.00|
+| Events Dropped                    |                0.00|                0.00|                0.00|                0.00|
+| Throughput Efficiency (%)         |              100.00|              100.00|              100.00|                0.00|
+| Event Throughput (events/sec)     |          155,123.83|          362,076.44|          199,722.73|           81,221.41|
+| Data Throughput (Bytes/sec)       |       77,561,915.90|      181,038,221.88|       99,861,363.01|       40,610,702.78|
+| Duration (seconds)                |            0.283581|            0.661910|            0.573084|            0.145165|
+|-----------------------------------|--------------------|--------------------|--------------------|--------------------|
+```
\ No newline at end of file
diff --git a/src/tests/EventPipeStress/Stress/Program.cs b/src/tests/EventPipeStress/Stress/Program.cs
new file mode 100644 (file)
index 0000000..d5289f0
--- /dev/null
@@ -0,0 +1,114 @@
+using System;
+using System.CommandLine;
+using System.CommandLine.Builder;
+using System.CommandLine.Invocation;
+using System.CommandLine.Parsing;
+using System.Diagnostics.Tracing;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Common;
+
+namespace Stress
+{
+    class MySource : EventSource
+    {
+        public static MySource Log = new MySource();
+        public static string s_SmallPayload = new String('a', 100);
+        public static string s_BigPayload = new String('a', 10000);
+        public static string s_Payload = new String('a', 100);
+
+        public void FireSmallEvent() { WriteEvent(1, s_SmallPayload); }
+        public void FireBigEvent() { WriteEvent(1, s_BigPayload); }
+        public void FireEvent() => WriteEvent(1, s_Payload);
+    }
+
+    class Program
+    {
+        private static bool finished = false;
+        private static int eventRate = -1;
+        private static BurstPattern burstPattern = BurstPattern.NONE;
+        private static Action threadProc = null;
+        private static Action makeThreadProc(int eventCount)
+        {
+            Func<long> burst = BurstPatternMethods.Burst(burstPattern, eventRate, MySource.Log.FireEvent, BurstPatternMethods.BusySleepAction);
+            if (eventCount != -1)
+            {
+                return () => { 
+                    long messagesSent = 0; 
+                    while (!finished && messagesSent < eventCount)
+                        messagesSent += burst();
+                };
+            }
+            else
+                return () => { while (!finished) { burst(); } };
+        }
+
+        private delegate Task<int> RootCommandHandler(IConsole console, CancellationToken ct, int eventSize, int eventRate, BurstPattern burstPattern, int threads, int duration, int eventCount);
+
+        private static CommandLineBuilder BuildCommandLine()
+        {
+            var rootCommand = new RootCommand("EventPipe Stress Tester - Stress")
+            {
+                CommandLineOptions.EventSizeOption,
+                CommandLineOptions.EventRateOption,
+                CommandLineOptions.BurstPatternOption,
+                CommandLineOptions.DurationOption,
+                CommandLineOptions.ThreadsOption,
+                CommandLineOptions.EventCountOption
+            };
+
+
+            rootCommand.Handler = CommandHandler.Create((RootCommandHandler)Run);
+            return new CommandLineBuilder(rootCommand);
+        }
+
+        static async Task<int> Main(string[] args)
+        {
+            return await BuildCommandLine()
+                .UseDefaults()
+                .Build()
+                .InvokeAsync(args);
+        }
+
+        private static async Task<int> Run(IConsole console, CancellationToken ct, int eventSize, int eventRate, BurstPattern burstPattern, int threads, int duration, int eventCount)
+        {
+            TimeSpan durationTimeSpan = TimeSpan.FromSeconds(duration);
+
+            MySource.s_Payload = new String('a', eventSize);
+
+            threadProc = makeThreadProc(eventCount);
+
+            Thread[] threadArray = new Thread[threads];
+            TaskCompletionSource<bool>[] tcsArray = new TaskCompletionSource<bool>[threads];
+
+            for (int i = 0; i < threads; i++)
+            {
+                var tcs = new TaskCompletionSource<bool>();
+                threadArray[i] = new Thread(() => { threadProc(); tcs.TrySetResult(true); });
+                tcsArray[i] = tcs;
+            }
+
+            Console.WriteLine($"SUBPROCESSS :: Running - Threads: {threads}, EventSize: {eventSize * sizeof(char):N} bytes, EventCount: {(eventCount == -1 ? -1 : eventCount * threads)}, EventRate: {(eventRate == -1 ? -1 : eventRate * threads)} events/sec, duration: {durationTimeSpan.TotalSeconds}s");
+            Console.ReadLine();
+
+            for (int i = 0; i < threads; i++)
+            {
+                threadArray[i].Start();
+            }
+            
+            if (eventCount != -1)
+                Console.WriteLine($"SUBPROCESSS :: Sleeping for {durationTimeSpan.TotalSeconds} seconds or until {eventCount} events have been sent on each thread, whichever happens first");
+            else
+                Console.WriteLine($"SUBPROCESSS :: Sleeping for {durationTimeSpan.TotalSeconds} seconds");
+
+            Task threadCompletionTask = Task.WhenAll(tcsArray.Select(tcs => tcs.Task));
+            Task result = await Task.WhenAny(Task.Delay(durationTimeSpan), threadCompletionTask);
+            finished = true;
+
+            await threadCompletionTask;
+            Console.WriteLine("SUBPROCESSS :: Done. Goodbye!");
+            return 0;
+        }
+    }
+}
diff --git a/src/tests/EventPipeStress/Stress/Stress.csproj b/src/tests/EventPipeStress/Stress/Stress.csproj
new file mode 100644 (file)
index 0000000..1f79797
--- /dev/null
@@ -0,0 +1,13 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>net5.0</TargetFramework>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="../Common/Common.csproj" />
+    <PackageReference Include="System.CommandLine" Version="2.0.0-beta1.20574.7" />
+  </ItemGroup>
+
+</Project>