From 8d3e9332a960ab034201a50fef07dc18f5c7f88d Mon Sep 17 00:00:00 2001 From: Jan Kryl Date: Sat, 2 Jan 2021 23:29:15 +0100 Subject: [PATCH] cache: Fix connection leak. Make sure that done callback of a watcher isn't called more than once and call abort() on request object to close the connection when done. --- src/cache.ts | 27 +++--- src/cache_test.ts | 129 +++++++++-------------------- src/watch.ts | 84 +++++++++++++------ src/watch_test.ts | 207 +++++++++++++++++++++++++++------------------- 4 files changed, 227 insertions(+), 220 deletions(-) diff --git a/src/cache.ts b/src/cache.ts index 1b34429758..4f6db95ec3 100644 --- a/src/cache.ts +++ b/src/cache.ts @@ -1,6 +1,6 @@ import { ADD, DELETE, ERROR, Informer, ListPromise, ObjectCallback, UPDATE } from './informer'; import { KubernetesObject } from './types'; -import { Watch } from './watch'; +import { RequestResult, Watch } from './watch'; export interface ObjectCache { get(name: string, namespace?: string): T | undefined; @@ -12,7 +12,7 @@ export class ListWatch implements ObjectCache, In private resourceVersion: string; private readonly indexCache: { [key: string]: T[] } = {}; private readonly callbackCache: { [key: string]: Array> } = {}; - private stopped: boolean; + private request: RequestResult | undefined; public constructor( private readonly path: string, @@ -25,19 +25,20 @@ export class ListWatch implements ObjectCache, In this.callbackCache[DELETE] = []; this.callbackCache[ERROR] = []; this.resourceVersion = ''; - this.stopped = true; if (autoStart) { - this.start(); + this.doneHandler(null); } } public async start(): Promise { - this.stopped = false; - await this.doneHandler(); + await this.doneHandler(null); } public stop(): void { - this.stopped = true; + if (this.request) { + this.request.abort(); + this.request = undefined; + } } public on(verb: string, cb: ObjectCallback): void { @@ -79,15 +80,10 @@ export class ListWatch implements ObjectCache, In return this.resourceVersion; } - private async errorHandler(err: any): Promise { + private async doneHandler(err: any): Promise { + this.stop(); if (err) { this.callbackCache[ERROR].forEach((elt: ObjectCallback) => elt(err)); - } - this.stopped = true; - } - - private async doneHandler(): Promise { - if (this.stopped) { return; } // TODO: Don't always list here for efficiency @@ -106,12 +102,11 @@ export class ListWatch implements ObjectCache, In } }); this.addOrUpdateItems(list.items); - await this.watch.watch( + this.request = await this.watch.watch( this.path, { resourceVersion: list.metadata!.resourceVersion }, this.watchHandler.bind(this), this.doneHandler.bind(this), - this.errorHandler.bind(this), ); } diff --git a/src/cache_test.ts b/src/cache_test.ts index 4250bb240c..661f4e46ea 100644 --- a/src/cache_test.ts +++ b/src/cache_test.ts @@ -4,14 +4,22 @@ import chaiAsPromised = require('chai-as-promised'); import * as mock from 'ts-mockito'; import http = require('http'); +import { Duplex } from 'stream'; +import { EventEmitter } from 'ws'; import { V1Namespace, V1NamespaceList, V1ObjectMeta, V1Pod, V1ListMeta } from './api'; import { deleteObject, ListWatch, deleteItems } from './cache'; -import { ListCallback, ADD, UPDATE, DELETE, ListPromise } from './informer'; +import { ADD, UPDATE, DELETE, ListPromise } from './informer'; use(chaiAsPromised); -import { Watch } from './watch'; +import { RequestResult, Watch } from './watch'; + +// Object replacing real Request object in the test +class FakeRequest extends EventEmitter implements RequestResult { + pipe(stream: Duplex): void {} + abort() {} +} describe('ListWatchCache', () => { it('should throw on unknown update', () => { @@ -24,7 +32,12 @@ describe('ListWatchCache', () => { (resolve, reject) => { resolve({ response: {} as http.IncomingMessage, - body: {} as V1NamespaceList, + body: { + metadata: { + resourceVersion: '12345', + } as V1ListMeta, + items: [], + } as V1NamespaceList, }); }, ); @@ -88,15 +101,9 @@ describe('ListWatchCache', () => { }; const promise = new Promise((resolve) => { mock.when( - fakeWatch.watch( - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - ), + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), ).thenCall(() => { - resolve(null); + resolve(new FakeRequest()); }); }); const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn); @@ -159,7 +166,7 @@ describe('ListWatchCache', () => { } as V1ObjectMeta, } as V1Namespace); - await doneHandler(); + await doneHandler(null); expect(cache.list().length, 'all namespace list').to.equal(1); expect(cache.list('default').length, 'default namespace list').to.equal(1); expect(cache.list('other'), 'other namespace list').to.be.undefined; @@ -198,15 +205,9 @@ describe('ListWatchCache', () => { }; const promise = new Promise((resolve) => { mock.when( - fakeWatch.watch( - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - ), + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), ).thenCall(() => { - resolve(null); + resolve(new FakeRequest()); }); }); const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn); @@ -285,15 +286,9 @@ describe('ListWatchCache', () => { }; const promise = new Promise((resolve) => { mock.when( - fakeWatch.watch( - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - ), + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), ).thenCall(() => { - resolve(null); + resolve(new FakeRequest()); }); }); const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn); @@ -362,15 +357,9 @@ describe('ListWatchCache', () => { }; let promise = new Promise((resolve) => { mock.when( - fakeWatch.watch( - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - ), + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), ).thenCall(() => { - resolve(null); + resolve(new FakeRequest()); }); }); const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false); @@ -390,18 +379,12 @@ describe('ListWatchCache', () => { promise = new Promise((resolve) => { mock.when( - fakeWatch.watch( - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - ), + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), ).thenCall(() => { - resolve(null); + resolve(new FakeRequest()); }); }); - doneHandler(); + doneHandler(null); await promise; expect(addObjects).to.deep.equal(list); expect(updateObjects).to.deep.equal(list); @@ -447,15 +430,9 @@ describe('ListWatchCache', () => { }; let promise = new Promise((resolve) => { mock.when( - fakeWatch.watch( - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - ), + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), ).thenCall(() => { - resolve(null); + resolve(new FakeRequest()); }); }); const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false); @@ -476,19 +453,13 @@ describe('ListWatchCache', () => { promise = new Promise((resolve) => { mock.when( - fakeWatch.watch( - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - ), + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), ).thenCall(() => { - resolve(null); + resolve(new FakeRequest()); }); }); listObj.items = list2; - doneHandler(); + doneHandler(null); await promise; expect(addObjects).to.deep.equal(list); expect(updateObjects).to.deep.equal(list2); @@ -536,15 +507,9 @@ describe('ListWatchCache', () => { }; const promise = new Promise((resolve) => { mock.when( - fakeWatch.watch( - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - ), + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), ).thenCall(() => { - resolve(null); + resolve(new FakeRequest()); }); }); const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn); @@ -662,13 +627,7 @@ describe('ListWatchCache', () => { }; const watchCalled = new Promise((resolve) => { mock.when( - fakeWatch.watch( - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - ), + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), ).thenCall(resolve); }); const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn); @@ -727,13 +686,7 @@ describe('ListWatchCache', () => { }; const watchCalled = new Promise((resolve) => { mock.when( - fakeWatch.watch( - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - ), + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), ).thenCall(resolve); }); const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn); @@ -820,13 +773,7 @@ describe('ListWatchCache', () => { }; const watchCalled = new Promise((resolve) => { mock.when( - fakeWatch.watch( - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - mock.anything(), - ), + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), ).thenCall(resolve); }); const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn); diff --git a/src/watch.ts b/src/watch.ts index 80f4903e3d..da14a8deed 100644 --- a/src/watch.ts +++ b/src/watch.ts @@ -1,5 +1,6 @@ import byline = require('byline'); import request = require('request'); +import { Duplex } from 'stream'; import { KubeConfig } from './config'; export interface WatchUpdate { @@ -7,24 +8,49 @@ export interface WatchUpdate { object: object; } +// We cannot use the type ReadableStream because Request returned by request +// library is not a true ReadableStream and there is extra abort method. +export interface RequestResult { + pipe(stream: Duplex): void; + on(ev: string, cb: (arg: any) => void): void; + abort(): void; +} + export interface Response { statusCode: number; statusMessage: string; } +// The contract is that the provided request library will return a readable +// stream with abort method. export interface RequestInterface { - webRequest( - opts: request.Options, - callback: (err: object | null, response: Response | null, body: object | null) => void, - ): any; + webRequest(opts: request.OptionsWithUri): RequestResult; } export class DefaultRequest implements RequestInterface { - public webRequest( - opts: request.Options, - callback: (err: object | null, response: Response | null, body: object | null) => void, - ): any { - return request(opts, callback); + // requestImpl can be overriden in case we need to test mocked DefaultRequest + private requestImpl: (opts: request.OptionsWithUri) => request.Request; + + constructor(requestImpl?: (opts: request.OptionsWithUri) => request.Request) { + this.requestImpl = requestImpl ? requestImpl : request; + } + + // Using request lib can be confusing when combining Stream- with Callback- + // style API. We avoid the callback and handle HTTP response errors, that + // would otherwise require a different error handling, in a transparent way + // to the user (see github issue request/request#647 for more info). + public webRequest(opts: request.OptionsWithUri): RequestResult { + const req = this.requestImpl(opts); + // pause the stream until we get a response not to miss any bytes + req.pause(); + req.on('response', (resp) => { + if (resp.statusCode === 200) { + req.resume(); + } else { + req.emit('error', new Error(resp.statusMessage)); + } + }); + return req; } } @@ -42,12 +68,17 @@ export class Watch { } } + // Watch the resource and call provided callback with parsed json object + // upon event received over the watcher connection. + // + // "done" callback is called either when connection is closed or when there + // is an error. In either case, watcher takes care of properly closing the + // underlaying connection so that it doesn't leak any resources. public async watch( path: string, queryParams: any, callback: (phase: string, apiObj: any, watchObj?: any) => void, - done: () => void, - error: (err: any) => void, + done: (err: any) => void, ): Promise { const cluster = this.config.getCurrentCluster(); if (!cluster) { @@ -58,19 +89,31 @@ export class Watch { queryParams.watch = true; const headerParams: any = {}; - const requestOptions: request.Options = { + const requestOptions: request.OptionsWithUri = { method: 'GET', qs: queryParams, headers: headerParams, uri: url, useQuerystring: true, json: true, - forever: true, - timeout: 0, + pool: false, }; await this.config.applyToRequest(requestOptions); + let req; + let doneCalled: boolean = false; + const doneCallOnce = (err: any) => { + if (!doneCalled) { + req.abort(); + doneCalled = true; + done(err); + } + }; + req = this.requestImpl.webRequest(requestOptions); const stream = byline.createStream(); + req.on('error', doneCallOnce); + stream.on('error', doneCallOnce); + stream.on('close', () => doneCallOnce(null)); stream.on('data', (line) => { try { const data = JSON.parse(line); @@ -79,20 +122,7 @@ export class Watch { // ignore parse errors } }); - stream.on('error', error); - stream.on('close', done); - const req = this.requestImpl.webRequest(requestOptions, (err, response, body) => { - if (err) { - error(err); - done(); - } else if (response && response.statusCode !== 200) { - error(new Error(response.statusMessage)); - done(); - } else { - done(); - } - }); req.pipe(stream); return req; } diff --git a/src/watch_test.ts b/src/watch_test.ts index 6b3d0c0bb2..2bf6cce5ed 100644 --- a/src/watch_test.ts +++ b/src/watch_test.ts @@ -1,10 +1,12 @@ import { expect } from 'chai'; import request = require('request'); -import { anyFunction, anything, capture, instance, mock, reset, verify, when } from 'ts-mockito'; +import { Duplex } from 'stream'; +import { anything, capture, instance, mock, spy, verify, when } from 'ts-mockito'; +import { EventEmitter } from 'ws'; import { KubeConfig } from './config'; import { Cluster, Context, User } from './config_types'; -import { DefaultRequest, Watch } from './watch'; +import { DefaultRequest, RequestResult, Watch } from './watch'; const server = 'foo.company.com'; @@ -32,23 +34,79 @@ const fakeConfig: { ], }; +// Object replacing real Request object in the test +class FakeRequest extends EventEmitter implements RequestResult { + pipe(stream: Duplex): void {} + abort() {} +} + describe('Watch', () => { + describe('DefaultRequest', () => { + it('should resume stream upon http status 200', (done) => { + const defaultRequest = new DefaultRequest((opts) => { + const req = request(opts); + // to prevent request from firing we must replace start() method + // before "nextTick" happens + (req).start = () => {}; + (req).resume = () => done(); + return req; + }); + const req = defaultRequest.webRequest({ + uri: `http://${server}/testURI`, + }); + req.on('error', done); + (req).emit('response', { + statusCode: 200, + }); + }); + + it('should handle non-200 error codes', (done) => { + const defaultRequest = new DefaultRequest((opts) => { + const req = request(opts); + (req).start = () => {}; + return req; + }); + const req = defaultRequest.webRequest({ + uri: `http://${server}/testURI`, + }); + req.on('error', (err) => { + expect(err.toString()).to.equal('Error: Conflict'); + done(); + }); + (req).emit('response', { + statusCode: 409, + statusMessage: 'Conflict', + }); + }); + }); + it('should construct correctly', () => { const kc = new KubeConfig(); const watch = new Watch(kc); }); - it('should handle non-200 error codes', async () => { + it('should handle error from request stream', async () => { const kc = new KubeConfig(); - Object.assign(kc, fakeConfig); - const fakeRequestor = mock(DefaultRequest); - const watch = new Watch(kc, instance(fakeRequestor)); + const fakeRequest = new FakeRequest(); + const spiedRequest = spy(fakeRequest); + let aborted = false; - const fakeRequest = { - pipe: (stream) => {}, - }; + when(spiedRequest.pipe(anything())).thenCall(() => { + fakeRequest.emit('error', new Error('some error')); + }); + when(spiedRequest.abort()).thenCall(() => { + aborted = true; + }); - when(fakeRequestor.webRequest(anything(), anyFunction())).thenReturn(fakeRequest); + Object.assign(kc, fakeConfig); + const watch = new Watch(kc, { + webRequest: (opts: request.OptionsWithUri) => { + expect(opts.uri).to.equal(`${server}${path}`); + expect(opts.method).to.equal('GET'); + expect(opts.json).to.equal(true); + return fakeRequest; + }, + }); const path = '/some/path/to/object'; @@ -59,36 +117,17 @@ describe('Watch', () => { path, {}, (phase: string, obj: string) => {}, - () => { - doneCalled = true; - }, (err: any) => { + doneCalled = true; doneErr = err; }, ); - - verify(fakeRequestor.webRequest(anything(), anyFunction())); - - const [opts, doneCallback] = capture(fakeRequestor.webRequest).last(); - const reqOpts: request.OptionsWithUri = opts as request.OptionsWithUri; - - expect(reqOpts.uri).to.equal(`${server}${path}`); - expect(reqOpts.method).to.equal('GET'); - expect(reqOpts.json).to.equal(true); - - expect(doneCalled).to.equal(false); - - const resp = { - statusCode: 409, - statusMessage: 'Conflict', - }; - doneCallback(null, resp, {}); - expect(doneCalled).to.equal(true); - expect(doneErr.toString()).to.equal('Error: Conflict'); + expect(doneErr.toString()).to.equal('Error: some error'); + expect(aborted).to.equal(true); }); - it('should watch correctly', async () => { + it('should not call watch done callback more than once', async () => { const kc = new KubeConfig(); Object.assign(kc, fakeConfig); const fakeRequestor = mock(DefaultRequest); @@ -108,20 +147,21 @@ describe('Watch', () => { }, }; - const fakeRequest = { - pipe: (stream) => { - stream.write(JSON.stringify(obj1) + '\n'); - stream.write(JSON.stringify(obj2) + '\n'); - }, + var stream; + const fakeRequest = new FakeRequest(); + fakeRequest.pipe = function(arg) { + stream = arg; + stream.write(JSON.stringify(obj1) + '\n'); + stream.write(JSON.stringify(obj2) + '\n'); }; - when(fakeRequestor.webRequest(anything(), anyFunction())).thenReturn(fakeRequest); + when(fakeRequestor.webRequest(anything())).thenReturn(fakeRequest); const path = '/some/path/to/object'; const receivedTypes: string[] = []; const receivedObjects: string[] = []; - let doneCalled = false; + let doneCalled = 0; let doneErr: any; await watch.watch( @@ -131,17 +171,15 @@ describe('Watch', () => { receivedTypes.push(phase); receivedObjects.push(obj); }, - () => { - doneCalled = true; - }, (err: any) => { + doneCalled += 1; doneErr = err; }, ); - verify(fakeRequestor.webRequest(anything(), anyFunction())); + verify(fakeRequestor.webRequest(anything())); - const [opts, doneCallback] = capture(fakeRequestor.webRequest).last(); + const [opts] = capture(fakeRequestor.webRequest).last(); const reqOpts: request.OptionsWithUri = opts as request.OptionsWithUri; expect(reqOpts.uri).to.equal(`${server}${path}`); @@ -151,16 +189,15 @@ describe('Watch', () => { expect(receivedTypes).to.deep.equal([obj1.type, obj2.type]); expect(receivedObjects).to.deep.equal([obj1.object, obj2.object]); - expect(doneCalled).to.equal(false); - - doneCallback(null, null, null); - - expect(doneCalled).to.equal(true); - expect(doneErr).to.be.undefined; + expect(doneCalled).to.equal(0); const errIn = { error: 'err' }; - doneCallback(errIn, null, null); + stream.emit('error', errIn); expect(doneErr).to.deep.equal(errIn); + expect(doneCalled).to.equal(1); + + stream.end(); + expect(doneCalled).to.equal(1); }); it('should handle errors correctly', async () => { @@ -177,15 +214,14 @@ describe('Watch', () => { }; const errIn = { error: 'err' }; - const fakeRequest = { - pipe: (stream) => { - stream.write(JSON.stringify(obj1) + '\n'); - stream.emit('error', errIn); - stream.emit('close'); - }, + const fakeRequest = new FakeRequest(); + fakeRequest.pipe = function(stream) { + stream.write(JSON.stringify(obj1) + '\n'); + stream.emit('error', errIn); + stream.emit('close'); }; - when(fakeRequestor.webRequest(anything(), anyFunction())).thenReturn(fakeRequest); + when(fakeRequestor.webRequest(anything())).thenReturn(fakeRequest); const path = '/some/path/to/object'; @@ -201,13 +237,15 @@ describe('Watch', () => { receivedTypes.push(phase); receivedObjects.push(obj); }, - () => (doneCalled = true), - (err: any) => doneErr.push(err), + (err: any) => { + doneCalled = true; + doneErr.push(err); + }, ); - verify(fakeRequestor.webRequest(anything(), anyFunction())); + verify(fakeRequestor.webRequest(anything())); - const [opts, doneCallback] = capture(fakeRequestor.webRequest).last(); + const [opts] = capture(fakeRequestor.webRequest).last(); const reqOpts: request.OptionsWithUri = opts as request.OptionsWithUri; expect(reqOpts.uri).to.equal(`${server}${path}`); @@ -235,14 +273,13 @@ describe('Watch', () => { }, }; - const fakeRequest = { - pipe: (stream) => { - stream.write(JSON.stringify(obj1) + '\n'); - stream.emit('close'); - }, + const fakeRequest = new FakeRequest(); + fakeRequest.pipe = function(stream) { + stream.write(JSON.stringify(obj1) + '\n'); + stream.emit('close'); }; - when(fakeRequestor.webRequest(anything(), anyFunction())).thenReturn(fakeRequest); + when(fakeRequestor.webRequest(anything())).thenReturn(fakeRequest); const path = '/some/path/to/object'; @@ -258,13 +295,15 @@ describe('Watch', () => { receivedTypes.push(phase); receivedObjects.push(obj); }, - () => (doneCalled = true), - (err: any) => (doneErr = err), + (err: any) => { + doneCalled = true; + doneErr = err; + }, ); - verify(fakeRequestor.webRequest(anything(), anyFunction())); + verify(fakeRequestor.webRequest(anything())); - const [opts, doneCallback] = capture(fakeRequestor.webRequest).last(); + const [opts] = capture(fakeRequestor.webRequest).last(); const reqOpts: request.OptionsWithUri = opts as request.OptionsWithUri; expect(reqOpts.uri).to.equal(`${server}${path}`); @@ -275,7 +314,7 @@ describe('Watch', () => { expect(receivedObjects).to.deep.equal([obj1.object]); expect(doneCalled).to.equal(true); - expect(doneErr).to.be.undefined; + expect(doneErr).to.be.null; }); it('should ignore JSON parse errors', async () => { @@ -291,14 +330,13 @@ describe('Watch', () => { }, }; - const fakeRequest = { - pipe: (stream) => { - stream.write(JSON.stringify(obj) + '\n'); - stream.write('{"truncated json\n'); - }, + const fakeRequest = new FakeRequest(); + fakeRequest.pipe = function(stream) { + stream.write(JSON.stringify(obj) + '\n'); + stream.write('{"truncated json\n'); }; - when(fakeRequestor.webRequest(anything(), anyFunction())).thenReturn(fakeRequest); + when(fakeRequestor.webRequest(anything())).thenReturn(fakeRequest); const path = '/some/path/to/object'; @@ -313,14 +351,11 @@ describe('Watch', () => { receivedObjects.push(recievedObject); }, () => { - /* ignore */ - }, - () => { - /* ignore */ + // ignore }, ); - verify(fakeRequestor.webRequest(anything(), anyFunction())); + verify(fakeRequestor.webRequest(anything())); const [opts, doneCallback] = capture(fakeRequestor.webRequest).last(); const reqOpts: request.OptionsWithUri = opts as request.OptionsWithUri; @@ -333,7 +368,7 @@ describe('Watch', () => { const kc = new KubeConfig(); const watch = new Watch(kc); - const promise = watch.watch('/some/path', {}, () => {}, () => {}, () => {}); + const promise = watch.watch('/some/path', {}, () => {}, () => {}); expect(promise).to.be.rejected; }); });