Skip to content

Commit

Permalink
worker: add eventLoopUtilization()
Browse files Browse the repository at this point in the history
Allow calling eventLoopUtilization() directly on a worker thread:

    const worker = new Worker('./foo.js');
    const elu = worker.performance.eventLoopUtilization();
    setTimeout(() => {
      worker.performance.eventLoopUtilization(elu);
    }, 10);

Add a new performance object on the Worker instance that will hopefully
one day hold all the other performance metrics, such as nodeTiming.

Include benchmarks and tests.

PR-URL: #35664
Reviewed-By: Juan José Arboleda <soyjuanarbol@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Gerhard Stöbich <deb2001-github@yahoo.de>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
trevnorris authored and Flarna committed Oct 27, 2020
1 parent 0d47432 commit 04d1664
Show file tree
Hide file tree
Showing 9 changed files with 370 additions and 24 deletions.
61 changes: 61 additions & 0 deletions benchmark/worker/bench-eventlooputil.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
'use strict';

const common = require('../common.js');
const { Worker, parentPort } = require('worker_threads');

if (process.argv[2] === 'idle cats') {
return parentPort.once('message', () => {});
}

const bench = common.createBenchmark(main, {
n: [1e6],
method: [
'ELU_simple',
'ELU_passed',
],
});

function main({ method, n }) {
switch (method) {
case 'ELU_simple':
benchELUSimple(n);
break;
case 'ELU_passed':
benchELUPassed(n);
break;
default:
throw new Error(`Unsupported method ${method}`);
}
}

function benchELUSimple(n) {
const worker = new Worker(__filename, { argv: ['idle cats'] });

spinUntilIdle(worker, () => {
bench.start();
for (let i = 0; i < n; i++)
worker.performance.eventLoopUtilization();
bench.end(n);
worker.postMessage('bye');
});
}

function benchELUPassed(n) {
const worker = new Worker(__filename, { argv: ['idle cats'] });

spinUntilIdle(worker, () => {
let elu = worker.performance.eventLoopUtilization();
bench.start();
for (let i = 0; i < n; i++)
elu = worker.performance.eventLoopUtilization(elu);
bench.end(n);
worker.postMessage('bye');
});
}

function spinUntilIdle(w, cb) {
const t = w.performance.eventLoopUtilization();
if (t.idle + t.active > 0)
return process.nextTick(cb);
setTimeout(() => spinUntilIdle(w, cb), 1);
}
8 changes: 6 additions & 2 deletions doc/api/perf_hooks.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ added:
The `eventLoopUtilization()` method returns an object that contains the
cumulative duration of time the event loop has been both idle and active as a
high resolution milliseconds timer. The `utilization` value is the calculated
Event Loop Utilization (ELU). If bootstrapping has not yet finished, the
properties have the value of `0`.
Event Loop Utilization (ELU).

If bootstrapping has not yet finished on the main thread the properties have
the value of `0`. The ELU is immediately available on [Worker threads][] since
bootstrap happens within the event loop.

Both `utilization1` and `utilization2` are optional parameters.

Expand Down Expand Up @@ -766,6 +769,7 @@ require('some-module');
[Performance Timeline]: https://w3c.github.io/performance-timeline/
[User Timing]: https://www.w3.org/TR/user-timing/
[Web Performance APIs]: https://w3c.github.io/perf-timing-primer/
[Worker threads]: worker_threads.md#worker_threads_worker_threads
[`'exit'`]: process.md#process_event_exit
[`child_process.spawnSync()`]: child_process.md#child_process_child_process_spawnsync_command_args_options
[`process.hrtime()`]: process.md#process_process_hrtime_time
Expand Down
62 changes: 62 additions & 0 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,65 @@ If the Worker thread is no longer running, which may occur before the
[`'exit'` event][] is emitted, the returned `Promise` will be rejected
immediately with an [`ERR_WORKER_NOT_RUNNING`][] error.

### `worker.performance`
<!-- YAML
added: REPLACEME
-->

An object that can be used to query performance information from a worker
instance. Similar to [`perf_hooks.performance`][].

#### `performance.eventLoopUtilization([utilization1[, utilization2]])`
<!-- YAML
added: REPLACEME
-->

* `utilization1` {Object} The result of a previous call to
`eventLoopUtilization()`.
* `utilization2` {Object} The result of a previous call to
`eventLoopUtilization()` prior to `utilization1`.
* Returns {Object}
* `idle` {number}
* `active` {number}
* `utilization` {number}

