Skip to content

Commit

Permalink
fs: implement byob mode for readableWebStream()
Browse files Browse the repository at this point in the history
  • Loading branch information
debadree25 committed Mar 3, 2023
1 parent 9dbb162 commit 42d6d8d
Showing 1 changed file with 56 additions and 16 deletions.
72 changes: 56 additions & 16 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const {
SafePromisePrototypeFinally,
Symbol,
Uint8Array,
FunctionPrototypeBind,
} = primordials;

const { fs: constants } = internalBinding('constants');
Expand Down Expand Up @@ -249,29 +250,68 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
* } ReadableStream
* @returns {ReadableStream}
*/
readableWebStream() {
readableWebStream(options = kEmptyObject) {
if (this[kFd] === -1)
throw new ERR_INVALID_STATE('The FileHandle is closed');
if (this[kClosePromise])
throw new ERR_INVALID_STATE('The FileHandle is closing');
if (this[kLocked])
throw new ERR_INVALID_STATE('The FileHandle is locked');
this[kLocked] = true;
const {
newReadableStreamFromStreamBase,
} = require('internal/webstreams/adapters');
const readable = newReadableStreamFromStreamBase(
this[kHandle],
undefined,
{ ondone: () => this[kUnref]() });

const {
readableStreamCancel,
} = require('internal/webstreams/readablestream');
this[kRef]();
this.once('close', () => {
readableStreamCancel(readable);
});

if (options.type !== undefined) {
validateString(options.type, 'options.type');
}

let readable;

if (options.type !== 'bytes') {
const {
newReadableStreamFromStreamBase,
} = require('internal/webstreams/adapters');
readable = newReadableStreamFromStreamBase(
this[kHandle],
undefined,
{ ondone: () => this[kUnref]() });

const {
readableStreamCancel,
} = require('internal/webstreams/readablestream');
this[kRef]();
this.once('close', () => {
readableStreamCancel(readable);
});
} else {
const {
readableStreamCancel,
ReadableStream,
} = require('internal/webstreams/readablestream');
this[kRef]();
this.once('close', () => {
readableStreamCancel(readable);
});
const readFn = FunctionPrototypeBind(this.read, this);
readable = new ReadableStream({
type: 'bytes',
autoAllocateChunkSize: 16384,

async pull(controller) {
const view = controller.byobRequest.view;
const { bytesRead } = await readFn(view, view.byteOffset, view.byteLength);

if (bytesRead === 0) {
try {
controller.close();
this[kUnref]();
} catch (error) {
controller.error(error);
}
}

controller.byobRequest.respond(bytesRead);
},
});
}

return readable;
}
Expand Down

0 comments on commit 42d6d8d

Please sign in to comment.