Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query : Fixes ORDER BY query issue when partial partition key is specified with hierarchical partition #4507

Merged
merged 4 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitione
inputParameters.InitialFeedRange,
trace);

Debug.Assert(targetRanges != null, $"{nameof(CosmosQueryExecutionContextFactory)} Assert!", "targetRanges != null");

TryCatch<IQueryPipelineStage> tryCreatePipelineStage;
Documents.PartitionKeyRange targetRange = await TryGetTargetRangeOptimisticDirectExecutionAsync(
inputParameters,
Expand All @@ -270,7 +272,7 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitione
}
else
{
bool singleLogicalPartitionKeyQuery = inputParameters.PartitionKey.HasValue
bool singleLogicalPartitionKeyQuery = (inputParameters.PartitionKey.HasValue && targetRanges.Count == 1)
|| ((partitionedQueryExecutionInfo.QueryRanges.Count == 1)
&& partitionedQueryExecutionInfo.QueryRanges[0].IsSingleValue);
bool serverStreamingQuery = !partitionedQueryExecutionInfo.QueryInfo.HasAggregates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,62 @@
{"id":"2","value2":"97"}]]></Documents>
</Output>
</Result>
<Result>
<Input>
<Description>SELECT ORDER BY with ODE</Description>
<Query><![CDATA[SELECT c.id, c.value2, c.intVal FROM c ORDER BY c.intVal]]></Query>
<ODE>True</ODE>
</Input>
<Output>
<Documents><![CDATA[{"id":"2","value2":"97","intVal":-47},
{"id":"2","value2":"92","intVal":-42},
{"id":"2","value2":"87","intVal":-37},
{"id":"2","value2":"82","intVal":-32},
{"id":"2","value2":"77","intVal":-27},
{"id":"2","value2":"72","intVal":-22},
{"id":"2","value2":"67","intVal":-17},
{"id":"2","value2":"62","intVal":-12},
{"id":"2","value2":"57","intVal":-7},
{"id":"2","value2":"52","intVal":-2},
{"id":"2","value2":"47","intVal":3},
{"id":"2","value2":"42","intVal":8},
{"id":"2","value2":"37","intVal":13},
{"id":"2","value2":"32","intVal":18},
{"id":"2","value2":"27","intVal":23},
{"id":"2","value2":"22","intVal":28},
{"id":"2","value2":"17","intVal":33},
{"id":"2","value2":"12","intVal":38},
{"id":"2","value2":"7","intVal":43},
{"id":"2","value2":"2","intVal":48}]]></Documents>
</Output>
</Result>
<Result>
<Input>
<Description>SELECT ORDER BY without ODE</Description>
<Query><![CDATA[SELECT c.id, c.value2, c.intVal FROM c ORDER BY c.intVal]]></Query>
<ODE>False</ODE>
</Input>
<Output>
<Documents><![CDATA[{"id":"2","value2":"97","intVal":-47},
{"id":"2","value2":"92","intVal":-42},
{"id":"2","value2":"87","intVal":-37},
{"id":"2","value2":"82","intVal":-32},
{"id":"2","value2":"77","intVal":-27},
{"id":"2","value2":"72","intVal":-22},
{"id":"2","value2":"67","intVal":-17},
{"id":"2","value2":"62","intVal":-12},
{"id":"2","value2":"57","intVal":-7},
{"id":"2","value2":"52","intVal":-2},
{"id":"2","value2":"47","intVal":3},
{"id":"2","value2":"42","intVal":8},
{"id":"2","value2":"37","intVal":13},
{"id":"2","value2":"32","intVal":18},
{"id":"2","value2":"27","intVal":23},
{"id":"2","value2":"22","intVal":28},
{"id":"2","value2":"17","intVal":33},
{"id":"2","value2":"12","intVal":38},
{"id":"2","value2":"7","intVal":43},
{"id":"2","value2":"2","intVal":48}]]></Documents>
</Output>
</Result>
</Results>
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ internal class InMemoryContainer : IMonadicDocumentContainer
private Dictionary<int, PartitionKeyHashRange> cachedPartitionKeyRangeIdToHashRange;
private readonly bool createSplitForMultiHashAtSecondlevel;
private readonly bool resolvePartitionsBasedOnPrefix;
private readonly QueryRequestOptions queryRequestOptions;