The same call as [`perf_hooks` `eventLoopUtilization()`][], except the values
of the worker instance are returned.

One difference is that, unlike the main thread, bootstrapping within a worker
is done within the event loop. So the event loop utilization will be
immediately available once the worker's script begins execution.

An `idle` time that does not increase does not indicate that the worker is
stuck in bootstrap. The following examples shows how the worker's entire
lifetime will never accumulate any `idle` time, but is still be able to process
messages.

```js
const { Worker, isMainThread, parentPort } = require('worker_threads');

if (isMainThread) {
const worker = new Worker(__filename);
setInterval(() => {
worker.postMessage('hi');
console.log(worker.performance.eventLoopUtilization());
}, 100).unref();
return;
}

parentPort.on('message', () => console.log('msg')).unref();
(function r(n) {
if (--n < 0) return;
const t = Date.now();
while (Date.now() - t < 300);
setImmediate(r, n);
})(10);
```

The event loop utilization of a worker is available only after the [`'online'`
event][] emitted, and if called before this, or after the [`'exit'`
event][], then all properties have the value of `0`.

### `worker.postMessage(value[, transferList])`
<!-- YAML
added: v10.5.0
Expand Down Expand Up @@ -920,6 +979,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
[Web Workers]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API
[`'close'` event]: #worker_threads_event_close
[`'exit'` event]: #worker_threads_event_exit
[`'online'` event]: #worker_threads_event_online
[`ArrayBuffer`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/ArrayBuffer
[`AsyncResource`]: async_hooks.md#async_hooks_class_asyncresource
[`Buffer.allocUnsafe()`]: buffer.md#buffer_static_method_buffer_allocunsafe_size
Expand All @@ -940,6 +1000,8 @@ active handle in the event system. If the worker is already `unref()`ed calling
[`fs.close()`]: fs.md#fs_fs_close_fd_callback
[`fs.open()`]: fs.md#fs_fs_open_path_flags_mode_callback
[`markAsUntransferable()`]: #worker_threads_worker_markasuntransferable_object
[`perf_hooks.performance`]: #perf_hooks.md#perf_hooks_perf_hooks_performance
[`perf_hooks` `eventLoopUtilization()`]: perf_hooks.md#perf_hooks_performance_eventlooputilization_utilization1_utilization2
[`port.on('message')`]: #worker_threads_event_message
[`port.onmessage()`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort/onmessage
[`port.postMessage()`]: #worker_threads_port_postmessage_value_transferlist
Expand Down
56 changes: 56 additions & 0 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const {
const EventEmitter = require('events');
const assert = require('internal/assert');
const path = require('path');
const { timeOrigin } = internalBinding('performance');

const errorCodes = require('internal/errors').codes;
const {
Expand Down Expand Up @@ -70,6 +71,8 @@ const kOnMessage = Symbol('kOnMessage');
const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr');
const kOnErrorMessage = Symbol('kOnErrorMessage');
const kParentSideStdio = Symbol('kParentSideStdio');
const kLoopStartTime = Symbol('kLoopStartTime');
const kIsOnline = Symbol('kIsOnline');

const SHARE_ENV = SymbolFor('nodejs.worker_threads.SHARE_ENV');
let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
Expand Down Expand Up @@ -223,6 +226,12 @@ class Worker extends EventEmitter {
null,
hasStdin: !!options.stdin
}, transferList);
// Use this to cache the Worker's loopStart value once available.
this[kLoopStartTime] = -1;
this[kIsOnline] = false;
this.performance = {
eventLoopUtilization: eventLoopUtilization.bind(this),
};
// Actually start the new thread now that everything is in place.
this[kHandle].startThread();
}
Expand Down Expand Up @@ -254,6 +263,7 @@ class Worker extends EventEmitter {
[kOnMessage](message) {
switch (message.type) {
case messageTypes.UP_AND_RUNNING:
this[kIsOnline] = true;
return this.emit('online');
case messageTypes.COULD_NOT_SERIALIZE_ERROR:
return this[kOnCouldNotSerializeErr]();
Expand Down Expand Up @@ -415,6 +425,52 @@ function makeResourceLimits(float64arr) {
};
}

function eventLoopUtilization(util1, util2) {
// TODO(trevnorris): Works to solve the thread-safe read/write issue of
// loopTime, but has the drawback that it can't be set until the event loop
// has had a chance to turn. So it will be impossible to read the ELU of
// a worker thread immediately after it's been created.
if (!this[kIsOnline] || !this[kHandle]) {
return { idle: 0, active: 0, utilization: 0 };
}

// Cache loopStart, since it's only written to once.
if (this[kLoopStartTime] === -1) {
this[kLoopStartTime] = this[kHandle].loopStartTime();
if (this[kLoopStartTime] === -1)
return { idle: 0, active: 0, utilization: 0 };
}

if (util2) {
const idle = util1.idle - util2.idle;
const active = util1.active - util2.active;
return { idle, active, utilization: active / (idle + active) };
}

const idle = this[kHandle].loopIdleTime();

// Using performance.now() here is fine since it's always the time from
// the beginning of the process, and is why it needs to be offset by the
// loopStart time (which is also calculated from the beginning of the
// process).
const active = now() - this[kLoopStartTime] - idle;

if (!util1) {
return { idle, active, utilization: active / (idle + active) };
}

const idle_delta = idle - util1.idle;
const active_delta = active - util1.active;
const utilization = active_delta / (idle_delta + active_delta);
return { idle: idle_delta, active: active_delta, utilization };
}

// Duplicate code from performance.now() so don't need to require perf_hooks.
function now() {
const hr = process.hrtime();
return (hr[0] * 1000 + hr[1] / 1e6) - timeOrigin;
}

module.exports = {
ownsProcessState,
isMainThread,
Expand Down
37 changes: 37 additions & 0 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,39 @@ void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());
}

void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());

Mutex::ScopedLock lock(w->mutex_);
// Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
// before locking the mutex is a race condition. So manually do the same
// check.
if (w->stopped_ || w->env_ == nullptr)
return args.GetReturnValue().Set(-1);

uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop());
args.GetReturnValue().Set(1.0 * idle_time / 1e6);
}

