diff --git a/lib/internal/blob.js b/lib/internal/blob.js index ee8e1c75819ab9..0d0e9906dbdb31 100644 --- a/lib/internal/blob.js +++ b/lib/internal/blob.js @@ -329,34 +329,45 @@ class Blob { pull(c) { const { promise, resolve, reject } = createDeferredPromise(); this.pendingPulls.push({ resolve, reject }); - reader.pull((status, buffer) => { - // If pendingPulls is empty here, the stream had to have - // been canceled, and we don't really care about the result. - // we can simply exit. - if (this.pendingPulls.length === 0) { - return; - } - const pending = this.pendingPulls.shift(); - if (status === 0) { - // EOS - c.close(); - pending.resolve(); - return; - } else if (status < 0) { - // The read could fail for many different reasons when reading - // from a non-memory resident blob part (e.g. file-backed blob). - // The error details the system error code. - const error = lazyDOMException('The blob could not be read', 'NotReadableError'); - - c.error(error); - pending.reject(error); - return; - } - if (buffer !== undefined) { - c.enqueue(new Uint8Array(buffer)); - } - pending.resolve(); - }); + const readNext = () => { + reader.pull((status, buffer) => { + // If pendingPulls is empty here, the stream had to have + // been canceled, and we don't really care about the result. + // We can simply exit. + if (this.pendingPulls.length === 0) { + return; + } + if (status === 0) { + // EOS + c.close(); + const pending = this.pendingPulls.shift(); + pending.resolve(); + return; + } else if (status < 0) { + // The read could fail for many different reasons when reading + // from a non-memory resident blob part (e.g. file-backed blob). + // The error details the system error code. + const error = lazyDOMException('The blob could not be read', 'NotReadableError'); + const pending = this.pendingPulls.shift(); + c.error(error); + pending.reject(error); + return; + } + if (buffer !== undefined) { + c.enqueue(new Uint8Array(buffer)); + } + // We keep reading until we either reach EOS, some error, or we + // hit the flow rate of the stream (c.desiredSize). + queueMicrotask(() => { + if (c.desiredSize <= 0) { + // A manual backpressure check. + return; + } + readNext(); + }); + }); + }; + readNext(); return promise; }, cancel(reason) { diff --git a/test/parallel/test-blob.js b/test/parallel/test-blob.js index 6b6ce70687660e..27dee5690d7e06 100644 --- a/test/parallel/test-blob.js +++ b/test/parallel/test-blob.js @@ -1,4 +1,4 @@ -// Flags: --no-warnings +// Flags: --no-warnings --expose-internals 'use strict'; const common = require('../common'); @@ -6,6 +6,7 @@ const assert = require('assert'); const { Blob } = require('buffer'); const { inspect } = require('util'); const { EOL } = require('os'); +const { kState } = require('internal/webstreams/util'); { const b = new Blob(); @@ -237,6 +238,50 @@ assert.throws(() => new Blob({}), { assert(res.done); })().then(common.mustCall()); +(async () => { + const b = new Blob(Array(10).fill('hello')); + const reader = b.stream().getReader(); + const chunks = []; + while (true) { + const res = await reader.read(); + if (res.done) break; + assert.strictEqual(res.value.byteLength, 5); + chunks.push(res.value); + } + assert.strictEqual(chunks.length, 10); +})().then(common.mustCall()); + +(async () => { + const b = new Blob(Array(10).fill('hello')); + const reader = b.stream().getReader(); + const chunks = []; + while (true) { + const res = await reader.read(); + if (chunks.length === 5) { + reader.cancel('boom'); + break; + } + if (res.done) break; + assert.strictEqual(res.value.byteLength, 5); + chunks.push(res.value); + } + assert.strictEqual(chunks.length, 5); + reader.closed.then(common.mustCall()); +})().then(common.mustCall()); + +(async () => { + const b = new Blob(Array(10).fill('hello')); + const stream = b.stream(); + const reader = stream.getReader(); + assert.strictEqual(stream[kState].controller.desiredSize, 1); + const { value, done } = await reader.read(); + assert.strictEqual(value.byteLength, 5); + assert(!done); + setTimeout(() => { + assert.strictEqual(stream[kState].controller.desiredSize, 0); + }, 0); +})().then(common.mustCall()); + { const b = new Blob(['hello\n'], { endings: 'native' }); assert.strictEqual(b.size, EOL.length + 5);