Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Separate execution startedAt from createdAt #10810

Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
677eae7
refactor(core): Separate execution `startedAt` from `createdAt`
ivov Sep 13, 2024
4e8f8e8
Fix for concurrency control
ivov Sep 13, 2024
089beb4
Add TODO
ivov Sep 13, 2024
5329357
Merge master, fix conflicts
ivov Sep 13, 2024
04245d0
Prevent `createdAt` from being updated
ivov Sep 13, 2024
71dd68c
Merge master, fix conflicts again
ivov Sep 13, 2024
87f0414
Add clarifying comment
ivov Sep 13, 2024
b0f22d9
Remove unneeded call
ivov Sep 13, 2024
14af59a
Clean up `setRunning`
ivov Sep 13, 2024
abe04ab
Merge branch 'master' into pay-1815-execution-duration-includes-time-…
ivov Sep 19, 2024
ec1a254
Make `execution_entity.createdAt` not nullable
ivov Sep 19, 2024
99188b4
Copy over `startedAt` to `createdAt`
ivov Sep 19, 2024
ac1e781
Fix tests
ivov Sep 19, 2024
ea731a9
Escape column names
ivov Sep 19, 2024
5b60076
Fix more tests
ivov Sep 20, 2024
70d74ed
Adjust UI per product feedback
ivov Sep 20, 2024
d6d7c12
Fix typecheck
ivov Sep 20, 2024
ad48c60
Fix typecheck again
ivov Sep 20, 2024
3a83192
Fix tests
ivov Sep 20, 2024
e1ea3cc
Merge branch 'master' into pay-1815-execution-duration-includes-time-…
ivov Sep 26, 2024
025b351
Colors feedback
ivov Sep 26, 2024
6a62d47
Batch updates
ivov Sep 26, 2024
44d3280
Handle zero IDs
ivov Sep 26, 2024
1eea4a6
Fix type
ivov Sep 26, 2024
5a519fc
Revert batching
ivov Sep 26, 2024
7c857f1
Fix typecheck
ivov Sep 26, 2024
6d834ac
Merge master, fix conflicts
ivov Sep 27, 2024
669a1f3
Update timestamp
ivov Sep 27, 2024
c505938
Add FE test
ivov Sep 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions packages/cli/src/active-executions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { Service } from 'typedi';