void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());

Mutex::ScopedLock lock(w->mutex_);
// Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
// before locking the mutex is a race condition. So manually do the same
// check.
if (w->stopped_ || w->env_ == nullptr)
return args.GetReturnValue().Set(-1);

double loop_start_time = w->env_->performance_state()->milestones[
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START];
CHECK_GE(loop_start_time, 0);
args.GetReturnValue().Set(
(loop_start_time - node::performance::timeOrigin) / 1e6);
}

namespace {

// Return the MessagePort that is global for this Environment and communicates
Expand Down Expand Up @@ -779,6 +812,8 @@ void InitWorker(Local<Object> target,
env->SetProtoMethod(w, "unref", Worker::Unref);
env->SetProtoMethod(w, "getResourceLimits", Worker::GetResourceLimits);
env->SetProtoMethod(w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
env->SetProtoMethod(w, "loopIdleTime", Worker::LoopIdleTime);
env->SetProtoMethod(w, "loopStartTime", Worker::LoopStartTime);

Local<String> workerString =
FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
Expand Down Expand Up @@ -845,6 +880,8 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(Worker::Unref);
registry->Register(Worker::GetResourceLimits);
registry->Register(Worker::TakeHeapSnapshot);
registry->Register(Worker::LoopIdleTime);
registry->Register(Worker::LoopStartTime);
}

} // anonymous namespace
Expand Down
2 changes: 2 additions & 0 deletions src/node_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class Worker : public AsyncWrap {
const v8::FunctionCallbackInfo<v8::Value>& args);
v8::Local<v8::Float64Array> GetResourceLimits(v8::Isolate* isolate) const;
static void TakeHeapSnapshot(const v8::FunctionCallbackInfo<v8::Value>& args);
static void LoopIdleTime(const v8::FunctionCallbackInfo<v8::Value>& args);
static void LoopStartTime(const v8::FunctionCallbackInfo<v8::Value>& args);

private:
void CreateEnvMessagePort(Environment* env);
Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-bootstrap-modules.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ const expectedModules = new Set([
if (!common.isMainThread) {
[
'Internal Binding messaging',
'Internal Binding performance',
'Internal Binding symbols',
'Internal Binding worker',
'NativeModule internal/streams/duplex',
Expand Down
Loading

1 comment on commit 04d1664

@amark
Copy link

@amark amark commented on 04d1664 Oct 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉

So excited for this, thank you.

Please sign in to comment.