From e1ba8c6530362dcc807aad620892b557570c69cb Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 8 Aug 2019 18:12:06 +0200 Subject: [PATCH] fs: refactor to use Stream api --- lib/internal/fs/streams.js | 57 ++++++++++++------------- test/parallel/test-file-write-stream.js | 24 +++++------ 2 files changed, 37 insertions(+), 44 deletions(-) diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index dfff08dbbd1d2a..92e32c2258c86f 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -19,8 +19,10 @@ const { } = require('internal/fs/utils'); const { Readable, Writable } = require('stream'); const { toPathIfFileURL } = require('internal/url'); +const { errorOrDestroy } = require('internal/streams/destroy'); const kMinPoolSpace = 128; +const kPending = Symbol('pending'); let pool; // It can happen that we expect to read a large chunk of data, and reserve @@ -67,6 +69,8 @@ function ReadStream(path, options) { // For backwards compat do not emit close on destroy. options.emitClose = false; + options.autoDestroy = options.autoClose == null ? true : options.autoClose; + Readable.call(this, options); // Path will be ignored when fd is specified, so it can be falsy @@ -74,10 +78,11 @@ function ReadStream(path, options) { this.fd = options.fd === undefined ? null : options.fd; this.flags = options.flags === undefined ? 'r' : options.flags; this.mode = options.mode === undefined ? 0o666 : options.mode; + this[kPending] = !this.fd; this.start = options.start; this.end = options.end; - this.autoClose = options.autoClose === undefined ? true : options.autoClose; + this.autoClose = options.autoDestroy; this.pos = undefined; this.bytesRead = 0; this.closed = false; @@ -104,23 +109,16 @@ function ReadStream(path, options) { if (typeof this.fd !== 'number') this.open(); - - this.on('end', function() { - if (this.autoClose) { - this.destroy(); - } - }); } Object.setPrototypeOf(ReadStream.prototype, Readable.prototype); Object.setPrototypeOf(ReadStream, Readable); ReadStream.prototype.open = function() { fs.open(this.path, this.flags, this.mode, (er, fd) => { + this[kPending] = false; + if (er) { - if (this.autoClose) { - this.destroy(); - } - this.emit('error', er); + errorOrDestroy(this, er); return; } @@ -167,10 +165,7 @@ ReadStream.prototype._read = function(n) { // the actual read. fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => { if (er) { - if (this.autoClose) { - this.destroy(); - } - this.emit('error', er); + errorOrDestroy(this, er); } else { let b = null; // Now that we know how much data we have actually read, re-wind the @@ -206,8 +201,13 @@ ReadStream.prototype._read = function(n) { }; ReadStream.prototype._destroy = function(err, cb) { - if (typeof this.fd !== 'number') { - this.once('open', closeFsStream.bind(null, this, cb, err)); + if (this[kPending]) { + this.once('open', this._destroy.bind(this, err, cb)); + return; + } + + if (!this.fd) { + cb(err); return; } @@ -243,6 +243,8 @@ function WriteStream(path, options) { // For backwards compat do not emit close on destroy. options.emitClose = false; + options.autoDestroy = options.autoClose == null ? true : options.autoClose; + Writable.call(this, options); // Path will be ignored when fd is specified, so it can be falsy @@ -250,9 +252,10 @@ function WriteStream(path, options) { this.fd = options.fd === undefined ? null : options.fd; this.flags = options.flags === undefined ? 'w' : options.flags; this.mode = options.mode === undefined ? 0o666 : options.mode; + this[kPending] = !this.fd; this.start = options.start; - this.autoClose = options.autoClose === undefined ? true : !!options.autoClose; + this.autoClose = options.autoDestroy; this.pos = undefined; this.bytesWritten = 0; this.closed = false; @@ -274,19 +277,18 @@ Object.setPrototypeOf(WriteStream, Writable); WriteStream.prototype._final = function(callback) { if (this.autoClose) { - this.destroy(); + this._destroy(null, callback); + } else { + callback(); } - - callback(); }; WriteStream.prototype.open = function() { fs.open(this.path, this.flags, this.mode, (er, fd) => { + this[kPending] = false; + if (er) { - if (this.autoClose) { - this.destroy(); - } - this.emit('error', er); + errorOrDestroy(this, er); return; } @@ -296,7 +298,6 @@ WriteStream.prototype.open = function() { }); }; - WriteStream.prototype._write = function(data, encoding, cb) { if (!(data instanceof Buffer)) { const err = new ERR_INVALID_ARG_TYPE('data', 'Buffer', data); @@ -311,9 +312,6 @@ WriteStream.prototype._write = function(data, encoding, cb) { fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => { if (er) { - if (this.autoClose) { - this.destroy(); - } return cb(er); } this.bytesWritten += bytes; @@ -358,7 +356,6 @@ WriteStream.prototype._writev = function(data, cb) { writev(this.fd, chunks, this.pos, function(er, bytes) { if (er) { - self.destroy(); return cb(er); } self.bytesWritten += bytes; diff --git a/test/parallel/test-file-write-stream.js b/test/parallel/test-file-write-stream.js index d66657d690f1b2..4ba0eecbee62c4 100644 --- a/test/parallel/test-file-write-stream.js +++ b/test/parallel/test-file-write-stream.js @@ -46,9 +46,6 @@ file callbacks.open++; assert.strictEqual(typeof fd, 'number'); }) - .on('error', function(err) { - throw err; - }) .on('drain', function() { console.error('drain!', callbacks.drain); callbacks.drain++; @@ -65,17 +62,16 @@ file assert.strictEqual(file.bytesWritten, EXPECTED.length * 2); callbacks.close++; - common.expectsError( - () => { - console.error('write after end should not be allowed'); - file.write('should not work anymore'); - }, - { - code: 'ERR_STREAM_WRITE_AFTER_END', - type: Error, - message: 'write after end' - } - ); + file.on('error', common.expectsError({ + code: 'ERR_STREAM_WRITE_AFTER_END', + type: Error, + message: 'write after end' + })); + file.write('should not work anymore', common.expectsError({ + code: 'ERR_STREAM_WRITE_AFTER_END', + type: Error, + message: 'write after end' + })); fs.unlinkSync(fn); });