From f4ab5ed74394b2ac29752581f7f83d2ea6ed30bc Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Thu, 13 Aug 2020 18:29:33 +0100 Subject: [PATCH 1/2] time out if work overruns in poller --- .../task_manager/server/config.test.ts | 1 + x-pack/plugins/task_manager/server/config.ts | 6 ++ .../server/lib/timeout_promise_after.test.ts | 33 ++++++++++ .../server/lib/timeout_promise_after.ts | 16 +++++ .../task_manager/server/task_manager.ts | 5 ++ .../task_manager/server/task_poller.test.ts | 65 ++++++++++++++++++- .../task_manager/server/task_poller.ts | 16 +++-- .../task_manager/server/test_utils/index.ts | 2 +- 8 files changed, 138 insertions(+), 6 deletions(-) create mode 100644 x-pack/plugins/task_manager/server/lib/timeout_promise_after.test.ts create mode 100644 x-pack/plugins/task_manager/server/lib/timeout_promise_after.ts diff --git a/x-pack/plugins/task_manager/server/config.test.ts b/x-pack/plugins/task_manager/server/config.test.ts index 8e877f696a2fc2..d5bbbe65582f18 100644 --- a/x-pack/plugins/task_manager/server/config.test.ts +++ b/x-pack/plugins/task_manager/server/config.test.ts @@ -13,6 +13,7 @@ describe('config validation', () => { "enabled": true, "index": ".kibana_task_manager", "max_attempts": 3, + "max_poll_inactivity_cycles": 10, "max_workers": 10, "poll_interval": 3000, "request_capacity": 1000, diff --git a/x-pack/plugins/task_manager/server/config.ts b/x-pack/plugins/task_manager/server/config.ts index e3af12eca8a494..aa78cf3baa96d4 100644 --- a/x-pack/plugins/task_manager/server/config.ts +++ b/x-pack/plugins/task_manager/server/config.ts @@ -8,6 +8,7 @@ import { schema, TypeOf } from '@kbn/config-schema'; export const DEFAULT_MAX_WORKERS = 10; export const DEFAULT_POLL_INTERVAL = 3000; +export const DEFAULT_MAX_POLL_INACTIVITY_CYCLES = 10; export const configSchema = schema.object({ enabled: schema.boolean({ defaultValue: true }), @@ -21,6 +22,11 @@ export const configSchema = schema.object({ defaultValue: DEFAULT_POLL_INTERVAL, min: 100, }), + /* How many poll interval cycles can work take before it's timed out. */ + max_poll_inactivity_cycles: schema.number({ + defaultValue: DEFAULT_MAX_POLL_INACTIVITY_CYCLES, + min: 1, + }), /* How many requests can Task Manager buffer before it rejects new requests. */ request_capacity: schema.number({ // a nice round contrived number, feel free to change as we learn how it behaves diff --git a/x-pack/plugins/task_manager/server/lib/timeout_promise_after.test.ts b/x-pack/plugins/task_manager/server/lib/timeout_promise_after.test.ts new file mode 100644 index 00000000000000..6cd6b51f2d1259 --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/timeout_promise_after.test.ts @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { timeoutPromiseAfter } from './timeout_promise_after'; + +const delay = (ms: number, result: unknown) => + new Promise((resolve) => setTimeout(() => resolve(result), ms)); + +const delayRejection = (ms: number, result: unknown) => + new Promise((resolve, reject) => setTimeout(() => reject(result), ms)); + +describe('Promise Timeout', () => { + test('resolves when wrapped promise resolves', async () => { + return expect(timeoutPromiseAfter(delay(100, 'OK'), 1000)).resolves.toMatchInlineSnapshot( + `"OK"` + ); + }); + + test('reject when wrapped promise rejects', async () => { + return expect( + timeoutPromiseAfter(delayRejection(100, 'ERR'), 1000) + ).rejects.toMatchInlineSnapshot(`"ERR"`); + }); + + test('reject it the timeout elapses', async () => { + return expect( + timeoutPromiseAfter(delay(1000, 'OK'), 100, () => 'ERR') + ).rejects.toMatchInlineSnapshot(`"ERR"`); + }); +}); diff --git a/x-pack/plugins/task_manager/server/lib/timeout_promise_after.ts b/x-pack/plugins/task_manager/server/lib/timeout_promise_after.ts new file mode 100644 index 00000000000000..2f99bde26ca412 --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/timeout_promise_after.ts @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export function timeoutPromiseAfter( + future: Promise, + ms: number, + onTimeout: () => G +): Promise { + return new Promise((resolve, reject) => { + setTimeout(() => reject(onTimeout()), ms); + future.then(resolve).catch(reject); + }); +} diff --git a/x-pack/plugins/task_manager/server/task_manager.ts b/x-pack/plugins/task_manager/server/task_manager.ts index 7165fd28678c16..851a6353739ac0 100644 --- a/x-pack/plugins/task_manager/server/task_manager.ts +++ b/x-pack/plugins/task_manager/server/task_manager.ts @@ -159,6 +159,11 @@ export class TaskManager { getCapacity: () => this.pool.availableWorkers, pollRequests$: this.claimRequests$, work: this.pollForWork, + // Time out the `work` phase if it takes longer than a certain number of polling cycles + // The `work` phase includes the prework needed *before* executing a task + // (such as polling for new work, marking tasks as running etc.) but does not + // include the time of actually running the task + workTimeout: opts.config.poll_interval * opts.config.max_poll_inactivity_cycles, }); } diff --git a/x-pack/plugins/task_manager/server/task_poller.test.ts b/x-pack/plugins/task_manager/server/task_poller.test.ts index 4b0ecef7ff9173..98e6d0f9388a41 100644 --- a/x-pack/plugins/task_manager/server/task_poller.test.ts +++ b/x-pack/plugins/task_manager/server/task_poller.test.ts @@ -9,7 +9,7 @@ import { Subject } from 'rxjs'; import { Option, none, some } from 'fp-ts/lib/Option'; import { createTaskPoller, PollingError, PollingErrorType } from './task_poller'; import { fakeSchedulers } from 'rxjs-marbles/jest'; -import { sleep, resolvable } from './test_utils'; +import { sleep, resolvable, Resolvable } from './test_utils'; import { asOk, asErr } from './lib/result_type'; describe('TaskPoller', () => { @@ -243,6 +243,7 @@ describe('TaskPoller', () => { }, getCapacity: () => 5, pollRequests$, + workTimeout: pollInterval * 5, }).subscribe(handler); pollRequests$.next(some('one')); @@ -272,6 +273,68 @@ describe('TaskPoller', () => { }) ); + test( + 'work times out whe nit exceeds a predefined amount of time', + fakeSchedulers(async (advance) => { + const pollInterval = 100; + const workTimeout = pollInterval * 2; + const bufferCapacity = 2; + + const handler = jest.fn(); + + type ResolvableTupple = [string, PromiseLike & Resolvable]; + const pollRequests$ = new Subject>(); + createTaskPoller<[string, Resolvable], string[]>({ + pollInterval, + bufferCapacity, + work: async (...resolvables) => { + await Promise.all(resolvables.map(([, future]) => future)); + return resolvables.map(([name]) => name); + }, + getCapacity: () => 5, + pollRequests$, + workTimeout, + }).subscribe(handler); + + const one: ResolvableTupple = ['one', resolvable()]; + pollRequests$.next(some(one)); + + // split these into two payloads + advance(pollInterval); + + const two: ResolvableTupple = ['two', resolvable()]; + const three: ResolvableTupple = ['three', resolvable()]; + pollRequests$.next(some(two)); + pollRequests$.next(some(three)); + + advance(workTimeout); + await sleep(workTimeout); + + // one resolves too late! + one[1].resolve(); + + expect(handler).toHaveBeenCalledWith( + asErr( + new PollingError( + 'Failed to poll for work: Error: work has timed out', + PollingErrorType.WorkError, + none + ) + ) + ); + expect(handler.mock.calls[0][0].error.type).toEqual(PollingErrorType.WorkError); + + // two and three in time + two[1].resolve(); + three[1].resolve(); + + advance(pollInterval); + await sleep(pollInterval); + + expect(handler).toHaveBeenCalledWith(asOk(['two', 'three'])); + }) + ); + test( 'returns an error when polling for work fails', fakeSchedulers(async (advance) => { diff --git a/x-pack/plugins/task_manager/server/task_poller.ts b/x-pack/plugins/task_manager/server/task_poller.ts index 3e1a04a366b0e3..88511f42f96fb6 100644 --- a/x-pack/plugins/task_manager/server/task_poller.ts +++ b/x-pack/plugins/task_manager/server/task_poller.ts @@ -25,6 +25,7 @@ import { asErr, promiseResult, } from './lib/result_type'; +import { timeoutPromiseAfter } from './lib/timeout_promise_after'; type WorkFn = (...params: T[]) => Promise; @@ -34,6 +35,7 @@ interface Opts { getCapacity: () => number; pollRequests$: Observable>; work: WorkFn; + workTimeout?: number; } /** @@ -55,6 +57,7 @@ export function createTaskPoller({ pollRequests$, bufferCapacity, work, + workTimeout, }: Opts): Observable>> { const hasCapacity = () => getCapacity() > 0; @@ -89,11 +92,15 @@ export function createTaskPoller({ concatMap(async (set: Set) => { closeSleepPerf(); return mapResult>>( - await promiseResult(work(...pullFromSet(set, getCapacity()))), + await promiseResult( + timeoutPromiseAfter( + work(...pullFromSet(set, getCapacity())), + workTimeout ?? pollInterval, + () => new Error(`work has timed out`) + ) + ), (workResult) => asOk(workResult), - (err: Error) => { - return asPollingError(err, PollingErrorType.WorkError); - } + (err: Error) => asPollingError(err, PollingErrorType.WorkError) ); }), tap(openSleepPerf), @@ -129,6 +136,7 @@ function pushOptionalIntoSet( export enum PollingErrorType { WorkError, + WorkTimeout, RequestCapacityReached, } diff --git a/x-pack/plugins/task_manager/server/test_utils/index.ts b/x-pack/plugins/task_manager/server/test_utils/index.ts index 3f000a9564ba3d..6f43a60ff42d28 100644 --- a/x-pack/plugins/task_manager/server/test_utils/index.ts +++ b/x-pack/plugins/task_manager/server/test_utils/index.ts @@ -23,7 +23,7 @@ export function mockLogger() { }; } -interface Resolvable { +export interface Resolvable { resolve: () => void; } From a3fd505482eb7701a5fbc855ffa2b99816cb439a Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Fri, 14 Aug 2020 09:27:17 +0100 Subject: [PATCH 2/2] updated docs & test --- x-pack/plugins/task_manager/server/README.md | 1 + .../server/lib/timeout_promise_after.test.ts | 12 ++++++------ .../plugins/task_manager/server/task_manager.test.ts | 1 + 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/x-pack/plugins/task_manager/server/README.md b/x-pack/plugins/task_manager/server/README.md index c3d45be5d8f22e..fd2409a7db0a53 100644 --- a/x-pack/plugins/task_manager/server/README.md +++ b/x-pack/plugins/task_manager/server/README.md @@ -41,6 +41,7 @@ The task_manager can be configured via `taskManager` config options (e.g. `taskM - `max_attempts` - The maximum number of times a task will be attempted before being abandoned as failed - `poll_interval` - How often the background worker should check the task_manager index for more work +- `max_poll_inactivity_cycles` - How many poll intervals is work allowed to block polling for before it's timed out. This does not include task execution, as task execution does not block the polling, but rather includes work needed to manage Task Manager's state. - `index` - The name of the index that the task_manager - `max_workers` - The maximum number of tasks a Kibana will run concurrently (defaults to 10) - `credentials` - Encrypted user credentials. All tasks will run in the security context of this user. See [this issue](https://github.com/elastic/dev/issues/1045) for a discussion on task scheduler security. diff --git a/x-pack/plugins/task_manager/server/lib/timeout_promise_after.test.ts b/x-pack/plugins/task_manager/server/lib/timeout_promise_after.test.ts index 6cd6b51f2d1259..3e88269671dccd 100644 --- a/x-pack/plugins/task_manager/server/lib/timeout_promise_after.test.ts +++ b/x-pack/plugins/task_manager/server/lib/timeout_promise_after.test.ts @@ -14,20 +14,20 @@ const delayRejection = (ms: number, result: unknown) => describe('Promise Timeout', () => { test('resolves when wrapped promise resolves', async () => { - return expect(timeoutPromiseAfter(delay(100, 'OK'), 1000)).resolves.toMatchInlineSnapshot( - `"OK"` - ); + return expect( + timeoutPromiseAfter(delay(100, 'OK'), 1000, () => 'TIMEOUT ERR') + ).resolves.toMatchInlineSnapshot(`"OK"`); }); test('reject when wrapped promise rejects', async () => { return expect( - timeoutPromiseAfter(delayRejection(100, 'ERR'), 1000) + timeoutPromiseAfter(delayRejection(100, 'ERR'), 1000, () => 'TIMEOUT ERR') ).rejects.toMatchInlineSnapshot(`"ERR"`); }); test('reject it the timeout elapses', async () => { return expect( - timeoutPromiseAfter(delay(1000, 'OK'), 100, () => 'ERR') - ).rejects.toMatchInlineSnapshot(`"ERR"`); + timeoutPromiseAfter(delay(1000, 'OK'), 100, () => 'TIMEOUT ERR') + ).rejects.toMatchInlineSnapshot(`"TIMEOUT ERR"`); }); }); diff --git a/x-pack/plugins/task_manager/server/task_manager.test.ts b/x-pack/plugins/task_manager/server/task_manager.test.ts index 7035971ad60610..cf7f9e2a7cff38 100644 --- a/x-pack/plugins/task_manager/server/task_manager.test.ts +++ b/x-pack/plugins/task_manager/server/task_manager.test.ts @@ -40,6 +40,7 @@ describe('TaskManager', () => { index: 'foo', max_attempts: 9, poll_interval: 6000000, + max_poll_inactivity_cycles: 10, request_capacity: 1000, }; const taskManagerOpts = {