Skip to content
This repository has been archived by the owner on Feb 28, 2022. It is now read-only.

Commit

Permalink
fix(pipeline): re-implement ability to reset error by error handler
Browse files Browse the repository at this point in the history
  • Loading branch information
tripodsan committed May 14, 2019
1 parent 458bbc3 commit e8f21b3
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 124 deletions.
132 changes: 73 additions & 59 deletions src/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

const _ = require('lodash/fp');
const callsites = require('callsites');
const { setdefault } = require('@adobe/helix-shared').types;
const { enumerate, iter } = require('@adobe/helix-shared').sequence;
const coerce = require('./utils/coerce-secrets');

Expand All @@ -30,6 +29,16 @@ const nopLogger = {
level: 'error',
};

/**
* Simple wrapper to mark a function as error handler
* @private
*/
function errorWrapper(fn) {
const wrapper = (...args) => fn(...args);
wrapper.errorHandler = true;
return wrapper;
}

/**
* @typedef {Object} Context
* @param {Winston.Logger} logger Winston logger to use
Expand Down Expand Up @@ -89,8 +98,6 @@ class Pipeline {
this._posts = [];
// functions that are executed before each step
this._taps = [];
// executed on error
this._onError = [];

this.attach = (ext) => {
if (this.sealed) {
Expand Down Expand Up @@ -264,7 +271,10 @@ class Pipeline {
*/
error(f) {
this.describe(f);
this._onError.push(f);
const wrapped = errorWrapper(f);
// ensure proper alias
wrapped.alias = f.alias;
this._last.push(wrapped);
return this;
}

