diff --git a/src/Microsoft.OData.Core/Batch/ODataBatchUtils.cs b/src/Microsoft.OData.Core/Batch/ODataBatchUtils.cs index 63e650c8d3..ce2c32c0d9 100644 --- a/src/Microsoft.OData.Core/Batch/ODataBatchUtils.cs +++ b/src/Microsoft.OData.Core/Batch/ODataBatchUtils.cs @@ -77,11 +77,13 @@ internal static Uri CreateOperationRequestUri(Uri uri, Uri baseUri, IODataPayloa /// The batch stream to create the operation read stream for. /// The headers of the current part; based on the header we create different, optimized stream implementations. /// The operation listener to be passed to the newly created read stream. + /// true if the stream is to be created for synchronous operation; false for asynchronous. /// A new instance. internal static ODataReadStream CreateBatchOperationReadStream( ODataBatchReaderStream batchReaderStream, ODataBatchOperationHeaders headers, - IODataStreamListener operationListener) + IODataStreamListener operationListener, + bool synchronous = true) { Debug.Assert(batchReaderStream != null, "batchReaderStream != null"); Debug.Assert(operationListener != null, "operationListener != null"); @@ -96,10 +98,10 @@ internal static ODataReadStream CreateBatchOperationReadStream( throw new ODataException(Strings.ODataBatchReaderStream_InvalidContentLengthSpecified(contentLengthValue)); } - return ODataReadStream.Create(batchReaderStream, operationListener, length); + return ODataReadStream.Create(batchReaderStream, operationListener, length, synchronous); } - return ODataReadStream.Create(batchReaderStream, operationListener); + return ODataReadStream.Create(batchReaderStream, operationListener, synchronous); } /// @@ -107,6 +109,7 @@ internal static ODataReadStream CreateBatchOperationReadStream( /// /// The output stream to create the operation write stream over. /// The operation listener to be passed to the newly created write stream. + /// true if the stream is to be created for synchronous operation; false for asynchronous. /// A new instance. internal static ODataWriteStream CreateBatchOperationWriteStream( Stream outputStream, diff --git a/src/Microsoft.OData.Core/MultipartMixed/ODataMultipartMixedBatchReader.cs b/src/Microsoft.OData.Core/MultipartMixed/ODataMultipartMixedBatchReader.cs index c2df608859..1ccd08a916 100644 --- a/src/Microsoft.OData.Core/MultipartMixed/ODataMultipartMixedBatchReader.cs +++ b/src/Microsoft.OData.Core/MultipartMixed/ODataMultipartMixedBatchReader.cs @@ -97,7 +97,7 @@ protected override ODataBatchOperationRequestMessage CreateOperationRequestMessa } ODataBatchOperationRequestMessage requestMessage = BuildOperationRequestMessage( - () => ODataBatchUtils.CreateBatchOperationReadStream(this.batchStream, headers, this), + () => ODataBatchUtils.CreateBatchOperationReadStream(this.batchStream, headers, this, this.InputContext.Synchronous), httpMethod, requestUri, headers, @@ -139,7 +139,7 @@ protected override ODataBatchOperationResponseMessage CreateOperationResponseMes // We don't have correlation of changeset boundary between request and response messages in // changesets, so use null value for groupId. ODataBatchOperationResponseMessage responseMessage = BuildOperationResponseMessage( - () => ODataBatchUtils.CreateBatchOperationReadStream(this.batchStream, headers, this), + () => ODataBatchUtils.CreateBatchOperationReadStream(this.batchStream, headers, this, this.InputContext.Synchronous), statusCode, headers, this.currentContentId, diff --git a/src/Microsoft.OData.Core/MultipartMixed/ODataMultipartMixedBatchWriter.cs b/src/Microsoft.OData.Core/MultipartMixed/ODataMultipartMixedBatchWriter.cs index a942927fd8..1d3108fa55 100644 --- a/src/Microsoft.OData.Core/MultipartMixed/ODataMultipartMixedBatchWriter.cs +++ b/src/Microsoft.OData.Core/MultipartMixed/ODataMultipartMixedBatchWriter.cs @@ -125,7 +125,12 @@ await this.StartBatchOperationContentAsync() // then dispose the batch writer (since we are now writing the operation content) and set the corresponding state. await this.RawOutputContext.FlushBuffersAsync() .ConfigureAwait(false); +#if NETCOREAPP3_1_OR_GREATER + await this.DisposeBatchWriterAndSetContentStreamRequestedStateAsync() + .ConfigureAwait(false); +#else this.DisposeBatchWriterAndSetContentStreamRequestedState(); +#endif } /// @@ -752,5 +757,19 @@ await this.RawOutputContext.TextWriter.WriteLineAsync() } } } + +#if NETCOREAPP3_1_OR_GREATER + /// + /// Asynchronously disposes the batch writer and set the 'OperationStreamRequested' batch writer state; + /// called after the flush operation(s) have completed. + /// + /// A task that represents the asynchronous operation. + private async Task DisposeBatchWriterAndSetContentStreamRequestedStateAsync() + { + await this.RawOutputContext.CloseWriterAsync().ConfigureAwait(false); + + this.SetState(BatchWriterState.OperationStreamRequested); + } +#endif } } diff --git a/src/Microsoft.OData.Core/ODataRawOutputContext.cs b/src/Microsoft.OData.Core/ODataRawOutputContext.cs index a008326d6d..23fca177ca 100644 --- a/src/Microsoft.OData.Core/ODataRawOutputContext.cs +++ b/src/Microsoft.OData.Core/ODataRawOutputContext.cs @@ -318,6 +318,20 @@ internal Task FlushBuffersAsync() } } +#if NETCOREAPP3_1_OR_GREATER + /// + /// Closes the text writer asynchronously. + /// + /// A task that represents the asynchronous operation. + internal async Task CloseWriterAsync() + { + Debug.Assert(this.rawValueWriter != null, "The text writer has not been initialized yet."); + + await this.rawValueWriter.DisposeAsync().ConfigureAwait(false); + this.rawValueWriter = null; + } +#endif + /// /// Perform the actual cleanup work. /// diff --git a/src/Microsoft.OData.Core/ODataReadStream.cs b/src/Microsoft.OData.Core/ODataReadStream.cs index 402ced1381..5ae6104e87 100644 --- a/src/Microsoft.OData.Core/ODataReadStream.cs +++ b/src/Microsoft.OData.Core/ODataReadStream.cs @@ -29,8 +29,9 @@ internal abstract class ODataReadStream : ODataStream /// /// The underlying stream to read from. /// Listener interface to be notified of operation changes. - private ODataReadStream(ODataBatchReaderStream batchReaderStream, IODataStreamListener listener) - : base(listener) + /// true if the stream is created for synchronous operation; false for asynchronous. + private ODataReadStream(ODataBatchReaderStream batchReaderStream, IODataStreamListener listener, bool synchronous) + : base(listener, synchronous) { Debug.Assert(batchReaderStream != null, "batchReaderStream != null"); this.batchReaderStream = batchReaderStream; @@ -111,10 +112,15 @@ public override void Write(byte[] buffer, int offset, int count) /// The batch stream underlying the operation stream to create. /// The batch operation listener. /// The content length of the operation stream. + /// true if the stream is created for synchronous operation; false for asynchronous. /// A to read the content of a batch operation from. - internal static ODataReadStream Create(ODataBatchReaderStream batchReaderStream, IODataStreamListener listener, int length) + internal static ODataReadStream Create( + ODataBatchReaderStream batchReaderStream, + IODataStreamListener listener, + int length, + bool synchronous = true) { - return new ODataBatchOperationReadStreamWithLength(batchReaderStream, listener, length); + return new ODataBatchOperationReadStreamWithLength(batchReaderStream, listener, length, synchronous); } /// @@ -122,10 +128,14 @@ internal static ODataReadStream Create(ODataBatchReaderStream batchReaderStream, /// /// The batch stream underlying the operation stream to create. /// The batch operation listener. + /// true if the stream is created for synchronous operation; false for asynchronous. /// A to read the content of a batch operation from. - internal static ODataReadStream Create(ODataBatchReaderStream batchReaderStream, IODataStreamListener listener) + internal static ODataReadStream Create( + ODataBatchReaderStream batchReaderStream, + IODataStreamListener listener, + bool synchronous = true) { - return new ODataBatchOperationReadStreamWithDelimiter(batchReaderStream, listener); + return new ODataBatchOperationReadStreamWithDelimiter(batchReaderStream, listener, synchronous); } /// @@ -142,8 +152,12 @@ private sealed class ODataBatchOperationReadStreamWithLength : ODataReadStream /// The underlying batch stream to write the message to. /// Listener interface to be notified of operation changes. /// The total length of the stream. - internal ODataBatchOperationReadStreamWithLength(ODataBatchReaderStream batchReaderStream, IODataStreamListener listener, int length) - : base(batchReaderStream, listener) + internal ODataBatchOperationReadStreamWithLength( + ODataBatchReaderStream batchReaderStream, + IODataStreamListener listener, + int length, + bool synchronous) + : base(batchReaderStream, listener, synchronous) { ExceptionUtils.CheckIntegerNotNegative(length, "length"); this.length = length; @@ -189,8 +203,11 @@ private sealed class ODataBatchOperationReadStreamWithDelimiter : ODataReadStrea /// /// The underlying batch stream to write the message to. /// Listener interface to be notified of operation changes. - internal ODataBatchOperationReadStreamWithDelimiter(ODataBatchReaderStream batchReaderStream, IODataStreamListener listener) - : base(batchReaderStream, listener) + internal ODataBatchOperationReadStreamWithDelimiter( + ODataBatchReaderStream batchReaderStream, + IODataStreamListener listener, + bool synchronous) + : base(batchReaderStream, listener, synchronous) { } diff --git a/src/Microsoft.OData.Core/ODataWriteStream.cs b/src/Microsoft.OData.Core/ODataWriteStream.cs index d2ea225be4..0d1cc33107 100644 --- a/src/Microsoft.OData.Core/ODataWriteStream.cs +++ b/src/Microsoft.OData.Core/ODataWriteStream.cs @@ -28,6 +28,7 @@ internal sealed class ODataWriteStream : ODataStream /// /// The underlying stream to write the message to. /// Listener interface to be notified of operation changes. + /// true if the stream is created for synchronous operation; false for asynchronous. internal ODataWriteStream(Stream stream, IODataStreamListener listener, bool synchronous = true) : base(listener, synchronous) { diff --git a/src/Microsoft.OData.Core/RawValueWriter.cs b/src/Microsoft.OData.Core/RawValueWriter.cs index aba295e69a..9dc79181bd 100644 --- a/src/Microsoft.OData.Core/RawValueWriter.cs +++ b/src/Microsoft.OData.Core/RawValueWriter.cs @@ -17,7 +17,11 @@ namespace Microsoft.OData /// /// Class that handles writing top level raw values to a stream. /// +#if NETCOREAPP3_1_OR_GREATER + internal sealed class RawValueWriter : IDisposable, IAsyncDisposable +#else internal sealed class RawValueWriter : IDisposable +#endif { /// /// Writer settings. @@ -86,6 +90,30 @@ public void Dispose() this.textWriter = null; } +#if NETCOREAPP3_1_OR_GREATER + /// + /// Asynchronously disposes the . + /// It flushes itself and then disposes its inner . + /// + /// A task that represents the asynchronous dispose operation. + public ValueTask DisposeAsync() + { + return DisposeInnerAsync(); + + async ValueTask DisposeInnerAsync() + { + Debug.Assert(this.textWriter != null, "The text writer has not been initialized yet."); + + if (this.textWriter != null) + { + await this.textWriter.DisposeAsync().ConfigureAwait(false); + } + + this.textWriter = null; + } + } +#endif + /// /// Start writing a raw output. This should only be called once. /// diff --git a/test/FunctionalTests/Microsoft.OData.Core.Tests/ODataMultipartMixedBatchReaderTests.cs b/test/FunctionalTests/Microsoft.OData.Core.Tests/ODataMultipartMixedBatchReaderTests.cs new file mode 100644 index 0000000000..be8b0a4da4 --- /dev/null +++ b/test/FunctionalTests/Microsoft.OData.Core.Tests/ODataMultipartMixedBatchReaderTests.cs @@ -0,0 +1,784 @@ +//--------------------------------------------------------------------- +// +// Copyright (C) Microsoft Corporation. All rights reserved. See License.txt in the project root for license information. +// +//--------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.OData.Edm; +using Microsoft.OData.MultipartMixed; +using Xunit; + +namespace Microsoft.OData.Tests +{ + public class ODataMultipartMixedBatchReaderTests + { + private const string batchRequestBoundary = "batch_aed653ab"; + private const string batchResponseBoundary = "batchresponse_aed653ab"; + private readonly ODataMediaType mediaType; + private readonly ODataMessageReaderSettings messageReaderSettings; + private readonly ODataBatchOperationHeaders batchOperationHeaders; + + #region Batch Paylods + + private const string batchRequestPayload = @"--batch_aed653ab +Content-Type: multipart/mixed; boundary=changeset_5e368128 + +--changeset_5e368128 +Content-Type: application/http +Content-Transfer-Encoding: binary +Content-ID: 1 + +POST http://tempuri.org/Customers HTTP/1.1 +OData-Version: 4.0 +OData-MaxVersion: 4.0 +Content-Type: application/json;odata.metadata=minimal +Accept: application/json;odata.metadata=minimal +Accept-Charset: UTF-8 + +{""@odata.type"":""NS.Customer"",""Id"":1,""Name"":""Sue""} +--changeset_5e368128 +Content-Type: application/http +Content-Transfer-Encoding: binary +Content-ID: 2 + +POST http://tempuri.org/Orders HTTP/1.1 +OData-Version: 4.0 +OData-MaxVersion: 4.0 +Content-Type: application/json;odata.metadata=minimal +Accept: application/json;odata.metadata=minimal +Accept-Charset: UTF-8 + +{""@odata.type"":""NS.Order"",""Id"":1,""Amount"":13} +--changeset_5e368128 +Content-Type: application/http +Content-Transfer-Encoding: binary +Content-ID: 3 + +POST $1/Orders/$ref HTTP/1.1 +OData-Version: 4.0 +OData-MaxVersion: 4.0 +Content-Type: application/json;odata.metadata=minimal +Accept: application/json;odata.metadata=minimal +Accept-Charset: UTF-8 + +{""@odata.id"":""$2""} +--changeset_5e368128-- +--batch_aed653ab-- +"; + + private const string batchResponsePayload = @"--batchresponse_aed653ab +Content-Type: application/http +Content-Transfer-Encoding: binary + +HTTP/1.1 200 OK +Content-Type: application/json;odata.metadata=minimal;odata.streaming=true +OData-Version: 4.0 + +{""@odata.context"":""http://tempuri.org/$metadata#Customers/$entity"",""Id"":1,""Name"":""Sue""} +--batchresponse_aed653ab-- +"; + + #endregion Batch Payloads + + private EdmModel model; + + public ODataMultipartMixedBatchReaderTests() + { + this.InitializeEdmModel(); + this.messageReaderSettings = new ODataMessageReaderSettings(); + this.mediaType = new ODataMediaType( + "Multipart", + "Mixed", + new KeyValuePair[] { new KeyValuePair("boundary", batchRequestBoundary) }); + + this.batchOperationHeaders = new ODataBatchOperationHeaders + { + { "OData-Version", "4.0" }, + { "OData-MaxVersion", "4.0" }, + { "Content-Type", "application/json;odata.metadata=minimal" }, + { "Accept", "application/json;odata.metadata=minimal" }, + { "Accept-Charset", "UTF-8" } + }; + } + + [Fact] + public void ReadMultipartMixedBatchRequest() + { + var verifyUrlStack = new Stack(new[] { "$1/Orders/$ref", "http://tempuri.org/Orders", "http://tempuri.org/Customers" }); + + var verifyDependsOnIdsStack = new Stack>>(); + verifyDependsOnIdsStack.Push((dependsOnIds) => + { + Assert.Equal(2, dependsOnIds.Count()); + Assert.Equal("1", dependsOnIds.First()); + Assert.Equal("2", dependsOnIds.Last()); + }); + verifyDependsOnIdsStack.Push((dependsOnIds) => Assert.Equal("1", Assert.Single(dependsOnIds))); + verifyDependsOnIdsStack.Push((dependsOnIds) => Assert.Empty(dependsOnIds)); + + var verifyResourceStack = new Stack>(); + verifyResourceStack.Push((resource) => + { + Assert.NotNull(resource); + Assert.Equal("NS.Order", resource.TypeName); + var properties = resource.Properties.ToArray(); + Assert.Equal(2, properties.Length); + Assert.Equal("Id", properties[0].Name); + Assert.Equal(1, properties[0].Value); + Assert.Equal("Amount", properties[1].Name); + Assert.Equal(13M, properties[1].Value); + }); + verifyResourceStack.Push((resource) => + { + Assert.NotNull(resource); + Assert.Equal("NS.Customer", resource.TypeName); + var properties = resource.Properties.ToArray(); + Assert.Equal(2, properties.Length); + Assert.Equal("Id", properties[0].Name); + Assert.Equal(1, properties[0].Value); + Assert.Equal("Name", properties[1].Name); + Assert.Equal("Sue", properties[1].Value); + }); + + SetupMultipartMixedBatchReaderAndRunTest( + batchRequestPayload, + (multipartMixedBatchReader) => + { + var operationCount = 0; + + while (multipartMixedBatchReader.Read()) + { + switch (multipartMixedBatchReader.State) + { + case ODataBatchReaderState.Operation: + var operationRequestMessage = multipartMixedBatchReader.CreateOperationRequestMessage(); + operationCount++; + + Assert.Equal("POST", operationRequestMessage.Method); + + Assert.NotEmpty(verifyUrlStack); + Assert.NotNull(operationRequestMessage.Url); + Assert.Equal(verifyUrlStack.Pop(), operationRequestMessage.Url.OriginalString); + + Assert.NotEmpty(verifyDependsOnIdsStack); + var verifyDependsOnId = verifyDependsOnIdsStack.Pop(); + verifyDependsOnId(operationRequestMessage.DependsOnIds); + + using (var messageReader = new ODataMessageReader(operationRequestMessage, new ODataMessageReaderSettings(), this.model)) + { + if (operationCount == 3) + { + var entityReferenceLink = messageReader.ReadEntityReferenceLink(); + + Assert.Equal("$2", entityReferenceLink.Url.OriginalString); + } + else + { + var jsonLightResourceReader = messageReader.CreateODataResourceReader(); + + while (jsonLightResourceReader.Read()) + { + switch (jsonLightResourceReader.State) + { + case ODataReaderState.ResourceEnd: + Assert.NotEmpty(verifyResourceStack); + var innerVerifyResourceStack = verifyResourceStack.Pop(); + innerVerifyResourceStack(jsonLightResourceReader.Item as ODataResource); + break; + } + } + } + } + break; + } + } + }, + batchRequestBoundary); + + Assert.Empty(verifyUrlStack); + Assert.Empty(verifyDependsOnIdsStack); + Assert.Empty(verifyResourceStack); + } + + [Fact] + public async Task ReadMultipartMixedBatchRequestAsync() + { + var verifyUrlStack = new Stack(new[] { "$1/Orders/$ref", "http://tempuri.org/Orders", "http://tempuri.org/Customers" }); + + var verifyDependsOnIdsStack = new Stack>>(); + verifyDependsOnIdsStack.Push((dependsOnIds) => + { + Assert.Equal(2, dependsOnIds.Count()); + Assert.Equal("1", dependsOnIds.First()); + Assert.Equal("2", dependsOnIds.Last()); + }); + verifyDependsOnIdsStack.Push((dependsOnIds) => Assert.Equal("1", Assert.Single(dependsOnIds))); + verifyDependsOnIdsStack.Push((dependsOnIds) => Assert.Empty(dependsOnIds)); + + var verifyResourceStack = new Stack>(); + verifyResourceStack.Push((resource) => + { + Assert.NotNull(resource); + Assert.Equal("NS.Order", resource.TypeName); + var properties = resource.Properties.ToArray(); + Assert.Equal(2, properties.Length); + Assert.Equal("Id", properties[0].Name); + Assert.Equal(1, properties[0].Value); + Assert.Equal("Amount", properties[1].Name); + Assert.Equal(13M, properties[1].Value); + }); + verifyResourceStack.Push((resource) => + { + Assert.NotNull(resource); + Assert.Equal("NS.Customer", resource.TypeName); + var properties = resource.Properties.ToArray(); + Assert.Equal(2, properties.Length); + Assert.Equal("Id", properties[0].Name); + Assert.Equal(1, properties[0].Value); + Assert.Equal("Name", properties[1].Name); + Assert.Equal("Sue", properties[1].Value); + }); + + await SetupMultipartMixedBatchReaderAndRunTestAsync( + batchRequestPayload, + async (multipartMixedBatchReader) => + { + var operationCount = 0; + + while (await multipartMixedBatchReader.ReadAsync()) + { + switch (multipartMixedBatchReader.State) + { + case ODataBatchReaderState.Operation: + var operationRequestMessage = await multipartMixedBatchReader.CreateOperationRequestMessageAsync(); + operationCount++; + + Assert.Equal("POST", operationRequestMessage.Method); + + Assert.NotEmpty(verifyUrlStack); + Assert.NotNull(operationRequestMessage.Url); + Assert.Equal(verifyUrlStack.Pop(), operationRequestMessage.Url.OriginalString); + + Assert.NotEmpty(verifyDependsOnIdsStack); + var verifyDependsOnId = verifyDependsOnIdsStack.Pop(); + verifyDependsOnId(operationRequestMessage.DependsOnIds); + + using (var messageReader = new ODataMessageReader(operationRequestMessage, new ODataMessageReaderSettings(), this.model)) + { + if (operationCount == 3) + { + var entityReferenceLink = await messageReader.ReadEntityReferenceLinkAsync(); + + Assert.Equal("$2", entityReferenceLink.Url.OriginalString); + } + else + { + var jsonLightResourceReader = await messageReader.CreateODataResourceReaderAsync(); + + while (await jsonLightResourceReader.ReadAsync()) + { + switch (jsonLightResourceReader.State) + { + case ODataReaderState.ResourceEnd: + Assert.NotEmpty(verifyResourceStack); + var innerVerifyResourceStack = verifyResourceStack.Pop(); + innerVerifyResourceStack(jsonLightResourceReader.Item as ODataResource); + break; + } + } + } + } + break; + } + } + }, + batchRequestBoundary); + + Assert.Empty(verifyUrlStack); + Assert.Empty(verifyDependsOnIdsStack); + Assert.Empty(verifyResourceStack); + } + + [Fact] + public void ReadMultipartMixedBatchResponse() + { + bool resourceRead = false; + + SetupMultipartMixedBatchReaderAndRunTest( + batchResponsePayload, + (multipartMixedBatchReader) => + { + while (multipartMixedBatchReader.Read()) + { + switch (multipartMixedBatchReader.State) + { + case ODataBatchReaderState.Operation: + var operationResponseMessage = multipartMixedBatchReader.CreateOperationResponseMessage(); + + using (var messageReader = new ODataMessageReader(operationResponseMessage, new ODataMessageReaderSettings(), this.model)) + { + var jsonLightResourceReader = messageReader.CreateODataResourceReader(); + + while (jsonLightResourceReader.Read()) + { + switch (jsonLightResourceReader.State) + { + case ODataReaderState.ResourceEnd: + var resource = jsonLightResourceReader.Item as ODataResource; + resourceRead = true; + Assert.NotNull(resource); + Assert.Equal("NS.Customer", resource.TypeName); + var properties = resource.Properties.ToArray(); + Assert.Equal(2, properties.Length); + Assert.Equal("Id", properties[0].Name); + Assert.Equal(1, properties[0].Value); + Assert.Equal("Name", properties[1].Name); + Assert.Equal("Sue", properties[1].Value); + + break; + } + } + } + break; + } + } + }, + batchResponseBoundary, + isRequest: false); + + Assert.True(resourceRead); + } + + [Fact] + public async Task ReadMultipartMixedBatchResponseAsync() + { + bool resourceRead = false; + + await SetupMultipartMixedBatchReaderAndRunTestAsync( + batchResponsePayload, + async (multipartMixedBatchReader) => + { + while (await multipartMixedBatchReader.ReadAsync()) + { + switch (multipartMixedBatchReader.State) + { + case ODataBatchReaderState.Operation: + var operationResponseMessage = await multipartMixedBatchReader.CreateOperationResponseMessageAsync(); + + using (var messageReader = new ODataMessageReader(operationResponseMessage, new ODataMessageReaderSettings(), this.model)) + { + var jsonLightResourceReader = await messageReader.CreateODataResourceReaderAsync(); + + while (await jsonLightResourceReader.ReadAsync()) + { + switch (jsonLightResourceReader.State) + { + case ODataReaderState.ResourceEnd: + var resource = jsonLightResourceReader.Item as ODataResource; + resourceRead = true; + Assert.NotNull(resource); + Assert.Equal("NS.Customer", resource.TypeName); + var properties = resource.Properties.ToArray(); + Assert.Equal(2, properties.Length); + Assert.Equal("Id", properties[0].Name); + Assert.Equal(1, properties[0].Value); + Assert.Equal("Name", properties[1].Name); + Assert.Equal("Sue", properties[1].Value); + + break; + } + } + } + break; + } + } + }, + batchResponseBoundary, + isRequest: false); + + Assert.True(resourceRead); + } + + #region BatchOperationReadStream + + [Theory] + [InlineData(true, "StreamDisposed")] + [InlineData(false, "StreamDisposedAsync")] + public void ODataBatchReaderStreamDisposeShouldInvokeStreamDisposed(bool synchronous, string expected) + { + var payload = @" + +{""@odata.type"":""NS.Customer"",""Id"":1,""Name"":""Sue""} +--batch_aed653ab-- +"; + var stream = new MemoryStream(); + using (Stream batchReaderStream = ODataBatchUtils.CreateBatchOperationReadStream( + CreateBatchReaderStream(payload), + this.batchOperationHeaders, + new MockODataStreamListener(new StreamWriter(stream)), + synchronous: synchronous)) + { + } + + stream.Position = 0; + var contents = new StreamReader(stream).ReadToEnd(); + + Assert.Equal(expected, contents); + } + + [Theory] + [InlineData(true, "StreamDisposed")] + [InlineData(false, "StreamDisposedAsync")] + public void ODataBatchReaderStreamDisposeShouldBeIdempotent(bool synchronous, string expected) + { + var payload = @" + +{""@odata.type"":""NS.Customer"",""Id"":1,""Name"":""Sue""} +--batch_aed653ab-- +"; + var stream = new MemoryStream(); + Stream batchReaderStream = ODataBatchUtils.CreateBatchOperationReadStream( + CreateBatchReaderStream(payload), + this.batchOperationHeaders, + new MockODataStreamListener(new StreamWriter(stream)), + synchronous: synchronous); + + // 1st call to Dispose + batchReaderStream.Dispose(); + // 2nd call to Dispose + batchReaderStream.Dispose(); + + stream.Position = 0; + var contents = new StreamReader(stream).ReadToEnd(); + + // StreamDisposed/StreamDisposeAsync was written only once + Assert.Equal(expected, contents); + } + +#if NETCOREAPP3_1_OR_GREATER + [Fact] + public async Task ODataBatchReaderStreamDisposeAsyncShouldInvokeStreamDisposedAsync() + { + var payload = @" + +{""@odata.type"":""NS.Customer"",""Id"":1,""Name"":""Sue""} +--batch_aed653ab-- +"; + var stream = new MemoryStream(); + await using (Stream batchReaderStream = ODataBatchUtils.CreateBatchOperationReadStream( + CreateBatchReaderStream(payload), + this.batchOperationHeaders, + new MockODataStreamListener(new StreamWriter(stream))))// `synchronous` argument becomes irrelevant since we'll directly call DisposeAsync + { + } + + stream.Position = 0; + var contents = await new StreamReader(stream).ReadToEndAsync(); + + Assert.Equal("StreamDisposedAsync", contents); + } + + [Fact] + public async Task ODataBatchReaderStreamDisposeAsyncShouldBeIdempotentAsync() + { + var payload = @" + +{""@odata.type"":""NS.Customer"",""Id"":1,""Name"":""Sue""} +--batch_aed653ab-- +"; + var stream = new MemoryStream(); + Stream batchReaderStream = ODataBatchUtils.CreateBatchOperationReadStream( + CreateBatchReaderStream(payload), + this.batchOperationHeaders, + new MockODataStreamListener(new StreamWriter(stream)));// `synchronous` argument becomes irrelevant since we'll directly call DisposeAsync + + // 1st call to DisposeAsync + await batchReaderStream.DisposeAsync(); + // 2nd call to DisposeAsync + await batchReaderStream.DisposeAsync(); + + stream.Position = 0; + var contents = await new StreamReader(stream).ReadToEndAsync(); + + Assert.Equal("StreamDisposedAsync", contents); + } +#else + [Fact] + public async Task ODataBatchReaderStreamDisposeAsyncShouldInvokeStreamDisposedAsync() + { + var payload = @" + +{""@odata.type"":""NS.Customer"",""Id"":1,""Name"":""Sue""} +--batch_aed653ab-- +"; + var stream = new MemoryStream(); + using (Stream batchReaderStream = ODataBatchUtils.CreateBatchOperationReadStream( + CreateBatchReaderStream(payload), + this.batchOperationHeaders, + new MockODataStreamListener(new StreamWriter(stream)), + synchronous: false)) + { + } + + stream.Position = 0; + var contents = await new StreamReader(stream).ReadToEndAsync(); + + Assert.Equal("StreamDisposedAsync", contents); + } +#endif + + #endregion BatchOperationReadStream + #region BatchOperationWriteStream + + [Theory] + [InlineData(true, "StreamDisposed")] + [InlineData(false, "StreamDisposedAsync")] + public void ODataBatchWriteStreamDisposeShouldInvokeStreamDisposed(bool synchronous, string expected) + { + var stream = new MemoryStream(); + using (Stream batchWriteStream = ODataBatchUtils.CreateBatchOperationWriteStream( + new MemoryStream(), + new MockODataStreamListener(new StreamWriter(stream)), + synchronous: synchronous)) + { + } + + stream.Position = 0; + var contents = new StreamReader(stream).ReadToEnd(); + + Assert.Equal(expected, contents); + } + + [Theory] + [InlineData(true, "StreamDisposed")] + [InlineData(false, "StreamDisposedAsync")] + public void ODataBatchWriterStreamDisposeShouldBeIdempotent(bool synchronous, string expected) + { + var stream = new MemoryStream(); + Stream batchWriteStream = ODataBatchUtils.CreateBatchOperationWriteStream( + new MemoryStream(), + new MockODataStreamListener(new StreamWriter(stream)), + synchronous: synchronous); + + // 1st call to Dispose + batchWriteStream.Dispose(); + // 2nd call to Dispose + batchWriteStream.Dispose(); + + stream.Position = 0; + var contents = new StreamReader(stream).ReadToEnd(); + + Assert.Equal(expected, contents); + } + +#if NETCOREAPP3_1_OR_GREATER + [Fact] + public async Task ODataBatchWriteStreamDisposeAsyncShouldInvokeStreamDisposedAsync() + { + var stream = new MemoryStream(); + await using (Stream batchWriteStream = ODataBatchUtils.CreateBatchOperationWriteStream( + new MemoryStream(), + new MockODataStreamListener(new StreamWriter(stream))))// `synchronous` argument becomes irrelevant since we'll directly call DisposeAsync + { + } + + stream.Position = 0; + var contents = await new StreamReader(stream).ReadToEndAsync(); + + Assert.Equal("StreamDisposedAsync", contents); + } + + [Fact] + public async Task ODataBatchWriteStreamDisposeAsyncShouldBeIdempotentAsync() + { + var stream = new MemoryStream(); + Stream batchWriteStream = ODataBatchUtils.CreateBatchOperationWriteStream( + new MemoryStream(), + new MockODataStreamListener(new StreamWriter(stream)));// `synchronous` argument becomes irrelevant since we'll directly call DisposeAsync + + // 1st call to DisposeAsync + await batchWriteStream.DisposeAsync(); + // 2nd call to DisposeAsync + await batchWriteStream.DisposeAsync(); + + stream.Position = 0; + var contents = await new StreamReader(stream).ReadToEndAsync(); + + Assert.Equal("StreamDisposedAsync", contents); + } +#else + [Fact] + public async Task ODataBatchWriteStreamDisposeAsyncShouldInvokeStreamDisposedAsync() + { + var stream = new MemoryStream(); + using (Stream batchWriteStream = ODataBatchUtils.CreateBatchOperationWriteStream( + new MemoryStream(), + new MockODataStreamListener(new StreamWriter(stream)), + synchronous: false)) + { + } + + stream.Position = 0; + var contents = await new StreamReader(stream).ReadToEndAsync(); + + Assert.Equal("StreamDisposedAsync", contents); + } +#endif + + #endregion BatchOperationWriteStream + + private void InitializeEdmModel() + { + this.model = new EdmModel(); + + var orderEntityType = new EdmEntityType("NS", "Order", /*baseType*/ null, /*isAbstract*/ false, /*isOpen*/ true); + var customerEntityType = new EdmEntityType("NS", "Customer"); + + var orderIdProperty = orderEntityType.AddStructuralProperty("Id", EdmPrimitiveTypeKind.Int32); + orderEntityType.AddKeys(orderIdProperty); + orderEntityType.AddStructuralProperty("CustomerId", EdmPrimitiveTypeKind.Int32); + orderEntityType.AddStructuralProperty("Amount", EdmPrimitiveTypeKind.Decimal); + var customerNavProperty = orderEntityType.AddUnidirectionalNavigation( + new EdmNavigationPropertyInfo + { + Name = "Customer", + Target = customerEntityType, + TargetMultiplicity = EdmMultiplicity.ZeroOrOne + }); + model.AddElement(orderEntityType); + + var customerIdProperty = customerEntityType.AddStructuralProperty("Id", EdmPrimitiveTypeKind.Int32); + customerEntityType.AddKeys(customerIdProperty); + customerEntityType.AddStructuralProperty("Name", EdmPrimitiveTypeKind.String); + var ordersNavProperty = customerEntityType.AddUnidirectionalNavigation( + new EdmNavigationPropertyInfo + { + Name = "Orders", + Target = orderEntityType, + TargetMultiplicity = EdmMultiplicity.Many + }); + this.model.AddElement(customerEntityType); + + var entityContainer = new EdmEntityContainer("NS", "Container"); + this.model.AddElement(entityContainer); + + var orderEntitySet = entityContainer.AddEntitySet("Orders", orderEntityType); + var customerEntitySet = entityContainer.AddEntitySet("Customers", customerEntityType); + + orderEntitySet.AddNavigationTarget(customerNavProperty, customerEntitySet); + customerEntitySet.AddNavigationTarget(ordersNavProperty, orderEntitySet); + } + + private ODataMultipartMixedBatchReaderStream CreateBatchReaderStream(string payload, bool synchronous = true, bool isRequest = true) + { + var multipartMixedBatchInputContext = CreateMultipartMixedBatchInputContext( + payload, + isRequest, + synchronous: synchronous); + + return new ODataMultipartMixedBatchReaderStream( + multipartMixedBatchInputContext, + batchRequestBoundary, + MediaTypeUtils.EncodingUtf8NoPreamble); + } + + /// + /// Sets up an ODataMultipartMixedBatchReader, then runs the given test code + /// + private void SetupMultipartMixedBatchReaderAndRunTest( + string payload, + Action action, + string batchBoundary, + bool isRequest = true) + { + var multipartMixedBatchInputContext = CreateMultipartMixedBatchInputContext( + payload, + isRequest, + synchronous: true); + var multipartMixedBatchReader = new ODataMultipartMixedBatchReader( + multipartMixedBatchInputContext, + batchBoundary, + MediaTypeUtils.EncodingUtf8NoPreamble, + synchronous: true); + + action(multipartMixedBatchReader); + } + + /// + /// Sets up an ODataMultipartMixedBatchReader, then runs the given test code asynchronously + /// + private async Task SetupMultipartMixedBatchReaderAndRunTestAsync( + string payload, + Func func, + string batchBoundary, + bool isRequest = true) + { + var multipartMixedBatchInputContext = CreateMultipartMixedBatchInputContext( + payload, + isRequest, + synchronous: false); + var multipartMixedBatchReader = new ODataMultipartMixedBatchReader( + multipartMixedBatchInputContext, + batchBoundary, + MediaTypeUtils.EncodingUtf8NoPreamble, + synchronous: false); + + await func(multipartMixedBatchReader); + } + + private ODataMultipartMixedBatchInputContext CreateMultipartMixedBatchInputContext( + string payload, + bool isRequest = true, + bool synchronous = true) + { + var encoding = MediaTypeUtils.EncodingUtf8NoPreamble; + var messageInfo = new ODataMessageInfo + { + MessageStream = new MemoryStream(encoding.GetBytes(payload)), + MediaType = this.mediaType, + Encoding = encoding, + IsResponse = !isRequest, + IsAsync = !synchronous + }; + + return new ODataMultipartMixedBatchInputContext(ODataFormat.Batch, messageInfo, this.messageReaderSettings); + } + + private class MockODataStreamListener : IODataStreamListener + { + private TextWriter writer; + + public MockODataStreamListener(TextWriter writer) + { + this.writer = writer; + } + + public void StreamDisposed() + { + writer.Write("StreamDisposed"); + writer.Flush(); + } + + public async Task StreamDisposedAsync() + { + await writer.WriteAsync("StreamDisposedAsync").ConfigureAwait(false); + await writer.FlushAsync().ConfigureAwait(false); + } + + public void StreamRequested() + { + throw new NotImplementedException(); + } + + public Task StreamRequestedAsync() + { + throw new NotImplementedException(); + } + } + } +}