Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a CONNECT event to informer. #614

Merged
merged 2 commits into from
May 2, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions src/cache.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
import { ADD, CHANGE, DELETE, ERROR, Informer, ListPromise, ObjectCallback, UPDATE } from './informer';
import {
ADD,
CHANGE,
CONNECT,
DELETE,
ERROR,
Informer,
ListPromise,
ObjectCallback,
UPDATE,
} from './informer';
import { KubernetesObject } from './types';
import { RequestResult, Watch } from './watch';

Expand All @@ -25,6 +35,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
this.callbackCache[UPDATE] = [];
this.callbackCache[DELETE] = [];
this.callbackCache[ERROR] = [];
this.callbackCache[CONNECT] = [];
this.resourceVersion = '';
if (autoStart) {
this.doneHandler(null);
Expand Down Expand Up @@ -102,13 +113,14 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
private async doneHandler(err: any): Promise<any> {
this._stop();
if (err) {
this.callbackCache[ERROR].forEach((elt: ObjectCallback<T>) => elt(err));
this.callbackCache[ERROR].forEach((elt: ObjectCallback<T>) => elt(undefined, err));
Copy link
Contributor

@dominykas dominykas Mar 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm. This makes it a breaking change? It also has the error at the end, which may be unintuitive to folks used to Node's error-first callbacks... But moving the error first would be even more breaking...

Not sure what's the best approach here... Not saying this shouldn't got ahead, it's easy enough to adapt as a user of the library, but still.

Would be a travesty in TypeScript world to allow on() to have overloads to accept different parameter types? i.e. allow on(verb: string, fn: ObjectCallback<T>): void; as well as on(verb: string, fn: ErrorCallback): void;? While it wouldn't enforce the correct callback type based on the verb, it would at least be non-breaking? Not sure if there's a better way to express this in TS...

Copy link
Contributor Author

@brendandburns brendandburns Mar 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did also try it that way:

on(verb: string, fn: ObjectCallback<T> | ErrorCallback | ConnectCallback)

But I felt like that was even messier, because suddenly the user can accidentally pass a ConnectCallback to a on('add', ...)

Maybe we could do:

export type Callback<T extends KubernetesObject> = (err: any, obj: T | null) => void;
// Deprecated
export type ObjectCallback<T extends KubernetesObject> = (obj: T) => void;

...

on(verb: string, fn: ObjectCallback | Callback)
...

wdyt? That has the benefit of not being a breaking change, and also adding in the more node-style callbacks.

Copy link
Contributor Author

@brendandburns brendandburns Mar 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although, I'm not actually sure that TS can handle that since those two callback types may be indistinguishable...

I made the types more concrete so I think TS can handle it based on the # of parameters.

Copy link
Contributor

@dominykas dominykas Mar 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would TS handle that at runtime or just compile time?

Did TS ever get enums? If the first param for on() was enum, rather than a string, then it could have multiple signatures based on enum type... Which would probably still be a breaking change from TS perspective, but possibly the old string signature could still be there, just with any callback so that people can upgrade their code eventually, even if they would lose out a little bit in terms of potentially accidentally passing the wrong callback?

Also I wonder if passing the wrong callback is even a problem for people using TS, because surely someone must have noticed by now that the callback for the error event is not right? i.e. this could mean that making changes here is not that disruptive, even if the changes are breaking when looking at it strictly?

This is a common pattern (on() with different callbacks for different event types) - surely someone must have solved this by now? This is used in node itself (in streams and elsewhere): https://github.com/DefinitelyTyped/DefinitelyTyped/blob/1d52dcdeea0c4006bb63c28ff60b919f3f5fc3a6/types/node/stream.d.ts#L84-L91 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was there a follow-up on this @brendandburns ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the pointer! I fixed this using type overloading.

return;
}
if (this.stopped) {
// do not auto-restart
return;
}
this.callbackCache[CONNECT].forEach((elt: ObjectCallback<T>) => elt());
// TODO: Don't always list here for efficiency
// try to restart the watch from resourceVersion, but detect 410 GONE and relist in that case.
// Or if resourceVersion is empty.
Expand Down
145 changes: 121 additions & 24 deletions src/cache_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { EventEmitter } from 'ws';

import { V1Namespace, V1NamespaceList, V1ObjectMeta, V1Pod, V1ListMeta } from './api';
import { deleteObject, ListWatch, deleteItems } from './cache';
import { ADD, UPDATE, DELETE, ERROR, ListPromise, CHANGE } from './informer';
import { ADD, UPDATE, DELETE, ERROR, ListPromise, CHANGE, CONNECT } from './informer';

use(chaiAsPromised);

Expand Down Expand Up @@ -44,7 +44,7 @@ describe('ListWatchCache', () => {
};
const lw = new ListWatch('/some/path', fake, listFn);
const verb = 'FOOBAR';
expect(() => lw.on(verb, (obj: V1Namespace) => {})).to.throw(`Unknown verb: ${verb}`);
expect(() => lw.on(verb, (obj?: V1Namespace) => {})).to.throw(`Unknown verb: ${verb}`);
});

it('should perform basic caching', async () => {
Expand Down Expand Up @@ -216,19 +216,19 @@ describe('ListWatchCache', () => {
expect(pathOut).to.equal('/some/path');

const addPromise = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(ADD, (obj: V1Namespace) => {
informer.on(ADD, (obj?: V1Namespace) => {
resolve(obj);
});
});

const updatePromise = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(UPDATE, (obj: V1Namespace) => {
informer.on(UPDATE, (obj?: V1Namespace) => {
resolve(obj);
});
});

const deletePromise = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(DELETE, (obj: V1Namespace) => {
informer.on(DELETE, (obj?: V1Namespace) => {
resolve(obj);
});
});
Expand Down Expand Up @@ -310,7 +310,7 @@ describe('ListWatchCache', () => {

let count = 0;
const changePromise = new Promise<boolean>((resolve: (V1Namespace) => void) => {
informer.on(CHANGE, (obj: V1Namespace) => {
informer.on(CHANGE, (obj?: V1Namespace) => {
count++;
if (count == 3) {
resolve(true);
Expand Down Expand Up @@ -372,13 +372,13 @@ describe('ListWatchCache', () => {
expect(pathOut).to.equal('/some/path');

const addPromise = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(ADD, (obj: V1Namespace) => {
informer.on(ADD, (obj?: V1Namespace) => {
resolve(obj);
});
});

const addPromise2 = new Promise<V1Namespace>((resolve: (V1Namespace) => void) => {
informer.on(ADD, (obj: V1Namespace) => {
informer.on(ADD, (obj?: V1Namespace) => {
resolve(obj);
});
});
Expand Down Expand Up @@ -442,9 +442,9 @@ describe('ListWatchCache', () => {
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);

const addObjects: V1Namespace[] = [];
informer.on(ADD, (obj: V1Namespace) => addObjects.push(obj));
informer.on(ADD, (obj?: V1Namespace) => addObjects.push(obj!));
const updateObjects: V1Namespace[] = [];
informer.on(UPDATE, (obj: V1Namespace) => updateObjects.push(obj));
informer.on(UPDATE, (obj?: V1Namespace) => updateObjects.push(obj!));

informer.start();
await promise;
Expand Down Expand Up @@ -518,11 +518,11 @@ describe('ListWatchCache', () => {
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);

const addObjects: V1Namespace[] = [];
informer.on(ADD, (obj: V1Namespace) => addObjects.push(obj));
informer.on(ADD, (obj?: V1Namespace) => addObjects.push(obj!));
const updateObjects: V1Namespace[] = [];
informer.on(UPDATE, (obj: V1Namespace) => updateObjects.push(obj));
informer.on(UPDATE, (obj?: V1Namespace) => updateObjects.push(obj!));
const deleteObjects: V1Namespace[] = [];
informer.on(DELETE, (obj: V1Namespace) => deleteObjects.push(obj));
informer.on(DELETE, (obj?: V1Namespace) => deleteObjects.push(obj!));
informer.start();
await promise;
const [pathOut, , , doneHandler] = mock.capture(fakeWatch.watch).last();
Expand Down Expand Up @@ -716,12 +716,12 @@ describe('ListWatchCache', () => {
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);

const addedList1: V1Namespace[] = [];
const addToList1Fn = function(obj: V1Namespace) {
addedList1.push(obj);
const addToList1Fn = function(obj?: V1Namespace) {
addedList1.push(obj!);
};
const addedList2: V1Namespace[] = [];
const addToList2Fn = function(obj: V1Namespace) {
addedList2.push(obj);
const addToList2Fn = function(obj?: V1Namespace) {
addedList2.push(obj!);
};

informer.start();
Expand Down Expand Up @@ -775,8 +775,8 @@ describe('ListWatchCache', () => {
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);

const addedList: V1Namespace[] = [];
const addToListFn = function(obj: V1Namespace) {
addedList.push(obj);
const addToListFn = function(obj?: V1Namespace) {
addedList.push(obj!);
};
const removeSelf = function() {
informer.off(ADD, removeSelf);
Expand Down Expand Up @@ -863,12 +863,12 @@ describe('ListWatchCache', () => {
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);

const addedList1: V1Namespace[] = [];
const addToList1Fn = function(obj: V1Namespace) {
addedList1.push(obj);
const addToList1Fn = function(obj?: V1Namespace) {
addedList1.push(obj!);
};
const addedList2: V1Namespace[] = [];
const addToList2Fn = function(obj: V1Namespace) {
addedList2.push(obj);
const addToList2Fn = function(obj?: V1Namespace) {
addedList2.push(obj!);
};

informer.start();
Expand Down Expand Up @@ -1106,7 +1106,104 @@ describe('delete items', () => {
];
const pods: V1Pod[] = [];

deleteItems(listA, listB, [(obj: V1Pod) => pods.push(obj)]);
deleteItems(listA, listB, [(obj?: V1Pod) => pods.push(obj!)]);
expect(pods).to.deep.equal(expected);
});

it('should call the connect handler', async () => {
const fakeWatch = mock.mock(Watch);
const listObj = {
metadata: {
resourceVersion: '12345',
} as V1ListMeta,
items: [],
} as V1NamespaceList;

const listFn: ListPromise<V1Namespace> = function(): Promise<{
response: http.IncomingMessage;
body: V1NamespaceList;
}> {
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>(
(resolve, reject) => {
resolve({ response: {} as http.IncomingMessage, body: listObj });
},
);
};
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);
const connectPromise = new Promise<boolean>((resolve: (boolean) => void) => {
informer.on(CONNECT, (obj?: V1Namespace) => {
resolve(true);
});
});
informer.start();

expect(connectPromise).to.eventually.be.true;
});

it('does calls connect after a restart after an error', async () => {
const fakeWatch = mock.mock(Watch);
const list: V1Pod[] = [
{
metadata: {
name: 'name1',
namespace: 'ns1',
} as V1ObjectMeta,
} as V1Pod,
{
metadata: {
name: 'name2',
namespace: 'ns2',
} as V1ObjectMeta,
} as V1Pod,
];
const listObj = {
metadata: {
resourceVersion: '12345',
} as V1ListMeta,
items: list,
} as V1NamespaceList;

const listFn: ListPromise<V1Namespace> = function(): Promise<{
response: http.IncomingMessage;
body: V1NamespaceList;
}> {
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>(
(resolve, reject) => {
resolve({ response: {} as http.IncomingMessage, body: listObj });
},
);
};
let promise = new Promise((resolve) => {
mock.when(
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(() => {
resolve(new FakeRequest());
});
});

const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
await promise;

let errorEmitted = false;
cache.on(ERROR, () => (errorEmitted = true));

const [, , , doneHandler] = mock.capture(fakeWatch.watch).last();

const error = new Error('testing');
await doneHandler(error);

mock.verify(
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).once();
expect(errorEmitted).to.equal(true);

const connectPromise = new Promise<boolean>((resolve: (boolean) => void) => {
cache.on(CONNECT, (obj?: V1Namespace) => {
resolve(true);
});
});
cache.start();

expect(connectPromise).to.eventually.be.true;
});
});
7 changes: 6 additions & 1 deletion src/informer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,22 @@ import { Watch } from './watch';

import http = require('http');

export type ObjectCallback<T extends KubernetesObject> = (obj: T) => void;
export type ObjectCallback<T extends KubernetesObject> = (obj?: T, err?: any) => void;
export type ListCallback<T extends KubernetesObject> = (list: T[], ResourceVersion: string) => void;
export type ListPromise<T extends KubernetesObject> = () => Promise<{
response: http.IncomingMessage;
body: KubernetesListObject<T>;
}>;

// These are issued per object
export const ADD: string = 'add';
export const UPDATE: string = 'update';
export const CHANGE: string = 'change';
export const DELETE: string = 'delete';

// This is issued when a watch connects or reconnects
export const CONNECT: string = 'connect';
// This is issued when there is an error
export const ERROR: string = 'error';

export interface Informer<T> {
Expand Down