Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: reset awaitDrain after manual .resume() #7160

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,7 @@ function resume_(stream, state) {
}

state.resumeScheduled = false;
state.awaitDrain = 0;
stream.emit('resume');
flow(stream);
if (state.flowing && !state.reading)
Expand Down
54 changes: 54 additions & 0 deletions test/parallel/test-stream-pipe-await-drain-manual-resume.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
'use strict';
const common = require('../common');
const stream = require('stream');

// A consumer stream with a very low highWaterMark, which starts in a state
// where it buffers the chunk it receives rather than indicating that they
// have been consumed.
const writable = new stream.Writable({
highWaterMark: 5
});

let isCurrentlyBufferingWrites = true;
const queue = [];

writable._write = (chunk, encoding, cb) => {
if (isCurrentlyBufferingWrites)
queue.push({chunk, cb});
else
cb();
};

const readable = new stream.Readable({
read() {}
});

readable.pipe(writable);

readable.once('pause', common.mustCall(() => {
// First pause, resume manually. The next write() to writable will still
// return false, because chunks are still being buffered, so it will increase
// the awaitDrain counter again.
process.nextTick(common.mustCall(() => {
readable.resume();
}));

readable.once('pause', common.mustCall(() => {
// Second pause, handle all chunks from now on. Once all callbacks that
// are currently queued up are handled, the awaitDrain drain counter should
// fall back to 0 and all chunks that are pending on the readable side
// should be flushed.
isCurrentlyBufferingWrites = false;
for (const queued of queue)
queued.cb();
}));
}));

readable.push(Buffer.alloc(100)); // Fill the writable HWM, first 'pause'.
readable.push(Buffer.alloc(100)); // Second 'pause'.
readable.push(Buffer.alloc(100)); // Should get through to the writable.
readable.push(null);

writable.on('finish', common.mustCall(() => {
// Everything okay, all chunks were written.
}));