Skip to content

Commit

Permalink
stream: implement streams to webstreams adapters
Browse files Browse the repository at this point in the history
Experimental adapters for the webstreams API

Signed-off-by: James M Snell <jasnell@gmail.com>

PR-URL: nodejs#39134
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
jasnell committed Jul 13, 2021
1 parent 09b57f7 commit a99c230
Show file tree
Hide file tree
Showing 13 changed files with 2,358 additions and 0 deletions.
81 changes: 81 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1892,6 +1892,87 @@ Calling `Readable.from(string)` or `Readable.from(buffer)` will not have
the strings or buffers be iterated to match the other streams semantics
for performance reasons.

### `stream.Readable.fromWeb(readableStream[, options])`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `readableStream` {ReadableStream}
* `options` {Object}
* `encoding` {string}
* `highWaterMark` {number}
* `objectModel` {boolean}
* `signal` {AbortSignal}
* Returns: {stream.Readable}

### `stream.Readable.toWeb(streamReadable)`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `streamReadable` {stream.Readable}
* Returns: {ReadableStream}

### `stream.Writable.fromWeb(writableStream[, options])`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `writableStream` {WritableStream}
* `options` {Object}
* `decodeStrings` {boolean}
* `highWaterMark` {number}
* `objectMode` {boolean}
* `signal` {AbortSignal}
* Returns: {stream.Writable}

### `stream.Writable.toWeb(streamWritable)`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `streamWritable` {stream.Writable}
* Returns: {WritableStream}

### `stream.Duplex.fromWeb(pair[, options])`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `pair` {Object}
* `readable` {ReadableStream}
* `writable` {WritableStream}
* `options` {Object}
* `allowHalfOpen` {boolean}
* `decodeStrings` {boolean}
* `encoding` {string}
* `highWaterMark` {number}
* `objectMode` {boolean}
* `signal` {AbortSignal}
* Returns: {stream.Duplex}

### `stream.Duplex.toWeb(streamDuplex)`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `streamDuplex` {stream.Duplex}
* Returns: {Object}
* `readable` {ReadableStream}
* `writable` {WritableStream}

### `stream.addAbortSignal(signal, stream)`
<!-- YAML
added: v15.4.0
Expand Down
2 changes: 2 additions & 0 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,8 @@ module.exports = {
appendFile,
readFile,
watch,

kHandle,
},

FileHandle,
Expand Down
19 changes: 19 additions & 0 deletions lib/internal/streams/duplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,22 @@ ObjectDefineProperties(Duplex.prototype, {
}
}
});

let webStreamsAdapters;

// Lazy to avoid circular references
function lazyWebStreams() {
if (webStreamsAdapters === undefined)
webStreamsAdapters = require('internal/webstreams/adapters');
return webStreamsAdapters;
}

Duplex.fromWeb = function(pair, options) {
return lazyWebStreams().newStreamDuplexFromReadableWritablePair(
pair,
options);
};

Duplex.toWeb = function(duplex) {
return lazyWebStreams().newReadableWritablePairFromDuplex(duplex);
};
19 changes: 19 additions & 0 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -1355,3 +1355,22 @@ function endWritableNT(state, stream) {
Readable.from = function(iterable, opts) {
return from(Readable, iterable, opts);
};

let webStreamsAdapters;

// Lazy to avoid circular references
function lazyWebStreams() {
if (webStreamsAdapters === undefined)
webStreamsAdapters = require('internal/webstreams/adapters');
return webStreamsAdapters;
}

Readable.fromWeb = function(readableStream, options) {
return lazyWebStreams().newStreamReadableFromReadableStream(
readableStream,
options);
};

Readable.toWeb = function(streamReadable) {
return lazyWebStreams().newStreamReadableFromReadableStream(streamReadable);
};
19 changes: 19 additions & 0 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -872,3 +872,22 @@ Writable.prototype._destroy = function(err, cb) {
Writable.prototype[EE.captureRejectionSymbol] = function(err) {
this.destroy(err);
};

let webStreamsAdapters;

// Lazy to avoid circular references
function lazyWebStreams() {
if (webStreamsAdapters === undefined)
webStreamsAdapters = require('internal/webstreams/adapters');
return webStreamsAdapters;
}

Writable.fromWeb = function(writableStream, options) {
return lazyWebStreams().newStreamWritableFromWritableStream(
writableStream,
options);
};

Writable.toWeb = function(streamWritable) {
return lazyWebStreams().newWritableStreamFromStreamWritable(streamWritable);
};
Loading

0 comments on commit a99c230

Please sign in to comment.