Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: support AbortSignal in constructor #36431

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1938,6 +1938,9 @@ method.
#### `new stream.Writable([options])`
<!-- YAML
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/36431
description: support passing in an AbortSignal.
- version: v14.0.0
pr-url: https://github.com/nodejs/node/pull/30623
description: Change `autoDestroy` option default to `true`.
Expand Down Expand Up @@ -1985,6 +1988,7 @@ changes:
[`stream._construct()`][writable-_construct] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `true`.
* `signal` {AbortSignal} A signal representing possible cancellation.

<!-- eslint-disable no-useless-constructor -->
```js
Expand Down Expand Up @@ -2028,6 +2032,27 @@ const myWritable = new Writable({
});
```

Calling `abort` on the `AbortController` corresponding to the passed
`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
on the writeable stream.

```js
const { Writable } = require('stream');

const controller = new AbortController();
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
signal: controller.signal
});
// Later, abort the operation closing the stream
controller.abort();

```
#### `writable._construct(callback)`
<!-- YAML
added: v15.0.0
Expand Down Expand Up @@ -2276,6 +2301,9 @@ constructor and implement the [`readable._read()`][] method.
#### `new stream.Readable([options])`
<!-- YAML
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/36431
description: support passing in an AbortSignal.
- version: v14.0.0
pr-url: https://github.com/nodejs/node/pull/30623
description: Change `autoDestroy` option default to `true`.
Expand Down Expand Up @@ -2306,6 +2334,7 @@ changes:
[`stream._construct()`][readable-_construct] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `true`.
* `signal` {AbortSignal} A signal representing possible cancellation.

<!-- eslint-disable no-useless-constructor -->
```js
Expand Down Expand Up @@ -2346,6 +2375,23 @@ const myReadable = new Readable({
});
```

Calling `abort` on the `AbortController` corresponding to the passed
`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
on the readable created.

```js
const fs = require('fs');
const controller = new AbortController();
const read = new Readable({
read(size) {
// ...
},
signal: controller.signal
});
// Later, abort the operation closing the stream
controller.abort();
```

#### `readable._construct(callback)`
<!-- YAML
added: v15.0.0
Expand Down
15 changes: 10 additions & 5 deletions lib/internal/streams/add-abort-signal.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ const eos = require('internal/streams/end-of-stream');
const { ERR_INVALID_ARG_TYPE } = codes;

// This method is inlined here for readable-stream
// It also does not allow for signal to not exist on the steam
// https://github.com/nodejs/node/pull/36061#discussion_r533718029
const validateAbortSignal = (signal, name) => {
if (signal !== undefined &&
(signal === null ||
typeof signal !== 'object' ||
!('aborted' in signal))) {
if (typeof signal !== 'object' ||
!('aborted' in signal)) {
throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal);
}
};
Expand All @@ -23,11 +22,17 @@ function isStream(obj) {
return !!(obj && typeof obj.pipe === 'function');
}

module.exports = function addAbortSignal(signal, stream) {
module.exports.addAbortSignal = function addAbortSignal(signal, stream) {
validateAbortSignal(signal, 'signal');
if (!isStream(stream)) {
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream);
}
return module.exports.addAbortSignalNoValidate(signal, stream);
};
module.exports.addAbortSignalNoValidate = function(signal, stream) {
benjamingr marked this conversation as resolved.
Show resolved Hide resolved
if (typeof signal !== 'object' || !('aborted' in signal)) {
return stream;
}
const onAbort = () => {
stream.destroy(new AbortError());
};
Expand Down
6 changes: 6 additions & 0 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ const EE = require('events');
const { Stream, prependListener } = require('internal/streams/legacy');
const { Buffer } = require('buffer');

const {
addAbortSignalNoValidate,
} = require('internal/streams/add-abort-signal');

let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
debug = fn;
});
Expand Down Expand Up @@ -192,6 +196,8 @@ function Readable(options) {

if (typeof options.construct === 'function')
this._construct = options.construct;
if (options.signal && !isDuplex)
addAbortSignalNoValidate(options.signal, this);
}

Stream.call(this, options);
Expand Down
7 changes: 7 additions & 0 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ const EE = require('events');
const Stream = require('internal/streams/legacy').Stream;
const { Buffer } = require('buffer');
const destroyImpl = require('internal/streams/destroy');

const {
addAbortSignalNoValidate,
} = require('internal/streams/add-abort-signal');

const {
getHighWaterMark,
getDefaultHighWaterMark
Expand Down Expand Up @@ -263,6 +268,8 @@ function Writable(options) {

if (typeof options.construct === 'function')
this._construct = options.construct;
if (options.signal)
benjamingr marked this conversation as resolved.
Show resolved Hide resolved
addAbortSignalNoValidate(options.signal, this);
}

Stream.call(this, options);
Expand Down
3 changes: 2 additions & 1 deletion lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ Stream.Duplex = require('internal/streams/duplex');
Stream.Transform = require('internal/streams/transform');
Stream.PassThrough = require('internal/streams/passthrough');
Stream.pipeline = pipeline;
Stream.addAbortSignal = require('internal/streams/add-abort-signal');
const { addAbortSignal } = require('internal/streams/add-abort-signal');
Stream.addAbortSignal = addAbortSignal;
Stream.finished = eos;

function lazyLoadPromises() {
Expand Down
17 changes: 17 additions & 0 deletions test/parallel/test-stream-duplex-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,20 @@ const assert = require('assert');
});
duplex.on('close', common.mustCall());
}
{
// Check abort signal
const controller = new AbortController();
const { signal } = controller;
const duplex = new Duplex({
write(chunk, enc, cb) { cb(); },
read() {},
signal,
});
let count = 0;
duplex.on('error', common.mustCall((e) => {
assert.strictEqual(count++, 0); // Ensure not called twice
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mustCall already handles this doesn't it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jasnell I want to make sure error is called, but not called twice - I'd need a comon.mustCallOnce or something.

assert.strictEqual(e.name, 'AbortError');
}));
duplex.on('close', common.mustCall());
controller.abort();
}
16 changes: 16 additions & 0 deletions test/parallel/test-stream-readable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,22 @@ const assert = require('assert');
read.on('data', common.mustNotCall());
}

{
const controller = new AbortController();
const read = new Readable({
signal: controller.signal,
read() {
this.push('asd');
},
});

read.on('error', common.mustCall((e) => {
assert.strictEqual(e.name, 'AbortError');
}));
controller.abort();
read.on('data', common.mustNotCall());
}

{
const controller = new AbortController();
const read = addAbortSignal(controller.signal, new Readable({
Expand Down
15 changes: 15 additions & 0 deletions test/parallel/test-stream-writable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -431,3 +431,18 @@ const assert = require('assert');
write.write('asd');
ac.abort();
}

{
const ac = new AbortController();
const write = new Writable({
signal: ac.signal,
write(chunk, enc, cb) { cb(); }
});

write.on('error', common.mustCall((e) => {
assert.strictEqual(e.name, 'AbortError');
assert.strictEqual(write.destroyed, true);
}));
write.write('asd');
ac.abort();
}