Skip to content

Commit

Permalink
refactor helper function plain bonkers signature
Browse files Browse the repository at this point in the history
  • Loading branch information
tsullivan committed Nov 27, 2019
1 parent 52b9741 commit 00870d6
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 22 deletions.
40 changes: 21 additions & 19 deletions x-pack/legacy/plugins/reporting/server/lib/create_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ import { events as esqueueEvents } from './esqueue';
import { LevelLogger } from './level_logger';

export function createWorkerFactory<JobParamsType>(server: ServerFacade) {
type JobSourceType = JobSource<JobParamsType>;
type ExecuteFnType = ESQueueWorkerExecuteFn<JobParamsType> | ImmediateExecuteFn<JobParamsType>;

type JobDocPayloadType = JobDocPayload<JobParamsType>;
const config = server.config();
const logger = LevelLogger.createForServer(server, [PLUGIN_ID, 'queue-worker']);
const queueConfig: QueueConfig = config.get('xpack.reporting.queue');
Expand All @@ -33,40 +31,44 @@ export function createWorkerFactory<JobParamsType>(server: ServerFacade) {
const { exportTypesRegistry } = server.plugins.reporting!;

// Once more document types are added, this will need to be passed in
return function createWorker(queue: ESQueueInstance) {
return function createWorker(queue: ESQueueInstance<JobParamsType, JobDocPayloadType>) {
// export type / execute job map
const jobExecutors: Map<string, ExecuteFnType> = new Map();
const jobExecutors: Map<string, any> = new Map();

for (const exportType of exportTypesRegistry.getAll() as Array<
ExportTypeDefinition<JobParamsType, any, any, ExecuteFnType>
ExportTypeDefinition<JobParamsType, any, any, any>
>) {
const executeJobFactory = exportType.executeJobFactory(server);
jobExecutors.set(exportType.jobType, executeJobFactory);
}

const workerFn = (
{ _id: jobId, _source: { jobtype: jobType } }: JobSourceType,
arg1: JobDocPayload<JobParamsType> | JobParamsType,
arg2: CancellationToken | RequestFacade | undefined
) => {
const workerFn = (jobSource: JobSource<JobParamsType>, ...workerRestArgs: any[]) => {
const {
_id: jobId,
_source: { jobtype: jobType },
} = jobSource;

const jobTypeExecutor = jobExecutors.get(jobType);
// pass the work to the jobExecutor
if (!jobExecutors.get(jobType)) {
if (!jobTypeExecutor) {
throw new Error(`Unable to find a job executor for the claimed job: [${jobId}]`);
}
// job executor function signature is different depending on whether it
// is ESQueueWorkerExecuteFn or ImmediateExecuteFn
const jobExecutorWorker = jobExecutors.get(jobType) as ESQueueWorkerExecuteFn<JobParamsType>;
const jobExecutorImmediate = jobExecutors.get(jobType) as ImmediateExecuteFn<JobParamsType>;

if (jobId) {
return jobExecutorWorker(jobId, arg1 as JobParamsType, arg2 as CancellationToken);
const jobExecutorWorker = jobTypeExecutor as ESQueueWorkerExecuteFn<JobDocPayloadType>;
return jobExecutorWorker(
jobId,
...(workerRestArgs as [JobDocPayloadType, CancellationToken])
);
} else {
const jobExecutorImmediate = jobExecutors.get(jobType) as ImmediateExecuteFn<JobParamsType>;
return jobExecutorImmediate(
null,
arg1 as JobDocPayload<JobParamsType>,
arg2 as RequestFacade
...(workerRestArgs as [JobDocPayload<JobParamsType>, RequestFacade])
);
}
};

const workerOptions = {
kibanaName,
kibanaId,
Expand Down
10 changes: 7 additions & 3 deletions x-pack/legacy/plugins/reporting/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,14 @@ export interface ESQueueWorkerOptions {
intervalErrorMultiplier: number;
}

export interface ESQueueInstance {
// workerFn is a generic for ImmediateExecuteFn<JobParamsType> | ESQueueWorkerExecuteFn<JobDocPayloadType>,
export interface ESQueueInstance<JobParamsType, JobDocPayloadType> {
registerWorker: (
jobtype: string,
workerFn: any,
pluginId: string,
workerFn: (
jobSource: JobSource<JobParamsType>,
...workerRestArgs: any[]
) => void | Promise<JobDocOutputExecuted>,
workerOptions: ESQueueWorkerOptions
) => ESQueueWorker;
}
Expand Down

0 comments on commit 00870d6

Please sign in to comment.