Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't start pollEsNodesVersion unless someone subscribes #56923

Merged
merged 8 commits into from
Feb 28, 2020
Merged
133 changes: 99 additions & 34 deletions src/core/server/elasticsearch/elasticsearch_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import { ElasticsearchService } from './elasticsearch_service';
import { elasticsearchServiceMock } from './elasticsearch_service.mock';
import { duration } from 'moment';

const delay = async (durationMs: number) =>
await new Promise(resolve => setTimeout(resolve, durationMs));

let elasticsearchService: ElasticsearchService;
const configService = configServiceMock.create();
const deps = {
Expand All @@ -42,7 +45,7 @@ configService.atPath.mockReturnValue(
new BehaviorSubject({
hosts: ['http://1.2.3.4'],
healthCheck: {
delay: duration(2000),
delay: duration(10),
},
ssl: {
verificationMode: 'none',
Expand Down Expand Up @@ -125,21 +128,21 @@ describe('#setup', () => {

const config = MockClusterClient.mock.calls[0][0];
expect(config).toMatchInlineSnapshot(`
Object {
"healthCheckDelay": "PT2S",
"hosts": Array [
"http://8.8.8.8",
],
"logQueries": true,
"requestHeadersWhitelist": Array [
undefined,
],
"ssl": Object {
"certificate": "certificate-value",
"verificationMode": "none",
},
}
`);
Object {
"healthCheckDelay": "PT0.01S",
"hosts": Array [
"http://8.8.8.8",
],
"logQueries": true,
"requestHeadersWhitelist": Array [
undefined,
],
"ssl": Object {
"certificate": "certificate-value",
"verificationMode": "none",
},
}
`);
});
it('falls back to elasticsearch config if custom config not passed', async () => {
const setupContract = await elasticsearchService.setup(deps);
Expand All @@ -150,24 +153,24 @@ Object {

const config = MockClusterClient.mock.calls[0][0];
expect(config).toMatchInlineSnapshot(`
Object {
"healthCheckDelay": "PT2S",
"hosts": Array [
"http://1.2.3.4",
],
"requestHeadersWhitelist": Array [
undefined,
],
"ssl": Object {
"alwaysPresentCertificate": undefined,
"certificate": undefined,
"certificateAuthorities": undefined,
"key": undefined,
"keyPassphrase": undefined,
"verificationMode": "none",
},
}
`);
Object {
"healthCheckDelay": "PT0.01S",
"hosts": Array [
"http://1.2.3.4",
],
"requestHeadersWhitelist": Array [
undefined,
],
"ssl": Object {
"alwaysPresentCertificate": undefined,
"certificate": undefined,
"certificateAuthorities": undefined,
"key": undefined,
"keyPassphrase": undefined,
"verificationMode": "none",
},
}
`);
});

it('does not merge elasticsearch hosts if custom config overrides', async () => {
Expand Down Expand Up @@ -213,6 +216,45 @@ Object {
`);
});
});

it('esNodeVersionCompatibility$ only starts polling when subscribed to', async done => {
const mockAdminClusterClientInstance = elasticsearchServiceMock.createClusterClient();
const mockDataClusterClientInstance = elasticsearchServiceMock.createClusterClient();
MockClusterClient.mockImplementationOnce(
() => mockAdminClusterClientInstance
).mockImplementationOnce(() => mockDataClusterClientInstance);

mockAdminClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error());

const setupContract = await elasticsearchService.setup(deps);
await delay(10);

expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0);
setupContract.esNodesCompatibility$.subscribe(() => {
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
done();
});
});

it('esNodeVersionCompatibility$ stops polling when unsubscribed from', async done => {
const mockAdminClusterClientInstance = elasticsearchServiceMock.createClusterClient();
const mockDataClusterClientInstance = elasticsearchServiceMock.createClusterClient();
MockClusterClient.mockImplementationOnce(
() => mockAdminClusterClientInstance
).mockImplementationOnce(() => mockDataClusterClientInstance);

mockAdminClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error());

const setupContract = await elasticsearchService.setup(deps);

expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0);
const sub = setupContract.esNodesCompatibility$.subscribe(async () => {
sub.unsubscribe();
await delay(100);
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
done();
});
});
});

describe('#stop', () => {
Expand All @@ -229,4 +271,27 @@ describe('#stop', () => {
expect(mockAdminClusterClientInstance.close).toHaveBeenCalledTimes(1);
expect(mockDataClusterClientInstance.close).toHaveBeenCalledTimes(1);
});

it('stops pollEsNodeVersions even if there are active subscriptions', async done => {
expect.assertions(2);
const mockAdminClusterClientInstance = elasticsearchServiceMock.createCustomClusterClient();
const mockDataClusterClientInstance = elasticsearchServiceMock.createCustomClusterClient();

MockClusterClient.mockImplementationOnce(
() => mockAdminClusterClientInstance
).mockImplementationOnce(() => mockDataClusterClientInstance);

mockAdminClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error());

const setupContract = await elasticsearchService.setup(deps);

setupContract.esNodesCompatibility$.subscribe(async () => {
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);

await elasticsearchService.stop();
await delay(100);
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
done();
});
});
});
49 changes: 20 additions & 29 deletions src/core/server/elasticsearch/elasticsearch_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,17 @@
* under the License.
*/

import { ConnectableObservable, Observable, Subscription } from 'rxjs';
import { filter, first, map, publishReplay, switchMap, take } from 'rxjs/operators';
import { ConnectableObservable, Observable, Subscription, Subject } from 'rxjs';
import {
filter,
first,
map,
publishReplay,
switchMap,
take,
shareReplay,
takeUntil,
} from 'rxjs/operators';

import { CoreService } from '../../types';
import { merge } from '../../utils';
Expand Down Expand Up @@ -47,13 +56,8 @@ interface SetupDeps {
export class ElasticsearchService implements CoreService<InternalElasticsearchServiceSetup> {
private readonly log: Logger;
private readonly config$: Observable<ElasticsearchConfig>;
private subscriptions: {
client?: Subscription;
esNodesCompatibility?: Subscription;
} = {
client: undefined,
esNodesCompatibility: undefined,
};
private subscription: Subscription | undefined;
private stop$ = new Subject();
private kibanaVersion: string;

constructor(private readonly coreContext: CoreContext) {
Expand All @@ -69,7 +73,7 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe

const clients$ = this.config$.pipe(
filter(() => {
if (this.subscriptions.client !== undefined) {
if (this.subscription !== undefined) {
this.log.error('Clients cannot be changed after they are created');
return false;
}
Expand Down Expand Up @@ -100,7 +104,7 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe
publishReplay(1)
) as ConnectableObservable<CoreClusterClients>;

this.subscriptions.client = clients$.connect();
this.subscription = clients$.connect();

const config = await this.config$.pipe(first()).toPromise();

Expand Down Expand Up @@ -164,18 +168,7 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe
ignoreVersionMismatch: config.ignoreVersionMismatch,
esVersionCheckInterval: config.healthCheckDelay.asMilliseconds(),
kibanaVersion: this.kibanaVersion,
}).pipe(publishReplay(1));
rudolf marked this conversation as resolved.
Show resolved Hide resolved

this.subscriptions.esNodesCompatibility = (esNodesCompatibility$ as ConnectableObservable<
unknown
>).connect();

// TODO: Move to Status Service https://github.com/elastic/kibana/issues/41983
esNodesCompatibility$.subscribe(({ isCompatible, message }) => {
if (!isCompatible && message) {
this.log.error(message);
}
});
}).pipe(takeUntil(this.stop$), shareReplay({ refCount: true, bufferSize: 1 }));

return {
legacy: { config$: clients$.pipe(map(clients => clients.config)) },
Expand All @@ -195,12 +188,10 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe

public async stop() {
this.log.debug('Stopping elasticsearch service');
// TODO(TS-3.7-ESLINT)
// eslint-disable-next-line no-unused-expressions
this.subscriptions.client?.unsubscribe();
// eslint-disable-next-line no-unused-expressions
this.subscriptions.esNodesCompatibility?.unsubscribe();
this.subscriptions = { client: undefined, esNodesCompatibility: undefined };
if (this.subscription !== undefined) {
this.subscription.unsubscribe();
}
this.stop$.next();
}

private createClusterClient(
Expand Down
8 changes: 8 additions & 0 deletions src/core/server/saved_objects/saved_objects_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,14 @@ export class SavedObjectsService
this.logger.info(
'Waiting until all Elasticsearch nodes are compatible with Kibana before starting saved objects migrations...'
);

// TODO: Move to Status Service https://github.com/elastic/kibana/issues/41983
this.setupDeps!.elasticsearch.esNodesCompatibility$.subscribe(({ isCompatible, message }) => {
if (!isCompatible && message) {
this.logger.error(message);
}
});

await this.setupDeps!.elasticsearch.esNodesCompatibility$.pipe(
filter(nodes => nodes.isCompatible),
take(1)
Expand Down