[RateLimiting] Dequeue items when queuing with NewestFirst (#63377)
authorBrennan <brecon@microsoft.com>
Wed, 19 Jan 2022 19:01:24 +0000 (11:01 -0800)
committerGitHub <noreply@github.com>
Wed, 19 Jan 2022 19:01:24 +0000 (11:01 -0800)
src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs
src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs
src/libraries/System.Threading.RateLimiting/tests/BaseRateLimiterTests.cs
src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs
src/libraries/System.Threading.RateLimiting/tests/TokenBucketRateLimiterTests.cs

index 4ef7a3b..74ef7ec 100644 (file)
@@ -95,10 +95,27 @@ namespace System.Threading.RateLimiting
                     return new ValueTask<RateLimitLease>(lease);
                 }
 
-                // Don't queue if queue limit reached
-                if (_queueCount + permitCount > _options.QueueLimit)
+                // Avoid integer overflow by using subtraction instead of addition
+                Debug.Assert(_options.QueueLimit >= _queueCount);
+                if (_options.QueueLimit - _queueCount < permitCount)
                 {
-                    return new ValueTask<RateLimitLease>(QueueLimitLease);
+                    if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && permitCount <= _options.QueueLimit)
+                    {
+                        // remove oldest items from queue until there is space for the newest request
+                        do
+                        {
+                            RequestRegistration oldestRequest = _queue.DequeueHead();
+                            _queueCount -= oldestRequest.Count;
+                            Debug.Assert(_queueCount >= 0);
+                            oldestRequest.Tcs.TrySetResult(FailedLease);
+                        }
+                        while (_options.QueueLimit - _queueCount < permitCount);
+                    }
+                    else
+                    {
+                        // Don't queue if queue limit reached and QueueProcessingOrder is OldestFirst
+                        return new ValueTask<RateLimitLease>(QueueLimitLease);
+                    }
                 }
 
                 TaskCompletionSource<RateLimitLease> tcs = new TaskCompletionSource<RateLimitLease>(TaskCreationOptions.RunContinuationsAsynchronously);
index bb1ec82..6593d89 100644 (file)
@@ -101,10 +101,27 @@ namespace System.Threading.RateLimiting
                     return new ValueTask<RateLimitLease>(lease);
                 }
 
-                // Don't queue if queue limit reached
-                if (_queueCount + tokenCount > _options.QueueLimit)
+                // Avoid integer overflow by using subtraction instead of addition
+                Debug.Assert(_options.QueueLimit >= _queueCount);
+                if (_options.QueueLimit - _queueCount < tokenCount)
                 {
-                    return new ValueTask<RateLimitLease>(CreateFailedTokenLease(tokenCount));
+                    if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && tokenCount <= _options.QueueLimit)
+                    {
+                        // remove oldest items from queue until there is space for the newest acquisition request
+                        do
+                        {
+                            RequestRegistration oldestRequest = _queue.DequeueHead();
+                            _queueCount -= oldestRequest.Count;
+                            Debug.Assert(_queueCount >= 0);
+                            oldestRequest.Tcs.TrySetResult(FailedLease);
+                        }
+                        while (_options.QueueLimit - _queueCount < tokenCount);
+                    }
+                    else
+                    {
+                        // Don't queue if queue limit reached and QueueProcessingOrder is OldestFirst
+                        return new ValueTask<RateLimitLease>(CreateFailedTokenLease(tokenCount));
+                    }
                 }
 
                 TaskCompletionSource<RateLimitLease> tcs = new TaskCompletionSource<RateLimitLease>(TaskCreationOptions.RunContinuationsAsynchronously);
index 9d98a51..a96dc0b 100644 (file)
@@ -24,12 +24,24 @@ namespace System.Threading.RateLimiting.Test
         public abstract Task CanAcquireResourceAsync_QueuesAndGrabsNewest();
 
         [Fact]
-        public abstract Task FailsWhenQueuingMoreThanLimit();
+        public abstract Task FailsWhenQueuingMoreThanLimit_OldestFirst();
+
+        [Fact]
+        public abstract Task DropsOldestWhenQueuingMoreThanLimit_NewestFirst();
+
+        [Fact]
+        public abstract Task DropsMultipleOldestWhenQueuingMoreThanLimit_NewestFirst();
+
+        [Fact]
+        public abstract Task DropsRequestedLeaseIfPermitCountGreaterThanQueueLimitAndNoAvailability_NewestFirst();
 
         [Fact]
         public abstract Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable();
 
         [Fact]
