Skip to content

Commit

Permalink
Merge pull request #614 from brendandburns/stop
Browse files Browse the repository at this point in the history
Add a CONNECT event to informer.
  • Loading branch information
k8s-ci-robot committed May 2, 2021
2 parents 965da9b + 7d88ed6 commit f0ee657
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 46 deletions.
39 changes: 28 additions & 11 deletions src/cache.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
import { ADD, CHANGE, DELETE, ERROR, Informer, ListPromise, ObjectCallback, UPDATE } from './informer';
import {
ADD,
CHANGE,
CONNECT,
DELETE,
ERROR,
ErrorCallback,
Informer,
ListPromise,
ObjectCallback,
UPDATE,
} from './informer';
import { KubernetesObject } from './types';
import { RequestResult, Watch } from './watch';

Expand All @@ -11,7 +22,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
private objects: T[] = [];
private resourceVersion: string;
private readonly indexCache: { [key: string]: T[] } = {};
private readonly callbackCache: { [key: string]: Array<ObjectCallback<T>> } = {};
private readonly callbackCache: { [key: string]: Array<ObjectCallback<T> | ErrorCallback> } = {};
private request: RequestResult | undefined;
private stopped: boolean = false;

Expand All @@ -25,6 +36,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
this.callbackCache[UPDATE] = [];
this.callbackCache[DELETE] = [];
this.callbackCache[ERROR] = [];
this.callbackCache[CONNECT] = [];
this.resourceVersion = '';
if (autoStart) {
this.doneHandler(null);
Expand All @@ -41,11 +53,13 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
this._stop();
}

