From 9e175e885d04a8fcb348583f79e8a45e97090d91 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Fri, 13 Nov 2020 14:17:50 -0800 Subject: [PATCH] split watch error and watch done handlers. --- src/cache.ts | 20 +++++++++-- src/cache_test.ts | 84 ++++++++++++++++++++++++++++++++++++++++------- src/watch.ts | 23 +++++++------ src/watch_test.ts | 33 ++++++++++--------- 4 files changed, 117 insertions(+), 43 deletions(-) diff --git a/src/cache.ts b/src/cache.ts index 6491a55ac0..2ada90a721 100644 --- a/src/cache.ts +++ b/src/cache.ts @@ -12,6 +12,7 @@ export class ListWatch implements ObjectCache, In private resourceVersion: string; private readonly indexCache: { [key: string]: T[] } = {}; private readonly callbackCache: { [key: string]: Array> } = {}; + private stopped: boolean; public constructor( private readonly path: string, @@ -24,13 +25,19 @@ export class ListWatch implements ObjectCache, In this.callbackCache[DELETE] = []; this.callbackCache[ERROR] = []; this.resourceVersion = ''; + this.stopped = true; if (autoStart) { - this.doneHandler(null); + this.start(); } } public async start(): Promise { - await this.doneHandler(null); + this.stopped = false; + await this.doneHandler(); + } + + public stop(): void { + this.stopped = true; } public on(verb: string, cb: ObjectCallback): void { @@ -72,9 +79,15 @@ export class ListWatch implements ObjectCache, In return this.resourceVersion; } - private async doneHandler(err: any): Promise { + private async errorHandler(err: any): Promise { if (err) { this.callbackCache[ERROR].forEach((elt: ObjectCallback) => elt(err)); + } + this.stopped = true; + } + + private async doneHandler(): Promise { + if (this.stopped) { return; } // TODO: Don't always list here for efficiency @@ -90,6 +103,7 @@ export class ListWatch implements ObjectCache, In { resourceVersion: list.metadata!.resourceVersion }, this.watchHandler.bind(this), this.doneHandler.bind(this), + this.errorHandler.bind(this), ); } diff --git a/src/cache_test.ts b/src/cache_test.ts index 7359c4703f..4338f82bba 100644 --- a/src/cache_test.ts +++ b/src/cache_test.ts @@ -67,7 +67,13 @@ describe('ListWatchCache', () => { }; const promise = new Promise((resolve) => { mock.when( - fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + fakeWatch.watch( + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + ), ).thenCall(() => { resolve(); }); @@ -146,7 +152,13 @@ describe('ListWatchCache', () => { }; const promise = new Promise((resolve) => { mock.when( - fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + fakeWatch.watch( + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + ), ).thenCall(() => { resolve(); }); @@ -227,7 +239,13 @@ describe('ListWatchCache', () => { }; const promise = new Promise((resolve) => { mock.when( - fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + fakeWatch.watch( + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + ), ).thenCall(() => { resolve(); }); @@ -298,7 +316,13 @@ describe('ListWatchCache', () => { }; let promise = new Promise((resolve) => { mock.when( - fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + fakeWatch.watch( + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + ), ).thenCall(() => { resolve(); }); @@ -320,12 +344,18 @@ describe('ListWatchCache', () => { promise = new Promise((resolve) => { mock.when( - fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + fakeWatch.watch( + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + ), ).thenCall(() => { resolve(); }); }); - doneHandler(null); + doneHandler(); await promise; expect(addObjects).to.deep.equal(list); expect(updateObjects).to.deep.equal(list); @@ -371,7 +401,13 @@ describe('ListWatchCache', () => { }; let promise = new Promise((resolve) => { mock.when( - fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + fakeWatch.watch( + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + ), ).thenCall(() => { resolve(); }); @@ -394,13 +430,19 @@ describe('ListWatchCache', () => { promise = new Promise((resolve) => { mock.when( - fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + fakeWatch.watch( + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + ), ).thenCall(() => { resolve(); }); }); listObj.items = list2; - doneHandler(null); + doneHandler(); await promise; expect(addObjects).to.deep.equal(list); expect(updateObjects).to.deep.equal(list2); @@ -448,7 +490,13 @@ describe('ListWatchCache', () => { }; const promise = new Promise((resolve) => { mock.when( - fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + fakeWatch.watch( + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + ), ).thenCall(() => { resolve(); }); @@ -568,7 +616,13 @@ describe('ListWatchCache', () => { }; const watchCalled = new Promise((resolve) => { mock.when( - fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + fakeWatch.watch( + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + ), ).thenCall(resolve); }); const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn); @@ -627,7 +681,13 @@ describe('ListWatchCache', () => { }; const watchCalled = new Promise((resolve) => { mock.when( - fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + fakeWatch.watch( + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + mock.anything(), + ), ).thenCall(resolve); }); const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn); diff --git a/src/watch.ts b/src/watch.ts index 5a98bb76f0..497dde23d6 100644 --- a/src/watch.ts +++ b/src/watch.ts @@ -46,7 +46,8 @@ export class Watch { path: string, queryParams: any, callback: (phase: string, apiObj: any, watchObj?: any) => void, - done: (err: any) => void, + done: () => void, + error: (err: any) => void, ): Promise { const cluster = this.config.getCurrentCluster(); if (!cluster) { @@ -76,20 +77,18 @@ export class Watch { // ignore parse errors } }); - let errOut: Error | null = null; - stream.on('error', (err) => { - errOut = err; - done(err); - }); - stream.on('close', () => done(errOut)); + stream.on('error', error); + stream.on('close', done); - const req = this.requestImpl.webRequest(requestOptions, (error, response, body) => { - if (error) { - done(error); + const req = this.requestImpl.webRequest(requestOptions, (err, response, body) => { + if (err) { + error(err); + done(); } else if (response && response.statusCode !== 200) { - done(new Error(response.statusMessage)); + error(new Error(response.statusMessage)); + done(); } else { - done(null); + done(); } }); req.pipe(stream); diff --git a/src/watch_test.ts b/src/watch_test.ts index 53361ca2a5..6b3d0c0bb2 100644 --- a/src/watch_test.ts +++ b/src/watch_test.ts @@ -1,6 +1,5 @@ import { expect } from 'chai'; import request = require('request'); -import { ReadableStreamBuffer, WritableStreamBuffer } from 'stream-buffers'; import { anyFunction, anything, capture, instance, mock, reset, verify, when } from 'ts-mockito'; import { KubeConfig } from './config'; @@ -60,8 +59,10 @@ describe('Watch', () => { path, {}, (phase: string, obj: string) => {}, - (err: any) => { + () => { doneCalled = true; + }, + (err: any) => { doneErr = err; }, ); @@ -130,8 +131,10 @@ describe('Watch', () => { receivedTypes.push(phase); receivedObjects.push(obj); }, - (err: any) => { + () => { doneCalled = true; + }, + (err: any) => { doneErr = err; }, ); @@ -153,7 +156,7 @@ describe('Watch', () => { doneCallback(null, null, null); expect(doneCalled).to.equal(true); - expect(doneErr).to.equal(null); + expect(doneErr).to.be.undefined; const errIn = { error: 'err' }; doneCallback(errIn, null, null); @@ -198,10 +201,8 @@ describe('Watch', () => { receivedTypes.push(phase); receivedObjects.push(obj); }, - (err: any) => { - doneCalled = true; - doneErr.push(err); - }, + () => (doneCalled = true), + (err: any) => doneErr.push(err), ); verify(fakeRequestor.webRequest(anything(), anyFunction())); @@ -217,9 +218,8 @@ describe('Watch', () => { expect(receivedObjects).to.deep.equal([obj1.object]); expect(doneCalled).to.equal(true); - expect(doneErr.length).to.equal(2); + expect(doneErr.length).to.equal(1); expect(doneErr[0]).to.deep.equal(errIn); - expect(doneErr[1]).to.deep.equal(errIn); }); it('should handle server side close correctly', async () => { @@ -258,10 +258,8 @@ describe('Watch', () => { receivedTypes.push(phase); receivedObjects.push(obj); }, - (err: any) => { - doneCalled = true; - doneErr = err; - }, + () => (doneCalled = true), + (err: any) => (doneErr = err), ); verify(fakeRequestor.webRequest(anything(), anyFunction())); @@ -277,7 +275,7 @@ describe('Watch', () => { expect(receivedObjects).to.deep.equal([obj1.object]); expect(doneCalled).to.equal(true); - expect(doneErr).to.be.null; + expect(doneErr).to.be.undefined; }); it('should ignore JSON parse errors', async () => { @@ -317,6 +315,9 @@ describe('Watch', () => { () => { /* ignore */ }, + () => { + /* ignore */ + }, ); verify(fakeRequestor.webRequest(anything(), anyFunction())); @@ -332,7 +333,7 @@ describe('Watch', () => { const kc = new KubeConfig(); const watch = new Watch(kc); - const promise = watch.watch('/some/path', {}, () => {}, () => {}); + const promise = watch.watch('/some/path', {}, () => {}, () => {}, () => {}); expect(promise).to.be.rejected; }); });