Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Workflow Execution Statistics #4200

Merged
merged 65 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
42753a3
:tada: - Initial attempt at putting a DB table together
Sep 26, 2022
1d4cba1
Merge branch 'master' of github.com:n8n-io/n8n into feature/workflow-…
Sep 27, 2022
3c7fc28
:sparkles: - Add statistics to DB.collections
Sep 27, 2022
d78d9ac
:art: - actually run the statistics sqlite migration
Sep 27, 2022
10deae8
:sparkles: - add postgres migration
Sep 27, 2022
f98516c
:bug: - Fixed wrong table in down migrations
Sep 27, 2022
1da308f
:sparkles: - Added mysql migration
Sep 27, 2022
35a24a4
:bug: - Postgres migration tested
Sep 27, 2022
e5098a4
:sparkles: - Add dataLoaded flag to workflow table
Sep 27, 2022
64c945b
Merge branch 'master' of github.com:n8n-io/n8n into feature/workflow-…
Sep 27, 2022
b239eb4
Merge branch 'master' of github.com:n8n-io/n8n into feature/workflow-…
Oct 6, 2022
613bc02
:art: - Make latestEvent nullable
Oct 6, 2022
563ff77
:sparkles: - Load current execution counts during migration
Oct 6, 2022
0ff421e
:bug: - Fixed issue with postgres migrations
Oct 10, 2022
b1ad9d0
:art: - Better insertion queries for initial stats
Oct 10, 2022
268862f
:art: - Remove initial counts of executions from migration
Oct 10, 2022
4f401ff
:art: - Implemented change comments
Oct 13, 2022
c1c0aa6
:sparkles: - Add new eventEmitter to core
Oct 13, 2022
e13fe42
:sparkles: - Add workflow finished event handler
Oct 13, 2022
87adfcd
:sparkles: - Trigger workflow finish events
Oct 13, 2022
c217157
Merge branch 'master' of github.com:n8n-io/n8n into feature/workflow-…
Oct 13, 2022
46e0df1
:art: - Replaced magic strings with key/value pair
Oct 17, 2022
e98c15e
:sparkles: - Add execution counts endpoint to internal api
Oct 20, 2022
f1296ab
:sparkles: - Added timestamp endpoint
Oct 20, 2022
d593e43
:bug: - fix issue where new workflows had null for dataLoaded
Oct 20, 2022
d7b2baf
:sparkles: - Add dataLoaded endpoint
Oct 20, 2022
e671459
:sparkles: - Add nodefetcheddata event handling
Oct 28, 2022
793d40d
:sparkles: - set data loaded flag
Nov 1, 2022
af92711
:sparkles: - workflow exec stats now have user id
Nov 2, 2022
0830fb4
:sparkles: - Add first prod success to posthog
Nov 3, 2022
6f36b54
:sparkles: - Pass node to nodeFetchedData event
Nov 3, 2022
91bac53
:sparkles: - sending node fetch metrics to posthog
Nov 7, 2022
97ac970
Merge branch 'master' of github.com:n8n-io/n8n into feature/workflow-…
Nov 7, 2022
c0a124b
fix existing tests
Nov 8, 2022
b9dd237
:sparkles: - Added tests for event handler code
Nov 9, 2022
1d0a717
Merge branch 'master' of github.com:n8n-io/n8n into feature/workflow-…
Nov 9, 2022
d256362
:hammer: - Fixing config imports
Nov 9, 2022
db68f90
Merge branch 'master' into feature/workflow-execution-statistics
krynble Nov 10, 2022
36141f9
:hammer: - fixed update issue for mysql
Nov 10, 2022
25a8dd8
:bug: - Fix first event having null timestamp
Nov 11, 2022
d74cba5
:bug: - Fix issue where creation timestamps were null
Nov 14, 2022
6a205e0
:sparkles: - Add permission checking to stats apis
Nov 14, 2022
633d335
Merge branch 'feature/workflow-execution-statistics' of github.com:n8…
krynble Nov 15, 2022
f116327
Merge branch 'master' of github.com:n8n-io/n8n into feature/workflow-…
Nov 25, 2022
446acf5
Merge branch 'master' of github.com:n8n-io/n8n into feature/workflow-…
Nov 28, 2022
152169a
bring files up to date with recent refactors
Nov 28, 2022
84c195d
:arrow_up: - Updating posthog-node to latest
Nov 29, 2022
3d53b55
:sparkles: - Ensuring backend pushes feat flags to PH
Nov 29, 2022
c02060b
:bug: - use getsharedworkflow function for checking permissions in api
Nov 30, 2022
d2cf259
Merge branch 'master' of github.com:n8n-io/n8n into feature/workflow-…
Nov 30, 2022
ae8a136
:rotating_light: - fixing slight issue in tests
Nov 30, 2022
4af5c2c
:rotating_light: - more test fixes
Nov 30, 2022
8f4772f
Merge branch 'feature/workflow-execution-statistics' of github.com:n8…
krynble Dec 1, 2022
d7af982
fix: Remove commented import
krynble Dec 1, 2022
407c146
:hammer: - moved changeWorkflowID to middleware
Dec 6, 2022
a83c64d
Merge branch 'master' of github.com:n8n-io/n8n into feature/workflow-…
Dec 6, 2022
53a1fe2
Merge branch 'feature/workflow-execution-statistics' of github.com:n8…
Dec 6, 2022
637018d
Merge branch 'master' of github.com:n8n-io/n8n into feature/workflow-…
Dec 6, 2022
7617ed0
:shirt: - clean up lint errors
Dec 6, 2022
7260b01
:shirt: - accidental typo
Dec 6, 2022
8dbd8f8
:hammer: - Made checkWorkflowId work as middleware
Dec 6, 2022
3698218
:bug: - next doesn't halt execution
Dec 6, 2022
c910933
refactor: add brackets to improve legibility
krynble Dec 6, 2022
850fbb8
:shirt: - remove the name `err`
Dec 6, 2022
c3a2a74
Merge branch 'feature/workflow-execution-statistics' of github.com:n8…
Dec 6, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@
"passport-jwt": "^4.0.0",
"pg": "^8.3.0",
"picocolors": "^1.0.0",
"posthog-node": "^1.3.0",
"posthog-node": "^2.2.2",
"prom-client": "^13.1.0",
"psl": "^1.8.0",
"replacestream": "^4.0.3",
Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/Db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ export async function init(
collections.Settings = linkRepository(entities.Settings);
collections.InstalledPackages = linkRepository(entities.InstalledPackages);
collections.InstalledNodes = linkRepository(entities.InstalledNodes);
collections.WorkflowStatistics = linkRepository(entities.WorkflowStatistics);

isInitialized = true;

Expand Down
20 changes: 20 additions & 0 deletions packages/cli/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import type { SharedWorkflow } from '@db/entities/SharedWorkflow';
import type { TagEntity } from '@db/entities/TagEntity';
import type { User } from '@db/entities/User';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type { WorkflowStatistics } from '@db/entities/WorkflowStatistics';

export interface IActivationError {
time: number;
Expand Down Expand Up @@ -79,6 +80,7 @@ export interface IDatabaseCollections {
Settings: Repository<Settings>;
InstalledPackages: Repository<InstalledPackages>;
InstalledNodes: Repository<InstalledNodes>;
WorkflowStatistics: Repository<WorkflowStatistics>;
}

export interface IWebhookDb {
Expand Down Expand Up @@ -691,6 +693,24 @@ export interface IWorkflowExecuteProcess {
workflowExecute: WorkflowExecute;
}

export interface IWorkflowStatisticsCounts {
productionSuccess: number;
productionError: number;
manualSuccess: number;
manualError: number;
}

export interface IWorkflowStatisticsDataLoaded {
dataLoaded: boolean;
}

export interface IWorkflowStatisticsTimestamps {
productionSuccess: Date | null;
productionError: Date | null;
manualSuccess: Date | null;
manualError: Date | null;
}

export type WhereClause = Record<string, { [key: string]: string | FindOperator<unknown> }>;

// ----------------------------------
Expand Down
21 changes: 21 additions & 0 deletions packages/cli/src/InternalHooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -477,4 +477,25 @@ export class InternalHooksClass implements IInternalHooksClass {
}): Promise<void> {
return this.telemetry.track('cnr package deleted', updateData);
}

/**
* Execution Statistics
*/
async onFirstProductionWorkflowSuccess(data: {
user_id: string;
workflow_id: string | number;
}): Promise<void> {
return this.telemetry.track('Workflow first prod success', data, { withPostHog: true });
}

async onFirstWorkflowDataLoad(data: {
user_id: string;
workflow_id: string | number;
node_type: string;
node_id: string;
credential_type?: string;
credential_id?: string;
}): Promise<void> {
return this.telemetry.track('Workflow first data fetched', data, { withPostHog: true });
}
}
6 changes: 6 additions & 0 deletions packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ import { resolveJwt } from '@/UserManagement/auth/jwt';
import { executionsController } from '@/executions/executions.controller';
import { nodeTypesController } from '@/api/nodeTypes.api';
import { tagsController } from '@/api/tags.api';
import { workflowStatsController } from '@/api/workflowStats.api';
import { loadPublicApiVersions } from '@/PublicApi';
import {
getInstanceBaseUrl,
Expand Down Expand Up @@ -806,6 +807,11 @@ class App {
// ----------------------------------------
this.app.use(`/${this.restEndpoint}/workflows`, workflowsController);

// ----------------------------------------
// Workflow Statistics
// ----------------------------------------
this.app.use(`/${this.restEndpoint}/workflow-stats`, workflowStatsController);

// ----------------------------------------
// Tags
// ----------------------------------------
Expand Down
3 changes: 2 additions & 1 deletion packages/cli/src/WebhookHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import express from 'express';
import get from 'lodash.get';

import { BINARY_ENCODING, BinaryDataManager, NodeExecuteFunctions } from 'n8n-core';
import { BINARY_ENCODING, BinaryDataManager, NodeExecuteFunctions, eventEmitter } from 'n8n-core';

import {
createDeferredPromise,
Expand Down Expand Up @@ -233,6 +233,7 @@ export async function executeWebhook(
NodeExecuteFunctions,
executionMode,
);
eventEmitter.emit(eventEmitter.types.nodeFetchedData, workflow.id, workflowStartNode);
} catch (err) {
// Send error response to webhook caller
const errorMessage = 'Workflow Webhook Error: Workflow could not be started!';
Expand Down
25 changes: 24 additions & 1 deletion packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
/* eslint-disable func-names */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core';
import { BinaryDataManager, eventEmitter, UserSettings, WorkflowExecute } from 'n8n-core';

import {
IDataObject,
IExecuteData,
IExecuteWorkflowInfo,
INode,
INodeExecutionData,
INodeParameters,
IRun,
Expand Down Expand Up @@ -648,9 +649,20 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
this.retryOf,
);
}
} finally {
eventEmitter.emit(
eventEmitter.types.workflowExecutionCompleted,
this.workflowData,
fullRunData,
);
}
},
],
nodeFetchedData: [
async (workflowId: string, node: INode) => {
eventEmitter.emit(eventEmitter.types.nodeFetchedData, workflowId, node);
},
],
};
}

