Skip to content

Commit

Permalink
add bulkupdateasync
Browse files Browse the repository at this point in the history
  • Loading branch information
ElizabethOkerio committed Apr 28, 2023
1 parent a526a32 commit c0741e4
Show file tree
Hide file tree
Showing 9 changed files with 340 additions and 21 deletions.
6 changes: 5 additions & 1 deletion src/Microsoft.OData.Client/BaseSaveResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,11 @@ protected override void AsyncEndGetResponse(IAsyncResult asyncResult)
if (!this.IsBatchRequest)
{
this.HandleOperationResponse(responseMessage);
this.HandleOperationResponseHeaders((HttpStatusCode)responseMessage.StatusCode, new HeaderCollection(responseMessage));

if (!Util.IsBulkUpdate(this.Options))
{
this.HandleOperationResponseHeaders((HttpStatusCode)responseMessage.StatusCode, new HeaderCollection(responseMessage));
}
}

Util.DebugInjectFault("SaveAsyncResult::AsyncEndGetResponse_BeforeGetStream");
Expand Down
90 changes: 88 additions & 2 deletions src/Microsoft.OData.Client/BulkUpdateSaveResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.IO;
using System.Linq;
Expand All @@ -18,6 +19,7 @@ namespace Microsoft.OData.Client
/// <summary>
/// Handles the bulk update requests and responses (both sync and async)
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable", Justification = "The response stream is disposed by the message reader we create over it which we dispose inside the enumerator.")]
internal class BulkUpdateSaveResult : BaseSaveResult
{
#region Private Fields
Expand Down Expand Up @@ -99,11 +101,70 @@ protected override Stream ResponseStream
get { return this.responseStream; }
}

/// <summary>
/// Asynchronous bulk update request.
/// </summary>
/// <typeparam name="T">The type of the top-level objects to be deep-updated.</typeparam>
/// <param name="objects">The top-level objects of the type to be deep updated.</param>
internal void BeginBulkUpdateRequest<T>(params T[] objects)
{
PerRequest pereq = null;

if (objects == null || objects.Length == 0)
{
throw Error.Argument(Strings.Util_EmptyArray, nameof(objects));
}

BuildDescriptorGraph(this.ChangedEntries, true, objects);

try
{
ODataRequestMessageWrapper bulkUpdateRequestMessage = this.GenerateBulkUpdateRequest();
this.Abortable = bulkUpdateRequestMessage;

if (bulkUpdateRequestMessage != null)
{
bulkUpdateRequestMessage.SetContentLengthHeader();
this.perRequest = pereq = new PerRequest();
pereq.Request = bulkUpdateRequestMessage;
pereq.RequestContentStream = bulkUpdateRequestMessage.CachedRequestStream;

AsyncStateBag asyncStateBag = new AsyncStateBag(pereq);

this.responseStream = new MemoryStream();

IAsyncResult asyncResult = BaseAsyncResult.InvokeAsync(bulkUpdateRequestMessage.BeginGetRequestStream, this.AsyncEndGetRequestStream, asyncStateBag);

pereq.SetRequestCompletedSynchronously(asyncResult.CompletedSynchronously);
}
else
{
this.SetCompleted();

if (this.CompletedSynchronously)
{
this.HandleCompleted(pereq);
}
}
}
catch (Exception e)
{
this.HandleFailure(pereq, e);
throw;
}
finally
{
this.HandleCompleted(pereq);
}

Debug.Assert((this.CompletedSynchronously && this.IsCompleted) || !this.CompletedSynchronously, "sync without complete");
}

/// <summary>
/// Synchronous bulk update request.
/// </summary>
/// <typeparam name="T"> The type of the top-level objects to be deep-updated.</typeparam>
/// <param name="objects"> The top-level objects of the type to be deep updated.</param>
/// <typeparam name="T">The type of the top-level objects to be deep-updated.</typeparam>
/// <param name="objects">The top-level objects of the type to be deep updated.</param>
internal void BulkUpdateRequest<T>(params T[] objects)
{
if (objects == null || objects.Length == 0)
Expand Down Expand Up @@ -136,6 +197,31 @@ internal void BulkUpdateRequest<T>(params T[] objects)
}
}

