Use a pooled buffer in BrotliStream (dotnet/corefx#35492)
authorStephen Toub <stoub@microsoft.com>
Fri, 22 Feb 2019 03:16:04 +0000 (22:16 -0500)
committerGitHub <noreply@github.com>
Fri, 22 Feb 2019 03:16:04 +0000 (22:16 -0500)
* Use a pooled buffer in BrotliStream

* Address PR feedback

Commit migrated from https://github.com/dotnet/corefx/commit/bca44089343cf14cffd3cb0982b7e209692f6731

src/libraries/Common/tests/System/IO/Compression/CompressionStreamUnitTestBase.cs
src/libraries/System.IO.Compression.Brotli/src/System/IO/Compression/BrotliStream.cs
src/libraries/System.IO.Compression.Brotli/tests/CompressionStreamUnitTests.Brotli.cs

index 227b636..db45a36 100644 (file)
@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information.
 
 using System.Collections.Generic;
+using System.Linq;
 using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
@@ -1225,6 +1226,35 @@ namespace System.IO.Compression
             else
                 baseStream.Read(bytes, 0, size);
         }
+
+        [Fact]
+        public async Task Parallel_CompressDecompressMultipleStreamsConcurrently()
+        {
+            const int ParallelOperations = 20;
+            const int DataSize = 10 * 1024;
+
+            var sourceData = new byte[DataSize];
+            new Random().NextBytes(sourceData);
+
+            await Task.WhenAll(Enumerable.Range(0, ParallelOperations).Select(_ => Task.Run(async () =>
+            {
+                var compressedStream = new MemoryStream();
+                using (Stream ds = CreateStream(compressedStream, CompressionMode.Compress, leaveOpen: true))
+                {
+                    await ds.WriteAsync(sourceData, 0, sourceData.Length);
+                }
+
+                compressedStream.Position = 0;
+
+                var decompressedStream = new MemoryStream();
+                using (Stream ds = CreateStream(compressedStream, CompressionMode.Decompress, leaveOpen: true))
+                {
+                    await ds.CopyToAsync(decompressedStream);
+                }
+
+                Assert.Equal(sourceData, decompressedStream.ToArray());
+            })));
+        }
     }
 
     internal sealed class BadWrappedStream : MemoryStream
index 0cbe424..b878204 100644 (file)
@@ -2,8 +2,8 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
+using System.Buffers;
 using System.Diagnostics;
-using System.Runtime.CompilerServices;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -13,7 +13,7 @@ namespace System.IO.Compression
     {
         private const int DefaultInternalBufferSize = (1 << 16) - 16; //65520;
         private Stream _stream;
-        private readonly byte[] _buffer;
+        private byte[] _buffer;
         private readonly bool _leaveOpen;
         private readonly CompressionMode _mode;
 
@@ -40,13 +40,13 @@ namespace System.IO.Compression
             _mode = mode;
             _stream = stream;
             _leaveOpen = leaveOpen;
-            _buffer = new byte[DefaultInternalBufferSize];
+            _buffer = ArrayPool<byte>.Shared.Rent(DefaultInternalBufferSize);
         }
 
         private void EnsureNotDisposed()
         {
             if (_stream == null)
-                throw new ObjectDisposedException("stream", SR.ObjectDisposed_StreamClosed);
+                throw new ObjectDisposedException(GetType().Name, SR.ObjectDisposed_StreamClosed);
         }
 
         protected override void Dispose(bool disposing)
@@ -68,9 +68,7 @@ namespace System.IO.Compression
             }
             finally
             {
-                _stream = null;
-                _encoder.Dispose();
-                _decoder.Dispose();
+                ReleaseStateForDispose();
                 base.Dispose(disposing);
             }
         }
@@ -94,9 +92,24 @@ namespace System.IO.Compression
             }
             finally
             {
-                _stream = null;
-                _encoder.Dispose();
-                _decoder.Dispose();
+                ReleaseStateForDispose();
+            }
+        }
+
+        private void ReleaseStateForDispose()
+        {
+            _stream = null;
+            _encoder.Dispose();
+            _decoder.Dispose();
+
+            byte[] buffer = _buffer;
+            if (buffer != null)
+            {
+                _buffer = null;
+                if (!AsyncOperationIsActive)
+                {
+                    ArrayPool<byte>.Shared.Return(buffer);
+                }
             }
         }
 
index 38befdd..52f2d50 100644 (file)
@@ -15,7 +15,13 @@ namespace System.IO.Compression
         public override Stream CreateStream(Stream stream, CompressionLevel level) => new BrotliStream(stream, level);
         public override Stream CreateStream(Stream stream, CompressionLevel level, bool leaveOpen) => new BrotliStream(stream, level, leaveOpen);
         public override Stream BaseStream(Stream stream) => ((BrotliStream)stream).BaseStream;
-        public override int BufferSize { get => 65520; }
+
+        // The tests are relying on an implementation detail of BrotliStream, using knowledge of its internal buffer size
+        // in various test calculations.  Currently the implementation is using the ArrayPool, which will round up to a
+        // power-of-2. If the buffer size employed changes (which could also mean that ArrayPool<byte>.Shared starts giving
+        // out different array sizes), the tests will need to be tweaked.
+        public override int BufferSize => 1 << 16;
+
         protected override string CompressedTestFile(string uncompressedPath) => Path.Combine("BrotliTestData", Path.GetFileName(uncompressedPath) + ".br");
 
         [Fact]