Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async_hooks: add AsyncLocal class #27172

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions doc/api/async_hooks.md
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,106 @@ never be called.
* Returns: {number} The same `triggerAsyncId` that is passed to the
`AsyncResource` constructor.

## Class: AsyncLocal

<!-- YAML
added: REPLACEME
-->

This class can be used to set a value which follows asynchronous control flow.
An `AsyncLocal` instance is a key into a continuation local storage.
The value set on an `AsyncLocal` instance is propagated to any async
continuation triggered within this flow. Modification of the value are done via
"copy on write", therefore already created continuations are not effected by
setting a new value, only continuations created afterwards.

The implementation relys on async hooks to follow the execution flow. Therefore
if some library is not interacting well with async hooks (e.g. it does user
space queuing) it will result in the same problems with `AsyncLocal`. To
correct this such modules should use the `AsyncResource` class.

### Example

```js
const http = require('http');
const wait = require('util').promisify(setTimeout);

const asyncLocal = new AsyncLocal();

function print(...args) {
console.log(`${asyncLocal.value || '-'}:`, ...args);
}

http.createServer(async (req, res) => {
asyncLocal.value = `${req.method}:${req.url}`;
print('start');

setImmediate(async () => {
print('next');
asyncLocal.value = `${asyncLocal.value}:split`;
await wait(10);
print('branched');
});

await wait(100);

print('done');
res.end();
}).listen(8181);
http.get('http://localhost:8181/first');
http.get('http://localhost:8181/second');
// Prints:
// GET:/first: start
// GET:/second: start
// GET:/second: next
// GET:/first: next
// GET:/second:split: branched
// GET:/first:split: branched
// GET:/first: done
// GET:/second: done
```

### new AsyncLocal(\[options\])

* `options` {Object}
* `onChangedCb` {Function} Optional callback invoked whenever a value of an
`AsyncLocal` changes.

Creates a new instance of an `AsyncLocal`. Once a value is set it's propagated
to async continuations until it is cleared.

The optional `onChangedCb` callback signals changes of the value referenced by
the `AsyncLocal` instance. The first argument is the previous value, the
second argument holds the current value and the third argument is a `true`
if this change is caused by a change of the execution context or `false` if a
new value has been assinged to `AsyncLocal.value`.

The `onChanged` callback may be invoked frequently therefore the processing
inside the callback should be limited. In special it should be avoided to
create new asynchronous operations within the callback as this may in turn
result in followup `onChanged` invocations.

It's not allowed to set a new `AsyncLocal.value` from the callback.

If the `onChanged` callback called for execution context changes throws the
error handling is like in [error handling][]. For invokations caused by
setting a new value the exception goes down to the caller of the setter.

### asyncLocal.value

Reading this value returns the current value associated with this execution
context (execution async id).

The value written is stored in a persistent storage and propagated for the
current asychronous execution path. Writting `null` or `undefined` clears the
value and stops further propagation on this execution path. Writing a new
value effects only the current execution context and new async operations
created afterwards. To avoid this copy on write semantic simply store an
`Object` or `Map` and update this.

Setting a new value directly from `onChanged` callback is not allowed an will
throw an Error.

[`after` callback]: #async_hooks_after_asyncid
[`before` callback]: #async_hooks_before_asyncid
[`destroy` callback]: #async_hooks_destroy_asyncid
Expand All @@ -691,3 +791,4 @@ never be called.
[PromiseHooks]: https://docs.google.com/document/d/1rda3yKGHimKIhg5YeoAmCOtyURgsbTH_qaYR79FELlk/edit
[`Worker`]: worker_threads.html#worker_threads_class_worker
[promise execution tracking]: #async_hooks_promise_execution_tracking
[error handling]: #async_hooks_error_handling
5 changes: 5 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,11 @@ A special type of error that can be triggered whenever Node.js detects an
exceptional logic violation that should never occur. These are raised typically
by the `assert` module.

<a id="ERR_ASYNCLOCAL_NO_RECURSION"></a>
### ERR_ASYNCLOCAL_NO_RECURSION

An attempt was made to set an `AsyncLocal` value from `onChanged` callback.

<a id="ERR_ASYNC_CALLBACK"></a>
### ERR_ASYNC_CALLBACK

