Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add bulkupdateasync #2655

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/Microsoft.OData.Client/BaseSaveResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,11 @@ protected override void AsyncEndGetResponse(IAsyncResult asyncResult)
if (!this.IsBatchRequest)
{
this.HandleOperationResponse(responseMessage);
this.HandleOperationResponseHeaders((HttpStatusCode)responseMessage.StatusCode, new HeaderCollection(responseMessage));

if (!Util.IsBulkUpdate(this.Options))
ElizabethOkerio marked this conversation as resolved.
Show resolved Hide resolved
{
this.HandleOperationResponseHeaders((HttpStatusCode)responseMessage.StatusCode, new HeaderCollection(responseMessage));
}
}

Util.DebugInjectFault("SaveAsyncResult::AsyncEndGetResponse_BeforeGetStream");
Expand Down
90 changes: 88 additions & 2 deletions src/Microsoft.OData.Client/BulkUpdateSaveResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.IO;
using System.Linq;
Expand All @@ -18,6 +19,7 @@ namespace Microsoft.OData.Client
/// <summary>
/// Handles the bulk update requests and responses (both sync and async)
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable", Justification = "The response stream is disposed by the message reader we create over it which we dispose inside the enumerator.")]
internal class BulkUpdateSaveResult : BaseSaveResult
{
#region Private Fields
Expand Down Expand Up @@ -99,11 +101,70 @@ protected override Stream ResponseStream
get { return this.responseStream; }
}

/// <summary>
/// Asynchronous bulk update request.
/// </summary>
/// <typeparam name="T">The type of the top-level objects to be deep-updated.</typeparam>
/// <param name="objects">The top-level objects of the type to be deep updated.</param>
internal void BeginBulkUpdateRequest<T>(params T[] objects)
{
PerRequest pereq = null;

if (objects == null || objects.Length == 0)
ElizabethOkerio marked this conversation as resolved.
Show resolved Hide resolved
{
throw Error.Argument(Strings.Util_EmptyArray, nameof(objects));
}

BuildDescriptorGraph(this.ChangedEntries, true, objects);

try
{
ODataRequestMessageWrapper bulkUpdateRequestMessage = this.GenerateBulkUpdateRequest();
this.Abortable = bulkUpdateRequestMessage;

if (bulkUpdateRequestMessage != null)
ElizabethOkerio marked this conversation as resolved.
Show resolved Hide resolved
{
bulkUpdateRequestMessage.SetContentLengthHeader();
this.perRequest = pereq = new PerRequest();
pereq.Request = bulkUpdateRequestMessage;
pereq.RequestContentStream = bulkUpdateRequestMessage.CachedRequestStream;

AsyncStateBag asyncStateBag = new AsyncStateBag(pereq);

this.responseStream = new MemoryStream();

IAsyncResult asyncResult = BaseAsyncResult.InvokeAsync(bulkUpdateRequestMessage.BeginGetRequestStream, this.AsyncEndGetRequestStream, asyncStateBag);

pereq.SetRequestCompletedSynchronously(asyncResult.CompletedSynchronously);
}
else
{
this.SetCompleted();

if (this.CompletedSynchronously)
{
this.HandleCompleted(pereq);
}
}
}
catch (Exception e)
{
this.HandleFailure(pereq, e);
throw;
}
ElizabethOkerio marked this conversation as resolved.
Show resolved Hide resolved
finally
{
this.HandleCompleted(pereq);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does request message get disposed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we dispose the request object.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is it getting disposed?

I see that we're disposing perReq at some point, but looking at the source code of PerRequest.Dispose(), it doesn't seem to be disposing it's PerRequest.Request object. So it's still not clear to me at what point we're disposing this.bulkUpdateRequestMessage.

Copy link
Contributor Author

@ElizabethOkerio ElizabethOkerio Apr 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perReq is a wrapper of the request object. When it is disposed it disposes its contents. We assign this.bulkUpdateRequestMessage to pereq.Request. When pereq.Dispose() is called, pereq.Request is set to null. I don't think we can call Dispose() on the perRequest.Request object. ODataRequestMessageWrapper does not implement the IDisposable interface.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. It's strange that it doesn't implement IDisposable, seems like design flaw. I wonder whether we may have any resource leaks since at least the underlying HttpClientRequestMessage is disposable.

}

Debug.Assert((this.CompletedSynchronously && this.IsCompleted) || !this.CompletedSynchronously, "sync without complete");
}

