diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index a1845574f93ee0..c916ccd20d2063 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -132,6 +132,8 @@ const { writableStreamDefaultWriterWrite, } = require('internal/webstreams/writablestream'); +const { Buffer } = require('buffer'); + const assert = require('internal/assert'); const kCancel = Symbol('kCancel'); @@ -1904,6 +1906,11 @@ function readableByteStreamControllerConvertPullIntoDescriptor(desc) { throw new ERR_INVALID_STATE.RangeError('The buffer size is invalid'); assert(!(bytesFilled % elementSize)); const transferredBuffer = transferArrayBuffer(buffer); + + if (ctor === Buffer) { + return Buffer.from(transferredBuffer, byteOffset, bytesFilled / elementSize); + } + return new ctor(transferredBuffer, byteOffset, bytesFilled / elementSize); } diff --git a/test/parallel/test-whatwg-readablebytestreambyob.js b/test/parallel/test-whatwg-readablebytestreambyob.js new file mode 100644 index 00000000000000..5dbe4813dbc4f6 --- /dev/null +++ b/test/parallel/test-whatwg-readablebytestreambyob.js @@ -0,0 +1,62 @@ +'use strict'; + +const common = require('../common'); + +const { + open, +} = require('fs/promises'); + +const { + Buffer, +} = require('buffer'); + +class Source { + async start(controller) { + this.file = await open(__filename); + this.controller = controller; + } + + async pull(controller) { + const byobRequest = controller.byobRequest; + const view = byobRequest.view; + + const { + bytesRead, + } = await this.file.read({ + buffer: view, + offset: view.byteOffset, + length: view.byteLength + }); + + if (bytesRead === 0) { + await this.file.close(); + this.controller.close(); + } + + byobRequest.respond(bytesRead); + } + + get type() { return 'bytes'; } + + get autoAllocateChunkSize() { return 1024; } +} + +(async () => { + const source = new Source(); + const stream = new ReadableStream(source); + + const { emitWarning } = process; + + process.emitWarning = common.mustNotCall(); + + try { + const reader = stream.getReader({ mode: 'byob' }); + + let result; + do { + result = await reader.read(Buffer.alloc(100)); + } while (!result.done); + } finally { + process.emitWarning = emitWarning; + } +})().then(common.mustCall());