Skip to content

Commit

Permalink
fix(worker): timed digest event merging (#4979)
Browse files Browse the repository at this point in the history
* fix: timed digest event gathering

* feat: remove un needed digest filtering steps

* fix: spelling

* fix: object id typing

* fix: increase timeout

* chore(api): remove mocha timeout on package.json scripts

---------

Co-authored-by: Paweł <pawel.tymczuk@gmail.com>
  • Loading branch information
scopsy and LetItRock committed Dec 13, 2023
1 parent 5aca164 commit 835d17c
Show file tree
Hide file tree
Showing 14 changed files with 207 additions and 341 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/api/.mocharc.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"timeout": 10000,
"timeout": 20000,
"require": "ts-node/register",
"file": ["e2e/setup.ts"],
"exit": true,
Expand Down
6 changes: 3 additions & 3 deletions apps/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
"start:build": "TZ=UTC node dist/main.js",
"lint": "eslint src",
"lint:fix": "pnpm lint -- --fix",
"test": "cross-env TS_NODE_COMPILER_OPTIONS='{\"strictNullChecks\": false}' TZ=UTC NODE_ENV=test E2E_RUNNER=true mocha --timeout 10000 --require ts-node/register --exit --file e2e/setup.ts src/**/**/*.spec.ts",
"test:e2e": "cross-env TS_NODE_COMPILER_OPTIONS='{\"strictNullChecks\": false}' TZ=UTC NODE_ENV=test E2E_RUNNER=true mocha --timeout 10000 --require ts-node/register --exit --file e2e/setup.ts src/**/*.e2e.ts",
"test:e2e:ee": "cross-env TS_NODE_COMPILER_OPTIONS='{\"strictNullChecks\": false}' TZ=UTC NODE_ENV=test E2E_RUNNER=true mocha --timeout 10000 --require ts-node/register --exit --file e2e/setup.ts src/**/*.e2e-ee.ts",
"test": "cross-env TS_NODE_COMPILER_OPTIONS='{\"strictNullChecks\": false}' TZ=UTC NODE_ENV=test E2E_RUNNER=true mocha --require ts-node/register --exit --file e2e/setup.ts src/**/**/*.spec.ts",
"test:e2e": "cross-env TS_NODE_COMPILER_OPTIONS='{\"strictNullChecks\": false}' TZ=UTC NODE_ENV=test E2E_RUNNER=true mocha --require ts-node/register --exit --file e2e/setup.ts src/**/*.e2e.ts",
"test:e2e:ee": "cross-env TS_NODE_COMPILER_OPTIONS='{\"strictNullChecks\": false}' TZ=UTC NODE_ENV=test E2E_RUNNER=true mocha --require ts-node/register --exit --file e2e/setup.ts src/**/*.e2e-ee.ts",
"migration": "cross-env NODE_ENV=local MIGRATION=true ts-node --transpileOnly",
"link:submodules": "pnpm link ../../enterprise/packages/auth",
"admin:remove-user-account": "cross-env NODE_ENV=local MIGRATION=true ts-node --transpileOnly ./admin/remove-user-account.ts",
Expand Down
121 changes: 121 additions & 0 deletions apps/api/src/app/events/e2e/scheduled-digest.e2e.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
import axios from 'axios';
import { expect } from 'chai';
import {
MessageRepository,
NotificationTemplateEntity,
SubscriberEntity,
JobRepository,
JobStatusEnum,
JobEntity,
} from '@novu/dal';
import { StepTypeEnum, DigestTypeEnum, DigestUnitEnum, IDigestRegularMetadata } from '@novu/shared';
import { UserSession, SubscribersService } from '@novu/testing';

const axiosInstance = axios.create();

const promiseTimeout = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms));

