Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Task Manager] time out work when it overruns in poller #74980

Merged
merged 3 commits into from
Aug 18, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The task_manager can be configured via `taskManager` config options (e.g. `taskM

- `max_attempts` - The maximum number of times a task will be attempted before being abandoned as failed
- `poll_interval` - How often the background worker should check the task_manager index for more work
- `max_poll_inactivity_cycles` - How many poll intervals is work allowed to block polling for before it's timed out. This does not include task execution, as task execution does not block the polling, but rather includes work needed to manage Task Manager's state.
- `index` - The name of the index that the task_manager
- `max_workers` - The maximum number of tasks a Kibana will run concurrently (defaults to 10)
- `credentials` - Encrypted user credentials. All tasks will run in the security context of this user. See [this issue](https:/elastic/dev/issues/1045) for a discussion on task scheduler security.
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ describe('config validation', () => {
"enabled": true,
"index": ".kibana_task_manager",
"max_attempts": 3,
"max_poll_inactivity_cycles": 10,
"max_workers": 10,
"poll_interval": 3000,
"request_capacity": 1000,
Expand Down
6 changes: 6 additions & 0 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { schema, TypeOf } from '@kbn/config-schema';

export const DEFAULT_MAX_WORKERS = 10;
export const DEFAULT_POLL_INTERVAL = 3000;
export const DEFAULT_MAX_POLL_INACTIVITY_CYCLES = 10;

export const configSchema = schema.object({
enabled: schema.boolean({ defaultValue: true }),
Expand All @@ -21,6 +22,11 @@ export const configSchema = schema.object({
defaultValue: DEFAULT_POLL_INTERVAL,
min: 100,
}),
/* How many poll interval cycles can work take before it's timed out. */
max_poll_inactivity_cycles: schema.number({
defaultValue: DEFAULT_MAX_POLL_INACTIVITY_CYCLES,
min: 1,
}),
/* How many requests can Task Manager buffer before it rejects new requests. */
request_capacity: schema.number({
// a nice round contrived number, feel free to change as we learn how it behaves
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { timeoutPromiseAfter } from './timeout_promise_after';

const delay = (ms: number, result: unknown) =>
new Promise((resolve) => setTimeout(() => resolve(result), ms));

const delayRejection = (ms: number, result: unknown) =>
new Promise((resolve, reject) => setTimeout(() => reject(result), ms));

describe('Promise Timeout', () => {
test('resolves when wrapped promise resolves', async () => {
return expect(
timeoutPromiseAfter(delay(100, 'OK'), 1000, () => 'TIMEOUT ERR')
).resolves.toMatchInlineSnapshot(`"OK"`);
});

test('reject when wrapped promise rejects', async () => {
return expect(
timeoutPromiseAfter(delayRejection(100, 'ERR'), 1000, () => 'TIMEOUT ERR')
).rejects.toMatchInlineSnapshot(`"ERR"`);
});

test('reject it the timeout elapses', async () => {
return expect(
timeoutPromiseAfter(delay(1000, 'OK'), 100, () => 'TIMEOUT ERR')
).rejects.toMatchInlineSnapshot(`"TIMEOUT ERR"`);
});
});
16 changes: 16 additions & 0 deletions x-pack/plugins/task_manager/server/lib/timeout_promise_after.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

export function timeoutPromiseAfter<T, G>(
future: Promise<T>,
ms: number,
onTimeout: () => G
): Promise<T> {
return new Promise((resolve, reject) => {
setTimeout(() => reject(onTimeout()), ms);
future.then(resolve).catch(reject);
});
}
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/task_manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ describe('TaskManager', () => {
index: 'foo',
max_attempts: 9,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
};
const taskManagerOpts = {
Expand Down
5 changes: 5 additions & 0 deletions x-pack/plugins/task_manager/server/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ export class TaskManager {
getCapacity: () => this.pool.availableWorkers,
pollRequests$: this.claimRequests$,
work: this.pollForWork,
// Time out the `work` phase if it takes longer than a certain number of polling cycles
// The `work` phase includes the prework needed *before* executing a task
// (such as polling for new work, marking tasks as running etc.) but does not
// include the time of actually running the task
workTimeout: opts.config.poll_interval * opts.config.max_poll_inactivity_cycles,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why you used a "count" as the config bit here, instead of a duration - my guess is the "count" makes this setting "move" if the poll interval changes, which I think is a nice pattern - I'd been thinking some of our other explicit timeouts could be "relative" like this as well - eg, throttle based on alert interval (eg, throttle for 10 intervals, not 10 minutes, or probably allow both).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, Mike asked the same. :)
It's exactly that - to make it relative to the polling interval.
It also reduces the scenarios you need to support, such as a work duration that's lower than polling interval, and the potential complexity that introduces.

});
}

Expand Down
65 changes: 64 additions & 1 deletion x-pack/plugins/task_manager/server/task_poller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { Subject } from 'rxjs';
import { Option, none, some } from 'fp-ts/lib/Option';
import { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
import { fakeSchedulers } from 'rxjs-marbles/jest';
import { sleep, resolvable } from './test_utils';
import { sleep, resolvable, Resolvable } from './test_utils';
import { asOk, asErr } from './lib/result_type';

describe('TaskPoller', () => {
Expand Down Expand Up @@ -243,6 +243,7 @@ describe('TaskPoller', () => {
},
getCapacity: () => 5,
pollRequests$,
workTimeout: pollInterval * 5,
}).subscribe(handler);

pollRequests$.next(some('one'));
Expand Down Expand Up @@ -272,6 +273,68 @@ describe('TaskPoller', () => {
})
);

test(
'work times out whe nit exceeds a predefined amount of time',
fakeSchedulers(async (advance) => {
const pollInterval = 100;
const workTimeout = pollInterval * 2;
const bufferCapacity = 2;

const handler = jest.fn();

type ResolvableTupple = [string, PromiseLike<void> & Resolvable];
const pollRequests$ = new Subject<Option<ResolvableTupple>>();
createTaskPoller<[string, Resolvable], string[]>({
pollInterval,
bufferCapacity,
work: async (...resolvables) => {
await Promise.all(resolvables.map(([, future]) => future));
return resolvables.map(([name]) => name);
},
getCapacity: () => 5,
pollRequests$,
workTimeout,
}).subscribe(handler);

const one: ResolvableTupple = ['one', resolvable()];
pollRequests$.next(some(one));

// split these into two payloads
advance(pollInterval);

const two: ResolvableTupple = ['two', resolvable()];
const three: ResolvableTupple = ['three', resolvable()];
pollRequests$.next(some(two));
pollRequests$.next(some(three));

advance(workTimeout);
await sleep(workTimeout);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we account for a bit of slop here? Seems like it shouldn't be possible for this to resolve before the workTimeout, but you know ... node, time, etc, heh. Maybe just add 100ms or so?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to start here and address if it does introduce flakiness.
In this case it's using fake timers, which I've found to be quite accurate, because it isn't actually time based, but rather is based on the queued timeouts.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ya, I'm guessing even if we did see some timing flakiness, it would be fairly obvious that was the problem, given the context, so WORKSFORME


// one resolves too late!
one[1].resolve();

expect(handler).toHaveBeenCalledWith(
asErr(
new PollingError<string>(
'Failed to poll for work: Error: work has timed out',
PollingErrorType.WorkError,
none
)
)
);
expect(handler.mock.calls[0][0].error.type).toEqual(PollingErrorType.WorkError);

// two and three in time
two[1].resolve();
three[1].resolve();

advance(pollInterval);
await sleep(pollInterval);

expect(handler).toHaveBeenCalledWith(asOk(['two', 'three']));
})
);

test(
'returns an error when polling for work fails',
fakeSchedulers(async (advance) => {
Expand Down
16 changes: 12 additions & 4 deletions x-pack/plugins/task_manager/server/task_poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
asErr,
promiseResult,
} from './lib/result_type';
import { timeoutPromiseAfter } from './lib/timeout_promise_after';

type WorkFn<T, H> = (...params: T[]) => Promise<H>;

Expand All @@ -34,6 +35,7 @@ interface Opts<T, H> {
getCapacity: () => number;
pollRequests$: Observable<Option<T>>;
work: WorkFn<T, H>;
workTimeout?: number;
}

/**
Expand All @@ -55,6 +57,7 @@ export function createTaskPoller<T, H>({
pollRequests$,
bufferCapacity,
work,
workTimeout,
}: Opts<T, H>): Observable<Result<H, PollingError<T>>> {
const hasCapacity = () => getCapacity() > 0;

Expand Down Expand Up @@ -89,11 +92,15 @@ export function createTaskPoller<T, H>({
concatMap(async (set: Set<T>) => {
closeSleepPerf();
return mapResult<H, Error, Result<H, PollingError<T>>>(
await promiseResult<H, Error>(work(...pullFromSet(set, getCapacity()))),
await promiseResult<H, Error>(
timeoutPromiseAfter<H, Error>(
work(...pullFromSet(set, getCapacity())),
workTimeout ?? pollInterval,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the case for workTimeout being undefined? Tests? I worry that if there a path in production code to have this as undefined, the pollInterval value used would be too close to a valid cycle - should it at least be some kind of multiple of pollInterval, maybe like 2 or something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm it was really just a default... but yeah, you're right that if someone sets the interval to be lower than the latency of talking to Elasticsearch this will behave badly 🤔
That said... same is true for the actual polling....

Not sure about the * 2 approach, as we have no idea what the interval will be and it might be set to something ridiculous like 30m, in which case, timeout will be 60m....
That said... if your poll is 30m you don't care about immediate results anyway, do you 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the default config is set to 10 I think we can leave it as is for now and see how it behaves.
Unless someone specifically changes it, it'll always be far longer than the interval itself, and as this is here as a safeguard against issues in marking tasks as running (this time does not include task execution) I feel comfortable waiting to see how it behaves in the real world.

() => new Error(`work has timed out`)
)
),
(workResult) => asOk(workResult),
(err: Error) => {
return asPollingError<T>(err, PollingErrorType.WorkError);
}
(err: Error) => asPollingError<T>(err, PollingErrorType.WorkError)
);
}),
tap(openSleepPerf),
Expand Down Expand Up @@ -129,6 +136,7 @@ function pushOptionalIntoSet<T>(

export enum PollingErrorType {
WorkError,
WorkTimeout,
RequestCapacityReached,
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/test_utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export function mockLogger() {
};
}

interface Resolvable {
export interface Resolvable {
resolve: () => void;
}

Expand Down