Add read ahead logic for streams. (dotnet/corefx#38039)
authorJeremy Kuhne <jkuhne@microsoft.com>
Fri, 31 May 2019 05:00:36 +0000 (22:00 -0700)
committerGitHub <noreply@github.com>
Fri, 31 May 2019 05:00:36 +0000 (22:00 -0700)
* Add read ahead logic for streams.

When reading a jaon object or array from a stream into an object we need to TrySkip to ensure that we have all the needed data for the JsonDocument to create a JsonElement. This is only necessary  if we haven't already drained the stream.

* Track state correctly

We need to track consumed separately so we can requeue the reader properly after skipping. Add more stream tests.

* Clarify comments and other feedback.

* Fix comment

Commit migrated from https://github.com/dotnet/corefx/commit/f6b010d2a0bdab94953d519a60d882ff805eea36

src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Read.Stream.cs
src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Read.cs
src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializerOptions.cs
src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ReadStack.cs
src/libraries/System.Text.Json/tests/Serialization/JsonElementTests.cs
src/libraries/System.Text.Json/tests/Serialization/SpanTests.cs

index 19fdc98..61dc05e 100644 (file)
@@ -83,8 +83,8 @@ namespace System.Text.Json.Serialization
                 options = JsonSerializerOptions.s_defaultOptions;
             }
 
-            ReadStack state = default;
-            state.Current.Initialize(returnType, options);
+            ReadStack readStack = default;
+            readStack.Current.Initialize(returnType, options);
 
             var readerState = new JsonReaderState(options.GetReaderOptions());
 
@@ -138,10 +138,11 @@ namespace System.Text.Json.Serialization
                         isFinalBlock,
                         new Span<byte>(buffer, 0, bytesInBuffer),
                         options,
-                        ref state);
+                        ref readStack);
+
+                    Debug.Assert(readStack.BytesConsumed <= bytesInBuffer);
+                    int bytesConsumed = checked((int)readStack.BytesConsumed);
 
-                    Debug.Assert(readerState.BytesConsumed <= bytesInBuffer);
-                    int bytesConsumed = (int)readerState.BytesConsumed;
                     bytesInBuffer -= bytesConsumed;
 
                     if (isFinalBlock)
@@ -183,7 +184,7 @@ namespace System.Text.Json.Serialization
                 ThrowHelper.ThrowJsonException_DeserializeDataRemaining(totalBytesRead, bytesInBuffer);
             }
 
-            return (TValue)state.Current.ReturnValue;
+            return (TValue)readStack.Current.ReturnValue;
         }
 
         private static void ReadCore(
@@ -191,14 +192,21 @@ namespace System.Text.Json.Serialization
             bool isFinalBlock,
             Span<byte> buffer,
             JsonSerializerOptions options,
-            ref ReadStack state)
+            ref ReadStack readStack)
         {
             var reader = new Utf8JsonReader(buffer, isFinalBlock, readerState);
 
+            // If we haven't read in the entire stream's payload we'll need to signify that we want
+            // to enable read ahead behaviors to ensure we have complete json objects and arrays
+            // ({}, []) when needed. (Notably to successfully parse JsonElement via JsonDocument
+            // to assign to object and JsonElement properties in the constructed .NET object.)
+            options.ReadAhead = !isFinalBlock;
+            readStack.BytesConsumed = 0;
+
             ReadCore(
                 options,
                 ref reader,
-                ref state);
+                ref readStack);
 
             readerState = reader.CurrentState;
         }
index d31d4e2..4d96348 100644 (file)
@@ -3,8 +3,8 @@
 // See the LICENSE file in the project root for more information.
 
 using System.Buffers;
-using System.Collections;
 using System.Diagnostics;
