From b893db72e2bf8ebafca8860479257eff3c6d34f6 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Fri, 7 Sep 2018 14:15:09 -0700 Subject: [PATCH 01/12] return raw task doc fields for calling code --- src/server/task_manager/README.md | 2 +- src/server/task_manager/client_wrapper.ts | 37 +++++++++++++++++-- .../task_manager/task_pool/task_manager.ts | 11 +++--- .../task_manager/task_pool/task_store.ts | 8 ++-- 4 files changed, 45 insertions(+), 13 deletions(-) diff --git a/src/server/task_manager/README.md b/src/server/task_manager/README.md index 389d9c4bf06c7c..46a17705d8ce4a 100644 --- a/src/server/task_manager/README.md +++ b/src/server/task_manager/README.md @@ -201,7 +201,7 @@ const manager = server.taskManager; // Schedules a task. All properties are as documented in the previous // storage section, except that here, params is an object, not a JSON // string. -const task = manager.schedule({ +const task = await manager.schedule({ taskType, runAt, interval, diff --git a/src/server/task_manager/client_wrapper.ts b/src/server/task_manager/client_wrapper.ts index 8cba5931232acf..f8588c6c60152d 100644 --- a/src/server/task_manager/client_wrapper.ts +++ b/src/server/task_manager/client_wrapper.ts @@ -19,6 +19,20 @@ import { TaskManager } from './task_pool'; import { TaskInstance } from './task_pool/task'; +import { FetchOpts, FetchResult } from './task_pool/task_store'; + +interface ClientCheckResult { + error?: Error; +} + +function checkClient(client: TaskManager | null): ClientCheckResult { + if (client === null) { + return { + error: new Error('Task Manager Client has not been set properly!'), + }; + } + return {}; +} export class TaskManagerClientWrapper { private client: TaskManager | null; @@ -32,9 +46,26 @@ export class TaskManagerClientWrapper { } public schedule(task: TaskInstance) { - if (this.client == null) { - throw new Error('Task Manager Client has not been set properly!'); + const { error } = checkClient(this.client); + if (error) { + throw error; + } + return this.client.schedule(task); + } + + public remove(id: string) { + const { error } = checkClient(this.client); + if (error) { + throw error; + } + return this.client.remove(id); + } + + public fetch(opts: FetchOpts = {}): Promise { + const { error } = checkClient(this.client); + if (error) { + throw error; } - this.client.schedule(task); + return this.client.fetch(opts); } } diff --git a/src/server/task_manager/task_pool/task_manager.ts b/src/server/task_manager/task_pool/task_manager.ts index 693d6e48f3c5cb..263e68f0057e3f 100644 --- a/src/server/task_manager/task_pool/task_manager.ts +++ b/src/server/task_manager/task_pool/task_manager.ts @@ -19,7 +19,7 @@ import { TaskInstance } from './task'; import { TaskPoller } from './task_poller'; -import { FetchOpts, FetchResult, TaskStore } from './task_store'; +import { FetchOpts, FetchResult, RawTaskDoc, TaskStore } from './task_store'; interface Opts { poller: TaskPoller; @@ -35,16 +35,17 @@ export class TaskManager { this.store = opts.store; } - public async schedule(task: TaskInstance) { - await this.store.schedule(task); - this.poller.attemptWork(); + public async schedule(task: TaskInstance): Promise { + const result = await this.store.schedule(task); + this.poller.attemptWork(); // TODO await this? + return result; } public fetch(opts: FetchOpts = {}): Promise { return this.store.fetch(opts); } - public remove(id: string): Promise { + public remove(id: string): Promise { return this.store.remove(id); } } diff --git a/src/server/task_manager/task_pool/task_store.ts b/src/server/task_manager/task_pool/task_store.ts index 134229f4bdec65..81325e4679da19 100644 --- a/src/server/task_manager/task_pool/task_store.ts +++ b/src/server/task_manager/task_pool/task_store.ts @@ -39,7 +39,7 @@ export interface FetchResult { } // Internal, the raw document, as stored in the Kibana index. -interface RawTaskDoc { +export interface RawTaskDoc { _id: string; _index: string; _type: string; @@ -133,7 +133,7 @@ export class TaskStore { } } - public schedule(task: TaskInstance) { + public schedule(task: TaskInstance): Promise { return this.callCluster('index', { index: this.index, type: DOC_TYPE, @@ -218,7 +218,7 @@ export class TaskStore { * @param {string} id * @returns {Promise} */ - public async remove(id: string): Promise { + public async remove(id: string): Promise { return this.callCluster('delete', { id, index: this.index, @@ -240,7 +240,7 @@ export class TaskStore { function paginatableSort(sort: any[] = []) { if (!sort.length) { - return [{ runAt: 'asc' }, { _id: 'desc' }]; + return [{ 'task.runAt': 'asc' }, { _id: 'desc' }]; } if (sort.find(({ _id }) => !!_id)) { From da4f53c5fc3ae1bf486f909c21f2633966d2d252 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Tue, 11 Sep 2018 12:51:38 -0700 Subject: [PATCH 02/12] remove todo comment --- src/server/task_manager/task_pool/task_manager.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/task_manager/task_pool/task_manager.ts b/src/server/task_manager/task_pool/task_manager.ts index 263e68f0057e3f..13fa47211310ee 100644 --- a/src/server/task_manager/task_pool/task_manager.ts +++ b/src/server/task_manager/task_pool/task_manager.ts @@ -37,7 +37,7 @@ export class TaskManager { public async schedule(task: TaskInstance): Promise { const result = await this.store.schedule(task); - this.poller.attemptWork(); // TODO await this? + this.poller.attemptWork(); return result; } From 3a7a5c2dd734543771ff9db01b84bb2a7bf6199a Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Tue, 11 Sep 2018 15:45:15 -0700 Subject: [PATCH 03/12] helper module for default client - setClient takes a callback fn --- src/server/task_manager/client_wrapper.ts | 55 +++++-------- src/server/task_manager/default_client.ts | 77 +++++++++++++++++++ src/server/task_manager/task_manager_mixin.ts | 66 ++++------------ 3 files changed, 111 insertions(+), 87 deletions(-) create mode 100644 src/server/task_manager/default_client.ts diff --git a/src/server/task_manager/client_wrapper.ts b/src/server/task_manager/client_wrapper.ts index f8588c6c60152d..5ccf4245c49042 100644 --- a/src/server/task_manager/client_wrapper.ts +++ b/src/server/task_manager/client_wrapper.ts @@ -17,55 +17,40 @@ * under the License. */ -import { TaskManager } from './task_pool'; -import { TaskInstance } from './task_pool/task'; -import { FetchOpts, FetchResult } from './task_pool/task_store'; - -interface ClientCheckResult { - error?: Error; -} - -function checkClient(client: TaskManager | null): ClientCheckResult { - if (client === null) { - return { - error: new Error('Task Manager Client has not been set properly!'), - }; - } - return {}; -} +import { TaskDictionary, TaskManager, TaskManagerLogger } from './task_pool'; +import { SanitizedTaskDefinition, TaskInstance } from './task_pool/task'; +import { FetchOpts } from './task_pool/task_store'; export class TaskManagerClientWrapper { private client: TaskManager | null; - constructor() { + constructor( + private logger: TaskManagerLogger, + private totalCapacity: number, + private definitions: TaskDictionary + ) { this.client = null; } - public setClient(client: TaskManager) { - this.client = client; + public async setClient( + cb: ( + logger: TaskManagerLogger, + totalCapacity: number, + definitions: TaskDictionary + ) => Promise + ) { + this.client = await cb(this.logger, this.totalCapacity, this.definitions); } public schedule(task: TaskInstance) { - const { error } = checkClient(this.client); - if (error) { - throw error; - } - return this.client.schedule(task); + return this.client ? this.client.schedule(task) : null; } public remove(id: string) { - const { error } = checkClient(this.client); - if (error) { - throw error; - } - return this.client.remove(id); + return this.client ? this.client.remove(id) : null; } - public fetch(opts: FetchOpts = {}): Promise { - const { error } = checkClient(this.client); - if (error) { - throw error; - } - return this.client.fetch(opts); + public fetch(opts: FetchOpts = {}) { + return this.client ? this.client.fetch(opts) : null; } } diff --git a/src/server/task_manager/default_client.ts b/src/server/task_manager/default_client.ts new file mode 100644 index 00000000000000..f8843aee47b754 --- /dev/null +++ b/src/server/task_manager/default_client.ts @@ -0,0 +1,77 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { TaskDictionary, TaskManager, TaskManagerLogger, TaskPool, TaskStore } from './task_pool'; +import { fillPool } from './task_pool/fill_pool'; +import { ConcreteTaskInstance, SanitizedTaskDefinition } from './task_pool/task'; +import { TaskPoller } from './task_pool/task_poller'; +import { TaskManagerRunner } from './task_pool/task_runner'; + +export async function getDefaultClient( + kbnServer: any, + server: any, + config: any, + logger: TaskManagerLogger, + totalCapacity: number, + definitions: TaskDictionary +) { + const callCluster = server.plugins.elasticsearch.getCluster('admin').callWithInternalUser; + const store = new TaskStore({ + index: config.get('taskManager.index'), + callCluster, + maxAttempts: config.get('taskManager.max_attempts'), + supportedTypes: Object.keys(definitions), + }); + + logger.debug('Initializing the task manager index'); + await store.init(); + + const pool = new TaskPool({ + logger, + totalCapacity, + }); + + const contextProvider = async (taskInstance: ConcreteTaskInstance) => ({ + callCluster, + kbnServer, + taskInstance, + }); + + const poller = new TaskPoller({ + logger, + pollInterval: config.get('taskManager.poll_interval'), + work: () => + fillPool( + pool.run, + store.fetchAvailableTasks, + (instance: ConcreteTaskInstance) => + new TaskManagerRunner({ + logger, + definition: definitions[instance.taskType], + instance, + store, + contextProvider, + }) + ), + }); + + poller.start(); + + return new TaskManager({ store, poller }); +} diff --git a/src/server/task_manager/task_manager_mixin.ts b/src/server/task_manager/task_manager_mixin.ts index 1fba97aa5385c0..423123638b4212 100644 --- a/src/server/task_manager/task_manager_mixin.ts +++ b/src/server/task_manager/task_manager_mixin.ts @@ -19,72 +19,34 @@ import Joi from 'joi'; import { TaskManagerClientWrapper } from './client_wrapper'; +import { getDefaultClient } from './default_client'; import { TaskDefinition, TaskDictionary, - TaskManager, TaskManagerLogger, - TaskPool, - TaskStore, validateTaskDefinition, } from './task_pool'; -import { fillPool } from './task_pool/fill_pool'; -import { ConcreteTaskInstance, SanitizedTaskDefinition } from './task_pool/task'; -import { TaskPoller } from './task_pool/task_poller'; -import { TaskManagerRunner } from './task_pool/task_runner'; +import { SanitizedTaskDefinition } from './task_pool/task'; export async function taskManagerMixin(kbnServer: any, server: any, config: any) { const logger = new TaskManagerLogger((...args) => server.log(...args)); const totalCapacity = config.get('taskManager.num_workers'); const definitions = extractTaskDefinitions(totalCapacity, kbnServer.uiExports.taskDefinitions); - server.decorate('server', 'taskManager', new TaskManagerClientWrapper()); + server.decorate( + 'server', + 'taskManager', + new TaskManagerClientWrapper(logger, totalCapacity, definitions) + ); kbnServer.afterPluginsInit(async () => { - const callCluster = server.plugins.elasticsearch.getCluster('admin').callWithInternalUser; - const store = new TaskStore({ - index: config.get('taskManager.index'), - callCluster, - maxAttempts: config.get('taskManager.max_attempts'), - supportedTypes: Object.keys(definitions), - }); - - logger.debug('Initializing the task manager index'); - await store.init(); - - const pool = new TaskPool({ - logger, - totalCapacity, - }); - - const contextProvider = async (taskInstance: ConcreteTaskInstance) => ({ - callCluster, - kbnServer, - taskInstance, - }); - - const poller = new TaskPoller({ - logger, - pollInterval: config.get('taskManager.poll_interval'), - work: () => - fillPool( - pool.run, - store.fetchAvailableTasks, - (instance: ConcreteTaskInstance) => - new TaskManagerRunner({ - logger, - instance, - store, - contextProvider, - definition: definitions[instance.taskType], - }) - ), - }); - - poller.start(); - - const client = new TaskManager({ store, poller }); - server.taskManager.setClient(client); + server.taskManager.setClient( + ( + cLogger: TaskManagerLogger, + cTotalCapacity: number, + cDefinitions: TaskDictionary + ) => getDefaultClient(kbnServer, server, config, cLogger, cTotalCapacity, cDefinitions) + ); }); } From 7482c0dc6f5c94f1d95222f3d2db815f284bd087 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Tue, 11 Sep 2018 15:45:59 -0700 Subject: [PATCH 04/12] fix misidentified field name --- src/server/task_manager/task_manager_mixin.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/server/task_manager/task_manager_mixin.ts b/src/server/task_manager/task_manager_mixin.ts index 423123638b4212..ee3b35d15cd9c3 100644 --- a/src/server/task_manager/task_manager_mixin.ts +++ b/src/server/task_manager/task_manager_mixin.ts @@ -60,11 +60,11 @@ function extractTaskDefinitions( const rawDefinition = taskDefinitions[type]; rawDefinition.type = type; const definition = Joi.attempt(rawDefinition, validateTaskDefinition) as TaskDefinition; - const workersOccupied = Math.min(numWorkers, definition.workersOccupied || 1); + const workersOccupied = Math.min(numWorkers, definition.numWorkers || 1); acc[type] = { ...definition, - workersOccupied, + numWorkers: workersOccupied, }; return acc; From 48487b58d2225bc281bee120b716e1851d868a54 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Tue, 11 Sep 2018 15:50:06 -0700 Subject: [PATCH 05/12] fix rest args warning --- src/server/task_manager/task_manager_mixin.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/task_manager/task_manager_mixin.ts b/src/server/task_manager/task_manager_mixin.ts index ee3b35d15cd9c3..a6b0bc5daec5ed 100644 --- a/src/server/task_manager/task_manager_mixin.ts +++ b/src/server/task_manager/task_manager_mixin.ts @@ -29,7 +29,7 @@ import { import { SanitizedTaskDefinition } from './task_pool/task'; export async function taskManagerMixin(kbnServer: any, server: any, config: any) { - const logger = new TaskManagerLogger((...args) => server.log(...args)); + const logger = new TaskManagerLogger((...args: any[]) => server.log(...args)); const totalCapacity = config.get('taskManager.num_workers'); const definitions = extractTaskDefinitions(totalCapacity, kbnServer.uiExports.taskDefinitions); From b89cc9e8d02f63021e81123c54af7f394c0f5624 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Fri, 7 Sep 2018 14:15:09 -0700 Subject: [PATCH 06/12] return raw task doc fields for calling code --- src/server/task_manager/README.md | 2 +- src/server/task_manager/client_wrapper.ts | 37 +++++++++++++++++++++-- src/server/task_manager/task_manager.ts | 11 ++++--- src/server/task_manager/task_store.ts | 8 ++--- 4 files changed, 45 insertions(+), 13 deletions(-) diff --git a/src/server/task_manager/README.md b/src/server/task_manager/README.md index 389d9c4bf06c7c..46a17705d8ce4a 100644 --- a/src/server/task_manager/README.md +++ b/src/server/task_manager/README.md @@ -201,7 +201,7 @@ const manager = server.taskManager; // Schedules a task. All properties are as documented in the previous // storage section, except that here, params is an object, not a JSON // string. -const task = manager.schedule({ +const task = await manager.schedule({ taskType, runAt, interval, diff --git a/src/server/task_manager/client_wrapper.ts b/src/server/task_manager/client_wrapper.ts index 8c8d94d5d342cb..f026e588f52fe1 100644 --- a/src/server/task_manager/client_wrapper.ts +++ b/src/server/task_manager/client_wrapper.ts @@ -19,6 +19,20 @@ import { TaskInstance } from './task'; import { TaskManager } from './task_manager'; +import { FetchOpts, FetchResult } from './task_pool/task_store'; + +interface ClientCheckResult { + error?: Error; +} + +function checkClient(client: TaskManager | null): ClientCheckResult { + if (client === null) { + return { + error: new Error('Task Manager Client has not been set properly!'), + }; + } + return {}; +} export class TaskManagerClientWrapper { private client: TaskManager | null; @@ -32,9 +46,26 @@ export class TaskManagerClientWrapper { } public schedule(task: TaskInstance) { - if (this.client == null) { - throw new Error('Task Manager Client has not been set properly!'); + const { error } = checkClient(this.client); + if (error) { + throw error; + } + return this.client.schedule(task); + } + + public remove(id: string) { + const { error } = checkClient(this.client); + if (error) { + throw error; + } + return this.client.remove(id); + } + + public fetch(opts: FetchOpts = {}): Promise { + const { error } = checkClient(this.client); + if (error) { + throw error; } - this.client.schedule(task); + return this.client.fetch(opts); } } diff --git a/src/server/task_manager/task_manager.ts b/src/server/task_manager/task_manager.ts index 693d6e48f3c5cb..263e68f0057e3f 100644 --- a/src/server/task_manager/task_manager.ts +++ b/src/server/task_manager/task_manager.ts @@ -19,7 +19,7 @@ import { TaskInstance } from './task'; import { TaskPoller } from './task_poller'; -import { FetchOpts, FetchResult, TaskStore } from './task_store'; +import { FetchOpts, FetchResult, RawTaskDoc, TaskStore } from './task_store'; interface Opts { poller: TaskPoller; @@ -35,16 +35,17 @@ export class TaskManager { this.store = opts.store; } - public async schedule(task: TaskInstance) { - await this.store.schedule(task); - this.poller.attemptWork(); + public async schedule(task: TaskInstance): Promise { + const result = await this.store.schedule(task); + this.poller.attemptWork(); // TODO await this? + return result; } public fetch(opts: FetchOpts = {}): Promise { return this.store.fetch(opts); } - public remove(id: string): Promise { + public remove(id: string): Promise { return this.store.remove(id); } } diff --git a/src/server/task_manager/task_store.ts b/src/server/task_manager/task_store.ts index 134229f4bdec65..81325e4679da19 100644 --- a/src/server/task_manager/task_store.ts +++ b/src/server/task_manager/task_store.ts @@ -39,7 +39,7 @@ export interface FetchResult { } // Internal, the raw document, as stored in the Kibana index. -interface RawTaskDoc { +export interface RawTaskDoc { _id: string; _index: string; _type: string; @@ -133,7 +133,7 @@ export class TaskStore { } } - public schedule(task: TaskInstance) { + public schedule(task: TaskInstance): Promise { return this.callCluster('index', { index: this.index, type: DOC_TYPE, @@ -218,7 +218,7 @@ export class TaskStore { * @param {string} id * @returns {Promise} */ - public async remove(id: string): Promise { + public async remove(id: string): Promise { return this.callCluster('delete', { id, index: this.index, @@ -240,7 +240,7 @@ export class TaskStore { function paginatableSort(sort: any[] = []) { if (!sort.length) { - return [{ runAt: 'asc' }, { _id: 'desc' }]; + return [{ 'task.runAt': 'asc' }, { _id: 'desc' }]; } if (sort.find(({ _id }) => !!_id)) { From 3469f721991f058e9cffbf7b9e65abda7fc33be8 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Tue, 11 Sep 2018 12:51:38 -0700 Subject: [PATCH 07/12] remove todo comment --- src/server/task_manager/task_manager.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/task_manager/task_manager.ts b/src/server/task_manager/task_manager.ts index 263e68f0057e3f..13fa47211310ee 100644 --- a/src/server/task_manager/task_manager.ts +++ b/src/server/task_manager/task_manager.ts @@ -37,7 +37,7 @@ export class TaskManager { public async schedule(task: TaskInstance): Promise { const result = await this.store.schedule(task); - this.poller.attemptWork(); // TODO await this? + this.poller.attemptWork(); return result; } From 8b407bb9d2af52e09f693debced9bcd620b4e1c5 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Tue, 11 Sep 2018 15:45:15 -0700 Subject: [PATCH 08/12] helper module for default client - setClient takes a callback fn --- src/server/task_manager/client_wrapper.ts | 49 ++++-------- src/server/task_manager/default_client.ts | 77 +++++++++++++++++++ src/server/task_manager/task_manager_mixin.ts | 58 ++++---------- 3 files changed, 107 insertions(+), 77 deletions(-) create mode 100644 src/server/task_manager/default_client.ts diff --git a/src/server/task_manager/client_wrapper.ts b/src/server/task_manager/client_wrapper.ts index f026e588f52fe1..553ebca27930a0 100644 --- a/src/server/task_manager/client_wrapper.ts +++ b/src/server/task_manager/client_wrapper.ts @@ -21,51 +21,36 @@ import { TaskInstance } from './task'; import { TaskManager } from './task_manager'; import { FetchOpts, FetchResult } from './task_pool/task_store'; -interface ClientCheckResult { - error?: Error; -} - -function checkClient(client: TaskManager | null): ClientCheckResult { - if (client === null) { - return { - error: new Error('Task Manager Client has not been set properly!'), - }; - } - return {}; -} - export class TaskManagerClientWrapper { private client: TaskManager | null; - constructor() { + constructor( + private logger: TaskManagerLogger, + private totalCapacity: number, + private definitions: TaskDictionary + ) { this.client = null; } - public setClient(client: TaskManager) { - this.client = client; + public async setClient( + cb: ( + logger: TaskManagerLogger, + totalCapacity: number, + definitions: TaskDictionary + ) => Promise + ) { + this.client = await cb(this.logger, this.totalCapacity, this.definitions); } public schedule(task: TaskInstance) { - const { error } = checkClient(this.client); - if (error) { - throw error; - } - return this.client.schedule(task); + return this.client ? this.client.schedule(task) : null; } public remove(id: string) { - const { error } = checkClient(this.client); - if (error) { - throw error; - } - return this.client.remove(id); + return this.client ? this.client.remove(id) : null; } - public fetch(opts: FetchOpts = {}): Promise { - const { error } = checkClient(this.client); - if (error) { - throw error; - } - return this.client.fetch(opts); + public fetch(opts: FetchOpts = {}) { + return this.client ? this.client.fetch(opts) : null; } } diff --git a/src/server/task_manager/default_client.ts b/src/server/task_manager/default_client.ts new file mode 100644 index 00000000000000..f8843aee47b754 --- /dev/null +++ b/src/server/task_manager/default_client.ts @@ -0,0 +1,77 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { TaskDictionary, TaskManager, TaskManagerLogger, TaskPool, TaskStore } from './task_pool'; +import { fillPool } from './task_pool/fill_pool'; +import { ConcreteTaskInstance, SanitizedTaskDefinition } from './task_pool/task'; +import { TaskPoller } from './task_pool/task_poller'; +import { TaskManagerRunner } from './task_pool/task_runner'; + +export async function getDefaultClient( + kbnServer: any, + server: any, + config: any, + logger: TaskManagerLogger, + totalCapacity: number, + definitions: TaskDictionary +) { + const callCluster = server.plugins.elasticsearch.getCluster('admin').callWithInternalUser; + const store = new TaskStore({ + index: config.get('taskManager.index'), + callCluster, + maxAttempts: config.get('taskManager.max_attempts'), + supportedTypes: Object.keys(definitions), + }); + + logger.debug('Initializing the task manager index'); + await store.init(); + + const pool = new TaskPool({ + logger, + totalCapacity, + }); + + const contextProvider = async (taskInstance: ConcreteTaskInstance) => ({ + callCluster, + kbnServer, + taskInstance, + }); + + const poller = new TaskPoller({ + logger, + pollInterval: config.get('taskManager.poll_interval'), + work: () => + fillPool( + pool.run, + store.fetchAvailableTasks, + (instance: ConcreteTaskInstance) => + new TaskManagerRunner({ + logger, + definition: definitions[instance.taskType], + instance, + store, + contextProvider, + }) + ), + }); + + poller.start(); + + return new TaskManager({ store, poller }); +} diff --git a/src/server/task_manager/task_manager_mixin.ts b/src/server/task_manager/task_manager_mixin.ts index b9482834b5b5e9..b5ddbea30dba87 100644 --- a/src/server/task_manager/task_manager_mixin.ts +++ b/src/server/task_manager/task_manager_mixin.ts @@ -21,6 +21,7 @@ import Joi from 'joi'; import { TaskManagerClientWrapper } from './client_wrapper'; import { fillPool } from './fill_pool'; import { TaskManagerLogger } from './logger'; +import { getDefaultClient } from './default_client'; import { ConcreteTaskInstance, SanitizedTaskDefinition, @@ -39,53 +40,20 @@ export async function taskManagerMixin(kbnServer: any, server: any, config: any) const totalCapacity = config.get('taskManager.num_workers'); const definitions = extractTaskDefinitions(totalCapacity, kbnServer.uiExports.taskDefinitions); - server.decorate('server', 'taskManager', new TaskManagerClientWrapper()); + server.decorate( + 'server', + 'taskManager', + new TaskManagerClientWrapper(logger, totalCapacity, definitions) + ); kbnServer.afterPluginsInit(async () => { - const callCluster = server.plugins.elasticsearch.getCluster('admin').callWithInternalUser; - const store = new TaskStore({ - index: config.get('taskManager.index'), - callCluster, - maxAttempts: config.get('taskManager.max_attempts'), - supportedTypes: Object.keys(definitions), - }); - - logger.debug('Initializing the task manager index'); - await store.init(); - - const pool = new TaskPool({ - logger, - totalCapacity, - }); - - const contextProvider = async (taskInstance: ConcreteTaskInstance) => ({ - callCluster, - kbnServer, - taskInstance, - }); - - const poller = new TaskPoller({ - logger, - pollInterval: config.get('taskManager.poll_interval'), - work: () => - fillPool( - pool.run, - store.fetchAvailableTasks, - (instance: ConcreteTaskInstance) => - new TaskManagerRunner({ - logger, - instance, - store, - contextProvider, - definition: definitions[instance.taskType], - }) - ), - }); - - poller.start(); - - const client = new TaskManager({ store, poller }); - server.taskManager.setClient(client); + server.taskManager.setClient( + ( + cLogger: TaskManagerLogger, + cTotalCapacity: number, + cDefinitions: TaskDictionary + ) => getDefaultClient(kbnServer, server, config, cLogger, cTotalCapacity, cDefinitions) + ); }); } From 30d568458563acf78459072a6255c826a1246b38 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Tue, 11 Sep 2018 15:50:06 -0700 Subject: [PATCH 09/12] fix rest args warning --- src/server/task_manager/task_manager_mixin.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/task_manager/task_manager_mixin.ts b/src/server/task_manager/task_manager_mixin.ts index b5ddbea30dba87..0f2912d8c8ab96 100644 --- a/src/server/task_manager/task_manager_mixin.ts +++ b/src/server/task_manager/task_manager_mixin.ts @@ -36,7 +36,7 @@ import { TaskManagerRunner } from './task_runner'; import { TaskStore } from './task_store'; export async function taskManagerMixin(kbnServer: any, server: any, config: any) { - const logger = new TaskManagerLogger((...args) => server.log(...args)); + const logger = new TaskManagerLogger((...args: any[]) => server.log(...args)); const totalCapacity = config.get('taskManager.num_workers'); const definitions = extractTaskDefinitions(totalCapacity, kbnServer.uiExports.taskDefinitions); From 85e6c8e7c50fdd40e85f20c9335c5e44b542a994 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Wed, 12 Sep 2018 15:57:09 -0700 Subject: [PATCH 10/12] typescript fixes --- src/server/task_manager/client_wrapper.ts | 5 +++-- src/server/task_manager/default_client.ts | 13 ++++++++----- src/server/task_manager/task_manager_mixin.ts | 9 +-------- 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/src/server/task_manager/client_wrapper.ts b/src/server/task_manager/client_wrapper.ts index 553ebca27930a0..f031c0801ae05b 100644 --- a/src/server/task_manager/client_wrapper.ts +++ b/src/server/task_manager/client_wrapper.ts @@ -17,9 +17,10 @@ * under the License. */ -import { TaskInstance } from './task'; +import { TaskManagerLogger } from './logger'; +import { SanitizedTaskDefinition, TaskDictionary, TaskInstance } from './task'; import { TaskManager } from './task_manager'; -import { FetchOpts, FetchResult } from './task_pool/task_store'; +import { FetchOpts } from './task_store'; export class TaskManagerClientWrapper { private client: TaskManager | null; diff --git a/src/server/task_manager/default_client.ts b/src/server/task_manager/default_client.ts index f8843aee47b754..d7cf4a946dc817 100644 --- a/src/server/task_manager/default_client.ts +++ b/src/server/task_manager/default_client.ts @@ -17,11 +17,14 @@ * under the License. */ -import { TaskDictionary, TaskManager, TaskManagerLogger, TaskPool, TaskStore } from './task_pool'; -import { fillPool } from './task_pool/fill_pool'; -import { ConcreteTaskInstance, SanitizedTaskDefinition } from './task_pool/task'; -import { TaskPoller } from './task_pool/task_poller'; -import { TaskManagerRunner } from './task_pool/task_runner'; +import { fillPool } from './fill_pool'; +import { TaskManagerLogger } from './logger'; +import { ConcreteTaskInstance, SanitizedTaskDefinition, TaskDictionary } from './task'; +import { TaskManager } from './task_manager'; +import { TaskPoller } from './task_poller'; +import { TaskPool } from './task_pool'; +import { TaskManagerRunner } from './task_runner'; +import { TaskStore } from './task_store'; export async function getDefaultClient( kbnServer: any, diff --git a/src/server/task_manager/task_manager_mixin.ts b/src/server/task_manager/task_manager_mixin.ts index 0f2912d8c8ab96..dca244bed29abb 100644 --- a/src/server/task_manager/task_manager_mixin.ts +++ b/src/server/task_manager/task_manager_mixin.ts @@ -19,21 +19,14 @@ import Joi from 'joi'; import { TaskManagerClientWrapper } from './client_wrapper'; -import { fillPool } from './fill_pool'; -import { TaskManagerLogger } from './logger'; import { getDefaultClient } from './default_client'; +import { TaskManagerLogger } from './logger'; import { - ConcreteTaskInstance, SanitizedTaskDefinition, TaskDefinition, TaskDictionary, validateTaskDefinition, } from './task'; -import { TaskManager } from './task_manager'; -import { TaskPoller } from './task_poller'; -import { TaskPool } from './task_pool'; -import { TaskManagerRunner } from './task_runner'; -import { TaskStore } from './task_store'; export async function taskManagerMixin(kbnServer: any, server: any, config: any) { const logger = new TaskManagerLogger((...args: any[]) => server.log(...args)); From edc5852063999c9016793b6b7ae2fd9af1b882d1 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Wed, 12 Sep 2018 21:47:44 -0700 Subject: [PATCH 11/12] roll back setClient takes a callback --- src/server/task_manager/client_wrapper.ts | 19 ++++-------------- src/server/task_manager/default_client.ts | 2 +- src/server/task_manager/task_manager_mixin.ts | 20 +++++++++---------- 3 files changed, 14 insertions(+), 27 deletions(-) diff --git a/src/server/task_manager/client_wrapper.ts b/src/server/task_manager/client_wrapper.ts index f031c0801ae05b..e37922d11e1b68 100644 --- a/src/server/task_manager/client_wrapper.ts +++ b/src/server/task_manager/client_wrapper.ts @@ -17,30 +17,19 @@ * under the License. */ -import { TaskManagerLogger } from './logger'; -import { SanitizedTaskDefinition, TaskDictionary, TaskInstance } from './task'; +import { TaskInstance } from './task'; import { TaskManager } from './task_manager'; import { FetchOpts } from './task_store'; export class TaskManagerClientWrapper { private client: TaskManager | null; - constructor( - private logger: TaskManagerLogger, - private totalCapacity: number, - private definitions: TaskDictionary - ) { + constructor() { this.client = null; } - public async setClient( - cb: ( - logger: TaskManagerLogger, - totalCapacity: number, - definitions: TaskDictionary - ) => Promise - ) { - this.client = await cb(this.logger, this.totalCapacity, this.definitions); + public async setClient(client: TaskManager) { + this.client = client; } public schedule(task: TaskInstance) { diff --git a/src/server/task_manager/default_client.ts b/src/server/task_manager/default_client.ts index d7cf4a946dc817..385672fdaf89c3 100644 --- a/src/server/task_manager/default_client.ts +++ b/src/server/task_manager/default_client.ts @@ -33,7 +33,7 @@ export async function getDefaultClient( logger: TaskManagerLogger, totalCapacity: number, definitions: TaskDictionary -) { +): Promise { const callCluster = server.plugins.elasticsearch.getCluster('admin').callWithInternalUser; const store = new TaskStore({ index: config.get('taskManager.index'), diff --git a/src/server/task_manager/task_manager_mixin.ts b/src/server/task_manager/task_manager_mixin.ts index dca244bed29abb..29daeaaf033db4 100644 --- a/src/server/task_manager/task_manager_mixin.ts +++ b/src/server/task_manager/task_manager_mixin.ts @@ -33,20 +33,18 @@ export async function taskManagerMixin(kbnServer: any, server: any, config: any) const totalCapacity = config.get('taskManager.num_workers'); const definitions = extractTaskDefinitions(totalCapacity, kbnServer.uiExports.taskDefinitions); - server.decorate( - 'server', - 'taskManager', - new TaskManagerClientWrapper(logger, totalCapacity, definitions) - ); + server.decorate('server', 'taskManager', new TaskManagerClientWrapper()); kbnServer.afterPluginsInit(async () => { - server.taskManager.setClient( - ( - cLogger: TaskManagerLogger, - cTotalCapacity: number, - cDefinitions: TaskDictionary - ) => getDefaultClient(kbnServer, server, config, cLogger, cTotalCapacity, cDefinitions) + const client = await getDefaultClient( + kbnServer, + server, + config, + logger, + totalCapacity, + definitions ); + server.taskManager.setClient(client); }); } From 6ad9282ae4a4bbedae265fd3ba1ea0c54582584e Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Wed, 12 Sep 2018 23:08:43 -0700 Subject: [PATCH 12/12] createTaskRunner returns an object with run/cancel functions --- src/server/task_manager/task.ts | 22 +++++++++------------- src/server/task_manager/task_runner.ts | 3 ++- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/src/server/task_manager/task.ts b/src/server/task_manager/task.ts index a8dc0c4267b6af..d4592f967b0d42 100644 --- a/src/server/task_manager/task.ts +++ b/src/server/task_manager/task.ts @@ -84,10 +84,13 @@ export const validateRunResult = Joi.object({ state: Joi.object().optional(), }).optional(); -/** - * The type signature of the function that performs a task. - */ -export type RunFunction = (context: RunContext) => PromiseLike; +export type RunFunction = () => PromiseLike; + +export type CancelFunction = () => PromiseLike; + +export type TaskRunCreatorFunction = ( + context: RunContext +) => { run: RunFunction; cancel?: CancelFunction }; /** * Defines a task which can be scheduled and run by the Kibana @@ -123,14 +126,7 @@ export interface TaskDefinition { */ numWorkers?: number; - /** - * A function which, does the work this task is built to do. Note, - * this is a *function* and is not guaranteed to be called with - * the *this* context of the task. - * - * @memberof TaskDefinition - */ - run: RunFunction; + createTaskRunner: TaskRunCreatorFunction; } /** @@ -146,7 +142,7 @@ export const validateTaskDefinition = Joi.object({ description: Joi.string().optional(), timeOut: Joi.string().default('5m'), numWorkers: Joi.number().default(1), - run: Joi.func().required(), + createTaskRunner: Joi.func().required(), }).default(); /** diff --git a/src/server/task_manager/task_runner.ts b/src/server/task_manager/task_runner.ts index b1bc0ff92ffc94..b86eb32aaca3b6 100644 --- a/src/server/task_manager/task_runner.ts +++ b/src/server/task_manager/task_runner.ts @@ -146,7 +146,8 @@ export class TaskManagerRunner implements TaskRunner { try { this.logger.debug(`Running task ${this}`); const context = await this.contextProvider(this.instance); - this.promise = this.definition.run(context); + const taskRunner = this.definition.createTaskRunner(context); + this.promise = taskRunner.run(); return this.processResult(this.validateResult(await this.promise)); } catch (error) { this.logger.warning(`Task ${this} failed ${error.stack}`);