Expand Down
139 changes: 138 additions & 1 deletion lib/async_hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ const {

const {
ERR_ASYNC_CALLBACK,
ERR_INVALID_ASYNC_ID
ERR_ASYNCLOCAL_NO_RECURSION,
ERR_INVALID_ASYNC_ID,
ERR_INVALID_ARG_TYPE
} = require('internal/errors').codes;
const { validateString } = require('internal/validators');
const internal_async_hooks = require('internal/async_hooks');
Expand Down Expand Up @@ -200,6 +202,139 @@ class AsyncResource {
}


// AsyncLocal //

const kStack = Symbol('stack');
const kIsFirst = Symbol('is-first');
const kMap = Symbol('map');
const kOnChangedCb = Symbol('on-changed-cb');
const kHooks = Symbol('hooks');
const kSet = Symbol('set');
const kInOnChangedCb = Symbol('in-on-changed-cb');
const kInvokeOnChangedCb = Symbol('invoke-on-changed-cb');

class AsyncLocal {
constructor(options = {}) {
if (typeof options !== 'object' || options === null)
throw new ERR_INVALID_ARG_TYPE('options', 'Object', options);

const { onChangedCb = null } = options;
if (onChangedCb !== null && typeof onChangedCb !== 'function')
throw new ERR_INVALID_ARG_TYPE('options.onChangedCb',
'function',
onChangedCb);

this[kOnChangedCb] = onChangedCb;
this[kMap] = new Map();

const fns = {
init: (asyncId, type, triggerAsyncId, resource) => {
// Propagate value from current id to new (execution graph)
const value = this[kMap].get(executionAsyncId());
Flarna marked this conversation as resolved.
Show resolved Hide resolved
if (value)
this[kMap].set(asyncId, value);
},

destroy: (asyncId) => this[kSet](asyncId, null),
};

if (this[kOnChangedCb]) {
// Avoid setting a value from onChangedCb
this[kInOnChangedCb] = false;
// Change notification requires to keep a stack of async local values
this[kStack] = [];
// Indicates that first value was stored (before callback "missing")
this[kIsFirst] = true;

// Use before/after hooks to signal changes because of execution
fns.before = (asyncId) => {
const stack = this[kStack];
const cVal = this[kMap].get(asyncId);
const pVal = stack[stack.length - 1];
stack.push(pVal);
Flarna marked this conversation as resolved.
Show resolved Hide resolved
if (cVal !== pVal)
this[kInvokeOnChangedCb](pVal, cVal, true);
};

fns.after = (asyncId) => {
const stack = this[kStack];
const pVal = this[kMap].get(asyncId);
stack.pop();
const cVal = stack[stack.length - 1];
if (cVal !== pVal)
this[kInvokeOnChangedCb](pVal, cVal, true);
};
}
this[kHooks] = createHook(fns);
}

set value(val) {
if (this[kInOnChangedCb])
throw new ERR_ASYNCLOCAL_NO_RECURSION();

val = val === null ? undefined : val;
const id = executionAsyncId();
const onChangedCb = this[kOnChangedCb];
let pVal;
if (onChangedCb)
pVal = this[kMap].get(id);

this[kSet](id, val);

if (onChangedCb && pVal !== val)
this[kInvokeOnChangedCb](pVal, val, false);
}

get value() {
return this[kMap].get(executionAsyncId());
}

[kSet](id, val) {
const map = this[kMap];

if (val == null) {
map.delete(id);
if (map.size === 0)
this[kHooks].disable();

if (this[kOnChangedCb]) {
const stack = this[kStack];
if (map.size === 0) {
// Hooks have been disabled so next set is the first one
stack.length = 0;
this[kIsFirst] = true;
} else {
stack[stack.length - 1] = undefined;
}
}
} else {
map.set(id, val);
if (map.size === 1)
this[kHooks].enable();

if (this[kOnChangedCb]) {
const stack = this[kStack];
if (this[kIsFirst]) {
// First value set => "simulate" before hook
this[kIsFirst] = false;
stack.push(val);
} else {
stack[stack.length - 1] = val;
}
}
}
}

[kInvokeOnChangedCb](p, c, e) {
try {
this[kInOnChangedCb] = true;
this[kOnChangedCb](p, c, e);
} finally {
this[kInOnChangedCb] = false;
}
}
}

// Placing all exports down here because the exported classes won't export
// otherwise.
module.exports = {
Expand All @@ -209,4 +344,6 @@ module.exports = {
triggerAsyncId,
// Embedder API
AsyncResource,
// CLS API
AsyncLocal
};
2 changes: 2 additions & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,8 @@ module.exports = {
E('ERR_AMBIGUOUS_ARGUMENT', 'The "%s" argument is ambiguous. %s', TypeError);
E('ERR_ARG_NOT_ITERABLE', '%s must be iterable', TypeError);
E('ERR_ASSERTION', '%s', Error);
E('ERR_ASYNCLOCAL_NO_RECURSION',
'Setting value from onChanged callback is not allowed', Error);
E('ERR_ASYNC_CALLBACK', '%s must be a function', TypeError);
E('ERR_ASYNC_TYPE', 'Invalid name for async "type": %s', TypeError);
E('ERR_BROTLI_INVALID_PARAM', '%s is not a valid Brotli parameter', RangeError);
Expand Down
Loading