Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate fixes #153

Merged
merged 5 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion MarkMpn.Sql4Cds.Engine.Tests/ExecutionPlanTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class ExecutionPlanTests : FakeXrmEasyTestsBase, IQueryExecutionOptions

bool IQueryExecutionOptions.ColumnComparisonAvailable => true;

bool IQueryExecutionOptions.UseLocalTimeZone => false;
bool IQueryExecutionOptions.UseLocalTimeZone => true;

List<JoinOperator> IQueryExecutionOptions.JoinOperatorsAvailable => _supportedJoins;

Expand Down Expand Up @@ -3306,5 +3306,17 @@ public void DistinctOrderByUsesScalarAggregate()
</entity>
</fetch>");
}

// Verifies that a query using a window function (an aggregate with an OVER clause) is rejected
// while the execution plan is being built.
[TestMethod]
[ExpectedException(typeof(NotSupportedQueryFragmentException))]
public void WindowFunctionsNotSupported()
{
    const string sql = "SELECT COUNT(accountid) OVER(PARTITION BY accountid) AS test FROM account";

    // Arguments are constructed in the same order as before: metadata cache first, then the table size stub
    var builder = new ExecutionPlanBuilder(new AttributeMetadataCache(_service), new StubTableSizeCache(), this);

    // The ExpectedException attribute asserts that this call throws
    builder.Build(sql);
}
}
}
3 changes: 3 additions & 0 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/ExpressionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,9 @@ private static MethodInfo GetMethod(Type targetType, FunctionCall func, Type[] p

private static Expression ToExpression(this FunctionCall func, INodeSchema schema, INodeSchema nonAggregateSchema, IDictionary<string, Type> parameterTypes, ParameterExpression entityParam, ParameterExpression parameterParam, ParameterExpression optionsParam)
{
if (func.OverClause != null)
throw new NotSupportedQueryFragmentException("Window functions are not supported", func);

// Find the method to call and get the expressions for the parameter values
var method = GetMethod(func, schema, nonAggregateSchema, parameterTypes, entityParam, parameterParam, optionsParam, out var paramValues);

Expand Down
48 changes: 48 additions & 0 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/FetchXmlScan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ public void SetValue(object value, IQueryExecutionOptions options)
}
}

/// <summary>
/// Thrown when the results of a FetchXML query cannot be paged reliably.
/// </summary>
public class InvalidPagingException : Exception
{
    /// <summary>
    /// Creates a new <see cref="InvalidPagingException"/> with the default message.
    /// </summary>
    public InvalidPagingException()
    {
    }

    /// <summary>
    /// Creates a new <see cref="InvalidPagingException"/>.
    /// </summary>
    /// <param name="message">A description of why paging is not valid</param>
    public InvalidPagingException(string message) : base(message)
    {
    }

    /// <summary>
    /// Creates a new <see cref="InvalidPagingException"/> that wraps the exception that caused it.
    /// </summary>
    /// <param name="message">A description of why paging is not valid</param>
    /// <param name="innerException">The underlying exception that triggered this error</param>
    public InvalidPagingException(string message, Exception innerException) : base(message, innerException)
    {
    }
}