Expand Down Expand Up @@ -742,9 +754,20 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
this.executionId,
this.retryOf,
);
} finally {
eventEmitter.emit(
eventEmitter.types.workflowExecutionCompleted,
this.workflowData,
fullRunData,
);
}
},
],
nodeFetchedData: [
async (workflowId: string, node: INode) => {
eventEmitter.emit(eventEmitter.types.nodeFetchedData, workflowId, node);
},
],
};
}

Expand Down
6 changes: 6 additions & 0 deletions packages/cli/src/WorkflowRunnerProcess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
IExecuteResponsePromiseData,
IExecuteWorkflowInfo,
ILogger,
INode,
INodeExecutionData,
IRun,
ITaskData,
Expand Down Expand Up @@ -396,6 +397,11 @@ class WorkflowRunnerProcess {
await this.sendHookToParentProcess('workflowExecuteAfter', [fullRunData, newStaticData]);
},
],
nodeFetchedData: [
async (workflowId: string, node: INode) => {
await this.sendHookToParentProcess('nodeFetchedData', [workflowId, node]);
},
],
};

const preExecuteFunctions = WorkflowExecuteAdditionalData.hookFunctionsPreExecute();
Expand Down
185 changes: 185 additions & 0 deletions packages/cli/src/api/workflowStats.api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
import { User } from '@/databases/entities/User';
import { whereClause } from '@/UserManagement/UserManagementHelper';
import express from 'express';
import { LoggerProxy } from 'n8n-workflow';
import {
Db,
IWorkflowStatisticsCounts,
IWorkflowStatisticsDataLoaded,
IWorkflowStatisticsTimestamps,
ResponseHelper,
} from '..';
import { StatisticsNames } from '../databases/entities/WorkflowStatistics';
import { getLogger } from '../Logger';
import { ExecutionRequest } from '../requests';

