-
Notifications
You must be signed in to change notification settings - Fork 8.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
37 changed files
with
2,207 additions
and
0 deletions.
There are no files selected for viewing
14 changes: 14 additions & 0 deletions
14
x-pack/plugins/rule_registry/server/event_log/elasticsearch/index.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
export * from './index_bootstrapper'; | ||
export * from './index_management_gateway'; | ||
export * from './index_reader'; | ||
export * from './index_writer'; | ||
export * from './resources/ilm_policy'; | ||
export * from './resources/index_mappings'; | ||
export * from './resources/index_names'; |
188 changes: 188 additions & 0 deletions
188
x-pack/plugins/rule_registry/server/event_log/elasticsearch/index_bootstrapper.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,188 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
import type { PublicMethodsOf } from '@kbn/utility-types'; | ||
import { Logger } from 'src/core/server'; | ||
|
||
import { IndexNames } from './resources/index_names'; | ||
import { IndexMappings } from './resources/index_mappings'; | ||
import { createIndexTemplate } from './resources/index_template'; | ||
import { ILMPolicy, defaultIlmPolicy } from './resources/ilm_policy'; | ||
import { IIndexManagementGateway } from './index_management_gateway'; | ||
|
||
interface ConstructorParams { | ||
gateway: IIndexManagementGateway; | ||
logger: Logger; | ||
} | ||
|
||
export interface IndexSpecification { | ||
indexNames: IndexNames; | ||
indexMappings: IndexMappings; | ||
ilmPolicy?: ILMPolicy; | ||
} | ||
|
||
export type IIndexBootstrapper = PublicMethodsOf<IndexBootstrapper>; | ||
|
||
export class IndexBootstrapper { | ||
private readonly gateway: IIndexManagementGateway; | ||
private readonly logger: Logger; | ||
|
||
constructor(params: ConstructorParams) { | ||
this.gateway = params.gateway; | ||
this.logger = params.logger.get('IndexBootstrapper'); | ||
} | ||
|
||
public async run(indexSpec: IndexSpecification): Promise<boolean> { | ||
this.logger.debug('bootstrapping elasticsearch resources starting'); | ||
|
||
try { | ||
// TODO: handle race conditions and errors (perhaps within each step separately) | ||
const { indexNames, indexMappings, ilmPolicy } = indexSpec; | ||
await this.createIlmPolicyIfNotExists(indexNames, ilmPolicy); | ||
await this.createIndexTemplateIfNotExists(indexNames, indexMappings); | ||
await this.createInitialIndexIfNotExists(indexNames); | ||
} catch (err) { | ||
this.logger.error(`error bootstrapping elasticsearch resources: ${err.message}`); | ||
return false; | ||
} | ||
|
||
this.logger.debug('bootstrapping elasticsearch resources complete'); | ||
return true; | ||
} | ||
|
||
private async createIlmPolicyIfNotExists(names: IndexNames, policy?: ILMPolicy): Promise<void> { | ||
const { logIlmPolicyName } = names; | ||
|
||
const exists = await this.gateway.doesIlmPolicyExist(logIlmPolicyName); | ||
if (!exists) { | ||
const ilmPolicy = policy ?? defaultIlmPolicy; | ||
await this.gateway.createIlmPolicy(logIlmPolicyName, ilmPolicy); | ||
} | ||
|
||
//--------------- | ||
|
||
const ilmPolicyExists = await this.esAdapter.doesIlmPolicyExist(policyName); | ||
|
||
if (!ilmPolicyExists) { | ||
await this.esAdapter.createIlmPolicy( | ||
policyName, | ||
(this.options.ilmPolicy as unknown) as Record<string, unknown> | ||
); | ||
} | ||
} | ||
|
||
private async createIndexTemplateIfNotExists( | ||
names: IndexNames, | ||
mappings: IndexMappings | ||
): Promise<void> { | ||
const { logIndexTemplateName } = names; | ||
|
||
const exists = await this.gateway.doesIndexTemplateExist(logIndexTemplateName); | ||
if (!exists) { | ||
const template = createIndexTemplate(names, mappings); | ||
await this.gateway.createIndexTemplate(logIndexTemplateName, template); | ||
} | ||
|
||
//--------------- | ||
|
||
const templateExists = await this.esAdapter.doesIndexTemplateExist(indexAliasName); | ||
|
||
const mappings = mappingFromFieldMap(this.options.fieldMap); | ||
|
||
const esClient = (await this.options.coreSetup.getStartServices())[0].elasticsearch.client | ||
.asInternalUser; | ||
|
||
if (!templateExists) { | ||
await this.esAdapter.createIndexTemplate(indexAliasName, { | ||
index_patterns: [`${indexAliasName}-*`], | ||
settings: { | ||
number_of_shards: 1, | ||
auto_expand_replicas: '0-1', | ||
'index.lifecycle.name': policyName, | ||
'index.lifecycle.rollover_alias': indexAliasName, | ||
'sort.field': '@timestamp', | ||
'sort.order': 'desc', | ||
}, | ||
mappings, | ||
}); | ||
} else { | ||
await esClient.indices.putTemplate({ | ||
name: indexAliasName, | ||
body: { | ||
index_patterns: [`${indexAliasName}-*`], | ||
mappings, | ||
}, | ||
create: false, | ||
}); | ||
} | ||
} | ||
|
||
private async createInitialIndexIfNotExists(names: IndexNames): Promise<void> { | ||
const { logIndexAliasName, logIndexInitialName } = names; | ||
|
||
const exists = await this.gateway.doesAliasExist(logIndexAliasName); | ||
if (!exists) { | ||
await this.gateway.createIndex(logIndexInitialName, { | ||
aliases: { | ||
[logIndexAliasName]: { | ||
is_write_index: true, | ||
}, | ||
}, | ||
}); | ||
} | ||
|
||
//--------------- | ||
|
||
const aliasExists = await this.esAdapter.doesAliasExist(indexAliasName); | ||
|
||
if (!aliasExists) { | ||
await this.esAdapter.createIndex(`${indexAliasName}-000001`, { | ||
aliases: { | ||
[indexAliasName]: { | ||
is_write_index: true, | ||
}, | ||
}, | ||
}); | ||
} else { | ||
const { body: aliases } = (await esClient.indices.getAlias({ | ||
index: indexAliasName, | ||
})) as { body: Record<string, { aliases: Record<string, { is_write_index: boolean }> }> }; | ||
|
||
const writeIndex = Object.entries(aliases).find( | ||
([indexName, alias]) => alias.aliases[indexAliasName]?.is_write_index === true | ||
)![0]; | ||
|
||
const { body: fieldsInWriteIndex } = await esClient.fieldCaps({ | ||
index: writeIndex, | ||
fields: '*', | ||
}); | ||
|
||
const fieldsNotOrDifferentInIndex = Object.entries(this.options.fieldMap).filter( | ||
([fieldName, descriptor]) => { | ||
return ( | ||
!fieldsInWriteIndex.fields[fieldName] || | ||
!fieldsInWriteIndex.fields[fieldName][descriptor.type] | ||
); | ||
} | ||
); | ||
|
||
if (fieldsNotOrDifferentInIndex.length > 0) { | ||
this.options.logger.debug( | ||
`Some fields were not found in write index mapping: ${Object.keys( | ||
Object.fromEntries(fieldsNotOrDifferentInIndex) | ||
).join(',')}` | ||
); | ||
this.options.logger.info(`Updating index mapping due to new fields`); | ||
|
||
await esClient.indices.putMapping({ | ||
index: indexAliasName, | ||
body: mappings, | ||
}); | ||
} | ||
} | ||
} | ||
} |
108 changes: 108 additions & 0 deletions
108
x-pack/plugins/rule_registry/server/event_log/elasticsearch/index_management_gateway.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
import type { PublicMethodsOf } from '@kbn/utility-types'; | ||
import { ElasticsearchClient, Logger } from 'src/core/server'; | ||
import { ILMPolicy } from './resources/ilm_policy'; | ||
|
||
interface ConstructorParams { | ||
elasticsearch: Promise<ElasticsearchClient>; | ||
logger: Logger; | ||
} | ||
|
||
export type IIndexManagementGateway = PublicMethodsOf<IndexManagementGateway>; | ||
|
||
export class IndexManagementGateway { | ||
private readonly elasticsearch: Promise<ElasticsearchClient>; | ||
private readonly logger: Logger; // TODO: use or remove | ||
|
||
constructor(params: ConstructorParams) { | ||
this.elasticsearch = params.elasticsearch; | ||
this.logger = params.logger.get('IndexManagementGateway'); | ||
} | ||
|
||
public async doesIlmPolicyExist(policyName: string): Promise<boolean> { | ||
const request = { | ||
method: 'GET', | ||
path: `/_ilm/policy/${policyName}`, | ||
}; | ||
try { | ||
const esClient = await this.elasticsearch; | ||
await esClient.transport.request(request); | ||
} catch (err) { | ||
if (err.statusCode === 404) return false; | ||
throw new Error(`error checking existance of ilm policy: ${err.message}`); | ||
} | ||
return true; | ||
} | ||
|
||
public async createIlmPolicy(policyName: string, policy: ILMPolicy): Promise<void> { | ||
const request = { | ||
method: 'PUT', | ||
path: `/_ilm/policy/${policyName}`, | ||
body: policy, | ||
}; | ||
try { | ||
const esClient = await this.elasticsearch; | ||
await esClient.transport.request(request); | ||
} catch (err) { | ||
throw new Error(`error creating ilm policy: ${err.message}`); | ||
} | ||
} | ||
|
||
public async doesIndexTemplateExist(name: string): Promise<boolean> { | ||
try { | ||
const esClient = await this.elasticsearch; | ||
const { body } = await esClient.indices.existsTemplate({ name }); | ||
return body as boolean; | ||
} catch (err) { | ||
throw new Error(`error checking existance of index template: ${err.message}`); | ||
} | ||
} | ||
|
||
public async createIndexTemplate(name: string, template: Record<string, unknown>): Promise<void> { | ||
try { | ||
const esClient = await this.elasticsearch; | ||
await esClient.indices.putTemplate({ name, body: template, create: true }); | ||
} catch (err) { | ||
// The error message doesn't have a type attribute we can look to guarantee it's due | ||
// to the template already existing (only long message) so we'll check ourselves to see | ||
// if the template now exists. This scenario would happen if you startup multiple Kibana | ||
// instances at the same time. | ||
const existsNow = await this.doesIndexTemplateExist(name); | ||
if (!existsNow) { | ||
const error = new Error(`error creating index template: ${err.message}`); | ||
Object.assign(error, { wrapped: err }); | ||
throw error; | ||
} | ||
} | ||
} | ||
|
||
public async doesAliasExist(name: string): Promise<boolean> { | ||
try { | ||
const esClient = await this.elasticsearch; | ||
const { body } = await esClient.indices.existsAlias({ name }); | ||
return body as boolean; | ||
} catch (err) { | ||
throw new Error(`error checking existance of initial index: ${err.message}`); | ||
} | ||
} | ||
|
||
public async createIndex(name: string, body: Record<string, unknown> = {}): Promise<void> { | ||
try { | ||
const esClient = await this.elasticsearch; | ||
await esClient.indices.create({ | ||
index: name, | ||
body, | ||
}); | ||
} catch (err) { | ||
if (err.body?.error?.type !== 'resource_already_exists_exception') { | ||
throw new Error(`error creating initial index: ${err.message}`); | ||
} | ||
} | ||
} | ||
} |
40 changes: 40 additions & 0 deletions
40
x-pack/plugins/rule_registry/server/event_log/elasticsearch/index_reader.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
import type { PublicMethodsOf } from '@kbn/utility-types'; | ||
import { estypes } from '@elastic/elasticsearch'; | ||
import { Logger, ElasticsearchClient } from 'src/core/server'; | ||
|
||
interface ConstructorParams { | ||
indexName: string; | ||
elasticsearch: Promise<ElasticsearchClient>; | ||
logger: Logger; | ||
} | ||
|
||
export type IIndexReader = PublicMethodsOf<IndexReader>; | ||
|
||
export class IndexReader { | ||
private readonly indexName: string; | ||
private readonly elasticsearch: Promise<ElasticsearchClient>; | ||
private readonly logger: Logger; // TODO: use or remove | ||
|
||
constructor(params: ConstructorParams) { | ||
this.indexName = params.indexName; | ||
this.elasticsearch = params.elasticsearch; | ||
this.logger = params.logger.get('IndexReader'); | ||
} | ||
|
||
public async search<TDocument>(request: estypes.SearchRequest) { | ||
const esClient = await this.elasticsearch; | ||
const response = await esClient.search<TDocument>({ | ||
...request, | ||
index: this.indexName, | ||
}); | ||
|
||
return response; | ||
} | ||
} |
Oops, something went wrong.