private Dictionary<string, ParameterizedCondition> _parameterizedConditions;
private HashSet<string> _entityNameGroupings;
private Dictionary<string, string> _primaryKeyColumns;
Expand Down Expand Up @@ -197,6 +204,11 @@ protected override IEnumerable<Entity> ExecuteInternal(IDictionary<string, DataS
if (AllPages && FetchXml.aggregateSpecified && FetchXml.aggregate && count == 5000 && FetchXml.top != "5000" && !res.MoreRecords)
throw new FaultException<OrganizationServiceFault>(new OrganizationServiceFault { ErrorCode = -2147164125, Message = "AggregateQueryRecordLimitExceeded" });

// Aggregate queries that group on lookup columns can't be paged reliably, because the results are sorted by the name of the
// related record rather than its GUID. Non-aggregate queries can also be sorted on the primary key as a tie-breaker.
if (res.MoreRecords && FetchXml.aggregateSpecified && FetchXml.aggregate && ContainsSortOnLookupAttribute(dataSource.Metadata, Entity.name, Entity.Items, out var lookupAttr))
throw new InvalidPagingException($"{lookupAttr.name} is a lookup attribute - paging with a sort order on this attribute is not reliable.");

foreach (var entity in res.Entities)
{
OnRetrievedEntity(entity, schema, options, dataSource.Metadata);
Expand Down Expand Up @@ -231,6 +243,42 @@ protected override IEnumerable<Entity> ExecuteInternal(IDictionary<string, DataS
}
}

/// <summary>
/// Checks whether the supplied FetchXML items, or the items of any nested link-entity, include a sort
/// on an attribute whose metadata is a <see cref="LookupAttributeMetadata"/>.
/// </summary>
/// <param name="metadata">The metadata cache used to look up the attributes of <paramref name="logicalName"/></param>
/// <param name="logicalName">The logical name of the entity that <paramref name="items"/> belong to</param>
/// <param name="items">The child items of the entity or link-entity to inspect. May be <c>null</c></param>
/// <param name="lookupAttr">The sorted lookup attribute that was found, or <c>null</c> if there is none</param>
/// <returns><c>true</c> if a sort on a lookup attribute was found, otherwise <c>false</c></returns>
private bool ContainsSortOnLookupAttribute(IAttributeMetadataCache metadata, string logicalName, object[] items, out FetchAttributeType lookupAttr)
{
    lookupAttr = null;

    if (items == null)
        return false;

    foreach (var order in items.OfType<FetchOrderType>())
    {
        // Find the attribute this sort refers to, either by alias or by name. The static String.Equals is used so
        // that sibling attributes without an alias or name are skipped instead of causing a NullReferenceException.
        if (!String.IsNullOrEmpty(order.alias))
            lookupAttr = items.OfType<FetchAttributeType>().FirstOrDefault(attr => String.Equals(attr.alias, order.alias, StringComparison.OrdinalIgnoreCase));
        else if (!String.IsNullOrEmpty(order.attribute))
            lookupAttr = items.OfType<FetchAttributeType>().FirstOrDefault(attr => String.Equals(attr.name, order.attribute, StringComparison.OrdinalIgnoreCase));
        else
            lookupAttr = null;

        // The sort doesn't refer to an attribute included at this level, so there's nothing to check
        if (lookupAttr == null)
            continue;

        var meta = metadata[logicalName];

        // Copy the name to a local - an out parameter can't be captured by the lambda below
        var attrName = lookupAttr.name;
        var attrMetadata = meta.Attributes.SingleOrDefault(a => String.Equals(a.LogicalName, attrName, StringComparison.OrdinalIgnoreCase));

        if (attrMetadata is LookupAttributeMetadata)
            return true;
    }

    // Nothing found at this level, so check the sorts within each link-entity
    foreach (var linkEntity in items.OfType<FetchLinkEntityType>())
    {
        if (ContainsSortOnLookupAttribute(metadata, linkEntity.name, linkEntity.Items, out lookupAttr))
            return true;
    }

    // Don't leak a matched non-lookup attribute to the caller
    lookupAttr = null;
    return false;
}

private void OnRetrievedEntity(Entity entity, INodeSchema schema, IQueryExecutionOptions options, IAttributeMetadataCache metadata)
{
// Expose any formatted values for OptionSetValue and EntityReference values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ Source is FetchXmlScan fetch &&
canUseFetchXmlAggregate = false;
break;
}

// FetchXML dategrouping always uses the local time zone, so we can't use it when we're using UTC
if (!options.UseLocalTimeZone)
{
canUseFetchXmlAggregate = false;
break;
}
}
}

Expand Down Expand Up @@ -506,7 +513,7 @@ Source is FetchXmlScan fetch &&
{
TrySource = firstTry,
CatchSource = nonFetchXmlAggregate,
ExceptionFilter = ex => (ex is QueryExecutionException qee && qee.InnerException is PartitionedAggregateNode.PartitionOverflowException) || (GetOrganizationServiceFault(ex, out var fault) && IsAggregateQueryRetryable(fault))
ExceptionFilter = ex => (ex is QueryExecutionException qee && (qee.InnerException is PartitionedAggregateNode.PartitionOverflowException || qee.InnerException is FetchXmlScan.InvalidPagingException)) || (GetOrganizationServiceFault(ex, out var fault) && IsAggregateQueryRetryable(fault))
};

