From fed5acc4e1b8ac07d6dfe214a069f440a2673594 Mon Sep 17 00:00:00 2001 From: Jeremy Kuhne Date: Thu, 30 May 2019 22:00:36 -0700 Subject: [PATCH] Add read ahead logic for streams. (dotnet/corefx#38039) * Add read ahead logic for streams. When reading a jaon object or array from a stream into an object we need to TrySkip to ensure that we have all the needed data for the JsonDocument to create a JsonElement. This is only necessary if we haven't already drained the stream. * Track state correctly We need to track consumed separately so we can requeue the reader properly after skipping. Add more stream tests. * Clarify comments and other feedback. * Fix comment Commit migrated from https://github.com/dotnet/corefx/commit/f6b010d2a0bdab94953d519a60d882ff805eea36 --- .../Serialization/JsonSerializer.Read.Stream.cs | 24 +++-- .../Text/Json/Serialization/JsonSerializer.Read.cs | 119 +++++++++++++++------ .../Json/Serialization/JsonSerializerOptions.cs | 5 + .../System/Text/Json/Serialization/ReadStack.cs | 5 + .../tests/Serialization/JsonElementTests.cs | 28 +++++ .../tests/Serialization/SpanTests.cs | 24 +++++ 6 files changed, 162 insertions(+), 43 deletions(-) diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Read.Stream.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Read.Stream.cs index 19fdc98..61dc05e 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Read.Stream.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Read.Stream.cs @@ -83,8 +83,8 @@ namespace System.Text.Json.Serialization options = JsonSerializerOptions.s_defaultOptions; } - ReadStack state = default; - state.Current.Initialize(returnType, options); + ReadStack readStack = default; + readStack.Current.Initialize(returnType, options); var readerState = new JsonReaderState(options.GetReaderOptions()); @@ -138,10 +138,11 @@ namespace System.Text.Json.Serialization isFinalBlock, new Span(buffer, 0, bytesInBuffer), options, - ref state); + ref readStack); + + Debug.Assert(readStack.BytesConsumed <= bytesInBuffer); + int bytesConsumed = checked((int)readStack.BytesConsumed); - Debug.Assert(readerState.BytesConsumed <= bytesInBuffer); - int bytesConsumed = (int)readerState.BytesConsumed; bytesInBuffer -= bytesConsumed; if (isFinalBlock) @@ -183,7 +184,7 @@ namespace System.Text.Json.Serialization ThrowHelper.ThrowJsonException_DeserializeDataRemaining(totalBytesRead, bytesInBuffer); } - return (TValue)state.Current.ReturnValue; + return (TValue)readStack.Current.ReturnValue; } private static void ReadCore( @@ -191,14 +192,21 @@ namespace System.Text.Json.Serialization bool isFinalBlock, Span buffer, JsonSerializerOptions options, - ref ReadStack state) + ref ReadStack readStack) { var reader = new Utf8JsonReader(buffer, isFinalBlock, readerState); + // If we haven't read in the entire stream's payload we'll need to signify that we want + // to enable read ahead behaviors to ensure we have complete json objects and arrays + // ({}, []) when needed. (Notably to successfully parse JsonElement via JsonDocument + // to assign to object and JsonElement properties in the constructed .NET object.) + options.ReadAhead = !isFinalBlock; + readStack.BytesConsumed = 0; + ReadCore( options, ref reader, - ref state); + ref readStack); readerState = reader.CurrentState; } diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Read.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Read.cs index d31d4e2..4d96348 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Read.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Read.cs @@ -3,8 +3,8 @@ // See the LICENSE file in the project root for more information. using System.Buffers; -using System.Collections; using System.Diagnostics; +using System.Runtime.CompilerServices; namespace System.Text.Json.Serialization { @@ -17,101 +17,150 @@ namespace System.Text.Json.Serialization private static void ReadCore( JsonSerializerOptions options, ref Utf8JsonReader reader, - ref ReadStack state) + ref ReadStack readStack) { try { - while (reader.Read()) + JsonReaderState initialState = default; + + while (true) { + if (options.ReadAhead) + { + // When we're reading ahead we always have to save the state + // as we don't know if the next token is an opening object or + // array brace. + initialState = reader.CurrentState; + } + + if (!reader.Read()) + { + // Need more data + break; + } + JsonTokenType tokenType = reader.TokenType; if (JsonHelpers.IsInRangeInclusive(tokenType, JsonTokenType.String, JsonTokenType.False)) { Debug.Assert(tokenType == JsonTokenType.String || tokenType == JsonTokenType.Number || tokenType == JsonTokenType.True || tokenType == JsonTokenType.False); - if (HandleValue(tokenType, options, ref reader, ref state)) - { - continue; - } + HandleValue(tokenType, options, ref reader, ref readStack); } else if (tokenType == JsonTokenType.PropertyName) { - HandlePropertyName(options, ref reader, ref state); + HandlePropertyName(options, ref reader, ref readStack); } else if (tokenType == JsonTokenType.StartObject) { - if (state.Current.SkipProperty) + if (readStack.Current.SkipProperty) { - state.Push(); - state.Current.Drain = true; + readStack.Push(); + readStack.Current.Drain = true; } - else if (state.Current.IsProcessingValue) + else if (readStack.Current.IsProcessingValue) { - if (HandleValue(tokenType, options, ref reader, ref state)) + if (!HandleObjectAsValue(tokenType, options, ref reader, ref readStack, ref initialState)) { - continue; + // Need more data + break; } } - else if (state.Current.IsProcessingDictionary) + else if (readStack.Current.IsProcessingDictionary) { - HandleStartDictionary(options, ref reader, ref state); + HandleStartDictionary(options, ref reader, ref readStack); } else { - HandleStartObject(options, ref reader, ref state); + HandleStartObject(options, ref reader, ref readStack); } } else if (tokenType == JsonTokenType.EndObject) { - if (state.Current.Drain) + if (readStack.Current.Drain) { - state.Pop(); + readStack.Pop(); } - else if (state.Current.IsProcessingDictionary) + else if (readStack.Current.IsProcessingDictionary) { - HandleEndDictionary(options, ref reader, ref state); + HandleEndDictionary(options, ref reader, ref readStack); } else { - HandleEndObject(options, ref reader, ref state); + HandleEndObject(options, ref reader, ref readStack); } } else if (tokenType == JsonTokenType.StartArray) { - if (!state.Current.IsProcessingValue) + if (!readStack.Current.IsProcessingValue) { - HandleStartArray(options, ref reader, ref state); + HandleStartArray(options, ref reader, ref readStack); } - else if (HandleValue(tokenType, options, ref reader, ref state)) + else if (!HandleObjectAsValue(tokenType, options, ref reader, ref readStack, ref initialState)) { - continue; + // Need more data + break; } } else if (tokenType == JsonTokenType.EndArray) { - if (HandleEndArray(options, ref reader, ref state)) - { - continue; - } + HandleEndArray(options, ref reader, ref readStack); } else if (tokenType == JsonTokenType.Null) { - if (HandleNull(ref reader, ref state, options)) - { - continue; - } + HandleNull(ref reader, ref readStack, options); } } } catch (JsonReaderException e) { // Re-throw with Path information. - ThrowHelper.ReThrowWithPath(e, state.JsonPath); + ThrowHelper.ReThrowWithPath(e, readStack.JsonPath); } + readStack.BytesConsumed += reader.BytesConsumed; return; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static bool HandleObjectAsValue( + JsonTokenType tokenType, + JsonSerializerOptions options, + ref Utf8JsonReader reader, + ref ReadStack readStack, + ref JsonReaderState initialState) + { + if (options.ReadAhead) + { + // Attempt to skip to make sure we have all the data we need. + bool complete = reader.TrySkip(); + + // We need to restore the state in all cases as we need to be positioned back before + // the current token to either attempt to skip again or to actually read the value in + // HandleValue below. + + reader = new Utf8JsonReader( + reader.OriginalSpan.Slice(checked((int)initialState.BytesConsumed)), + isFinalBlock: reader.IsFinalBlock, + state: initialState); + Debug.Assert(reader.BytesConsumed == 0); + readStack.BytesConsumed += initialState.BytesConsumed; + + if (!complete) + { + // Couldn't read to the end of the object, exit out to get more data in the buffer. + return false; + } + + // Success, requeue the reader to the token for HandleValue. + reader.Read(); + Debug.Assert(tokenType == reader.TokenType); + } + + HandleValue(tokenType, options, ref reader, ref readStack); + return true; + } + private static ReadOnlySpan GetUnescapedString(ReadOnlySpan utf8Source, int idx) { // The escaped name is always longer than the unescaped, so it is safe to use escaped name for the buffer length. diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializerOptions.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializerOptions.cs index 70a1cc9..00e21f5 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializerOptions.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializerOptions.cs @@ -314,5 +314,10 @@ namespace System.Text.Json.Serialization ThrowHelper.ThrowInvalidOperationException_SerializerOptionsImmutable(); } } + + /// + /// Internal flag to let us know that we need to read ahead in the inner read loop. + /// + internal bool ReadAhead { get; set; } } } diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ReadStack.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ReadStack.cs index c394bac..be6d645 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ReadStack.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ReadStack.cs @@ -145,5 +145,10 @@ namespace System.Text.Json.Serialization return propertyName; } + + /// + /// Bytes consumed in the current loop + /// + public long BytesConsumed; } } diff --git a/src/libraries/System.Text.Json/tests/Serialization/JsonElementTests.cs b/src/libraries/System.Text.Json/tests/Serialization/JsonElementTests.cs index f146734..544c021 100644 --- a/src/libraries/System.Text.Json/tests/Serialization/JsonElementTests.cs +++ b/src/libraries/System.Text.Json/tests/Serialization/JsonElementTests.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System.IO; using System.Linq; using Xunit; @@ -145,5 +146,32 @@ namespace System.Text.Json.Serialization.Tests Assert.Equal("Hello", Array[3].ToString()); } } + + [Theory, + InlineData(5), + InlineData(10), + InlineData(20), + InlineData(1024)] + public void ReadJsonElementFromStream(int defaultBufferSize) + { + // Streams need to read ahead when they hit objects or arrays that are assigned to JsonElement or object. + + byte[] data = Encoding.UTF8.GetBytes(@"{""Data"":[1,true,{""City"":""MyCity""},null,""foo""]}"); + MemoryStream stream = new MemoryStream(data); + JsonElement obj = JsonSerializer.ReadAsync(stream, new JsonSerializerOptions { DefaultBufferSize = defaultBufferSize }).Result; + + data = Encoding.UTF8.GetBytes(@"[1,true,{""City"":""MyCity""},null,""foo""]"); + stream = new MemoryStream(data); + obj = JsonSerializer.ReadAsync(stream, new JsonSerializerOptions { DefaultBufferSize = defaultBufferSize }).Result; + + // Ensure we fail with incomplete data + data = Encoding.UTF8.GetBytes(@"{""Data"":[1,true,{""City"":""MyCity""},null,""foo""]"); + stream = new MemoryStream(data); + Assert.Throws(() => JsonSerializer.ReadAsync(stream, new JsonSerializerOptions { DefaultBufferSize = defaultBufferSize }).Result); + + data = Encoding.UTF8.GetBytes(@"[1,true,{""City"":""MyCity""},null,""foo"""); + stream = new MemoryStream(data); + Assert.Throws(() => JsonSerializer.ReadAsync(stream, new JsonSerializerOptions { DefaultBufferSize = defaultBufferSize }).Result); + } } } diff --git a/src/libraries/System.Text.Json/tests/Serialization/SpanTests.cs b/src/libraries/System.Text.Json/tests/Serialization/SpanTests.cs index 2dd50b7..c54dced 100644 --- a/src/libraries/System.Text.Json/tests/Serialization/SpanTests.cs +++ b/src/libraries/System.Text.Json/tests/Serialization/SpanTests.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Collections.Generic; +using System.IO; using Xunit; namespace System.Text.Json.Serialization.Tests @@ -24,6 +25,29 @@ namespace System.Text.Json.Serialization.Tests ((ITestClass)obj).Verify(); } + [Theory] + [MemberData(nameof(ReadSuccessCases))] + public static void ReadFromStream(Type classType, byte[] data) + { + MemoryStream stream = new MemoryStream(data); + object obj = JsonSerializer.ReadAsync( + stream, + classType).Result; + + Assert.IsAssignableFrom(typeof(ITestClass), obj); + ((ITestClass)obj).Verify(); + + // Try again with a smaller initial buffer size to ensure we handle incomplete data + stream = new MemoryStream(data); + obj = JsonSerializer.ReadAsync( + stream, + classType, + new JsonSerializerOptions { DefaultBufferSize = 5 }).Result; + + Assert.IsAssignableFrom(typeof(ITestClass), obj); + ((ITestClass)obj).Verify(); + } + [Fact] public static void ReadGenericApi() { -- 2.7.4