-
Notifications
You must be signed in to change notification settings - Fork 29.9k
Commit
Support creating a Read/WriteStream from a FileHandle instead of a raw file descriptor Add an EventEmitter to FileHandle with a single 'close' event. Fixes: #35240 PR-URL: #35922 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Rich Trott <rtrott@gmail.com>
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,21 +2,25 @@ | |
|
||
const { | ||
Array, | ||
FunctionPrototypeBind, | ||
MathMin, | ||
ObjectDefineProperty, | ||
ObjectSetPrototypeOf, | ||
PromisePrototypeThen, | ||
ReflectApply, | ||
Symbol, | ||
} = primordials; | ||
|
||
const { | ||
ERR_INVALID_ARG_TYPE, | ||
ERR_OUT_OF_RANGE | ||
ERR_OUT_OF_RANGE, | ||
ERR_METHOD_NOT_IMPLEMENTED, | ||
} = require('internal/errors').codes; | ||
const { deprecate } = require('internal/util'); | ||
const { validateInteger } = require('internal/validators'); | ||
const { errorOrDestroy } = require('internal/streams/destroy'); | ||
const fs = require('fs'); | ||
const { kRef, kUnref, FileHandle } = require('internal/fs/promises'); | ||
const { Buffer } = require('buffer'); | ||
const { | ||
copyObject, | ||
|
@@ -28,6 +32,7 @@ const kIoDone = Symbol('kIoDone'); | |
const kIsPerformingIO = Symbol('kIsPerformingIO'); | ||
|
||
const kFs = Symbol('kFs'); | ||
const kHandle = Symbol('kHandle'); | ||
|
||
function _construct(callback) { | ||
const stream = this; | ||
|
@@ -66,6 +71,35 @@ function _construct(callback) { | |
} | ||
} | ||
|
||
// This generates an fs operations structure for a FileHandle | ||
const FileHandleOperations = (handle) => { | ||
return { | ||
open: (path, flags, mode, cb) => { | ||
throw new ERR_METHOD_NOT_IMPLEMENTED('open()'); | ||
}, | ||
close: (fd, cb) => { | ||
handle[kUnref](); | ||
PromisePrototypeThen(handle.close(), | ||
() => cb(), cb); | ||
}, | ||
read: (fd, buf, offset, length, pos, cb) => { | ||
PromisePrototypeThen(handle.read(buf, offset, length, pos), | ||
(r) => cb(null, r.bytesRead, r.buffer), | ||
(err) => cb(err, 0, buf)); | ||
}, | ||
write: (fd, buf, offset, length, pos, cb) => { | ||
PromisePrototypeThen(handle.write(buf, offset, length, pos), | ||
(r) => cb(null, r.bytesWritten, r.buffer), | ||
(err) => cb(err, 0, buf)); | ||
}, | ||
writev: (fd, buffers, pos, cb) => { | ||
PromisePrototypeThen(handle.writev(buffers, pos), | ||
(r) => cb(null, r.bytesWritten, r.buffers), | ||
(err) => cb(err, 0, buffers)); | ||
} | ||
}; | ||
}; | ||
|
||
function close(stream, err, cb) { | ||
if (!stream.fd) { | ||
// TODO(ronag) | ||
|
@@ -80,6 +114,32 @@ function close(stream, err, cb) { | |
} | ||
} | ||
|
||
function importFd(stream, options) { | ||
stream.fd = null; | ||
if (options.fd) { | ||
if (typeof options.fd === 'number') { | ||
// When fd is a raw descriptor, we must keep our fingers crossed | ||
// that the descriptor won't get closed, or worse, replaced with | ||
// another one | ||
// https://github.com/nodejs/node/issues/35862 | ||
stream.fd = options.fd; | ||
} else if (typeof options.fd === 'object' && | ||
options.fd instanceof FileHandle) { | ||
// When fd is a FileHandle we can listen for 'close' events | ||
if (options.fs) | ||
// FileHandle is not supported with custom fs operations | ||
throw new ERR_METHOD_NOT_IMPLEMENTED('FileHandle with fs'); | ||
stream[kHandle] = options.fd; | ||
stream.fd = options.fd.fd; | ||
stream[kFs] = FileHandleOperations(stream[kHandle]); | ||
stream[kHandle][kRef](); | ||
options.fd.on('close', FunctionPrototypeBind(stream.close, stream)); | ||
This comment has been minimized.
Sorry, something went wrong.
ronag
Member
|
||
} else | ||
throw ERR_INVALID_ARG_TYPE('options.fd', | ||
['number', 'FileHandle'], options.fd); | ||
} | ||
} | ||
|
||
function ReadStream(path, options) { | ||
if (!(this instanceof ReadStream)) | ||
return new ReadStream(path, options); | ||
|
@@ -115,10 +175,11 @@ function ReadStream(path, options) { | |
|
||
// Path will be ignored when fd is specified, so it can be falsy | ||
this.path = toPathIfFileURL(path); | ||
this.fd = options.fd === undefined ? null : options.fd; | ||
this.flags = options.flags === undefined ? 'r' : options.flags; | ||
this.mode = options.mode === undefined ? 0o666 : options.mode; | ||
|
||
importFd(this, options); | ||
|
||
this.start = options.start; | ||
this.end = options.end; | ||
this.pos = undefined; | ||
|
@@ -287,10 +348,11 @@ function WriteStream(path, options) { | |
|
||
// Path will be ignored when fd is specified, so it can be falsy | ||
this.path = toPathIfFileURL(path); | ||
this.fd = options.fd === undefined ? null : options.fd; | ||
this.flags = options.flags === undefined ? 'w' : options.flags; | ||
this.mode = options.mode === undefined ? 0o666 : options.mode; | ||
|
||
importFd(this, options); | ||
|
||
this.start = options.start; | ||
this.pos = undefined; | ||
this.bytesWritten = 0; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
'use strict'; | ||
const common = require('../common'); | ||
const fs = require('fs'); | ||
const assert = require('assert'); | ||
const path = require('path'); | ||
const tmpdir = require('../common/tmpdir'); | ||
const file = path.join(tmpdir.path, 'read_stream_filehandle_worker.txt'); | ||
const input = 'hello world'; | ||
const { Worker, isMainThread, workerData } = require('worker_threads'); | ||
|
||
if (isMainThread || !workerData) { | ||
tmpdir.refresh(); | ||
fs.writeFileSync(file, input); | ||
|
||
fs.promises.open(file, 'r').then((handle) => { | ||
handle.on('close', common.mustNotCall()); | ||
new Worker(__filename, { | ||
workerData: { handle }, | ||
transferList: [handle] | ||
}); | ||
}); | ||
fs.promises.open(file, 'r').then((handle) => { | ||
fs.createReadStream(null, { fd: handle }); | ||
assert.throws(() => { | ||
new Worker(__filename, { | ||
workerData: { handle }, | ||
transferList: [handle] | ||
}); | ||
}, { | ||
code: 25, | ||
}); | ||
}); | ||
} else { | ||
let output = ''; | ||
|
||
const handle = workerData.handle; | ||
handle.on('close', common.mustCall()); | ||
const stream = fs.createReadStream(null, { fd: handle }); | ||
|
||
stream.on('data', common.mustCallAtLeast((data) => { | ||
output += data; | ||
})); | ||
|
||
stream.on('end', common.mustCall(() => { | ||
handle.close(); | ||
assert.strictEqual(output, input); | ||
})); | ||
|
||
stream.on('close', common.mustCall()); | ||
} |
This is a bit weird. When having multiple refs the point is that we don't close.