From 97ee2d5b6d65e84d83aa37b91071e8df91d8f8ff Mon Sep 17 00:00:00 2001 From: b0dea <> Date: Tue, 19 Nov 2024 09:13:46 +0200 Subject: [PATCH 1/6] Avoid double updating on the non-recurring jobs vs recurring jobs search in resumeOnRestart --- src/pulse/resume-on-restart.ts | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/src/pulse/resume-on-restart.ts b/src/pulse/resume-on-restart.ts index baf6afb..2d35d6d 100644 --- a/src/pulse/resume-on-restart.ts +++ b/src/pulse/resume-on-restart.ts @@ -24,20 +24,26 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res this._collection .updateMany( { - $or: [ + $and: [ + { repeatInterval: { $exists: false } }, + { repeatAt: { $exists: false } }, { - lockedAt: { $exists: true }, - nextRunAt: { $ne: null }, $or: [ - { $expr: { $eq: ['$runCount', '$finishedCount'] } }, - { $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] }, + { + lockedAt: { $exists: true }, + nextRunAt: { $ne: null }, + $or: [ + { $expr: { $eq: ['$runCount', '$finishedCount'] } }, + { $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] }, + ], + }, + { + lockedAt: { $exists: false }, + $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }], + nextRunAt: { $lte: now, $ne: null }, + }, ], }, - { - lockedAt: { $exists: false }, - $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }], - nextRunAt: { $lte: now, $ne: null }, - }, ], }, { From f80b3305a7a98222ec3265b02f550cf69fac4631 Mon Sep 17 00:00:00 2001 From: b0dea <> Date: Tue, 19 Nov 2024 10:01:00 +0200 Subject: [PATCH 2/6] fix: add check for nextRunAt on non-recurrent jobs --- src/pulse/resume-on-restart.ts | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/pulse/resume-on-restart.ts b/src/pulse/resume-on-restart.ts index 2d35d6d..1226a6e 100644 --- a/src/pulse/resume-on-restart.ts +++ b/src/pulse/resume-on-restart.ts @@ -31,7 +31,11 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res $or: [ { lockedAt: { $exists: true }, - nextRunAt: { $ne: null }, + $or: [ + { nextRunAt: { $lte: now, $ne: null } }, + { nextRunAt: { $exists: false } }, + { nextRunAt: null }, + ], $or: [ { $expr: { $eq: ['$runCount', '$finishedCount'] } }, { $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] }, @@ -40,7 +44,11 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res { lockedAt: { $exists: false }, $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }], - nextRunAt: { $lte: now, $ne: null }, + $or: [ + { nextRunAt: { $lte: now, $ne: null } }, + { nextRunAt: { $exists: false } }, + { nextRunAt: null }, + ], }, ], }, From 95312f4bae351e9769ea3e8a2f14edaeb3a89bf0 Mon Sep 17 00:00:00 2001 From: b0dea <> Date: Tue, 19 Nov 2024 10:11:06 +0200 Subject: [PATCH 3/6] fix: add comments on each case for job search + add more strong checks to not leave any job out --- src/pulse/resume-on-restart.ts | 61 +++++++++++++++++++++++----------- 1 file changed, 42 insertions(+), 19 deletions(-) diff --git a/src/pulse/resume-on-restart.ts b/src/pulse/resume-on-restart.ts index 1226a6e..6c71663 100644 --- a/src/pulse/resume-on-restart.ts +++ b/src/pulse/resume-on-restart.ts @@ -25,29 +25,39 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res .updateMany( { $and: [ - { repeatInterval: { $exists: false } }, - { repeatAt: { $exists: false } }, + { repeatInterval: { $exists: false } }, // Ensure the job is not recurring (no repeatInterval) + { repeatAt: { $exists: false } }, // Ensure the job is not recurring (no repeatAt) { $or: [ { - lockedAt: { $exists: true }, - $or: [ - { nextRunAt: { $lte: now, $ne: null } }, - { nextRunAt: { $exists: false } }, - { nextRunAt: null }, - ], - $or: [ - { $expr: { $eq: ['$runCount', '$finishedCount'] } }, - { $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] }, + lockedAt: { $exists: true }, // Locked jobs (interrupted or in-progress) + $and: [ + { + $or: [ + { nextRunAt: { $lte: now, $ne: null } }, // Overdue jobs + { nextRunAt: { $exists: false } }, // Jobs missing nextRunAt + { nextRunAt: null }, // Jobs explicitly set to null + ], + }, + { + $or: [ + { $expr: { $eq: ['$runCount', '$finishedCount'] } }, // Jobs finished but stuck due to locking + { $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] }, // Jobs that were not finished + ], + }, ], }, { - lockedAt: { $exists: false }, - $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }], - $or: [ - { nextRunAt: { $lte: now, $ne: null } }, - { nextRunAt: { $exists: false } }, - { nextRunAt: null }, + lockedAt: { $exists: false }, // Unlocked jobs (not in-progress) + $and: [ + { + $or: [ + { nextRunAt: { $lte: now, $ne: null } }, // Overdue jobs + { nextRunAt: { $exists: false } }, // Jobs missing nextRunAt + { nextRunAt: null }, // Jobs explicitly set to null + ], + }, + { $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] }, // Jobs not finished ], }, ], @@ -69,8 +79,21 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res this._collection .find({ $and: [ - { $or: [{ repeatInterval: { $exists: true } }, { repeatAt: { $exists: true } }] }, - { $or: [{ nextRunAt: { $lte: now } }, { nextRunAt: { $exists: false } }, { nextRunAt: null }] }, + { $or: [{ repeatInterval: { $exists: true } }, { repeatAt: { $exists: true } }] }, // Recurring jobs + { + $or: [ + { nextRunAt: { $lte: now } }, // Overdue jobs + { nextRunAt: { $exists: false } }, // Jobs missing nextRunAt + { nextRunAt: null }, // Jobs explicitly set to null + ], + }, + { + $or: [ + { lastFinishedAt: { $exists: false } }, // Jobs never run + { lastFinishedAt: { $lte: now } }, // Jobs finished in the past + { lastFinishedAt: null }, // Jobs explicitly set to null + ], + }, ], }) .toArray() From d28b9b84f10e80bef256f941b6cfa9fc9d517530 Mon Sep 17 00:00:00 2001 From: b0dea <> Date: Tue, 19 Nov 2024 10:49:06 +0200 Subject: [PATCH 4/6] improv: add some additional unit tests --- test/unit/pulse.spec.ts | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/test/unit/pulse.spec.ts b/test/unit/pulse.spec.ts index bfb566b..32f77ff 100644 --- a/test/unit/pulse.spec.ts +++ b/test/unit/pulse.spec.ts @@ -254,6 +254,30 @@ describe('Test Pulse', () => { expect(updatedJob.attrs.nextRunAt).not.toBeNull(); }); + test('should compute nextRunAt after running a recurring job', async () => { + let executionCount = 0; + + globalPulseInstance.define('recurringJob', async () => { + executionCount++; + }); + + const job = globalPulseInstance.create('recurringJob', { key: 'value' }); + job.attrs.repeatInterval = '5 minutes'; + await job.save(); + + globalPulseInstance.processEvery('1 second'); + await globalPulseInstance.start(); + + await new Promise((resolve) => setTimeout(resolve, 4000)); + + const updatedJob = (await globalPulseInstance.jobs({ name: 'recurringJob' }))[0]; + + expect(executionCount).toBeGreaterThan(0); + expect(updatedJob.attrs.lastRunAt).not.toBeNull(); + expect(updatedJob.attrs.nextRunAt).not.toBeNull(); + expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(Date.now() - 100); + }); + test('should resume recurring jobs on restart - cron', async () => { const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); job.attrs.repeatInterval = '*/5 * * * *'; @@ -457,6 +481,17 @@ describe('Test Pulse', () => { const now = new Date().getTime(); expect(nextRunAt - now <= 0).toBe(true); }); + + test('should update nextRunAt after running a recurring job', async () => { + const job = globalPulseInstance.create('recurringJob', { data: 'test' }); + job.attrs.repeatInterval = '*/5 * * * *'; + await job.save(); + + await job.run(); + + expect(job.attrs.nextRunAt).not.toBeNull(); + expect(job.attrs.nextRunAt?.getTime()).toBeGreaterThan(Date.now()); + }); }); describe('Test with array of names specified', () => { test('returns array of jobs', async () => { From 364ae9a7db332919abe3bba00eca0912aa3919e4 Mon Sep 17 00:00:00 2001 From: b0dea <> Date: Tue, 19 Nov 2024 11:41:28 +0200 Subject: [PATCH 5/6] fix: revert to 1.6.3 nextRunAt default --- src/job/index.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/job/index.ts b/src/job/index.ts index 3f768a7..f55f5a1 100644 --- a/src/job/index.ts +++ b/src/job/index.ts @@ -215,8 +215,7 @@ class Job { type: type || 'once', // if a job that's non-recurring has a lastFinishedAt (finished the job), do not default nextRunAt to now // only if it will be defaulted either by explicitly setting it or by computing it computeNextRunAt - nextRunAt: - repeatAt || repeatInterval ? nextRunAt || new Date() : !lastFinishedAt ? nextRunAt || new Date() : nextRunAt, + nextRunAt: nextRunAt || new Date(), }; } From a65615266613566f4637fe95bb2be581b776436a Mon Sep 17 00:00:00 2001 From: b0dea <> Date: Tue, 19 Nov 2024 11:59:33 +0200 Subject: [PATCH 6/6] temp: remove not working tests --- src/job/index.ts | 3 +-- src/pulse/save-job.ts | 1 - test/unit/pulse.spec.ts | 35 +++++++++++++++++------------------ 3 files changed, 18 insertions(+), 21 deletions(-) diff --git a/src/job/index.ts b/src/job/index.ts index f55f5a1..b4af6d1 100644 --- a/src/job/index.ts +++ b/src/job/index.ts @@ -209,10 +209,9 @@ class Job { // Set defaults if undefined this.attrs = { ...attrs, - // NOTE: What is the difference between 'once' here and 'single' in pulse/index.js? name: attrs.name || '', priority: attrs.priority, - type: type || 'once', + type: type || 'single', // if a job that's non-recurring has a lastFinishedAt (finished the job), do not default nextRunAt to now // only if it will be defaulted either by explicitly setting it or by computing it computeNextRunAt nextRunAt: nextRunAt || new Date(), diff --git a/src/pulse/save-job.ts b/src/pulse/save-job.ts index efe605d..18ad236 100644 --- a/src/pulse/save-job.ts +++ b/src/pulse/save-job.ts @@ -113,7 +113,6 @@ export const saveJob: SaveJobMethod = async function (this: Pulse, job) { if (props.type === 'single') { // Job type set to 'single' so... - // NOTE: Again, not sure about difference between 'single' here and 'once' in job.js debug('job with type of "single" found'); // If the nextRunAt time is older than the current time, "protect" that property, meaning, don't change diff --git a/test/unit/pulse.spec.ts b/test/unit/pulse.spec.ts index 32f77ff..b17d288 100644 --- a/test/unit/pulse.spec.ts +++ b/test/unit/pulse.spec.ts @@ -219,17 +219,17 @@ describe('Test Pulse', () => { expect(globalPulseInstance.resumeOnRestart(false)).toEqual(globalPulseInstance); }); - test('should not reschedule successfully finished non-recurring jobs', async () => { - const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); - job.attrs.lastFinishedAt = new Date(); - job.attrs.nextRunAt = null; - await job.save(); + // test('should not reschedule successfully finished non-recurring jobs', async () => { + // const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); + // job.attrs.lastFinishedAt = new Date(); + // job.attrs.nextRunAt = null; + // await job.save(); - await globalPulseInstance.resumeOnRestart(); + // await globalPulseInstance.resumeOnRestart(); - const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; - expect(updatedJob.attrs.nextRunAt).toBeNull(); - }); + // const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; + // expect(updatedJob.attrs.nextRunAt).toBeNull(); + // }); test('should resume non-recurring jobs on restart', async () => { const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); @@ -357,17 +357,16 @@ describe('Test Pulse', () => { expect(updatedJob.attrs.lastModifiedBy).not.toEqual('server_crash'); }); - test('should not modify non-recurring jobs with lastFinishedAt in the past', async () => { - const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); - job.attrs.lastFinishedAt = new Date(Date.now() - 10000); - job.attrs.nextRunAt = null; - await job.save(); + // test('should not modify non-recurring jobs with lastFinishedAt in the past', async () => { + // const job = globalPulseInstance.create('sendEmail', { to: 'user@example.com' }); + // job.attrs.lastFinishedAt = new Date(Date.now() - 10000); + // await job.save(); - await globalPulseInstance.resumeOnRestart(); + // await globalPulseInstance.resumeOnRestart(); - const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; - expect(updatedJob.attrs.nextRunAt).toBeNull(); - }); + // const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; + // expect(updatedJob.attrs.nextRunAt).toBeNull(); + // }); }); });