Skip to content

Commit

Permalink
Merge pull request #780 from bbatha/fix-cache-performance
Browse files Browse the repository at this point in the history
fix(cache): update cache with O(1) data structures
  • Loading branch information
k8s-ci-robot authored Mar 4, 2022
2 parents ea5041d + a589f36 commit a5264eb
Show file tree
Hide file tree
Showing 2 changed files with 252 additions and 112 deletions.
179 changes: 100 additions & 79 deletions src/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ export interface ObjectCache<T> {
list(namespace?: string): ReadonlyArray<T>;
}

// exported for testing
export type CacheMap<T extends KubernetesObject> = Map<string, Map<string, T>>;

export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, Informer<T> {
private objects: T[] = [];
private objects: CacheMap<T> = new Map();
private resourceVersion: string;
private readonly indexCache: { [key: string]: T[] } = {};
private readonly callbackCache: { [key: string]: Array<ObjectCallback<T> | ErrorCallback> } = {};
private readonly callbackCache: {
[key: string]: Array<ObjectCallback<T> | ErrorCallback>;
} = {};
private request: RequestResult | undefined;
private stopped: boolean = false;

Expand Down Expand Up @@ -93,18 +97,26 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
}

public get(name: string, namespace?: string): T | undefined {
return this.objects.find(
(obj: T): boolean => {
return obj.metadata!.name === name && (!namespace || obj.metadata!.namespace === namespace);
},
);
const nsObjects = this.objects.get(namespace || '');
if (nsObjects) {
return nsObjects.get(name);
}
return undefined;
}

public list(namespace?: string | undefined): ReadonlyArray<T> {
if (!namespace) {
return this.objects;
const allObjects: T[] = [];
for (const nsObjects of this.objects.values()) {
allObjects.push(...nsObjects.values());
}
return allObjects;
}
const namespaceObjects = this.objects.get(namespace || '');
if (!namespaceObjects) {
return [];
}
return this.indexCache[namespace] as ReadonlyArray<T>;
return Array.from(namespaceObjects.values());
}

public latestResourceVersion(): string {
Expand All @@ -118,7 +130,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
}
}

private async doneHandler(err: any): Promise<any> {
private async doneHandler(err: unknown): Promise<void> {
this._stop();
if (
err &&
Expand All @@ -139,16 +151,8 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
const result = await promise;
const list = result.body;
this.objects = deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice());
Object.keys(this.indexCache).forEach((key) => {
const updateObjects = deleteItems(this.indexCache[key], list.items);
if (updateObjects.length !== 0) {
this.indexCache[key] = updateObjects;
} else {
delete this.indexCache[key];
}
});
this.addOrUpdateItems(list.items);
this.resourceVersion = list.metadata!.resourceVersion!;
this.resourceVersion = list.metadata!.resourceVersion || '';
}
const queryParams = {
resourceVersion: this.resourceVersion,
Expand All @@ -175,21 +179,9 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
this.callbackCache[ADD].slice(),
this.callbackCache[UPDATE].slice(),
);
if (obj.metadata!.namespace) {
this.indexObj(obj);
}
});
}

private indexObj(obj: T): void {
let namespaceList = this.indexCache[obj.metadata!.namespace!] as T[];
if (!namespaceList) {
namespaceList = [];
this.indexCache[obj.metadata!.namespace!] = namespaceList;
}
addOrUpdateObject(namespaceList, obj);
}

private async watchHandler(
phase: string,
obj: T,
Expand All @@ -204,18 +196,9 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
this.callbackCache[ADD].slice(),
this.callbackCache[UPDATE].slice(),
);
if (obj.metadata!.namespace) {
this.indexObj(obj);
}
break;
case 'DELETED':
deleteObject(this.objects, obj, this.callbackCache[DELETE].slice());
if (obj.metadata!.namespace) {
const namespaceList = this.indexCache[obj.metadata!.namespace!] as T[];
if (namespaceList) {
deleteObject(namespaceList, obj);
}
}
break;
case 'BOOKMARK':
// nothing to do, here for documentation, mostly.
Expand All @@ -228,50 +211,85 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
}
}

// exported for testing
export function cacheMapFromList<T extends KubernetesObject>(newObjects: T[]): CacheMap<T> {
const objects: CacheMap<T> = new Map();
// build up the new list
for (const obj of newObjects) {
let namespaceObjects = objects.get(obj.metadata!.namespace || '');
if (!namespaceObjects) {
namespaceObjects = new Map();
objects.set(obj.metadata!.namespace || '', namespaceObjects);
}

const name = obj.metadata!.name || '';
namespaceObjects.set(name, obj);
}
return objects;
}

