Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve watch performance. #1995

Merged
merged 2 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 98 additions & 76 deletions src/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ export interface ObjectCache<T> {
list(namespace?: string): ReadonlyArray<T>;
}

// exported for testing
export type CacheMap<T extends KubernetesObject> = Map<string, Map<string, T>>;

export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, Informer<T> {
private objects: T[] = [];
private objects: CacheMap<T> = new Map();
private resourceVersion: string;
private readonly indexCache: { [key: string]: T[] } = {};
private readonly callbackCache: { [key: string]: (ObjectCallback<T> | ErrorCallback)[] } = {};
Expand Down Expand Up @@ -93,16 +96,26 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
}

public get(name: string, namespace?: string): T | undefined {
return this.objects.find((obj: T): boolean => {
return obj.metadata!.name === name && (!namespace || obj.metadata!.namespace === namespace);
});
const nsObjects = this.objects.get(namespace || '');
if (nsObjects) {
return nsObjects.get(name);
}
return undefined;
}

public list(namespace?: string | undefined): ReadonlyArray<T> {
if (!namespace) {
return this.objects;
const allObjects: T[] = [];
for (const nsObjects of this.objects.values()) {
allObjects.push(...nsObjects.values());
}
return allObjects;
}
const namespaceObjects = this.objects.get(namespace || '');
if (!namespaceObjects) {
return [];
}
return this.indexCache[namespace] as ReadonlyArray<T>;
return Array.from(namespaceObjects.values());
}

public latestResourceVersion(): string {
Expand All @@ -116,7 +129,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
}
}

private async doneHandler(err: any): Promise<any> {
private async doneHandler(err: any): Promise<void> {
this._stop();
if (err && err.statusCode === 410) {
this.resourceVersion = '';
Expand All @@ -133,16 +146,8 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
const promise = this.listFn();
const list = await promise;
this.objects = deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice());
Object.keys(this.indexCache).forEach((key) => {
const updateObjects = deleteItems(this.indexCache[key], list.items);
if (updateObjects.length !== 0) {
this.indexCache[key] = updateObjects;
} else {
delete this.indexCache[key];
}
});
this.addOrUpdateItems(list.items);
this.resourceVersion = list.metadata!.resourceVersion!;
this.resourceVersion = list.metadata!.resourceVersion || '';
}
const queryParams = {
resourceVersion: this.resourceVersion,
Expand All @@ -169,21 +174,9 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
this.callbackCache[ADD].slice(),
this.callbackCache[UPDATE].slice(),
);
if (obj.metadata!.namespace) {
this.indexObj(obj);
}
});
}

private indexObj(obj: T): void {
let namespaceList = this.indexCache[obj.metadata!.namespace!] as T[];
if (!namespaceList) {
namespaceList = [];
this.indexCache[obj.metadata!.namespace!] = namespaceList;
}
addOrUpdateObject(namespaceList, obj);
}

private watchHandler(phase: string, obj: T, watchObj?: any): void {
switch (phase) {
case 'ERROR':
Expand All @@ -199,18 +192,9 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
this.callbackCache[ADD].slice(),
this.callbackCache[UPDATE].slice(),
);
if (obj.metadata!.namespace) {
this.indexObj(obj);
}
break;
case 'DELETED':
deleteObject(this.objects, obj, this.callbackCache[DELETE].slice());
if (obj.metadata!.namespace) {
const namespaceList = this.indexCache[obj.metadata!.namespace!] as T[];
if (namespaceList) {
deleteObject(namespaceList, obj);
}
}
break;
case 'BOOKMARK':
// nothing to do, here for documentation, mostly.
Expand All @@ -222,75 +206,113 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
}
}

// exported for testing
export function cacheMapFromList<T extends KubernetesObject>(newObjects: T[]): CacheMap<T> {
const objects: CacheMap<T> = new Map();
// build up the new list
for (const obj of newObjects) {
let namespaceObjects = objects.get(obj.metadata!.namespace || '');
if (!namespaceObjects) {
namespaceObjects = new Map();
objects.set(obj.metadata!.namespace || '', namespaceObjects);
}

const name = obj.metadata!.name || '';
namespaceObjects.set(name, obj);
}
return objects;
}