+using System.Runtime.CompilerServices;
 
 namespace System.Text.Json.Serialization
 {
@@ -17,101 +17,150 @@ namespace System.Text.Json.Serialization
         private static void ReadCore(
             JsonSerializerOptions options,
             ref Utf8JsonReader reader,
-            ref ReadStack state)
+            ref ReadStack readStack)
         {
             try
             {
-                while (reader.Read())
+                JsonReaderState initialState = default;
+
+                while (true)
                 {
+                    if (options.ReadAhead)
+                    {
+                        // When we're reading ahead we always have to save the state
+                        // as we don't know if the next token is an opening object or
+                        // array brace.
+                        initialState = reader.CurrentState;
+                    }
+
+                    if (!reader.Read())
+                    {
+                        // Need more data
+                        break;
+                    }
+
                     JsonTokenType tokenType = reader.TokenType;
 
                     if (JsonHelpers.IsInRangeInclusive(tokenType, JsonTokenType.String, JsonTokenType.False))
                     {
                         Debug.Assert(tokenType == JsonTokenType.String || tokenType == JsonTokenType.Number || tokenType == JsonTokenType.True || tokenType == JsonTokenType.False);
 
-                        if (HandleValue(tokenType, options, ref reader, ref state))
-                        {
-                            continue;
-                        }
+                        HandleValue(tokenType, options, ref reader, ref readStack);
                     }
                     else if (tokenType == JsonTokenType.PropertyName)
                     {
-                        HandlePropertyName(options, ref reader, ref state);
+                        HandlePropertyName(options, ref reader, ref readStack);
                     }
                     else if (tokenType == JsonTokenType.StartObject)
                     {
-                        if (state.Current.SkipProperty)
+                        if (readStack.Current.SkipProperty)
                         {
-                            state.Push();
-                            state.Current.Drain = true;
+                            readStack.Push();
+                            readStack.Current.Drain = true;
                         }
-                        else if (state.Current.IsProcessingValue)
+                        else if (readStack.Current.IsProcessingValue)
                         {
-                            if (HandleValue(tokenType, options, ref reader, ref state))
+                            if (!HandleObjectAsValue(tokenType, options, ref reader, ref readStack, ref initialState))
                             {
-                                continue;
+                                // Need more data
+                                break;
                             }
                         }
-                        else if (state.Current.IsProcessingDictionary)
+                        else if (readStack.Current.IsProcessingDictionary)
                         {
-                            HandleStartDictionary(options, ref reader, ref state);
+                            HandleStartDictionary(options, ref reader, ref readStack);
                         }
                         else
                         {
-                            HandleStartObject(options, ref reader, ref state);
+                            HandleStartObject(options, ref reader, ref readStack);
                         }
                     }
                     else if (tokenType == JsonTokenType.EndObject)
                     {
-                        if (state.Current.Drain)
+                        if (readStack.Current.Drain)
                         {
-                            state.Pop();
+                            readStack.Pop();
                         }
-                        else if (state.Current.IsProcessingDictionary)
+                        else if (readStack.Current.IsProcessingDictionary)
                         {
-                            HandleEndDictionary(options, ref reader, ref state);
+                            HandleEndDictionary(options, ref reader, ref readStack);
                         }
                         else
                         {
-                            HandleEndObject(options, ref reader, ref state);
+                            HandleEndObject(options, ref reader, ref readStack);
                         }
                     }
                     else if (tokenType == JsonTokenType.StartArray)
                     {
-                        if (!state.Current.IsProcessingValue)
+                        if (!readStack.Current.IsProcessingValue)
                         {
-                            HandleStartArray(options, ref reader, ref state);
+                            HandleStartArray(options, ref reader, ref readStack);
                         }
-                        else if (HandleValue(tokenType, options, ref reader, ref state))
+                        else if (!HandleObjectAsValue(tokenType, options, ref reader, ref readStack, ref initialState))
                         {
-                            continue;
+                            // Need more data
+                            break;
                         }
                     }
                     else if (tokenType == JsonTokenType.EndArray)
                     {
-                        if (HandleEndArray(options, ref reader, ref state))
-                        {
-                            continue;
-                        }
+                        HandleEndArray(options, ref reader, ref readStack);
                     }
                     else if (tokenType == JsonTokenType.Null)
                     {
-                        if (HandleNull(ref reader, ref state, options))
-                        {
-                            continue;
-                        }
+                        HandleNull(ref reader, ref readStack, options);
                     }
                 }
             }
             catch (JsonReaderException e)
             {
                 // Re-throw with Path information.
-                ThrowHelper.ReThrowWithPath(e, state.JsonPath);
+                ThrowHelper.ReThrowWithPath(e, readStack.JsonPath);
             }
 
+            readStack.BytesConsumed += reader.BytesConsumed;
             return;
         }
 
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        private static bool HandleObjectAsValue(
+            JsonTokenType tokenType,
+            JsonSerializerOptions options,
+            ref Utf8JsonReader reader,
+            ref ReadStack readStack,
+            ref JsonReaderState initialState)
+        {
+            if (options.ReadAhead)
+            {
+                // Attempt to skip to make sure we have all the data we need.
+                bool complete = reader.TrySkip();
+
+                // We need to restore the state in all cases as we need to be positioned back before
+                // the current token to either attempt to skip again or to actually read the value in
+                // HandleValue below.
+
+                reader = new Utf8JsonReader(
+                    reader.OriginalSpan.Slice(checked((int)initialState.BytesConsumed)),
+                    isFinalBlock: reader.IsFinalBlock,
+                    state: initialState);
+                Debug.Assert(reader.BytesConsumed == 0);
+                readStack.BytesConsumed += initialState.BytesConsumed;
+
+                if (!complete)
+                {
+                    // Couldn't read to the end of the object, exit out to get more data in the buffer.
+                    return false;
+                }
+
+                // Success, requeue the reader to the token for HandleValue.
+                reader.Read();
+                Debug.Assert(tokenType == reader.TokenType);
+            }
+
+            HandleValue(tokenType, options, ref reader, ref readStack);
+            return true;
+        }
+
         private static ReadOnlySpan<byte> GetUnescapedString(ReadOnlySpan<byte> utf8Source, int idx)
         {
             // The escaped name is always longer than the unescaped, so it is safe to use escaped name for the buffer length.
index 70a1cc9..00e21f5 100644 (file)
@@ -314,5 +314,10 @@ namespace System.Text.Json.Serialization
                 ThrowHelper.ThrowInvalidOperationException_SerializerOptionsImmutable();
             }
         }
