Skip to content

Commit

Permalink
[KP] use new ES client in SO service (#72289)
Browse files Browse the repository at this point in the history
* adapt retryCallCluster for new ES client

* review comments

* retry on 408 ResponseError

* remove legacy retry functions

* use Migrator Es client in SO migration

* update migration tests

* improve ES typings and mocks

* migrate decorate ES errors

* add repository es client

* use new es client in so repository

* update repository tests

* fix migrator integration tests

* declare _seq_no & _primary_term on get response. _source expect to be a string

* make _sourceIncludes and refresh compatible with the client

* add test for repository_es_client

* move ApiResponse to es client mocks

* TEMP: handle wait_for as true for deleteByNamespace

* add tests for migration_es_client

* TEMP: skip test for deleteByNamespace refresh

* pass ignore as transport option in mget

* log both es client and response errors

* fix update method test failures

* update deleteByNamespace refresh settings

es doesn't support 'refresh: wait_for' for `updateByQuery` endpoint

* update repository tests. we do not allow customising wait_for

* do not delegate retry logic to es client

* fix type errors after master merged

* fix repository tests

* fix security solutions code

SO doesn't throw Error with status code anymore. Always use SO error helpers

* switch error conditions to use SO error helpers

* cleanup

* address comments about mocks

* use isResponseError helper

* address comments

* fix type errors

Co-authored-by: pgayvallet <[email protected]>
  • Loading branch information
mshustov and pgayvallet authored Jul 25, 2020
1 parent 7f36bd7 commit 2a82ff9
Show file tree
Hide file tree
Showing 47 changed files with 2,383 additions and 1,798 deletions.
30 changes: 22 additions & 8 deletions src/core/server/elasticsearch/client/configure_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,40 @@ describe('configureClient', () => {
});

describe('Client logging', () => {
it('logs error when the client emits an error', () => {
it('logs error when the client emits an @elastic/elasticsearch error', () => {
const client = configureClient(config, { logger, scoped: false });

const response = createApiResponse({ body: {} });
client.emit('response', new errors.TimeoutError('message', response), response);

expect(loggingSystemMock.collect(logger).error).toMatchInlineSnapshot(`
Array [
Array [
"[TimeoutError]: message",
],
]
`);
});

it('logs error when the client emits an ResponseError returned by elasticsearch', () => {
const client = configureClient(config, { logger, scoped: false });

const response = createApiResponse({
statusCode: 400,
headers: {},
body: {
error: {
type: 'error message',
type: 'illegal_argument_exception',
reason: 'request [/_path] contains unrecognized parameter: [name]',
},
},
});
client.emit('response', new errors.ResponseError(response), null);
client.emit('response', new Error('some error'), null);
client.emit('response', new errors.ResponseError(response), response);

expect(loggingSystemMock.collect(logger).error).toMatchInlineSnapshot(`
Array [
Array [
"ResponseError: error message",
],
Array [
"Error: some error",
"[illegal_argument_exception]: request [/_path] contains unrecognized parameter: [name]",
],
]
`);
Expand Down
13 changes: 10 additions & 3 deletions src/core/server/elasticsearch/client/configure_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { stringify } from 'querystring';
import { Client } from '@elastic/elasticsearch';
import { Logger } from '../../logging';
import { parseClientOptions, ElasticsearchClientConfig } from './client_config';
import { isResponseError } from './errors';

export const configureClient = (
config: ElasticsearchClientConfig,
Expand All @@ -35,9 +36,15 @@ export const configureClient = (
};

const addLogging = (client: Client, logger: Logger, logQueries: boolean) => {
client.on('response', (err, event) => {
if (err) {
logger.error(`${err.name}: ${err.message}`);
client.on('response', (error, event) => {
if (error) {
const errorMessage =
// error details for response errors provided by elasticsearch
isResponseError(error)
? `[${event.body.error.type}]: ${event.body.error.reason}`
: `[${error.name}]: ${error.message}`;

logger.error(errorMessage);
}
if (event && logQueries) {
const params = event.meta.request.params;
Expand Down
2 changes: 1 addition & 1 deletion src/core/server/elasticsearch/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

export { ElasticsearchClient } from './types';
export * from './types';
export { IScopedClusterClient, ScopedClusterClient } from './scoped_cluster_client';
export { ElasticsearchClientConfig } from './client_config';
export { IClusterClient, ICustomClusterClient, ClusterClient } from './cluster_client';
Expand Down
34 changes: 22 additions & 12 deletions src/core/server/elasticsearch/client/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const createInternalClientMock = (): DeeplyMockedKeys<Client> => {
.forEach((key) => {
const propType = typeof obj[key];
if (propType === 'function') {
obj[key] = jest.fn();
obj[key] = jest.fn(() => createSuccessTransportRequestPromise({}));
} else if (propType === 'object' && obj[key] != null) {
mockify(obj[key]);
}
Expand All @@ -70,6 +70,7 @@ const createInternalClientMock = (): DeeplyMockedKeys<Client> => {
return (mock as unknown) as DeeplyMockedKeys<Client>;
};

// TODO fix naming ElasticsearchClientMock
export type ElasticSearchClientMock = DeeplyMockedKeys<ElasticsearchClient>;

const createClientMock = (): ElasticSearchClientMock =>
Expand Down Expand Up @@ -124,32 +125,41 @@ export type MockedTransportRequestPromise<T> = TransportRequestPromise<T> & {
abort: jest.MockedFunction<() => undefined>;
};

const createMockedClientResponse = <T>(body: T): MockedTransportRequestPromise<ApiResponse<T>> => {
const response: ApiResponse<T> = {
body,
statusCode: 200,
warnings: [],
headers: {},
meta: {} as any,
};
const createSuccessTransportRequestPromise = <T>(
body: T,
{ statusCode = 200 }: { statusCode?: number } = {}
): MockedTransportRequestPromise<ApiResponse<T>> => {
const response = createApiResponse({ body, statusCode });
const promise = Promise.resolve(response);
(promise as MockedTransportRequestPromise<ApiResponse<T>>).abort = jest.fn();

return promise as MockedTransportRequestPromise<ApiResponse<T>>;
};

const createMockedClientError = (err: any): MockedTransportRequestPromise<never> => {
const createErrorTransportRequestPromise = (err: any): MockedTransportRequestPromise<never> => {
const promise = Promise.reject(err);
(promise as MockedTransportRequestPromise<never>).abort = jest.fn();
return promise as MockedTransportRequestPromise<never>;
};

function createApiResponse(opts: Partial<ApiResponse> = {}): ApiResponse {
return {
body: {},
statusCode: 200,
headers: {},
warnings: [],
meta: {} as any,
...opts,
};
}

export const elasticsearchClientMock = {
createClusterClient: createClusterClientMock,
createCustomClusterClient: createCustomClusterClientMock,
createScopedClusterClient: createScopedClusterClientMock,
createElasticSearchClient: createClientMock,
createInternalClient: createInternalClientMock,
createClientResponse: createMockedClientResponse,
createClientError: createMockedClientError,
createSuccessTransportRequestPromise,
createErrorTransportRequestPromise,
createApiResponse,
};
35 changes: 26 additions & 9 deletions src/core/server/elasticsearch/client/retry_call_cluster.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import { loggingSystemMock } from '../../logging/logging_system.mock';
import { retryCallCluster, migrationRetryCallCluster } from './retry_call_cluster';

const dummyBody = { foo: 'bar' };
const createErrorReturn = (err: any) => elasticsearchClientMock.createClientError(err);
const createErrorReturn = (err: any) =>
elasticsearchClientMock.createErrorTransportRequestPromise(err);

describe('retryCallCluster', () => {
let client: ReturnType<typeof elasticsearchClientMock.createElasticSearchClient>;
Expand All @@ -33,7 +34,9 @@ describe('retryCallCluster', () => {
});

it('returns response from ES API call in case of success', async () => {
const successReturn = elasticsearchClientMock.createClientResponse({ ...dummyBody });
const successReturn = elasticsearchClientMock.createSuccessTransportRequestPromise({
...dummyBody,
});

client.asyncSearch.get.mockReturnValue(successReturn);

Expand All @@ -42,7 +45,9 @@ describe('retryCallCluster', () => {
});

it('retries ES API calls that rejects with `NoLivingConnectionsError`', async () => {
const successReturn = elasticsearchClientMock.createClientResponse({ ...dummyBody });
const successReturn = elasticsearchClientMock.createSuccessTransportRequestPromise({
...dummyBody,
});

client.asyncSearch.get
.mockImplementationOnce(() =>
Expand All @@ -57,7 +62,9 @@ describe('retryCallCluster', () => {
it('rejects when ES API calls reject with other errors', async () => {
client.ping
.mockImplementationOnce(() => createErrorReturn(new Error('unknown error')))
.mockImplementationOnce(() => elasticsearchClientMock.createClientResponse({ ...dummyBody }));
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ ...dummyBody })
);

await expect(retryCallCluster(() => client.ping())).rejects.toMatchInlineSnapshot(
`[Error: unknown error]`
Expand All @@ -73,7 +80,9 @@ describe('retryCallCluster', () => {
createErrorReturn(new errors.NoLivingConnectionsError('no living connections', {} as any))
)
.mockImplementationOnce(() => createErrorReturn(new Error('unknown error')))
.mockImplementationOnce(() => elasticsearchClientMock.createClientResponse({ ...dummyBody }));
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ ...dummyBody })
);

await expect(retryCallCluster(() => client.ping())).rejects.toMatchInlineSnapshot(
`[Error: unknown error]`
Expand All @@ -94,7 +103,9 @@ describe('migrationRetryCallCluster', () => {
client.ping
.mockImplementationOnce(() => createErrorReturn(error))
.mockImplementationOnce(() => createErrorReturn(error))
.mockImplementationOnce(() => elasticsearchClientMock.createClientResponse({ ...dummyBody }));
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ ...dummyBody })
);
};

it('retries ES API calls that rejects with `NoLivingConnectionsError`', async () => {
Expand Down Expand Up @@ -225,7 +236,9 @@ describe('migrationRetryCallCluster', () => {
} as any)
)
)
.mockImplementationOnce(() => elasticsearchClientMock.createClientResponse({ ...dummyBody }));
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ ...dummyBody })
);

await migrationRetryCallCluster(() => client.ping(), logger, 1);

Expand Down Expand Up @@ -258,7 +271,9 @@ describe('migrationRetryCallCluster', () => {
} as any)
)
)
.mockImplementationOnce(() => elasticsearchClientMock.createClientResponse({ ...dummyBody }));
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ ...dummyBody })
);

await expect(
migrationRetryCallCluster(() => client.ping(), logger, 1)
Expand All @@ -274,7 +289,9 @@ describe('migrationRetryCallCluster', () => {
createErrorReturn(new errors.TimeoutError('timeout error', {} as any))
)
.mockImplementationOnce(() => createErrorReturn(new Error('unknown error')))
.mockImplementationOnce(() => elasticsearchClientMock.createClientResponse({ ...dummyBody }));
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ ...dummyBody })
);

await expect(
migrationRetryCallCluster(() => client.ping(), logger, 1)
Expand Down
2 changes: 1 addition & 1 deletion src/core/server/elasticsearch/client/retry_call_cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const retryResponseStatuses = [
403, // AuthenticationException
408, // RequestTimeout
410, // Gone
];
] as const;

/**
* Retries the provided Elasticsearch API call when a `NoLivingConnectionsError` error is
Expand Down
80 changes: 80 additions & 0 deletions src/core/server/elasticsearch/client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,83 @@ export type ElasticsearchClient = Omit<
): TransportRequestPromise<ApiResponse>;
};
};

interface ShardsResponse {
total: number;
successful: number;
failed: number;
skipped: number;
}

interface Explanation {
value: number;
description: string;
details: Explanation[];
}

interface ShardsInfo {
total: number;
successful: number;
skipped: number;
failed: number;
}

export interface CountResponse {
_shards: ShardsInfo;
count: number;
}

/**
* Maintained until elasticsearch provides response typings out of the box
* https:/elastic/elasticsearch-js/pull/970
*/
export interface SearchResponse<T = unknown> {
took: number;
timed_out: boolean;
_scroll_id?: string;
_shards: ShardsResponse;
hits: {
total: number;
max_score: number;
hits: Array<{
_index: string;
_type: string;
_id: string;
_score: number;
_source: T;
_version?: number;
_explanation?: Explanation;
fields?: any;
highlight?: any;
inner_hits?: any;
matched_queries?: string[];
sort?: string[];
}>;
};
aggregations?: any;
}

export interface GetResponse<T> {
_index: string;
_type: string;
_id: string;
_version: number;
_routing?: string;
found: boolean;
_source: T;
_seq_no: number;
_primary_term: number;
}

export interface DeleteDocumentResponse {
_shards: ShardsResponse;
found: boolean;
_index: string;
_type: string;
_id: string;
_version: number;
result: string;
error?: {
type: string;
};
}
6 changes: 3 additions & 3 deletions src/core/server/elasticsearch/elasticsearch_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ describe('#setup', () => {
it('esNodeVersionCompatibility$ only starts polling when subscribed to', async (done) => {
const mockedClient = mockClusterClientInstance.asInternalUser;
mockedClient.nodes.info.mockImplementation(() =>
elasticsearchClientMock.createClientError(new Error())
elasticsearchClientMock.createErrorTransportRequestPromise(new Error())
);

const setupContract = await elasticsearchService.setup(setupDeps);
Expand All @@ -243,7 +243,7 @@ describe('#setup', () => {
it('esNodeVersionCompatibility$ stops polling when unsubscribed from', async (done) => {
const mockedClient = mockClusterClientInstance.asInternalUser;
mockedClient.nodes.info.mockImplementation(() =>
elasticsearchClientMock.createClientError(new Error())
elasticsearchClientMock.createErrorTransportRequestPromise(new Error())
);

const setupContract = await elasticsearchService.setup(setupDeps);
Expand Down Expand Up @@ -359,7 +359,7 @@ describe('#stop', () => {

const mockedClient = mockClusterClientInstance.asInternalUser;
mockedClient.nodes.info.mockImplementation(() =>
elasticsearchClientMock.createClientError(new Error())
elasticsearchClientMock.createErrorTransportRequestPromise(new Error())
);

const setupContract = await elasticsearchService.setup(setupDeps);
Expand Down
4 changes: 4 additions & 0 deletions src/core/server/elasticsearch/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,8 @@ export {
ElasticsearchClientConfig,
ElasticsearchClient,
IScopedClusterClient,
SearchResponse,
GetResponse,
DeleteDocumentResponse,
CountResponse,
} from './client';
1 change: 0 additions & 1 deletion src/core/server/elasticsearch/legacy/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,5 @@ export {
} from './cluster_client';
export { ILegacyScopedClusterClient, LegacyScopedClusterClient } from './scoped_cluster_client';
export { LegacyElasticsearchClientConfig } from './elasticsearch_client_config';
export { retryCallCluster, migrationsRetryCallCluster } from './retry_call_cluster';
export { LegacyElasticsearchError, LegacyElasticsearchErrorHelpers } from './errors';
export * from './api_types';
Loading

0 comments on commit 2a82ff9

Please sign in to comment.