// external for testing
export function deleteItems<T extends KubernetesObject>(
oldObjects: T[],
oldObjects: CacheMap<T>,
newObjects: T[],
deleteCallback?: Array<ObjectCallback<T>>,
): T[] {
return oldObjects.filter((obj: T) => {
if (findKubernetesObject(newObjects, obj) === -1) {
if (deleteCallback) {
deleteCallback.forEach((fn: ObjectCallback<T>) => fn(obj));
): CacheMap<T> {
const newObjectsMap = cacheMapFromList(newObjects);

for (const [namespace, oldNamespaceObjects] of oldObjects.entries()) {
const newNamespaceObjects = newObjectsMap.get(namespace);
if (newNamespaceObjects) {
for (const [name, oldObj] of oldNamespaceObjects.entries()) {
if (!newNamespaceObjects.has(name)) {
oldNamespaceObjects.delete(name);
if (deleteCallback) {
deleteCallback.forEach((fn: ObjectCallback<T>) => fn(oldObj));
}
}
}
return false;
} else {
oldObjects.delete(namespace);
oldNamespaceObjects.forEach((obj: T) => {
if (deleteCallback) {
deleteCallback.forEach((fn: ObjectCallback<T>) => fn(obj));
}
});
}
return true;
});
}

return oldObjects;
}

// Only public for testing.
export function addOrUpdateObject<T extends KubernetesObject>(
objects: T[],
objects: CacheMap<T>,
obj: T,
addCallback?: Array<ObjectCallback<T>>,
updateCallback?: Array<ObjectCallback<T>>,
addCallbacks?: Array<ObjectCallback<T>>,
updateCallbacks?: Array<ObjectCallback<T>>,
): void {
const ix = findKubernetesObject(objects, obj);
if (ix === -1) {
objects.push(obj);
if (addCallback) {
addCallback.forEach((elt: ObjectCallback<T>) => elt(obj));
let namespaceObjects = objects.get(obj.metadata!.namespace || '');
if (!namespaceObjects) {
namespaceObjects = new Map();
objects.set(obj.metadata!.namespace || '', namespaceObjects);
}

const name = obj.metadata!.name || '';
const found = namespaceObjects.get(name);
if (!found) {
namespaceObjects.set(name, obj);
if (addCallbacks) {
addCallbacks.forEach((elt: ObjectCallback<T>) => elt(obj));
}
} else {
if (!isSameVersion(objects[ix], obj)) {
objects[ix] = obj;
if (updateCallback) {
updateCallback.forEach((elt: ObjectCallback<T>) => elt(obj));
if (!isSameVersion(found, obj)) {
namespaceObjects.set(name, obj);
if (updateCallbacks) {
updateCallbacks.forEach((elt: ObjectCallback<T>) => elt(obj));
}
}
}
}

function isSameObject<T extends KubernetesObject>(o1: T, o2: T): boolean {
return o1.metadata!.name === o2.metadata!.name && o1.metadata!.namespace === o2.metadata!.namespace;
}

function isSameVersion<T extends KubernetesObject>(o1: T, o2: T): boolean {
return (
o1.metadata!.resourceVersion !== undefined &&
Expand All @@ -280,23 +298,26 @@ function isSameVersion<T extends KubernetesObject>(o1: T, o2: T): boolean {
);
}

function findKubernetesObject<T extends KubernetesObject>(objects: T[], obj: T): number {
return objects.findIndex((elt: T) => {
return isSameObject(elt, obj);
});
}

// Public for testing.
export function deleteObject<T extends KubernetesObject>(
objects: T[],
objects: CacheMap<T>,
obj: T,
deleteCallback?: Array<ObjectCallback<T>>,
deleteCallbacks?: Array<ObjectCallback<T>>,
): void {
const ix = findKubernetesObject(objects, obj);
if (ix !== -1) {
objects.splice(ix, 1);
if (deleteCallback) {
deleteCallback.forEach((elt: ObjectCallback<T>) => elt(obj));
const namespace = obj.metadata!.namespace || '';
const name = obj.metadata!.name || '';

const namespaceObjects = objects.get(namespace);
if (!namespaceObjects) {
return;
}
const deleted = namespaceObjects.delete(name);
if (deleted) {
if (deleteCallbacks) {
deleteCallbacks.forEach((elt: ObjectCallback<T>) => elt(obj));
}
if (namespaceObjects.size === 0) {
objects.delete(namespace);
}
}
}
Loading

0 comments on commit a5264eb

Please sign in to comment.