From 42d6d8db3ccb629e1c642e9f6561ae5ea09f3c76 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Fri, 3 Mar 2023 16:35:48 +0530 Subject: [PATCH] fs: implement byob mode for readableWebStream() Fixes: https://github.com/nodejs/node/issues/45853 --- lib/internal/fs/promises.js | 72 ++++++++++++++++++++++++++++--------- 1 file changed, 56 insertions(+), 16 deletions(-) diff --git a/lib/internal/fs/promises.js b/lib/internal/fs/promises.js index 41c17de95ed3c4..0c99cb20605406 100644 --- a/lib/internal/fs/promises.js +++ b/lib/internal/fs/promises.js @@ -14,6 +14,7 @@ const { SafePromisePrototypeFinally, Symbol, Uint8Array, + FunctionPrototypeBind, } = primordials; const { fs: constants } = internalBinding('constants'); @@ -249,7 +250,7 @@ class FileHandle extends EventEmitterMixin(JSTransferable) { * } ReadableStream * @returns {ReadableStream} */ - readableWebStream() { + readableWebStream(options = kEmptyObject) { if (this[kFd] === -1) throw new ERR_INVALID_STATE('The FileHandle is closed'); if (this[kClosePromise]) @@ -257,21 +258,60 @@ class FileHandle extends EventEmitterMixin(JSTransferable) { if (this[kLocked]) throw new ERR_INVALID_STATE('The FileHandle is locked'); this[kLocked] = true; - const { - newReadableStreamFromStreamBase, - } = require('internal/webstreams/adapters'); - const readable = newReadableStreamFromStreamBase( - this[kHandle], - undefined, - { ondone: () => this[kUnref]() }); - - const { - readableStreamCancel, - } = require('internal/webstreams/readablestream'); - this[kRef](); - this.once('close', () => { - readableStreamCancel(readable); - }); + + if (options.type !== undefined) { + validateString(options.type, 'options.type'); + } + + let readable; + + if (options.type !== 'bytes') { + const { + newReadableStreamFromStreamBase, + } = require('internal/webstreams/adapters'); + readable = newReadableStreamFromStreamBase( + this[kHandle], + undefined, + { ondone: () => this[kUnref]() }); + + const { + readableStreamCancel, + } = require('internal/webstreams/readablestream'); + this[kRef](); + this.once('close', () => { + readableStreamCancel(readable); + }); + } else { + const { + readableStreamCancel, + ReadableStream, + } = require('internal/webstreams/readablestream'); + this[kRef](); + this.once('close', () => { + readableStreamCancel(readable); + }); + const readFn = FunctionPrototypeBind(this.read, this); + readable = new ReadableStream({ + type: 'bytes', + autoAllocateChunkSize: 16384, + + async pull(controller) { + const view = controller.byobRequest.view; + const { bytesRead } = await readFn(view, view.byteOffset, view.byteLength); + + if (bytesRead === 0) { + try { + controller.close(); + this[kUnref](); + } catch (error) { + controller.error(error); + } + } + + controller.byobRequest.respond(bytesRead); + }, + }); + } return readable; }