Skip to content

Commit

Permalink
stream: add FileHandle support to Read/WriteStream
Browse files Browse the repository at this point in the history
Support creating a Read/WriteStream from a
FileHandle instead of a raw file descriptor
Add an EventEmitter to FileHandle with a single
'close' event

Refs: nodejs#35240
  • Loading branch information
mmomtchev committed Dec 2, 2020
1 parent ef8d0e9 commit 72873cf
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 38 deletions.
18 changes: 16 additions & 2 deletions doc/api/fs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1761,6 +1761,9 @@ fs.copyFileSync('source.txt', 'destination.txt', COPYFILE_EXCL);
<!-- YAML
added: v0.1.31
changes:
- version:
- v15.0.2
description: The `fd` option accepts FileHandle arguments.
- version:
- v13.6.0
- v12.17.0
Expand Down Expand Up @@ -1792,7 +1795,7 @@ changes:
* `flags` {string} See [support of file system `flags`][]. **Default:**
`'r'`.
* `encoding` {string} **Default:** `null`
* `fd` {integer} **Default:** `null`
* `fd` {integer|FileHandle} **Default:** `null`
* `mode` {integer} **Default:** `0o666`
* `autoClose` {boolean} **Default:** `true`
* `emitClose` {boolean} **Default:** `false`
Expand Down Expand Up @@ -1868,6 +1871,9 @@ If `options` is a string, then it specifies the encoding.
<!-- YAML
added: v0.1.31
changes:
- version:
- v15.0.2
description: The `fd` option accepts FileHandle arguments.
- version:
- v13.6.0
- v12.17.0
Expand Down Expand Up @@ -1897,7 +1903,7 @@ changes:
* `flags` {string} See [support of file system `flags`][]. **Default:**
`'w'`.
* `encoding` {string} **Default:** `'utf8'`
* `fd` {integer} **Default:** `null`
* `fd` {integer|FileHandle} **Default:** `null`
* `mode` {integer} **Default:** `0o666`
* `autoClose` {boolean} **Default:** `true`
* `emitClose` {boolean} **Default:** `false`
Expand Down Expand Up @@ -4707,6 +4713,14 @@ the promise-based API uses the `FileHandle` class in order to help avoid
accidental leaking of unclosed file descriptors after a `Promise` is resolved or
rejected.

#### Event: `'close'`
<!-- YAML
added: v15.0.2
-->

The `'close'` event is emitted when the FileHandle and any of its underlying
resources (a file descriptor, for example) have been closed.

#### `filehandle.appendFile(data, options)`
<!-- YAML
added: v10.0.0
Expand Down
48 changes: 37 additions & 11 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,17 @@ const {
} = require('internal/validators');
const pathModule = require('path');
const { promisify } = require('internal/util');
const EventEmitter = require('events');

const kHandle = Symbol('kHandle');
const kFd = Symbol('kFd');
const kRefs = Symbol('kRefs');
const kEmitter = Symbol('kEmitter');
const kClosePromise = Symbol('kClosePromise');
const kCloseResolve = Symbol('kCloseResolve');
const kCloseReject = Symbol('kCloseReject');
const kRef = Symbol('kRef');
const kUnref = Symbol('kUnref');

