Skip to content

Commit

Permalink
Reverted WorkflowRunner #8487 changes
Browse files Browse the repository at this point in the history
  • Loading branch information
OlegIvaniv committed Feb 8, 2024
1 parent c04f92f commit 538f3d2
Showing 1 changed file with 35 additions and 15 deletions.
50 changes: 35 additions & 15 deletions packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,21 +153,27 @@ export class WorkflowRunner {
data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean,
realtime?: boolean,
restartExecutionId?: string,
executionId?: string,
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): Promise<string> {
// Register a new execution
const executionId = await this.activeExecutions.add(data, restartExecutionId);
if (responsePromise) {
this.activeExecutions.attachResponsePromise(executionId, responsePromise);
}
// const executionId = await this.activeExecutions.add(data, restartExecutionId);
// if (responsePromise) {
// this.activeExecutions.attachResponsePromise(executionId, responsePromise);
// }

if (this.executionsMode === 'queue' && data.executionMode !== 'manual') {
// Do not run "manual" executions in bull because sending events to the
// frontend would not be possible
await this.enqueueExecution(executionId, data, loadStaticData, realtime);
executionId = await this.enqueueExecution(
data,
loadStaticData,
realtime,
executionId,
responsePromise,
);
} else {
await this.runMainProcess(executionId, data, loadStaticData, executionId);
executionId = await this.runMainProcess(data, loadStaticData, executionId, responsePromise);
void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId, data);
}

Expand All @@ -182,7 +188,7 @@ export class WorkflowRunner {
postExecutePromise
.then(async (executionData) => {
void Container.get(InternalHooks).onWorkflowPostExecute(
executionId,
executionId!,
data.workflowData,
executionData,
data.userId,
Expand Down Expand Up @@ -211,11 +217,11 @@ export class WorkflowRunner {

/** Run the workflow in current process */
private async runMainProcess(
executionId: string,
data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean,
restartExecutionId?: string,
): Promise<void> {
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): Promise<string> {
const workflowId = data.workflowData.id;
if (loadStaticData === true && workflowId) {
data.workflowData.staticData =
Expand Down Expand Up @@ -256,7 +262,8 @@ export class WorkflowRunner {
);
// TODO: set this in queue mode as well
additionalData.restartExecutionId = restartExecutionId;

// Register the active execution
const executionId = await this.activeExecutions.add(data, restartExecutionId);
additionalData.executionId = executionId;

this.logger.verbose(
Expand Down Expand Up @@ -286,12 +293,15 @@ export class WorkflowRunner {
);
await additionalData.hooks.executeHookFunctions('workflowExecuteAfter', [failedExecution]);
this.activeExecutions.remove(executionId, failedExecution);
return;
return executionId;
}

additionalData.hooks.hookFunctions.sendResponse = [
async (response: IExecuteResponsePromiseData): Promise<void> => {
this.activeExecutions.resolveResponsePromise(executionId, response);
// this.activeExecutions.resolveResponsePromise(executionId, response);
if (responsePromise) {
responsePromise.resolve(response);
}
},
];

Expand Down Expand Up @@ -385,14 +395,23 @@ export class WorkflowRunner {

throw error;
}
return executionId;
}

private async enqueueExecution(
executionId: string,
data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean,
realtime?: boolean,
): Promise<void> {
restartExecutionId?: string,
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): Promise<string> {
// TODO: If "loadStaticData" is set to true it has to load data new on worker

// Register the active execution
const executionId = await this.activeExecutions.add(data, restartExecutionId);
if (responsePromise) {
this.activeExecutions.attachResponsePromise(executionId, responsePromise);
}
const jobData: JobData = {
executionId,
loadStaticData: !!loadStaticData,
Expand Down Expand Up @@ -584,5 +603,6 @@ export class WorkflowRunner {
});

this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
return executionId;
}
}

0 comments on commit 538f3d2

Please sign in to comment.