export const workflowStatsController = express.Router();

// Helper function that validates the ID, return a flag stating whether the request is allowed
async function checkWorkflowId(workflowId: string, user: User): Promise<boolean> {
// Check permissions
const shared = await Db.collections.SharedWorkflow.findOne({
relations: ['workflow'],
where: whereClause({
user,
entityType: 'workflow',
entityId: workflowId,
}),
});

if (!shared) {
LoggerProxy.info('User attempted to read a workflow without permissions', {
workflowId,
userId: user.id,
});
return false;
}
return true;
}

/**
* Initialise Logger if needed
*/
workflowStatsController.use((req, res, next) => {
freyamade marked this conversation as resolved.
Show resolved Hide resolved
try {
LoggerProxy.getInstance();
} catch (error) {
LoggerProxy.init(getLogger());
}

next();
});

/**
* Check that the workflow ID is valid and allowed to be read by the user
*/
workflowStatsController.use(async (req: ExecutionRequest.Get, res, next) => {
const allowed = await checkWorkflowId(req.params.id, req.user);
if (allowed) {
next();
} else {
// Otherwise, make and return an error
const response = new ResponseHelper.NotFoundError(`Workflow ${req.params.id} does not exist.`);
next(response);
}
});

/**
* GET /workflow-stats/:id/counts/
*/
workflowStatsController.get(
'/:id/counts/',
ResponseHelper.send(async (req: ExecutionRequest.Get): Promise<IWorkflowStatisticsCounts> => {
// Get counts from DB
const workflowId = req.params.id;

// Find the stats for this workflow
const stats = await Db.collections.WorkflowStatistics.find({
select: ['count', 'name'],
where: {
workflowId,
},
});

const data: IWorkflowStatisticsCounts = {
productionSuccess: 0,
productionError: 0,
manualSuccess: 0,
manualError: 0,
};

// There will be a maximum of 4 stats (currently)
stats.forEach(({ count, name }) => {
switch (name) {
case StatisticsNames.manualError:
data.manualError = count;
break;

case StatisticsNames.manualSuccess:
data.manualSuccess = count;
break;

case StatisticsNames.productionError:
data.productionError = count;
break;

case StatisticsNames.productionSuccess:
data.productionSuccess = count;
}
});

return data;
}),
);

