Skip to content

Commit

Permalink
stream: allow calling callback before promise
Browse files Browse the repository at this point in the history
Refs: #39535

PR-URL: #40772
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
ronag committed Nov 14, 2021
1 parent cf56abe commit afe460e
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 27 deletions.
8 changes: 6 additions & 2 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,14 @@ function constructNT(stream) {
then.call(
result,
function() {
process.nextTick(onConstruct, null);
if (!called) {
process.nextTick(onConstruct, null);
}
},
function(err) {
process.nextTick(onConstruct, err);
if (!called) {
process.nextTick(onConstruct, err);
}
});
}
}
Expand Down
8 changes: 6 additions & 2 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -699,10 +699,14 @@ function callFinal(stream, state) {
then.call(
result,
function() {
process.nextTick(onFinish, null);
if (!called) {
process.nextTick(onFinish, null);
}
},
function(err) {
process.nextTick(onFinish, err);
if (!called) {
process.nextTick(onFinish, err);
}
});
}
}
Expand Down
24 changes: 1 addition & 23 deletions test/parallel/test-stream-construct-async-error.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,6 @@ const {
const { setTimeout } = require('timers/promises');
const assert = require('assert');

{
class Foo extends Duplex {
async _construct(cb) {
// eslint-disable-next-line no-restricted-syntax
await setTimeout(common.platformTimeout(1));
cb();
throw new Error('boom');
}
}

const foo = new Foo();
foo.on('error', common.expectsError({
message: 'boom'
}));
foo.on('close', common.mustCall(() => {
assert(foo._writableState.constructed);
assert(foo._readableState.constructed);
}));
}

{
class Foo extends Duplex {
async _destroy(err, cb) {
Expand Down Expand Up @@ -98,9 +78,7 @@ const assert = require('assert');

const foo = new Foo();
foo.write('test', common.mustCall());
foo.on('error', common.expectsError({
code: 'ERR_MULTIPLE_CALLBACK'
}));
foo.on('error', common.mustNotCall());
}

{
Expand Down
26 changes: 26 additions & 0 deletions test/parallel/test-stream-writable-final-async.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
'use strict';

const common = require('../common');
const {
Duplex,
} = require('stream');
const { setTimeout } = require('timers/promises');

{
class Foo extends Duplex {
async _final(callback) {
// eslint-disable-next-line no-restricted-syntax
await setTimeout(common.platformTimeout(1));
callback();
}

_read() {}
}

const foo = new Foo();
foo._write = common.mustCall((chunk, encoding, cb) => {
cb();
});
foo.end('test', common.mustCall());
foo.on('error', common.mustNotCall());
}

0 comments on commit afe460e

Please sign in to comment.