/// <summary>
/// Synchronous bulk update request.
/// </summary>
/// <typeparam name="T"> The type of the top-level objects to be deep-updated.</typeparam>
/// <param name="objects"> The top-level objects of the type to be deep updated.</param>
/// <typeparam name="T">The type of the top-level objects to be deep-updated.</typeparam>
/// <param name="objects">The top-level objects of the type to be deep updated.</param>
internal void BulkUpdateRequest<T>(params T[] objects)
{
if (objects == null || objects.Length == 0)
Expand Down Expand Up @@ -136,6 +197,31 @@ internal void BulkUpdateRequest<T>(params T[] objects)
}
}

/// <summary>Reads and stores response data for the bulk update request.</summary>
/// <param name="pereq">The completed per request object.</param>
protected override void FinishCurrentChange(PerRequest pereq)
{
base.FinishCurrentChange(pereq);

Debug.Assert(this.ResponseStream != null, "this.HttpWebResponseStream != null");
Debug.Assert((this.ResponseStream as MemoryStream) != null, "(this.HttpWebResponseStream as MemoryStream) != null");

if (this.ResponseStream.Position != 0)
{
// Set the stream to the start position and then parse the response and cache it
this.ResponseStream.Position = 0;
this.HandleBulkUpdateResponse(this.batchResponseMessage, this.ResponseStream);
ElizabethOkerio marked this conversation as resolved.
Show resolved Hide resolved
}
else
{
this.HandleBulkUpdateResponse(this.batchResponseMessage, null);
}

pereq.Dispose();
this.perRequest = null;
this.SetCompleted();
}

/// <summary>
/// This method processes all the changed descriptors in the entity tracker.
/// It loops through all the descriptors and creates relationships between the descriptors if any.
Expand Down
86 changes: 68 additions & 18 deletions src/Microsoft.OData.Client/DataServiceContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2271,6 +2271,74 @@ public virtual DataServiceResponse SaveChanges(SaveChangesOptions options)

#endregion

#region BulkUpdate, BulkUpdateAsync, BeginBulkUpdate, EndBulkUpdate

/// <summary>
/// Processes bulk update requests
/// </summary>
/// <typeparam name="T">The type of top-level objects to be deep updated.</typeparam>
/// <param name="objects">The top-level objects of the type to be deep updated.</param>
public virtual DataServiceResponse BulkUpdate<T>(params T[] objects)
ElizabethOkerio marked this conversation as resolved.
Show resolved Hide resolved
{
if (objects == null || objects.Length == 0)
{
throw Error.Argument(Strings.Util_EmptyArray, nameof(objects));
}

BulkUpdateSaveResult result = new BulkUpdateSaveResult(this, Util.BulkUpdateMethodName, SaveChangesOptions.BulkUpdate, callback: null, state: null);
result.BulkUpdateRequest(objects);

return result.EndRequest();
}

/// <summary>
/// Asynchronously processes a bulk update request.
/// </summary>
/// <typeparam name="T">The type of top-level objects to be deep updated.</typeparam>
/// <param name="objects">The top-level objects of the type to be deep updated. </param>
/// <returns>A task representing the <see cref="DataServiceResponse"/> that holds the result of a bulk operation. </returns>
public virtual Task<DataServiceResponse> BulkUpdateAsync<T>(params T[] objects)
ElizabethOkerio marked this conversation as resolved.
Show resolved Hide resolved
{
return this.BulkUpdateAsync(CancellationToken.None, objects);
}

/// <summary>Asynchronously processes a bulk update request</summary>
/// <typeparam name="T">The type of top-level objects to be deep updated.</typeparam>
/// <param name="objects">The top-level objects of the type to be deep updated.</param>
/// <returns>A task representing the <see cref="DataServiceResponse"/> that holds the result of a bulk operation.</returns>
public virtual Task<DataServiceResponse> BulkUpdateAsync<T>(CancellationToken cancellationToken, params T[] objects)
ElizabethOkerio marked this conversation as resolved.
Show resolved Hide resolved
{
return FromAsync((objectsArg, callback, state) => BeginBulkUpdate(callback, state, objectsArg), EndBulkUpdate, objects, cancellationToken);
}

/// <summary>Asynchronously submits top-level objects to be deep-updated to the data service.</summary>
/// <param name="callback">The delegate that is called when a response to the bulk update request is received.</param>
/// <param name="state">User-defined state object that is used to pass context data to the callback method.</param>
/// <returns>An<see cref="System.IAsyncResult" /> object that is used to track the status of the asynchronous operation. </returns>
internal virtual IAsyncResult BeginBulkUpdate<T>(AsyncCallback callback, object state, params T[] objects)
{
if (objects == null || objects.Length == 0)
{
throw Error.Argument(Strings.Util_EmptyArray, nameof(objects));
}

BulkUpdateSaveResult result = new BulkUpdateSaveResult(this, Util.BulkUpdateMethodName, SaveChangesOptions.BulkUpdate, callback, state);
result.BeginBulkUpdateRequest(objects);

return result;
}

