Skip to content

Commit

Permalink
Add timeouts and setup enforcement for custom plugins statuses (#77965)…
Browse files Browse the repository at this point in the history
… (#103149)

Co-authored-by: Josh Dover <[email protected]>
  • Loading branch information
kibanamachine and joshdover authored Jun 23, 2021
1 parent 081ba7f commit a6985b2
Show file tree
Hide file tree
Showing 25 changed files with 529 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,7 @@ set(status$: Observable<ServiceStatus>): void;

## Remarks

The first emission from this Observable should occur within 30s, else this plugin's status will fallback to `unavailable` until the first emission.

See the [StatusServiceSetup.derivedStatus$](./kibana-plugin-core-server.statusservicesetup.derivedstatus_.md) API for leveraging the default status calculation that is provided by Core.

1 change: 1 addition & 0 deletions packages/kbn-pm/dist/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -63827,6 +63827,7 @@ function getProjectPaths({

projectPaths.push(Object(path__WEBPACK_IMPORTED_MODULE_0__["resolve"])(rootPath, 'test/plugin_functional/plugins/*'));
projectPaths.push(Object(path__WEBPACK_IMPORTED_MODULE_0__["resolve"])(rootPath, 'test/interpreter_functional/plugins/*'));
projectPaths.push(Object(path__WEBPACK_IMPORTED_MODULE_0__["resolve"])(rootPath, 'test/server_integration/__fixtures__/plugins/*'));
projectPaths.push(Object(path__WEBPACK_IMPORTED_MODULE_0__["resolve"])(rootPath, 'examples/*'));

if (!ossOnly) {
Expand Down
1 change: 1 addition & 0 deletions packages/kbn-pm/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export function getProjectPaths({ rootPath, ossOnly, skipKibanaPlugins }: Option
// correct and the expect behavior.
projectPaths.push(resolve(rootPath, 'test/plugin_functional/plugins/*'));
projectPaths.push(resolve(rootPath, 'test/interpreter_functional/plugins/*'));
projectPaths.push(resolve(rootPath, 'test/server_integration/__fixtures__/plugins/*'));
projectPaths.push(resolve(rootPath, 'examples/*'));

if (!ossOnly) {
Expand Down
2 changes: 2 additions & 0 deletions src/core/server/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,15 @@ test('runs services on "start"', async () => {
expect(mockSavedObjectsService.start).not.toHaveBeenCalled();
expect(mockUiSettingsService.start).not.toHaveBeenCalled();
expect(mockMetricsService.start).not.toHaveBeenCalled();
expect(mockStatusService.start).not.toHaveBeenCalled();

await server.start();

expect(mockHttpService.start).toHaveBeenCalledTimes(1);
expect(mockSavedObjectsService.start).toHaveBeenCalledTimes(1);
expect(mockUiSettingsService.start).toHaveBeenCalledTimes(1);
expect(mockMetricsService.start).toHaveBeenCalledTimes(1);
expect(mockStatusService.start).toHaveBeenCalledTimes(1);
});

test('does not fail on "setup" if there are unused paths detected', async () => {
Expand Down
2 changes: 1 addition & 1 deletion src/core/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ export class Server {
savedObjects: savedObjectsStart,
exposedConfigsToUsage: this.plugins.getExposedPluginConfigsToUsage(),
});
this.status.start();

this.coreStart = {
capabilities: capabilitiesStart,
Expand All @@ -261,7 +262,6 @@ export class Server {

await this.plugins.start(this.coreStart);

this.status.start();
await this.http.start();

startTransaction?.end();
Expand Down
93 changes: 92 additions & 1 deletion src/core/server/status/plugins_status.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import { PluginName } from '../plugins';
import { PluginsStatusService } from './plugins_status';
import { of, Observable, BehaviorSubject } from 'rxjs';
import { of, Observable, BehaviorSubject, ReplaySubject } from 'rxjs';
import { ServiceStatusLevels, CoreStatus, ServiceStatus } from './types';
import { first } from 'rxjs/operators';
import { ServiceStatusLevelSnapshotSerializer } from './test_utils';
Expand All @@ -34,6 +34,28 @@ describe('PluginStatusService', () => {
['c', ['a', 'b']],
]);

describe('set', () => {
it('throws an exception if called after registrations are blocked', () => {
const service = new PluginsStatusService({
core$: coreAllAvailable$,
pluginDependencies,
});

service.blockNewRegistrations();
expect(() => {
service.set(
'a',
of({
level: ServiceStatusLevels.available,
summary: 'fail!',
})
);
}).toThrowErrorMatchingInlineSnapshot(
`"Custom statuses cannot be registered after setup, plugin [a] attempted"`
);
});
});

describe('getDerivedStatus$', () => {
it(`defaults to core's most severe status`, async () => {
const serviceAvailable = new PluginsStatusService({
Expand Down Expand Up @@ -231,6 +253,75 @@ describe('PluginStatusService', () => {
{ a: { level: ServiceStatusLevels.available, summary: 'a available' } },
]);
});

it('updates when a plugin status observable emits', async () => {
const service = new PluginsStatusService({
core$: coreAllAvailable$,
pluginDependencies: new Map([['a', []]]),
});
const statusUpdates: Array<Record<PluginName, ServiceStatus>> = [];
const subscription = service
.getAll$()
.subscribe((pluginStatuses) => statusUpdates.push(pluginStatuses));

const aStatus$ = new BehaviorSubject<ServiceStatus>({
level: ServiceStatusLevels.degraded,
summary: 'a degraded',
});
service.set('a', aStatus$);
aStatus$.next({ level: ServiceStatusLevels.unavailable, summary: 'a unavailable' });
aStatus$.next({ level: ServiceStatusLevels.available, summary: 'a available' });
subscription.unsubscribe();

expect(statusUpdates).toEqual([
{ a: { level: ServiceStatusLevels.available, summary: 'All dependencies are available' } },
{ a: { level: ServiceStatusLevels.degraded, summary: 'a degraded' } },
{ a: { level: ServiceStatusLevels.unavailable, summary: 'a unavailable' } },
{ a: { level: ServiceStatusLevels.available, summary: 'a available' } },
]);
});

it('emits an unavailable status if first emission times out, then continues future emissions', async () => {
jest.useFakeTimers();
const service = new PluginsStatusService({
core$: coreAllAvailable$,
pluginDependencies: new Map([
['a', []],
['b', ['a']],
]),
});

const pluginA$ = new ReplaySubject<ServiceStatus>(1);
service.set('a', pluginA$);
const firstEmission = service.getAll$().pipe(first()).toPromise();
jest.runAllTimers();

expect(await firstEmission).toEqual({
a: { level: ServiceStatusLevels.unavailable, summary: 'Status check timed out after 30s' },
b: {
level: ServiceStatusLevels.unavailable,
summary: '[a]: Status check timed out after 30s',
detail: 'See the status page for more information',
meta: {
affectedServices: {
a: {
level: ServiceStatusLevels.unavailable,
summary: 'Status check timed out after 30s',
},
},
},
},
});

pluginA$.next({ level: ServiceStatusLevels.available, summary: 'a available' });
const secondEmission = service.getAll$().pipe(first()).toPromise();
jest.runAllTimers();
expect(await secondEmission).toEqual({
a: { level: ServiceStatusLevels.available, summary: 'a available' },
b: { level: ServiceStatusLevels.available, summary: 'All dependencies are available' },
});
jest.useRealTimers();
});
});

describe('getDependenciesStatus$', () => {
Expand Down
46 changes: 37 additions & 9 deletions src/core/server/status/plugins_status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,22 @@
*/

import { BehaviorSubject, Observable, combineLatest, of } from 'rxjs';
import { map, distinctUntilChanged, switchMap, debounceTime } from 'rxjs/operators';
import {
map,
distinctUntilChanged,
switchMap,
debounceTime,
timeoutWith,
startWith,
} from 'rxjs/operators';
import { isDeepStrictEqual } from 'util';

import { PluginName } from '../plugins';
import { ServiceStatus, CoreStatus } from './types';
import { ServiceStatus, CoreStatus, ServiceStatusLevels } from './types';
import { getSummaryStatus } from './get_summary_status';

const STATUS_TIMEOUT_MS = 30 * 1000; // 30 seconds

interface Deps {
core$: Observable<CoreStatus>;
pluginDependencies: ReadonlyMap<PluginName, PluginName[]>;
Expand All @@ -23,6 +32,7 @@ export class PluginsStatusService {
private readonly pluginStatuses = new Map<PluginName, Observable<ServiceStatus>>();
private readonly update$ = new BehaviorSubject(true);
private readonly defaultInheritedStatus$: Observable<ServiceStatus>;
private newRegistrationsAllowed = true;

constructor(private readonly deps: Deps) {
this.defaultInheritedStatus$ = this.deps.core$.pipe(
Expand All @@ -35,10 +45,19 @@ export class PluginsStatusService {
}

public set(plugin: PluginName, status$: Observable<ServiceStatus>) {
if (!this.newRegistrationsAllowed) {
throw new Error(
`Custom statuses cannot be registered after setup, plugin [${plugin}] attempted`
);
}
this.pluginStatuses.set(plugin, status$);
this.update$.next(true); // trigger all existing Observables to update from the new source Observable
}

public blockNewRegistrations() {
this.newRegistrationsAllowed = false;
}

public getAll$(): Observable<Record<PluginName, ServiceStatus>> {
return this.getPluginStatuses$([...this.deps.pluginDependencies.keys()]);
}
Expand Down Expand Up @@ -86,13 +105,22 @@ export class PluginsStatusService {
return this.update$.pipe(
switchMap(() => {
const pluginStatuses = plugins
.map(
(depName) =>
[depName, this.pluginStatuses.get(depName) ?? this.getDerivedStatus$(depName)] as [
PluginName,
Observable<ServiceStatus>
]
)
.map((depName) => {
const pluginStatus = this.pluginStatuses.get(depName)
? this.pluginStatuses.get(depName)!.pipe(
timeoutWith(
STATUS_TIMEOUT_MS,
this.pluginStatuses.get(depName)!.pipe(
startWith({
level: ServiceStatusLevels.unavailable,
summary: `Status check timed out after ${STATUS_TIMEOUT_MS / 1000}s`,
})
)
)
)
: this.getDerivedStatus$(depName);
return [depName, pluginStatus] as [PluginName, Observable<ServiceStatus>];
})
.map(([pName, status$]) =>
status$.pipe(map((status) => [pName, status] as [PluginName, ServiceStatus]))
);
Expand Down
6 changes: 4 additions & 2 deletions src/core/server/status/status_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,11 @@ export class StatusService implements CoreService<InternalStatusServiceSetup> {
}

public start() {
if (!this.overall$) {
throw new Error('cannot call `start` before `setup`');
if (!this.pluginsStatus || !this.overall$) {
throw new Error(`StatusService#setup must be called before #start`);
}
this.pluginsStatus.blockNewRegistrations();

getOverallStatusChanges(this.overall$, this.stop$).subscribe((message) => {
this.logger.info(message);
});
Expand Down
3 changes: 3 additions & 0 deletions src/core/server/status/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ export interface StatusServiceSetup {
* Completely overrides the default inherited status.
*
* @remarks
* The first emission from this Observable should occur within 30s, else this plugin's status will fallback to
* `unavailable` until the first emission.
*
* See the {@link StatusServiceSetup.derivedStatus$} API for leveraging the default status
* calculation that is provided by Core.
*/
Expand Down
3 changes: 3 additions & 0 deletions src/dev/typescript/projects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ export const PROJECTS = [
...glob
.sync('test/interpreter_functional/plugins/*/tsconfig.json', { cwd: REPO_ROOT })
.map((path) => new Project(resolve(REPO_ROOT, path))),
...glob
.sync('test/server_integration/__fixtures__/plugins/*/tsconfig.json', { cwd: REPO_ROOT })
.map((path) => new Project(resolve(REPO_ROOT, path))),
];

export function filterProjectsByFlag(projectFlag?: string) {
Expand Down
71 changes: 71 additions & 0 deletions test/plugin_functional/test_suites/core_plugins/status.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import expect from '@kbn/expect';
import { ServiceStatusLevels } from '../../../../src/core/server';
import { PluginFunctionalProviderContext } from '../../services';

export default function ({ getService, getPageObjects }: PluginFunctionalProviderContext) {
const supertest = getService('supertest');
const log = getService('log');

const delay = (ms: number) => new Promise((r) => setTimeout(r, ms));
const getStatus = async (pluginName?: string) => {
const resp = await supertest.get('/api/status?v8format=true');

if (pluginName) {
return resp.body.status.plugins[pluginName];
} else {
return resp.body.status.overall;
}
};

const setStatus = async <T extends keyof typeof ServiceStatusLevels>(level: T) =>
supertest
.post(`/internal/core_plugin_a/status/set?level=${level}`)
.set('kbn-xsrf', 'xxx')
.expect(200);

describe('status service', () => {
// This test must comes first because the timeout only applies to the initial emission
it("returns a timeout for status check that doesn't emit after 30s", async () => {
let aStatus = await getStatus('corePluginA');
expect(aStatus.level).to.eql('unavailable');

// Status will remain in unavailable due to core services until custom status timesout
// Keep polling until that condition ends, up to a timeout
const start = Date.now();
while ('elasticsearch' in (aStatus.meta?.affectedServices ?? {})) {
aStatus = await getStatus('corePluginA');
expect(aStatus.level).to.eql('unavailable');

// If it's been more than 40s, break out of this loop
if (Date.now() - start >= 40_000) {
throw new Error(`Timed out waiting for status timeout after 40s`);
}

log.info('Waiting for status check to timeout...');
await delay(2000);
}

expect(aStatus.summary).to.eql('Status check timed out after 30s');
});

it('propagates status issues to dependencies', async () => {
await setStatus('degraded');
await delay(1000);
expect((await getStatus('corePluginA')).level).to.eql('degraded');
expect((await getStatus('corePluginB')).level).to.eql('degraded');

await setStatus('available');
await delay(1000);
expect((await getStatus('corePluginA')).level).to.eql('available');
expect((await getStatus('corePluginB')).level).to.eql('available');
});
});
}
7 changes: 7 additions & 0 deletions test/scripts/test/server_integration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,10 @@ checks-reporter-with-killswitch "Server Integration Tests" \
--bail \
--debug \
--kibana-install-dir $KIBANA_INSTALL_DIR

# Tests that must be run against source in order to build test plugins
checks-reporter-with-killswitch "Status Integration Tests" \
node scripts/functional_tests \
--config test/server_integration/http/platform/config.status.ts \
--bail \
--debug \
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"id": "statusPluginA",
"version": "0.0.1",
"kibanaVersion": "kibana",
"server": true,
"ui": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"name": "status_plugin_a",
"version": "1.0.0",
"main": "target/test/server_integration/__fixtures__/plugins/status_plugin_a",
"kibana": {
"version": "kibana",
"templateVersion": "1.0.0"
},
"license": "SSPL-1.0 OR Elastic License 2.0",
"scripts": {
"kbn": "node ../../../../../../scripts/kbn.js",
"build": "rm -rf './target' && ../../../../../../node_modules/.bin/tsc"
}
}
Loading

0 comments on commit a6985b2

Please sign in to comment.