Skip to content

Commit

Permalink
Added support for partitionid field as part of elastic lookup columns
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkMpn committed Jun 4, 2024
1 parent 532c569 commit baf51f0
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 27 deletions.
27 changes: 27 additions & 0 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/BaseDmlNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,15 @@ protected Dictionary<string, Func<Entity, object>> CompileColumnMappings(DataSou
);
}

if (lookupAttr != null && lookupAttr.Targets.Any(t => dataSource.Metadata[t].DataProviderId == DataProviders.ElasticDataProvider) && mappings.TryGetValue(destAttributeName + "pid", out var partitionIdColumn))
{
var partitionIdExpr = (Expression)Expression.Property(entityParam, typeof(Entity).GetCustomAttribute<DefaultMemberAttribute>().MemberName, Expression.Constant(partitionIdColumn));
partitionIdExpr = Expression.Convert(partitionIdExpr, schema.Schema[partitionIdColumn].Type.ToNetType(out _));
partitionIdExpr = SqlTypeConverter.Convert(partitionIdExpr, schema.Schema[partitionIdColumn].Type, DataTypeHelpers.NVarChar(100, dataSource.DefaultCollation, CollationLabel.Implicit));
partitionIdExpr = SqlTypeConverter.Convert(partitionIdExpr, typeof(string));
convertedExpr = Expr.Call(() => CreateElasticEntityReference(Expr.Arg<EntityReference>(), Expr.Arg<string>(), Expr.Arg<IAttributeMetadataCache>()), convertedExpr, partitionIdExpr, Expression.Constant(dataSource.Metadata));
}

destType = typeof(EntityReference);
}
else
Expand Down Expand Up @@ -556,6 +565,24 @@ private static string ObjectTypeCodeToLogicalName(SqlInt32 otc, IAttributeMetada
return attributeMetadataCache[otc.Value].LogicalName;
}

private static EntityReference CreateElasticEntityReference(EntityReference entityReference, string partitionId, IAttributeMetadataCache metadata)
{
if (entityReference == null)
return null;

if (partitionId == null)
return entityReference;

var meta = metadata[entityReference.LogicalName];
var keys = new KeyAttributeCollection
{
[meta.PrimaryIdAttribute] = entityReference.Id,
["partitionid"] = partitionId
};

return new EntityReference(entityReference.LogicalName, keys);
}

/// <summary>
/// Provides values to include in log messages
/// </summary>
Expand Down
3 changes: 3 additions & 0 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlan/FetchXmlScan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,9 @@ private void AddSchemaAttribute(DataSource dataSource, ColumnList schema, Dictio

if (lookup.Targets?.Length != 1 && lookup.AttributeType != AttributeTypeCode.PartyList)
AddSchemaAttribute(schema, aliases, AddSuffix(fullName, "type"), (attrMetadata.LogicalName + "type").EscapeIdentifier(), DataTypeHelpers.NVarChar(MetadataExtensions.EntityLogicalNameMaxLength, dataSource.DefaultCollation, CollationLabel.Implicit), notNull); ;

if (lookup.Targets != null && lookup.Targets.Any(logicalName => dataSource.Metadata[logicalName].DataProviderId == DataProviders.ElasticDataProvider))
AddSchemaAttribute(schema, aliases, AddSuffix(fullName, "pid"), (attrMetadata.LogicalName + "pid").EscapeIdentifier(), DataTypeHelpers.NVarChar(100, dataSource.DefaultCollation, CollationLabel.Implicit), false);
}
}

Expand Down
83 changes: 78 additions & 5 deletions MarkMpn.Sql4Cds.Engine/ExecutionPlanBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1410,6 +1410,7 @@ private InsertNode ConvertInsertSpecification(NamedTableReference target, IList<
var attributes = metadata.Attributes.ToDictionary(attr => attr.LogicalName, StringComparer.OrdinalIgnoreCase);
var attributeNames = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var virtualTypeAttributes = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var virtualPidAttributes = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var schema = sourceColumns == null ? null : ((IDataExecutionPlanNodeInternal)source).GetSchema(_nodeContext);

// Check all target columns are valid for create
Expand All @@ -1430,6 +1431,18 @@ attr is LookupAttributeMetadata lookupAttr &&
continue;
}

// Could be a virtual ___pid attribute
if (colName.EndsWith("pid", StringComparison.OrdinalIgnoreCase) &&
attributes.TryGetValue(colName.Substring(0, colName.Length - 3), out attr) &&
attr is LookupAttributeMetadata elasticLookupAttr &&
elasticLookupAttr.Targets.Any(t => dataSource.Metadata[t].DataProviderId == DataProviders.ElasticDataProvider))
{
if (!virtualPidAttributes.Add(colName))
throw new NotSupportedQueryFragmentException(Sql4CdsError.DuplicateInsertUpdateColumn(col));

continue;
}

