Skip to content

Commit

Permalink
fs: refactor to use Stream api
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Aug 17, 2019
1 parent f25bbf1 commit e1ba8c6
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 44 deletions.
57 changes: 27 additions & 30 deletions lib/internal/fs/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ const {
} = require('internal/fs/utils');
const { Readable, Writable } = require('stream');
const { toPathIfFileURL } = require('internal/url');
const { errorOrDestroy } = require('internal/streams/destroy');

const kMinPoolSpace = 128;
const kPending = Symbol('pending');

let pool;
// It can happen that we expect to read a large chunk of data, and reserve
Expand Down Expand Up @@ -67,17 +69,20 @@ function ReadStream(path, options) {
// For backwards compat do not emit close on destroy.
options.emitClose = false;

options.autoDestroy = options.autoClose == null ? true : options.autoClose;

Readable.call(this, options);

// Path will be ignored when fd is specified, so it can be falsy
this.path = toPathIfFileURL(path);
this.fd = options.fd === undefined ? null : options.fd;
this.flags = options.flags === undefined ? 'r' : options.flags;
this.mode = options.mode === undefined ? 0o666 : options.mode;
this[kPending] = !this.fd;

this.start = options.start;
this.end = options.end;
this.autoClose = options.autoClose === undefined ? true : options.autoClose;
this.autoClose = options.autoDestroy;
this.pos = undefined;
this.bytesRead = 0;
this.closed = false;
Expand All @@ -104,23 +109,16 @@ function ReadStream(path, options) {

if (typeof this.fd !== 'number')
this.open();

this.on('end', function() {
if (this.autoClose) {
this.destroy();
}
});
}
Object.setPrototypeOf(ReadStream.prototype, Readable.prototype);
Object.setPrototypeOf(ReadStream, Readable);

ReadStream.prototype.open = function() {
fs.open(this.path, this.flags, this.mode, (er, fd) => {
this[kPending] = false;

if (er) {
if (this.autoClose) {
this.destroy();
}
this.emit('error', er);
errorOrDestroy(this, er);
return;
}

Expand Down Expand Up @@ -167,10 +165,7 @@ ReadStream.prototype._read = function(n) {
// the actual read.
fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
if (er) {
if (this.autoClose) {
this.destroy();
}
this.emit('error', er);
errorOrDestroy(this, er);
} else {
let b = null;
// Now that we know how much data we have actually read, re-wind the
Expand Down Expand Up @@ -206,8 +201,13 @@ ReadStream.prototype._read = function(n) {
};

ReadStream.prototype._destroy = function(err, cb) {
if (typeof this.fd !== 'number') {
this.once('open', closeFsStream.bind(null, this, cb, err));
if (this[kPending]) {
this.once('open', this._destroy.bind(this, err, cb));
return;
}

if (!this.fd) {
cb(err);
return;
}

Expand Down Expand Up @@ -243,16 +243,19 @@ function WriteStream(path, options) {
// For backwards compat do not emit close on destroy.
options.emitClose = false;

options.autoDestroy = options.autoClose == null ? true : options.autoClose;

Writable.call(this, options);

// Path will be ignored when fd is specified, so it can be falsy
this.path = toPathIfFileURL(path);
this.fd = options.fd === undefined ? null : options.fd;
this.flags = options.flags === undefined ? 'w' : options.flags;
this.mode = options.mode === undefined ? 0o666 : options.mode;
this[kPending] = !this.fd;

this.start = options.start;
this.autoClose = options.autoClose === undefined ? true : !!options.autoClose;
this.autoClose = options.autoDestroy;
this.pos = undefined;
this.bytesWritten = 0;
this.closed = false;
Expand All @@ -274,19 +277,18 @@ Object.setPrototypeOf(WriteStream, Writable);

WriteStream.prototype._final = function(callback) {
if (this.autoClose) {
this.destroy();
this._destroy(null, callback);
} else {
callback();
}

callback();
};

WriteStream.prototype.open = function() {
fs.open(this.path, this.flags, this.mode, (er, fd) => {
this[kPending] = false;

if (er) {
if (this.autoClose) {
this.destroy();
}
this.emit('error', er);
errorOrDestroy(this, er);
return;
}

Expand All @@ -296,7 +298,6 @@ WriteStream.prototype.open = function() {
});
};


WriteStream.prototype._write = function(data, encoding, cb) {
if (!(data instanceof Buffer)) {
const err = new ERR_INVALID_ARG_TYPE('data', 'Buffer', data);
Expand All @@ -311,9 +312,6 @@ WriteStream.prototype._write = function(data, encoding, cb) {

fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
if (er) {
if (this.autoClose) {
this.destroy();
}
return cb(er);
}
this.bytesWritten += bytes;
Expand Down Expand Up @@ -358,7 +356,6 @@ WriteStream.prototype._writev = function(data, cb) {

writev(this.fd, chunks, this.pos, function(er, bytes) {
if (er) {
self.destroy();
return cb(er);
}
self.bytesWritten += bytes;
Expand Down
24 changes: 10 additions & 14 deletions test/parallel/test-file-write-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ file
callbacks.open++;
assert.strictEqual(typeof fd, 'number');
})
.on('error', function(err) {
throw err;
})
.on('drain', function() {
console.error('drain!', callbacks.drain);
callbacks.drain++;
Expand All @@ -65,17 +62,16 @@ file
assert.strictEqual(file.bytesWritten, EXPECTED.length * 2);

callbacks.close++;
common.expectsError(
() => {
console.error('write after end should not be allowed');
file.write('should not work anymore');
},
{
code: 'ERR_STREAM_WRITE_AFTER_END',
type: Error,
message: 'write after end'
}
);
file.on('error', common.expectsError({
code: 'ERR_STREAM_WRITE_AFTER_END',
type: Error,
message: 'write after end'
}));
file.write('should not work anymore', common.expectsError({
code: 'ERR_STREAM_WRITE_AFTER_END',
type: Error,
message: 'write after end'
}));

fs.unlinkSync(fn);
});
Expand Down

0 comments on commit e1ba8c6

Please sign in to comment.