Skip to content

Commit

Permalink
More partitioning work
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkMpn committed Nov 29, 2021
1 parent 022a4de commit a0c5b60
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 3 deletions.
22 changes: 21 additions & 1 deletion MarkMpn.Sql4Cds.Engine/ExecutionPlan/FetchXmlScan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -776,10 +776,30 @@ public override void AddRequiredColumns(IDictionary<string, DataSource> dataSour

public override int EstimateRowsOut(IDictionary<string, DataSource> dataSources, IQueryExecutionOptions options, IDictionary<string, Type> parameterTypes)
{
// TODO: Improve estimate for aggregate queries
if (FetchXml.aggregateSpecified && FetchXml.aggregate)
{
var hasGroups = HasGroups(Entity.Items);

if (!hasGroups)
return 1;

return EstimateRowsOut(Entity.name, Entity.Items, dataSources) * 4 / 10;
}

return EstimateRowsOut(Entity.name, Entity.Items, dataSources);
}

private bool HasGroups(object[] items)
{
if (items == null)
return false;

if (items.OfType<FetchAttributeType>().Any(a => a.groupbySpecified && a.groupby == FetchBoolType.@true))
return true;

return items.OfType<FetchLinkEntityType>().Any(link => HasGroups(link.Items));
}

private int EstimateRowsOut(string name, object[] items, IDictionary<string, DataSource> dataSources)
{
if (!String.IsNullOrEmpty(FetchXml.top))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ Source is FetchXmlScan fetch &&
{
TrySource = firstTry,
CatchSource = this,
ExceptionFilter = ex => GetOrganizationServiceFault(ex, out var fault) && IsAggregateQueryRetryable(fault)
ExceptionFilter = ex => (ex is QueryExecutionException qee && qee.InnerException is PartitionedAggregateNode.PartitionOverflowException) || (GetOrganizationServiceFault(ex, out var fault) && IsAggregateQueryRetryable(fault))
};

firstTry.Parent = tryCatch;
Expand Down
11 changes: 10 additions & 1 deletion MarkMpn.Sql4Cds.Engine/ExecutionPlan/PartitionedAggregateNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ namespace MarkMpn.Sql4Cds.Engine.ExecutionPlan
/// </summary>
class PartitionedAggregateNode : BaseAggregateNode
{
public class PartitionOverflowException : Exception
{
}

class Partition
{
public SqlDateTime MinValue { get; set; }
Expand Down Expand Up @@ -102,7 +106,7 @@ protected override IEnumerable<Entity> ExecuteInternal(IDictionary<string, DataS
throw new QueryExecutionException("Cannot partition query");

// Split recursively, add up values below & above split value if query returns successfully, or re-split on error
// Range is >MinValue AND <= MaxValue, so start from just before first record to ensure the first record is counted
// Range is > MinValue AND <= MaxValue, so start from just before first record to ensure the first record is counted
var fullRange = new Partition
{
MinValue = minKey.Value.AddSeconds(-1),
Expand Down Expand Up @@ -153,6 +157,11 @@ protected override IEnumerable<Entity> ExecuteInternal(IDictionary<string, DataS

private void SplitPartition(Partition partition)
{
// Fail if we get stuck on a particularly dense partition. If there's > 50K records in a 10 second window we probably
// won't be able to split it successfully
if (partition.MaxValue.Value < partition.MinValue.Value.AddSeconds(10))
throw new PartitionOverflowException();

var split = partition.MinValue.Value + TimeSpan.FromSeconds((partition.MaxValue.Value - partition.MinValue.Value).TotalSeconds / 2);

_queue.Enqueue(new Partition
Expand Down

0 comments on commit a0c5b60

Please sign in to comment.