Skip to content

Commit

Permalink
fix(debounce): fix AsyncIterable debounce and reenable tests
Browse files Browse the repository at this point in the history
  • Loading branch information
trxcllnt committed Sep 1, 2020
1 parent 954ce58 commit 480996e
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 78 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
"@types/jest": "25.1.3",
"@typescript-eslint/eslint-plugin": "2.21.0",
"@typescript-eslint/parser": "2.21.0",
"abortcontroller-polyfill": "1.4.0",
"async-done": "1.3.2",
"benchmark": "2.1.4",
"command-line-args": "5.1.1",
Expand Down
3 changes: 2 additions & 1 deletion spec/Ix.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'web-streams-polyfill';
import 'abortcontroller-polyfill/dist/abortcontroller-polyfill-only';

// import this before assigning window global since it does a `typeof window` check
// eslint-disable-next-line @typescript-eslint/no-require-imports
Expand All @@ -12,7 +13,7 @@ Object.defineProperty(ArrayBuffer, Symbol.hasInstance, {
configurable: true,
value(inst: any) {
return inst && inst.constructor && inst.constructor.name === 'ArrayBuffer';
}
},
});

// Require rxjs first so we pick up its polyfilled Symbol.observable
Expand Down
37 changes: 31 additions & 6 deletions spec/asynciterable-operators/debounce-spec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { hasNext, noNext, delayValue } from '../asynciterablehelpers';
import { debounce } from 'ix/asynciterable/operators';
import { as } from 'ix/asynciterable';
import { AbortError } from 'ix/aborterror';

test.skip(
test(
'AsyncIterable#debounce none drop',
async () => {
const xs = async function*() {
const xs = async function* () {
yield await delayValue(1, 100);
yield await delayValue(2, 100);
yield await delayValue(3, 100);
Expand All @@ -21,15 +22,15 @@ test.skip(
10 * 1000
);

test.skip(
test(
'AsyncIterable#debounce some drop',
async () => {
const xs = async function*() {
const xs = async function* () {
yield await delayValue(1, 200);
yield await delayValue(2, 200);
yield await delayValue(2, 400);
yield await delayValue(3, 200);
};
const ys = as(xs()).pipe(debounce(500));
const ys = as(xs()).pipe(debounce(300));

const it = ys[Symbol.asyncIterator]();
await hasNext(it, 1);
Expand All @@ -38,3 +39,27 @@ test.skip(
},
10 * 1000
);

test(
'AsyncIterable#debounce cancels on abort',
async () => {
const xs = async function* () {
yield await delayValue(1, 200);
yield await delayValue(2, 400);
yield await delayValue(3, 200);
};
const ys = as(xs()).pipe(debounce(300));
const controller = new AbortController();
// @ts-ignore
const it = ys[Symbol.asyncIterator](controller.signal);
try {
await hasNext(it, 1);
controller.abort();
await hasNext(it, 3);
} catch (e) {
expect(e).toBeInstanceOf(AbortError);
}
await noNext(it);
},
10 * 1000
);
12 changes: 8 additions & 4 deletions src/asynciterable/_sleep.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ export function sleep(dueTime: number, signal?: AbortSignal) {
}, dueTime);

if (signal) {
signal.onabort = () => {
clearTimeout(id);
reject(new AbortError());
};
signal.addEventListener(
'abort',
() => {
clearTimeout(id);
reject(new AbortError());
},
{ once: true }
);
}
});
}
117 changes: 52 additions & 65 deletions src/asynciterable/operators/debounce.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,8 @@
import { AbortSignal } from '../../abortsignal';
import { AsyncIterableX } from '../asynciterablex';
import { MonoTypeOperatorAsyncFunction } from '../../interfaces';
import { wrapWithAbort } from './withabort';
import { AbortError } from '../../aborterror';

async function forEach<T>(
source: AsyncIterable<T>,
fn: (item: T, signal?: AbortSignal) => void | Promise<void>,
signal?: AbortSignal
): Promise<void> {
for await (const item of wrapWithAbort(source, signal)) {
await fn(item, signal);
}
}

export class DebounceAsyncIterable<TSource> extends AsyncIterableX<TSource> {
private _source: AsyncIterable<TSource>;
private _time: number;
Expand All @@ -25,67 +14,65 @@ export class DebounceAsyncIterable<TSource> extends AsyncIterableX<TSource> {
}

async *[Symbol.asyncIterator](signal?: AbortSignal) {
let noValue: boolean;
let lastItem: TSource | undefined;
let deferred: Promise<TSource>;
let resolver: (value?: TSource | PromiseLike<TSource> | undefined) => void;
let done: boolean = false;
let hasError: boolean = false;
let error: any;
let handle: any;
let done = false;
let reject: (reason?: any) => void = () => {
/**/
};
let resolve: (value?: TSource | PromiseLike<TSource>) => void = () => {
/**/
};
let promise = new Promise<TSource>((r1, r2) => {
resolve = r1;
reject = r2;
});
(async () => {
let id: any = null;

if (signal) {
signal.onabort = () => {
clearTimeout(handle);
hasError = true;
error = new AbortError();
const emitValue = (value: TSource) => {
id = null;
resolve(value);
promise = new Promise<TSource>((r1, r2) => {
resolve = r1;
reject = r2;
});
};
}

const reset = (hasNoValue: boolean) => {
noValue = hasNoValue;
lastItem = undefined;
deferred = new Promise<TSource>((r) => (resolver = r));
};

const run = () => {
if (lastItem === undefined) {
noValue = true;
return;
if (signal) {
signal.addEventListener(
'abort',
() => {
done = true;
if (id) {
clearTimeout(id);
}
id = null;
reject(new AbortError());
},
{ once: true }
);
}

const item = lastItem;
const res = resolver;
reset(false);
handle = setTimeout(run, this._time);
res(item);
};

reset(true);
forEach(
this._source,
(item) => {
lastItem = item;
if (noValue) {
run();
try {
let result: IteratorResult<TSource>;
// @ts-ignore
const it = this._source[Symbol.asyncIterator](signal);
// 1. check `!done`
// 2. await next value
// 3. check `!done` again, in case the signal aborted while the promise was pending
while (!done && !(result = await it.next()).done && !done) {
if (id) {
clearTimeout(id);
}
id = setTimeout(emitValue, this._time, result.value);
}
},
signal
)
.then(() => (done = true))
.catch((err) => {
hasError = true;
error = err;
});

while (1) {
if (done) {
break;
}
if (hasError) {
throw error;
} catch (e) {
reject(e);
}
yield await deferred!;
done = true;
})();

while (!done) {
yield await promise;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/asynciterable/operators/withabort.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import { MonoTypeOperatorAsyncFunction } from '../../interfaces';

export class WithAbortAsyncIterable<TSource> extends AsyncIterableX<TSource> {
private _source: AsyncIterable<TSource>;
private _signal?: AbortSignal;
private _signal: AbortSignal;

constructor(source: AsyncIterable<TSource>, signal?: AbortSignal) {
constructor(source: AsyncIterable<TSource>, signal: AbortSignal) {
super();
this._source = source;
this._signal = signal;
Expand Down

0 comments on commit 480996e

Please sign in to comment.