From 0c52338496da02d2f09ab7da5e1a6bb73b84bb04 Mon Sep 17 00:00:00 2001 From: Alessandro Chitolina Date: Fri, 30 Oct 2020 15:10:57 +0100 Subject: [PATCH] [Filesystem] workaround for nodejs/node#35862 --- src/StreamWrapper/File/ReadableStream.js | 54 ++++++++++++++++++++++++ src/StreamWrapper/File/WritableStream.js | 48 +++++++++++++++++++++ src/StreamWrapper/FileStreamWrapper.js | 15 +++---- test/OpenFileTest.js | 34 ++++++++++++--- 4 files changed, 134 insertions(+), 17 deletions(-) create mode 100644 src/StreamWrapper/File/ReadableStream.js create mode 100644 src/StreamWrapper/File/WritableStream.js diff --git a/src/StreamWrapper/File/ReadableStream.js b/src/StreamWrapper/File/ReadableStream.js new file mode 100644 index 0000000..eff4f15 --- /dev/null +++ b/src/StreamWrapper/File/ReadableStream.js @@ -0,0 +1,54 @@ +import { Readable } from 'stream'; + +/** + * @memberOf Jymfony.Component.Filesystem.StreamWrapper.File + * @internal Use this instead of fs.createReadableStream led to undefined behavior + * (https://github.com/nodejs/node/issues/35862). + */ +export default class ReadableStream extends Readable { + /** + * Constructor + * + * @param {Jymfony.Component.Filesystem.StreamWrapper.File.Resource} resource + */ + constructor(resource) { + super(); + + /** + * @type {Jymfony.Component.Filesystem.StreamWrapper.File.Resource} + * + * @private + */ + this._resource = resource; + } + + /** + * @param {number} size + * + * @returns {Promise} + */ + async _read(size) { + const handle = this._resource.handle; + let result; + + try { + do { + const { buffer, bytesRead } = await handle.read(Buffer.alloc(size), 0, size, this._resource.position); + + if (0 === bytesRead) { + this.push(null); + return; + } + + this._resource.advance(bytesRead); + if (bytesRead !== size) { + result = this.push(buffer.slice(0, bytesRead)); + } else { + result = this.push(buffer); + } + } while (result); + } catch (err) { + this.destroy(err); + } + } +} diff --git a/src/StreamWrapper/File/WritableStream.js b/src/StreamWrapper/File/WritableStream.js new file mode 100644 index 0000000..16ed8a7 --- /dev/null +++ b/src/StreamWrapper/File/WritableStream.js @@ -0,0 +1,48 @@ +import { Writable } from 'stream'; + +/** + * @memberOf Jymfony.Component.Filesystem.StreamWrapper.File + */ +export default class WritableStream extends Writable { + /** + * Constructor + * + * @param {Jymfony.Component.Filesystem.StreamWrapper.File.Resource} resource + */ + constructor(resource) { + super(); + + /** + * @type {Jymfony.Component.Filesystem.StreamWrapper.File.Resource} + * + * @private + */ + this._resource = resource; + } + + /** + * @param {Buffer|string} chunk + * @param {string} encoding + * @param {Function} callback + * + * @returns {Promise} + * + * @private + */ + async _write(chunk, encoding, callback) { + const handle = this._resource.handle; + + try { + if (! isBuffer(chunk)) { + chunk = Buffer.from(chunk, encoding); + } + + const { bytesWritten } = await handle.write(chunk, 0, chunk.length, this._resource.position); + this._resource.advance(bytesWritten); + + callback(); + } catch (err) { + callback(err); + } + } +} diff --git a/src/StreamWrapper/FileStreamWrapper.js b/src/StreamWrapper/FileStreamWrapper.js index 0b3562e..f998cc9 100644 --- a/src/StreamWrapper/FileStreamWrapper.js +++ b/src/StreamWrapper/FileStreamWrapper.js @@ -13,14 +13,15 @@ import { symlink, unlink } from 'fs/promises'; -import { createReadStream, createWriteStream } from 'fs'; import { dirname, resolve as pathResolve } from 'path'; import { parse as urlParse } from 'url'; -const File = Jymfony.Component.Filesystem.File; const AbstractStreamWrapper = Jymfony.Component.Filesystem.StreamWrapper.AbstractStreamWrapper; +const File = Jymfony.Component.Filesystem.File; +const ReadableStream = Jymfony.Component.Filesystem.StreamWrapper.File.ReadableStream; const Resource = Jymfony.Component.Filesystem.StreamWrapper.File.Resource; const StreamWrapperInterface = Jymfony.Component.Filesystem.StreamWrapper.StreamWrapperInterface; +const WritableStream = Jymfony.Component.Filesystem.StreamWrapper.File.WritableStream; const Storage = function () {}; Storage.prototype = {}; @@ -201,20 +202,14 @@ export default class FileStreamWrapper extends AbstractStreamWrapper { * @inheritdoc */ createReadableStream(resource) { - return createReadStream(null, { - fd: resource.handle.fd, - autoClose: false, - }); + return new ReadableStream(resource); } /** * @inheritdoc */ createWritableStream(resource) { - return createWriteStream(null, { - fd: resource.handle.fd, - autoClose: false, - }); + return new WritableStream(resource); } /** diff --git a/test/OpenFileTest.js b/test/OpenFileTest.js index 25ce686..5211411 100644 --- a/test/OpenFileTest.js +++ b/test/OpenFileTest.js @@ -1,9 +1,11 @@ +import { Readable, Writable } from 'stream'; +import { fsyncSync, readFileSync } from 'fs'; +import { expect } from 'chai'; +import { promisify } from 'util'; + const OpenFile = Jymfony.Component.Filesystem.OpenFile; const FileStreamWrapper = Jymfony.Component.Filesystem.StreamWrapper.FileStreamWrapper; -const { expect } = require('chai'); -const fs = require('fs'); -const stream = require('stream'); describe('[Filesystem] OpenFile', function () { beforeEach(() => { @@ -33,24 +35,42 @@ describe('[Filesystem] OpenFile', function () { const file = await new OpenFile(__dirname + '/../fixtures/TESTFILE.txt', 'r'); const readable = await file.createReadableStream(); - expect(readable).to.be.instanceOf(stream.Readable); + expect(readable).to.be.instanceOf(Readable); + readable.on('data', buf => { + expect(buf.toString('utf-8')).to.be.equal('THIS IS A TEST\n'); + }); + + await new Promise((resolve, reject) => { + readable.on('end', resolve); + readable.on('error', reject); + + readable.read(); + }); await file.close(); }); it('createWritableStream should return a stream', async () => { - const file = await new OpenFile(__dirname + '/../fixtures/WRITEFILE', 'w'); + const path = __dirname + '/../fixtures/WRITEFILE'; + const file = await new OpenFile(path, 'w'); const writable = await file.createWritableStream(); - expect(writable).to.be.instanceOf(stream.Writable); + expect(writable).to.be.instanceOf(Writable); + await promisify(Writable.prototype.write).call(writable, 'This is ', 'utf-8'); + await promisify(Writable.prototype.write).call(writable, 'a te', 'utf-8'); + await promisify(Writable.prototype.write).call(writable, 'st of writing', 'utf-8'); await file.close(); + + const str = readFileSync(path, { encoding: 'utf-8' }); + expect(str).to.be.equal('This is a test of writing'); }); it('fwrite should write to file', async () => { const file = await new OpenFile(__dirname + '/../fixtures/WRITEFILE', 'w'); expect(await file.fwrite(Buffer.from('TEST FILE'))).to.be.equal(9); - fs.fsyncSync((await file._resource).handle.fd); + + fsyncSync((await file._resource).handle.fd); expect(await file.getSize()).to.be.equal(9); await file.close();