Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] [Task Manager] time out work when it overruns in poller (#74980) #75317

Merged
merged 1 commit into from
Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The task_manager can be configured via `taskManager` config options (e.g. `taskM

- `max_attempts` - The maximum number of times a task will be attempted before being abandoned as failed
- `poll_interval` - How often the background worker should check the task_manager index for more work
- `max_poll_inactivity_cycles` - How many poll intervals is work allowed to block polling for before it's timed out. This does not include task execution, as task execution does not block the polling, but rather includes work needed to manage Task Manager's state.
- `index` - The name of the index that the task_manager
- `max_workers` - The maximum number of tasks a Kibana will run concurrently (defaults to 10)
- `credentials` - Encrypted user credentials. All tasks will run in the security context of this user. See [this issue](https:/elastic/dev/issues/1045) for a discussion on task scheduler security.
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ describe('config validation', () => {
"enabled": true,
"index": ".kibana_task_manager",
"max_attempts": 3,
"max_poll_inactivity_cycles": 10,
"max_workers": 10,
"poll_interval": 3000,
"request_capacity": 1000,
Expand Down
6 changes: 6 additions & 0 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { schema, TypeOf } from '@kbn/config-schema';

export const DEFAULT_MAX_WORKERS = 10;
export const DEFAULT_POLL_INTERVAL = 3000;
export const DEFAULT_MAX_POLL_INACTIVITY_CYCLES = 10;

export const configSchema = schema.object({
enabled: schema.boolean({ defaultValue: true }),
Expand All @@ -21,6 +22,11 @@ export const configSchema = schema.object({
defaultValue: DEFAULT_POLL_INTERVAL,
min: 100,
}),
/* How many poll interval cycles can work take before it's timed out. */
max_poll_inactivity_cycles: schema.number({
defaultValue: DEFAULT_MAX_POLL_INACTIVITY_CYCLES,
min: 1,
}),
/* How many requests can Task Manager buffer before it rejects new requests. */
request_capacity: schema.number({
// a nice round contrived number, feel free to change as we learn how it behaves
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { timeoutPromiseAfter } from './timeout_promise_after';

const delay = (ms: number, result: unknown) =>
new Promise((resolve) => setTimeout(() => resolve(result), ms));

const delayRejection = (ms: number, result: unknown) =>
new Promise((resolve, reject) => setTimeout(() => reject(result), ms));

describe('Promise Timeout', () => {
test('resolves when wrapped promise resolves', async () => {
return expect(
timeoutPromiseAfter(delay(100, 'OK'), 1000, () => 'TIMEOUT ERR')
).resolves.toMatchInlineSnapshot(`"OK"`);
});

test('reject when wrapped promise rejects', async () => {
return expect(
timeoutPromiseAfter(delayRejection(100, 'ERR'), 1000, () => 'TIMEOUT ERR')
).rejects.toMatchInlineSnapshot(`"ERR"`);
});

test('reject it the timeout elapses', async () => {
return expect(
timeoutPromiseAfter(delay(1000, 'OK'), 100, () => 'TIMEOUT ERR')
).rejects.toMatchInlineSnapshot(`"TIMEOUT ERR"`);
});
});
16 changes: 16 additions & 0 deletions x-pack/plugins/task_manager/server/lib/timeout_promise_after.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

export function timeoutPromiseAfter<T, G>(
future: Promise<T>,
ms: number,
onTimeout: () => G
): Promise<T> {
return new Promise((resolve, reject) => {
setTimeout(() => reject(onTimeout()), ms);
future.then(resolve).catch(reject);
});
}
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/task_manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ describe('TaskManager', () => {
index: 'foo',
max_attempts: 9,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
};
const taskManagerOpts = {
Expand Down
5 changes: 5 additions & 0 deletions x-pack/plugins/task_manager/server/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ export class TaskManager {
getCapacity: () => this.pool.availableWorkers,
pollRequests$: this.claimRequests$,
work: this.pollForWork,
// Time out the `work` phase if it takes longer than a certain number of polling cycles
// The `work` phase includes the prework needed *before* executing a task
// (such as polling for new work, marking tasks as running etc.) but does not
// include the time of actually running the task
workTimeout: opts.config.poll_interval * opts.config.max_poll_inactivity_cycles,
});
}

Expand Down
65 changes: 64 additions & 1 deletion x-pack/plugins/task_manager/server/task_poller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { Subject } from 'rxjs';
import { Option, none, some } from 'fp-ts/lib/Option';
import { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
import { fakeSchedulers } from 'rxjs-marbles/jest';
import { sleep, resolvable } from './test_utils';
import { sleep, resolvable, Resolvable } from './test_utils';
import { asOk, asErr } from './lib/result_type';

describe('TaskPoller', () => {
Expand Down Expand Up @@ -243,6 +243,7 @@ describe('TaskPoller', () => {
},
getCapacity: () => 5,
pollRequests$,
workTimeout: pollInterval * 5,
}).subscribe(handler);

pollRequests$.next(some('one'));
Expand Down Expand Up @@ -272,6 +273,68 @@ describe('TaskPoller', () => {
})
);

test(
'work times out whe nit exceeds a predefined amount of time',
fakeSchedulers(async (advance) => {
const pollInterval = 100;
const workTimeout = pollInterval * 2;
const bufferCapacity = 2;

const handler = jest.fn();

type ResolvableTupple = [string, PromiseLike<void> & Resolvable];
const pollRequests$ = new Subject<Option<ResolvableTupple>>();
createTaskPoller<[string, Resolvable], string[]>({
pollInterval,
bufferCapacity,
work: async (...resolvables) => {
await Promise.all(resolvables.map(([, future]) => future));
return resolvables.map(([name]) => name);
},
getCapacity: () => 5,
pollRequests$,
workTimeout,
}).subscribe(handler);

const one: ResolvableTupple = ['one', resolvable()];
pollRequests$.next(some(one));

// split these into two payloads
advance(pollInterval);

const two: ResolvableTupple = ['two', resolvable()];
const three: ResolvableTupple = ['three', resolvable()];
pollRequests$.next(some(two));
pollRequests$.next(some(three));

advance(workTimeout);
await sleep(workTimeout);

// one resolves too late!
one[1].resolve();

expect(handler).toHaveBeenCalledWith(
asErr(
new PollingError<string>(
'Failed to poll for work: Error: work has timed out',
PollingErrorType.WorkError,
none
)
)
);
expect(handler.mock.calls[0][0].error.type).toEqual(PollingErrorType.WorkError);

// two and three in time
two[1].resolve();
three[1].resolve();

advance(pollInterval);
await sleep(pollInterval);

expect(handler).toHaveBeenCalledWith(asOk(['two', 'three']));
})
);

test(
'returns an error when polling for work fails',
fakeSchedulers(async (advance) => {
Expand Down
16 changes: 12 additions & 4 deletions x-pack/plugins/task_manager/server/task_poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
asErr,
promiseResult,
} from './lib/result_type';
import { timeoutPromiseAfter } from './lib/timeout_promise_after';

type WorkFn<T, H> = (...params: T[]) => Promise<H>;

Expand All @@ -34,6 +35,7 @@ interface Opts<T, H> {
getCapacity: () => number;
pollRequests$: Observable<Option<T>>;
work: WorkFn<T, H>;
workTimeout?: number;
}

/**
Expand All @@ -55,6 +57,7 @@ export function createTaskPoller<T, H>({
pollRequests$,
bufferCapacity,
work,
workTimeout,
}: Opts<T, H>): Observable<Result<H, PollingError<T>>> {
const hasCapacity = () => getCapacity() > 0;

Expand Down Expand Up @@ -89,11 +92,15 @@ export function createTaskPoller<T, H>({
concatMap(async (set: Set<T>) => {
closeSleepPerf();
return mapResult<H, Error, Result<H, PollingError<T>>>(
await promiseResult<H, Error>(work(...pullFromSet(set, getCapacity()))),
await promiseResult<H, Error>(
timeoutPromiseAfter<H, Error>(
work(...pullFromSet(set, getCapacity())),
workTimeout ?? pollInterval,
() => new Error(`work has timed out`)
)
),
(workResult) => asOk(workResult),
(err: Error) => {
return asPollingError<T>(err, PollingErrorType.WorkError);
}
(err: Error) => asPollingError<T>(err, PollingErrorType.WorkError)
);
}),
tap(openSleepPerf),
Expand Down Expand Up @@ -129,6 +136,7 @@ function pushOptionalIntoSet<T>(

export enum PollingErrorType {
WorkError,
WorkTimeout,
RequestCapacityReached,
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/test_utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export function mockLogger() {
};
}

interface Resolvable {
export interface Resolvable {
resolve: () => void;
}

Expand Down