Skip to content

Commit

Permalink
lib: add webstreams to Duplex.from()
Browse files Browse the repository at this point in the history
Refs: nodejs#39519
PR-URL: nodejs#46190
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
  • Loading branch information
debadree25 committed Feb 27, 2023
1 parent 9933495 commit 7e23a16
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 15 deletions.
38 changes: 23 additions & 15 deletions lib/internal/streams/duplexify.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const {
const { destroyer } = require('internal/streams/destroy');
const Duplex = require('internal/streams/duplex');
const Readable = require('internal/streams/readable');
const Writable = require('internal/streams/writable');
const { createDeferredPromise } = require('internal/util');
const from = require('internal/streams/from');

Expand All @@ -32,6 +33,16 @@ const {
FunctionPrototypeCall
} = primordials;


const {
isBrandCheck,
} = require('internal/webstreams/util');

const isReadableStream =
isBrandCheck('ReadableStream');
const isWritableStream =
isBrandCheck('WritableStream');

// This is needed for pre node 17.
class Duplexify extends Duplex {
constructor(options) {
Expand Down Expand Up @@ -71,15 +82,13 @@ module.exports = function duplexify(body, name) {
return _duplexify({ writable: false, readable: false });
}

// TODO: Webstreams
// if (isReadableStream(body)) {
// return _duplexify({ readable: Readable.fromWeb(body) });
// }
if (isReadableStream(body)) {
return _duplexify({ readable: Readable.fromWeb(body) });
}

// TODO: Webstreams
// if (isWritableStream(body)) {
// return _duplexify({ writable: Writable.fromWeb(body) });
// }
if (isWritableStream(body)) {
return _duplexify({ writable: Writable.fromWeb(body) });
}

if (typeof body === 'function') {
const { value, write, final, destroy } = fromAsyncGen(body);
Expand Down Expand Up @@ -146,13 +155,12 @@ module.exports = function duplexify(body, name) {
});
}

// TODO: Webstreams.
// if (
// isReadableStream(body?.readable) &&
// isWritableStream(body?.writable)
// ) {
// return Duplexify.fromWeb(body);
// }
if (
isReadableStream(body?.readable) &&
isWritableStream(body?.writable)
) {
return Duplexify.fromWeb(body);
}

if (
typeof body?.writable === 'object' ||
Expand Down
102 changes: 102 additions & 0 deletions test/parallel/test-stream-duplex-from.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const common = require('../common');
const assert = require('assert');
const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream');
const { ReadableStream, WritableStream } = require('stream/web');
const { Blob } = require('buffer');

{
Expand Down Expand Up @@ -299,3 +300,104 @@ const { Blob } = require('buffer');
assert.strictEqual(res, 'foobar');
})).on('close', common.mustCall());
}

function makeATestReadableStream(value) {
return new ReadableStream({
start(controller) {
controller.enqueue(value);
controller.close();
}
});
}

function makeATestWritableStream(writeFunc) {
return new WritableStream({
write(chunk) {
writeFunc(chunk);
}
});
}

{
const d = Duplex.from({
readable: makeATestReadableStream('foo'),
});
assert.strictEqual(d.readable, true);
assert.strictEqual(d.writable, false);

d.on('data', common.mustCall((data) => {
assert.strictEqual(data.toString(), 'foo');
}));

d.on('end', common.mustCall(() => {
assert.strictEqual(d.readable, false);
}));
}

{
const d = Duplex.from(makeATestReadableStream('foo'));

assert.strictEqual(d.readable, true);
assert.strictEqual(d.writable, false);

d.on('data', common.mustCall((data) => {
assert.strictEqual(data.toString(), 'foo');
}));

d.on('end', common.mustCall(() => {
assert.strictEqual(d.readable, false);
}));
}

{
let ret = '';
const d = Duplex.from({
writable: makeATestWritableStream((chunk) => ret += chunk),
});

assert.strictEqual(d.readable, false);
assert.strictEqual(d.writable, true);

d.end('foo');
d.on('finish', common.mustCall(() => {
assert.strictEqual(ret, 'foo');
assert.strictEqual(d.writable, false);
}));
}

{
let ret = '';
const d = Duplex.from(makeATestWritableStream((chunk) => ret += chunk));

assert.strictEqual(d.readable, false);
assert.strictEqual(d.writable, true);

d.end('foo');
d.on('finish', common.mustCall(() => {
assert.strictEqual(ret, 'foo');
assert.strictEqual(d.writable, false);
}));
}

{
let ret = '';
const d = Duplex.from({
readable: makeATestReadableStream('foo'),
writable: makeATestWritableStream((chunk) => ret += chunk),
});

d.end('bar');

d.on('data', common.mustCall((data) => {
assert.strictEqual(data.toString(), 'foo');
}));

d.on('end', common.mustCall(() => {
assert.strictEqual(d.readable, false);
}));

d.on('finish', common.mustCall(() => {
assert.strictEqual(ret, 'bar');
assert.strictEqual(d.writable, false);
}));
}

0 comments on commit 7e23a16

Please sign in to comment.