Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/deep insert async #2716

Merged
merged 11 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Microsoft.OData.Client/BaseSaveResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ protected override void AsyncEndGetResponse(IAsyncResult asyncResult)
{
this.HandleOperationResponse(responseMessage);

if (!Util.IsBulkUpdate(this.Options))
if (!Util.IsBulkUpdate(this.Options) && !Util.IsDeepInsert(this.Options))
KenitoInc marked this conversation as resolved.
Show resolved Hide resolved
{
this.HandleOperationResponseHeaders((HttpStatusCode)responseMessage.StatusCode, new HeaderCollection(responseMessage));
}
Expand Down
52 changes: 50 additions & 2 deletions src/Microsoft.OData.Client/DataServiceContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2339,7 +2339,7 @@ internal virtual DataServiceResponse EndBulkUpdate(IAsyncResult asyncResult)

#endregion

#region DeepInsert
#region DeepInsert, DeepInsertAsync, BeginDeepInsert, EndDeepInsert

/// <summary>
/// Processes deep insert requests. Creates a resource and its related resources or creates a resource and links it to its related existing resources in a single request.
Expand All @@ -2353,12 +2353,60 @@ public virtual DataServiceResponse DeepInsert<T>(T resource)
throw Error.ArgumentNull(nameof(resource));
}

DeepInsertSaveResult result = new DeepInsertSaveResult(this, Util.SaveChangesMethodName, SaveChangesOptions.DeepInsert, callback: null, state: null);
DeepInsertSaveResult result = new DeepInsertSaveResult(this, Util.DeepInsertMethodName, SaveChangesOptions.DeepInsert, callback: null, state: null);
result.DeepInsertRequest(resource);

return result.EndRequest();
}

/// <summary>
/// Asynchronously processes a deep insert request.
/// </summary>
/// <typeparam name="T">The type of top-level object to be deep inserted.</typeparam>
/// <param name="resource">The top-level object of the type to be deep inserted.</param>
/// <returns>A task representing the <see cref="DataServiceResponse"/> that holds the result of the deep insert operation.</returns>
public virtual Task<DataServiceResponse> DeepInsertAsync<T>(T resource)
KenitoInc marked this conversation as resolved.
Show resolved Hide resolved
{
return this.DeepInsertAsync(resource, CancellationToken.None);
}

/// <summary>
/// Asynchronously processes a deep insert request.
/// </summary>
/// <typeparam name="T">The type of top-level object to be deep inserted.</typeparam>
/// <param name="resource">The top-level object of the type to be deep inserted.</param>
/// <returns>A task representing the <see cref="DataServiceResponse"/> that holds the result of the deep insert operation.</returns>
public virtual Task<DataServiceResponse> DeepInsertAsync<T>(T resource, CancellationToken cancellationToken)
{
return FromAsync((objectsArg, callback, state) => BeginDeepInsert(callback, state, objectsArg), EndDeepInsert, resource, cancellationToken);
}

/// <summary>Asynchronously submits top-level objects to be deep inserted to the data service.</summary>
/// <param name="callback">The delegate that is called when a response to the deep insert request is received.</param>
/// <param name="state">User-defined state object that is used to pass context data to the callback method.</param>
/// <returns>An<see cref="IAsyncResult" /> object that is used to track the status of the asynchronous operation.</returns>
public virtual IAsyncResult BeginDeepInsert<T>(AsyncCallback callback, object state, T resource)
{
if (resource == null)
{
throw Error.ArgumentNull(nameof(resource));
}

DeepInsertSaveResult result = new DeepInsertSaveResult(this, Util.DeepInsertMethodName, SaveChangesOptions.DeepInsert, callback, state);
result.BeginDeepInsertRequest(resource);

return result;
}

/// <summary>Called to complete the <see cref="BeginDeepInsert{T}(AsyncCallback, object, T)"/>.</summary>
/// <param name="asyncResult">An <see cref="IAsyncResult" /> that represents the status of the asynchronous operation.</param>
/// <returns>The DataServiceResponse object that holds the result of the deep insert operation.</returns>
public virtual DataServiceResponse EndDeepInsert(IAsyncResult asyncResult)
{
DeepInsertSaveResult result = BaseAsyncResult.EndExecute<DeepInsertSaveResult>(this, Util.DeepInsertMethodName, asyncResult);
return result.EndRequest();
}

#endregion

#region Add, Attach, Delete, Detach, Update, TryGetEntity, TryGetUri
Expand Down
67 changes: 67 additions & 0 deletions src/Microsoft.OData.Client/DeepInsertSaveResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.IO;
using System.Linq;
Expand All @@ -20,6 +21,7 @@ namespace Microsoft.OData.Client
/// <summary>
/// Handles the deep insert requests and responses (both sync and async).
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable", Justification = "The response stream is disposed by the message reader we create over it which we dispose inside the enumerator.")]
internal class DeepInsertSaveResult : BaseSaveResult
{
#region Private Fields
Expand Down Expand Up @@ -118,6 +120,52 @@ internal void DeepInsertRequest<T>(T resource)
}
}

