_responseDataPayloadRemaining -= copyLen;
_recvBuffer.Discard(copyLen);
buffer = buffer.Slice(copyLen);
+
+ // Stop, if we've reached the end of a data frame and start of the next data frame is not buffered yet
+ // Waiting for the next data frame may cause a hang, e.g. in echo scenario
+ // TODO: this is inefficient if data is already available in transport
+ if (_responseDataPayloadRemaining == 0 && _recvBuffer.ActiveLength == 0)
+ {
+ break;
+ }
}
else
{
_responseDataPayloadRemaining -= bytesRead;
buffer = buffer.Slice(bytesRead);
- if (_responseDataPayloadRemaining == 0)
- {
- break;
- }
+ // Stop, even if we are in the middle of a data frame. Waiting for the next data may cause a hang
+ // TODO: this is inefficient if data is already available in transport
+ break;
}
}
_responseDataPayloadRemaining -= copyLen;
_recvBuffer.Discard(copyLen);
buffer = buffer.Slice(copyLen);
+
+ // Stop, if we've reached the end of a data frame and start of the next data frame is not buffered yet
+ // Waiting for the next data frame may cause a hang, e.g. in echo scenario
+ // TODO: this is inefficient if data is already available in transport
+ if (_responseDataPayloadRemaining == 0 && _recvBuffer.ActiveLength == 0)
+ {
+ break;
+ }
}
else
{
_responseDataPayloadRemaining -= bytesRead;
buffer = buffer.Slice(bytesRead);
- if (_responseDataPayloadRemaining == 0)
- {
- break;
- }
+ // Stop, even if we are in the middle of a data frame. Waiting for the next data may cause a hang
+ // TODO: this is inefficient if data is already available in transport
+ break;
}
}
connection.Dispose();
}
+ [Theory]
+ [InlineData(1)] // frame fits into Http3RequestStream buffer
+ [InlineData(10)]
+ [InlineData(100)] // frame doesn't fit into Http3RequestStream buffer
+ [InlineData(1000)]
+ public async Task EchoServerStreaming_DifferentMessageSize_Success(int messageSize)
+ {
+ int iters = 5;
+ var message = new byte[messageSize];
+ var readBuffer = new byte[5 * messageSize]; // bigger than message
+ var random = new Random(0);
+
+ using Http3LoopbackServer server = CreateHttp3LoopbackServer();
+ Http3LoopbackConnection connection = null;
+ Http3LoopbackStream serverStream = null;
+
+ Task serverTask = Task.Run(async () =>
+ {
+ connection = (Http3LoopbackConnection)await server.EstablishGenericConnectionAsync();
+ serverStream = await connection.AcceptRequestStreamAsync();
+
+ HttpRequestData requestData = await serverStream.ReadRequestDataAsync(readBody: false).WaitAsync(TimeSpan.FromSeconds(30));
+
+ await serverStream.SendResponseHeadersAsync().ConfigureAwait(false);
+
+ while (true)
+ {
+ (long? frameType, byte[] payload) = await serverStream.ReadFrameAsync();
+ if (frameType == null)
+ {
+ // EOS
+ break;
+ }
+ // echo back
+ await serverStream.SendDataFrameAsync(payload).WaitAsync(TimeSpan.FromSeconds(30));
+ }
+ // send FIN
+ await serverStream.SendResponseBodyAsync(Array.Empty<byte>(), isFinal: true);
+ });
+
+ StreamingHttpContent requestContent = new StreamingHttpContent();
+
+ using HttpClient client = CreateHttpClient();
+ using HttpRequestMessage request = new()
+ {
+ Method = HttpMethod.Post,
+ RequestUri = server.Address,
+ Version = HttpVersion30,
+ VersionPolicy = HttpVersionPolicy.RequestVersionExact,
+ Content = requestContent
+ };
+
+ var responseTask = client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).WaitAsync(TimeSpan.FromSeconds(10));
+
+ Stream requestStream = await requestContent.GetStreamAsync();
+ // Send headers
+ await requestStream.FlushAsync();
+
+ using HttpResponseMessage response = await responseTask;
+
+ var responseStream = await response.Content.ReadAsStreamAsync();
+
+ for (int i = 0; i < iters; ++i)
+ {
+ random.NextBytes(message);
+ await requestStream.WriteAsync(message).AsTask().WaitAsync(TimeSpan.FromSeconds(10));
+ await requestStream.FlushAsync();
+
+ int bytesRead = await responseStream.ReadAsync(readBuffer).AsTask().WaitAsync(TimeSpan.FromSeconds(10));
+ Assert.Equal(bytesRead, messageSize);
+ Assert.Equal(message, readBuffer[..bytesRead]);
+ }
+ // Send FIN
+ requestContent.CompleteStream();
+ // Receive FIN
+ Assert.Equal(0, await responseStream.ReadAsync(readBuffer).AsTask().WaitAsync(TimeSpan.FromSeconds(10)));
+
+ await serverTask.WaitAsync(TimeSpan.FromSeconds(60));
+
+ serverStream.Dispose();
+ Assert.NotNull(connection);
+ connection.Dispose();
+ }
+
public static TheoryData<HttpStatusCode, bool> StatusCodesTestData()
{
var statuses = Enum.GetValues(typeof(HttpStatusCode)).Cast<HttpStatusCode>().Where(s => s >= HttpStatusCode.OK); // exclude informational