import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type {
ExecutionPayload,
CreateExecutionPayload,
IExecutingWorkflowData,
IExecutionDb,
IExecutionsCurrentSummary,
Expand Down Expand Up @@ -56,11 +56,10 @@ export class ActiveExecutions {
if (executionId === undefined) {
// Is a new execution so save in DB

const fullExecutionData: ExecutionPayload = {
const fullExecutionData: CreateExecutionPayload = {
data: executionData.executionData!,
mode,
finished: false,
startedAt: new Date(),
workflowData: executionData.workflowData,
status: executionStatus,
workflowId: executionData.workflowData.id,
Expand All @@ -78,7 +77,10 @@ export class ActiveExecutions {
executionId = await this.executionRepository.createNewExecution(fullExecutionData);
assert(executionId);

await this.concurrencyControl.throttle({ mode, executionId });
if (config.getEnv('executions.mode') === 'regular') {
await this.concurrencyControl.throttle({ mode, executionId });
await this.executionRepository.setRunning(executionId);
}
executionStatus = 'running';
} else {
// Is an existing execution we want to finish so update in DB
Expand All @@ -90,6 +92,7 @@ export class ActiveExecutions {
data: executionData.executionData!,
waitTill: null,
status: executionStatus,
// this is resuming, so keep `startedAt` as it was
};

await this.executionRepository.updateExistingExecution(executionId, execution);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ export class ConcurrencyControlService {

this.productionQueue.on('execution-released', async (executionId) => {
this.log('Execution released', { executionId });
await this.executionRepository.resetStartedAt(executionId);
});
}

Expand Down
15 changes: 14 additions & 1 deletion packages/cli/src/databases/entities/execution-entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,20 @@ export class ExecutionEntity {
@Column('varchar')
status: ExecutionStatus;

@Column(datetimeColumnType)
/**
* Time when the execution was created. This column is nullable to represent
* missing data for executions created before this column was added.
*/
@Column({ type: datetimeColumnType, nullable: true })
ivov marked this conversation as resolved.
Show resolved Hide resolved
createdAt: Date;

/**
* Time when the processing of the execution actually started. This column
* is nullable to represent unavailable data until an execution has started.
* `createdAt` and `startedAt` may differ significantly on scaling mode and
* on regular mode with concurrency control enabled.
*/
@Column({ type: datetimeColumnType, nullable: true })
startedAt: Date;
ivov marked this conversation as resolved.
Show resolved Hide resolved

@Index()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import type { MigrationContext, ReversibleMigration } from '@/databases/types';

export class SeparateExecutionCreationFromStart1726218295879 implements ReversibleMigration {
async up({ schemaBuilder: { addColumns, column, dropNotNull } }: MigrationContext) {
await addColumns('execution_entity', [column('createdAt').timestamp().default('NOW()')]);
await dropNotNull('execution_entity', 'startedAt');
}

async down({ schemaBuilder: { dropColumns, addNotNull } }: MigrationContext) {
await dropColumns('execution_entity', ['createdAt']);
await addNotNull('execution_entity', 'startedAt');
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/mysqldb/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import { AddConstraintToExecutionMetadata1720101653148 } from '../common/1720101
import { CreateInvalidAuthTokenTable1723627610222 } from '../common/1723627610222-CreateInvalidAuthTokenTable';
import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices';
import { CreateAnnotationTables1724753530828 } from '../common/1724753530828-CreateExecutionAnnotationTables';
import { SeparateExecutionCreationFromStart1726218295879 } from '../common/1726218295879-SeparateExecutionCreationFromStart';

export const mysqlMigrations: Migration[] = [
InitialMigration1588157391238,
Expand Down Expand Up @@ -128,4 +129,5 @@ export const mysqlMigrations: Migration[] = [
CreateInvalidAuthTokenTable1723627610222,
RefactorExecutionIndices1723796243146,
CreateAnnotationTables1724753530828,
SeparateExecutionCreationFromStart1726218295879,
];
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/postgresdb/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import { AddConstraintToExecutionMetadata1720101653148 } from '../common/1720101
import { CreateInvalidAuthTokenTable1723627610222 } from '../common/1723627610222-CreateInvalidAuthTokenTable';
import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices';
import { CreateAnnotationTables1724753530828 } from '../common/1724753530828-CreateExecutionAnnotationTables';
import { SeparateExecutionCreationFromStart1726218295879 } from '../common/1726218295879-SeparateExecutionCreationFromStart';

export const postgresMigrations: Migration[] = [
InitialMigration1587669153312,
Expand Down Expand Up @@ -128,4 +129,5 @@ export const postgresMigrations: Migration[] = [
CreateInvalidAuthTokenTable1723627610222,
RefactorExecutionIndices1723796243146,
CreateAnnotationTables1724753530828,
SeparateExecutionCreationFromStart1726218295879,
];
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/sqlite/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import { AddConstraintToExecutionMetadata1720101653148 } from '../common/1720101
import { CreateInvalidAuthTokenTable1723627610222 } from '../common/1723627610222-CreateInvalidAuthTokenTable';
import { RefactorExecutionIndices1723796243146 } from '../common/1723796243146-RefactorExecutionIndices';
import { CreateAnnotationTables1724753530828 } from '../common/1724753530828-CreateExecutionAnnotationTables';
import { SeparateExecutionCreationFromStart1726218295879 } from '../common/1726218295879-SeparateExecutionCreationFromStart';

const sqliteMigrations: Migration[] = [
InitialMigration1588102412422,
Expand Down Expand Up @@ -122,6 +123,7 @@ const sqliteMigrations: Migration[] = [
CreateInvalidAuthTokenTable1723627610222,
RefactorExecutionIndices1723796243146,
CreateAnnotationTables1724753530828,
SeparateExecutionCreationFromStart1726218295879,
];

export { sqliteMigrations };
29 changes: 19 additions & 10 deletions packages/cli/src/databases/repositories/execution.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import { ExecutionAnnotation } from '@/databases/entities/execution-annotation';
import { PostgresLiveRowsRetrievalError } from '@/errors/postgres-live-rows-retrieval.error';
import type { ExecutionSummaries } from '@/executions/execution.types';
import type {
ExecutionPayload,
CreateExecutionPayload,
IExecutionBase,
IExecutionFlattedDb,
IExecutionResponse,
Expand Down Expand Up @@ -301,9 +301,9 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
/**
* Insert a new execution and its execution data using a transaction.
*/
async createNewExecution(execution: ExecutionPayload): Promise<string> {
async createNewExecution(execution: CreateExecutionPayload): Promise<string> {
const { data, workflowData, ...rest } = execution;
const { identifiers: inserted } = await this.insert(rest);
const { identifiers: inserted } = await this.insert({ ...rest, createdAt: new Date() });
const { id: executionId } = inserted[0] as { id: string };
const { connections, nodes, name, settings } = workflowData ?? {};
await this.executionDataRepository.insert({
Expand Down Expand Up @@ -342,16 +342,25 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
await this.update({ id: executionId }, { status });
}

async resetStartedAt(executionId: string) {
await this.update({ id: executionId }, { startedAt: new Date() });
async setRunning(executionId: string) {
const startedAt = new Date();

await this.update({ id: executionId }, { status: 'running', startedAt });

return startedAt;
}

async updateExistingExecution(executionId: string, execution: Partial<IExecutionResponse>) {
// Se isolate startedAt because it must be set when the execution starts and should never change.
// So we prevent updating it, if it's sent (it usually is and causes problems to executions that
// are resumed after waiting for some time, as a new startedAt is set)
const { id, data, workflowId, workflowData, startedAt, customData, ...executionInformation } =
execution;
const {
id,
data,
workflowId,
workflowData,
createdAt, // must never change
startedAt, // must never change
customData,
...executionInformation
} = execution;
if (Object.keys(executionInformation).length > 0) {
await this.update({ id: executionId }, executionInformation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { ExecutionStatus, IRun, IWorkflowBase } from 'n8n-workflow';
import { Container } from 'typedi';

import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { ExecutionPayload, IExecutionDb } from '@/interfaces';
import type { IExecutionDb, UpdateExecutionPayload } from '@/interfaces';
import { Logger } from '@/logger';
import { ExecutionMetadataService } from '@/services/execution-metadata.service';
import { isWorkflowIdValid } from '@/utils';
Expand Down Expand Up @@ -46,7 +46,7 @@ export function prepareExecutionDataForDbUpdate(parameters: {
'pinData',
]);

const fullExecutionData: ExecutionPayload = {
const fullExecutionData: UpdateExecutionPayload = {
data: runData.data,
mode: runData.mode,
finished: runData.finished ? runData.finished : false,
Expand Down
5 changes: 2 additions & 3 deletions packages/cli/src/executions/execution.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import { QueuedExecutionRetryError } from '@/errors/queued-execution-retry.error
import { InternalServerError } from '@/errors/response-errors/internal-server.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import type {
ExecutionPayload,
CreateExecutionPayload,
IExecutionFlattedResponse,
IExecutionResponse,
IWorkflowDb,
Expand Down Expand Up @@ -321,11 +321,10 @@ export class ExecutionService {
},
};

const fullExecutionData: ExecutionPayload = {
const fullExecutionData: CreateExecutionPayload = {
data: executionData,
mode,
finished: false,
startedAt: new Date(),
workflowData,
workflowId: workflow.id,
stoppedAt: new Date(),
Expand Down
10 changes: 6 additions & 4 deletions packages/cli/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ export type SaveExecutionDataType = 'all' | 'none';
export interface IExecutionBase {
id: string;
mode: WorkflowExecuteMode;
createdAt: Date; // set by DB
startedAt: Date;
stoppedAt?: Date; // empty value means execution is still running
workflowId: string;
Expand All @@ -131,10 +132,11 @@ export interface IExecutionDb extends IExecutionBase {
workflowData: IWorkflowBase;
}

/**
* Payload for creating or updating an execution.
*/
export type ExecutionPayload = Omit<IExecutionDb, 'id'>;
/** Payload for creating an execution. */
export type CreateExecutionPayload = Omit<IExecutionDb, 'id' | 'createdAt' | 'startedAt'>;

/** Payload for updating an execution. */
export type UpdateExecutionPayload = Omit<IExecutionDb, 'id' | 'createdAt'>;

export interface IExecutionResponse extends IExecutionBase {
id: string;
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/scaling/job-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class JobProcessor {

this.logger.info(`[JobProcessor] Starting job ${job.id} (execution ${executionId})`);

await this.executionRepository.updateStatus(executionId, 'running');
const startedAt = await this.executionRepository.setRunning(executionId);

let { staticData } = execution.workflowData;

Expand Down Expand Up @@ -137,7 +137,7 @@ export class JobProcessor {
workflowId: execution.workflowId,
workflowName: execution.workflowData.name,
mode: execution.mode,
startedAt: execution.startedAt,
startedAt,
retryOf: execution.retryOf ?? '',
status: execution.status,
};
Expand Down
8 changes: 6 additions & 2 deletions packages/cli/src/workflow-execute-additional-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ import config from '@/config';
import { CredentialsHelper } from '@/credentials-helper';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { ExternalHooks } from '@/external-hooks';
import type { IWorkflowExecuteProcess, IWorkflowErrorData, ExecutionPayload } from '@/interfaces';
import type {
IWorkflowExecuteProcess,
IWorkflowErrorData,
UpdateExecutionPayload,
} from '@/interfaces';
import { NodeTypes } from '@/node-types';
import { Push } from '@/push';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
Expand Down Expand Up @@ -865,7 +869,7 @@ async function executeWorkflow(
// Therefore, database might not contain finished errors.
// Force an update to db as there should be no harm doing this

const fullExecutionData: ExecutionPayload = {
const fullExecutionData: UpdateExecutionPayload = {
data: fullRunData.data,
mode: fullRunData.mode,
finished: fullRunData.finished ? fullRunData.finished : false,
Expand Down
5 changes: 2 additions & 3 deletions packages/cli/src/workflows/workflow-execution.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import type { Project } from '@/databases/entities/project';
import type { User } from '@/databases/entities/user';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import type { ExecutionPayload, IWorkflowDb, IWorkflowErrorData } from '@/interfaces';
import type { CreateExecutionPayload, IWorkflowDb, IWorkflowErrorData } from '@/interfaces';
import { Logger } from '@/logger';
import { NodeTypes } from '@/node-types';
import { SubworkflowPolicyChecker } from '@/subworkflows/subworkflow-policy-checker.service';
Expand Down Expand Up @@ -204,11 +204,10 @@ export class WorkflowExecutionService {
initialNode,
);

const fullExecutionData: ExecutionPayload = {
const fullExecutionData: CreateExecutionPayload = {
data: fakeExecution.data,
mode: fakeExecution.mode,
finished: false,
startedAt: new Date(),
stoppedAt: new Date(),
workflowData,
waitTill: null,
Expand Down
Loading