From 6e936beab217af6cf0347d63cc5caa0221cc3e08 Mon Sep 17 00:00:00 2001 From: Issa Tseng Date: Wed, 1 Dec 2021 21:33:19 -0800 Subject: [PATCH] improve: worker resiliency, hopefully. * one try/catch we didn't have before, around checker invocation. * one .catch() we didn't have before, around checker promise result. * watchdog timer which kicks the loop if it looks stale. --- lib/util/promise.js | 2 +- lib/worker/worker.js | 62 ++++++++++++--- test/integration/worker/worker.js | 126 +++++++++++++++++++++++++++++- 3 files changed, 173 insertions(+), 17 deletions(-) diff --git a/lib/util/promise.js b/lib/util/promise.js index f22ac06b1..55fbfe501 100644 --- a/lib/util/promise.js +++ b/lib/util/promise.js @@ -22,7 +22,7 @@ const resolve = Promise.resolve.bind(Promise); // restricts the length of time a promise can run for to some number of seconds, // else throws a timeout Problem. -const timebound = (promise, bound = 120) => new Promise((pass, fail) => { +const timebound = (promise, bound = 600) => new Promise((pass, fail) => { let timedOut = false; const timeout = setTimeout(() => { timedOut = true; diff --git a/lib/worker/worker.js b/lib/worker/worker.js index 6e564568b..9abbbe034 100644 --- a/lib/worker/worker.js +++ b/lib/worker/worker.js @@ -70,25 +70,61 @@ with q as update audits set claimed=clock_timestamp() from q where audits.id=q.id returning *`) .then(head); +// tiny struct thing to just store worker last report status below. +const stati = { idle: Symbol('idle'), check: Symbol('check'), run: Symbol('run') }; +class Status { + constructor() { this.set(stati.idle); } + set(status) { this.status = status; this.at = (new Date()).getTime(); } +} + // main loop. kicks off a check and attempts to process the result of the check. // if there was something to do, takes a break while that happens; the runner will // call back into the scheduler when it's done. // if there was nothing to do, immediately schedules a subsequent check at a capped // exponential backoff rate. -const scheduler = (check, run) => { - const reschedule = (delay = 3000) => { - check() - .then((event) => run(event, reschedule)) - .then((running) => { - if (!running) setTimeout(() => { reschedule(min(delay * 2, 25000)); }, delay); - }); +const worker = (container, jobMap = defaultJobMap) => { + let enable = true; // we allow the caller to abort for testing. + const check = checker(container); + const run = runner(container, jobMap); + const status = new Status(); + const withStatus = (x, chain) => { status.set(x); return chain; }; + + // this is the main loop, which should already try to hermetically catch its own + // failures and restart itself. + const now = (delay = 3000) => { + if (!enable) return; + const wait = () => { now(min(delay * 2, 25000)); }; + try { + withStatus(stati.check, check()) + .then((event) => withStatus(stati.run, run(event, now))) + .then((running) => { if (!running) withStatus(stati.idle, wait); }) + .catch((err) => { + process.stderr.write(`!! unexpected worker loop error: ${inspect(err)}\n`); + wait(); + }); + } catch (ex) { + process.stderr.write(`!! unexpected worker invocation error: ${inspect(ex)}\n`); + wait(); + } }; - return reschedule; -}; + now(); -// injects all the relevant contexts and kicks off the scheduler. -const worker = (container, jobMap = defaultJobMap) => { - scheduler(checker(container), runner(container, jobMap))(); + // this is the watchdog timer, which ensures that the worker has reported back + // in a reasonable time for what it claims to be doing. if not, it starts a new + // check immediately. there is some theoretical chance if the worker was secretly + // fine we'll end up with extras, but it seems unlikely. + const woof = (which) => { + process.stderr.write(`!! unexpected worker loss in ${which} (${status.at})\n`); + now(); + }; + const watchdog = setInterval(() => { + const delta = (new Date()).getTime() - status.at; + if ((delta > 60000) && (status.status === stati.idle)) woof('idle'); + else if ((delta > 120000) && (status.status === stati.check)) woof('check'); + else if ((delta > 720000) && (status.status === stati.run)) woof('run'); + }, 60000); + + return () => { enable = false; clearInterval(watchdog); }; }; // for testing: chews through the event queue serially until there is nothing left to process. @@ -101,5 +137,5 @@ const exhaust = async (container, jobMap = defaultJobMap) => { while (await check().then(runWait)); // eslint-disable-line no-await-in-loop }; -module.exports = { runner, checker, scheduler, worker, exhaust }; +module.exports = { runner, checker, worker, exhaust }; diff --git a/test/integration/worker/worker.js b/test/integration/worker/worker.js index 9805fce90..175da380b 100644 --- a/test/integration/worker/worker.js +++ b/test/integration/worker/worker.js @@ -2,13 +2,12 @@ const should = require('should'); const appRoot = require('app-root-path'); const { promisify } = require('util'); const { DateTime, Duration } = require('luxon'); +const { sql } = require('slonik'); const { testContainerFullTrx, testContainer } = require('../setup'); -const { runner, checker } = require(appRoot + '/lib/worker/worker'); +const { runner, checker, worker } = require(appRoot + '/lib/worker/worker'); const { Audit } = require(appRoot + '/lib/model/frames'); const { insert } = require(appRoot + '/lib/util/db'); -// we test everything except scheduler() and worker(), because these both start -// timed feedback loops that we cannot easily control or halt. describe('worker', () => { describe('runner @slow', () => { // we know reschedule is getting called at some point in these flows because @@ -225,5 +224,126 @@ describe('worker', () => { should.exist(await check()); })); }); + + describe('worker', () => { + const millis = (x) => new Promise((done) => { setTimeout(done, x); }); + + it('should run a full loop right away', testContainerFullTrx(async (container) => { + const { Audits, Users } = container; + const alice = (await Users.getByEmail('alice@getodk.org')).get(); + await Audits.log(alice.actor, 'submission.attachment.update', alice.actor); + + let ran; + const jobMap = { 'submission.attachment.update': [ () => { ran = true; return Promise.resolve(); } ] }; + const cancel = worker(container, jobMap); + + while ((await Audits.getLatestByAction('submission.attachment.update')).get().processed == null) + await millis(20); + + cancel(); + await millis(20); // buffer so the next check lands before the database is wiped on return + ran.should.equal(true); + })); + + it('should run two full loops right away', testContainerFullTrx(async (container) => { + const { Audits, Users } = container; + const alice = (await Users.getByEmail('alice@getodk.org')).get(); + await Audits.log(alice.actor, 'submission.attachment.update', alice.actor); + await Audits.log(alice.actor, 'submission.attachment.update', alice.actor); + + const jobMap = { 'submission.attachment.update': [ () => Promise.resolve() ] }; + const cancel = worker(container, jobMap); + + while ((await container.oneFirst(sql` +select count(*) from audits where action='submission.attachment.update' and processed is null`)) > 0) + await millis(40); + + cancel(); + await millis(20); // ditto above + })); + + it('should restart if the check fails prequery', testContainerFullTrx(async (container) => { + const { Audits, Users } = container; + const alice = (await Users.getByEmail('alice@getodk.org')).get(); + await Audits.log(alice.actor, 'submission.attachment.update', alice.actor); + + let failed; + const hijacked = Object.create(container.__proto__); + Object.assign(hijacked, container); + hijacked.all = (q) => { + if (q.sql.startsWith('\nwith q as')) { + if (failed) return container.all(q); + failed = true; + throw new Error('oh whoops!'); + } + }; + const jobMap = { 'submission.attachment.update': [ () => Promise.resolve() ] }; + const cancel = worker(hijacked, jobMap); + + while ((await Audits.getLatestByAction('submission.attachment.update')).get().processed == null) + await millis(20); + + cancel(); + await millis(20); + failed.should.equal(true); + })); + + it('should restart if the check fails in-query', testContainerFullTrx(async (container) => { + const { Audits, Users } = container; + const alice = (await Users.getByEmail('alice@getodk.org')).get(); + await Audits.log(alice.actor, 'submission.attachment.update', alice.actor); + + let failed; + const hijacked = Object.create(container.__proto__); + Object.assign(hijacked, container); + hijacked.all = (q) => { + if (q.sql.startsWith('\nwith q as')) { + if (failed) return container.all(q); + failed = true; + return new Promise(async (_, reject) => { await millis(5); reject('not this time'); }); + } + }; + const jobMap = { 'submission.attachment.update': [ () => Promise.resolve() ] }; + const cancel = worker(hijacked, jobMap); + + while ((await Audits.getLatestByAction('submission.attachment.update')).get().processed == null) + await millis(20); + + cancel(); + await millis(20); + failed.should.equal(true); + })); + + it('should restart if the process itself fails', testContainerFullTrx(async (container) => { + const { Audits, Users } = container; + const alice = (await Users.getByEmail('alice@getodk.org')).get(); + await Audits.log(alice.actor, 'submission.attachment.update', alice.actor); + + let failed; + let checks = 0; + const hijacked = Object.create(container.__proto__); + Object.assign(hijacked, container); + hijacked.all = (q) => { + if (q.sql.startsWith('\nwith q as')) checks++; + return container.all(q); + }; + const jobMap = { 'submission.attachment.update': [ () => { + if (failed) return Promise.resolve(); + failed = true; + checks.should.equal(1); + throw new Error('oh no!'); + } ] }; + const cancel = worker(hijacked, jobMap); + + while ((await Audits.getLatestByAction('submission.attachment.update')).get().lastFailure == null) + await millis(40); + + cancel(); + await millis(20); // ditto above + checks.should.equal(2); + })); + + // TODO: maybe someday test the watchdog loop too. + }); });