Skip to content

Commit

Permalink
stream: add setter & getter for default highWaterMark (#46929)
Browse files Browse the repository at this point in the history
Adds stream.(get|set)DefaultHighWaterMark to read or update
the default hwm.

PR-URL: #46929
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Paolo Insogna <paolo@cowtech.it>
Reviewed-By: Moshe Atlow <moshe@atlow.co.il>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Michael Dawson <midawson@redhat.com>
Reviewed-By: Erick Wendel <erick.workspace@gmail.com>
  • Loading branch information
ronag authored and RafaelGSS committed Apr 6, 2023
1 parent 7fc2c57 commit 51b7f8e
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 5 deletions.
23 changes: 23 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -3346,6 +3346,29 @@ reader.read().then(({ value, done }) => {
});
```

### `stream.getDefaultHighWaterMark(objectMode)`

<!-- YAML
added: REPLACEME
-->

* {boolean} objectMode
* Returns: {integer}

Returns the default highWaterMark used by streams.
Defaults to `16384` (16 KiB), or `16` for `objectMode`.

### `stream.setDefaultHighWaterMark(objectMode, value)`

<!-- YAML
added: REPLACEME
-->

* {boolean} objectMode
* {integer} highWaterMark value

Sets the default highWaterMark used by streams.

## API for stream implementers

<!--type=misc-->
Expand Down
8 changes: 4 additions & 4 deletions lib/_http_outgoing.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,11 @@ let debug = require('internal/util/debuglog').debuglog('http', (fn) => {
debug = fn;
});

const HIGH_WATER_MARK = getDefaultHighWaterMark();

const kCorked = Symbol('corked');
const kUniqueHeaders = Symbol('kUniqueHeaders');
const kBytesWritten = Symbol('kBytesWritten');
const kErrored = Symbol('errored');
const kHighWaterMark = Symbol('kHighWaterMark');

const nop = () => {};

Expand Down Expand Up @@ -150,6 +149,7 @@ function OutgoingMessage() {
this._onPendingData = nop;

this[kErrored] = null;
this[kHighWaterMark] = getDefaultHighWaterMark();
}
ObjectSetPrototypeOf(OutgoingMessage.prototype, Stream.prototype);
ObjectSetPrototypeOf(OutgoingMessage, Stream);
Expand Down Expand Up @@ -196,7 +196,7 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableLength', {
ObjectDefineProperty(OutgoingMessage.prototype, 'writableHighWaterMark', {
__proto__: null,
get() {
return this.socket ? this.socket.writableHighWaterMark : HIGH_WATER_MARK;
return this.socket ? this.socket.writableHighWaterMark : this[kHighWaterMark];
},
});

Expand Down Expand Up @@ -403,7 +403,7 @@ function _writeRaw(data, encoding, callback, size) {
this.outputData.push({ data, encoding, callback });
this.outputSize += data.length;
this._onPendingData(data.length);
return this.outputSize < HIGH_WATER_MARK;
return this.outputSize < this[kHighWaterMark];
}


Expand Down
16 changes: 15 additions & 1 deletion lib/internal/streams/state.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,29 @@ const {
MathFloor,
NumberIsInteger,
} = primordials;
const { validateInteger } = require('internal/validators');

const { ERR_INVALID_ARG_VALUE } = require('internal/errors').codes;

let defaultHighWaterMarkBytes = 16 * 1024;
let defaultHighWaterMarkObjectMode = 16;

function highWaterMarkFrom(options, isDuplex, duplexKey) {
return options.highWaterMark != null ? options.highWaterMark :
isDuplex ? options[duplexKey] : null;
}

function getDefaultHighWaterMark(objectMode) {
return objectMode ? 16 : 16 * 1024;
return objectMode ? defaultHighWaterMarkObjectMode : defaultHighWaterMarkBytes;
}

function setDefaultHighWaterMark(objectMode, value) {
validateInteger(value, 'value', 0);
if (objectMode) {
defaultHighWaterMarkObjectMode = value;
} else {
defaultHighWaterMarkBytes = value;
}
}

function getHighWaterMark(state, options, duplexKey, isDuplex) {
Expand All @@ -33,4 +46,5 @@ function getHighWaterMark(state, options, duplexKey, isDuplex) {
module.exports = {
getHighWaterMark,
getDefaultHighWaterMark,
setDefaultHighWaterMark,
};
3 changes: 3 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const {
},
} = require('internal/errors');
const compose = require('internal/streams/compose');
const { setDefaultHighWaterMark, getDefaultHighWaterMark } = require('internal/streams/state');
const { pipeline } = require('internal/streams/pipeline');
const { destroyer } = require('internal/streams/destroy');
const eos = require('internal/streams/end-of-stream');
Expand Down Expand Up @@ -105,6 +106,8 @@ Stream.addAbortSignal = addAbortSignal;
Stream.finished = eos;
Stream.destroy = destroyer;
Stream.compose = compose;
Stream.setDefaultHighWaterMark = setDefaultHighWaterMark;
Stream.getDefaultHighWaterMark = getDefaultHighWaterMark;

ObjectDefineProperty(Stream, 'promises', {
__proto__: null,
Expand Down
36 changes: 36 additions & 0 deletions test/parallel/test-stream-set-default-hwm.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
'use strict';

require('../common');

const assert = require('node:assert');
const {
setDefaultHighWaterMark,
getDefaultHighWaterMark,
Writable,
Readable,
Transform
} = require('stream');

assert.notStrictEqual(getDefaultHighWaterMark(false), 32 * 1000);
setDefaultHighWaterMark(false, 32 * 1000);
assert.strictEqual(getDefaultHighWaterMark(false), 32 * 1000);

assert.notStrictEqual(getDefaultHighWaterMark(true), 32);
setDefaultHighWaterMark(true, 32);
assert.strictEqual(getDefaultHighWaterMark(true), 32);

const w = new Writable({
write() {}
});
assert.strictEqual(w.writableHighWaterMark, 32 * 1000);

const r = new Readable({
read() {}
});
assert.strictEqual(r.readableHighWaterMark, 32 * 1000);

const t = new Transform({
transform() {}
});
assert.strictEqual(t.writableHighWaterMark, 32 * 1000);
assert.strictEqual(t.readableHighWaterMark, 32 * 1000);

0 comments on commit 51b7f8e

Please sign in to comment.