Skip to content

Commit

Permalink
Initial partitioning work
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkMpn committed Nov 27, 2021
1 parent 78e5e5f commit 9f88d56
Show file tree
Hide file tree
Showing 4 changed files with 587 additions and 2 deletions.
56 changes: 56 additions & 0 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/Aggregate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,27 @@ public virtual void NextRecord(Entity entity)
Update(value);
}

/// <summary>
/// Updates the aggregate function state based on the aggregate values for a partition
/// </summary>
/// <param name="entity">The <see cref="Entity"/> that contains aggregated values from a partition of the available records</param>
public virtual void NextPartition(Entity entity)
{
var value = _selector(entity);
UpdatePartition(value);
}

/// <summary>
/// Updates the aggregation state based on a value extracted from the source <see cref="Entity"/>
/// </summary>
/// <param name="value"></param>
protected abstract void Update(object value);

/// <summary>
/// Updates the aggregation state based on a value extracted from the partition <see cref="Entity"/>
/// </summary>
/// <param name="value"></param>
protected abstract void UpdatePartition(object value);

/// <summary>
/// Returns the current value of this aggregation
Expand Down Expand Up @@ -160,6 +176,11 @@ protected override void Update(object value)
Value = _valueSelector(_sum / _count);
}

protected override void UpdatePartition(object value)
{
throw new InvalidOperationException();
}

public override Type Type { get; }

public override void Reset()
Expand Down Expand Up @@ -188,6 +209,11 @@ protected override void Update(object value)
Value = (SqlInt32)Value + 1;
}

protected override void UpdatePartition(object value)
{
Value = (SqlInt32)Value + (SqlInt32)value;
}

public override Type Type => typeof(SqlInt32);

public override void Reset()
Expand Down Expand Up @@ -217,6 +243,11 @@ protected override void Update(object value)
Value = (SqlInt32)Value + 1;
}

protected override void UpdatePartition(object value)
{
Value = (SqlInt32)Value + (SqlInt32)value;
}

public override Type Type => typeof(int);

public override void Reset()
Expand Down Expand Up @@ -251,6 +282,11 @@ protected override void Update(object value)
Value = value;
}

protected override void UpdatePartition(object value)
{
Update(value);
}

public override Type Type { get; }
}

Expand Down Expand Up @@ -280,6 +316,11 @@ protected override void Update(object value)
Value = value;
}

protected override void UpdatePartition(object value)
{
Update(value);
}

public override Type Type { get; }
}

Expand Down Expand Up @@ -317,6 +358,11 @@ protected override void Update(object value)
Value = _valueSelector(_sumDecimal);
}

protected override void UpdatePartition(object value)
{
Update(value);
}

public override Type Type { get; }

public override void Reset()
Expand Down Expand Up @@ -348,6 +394,11 @@ protected override void Update(object value)
Value = value;
}

protected override void UpdatePartition(object value)
{
throw new InvalidOperationException();
}

public override Type Type { get; }

public override void Reset()
Expand Down Expand Up @@ -387,6 +438,11 @@ public override void NextRecord(Entity entity)
}
}

protected override void UpdatePartition(object value)
{
throw new InvalidOperationException();
}

protected override void Update(object value)
{
throw new NotImplementedException();
Expand Down
48 changes: 46 additions & 2 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/HashMatchAggregateNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,11 @@ Source is FetchXmlScan fetch &&
if (Source is FetchXmlScan || Source is ComputeScalarNode computeScalar && computeScalar.Source is FetchXmlScan)
{
// Check if all the aggregates & groupings can be done in FetchXML. Can only convert them if they can ALL
// be handled - if any one needs to be calculated manually, we need to calculate them all
// be handled - if any one needs to be calculated manually, we need to calculate them all. Also track if
// we can partition the query for larger source data sets. We can't partition DISTINCT aggregates, and need
// to transform AVG(field) to SUM(field) / COUNT(field)
var canPartition = true;

foreach (var agg in Aggregates)
{
if (agg.Value.SqlExpression != null && !(agg.Value.SqlExpression is ColumnReferenceExpression))
Expand All @@ -301,6 +305,9 @@ Source is FetchXmlScan fetch &&

if (agg.Value.AggregateType == AggregateType.First)
return this;

if (agg.Value.Distinct)
canPartition = false;
}

var fetchXml = Source as FetchXmlScan;
Expand Down Expand Up @@ -518,10 +525,47 @@ Source is FetchXmlScan fetch &&
// FoldQuery can be called again in some circumstances. Don't repeat the folding operation and create another try/catch
_folded = true;

IDataExecutionPlanNode fallback = this;

if (canPartition)
{
// Create a clone of the aggregate FetchXML query
var partitionedFetchXml = new FetchXmlScan
{
DataSource = fetchXml.DataSource,
Alias = fetchXml.Alias,
AllPages = fetchXml.AllPages,
FetchXml = (FetchXml.FetchType)serializer.Deserialize(new StringReader(fetchXml.FetchXmlString)),
ReturnFullSchema = fetchXml.ReturnFullSchema
};

var partitionedAggregates = new PartitionedFetchXmlAggregateNode
{
Source = partitionedFetchXml
};

var tryPartitioned = new TryCatchNode
{
TrySource = partitionedAggregates,
CatchSource = fallback,
ExceptionFilter = IsAggregateQueryRetryableException
};
fallback = tryPartitioned;

partitionedAggregates.GroupBy.AddRange(GroupBy);

foreach (var aggregate in Aggregates)
{
// TODO: Clone the aggregate
// TODO: Rewrite AVG
partitionedAggregates.Aggregates[aggregate.Key] = aggregate.Value;
}
}

return new TryCatchNode
{
TrySource = fetchXml,
CatchSource = this,
CatchSource = fallback,
ExceptionFilter = IsAggregateQueryRetryableException
};
}
Expand Down
Loading

0 comments on commit 9f88d56

Please sign in to comment.