From e95e7f9af540c88e93701ff6ca2f38986deb8e99 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 11 Feb 2019 13:20:26 +0100 Subject: [PATCH] stream: make sure 'readable' is emitted before ending the stream Fixes: https://github.com/nodejs/node/issues/25810 PR-URL: https://github.com/nodejs/node/pull/26059 Reviewed-By: James M Snell Reviewed-By: Luigi Pinca --- lib/_stream_readable.js | 18 +-- lib/_stream_transform.js | 3 - .../parallel/test-http-readable-data-event.js | 4 +- ...eam-readable-emit-readable-short-stream.js | 146 ++++++++++++++++++ .../test-stream-readable-emittedReadable.js | 13 +- .../test-stream-readable-needReadable.js | 10 +- ...est-stream-readable-reading-readingMore.js | 10 +- .../test-stream2-httpclient-response-end.js | 7 +- test/parallel/test-stream2-transform.js | 13 +- 9 files changed, 191 insertions(+), 33 deletions(-) create mode 100644 test/parallel/test-stream-readable-emit-readable-short-stream.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 861c262e15..85d1fb0c33 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -505,20 +505,12 @@ function onEofChunk(stream, state) { } } state.ended = true; + state.needReadable = false; - if (state.sync) { - // If we are sync, wait until next tick to emit the data. - // Otherwise we risk emitting data in the flow() - // the readable code triggers during a read() call - emitReadable(stream); - } else { - // Emit 'readable' now to make sure it gets picked up. - state.needReadable = false; - if (!state.emittedReadable) { - state.emittedReadable = true; - emitReadable_(stream); - } - } + // We are not protecting if emittedReadable = true, + // so 'readable' gets scheduled anyway. + state.emittedReadable = true; + process.nextTick(emitReadable_, stream); } // Don't emit readable right away in sync mode, because this can trigger diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 9d8da0c547..8745c71341 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -116,9 +116,6 @@ function Transform(options) { writeencoding: null }; - // Start out asking for a readable event once data is transformed. - this._readableState.needReadable = true; - // We have implemented the _read method, and done the other things // that Readable wants before the first _read call, so unset the // sync guard flag. diff --git a/test/parallel/test-http-readable-data-event.js b/test/parallel/test-http-readable-data-event.js index ddaeca5fe7..21b1fa65c6 100644 --- a/test/parallel/test-http-readable-data-event.js +++ b/test/parallel/test-http-readable-data-event.js @@ -29,7 +29,7 @@ const server = http.createServer((req, res) => { }; const expectedData = [helloWorld, helloAgainLater]; - const expectedRead = [helloWorld, null, helloAgainLater, null]; + const expectedRead = [helloWorld, null, helloAgainLater, null, null]; const req = http.request(opts, (res) => { res.on('error', common.mustNotCall()); @@ -42,7 +42,7 @@ const server = http.createServer((req, res) => { assert.strictEqual(data, expectedRead.shift()); next(); } while (data !== null); - }, 2)); + }, 3)); res.setEncoding('utf8'); res.on('data', common.mustCall((data) => { diff --git a/test/parallel/test-stream-readable-emit-readable-short-stream.js b/test/parallel/test-stream-readable-emit-readable-short-stream.js new file mode 100644 index 0000000000..2f4f43baf5 --- /dev/null +++ b/test/parallel/test-stream-readable-emit-readable-short-stream.js @@ -0,0 +1,146 @@ +'use strict'; + +const common = require('../common'); +const stream = require('stream'); +const assert = require('assert'); + +{ + const r = new stream.Readable({ + read: common.mustCall(function() { + this.push('content'); + this.push(null); + }) + }); + + const t = new stream.Transform({ + transform: common.mustCall(function(chunk, encoding, callback) { + this.push(chunk); + return callback(); + }), + flush: common.mustCall(function(callback) { + return callback(); + }) + }); + + r.pipe(t); + t.on('readable', common.mustCall(function() { + while (true) { + const chunk = t.read(); + if (!chunk) + break; + + assert.strictEqual(chunk.toString(), 'content'); + } + }, 2)); +} + +{ + const t = new stream.Transform({ + transform: common.mustCall(function(chunk, encoding, callback) { + this.push(chunk); + return callback(); + }), + flush: common.mustCall(function(callback) { + return callback(); + }) + }); + + t.end('content'); + + t.on('readable', common.mustCall(function() { + while (true) { + const chunk = t.read(); + if (!chunk) + break; + assert.strictEqual(chunk.toString(), 'content'); + } + }, 2)); +} + +{ + const t = new stream.Transform({ + transform: common.mustCall(function(chunk, encoding, callback) { + this.push(chunk); + return callback(); + }), + flush: common.mustCall(function(callback) { + return callback(); + }) + }); + + t.write('content'); + t.end(); + + t.on('readable', common.mustCall(function() { + while (true) { + const chunk = t.read(); + if (!chunk) + break; + assert.strictEqual(chunk.toString(), 'content'); + } + }, 2)); +} + +{ + const t = new stream.Readable({ + read() { + } + }); + + t.on('readable', common.mustCall(function() { + while (true) { + const chunk = t.read(); + if (!chunk) + break; + assert.strictEqual(chunk.toString(), 'content'); + } + }, 2)); + + t.push('content'); + t.push(null); +} + +{ + const t = new stream.Readable({ + read() { + } + }); + + t.on('readable', common.mustCall(function() { + while (true) { + const chunk = t.read(); + if (!chunk) + break; + assert.strictEqual(chunk.toString(), 'content'); + } + }, 2)); + + process.nextTick(() => { + t.push('content'); + t.push(null); + }); +} + +{ + const t = new stream.Transform({ + transform: common.mustCall(function(chunk, encoding, callback) { + this.push(chunk); + return callback(); + }), + flush: common.mustCall(function(callback) { + return callback(); + }) + }); + + t.on('readable', common.mustCall(function() { + while (true) { + const chunk = t.read(); + if (!chunk) + break; + assert.strictEqual(chunk.toString(), 'content'); + } + }, 2)); + + t.write('content'); + t.end(); +} diff --git a/test/parallel/test-stream-readable-emittedReadable.js b/test/parallel/test-stream-readable-emittedReadable.js index 5b9affc59f..167904e592 100644 --- a/test/parallel/test-stream-readable-emittedReadable.js +++ b/test/parallel/test-stream-readable-emittedReadable.js @@ -43,12 +43,23 @@ const noRead = new Readable({ read: () => {} }); -noRead.on('readable', common.mustCall(() => { +noRead.once('readable', common.mustCall(() => { // emittedReadable should be true when the readable event is emitted assert.strictEqual(noRead._readableState.emittedReadable, true); noRead.read(0); // emittedReadable is not reset during read(0) assert.strictEqual(noRead._readableState.emittedReadable, true); + + noRead.on('readable', common.mustCall(() => { + // The second 'readable' is emitted because we are ending + + // emittedReadable should be true when the readable event is emitted + assert.strictEqual(noRead._readableState.emittedReadable, false); + noRead.read(0); + // emittedReadable is not reset during read(0) + assert.strictEqual(noRead._readableState.emittedReadable, false); + + })); })); noRead.push('foo'); diff --git a/test/parallel/test-stream-readable-needReadable.js b/test/parallel/test-stream-readable-needReadable.js index 7058e123f0..54618b5e8a 100644 --- a/test/parallel/test-stream-readable-needReadable.js +++ b/test/parallel/test-stream-readable-needReadable.js @@ -14,7 +14,7 @@ readable.on('readable', common.mustCall(() => { // When the readable event fires, needReadable is reset. assert.strictEqual(readable._readableState.needReadable, false); readable.read(); -})); +}, 2)); // If a readable listener is attached, then a readable event is needed. assert.strictEqual(readable._readableState.needReadable, true); @@ -74,12 +74,14 @@ const slowProducer = new Readable({ }); slowProducer.on('readable', common.mustCall(() => { - if (slowProducer.read(8) === null) { + const chunk = slowProducer.read(8); + const state = slowProducer._readableState; + if (chunk === null) { // The buffer doesn't have enough data, and the stream is not need, // we need to notify the reader when data arrives. - assert.strictEqual(slowProducer._readableState.needReadable, true); + assert.strictEqual(state.needReadable, true); } else { - assert.strictEqual(slowProducer._readableState.needReadable, false); + assert.strictEqual(state.needReadable, false); } }, 4)); diff --git a/test/parallel/test-stream-readable-reading-readingMore.js b/test/parallel/test-stream-readable-reading-readingMore.js index 2198a889f0..a2a0665808 100644 --- a/test/parallel/test-stream-readable-reading-readingMore.js +++ b/test/parallel/test-stream-readable-reading-readingMore.js @@ -31,7 +31,7 @@ const Readable = require('stream').Readable; assert.strictEqual(state.reading, false); } - const expectedReadingMore = [true, false]; + const expectedReadingMore = [true, false, false]; readable.on('readable', common.mustCall(() => { // There is only one readingMore scheduled from on('data'), // after which everything is governed by the .read() call @@ -40,10 +40,12 @@ const Readable = require('stream').Readable; // If the stream has ended, we shouldn't be reading assert.strictEqual(state.ended, !state.reading); - const data = readable.read(); - if (data === null) // reached end of stream + // consume all the data + while (readable.read() !== null) {} + + if (expectedReadingMore.length === 0) // reached end of stream process.nextTick(common.mustCall(onStreamEnd, 1)); - }, 2)); + }, 3)); readable.on('end', common.mustCall(onStreamEnd)); readable.push('pushed'); diff --git a/test/parallel/test-stream2-httpclient-response-end.js b/test/parallel/test-stream2-httpclient-response-end.js index 27d31e50a9..8b2920668c 100644 --- a/test/parallel/test-stream2-httpclient-response-end.js +++ b/test/parallel/test-stream2-httpclient-response-end.js @@ -11,8 +11,11 @@ const server = http.createServer(function(req, res) { let data = ''; res.on('readable', common.mustCall(function() { console.log('readable event'); - data += res.read(); - })); + let chunk; + while ((chunk = res.read()) !== null) { + data += chunk; + } + }, 2)); res.on('end', common.mustCall(function() { console.log('end event'); assert.strictEqual(msg, data); diff --git a/test/parallel/test-stream2-transform.js b/test/parallel/test-stream2-transform.js index 92e4669dd4..d7611bb2c4 100644 --- a/test/parallel/test-stream2-transform.js +++ b/test/parallel/test-stream2-transform.js @@ -321,11 +321,16 @@ const Transform = require('_stream_transform'); pt.end(); - assert.strictEqual(emits, 1); - assert.strictEqual(pt.read(5).toString(), 'l'); - assert.strictEqual(pt.read(5), null); + // The next readable is emitted on the next tick. + assert.strictEqual(emits, 0); - assert.strictEqual(emits, 1); + process.on('nextTick', function() { + assert.strictEqual(emits, 1); + assert.strictEqual(pt.read(5).toString(), 'l'); + assert.strictEqual(pt.read(5), null); + + assert.strictEqual(emits, 1); + }); } {