Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Separate execution startedAt from createdAt #10810

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
677eae7
refactor(core): Separate execution `startedAt` from `createdAt`
ivov Sep 13, 2024
4e8f8e8
Fix for concurrency control
ivov Sep 13, 2024
089beb4
Add TODO
ivov Sep 13, 2024
5329357
Merge master, fix conflicts
ivov Sep 13, 2024
04245d0
Prevent `createdAt` from being updated
ivov Sep 13, 2024
71dd68c
Merge master, fix conflicts again
ivov Sep 13, 2024
87f0414
Add clarifying comment
ivov Sep 13, 2024
b0f22d9
Remove unneeded call
ivov Sep 13, 2024
14af59a
Clean up `setRunning`
ivov Sep 13, 2024
abe04ab
Merge branch 'master' into pay-1815-execution-duration-includes-time-…
ivov Sep 19, 2024
ec1a254
Make `execution_entity.createdAt` not nullable
ivov Sep 19, 2024
99188b4
Copy over `startedAt` to `createdAt`
ivov Sep 19, 2024
ac1e781
Fix tests
ivov Sep 19, 2024
ea731a9
Escape column names
ivov Sep 19, 2024
5b60076
Fix more tests
ivov Sep 20, 2024
70d74ed
Adjust UI per product feedback
ivov Sep 20, 2024
d6d7c12
Fix typecheck
ivov Sep 20, 2024
ad48c60
Fix typecheck again
ivov Sep 20, 2024
3a83192
Fix tests
ivov Sep 20, 2024
e1ea3cc
Merge branch 'master' into pay-1815-execution-duration-includes-time-…
ivov Sep 26, 2024
025b351
Colors feedback
ivov Sep 26, 2024
6a62d47
Batch updates
ivov Sep 26, 2024
44d3280
Handle zero IDs
ivov Sep 26, 2024
1eea4a6
Fix type
ivov Sep 26, 2024
5a519fc
Revert batching
ivov Sep 26, 2024
7c857f1
Fix typecheck
ivov Sep 26, 2024
6d834ac
Merge master, fix conflicts
ivov Sep 27, 2024
669a1f3
Update timestamp
ivov Sep 27, 2024
c505938
Add FE test
ivov Sep 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions packages/cli/src/active-executions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { Service } from 'typedi';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { ExecutionNotFoundError } from '@/errors/execution-not-found-error';
import type {
ExecutionPayload,
CreateExecutionPayload,
IExecutingWorkflowData,
IExecutionDb,
IExecutionsCurrentSummary,
Expand Down Expand Up @@ -52,11 +52,10 @@ export class ActiveExecutions {
if (executionId === undefined) {
// Is a new execution so save in DB

const fullExecutionData: ExecutionPayload = {
const fullExecutionData: CreateExecutionPayload = {
data: executionData.executionData!,
mode,
finished: false,
startedAt: new Date(),
workflowData: executionData.workflowData,
status: executionStatus,
workflowId: executionData.workflowData.id,
Expand All @@ -74,7 +73,10 @@ export class ActiveExecutions {
executionId = await this.executionRepository.createNewExecution(fullExecutionData);
assert(executionId);

await this.concurrencyControl.throttle({ mode, executionId });
if (config.getEnv('executions.mode') === 'regular') {
await this.concurrencyControl.throttle({ mode, executionId });
await this.executionRepository.setRunning(executionId);
}
executionStatus = 'running';
} else {
// Is an existing execution we want to finish so update in DB
Expand All @@ -86,6 +88,7 @@ export class ActiveExecutions {
data: executionData.executionData!,
waitTill: null,
status: executionStatus,
// this is resuming, so keep `startedAt` as it was
};

await this.executionRepository.updateExistingExecution(executionId, execution);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ export class ConcurrencyControlService {

this.productionQueue.on('execution-released', async (executionId) => {
this.log('Execution released', { executionId });
await this.executionRepository.resetStartedAt(executionId);
});
}

Expand Down
9 changes: 8 additions & 1 deletion packages/cli/src/databases/entities/execution-entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,14 @@ export class ExecutionEntity {
status: ExecutionStatus;

@Column(datetimeColumnType)
startedAt: Date;
createdAt: Date;

/**
* Time when the processing of the execution actually started. This column
* is `null` when an execution is enqueued but has not started yet.
*/
@Column({ type: datetimeColumnType, nullable: true })
startedAt: Date | null;

@Index()
@Column({ type: datetimeColumnType, nullable: true })
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import type { MigrationContext, ReversibleMigration } from '@/databases/types';

export class SeparateExecutionCreationFromStart1727427440136 implements ReversibleMigration {
async up({
schemaBuilder: { addColumns, column, dropNotNull },
runQuery,
escape,
}: MigrationContext) {
await addColumns('execution_entity', [
column('createdAt').notNull.timestamp().default('NOW()'),
]);

await dropNotNull('execution_entity', 'startedAt');

const executionEntity = escape.tableName('execution_entity');
const createdAt = escape.columnName('createdAt');
const startedAt = escape.columnName('startedAt');

// inaccurate for pre-migration rows but prevents `createdAt` from being nullable
await runQuery(`UPDATE ${executionEntity} SET ${createdAt} = ${startedAt};`);
}

async down({ schemaBuilder: { dropColumns, addNotNull } }: MigrationContext) {
await dropColumns('execution_entity', ['createdAt']);
await addNotNull('execution_entity', 'startedAt');
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/mysqldb/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import { CreateInvalidAuthTokenTable1723627610222 } from '../common/172362761022
import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices';
import { CreateAnnotationTables1724753530828 } from '../common/1724753530828-CreateExecutionAnnotationTables';
import { AddApiKeysTable1724951148974 } from '../common/1724951148974-AddApiKeysTable';
import { SeparateExecutionCreationFromStart1727427440136 } from '../common/1727427440136-SeparateExecutionCreationFromStart';

export const mysqlMigrations: Migration[] = [
InitialMigration1588157391238,
Expand Down Expand Up @@ -130,4 +131,5 @@ export const mysqlMigrations: Migration[] = [
RefactorExecutionIndices1723796243146,
CreateAnnotationTables1724753530828,
AddApiKeysTable1724951148974,
SeparateExecutionCreationFromStart1727427440136,
];
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/postgresdb/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import { CreateInvalidAuthTokenTable1723627610222 } from '../common/172362761022
import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices';
import { CreateAnnotationTables1724753530828 } from '../common/1724753530828-CreateExecutionAnnotationTables';
import { AddApiKeysTable1724951148974 } from '../common/1724951148974-AddApiKeysTable';
import { SeparateExecutionCreationFromStart1727427440136 } from '../common/1727427440136-SeparateExecutionCreationFromStart';

export const postgresMigrations: Migration[] = [
InitialMigration1587669153312,
Expand Down Expand Up @@ -130,4 +131,5 @@ export const postgresMigrations: Migration[] = [
RefactorExecutionIndices1723796243146,
CreateAnnotationTables1724753530828,
AddApiKeysTable1724951148974,
SeparateExecutionCreationFromStart1727427440136,
];
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/sqlite/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import { AddConstraintToExecutionMetadata1720101653148 } from '../common/1720101
import { CreateInvalidAuthTokenTable1723627610222 } from '../common/1723627610222-CreateInvalidAuthTokenTable';
import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices';
import { CreateAnnotationTables1724753530828 } from '../common/1724753530828-CreateExecutionAnnotationTables';
import { SeparateExecutionCreationFromStart1727427440136 } from '../common/1727427440136-SeparateExecutionCreationFromStart';

const sqliteMigrations: Migration[] = [
InitialMigration1588102412422,
Expand Down Expand Up @@ -124,6 +125,7 @@ const sqliteMigrations: Migration[] = [
RefactorExecutionIndices1723796243146,
CreateAnnotationTables1724753530828,
AddApiKeysTable1724951148974,
SeparateExecutionCreationFromStart1727427440136,
];

export { sqliteMigrations };
42 changes: 30 additions & 12 deletions packages/cli/src/databases/repositories/execution.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import { ExecutionAnnotation } from '@/databases/entities/execution-annotation.e
import { PostgresLiveRowsRetrievalError } from '@/errors/postgres-live-rows-retrieval.error';
import type { ExecutionSummaries } from '@/executions/execution.types';
import type {
ExecutionPayload,
CreateExecutionPayload,
IExecutionBase,
IExecutionFlattedDb,
IExecutionResponse,
Expand Down Expand Up @@ -198,7 +198,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
return executions.map((execution) => {
const { executionData, ...rest } = execution;
return rest;
});
}) as IExecutionFlattedDb[] | IExecutionResponse[] | IExecutionBase[];
}

reportInvalidExecutions(executions: ExecutionEntity[]) {
Expand Down Expand Up @@ -297,15 +297,15 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
}),
...(options?.includeAnnotation &&
serializedAnnotation && { annotation: serializedAnnotation }),
};
} as IExecutionFlattedDb | IExecutionResponse | IExecutionBase;
}

/**
* Insert a new execution and its execution data using a transaction.
*/
async createNewExecution(execution: ExecutionPayload): Promise<string> {
async createNewExecution(execution: CreateExecutionPayload): Promise<string> {
const { data, workflowData, ...rest } = execution;
const { identifiers: inserted } = await this.insert(rest);
const { identifiers: inserted } = await this.insert({ ...rest, createdAt: new Date() });
const { id: executionId } = inserted[0] as { id: string };
const { connections, nodes, name, settings } = workflowData ?? {};
await this.executionDataRepository.insert({
Expand Down Expand Up @@ -344,16 +344,25 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
await this.update({ id: executionId }, { status });
}

async resetStartedAt(executionId: string) {
await this.update({ id: executionId }, { startedAt: new Date() });
async setRunning(executionId: string) {
const startedAt = new Date();

await this.update({ id: executionId }, { status: 'running', startedAt });

return startedAt;
}

async updateExistingExecution(executionId: string, execution: Partial<IExecutionResponse>) {
// Se isolate startedAt because it must be set when the execution starts and should never change.
// So we prevent updating it, if it's sent (it usually is and causes problems to executions that
// are resumed after waiting for some time, as a new startedAt is set)
const { id, data, workflowId, workflowData, startedAt, customData, ...executionInformation } =
execution;
const {
id,
data,
workflowId,
workflowData,
createdAt, // must never change
startedAt, // must never change
customData,
...executionInformation
} = execution;
if (Object.keys(executionInformation).length > 0) {
await this.update({ id: executionId }, executionInformation);
}
Expand Down Expand Up @@ -721,6 +730,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
mode: true,
retryOf: true,
status: true,
createdAt: true,
startedAt: true,
stoppedAt: true,
};
Expand Down Expand Up @@ -806,6 +816,7 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
// @tech_debt: These transformations should not be needed
private toSummary(execution: {
id: number | string;
createdAt?: Date | string;
startedAt?: Date | string;
stoppedAt?: Date | string;
waitTill?: Date | string | null;
Expand All @@ -817,6 +828,13 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
return date;
};

if (execution.createdAt) {
execution.createdAt =
execution.createdAt instanceof Date
? execution.createdAt.toISOString()
: normalizeDateString(execution.createdAt);
}

if (execution.startedAt) {
execution.startedAt =
execution.startedAt instanceof Date
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { ExecutionStatus, IRun, IWorkflowBase } from 'n8n-workflow';
import { Container } from 'typedi';

import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { ExecutionPayload, IExecutionDb } from '@/interfaces';
import type { IExecutionDb, UpdateExecutionPayload } from '@/interfaces';
import { Logger } from '@/logger';
import { ExecutionMetadataService } from '@/services/execution-metadata.service';
import { isWorkflowIdValid } from '@/utils';
Expand Down Expand Up @@ -46,7 +46,7 @@ export function prepareExecutionDataForDbUpdate(parameters: {
'pinData',
]);

const fullExecutionData: ExecutionPayload = {
const fullExecutionData: UpdateExecutionPayload = {
data: runData.data,
mode: runData.mode,
finished: runData.finished ? runData.finished : false,
Expand Down
5 changes: 2 additions & 3 deletions packages/cli/src/executions/execution.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import { QueuedExecutionRetryError } from '@/errors/queued-execution-retry.error
import { InternalServerError } from '@/errors/response-errors/internal-server.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import type {
ExecutionPayload,
CreateExecutionPayload,
IExecutionFlattedResponse,
IExecutionResponse,
IWorkflowDb,
Expand Down Expand Up @@ -321,11 +321,10 @@ export class ExecutionService {
},
};

const fullExecutionData: ExecutionPayload = {
const fullExecutionData: CreateExecutionPayload = {
data: executionData,
mode,
finished: false,
startedAt: new Date(),
workflowData,
workflowId: workflow.id,
stoppedAt: new Date(),
Expand Down
10 changes: 6 additions & 4 deletions packages/cli/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ export type SaveExecutionDataType = 'all' | 'none';
export interface IExecutionBase {
id: string;
mode: WorkflowExecuteMode;
createdAt: Date; // set by DB
startedAt: Date;
stoppedAt?: Date; // empty value means execution is still running
workflowId: string;
Expand All @@ -131,10 +132,11 @@ export interface IExecutionDb extends IExecutionBase {
workflowData: IWorkflowBase;
}

/**
* Payload for creating or updating an execution.
*/
export type ExecutionPayload = Omit<IExecutionDb, 'id'>;
/** Payload for creating an execution. */
export type CreateExecutionPayload = Omit<IExecutionDb, 'id' | 'createdAt' | 'startedAt'>;

/** Payload for updating an execution. */
export type UpdateExecutionPayload = Omit<IExecutionDb, 'id' | 'createdAt'>;

export interface IExecutionResponse extends IExecutionBase {
id: string;
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/scaling/job-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class JobProcessor {

this.logger.info(`[JobProcessor] Starting job ${job.id} (execution ${executionId})`);

await this.executionRepository.updateStatus(executionId, 'running');
const startedAt = await this.executionRepository.setRunning(executionId);

let { staticData } = execution.workflowData;

Expand Down Expand Up @@ -137,7 +137,7 @@ export class JobProcessor {
workflowId: execution.workflowId,
workflowName: execution.workflowData.name,
mode: execution.mode,
startedAt: execution.startedAt,
startedAt,
retryOf: execution.retryOf ?? '',
status: execution.status,
};
Expand Down
8 changes: 6 additions & 2 deletions packages/cli/src/workflow-execute-additional-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ import config from '@/config';
import { CredentialsHelper } from '@/credentials-helper';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { ExternalHooks } from '@/external-hooks';
import type { IWorkflowExecuteProcess, IWorkflowErrorData, ExecutionPayload } from '@/interfaces';
import type {
IWorkflowExecuteProcess,
IWorkflowErrorData,
UpdateExecutionPayload,
} from '@/interfaces';
import { NodeTypes } from '@/node-types';
import { Push } from '@/push';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
Expand Down Expand Up @@ -865,7 +869,7 @@ async function executeWorkflow(
// Therefore, database might not contain finished errors.
// Force an update to db as there should be no harm doing this

const fullExecutionData: ExecutionPayload = {
const fullExecutionData: UpdateExecutionPayload = {
data: fullRunData.data,
mode: fullRunData.mode,
finished: fullRunData.finished ? fullRunData.finished : false,
Expand Down
5 changes: 2 additions & 3 deletions packages/cli/src/workflows/workflow-execution.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import type { Project } from '@/databases/entities/project';
import type { User } from '@/databases/entities/user';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import type { ExecutionPayload, IWorkflowDb, IWorkflowErrorData } from '@/interfaces';
import type { CreateExecutionPayload, IWorkflowDb, IWorkflowErrorData } from '@/interfaces';
import { Logger } from '@/logger';
import { NodeTypes } from '@/node-types';
import { SubworkflowPolicyChecker } from '@/subworkflows/subworkflow-policy-checker.service';
Expand Down Expand Up @@ -206,11 +206,10 @@ export class WorkflowExecutionService {
initialNode,
);

const fullExecutionData: ExecutionPayload = {
const fullExecutionData: CreateExecutionPayload = {
data: fakeExecution.data,
mode: fakeExecution.mode,
finished: false,
startedAt: new Date(),
stoppedAt: new Date(),
workflowData,
waitTill: null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ describe('ExecutionService', () => {
mode: expect.any(String),
retryOf: null,
status: expect.any(String),
createdAt: expect.any(String),
startedAt: expect.any(String),
stoppedAt: expect.any(String),
waitTill: null,
Expand Down Expand Up @@ -510,6 +511,7 @@ describe('ExecutionService', () => {
mode: expect.any(String),
retryOf: null,
status: expect.any(String),
createdAt: expect.any(String),
startedAt: expect.any(String),
stoppedAt: expect.any(String),
waitTill: null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ test('should report credential in not recently executed workflow', async () => {
const savedExecution = await Container.get(ExecutionRepository).save({
finished: true,
mode: 'manual',
createdAt: date,
startedAt: date,
stoppedAt: date,
workflowId: workflow.id,
Expand Down Expand Up @@ -227,6 +228,7 @@ test('should not report credentials in recently executed workflow', async () =>
const savedExecution = await Container.get(ExecutionRepository).save({
finished: true,
mode: 'manual',
createdAt: date,
startedAt: date,
stoppedAt: date,
workflowId: workflow.id,
Expand Down
Loading
Loading