Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cache: Fix connection leak. #576

Merged
merged 1 commit into from
Jan 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions src/cache.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ADD, DELETE, ERROR, Informer, ListPromise, ObjectCallback, UPDATE } from './informer';
import { KubernetesObject } from './types';
import { Watch } from './watch';
import { RequestResult, Watch } from './watch';

export interface ObjectCache<T> {
get(name: string, namespace?: string): T | undefined;
Expand All @@ -12,7 +12,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
private resourceVersion: string;
private readonly indexCache: { [key: string]: T[] } = {};
private readonly callbackCache: { [key: string]: Array<ObjectCallback<T>> } = {};
private stopped: boolean;
private request: RequestResult | undefined;

public constructor(
private readonly path: string,
Expand All @@ -25,19 +25,20 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
this.callbackCache[DELETE] = [];
this.callbackCache[ERROR] = [];
this.resourceVersion = '';
this.stopped = true;
if (autoStart) {
this.start();
this.doneHandler(null);
}
}

public async start(): Promise<void> {
this.stopped = false;
await this.doneHandler();
await this.doneHandler(null);
}

public stop(): void {
this.stopped = true;
if (this.request) {
this.request.abort();
this.request = undefined;
}
}

public on(verb: string, cb: ObjectCallback<T>): void {
Expand Down Expand Up @@ -79,15 +80,10 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
return this.resourceVersion;
}

private async errorHandler(err: any): Promise<void> {
private async doneHandler(err: any): Promise<any> {
this.stop();
if (err) {
this.callbackCache[ERROR].forEach((elt: ObjectCallback<T>) => elt(err));
}
this.stopped = true;
}

private async doneHandler(): Promise<any> {
if (this.stopped) {
return;
}
// TODO: Don't always list here for efficiency
Expand All @@ -106,12 +102,11 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
}
});
this.addOrUpdateItems(list.items);
await this.watch.watch(
this.request = await this.watch.watch(
this.path,
{ resourceVersion: list.metadata!.resourceVersion },
this.watchHandler.bind(this),
this.doneHandler.bind(this),
this.errorHandler.bind(this),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please preserve the error handler here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done callback handles the error as it is usually done in nodejs code - no need to pass an error handler anymore. If the watcher ends with an error then it will be the first argument of done callback. IMHO having a distinct callback just for errors makes usage of the watcher more complicated. Or is there a use case when error callback is useful and cannot be replaced by a single done callback?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The web socket makes a distinction between the two cases:
OnError:
https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/onerror

OnClose:
https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/onclose

I'd like to preserve that distinction.

In particular, from the docs at least:

"A function or EventHandler which is executed whenever an error event occurs on the WebSocket connection."

An error can occur independent of a close event (which triggers done)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is an abstraction that makes sense for the websocket, where as you have cited:

An error can occur independent of a close event (which triggers done)

in our case that cannot happen. If there is an error, it is always followed by close (done). By adopting websocket model we have to supply two callbacks instead of one and save the state between the callbacks. Something like:

   private error: any;
   
   private async errorHandler(err: any): Promise<void> {
         this.error = err;
   }

   private async doneHandler(): Promise<void> {
         this.stop();
         if (this.error) {
             this.callbackCache[ERROR].forEach((elt: ObjectCallback<T>) => elt(this.error));
             return;
        }
        ...
   }

The current code calls callbackCache[ERROR] immdiately in error handler and introduces new stopped variable. But the principle remains the same. If we compare that with a simple done callback with error argument, that is crystal clear and easy to understand, I'm wondering why would we deliberately introduce more complex code without bringing any advantage to us.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I mis-spoke, it's not a WebSocket in this case, it is a nodejs Stream:

https://github.com/kubernetes-client/javascript/blob/master/src/watch.ts#L82

but the idea is the same. The Stream has an error callback and a done callback and they're distinct, I don't see much value in adding more code so that we can hide that. (esp. since Stream is a core NodeJS class)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK the code as it is now does all what we need. As shown above having additional error callback implies more complexity in all consumers of the API. Watcher code itself remains as it is. It does either:

doneCb(err);

or with error callback

errorCb(err);
doneCb();

error callback in the watcher does not make anything easier. We still need to handle error events from underlaying streams, we still need to ensure that error and done callbacks are called just once. Unless I overlooked something.

If the only reason is to make watcher API more similar to nodejs stream API, then even after adding error callback they will be significantly different (events vs callbacks, drain, finish pipe events which are not there, # of methods that are missing). If we wanted to convert watcher to a true stream module, that would be something else and neat. Then we could do something like:

watcher.on('data', ...)
watcher.pipe(other-stream)
watcher.on('error', ...)

but adding an error callback without transforming the whole API to be stream-compliant seems like a meaningless step to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I'm alright with this after further thought.

);
}

Expand Down
129 changes: 38 additions & 91 deletions src/cache_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@ import chaiAsPromised = require('chai-as-promised');
import * as mock from 'ts-mockito';

import http = require('http');
import { Duplex } from 'stream';
import { EventEmitter } from 'ws';

import { V1Namespace, V1NamespaceList, V1ObjectMeta, V1Pod, V1ListMeta } from './api';
import { deleteObject, ListWatch, deleteItems } from './cache';
import { ListCallback, ADD, UPDATE, DELETE, ListPromise } from './informer';
import { ADD, UPDATE, DELETE, ListPromise } from './informer';

use(chaiAsPromised);

import { Watch } from './watch';
import { RequestResult, Watch } from './watch';

// Object replacing real Request object in the test
class FakeRequest extends EventEmitter implements RequestResult {
pipe(stream: Duplex): void {}
abort() {}
}

describe('ListWatchCache', () => {
it('should throw on unknown update', () => {
Expand All @@ -24,7 +32,12 @@ describe('ListWatchCache', () => {
(resolve, reject) => {
resolve({
response: {} as http.IncomingMessage,
body: {} as V1NamespaceList,
body: {
metadata: {
resourceVersion: '12345',
} as V1ListMeta,
items: [],
} as V1NamespaceList,
});
},
);
Expand Down Expand Up @@ -88,15 +101,9 @@ describe('ListWatchCache', () => {
};
const promise = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(() => {
resolve(null);
resolve(new FakeRequest());
});
});
const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
Expand Down Expand Up @@ -159,7 +166,7 @@ describe('ListWatchCache', () => {
} as V1ObjectMeta,
} as V1Namespace);

await doneHandler();
await doneHandler(null);
expect(cache.list().length, 'all namespace list').to.equal(1);
expect(cache.list('default').length, 'default namespace list').to.equal(1);
expect(cache.list('other'), 'other namespace list').to.be.undefined;
Expand Down Expand Up @@ -198,15 +205,9 @@ describe('ListWatchCache', () => {
};
const promise = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(() => {
resolve(null);
resolve(new FakeRequest());
});
});
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
Expand Down Expand Up @@ -285,15 +286,9 @@ describe('ListWatchCache', () => {
};
const promise = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(() => {
resolve(null);
resolve(new FakeRequest());
});
});
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
Expand Down Expand Up @@ -362,15 +357,9 @@ describe('ListWatchCache', () => {
};
let promise = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(() => {
resolve(null);
resolve(new FakeRequest());
});
});
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);
Expand All @@ -390,18 +379,12 @@ describe('ListWatchCache', () => {

promise = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(() => {
resolve(null);
resolve(new FakeRequest());
});
});
doneHandler();
doneHandler(null);
await promise;
expect(addObjects).to.deep.equal(list);
expect(updateObjects).to.deep.equal(list);
Expand Down Expand Up @@ -447,15 +430,9 @@ describe('ListWatchCache', () => {
};
let promise = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(() => {
resolve(null);
resolve(new FakeRequest());
});
});
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);
Expand All @@ -476,19 +453,13 @@ describe('ListWatchCache', () => {

promise = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(() => {
resolve(null);
resolve(new FakeRequest());
});
});
listObj.items = list2;
doneHandler();
doneHandler(null);
await promise;
expect(addObjects).to.deep.equal(list);
expect(updateObjects).to.deep.equal(list2);
Expand Down Expand Up @@ -536,15 +507,9 @@ describe('ListWatchCache', () => {
};
const promise = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(() => {
resolve(null);
resolve(new FakeRequest());
});
});
const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
Expand Down Expand Up @@ -662,13 +627,7 @@ describe('ListWatchCache', () => {
};
const watchCalled = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(resolve);
});
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
Expand Down Expand Up @@ -727,13 +686,7 @@ describe('ListWatchCache', () => {
};
const watchCalled = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(resolve);
});
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
Expand Down Expand Up @@ -820,13 +773,7 @@ describe('ListWatchCache', () => {
};
const watchCalled = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
).thenCall(resolve);
});
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
Expand Down
Loading