+
+        /// <summary>
+        /// Internal flag to let us know that we need to read ahead in the inner read loop.
+        /// </summary>
+        internal bool ReadAhead { get; set; }
     }
 }
index c394bac..be6d645 100644 (file)
@@ -145,5 +145,10 @@ namespace System.Text.Json.Serialization
 
             return propertyName;
         }
+
+        /// <summary>
+        /// Bytes consumed in the current loop
+        /// </summary>
+        public long BytesConsumed;
     }
 }
index f146734..544c021 100644 (file)
@@ -2,6 +2,7 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
+using System.IO;
 using System.Linq;
 using Xunit;
 
@@ -145,5 +146,32 @@ namespace System.Text.Json.Serialization.Tests
                 Assert.Equal("Hello", Array[3].ToString());
             }
         }
+
+        [Theory,
+            InlineData(5),
+            InlineData(10),
+            InlineData(20),
+            InlineData(1024)]
+        public void ReadJsonElementFromStream(int defaultBufferSize)
+        {
+            // Streams need to read ahead when they hit objects or arrays that are assigned to JsonElement or object.
+
+            byte[] data = Encoding.UTF8.GetBytes(@"{""Data"":[1,true,{""City"":""MyCity""},null,""foo""]}");
+            MemoryStream stream = new MemoryStream(data);
+            JsonElement obj = JsonSerializer.ReadAsync<JsonElement>(stream, new JsonSerializerOptions { DefaultBufferSize = defaultBufferSize }).Result;
+
+            data = Encoding.UTF8.GetBytes(@"[1,true,{""City"":""MyCity""},null,""foo""]");
+            stream = new MemoryStream(data);
+            obj = JsonSerializer.ReadAsync<JsonElement>(stream, new JsonSerializerOptions { DefaultBufferSize = defaultBufferSize }).Result;
+
+            // Ensure we fail with incomplete data
+            data = Encoding.UTF8.GetBytes(@"{""Data"":[1,true,{""City"":""MyCity""},null,""foo""]");
+            stream = new MemoryStream(data);
+            Assert.Throws<JsonException>(() => JsonSerializer.ReadAsync<JsonElement>(stream, new JsonSerializerOptions { DefaultBufferSize = defaultBufferSize }).Result);
+
+            data = Encoding.UTF8.GetBytes(@"[1,true,{""City"":""MyCity""},null,""foo""");
+            stream = new MemoryStream(data);
+            Assert.Throws<JsonException>(() => JsonSerializer.ReadAsync<JsonElement>(stream, new JsonSerializerOptions { DefaultBufferSize = defaultBufferSize }).Result);
+        }
     }
 }
index 2dd50b7..c54dced 100644 (file)
@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information.
 
 using System.Collections.Generic;
+using System.IO;
 using Xunit;
 
 namespace System.Text.Json.Serialization.Tests
@@ -24,6 +25,29 @@ namespace System.Text.Json.Serialization.Tests
             ((ITestClass)obj).Verify();
         }
 
+        [Theory]
+        [MemberData(nameof(ReadSuccessCases))]
+        public static void ReadFromStream(Type classType, byte[] data)
+        {
+            MemoryStream stream = new MemoryStream(data);
+            object obj = JsonSerializer.ReadAsync(
+                stream,
+                classType).Result;
+
+            Assert.IsAssignableFrom(typeof(ITestClass), obj);
+            ((ITestClass)obj).Verify();
+
+            // Try again with a smaller initial buffer size to ensure we handle incomplete data
+            stream = new MemoryStream(data);
+            obj = JsonSerializer.ReadAsync(
+                stream,
+                classType,
+                new JsonSerializerOptions { DefaultBufferSize = 5 }).Result;
+
+            Assert.IsAssignableFrom(typeof(ITestClass), obj);
+            ((ITestClass)obj).Verify();
+        }
+
         [Fact]
         public static void ReadGenericApi()
         {