Skip to content

Commit

Permalink
Query : Fixes ORDER BY query issue when partial partition key is spec…
Browse files Browse the repository at this point in the history
…ified with hierarchical partition (#4507)

* Initial commit

* Initial commit

* Update.
  • Loading branch information
adityasa authored May 21, 2024
1 parent 618d120 commit dda9cb4
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitione
inputParameters.InitialFeedRange,
trace);

Debug.Assert(targetRanges != null, $"{nameof(CosmosQueryExecutionContextFactory)} Assert!", "targetRanges != null");

TryCatch<IQueryPipelineStage> tryCreatePipelineStage;
Documents.PartitionKeyRange targetRange = await TryGetTargetRangeOptimisticDirectExecutionAsync(
inputParameters,
Expand All @@ -270,7 +272,7 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitione
}
else
{
bool singleLogicalPartitionKeyQuery = inputParameters.PartitionKey.HasValue
bool singleLogicalPartitionKeyQuery = (inputParameters.PartitionKey.HasValue && targetRanges.Count == 1)
|| ((partitionedQueryExecutionInfo.QueryRanges.Count == 1)
&& partitionedQueryExecutionInfo.QueryRanges[0].IsSingleValue);
bool serverStreamingQuery = !partitionedQueryExecutionInfo.QueryInfo.HasAggregates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,62 @@
{"id":"2","value2":"97"}]]></Documents>
</Output>
</Result>
<Result>
<Input>
<Description>SELECT ORDER BY with ODE</Description>
<Query><![CDATA[SELECT c.id, c.value2, c.intVal FROM c ORDER BY c.intVal]]></Query>
<ODE>True</ODE>
</Input>
<Output>
<Documents><![CDATA[{"id":"2","value2":"97","intVal":-47},
{"id":"2","value2":"92","intVal":-42},
{"id":"2","value2":"87","intVal":-37},
{"id":"2","value2":"82","intVal":-32},
{"id":"2","value2":"77","intVal":-27},
{"id":"2","value2":"72","intVal":-22},
{"id":"2","value2":"67","intVal":-17},
{"id":"2","value2":"62","intVal":-12},
{"id":"2","value2":"57","intVal":-7},
{"id":"2","value2":"52","intVal":-2},
{"id":"2","value2":"47","intVal":3},
{"id":"2","value2":"42","intVal":8},
{"id":"2","value2":"37","intVal":13},
{"id":"2","value2":"32","intVal":18},
{"id":"2","value2":"27","intVal":23},
{"id":"2","value2":"22","intVal":28},
{"id":"2","value2":"17","intVal":33},
{"id":"2","value2":"12","intVal":38},
{"id":"2","value2":"7","intVal":43},
{"id":"2","value2":"2","intVal":48}]]></Documents>
</Output>
</Result>
<Result>
<Input>
<Description>SELECT ORDER BY without ODE</Description>
<Query><![CDATA[SELECT c.id, c.value2, c.intVal FROM c ORDER BY c.intVal]]></Query>
<ODE>False</ODE>
</Input>
<Output>
<Documents><![CDATA[{"id":"2","value2":"97","intVal":-47},
{"id":"2","value2":"92","intVal":-42},
{"id":"2","value2":"87","intVal":-37},
{"id":"2","value2":"82","intVal":-32},
{"id":"2","value2":"77","intVal":-27},
{"id":"2","value2":"72","intVal":-22},
{"id":"2","value2":"67","intVal":-17},
{"id":"2","value2":"62","intVal":-12},
{"id":"2","value2":"57","intVal":-7},
{"id":"2","value2":"52","intVal":-2},
{"id":"2","value2":"47","intVal":3},
{"id":"2","value2":"42","intVal":8},
{"id":"2","value2":"37","intVal":13},
{"id":"2","value2":"32","intVal":18},
{"id":"2","value2":"27","intVal":23},
{"id":"2","value2":"22","intVal":28},
{"id":"2","value2":"17","intVal":33},
{"id":"2","value2":"12","intVal":38},
{"id":"2","value2":"7","intVal":43},
{"id":"2","value2":"2","intVal":48}]]></Documents>
</Output>
</Result>
</Results>
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ internal class InMemoryContainer : IMonadicDocumentContainer
private Dictionary<int, PartitionKeyHashRange> cachedPartitionKeyRangeIdToHashRange;
private readonly bool createSplitForMultiHashAtSecondlevel;
private readonly bool resolvePartitionsBasedOnPrefix;
private readonly QueryRequestOptions queryRequestOptions;

