Skip to content

Commit

Permalink
fix(core): Fixes issue with workflow lastUpdated field (#5015)
Browse files Browse the repository at this point in the history
Fixed issue causing workflow updated field to be affected by statistics data
  • Loading branch information
freya authored Jan 5, 2023
1 parent 7954025 commit 59004fe
Show file tree
Hide file tree
Showing 16 changed files with 263 additions and 28 deletions.
4 changes: 4 additions & 0 deletions packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ import { isLogStreamingEnabled } from '@/eventbus/MessageEventBus/MessageEventBu
import { getLicense } from '@/License';
import { licenseController } from './license/license.controller';
import { corsMiddleware } from './middlewares/cors';
import { initEvents } from './events';
import { AbstractServer } from './AbstractServer';

const exec = promisify(callbackExec);
Expand Down Expand Up @@ -1448,6 +1449,9 @@ export async function start(): Promise<void> {
smtp_set_up: config.getEnv('userManagement.emails.mode') === 'smtp',
};

// Set up event handling
initEvents();

const workflow = await Db.collections.Workflow!.findOne({
select: ['createdAt'],
order: { createdAt: 'ASC' },
Expand Down
16 changes: 9 additions & 7 deletions packages/cli/src/api/workflowStats.api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,17 @@ workflowStatsController.get(
// Get flag
const workflowId = req.params.id;

// Get the corresponding workflow
const workflow = await Db.collections.Workflow.findOne(workflowId);
// It will be valid if we reach this point, this is just for TS
if (!workflow) {
return { dataLoaded: false };
}
// Get the flag
const stats = await Db.collections.WorkflowStatistics.findOne({
select: ['latestEvent'],
where: {
workflowId,
name: StatisticsNames.dataLoaded,
},
});

const data: IWorkflowStatisticsDataLoaded = {
dataLoaded: workflow.dataLoaded,
dataLoaded: stats ? true : false,
};

return data;
Expand Down
4 changes: 4 additions & 0 deletions packages/cli/src/commands/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { getLogger } from '@/Logger';
import config from '@/config';
import { getInstanceOwner } from '@/UserManagement/UserManagementHelper';
import { findCliWorkflowStart } from '@/utils';
import { initEvents } from '@/events';

export class Execute extends Command {
static description = '\nExecutes a given workflow';
Expand All @@ -47,6 +48,9 @@ export class Execute extends Command {
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig, true);

// Add event handlers
initEvents();

// eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(Execute);

Expand Down
4 changes: 4 additions & 0 deletions packages/cli/src/commands/executeBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import config from '@/config';
import { User } from '@db/entities/User';
import { getInstanceOwner } from '@/UserManagement/UserManagementHelper';
import { findCliWorkflowStart } from '@/utils';
import { initEvents } from '@/events';

const re = /\d+/;

Expand Down Expand Up @@ -197,6 +198,9 @@ export class ExecuteBatch extends Command {
// eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(ExecuteBatch);

// Add event handlers
initEvents();

ExecuteBatch.debug = flags.debug;
ExecuteBatch.concurrency = flags.concurrency || 1;

Expand Down
3 changes: 0 additions & 3 deletions packages/cli/src/databases/entities/WorkflowEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,6 @@ export class WorkflowEntity extends AbstractEntity implements IWorkflowDb {
@JoinColumn({ referencedColumnName: 'workflow' })
statistics: WorkflowStatistics[];

@Column({ default: false })
dataLoaded: boolean;

@Column({
type: config.getEnv('database.type') === 'sqlite' ? 'text' : 'json',
nullable: true,
Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/databases/entities/WorkflowStatistics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export enum StatisticsNames {
productionError = 'production_error',
manualSuccess = 'manual_success',
manualError = 'manual_error',
dataLoaded = 'data_loaded',
}

@Entity()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import { logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';
import config from '@/config';
import { StatisticsNames } from '@/databases/entities/WorkflowStatistics';

export class RemoveWorkflowDataLoadedFlag1671726148420 implements MigrationInterface {
name = 'RemoveWorkflowDataLoadedFlag1671726148420';

async up(queryRunner: QueryRunner) {
logMigrationStart(this.name);
const tablePrefix = config.getEnv('database.tablePrefix');

// If any existing workflow has dataLoaded set to true, insert the relevant information to the statistics table
const workflowIds: Array<{ id: number; dataLoaded: boolean }> = await queryRunner.query(`
SELECT id, dataLoaded
FROM ${tablePrefix}workflow_entity
`);

workflowIds.map(({ id, dataLoaded }) => {
if (dataLoaded) {
const [insertQuery, insertParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
INSERT INTO ${tablePrefix}workflow_statistics (workflowId, name, count, latestEvent) VALUES
(:id, :name, 1, CURRENT_TIMESTAMP(3))
`,
{ id, name: StatisticsNames.dataLoaded },
{},
);

return queryRunner.query(insertQuery, insertParams);
}
return undefined;
});

await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity DROP COLUMN dataLoaded`,
);

logMigrationEnd(this.name);
}

async down(queryRunner: QueryRunner) {
const tablePrefix = config.getEnv('database.tablePrefix');

await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity ADD COLUMN dataLoaded BOOLEAN DEFAULT false`,
);

// Search through statistics for any workflows that have the dataLoaded stat
const workflowsIds: Array<{ workflowId: string }> = await queryRunner.query(`
SELECT workflowId
FROM ${tablePrefix}workflow_statistics
WHERE name = '${StatisticsNames.dataLoaded}'
`);
workflowsIds.map(({ workflowId }) => {
return queryRunner.query(`
UPDATE ${tablePrefix}workflow_entity
SET dataLoaded = true
WHERE id = '${workflowId}'`);
});

await queryRunner.query(
`DELETE FROM ${tablePrefix}workflow_statistics WHERE name = '${StatisticsNames.dataLoaded}'`,
);
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/mysqldb/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { CreateCredentialUsageTable1665484192213 } from './1665484192213-CreateC
import { RemoveCredentialUsageTable1665754637026 } from './1665754637026-RemoveCredentialUsageTable';
import { AddWorkflowVersionIdColumn1669739707125 } from './1669739707125-AddWorkflowVersionIdColumn';
import { AddTriggerCountColumn1669823906994 } from './1669823906994-AddTriggerCountColumn';
import { RemoveWorkflowDataLoadedFlag1671726148420 } from './1671726148420-RemoveWorkflowDataLoadedFlag';
import { MessageEventBusDestinations1671535397530 } from './1671535397530-MessageEventBusDestinations';

export const mysqlMigrations = [
Expand Down Expand Up @@ -57,5 +58,6 @@ export const mysqlMigrations = [
AddWorkflowVersionIdColumn1669739707125,
WorkflowStatistics1664196174002,
AddTriggerCountColumn1669823906994,
RemoveWorkflowDataLoadedFlag1671726148420,
MessageEventBusDestinations1671535397530,
];
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';
import config from '@/config';
import { StatisticsNames } from '@/databases/entities/WorkflowStatistics';

export class RemoveWorkflowDataLoadedFlag1671726148421 implements MigrationInterface {
name = 'RemoveWorkflowDataLoadedFlag1671726148421';

async up(queryRunner: QueryRunner) {
logMigrationStart(this.name);
const tablePrefix = getTablePrefix();

// If any existing workflow has dataLoaded set to true, insert the relevant information to the statistics table
const workflowIds: Array<{ id: number; dataLoaded: boolean }> = await queryRunner.query(`
SELECT id, "dataLoaded"
FROM ${tablePrefix}workflow_entity
`);

workflowIds.map(({ id, dataLoaded }) => {
if (dataLoaded) {
const [insertQuery, insertParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
INSERT INTO ${tablePrefix}workflow_statistics ("workflowId", name, count, "latestEvent") VALUES
(:id, :name, 1, CURRENT_TIMESTAMP(3))
`,
{ id, name: StatisticsNames.dataLoaded },
{},
);

return queryRunner.query(insertQuery, insertParams);
}
return undefined;
});

await queryRunner.query(`ALTER TABLE ${tablePrefix}workflow_entity DROP COLUMN "dataLoaded"`);

logMigrationEnd(this.name);
}

async down(queryRunner: QueryRunner) {
const tablePrefix = getTablePrefix();

await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity ADD COLUMN "dataLoaded" BOOLEAN DEFAULT false`,
);

// Search through statistics for any workflows that have the dataLoaded stat
const workflowsIds: Array<{ workflowId: string }> = await queryRunner.query(`
SELECT "workflowId"
FROM ${tablePrefix}workflow_statistics
WHERE name = '${StatisticsNames.dataLoaded}'
`);
workflowsIds.map(({ workflowId }) => {
return queryRunner.query(`
UPDATE ${tablePrefix}workflow_entity
SET "dataLoaded" = true
WHERE id = '${workflowId}'`);
});

await queryRunner.query(
`DELETE FROM ${tablePrefix}workflow_statistics WHERE name = '${StatisticsNames.dataLoaded}'`,
);
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/postgresdb/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { CreateCredentialUsageTable1665484192212 } from './1665484192212-CreateC
import { RemoveCredentialUsageTable1665754637025 } from './1665754637025-RemoveCredentialUsageTable';
import { AddWorkflowVersionIdColumn1669739707126 } from './1669739707126-AddWorkflowVersionIdColumn';
import { AddTriggerCountColumn1669823906995 } from './1669823906995-AddTriggerCountColumn';
import { RemoveWorkflowDataLoadedFlag1671726148421 } from './1671726148421-RemoveWorkflowDataLoadedFlag';
import { MessageEventBusDestinations1671535397530 } from './1671535397530-MessageEventBusDestinations';

export const postgresMigrations = [
Expand Down Expand Up @@ -53,5 +54,6 @@ export const postgresMigrations = [
AddWorkflowVersionIdColumn1669739707126,
WorkflowStatistics1664196174001,
AddTriggerCountColumn1669823906995,
RemoveWorkflowDataLoadedFlag1671726148421,
MessageEventBusDestinations1671535397530,
];
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import { logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';
import config from '@/config';
import { v4 as uuidv4 } from 'uuid';
import { StatisticsNames } from '@/databases/entities/WorkflowStatistics';

export class RemoveWorkflowDataLoadedFlag1671726148419 implements MigrationInterface {
name = 'RemoveWorkflowDataLoadedFlag1671726148419';

async up(queryRunner: QueryRunner) {
logMigrationStart(this.name);
const tablePrefix = config.getEnv('database.tablePrefix');

// If any existing workflow has dataLoaded set to true, insert the relevant information to the statistics table
const workflowIds: Array<{ id: number; dataLoaded: boolean }> = await queryRunner.query(`
SELECT id, dataLoaded
FROM "${tablePrefix}workflow_entity"
`);

workflowIds.map(({ id, dataLoaded }) => {
if (dataLoaded) {
const [insertQuery, insertParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
INSERT INTO "${tablePrefix}workflow_statistics" (workflowId, name, count, latestEvent) VALUES
(:id, :name, 1, STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW'))
`,
{ id, name: StatisticsNames.dataLoaded },
{},
);

return queryRunner.query(insertQuery, insertParams);
}
return undefined;
});

await queryRunner.query(
`ALTER TABLE \`${tablePrefix}workflow_entity\` DROP COLUMN "dataLoaded"`,
);

logMigrationEnd(this.name);
}

async down(queryRunner: QueryRunner) {
const tablePrefix = config.getEnv('database.tablePrefix');

await queryRunner.query(
`ALTER TABLE \`${tablePrefix}workflow_entity\` ADD COLUMN "dataLoaded" BOOLEAN DEFAULT false`,
);

// Search through statistics for any workflows that have the dataLoaded stat
const workflowsIds: Array<{ workflowId: string }> = await queryRunner.query(`
SELECT workflowId
FROM "${tablePrefix}workflow_statistics"
WHERE name = '${StatisticsNames.dataLoaded}'
`);
workflowsIds.map(({ workflowId }) => {
return queryRunner.query(`
UPDATE "${tablePrefix}workflow_entity"
SET dataLoaded = true
WHERE id = '${workflowId}'`);
});

await queryRunner.query(
`DELETE FROM "${tablePrefix}workflow_statistics" WHERE name = '${StatisticsNames.dataLoaded}'`,
);
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/sqlite/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { CreateCredentialUsageTable1665484192211 } from './1665484192211-CreateC
import { RemoveCredentialUsageTable1665754637024 } from './1665754637024-RemoveCredentialUsageTable';
import { AddWorkflowVersionIdColumn1669739707124 } from './1669739707124-AddWorkflowVersionIdColumn';
import { AddTriggerCountColumn1669823906993 } from './1669823906993-AddTriggerCountColumn';
import { RemoveWorkflowDataLoadedFlag1671726148419 } from './1671726148419-RemoveWorkflowDataLoadedFlag';
import { MessageEventBusDestinations1671535397530 } from './1671535397530-MessageEventBusDestinations';

const sqliteMigrations = [
Expand Down Expand Up @@ -51,6 +52,7 @@ const sqliteMigrations = [
AddWorkflowVersionIdColumn1669739707124,
AddTriggerCountColumn1669823906993,
WorkflowStatistics1664196174000,
RemoveWorkflowDataLoadedFlag1671726148419,
MessageEventBusDestinations1671535397530,
];

Expand Down
34 changes: 23 additions & 11 deletions packages/cli/src/events/WorkflowStatistics.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import type { INode, IRun, IWorkflowBase } from 'n8n-workflow';
import { INode, IRun, IWorkflowBase } from 'n8n-workflow';
import * as Db from '@/Db';
import { InternalHooksManager } from '@/InternalHooksManager';
import { StatisticsNames } from '@/databases/entities/WorkflowStatistics';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { QueryFailedError } from 'typeorm';

export async function workflowExecutionCompleted(
workflowData: IWorkflowBase,
Expand Down Expand Up @@ -47,7 +48,10 @@ export async function workflowExecutionCompleted(
// Send the metrics
await InternalHooksManager.getInstance().onFirstProductionWorkflowSuccess(metrics);
} catch (error) {
// Do we just assume it's a conflict error? If there is any other sort of error in the DB it should trigger here too
if (!(error instanceof QueryFailedError)) {
throw error;
}

await Db.collections.WorkflowStatistics.update(
{ workflowId, name },
{ count: () => 'count + 1', latestEvent: new Date() },
Expand All @@ -56,16 +60,24 @@ export async function workflowExecutionCompleted(
}

export async function nodeFetchedData(workflowId: string, node: INode): Promise<void> {
// Update only if necessary
const response = await Db.collections.Workflow.update(
{ id: workflowId, dataLoaded: false },
{ dataLoaded: true },
);

// If response.affected is 1 then we know this was the first time data was loaded into the workflow; do posthog event here
if (!response.affected) return;
// Try to insert the data loaded statistic
try {
await Db.collections.WorkflowStatistics.insert({
workflowId,
name: StatisticsNames.dataLoaded,
count: 1,
latestEvent: new Date(),
});
} catch (error) {
// if it's a duplicate key error then that's fine, otherwise throw the error
if (!(error instanceof QueryFailedError)) {
throw error;
}
// If it is a query failed error, we return
return;
}

// Compile the metrics
// Compile the metrics since this was a new data loaded event
const owner = await getWorkflowOwner(workflowId);
let metrics = {
user_id: owner.id,
Expand Down
Loading

0 comments on commit 59004fe

Please sign in to comment.