+        public abstract Task LargeAcquiresAndQueuesDoNotIntegerOverflow();
+
+        [Fact]
         public abstract void ThrowsWhenAcquiringMoreThanLimit();
 
         [Fact]
index 22658e0..da041ac 100644 (file)
@@ -94,9 +94,9 @@ namespace System.Threading.RateLimiting.Test
         }
 
         [Fact]
-        public override async Task FailsWhenQueuingMoreThanLimit()
+        public override async Task FailsWhenQueuingMoreThanLimit_OldestFirst()
         {
-            var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
+            var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1));
             using var lease = limiter.Acquire(1);
             var wait = limiter.WaitAsync(1);
 
@@ -105,11 +105,96 @@ namespace System.Threading.RateLimiting.Test
         }
 
         [Fact]
-        public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable()
+        public override async Task DropsOldestWhenQueuingMoreThanLimit_NewestFirst()
         {
             var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
             var lease = limiter.Acquire(1);
             var wait = limiter.WaitAsync(1);
+            Assert.False(wait.IsCompleted);
+
+            var wait2 = limiter.WaitAsync(1);
+            var lease1 = await wait;
+            Assert.False(lease1.IsAcquired);
+            Assert.False(wait2.IsCompleted);
+
+            lease.Dispose();
+
+            lease = await wait2;
+            Assert.True(lease.IsAcquired);
+        }
+
+        [Fact]
+        public override async Task DropsMultipleOldestWhenQueuingMoreThanLimit_NewestFirst()
+        {
+            var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2));
+            var lease = limiter.Acquire(2);
+            Assert.True(lease.IsAcquired);
+            var wait = limiter.WaitAsync(1);
+            Assert.False(wait.IsCompleted);
+
+            var wait2 = limiter.WaitAsync(1);
+            Assert.False(wait2.IsCompleted);
+
+            var wait3 = limiter.WaitAsync(2);
+            var lease1 = await wait;
+            var lease2 = await wait2;
+            Assert.False(lease1.IsAcquired);
+            Assert.False(lease2.IsAcquired);
+            Assert.False(wait3.IsCompleted);
+
+            lease.Dispose();
+
+            lease = await wait3;
+            Assert.True(lease.IsAcquired);
+        }
+
+        [Fact]
+        public override async Task DropsRequestedLeaseIfPermitCountGreaterThanQueueLimitAndNoAvailability_NewestFirst()
+        {
+            var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(2, QueueProcessingOrder.NewestFirst, 1));
+            var lease = limiter.Acquire(2);
+            Assert.True(lease.IsAcquired);
+
+            // Fill queue
+            var wait = limiter.WaitAsync(1);
+            Assert.False(wait.IsCompleted);
+
+            var lease1 = await limiter.WaitAsync(2);
+            Assert.False(lease1.IsAcquired);
+
+            lease.Dispose();
+            var lease2 = await wait;
+            Assert.True(lease2.IsAcquired);
+        }
+
+        [Fact]
+        public override async Task LargeAcquiresAndQueuesDoNotIntegerOverflow()
+        {
+            var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(int.MaxValue, QueueProcessingOrder.NewestFirst, int.MaxValue));
+            var lease = limiter.Acquire(int.MaxValue);
+            Assert.True(lease.IsAcquired);
+
+            // Fill queue
+            var wait = limiter.WaitAsync(3);
+            Assert.False(wait.IsCompleted);
+
+            var wait2 = limiter.WaitAsync(int.MaxValue);
+            Assert.False(wait2.IsCompleted);
+
+            var lease1 = await wait;
+            Assert.False(lease1.IsAcquired);
+
+            lease.Dispose();
+            var lease2 = await wait2;
+            Assert.True(lease2.IsAcquired);
+        }
+
+        [Fact]
+        public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable()
+        {
+            var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1));
+            var lease = limiter.Acquire(1);
+            var wait = limiter.WaitAsync(1);
 
             var failedLease = await limiter.WaitAsync(1);
             Assert.False(failedLease.IsAcquired);
