Skip to content

Commit

Permalink
Update status when summary is updated (#128769) (#129044)
Browse files Browse the repository at this point in the history
(cherry picked from commit abe70c5)

Co-authored-by: Nicolas Chaulet <nicolas.chaulet@elastic.co>
  • Loading branch information
kibanamachine and nchaulet authored Mar 31, 2022
1 parent de98fc0 commit bacfe26
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 24 deletions.
26 changes: 26 additions & 0 deletions src/core/server/status/plugins_status.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,32 @@ describe('PluginStatusService', () => {
]);
});

it('updates when a plugin status observable emits with the same level but a different summary', async () => {
const service = new PluginsStatusService({
core$: coreAllAvailable$,
pluginDependencies: new Map([['a', []]]),
});
const statusUpdates: Array<Record<PluginName, ServiceStatus>> = [];
const subscription = service
.getAll$()
// the first emission happens right after core services emit (see explanation above)
.pipe(skip(1))
.subscribe((pluginStatuses) => statusUpdates.push(pluginStatuses));

const aStatus$ = new BehaviorSubject<ServiceStatus>({
level: ServiceStatusLevels.available,
summary: 'summary initial',
});
service.set('a', aStatus$);
aStatus$.next({ level: ServiceStatusLevels.available, summary: 'summary updated' });
subscription.unsubscribe();

expect(statusUpdates).toEqual([
{ a: { level: ServiceStatusLevels.available, summary: 'summary initial' } },
{ a: { level: ServiceStatusLevels.available, summary: 'summary updated' } },
]);
});

it('emits an unavailable status if first emission times out, then continues future emissions', async () => {
const service = new PluginsStatusService(
{
Expand Down
67 changes: 43 additions & 24 deletions src/core/server/status/plugins_status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,12 @@ export class PluginsStatusService {

this.coreSubscription = deps.core$
.pipe(debounceTime(10))
.subscribe((coreStatus: CoreStatus) => this.updateCoreAndPluginStatuses(coreStatus));
.subscribe((coreStatus: CoreStatus) => {
this.coreStatus = coreStatus;
this.updateRootPluginsStatuses();
this.updateDependantStatuses(this.rootPlugins);
this.emitCurrentStatus();
});
}

/**
Expand All @@ -96,8 +101,19 @@ export class PluginsStatusService {

this.reportedStatusSubscriptions[plugin] = status$
// Set a timeout for externally-defined status Observables
.pipe(timeoutWith(this.statusTimeoutMs, status$.pipe(startWith(defaultStatus))))
.subscribe((status) => this.updatePluginReportedStatus(plugin, status));
.pipe(
timeoutWith(this.statusTimeoutMs, status$.pipe(startWith(defaultStatus))),
distinctUntilChanged()
)
.subscribe((status) => {
const levelChanged = this.updatePluginReportedStatus(plugin, status);

if (levelChanged) {
this.updateDependantStatuses([plugin]);
}

this.emitCurrentStatus();
});
}

/**
Expand Down Expand Up @@ -233,35 +249,33 @@ export class PluginsStatusService {
}

/**
* Updates the core services statuses and plugins' statuses
* according to the latest status reported by core services.
* @param {CoreStatus} coreStatus the latest status of core services
* Updates the root plugins statuses according to the current core services status
*/
private updateCoreAndPluginStatuses(coreStatus: CoreStatus): void {
this.coreStatus = coreStatus!;
private updateRootPluginsStatuses(): void {
const derivedStatus = getSummaryStatus(Object.entries(this.coreStatus), {
allAvailableSummary: `All dependencies are available`,
});

// note that the derived status is the same for all root plugins
this.rootPlugins.forEach((plugin) => {
this.pluginData[plugin].derivedStatus = derivedStatus;
if (!this.isReportingStatus[plugin]) {
// this root plugin has NOT registered any status Observable. Thus, its status is derived from core
this.pluginStatus[plugin] = derivedStatus;
}
});

this.updatePluginsStatuses(this.rootPlugins);
}

/**
* Determine the derived statuses of the specified plugins and their dependencies,
* updating them on the pluginData structure
* Optionally, if the plugins have not registered a custom status Observable, update their "current" status as well.
* @param {PluginName[]} plugins The names of the plugins to be updated
* Update the derived statuses of the specified plugins' dependant plugins,
* If impacted plugins have not registered a custom status Observable, update their "current" status as well.
* @param {PluginName[]} plugins The names of the plugins whose dependant plugins must be updated
*/
private updatePluginsStatuses(plugins: PluginName[]): void {
const toCheck = new Set<PluginName>(plugins);
private updateDependantStatuses(plugins: PluginName[]): void {
const toCheck = new Set<PluginName>();
plugins.forEach((plugin) =>
this.pluginData[plugin].reverseDependencies.forEach((revDep) => toCheck.add(revDep))
);

// Note that we are updating the plugins in an ordered fashion.
// This way, when updating plugin X (at depth = N),
Expand All @@ -276,9 +290,6 @@ export class PluginsStatusService {
this.pluginData[current].reverseDependencies.forEach((revDep) => toCheck.add(revDep));
}
}

this.pluginData$.next(this.pluginData);
this.pluginStatus$.next({ ...this.pluginStatus });
}

/**
Expand Down Expand Up @@ -328,15 +339,23 @@ export class PluginsStatusService {
* Updates the reported status for the given plugin, along with the status of its dependencies tree.
* @param {PluginName} plugin The name of the plugin whose reported status must be updated
* @param {ServiceStatus} reportedStatus The newly reported status for that plugin
* @return {boolean} true if the level of the reported status changed
*/
private updatePluginReportedStatus(plugin: PluginName, reportedStatus: ServiceStatus): void {
const previousReportedLevel = this.pluginData[plugin].reportedStatus?.level;
private updatePluginReportedStatus(plugin: PluginName, reportedStatus: ServiceStatus): boolean {
const previousReportedStatus = this.pluginData[plugin].reportedStatus;

this.pluginData[plugin].reportedStatus = reportedStatus;
this.pluginStatus[plugin] = reportedStatus;

if (reportedStatus.level !== previousReportedLevel) {
this.updatePluginsStatuses([plugin]);
}
return previousReportedStatus?.level !== reportedStatus.level;
}

/**
* Emit the current status to internal Subjects, effectively propagating it to observers.
*/
private emitCurrentStatus(): void {
this.pluginData$.next(this.pluginData);
// we must clone the plugin status to prevent future modifications from updating current emission
this.pluginStatus$.next({ ...this.pluginStatus });
}
}

0 comments on commit bacfe26

Please sign in to comment.