Skip to content

Commit

Permalink
Only run the provided delayed operation early (#3101)
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian authored May 26, 2020
1 parent 5ffa43b commit ea2fcf5
Show file tree
Hide file tree
Showing 14 changed files with 50 additions and 64 deletions.
2 changes: 2 additions & 0 deletions packages/firestore/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Unreleased
- [fixed] Fixed an issue that could cause Firestore to temporarily go
offline when a Window visibility event occurred.
- [feature] Added support for calling `FirebaseFiresore.settings` with
`{ ignoreUndefinedProperties: true }`. When set, Firestore ignores
undefined properties inside objects rather than rejecting the API call.
Expand Down
2 changes: 1 addition & 1 deletion packages/firestore/src/local/index_free_query_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ export class IndexFreeQueryEngine implements QueryEngine {
if (getLogLevel() <= LogLevel.DEBUG) {
logDebug(
'IndexFreeQueryEngine',
'Using full collection scan to execute query: %s',
'Using full collection scan to execute query:',
query.toString()
);
}
Expand Down
5 changes: 2 additions & 3 deletions packages/firestore/src/local/indexeddb_persistence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ import { DocumentKey } from '../model/document_key';
import { Platform } from '../platform/platform';
import { JsonProtoSerializer } from '../remote/serializer';
import { debugAssert, fail } from '../util/assert';
import { AsyncQueue, TimerId } from '../util/async_queue';
import { AsyncQueue, DelayedOperation, TimerId } from '../util/async_queue';
import { Code, FirestoreError } from '../util/error';
import { logDebug, logError } from '../util/log';
import { CancelablePromise } from '../util/promise';
import {
decodeResourcePath,
EncodedResourcePath,
Expand Down Expand Up @@ -215,7 +214,7 @@ export class IndexedDbPersistence implements Persistence {
private documentVisibilityHandler: ((e?: Event) => void) | null = null;

/** The client metadata refresh task. */
private clientMetadataRefresher: CancelablePromise<void> | null = null;
private clientMetadataRefresher: DelayedOperation<void> | null = null;

/** The last time we garbage collected the client metadata object store. */
private lastGarbageCollectionTime = Number.NEGATIVE_INFINITY;
Expand Down
5 changes: 2 additions & 3 deletions packages/firestore/src/local/lru_garbage_collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
import { ListenSequence } from '../core/listen_sequence';
import { ListenSequenceNumber, TargetId } from '../core/types';
import { debugAssert } from '../util/assert';
import { AsyncQueue, TimerId } from '../util/async_queue';
import { AsyncQueue, DelayedOperation, TimerId } from '../util/async_queue';
import { getLogLevel, logDebug, LogLevel } from '../util/log';
import { primitiveComparator } from '../util/misc';
import { CancelablePromise } from '../util/promise';
import { SortedMap } from '../util/sorted_map';
import { SortedSet } from '../util/sorted_set';
import { ignoreIfPrimaryLeaseLoss, LocalStore } from './local_store';
Expand Down Expand Up @@ -222,7 +221,7 @@ const REGULAR_GC_DELAY_MS = 5 * 60 * 1000;
*/
export class LruScheduler implements GarbageCollectionScheduler {
private hasRun: boolean = false;
private gcTask: CancelablePromise<void> | null;
private gcTask: DelayedOperation<void> | null;

constructor(
private readonly garbageCollector: LruGarbageCollector,
Expand Down
13 changes: 10 additions & 3 deletions packages/firestore/src/remote/backoff.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

import { AsyncQueue, TimerId } from '../util/async_queue';
import { AsyncQueue, DelayedOperation, TimerId } from '../util/async_queue';
import { logDebug } from '../util/log';
import { CancelablePromise } from '../util/promise';

const LOG_TAG = 'ExponentialBackoff';

/**
Expand All @@ -42,7 +42,7 @@ const DEFAULT_BACKOFF_MAX_DELAY_MS = 60 * 1000;
*/
export class ExponentialBackoff {
private currentBaseMs: number = 0;
private timerPromise: CancelablePromise<void> | null = null;
private timerPromise: DelayedOperation<void> | null = null;
/** The last backoff attempt, as epoch milliseconds. */
private lastAttemptTime = Date.now();

Expand Down Expand Up @@ -149,6 +149,13 @@ export class ExponentialBackoff {
}
}

skipBackoff(): void {
if (this.timerPromise !== null) {
this.timerPromise.skipDelay();
this.timerPromise = null;
}
}

cancel(): void {
if (this.timerPromise !== null) {
this.timerPromise.cancel();
Expand Down
5 changes: 2 additions & 3 deletions packages/firestore/src/remote/online_state_tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

import { OnlineState } from '../core/types';
import { debugAssert } from '../util/assert';
import { AsyncQueue, TimerId } from '../util/async_queue';
import { AsyncQueue, DelayedOperation, TimerId } from '../util/async_queue';
import { FirestoreError } from '../util/error';
import { logError, logDebug } from '../util/log';
import { CancelablePromise } from '../util/promise';

const LOG_TAG = 'OnlineStateTracker';

Expand Down Expand Up @@ -64,7 +63,7 @@ export class OnlineStateTracker {
* transition from OnlineState.Unknown to OnlineState.Offline without waiting
* for the stream to actually fail (MAX_WATCH_STREAM_FAILURES times).
*/
private onlineStateTimer: CancelablePromise<void> | null = null;
private onlineStateTimer: DelayedOperation<void> | null = null;

/**
* Whether the client should log a warning message if it fails to connect to
Expand Down
5 changes: 2 additions & 3 deletions packages/firestore/src/remote/persistent_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import { TargetData } from '../local/target_data';
import { Mutation, MutationResult } from '../model/mutation';
import * as api from '../protos/firestore_proto_api';
import { hardAssert, debugAssert } from '../util/assert';
import { AsyncQueue, TimerId } from '../util/async_queue';
import { AsyncQueue, DelayedOperation, TimerId } from '../util/async_queue';
import { Code, FirestoreError } from '../util/error';
import { logError, logDebug } from '../util/log';

import { CancelablePromise } from '../util/promise';
import { isNullOrUndefined } from '../util/types';
import { ExponentialBackoff } from './backoff';
import { Connection, Stream } from './connection';
Expand Down Expand Up @@ -164,7 +163,7 @@ export abstract class PersistentStream<
*/
private closeCount = 0;

private idleTimer: CancelablePromise<void> | null = null;
private idleTimer: DelayedOperation<void> | null = null;
private stream: Stream<SendType, ReceiveType> | null = null;

protected backoff: ExponentialBackoff;
Expand Down
27 changes: 12 additions & 15 deletions packages/firestore/src/util/async_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import { debugAssert, fail } from './assert';
import { Code, FirestoreError } from './error';
import { logDebug, logError } from './log';
import { CancelablePromise, Deferred } from './promise';
import { Deferred } from './promise';
import { ExponentialBackoff } from '../remote/backoff';
import { PlatformSupport } from '../platform/platform';
import { isIndexedDbTransactionError } from '../local/simple_db';
Expand Down Expand Up @@ -86,8 +86,12 @@ export const enum TimerId {
* It is created via DelayedOperation.createAndSchedule().
*
* Supports cancellation (via cancel()) and early execution (via skipDelay()).
*
* Note: We implement `PromiseLike` instead of `Promise`, as the `Promise` type
* in newer versions of TypeScript defines `finally`, which is not available in
* IE.
*/
class DelayedOperation<T extends unknown> implements CancelablePromise<T> {
export class DelayedOperation<T extends unknown> implements PromiseLike<T> {
// handle for use with clearTimeout(), or null if the operation has been
// executed or canceled already.
private timerHandle: TimerHandle | null;
Expand Down Expand Up @@ -175,10 +179,7 @@ class DelayedOperation<T extends unknown> implements CancelablePromise<T> {
}
}

// Promise implementation.
readonly [Symbol.toStringTag]: 'Promise';
then = this.deferred.promise.then.bind(this.deferred.promise);
catch = this.deferred.promise.catch.bind(this.deferred.promise);

private handleDelayElapsed(): void {
this.asyncQueue.enqueueAndForget(() => {
Expand Down Expand Up @@ -234,10 +235,7 @@ export class AsyncQueue {
// Visibility handler that triggers an immediate retry of all retryable
// operations. Meant to speed up recovery when we regain file system access
// after page comes into foreground.
private visibilityHandler = (): void => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.runDelayedOperationsEarly(TimerId.AsyncQueueRetry);
};
private visibilityHandler = (): void => this.backoff.skipBackoff();

constructor() {
const window = PlatformSupport.getPlatform().window;
Expand Down Expand Up @@ -379,14 +377,14 @@ export class AsyncQueue {

/**
* Schedules an operation to be queued on the AsyncQueue once the specified
* `delayMs` has elapsed. The returned CancelablePromise can be used to cancel
* the operation prior to its running.
* `delayMs` has elapsed. The returned DelayedOperation can be used to cancel
* or fast-forward the operation prior to its running.
*/
enqueueAfterDelay<T extends unknown>(
timerId: TimerId,
delayMs: number,
op: () => Promise<T>
): CancelablePromise<T> {
): DelayedOperation<T> {
this.verifyNotFailed();

debugAssert(
Expand Down Expand Up @@ -466,11 +464,10 @@ export class AsyncQueue {
* For Tests: Runs some or all delayed operations early.
*
* @param lastTimerId Delayed operations up to and including this TimerId will
* be drained. Throws if no such operation exists. Pass TimerId.All to run
* all delayed operations.
* be drained. Pass TimerId.All to run all delayed operations.
* @returns a Promise that resolves once all operations have been run.
*/
runDelayedOperationsEarly(lastTimerId: TimerId): Promise<void> {
runAllDelayedOperationsUntil(lastTimerId: TimerId): Promise<void> {
// Note that draining may generate more delayed ops, so we do that first.
return this.drain().then(() => {
// Run ops in the same order they'd run if they ran naturally.
Expand Down
18 changes: 0 additions & 18 deletions packages/firestore/src/util/promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,6 @@ export interface Rejecter {
(reason?: Error): void;
}

export interface CancelablePromise<T> {
// We are not extending Promise, since Node's Promise API require us to
// implement 'finally', which is not fully supported on Web.
then<TResult1 = T, TResult2 = never>(
onfulfilled?:
| ((value: T) => TResult1 | PromiseLike<TResult1>)
| undefined
| null,
onrejected?: // eslint-disable-next-line @typescript-eslint/no-explicit-any
((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined | null
): Promise<TResult1 | TResult2>;
catch<TResult = never>(
onrejected?: // eslint-disable-next-line @typescript-eslint/no-explicit-any
((reason: any) => TResult | PromiseLike<TResult>) | undefined | null
): Promise<T | TResult>;
cancel(): void;
}

export class Deferred<R> {
promise: Promise<R>;
// Assigned synchronously in constructor by Promise constructor callback.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ apiDescribe('Idle Timeout', (persistence: boolean) => {
return docRef
.set({ foo: 'bar' })
.then(() => {
return asyncQueue(db).runDelayedOperationsEarly(
return asyncQueue(db).runAllDelayedOperationsUntil(
TimerId.WriteStreamIdle
);
})
Expand All @@ -53,7 +53,7 @@ apiDescribe('Idle Timeout', (persistence: boolean) => {

return awaitOnlineSnapshot()
.then(() => {
return asyncQueue(db).runDelayedOperationsEarly(
return asyncQueue(db).runAllDelayedOperationsUntil(
TimerId.ListenStreamIdle
);
})
Expand Down
4 changes: 2 additions & 2 deletions packages/firestore/test/integration/remote/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ describe('Write Stream', () => {
expect(queue.containsDelayedOperation(TimerId.WriteStreamIdle)).to.be
.true;
return Promise.all([
queue.runDelayedOperationsEarly(TimerId.WriteStreamIdle),
queue.runAllDelayedOperationsUntil(TimerId.WriteStreamIdle),
streamListener.awaitCallback('close')
]);
})
Expand All @@ -229,7 +229,7 @@ describe('Write Stream', () => {
writeStream.writeMutations(SINGLE_MUTATION);
await streamListener.awaitCallback('mutationResult');

await queue.runDelayedOperationsEarly(TimerId.All);
await queue.runAllDelayedOperationsUntil(TimerId.All);
expect(writeStream.isOpen()).to.be.true;
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1185,7 +1185,7 @@ describe('IndexedDb: allowTabSynchronization', () => {
it('ignores intermittent IndexedDbTransactionError during lease refresh', async () => {
await withPersistence('clientA', async (db, _, queue) => {
db.injectFailures = ['updateClientMetadataAndTryBecomePrimary'];
await queue.runDelayedOperationsEarly(TimerId.ClientMetadataRefresh);
await queue.runAllDelayedOperationsUntil(TimerId.ClientMetadataRefresh);
await queue.enqueue(() => {
db.injectFailures = [];
return db.runTransaction('check success', 'readwrite-primary', () =>
Expand Down
10 changes: 6 additions & 4 deletions packages/firestore/test/unit/specs/spec_test_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ abstract class TestRunner {
TimerId.ListenStreamConnectionBackoff
)
) {
await this.queue.runDelayedOperationsEarly(
await this.queue.runAllDelayedOperationsUntil(
TimerId.ListenStreamConnectionBackoff
);
}
Expand Down Expand Up @@ -605,7 +605,7 @@ abstract class TestRunner {
);
// The watch stream should re-open if we have active listeners.
if (spec.runBackoffTimer && !this.queryListeners.isEmpty()) {
await this.queue.runDelayedOperationsEarly(
await this.queue.runAllDelayedOperationsUntil(
TimerId.ListenStreamConnectionBackoff
);
await this.connection.waitForWatchOpen();
Expand Down Expand Up @@ -656,7 +656,7 @@ abstract class TestRunner {
// not, then there won't be a matching item on the queue and
// runDelayedOperationsEarly() will throw.
const timerId = timer as TimerId;
await this.queue.runDelayedOperationsEarly(timerId);
await this.queue.runAllDelayedOperationsUntil(timerId);
}

private async doDisableNetwork(): Promise<void> {
Expand Down Expand Up @@ -706,7 +706,9 @@ abstract class TestRunner {

if (state.primary) {
await clearCurrentPrimaryLease();
await this.queue.runDelayedOperationsEarly(TimerId.ClientMetadataRefresh);
await this.queue.runAllDelayedOperationsUntil(
TimerId.ClientMetadataRefresh
);
}

return Promise.resolve();
Expand Down
12 changes: 6 additions & 6 deletions packages/firestore/test/unit/util/async_queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import * as chaiAsPromised from 'chai-as-promised';
import { expect, use } from 'chai';
import { AsyncQueue, TimerId } from '../../../src/util/async_queue';
import { Code } from '../../../src/util/error';
import { getLogLevel, setLogLevel, LogLevel } from '../../../src/util/log';
import { getLogLevel, LogLevel, setLogLevel } from '../../../src/util/log';
import { Deferred, Rejecter, Resolver } from '../../../src/util/promise';
import { fail } from '../../../src/util/assert';
import { IndexedDbTransactionError } from '../../../src/local/simple_db';
Expand Down Expand Up @@ -171,7 +171,7 @@ describe('AsyncQueue', () => {
err => expect(err.code === Code.CANCELLED)
);

await queue.runDelayedOperationsEarly(TimerId.All);
await queue.runAllDelayedOperationsUntil(TimerId.All);
expect(completedSteps).to.deep.equal([1]);
});

Expand All @@ -187,7 +187,7 @@ describe('AsyncQueue', () => {
queue.enqueueAfterDelay(timerId2, 10000, () => doStep(3));
queue.enqueueAndForget(() => doStep(2));

await queue.runDelayedOperationsEarly(TimerId.All);
await queue.runAllDelayedOperationsUntil(TimerId.All);
expect(completedSteps).to.deep.equal([1, 2, 3, 4]);
});

Expand All @@ -205,7 +205,7 @@ describe('AsyncQueue', () => {
queue.enqueueAfterDelay(timerId3, 15000, () => doStep(4));
queue.enqueueAndForget(() => doStep(2));

await queue.runDelayedOperationsEarly(timerId3);
await queue.runAllDelayedOperationsUntil(timerId3);
expect(completedSteps).to.deep.equal([1, 2, 3, 4]);
});

Expand All @@ -223,7 +223,7 @@ describe('AsyncQueue', () => {
);
}
});
await queue.runDelayedOperationsEarly(TimerId.AsyncQueueRetry);
await queue.runAllDelayedOperationsUntil(TimerId.AsyncQueueRetry);
expect(completedSteps).to.deep.equal([1, 1]);
});

Expand Down Expand Up @@ -279,7 +279,7 @@ describe('AsyncQueue', () => {
expect(completedSteps).to.deep.equal([1]);

// Fast forward all operations
await queue.runDelayedOperationsEarly(TimerId.AsyncQueueRetry);
await queue.runAllDelayedOperationsUntil(TimerId.AsyncQueueRetry);
expect(completedSteps).to.deep.equal([1, 1]);
});

Expand Down

0 comments on commit ea2fcf5

Please sign in to comment.