diff --git a/benchmark/streams/creation.js b/benchmark/streams/creation.js index 0fb0b1ddb696ea..7bc2e2fcd3bd9b 100644 --- a/benchmark/streams/creation.js +++ b/benchmark/streams/creation.js @@ -8,7 +8,7 @@ const { } = require('stream'); const bench = common.createBenchmark(main, { - n: [50e6], + n: [10e6], kind: ['duplex', 'readable', 'transform', 'writable'], }); diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index b8f8b72ce8d113..c0c28a4cf27689 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -865,52 +865,45 @@ function needFinish(state) { )) === (kEnding | kConstructed) && state.length === 0); } -function callFinal(stream, state) { - let called = false; - - function onFinish(err) { - if (called) { - errorOrDestroy(stream, err ?? ERR_MULTIPLE_CALLBACK()); - return; - } - called = true; - - state.pendingcb--; - if (err) { - callFinishedCallbacks(state, err); - errorOrDestroy(stream, err, (state[kState] & kSync) !== 0); - } else if (needFinish(state)) { - state[kState] |= kPrefinished; - stream.emit('prefinish'); - // Backwards compat. Don't check state.sync here. - // Some streams assume 'finish' will be emitted - // asynchronously relative to _final callback. - state.pendingcb++; - process.nextTick(finish, stream, state); - } +function onFinish(stream, state, err) { + if ((state[kState] & kPrefinished) !== 0) { + errorOrDestroy(stream, err ?? ERR_MULTIPLE_CALLBACK()); + return; } - - state[kState] |= kSync; - state.pendingcb++; - - try { - stream._final(onFinish); - } catch (err) { - onFinish(err); + state.pendingcb--; + if (err) { + callFinishedCallbacks(state, err); + errorOrDestroy(stream, err, (state[kState] & kSync) !== 0); + } else if (needFinish(state)) { + state[kState] |= kPrefinished; + stream.emit('prefinish'); + // Backwards compat. Don't check state.sync here. + // Some streams assume 'finish' will be emitted + // asynchronously relative to _final callback. + state.pendingcb++; + process.nextTick(finish, stream, state); } - - state[kState] &= ~kSync; } function prefinish(stream, state) { - if ((state[kState] & (kPrefinished | kFinalCalled)) === 0) { - if (typeof stream._final === 'function' && (state[kState] & kDestroyed) === 0) { - state[kState] |= kFinalCalled; - callFinal(stream, state); - } else { - state[kState] |= kPrefinished; - stream.emit('prefinish'); + if ((state[kState] & (kPrefinished | kFinalCalled)) !== 0) { + return + } + + if (typeof stream._final === 'function' && (state[kState] & kDestroyed) === 0) { + state[kState] |= kFinalCalled | kSync; + state.pendingcb++; + + try { + stream._final(err => onFinish(stream, state, err)); + } catch (err) { + onFinish(stream, state, err); } + + state[kState] &= ~kSync; + } else { + state[kState] |= kFinalCalled | kPrefinished; + stream.emit('prefinish'); } }