firstTry.Parent = tryCatch;
Expand Down
131 changes: 72 additions & 59 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/PartitionedAggregateNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,80 +142,90 @@ protected override IEnumerable<Entity> ExecuteInternal(IDictionary<string, DataS
}
#endif

Parallel.For(1, maxDop, index =>
try
{
var ds = new Dictionary<string, DataSource>
Parallel.For(0, maxDop, index =>
{
[fetchXmlNode.DataSource] = new DataSource
var ds = new Dictionary<string, DataSource>
{
Connection = svc.Clone(),
Metadata = dataSources[fetchXmlNode.DataSource].Metadata,
Name = fetchXmlNode.DataSource,
TableSizeCache = dataSources[fetchXmlNode.DataSource].TableSizeCache
}
};

var fetch = new FetchXmlScan
{
Alias = fetchXmlNode.Alias,
DataSource = fetchXmlNode.DataSource,
FetchXml = CloneFetchXml(fetchXmlNode.FetchXml),
Parent = this
};

var partitionParameterValues = new Dictionary<string, object>
{
["@PartitionStart"] = minKey,
["@PartitionEnd"] = maxKey
};

if (parameterValues != null)
{
foreach (var kvp in parameterValues)
partitionParameterValues[kvp.Key] = kvp.Value;
}
[fetchXmlNode.DataSource] = new DataSource
{
Connection = svc?.Clone() ?? org,
Metadata = dataSources[fetchXmlNode.DataSource].Metadata,
Name = fetchXmlNode.DataSource,
TableSizeCache = dataSources[fetchXmlNode.DataSource].TableSizeCache
}
};

foreach (var partition in _queue.GetConsumingEnumerable())
{
try
var fetch = new FetchXmlScan
{
// Execute the query for this partition
ExecuteAggregate(ds, options, partitionParameterTypes, partitionParameterValues, aggregates, groups, fetch, partition.MinValue, partition.MaxValue);
Alias = fetchXmlNode.Alias,
DataSource = fetchXmlNode.DataSource,
FetchXml = CloneFetchXml(fetchXmlNode.FetchXml),
Parent = this
};

lock (_lock)
{
_progress += partition.Percentage;
options.Progress(0, $"Partitioning {GetDisplayName(0, meta)} ({_progress:P0})...");
}
var partitionParameterValues = new Dictionary<string, object>
{
["@PartitionStart"] = minKey,
["@PartitionEnd"] = maxKey
};

if (Interlocked.Decrement(ref _pendingPartitions) == 0)
_queue.CompleteAdding();
if (parameterValues != null)
{
foreach (var kvp in parameterValues)
partitionParameterValues[kvp.Key] = kvp.Value;
}
catch (Exception ex)

foreach (var partition in _queue.GetConsumingEnumerable())
{
if (!GetOrganizationServiceFault(ex, out var fault))
try
{
_queue.CompleteAdding();
throw;
}
// Execute the query for this partition
ExecuteAggregate(ds, options, partitionParameterTypes, partitionParameterValues, aggregates, groups, fetch, partition.MinValue, partition.MaxValue);

if (!IsAggregateQueryLimitExceeded(fault))
lock (_lock)
{
_progress += partition.Percentage;
options.Progress(0, $"Partitioning {GetDisplayName(0, meta)} ({_progress:P0})...");
}

if (Interlocked.Decrement(ref _pendingPartitions) == 0)
_queue.CompleteAdding();
}
catch (Exception ex)
{
_queue.CompleteAdding();
throw;
lock (_queue)
{
if (!GetOrganizationServiceFault(ex, out var fault))
{
_queue.CompleteAdding();
throw;
}

if (!IsAggregateQueryLimitExceeded(fault))
{
_queue.CompleteAdding();
throw;
}

SplitPartition(partition);
}
}

SplitPartition(partition);
}
}

// Merge the stats from this clone of the FetchXML node so we can still see total number of executions etc.
// in the main query plan.
lock (fetchXmlNode)
{
fetchXmlNode.MergeStatsFrom(fetch);
}
});
// Merge the stats from this clone of the FetchXML node so we can still see total number of executions etc.
// in the main query plan.
lock (fetchXmlNode)
{
fetchXmlNode.MergeStatsFrom(fetch);
}
});
}
catch (AggregateException aggEx)
{
throw aggEx.InnerExceptions[0];
}

