Add dedicated wasm implementation for Parallel.Invoke and Parallel.For (#57283)
authorKatelyn Gadd <kg@luminance.org>
Tue, 17 Aug 2021 16:40:02 +0000 (09:40 -0700)
committerGitHub <noreply@github.com>
Tue, 17 Aug 2021 16:40:02 +0000 (09:40 -0700)
Introduces a simple single-threaded implementation for Parallel.Invoke, .For, and .ForEach that bypasses Task and wait primitives so that in browser environments parallel loop operations complete synchronously on the current thread

src/libraries/System.Private.Runtime.InteropServices.JavaScript/tests/System.Private.Runtime.InteropServices.JavaScript.Tests.csproj
src/libraries/System.Private.Runtime.InteropServices.JavaScript/tests/System/Runtime/InteropServices/JavaScript/ParallelTests.cs [new file with mode: 0644]
src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.cs
src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/TaskReplicator.cs
src/libraries/System.Threading.Tasks.Parallel/tests/BreakTests.cs
src/libraries/System.Threading.Tasks.Parallel/tests/ParallelFor.cs

index c38d7c3..fe9348f 100644 (file)
@@ -16,6 +16,7 @@
     <Compile Include="System\Runtime\InteropServices\JavaScript\DelegateTests.cs" />
     <Compile Include="System\Runtime\InteropServices\JavaScript\HelperMarshal.cs" />
     <Compile Include="System\Runtime\InteropServices\JavaScript\Http\HttpRequestMessageTest.cs" />
+    <Compile Include="System\Runtime\InteropServices\JavaScript\ParallelTests.cs" />
   </ItemGroup>
   <ItemGroup>
     <!-- Part of the shared framework but not exposed. -->
diff --git a/src/libraries/System.Private.Runtime.InteropServices.JavaScript/tests/System/Runtime/InteropServices/JavaScript/ParallelTests.cs b/src/libraries/System.Private.Runtime.InteropServices.JavaScript/tests/System/Runtime/InteropServices/JavaScript/ParallelTests.cs
new file mode 100644 (file)
index 0000000..c74b84b
--- /dev/null
@@ -0,0 +1,70 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Collections;
+using System.Collections.Generic;
+using System.Runtime.InteropServices.JavaScript;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace System.Runtime.InteropServices.JavaScript.Tests
+{
+    public static class ParallelTests
+    {
+        // The behavior of APIs like Invoke depends on how many items they are asked to invoke
+        [Theory]
+        [InlineData(0)]
+        [InlineData(1)]
+        [InlineData(2)]
+        [InlineData(5)]
+        [InlineData(32)]
+        [InlineData(250)]
+        public static void ParallelInvokeActionArray(int count)
+        {
+            var actions = new List<Action>();
+            int sum = 0, expected = 0;
+            for (int i = 0; i < count; i++) {
+                int j = i;
+                actions.Add(() => {
+                    sum += j;
+                });
+                expected += j;
+            }
+
+            Parallel.Invoke(actions.ToArray());
+            Assert.Equal(expected, sum);
+        }
+
+        [Theory]
+        [InlineData(0)]
+        [InlineData(1)]
+        [InlineData(32)]
+        [InlineData(250)]
+        public static void ParallelFor(int count)
+        {
+            int sum = 0, expected = 0;
+            for (int i = 0; i < count; i++)
+                expected += i;
+            Parallel.For(0, count, (i) => { sum += i; });
+            Assert.Equal(expected, sum);
+        }
+
+        [Theory]
+        [InlineData(0)]
+        [InlineData(1)]
+        [InlineData(32)]
+        [InlineData(250)]
+        public static void ParallelForEach(int count)
+        {
+            int sum = 0, expected = 0;
+            var items = new List<int>();
+            for (int i = 0; i < count; i++) {
+                items.Add(i);
+                expected += i;
+            }
+            Parallel.ForEach(items, (i) => { sum += i; });
+            Assert.Equal(expected, sum);
+        }
+    }
+}
\ No newline at end of file
index cffded0..1b81cfb 100644 (file)
@@ -243,9 +243,12 @@ namespace System.Threading.Tasks
             {
                 // If we've gotten this far, it's time to process the actions.
 
-                // This is more efficient for a large number of actions, or for enforcing MaxDegreeOfParallelism:
-                if ((actionsCopy.Length > SMALL_ACTIONCOUNT_LIMIT) ||
-                     (parallelOptions.MaxDegreeOfParallelism != -1 && parallelOptions.MaxDegreeOfParallelism < actionsCopy.Length))
+                // Web browsers need special treatment that is implemented in TaskReplicator
+                if (OperatingSystem.IsBrowser() ||
+                    // This is more efficient for a large number of actions, or for enforcing MaxDegreeOfParallelism:
+                    (actionsCopy.Length > SMALL_ACTIONCOUNT_LIMIT) ||
+                    (parallelOptions.MaxDegreeOfParallelism != -1 && parallelOptions.MaxDegreeOfParallelism < actionsCopy.Length)
+                )
                 {
                     // Used to hold any exceptions encountered during action processing
                     ConcurrentQueue<Exception>? exceptionQ = null; // will be lazily initialized if necessary
index c10e9c8..a84113b 100644 (file)
@@ -131,17 +131,29 @@ namespace System.Threading.Tasks
 
         public static void Run<TState>(ReplicatableUserAction<TState> action, ParallelOptions options, bool stopOnFirstFailure)
         {
-            int maxConcurrencyLevel = (options.EffectiveMaxConcurrencyLevel > 0) ? options.EffectiveMaxConcurrencyLevel : int.MaxValue;
-
-            TaskReplicator replicator = new TaskReplicator(options, stopOnFirstFailure);
-            new Replica<TState>(replicator, maxConcurrencyLevel, CooperativeMultitaskingTaskTimeout_RootTask, action).Start();
-
-            Replica? nextReplica;
-            while (replicator._pendingReplicas.TryDequeue(out nextReplica))
-                nextReplica.Wait();
-
-            if (replicator._exceptions != null)
-                throw new AggregateException(replicator._exceptions);
+            // Browser hosts do not support synchronous Wait so we want to run the
+            //  replicated task directly instead of going through Task infrastructure
+            if (OperatingSystem.IsBrowser()) {
+                // Since we are running on a single thread, we don't want the action to time out
+                var timeout = int.MaxValue - 1;
+                var state = default(TState)!;
+
+                action(ref state, timeout, out bool yieldedBeforeCompletion);
+                if (yieldedBeforeCompletion)
+                    throw new Exception("Replicated tasks cannot yield in this single-threaded browser environment");
+            } else {
+                int maxConcurrencyLevel = (options.EffectiveMaxConcurrencyLevel > 0) ? options.EffectiveMaxConcurrencyLevel : int.MaxValue;
+
+                TaskReplicator replicator = new TaskReplicator(options, stopOnFirstFailure);
+                new Replica<TState>(replicator, maxConcurrencyLevel, CooperativeMultitaskingTaskTimeout_RootTask, action).Start();
+
+                Replica? nextReplica;
+                while (replicator._pendingReplicas.TryDequeue(out nextReplica))
+                    nextReplica.Wait();
+
+                if (replicator._exceptions != null)
+                    throw new AggregateException(replicator._exceptions);
+            }
         }
 
 
index 91f1489..526f944 100644 (file)
@@ -10,7 +10,7 @@ namespace System.Threading.Tasks.Tests
 {
     public static class BreakTests
     {
-        [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        [Theory]
         [InlineData(100, 10)]
         [InlineData(100, 20)]
         [InlineData(1000, 100)]
@@ -46,7 +46,7 @@ namespace System.Threading.Tasks.Tests
             Assert.True(result, "TestForBreak:  Failed: Could not detect any interruption of For-loop.");
         }
 
-        [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        [Theory]
         [InlineData(100, 10)]
         [InlineData(100, 20)]
         [InlineData(1000, 100)]
@@ -86,7 +86,7 @@ namespace System.Threading.Tasks.Tests
             Assert.True(result, "TestFor64Break:  Failed: Could not detect any interruption of For-loop.");
         }
 
-        [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        [Theory]
         [InlineData(500, 10)]
         [InlineData(500, 20)]
         [InlineData(1000, 100)]
index b8a97b9..34bc14d 100644 (file)
@@ -7,7 +7,7 @@ namespace System.Threading.Tasks.Tests
 {
     public static class ParallelForUnitTests
     {
-        [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
+        [Theory]
         [InlineData(API.For64, StartIndexBase.Int32, 0, WithParallelOption.None, ActionWithState.None, ActionWithLocal.None)]
         [InlineData(API.For64, StartIndexBase.Int32, 10, WithParallelOption.None, ActionWithState.Stop, ActionWithLocal.HasFinally)]
         [InlineData(API.For64, StartIndexBase.Int32, 10, WithParallelOption.WithDOP, ActionWithState.None, ActionWithLocal.None)]