From bca23b9e16c2780070d5682d567dba1cb9107153 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 20 Dec 2019 15:49:24 +0100 Subject: [PATCH] stream: reset flowing state if no 'readable' or 'data' listeners If we don't have any 'readable' or 'data' listeners and we are not about to resume. Then reset flowing state to initial null state. PR-URL: https://github.com/nodejs/node/pull/31036 Fixes: https://github.com/nodejs/node/issues/24474 Reviewed-By: Luigi Pinca Reviewed-By: Matteo Collina Reviewed-By: Rich Trott --- lib/_stream_readable.js | 26 +++++++++++++++---- test/parallel/test-stream-readable-data.js | 19 ++++++++++++++ .../test-stream-readable-pause-and-resume.js | 18 +++++++++++++ 3 files changed, 58 insertions(+), 5 deletions(-) create mode 100644 test/parallel/test-stream-readable-data.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index acdd76f8ded9fb..089bfd0c49b7d2 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -28,6 +28,7 @@ const { ObjectDefineProperty, ObjectSetPrototypeOf, SymbolAsyncIterator, + Symbol } = primordials; module.exports = Readable; @@ -51,6 +52,8 @@ const { ERR_STREAM_UNSHIFT_AFTER_END_EVENT } = require('internal/errors').codes; +const kPaused = Symbol('kPaused'); + // Lazy loaded to improve the startup performance. let StringDecoder; let createReadableStreamAsyncIterator; @@ -126,7 +129,7 @@ function ReadableState(options, stream, isDuplex) { this.emittedReadable = false; this.readableListening = false; this.resumeScheduled = false; - this.paused = true; + this[kPaused] = null; // True if the error was already emitted and should not be thrown again this.errorEmitted = false; @@ -170,6 +173,16 @@ ObjectDefineProperty(ReadableState.prototype, 'pipesCount', { } }); +// Legacy property for `paused` +ObjectDefineProperty(ReadableState.prototype, 'paused', { + get() { + return this[kPaused] !== false; + }, + set(value) { + this[kPaused] = !!value; + } +}); + function Readable(options) { if (!(this instanceof Readable)) return new Readable(options); @@ -365,7 +378,8 @@ function chunkInvalid(state, chunk) { Readable.prototype.isPaused = function() { - return this._readableState.flowing === false; + const state = this._readableState; + return state[kPaused] === true || state.flowing === false; }; // Backwards compatibility. @@ -962,7 +976,7 @@ function updateReadableListening(self) { const state = self._readableState; state.readableListening = self.listenerCount('readable') > 0; - if (state.resumeScheduled && !state.paused) { + if (state.resumeScheduled && state[kPaused] === false) { // Flowing needs to be set to true now, otherwise // the upcoming resume will not flow. state.flowing = true; @@ -970,6 +984,8 @@ function updateReadableListening(self) { // Crude way to check if we should resume } else if (self.listenerCount('data') > 0) { self.resume(); + } else if (!state.readableListening) { + state.flowing = null; } } @@ -990,7 +1006,7 @@ Readable.prototype.resume = function() { state.flowing = !state.readableListening; resume(this, state); } - state.paused = false; + state[kPaused] = false; return this; }; @@ -1021,7 +1037,7 @@ Readable.prototype.pause = function() { this._readableState.flowing = false; this.emit('pause'); } - this._readableState.paused = true; + this._readableState[kPaused] = true; return this; }; diff --git a/test/parallel/test-stream-readable-data.js b/test/parallel/test-stream-readable-data.js new file mode 100644 index 00000000000000..277adddde63584 --- /dev/null +++ b/test/parallel/test-stream-readable-data.js @@ -0,0 +1,19 @@ +'use strict'; +const common = require('../common'); + +const { Readable } = require('stream'); + +const readable = new Readable({ + read() {} +}); + +function read() {} + +readable.setEncoding('utf8'); +readable.on('readable', read); +readable.removeListener('readable', read); + +process.nextTick(function() { + readable.on('data', common.mustCall()); + readable.push('hello'); +}); diff --git a/test/parallel/test-stream-readable-pause-and-resume.js b/test/parallel/test-stream-readable-pause-and-resume.js index 4d7d860a6373d4..294ef2c35d4608 100644 --- a/test/parallel/test-stream-readable-pause-and-resume.js +++ b/test/parallel/test-stream-readable-pause-and-resume.js @@ -1,6 +1,7 @@ 'use strict'; const common = require('../common'); +const assert = require('assert'); const { Readable } = require('stream'); let ticks = 18; @@ -38,3 +39,20 @@ function readAndPause() { rs.on('data', ondata); } + +{ + const readable = new Readable({ + read() {} + }); + + function read() {} + + readable.setEncoding('utf8'); + readable.on('readable', read); + readable.removeListener('readable', read); + readable.pause(); + + process.nextTick(function() { + assert(readable.isPaused()); + }); +}