Skip to content

Commit

Permalink
Refactored outputting log messages from DML nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkMpn committed Nov 10, 2023
1 parent a6b0237 commit ee540f9
Show file tree
Hide file tree
Showing 19 changed files with 61 additions and 59 deletions.
10 changes: 5 additions & 5 deletions MarkMpn.Sql4Cds.Engine/Ado/Sql4CdsDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Xml;
using MarkMpn.Sql4Cds.Engine.ExecutionPlan;
using Microsoft.SqlServer.TransactSql.ScriptDom;
using Microsoft.Xrm.Sdk;
Expand Down Expand Up @@ -62,13 +63,15 @@ public Sql4CdsDataReader(Sql4CdsCommand command, IQueryExecutionOptions options,

private bool Execute(Dictionary<string, DataTypeReference> parameterTypes, Dictionary<string, object> parameterValues)
{
var context = new NodeExecutionContext(_connection.DataSources, _options, parameterTypes, parameterValues);
IRootExecutionPlanNode logNode = null;
var context = new NodeExecutionContext(_connection.DataSources, _options, parameterTypes, parameterValues, msg => _connection.OnInfoMessage(logNode, msg));

try
{
while (_instructionPointer < _command.Plan.Length && !_options.CancellationToken.IsCancellationRequested)
{
var node = _command.Plan[_instructionPointer];
logNode = node;

if (node is IDataReaderExecutionPlanNode dataSetNode)
{
Expand All @@ -90,10 +93,7 @@ private bool Execute(Dictionary<string, DataTypeReference> parameterTypes, Dicti
else if (node is IDmlQueryExecutionPlanNode dmlNode)
{
dmlNode = (IDmlQueryExecutionPlanNode)dmlNode.Clone();
var msg = dmlNode.Execute(context, out var recordsAffected);

if (!String.IsNullOrEmpty(msg))
_connection.OnInfoMessage(dmlNode, msg);
dmlNode.Execute(context, out var recordsAffected);

_command.OnStatementCompleted(dmlNode, recordsAffected);

Expand Down
3 changes: 1 addition & 2 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/AssignVariablesNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class AssignVariablesNode : BaseDmlNode
[Browsable(false)]
public override bool ContinueOnError { get; set; }

public override string Execute(NodeExecutionContext context, out int recordsAffected)
public override void Execute(NodeExecutionContext context, out int recordsAffected)
{
_executionCount++;

Expand All @@ -63,7 +63,6 @@ public override string Execute(NodeExecutionContext context, out int recordsAffe
}

recordsAffected = -1;
return null;
}

/// <summary>
Expand Down
25 changes: 13 additions & 12 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@ public void Dispose()
/// </summary>
/// <param name="context">The context in which the node is being executed</param>
/// <param name="recordsAffected">The number of records that were affected by the query</param>
/// <returns>A log message to display</returns>
public abstract string Execute(NodeExecutionContext context, out int recordsAffected);
public abstract void Execute(NodeExecutionContext context, out int recordsAffected);

/// <summary>
/// Indicates if some errors returned by the server can be silently ignored
Expand Down Expand Up @@ -539,8 +538,8 @@ protected class OperationNames
/// <param name="meta">The metadata of the entity that will be affected</param>
/// <param name="requestGenerator">A function to generate a DML request from a data source entity</param>
/// <param name="operationNames">The constant strings to use in log messages</param>
/// <returns>The final log message</returns>
protected string ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions options, List<Entity> entities, EntityMetadata meta, Func<Entity,OrganizationRequest> requestGenerator, OperationNames operationNames, out int recordsAffected, IDictionary<string, object> parameterValues, Action<OrganizationResponse> responseHandler = null)
/// <param name="log">A callback function to be executed when a log message is generated</param>
protected void ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptions options, List<Entity> entities, EntityMetadata meta, Func<Entity,OrganizationRequest> requestGenerator, OperationNames operationNames, NodeExecutionContext context, out int recordsAffected, Action<OrganizationResponse> responseHandler = null)
{
var inProgressCount = 0;
var count = 0;
Expand Down Expand Up @@ -618,13 +617,15 @@ protected string ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptio
}
catch (FaultException<OrganizationServiceFault> ex)
{
if (FilterErrors(ex.Detail))
if (FilterErrors(context, request, ex.Detail))
{
if (ContinueOnError)
fault = fault ?? ex.Detail;
else
throw;
}

Interlocked.Increment(ref errorCount);
}
}
else
Expand All @@ -650,7 +651,7 @@ protected string ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptio

if (threadLocalState.EMR.Requests.Count == BatchSize)
{
ProcessBatch(threadLocalState.EMR, threadCount, ref count, ref inProgressCount, ref errorCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, responseHandler, ref fault);
ProcessBatch(threadLocalState.EMR, threadCount, ref count, ref inProgressCount, ref errorCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, ref fault);

threadLocalState = new { threadLocalState.Service, EMR = default(ExecuteMultipleRequest) };
}
Expand All @@ -661,7 +662,7 @@ protected string ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptio
(threadLocalState) =>
{
if (threadLocalState.EMR != null)
ProcessBatch(threadLocalState.EMR, threadCount, ref count, ref inProgressCount, ref errorCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, responseHandler, ref fault);
ProcessBatch(threadLocalState.EMR, threadCount, ref count, ref inProgressCount, ref errorCount, entities, operationNames, meta, options, dataSource, threadLocalState.Service, context, responseHandler, ref fault);

Interlocked.Decrement(ref threadCount);

Expand Down Expand Up @@ -692,8 +693,8 @@ protected string ExecuteDmlOperation(DataSource dataSource, IQueryExecutionOptio
}

recordsAffected = count;
parameterValues["@@ROWCOUNT"] = (SqlInt32)count;
return $"{count:N0} {GetDisplayName(count, meta)} {operationNames.CompletedLowercase}";
context.ParameterValues["@@ROWCOUNT"] = (SqlInt32)count;
context.Log($"{count:N0} {GetDisplayName(count, meta)} {operationNames.CompletedLowercase}");
}

protected class BulkApiErrorDetail
Expand All @@ -703,7 +704,7 @@ protected class BulkApiErrorDetail
public int StatusCode { get; set; }
}

private void ProcessBatch(ExecuteMultipleRequest req, int threadCount, ref int count, ref int inProgressCount, ref int errorCount, List<Entity> entities, OperationNames operationNames, EntityMetadata meta, IQueryExecutionOptions options, DataSource dataSource, IOrganizationService org, Action<OrganizationResponse> responseHandler, ref OrganizationServiceFault fault)
private void ProcessBatch(ExecuteMultipleRequest req, int threadCount, ref int count, ref int inProgressCount, ref int errorCount, List<Entity> entities, OperationNames operationNames, EntityMetadata meta, IQueryExecutionOptions options, DataSource dataSource, IOrganizationService org, NodeExecutionContext context, Action<OrganizationResponse> responseHandler, ref OrganizationServiceFault fault)
{
var newCount = Interlocked.Add(ref inProgressCount, req.Requests.Count);
var progress = (double)newCount / entities.Count;
Expand All @@ -727,7 +728,7 @@ private void ProcessBatch(ExecuteMultipleRequest req, int threadCount, ref int c
Interlocked.Add(ref count, req.Requests.Count - errorResponses.Count);
Interlocked.Add(ref errorCount, errorResponses.Count);

var error = errorResponses.FirstOrDefault(item => FilterErrors(item.Fault));
var error = errorResponses.FirstOrDefault(item => FilterErrors(context, req.Requests[item.RequestIndex], item.Fault));

if (error != null)
{
Expand All @@ -738,7 +739,7 @@ private void ProcessBatch(ExecuteMultipleRequest req, int threadCount, ref int c
}
}

protected virtual bool FilterErrors(OrganizationServiceFault fault)
protected virtual bool FilterErrors(NodeExecutionContext context, OrganizationRequest request, OrganizationServiceFault fault)
{
return true;
}
Expand Down
4 changes: 2 additions & 2 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/BulkDeleteJobNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public override void AddRequiredColumns(NodeCompilationContext context, IList<st
{
}

public string Execute(NodeExecutionContext context, out int recordsAffected)
public void Execute(NodeExecutionContext context, out int recordsAffected)
{
_executionCount++;

Expand Down Expand Up @@ -73,7 +73,7 @@ public string Execute(NodeExecutionContext context, out int recordsAffected)

recordsAffected = 1;
context.ParameterValues["@@IDENTITY"] = new SqlEntityReference(DataSource, "asyncoperation", resp.JobId);
return $"Bulk delete job started";
context.Log("Bulk delete job started");
}
}
catch (QueryExecutionException ex)
Expand Down
3 changes: 1 addition & 2 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/DeclareVariablesNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public override void AddRequiredColumns(NodeCompilationContext context, IList<st
{
}

public string Execute(NodeExecutionContext context, out int recordsAffected)
public void Execute(NodeExecutionContext context, out int recordsAffected)
{
_executionCount++;

Expand All @@ -57,7 +57,6 @@ public string Execute(NodeExecutionContext context, out int recordsAffected)
}

recordsAffected = -1;
return null;
}

public IRootExecutionPlanNodeInternal[] FoldQuery(NodeCompilationContext context, IList<OptimizerHint> hints)
Expand Down
10 changes: 5 additions & 5 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/DeleteNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ protected override void RenameSourceColumns(IDictionary<string, string> columnRe
SecondaryIdSource = secondaryIdSourceRenamed;
}

public override string Execute(NodeExecutionContext context, out int recordsAffected)
public override void Execute(NodeExecutionContext context, out int recordsAffected)
{
_executionCount++;

Expand Down Expand Up @@ -172,7 +172,7 @@ public override string Execute(NodeExecutionContext context, out int recordsAffe

using (_timer.Run())
{
return ExecuteDmlOperation(
ExecuteDmlOperation(
dataSource,
context.Options,
entities,
Expand All @@ -184,8 +184,8 @@ public override string Execute(NodeExecutionContext context, out int recordsAffe
InProgressLowercase = "deleting",
CompletedLowercase = "deleted"
},
out recordsAffected,
context.ParameterValues);
context,
out recordsAffected);
}
}
catch (QueryExecutionException ex)
Expand Down Expand Up @@ -250,7 +250,7 @@ private OrganizationRequest CreateDeleteRequest(EntityMetadata meta, Entity enti
return req;
}

protected override bool FilterErrors(OrganizationServiceFault fault)
protected override bool FilterErrors(NodeExecutionContext context, OrganizationRequest request, OrganizationServiceFault fault)
{
// Ignore errors trying to delete records that don't exist - record may have been deleted by another
// process in parallel.
Expand Down
4 changes: 2 additions & 2 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExecuteAsNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public override void AddRequiredColumns(NodeCompilationContext context, IList<st
Source.AddRequiredColumns(context, requiredColumns);
}

public override string Execute(NodeExecutionContext context, out int recordsAffected)
public override void Execute(NodeExecutionContext context, out int recordsAffected)
{
_executionCount++;

Expand Down Expand Up @@ -91,7 +91,7 @@ public override string Execute(NodeExecutionContext context, out int recordsAffe
throw new QueryExecutionException("Unexpected organization service type");

recordsAffected = -1;
return $"Impersonated user {userId}";
context.Log($"Impersonated user {userId}");
}
}
catch (QueryExecutionException ex)
Expand Down
4 changes: 2 additions & 2 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExecuteMessageNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -693,10 +693,10 @@ public static ExecuteMessageNode FromMessage(ExecutableProcedureReference sproc,
return node;
}

public string Execute(NodeExecutionContext context, out int recordsAffected)
public void Execute(NodeExecutionContext context, out int recordsAffected)
{
recordsAffected = Execute(context).Count();
return "Executed " + MessageName;
context.Log("Executed " + MessageName);
}

IRootExecutionPlanNodeInternal[] IRootExecutionPlanNodeInternal.FoldQuery(NodeCompilationContext context, IList<OptimizerHint> hints)
Expand Down
2 changes: 1 addition & 1 deletion MarkMpn.Sql4Cds.Engine/ExecutionPlan/FetchXmlScan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1185,7 +1185,7 @@ private void AddSchemaAttribute(DataSource dataSource, ColumnList schema, Dictio

if (attrMetadata is LookupAttributeMetadata lookup)
{
AddSchemaAttribute(schema, aliases, fullName + "name", attrMetadata.LogicalName + "name", DataTypeHelpers.NVarChar(lookup.Targets == null || lookup.Targets.Length == 0 ? 100 : lookup.Targets.Select(e => ((StringAttributeMetadata)dataSource.Metadata[e].Attributes.SingleOrDefault(a => a.LogicalName == dataSource.Metadata[e].PrimaryNameAttribute))?.MaxLength ?? 100).Max(), dataSource.DefaultCollation, CollationLabel.Implicit), notNull);
AddSchemaAttribute(schema, aliases, fullName + "name", attrMetadata.LogicalName + "name", DataTypeHelpers.NVarChar(lookup.Targets == null || lookup.Targets.Length == 0 ? 100 : lookup.Targets.Select(e => (dataSource.Metadata[e].Attributes.SingleOrDefault(a => a.LogicalName == dataSource.Metadata[e].PrimaryNameAttribute) as StringAttributeMetadata)?.MaxLength ?? 100).Max(), dataSource.DefaultCollation, CollationLabel.Implicit), notNull);

if (lookup.Targets?.Length != 1 && lookup.AttributeType != AttributeTypeCode.PartyList)
AddSchemaAttribute(schema, aliases, fullName + "type", attrMetadata.LogicalName + "type", DataTypeHelpers.NVarChar(MetadataExtensions.EntityLogicalNameMaxLength, dataSource.DefaultCollation, CollationLabel.Implicit), notNull);
Expand Down
2 changes: 1 addition & 1 deletion MarkMpn.Sql4Cds.Engine/ExecutionPlan/FilterNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1511,7 +1511,7 @@ protected bool TranslateMetadataCriteria(NodeCompilationContext context, Boolean
relationshipFilter = null;

var expressionCompilationContext = new ExpressionCompilationContext(context, null, null);
var expressionExecutionContext = new ExpressionExecutionContext(new NodeExecutionContext(context.DataSources, context.Options, context.ParameterTypes, null));
var expressionExecutionContext = new ExpressionExecutionContext(new NodeExecutionContext(context.DataSources, context.Options, context.ParameterTypes, null, null));

if (criteria is BooleanBinaryExpression binary)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ internal interface IDmlQueryExecutionPlanNode : IRootExecutionPlanNodeInternal
/// </summary>
/// <param name="context">The context in which the node is being executed</param>
/// <param name="recordsAffected">The number of records that were affected by the query</param>
/// <returns>A status message for the results of the query</returns>
string Execute(NodeExecutionContext context, out int recordsAffected);
void Execute(NodeExecutionContext context, out int recordsAffected);
}
}
5 changes: 3 additions & 2 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/InsertNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public override void AddRequiredColumns(NodeCompilationContext context, IList<st
}

public override string Execute(NodeExecutionContext context, out int recordsAffected)
public override void Execute(NodeExecutionContext context, out int recordsAffected)
{
_executionCount++;

Expand Down Expand Up @@ -101,7 +102,7 @@ public override string Execute(NodeExecutionContext context, out int recordsAffe

using (_timer.Run())
{
return ExecuteDmlOperation(
ExecuteDmlOperation(
dataSource,
context.Options,
entities,
Expand All @@ -113,8 +114,8 @@ public override string Execute(NodeExecutionContext context, out int recordsAffe
InProgressLowercase = "inserting",
CompletedLowercase = "inserted"
},
context,
out recordsAffected,
context.ParameterValues,
LogicalName == "listmember" || meta.IsIntersect == true ? null : (Action<OrganizationResponse>) ((r) => SetIdentity(r, context.ParameterValues))
);
}
Expand Down
2 changes: 1 addition & 1 deletion MarkMpn.Sql4Cds.Engine/ExecutionPlan/NestedLoopNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected override IEnumerable<Entity> ExecuteInternal(NodeExecutionContext cont

var hasRight = false;

foreach (var right in RightSource.Execute(new NodeExecutionContext(context.DataSources, context.Options, innerParameterTypes, innerParameters)))
foreach (var right in RightSource.Execute(new NodeExecutionContext(context.DataSources, context.Options, innerParameterTypes, innerParameters, context.Log)))
{
if (rightSchema == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ protected override IEnumerable<Entity> ExecuteInternal(NodeExecutionContext cont
partitionParameterValues[kvp.Key] = kvp.Value;
}

var partitionContext = new NodeExecutionContext(context.DataSources, context.Options, context.ParameterTypes, partitionParameterValues);
var partitionContext = new NodeExecutionContext(context.DataSources, context.Options, context.ParameterTypes, partitionParameterValues, context.Log);

return new { Context = partitionContext, Fetch = fetch };
},
Expand Down
8 changes: 3 additions & 5 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/PrintNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public object Clone()
};
}

public string Execute(NodeExecutionContext context, out int recordsAffected)
public void Execute(NodeExecutionContext context, out int recordsAffected)
{
_executionCount++;
recordsAffected = -1;
Expand All @@ -56,10 +56,8 @@ public string Execute(NodeExecutionContext context, out int recordsAffected)
{
var value = (SqlString)_expression(new ExpressionExecutionContext(context));

if (value.IsNull)
return null;

return value.Value;
if (!value.IsNull)
context.Log(value.Value);
}
}

Expand Down
Loading

0 comments on commit ee540f9

Please sign in to comment.