Skip to content

Commit

Permalink
move execution creation into a single place
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy committed Jan 30, 2024
1 parent f350b14 commit c1c2bbc
Showing 1 changed file with 19 additions and 29 deletions.
48 changes: 19 additions & 29 deletions packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ export class WorkflowRunner {
data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean,
realtime?: boolean,
executionId?: string,
restartExecutionId?: string,
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): Promise<string> {
const workflowId = data.workflowData.id;
Expand All @@ -162,12 +162,18 @@ export class WorkflowRunner {
await this.workflowStaticDataService.getStaticDataById(workflowId);
}

// Register a new execution
const executionId = await this.activeExecutions.add(data, restartExecutionId);
if (responsePromise) {
this.activeExecutions.attachResponsePromise(executionId, responsePromise);
}

if (this.executionsMode === 'queue' && data.executionMode !== 'manual') {
// Do not run "manual" executions in bull because sending events to the
// frontend would not be possible
executionId = await this.enqueueExecution(data, realtime, executionId, responsePromise);
await this.enqueueExecution(executionId, data, realtime, restartExecutionId);
} else {
executionId = await this.runMainProcess(data, executionId, responsePromise);
await this.runMainProcess(executionId, data, restartExecutionId);
void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId, data);
}

Expand All @@ -182,7 +188,7 @@ export class WorkflowRunner {
postExecutePromise
.then(async (executionData) => {
void Container.get(InternalHooks).onWorkflowPostExecute(
executionId!,
executionId,
data.workflowData,
executionData,
data.userId,
Expand Down Expand Up @@ -211,13 +217,10 @@ export class WorkflowRunner {

/** Run the workflow in current process */
private async runMainProcess(
executionId: string,
data: IWorkflowExecutionDataProcess,
restartExecutionId?: string,
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): Promise<string> {
// Register a new execution
const executionId = await this.activeExecutions.add(data, restartExecutionId);

): Promise<void> {
// Soft timeout to stop workflow execution after current running node
// Changes were made by adding the `workflowTimeout` to the `additionalData`
// So that the timeout will also work for executions with nested workflows.
Expand Down Expand Up @@ -250,6 +253,7 @@ export class WorkflowRunner {
undefined,
workflowTimeout <= 0 ? undefined : Date.now() + workflowTimeout * 1000,
);
// TODO: set this in queue mode as well
additionalData.restartExecutionId = restartExecutionId;

additionalData.executionId = executionId;
Expand Down Expand Up @@ -281,14 +285,12 @@ export class WorkflowRunner {
);
await additionalData.hooks.executeHookFunctions('workflowExecuteAfter', [failedExecution]);
this.activeExecutions.remove(executionId, failedExecution);
return executionId;
return;
}

additionalData.hooks.hookFunctions.sendResponse = [
async (response: IExecuteResponsePromiseData): Promise<void> => {
if (responsePromise) {
responsePromise.resolve(response);
}
this.activeExecutions.resolveResponsePromise(executionId, response);
},
];

Expand Down Expand Up @@ -318,8 +320,8 @@ export class WorkflowRunner {
this.logger.debug(`Execution ID ${executionId} will run executing all nodes.`, {
executionId,
});
// Execute all nodes

// Execute all nodes
const startNode = WorkflowHelpers.getExecutionStartNode(data, workflow);

// Can execute without webhook so go on
Expand Down Expand Up @@ -382,26 +384,15 @@ export class WorkflowRunner {

throw error;
}

return executionId;
}

private async enqueueExecution(
executionId: string,
data: IWorkflowExecutionDataProcess,
realtime?: boolean,
restartExecutionId?: string,
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): Promise<string> {
// Register a new execution
const executionId = await this.activeExecutions.add(data, restartExecutionId);

if (responsePromise) {
this.activeExecutions.attachResponsePromise(executionId, responsePromise);
}

const jobData: JobData = {
executionId,
};
): Promise<void> {
const jobData: JobData = { executionId };

let priority = 100;
if (realtime === true) {
Expand Down Expand Up @@ -589,6 +580,5 @@ export class WorkflowRunner {
});

this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
return executionId;
}
}

0 comments on commit c1c2bbc

Please sign in to comment.