describe('Trigger event - Scheduled Digest Mode - /v1/events/trigger (POST)', function () {
let session: UserSession;
let template: NotificationTemplateEntity;
let subscriber: SubscriberEntity;
let subscriberService: SubscribersService;
const jobRepository = new JobRepository();
const messageRepository = new MessageRepository();

const triggerEvent = async (payload, transactionId?: string): Promise<void> => {
await axiosInstance.post(
`${session.serverUrl}/v1/events/trigger`,
{
transactionId,
name: template.triggers[0].identifier,
to: [subscriber.subscriberId],
payload,
},
{
headers: {
authorization: `ApiKey ${session.apiKey}`,
},
}
);
};

beforeEach(async () => {
session = new UserSession();
await session.initialize();
template = await session.createTemplate();
subscriberService = new SubscribersService(session.organization._id, session.environment._id);
subscriber = await subscriberService.createSubscriber();
});

it('should digest events using a scheduled digest', async () => {
template = await session.createTemplate({
steps: [
{
type: StepTypeEnum.DIGEST,
content: '',
metadata: {
unit: DigestUnitEnum.MINUTES,
amount: 1,
type: DigestTypeEnum.TIMED,
},
},
{
type: StepTypeEnum.IN_APP,
content: 'Hello world {{step.events.length}}' as string,
},
],
});

const events = [
{ customVar: 'Testing of User Name' },
{ customVar: 'digest' },
{ customVar: 'merged' },
{ customVar: 'digest' },
{ customVar: 'merged' },
{ customVar: 'digest' },
{ customVar: 'merged' },
];

await Promise.all(events.map((event) => triggerEvent(event)));

const handler = await session.awaitRunningJobs(template?._id, false, 1);

await handler.runDelayedImmediately();
await session.awaitRunningJobs(template?._id);

const jobs = await jobRepository.find({
_environmentId: session.environment._id,
_templateId: template._id,
_subscriberId: subscriber._id,
type: StepTypeEnum.DIGEST,
});

expect(jobs && jobs.length).to.eql(7);

const completedJob = jobs.find((elem) => elem.status === JobStatusEnum.COMPLETED);
expect(completedJob).to.ok;

const mergedJob = jobs.find((elem) => elem.status === JobStatusEnum.MERGED);
expect(mergedJob).to.ok;

const generatedMessageJob = await jobRepository.find({
_environmentId: session.environment._id,
_templateId: template._id,
_subscriberId: subscriber._id,
type: StepTypeEnum.IN_APP,
});

expect(generatedMessageJob && generatedMessageJob.length).to.equal(7);

const mergedInApp = generatedMessageJob.filter((elem) => elem.status === JobStatusEnum.MERGED);
expect(mergedInApp && mergedInApp.length).to.equal(6);

const completedInApp = generatedMessageJob.filter((elem) => elem.status === JobStatusEnum.COMPLETED);
expect(completedInApp && completedInApp.length).to.equal(1);

const digestEventLength = completedInApp.find((i) => i.digest?.events?.length === 7);
expect(digestEventLength).to.be.ok;
});
});
6 changes: 0 additions & 6 deletions apps/worker/src/app/shared/shared.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ import {
CreateTenant,
DalServiceHealthIndicator,
DigestFilterSteps,
DigestFilterStepsBackoff,
DigestFilterStepsRegular,
DigestFilterStepsTimed,
distributedLockService,
EventsDistributedLockService,
featureFlagsService,
Expand Down Expand Up @@ -110,9 +107,6 @@ const PROVIDERS = [
dalService,
DalServiceHealthIndicator,
DigestFilterSteps,
DigestFilterStepsBackoff,
DigestFilterStepsRegular,
DigestFilterStepsTimed,
distributedLockService,
EventsDistributedLockService,
featureFlagsService,
Expand Down
4 changes: 3 additions & 1 deletion libs/dal/src/repositories/job/job.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type JobEntityPopulated = JobEntity & {
export interface IDelayOrDigestJobResult {
digestResult: DigestCreationResultEnum;
activeDigestId?: string;
activeNotificationId?: string;
}

export class JobRepository extends BaseRepository<JobDBModel, JobEntity, EnforceEnvOrOrgIds> {
Expand Down Expand Up @@ -215,7 +216,7 @@ export class JobRepository extends BaseRepository<JobDBModel, JobEntity, Enforce
_subscriberId: this.convertStringToObjectId(job._subscriberId),
...(digestKey && { [`payload.${digestKey}`]: digestValue }),
},
'_id'
'_id _notificationId'
);

if (!delayedDigestJob) {
Expand All @@ -241,6 +242,7 @@ export class JobRepository extends BaseRepository<JobDBModel, JobEntity, Enforce

return {
activeDigestId: delayedDigestJob._id,
activeNotificationId: delayedDigestJob._notificationId?.toString(),
digestResult: DigestCreationResultEnum.MERGED,
};
}
Expand Down
29 changes: 29 additions & 0 deletions libs/testing/src/jobs.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,35 @@ export class JobsService {
},
});
} while (totalCount > 0 || runningJobs > unfinishedJobs);

