Skip to content

Commit

Permalink
worker: add option to track unmanaged file descriptors
Browse files Browse the repository at this point in the history
Add a public option for Workers which adds tracking for raw
file descriptors, as currently, those resources are not cleaned
up, unlike e.g. `FileHandle`s.

PR-URL: #34303
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
  • Loading branch information
addaleax committed Sep 27, 2020
1 parent 3e21dd9 commit b4819db
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 3 deletions.
12 changes: 12 additions & 0 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,10 @@ if (isMainThread) {
<!-- YAML
added: v10.5.0
changes:
- version:
- REPLACEME
pr-url: https://github.com/nodejs/node/pull/34303
description: The `trackUnmanagedFds` option was introduced.
- version: v12.17.0
pr-url: https://github.com/nodejs/node/pull/32278
description: The `transferList` option was introduced.
Expand Down Expand Up @@ -612,6 +616,12 @@ changes:
occur as described in the [HTML structured clone algorithm][], and an error
will be thrown if the object cannot be cloned (e.g. because it contains
`function`s).
* `trackUnmanagedFds` {boolean} If this is set to `true`, then the Worker will
track raw file descriptors managed through [`fs.open()`][] and
[`fs.close()`][], and close them when the Worker exits, similar to other
resources like network sockets or file descriptors managed through
the [`FileHandle`][] API. This option is automatically inherited by all
nested `Worker`s. **Default**: `false`.
* `transferList` {Object[]} If one or more `MessagePort`-like objects
are passed in `workerData`, a `transferList` is required for those
items or [`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`][] will be thrown.
Expand Down Expand Up @@ -816,6 +826,8 @@ active handle in the event system. If the worker is already `unref()`ed calling
[`WebAssembly.Module`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/WebAssembly/Module
[`Worker`]: #worker_threads_class_worker
[`cluster` module]: cluster.html
[`fs.open()`]: fs.html#fs_fs_open_path_flags_mode_callback
[`fs.close()`]: fs.html#fs_fs_close_fd_callback
[`port.on('message')`]: #worker_threads_event_message
[`port.onmessage()`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort/onmessage
[`port.postMessage()`]: #worker_threads_port_postmessage_value_transferlist
Expand Down
3 changes: 2 additions & 1 deletion lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ class Worker extends EventEmitter {
this[kHandle] = new WorkerImpl(url,
env === process.env ? null : env,
options.execArgv,
parseResourceLimits(options.resourceLimits));
parseResourceLimits(options.resourceLimits),
!!options.trackUnmanagedFds);
if (this[kHandle].invalidExecArgv) {
throw new ERR_WORKER_INVALID_EXEC_ARGV(this[kHandle].invalidExecArgv);
}
Expand Down
7 changes: 5 additions & 2 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ void Worker::Run() {
context,
std::move(argv_),
std::move(exec_argv_),
EnvironmentFlags::kNoFlags,
static_cast<EnvironmentFlags::Flags>(environment_flags_),
thread_id_,
std::move(inspector_parent_handle_)));
if (is_stopped()) return;
Expand Down Expand Up @@ -460,7 +460,6 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) {

std::vector<std::string> exec_argv_out;

CHECK_EQ(args.Length(), 4);
// Argument might be a string or URL
if (!args[0]->IsNullOrUndefined()) {
Utf8Value value(
Expand Down Expand Up @@ -586,6 +585,10 @@ void Worker::New(const FunctionCallbackInfo<Value>& args) {
CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
limit_info->CopyContents(worker->resource_limits_,
sizeof(worker->resource_limits_));

CHECK(args[4]->IsBoolean());
if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
}

void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
Expand Down
1 change: 1 addition & 0 deletions src/node_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class Worker : public AsyncWrap {
bool stopped_ = true;

bool has_ref_ = true;
uint64_t environment_flags_ = EnvironmentFlags::kNoFlags;

// The real Environment of the worker object. It has a lesser
// lifespan than the worker object itself - comes to life
Expand Down
69 changes: 69 additions & 0 deletions test/parallel/test-worker-track-unmanaged-fds.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { Worker } = require('worker_threads');
const { once } = require('events');
const fs = require('fs');

// All the tests here are run sequentially, to avoid accidentally opening an fd
// which another part of the test expects to be closed.

const preamble = `
const fs = require("fs");
const { parentPort } = require('worker_threads');
const __filename = ${JSON.stringify(__filename)};
process.on('warning', (warning) => parentPort.postMessage({ warning }));
`;

(async () => {
// Consistency check: Without trackUnmanagedFds, FDs are *not* closed.
{
const w = new Worker(`${preamble}
parentPort.postMessage(fs.openSync(__filename));
`, { eval: true, trackUnmanagedFds: false });
const [ [ fd ] ] = await Promise.all([once(w, 'message'), once(w, 'exit')]);
assert(fd > 2);
fs.fstatSync(fd); // Does not throw.
fs.closeSync(fd);
}

// With trackUnmanagedFds, FDs are closed automatically.
{
const w = new Worker(`${preamble}
parentPort.postMessage(fs.openSync(__filename));
`, { eval: true, trackUnmanagedFds: true });
const [ [ fd ] ] = await Promise.all([once(w, 'message'), once(w, 'exit')]);
assert(fd > 2);
assert.throws(() => fs.fstatSync(fd), { code: 'EBADF' });
}

// There is a warning when an fd is unexpectedly opened twice.
{
const w = new Worker(`${preamble}
parentPort.postMessage(fs.openSync(__filename));
parentPort.once('message', () => {
const reopened = fs.openSync(__filename);
fs.closeSync(reopened);
});
`, { eval: true, trackUnmanagedFds: true });
const [ fd ] = await once(w, 'message');
fs.closeSync(fd);
w.postMessage('');
const [ { warning } ] = await once(w, 'message');
assert.match(warning.message,
/File descriptor \d+ opened in unmanaged mode twice/);
}

// There is a warning when an fd is unexpectedly closed.
{
const w = new Worker(`${preamble}
parentPort.once('message', (fd) => {
fs.closeSync(fd);
});
`, { eval: true, trackUnmanagedFds: true });
w.postMessage(fs.openSync(__filename));
const [ { warning } ] = await once(w, 'message');
assert.match(warning.message,
/File descriptor \d+ closed but not opened in unmanaged mode/);
}
})().then(common.mustCall());

0 comments on commit b4819db

Please sign in to comment.