if (!attributes.TryGetValue(colName, out attr))
throw new NotSupportedQueryFragmentException(Sql4CdsError.InvalidColumnName(col));

Expand Down Expand Up @@ -1521,6 +1534,11 @@ attr is LookupAttributeMetadata lookupAttr &&
targetName = colName;
targetType = DataTypeHelpers.NVarChar(MetadataExtensions.EntityLogicalNameMaxLength, dataSource.DefaultCollation, CollationLabel.CoercibleDefault);
}
else if (virtualPidAttributes.Contains(colName))
{
targetName = colName;
targetType = DataTypeHelpers.NVarChar(100, dataSource.DefaultCollation, CollationLabel.CoercibleDefault);
}
else
{
var attr = attributes[colName];
Expand Down Expand Up @@ -1579,12 +1597,23 @@ attr is LookupAttributeMetadata lookupAttr &&
(schema == null || (node.ColumnMappings[targetAttrName].ToColumnReference().GetType(GetExpressionContext(schema, _nodeContext), out var lookupType) != typeof(SqlEntityReference) && lookupType != DataTypeHelpers.ImplicitIntForNullLiteral)))
{
// Special case: not required for listmember.entityid
if (metadata.LogicalName == "listmember" && targetLookupAttribute.LogicalName == "entityid")
continue;
if (metadata.LogicalName != "listmember" || targetLookupAttribute.LogicalName != "entityid")
{
throw new NotSupportedQueryFragmentException(Sql4CdsError.ReadOnlyColumn(col))
{
Suggestion = $"Inserting values into a polymorphic lookup field requires setting the associated type column as well\r\nAdd a value for the {targetLookupAttribute.LogicalName}type column and set it to one of the following values:\r\n{String.Join("\r\n", targetLookupAttribute.Targets.Select(t => $"* {t}"))}"
};
}
}

// If the lookup references an elastic table we also need the pid column
if (targetLookupAttribute.Targets.Any(t => dataSource.Metadata[t].DataProviderId == DataProviders.ElasticDataProvider) &&
!virtualPidAttributes.Contains(targetAttrName + "pid") &&
(schema == null || (node.ColumnMappings[targetAttrName].ToColumnReference().GetType(GetExpressionContext(schema, _nodeContext), out var elasticLookupType) != null && elasticLookupType != DataTypeHelpers.ImplicitIntForNullLiteral)))
{
throw new NotSupportedQueryFragmentException(Sql4CdsError.ReadOnlyColumn(col))
{
Suggestion = $"Inserting values into a polymorphic lookup field requires setting the associated type column as well\r\nAdd a value for the {targetLookupAttribute.LogicalName}type column and set it to one of the following values:\r\n{String.Join("\r\n", targetLookupAttribute.Targets.Select(t => $"* {t}"))}"
Suggestion = $"Inserting values into an elastic lookup field requires setting the associated pid column as well\r\nAdd a value for the {targetLookupAttribute.LogicalName}pid column"
};
}
}
Expand All @@ -1601,6 +1630,18 @@ attr is LookupAttributeMetadata lookupAttr &&
};
}
}
else if (virtualPidAttributes.Contains(targetAttrName))
{
var idAttrName = targetAttrName.Substring(0, targetAttrName.Length - 3);

if (!attributeNames.Contains(idAttrName))
{
throw new NotSupportedQueryFragmentException(Sql4CdsError.ReadOnlyColumn(col))
{
Suggestion = $"Inserting values into an elastic lookup field requires setting the associated ID column as well\r\nAdd a value for the {idAttrName} column"
};
}
}
}

