Skip to content

Commit

Permalink
Merge pull request elastic#3 from tsullivan/alerting/task-scheduler-r…
Browse files Browse the repository at this point in the history
…eturn-taskdoc-to-caller

add default client module, more wrapper methods
  • Loading branch information
chrisdavies authored Sep 13, 2018
2 parents 7b01dca + 6893e56 commit 8cf48ff
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 77 deletions.
2 changes: 1 addition & 1 deletion src/server/task_manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ const manager = server.taskManager;
// Schedules a task. All properties are as documented in the previous
// storage section, except that here, params is an object, not a JSON
// string.
const task = manager.schedule({
const task = await manager.schedule({
taskType,
runAt,
interval,
Expand Down
16 changes: 11 additions & 5 deletions src/server/task_manager/client_wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import { TaskInstance } from './task';
import { TaskManager } from './task_manager';
import { FetchOpts } from './task_store';

export class TaskManagerClientWrapper {
private client: TaskManager | null;
Expand All @@ -27,14 +28,19 @@ export class TaskManagerClientWrapper {
this.client = null;
}

public setClient(client: TaskManager) {
public async setClient(client: TaskManager) {
this.client = client;
}

public schedule(task: TaskInstance) {
if (this.client == null) {
throw new Error('Task Manager Client has not been set properly!');
}
this.client.schedule(task);
return this.client ? this.client.schedule(task) : null;
}

public remove(id: string) {
return this.client ? this.client.remove(id) : null;
}

public fetch(opts: FetchOpts = {}) {
return this.client ? this.client.fetch(opts) : null;
}
}
80 changes: 80 additions & 0 deletions src/server/task_manager/default_client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { fillPool } from './fill_pool';
import { TaskManagerLogger } from './logger';
import { ConcreteTaskInstance, SanitizedTaskDefinition, TaskDictionary } from './task';
import { TaskManager } from './task_manager';
import { TaskPoller } from './task_poller';
import { TaskPool } from './task_pool';
import { TaskManagerRunner } from './task_runner';
import { TaskStore } from './task_store';

export async function getDefaultClient(
kbnServer: any,
server: any,
config: any,
logger: TaskManagerLogger,
maxWorkers: number,
definitions: TaskDictionary<SanitizedTaskDefinition>
): Promise<TaskManager> {
const callCluster = server.plugins.elasticsearch.getCluster('admin').callWithInternalUser;
const store = new TaskStore({
index: config.get('taskManager.index'),
callCluster,
maxAttempts: config.get('taskManager.max_attempts'),
supportedTypes: Object.keys(definitions),
});

logger.debug('Initializing the task manager index');
await store.init();

const pool = new TaskPool({
logger,
maxWorkers,
});

const contextProvider = async (taskInstance: ConcreteTaskInstance) => ({
callCluster,
kbnServer,
taskInstance,
});

const poller = new TaskPoller({
logger,
pollInterval: config.get('taskManager.poll_interval'),
work: () =>
fillPool(
pool.run,
store.fetchAvailableTasks,
(instance: ConcreteTaskInstance) =>
new TaskManagerRunner({
logger,
definition: definitions[instance.taskType],
instance,
store,
contextProvider,
})
),
});

poller.start();

return new TaskManager({ store, poller });
}
22 changes: 9 additions & 13 deletions src/server/task_manager/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,13 @@ export const validateRunResult = Joi.object({
state: Joi.object().optional(),
}).optional();

/**
* The type signature of the function that performs a task.
*/
export type RunFunction = (context: RunContext) => PromiseLike<RunResult | undefined>;
export type RunFunction = () => PromiseLike<RunResult | undefined>;

export type CancelFunction = () => PromiseLike<RunResult | undefined>;

export type TaskRunCreatorFunction = (
context: RunContext
) => { run: RunFunction; cancel?: CancelFunction };

/**
* Defines a task which can be scheduled and run by the Kibana
Expand Down Expand Up @@ -123,14 +126,7 @@ export interface TaskDefinition {
*/
numWorkers?: number;

/**
* A function which, does the work this task is built to do. Note,
* this is a *function* and is not guaranteed to be called with
* the *this* context of the task.
*
* @memberof TaskDefinition
*/
run: RunFunction;
createTaskRunner: TaskRunCreatorFunction;
}

/**
Expand All @@ -146,7 +142,7 @@ export const validateTaskDefinition = Joi.object({
description: Joi.string().optional(),
timeOut: Joi.string().default('5m'),
numWorkers: Joi.number().default(1),
run: Joi.func().required(),
createTaskRunner: Joi.func().required(),
}).default();

/**
Expand Down
9 changes: 5 additions & 4 deletions src/server/task_manager/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { TaskInstance } from './task';
import { TaskPoller } from './task_poller';
import { FetchOpts, FetchResult, TaskStore } from './task_store';
import { FetchOpts, FetchResult, RawTaskDoc, TaskStore } from './task_store';

interface Opts {
poller: TaskPoller;
Expand All @@ -35,16 +35,17 @@ export class TaskManager {
this.store = opts.store;
}

public async schedule(task: TaskInstance) {
await this.store.schedule(task);
public async schedule(task: TaskInstance): Promise<RawTaskDoc> {
const result = await this.store.schedule(task);
this.poller.attemptWork();
return result;
}

public fetch(opts: FetchOpts = {}): Promise<FetchResult> {
return this.store.fetch(opts);
}

public remove(id: string): Promise<void> {
public remove(id: string): Promise<RawTaskDoc> {
return this.store.remove(id);
}
}
57 changes: 8 additions & 49 deletions src/server/task_manager/task_manager_mixin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,72 +19,31 @@

import Joi from 'joi';
import { TaskManagerClientWrapper } from './client_wrapper';
import { fillPool } from './fill_pool';
import { getDefaultClient } from './default_client';
import { TaskManagerLogger } from './logger';
import {
ConcreteTaskInstance,
SanitizedTaskDefinition,
TaskDefinition,
TaskDictionary,
validateTaskDefinition,
} from './task';
import { TaskManager } from './task_manager';
import { TaskPoller } from './task_poller';
import { TaskPool } from './task_pool';
import { TaskManagerRunner } from './task_runner';
import { TaskStore } from './task_store';

export async function taskManagerMixin(kbnServer: any, server: any, config: any) {
const logger = new TaskManagerLogger((...args) => server.log(...args));
const logger = new TaskManagerLogger((...args: any[]) => server.log(...args));
const maxWorkers = config.get('taskManager.max_workers');
const definitions = extractTaskDefinitions(maxWorkers, kbnServer.uiExports.taskDefinitions);

server.decorate('server', 'taskManager', new TaskManagerClientWrapper());

kbnServer.afterPluginsInit(async () => {
const callCluster = server.plugins.elasticsearch.getCluster('admin').callWithInternalUser;
const store = new TaskStore({
index: config.get('taskManager.index'),
callCluster,
maxAttempts: config.get('taskManager.max_attempts'),
supportedTypes: Object.keys(definitions),
});

logger.debug('Initializing the task manager index');
await store.init();

const pool = new TaskPool({
logger,
maxWorkers,
});

const contextProvider = async (taskInstance: ConcreteTaskInstance) => ({
callCluster,
const client = await getDefaultClient(
kbnServer,
taskInstance,
});

const poller = new TaskPoller({
server,
config,
logger,
pollInterval: config.get('taskManager.poll_interval'),
work: () =>
fillPool(
pool.run,
store.fetchAvailableTasks,
(instance: ConcreteTaskInstance) =>
new TaskManagerRunner({
logger,
instance,
store,
contextProvider,
definition: definitions[instance.taskType],
})
),
});

poller.start();

const client = new TaskManager({ store, poller });
maxWorkers,
definitions
);
server.taskManager.setClient(client);
});
}
Expand Down
5 changes: 3 additions & 2 deletions src/server/task_manager/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ export class TaskManagerRunner implements TaskRunner {
try {
this.logger.debug(`Running task ${this}`);
const context = await this.contextProvider(this.instance);
this.promise = this.definition.run(context);
const taskRunner = this.definition.createTaskRunner(context);
this.promise = taskRunner.run();
return this.processResult(this.validateResult(await this.promise));
} catch (error) {
this.logger.warning(`Task ${this} failed ${error.stack}`);
Expand Down Expand Up @@ -190,7 +191,7 @@ export class TaskManagerRunner implements TaskRunner {
* @memberof TaskManagerRunner
*/
public async cancel() {
const promise: any = this.promise;
const promise: any = this.promise; // needs to be the stored taskrunner from `const taskRunner = this.definition.createTaskRunner(context)`

if (promise && promise.cancel) {
this.promise = undefined;
Expand Down
6 changes: 3 additions & 3 deletions src/server/task_manager/task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export interface FetchResult {
}

// Internal, the raw document, as stored in the Kibana index.
interface RawTaskDoc {
export interface RawTaskDoc {
_id: string;
_index: string;
_type: string;
Expand Down Expand Up @@ -142,7 +142,7 @@ export class TaskStore {
*
* @param task - The task being scheduled.
*/
public schedule(task: TaskInstance) {
public schedule(task: TaskInstance): Promise<RawTaskDoc> {
return this.callCluster('index', {
index: this.index,
type: DOC_TYPE,
Expand Down Expand Up @@ -230,7 +230,7 @@ export class TaskStore {
* @param {string} id
* @returns {Promise<void>}
*/
public async remove(id: string): Promise<void> {
public async remove(id: string): Promise<RawTaskDoc> {
return this.callCluster('delete', {
id,
index: this.index,
Expand Down

0 comments on commit 8cf48ff

Please sign in to comment.