foreach (var group in groups)
{
Expand All @@ -233,6 +243,9 @@ protected override IEnumerable<Entity> ExecuteInternal(IDictionary<string, DataS

private void SplitPartition(Partition partition)
{
if (_queue.IsAddingCompleted)
return;

// Fail if we get stuck on a particularly dense partition. If there's > 50K records in a 10 second window we probably
// won't be able to split it successfully
if (partition.MaxValue.Value < partition.MinValue.Value.AddSeconds(10))
Expand Down
15 changes: 3 additions & 12 deletions MarkMpn.Sql4Cds.Engine/MarkMpn.Sql4Cds.Engine.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,9 @@
<iconUrl>https://markcarrington.dev/sql4cds-icon/</iconUrl>
<description>Convert SQL queries to FetchXml and execute them against Dataverse / D365</description>
<summary>Convert SQL queries to FetchXml and execute them against Dataverse / D365</summary>
<releaseNotes>Added .NET Core version
Improved aggregate performance with automatic partitioning
Reduced attributes retrieved to process COUNT(*) queries
Fixed GROUP BY and aggregates on virtual attributes
Fixed retrieving file attributes
Fixed GROUP BY and ORDER BY on multi-select picklist fields
Fixed errors when using LIKE queries on non-string fields
Fixed errors when using filters on party list fields
Fixed running queries with more than 10 joins
Fixed DISTINCT with repeated attributes
Fixed use of non-FetchXML functions in WHERE clause
Fixed use of query derived tables with aggregates
<releaseNotes>Improved aggregate results when grouping by lookup columns
Detect use of window functions
Fixed use of DATEPART groupings when using UTC timezone
</releaseNotes>
<copyright>Copyright © 2020 Mark Carrington</copyright>
<language>en-GB</language>
Expand Down
3 changes: 3 additions & 0 deletions MarkMpn.Sql4Cds.Engine/Visitors/AggregateCollectingVisitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public override void ExplicitVisit(QueryDerivedTable node)

private bool IsAggregate(FunctionCall func)
{
if (func.OverClause != null)
return false;

if (func.FunctionName.Value.Equals("SUM", StringComparison.OrdinalIgnoreCase) ||
func.FunctionName.Value.Equals("MIN", StringComparison.OrdinalIgnoreCase) ||
func.FunctionName.Value.Equals("MAX", StringComparison.OrdinalIgnoreCase) ||
Expand Down
17 changes: 3 additions & 14 deletions MarkMpn.Sql4Cds/MarkMpn.SQL4CDS.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,9 @@ plugins or integrations by writing familiar SQL and converting it.

Using the preview TDS Endpoint, SELECT queries can also be run that aren't convertible to FetchXML.</description>
<summary>Convert SQL queries to FetchXML and execute them against Dataverse / D365</summary>
<releaseNotes>Improved aggregate performance with automatic partitioning
Added option to convert FetchXML to SQL using native D365 conversion
Added option to control displayed date format
Reduced attributes retrieved to process COUNT(*) queries
Fixed GROUP BY and aggregates on virtual attributes
Fixed retrieving file attributes
Fixed using quoted identifiers
Fixed GROUP BY and ORDER BY on multi-select picklist fields
Fixed errors when using LIKE queries on non-string fields
Fixed errors when using filters on party list fields
Fixed running queries with more than 10 joins
Fixed DISTINCT with repeated attributes
Fixed use of non-FetchXML functions in WHERE clause
Fixed use of query derived tables with aggregates
<releaseNotes>Improved aggregate results when grouping by lookup columns
Detect use of window functions
Fixed use of DATEPART groupings when using UTC timezone
</releaseNotes>
<copyright>Copyright © 2019 Mark Carrington</copyright>
<language>en-GB</language>
Expand Down