Skip to content

Commit

Permalink
stream: fix 0 transform hwm backpressure
Browse files Browse the repository at this point in the history
PR-URL: #43685
Refs: #42457
Refs: https://github.com/nodejs/node/pull/43648/files
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
  • Loading branch information
ronag authored and danielleadams committed Jul 26, 2022
1 parent 7b276b8 commit bf3991b
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 24 deletions.
27 changes: 23 additions & 4 deletions lib/internal/streams/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,15 @@

const {
ObjectSetPrototypeOf,
Symbol
Symbol,
} = primordials;

module.exports = Transform;
const {
ERR_METHOD_NOT_IMPLEMENTED
} = require('internal/errors').codes;
const Duplex = require('internal/streams/duplex');
const { getHighWaterMark } = require('internal/streams/state');
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
ObjectSetPrototypeOf(Transform, Duplex);

Expand All @@ -82,6 +83,26 @@ function Transform(options) {
if (!(this instanceof Transform))
return new Transform(options);

// TODO (ronag): This should preferably always be
// applied but would be semver-major. Or even better;
// make Transform a Readable with the Writable interface.
const readableHighWaterMark = options ? getHighWaterMark(this, options, 'readableHighWaterMark', true) : null;
if (readableHighWaterMark === 0) {
// A Duplex will buffer both on the writable and readable side while
// a Transform just wants to buffer hwm number of elements. To avoid
// buffering twice we disable buffering on the writable side.
options = {
...options,
highWaterMark: null,
readableHighWaterMark,
// TODO (ronag): 0 is not optimal since we have
// a "bug" where we check needDrain before calling _write and not after.
// Refs: https://github.com/nodejs/node/pull/32887
// Refs: https://github.com/nodejs/node/pull/35941
writableHighWaterMark: options.writableHighWaterMark || 0
};
}

Duplex.call(this, options);

// We have implemented the _read method, and done the other things
Expand Down Expand Up @@ -164,9 +185,7 @@ Transform.prototype._write = function(chunk, encoding, callback) {
if (
wState.ended || // Backwards compat.
length === rState.length || // Backwards compat.
rState.length < rState.highWaterMark ||
rState.highWaterMark === 0 ||
rState.length === 0
rState.length < rState.highWaterMark
) {
callback();
} else {
Expand Down
4 changes: 3 additions & 1 deletion test/parallel/test-stream-passthrough-drain.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { PassThrough } = require('stream');

const pt = new PassThrough({ highWaterMark: 0 });
pt.on('drain', common.mustCall());
pt.write('hello');
assert(!pt.write('hello1'));
pt.read();
pt.read();
28 changes: 28 additions & 0 deletions test/parallel/test-stream-transform-hwm0.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const { Transform } = require('stream');

const t = new Transform({
objectMode: true, highWaterMark: 0,
transform(chunk, enc, callback) {
process.nextTick(() => callback(null, chunk, enc));
}
});

assert.strictEqual(t.write(1), false);
t.on('drain', common.mustCall(() => {
assert.strictEqual(t.write(2), false);
t.end();
}));

t.once('readable', common.mustCall(() => {
assert.strictEqual(t.read(), 1);
setImmediate(common.mustCall(() => {
assert.strictEqual(t.read(), null);
t.once('readable', common.mustCall(() => {
assert.strictEqual(t.read(), 2);
}));
}));
}));
19 changes: 0 additions & 19 deletions test/parallel/test-stream-transform-split-highwatermark.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ testTransform(666, 777, {
writableHighWaterMark: 777,
});

// test 0 overriding defaultHwm
testTransform(0, DEFAULT, { readableHighWaterMark: 0 });
testTransform(DEFAULT, 0, { writableHighWaterMark: 0 });

// Test highWaterMark overriding
testTransform(555, 555, {
highWaterMark: 555,
Expand All @@ -39,21 +35,6 @@ testTransform(555, 555, {
writableHighWaterMark: 777,
});

// Test highWaterMark = 0 overriding
testTransform(0, 0, {
highWaterMark: 0,
readableHighWaterMark: 666,
});
testTransform(0, 0, {
highWaterMark: 0,
writableHighWaterMark: 777,
});
testTransform(0, 0, {
highWaterMark: 0,
readableHighWaterMark: 666,
writableHighWaterMark: 777,
});

// Test undefined, null
[undefined, null].forEach((v) => {
testTransform(DEFAULT, DEFAULT, { readableHighWaterMark: v });
Expand Down

0 comments on commit bf3991b

Please sign in to comment.