/// <summary>Reads and stores response data for the bulk update request.</summary>
/// <param name="pereq">The completed per request object.</param>
protected override void FinishCurrentChange(PerRequest pereq)
{
base.FinishCurrentChange(pereq);

Debug.Assert(this.ResponseStream != null, "this.HttpWebResponseStream != null");
Debug.Assert((this.ResponseStream as MemoryStream) != null, "(this.HttpWebResponseStream as MemoryStream) != null");

if (this.ResponseStream.Position != 0)
{
// Set the stream to the start position and then parse the response and cache it
this.ResponseStream.Position = 0;
this.HandleBulkUpdateResponse(this.batchResponseMessage, this.ResponseStream);
}
else
{
this.HandleBulkUpdateResponse(this.batchResponseMessage, null);
}

pereq.Dispose();
this.perRequest = null;
this.SetCompleted();
}

/// <summary>
/// This method processes all the changed descriptors in the entity tracker.
/// It loops through all the descriptors and creates relationships between the descriptors if any.
Expand Down
86 changes: 68 additions & 18 deletions src/Microsoft.OData.Client/DataServiceContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2271,6 +2271,74 @@ public virtual DataServiceResponse SaveChanges(SaveChangesOptions options)

#endregion

#region BulkUpdate, BulkUpdateAsync, BeginBulkUpdate, EndBulkUpdate

/// <summary>
/// Processes bulk update requests
/// </summary>
/// <typeparam name="T">The type of top-level objects to be deep updated.</typeparam>
/// <param name="objects">The top-level objects of the type to be deep updated.</param>
public virtual DataServiceResponse BulkUpdate<T>(params T[] objects)
{
if (objects == null || objects.Length == 0)
{
throw Error.Argument(Strings.Util_EmptyArray, nameof(objects));
}

BulkUpdateSaveResult result = new BulkUpdateSaveResult(this, Util.BulkUpdateMethodName, SaveChangesOptions.BulkUpdate, callback: null, state: null);
result.BulkUpdateRequest(objects);

return result.EndRequest();
}

/// <summary>
/// Asynchronously processes a bulk update request.
/// </summary>
/// <typeparam name="T">The type of top-level objects to be deep updated.</typeparam>
/// <param name="objects">The top-level objects of the type to be deep updated. </param>
/// <returns>A task representing the <see cref="DataServiceResponse"/> that holds the result of a bulk operation. </returns>
public virtual Task<DataServiceResponse> BulkUpdateAsync<T>(params T[] objects)
{
return this.BulkUpdateAsync(CancellationToken.None, objects);
}

/// <summary>Asynchronously processes a bulk update request</summary>
/// <typeparam name="T">The type of top-level objects to be deep updated.</typeparam>
/// <param name="objects">The top-level objects of the type to be deep updated.</param>
/// <returns>A task representing the <see cref="DataServiceResponse"/> that holds the result of a bulk operation.</returns>
public virtual Task<DataServiceResponse> BulkUpdateAsync<T>(CancellationToken cancellationToken, params T[] objects)
{
return FromAsync((objectsArg, callback, state) => BeginBulkUpdate(callback, state, objectsArg), EndBulkUpdate, objects, cancellationToken);
}

/// <summary>Asynchronously submits top-level objects to be deep-updated to the data service.</summary>
/// <param name="callback">The delegate that is called when a response to the bulk update request is received.</param>
/// <param name="state">User-defined state object that is used to pass context data to the callback method.</param>
/// <returns>An<see cref="System.IAsyncResult" /> object that is used to track the status of the asynchronous operation. </returns>
internal virtual IAsyncResult BeginBulkUpdate<T>(AsyncCallback callback, object state, params T[] objects)
{
if (objects == null || objects.Length == 0)
{
throw Error.Argument(Strings.Util_EmptyArray, nameof(objects));
}

BulkUpdateSaveResult result = new BulkUpdateSaveResult(this, Util.BulkUpdateMethodName, SaveChangesOptions.BulkUpdate, callback, state);
result.BeginBulkUpdateRequest(objects);

return result;
}