return {
getDelayedTimestamp: async () => {
const delayedJobs = await this.standardQueue.getDelayed();

if (delayedJobs.length === 1) {
return delayedJobs[0].delay;
} else {
if (delayedJobs.length > 1) {
throw new Error('There are more than one delayed jobs');
} else if (delayedJobs.length === 0) {
throw new Error('There are no delayed jobs');
}
}
},
runDelayedImmediately: async () => {
const delayedJobs = await this.standardQueue.getDelayed();

if (delayedJobs.length === 1) {
await delayedJobs[0].changeDelay(1);
} else {
if (delayedJobs.length > 1) {
throw new Error('There are more than one delayed jobs');
} else if (delayedJobs.length === 0) {
throw new Error('There are no delayed jobs');
}
}
},
};
}

private async getQueueMetric() {
Expand Down
2 changes: 1 addition & 1 deletion libs/testing/src/user.session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ export class UserSession {
unfinishedJobs = 0,
organizationId = this.organization._id
) {
await this.jobsService.awaitRunningJobs({
return await this.jobsService.awaitRunningJobs({
templateId,
organizationId,
delay,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { forwardRef, Inject, Injectable } from '@nestjs/common';
import { JobEntity, JobRepository, IDelayOrDigestJobResult } from '@novu/dal';
import {
JobEntity,
JobRepository,
IDelayOrDigestJobResult,
NotificationRepository,
} from '@novu/dal';
import {
ExecutionDetailsSourceEnum,
ExecutionDetailsStatusEnum,
Expand Down Expand Up @@ -37,7 +42,8 @@ export class MergeOrCreateDigest {
private eventsDistributedLockService: EventsDistributedLockService,
private jobRepository: JobRepository,
@Inject(forwardRef(() => ExecutionLogQueueService))
private executionLogQueueService: ExecutionLogQueueService
private executionLogQueueService: ExecutionLogQueueService,
private notificationRepository: NotificationRepository
) {}

@InstrumentUsecase()
Expand All @@ -59,7 +65,11 @@ export class MergeOrCreateDigest {

switch (digestAction.digestResult) {
case DigestCreationResultEnum.MERGED:
return await this.processMergedDigest(job, digestAction.activeDigestId);
return await this.processMergedDigest(
job,
digestAction.activeDigestId,
digestAction.activeNotificationId
);
case DigestCreationResultEnum.SKIPPED:
return await this.processSkippedDigest(job);
case DigestCreationResultEnum.CREATED:
Expand Down Expand Up @@ -87,28 +97,41 @@ export class MergeOrCreateDigest {
@Instrument()
private async processMergedDigest(
job: JobEntity,
activeDigestId: string
activeDigestId: string,
activeNotificationId: string
): Promise<DigestCreationResultEnum> {
await this.jobRepository.update(
{
_environmentId: job._environmentId,
_id: job._id,
},
{
$set: {
status: JobStatusEnum.MERGED,
_mergedDigestId: activeDigestId,
await Promise.all([
this.jobRepository.update(
{
_environmentId: job._environmentId,
_id: job._id,
},
}
);

await this.jobRepository.updateAllChildJobStatus(
job,
JobStatusEnum.MERGED,
activeDigestId
);

await this.digestMergedExecutionDetails(job);
{
$set: {
status: JobStatusEnum.MERGED,
_mergedDigestId: activeDigestId,
},
}
),
this.jobRepository.updateAllChildJobStatus(
job,
JobStatusEnum.MERGED,
activeDigestId
),
this.digestMergedExecutionDetails(job),
this.notificationRepository.update(
{
_environmentId: job._environmentId,
_id: job._notificationId,
},
{
$set: {
_digestedNotificationId: activeNotificationId,
expireAt: job.expireAt,
},
}
),
]);

return DigestCreationResultEnum.MERGED;
}
Expand Down
Loading

0 comments on commit 835d17c

Please sign in to comment.