From e70f3f31dfdf5823dbc2044bc45bd3a4df366e47 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 20 Dec 2019 15:49:24 +0100 Subject: [PATCH 1/5] stream: reset flowing state if no 'readable' or 'data' listeners. If we don't have any 'readable' or 'data' listeners and we are not about to resume. Then reset flowing state to initial null state. Fixes: https://github.com/nodejs/node/issues/24474 --- lib/_stream_readable.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 63efff40373014..bdfd7dc5285910 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -975,6 +975,8 @@ function updateReadableListening(self) { // Crude way to check if we should resume } else if (self.listenerCount('data') > 0) { self.resume(); + } else if (!state.readableListening) { + state.flowing = null; } } From 1514fb96b2bd564003e37520aa772463c156c176 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 20 Dec 2019 16:19:57 +0100 Subject: [PATCH 2/5] fixup: add test --- test/parallel/test-stream-readable-data.js | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 test/parallel/test-stream-readable-data.js diff --git a/test/parallel/test-stream-readable-data.js b/test/parallel/test-stream-readable-data.js new file mode 100644 index 00000000000000..277adddde63584 --- /dev/null +++ b/test/parallel/test-stream-readable-data.js @@ -0,0 +1,19 @@ +'use strict'; +const common = require('../common'); + +const { Readable } = require('stream'); + +const readable = new Readable({ + read() {} +}); + +function read() {} + +readable.setEncoding('utf8'); +readable.on('readable', read); +readable.removeListener('readable', read); + +process.nextTick(function() { + readable.on('data', common.mustCall()); + readable.push('hello'); +}); From 2a2782644cb3c41c2f23ab0bda3018db1449ff51 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 21 Dec 2019 08:44:51 +0100 Subject: [PATCH 3/5] fixup: pasued --- lib/_stream_readable.js | 6 +++--- .../test-stream-readable-pause-and-resume.js | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index bdfd7dc5285910..9505ddae49624e 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -126,7 +126,7 @@ function ReadableState(options, stream, isDuplex) { this.emittedReadable = false; this.readableListening = false; this.resumeScheduled = false; - this.paused = true; + this.paused = null; // True if the error was already emitted and should not be thrown again this.errorEmitted = false; @@ -368,7 +368,7 @@ function chunkInvalid(state, chunk) { Readable.prototype.isPaused = function() { - return this._readableState.flowing === false; + return this._readableState.paused === true; }; // Backwards compatibility. @@ -967,7 +967,7 @@ function updateReadableListening(self) { const state = self._readableState; state.readableListening = self.listenerCount('readable') > 0; - if (state.resumeScheduled && !state.paused) { + if (state.resumeScheduled && state.paused === false) { // Flowing needs to be set to true now, otherwise // the upcoming resume will not flow. state.flowing = true; diff --git a/test/parallel/test-stream-readable-pause-and-resume.js b/test/parallel/test-stream-readable-pause-and-resume.js index 4d7d860a6373d4..5d4266117b7fad 100644 --- a/test/parallel/test-stream-readable-pause-and-resume.js +++ b/test/parallel/test-stream-readable-pause-and-resume.js @@ -1,6 +1,7 @@ 'use strict'; const common = require('../common'); +const assert = require('assert'); const { Readable } = require('stream'); let ticks = 18; @@ -38,3 +39,20 @@ function readAndPause() { rs.on('data', ondata); } + +{ + const readable = new Readable({ + read() {} + }); + + function read() {} + + readable.setEncoding('utf8'); + readable.on('readable', read); + readable.removeListener('readable', read); + readable.pause(); + + process.nextTick(function() { + assert(readable.isPaused()); // Throws. + }); +} From 906fd310dab6eee15e795393361040360ba15fec Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 21 Dec 2019 08:49:35 +0100 Subject: [PATCH 4/5] fixup: compat paused --- lib/_stream_readable.js | 23 +++++++++++++++---- .../test-stream-readable-pause-and-resume.js | 2 +- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 9505ddae49624e..165bf53c09b998 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -28,6 +28,7 @@ const { ObjectDefineProperty, ObjectSetPrototypeOf, SymbolAsyncIterator, + Symbol } = primordials; module.exports = Readable; @@ -51,6 +52,8 @@ const { ERR_STREAM_UNSHIFT_AFTER_END_EVENT } = require('internal/errors').codes; +const kPaused = Symbol('kPaused'); + // Lazy loaded to improve the startup performance. let StringDecoder; let createReadableStreamAsyncIterator; @@ -126,7 +129,7 @@ function ReadableState(options, stream, isDuplex) { this.emittedReadable = false; this.readableListening = false; this.resumeScheduled = false; - this.paused = null; + this[kPaused] = null; // True if the error was already emitted and should not be thrown again this.errorEmitted = false; @@ -173,6 +176,16 @@ ObjectDefineProperty(ReadableState.prototype, 'pipesCount', { } }); +// Legacy property for `paused` +ObjectDefineProperty(ReadableState.prototype, 'paused', { + get() { + return this[kPaused] !== false; + }, + set(value) { + this[kPaused] = Boolean(value); + } +}); + function Readable(options) { if (!(this instanceof Readable)) return new Readable(options); @@ -368,7 +381,7 @@ function chunkInvalid(state, chunk) { Readable.prototype.isPaused = function() { - return this._readableState.paused === true; + return this._readableState[kPaused] === true; }; // Backwards compatibility. @@ -967,7 +980,7 @@ function updateReadableListening(self) { const state = self._readableState; state.readableListening = self.listenerCount('readable') > 0; - if (state.resumeScheduled && state.paused === false) { + if (state.resumeScheduled && state[kPaused] === false) { // Flowing needs to be set to true now, otherwise // the upcoming resume will not flow. state.flowing = true; @@ -997,7 +1010,7 @@ Readable.prototype.resume = function() { state.flowing = !state.readableListening; resume(this, state); } - state.paused = false; + state[kPaused] = false; return this; }; @@ -1028,7 +1041,7 @@ Readable.prototype.pause = function() { this._readableState.flowing = false; this.emit('pause'); } - this._readableState.paused = true; + this._readableState[kPaused] = true; return this; }; diff --git a/test/parallel/test-stream-readable-pause-and-resume.js b/test/parallel/test-stream-readable-pause-and-resume.js index 5d4266117b7fad..294ef2c35d4608 100644 --- a/test/parallel/test-stream-readable-pause-and-resume.js +++ b/test/parallel/test-stream-readable-pause-and-resume.js @@ -53,6 +53,6 @@ function readAndPause() { readable.pause(); process.nextTick(function() { - assert(readable.isPaused()); // Throws. + assert(readable.isPaused()); }); } From a080fd1dc96fde4d52ef2edda6a0bd65f265b401 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 21 Dec 2019 10:36:56 +0100 Subject: [PATCH 5/5] fixup: paused --- lib/_stream_readable.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 165bf53c09b998..bda03cc1529892 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -182,7 +182,7 @@ ObjectDefineProperty(ReadableState.prototype, 'paused', { return this[kPaused] !== false; }, set(value) { - this[kPaused] = Boolean(value); + this[kPaused] = !!value; } }); @@ -381,7 +381,8 @@ function chunkInvalid(state, chunk) { Readable.prototype.isPaused = function() { - return this._readableState[kPaused] === true; + const state = this._readableState; + return state[kPaused] === true || state.flowing === false; }; // Backwards compatibility.