Skip to content

Commit

Permalink
Updates.
Browse files Browse the repository at this point in the history
  • Loading branch information
brendandburns committed Apr 23, 2021
1 parent 0cc04fe commit 463bb00
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 34 deletions.
17 changes: 10 additions & 7 deletions src/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
CONNECT,
DELETE,
ERROR,
ErrorCallback,
Informer,
ListPromise,
ObjectCallback,
Expand All @@ -21,7 +22,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
private objects: T[] = [];
private resourceVersion: string;
private readonly indexCache: { [key: string]: T[] } = {};
private readonly callbackCache: { [key: string]: Array<ObjectCallback<T>> } = {};
private readonly callbackCache: { [key: string]: Array<ObjectCallback<T> | ErrorCallback> } = {};
private request: RequestResult | undefined;
private stopped: boolean = false;

Expand Down Expand Up @@ -65,11 +66,13 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
this.callbackCache[verb].push(cb);
}

public off(verb: string, cb: ObjectCallback<T>): void {
public off(verb: 'add' | 'update' | 'delete', cb: ObjectCallback<T>): void;
public off(verb: 'error' | 'connect', cb: ErrorCallback): void;
public off(verb: string, cb: any): void {
if (verb === CHANGE) {
this.off(ADD, cb);
this.off(UPDATE, cb);
this.off(DELETE, cb);
this.off('add', cb);
this.off('update', cb);
this.off('delete', cb);
return;
}
if (this.callbackCache[verb] === undefined) {
Expand Down Expand Up @@ -113,14 +116,14 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
private async doneHandler(err: any): Promise<any> {
this._stop();
if (err) {
this.callbackCache[ERROR].forEach((elt: ObjectCallback<T>) => elt(undefined, err));
this.callbackCache[ERROR].forEach((elt: ErrorCallback) => elt(err));
return;
}
if (this.stopped) {
// do not auto-restart
return;
}
this.callbackCache[CONNECT].forEach((elt: ObjectCallback<T>) => elt());
this.callbackCache[CONNECT].forEach((elt: ErrorCallback) => elt(undefined));
// TODO: Don't always list here for efficiency
// try to restart the watch from resourceVersion, but detect 410 GONE and relist in that case.
// Or if resourceVersion is empty.
Expand Down
52 changes: 26 additions & 26 deletions src/cache_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { EventEmitter } from 'ws';

import { V1Namespace, V1NamespaceList, V1ObjectMeta, V1Pod, V1ListMeta } from './api';
import { deleteObject, ListWatch, deleteItems } from './cache';
import { ADD, UPDATE, DELETE, ERROR, ListPromise, CHANGE, CONNECT } from './informer';
import { ListPromise } from './informer';

use(chaiAsPromised);

Expand Down Expand Up @@ -216,19 +216,19 @@ describe('ListWatchCache', () => {
expect(pathOut).to.equal('/some/path');

const addPromise = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(ADD, (obj?: V1Namespace) => {
informer.on('add', (obj?: V1Namespace) => {
resolve(obj);
});
});

const updatePromise = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(UPDATE, (obj?: V1Namespace) => {
informer.on('update', (obj?: V1Namespace) => {
resolve(obj);
});
});

const deletePromise = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(DELETE, (obj?: V1Namespace) => {
informer.on('delete', (obj?: V1Namespace) => {
resolve(obj);
});
});
Expand Down Expand Up @@ -310,7 +310,7 @@ describe('ListWatchCache', () => {

let count = 0;
const changePromise = new Promise<boolean>((resolve: (V1Namespace) => void) => {
informer.on(CHANGE, (obj?: V1Namespace) => {
informer.on('change', (obj?: V1Namespace) => {
count++;
if (count == 3) {
resolve(true);
Expand Down Expand Up @@ -372,13 +372,13 @@ describe('ListWatchCache', () => {
expect(pathOut).to.equal('/some/path');

const addPromise = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(ADD, (obj?: V1Namespace) => {
informer.on('add', (obj?: V1Namespace) => {
resolve(obj);
});
});

const addPromise2 = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(ADD, (obj?: V1Namespace) => {
informer.on('add', (obj?: V1Namespace) => {
resolve(obj);
});
});
Expand Down Expand Up @@ -442,9 +442,9 @@ describe('ListWatchCache', () => {
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);

const addObjects: V1Namespace[] = [];
informer.on(ADD, (obj?: V1Namespace) => addObjects.push(obj!));
informer.on('add', (obj?: V1Namespace) => addObjects.push(obj!));
const updateObjects: V1Namespace[] = [];
informer.on(UPDATE, (obj?: V1Namespace) => updateObjects.push(obj!));
informer.on('update', (obj?: V1Namespace) => updateObjects.push(obj!));

informer.start();
await promise;
Expand Down Expand Up @@ -518,11 +518,11 @@ describe('ListWatchCache', () => {
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);

const addObjects: V1Namespace[] = [];
informer.on(ADD, (obj?: V1Namespace) => addObjects.push(obj!));
informer.on('add', (obj?: V1Namespace) => addObjects.push(obj!));
const updateObjects: V1Namespace[] = [];
informer.on(UPDATE, (obj?: V1Namespace) => updateObjects.push(obj!));
informer.on('update', (obj?: V1Namespace) => updateObjects.push(obj!));
const deleteObjects: V1Namespace[] = [];
informer.on(DELETE, (obj?: V1Namespace) => deleteObjects.push(obj!));
informer.on('delete', (obj?: V1Namespace) => deleteObjects.push(obj!));
informer.start();
await promise;
const [pathOut, , , doneHandler] = mock.capture(fakeWatch.watch).last();
Expand Down Expand Up @@ -729,16 +729,16 @@ describe('ListWatchCache', () => {
await watchCalled;
const [, , watchHandler] = mock.capture(fakeWatch.watch).last();

informer.on(ADD, addToList1Fn);
informer.on(ADD, addToList2Fn);
informer.on('add', addToList1Fn);
informer.on('add', addToList2Fn);

watchHandler('ADDED', {
metadata: {
name: 'name1',
} as V1ObjectMeta,
} as V1Namespace);

informer.off(ADD, addToList2Fn);
informer.off('add', addToList2Fn);

watchHandler('ADDED', {
metadata: {
Expand Down Expand Up @@ -779,16 +779,16 @@ describe('ListWatchCache', () => {
addedList.push(obj!);
};
const removeSelf = function() {
informer.off(ADD, removeSelf);
informer.off('add', removeSelf);
};

informer.start();

await watchCalled;
const [, , watchHandler] = mock.capture(fakeWatch.watch).last();

informer.on(ADD, removeSelf);
informer.on(ADD, addToListFn);
informer.on('add', removeSelf);
informer.on('add', addToListFn);

watchHandler('ADDED', {
metadata: {
Expand Down Expand Up @@ -877,9 +877,9 @@ describe('ListWatchCache', () => {
const [, , watchHandler] = mock.capture(fakeWatch.watch).last();

let adds = 0;
informer.on(ADD, () => adds++);
informer.on(ADD, addToList1Fn);
informer.on(ADD, addToList2Fn);
informer.on('add', () => adds++);
informer.on('add', addToList1Fn);
informer.on('add', addToList2Fn);

watchHandler('ADDED', {
metadata: {
Expand All @@ -888,7 +888,7 @@ describe('ListWatchCache', () => {
} as V1ObjectMeta,
} as V1Namespace);

informer.off(ADD, addToList2Fn);
informer.off('add', addToList2Fn);

watchHandler('ADDED', {
metadata: {
Expand Down Expand Up @@ -1010,7 +1010,7 @@ describe('ListWatchCache', () => {
await promise;

let errorEmitted = false;
cache.on(ERROR, () => (errorEmitted = true));
cache.on('error', () => (errorEmitted = true));

const [, , , doneHandler] = mock.capture(fakeWatch.watch).last();

Expand Down Expand Up @@ -1131,7 +1131,7 @@ describe('delete items', () => {
};
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);
const connectPromise = new Promise<boolean>((resolve: (boolean) => void) => {
informer.on(CONNECT, (obj?: V1Namespace) => {
informer.on('connect', (obj?: V1Namespace) => {
resolve(true);
});
});
Expand Down Expand Up @@ -1185,7 +1185,7 @@ describe('delete items', () => {
await promise;

let errorEmitted = false;
cache.on(ERROR, () => (errorEmitted = true));
cache.on('error', () => (errorEmitted = true));

const [, , , doneHandler] = mock.capture(fakeWatch.watch).last();

Expand All @@ -1198,7 +1198,7 @@ describe('delete items', () => {
expect(errorEmitted).to.equal(true);

const connectPromise = new Promise<boolean>((resolve: (boolean) => void) => {
cache.on(CONNECT, (obj?: V1Namespace) => {
cache.on('connect', (obj?: V1Namespace) => {
resolve(true);
});
});
Expand Down
3 changes: 2 additions & 1 deletion src/informer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import { Watch } from './watch';

import http = require('http');

export type ObjectCallback<T extends KubernetesObject> = (obj?: T, err?: any) => void;
export type ObjectCallback<T extends KubernetesObject> = (obj: T) => void;
export type ErrorCallback = (err?: any) => void;
export type ListCallback<T extends KubernetesObject> = (list: T[], ResourceVersion: string) => void;
export type ListPromise<T extends KubernetesObject> = () => Promise<{
response: http.IncomingMessage;
Expand Down

0 comments on commit 463bb00

Please sign in to comment.