Skip to content

Commit

Permalink
Merge pull request taskcluster#556 from taskcluster/tc-lib-iterate-up…
Browse files Browse the repository at this point in the history
…dates

Tc lib iterate updates (merge conflicts fixed)
  • Loading branch information
djmitche authored Apr 10, 2019
2 parents 4196510 + 700c019 commit 8e9e0e2
Show file tree
Hide file tree
Showing 25 changed files with 494 additions and 628 deletions.
1 change: 1 addition & 0 deletions dev-docs/best-practices/libraries.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ This file will automatically be linked from the `README.md` in the root of the r

Library source code should be in a `src` subdirectory.
No transpilation should be used: write JS that can be interpreted directly by the Node version in use in the repository.
The `main` property in `package.json` should point to `src/index.js`, which may then load other parts of the library.
48 changes: 26 additions & 22 deletions libraries/iterate/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ i = new Iterate({
maxIterationTime: 10000,
watchdogTime: 5000,
waitTime: 2000,
handler: async (watchdog, state) => {
handler: async watchdog => {
await doSomeWork();
watchdog.touch(); // tell Iterate that we`re doing work still
await doMoreWork();
Expand All @@ -39,7 +39,7 @@ i.on(`stopped`, () => {
The constructor for the `Iterate` class takes an options object, with the following properties.
All times are in milliseconds.

* `handler`: the async function to call repeatedly, called as `await handler(watchdog, state)`.
* `handler`: the async function to call repeatedly, called as `await handler(watchdog)`.
See details below.
* `monitor` (optional): instance of a `taskcluster-lib-monitor` instance with a name appropriate for this iterate instance.
This is used to report errors.
Expand All @@ -51,41 +51,45 @@ All times are in milliseconds.
* `waitTime`: the time to wait between finishing an iteration and beginning the next.
* `maxIterations` (optional, default infinite): Complete up to this many
iterations and then successfully exit. Failed iterations count.
* `maxFailures` (optional, default 7): number of failures to tolerate before considering the iteration loop a failure by emitting an `error` event.
This provides a balance between quick recovery from transient errors and the crashing the process for persistent errors.
* `maxFailures` (optional, default 0): number of consecutive failures to tolerate before considering the iteration loop a failure by emitting an `error` event.
Disabled if set to 0.
* `watchdogTime`: this is the time within which `watchdog.touch` must be called or the iteration is considered a failure.
If this value is omitted or zero, the watchdog is disabled.

The main function of the `Iterate` instance is to call `handler` repeatedly.
This is an async function, receiving two parameters -- `(watchdog, state)`.
This begins after a call to the `Iterate` instance's `start()` method, which returns a Promise that resolves once the first iteration begins (on the next tick).
To stop iteration, call the `stop()` method; this returns a Promise that resolves when any ongoing iterations are complete.

The `watchdog` parameter is basically a ticking timebomb that must be defused frequently by calling its `.touch()` method.
The handler is an async function, receiving one parameter -- `watchdog`.
This is basically a ticking timebomb that must be defused frequently by calling its `.touch()` method (unless it is not enabled).
It has methods `.start()`, `.stop()` and `.touch()` and emits `expired` when it expires.
What it allows an implementor is the abilty to say that while the absolute maximum iteration interval (`maxIterationTime`), incremental progress should be made.
What it allows an implementor is the abilty to say that within the absolute maximum iteration interval (`maxIterationTime`), incremental progress should be made.
The idea here is that after each chunk of work in the handler, you run `.touch()`.
If the `watchdogTime` duration elapses without a touch, then the iteration is considered faild.
This way, you can have a handler that can be marked as failing without waiting the full `maxIterationTime`.

The `state` parameter is an object that is passed in to the handler function.
It allows each iteration to accumulate data and use on following iterations.
Because this object is passed in by reference, changes to properties on the object are saved, but reassignment of the state variable will not be saved.
In other words, do `state.data = {count: 1}` and not `state = {count:1}`.
If `maxFailures` is set, then the `Iterate` instance will emit an `error` event when the specified number of iteration failures have occurred with out intervening successful iterations.
This provides an escape from the situation where an application is "wedged" and some external action is required to restart it.
Typically, this entails exiting the process and allowing the hosting environment to automatically restart it.
Since all of the intervening failures were logged, this can be as simple as:

```js
iterator.on('error', () => {
process.exit(1);
});
```

## Events

Iterate is an event emitter. When relevant events occur, the following events
are emitted. If the `error` event does not have a listener, the process will
exit with a non-zero exit code when it would otherwise be emitted.

* `started`: when overall iteration starts
* `stopped`: when overall iteration is finished
* `completed`: only when we have a max number of iterations, when we
finish the last iteration
* `started`: when Iterate instance starts
* `stopped`: when Iterate instance has stopped
* `iteration-start`: when an individual iteration starts
* `iteration-success`: when an individual iteration completes with
success. provides the value that handler resolves with
* `iteration-failure`: provides iteration error
* `iteration-complete`: when an iteration is complete regardless of outcome
* `error`: when the iteration is considered to be concluded and provides
list of iteration errors. If there are no handlers and this event is
emitted, an exception will be thrown in a process.nextTick callback.
* `iteration-success`: when an individual iteration completes successfully
* `iteration-failure`: when an individual iteration fails
* `iteration-complete`: when an iteration completes, regardless of outcome
* `error`: when the Iterate instance has failed (due to reaching maxFailures),
containing the most recent error.
119 changes: 53 additions & 66 deletions libraries/iterate/src/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
let WatchDog = require('./watchdog');
let debug = require('debug')('iterate');
let events = require('events');
const WatchDog = require('./watchdog');
const debug = require('debug')('iterate');
const events = require('events');

/**
* The Iterate Class. See README.md for explanation of constructor
Expand All @@ -14,8 +14,8 @@ class Iterate extends events.EventEmitter {
// Set default values
opts = Object.assign({}, {
watchdogTime: 0,
maxFailures: 0,
maxIterations: 0,
maxFailures: 7,
minIterationTime: 0,
}, opts);

Expand Down Expand Up @@ -62,8 +62,11 @@ class Iterate extends events.EventEmitter {
// Decide whether iteration should continue
this.keepGoing = false;

// We want to be able to share state between iterations
this.sharedState = {};
// Called when stop is called (used to break out of waitTime sleep)
this.onStopCall = null;

// Fires when stopped, only set when started
this.stopPromise = null;

// Store the iteration timeout so that a `.stop()` call during an iteration
// inhibits a handler from running
Expand All @@ -72,14 +75,15 @@ class Iterate extends events.EventEmitter {

async single_iteration() {
debug('running handler');
let start = new Date();
let watchdog = new WatchDog(this.watchdogTime);
const start = new Date();
const watchdog = new WatchDog(this.watchdogTime);
let maxIterationTimeTimer;

// build a promise that will reject when either the watchdog
// times out or the maxIterationTimeTimer expires
let timeoutRejector = new Promise((resolve, reject) => {
const timeoutRejector = new Promise((resolve, reject) => {
watchdog.on('expired', () => {
debug('watchdog expired');
reject(new Error('watchdog exceeded'));
});

Expand All @@ -92,15 +96,15 @@ class Iterate extends events.EventEmitter {
watchdog.start();
await Promise.race([
timeoutRejector,
Promise.resolve(this.handler(watchdog, this.sharedState)),
Promise.resolve(this.handler(watchdog)),
]);
} finally {
// stop the timers regardless of success or failure
clearTimeout(maxIterationTimeTimer);
watchdog.stop();
}

let duration = new Date() - start;
const duration = new Date() - start;
if (this.minIterationTime > 0 && duration < this.minIterationTime) {
throw new Error('Handler duration was less than minIterationTime');
}
Expand All @@ -110,6 +114,9 @@ class Iterate extends events.EventEmitter {
async iterate() {
let currentIteration = 0;
let failures = [];

this.emit('started');

while (true) {
currentIteration++;
let iterError;
Expand Down Expand Up @@ -149,80 +156,60 @@ class Iterate extends events.EventEmitter {
// When we reach the end of a set number of iterations, we'll stop
if (this.maxIterations > 0 && currentIteration >= this.maxIterations) {
debug(`reached max iterations of ${this.maxIterations}`);
this.stop();
this.emit('completed');
// fall through to also send 'stopped'
this.keepGoing = false;
}

if (failures.length >= this.maxFailures) {
this.__emitFatalError(failures);
return;
} else if (!this.keepGoing) {
this.stop();
this.emit('stopped');
return;
if (this.maxFailures > 0 && failures.length >= this.maxFailures) {
this.emit('error', failures[failures.length - 1]);
}

if (!this.keepGoing) {
break;
}

if (this.waitTime > 0) {
debug('waiting for next iteration');
await new Promise(resolve => {
this.currentTimeout = setTimeout(resolve, this.waitTime);
debug('waiting for next iteration or stop');
const stopPromise = new Promise(resolve => {
this.onStopCall = resolve;
});
}
}
}
let waitTimeTimeout;
const waitTimePromise = new Promise(resolve => {
waitTimeTimeout = setTimeout(resolve, this.waitTime);
});
await Promise.race([stopPromise, waitTimePromise]);

/**
* Special function which knows how to emit the final error and then throw an
* unhandled exception where appropriate. Also stop trying to iterate
* further.
*/
__emitFatalError(failures) {
if (this.currentTimeout) {
clearTimeout(this.currentTimeout);
}
this.stop();
this.emit('stopped');
if (this.monitor) {
let err = new Error('Fatal iteration error');
err.failures = failures;
this.monitor.reportError(err);
}
if (this.listeners('error').length > 0) {
this.emit('error', failures);
} else {
debug('fatal error:');
for (let x of failures) {
debug(` * ${x.stack || x}`);
this.onStopCall = null;
clearTimeout(waitTimeTimeout);

if (!this.keepGoing) {
break;
}
}
debug('trying to crash process');
process.nextTick(() => {
throw new Error(`Errors:\n=====\n${failures.map(x => x.stack || x).join('=====\n')}`);
});
}
this.emit('stopped');
}

start() {
debug('starting');
this.stoppedPromise = new Promise(resolve => {
this.on('stopped', resolve);
});
this.keepGoing = true;

// Two reasons we call it this way:
// 1. first call should have same exec env as following
// 2. start should return immediately
this.currentTimeout = setTimeout(async () => {
debug('starting iteration');
this.emit('started');
try {
await this.iterate();
} catch (err) {
console.error(err.stack || err);
}
}, 0);
return new Promise(resolve => {
this.once('started', resolve);
// start iteration; any failures here are a programming error in this
// library and so should be considered fatal
this.iterate().catch(err => this.emit('error', err));
});
}

stop() {
this.keepGoing = false;
debug('stopped');
if (this.onStopCall) {
this.onStopCall();
}
return this.stoppedPromise;
}
}

Expand Down
2 changes: 1 addition & 1 deletion libraries/iterate/src/watchdog.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
let events = require('events');
const events = require('events');

/**
* This is a watch dog timer. Think of it as a ticking timebomb which will
Expand Down
Loading

0 comments on commit 8e9e0e2

Please sign in to comment.