Skip to content

Commit

Permalink
SO service: don't start migration if shutdown occurs first (elastic#1…
Browse files Browse the repository at this point in the history
…70309)

## Summary

Fix elastic#109684

Most of the work was already done in
elastic#110677, this PR just remove a
deprecated call to `toPromise()`, a better error message and a unit test
to assert the behavior.
  • Loading branch information
pgayvallet authored Nov 2, 2023
1 parent 3647891 commit 9e9e9fc
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
typeRegistryInstanceMock,
applyTypeDefaultsMock,
} from './saved_objects_service.test.mocks';
import { BehaviorSubject, firstValueFrom } from 'rxjs';
import { BehaviorSubject, firstValueFrom, EMPTY } from 'rxjs';
import { skip } from 'rxjs/operators';
import { type RawPackageInfo, Env } from '@kbn/config';
import { ByteSizeValue } from '@kbn/config-schema';
Expand Down Expand Up @@ -549,6 +549,24 @@ describe('SavedObjectsService', () => {
expect(migratorInstanceMock.runMigrations).toHaveBeenCalledTimes(1);
});

it('does not start the migration if esNodesCompatibility$ is closed before calling `start`', async () => {
expect.assertions(2);
const coreContext = createCoreContext({ skipMigration: false });
const soService = new SavedObjectsService(coreContext);
const setupDeps = createSetupDeps();
// Create an new subject so that we can control when isCompatible=true
// is emitted.
setupDeps.elasticsearch.esNodesCompatibility$ = EMPTY;
await soService.setup(setupDeps);
await expect(() =>
soService.start(createStartDeps())
).rejects.toThrowErrorMatchingInlineSnapshot(
`"esNodesCompatibility$ was closed before emitting"`
);

expect(migratorInstanceMock.runMigrations).not.toHaveBeenCalled();
});

it('resolves with KibanaMigrator after waiting for migrations to complete', async () => {
const coreContext = createCoreContext({ skipMigration: false });
const soService = new SavedObjectsService(coreContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*/

import { Subject, Observable, firstValueFrom, of } from 'rxjs';
import { filter, take, switchMap } from 'rxjs/operators';
import { filter, switchMap } from 'rxjs/operators';
import type { Logger } from '@kbn/logging';
import { stripVersionQualifier } from '@kbn/std';
import type { ServiceStatus } from '@kbn/core-status-common';
Expand Down Expand Up @@ -262,20 +262,25 @@ export class SavedObjectsService
'Waiting until all Elasticsearch nodes are compatible with Kibana before starting saved objects migrations...'
);

// The Elasticsearch service should already ensure that, but let's double check just in case.
// Should it be replaced with elasticsearch.status$ API instead?
const compatibleNodes = await this.setupDeps!.elasticsearch.esNodesCompatibility$.pipe(
filter((nodes) => nodes.isCompatible),
take(1)
).toPromise();

// Running migrations only if we got compatible nodes.
// It may happen that the observable completes due to Kibana shutting down
// and the promise above fulfils as undefined. We shouldn't trigger migrations at that point.
if (compatibleNodes) {
this.logger.info('Starting saved objects migrations');
await migrator.runMigrations();
try {
// The Elasticsearch service should already ensure that, but let's double check just in case.
// Should it be replaced with elasticsearch.status$ API instead?
await firstValueFrom(
this.setupDeps!.elasticsearch.esNodesCompatibility$.pipe(
filter((nodes) => nodes.isCompatible)
)
);
} catch (e) {
// EmptyError means esNodesCompatibility$ was closed before emitting
// which should only occur if the server is shutdown before being fully started.
if (e.name === 'EmptyError') {
throw new Error('esNodesCompatibility$ was closed before emitting');
}
throw e;
}

this.logger.info('Starting saved objects migrations');
await migrator.runMigrations();
}

const createRepository = (
Expand Down

0 comments on commit 9e9e9fc

Please sign in to comment.