From 5c5608e11ff3ae9a9c598a6355b98933142a4d81 Mon Sep 17 00:00:00 2001 From: RafaelGSS Date: Sat, 20 Nov 2021 23:44:38 -0300 Subject: [PATCH] streams: fix enqueue race condition on esm modules --- lib/internal/webstreams/readablestream.js | 36 ++++++++++----------- test/parallel/test-whatwg-readablestream.js | 33 +++++++++++++++++++ 2 files changed, 50 insertions(+), 19 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index fe9b26b991f04e..a88d166f886a94 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1403,25 +1403,23 @@ function readableStreamTee(stream, cloneForBranch2) { reading = true; const readRequest = { [kChunk](value) { - queueMicrotask(() => { - reading = false; - const value1 = value; - let value2 = value; - if (!canceled2 && cloneForBranch2) { - // Structured Clone - value2 = deserialize(serialize(value2)); - } - if (!canceled1) { - readableStreamDefaultControllerEnqueue( - branch1[kState].controller, - value1); - } - if (!canceled2) { - readableStreamDefaultControllerEnqueue( - branch2[kState].controller, - value2); - } - }); + reading = false; + const value1 = value; + let value2 = value; + if (!canceled2 && cloneForBranch2) { + // Structured Clone + value2 = deserialize(serialize(value2)); + } + if (!canceled1) { + readableStreamDefaultControllerEnqueue( + branch1[kState].controller, + value1); + } + if (!canceled2) { + readableStreamDefaultControllerEnqueue( + branch2[kState].controller, + value2); + } }, [kClose]() { reading = false; diff --git a/test/parallel/test-whatwg-readablestream.js b/test/parallel/test-whatwg-readablestream.js index c8b82fa9823786..edfbdfee67841b 100644 --- a/test/parallel/test-whatwg-readablestream.js +++ b/test/parallel/test-whatwg-readablestream.js @@ -1488,6 +1488,39 @@ class Source { common.mustCall(({ value }) => assert.strictEqual(value, 'hello'))); } +{ + // Test tee() with close in the nextTick after enqueue + async function read(stream) { + const chunks = []; + for await (const chunk of stream) + chunks.push(chunk); + return Buffer.concat(chunks).toString(); + } + + const [r1, r2] = new ReadableStream({ + start(controller) { + process.nextTick(() => { + controller.enqueue(new Uint8Array([102, 111, 111, 98, 97, 114])) + + process.nextTick(() => { + controller.close() + }) + }) + } + }).tee(); + + (async () => { + const [dataReader1, dataReader2] = await Promise.all([ + read(r1), + read(r2), + ]); + + assert.strictEqual(dataReader1, dataReader2); + assert.strictEqual(dataReader1, 'foobar'); + assert.strictEqual(dataReader2, 'foobar'); + })().then(common.mustCall()); +} + { assert.throws(() => { readableByteStreamControllerConvertPullIntoDescriptor({