Skip to content

Commit

Permalink
fix: Do not fail if partitions are not ready for specified date range…
Browse files Browse the repository at this point in the history
… interval but there's at least one ready for pre-aggregation (#6026)
  • Loading branch information
paveltiunov authored Jan 17, 2023
1 parent 7ec839e commit 2d39fe4
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1595,33 +1595,47 @@ export class PreAggregationPartitionRangeLoader {

public async loadPreAggregations(): Promise<LoadPreAggregationResult> {
if (this.preAggregation.partitionGranularity && !this.preAggregation.expandedPartition) {
const { buildRange, partitionRanges } = await this.partitionRanges();
const partitionLoaders = partitionRanges.map(range => new PreAggregationLoader(
this.redisPrefix,
this.driverFactory,
this.logger,
this.queryCache,
this.preAggregations,
this.partitionPreAggregationDescription(range, buildRange),
this.preAggregationsTablesToTempTables,
this.loadCache,
this.options,
));
const resolveResults = await Promise.all(partitionLoaders.map(async (l, i) => {
const result = await l.loadPreAggregation(false);
return result && {
...result,
partitionRange: partitionRanges[i]
};
}));
let loadResults = resolveResults.filter(res => res !== null);
if (this.options.externalRefresh && loadResults.length === 0) {
const loadPreAggregationsByPartitionRanges = async ({ buildRange, partitionRanges }: PartitionRanges) => {
const partitionLoaders = partitionRanges.map(range => new PreAggregationLoader(
this.redisPrefix,
this.driverFactory,
this.logger,
this.queryCache,
this.preAggregations,
this.partitionPreAggregationDescription(range, buildRange),
this.preAggregationsTablesToTempTables,
this.loadCache,
this.options,
));
const resolveResults = await Promise.all(partitionLoaders.map(async (l, i) => {
const result = await l.loadPreAggregation(false);
return result && {
...result,
partitionRange: partitionRanges[i]
};
}));
return { loadResults: resolveResults.filter(res => res !== null), partitionLoaders };
};

// eslint-disable-next-line prefer-const
let loadResultAndLoaders = await loadPreAggregationsByPartitionRanges(await this.partitionRanges());
if (this.options.externalRefresh && loadResultAndLoaders.loadResults.length === 0) {
loadResultAndLoaders = await loadPreAggregationsByPartitionRanges(await this.partitionRanges(true));
// In case there're no partitions ready at matched time dimension intersection then no data can be retrieved.
// We need to provide any table so query can just execute successfully.
if (loadResultAndLoaders.loadResults.length > 0) {
loadResultAndLoaders.loadResults = [loadResultAndLoaders.loadResults[loadResultAndLoaders.loadResults.length - 1]];
}
}
if (this.options.externalRefresh && loadResultAndLoaders.loadResults.length === 0) {
throw new Error(
// eslint-disable-next-line no-use-before-define
PreAggregations.noPreAggregationPartitionsBuiltMessage(partitionLoaders.map(p => p.preAggregation))
PreAggregations.noPreAggregationPartitionsBuiltMessage(loadResultAndLoaders.partitionLoaders.map(p => p.preAggregation))
);
}

let { loadResults } = loadResultAndLoaders;

let lambdaTable: InlineTable;
let emptyResult = false;

Expand Down Expand Up @@ -1727,14 +1741,14 @@ export class PreAggregationPartitionRangeLoader {
}
}

private async partitionRanges(): Promise<PartitionRanges> {
private async partitionRanges(ignoreMatchedDateRange?: boolean): Promise<PartitionRanges> {
const buildRange = await this.loadBuildRange();
if (!buildRange[0] || !buildRange[1]) {
return { buildRange, partitionRanges: [] };
}
let dateRange = PreAggregationPartitionRangeLoader.intersectDateRanges(
buildRange,
this.preAggregation.matchedTimeDimensionDateRange,
ignoreMatchedDateRange ? undefined : this.preAggregation.matchedTimeDimensionDateRange,
);
if (!dateRange) {
// If there's no date range intersection between query data range and pre-aggregation build range
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1076,8 +1076,8 @@ describe('QueryOrchestrator', () => {
});

test('empty partitions with externalRefresh', async () => {
const query = {
query: 'SELECT * FROM stb_pre_aggregations.orders_d',
const query = ({ startQuery, endQuery, matchedTimeDimensionDateRange }) => ({
query: 'SELECT * FROM stb_pre_aggregations.orders_empty',
values: [],
cacheKeyQueries: {
queries: []
Expand All @@ -1100,19 +1100,28 @@ describe('QueryOrchestrator', () => {
indexName: 'orders_d_main'
}],
preAggregationStartEndQueries: [
['SELECT MIN(created_at) FROM orders', []],
['SELECT MAX(created_at) FROM orders', []],
[startQuery || 'SELECT MIN(created_at) FROM orders', []],
[endQuery || 'SELECT MAX(created_at) FROM orders', []],
],
partitionGranularity: 'day',
timezone: 'UTC'
timezone: 'UTC',
matchedTimeDimensionDateRange
}],
requestId: 'empty partitions',
};
requestId: 'empty partitions with externalRefresh',
});
await expect(async () => {
await queryOrchestratorExternalRefresh.fetchQuery(query);
await queryOrchestratorExternalRefresh.fetchQuery(query({}));
}).rejects.toThrow(
/refresh worker/
);
await queryOrchestrator.fetchQuery(query({ startQuery: 'SELECT \'2021-05-01\'', endQuery: 'SELECT \'2021-05-15\'' }));
const result = await queryOrchestratorExternalRefresh.fetchQuery(query({
startQuery: 'SELECT \'2021-05-01\'',
endQuery: 'SELECT \'2021-05-15\'',
matchedTimeDimensionDateRange: ['2021-05-31T00:00:00.000', '2021-05-31T23:59:59.999']
}));
console.log(JSON.stringify(result, null, 2));
expect(result.data[0]).toMatch(/orders_empty20210515/);
});

test('empty intersection', async () => {
Expand Down

0 comments on commit 2d39fe4

Please sign in to comment.