Merge pull request #436 from getodk/issa/worker-resilience
improve: worker resiliency, hopefully.
issa-tseng authored Jan 11, 2022
2 parents 6a53aa6 + 6e936be commit 5313b0a
Showing 3 changed files with 173 additions and 17 deletions.
2 changes: 1 addition & 1 deletion lib/util/promise.js
@@ -22,7 +22,7 @@ const resolve = Promise.resolve.bind(Promise);

// restricts the length of time a promise can run for to some number of seconds,
// else throws a timeout Problem.
const timebound = (promise, bound = 120) => new Promise((pass, fail) => {
const timebound = (promise, bound = 600) => new Promise((pass, fail) => {
let timedOut = false;
const timeout = setTimeout(() => {
timedOut = true;
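For context on the hunk above: the default bound for timebound rises from 120 to 600 seconds, which fits under the worker watchdog's 720-second run threshold introduced below. The rest of the function is collapsed in this hunk, so the following is only an illustrative sketch of the pattern, assuming the elided body races the promise against a timer (the real code fails with a timeout Problem rather than a plain Error):

// illustrative sketch only, not the committed implementation
const timebound = (promise, bound = 600) => new Promise((pass, fail) => {
  let timedOut = false;
  const timeout = setTimeout(() => {
    timedOut = true;
    fail(new Error(`timed out after ${bound} seconds`)); // the real code fails with a timeout Problem
  }, bound * 1000);
  promise.then(
    (result) => { if (!timedOut) { clearTimeout(timeout); pass(result); } },
    (err) => { if (!timedOut) { clearTimeout(timeout); fail(err); } },
  );
});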
62 changes: 49 additions & 13 deletions lib/worker/worker.js
@@ -70,25 +70,61 @@ with q as
update audits set claimed=clock_timestamp() from q where audits.id=q.id returning *`)
.then(head);

// tiny struct thing to just store worker last report status below.
const stati = { idle: Symbol('idle'), check: Symbol('check'), run: Symbol('run') };
class Status {
constructor() { this.set(stati.idle); }
set(status) { this.status = status; this.at = (new Date()).getTime(); }
}

// main loop. kicks off a check and attempts to process the result of the check.
// if there was something to do, takes a break while that happens; the runner will
// call back into the scheduler when it's done.
// if there was nothing to do, immediately schedules a subsequent check at a capped
// exponential backoff rate.
const scheduler = (check, run) => {
const reschedule = (delay = 3000) => {
check()
.then((event) => run(event, reschedule))
.then((running) => {
if (!running) setTimeout(() => { reschedule(min(delay * 2, 25000)); }, delay);
});
const worker = (container, jobMap = defaultJobMap) => {
let enable = true; // we allow the caller to abort for testing.
const check = checker(container);
const run = runner(container, jobMap);
const status = new Status();
const withStatus = (x, chain) => { status.set(x); return chain; };

// this is the main loop, which should already try to hermetically catch its own
// failures and restart itself.
const now = (delay = 3000) => {
if (!enable) return;
const wait = () => { now(min(delay * 2, 25000)); };
try {
withStatus(stati.check, check())
.then((event) => withStatus(stati.run, run(event, now)))
.then((running) => { if (!running) withStatus(stati.idle, wait); })
.catch((err) => {
process.stderr.write(`!! unexpected worker loop error: ${inspect(err)}\n`);
wait();
});
} catch (ex) {
process.stderr.write(`!! unexpected worker invocation error: ${inspect(ex)}\n`);
wait();
}
};
return reschedule;
};
now();

// injects all the relevant contexts and kicks off the scheduler.
const worker = (container, jobMap = defaultJobMap) => {
scheduler(checker(container), runner(container, jobMap))();
// this is the watchdog timer, which ensures that the worker has reported back
// in a reasonable time for what it claims to be doing. if not, it starts a new
// check immediately. there is some theoretical chance if the worker was secretly
// fine we'll end up with extras, but it seems unlikely.
const woof = (which) => {
process.stderr.write(`!! unexpected worker loss in ${which} (${status.at})\n`);
now();
};
const watchdog = setInterval(() => {
const delta = (new Date()).getTime() - status.at;
if ((delta > 60000) && (status.status === stati.idle)) woof('idle');
else if ((delta > 120000) && (status.status === stati.check)) woof('check');
else if ((delta > 720000) && (status.status === stati.run)) woof('run');
}, 60000);

return () => { enable = false; clearInterval(watchdog); };
};
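Taken together, worker() now owns both the retrying loop (a 3-second delay that doubles up to a 25-second cap when idle) and the watchdog that restarts the loop if a status goes stale: over 60 seconds in idle, 120 in check, or 720 in run, the last comfortably past the 600-second timebound above. A minimal usage sketch of the shape this exposes; `container` is assumed to be whatever dependency container the caller already has, and anything beyond the diff is an assumption:

// illustrative usage sketch, not part of the commit
const { worker } = require('./lib/worker/worker');

const stop = worker(container);  // starts the check/run loop and the watchdog interval
// ... later, e.g. on shutdown or at the end of a test:
stop();                          // sets enable = false and clears the watchdog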

// for testing: chews through the event queue serially until there is nothing left to process.
@@ -101,5 +137,5 @@ const exhaust = async (container, jobMap = defaultJobMap) => {
while (await check().then(runWait)); // eslint-disable-line no-await-in-loop
};
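For the exhaust helper above, a brief usage sketch in the style of the integration tests below (container assumed to be a test container):

// illustrative test usage sketch (inside an async test body), not part of the commit
const { exhaust } = require(appRoot + '/lib/worker/worker');
await exhaust(container);  // drains every pending audit event serially, then resolves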

module.exports = { runner, checker, scheduler, worker, exhaust };
module.exports = { runner, checker, worker, exhaust };

126 changes: 123 additions & 3 deletions test/integration/worker/worker.js
@@ -2,13 +2,12 @@ const should = require('should');
const appRoot = require('app-root-path');
const { promisify } = require('util');
const { DateTime, Duration } = require('luxon');
const { sql } = require('slonik');
const { testContainerFullTrx, testContainer } = require('../setup');
const { runner, checker } = require(appRoot + '/lib/worker/worker');
const { runner, checker, worker } = require(appRoot + '/lib/worker/worker');
const { Audit } = require(appRoot + '/lib/model/frames');
const { insert } = require(appRoot + '/lib/util/db');

// we test everything except scheduler() and worker(), because these both start
// timed feedback loops that we cannot easily control or halt.
describe('worker', () => {
describe('runner @slow', () => {
// we know reschedule is getting called at some point in these flows because
@@ -225,5 +224,126 @@ describe('worker', () => {
should.exist(await check());
}));
});

describe('worker', () => {
const millis = (x) => new Promise((done) => { setTimeout(done, x); });

it('should run a full loop right away', testContainerFullTrx(async (container) => {
const { Audits, Users } = container;
const alice = (await Users.getByEmail('alice@getodk.org')).get();
await Audits.log(alice.actor, 'submission.attachment.update', alice.actor);

let ran;
const jobMap = { 'submission.attachment.update': [ () => { ran = true; return Promise.resolve(); } ] };
const cancel = worker(container, jobMap);

while ((await Audits.getLatestByAction('submission.attachment.update')).get().processed == null)
await millis(20);

cancel();
await millis(20); // buffer so the next check lands before the database is wiped on return
ran.should.equal(true);
}));

it('should run two full loops right away', testContainerFullTrx(async (container) => {
const { Audits, Users } = container;
const alice = (await Users.getByEmail('alice@getodk.org')).get();
await Audits.log(alice.actor, 'submission.attachment.update', alice.actor);
await Audits.log(alice.actor, 'submission.attachment.update', alice.actor);

const jobMap = { 'submission.attachment.update': [ () => Promise.resolve() ] };
const cancel = worker(container, jobMap);

while ((await container.oneFirst(sql`
select count(*) from audits where action='submission.attachment.update' and processed is null`)) > 0)
await millis(40);

cancel();
await millis(20); // ditto above
}));

it('should restart if the check fails prequery', testContainerFullTrx(async (container) => {
const { Audits, Users } = container;
const alice = (await Users.getByEmail('alice@getodk.org')).get();
await Audits.log(alice.actor, 'submission.attachment.update', alice.actor);

let failed;
const hijacked = Object.create(container.__proto__);
Object.assign(hijacked, container);
hijacked.all = (q) => {
if (q.sql.startsWith('\nwith q as')) {
if (failed) return container.all(q);
failed = true;
throw new Error('oh whoops!');
}
};
const jobMap = { 'submission.attachment.update': [ () => Promise.resolve() ] };
const cancel = worker(hijacked, jobMap);

while ((await Audits.getLatestByAction('submission.attachment.update')).get().processed == null)
await millis(20);

cancel();
await millis(20);
failed.should.equal(true);
}));

it('should restart if the check fails in-query', testContainerFullTrx(async (container) => {
const { Audits, Users } = container;
const alice = (await Users.getByEmail('alice@getodk.org')).get();
await Audits.log(alice.actor, 'submission.attachment.update', alice.actor);

let failed;
const hijacked = Object.create(container.__proto__);
Object.assign(hijacked, container);
hijacked.all = (q) => {
if (q.sql.startsWith('\nwith q as')) {
if (failed) return container.all(q);
failed = true;
return new Promise(async (_, reject) => { await millis(5); reject('not this time'); });
}
};
const jobMap = { 'submission.attachment.update': [ () => Promise.resolve() ] };
const cancel = worker(hijacked, jobMap);

while ((await Audits.getLatestByAction('submission.attachment.update')).get().processed == null)
await millis(20);

cancel();
await millis(20);
failed.should.equal(true);
}));

it('should restart if the process itself fails', testContainerFullTrx(async (container) => {
const { Audits, Users } = container;
const alice = (await Users.getByEmail('alice@getodk.org')).get();
await Audits.log(alice.actor, 'submission.attachment.update', alice.actor);

let failed;
let checks = 0;
const hijacked = Object.create(container.__proto__);
Object.assign(hijacked, container);
hijacked.all = (q) => {
if (q.sql.startsWith('\nwith q as')) checks++;
return container.all(q);
};
const jobMap = { 'submission.attachment.update': [ () => {
if (failed) return Promise.resolve();
failed = true;
checks.should.equal(1);
throw new Error('oh no!');
} ] };
const cancel = worker(hijacked, jobMap);

while ((await Audits.getLatestByAction('submission.attachment.update')).get().lastFailure == null)
await millis(40);

cancel();
await millis(20); // ditto above
checks.should.equal(2);
}));

// TODO: maybe someday test the watchdog loop too.
});
});
