Skip to content

Commit

Permalink
feat: 🎸 implement FSA ReadStream
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Jun 20, 2023
1 parent 7e1a844 commit bc50fc5
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 29 deletions.
26 changes: 24 additions & 2 deletions src/fsa-to-node/FsaNodeFs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import { FsaNodeStats } from './FsaNodeStats';
import process from '../process';
import { FsSynchronousApi } from '../node/types/FsSynchronousApi';
import { FsaNodeWriteStream } from './FsaNodeWriteStream';
import { FsaNodeReadStream } from './FsaNodeReadStream';
import { FsaNodeCore } from './FsaNodeCore';
import type { FsCallbackApi, FsPromisesApi } from '../node/types';
import type * as misc from '../node/types/misc';
Expand Down Expand Up @@ -758,11 +759,32 @@ export class FsaNodeFs extends FsaNodeCore implements FsCallbackApi, FsSynchrono
return stream;
};

public readonly createReadStream: FsCallbackApi['createReadStream'] = (path: misc.PathLike, options?: opts.IReadStreamOptions | string): misc.IReadStream => {
const defaults: opts.IReadStreamOptions = {
flags: 'r',
fd: null,
mode: 0o666,
autoClose: true,
emitClose: true,
start: 0,
end: Infinity,
highWaterMark: 64 * 1024,
fs: null,
signal: null,
};
const optionsObj: opts.IReadStreamOptions = getOptions<opts.IReadStreamOptions>(defaults, options);
const filename = pathToFilename(path);
const flags = flagsToNumber(optionsObj.flags);
const fd: number = optionsObj.fd ? (typeof optionsObj.fd === 'number' ? optionsObj.fd : optionsObj.fd.fd) : 0;
const handle = fd ? this.getFileByFdAsync(fd) : this.__open(filename, flags, 0);
const stream = new FsaNodeReadStream(this, handle, filename, optionsObj);
return stream;
};

public readonly symlink: FsCallbackApi['symlink'] = notSupported;
public readonly link: FsCallbackApi['link'] = notSupported;
public readonly watchFile: FsCallbackApi['watchFile'] = notSupported;
public readonly unwatchFile: FsCallbackApi['unwatchFile'] = notSupported;
public readonly createReadStream: FsCallbackApi['createReadStream'] = notSupported;
public readonly watch: FsCallbackApi['watch'] = notSupported;

// --------------------------------------------------------- FsSynchronousApi
Expand Down Expand Up @@ -1033,10 +1055,10 @@ export class FsaNodeFs extends FsaNodeCore implements FsCallbackApi, FsSynchrono
public readonly Dirent = FsaNodeDirent;
public readonly Stats = FsaNodeStats<any>;
public readonly WriteStream = FsaNodeWriteStream;
public readonly ReadStream = FsaNodeReadStream;

public readonly StatFs = 0 as any;
public readonly Dir = 0 as any;
public readonly StatsWatcher = 0 as any;
public readonly FSWatcher = 0 as any;
public readonly ReadStream = 0 as any;
}
93 changes: 93 additions & 0 deletions src/fsa-to-node/FsaNodeReadStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { Readable } from 'stream';
import { Defer } from 'thingies/es6/Defer';
import { concurrency } from 'thingies/es6/concurrency';
import type { FsaNodeFsOpenFile } from './FsaNodeFsOpenFile';
import type { IReadStream } from '../node/types/misc';
import type { IReadStreamOptions } from '../node/types/options';
import type { FsaNodeFs } from './FsaNodeFs';