/// <summary>Called to complete the <see cref="BeginBulkUpdate{T}(AsyncCallback, object, T[])"/>.</summary>
/// <param name="asyncResult">An <see cref="System.IAsyncResult" /> that represents the status of the asynchronous operation.</param>
/// <returns>The DataServiceResponse object that holds the result of the bulk update operation.</returns>
internal virtual DataServiceResponse EndBulkUpdate(IAsyncResult asyncResult)
{
BulkUpdateSaveResult result = BaseAsyncResult.EndExecute<BulkUpdateSaveResult>(this, Util.BulkUpdateMethodName, asyncResult);
return result.EndRequest();
}

#endregion

#region Add, Attach, Delete, Detach, Update, TryGetEntity, TryGetUri

/// <summary>Adds the specified link to the set of objects the <see cref="Microsoft.OData.Client.DataServiceContext" /> is tracking.</summary>
Expand Down Expand Up @@ -2785,24 +2853,6 @@ public virtual bool TryGetUri(object entity, out Uri identity)
return identity != null;
}

/// <summary>
/// Processes bulk update requests
/// </summary>
/// <typeparam name="T">The type of top-level objects to be deep updated.</typeparam>
/// <param name="objects">The top-level objects of the type to be deep updated.</param>
public virtual DataServiceResponse BulkUpdate<T>(params T[] objects)
{
if (objects == null || objects.Length == 0)
{
throw Error.Argument(Strings.Util_EmptyArray, nameof(objects));
}

BulkUpdateSaveResult result = new BulkUpdateSaveResult(this, Util.SaveChangesMethodName, SaveChangesOptions.BulkUpdate, callback: null, state: null);
result.BulkUpdateRequest(objects);

return result.EndRequest();
}

#endregion

#region FromAsync
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ Microsoft.OData.Client.ReadingDeltaFeedArgs.ReadingDeltaFeedArgs(Microsoft.OData
Microsoft.OData.Client.SaveChangesOptions.BulkUpdate = 128 -> Microsoft.OData.Client.SaveChangesOptions
Microsoft.OData.Client.SendingRequest2EventArgs.IsBulkUpdate.get -> bool
virtual Microsoft.OData.Client.DataServiceContext.BulkUpdate<T>(params T[] objects) -> Microsoft.OData.Client.DataServiceResponse
virtual Microsoft.OData.Client.DataServiceContext.BulkUpdateAsync<T>(params T[] objects) -> System.Threading.Tasks.Task<Microsoft.OData.Client.DataServiceResponse>
virtual Microsoft.OData.Client.DataServiceContext.BulkUpdateAsync<T>(System.Threading.CancellationToken cancellationToken, params T[] objects) -> System.Threading.Tasks.Task<Microsoft.OData.Client.DataServiceResponse>
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ Microsoft.OData.Client.ReadingDeltaFeedArgs.ReadingDeltaFeedArgs(Microsoft.OData
Microsoft.OData.Client.SaveChangesOptions.BulkUpdate = 128 -> Microsoft.OData.Client.SaveChangesOptions
Microsoft.OData.Client.SendingRequest2EventArgs.IsBulkUpdate.get -> bool
virtual Microsoft.OData.Client.DataServiceContext.BulkUpdate<T>(params T[] objects) -> Microsoft.OData.Client.DataServiceResponse
virtual Microsoft.OData.Client.DataServiceContext.BulkUpdateAsync<T>(params T[] objects) -> System.Threading.Tasks.Task<Microsoft.OData.Client.DataServiceResponse>
virtual Microsoft.OData.Client.DataServiceContext.BulkUpdateAsync<T>(System.Threading.CancellationToken cancellationToken, params T[] objects) -> System.Threading.Tasks.Task<Microsoft.OData.Client.DataServiceResponse>
3 changes: 3 additions & 0 deletions src/Microsoft.OData.Client/Util.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ internal static class Util
/// <summary>Method name for the SaveChanges method.</summary>
internal const string SaveChangesMethodName = "SaveChanges";

/// <summary>Method name for the BulkUpdate method.</summary>
internal const string BulkUpdateMethodName = "BulkUpdate";

/// <summary>
/// The number of components of version.
/// </summary>
Expand Down
Loading

0 comments on commit c0741e4

Please sign in to comment.