From 104311c312f80b6008eb26cc38a4552adf708818 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Mon, 10 Apr 2023 12:49:19 +0530 Subject: [PATCH] fs: implement byob mode for readableWebStream() Fixes: https://github.com/nodejs/node/issues/45853 PR-URL: https://github.com/nodejs/node/pull/46933 Reviewed-By: Benjamin Gruenbaum Reviewed-By: Matteo Collina --- doc/api/fs.md | 10 ++- lib/internal/fs/promises.js | 68 +++++++++++++-- .../test-filehandle-readablestream.js | 84 +++++++++++++++++++ 3 files changed, 152 insertions(+), 10 deletions(-) diff --git a/doc/api/fs.md b/doc/api/fs.md index e4f6f14ea938b1..c3b4fc54734728 100644 --- a/doc/api/fs.md +++ b/doc/api/fs.md @@ -444,14 +444,22 @@ Reads data from the file and stores that in the given buffer. If the file is not modified concurrently, the end-of-file is reached when the number of bytes read is zero. -#### `filehandle.readableWebStream()` +#### `filehandle.readableWebStream(options)` > Stability: 1 - Experimental +* `options` {Object} + * `type` {string|undefined} Whether to open a normal or a `'bytes'` stream. + **Default:** `undefined` + * Returns: {ReadableStream} Returns a `ReadableStream` that may be used to read the files data. diff --git a/lib/internal/fs/promises.js b/lib/internal/fs/promises.js index 31f2be4e800d40..12e4aa0aadc2eb 100644 --- a/lib/internal/fs/promises.js +++ b/lib/internal/fs/promises.js @@ -14,6 +14,7 @@ const { SafePromisePrototypeFinally, Symbol, Uint8Array, + FunctionPrototypeBind, } = primordials; const { fs: constants } = internalBinding('constants'); @@ -252,7 +253,7 @@ class FileHandle extends EventEmitterMixin(JSTransferable) { * } ReadableStream * @returns {ReadableStream} */ - readableWebStream() { + readableWebStream(options = kEmptyObject) { if (this[kFd] === -1) throw new ERR_INVALID_STATE('The FileHandle is closed'); if (this[kClosePromise]) @@ -261,15 +262,64 @@ class FileHandle extends EventEmitterMixin(JSTransferable) { throw new ERR_INVALID_STATE('The FileHandle is locked'); this[kLocked] = true; - const readable = newReadableStreamFromStreamBase( - this[kHandle], - undefined, - { ondone: () => this[kUnref]() }); + if (options.type !== undefined) { + validateString(options.type, 'options.type'); + } - this[kRef](); - this.once('close', () => { - readableStreamCancel(readable); - }); + let readable; + + if (options.type !== 'bytes') { + const { + newReadableStreamFromStreamBase, + } = require('internal/webstreams/adapters'); + readable = newReadableStreamFromStreamBase( + this[kHandle], + undefined, + { ondone: () => this[kUnref]() }); + + const { + readableStreamCancel, + } = require('internal/webstreams/readablestream'); + this[kRef](); + this.once('close', () => { + readableStreamCancel(readable); + }); + } else { + const { + readableStreamCancel, + ReadableStream, + } = require('internal/webstreams/readablestream'); + + const readFn = FunctionPrototypeBind(this.read, this); + const ondone = FunctionPrototypeBind(this[kUnref], this); + + readable = new ReadableStream({ + type: 'bytes', + autoAllocateChunkSize: 16384, + + async pull(controller) { + const view = controller.byobRequest.view; + const { bytesRead } = await readFn(view, view.byteOffset, view.byteLength); + + if (bytesRead === 0) { + ondone(); + controller.close(); + } + + controller.byobRequest.respond(bytesRead); + }, + + cancel() { + ondone(); + }, + }); + + this[kRef](); + + this.once('close', () => { + readableStreamCancel(readable); + }); + } return readable; } diff --git a/test/parallel/test-filehandle-readablestream.js b/test/parallel/test-filehandle-readablestream.js index 5f4ff0960b238c..70cd1814b2fe1b 100644 --- a/test/parallel/test-filehandle-readablestream.js +++ b/test/parallel/test-filehandle-readablestream.js @@ -86,3 +86,87 @@ const check = readFileSync(__filename, { encoding: 'utf8' }); mc.port1.close(); await file.close(); })().then(common.mustCall()); + +// Make sure 'bytes' stream works +(async () => { + const file = await open(__filename); + const dec = new TextDecoder(); + const readable = file.readableWebStream({ type: 'bytes' }); + const reader = readable.getReader({ mode: 'byob' }); + + let data = ''; + let result; + do { + const buff = new ArrayBuffer(100); + result = await reader.read(new DataView(buff)); + if (result.value !== undefined) { + data += dec.decode(result.value); + assert.ok(result.value.byteLength <= 100); + } + } while (!result.done); + + assert.strictEqual(check, data); + + assert.throws(() => file.readableWebStream(), { + code: 'ERR_INVALID_STATE', + }); + + await file.close(); +})().then(common.mustCall()); + +// Make sure that acquiring a ReadableStream 'bytes' stream +// fails if the FileHandle is already closed. +(async () => { + const file = await open(__filename); + await file.close(); + + assert.throws(() => file.readableWebStream({ type: 'bytes' }), { + code: 'ERR_INVALID_STATE', + }); +})().then(common.mustCall()); + +// Make sure that acquiring a ReadableStream 'bytes' stream +// fails if the FileHandle is already closing. +(async () => { + const file = await open(__filename); + file.close(); + + assert.throws(() => file.readableWebStream({ type: 'bytes' }), { + code: 'ERR_INVALID_STATE', + }); +})().then(common.mustCall()); + +// Make sure the 'bytes' ReadableStream is closed when the underlying +// FileHandle is closed. +(async () => { + const file = await open(__filename); + const readable = file.readableWebStream({ type: 'bytes' }); + const reader = readable.getReader({ mode: 'byob' }); + file.close(); + await reader.closed; +})().then(common.mustCall()); + +// Make sure the 'bytes' ReadableStream is closed when the underlying +// FileHandle is closed. +(async () => { + const file = await open(__filename); + const readable = file.readableWebStream({ type: 'bytes' }); + file.close(); + const reader = readable.getReader({ mode: 'byob' }); + await reader.closed; +})().then(common.mustCall()); + +// Make sure that the FileHandle is properly marked "in use" +// when a 'bytes' ReadableStream has been acquired for it. +(async () => { + const file = await open(__filename); + file.readableWebStream({ type: 'bytes' }); + const mc = new MessageChannel(); + mc.port1.onmessage = common.mustNotCall(); + assert.throws(() => mc.port2.postMessage(file, [file]), { + code: 25, + name: 'DataCloneError', + }); + mc.port1.close(); + await file.close(); +})().then(common.mustCall());