// external for testing
export function deleteItems<T extends KubernetesObject>(
oldObjects: T[],
oldObjects: CacheMap<T>,
newObjects: T[],
deleteCallback?: ObjectCallback<T>[],
): T[] {
return oldObjects.filter((obj: T) => {
if (findKubernetesObject(newObjects, obj) === -1) {
if (deleteCallback) {
deleteCallback.forEach((fn: ObjectCallback<T>) => fn(obj));
): CacheMap<T> {
const newObjectsMap = cacheMapFromList(newObjects);

for (const [namespace, oldNamespaceObjects] of oldObjects.entries()) {
const newNamespaceObjects = newObjectsMap.get(namespace);
if (newNamespaceObjects) {
for (const [name, oldObj] of oldNamespaceObjects.entries()) {
if (!newNamespaceObjects.has(name)) {
oldNamespaceObjects.delete(name);
if (deleteCallback) {
deleteCallback.forEach((fn: ObjectCallback<T>) => fn(oldObj));
}
}
}
return false;
} else {
oldObjects.delete(namespace);
oldNamespaceObjects.forEach((obj: T) => {
if (deleteCallback) {
deleteCallback.forEach((fn: ObjectCallback<T>) => fn(obj));
}
});
}
return true;
});
}

return oldObjects;
}

// Only public for testing.
export function addOrUpdateObject<T extends KubernetesObject>(
objects: T[],
objects: CacheMap<T>,
obj: T,
addCallback?: ObjectCallback<T>[],
updateCallback?: ObjectCallback<T>[],
addCallbacks?: ObjectCallback<T>[],
updateCallbacks?: ObjectCallback<T>[],
): void {
const ix = findKubernetesObject(objects, obj);
if (ix === -1) {
objects.push(obj);
if (addCallback) {
addCallback.forEach((elt: ObjectCallback<T>) => elt(obj));
let namespaceObjects = objects.get(obj.metadata!.namespace || '');
if (!namespaceObjects) {
namespaceObjects = new Map();
objects.set(obj.metadata!.namespace || '', namespaceObjects);
}

const name = obj.metadata!.name || '';
const found = namespaceObjects.get(name);
if (!found) {
namespaceObjects.set(name, obj);
if (addCallbacks) {
addCallbacks.forEach((elt: ObjectCallback<T>) => elt(obj));
}
} else {
if (!isSameVersion(objects[ix], obj)) {
objects[ix] = obj;
if (updateCallback) {
updateCallback.forEach((elt: ObjectCallback<T>) => elt(obj));
if (!isSameVersion(found, obj)) {
namespaceObjects.set(name, obj);
if (updateCallbacks) {
updateCallbacks.forEach((elt: ObjectCallback<T>) => elt(obj));
}
}
}
}

function isSameObject<T extends KubernetesObject>(o1: T, o2: T): boolean {
return o1.metadata!.name === o2.metadata!.name && o1.metadata!.namespace === o2.metadata!.namespace;
}

function isSameVersion<T extends KubernetesObject>(o1: T, o2: T): boolean {
function isSameVersion<T extends KubernetesObject>(o1: any, o2: T): boolean {
return (
o1.metadata!.resourceVersion !== undefined &&
o1.metadata!.resourceVersion !== null &&
o1.metadata!.resourceVersion === o2.metadata!.resourceVersion
);
}

function findKubernetesObject<T extends KubernetesObject>(objects: T[], obj: T): number {
return objects.findIndex((elt: T) => {
return isSameObject(elt, obj);
});
}

// Public for testing.
export function deleteObject<T extends KubernetesObject>(
objects: T[],
objects: CacheMap<T>,
obj: T,
deleteCallback?: ObjectCallback<T>[],
deleteCallbacks?: ObjectCallback<T>[],
): void {
const ix = findKubernetesObject(objects, obj);
if (ix !== -1) {
objects.splice(ix, 1);
if (deleteCallback) {
deleteCallback.forEach((elt: ObjectCallback<T>) => elt(obj));
const namespace = obj.metadata!.namespace || '';
const name = obj.metadata!.name || '';

const namespaceObjects = objects.get(namespace);
if (!namespaceObjects) {
return;
}
const deleted = namespaceObjects.delete(name);
if (deleted) {
if (deleteCallbacks) {
deleteCallbacks.forEach((elt: ObjectCallback<T>) => elt(obj));
}
if (namespaceObjects.size === 0) {
objects.delete(namespace);
}
}
}
Loading
Loading