Skip to content

Commit

Permalink
perf(core): Optimize executions filtering by metadata (#9477)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov authored May 22, 2024
1 parent 09a5867 commit 9bdc83a
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 7 deletions.
15 changes: 10 additions & 5 deletions packages/cli/src/databases/repositories/execution.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -728,12 +728,17 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
if (startedBefore) qb.andWhere({ startedAt: lessThanOrEqual(startedBefore) });
if (startedAfter) qb.andWhere({ startedAt: moreThanOrEqual(startedAfter) });

if (metadata) {
qb.leftJoin(ExecutionMetadata, 'md', 'md.executionId = execution.id');
if (metadata?.length === 1) {
const [{ key, value }] = metadata;

for (const item of metadata) {
qb.andWhere('md.key = :key AND md.value = :value', item);
}
qb.innerJoin(
ExecutionMetadata,
'md',
'md.executionId = execution.id AND md.key = :key AND md.value = :value',
);

qb.setParameter('key', key);
qb.setParameter('value', value);
}

return qb;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,32 @@ describe('ExecutionService', () => {
]);
});

test('should filter executions by `metadata`', async () => {
const workflow = await createWorkflow();

const metadata = [{ key: 'myKey', value: 'myValue' }];

await Promise.all([
createExecution({ status: 'success', metadata }, workflow),
createExecution({ status: 'error' }, workflow),
]);

const query: ExecutionSummaries.RangeQuery = {
kind: 'range',
range: { limit: 20 },
accessibleWorkflowIds: [workflow.id],
metadata,
};

const output = await executionService.findRangeWithCount(query);

expect(output).toEqual({
count: 1,
estimated: false,
results: [expect.objectContaining({ status: 'success' })],
});
});

test('should exclude executions by inaccessible `workflowId`', async () => {
const accessibleWorkflow = await createWorkflow();
const inaccessibleWorkflow = await createWorkflow();
Expand Down
19 changes: 17 additions & 2 deletions packages/cli/test/integration/shared/db/executions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { ExecutionEntity } from '@db/entities/ExecutionEntity';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { ExecutionDataRepository } from '@db/repositories/executionData.repository';
import { ExecutionMetadataRepository } from '@/databases/repositories/executionMetadata.repository';

export async function createManyExecutions(
amount: number,
Expand All @@ -18,10 +19,14 @@ export async function createManyExecutions(
* Store a execution in the DB and assign it to a workflow.
*/
export async function createExecution(
attributes: Partial<ExecutionEntity & ExecutionData>,
attributes: Partial<
Omit<ExecutionEntity, 'metadata'> &
ExecutionData & { metadata: Array<{ key: string; value: string }> }
>,
workflow: WorkflowEntity,
) {
const { data, finished, mode, startedAt, stoppedAt, waitTill, status, deletedAt } = attributes;
const { data, finished, mode, startedAt, stoppedAt, waitTill, status, deletedAt, metadata } =
attributes;

const execution = await Container.get(ExecutionRepository).save({
finished: finished ?? true,
Expand All @@ -34,6 +39,16 @@ export async function createExecution(
deletedAt,
});

if (metadata?.length) {
const metadataToSave = metadata.map(({ key, value }) => ({
key,
value,
execution: { id: execution.id },
}));

await Container.get(ExecutionMetadataRepository).save(metadataToSave);
}

await Container.get(ExecutionDataRepository).save({
data: data ?? '[]',
workflowData: workflow ?? {},
Expand Down

0 comments on commit 9bdc83a

Please sign in to comment.