return node;
Expand Down Expand Up @@ -1903,6 +1944,7 @@ private UpdateNode ConvertUpdateStatement(UpdateSpecification update, IList<Opti
var attributes = targetMetadata.Attributes.ToDictionary(attr => attr.LogicalName, StringComparer.OrdinalIgnoreCase);
var attributeNames = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var virtualTypeAttributes = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var virtualPidAttributes = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var existingAttributes = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var useStateTransitions = !hints.OfType<UseHintList>().Any(h => h.Hints.Any(s => s.Value.Equals("DISABLE_STATE_TRANSITIONS", StringComparison.OrdinalIgnoreCase)));
var stateTransitions = useStateTransitions ? StateTransitionLoader.LoadStateTransitions(targetMetadata) : null;
Expand Down Expand Up @@ -1970,6 +2012,14 @@ attr is LookupAttributeMetadata lookupAttr &&
if (!virtualTypeAttributes.Add(targetAttrName))
throw new NotSupportedQueryFragmentException(Sql4CdsError.DuplicateInsertUpdateColumn(assignment.Column));
}
else if (targetAttrName.EndsWith("pid", StringComparison.OrdinalIgnoreCase) &&
attributes.TryGetValue(targetAttrName.Substring(0, targetAttrName.Length - 3), out attr) &&
attr is LookupAttributeMetadata elasticLookupAttr &&
elasticLookupAttr.Targets.Any(t => dataSource.Metadata[t].DataProviderId == DataProviders.ElasticDataProvider))
{
if (!virtualPidAttributes.Add(targetAttrName))
throw new NotSupportedQueryFragmentException(Sql4CdsError.DuplicateInsertUpdateColumn(assignment.Column));
}
else
{
if (!attributes.TryGetValue(targetAttrName, out attr))
Expand Down Expand Up @@ -2068,7 +2118,7 @@ attr is LookupAttributeMetadata lookupAttr &&
var source = ConvertSelectStatement(selectStatement);

// Add UPDATE
var updateNode = ConvertSetClause(update.SetClauses, existingAttributes, dataSource, source, targetLogicalName, targetAlias, attributeNames, virtualTypeAttributes, hints);
var updateNode = ConvertSetClause(update.SetClauses, existingAttributes, dataSource, source, targetLogicalName, targetAlias, attributeNames, virtualTypeAttributes, virtualPidAttributes, hints);
updateNode.StateTransitions = stateTransitions;

return updateNode;
Expand Down Expand Up @@ -2096,7 +2146,7 @@ private void CopyDmlHintsToSelectStatement(IList<OptimizerHint> hints, SelectSta
}
}

private UpdateNode ConvertSetClause(IList<SetClause> setClauses, HashSet<string> existingAttributes, DataSource dataSource, IExecutionPlanNodeInternal node, string targetLogicalName, string targetAlias, HashSet<string> attributeNames, HashSet<string> virtualTypeAttributes, IList<OptimizerHint> queryHints)
private UpdateNode ConvertSetClause(IList<SetClause> setClauses, HashSet<string> existingAttributes, DataSource dataSource, IExecutionPlanNodeInternal node, string targetLogicalName, string targetAlias, HashSet<string> attributeNames, HashSet<string> virtualTypeAttributes, HashSet<string> virtualPidAttributes, IList<OptimizerHint> queryHints)
{
var targetMetadata = dataSource.Metadata[targetLogicalName];
var attributes = targetMetadata.Attributes.ToDictionary(attr => attr.LogicalName, StringComparer.OrdinalIgnoreCase);
Expand Down Expand Up @@ -2223,6 +2273,17 @@ private UpdateNode ConvertSetClause(IList<SetClause> setClauses, HashSet<string>
Suggestion = $"Add a SET clause for the {targetLookupAttribute.LogicalName}type column and set it to one of the following values:\r\n{String.Join("\r\n", targetLookupAttribute.Targets.Select(t => $"* {t}"))}"
};
}

// If the lookup references an elastic table we also need the pid column
if (targetLookupAttribute.Targets.Any(t => dataSource.Metadata[t].DataProviderId == DataProviders.ElasticDataProvider) &&
!virtualPidAttributes.Contains(targetAttrName + "pid") &&
(!sourceTypes.TryGetValue(targetAttrName, out var elasticLookupType) || elasticLookupType != DataTypeHelpers.ImplicitIntForNullLiteral))
{
throw new NotSupportedQueryFragmentException("Updating an elastic lookup field requires setting the associated pid column as well", assignment.Column)
{
Suggestion = $"Add a SET clause for the {targetLookupAttribute.LogicalName}pid column"
};
}
}
else if (virtualTypeAttributes.Contains(targetAttrName))
{
Expand All @@ -2239,6 +2300,18 @@ private UpdateNode ConvertSetClause(IList<SetClause> setClauses, HashSet<string>
};
}
}
else if (virtualPidAttributes.Contains(targetAttrName))
{
var idAttrName = targetAttrName.Substring(0, targetAttrName.Length - 3);

if (!attributeNames.Contains(idAttrName))
{
throw new NotSupportedQueryFragmentException("Updating an elastic lookup field requires setting the associated ID column as well", assignment.Column)
{
Suggestion = $"Add a SET clause for the {idAttrName} column"
};
}
}
}

return update;
Expand Down
Loading

0 comments on commit baf51f0

Please sign in to comment.