public InMemoryContainer(
PartitionKeyDefinition partitionKeyDefinition,
bool createSplitForMultiHashAtSecondlevel = false,
bool resolvePartitionsBasedOnPrefix = false)
bool resolvePartitionsBasedOnPrefix = false,
QueryRequestOptions queryRequestOptions = null)
{
this.partitionKeyDefinition = partitionKeyDefinition ?? throw new ArgumentNullException(nameof(partitionKeyDefinition));
PartitionKeyHashRange fullRange = new PartitionKeyHashRange(startInclusive: null, endExclusive: new PartitionKeyHash(Cosmos.UInt128.MaxValue));
Expand All @@ -76,6 +78,7 @@ public InMemoryContainer(
this.parentToChildMapping = new Dictionary<int, (int, int)>();
this.createSplitForMultiHashAtSecondlevel = createSplitForMultiHashAtSecondlevel;
this.resolvePartitionsBasedOnPrefix = resolvePartitionsBasedOnPrefix;
this.queryRequestOptions = queryRequestOptions;
}

public Task<TryCatch<List<FeedRangeEpk>>> MonadicGetFeedRangesAsync(
Expand Down Expand Up @@ -512,7 +515,7 @@ public virtual Task<TryCatch<QueryPage>> MonadicQueryAsync(
}

List<CosmosObject> documents = new List<CosmosObject>();
foreach (Record record in records.Where(r => IsRecordWithinFeedRange(r, feedRangeState.FeedRange, this.partitionKeyDefinition)))
foreach (Record record in records.Where(r => IsRecordWithinFeedRange(r, feedRangeState.FeedRange, this.partitionKeyDefinition) && IsRecordWithinQueryPartition(r, this.queryRequestOptions, this.partitionKeyDefinition)))
{
CosmosObject document = ConvertRecordToCosmosElement(record);
documents.Add(CosmosObject.Create(document));
Expand Down Expand Up @@ -716,6 +719,26 @@ public virtual Task<TryCatch<QueryPage>> MonadicQueryAsync(
}
}

private bool IsRecordWithinQueryPartition(Record record, QueryRequestOptions queryRequestOptions, PartitionKeyDefinition partitionKeyDefinition)
{
if(queryRequestOptions?.PartitionKey == null)
{
return true;
}

IList<CosmosElement> partitionKey = GetPartitionKeysFromObjectModel(queryRequestOptions.PartitionKey.Value);
IList<CosmosElement> partitionKeyFromRecord = GetPartitionKeysFromPayload(record.Payload, partitionKeyDefinition);
if (partitionKeyDefinition.Kind == PartitionKind.MultiHash)
{
PartitionKeyHash partitionKeyHash = GetHashFromPartitionKeys(partitionKey, partitionKeyDefinition);
PartitionKeyHash partitionKeyFromRecordHash = GetHashFromPartitionKeys(partitionKeyFromRecord, partitionKeyDefinition);

return partitionKeyHash.Equals(partitionKeyFromRecordHash) || partitionKeyFromRecordHash.Value.StartsWith(partitionKeyHash.Value);
}

return partitionKey.SequenceEqual(partitionKeyFromRecord);
}

public Task<TryCatch<ChangeFeedPage>> MonadicChangeFeedAsync(
FeedRangeState<ChangeFeedState> feedRangeState,
ChangeFeedPaginationOptions changeFeedPaginationOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ public void TestQueriesOnSplitContainer()
{
List<SubpartitionTestInput> inputs = new List<SubpartitionTestInput>
{
new SubpartitionTestInput("SELECT", query: @"SELECT c.id, c.value2 FROM c", ode: true),
new SubpartitionTestInput("SELECT without ODE", query: @"SELECT c.id, c.value2 FROM c", ode: false),
new SubpartitionTestInput(description: "SELECT", query: @"SELECT c.id, c.value2 FROM c", ode: true),
new SubpartitionTestInput(description: "SELECT without ODE", query: @"SELECT c.id, c.value2 FROM c", ode: false),
new SubpartitionTestInput(description: "SELECT ORDER BY with ODE", query: @"SELECT c.id, c.value2, c.intVal FROM c ORDER BY c.intVal", ode: true, sortResults: false),
new SubpartitionTestInput(description: "SELECT ORDER BY without ODE", query: @"SELECT c.id, c.value2, c.intVal FROM c ORDER BY c.intVal", ode: false, sortResults: false),
};

this.ExecuteTestSuite(inputs);
}

Expand Down Expand Up @@ -65,16 +68,19 @@ public async Task VerifyTestFrameworkSupportsPartitionSplit()

public override SubpartitionTestOutput ExecuteTest(SubpartitionTestInput input)
{
IMonadicDocumentContainer monadicDocumentContainer = CreateSplitDocumentContainerAsync(DocumentCount).Result;
QueryRequestOptions queryRequestOptions = new QueryRequestOptions()
{
PartitionKey = new PartitionKeyBuilder().Add(SplitPartitionKey.ToString()).Build()
};

IMonadicDocumentContainer monadicDocumentContainer = CreateSplitDocumentContainerAsync(DocumentCount, queryRequestOptions).Result;
DocumentContainer documentContainer = new DocumentContainer(monadicDocumentContainer);
TryCatch _ = monadicDocumentContainer.MonadicRefreshProviderAsync(NoOpTrace.Singleton, cancellationToken: default).Result;
List<FeedRangeEpk> containerRanges = documentContainer.GetFeedRangesAsync(NoOpTrace.Singleton, cancellationToken: default).Result;

List<CosmosElement> documents = new List<CosmosElement>();
QueryRequestOptions queryRequestOptions = new QueryRequestOptions()
{
PartitionKey = new PartitionKeyBuilder().Add(SplitPartitionKey.ToString()).Build()
};
(CosmosQueryExecutionContextFactory.InputParameters inputParameters, CosmosQueryContextCore cosmosQueryContextCore) =
CreateInputParamsAndQueryContext(input, queryRequestOptions);
CreateInputParamsAndQueryContext(input, queryRequestOptions, containerRanges);
IQueryPipelineStage queryPipelineStage = CosmosQueryExecutionContextFactory.Create(
documentContainer,
cosmosQueryContextCore,
Expand All @@ -92,10 +98,10 @@ public override SubpartitionTestOutput ExecuteTest(SubpartitionTestInput input)
documents.AddRange(tryGetPage.Result.Documents);
}

return new SubpartitionTestOutput(documents);
return new SubpartitionTestOutput(documents, input.SortResults);
}

private static Tuple<CosmosQueryExecutionContextFactory.InputParameters, CosmosQueryContextCore> CreateInputParamsAndQueryContext(SubpartitionTestInput input, QueryRequestOptions queryRequestOptions)
private static Tuple<CosmosQueryExecutionContextFactory.InputParameters, CosmosQueryContextCore> CreateInputParamsAndQueryContext(SubpartitionTestInput input, QueryRequestOptions queryRequestOptions, IReadOnlyList<FeedRangeEpk> containerRanges)
{
string query = input.Query;
CosmosElement continuationToken = null;
Expand Down Expand Up @@ -134,10 +140,20 @@ public override SubpartitionTestOutput ExecuteTest(SubpartitionTestInput input)
isNonStreamingOrderByQueryFeatureDisabled: queryRequestOptions.IsNonStreamingOrderByQueryFeatureDisabled,
testInjections: queryRequestOptions.TestSettings);

List<PartitionKeyRange> targetPkRanges = new();
foreach (FeedRangeEpk feedRangeEpk in containerRanges)
{
targetPkRanges.Add(new PartitionKeyRange
{
MinInclusive = feedRangeEpk.Range.Min,
MaxExclusive = feedRangeEpk.Range.Max,
});
}

string databaseId = "db1234";
string resourceLink = $"dbs/{databaseId}/colls";
CosmosQueryContextCore cosmosQueryContextCore = new CosmosQueryContextCore(
client: new TestCosmosQueryClient(queryPartitionProvider),
client: new TestCosmosQueryClient(queryPartitionProvider, targetPkRanges),
resourceTypeEnum: Documents.ResourceType.Document,
operationType: Documents.OperationType.Query,
resourceType: typeof(QueryResponseCore),
Expand Down Expand Up @@ -215,20 +231,20 @@ internal static PartitionKeyDefinition CreatePartitionKeyDefinition()
return partitionKeyDefinition;
}

private static async Task<IDocumentContainer> CreateSplitDocumentContainerAsync(int numItems)
private static async Task<IDocumentContainer> CreateSplitDocumentContainerAsync(int numItems, QueryRequestOptions queryRequestOptions)
{
PartitionKeyDefinition partitionKeyDefinition = CreatePartitionKeyDefinition();
InMemoryContainer inMemoryContainer = await CreateSplitInMemoryDocumentContainerAsync(numItems, partitionKeyDefinition);
InMemoryContainer inMemoryContainer = await CreateSplitInMemoryDocumentContainerAsync(numItems, partitionKeyDefinition, queryRequestOptions);
DocumentContainer documentContainer = new DocumentContainer(inMemoryContainer);
return documentContainer;
}

private static async Task<InMemoryContainer> CreateSplitInMemoryDocumentContainerAsync(int numItems, PartitionKeyDefinition partitionKeyDefinition)
private static async Task<InMemoryContainer> CreateSplitInMemoryDocumentContainerAsync(int numItems, PartitionKeyDefinition partitionKeyDefinition, QueryRequestOptions queryRequestOptions = null)
{
InMemoryContainer inMemoryContainer = new InMemoryContainer(partitionKeyDefinition, createSplitForMultiHashAtSecondlevel: true, resolvePartitionsBasedOnPrefix: true);
InMemoryContainer inMemoryContainer = new InMemoryContainer(partitionKeyDefinition, createSplitForMultiHashAtSecondlevel: true, resolvePartitionsBasedOnPrefix: true, queryRequestOptions: queryRequestOptions);
for (int i = 0; i < numItems; i++)
{
CosmosObject item = CosmosObject.Parse($"{{\"id\" : \"{i % 5}\", \"value1\" : \"{Guid.NewGuid()}\", \"value2\" : \"{i}\" }}");
CosmosObject item = CosmosObject.Parse($"{{\"id\" : \"{i % 5}\", \"value1\" : \"{Guid.NewGuid()}\", \"value2\" : \"{i}\", \"intVal\" : {(numItems/2) - i} }}");
while (true)
{
TryCatch<Record> monadicCreateRecord = await inMemoryContainer.MonadicCreateItemAsync(item, cancellationToken: default);
Expand All @@ -243,13 +259,16 @@ private static async Task<InMemoryContainer> CreateSplitInMemoryDocumentContaine

return inMemoryContainer;
}

internal class TestCosmosQueryClient : CosmosQueryClient
{
private readonly QueryPartitionProvider queryPartitionProvider;
private readonly IReadOnlyList<PartitionKeyRange> targetPartitionKeyRanges;

public TestCosmosQueryClient(QueryPartitionProvider queryPartitionProvider)
public TestCosmosQueryClient(QueryPartitionProvider queryPartitionProvider, IEnumerable<PartitionKeyRange> targetPartitionKeyRanges)
{
this.queryPartitionProvider = queryPartitionProvider;
this.targetPartitionKeyRanges = targetPartitionKeyRanges.ToList();
}

public override Action<IQueryable> OnExecuteScalarQueryCallback => throw new NotImplementedException();
Expand Down Expand Up @@ -322,14 +341,7 @@ public override Task<List<PartitionKeyRange>> GetTargetPartitionKeyRangeByFeedRa

public override Task<List<PartitionKeyRange>> GetTargetPartitionKeyRangesAsync(string resourceLink, string collectionResourceId, IReadOnlyList<Documents.Routing.Range<string>> providedRanges, bool forceRefresh, ITrace trace)
{
return Task.FromResult(new List<PartitionKeyRange>
{
new PartitionKeyRange()
{
MinInclusive = Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey,
MaxExclusive = Documents.Routing.PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey
}
});
return Task.FromResult(this.targetPartitionKeyRanges.ToList());
}

public override Task<IReadOnlyList<PartitionKeyRange>> TryGetOverlappingRangesAsync(string collectionResourceId, Documents.Routing.Range<string> range, bool forceRefresh = false)
Expand All @@ -351,17 +363,20 @@ public override async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetPartit

public class SubpartitionTestInput : BaselineTestInput
{
public SubpartitionTestInput(string description, string query, bool ode)
public SubpartitionTestInput(string description, string query, bool ode, bool sortResults = true)
:base(description)
{
this.Query = query;
this.ODE = ode;
this.SortResults = sortResults;
}

internal string Query { get; }

internal bool ODE { get; }

internal bool SortResults { get; }

public override void SerializeAsXml(XmlWriter xmlWriter)
{
xmlWriter.WriteElementString("Description", this.Description);
Expand All @@ -375,17 +390,25 @@ public override void SerializeAsXml(XmlWriter xmlWriter)
public class SubpartitionTestOutput : BaselineTestOutput
{
private readonly List<CosmosElement> documents;
private readonly bool sortResults;

internal SubpartitionTestOutput(IReadOnlyList<CosmosElement> documents)
internal SubpartitionTestOutput(IReadOnlyList<CosmosElement> documents, bool sortResults)
{
this.documents = documents.ToList();
this.sortResults = sortResults;
}

public override void SerializeAsXml(XmlWriter xmlWriter)
{
xmlWriter.WriteStartElement("Documents");
string content = string.Join($",{Environment.NewLine}",
this.documents.Select(doc => doc.ToString()).OrderBy(serializedDoc => serializedDoc));

IEnumerable<string> lines = this.documents.Select(doc => doc.ToString());
if(this.sortResults)
{
lines = lines.OrderBy(serializedDoc => serializedDoc);
}

string content = string.Join($",{Environment.NewLine}", lines);
xmlWriter.WriteCData(content);
xmlWriter.WriteEndElement();
}
Expand Down
Loading