/// <summary>Called to complete the <see cref="BeginBulkUpdate{T}(AsyncCallback, object, T[])"/>.</summary>
/// <param name="asyncResult">An <see cref="System.IAsyncResult" /> that represents the status of the asynchronous operation.</param>
/// <returns>The DataServiceResponse object that holds the result of the bulk update operation.</returns>
internal virtual DataServiceResponse EndBulkUpdate(IAsyncResult asyncResult)
{
BulkUpdateSaveResult result = BaseAsyncResult.EndExecute<BulkUpdateSaveResult>(this, Util.BulkUpdateMethodName, asyncResult);
return result.EndRequest();
}

#endregion

#region Add, Attach, Delete, Detach, Update, TryGetEntity, TryGetUri

/// <summary>Adds the specified link to the set of objects the <see cref="Microsoft.OData.Client.DataServiceContext" /> is tracking.</summary>
Expand Down Expand Up @@ -2785,24 +2853,6 @@ public virtual bool TryGetUri(object entity, out Uri identity)
return identity != null;
}

/// <summary>
/// Processes bulk update requests
/// </summary>
/// <typeparam name="T">The type of top-level objects to be deep updated.</typeparam>
/// <param name="objects">The top-level objects of the type to be deep updated.</param>
public virtual DataServiceResponse BulkUpdate<T>(params T[] objects)
{
if (objects == null || objects.Length == 0)
{
throw Error.Argument(Strings.Util_EmptyArray, nameof(objects));
}

BulkUpdateSaveResult result = new BulkUpdateSaveResult(this, Util.SaveChangesMethodName, SaveChangesOptions.BulkUpdate, callback: null, state: null);
result.BulkUpdateRequest(objects);

return result.EndRequest();
}

#endregion

#region FromAsync
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ Microsoft.OData.Client.ReadingDeltaFeedArgs.ReadingDeltaFeedArgs(Microsoft.OData
Microsoft.OData.Client.SaveChangesOptions.BulkUpdate = 128 -> Microsoft.OData.Client.SaveChangesOptions
Microsoft.OData.Client.SendingRequest2EventArgs.IsBulkUpdate.get -> bool
virtual Microsoft.OData.Client.DataServiceContext.BulkUpdate<T>(params T[] objects) -> Microsoft.OData.Client.DataServiceResponse
virtual Microsoft.OData.Client.DataServiceContext.BulkUpdateAsync<T>(params T[] objects) -> System.Threading.Tasks.Task<Microsoft.OData.Client.DataServiceResponse>
virtual Microsoft.OData.Client.DataServiceContext.BulkUpdateAsync<T>(System.Threading.CancellationToken cancellationToken, params T[] objects) -> System.Threading.Tasks.Task<Microsoft.OData.Client.DataServiceResponse>
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ Microsoft.OData.Client.ReadingDeltaFeedArgs.ReadingDeltaFeedArgs(Microsoft.OData
Microsoft.OData.Client.SaveChangesOptions.BulkUpdate = 128 -> Microsoft.OData.Client.SaveChangesOptions
Microsoft.OData.Client.SendingRequest2EventArgs.IsBulkUpdate.get -> bool
virtual Microsoft.OData.Client.DataServiceContext.BulkUpdate<T>(params T[] objects) -> Microsoft.OData.Client.DataServiceResponse
virtual Microsoft.OData.Client.DataServiceContext.BulkUpdateAsync<T>(params T[] objects) -> System.Threading.Tasks.Task<Microsoft.OData.Client.DataServiceResponse>
virtual Microsoft.OData.Client.DataServiceContext.BulkUpdateAsync<T>(System.Threading.CancellationToken cancellationToken, params T[] objects) -> System.Threading.Tasks.Task<Microsoft.OData.Client.DataServiceResponse>
3 changes: 3 additions & 0 deletions src/Microsoft.OData.Client/Util.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ internal static class Util
/// <summary>Method name for the SaveChanges method.</summary>
internal const string SaveChangesMethodName = "SaveChanges";

/// <summary>Method name for the BulkUpdate method.</summary>
internal const string BulkUpdateMethodName = "BulkUpdate";

/// <summary>
/// The number of components of version.
/// </summary>
Expand Down
Loading