Skip to content

Commit

Permalink
Fixed batch exception handling
Browse files Browse the repository at this point in the history
Fixes #575
  • Loading branch information
MarkMpn committed Nov 6, 2024
1 parent f5cb3cd commit 3343c79
Showing 1 changed file with 77 additions and 64 deletions.
141 changes: 77 additions & 64 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ public void Dispose()
}
}

class ParallelThreadState
{
public IOrganizationService Service { get; set; }

public ExecuteMultipleRequest EMR { get; set; }

public bool Error { get; set; }
}

/// <summary>
/// The SQL string that the query was converted from
/// </summary>
Expand Down Expand Up @@ -439,105 +448,109 @@ protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions
#endif
Interlocked.Increment(ref threadCount);

return new { Service = service, EMR = default(ExecuteMultipleRequest) };
return new ParallelThreadState { Service = service, EMR = default(ExecuteMultipleRequest), Error = false };
},
(entity, loopState, index, threadLocalState) =>
{
if (options.CancellationToken.IsCancellationRequested)
try
{
loopState.Stop();
return threadLocalState;
}
if (options.CancellationToken.IsCancellationRequested)
{
loopState.Stop();
return threadLocalState;
}

var request = requestGenerator(entity);
var request = requestGenerator(entity);

if (BypassCustomPluginExecution)
request.Parameters["BypassCustomPluginExecution"] = true;
if (BypassCustomPluginExecution)
request.Parameters["BypassCustomPluginExecution"] = true;

if (BatchSize == 1)
{
var newCount = Interlocked.Increment(ref inProgressCount);
var progress = (double)newCount / entities.Count;
if (BatchSize == 1)
{
var newCount = Interlocked.Increment(ref inProgressCount);
var progress = (double)newCount / entities.Count;

if (threadCount < 2)
options.Progress(progress, $"{operationNames.InProgressUppercase} {newCount:N0} of {entities.Count:N0} {GetDisplayName(0, meta)} ({progress:P0})...");
else
options.Progress(progress, $"{operationNames.InProgressUppercase} {newCount - threadCount + 1:N0}-{newCount:N0} of {entities.Count:N0} {GetDisplayName(0, meta)} ({progress:P0}, {threadCount:N0} threads)...");
if (threadCount < 2)
options.Progress(progress, $"{operationNames.InProgressUppercase} {newCount:N0} of {entities.Count:N0} {GetDisplayName(0, meta)} ({progress:P0})...");
else
options.Progress(progress, $"{operationNames.InProgressUppercase} {newCount - threadCount + 1:N0}-{newCount:N0} of {entities.Count:N0} {GetDisplayName(0, meta)} ({progress:P0}, {threadCount:N0} threads)...");

while (true)
{
try
while (true)
{
var response = dataSource.Execute(threadLocalState.Service, request);
Interlocked.Increment(ref count);
try
{
var response = dataSource.Execute(threadLocalState.Service, request);
Interlocked.Increment(ref count);

responseHandler?.Invoke(response);
break;
}
catch (FaultException<OrganizationServiceFault> ex)
{
if (ex.Detail.ErrorCode == 429 || // Virtual/elastic tables
ex.Detail.ErrorCode == -2147015902 || // Number of requests exceeded the limit of 6000 over time window of 300 seconds.
ex.Detail.ErrorCode == -2147015903 || // Combined execution time of incoming requests exceeded limit of 1,200,000 milliseconds over time window of 300 seconds. Decrease number of concurrent requests or reduce the duration of requests and try again later.
ex.Detail.ErrorCode == -2147015898) // Number of concurrent requests exceeded the limit of 52.
responseHandler?.Invoke(response);
break;
}
catch (FaultException<OrganizationServiceFault> ex)
{
// In case throttling isn't handled by normal retry logic in the service client
var retryAfterSeconds = 2;
if (ex.Detail.ErrorCode == 429 || // Virtual/elastic tables
ex.Detail.ErrorCode == -2147015902 || // Number of requests exceeded the limit of 6000 over time window of 300 seconds.
ex.Detail.ErrorCode == -2147015903 || // Combined execution time of incoming requests exceeded limit of 1,200,000 milliseconds over time window of 300 seconds. Decrease number of concurrent requests or reduce the duration of requests and try again later.
ex.Detail.ErrorCode == -2147015898) // Number of concurrent requests exceeded the limit of 52.
{
// In case throttling isn't handled by normal retry logic in the service client
var retryAfterSeconds = 2;

if (ex.Detail.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && (retryAfter is int || retryAfter is string s && Int32.TryParse(s, out _)))
retryAfterSeconds = Convert.ToInt32(retryAfter);
if (ex.Detail.ErrorDetails.TryGetValue("Retry-After", out var retryAfter) && (retryAfter is int || retryAfter is string s && Int32.TryParse(s, out _)))
retryAfterSeconds = Convert.ToInt32(retryAfter);

Thread.Sleep(retryAfterSeconds * 1000);
continue;
}
Thread.Sleep(retryAfterSeconds * 1000);
continue;
}

if (FilterErrors(context, request, ex.Detail))
{
if (ContinueOnError)
fault = fault ?? ex.Detail;
else
throw;
}
if (FilterErrors(context, request, ex.Detail))
{
if (ContinueOnError)
fault = fault ?? ex.Detail;
else
throw;
}

Interlocked.Increment(ref errorCount);
break;
Interlocked.Increment(ref errorCount);
break;
}
}
}
}
else
{
if (threadLocalState.EMR == null)
else
{
threadLocalState = new
if (threadLocalState.EMR == null)
{
threadLocalState.Service,
EMR = new ExecuteMultipleRequest
threadLocalState.EMR = new ExecuteMultipleRequest
{
Requests = new OrganizationRequestCollection(),
Settings = new ExecuteMultipleSettings
{
ContinueOnError = IgnoresSomeErrors,
ReturnResponses = responseHandler != null
}
}
};
}
};
}

threadLocalState.EMR.Requests.Add(request);
threadLocalState.EMR.Requests.Add(request);

if (threadLocalState.EMR.Requests.Count == BatchSize)
{
ProcessBatch(threadLocalState.EMR, threadCount, ref count, ref inProgressCount, ref errorCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, ref fault);
if (threadLocalState.EMR.Requests.Count == BatchSize)
{
ProcessBatch(threadLocalState.EMR, threadCount, ref count, ref inProgressCount, ref errorCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, ref fault);

threadLocalState = new { threadLocalState.Service, EMR = default(ExecuteMultipleRequest) };
threadLocalState.EMR = null;
}
}
}

return threadLocalState;
return threadLocalState;
}
catch
{
threadLocalState.Error = true;
throw;
}
},
(threadLocalState) =>
{
if (threadLocalState.EMR != null)
if (threadLocalState.EMR != null && !threadLocalState.Error)
ProcessBatch(threadLocalState.EMR, threadCount, ref count, ref inProgressCount, ref errorCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, ref fault);

Interlocked.Decrement(ref threadCount);
Expand Down

0 comments on commit 3343c79

Please sign in to comment.