index edf05bf..594bb79 100644 (file)
@@ -111,9 +111,9 @@ namespace System.Threading.RateLimiting.Test
         }
 
         [Fact]
-        public override async Task FailsWhenQueuingMoreThanLimit()
+        public override async Task FailsWhenQueuingMoreThanLimit_OldestFirst()
         {
-            var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+            var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
                 TimeSpan.Zero, 1, autoReplenishment: false));
             using var lease = limiter.Acquire(1);
             var wait = limiter.WaitAsync(1);
@@ -125,9 +125,77 @@ namespace System.Threading.RateLimiting.Test
         }
 
         [Fact]
-        public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable()
+        public override async Task DropsOldestWhenQueuingMoreThanLimit_NewestFirst()
         {
             var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
+                   TimeSpan.Zero, 1, autoReplenishment: false));
+            var lease = limiter.Acquire(1);
+            var wait = limiter.WaitAsync(1);
+            Assert.False(wait.IsCompleted);
+
+            var wait2 = limiter.WaitAsync(1);
+            var lease1 = await wait;
+            Assert.False(lease1.IsAcquired);
+            Assert.False(wait2.IsCompleted);
+
+            limiter.TryReplenish();
+
+            lease = await wait2;
+            Assert.True(lease.IsAcquired);
+        }
+
+        [Fact]
+        public override async Task DropsMultipleOldestWhenQueuingMoreThanLimit_NewestFirst()
+        {
+            var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2,
+                   TimeSpan.Zero, 1, autoReplenishment: false));
+            var lease = limiter.Acquire(2);
+            Assert.True(lease.IsAcquired);
+            var wait = limiter.WaitAsync(1);
+            Assert.False(wait.IsCompleted);
+
+            var wait2 = limiter.WaitAsync(1);
+            Assert.False(wait2.IsCompleted);
+
+            var wait3 = limiter.WaitAsync(2);
+            var lease1 = await wait;
+            var lease2 = await wait2;
+            Assert.False(lease1.IsAcquired);
+            Assert.False(lease2.IsAcquired);
+            Assert.False(wait3.IsCompleted);
+
+            limiter.TryReplenish();
+            limiter.TryReplenish();
+
+            lease = await wait3;
+            Assert.True(lease.IsAcquired);
+        }
+
+        [Fact]
+        public override async Task DropsRequestedLeaseIfPermitCountGreaterThanQueueLimitAndNoAvailability_NewestFirst()
+        {
+            var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 1,
+                   TimeSpan.Zero, 1, autoReplenishment: false));
+            var lease = limiter.Acquire(2);
+            Assert.True(lease.IsAcquired);
+
+            // Fill queue
+            var wait = limiter.WaitAsync(1);
+            Assert.False(wait.IsCompleted);
+
+            var lease1 = await limiter.WaitAsync(2);
+            Assert.False(lease1.IsAcquired);
+
+            limiter.TryReplenish();
+
+            lease = await wait;
+            Assert.True(lease.IsAcquired);
+        }
+
+        [Fact]
+        public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable()
+        {
+            var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
                 TimeSpan.Zero, 1, autoReplenishment: false));
             var lease = limiter.Acquire(1);
             var wait = limiter.WaitAsync(1);
@@ -148,6 +216,29 @@ namespace System.Threading.RateLimiting.Test
         }
 
         [Fact]
+        public override async Task LargeAcquiresAndQueuesDoNotIntegerOverflow()
+        {
+            var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(int.MaxValue, QueueProcessingOrder.NewestFirst, int.MaxValue,
+                TimeSpan.Zero, int.MaxValue, autoReplenishment: false));
+            var lease = limiter.Acquire(int.MaxValue);
+            Assert.True(lease.IsAcquired);
+
+            // Fill queue
+            var wait = limiter.WaitAsync(3);
+            Assert.False(wait.IsCompleted);
+
+            var wait2 = limiter.WaitAsync(int.MaxValue);
+            Assert.False(wait2.IsCompleted);
+
+            var lease1 = await wait;
+            Assert.False(lease1.IsAcquired);
+
+            limiter.TryReplenish();
+            var lease2 = await wait2;
+            Assert.True(lease2.IsAcquired);
+        }
+
+        [Fact]
         public override void ThrowsWhenAcquiringMoreThanLimit()
         {
             var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,