Skip to content

Commit

Permalink
fix(core): Remove threads pkg, rewrite log writer worker (#5134)
Browse files Browse the repository at this point in the history
  • Loading branch information
flipswitchingmonkey authored Jan 13, 2023
1 parent b7faf4a commit e845eb3
Show file tree
Hide file tree
Showing 15 changed files with 286 additions and 285 deletions.
1 change: 0 additions & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@
"sse-channel": "^4.0.0",
"swagger-ui-express": "^4.3.0",
"syslog-client": "^1.1.1",
"threads": "^1.7.0",
"tslib": "1.14.1",
"typeorm": "0.2.45",
"uuid": "^8.3.2",
Expand Down
6 changes: 0 additions & 6 deletions packages/cli/src/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1065,12 +1065,6 @@ export const schema = {
env: 'N8N_EVENTBUS_CHECKUNSENTINTERVAL',
},
logWriter: {
syncFileAccess: {
doc: 'Whether all file access happens synchronously within the thread.',
format: Boolean,
default: false,
env: 'N8N_EVENTBUS_LOGWRITER_SYNCFILEACCESS',
},
keepLogCount: {
doc: 'How many event log files to keep.',
format: Number,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable @typescript-eslint/no-explicit-any */
import { DateTime } from 'luxon';
import type { EventMessageTypeNames, JsonObject } from 'n8n-workflow';
import { v4 as uuid } from 'uuid';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ export interface EventMessageAuditOptions extends AbstractEventMessageOptions {
}

export class EventMessageAudit extends AbstractEventMessage {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access
readonly __type = EventMessageTypeNames.audit;

eventName: EventNamesAuditType;
Expand Down
38 changes: 19 additions & 19 deletions packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ class MessageEventBus extends EventEmitter {

LoggerProxy.debug('Initializing event bus...');

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call
const savedEventDestinations = await Db.collections.EventDestinations.find({});
if (savedEventDestinations.length > 0) {
for (const destinationData of savedEventDestinations) {
Expand All @@ -91,11 +90,9 @@ class MessageEventBus extends EventEmitter {
LoggerProxy.debug('Checking for unsent event messages');
const unsentAndUnfinished = await this.getUnsentAndUnfinishedExecutions();
LoggerProxy.debug(
`Start logging into ${
(await this.logWriter?.getThread()?.getLogFileName()) ?? 'unknown filename'
} `,
`Start logging into ${this.logWriter?.getLogFileName() ?? 'unknown filename'} `,
);
await this.logWriter?.startLogging();
this.logWriter?.startLogging();
await this.send(unsentAndUnfinished.unsentMessages);

if (unsentAndUnfinished.unfinishedExecutions.size > 0) {
Expand Down Expand Up @@ -130,10 +127,8 @@ class MessageEventBus extends EventEmitter {
if (id && Object.keys(this.destinations).includes(id)) {
result = [this.destinations[id].serialize()];
} else {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
result = Object.keys(this.destinations).map((e) => this.destinations[e].serialize());
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call
return result.sort((a, b) => (a.__type ?? '').localeCompare(b.__type ?? ''));
}

Expand Down Expand Up @@ -175,7 +170,11 @@ class MessageEventBus extends EventEmitter {
msgs = [msgs];
}
for (const msg of msgs) {
await this.logWriter?.putMessage(msg);
this.logWriter?.putMessage(msg);
// if there are no set up destinations, immediately mark the event as sent
if (!this.shouldSendMsg(msg)) {
this.confirmSent(msg, { id: '0', name: 'eventBus' });
}
await this.emitMessage(msg);
}
}
Expand All @@ -192,8 +191,8 @@ class MessageEventBus extends EventEmitter {
return false;
}

async confirmSent(msg: EventMessageTypes, source?: EventMessageConfirmSource) {
await this.logWriter?.confirmMessageSent(msg.id, source);
confirmSent(msg: EventMessageTypes, source?: EventMessageConfirmSource) {
this.logWriter?.confirmMessageSent(msg.id, source);
}

private hasAnyDestinationSubscribedToEvent(msg: EventMessageTypes): boolean {
Expand All @@ -210,22 +209,23 @@ class MessageEventBus extends EventEmitter {
// this is for internal use ONLY and not for use with custom destinations!
this.emit('message', msg);

LoggerProxy.debug(`Listeners: ${this.eventNames().join(',')}`);
// LoggerProxy.debug(`Listeners: ${this.eventNames().join(',')}`);

// if there are no set up destinations, immediately mark the event as sent
if (
!isLogStreamingEnabled() ||
Object.keys(this.destinations).length === 0 ||
!this.hasAnyDestinationSubscribedToEvent(msg)
) {
await this.confirmSent(msg, { id: '0', name: 'eventBus' });
} else {
if (this.shouldSendMsg(msg)) {
for (const destinationName of Object.keys(this.destinations)) {
this.emit(this.destinations[destinationName].getId(), msg);
}
}
}

shouldSendMsg(msg: EventMessageTypes): boolean {
return (
isLogStreamingEnabled() &&
Object.keys(this.destinations).length > 0 &&
this.hasAnyDestinationSubscribedToEvent(msg)
);
}

async getEventsAll(): Promise<EventMessageTypes[]> {
const queryResult = await this.logWriter?.getMessagesAll();
const filtered = uniqby(queryResult, 'id');
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/* eslint-disable import/no-cycle */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import { MessageEventBusDestinationTypeNames } from 'n8n-workflow';
import type { EventDestinations } from '@/databases/entities/MessageEventBusDestinationEntity';
import type { MessageEventBusDestination } from './MessageEventBusDestination.ee';
Expand All @@ -10,7 +9,6 @@ import { MessageEventBusDestinationWebhook } from './MessageEventBusDestinationW
export function messageEventBusDestinationFromDb(
dbData: EventDestinations,
): MessageEventBusDestination | null {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-unsafe-assignment
const destinationData = dbData.destination;
if ('__type' in destinationData) {
switch (destinationData.__type) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { v4 as uuid } from 'uuid';
import {
INodeCredentials,
Expand Down Expand Up @@ -83,7 +81,6 @@ export abstract class MessageEventBusDestination implements MessageEventBusDesti
id: this.getId(),
destination: this.serialize(),
};
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call
const dbResult: InsertResult = await Db.collections.EventDestinations.upsert(data, {
skipUpdateIfNoValuesChanged: true,
conflictPaths: ['id'],
Expand All @@ -97,7 +94,6 @@ export abstract class MessageEventBusDestination implements MessageEventBusDesti
}

static async deleteFromDb(id: string): Promise<DeleteResult> {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-unsafe-member-access
const dbResult = await Db.collections.EventDestinations.delete({ id });
return dbResult;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ export class MessageEventBusDestinationSentry

constructor(options: MessageEventBusDestinationSentryOptions) {
super(options);
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
this.label = options.label ?? 'Sentry DSN';
this.__type = options.__type ?? MessageEventBusDestinationTypeNames.sentry;
this.dsn = options.dsn;
Expand Down Expand Up @@ -85,7 +84,7 @@ export class MessageEventBusDestinationSentry
);

if (sentryResult) {
await eventBus.confirmSent(msg, { id: this.id, name: this.label });
eventBus.confirmSent(msg, { id: this.id, name: this.label });
sendResult = true;
}
} catch (error) {
Expand All @@ -109,7 +108,6 @@ export class MessageEventBusDestinationSentry
): MessageEventBusDestinationSentry | null {
if (
'__type' in data &&
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
data.__type === MessageEventBusDestinationTypeNames.sentry &&
isMessageEventBusDestinationSentryOptions(data)
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export class MessageEventBusDestinationSyslog
if (error) {
console.log(error);
} else {
await eventBus.confirmSent(msg, { id: this.id, name: this.label });
eventBus.confirmSent(msg, { id: this.id, name: this.label });
sendResult = true;
}
},
Expand All @@ -112,7 +112,6 @@ export class MessageEventBusDestinationSyslog

serialize(): MessageEventBusDestinationSyslogOptions {
const abstractSerialized = super.serialize();
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return {
...abstractSerialized,
expectedStatusCode: this.expectedStatusCode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,6 @@ export class MessageEventBusDestinationWebhook
} catch (_) {
console.log('JSON parameter need to be an valid JSON');
}

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
this.axiosRequestOptions.params = jsonParse(this.jsonQuery);
}
}
Expand All @@ -212,8 +210,6 @@ export class MessageEventBusDestinationWebhook
} catch (_) {
console.log('JSON parameter need to be an valid JSON');
}

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
this.axiosRequestOptions.headers = jsonParse(this.jsonHeaders);
}
}
Expand All @@ -222,7 +218,6 @@ export class MessageEventBusDestinationWebhook
if (this.axiosRequestOptions.headers === undefined) {
this.axiosRequestOptions.headers = {};
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
this.axiosRequestOptions.headers['Content-Type'] = 'application/json';
}

Expand Down Expand Up @@ -336,10 +331,8 @@ export class MessageEventBusDestinationWebhook
password: httpBasicAuth.password as string,
};
} else if (httpHeaderAuth) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
this.axiosRequestOptions.headers[httpHeaderAuth.name as string] = httpHeaderAuth.value;
} else if (httpQueryAuth) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
this.axiosRequestOptions.params[httpQueryAuth.name as string] = httpQueryAuth.value;
} else if (httpDigestAuth) {
this.axiosRequestOptions.auth = {
Expand All @@ -353,13 +346,13 @@ export class MessageEventBusDestinationWebhook
if (requestResponse) {
if (this.responseCodeMustMatch) {
if (requestResponse.status === this.expectedStatusCode) {
await eventBus.confirmSent(msg, { id: this.id, name: this.label });
eventBus.confirmSent(msg, { id: this.id, name: this.label });
sendResult = true;
} else {
sendResult = false;
}
} else {
await eventBus.confirmSent(msg, { id: this.id, name: this.label });
eventBus.confirmSent(msg, { id: this.id, name: this.label });
sendResult = true;
}
}
Expand Down
Loading

0 comments on commit e845eb3

Please sign in to comment.