export class FsaNodeReadStream extends Readable implements IReadStream {
protected __pending__: boolean = true;
protected __closed__: boolean = false;
protected __bytes__: number = 0;
protected readonly __mutex__ = concurrency(1);
protected readonly __file__ = new Defer<FsaNodeFsOpenFile>();

public constructor(
protected readonly fs: FsaNodeFs,
protected readonly handle: Promise<FsaNodeFsOpenFile>,
public readonly path: string,
protected readonly options: IReadStreamOptions,
) {
super();
handle
.then((file) => {
if (this.__closed__) return;
this.__file__.resolve(file);
if (this.options.fd !== undefined) this.emit('open', file.fd);
this.emit('ready');
})
.catch(error => {
this.__file__.reject(error);
})
.finally(() => {
this.__pending__ = false;
});
}

private async __read__(): Promise<Uint8Array | undefined> {
return await this.__mutex__<Uint8Array | undefined>(async () => {
if (this.__closed__) return;
const {file} = await this.__file__.promise;
const blob = await file.getFile();
const buffer = await blob.arrayBuffer();
const start = this.options.start || 0;
let end = typeof this.options.end === 'number' ? this.options.end + 1 : buffer.byteLength;
if (end > buffer.byteLength) end = buffer.byteLength;
const uint8 = new Uint8Array(buffer, start, end - start);
return uint8;
});
}

private __close__(): void {
if (this.__closed__) return;
this.__closed__ = true;
if (this.options.autoClose) {
this.__file__.promise
.then(file => {
this.fs.close(file.fd, () => {
this.emit('close');
})
return file.close();
})
.catch(error => {});
}
}

// -------------------------------------------------------------- IReadStream

public get bytesRead(): number {
return this.__bytes__;
}

public get pending(): boolean {
return this.__pending__;
}

// ----------------------------------------------------------------- Readable

_read() {
this.__read__()
.then((uint8: Uint8Array) => {
if (this.__closed__) return;
if (!uint8) return this.push(null);
this.__bytes__ += uint8.length;
this.__close__();
this.push(uint8);
this.push(null);
}, (error) => {
this.__close__();
this.destroy(error);
});
}
}
25 changes: 0 additions & 25 deletions src/fsa-to-node/FsaNodeWriteStream copy.ts

This file was deleted.

66 changes: 66 additions & 0 deletions src/fsa-to-node/__tests__/FsaNodeFs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -816,4 +816,70 @@ onlyOnNode20('FsaNodeFs', () => {
}
});
});

describe('.createReadStream()', () => {
test('can pipe fs.ReadStream to fs.WriteStream', async () => {
const { fs, mfs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' });
const readStream = fs.createReadStream('/folder/file');
const writeStream = fs.createWriteStream('/folder/file2');
readStream.pipe(writeStream);
await new Promise(resolve => writeStream.once('close', resolve));
expect(mfs.__vol.toJSON()).toStrictEqual({
'/mountpoint/folder/file': 'test',
'/mountpoint/folder/file2': 'test',
'/mountpoint/empty-folder': null,
'/mountpoint/f.html': 'test',
});
});

test('emits "open" event', async () => {
const { fs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' });
const readStream = fs.createReadStream('/folder/file');
const fd = await new Promise(resolve => readStream.once('open', resolve));
expect(typeof fd).toBe('number');
});

test('emits "ready" event', async () => {
const { fs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' });
const readStream = fs.createReadStream('/folder/file');
await new Promise(resolve => readStream.once('ready', resolve));
});

test('emits "close" event', async () => {
const { fs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' });
const readStream = fs.createReadStream('/folder/file', {emitClose: true});
const writeStream = fs.createWriteStream('/folder/file2');
readStream.pipe(writeStream);
await new Promise(resolve => readStream.once('close', resolve));
});

test('can write to already open file', async () => {
const { fs, mfs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' });
const handle = await fs.promises.open('/folder/file');
const readStream = fs.createReadStream('xyz', {fd: handle.fd});
const writeStream = fs.createWriteStream('/folder/file2');
readStream.pipe(writeStream);
await new Promise(resolve => writeStream.once('close', resolve));
expect(mfs.__vol.toJSON()).toStrictEqual({
'/mountpoint/folder/file': 'test',
'/mountpoint/folder/file2': 'test',
'/mountpoint/empty-folder': null,
'/mountpoint/f.html': 'test',
});
});

test('can read a specified slice of a file', async () => {
const { fs, mfs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' });
const readStream = fs.createReadStream('/folder/file', {start: 1, end: 2});
const writeStream = fs.createWriteStream('/folder/file2');
readStream.pipe(writeStream);
await new Promise(resolve => writeStream.once('close', resolve));
expect(mfs.__vol.toJSON()).toStrictEqual({
'/mountpoint/folder/file': 'test',
'/mountpoint/folder/file2': 'es',
'/mountpoint/empty-folder': null,
'/mountpoint/f.html': 'test',
});
});
});
});
4 changes: 2 additions & 2 deletions src/node/types/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ export interface IWatchFileOptions {
interval?: number;
}

export interface IReadStreamOptions {
export interface IReadStreamOptions extends IOptions {
/** Defaults to `'r'`. */
flags?: TFlags;
/** Defaults to `null`. */
encoding?: BufferEncoding | null;
encoding?: BufferEncoding;
/** Defaults to `null`. */
fd?: number | IFileHandle | null;
/** Defaults to 0o666 */
Expand Down

0 comments on commit bc50fc5

Please sign in to comment.