public InMemoryContainer(
PartitionKeyDefinition partitionKeyDefinition,
bool createSplitForMultiHashAtSecondlevel = false,
bool resolvePartitionsBasedOnPrefix = false)
bool resolvePartitionsBasedOnPrefix = false,
QueryRequestOptions queryRequestOptions = null)
{
this.partitionKeyDefinition = partitionKeyDefinition ?? throw new ArgumentNullException(nameof(partitionKeyDefinition));
PartitionKeyHashRange fullRange = new PartitionKeyHashRange(startInclusive: null, endExclusive: new PartitionKeyHash(Cosmos.UInt128.MaxValue));
Expand All @@ -76,6 +78,7 @@ public InMemoryContainer(
this.parentToChildMapping = new Dictionary<int, (int, int)>();
this.createSplitForMultiHashAtSecondlevel = createSplitForMultiHashAtSecondlevel;
this.resolvePartitionsBasedOnPrefix = resolvePartitionsBasedOnPrefix;
this.queryRequestOptions = queryRequestOptions;
}

public Task<TryCatch<List<FeedRangeEpk>>> MonadicGetFeedRangesAsync(
Expand Down Expand Up @@ -512,7 +515,7 @@ public virtual Task<TryCatch<QueryPage>> MonadicQueryAsync(
}

List<CosmosObject> documents = new List<CosmosObject>();
foreach (Record record in records.Where(r => IsRecordWithinFeedRange(r, feedRangeState.FeedRange, this.partitionKeyDefinition)))
foreach (Record record in records.Where(r => IsRecordWithinFeedRange(r, feedRangeState.FeedRange, this.partitionKeyDefinition) && IsRecordWithinQueryPartition(r, this.queryRequestOptions, this.partitionKeyDefinition)))
{
CosmosObject document = ConvertRecordToCosmosElement(record);
documents.Add(CosmosObject.Create(document));
Expand Down Expand Up @@ -716,6 +719,26 @@ public virtual Task<TryCatch<QueryPage>> MonadicQueryAsync(
}
}

private bool IsRecordWithinQueryPartition(Record record, QueryRequestOptions queryRequestOptions, PartitionKeyDefinition partitionKeyDefinition)
{
if(queryRequestOptions?.PartitionKey == null)
{
return true;
}

IList<CosmosElement> partitionKey = GetPartitionKeysFromObjectModel(queryRequestOptions.PartitionKey.Value);
IList<CosmosElement> partitionKeyFromRecord = GetPartitionKeysFromPayload(record.Payload, partitionKeyDefinition);
if (partitionKeyDefinition.Kind == PartitionKind.MultiHash)
{
PartitionKeyHash partitionKeyHash = GetHashFromPartitionKeys(partitionKey, partitionKeyDefinition);
PartitionKeyHash partitionKeyFromRecordHash = GetHashFromPartitionKeys(partitionKeyFromRecord, partitionKeyDefinition);

return partitionKeyHash.Equals(partitionKeyFromRecordHash) || partitionKeyFromRecordHash.Value.StartsWith(partitionKeyHash.Value);
}

return partitionKey.SequenceEqual(partitionKeyFromRecord);
}

public Task<TryCatch<ChangeFeedPage>> MonadicChangeFeedAsync(
FeedRangeState<ChangeFeedState> feedRangeState,
ChangeFeedPaginationOptions changeFeedPaginationOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ public void TestQueriesOnSplitContainer()
{
List<SubpartitionTestInput> inputs = new List<SubpartitionTestInput>
{
new SubpartitionTestInput("SELECT", query: @"SELECT c.id, c.value2 FROM c", ode: true),
new SubpartitionTestInput("SELECT without ODE", query: @"SELECT c.id, c.value2 FROM c", ode: false),
new SubpartitionTestInput(description: "SELECT", query: @"SELECT c.id, c.value2 FROM c", ode: true),
new SubpartitionTestInput(description: "SELECT without ODE", query: @"SELECT c.id, c.value2 FROM c", ode: false),
new SubpartitionTestInput(description: "SELECT ORDER BY with ODE", query: @"SELECT c.id, c.value2, c.intVal FROM c ORDER BY c.intVal", ode: true, sortResults: false),
new SubpartitionTestInput(description: "SELECT ORDER BY without ODE", query: @"SELECT c.id, c.value2, c.intVal FROM c ORDER BY c.intVal", ode: false, sortResults: false),
};

this.ExecuteTestSuite(inputs);
}

Expand Down Expand Up @@ -65,16 +68,19 @@ public async Task VerifyTestFrameworkSupportsPartitionSplit()

public override SubpartitionTestOutput ExecuteTest(SubpartitionTestInput input)
{
IMonadicDocumentContainer monadicDocumentContainer = CreateSplitDocumentContainerAsync(DocumentCount).Result;
QueryRequestOptions queryRequestOptions = new QueryRequestOptions()
{
PartitionKey = new PartitionKeyBuilder().Add(SplitPartitionKey.ToString()).Build()
};

IMonadicDocumentContainer monadicDocumentContainer = CreateSplitDocumentContainerAsync(DocumentCount, queryRequestOptions).Result;
DocumentContainer documentContainer = new DocumentContainer(monadicDocumentContainer);
TryCatch _ = monadicDocumentContainer.MonadicRefreshProviderAsync(NoOpTrace.Singleton, cancellationToken: default).Result;
List<FeedRangeEpk> containerRanges = documentContainer.GetFeedRangesAsync(NoOpTrace.Singleton, cancellationToken: default).Result;

List<CosmosElement> documents = new List<CosmosElement>();
QueryRequestOptions queryRequestOptions = new QueryRequestOptions()
{
PartitionKey = new PartitionKeyBuilder().Add(SplitPartitionKey.ToString()).Build()
};
(CosmosQueryExecutionContextFactory.InputParameters inputParameters, CosmosQueryContextCore cosmosQueryContextCore) =
CreateInputParamsAndQueryContext(input, queryRequestOptions);
CreateInputParamsAndQueryContext(input, queryRequestOptions, containerRanges);
IQueryPipelineStage queryPipelineStage = CosmosQueryExecutionContextFactory.Create(
documentContainer,
cosmosQueryContextCore,
Expand All @@ -92,10 +98,10 @@ public override SubpartitionTestOutput ExecuteTest(SubpartitionTestInput input)
documents.AddRange(tryGetPage.Result.Documents);
}

return new SubpartitionTestOutput(documents);
return new SubpartitionTestOutput(documents, input.SortResults);
}

private static Tuple<CosmosQueryExecutionContextFactory.InputParameters, CosmosQueryContextCore> CreateInputParamsAndQueryContext(SubpartitionTestInput input, QueryRequestOptions queryRequestOptions)
private static Tuple<CosmosQueryExecutionContextFactory.InputParameters, CosmosQueryContextCore> CreateInputParamsAndQueryContext(SubpartitionTestInput input, QueryRequestOptions queryRequestOptions, IReadOnlyList<FeedRangeEpk> containerRanges)
{
string query = input.Query;
CosmosElement continuationToken = null;
Expand Down Expand Up @@ -134,10 +140,20 @@ public override SubpartitionTestOutput ExecuteTest(SubpartitionTestInput input)
isNonStreamingOrderByQueryFeatureDisabled: queryRequestOptions.IsNonStreamingOrderByQueryFeatureDisabled,
testInjections: queryRequestOptions.TestSettings);

List<PartitionKeyRange> targetPkRanges = new();
foreach (FeedRangeEpk feedRangeEpk in containerRanges)
{
targetPkRanges.Add(new PartitionKeyRange
{
MinInclusive = feedRangeEpk.Range.Min,
MaxExclusive = feedRangeEpk.Range.Max,
});
}

string databaseId = "db1234";
string resourceLink = $"dbs/{databaseId}/colls";
CosmosQueryContextCore cosmosQueryContextCore = new CosmosQueryContextCore(
client: new TestCosmosQueryClient(queryPartitionProvider),
client: new TestCosmosQueryClient(queryPartitionProvider, targetPkRanges),
resourceTypeEnum: Documents.ResourceType.Document,
operationType: Documents.OperationType.Query,
resourceType: typeof(QueryResponseCore),
Expand Down Expand Up @@ -215,20 +231,20 @@ internal static PartitionKeyDefinition CreatePartitionKeyDefinition()
return partitionKeyDefinition;
}

private static async Task<IDocumentContainer> CreateSplitDocumentContainerAsync(int numItems)
private static async Task<IDocumentContainer> CreateSplitDocumentContainerAsync(int numItems, QueryRequestOptions queryRequestOptions)
{
PartitionKeyDefinition partitionKeyDefinition = CreatePartitionKeyDefinition();
InMemoryContainer inMemoryContainer = await CreateSplitInMemoryDocumentContainerAsync(numItems, partitionKeyDefinition);
InMemoryContainer inMemoryContainer = await CreateSplitInMemoryDocumentContainerAsync(numItems, partitionKeyDefinition, queryRequestOptions);
DocumentContainer documentContainer = new DocumentContainer(inMemoryContainer);
return documentContainer;
}

private static async Task<InMemoryContainer> CreateSplitInMemoryDocumentContainerAsync(int numItems, PartitionKeyDefinition partitionKeyDefinition)
private static async Task<InMemoryContainer> CreateSplitInMemoryDocumentContainerAsync(int numItems, PartitionKeyDefinition partitionKeyDefinition, QueryRequestOptions queryRequestOptions = null)
{
InMemoryContainer inMemoryContainer = new InMemoryContainer(partitionKeyDefinition, createSplitForMultiHashAtSecondlevel: true, resolvePartitionsBasedOnPrefix: true);
InMemoryContainer inMemoryContainer = new InMemoryContainer(partitionKeyDefinition, createSplitForMultiHashAtSecondlevel: true, resolvePartitionsBasedOnPrefix: true, queryRequestOptions: queryRequestOptions);
for (int i = 0; i < numItems; i++)
{
CosmosObject item = CosmosObject.Parse($"{{\"id\" : \"{i % 5}\", \"value1\" : \"{Guid.NewGuid()}\", \"value2\" : \"{i}\" }}");
CosmosObject item = CosmosObject.Parse($"{{\"id\" : \"{i % 5}\", \"value1\" : \"{Guid.NewGuid()}\", \"value2\" : \"{i}\", \"intVal\" : {(numItems/2) - i} }}");
while (true)
{
TryCatch<Record> monadicCreateRecord = await inMemoryContainer.MonadicCreateItemAsync(item, cancellationToken: default);
Expand All @@ -243,13 +259,16 @@ private static async Task<InMemoryContainer> CreateSplitInMemoryDocumentContaine

return inMemoryContainer;
}

internal class TestCosmosQueryClient : CosmosQueryClient
{
private readonly QueryPartitionProvider queryPartitionProvider;
private readonly IReadOnlyList<PartitionKeyRange> targetPartitionKeyRanges;

public TestCosmosQueryClient(QueryPartitionProvider queryPartitionProvider)
public TestCosmosQueryClient(QueryPartitionProvider queryPartitionProvider, IEnumerable<PartitionKeyRange> targetPartitionKeyRanges)
{
this.queryPartitionProvider = queryPartitionProvider;
this.targetPartitionKeyRanges = targetPartitionKeyRanges.ToList();
}

public override Action<IQueryable> OnExecuteScalarQueryCallback => throw new NotImplementedException();
Expand Down Expand Up @@ -322,14 +341,7 @@ public override Task<List<PartitionKeyRange>> GetTargetPartitionKeyRangeByFeedRa

public override Task<List<PartitionKeyRange>> GetTargetPartitionKeyRangesAsync(string resourceLink, string collectionResourceId, IReadOnlyList<Documents.Routing.Range<string>> providedRanges, bool forceRefresh, ITrace trace)
{
return Task.FromResult(new List<PartitionKeyRange>
{
new PartitionKeyRange()
{
MinInclusive = Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey,
MaxExclusive = Documents.Routing.PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey
}
});
return Task.FromResult(this.targetPartitionKeyRanges.ToList());
}

public override Task<IReadOnlyList<PartitionKeyRange>> TryGetOverlappingRangesAsync(string collectionResourceId, Documents.Routing.Range<string> range, bool forceRefresh = false)
Expand All @@ -351,17 +363,20 @@ public override async Task<TryCatch<PartitionedQueryExecutionInfo>> TryGetPartit

public class SubpartitionTestInput : BaselineTestInput
{
public SubpartitionTestInput(string description, string query, bool ode)
public SubpartitionTestInput(string description, string query, bool ode, bool sortResults = true)
:base(description)
{
this.Query = query;
this.ODE = ode;
this.SortResults = sortResults;
}

internal string Query { get; }

internal bool ODE { get; }

internal bool SortResults { get; }

public override void SerializeAsXml(XmlWriter xmlWriter)
{
xmlWriter.WriteElementString("Description", this.Description);
Expand All @@ -375,17 +390,25 @@ public override void SerializeAsXml(XmlWriter xmlWriter)
public class SubpartitionTestOutput : BaselineTestOutput
{
private readonly List<CosmosElement> documents;
private readonly bool sortResults;

internal SubpartitionTestOutput(IReadOnlyList<CosmosElement> documents)
internal SubpartitionTestOutput(IReadOnlyList<CosmosElement> documents, bool sortResults)
{
this.documents = documents.ToList();
this.sortResults = sortResults;
}

public override void SerializeAsXml(XmlWriter xmlWriter)
{
xmlWriter.WriteStartElement("Documents");
string content = string.Join($",{Environment.NewLine}",
this.documents.Select(doc => doc.ToString()).OrderBy(serializedDoc => serializedDoc));

IEnumerable<string> lines = this.documents.Select(doc => doc.ToString());
if(this.sortResults)
{
lines = lines.OrderBy(serializedDoc => serializedDoc);
}

string content = string.Join($",{Environment.NewLine}", lines);
xmlWriter.WriteCData(content);
xmlWriter.WriteEndElement();
}
Expand Down

0 comments on commit dda9cb4

Please sign in to comment.