Skip to content

Commit

Permalink
stream: simplify prefinish
Browse files Browse the repository at this point in the history
PR-URL: nodejs#50204
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
  • Loading branch information
ronag authored and alexfernandez committed Nov 1, 2023
1 parent 528470a commit a95a413
Showing 1 changed file with 33 additions and 40 deletions.
73 changes: 33 additions & 40 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -865,52 +865,45 @@ function needFinish(state) {
)) === (kEnding | kConstructed) && state.length === 0);
}

function callFinal(stream, state) {
let called = false;

function onFinish(err) {
if (called) {
errorOrDestroy(stream, err ?? ERR_MULTIPLE_CALLBACK());
return;
}
called = true;

state.pendingcb--;
if (err) {
callFinishedCallbacks(state, err);
errorOrDestroy(stream, err, (state[kState] & kSync) !== 0);
} else if (needFinish(state)) {
state[kState] |= kPrefinished;
stream.emit('prefinish');
// Backwards compat. Don't check state.sync here.
// Some streams assume 'finish' will be emitted
// asynchronously relative to _final callback.
state.pendingcb++;
process.nextTick(finish, stream, state);
}
function onFinish(stream, state, err) {
if ((state[kState] & kPrefinished) !== 0) {
errorOrDestroy(stream, err ?? ERR_MULTIPLE_CALLBACK());
return;
}

state[kState] |= kSync;
state.pendingcb++;

try {
stream._final(onFinish);
} catch (err) {
onFinish(err);
state.pendingcb--;
if (err) {
callFinishedCallbacks(state, err);
errorOrDestroy(stream, err, (state[kState] & kSync) !== 0);
} else if (needFinish(state)) {
state[kState] |= kPrefinished;
stream.emit('prefinish');
// Backwards compat. Don't check state.sync here.
// Some streams assume 'finish' will be emitted
// asynchronously relative to _final callback.
state.pendingcb++;
process.nextTick(finish, stream, state);
}

state[kState] &= ~kSync;
}

function prefinish(stream, state) {
if ((state[kState] & (kPrefinished | kFinalCalled)) === 0) {
if (typeof stream._final === 'function' && (state[kState] & kDestroyed) === 0) {
state[kState] |= kFinalCalled;
callFinal(stream, state);
} else {
state[kState] |= kPrefinished;
stream.emit('prefinish');
if ((state[kState] & (kPrefinished | kFinalCalled)) !== 0) {
return;
}

if (typeof stream._final === 'function' && (state[kState] & kDestroyed) === 0) {
state[kState] |= kFinalCalled | kSync;
state.pendingcb++;

try {
stream._final((err) => onFinish(stream, state, err));
} catch (err) {
onFinish(stream, state, err);
}

state[kState] &= ~kSync;
} else {
state[kState] |= kFinalCalled | kPrefinished;
stream.emit('prefinish');
}
}

Expand Down

0 comments on commit a95a413

Please sign in to comment.