Skip to content

Commit

Permalink
Add a CONNECT event to informer.
Browse files Browse the repository at this point in the history
  • Loading branch information
brendanburns committed Mar 24, 2021
1 parent d18e69d commit 0cc04fe
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 27 deletions.
16 changes: 14 additions & 2 deletions src/cache.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
import { ADD, CHANGE, DELETE, ERROR, Informer, ListPromise, ObjectCallback, UPDATE } from './informer';
import {
ADD,
CHANGE,
CONNECT,
DELETE,
ERROR,
Informer,
ListPromise,
ObjectCallback,
UPDATE,
} from './informer';
import { KubernetesObject } from './types';
import { RequestResult, Watch } from './watch';

Expand All @@ -25,6 +35,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
this.callbackCache[UPDATE] = [];
this.callbackCache[DELETE] = [];
this.callbackCache[ERROR] = [];
this.callbackCache[CONNECT] = [];
this.resourceVersion = '';
if (autoStart) {
this.doneHandler(null);
Expand Down Expand Up @@ -102,13 +113,14 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
private async doneHandler(err: any): Promise<any> {
this._stop();
if (err) {
this.callbackCache[ERROR].forEach((elt: ObjectCallback<T>) => elt(err));
this.callbackCache[ERROR].forEach((elt: ObjectCallback<T>) => elt(undefined, err));
return;
}
if (this.stopped) {
// do not auto-restart
return;
}
this.callbackCache[CONNECT].forEach((elt: ObjectCallback<T>) => elt());
// TODO: Don't always list here for efficiency
// try to restart the watch from resourceVersion, but detect 410 GONE and relist in that case.
// Or if resourceVersion is empty.
Expand Down
145 changes: 121 additions & 24 deletions src/cache_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { EventEmitter } from 'ws';

import { V1Namespace, V1NamespaceList, V1ObjectMeta, V1Pod, V1ListMeta } from './api';
import { deleteObject, ListWatch, deleteItems } from './cache';
import { ADD, UPDATE, DELETE, ERROR, ListPromise, CHANGE } from './informer';
import { ADD, UPDATE, DELETE, ERROR, ListPromise, CHANGE, CONNECT } from './informer';

use(chaiAsPromised);

Expand Down Expand Up @@ -44,7 +44,7 @@ describe('ListWatchCache', () => {
};
const lw = new ListWatch('/some/path', fake, listFn);
const verb = 'FOOBAR';
expect(() => lw.on(verb, (obj: V1Namespace) => {})).to.throw(`Unknown verb: ${verb}`);
expect(() => lw.on(verb, (obj?: V1Namespace) => {})).to.throw(`Unknown verb: ${verb}`);
});

it('should perform basic caching', async () => {
Expand Down Expand Up @@ -216,19 +216,19 @@ describe('ListWatchCache', () => {
expect(pathOut).to.equal('/some/path');

const addPromise = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(ADD, (obj: V1Namespace) => {
informer.on(ADD, (obj?: V1Namespace) => {
resolve(obj);
});
});

const updatePromise = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(UPDATE, (obj: V1Namespace) => {
informer.on(UPDATE, (obj?: V1Namespace) => {
resolve(obj);
});
});

const deletePromise = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(DELETE, (obj: V1Namespace) => {
informer.on(DELETE, (obj?: V1Namespace) => {
resolve(obj);
});
});
Expand Down Expand Up @@ -310,7 +310,7 @@ describe('ListWatchCache', () => {

let count = 0;
const changePromise = new Promise<boolean>((resolve: (V1Namespace) => void) => {
informer.on(CHANGE, (obj: V1Namespace) => {
informer.on(CHANGE, (obj?: V1Namespace) => {
count++;
if (count == 3) {
resolve(true);
Expand Down Expand Up @@ -372,13 +372,13 @@ describe('ListWatchCache', () => {
expect(pathOut).to.equal('/some/path');

const addPromise = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(ADD, (obj: V1Namespace) => {
informer.on(ADD, (obj?: V1Namespace) => {
resolve(obj);
});
});

const addPromise2 = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(ADD, (obj: V1Namespace) => {
informer.on(ADD, (obj?: V1Namespace) => {
resolve(obj);
});
});
Expand Down Expand Up @@ -442,9 +442,9 @@ describe('ListWatchCache', () => {
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);

const addObjects: V1Namespace[] = [];
informer.on(ADD, (obj: V1Namespace) => addObjects.push(obj));
informer.on(ADD, (obj?: V1Namespace) => addObjects.push(obj!));
const updateObjects: V1Namespace[] = [];
informer.on(UPDATE, (obj: V1Namespace) => updateObjects.push(obj));
informer.on(UPDATE, (obj?: V1Namespace) => updateObjects.push(obj!));

informer.start();
await promise;
Expand Down Expand Up @@ -518,11 +518,11 @@ describe('ListWatchCache', () => {
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);

const addObjects: V1Namespace[] = [];
informer.on(ADD, (obj: V1Namespace) => addObjects.push(obj));
informer.on(ADD, (obj?: V1Namespace) => addObjects.push(obj!));
const updateObjects: V1Namespace[] = [];
informer.on(UPDATE, (obj: V1Namespace) => updateObjects.push(obj));
informer.on(UPDATE, (obj?: V1Namespace) => updateObjects.push(obj!));
const deleteObjects: V1Namespace[] = [];
informer.on(DELETE, (obj: V1Namespace) => deleteObjects.push(obj));
informer.on(DELETE, (obj?: V1Namespace) => deleteObjects.push(obj!));
informer.start();
await promise;
const [pathOut, , , doneHandler] = mock.capture(fakeWatch.watch).last();
Expand Down Expand Up @@ -716,12 +716,12 @@ describe('ListWatchCache', () => {
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);

const addedList1: V1Namespace[] = [];
const addToList1Fn = function(obj: V1Namespace) {
addedList1.push(obj);
const addToList1Fn = function(obj?: V1Namespace) {
addedList1.push(obj!);
};
const addedList2: V1Namespace[] = [];
const addToList2Fn = function(obj: V1Namespace) {
addedList2.push(obj);
const addToList2Fn = function(obj?: V1Namespace) {
addedList2.push(obj!);
};

informer.start();
Expand Down Expand Up @@ -775,8 +775,8 @@ describe('ListWatchCache', () => {
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);

const addedList: V1Namespace[] = [];
const addToListFn = function(obj: V1Namespace) {
addedList.push(obj);
const addToListFn = function(obj?: V1Namespace) {
addedList.push(obj!);
};
const removeSelf = function() {
informer.off(ADD, removeSelf);
Expand Down Expand Up @@ -863,12 +863,12 @@ describe('ListWatchCache', () => {
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);

const addedList1: V1Namespace[] = [];
const addToList1Fn = function(obj: V1Namespace) {
addedList1.push(obj);
const addToList1Fn = function(obj?: V1Namespace) {
addedList1.push(obj!);
};
const addedList2: V1Namespace[] = [];
const addToList2Fn = function(obj: V1Namespace) {
addedList2.push(obj);
const addToList2Fn = function(obj?: V1Namespace) {
addedList2.push(obj!);
};

informer.start();
Expand Down Expand Up @@ -1106,7 +1106,104 @@ describe('delete items', () => {
];
const pods: V1Pod[] = [];

deleteItems(listA, listB, [(obj: V1Pod) => pods.push(obj)]);
deleteItems(listA, listB, [(obj?: V1Pod) => pods.push(obj!)]);
expect(pods).to.deep.equal(expected);
});

it('should call the connect handler', async () => {
const fakeWatch = mock.mock(Watch);
const listObj = {
metadata: {
resourceVersion: '12345',
} as V1ListMeta,
items: [],
} as V1NamespaceList;

const listFn: ListPromise<V1Namespace> = function(): Promise<{
response: http.IncomingMessage;
body: V1NamespaceList;
}> {
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>(
(resolve, reject) => {
resolve({ response: {} as http.IncomingMessage, body: listObj });
},
);
};
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);
const connectPromise = new Promise<boolean>((resolve: (boolean) => void) => {
informer.on(CONNECT, (obj?: V1Namespace) => {
resolve(true);
});
});
informer.start();

expect(connectPromise).to.eventually.be.true;
});

it('does calls connect after a restart after an error', async () => {
const fakeWatch = mock.mock(Watch);
const list: V1Pod[] = [
{
metadata: {
name: 'name1',
namespace: 'ns1',
} as V1ObjectMeta,
} as V1Pod,
{
metadata: {
name: 'name2',
namespace: 'ns2',
} as V1ObjectMeta,
} as V1Pod,
];
const listObj = {
metadata: {
resourceVersion: '12345',
} as V1ListMeta,
items: list,
} as V1NamespaceList;

const listFn: ListPromise<V1Namespace> = function(): Promise<{
response: http.IncomingMessage;
body: V1NamespaceList;
}> {
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>(
(resolve, reject) => {
resolve({ response: {} as http.IncomingMessage, body: listObj });
},
);
};
let promise = new Promise((resolve) => {
mock.when(
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(() => {
resolve(new FakeRequest());
});
});

const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
await promise;

let errorEmitted = false;
cache.on(ERROR, () => (errorEmitted = true));

const [, , , doneHandler] = mock.capture(fakeWatch.watch).last();

const error = new Error('testing');
await doneHandler(error);

mock.verify(
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).once();
expect(errorEmitted).to.equal(true);

const connectPromise = new Promise<boolean>((resolve: (boolean) => void) => {
cache.on(CONNECT, (obj?: V1Namespace) => {
resolve(true);
});
});
cache.start();

expect(connectPromise).to.eventually.be.true;
});
});
7 changes: 6 additions & 1 deletion src/informer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,22 @@ import { Watch } from './watch';

import http = require('http');

export type ObjectCallback<T extends KubernetesObject> = (obj: T) => void;
export type ObjectCallback<T extends KubernetesObject> = (obj?: T, err?: any) => void;
export type ListCallback<T extends KubernetesObject> = (list: T[], ResourceVersion: string) => void;
export type ListPromise<T extends KubernetesObject> = () => Promise<{
response: http.IncomingMessage;
body: KubernetesListObject<T>;
}>;

// These are issued per object
export const ADD: string = 'add';
export const UPDATE: string = 'update';
export const CHANGE: string = 'change';
export const DELETE: string = 'delete';

// This is issued when a watch connects or reconnects
export const CONNECT: string = 'connect';
// This is issued when there is an error
export const ERROR: string = 'error';

export interface Informer<T> {
Expand Down

0 comments on commit 0cc04fe

Please sign in to comment.