public on(verb: string, cb: ObjectCallback<T>): void {
public on(verb: 'add' | 'update' | 'delete' | 'change', cb: ObjectCallback<T>): void;
public on(verb: 'error' | 'connect', cb: ErrorCallback): void;
public on(verb: string, cb: any): void {
if (verb === CHANGE) {
this.on(ADD, cb);
this.on(UPDATE, cb);
this.on(DELETE, cb);
this.on('add', cb);
this.on('update', cb);
this.on('delete', cb);
return;
}
if (this.callbackCache[verb] === undefined) {
Expand All @@ -54,11 +68,13 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
this.callbackCache[verb].push(cb);
}

public off(verb: string, cb: ObjectCallback<T>): void {
public off(verb: 'add' | 'update' | 'delete' | 'change', cb: ObjectCallback<T>): void;
public off(verb: 'error' | 'connect', cb: ErrorCallback): void;
public off(verb: string, cb: any): void {
if (verb === CHANGE) {
this.off(ADD, cb);
this.off(UPDATE, cb);
this.off(DELETE, cb);
this.off('add', cb);
this.off('update', cb);
this.off('delete', cb);
return;
}
if (this.callbackCache[verb] === undefined) {
Expand Down Expand Up @@ -102,13 +118,14 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
private async doneHandler(err: any): Promise<any> {
this._stop();
if (err) {
this.callbackCache[ERROR].forEach((elt: ObjectCallback<T>) => elt(err));
this.callbackCache[ERROR].forEach((elt: ErrorCallback) => elt(err));
return;
}
if (this.stopped) {
// do not auto-restart
return;
}
this.callbackCache[CONNECT].forEach((elt: ErrorCallback) => elt(undefined));
// TODO: Don't always list here for efficiency
// try to restart the watch from resourceVersion, but detect 410 GONE and relist in that case.
// Or if resourceVersion is empty.
Expand Down
169 changes: 134 additions & 35 deletions src/cache_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { EventEmitter } from 'ws';

import { V1Namespace, V1NamespaceList, V1ObjectMeta, V1Pod, V1ListMeta } from './api';
import { deleteObject, ListWatch, deleteItems } from './cache';
import { ADD, UPDATE, DELETE, ERROR, ListPromise, CHANGE } from './informer';
import { ListPromise } from './informer';

use(chaiAsPromised);

Expand Down Expand Up @@ -44,7 +44,9 @@ describe('ListWatchCache', () => {
};
const lw = new ListWatch('/some/path', fake, listFn);
const verb = 'FOOBAR';
expect(() => lw.on(verb, (obj: V1Namespace) => {})).to.throw(`Unknown verb: ${verb}`);
// The 'as any' is a hack to get around Typescript which prevents an unknown verb from being
// passed. We want to test for Javascript clients also, where this is possible
expect(() => (lw as any).on(verb, (obj?: V1Namespace) => {})).to.throw(`Unknown verb: ${verb}`);
});

it('should perform basic caching', async () => {
Expand Down Expand Up @@ -216,19 +218,19 @@ describe('ListWatchCache', () => {
expect(pathOut).to.equal('/some/path');

const addPromise = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(ADD, (obj: V1Namespace) => {
informer.on('add', (obj?: V1Namespace) => {
resolve(obj);
});
});

const updatePromise = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(UPDATE, (obj: V1Namespace) => {
informer.on('update', (obj?: V1Namespace) => {
resolve(obj);
});
});

const deletePromise = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(DELETE, (obj: V1Namespace) => {
informer.on('delete', (obj?: V1Namespace) => {
resolve(obj);
});
});
Expand Down Expand Up @@ -310,7 +312,7 @@ describe('ListWatchCache', () => {

let count = 0;
const changePromise = new Promise<boolean>((resolve: (V1Namespace) => void) => {
informer.on(CHANGE, (obj: V1Namespace) => {
informer.on('change', (obj?: V1Namespace) => {
count++;
if (count == 3) {
resolve(true);
Expand Down Expand Up @@ -372,13 +374,13 @@ describe('ListWatchCache', () => {
expect(pathOut).to.equal('/some/path');

const addPromise = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(ADD, (obj: V1Namespace) => {
informer.on('add', (obj?: V1Namespace) => {
resolve(obj);
});
});

const addPromise2 = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(ADD, (obj: V1Namespace) => {
informer.on('add', (obj?: V1Namespace) => {
resolve(obj);
});
});
Expand Down Expand Up @@ -442,9 +444,9 @@ describe('ListWatchCache', () => {
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);

const addObjects: V1Namespace[] = [];
informer.on(ADD, (obj: V1Namespace) => addObjects.push(obj));
informer.on('add', (obj?: V1Namespace) => addObjects.push(obj!));
const updateObjects: V1Namespace[] = [];
informer.on(UPDATE, (obj: V1Namespace) => updateObjects.push(obj));
informer.on('update', (obj?: V1Namespace) => updateObjects.push(obj!));

informer.start();
await promise;
Expand Down Expand Up @@ -518,11 +520,11 @@ describe('ListWatchCache', () => {
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);

const addObjects: V1Namespace[] = [];
informer.on(ADD, (obj: V1Namespace) => addObjects.push(obj));
informer.on('add', (obj?: V1Namespace) => addObjects.push(obj!));
const updateObjects: V1Namespace[] = [];
informer.on(UPDATE, (obj: V1Namespace) => updateObjects.push(obj));
informer.on('update', (obj?: V1Namespace) => updateObjects.push(obj!));
const deleteObjects: V1Namespace[] = [];
informer.on(DELETE, (obj: V1Namespace) => deleteObjects.push(obj));
informer.on('delete', (obj?: V1Namespace) => deleteObjects.push(obj!));
informer.start();
await promise;
const [pathOut, , , doneHandler] = mock.capture(fakeWatch.watch).last();
Expand Down Expand Up @@ -716,29 +718,29 @@ describe('ListWatchCache', () => {
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);

const addedList1: V1Namespace[] = [];
const addToList1Fn = function(obj: V1Namespace) {
addedList1.push(obj);
const addToList1Fn = function(obj?: V1Namespace) {
addedList1.push(obj!);
};
const addedList2: V1Namespace[] = [];
const addToList2Fn = function(obj: V1Namespace) {
addedList2.push(obj);
const addToList2Fn = function(obj?: V1Namespace) {
addedList2.push(obj!);
};

informer.start();

await watchCalled;
const [, , watchHandler] = mock.capture(fakeWatch.watch).last();

informer.on(ADD, addToList1Fn);
informer.on(ADD, addToList2Fn);
informer.on('add', addToList1Fn);
informer.on('add', addToList2Fn);

watchHandler('ADDED', {
metadata: {
name: 'name1',
} as V1ObjectMeta,
} as V1Namespace);

informer.off(ADD, addToList2Fn);
informer.off('add', addToList2Fn);

watchHandler('ADDED', {
metadata: {
Expand Down Expand Up @@ -775,20 +777,20 @@ describe('ListWatchCache', () => {
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);

const addedList: V1Namespace[] = [];
const addToListFn = function(obj: V1Namespace) {
addedList.push(obj);
const addToListFn = function(obj?: V1Namespace) {
addedList.push(obj!);
};
const removeSelf = function() {
informer.off(ADD, removeSelf);
informer.off('add', removeSelf);
};

informer.start();

await watchCalled;
const [, , watchHandler] = mock.capture(fakeWatch.watch).last();

informer.on(ADD, removeSelf);
informer.on(ADD, addToListFn);
informer.on('add', removeSelf);
informer.on('add', addToListFn);

watchHandler('ADDED', {
metadata: {
Expand Down Expand Up @@ -863,12 +865,12 @@ describe('ListWatchCache', () => {
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);

const addedList1: V1Namespace[] = [];
const addToList1Fn = function(obj: V1Namespace) {
addedList1.push(obj);
const addToList1Fn = function(obj?: V1Namespace) {
addedList1.push(obj!);
};
const addedList2: V1Namespace[] = [];
const addToList2Fn = function(obj: V1Namespace) {
addedList2.push(obj);
const addToList2Fn = function(obj?: V1Namespace) {
addedList2.push(obj!);
};

informer.start();
Expand All @@ -877,9 +879,9 @@ describe('ListWatchCache', () => {
const [, , watchHandler] = mock.capture(fakeWatch.watch).last();

let adds = 0;
informer.on(ADD, () => adds++);
informer.on(ADD, addToList1Fn);
informer.on(ADD, addToList2Fn);
informer.on('add', () => adds++);
informer.on('add', addToList1Fn);
informer.on('add', addToList2Fn);

watchHandler('ADDED', {
metadata: {
Expand All @@ -888,7 +890,7 @@ describe('ListWatchCache', () => {
} as V1ObjectMeta,
} as V1Namespace);

informer.off(ADD, addToList2Fn);
informer.off('add', addToList2Fn);

watchHandler('ADDED', {
metadata: {
Expand Down Expand Up @@ -1010,7 +1012,7 @@ describe('ListWatchCache', () => {
await promise;

let errorEmitted = false;
cache.on(ERROR, () => (errorEmitted = true));
cache.on('error', () => (errorEmitted = true));

const [, , , doneHandler] = mock.capture(fakeWatch.watch).last();

Expand Down Expand Up @@ -1106,7 +1108,104 @@ describe('delete items', () => {
];
const pods: V1Pod[] = [];

deleteItems(listA, listB, [(obj: V1Pod) => pods.push(obj)]);
deleteItems(listA, listB, [(obj?: V1Pod) => pods.push(obj!)]);
expect(pods).to.deep.equal(expected);
});

it('should call the connect handler', async () => {
const fakeWatch = mock.mock(Watch);
const listObj = {
metadata: {
resourceVersion: '12345',
} as V1ListMeta,
items: [],
} as V1NamespaceList;

const listFn: ListPromise<V1Namespace> = function(): Promise<{
response: http.IncomingMessage;
body: V1NamespaceList;
}> {
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>(
(resolve, reject) => {
resolve({ response: {} as http.IncomingMessage, body: listObj });
},
);
};
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);
const connectPromise = new Promise<boolean>((resolve: (boolean) => void) => {
informer.on('connect', (obj?: V1Namespace) => {
resolve(true);
});
});
informer.start();

expect(connectPromise).to.eventually.be.true;
});

it('does calls connect after a restart after an error', async () => {
const fakeWatch = mock.mock(Watch);
const list: V1Pod[] = [
{
metadata: {
name: 'name1',
namespace: 'ns1',
} as V1ObjectMeta,
} as V1Pod,
{
metadata: {
name: 'name2',
namespace: 'ns2',
} as V1ObjectMeta,
} as V1Pod,
];
const listObj = {
metadata: {
resourceVersion: '12345',
} as V1ListMeta,
items: list,
} as V1NamespaceList;

const listFn: ListPromise<V1Namespace> = function(): Promise<{
response: http.IncomingMessage;
body: V1NamespaceList;
}> {
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>(
(resolve, reject) => {
resolve({ response: {} as http.IncomingMessage, body: listObj });
},
);
};
let promise = new Promise((resolve) => {
mock.when(
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(() => {
resolve(new FakeRequest());
});
});

const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
await promise;

let errorEmitted = false;
cache.on('error', () => (errorEmitted = true));

const [, , , doneHandler] = mock.capture(fakeWatch.watch).last();

const error = new Error('testing');
await doneHandler(error);

mock.verify(
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).once();
expect(errorEmitted).to.equal(true);

const connectPromise = new Promise<boolean>((resolve: (boolean) => void) => {
cache.on('connect', (obj?: V1Namespace) => {
resolve(true);
});
});
cache.start();

expect(connectPromise).to.eventually.be.true;
});
});
Loading

0 comments on commit f0ee657

Please sign in to comment.