Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Fixes issue with workflow lastUpdated field #5015

Merged
merged 15 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ import { isLogStreamingEnabled } from '@/eventbus/MessageEventBus/MessageEventBu
import { getLicense } from '@/License';
import { licenseController } from './license/license.controller';
import { corsMiddleware } from './middlewares/cors';
import { initEvents } from './events';
import { AbstractServer } from './AbstractServer';

const exec = promisify(callbackExec);
Expand Down Expand Up @@ -1448,6 +1449,9 @@ export async function start(): Promise<void> {
smtp_set_up: config.getEnv('userManagement.emails.mode') === 'smtp',
};

// Set up event handling
initEvents();

const workflow = await Db.collections.Workflow!.findOne({
select: ['createdAt'],
order: { createdAt: 'ASC' },
Expand Down
16 changes: 9 additions & 7 deletions packages/cli/src/api/workflowStats.api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,17 @@ workflowStatsController.get(
// Get flag
const workflowId = req.params.id;

// Get the corresponding workflow
const workflow = await Db.collections.Workflow.findOne(workflowId);
// It will be valid if we reach this point, this is just for TS
if (!workflow) {
return { dataLoaded: false };
}
// Get the flag
const stats = await Db.collections.WorkflowStatistics.findOne({
select: ['latestEvent'],
where: {
workflowId,
name: StatisticsNames.dataLoaded,
},
});

const data: IWorkflowStatisticsDataLoaded = {
dataLoaded: workflow.dataLoaded,
dataLoaded: stats ? true : false,
};

return data;
Expand Down
4 changes: 4 additions & 0 deletions packages/cli/src/commands/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { getLogger } from '@/Logger';
import config from '@/config';
import { getInstanceOwner } from '@/UserManagement/UserManagementHelper';
import { findCliWorkflowStart } from '@/utils';
import { initEvents } from '@/events';

export class Execute extends Command {
static description = '\nExecutes a given workflow';
Expand All @@ -47,6 +48,9 @@ export class Execute extends Command {
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig, true);

// Add event handlers
initEvents();

// eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(Execute);

Expand Down
4 changes: 4 additions & 0 deletions packages/cli/src/commands/executeBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import config from '@/config';
import { User } from '@db/entities/User';
import { getInstanceOwner } from '@/UserManagement/UserManagementHelper';
import { findCliWorkflowStart } from '@/utils';
import { initEvents } from '@/events';

const re = /\d+/;

Expand Down Expand Up @@ -197,6 +198,9 @@ export class ExecuteBatch extends Command {
// eslint-disable-next-line @typescript-eslint/no-shadow
const { flags } = this.parse(ExecuteBatch);

// Add event handlers
initEvents();

ExecuteBatch.debug = flags.debug;
ExecuteBatch.concurrency = flags.concurrency || 1;

Expand Down
3 changes: 0 additions & 3 deletions packages/cli/src/databases/entities/WorkflowEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,6 @@ export class WorkflowEntity extends AbstractEntity implements IWorkflowDb {
@JoinColumn({ referencedColumnName: 'workflow' })
statistics: WorkflowStatistics[];

@Column({ default: false })
dataLoaded: boolean;

@Column({
type: config.getEnv('database.type') === 'sqlite' ? 'text' : 'json',
nullable: true,
Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/databases/entities/WorkflowStatistics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export enum StatisticsNames {
productionError = 'production_error',
manualSuccess = 'manual_success',
manualError = 'manual_error',
dataLoaded = 'data_loaded',
}

@Entity()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import { logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';
import config from '@/config';
import { StatisticsNames } from '@/databases/entities/WorkflowStatistics';

export class RemoveWorkflowDataLoadedFlag1671726148420 implements MigrationInterface {
name = 'RemoveWorkflowDataLoadedFlag1671726148420';

async up(queryRunner: QueryRunner) {
logMigrationStart(this.name);
const tablePrefix = config.getEnv('database.tablePrefix');

// If any existing workflow has dataLoaded set to true, insert the relevant information to the statistics table
const workflowIds: Array<{ id: number; dataLoaded: boolean }> = await queryRunner.query(`
SELECT id, dataLoaded
FROM ${tablePrefix}workflow_entity
`);

workflowIds.map(({ id, dataLoaded }) => {
if (dataLoaded) {
const [insertQuery, insertParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
INSERT INTO ${tablePrefix}workflow_statistics (workflowId, name, count, latestEvent) VALUES
(:id, :name, 1, CURRENT_TIMESTAMP(3))
`,
{ id, name: StatisticsNames.dataLoaded },
{},
);

return queryRunner.query(insertQuery, insertParams);
}
return undefined;
});

await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity DROP COLUMN dataLoaded`,
);

logMigrationEnd(this.name);
}

async down(queryRunner: QueryRunner) {
const tablePrefix = config.getEnv('database.tablePrefix');

await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity ADD COLUMN dataLoaded BOOLEAN DEFAULT false`,
);

// Search through statistics for any workflows that have the dataLoaded stat
const workflowsIds: Array<{ workflowId: string }> = await queryRunner.query(`
SELECT workflowId
FROM ${tablePrefix}workflow_statistics
WHERE name = '${StatisticsNames.dataLoaded}'
`);
workflowsIds.map(({ workflowId }) => {
return queryRunner.query(`
UPDATE ${tablePrefix}workflow_entity
SET dataLoaded = true
WHERE id = '${workflowId}'`);
});

await queryRunner.query(
`DELETE FROM ${tablePrefix}workflow_statistics WHERE name = '${StatisticsNames.dataLoaded}'`,
);
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/mysqldb/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { CreateCredentialUsageTable1665484192213 } from './1665484192213-CreateC
import { RemoveCredentialUsageTable1665754637026 } from './1665754637026-RemoveCredentialUsageTable';
import { AddWorkflowVersionIdColumn1669739707125 } from './1669739707125-AddWorkflowVersionIdColumn';
import { AddTriggerCountColumn1669823906994 } from './1669823906994-AddTriggerCountColumn';
import { RemoveWorkflowDataLoadedFlag1671726148420 } from './1671726148420-RemoveWorkflowDataLoadedFlag';
import { MessageEventBusDestinations1671535397530 } from './1671535397530-MessageEventBusDestinations';

export const mysqlMigrations = [
Expand Down Expand Up @@ -57,5 +58,6 @@ export const mysqlMigrations = [
AddWorkflowVersionIdColumn1669739707125,
WorkflowStatistics1664196174002,
AddTriggerCountColumn1669823906994,
RemoveWorkflowDataLoadedFlag1671726148420,
MessageEventBusDestinations1671535397530,
];
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';
import config from '@/config';
import { StatisticsNames } from '@/databases/entities/WorkflowStatistics';

export class RemoveWorkflowDataLoadedFlag1671726148421 implements MigrationInterface {
name = 'RemoveWorkflowDataLoadedFlag1671726148421';

async up(queryRunner: QueryRunner) {
logMigrationStart(this.name);
const tablePrefix = getTablePrefix();

// If any existing workflow has dataLoaded set to true, insert the relevant information to the statistics table
const workflowIds: Array<{ id: number; dataLoaded: boolean }> = await queryRunner.query(`
SELECT id, "dataLoaded"
FROM ${tablePrefix}workflow_entity
`);

workflowIds.map(({ id, dataLoaded }) => {
if (dataLoaded) {
const [insertQuery, insertParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
INSERT INTO ${tablePrefix}workflow_statistics ("workflowId", name, count, "latestEvent") VALUES
(:id, :name, 1, CURRENT_TIMESTAMP(3))
`,
{ id, name: StatisticsNames.dataLoaded },
{},
);

return queryRunner.query(insertQuery, insertParams);
}
return undefined;
});

await queryRunner.query(`ALTER TABLE ${tablePrefix}workflow_entity DROP COLUMN "dataLoaded"`);

logMigrationEnd(this.name);
}

async down(queryRunner: QueryRunner) {
const tablePrefix = getTablePrefix();

await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity ADD COLUMN "dataLoaded" BOOLEAN DEFAULT false`,
);

// Search through statistics for any workflows that have the dataLoaded stat
const workflowsIds: Array<{ workflowId: string }> = await queryRunner.query(`
SELECT "workflowId"
FROM ${tablePrefix}workflow_statistics
WHERE name = '${StatisticsNames.dataLoaded}'
`);
workflowsIds.map(({ workflowId }) => {
return queryRunner.query(`
UPDATE ${tablePrefix}workflow_entity
SET "dataLoaded" = true
WHERE id = '${workflowId}'`);
});

await queryRunner.query(
`DELETE FROM ${tablePrefix}workflow_statistics WHERE name = '${StatisticsNames.dataLoaded}'`,
);
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/postgresdb/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { CreateCredentialUsageTable1665484192212 } from './1665484192212-CreateC
import { RemoveCredentialUsageTable1665754637025 } from './1665754637025-RemoveCredentialUsageTable';
import { AddWorkflowVersionIdColumn1669739707126 } from './1669739707126-AddWorkflowVersionIdColumn';
import { AddTriggerCountColumn1669823906995 } from './1669823906995-AddTriggerCountColumn';
import { RemoveWorkflowDataLoadedFlag1671726148421 } from './1671726148421-RemoveWorkflowDataLoadedFlag';
import { MessageEventBusDestinations1671535397530 } from './1671535397530-MessageEventBusDestinations';

export const postgresMigrations = [
Expand Down Expand Up @@ -53,5 +54,6 @@ export const postgresMigrations = [
AddWorkflowVersionIdColumn1669739707126,
WorkflowStatistics1664196174001,
AddTriggerCountColumn1669823906995,
RemoveWorkflowDataLoadedFlag1671726148421,
MessageEventBusDestinations1671535397530,
];
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
import { logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';
import config from '@/config';
import { v4 as uuidv4 } from 'uuid';
import { StatisticsNames } from '@/databases/entities/WorkflowStatistics';

export class RemoveWorkflowDataLoadedFlag1671726148419 implements MigrationInterface {
name = 'RemoveWorkflowDataLoadedFlag1671726148419';

async up(queryRunner: QueryRunner) {
logMigrationStart(this.name);
const tablePrefix = config.getEnv('database.tablePrefix');

// If any existing workflow has dataLoaded set to true, insert the relevant information to the statistics table
const workflowIds: Array<{ id: number; dataLoaded: boolean }> = await queryRunner.query(`
SELECT id, dataLoaded
FROM "${tablePrefix}workflow_entity"
`);

workflowIds.map(({ id, dataLoaded }) => {
if (dataLoaded) {
const [insertQuery, insertParams] = queryRunner.connection.driver.escapeQueryWithParameters(
`
INSERT INTO "${tablePrefix}workflow_statistics" (workflowId, name, count, latestEvent) VALUES
(:id, :name, 1, STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW'))
`,
{ id, name: StatisticsNames.dataLoaded },
{},
);

return queryRunner.query(insertQuery, insertParams);
}
return undefined;
});

await queryRunner.query(
`ALTER TABLE \`${tablePrefix}workflow_entity\` DROP COLUMN "dataLoaded"`,
);

logMigrationEnd(this.name);
}

async down(queryRunner: QueryRunner) {
const tablePrefix = config.getEnv('database.tablePrefix');

await queryRunner.query(
`ALTER TABLE \`${tablePrefix}workflow_entity\` ADD COLUMN "dataLoaded" BOOLEAN DEFAULT false`,
);

// Search through statistics for any workflows that have the dataLoaded stat
const workflowsIds: Array<{ workflowId: string }> = await queryRunner.query(`
SELECT workflowId
FROM "${tablePrefix}workflow_statistics"
WHERE name = '${StatisticsNames.dataLoaded}'
`);
workflowsIds.map(({ workflowId }) => {
return queryRunner.query(`
UPDATE "${tablePrefix}workflow_entity"
SET dataLoaded = true
WHERE id = '${workflowId}'`);
});

await queryRunner.query(
`DELETE FROM "${tablePrefix}workflow_statistics" WHERE name = '${StatisticsNames.dataLoaded}'`,
);
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/sqlite/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { CreateCredentialUsageTable1665484192211 } from './1665484192211-CreateC
import { RemoveCredentialUsageTable1665754637024 } from './1665754637024-RemoveCredentialUsageTable';
import { AddWorkflowVersionIdColumn1669739707124 } from './1669739707124-AddWorkflowVersionIdColumn';
import { AddTriggerCountColumn1669823906993 } from './1669823906993-AddTriggerCountColumn';
import { RemoveWorkflowDataLoadedFlag1671726148419 } from './1671726148419-RemoveWorkflowDataLoadedFlag';
import { MessageEventBusDestinations1671535397530 } from './1671535397530-MessageEventBusDestinations';

const sqliteMigrations = [
Expand Down Expand Up @@ -51,6 +52,7 @@ const sqliteMigrations = [
AddWorkflowVersionIdColumn1669739707124,
AddTriggerCountColumn1669823906993,
WorkflowStatistics1664196174000,
RemoveWorkflowDataLoadedFlag1671726148419,
MessageEventBusDestinations1671535397530,
];

Expand Down
34 changes: 23 additions & 11 deletions packages/cli/src/events/WorkflowStatistics.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import type { INode, IRun, IWorkflowBase } from 'n8n-workflow';
import { INode, IRun, IWorkflowBase } from 'n8n-workflow';
import * as Db from '@/Db';
import { InternalHooksManager } from '@/InternalHooksManager';
import { StatisticsNames } from '@/databases/entities/WorkflowStatistics';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { QueryFailedError } from 'typeorm';

export async function workflowExecutionCompleted(
workflowData: IWorkflowBase,
Expand Down Expand Up @@ -47,7 +48,10 @@ export async function workflowExecutionCompleted(
// Send the metrics
await InternalHooksManager.getInstance().onFirstProductionWorkflowSuccess(metrics);
} catch (error) {
// Do we just assume it's a conflict error? If there is any other sort of error in the DB it should trigger here too
if (!(error instanceof QueryFailedError)) {
throw error;
}

await Db.collections.WorkflowStatistics.update(
{ workflowId, name },
{ count: () => 'count + 1', latestEvent: new Date() },
Expand All @@ -56,16 +60,24 @@ export async function workflowExecutionCompleted(
}

export async function nodeFetchedData(workflowId: string, node: INode): Promise<void> {
// Update only if necessary
const response = await Db.collections.Workflow.update(
{ id: workflowId, dataLoaded: false },
{ dataLoaded: true },
);

// If response.affected is 1 then we know this was the first time data was loaded into the workflow; do posthog event here
if (!response.affected) return;
// Try to insert the data loaded statistic
try {
await Db.collections.WorkflowStatistics.insert({
workflowId,
name: StatisticsNames.dataLoaded,
count: 1,
latestEvent: new Date(),
});
} catch (error) {
// if it's a duplicate key error then that's fine, otherwise throw the error
if (!(error instanceof QueryFailedError)) {
throw error;
}
// If it is a query failed error, we return
return;
}

// Compile the metrics
// Compile the metrics since this was a new data loaded event
const owner = await getWorkflowOwner(workflowId);
let metrics = {
user_id: owner.id,
Expand Down
Loading