Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add TailFileStream #1

Merged
merged 4 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: CI

on: [push, pull_request]

permissions:
contents: read

jobs:
test:
name: Test
runs-on: ${{ matrix.os }}

strategy:
matrix:
node-version: [18, 20]
os: [ubuntu-latest, windows-latest, macOS-latest]

steps:
- uses: actions/checkout@v3

- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node-version }}

- name: Install
run: |
npm install

- name: Lint
run: |
npm run lint

- name: Test
run: |
npm test
99 changes: 98 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,98 @@
# tail-file-stream
# tail-file-stream

__tail-file-stream__ is a library that provides a streaming interface for reading a file that can be appended.
It implements the Node.js fs.ReadStream functionality and watches the file for changes.
When the file is appended, the stream emits a 'data' event with the new data.

- [Install](#install)
- [Usage](#usage)
- [API](#api)
- [createReadStream(path, [options])](#createreadstreampath-options)
- [TailFileStream](#tailfilestream)
- [License](#license)

<a name="install"></a>

## Install

```
npm i tail-file-stream --save
```

<a name="usage"></a>

## Usage

```js
const { createReadStream } = require('tail-file-stream')

const stream = createReadStream('./foo.txt')

stream.on('data', (chunk) => {
// Emits when has new data
})
stream.on('eof', () => {
// Emits when reaches the end of the file
})
stream.on('end', () => {
// Emits when stream ends
})

setTimeout(() => {
// Stop watching the file
stream.unwatch()
}, 10000)

```

<a name="api"></a>

## API

<a name="create-read-stream"></a>

#### createReadStream(path, [options])

Creates a new `TailFileStream` instance.

- `path` `<string>` The file path to be read.
- `options` `<Object>` Options for the stream.
- `flags` `<string>` See [support of file system](https://nodejs.org/docs/latest/api/fs.html#file-system-flags) flags. Default: `'r'`.
- `mode` `<number>` Default: `0o666`.
- `start` `<number>` The byte offset to start reading from. Default: `0`.
- `end` `<number>` The byte offset to stop reading. Default: `Infinity`.
- `highWaterMark` `<number>` The maximum number of bytes to store in the internal buffer. Default: `64 * 1024`.
- `autoWatch` `<boolean>` If `true`, the file will be watched for changes from the beginning. Default: `true`.
- `persistent` `<boolean>` Indicates whether the process should continue to run as long as files are being watched. Default: `true`.
- Returns: `TailFileStream` The stream instance.

#### TailFileStream

The `TailFileStream` class extends the Node.js [Readable](https://nodejs.org/docs/latest/api/stream.html#class-streamreadable) stream.

`TailFileStream` emits the following events:

- `close` - Emits when the file is closed.
- `eof` - Emits when reaches the end of the file.
- `open` - Emits when the file is opened.
- `ready` - Emits when the file is ready to be read.

`TailFileStream` has the following properties:

- `bytesRead` `<number>` The number of bytes read from the file.
- `path` `<string>` The file path.
- `pending` `<boolean>` This property is true if the underlying file has not been opened yet, i.e. before the 'ready' event is emitted.
- `watching` `<boolean>` Indicates whether the file is being watched.
- `waiting` `<boolean>` Indicates whether the stream is waiting for file changes.

`TailFileStream` has the following methods:

- `watch()` - Starts watching the file for changes.
- `unwatch()` - Stops watching the file. If the stream is waiting for file changes, it will be closed.
If the stream is reading, the data will be read until the end of the file and then it will be closed.

<a name="license"></a>

## License

MIT
27 changes: 27 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/// <reference types="node" />
mcollina marked this conversation as resolved.
Show resolved Hide resolved

import { Readable } from 'node:stream';

declare class TailFileStream extends Readable {
constructor(path: string, options?: TailFileStreamOptions);
readonly pending: boolean;
readonly watching: boolean;
readonly waiting: boolean;
close(cb?: (err?: NodeJS.ErrnoException) => void): void;
watch(): void;
unwatch(): void;
}

interface TailFileStreamOptions {
highWaterMark?: number;
flags?: string;
mode?: number;
start?: number;
end?: number;
autoWatch?: boolean;
persistent?: boolean;
}

declare function createReadStream(path: string, options?: TailFileStreamOptions): TailFileStream;

export { TailFileStream, TailFileStreamOptions, createReadStream };
185 changes: 185 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
'use strict'
mcollina marked this conversation as resolved.
Show resolved Hide resolved

const { Readable, finished } = require('node:stream')
const fs = require('node:fs')

const ioDone = Symbol('ioDone')

class TailFileStream extends Readable {
#watching = false
#waiting = false
#performingIO = false

#start
#end
#pos
#flags
#mode
#fd

#autoWatch
#persistent

constructor (path, options = {}) {
if (options.highWaterMark === undefined) {
options.highWaterMark = 64 * 1024
}

super(options)

this.path = path
this.#flags = options.flags ?? 'r'
this.#mode = options.mode ?? 0o666

this.#start = options.start ?? 0
this.#end = options.end ?? Infinity
this.#pos = this.#start
this.bytesRead = 0

this.#autoWatch = options.autoWatch ?? true
this.#persistent = options.persistent ?? true

this.#fd = null
this.watcher = null
}

get pending () {
return this.#fd === null
}

get watching () {
return this.#watching
}

get waiting () {
return this.#waiting
}

_construct (cb) {
if (this.#autoWatch) {
this.watch()
}

fs.open(this.path, this.#flags, this.#mode, (err, fd) => {
if (err) {
cb(err)
} else {
this.#fd = fd
cb()
this.emit('open', this.#fd)
this.emit('ready')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this.emit('open', this.#fd)
this.emit('ready')
process.nextTick(() => {
this.emit('open', this.#fd)
this.emit('ready')
})

See nodejs/node#51993

}
})
}

_read (n) {
n = Math.min(this.#end - this.#pos + 1, n)

if (n <= 0) {
this.push(null)
return
}

const buf = Buffer.allocUnsafeSlow(n)

this.#performingIO = true
fs.read(this.#fd, buf, 0, n, this.#pos, (err, bytesRead, buf) => {
this.#performingIO = false

/* c8 ignore next 4 */
if (this.destroyed) {
this.emit(ioDone, err)
return
}

if (err) {
this.emit('error', err)
this.destroy(err)
return
}

if (bytesRead === 0) {
if (this.#watching) {
this.#waiting = true
this.watcher.once('change', () => {
this.#waiting = false
this._read(n)
})
} else {
this.push(null)
}
this.emit('eof')
return
}

this.#pos += bytesRead
this.bytesRead += bytesRead

if (bytesRead !== buf.length) {
const dst = Buffer.allocUnsafeSlow(bytesRead)
buf.copy(dst, 0, 0, bytesRead)
buf = dst
}

this.push(buf)
})
}

watch () {
this.#watching = true

if (!this.watcher) {
this.watcher = fs.watch(this.path, {
persistent: this.#persistent
})
}
}

unwatch () {
this.#autoWatch = false
this.#watching = false

if (this.watcher) {
this.watcher.close()
this.watcher = null
}

if (this.#waiting) {
this.#waiting = false
this.push(null)
}
}

_destroy (err, cb) {
mcollina marked this conversation as resolved.
Show resolved Hide resolved
/* c8 ignore next 3 */
if (this.#performingIO) {
this.once(ioDone, (er) => this.#close(err || er, cb))
} else {
this.#close(err, cb)
}
}

close (cb) {
if (typeof cb === 'function') finished(this, cb)
this.destroy()
}

#close (err, cb) {
this.unwatch()

if (this.#fd) {
fs.close(this.#fd, (er) => cb(er || err))
this.#fd = null
} else {
cb(err)
}
}
}

function createReadStream (path, options) {
return new TailFileStream(path, options)
}

module.exports = {
createReadStream
}
Loading
Loading