/// <summary>
/// Begins an asynchronous deep insert request.
/// </summary>
/// <typeparam name="T">The type of the top-level object to be deep inserted.</typeparam>
/// <param name="resource">The top-level object of the type to be deep inserted.</param>
internal void BeginDeepInsertRequest<T>(T resource)
{
if (resource == null)
{
throw Error.ArgumentNull(nameof(resource));
}

PerRequest peReq = null;

try
{
BuildDescriptorGraph(this.ChangedEntries, true, resource);
ODataRequestMessageWrapper deepInsertRequestMessage = this.GenerateDeepInsertRequest();
this.Abortable = deepInsertRequestMessage;
KenitoInc marked this conversation as resolved.
Show resolved Hide resolved

deepInsertRequestMessage.SetContentLengthHeader();
this.perRequest = peReq = new PerRequest();
peReq.Request = deepInsertRequestMessage;
peReq.RequestContentStream = deepInsertRequestMessage.CachedRequestStream;

AsyncStateBag asyncStateBag = new AsyncStateBag(peReq);

this.responseStream = new MemoryStream();

IAsyncResult asyncResult = BaseAsyncResult.InvokeAsync(deepInsertRequestMessage.BeginGetRequestStream, this.AsyncEndGetRequestStream, asyncStateBag);

peReq.SetRequestCompletedSynchronously(asyncResult.CompletedSynchronously);
}
catch (Exception e)
{
this.HandleFailure(peReq, e);
KenitoInc marked this conversation as resolved.
Show resolved Hide resolved
throw;
}
finally
{
this.HandleCompleted(peReq);
}

Debug.Assert((this.CompletedSynchronously && this.IsCompleted) || !this.CompletedSynchronously, "sync without complete");
}

/// <summary>
/// This method processes all the changed descriptors in the entity tracker.
/// It loops through all the descriptors and creates relationships between the descriptors if any.
Expand Down Expand Up @@ -268,6 +316,25 @@ protected override ODataRequestMessageWrapper CreateRequestMessage(string method
return this.CreateTopLevelRequest(method, requestUri, headers, httpStack, descriptor);
}

/// <summary>Read and store response data for the current change</summary>
/// <param name="peReq">The completed <see cref="BaseAsyncResult.PerRequest"/> object</param>
/// <remarks>This is called only from the async code paths, when the response to the deep insert request has been read fully.</remarks>
protected override void FinishCurrentChange(PerRequest peReq)
{
base.FinishCurrentChange(peReq);

// This resets the position in the buffered response stream to the beginning
// so that we can start reading the response.
// In this case the ResponseStream is always a MemoryStream since we cache the async response.
if (this.responseStream.Position != 0)
{
this.responseStream.Position = 0;
}

this.perRequest = null;
this.SetCompleted();
}

/// <summary>
/// Gets the materializer state to process the response.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
virtual Microsoft.OData.Client.DataServiceContext.DeepInsert<T>(T resource) -> Microsoft.OData.Client.DataServiceResponse
virtual Microsoft.OData.Client.DataServiceContext.BeginDeepInsert<T>(System.AsyncCallback callback, object state, T resource) -> System.IAsyncResult
virtual Microsoft.OData.Client.DataServiceContext.DeepInsert<T>(T resource) -> Microsoft.OData.Client.DataServiceResponse
virtual Microsoft.OData.Client.DataServiceContext.DeepInsertAsync<T>(T resource) -> System.Threading.Tasks.Task<Microsoft.OData.Client.DataServiceResponse>
virtual Microsoft.OData.Client.DataServiceContext.DeepInsertAsync<T>(T resource, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task<Microsoft.OData.Client.DataServiceResponse>
virtual Microsoft.OData.Client.DataServiceContext.EndDeepInsert(System.IAsyncResult asyncResult) -> Microsoft.OData.Client.DataServiceResponse
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
virtual Microsoft.OData.Client.DataServiceContext.DeepInsert<T>(T resource) -> Microsoft.OData.Client.DataServiceResponse
virtual Microsoft.OData.Client.DataServiceContext.BeginDeepInsert<T>(System.AsyncCallback callback, object state, T resource) -> System.IAsyncResult
virtual Microsoft.OData.Client.DataServiceContext.DeepInsert<T>(T resource) -> Microsoft.OData.Client.DataServiceResponse
virtual Microsoft.OData.Client.DataServiceContext.DeepInsertAsync<T>(T resource) -> System.Threading.Tasks.Task<Microsoft.OData.Client.DataServiceResponse>
virtual Microsoft.OData.Client.DataServiceContext.DeepInsertAsync<T>(T resource, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task<Microsoft.OData.Client.DataServiceResponse>
virtual Microsoft.OData.Client.DataServiceContext.EndDeepInsert(System.IAsyncResult asyncResult) -> Microsoft.OData.Client.DataServiceResponse
3 changes: 3 additions & 0 deletions src/Microsoft.OData.Client/Util.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ internal static class Util
/// <summary>Method name for the BulkUpdate method.</summary>
internal const string BulkUpdateMethodName = "BulkUpdate";

/// <summary>Method name for the DeepInsert method.</summary>
internal const string DeepInsertMethodName = "DeepInsert";

/// <summary>
/// The number of components of version.
/// </summary>
Expand Down
Loading