Expand Down Expand Up @@ -304,74 +314,78 @@ class Pipeline {
// register all custom attachers to the pipeline
this.attach(this._oncef);

let currentlyExecuting;

const getident = (fn, classifier, idx) => `${classifier} #${idx}/${this.describe(fn)}`;

let manualError = false;
const ckError = () => {
if (context.error) {
manualError = true;
throw context.error;
}
};
const getident = (fn, classifier, idx) => `${classifier}-#${idx}/${this.describe(fn)}`;

const execTaps = async (ident, taps) => {
/**
* Executes the taps of the current function.
* @param {Function[]} taps the taps
* @param {string} fnIdent the name of the function
* @param {number} fnIdx the current idx of the function
*/
const execTaps = async (taps, fnIdent, fnIdx) => {
for (const [idx, t] of iter(taps)) {
currentlyExecuting = `${getident(t, 'Tap function', idx)} before ${ident}`;
await t(context, this._action);
ckError();
const ident = getident(t, 'Tap function', idx);
logger.silly(`${ident} before ${fnIdent}`);
try {
await t(context, this._action, fnIdx);
} catch (e) {
logger.error(`Exception during ${ident}:\n${e.stack}`);
throw e;
}
}
};

const execFns = async (fns, classifier) => {
for (const [idx, f] of enumerate(fns)) {
/**
* Executes the pipeline functions
* @param {Function[]} fns the functions
* @param {number} startIdx offset of the function's index in the entire pipeline.
* @param {string} classifier type of function (for logging)
*/
const execFns = async (fns, startIdx, classifier) => {
for (const [i, f] of enumerate(fns)) {
const idx = i + startIdx;
const ident = getident(f, classifier, idx);
await execTaps(ident, enumerate(this._taps));

currentlyExecuting = ident;
await f(context, this._action);
ckError();
// log the function that is being called and the parameters of the function
const skip = (!context.error) === (!!f.errorHandler);
logger.silly(`${skip ? 'skip' : 'exec'} ${ident}`, {
function: this.describe(f),
});
if (skip) {
// eslint-disable-next-line no-continue
continue;
}

try {
await execTaps(enumerate(this._taps), ident, idx);
} catch (e) {
if (!context.error) {
context.error = e;
}
}
if (context.error && !f.errorHandler) {
// eslint-disable-next-line no-continue
continue;
}
try {
await f(context, this._action);
} catch (e) {
logger.error(`Exception during ${ident}:\n${e.stack}`);
if (!context.error) {
context.error = e;
}
}
}
};

try {
await execFns(this._pres, 'Pre function');
await execFns([this._oncef], 'Once function');
await execFns(this._posts, 'Post function');
await execFns(this._pres, 0, 'pre');
await execFns([this._oncef], this._pres.length, 'once');
await execFns(this._posts, this._pres.length + 1, 'post');
} catch (e) {
const modified = `Exception during pipeline execution:\n${currentlyExecuting}:\n${e.stack}`;
logger.error(modified);

// This is non standard; doing this late to avoid any errors;
// since we already printed the error, any impact of this being
// wrong should not have a big impact
try {
e.stack = modified;
} catch (e2) {
logger.warn(`Unexpected error ${e2}\n${e2.stack}`);
}

const res = setdefault(context, 'response', {});

// Not thrown, manually set context.error...
if (manualError) {
setdefault(res, 'status', 500);
} else {
logger.error(`Unexpected error during pipeline execution: \n${e.stack}`);
if (!context.error) {
context.error = e;
res.status = 500;
}

// Invoke each error handler
for (const [idx, handler] of enumerate(this._onError)) {
try {
await handler(context, this._action);
} catch (e2) {
// Ignoring errors here…how are we going to handle errors from error handlers?
// Meta error handlers? It's probably better to just run the rest of the error
// handlers and notify the dev
logger.error(`Exception during ${getident(handler, 'Error Handler', idx)}:\n${e2.stack}`);
}
}
}
return context;
Expand Down
113 changes: 48 additions & 65 deletions test/testPipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,24 @@ describe('Testing Pipeline', () => {
beforeEach(() => {
logger = Logger.getTestLogger({
// tune this for debugging
level: 'info',
level: 'silly',
});
});

it('Executes without logger', (done) => {
new Pipeline().once(() => {}).run({}).then(() => done())
.catch(done);
it('Executes without logger', async () => {
await new Pipeline().once(() => {}).run({});
});

it('Executes correct order', (done) => {
it('Executes correct order', async () => {
const order = [];
new Pipeline({ logger })
await new Pipeline({ logger })
.before(() => { order.push('pre0'); })
.after(() => { order.push('post0'); })
.before(() => { order.push('pre1'); })
.after(() => { order.push('post1'); })
.once(() => { order.push('once'); })
.run()
.then(() => {
assert.deepEqual(order, ['pre0', 'pre1', 'once', 'post0', 'post1']);
done();
})
.catch(done);
.run();
assert.deepEqual(order, ['pre0', 'pre1', 'once', 'post0', 'post1']);
});

it('Can be run twice', async () => {
Expand Down Expand Up @@ -145,18 +140,13 @@ describe('Testing Pipeline', () => {
assert.deepStrictEqual(order, ['one', 'two', 'middle', 'three', 'four']);
});

it('Logs correct names', (done) => {
it('Logs correct names', async () => {
const order = [];

const pre0 = () => order.push('pre0');
const post0 = function post0() {
order.push('post0');
};

const post0 = () => order.push('post0');
function noOp() {}

let counter = 0;

const validatinglogger = {
error: noOp,
warn: noOp,
Expand All @@ -171,33 +161,20 @@ describe('Testing Pipeline', () => {
assert.ok(obj.function.match(/^before:pre0/));
}
if (counter === 2) {
assert.ok(obj.function.match(/^before:pre0/));
}
if (counter === 3) {
assert.ok(obj.function.match(/^once:anonymous/));
}
if (counter === 4) {
assert.ok(obj.function.match(/^once:anonymous/));
}
if (counter === 5) {
assert.ok(obj.function.match(/^after:post0/));
}
if (counter === 6) {
if (counter === 3) {
assert.ok(obj.function.match(/^after:post0/));
}
},
};

new Pipeline({ logger: validatinglogger })
await new Pipeline({ logger: validatinglogger })
.before(pre0)
.after(post0)
.once(() => { order.push('once'); })
.run()
.then(() => {
assert.deepEqual(order, ['pre0', 'once', 'post0']);
done();
})
.catch(done);
.run();
assert.deepEqual(order, ['pre0', 'once', 'post0']);
});

it('Disables pre before when', (done) => {
Expand Down Expand Up @@ -381,8 +358,8 @@ describe('Testing Pipeline', () => {
.catch(done);
});

it('Executes promises', () => {
new Pipeline({ logger })
it('Executes promises', async () => {
await new Pipeline({ logger })
.once(v => new Promise((resolve) => {
setTimeout(() => {
v.foo = 'bar';
Expand Down Expand Up @@ -413,7 +390,7 @@ describe('Testing Pipeline', () => {
.once(() => {})
.after(() => ({ bar: 'baz' }))
.every(() => {
cnt += 1;
assert.fail('this should not be invoked');
})
.when(() => false)
.every(() => {
Expand Down Expand Up @@ -481,56 +458,62 @@ describe('Testing Pipeline', () => {
.catch(done);
});

it('handles error in error', (done) => {
it('error handler can clear error', (done) => {
const order = [];
new Pipeline({ logger })
.before(() => {
order.push('pre0');
throw new Error('stop');
})
.before(() => { order.push('pre1'); })
.error((ctx) => {
order.push('error0');
ctx.error = null;
})
.once(() => { order.push('once'); })
.after(() => { order.push('post0'); })
.after(() => { order.push('post1'); })
.error(() => { throw Error('in error handler'); })
.error(() => { order.push('error1'); })
.run()
.then(logger.getOutput)
.then((output) => {
assert.deepEqual(order, ['pre0']);
assert.ok(output.indexOf('Exception during Error Handler') > 0);
.then(() => {
assert.deepEqual(order, ['pre0', 'error0', 'once', 'post0', 'post1']);
done();
})
.catch(done);
});

it('handles error when trying to override error.stack', (done) => {
it('handles error in error', async () => {
const order = [];
new Pipeline({ logger })
await new Pipeline({ logger })
.before(() => {
order.push('pre0');
const validator = {
set(obj, prop, value) {
if (prop === 'stack') {
throw new TypeError('The stack property is readonly.');
}
obj[prop] = value;
return true;
},
};
throw new Proxy(new Error('stop'), validator);
throw new Error('stop');
})
.before(() => { order.push('pre1'); })
.once(() => { order.push('once'); })
.after(() => { order.push('post0'); })
.after(() => { order.push('post1'); })
.error(() => { order.push('error'); })
.run()
.then(logger.getOutput)
.then((output) => {
assert.deepEqual(order, ['pre0', 'error']);
assert.ok(output.indexOf('Unexpected error TypeError: The stack property is readonly') > 0);
done();
.error(() => { throw Error('in error handler'); })
.run();
const output = await logger.getOutput();
assert.deepEqual(order, ['pre0']);
assert.ok(output.indexOf('Exception during post-#5/error:anonymous') > 0);
});

it('handles generic pipeline error', async () => {
const order = [];
await new Pipeline({ logger })
.before(() => { order.push('pre1'); })
.once({
get errorHandler() {
throw new Error('generic error');
},
})
.catch(done);
.error(() => { order.push('error'); })
.run();

const output = await logger.getOutput();
assert.deepEqual(order, ['pre1']);
assert.ok(output.indexOf('Error: generic error') > 0);
});
});

0 comments on commit e8f21b3

Please sign in to comment.