/**
* GET /workflow-stats/:id/times/
*/
workflowStatsController.get(
'/:id/times/',
ResponseHelper.send(async (req: ExecutionRequest.Get): Promise<IWorkflowStatisticsTimestamps> => {
// Get times from DB
const workflowId = req.params.id;

// Find the stats for this workflow
const stats = await Db.collections.WorkflowStatistics.find({
select: ['latestEvent', 'name'],
where: {
workflowId,
},
});

const data: IWorkflowStatisticsTimestamps = {
productionSuccess: null,
productionError: null,
manualSuccess: null,
manualError: null,
};

// There will be a maximum of 4 stats (currently)
stats.forEach(({ latestEvent, name }) => {
switch (name) {
case StatisticsNames.manualError:
data.manualError = latestEvent;
break;

case StatisticsNames.manualSuccess:
data.manualSuccess = latestEvent;
break;

case StatisticsNames.productionError:
data.productionError = latestEvent;
break;

case StatisticsNames.productionSuccess:
data.productionSuccess = latestEvent;
}
});

return data;
}),
);

/**
* GET /workflow-stats/:id/data-loaded/
*/
workflowStatsController.get(
'/:id/data-loaded/',
ResponseHelper.send(async (req: ExecutionRequest.Get): Promise<IWorkflowStatisticsDataLoaded> => {
// Get flag
const workflowId = req.params.id;

// Get the corresponding workflow
const workflow = await Db.collections.Workflow.findOne(workflowId);
// It will be valid if we reach this point, this is just for TS
if (!workflow) {
return { dataLoaded: false };
}

const data: IWorkflowStatisticsDataLoaded = {
dataLoaded: workflow.dataLoaded,
};

return data;
}),
);
Loading