Skip to content

Commit

Permalink
watch: debounce restart in watch mode
Browse files Browse the repository at this point in the history
  • Loading branch information
matthieusieben committed Mar 6, 2024
1 parent 384fd17 commit a295353
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 2 deletions.
169 changes: 169 additions & 0 deletions lib/internal/debounce_iterable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
'use strict';

const {
ArrayPrototypePushApply,
ObjectGetPrototypeOf,
ObjectSetPrototypeOf,
Promise,
PromiseResolve,
SymbolAsyncIterator,
SymbolIterator,
} = primordials;

const {
codes: { ERR_INVALID_ARG_TYPE },
} = require('internal/errors');
const FixedQueue = require('internal/fixed_queue');

const AsyncIteratorPrototype = ObjectGetPrototypeOf(
ObjectGetPrototypeOf(async function* () {}).prototype
);

/**
* Wraps an iterable in a debounced iterable. When trying to get the next item,
* the debounced iterable will group all items that are returned less than
* `delay` milliseconds apart into a single batch.
*
* The debounced iterable will only start consuming the original iterable when
* the first consumer requests the next item, and will stop consuming the
* original iterable when no more items are requested (through `next` calls).
*
* Each debounced iterable item will be an array of items from the original
* iterable, and will always contain at least one item. This allows the consumer
* to decide how to handle the batch of items (e.g. take tha latest only, ensure
* unicity, etc.).
*
* @template T
* @param {Iterable<T> | AsyncIterable<T>} iterable
* @param {number} delay
* @returns {AsyncIterableIterator<[T, ...T[]]>}
*/
exports.debounceIterable = function debounceIterable(iterable, delay) {
const innerIterator =
SymbolAsyncIterator in iterable
? iterable[SymbolAsyncIterator]()
: iterable[SymbolIterator]();

let doneProducing = false;
let doneConsuming = false;
let consuming = false;
let error = null;
let timer = null;

const unconsumedPromises = new FixedQueue();
let unconsumedValues = [];

return ObjectSetPrototypeOf(
{
[SymbolAsyncIterator]() {
return this;
},

next() {
return new Promise((resolve, reject) => {
unconsumedPromises.push({ resolve, reject });
startConsuming();
});
},

return() {
return closeHandler();
},

throw(err) {
if (!err || !(err instanceof Error)) {
throw new ERR_INVALID_ARG_TYPE('AsyncIterator.throw', 'Error', err);
}
errorHandler(err);
},
},
AsyncIteratorPrototype
);

async function startConsuming() {
if (consuming) return;

consuming = true;

while (!doneProducing && !doneConsuming && !unconsumedPromises.isEmpty()) {
try {
// if `result` takes longer than `delay` to resolve, make sure any
// unconsumedValue are flushed.
scheduleFlush();

const result = await innerIterator.next();

// A new value just arrived. Make sure we wont flush just yet.
unscheduleFlush();

if (result.done) {
doneProducing = true;
} else if (!doneConsuming) {
ArrayPrototypePushApply(unconsumedValues, result.value);
}
} catch (err) {
doneProducing = true;
error ||= err;
}
}

flushNow();

consuming = false;
}

function scheduleFlush() {
if (timer == null) {
timer = setTimeout(flushNow, delay).unref();
}
}

function unscheduleFlush() {
if (timer != null) {
clearTimeout(timer);
timer = null;
}
}

function flushNow() {
unscheduleFlush();

if (!doneConsuming) {
if (unconsumedValues.length > 0 && !unconsumedPromises.isEmpty()) {
unconsumedPromises
.shift()
.resolve({ done: false, value: unconsumedValues });
unconsumedValues = [];
}
if (doneProducing && unconsumedValues.length === 0) {
doneConsuming = true;
}
}

while (doneConsuming && !unconsumedPromises.isEmpty()) {
const { resolve, reject } = unconsumedPromises.shift();
if (error) reject(error);
else resolve({ done: true, value: undefined });
}
}

function errorHandler(err) {
error ||= err;

closeHandler();
}

function closeHandler() {
doneConsuming = true;
unconsumedValues = [];

flushNow();

if (!doneProducing) {
doneProducing = true;
innerIterator.return?.();
}

return PromiseResolve({ done: true, value: undefined });
}
};
5 changes: 3 additions & 2 deletions lib/internal/main/watch_mode.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const {
triggerUncaughtException,
exitCodes: { kNoFailure },
} = internalBinding('errors');
const { debounceIterable } = require('internal/debounce_iterable');
const { getOptionValue } = require('internal/options');
const { emitExperimentalWarning } = require('internal/util');
const { FilesWatcher } = require('internal/watch_mode/files_watcher');
Expand Down Expand Up @@ -44,7 +45,7 @@ const args = ArrayPrototypeFilter(process.execArgv, (arg, i, arr) =>
arg !== '--watch' && !StringPrototypeStartsWith(arg, '--watch=') && arg !== '--watch-preserve-output');
ArrayPrototypePushApply(args, kCommand);

const watcher = new FilesWatcher({ debounce: 200, mode: kShouldFilterModules ? 'filter' : 'all' });
const watcher = new FilesWatcher({ mode: kShouldFilterModules ? 'filter' : 'all' });
ArrayPrototypeForEach(kWatchedPaths, (p) => watcher.watchPath(p));

let graceTimer;
Expand Down Expand Up @@ -117,7 +118,7 @@ async function restart() {
start();

// eslint-disable-next-line no-unused-vars
for await (const _ of on(watcher, 'changed')) {
for await (const _ of debounceIterable(on(watcher, 'changed'), 200)) {
await restart();
}
} catch (error) {
Expand Down

0 comments on commit a295353

Please sign in to comment.