const { kUsePromises } = binding;
const {
Expand All @@ -97,6 +101,7 @@ const lazyDOMException = hideStackFrames((message, name) => {
class FileHandle extends JSTransferable {
constructor(filehandle) {
super();
this[kEmitter] = new EventEmitter();
this[kHandle] = filehandle;
this[kFd] = filehandle ? filehandle.fd : -1;

Expand Down Expand Up @@ -177,6 +182,8 @@ class FileHandle extends JSTransferable {
return this[kClosePromise];
}

this[kEmitter].emit('close');

this[kRefs]--;
if (this[kRefs] === 0) {
this[kFd] = -1;
Expand Down Expand Up @@ -211,6 +218,7 @@ class FileHandle extends JSTransferable {
this[kFd] = -1;
this[kHandle] = null;
this[kRefs] = 0;
this[kEmitter] = null;

return {
data: { handle },
Expand All @@ -225,9 +233,33 @@ class FileHandle extends JSTransferable {
[kDeserialize]({ handle }) {
this[kHandle] = handle;
this[kFd] = handle.fd;
this[kEmitter] = new EventEmitter();
}

[kRef]() {
this[kRefs]++;
}

[kUnref]() {
this[kRefs]--;
if (this[kRefs] === 0) {
this[kFd] = -1;
PromisePrototypeThen(
this[kHandle].close(),
this[kCloseResolve],
this[kCloseReject]
);
}
}
}

for (const op of ['on', 'emit', 'once', 'off', 'addListener',
'prependListener', 'removeListener', 'removeAllListeners',
'listeners', 'rawListeners'])
FileHandle.prototype[op] = function(...args) {
this[kEmitter][op].apply(this[kEmitter], args);
};

async function fsCall(fn, handle, ...args) {
if (handle[kRefs] === undefined) {
throw new ERR_INVALID_ARG_TYPE('filehandle', 'FileHandle', handle);
Expand All @@ -242,18 +274,10 @@ async function fsCall(fn, handle, ...args) {
}

try {
handle[kRefs]++;
handle[kRef]();
return await fn(handle, ...args);
} finally {
handle[kRefs]--;
if (handle[kRefs] === 0) {
handle[kFd] = -1;
PromisePrototypeThen(
handle[kHandle].close(),
handle[kCloseResolve],
handle[kCloseReject]
);
}
handle[kUnref]();
}
}

Expand Down Expand Up @@ -712,5 +736,7 @@ module.exports = {
readFile,
},

FileHandle
FileHandle,
kRef,
kUnref
};
64 changes: 62 additions & 2 deletions lib/internal/fs/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ const {

const {
ERR_INVALID_ARG_TYPE,
ERR_OUT_OF_RANGE
ERR_OUT_OF_RANGE,
ERR_METHOD_NOT_IMPLEMENTED
} = require('internal/errors').codes;
const { deprecate } = require('internal/util');
const { validateInteger } = require('internal/validators');
const { errorOrDestroy } = require('internal/streams/destroy');
const fs = require('fs');
const { FileHandle, kRef, kUnref } = require('internal/fs/promises');
const { Buffer } = require('buffer');
const {
copyObject,
Expand All @@ -28,6 +30,7 @@ const kIoDone = Symbol('kIoDone');
const kIsPerformingIO = Symbol('kIsPerformingIO');

const kFs = Symbol('kFs');
const kHandle = Symbol('kHandle');

function _construct(callback) {
const stream = this;
Expand Down Expand Up @@ -66,6 +69,36 @@ function _construct(callback) {
}
}

// This generates an fs operations structure for a FileHandle
const FileHandleOperations = (handle) => {
return {
open: (path, flags, mode, cb) => {
throw new ERR_METHOD_NOT_IMPLEMENTED('open()');
},
close: (fd, cb) => {
handle[kUnref]();
handle.close()
.then(() => cb())
.catch((err) => cb(err));
},
read: (fd, buf, offset, length, pos, cb) => {
handle.read(buf, offset, length, pos)
.then((r) => cb(null, r.bytesRead, r.buffer))
.catch((err) => cb(err, 0, buf));
},
write: (fd, buf, offset, length, pos, cb) => {
handle.read(buf, offset, length, pos)
.then((r) => cb(null, r.bytesWritten, r.buffer))
.catch((err) => cb(err, 0, buf));
},
writev: (fd, buffers, pos, cb) => {
handle.writev(buffers, pos)
.then((r) => cb(null, r.bytesWritten, r.buffers))
.catch((err) => cb(err, 0, buffers));
}
};
};

function close(stream, err, cb) {
if (!stream.fd) {
// TODO(ronag)
Expand All @@ -80,6 +113,30 @@ function close(stream, err, cb) {
}
}

function importFd(stream, options) {
stream.fd = null;
if (options.fd) {
// When fd is a FileHandle we can listen for 'close' events
if (options.fd instanceof FileHandle) {
// FileHandle is not supported with custom fs operations
if (options.fs)
throw new ERR_METHOD_NOT_IMPLEMENTED('FileHandle with fs');
stream[kHandle] = options.fd;
stream.fd = options.fd.fd;
stream[kFs] = FileHandleOperations(stream[kHandle]);
stream[kHandle][kRef]();
options.fd.on('close', stream.close.bind(stream));
}
// When fd is a raw descriptor, we must keep our fingers crossed
// that the descriptor won't get closed, or worse, replaced with
// another one
// https://github.com/nodejs/node/issues/35862
if (typeof options.fd === 'number') {
stream.fd = options.fd;
}
}
}

function ReadStream(path, options) {
if (!(this instanceof ReadStream))
return new ReadStream(path, options);
Expand Down Expand Up @@ -119,6 +176,8 @@ function ReadStream(path, options) {
this.flags = options.flags === undefined ? 'r' : options.flags;
this.mode = options.mode === undefined ? 0o666 : options.mode;

importFd(this, options);

this.start = options.start;
this.end = options.end;
this.pos = undefined;
Expand Down Expand Up @@ -287,10 +346,11 @@ function WriteStream(path, options) {

// Path will be ignored when fd is specified, so it can be falsy
this.path = toPathIfFileURL(path);
this.fd = options.fd === undefined ? null : options.fd;
this.flags = options.flags === undefined ? 'w' : options.flags;
this.mode = options.mode === undefined ? 0o666 : options.mode;

importFd(this, options);

this.start = options.start;
this.pos = undefined;
this.bytesWritten = 0;
Expand Down
45 changes: 22 additions & 23 deletions test/parallel/test-fs-promises-file-handle-read.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,32 @@ async function read(fileHandle, buffer, offset, length, position) {
fileHandle.read(buffer, offset, length, position);
}

async function validateRead() {
const filePath = path.resolve(tmpDir, 'tmp-read-file.txt');
const fileHandle = await open(filePath, 'w+');
const buffer = Buffer.from('Hello world', 'utf8');
async function validateRead(data, file) {
const filePath = path.resolve(tmpDir, file);
const buffer = Buffer.from(data, 'utf8');

const fd = fs.openSync(filePath, 'w+');
fs.writeSync(fd, buffer, 0, buffer.length);
fs.closeSync(fd);
const readAsyncHandle = await read(fileHandle, Buffer.alloc(11), 0, 11, 0);
assert.deepStrictEqual(buffer.length, readAsyncHandle.bytesRead);
assert.deepStrictEqual(buffer, readAsyncHandle.buffer);

await fileHandle.close();
}

async function validateEmptyRead() {
const filePath = path.resolve(tmpDir, 'tmp-read-empty-file.txt');
const fileHandle = await open(filePath, 'w+');
const buffer = Buffer.from('', 'utf8');
const streamFileHandle = await open(filePath, 'w+');

const fd = fs.openSync(filePath, 'w+');
fs.writeSync(fd, buffer, 0, buffer.length);
fs.closeSync(fd);
const readAsyncHandle = await read(fileHandle, Buffer.alloc(11), 0, 11, 0);
assert.deepStrictEqual(buffer.length, readAsyncHandle.bytesRead);

fileHandle.on('close', common.mustCall());
const readAsyncHandle = await read(fileHandle, Buffer.alloc(11), 0, 11, 0);
assert.deepStrictEqual(data.length, readAsyncHandle.bytesRead);
if (data.length)
assert.deepStrictEqual(buffer, readAsyncHandle.buffer);
await fileHandle.close();

const stream = fs.createReadStream(null, { fd: streamFileHandle });
let streamData = Buffer.alloc(0);
for await (const chunk of stream)
streamData = Buffer.from(chunk);
assert.deepStrictEqual(buffer, streamData);
if (data.length)
assert.deepStrictEqual(streamData, readAsyncHandle.buffer);
await streamFileHandle.close();
}

async function validateLargeRead() {
Expand All @@ -67,9 +66,9 @@ let useConf = false;
tmpdir.refresh();
useConf = value;

await validateRead()
.then(validateEmptyRead)
.then(validateLargeRead)
.then(common.mustCall());
await validateRead('Hello world', 'tmp-read-file.txt')
.then(validateRead('', 'tmp-read-empty-file.txt'))
.then(validateLargeRead)
.then(common.mustCall());
}
});
46 changes: 46 additions & 0 deletions test/parallel/test-fs-read-stream-file-handle.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
'use strict';
const common = require('../common');
const fs = require('fs');
const assert = require('assert');
const path = require('path');
const tmpdir = require('../common/tmpdir');
const file = path.join(tmpdir.path, '/read_stream_filehandle_test.txt');
const input = 'hello world';

let output = '';
tmpdir.refresh();
fs.writeFileSync(file, input);

fs.promises.open(file, 'r').then((handle) => {
handle.on('close', common.mustCall());
const stream = fs.createReadStream(null, { fd: handle });

stream.on('data', common.mustCallAtLeast((data) => {
output += data;
}));

stream.on('end', common.mustCall(() => {
assert.strictEqual(output, input);
}));

stream.on('close', common.mustCall());
});

fs.promises.open(file, 'r').then((handle) => {
handle.on('close', common.mustCall());
const stream = fs.createReadStream(null, { fd: handle });
stream.on('data', common.mustNotCall());
stream.on('close', common.mustCall());

handle.close();
});

fs.promises.open(file, 'r').then((handle) => {
assert.throws(() => {
fs.createReadStream(null, { fd: handle, fs });
}, {
code: 'ERR_METHOD_NOT_IMPLEMENTED',
name: 'Error',
message: 'The FileHandle with fs method is not implemented'
});
});
Loading

0 comments on commit 72873cf

Please sign in to comment.