diff --git a/x-pack/plugins/data_enhanced/public/search/async_search_strategy.test.ts b/x-pack/plugins/data_enhanced/public/search/async_search_strategy.test.ts index 95f2c9e4770642..a7d6aa894d91db 100644 --- a/x-pack/plugins/data_enhanced/public/search/async_search_strategy.test.ts +++ b/x-pack/plugins/data_enhanced/public/search/async_search_strategy.test.ts @@ -45,9 +45,11 @@ describe('Async search strategy', () => { it('stops polling when the response is complete', async () => { mockSearch - .mockReturnValueOnce(of({ id: 1, total: 2, loaded: 1 })) - .mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2 })) - .mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2 })); + .mockReturnValueOnce(of({ id: 1, total: 2, loaded: 1, is_running: true, is_partial: true })) + .mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2, is_running: false, is_partial: false })) + .mockReturnValueOnce( + of({ id: 1, total: 2, loaded: 2, is_running: false, is_partial: false }) + ); const asyncSearch = asyncSearchStrategyProvider({ core: mockCoreStart, @@ -67,10 +69,39 @@ describe('Async search strategy', () => { expect(mockSearch).toBeCalledTimes(2); }); + it('stops polling when the response is an error', async () => { + mockSearch + .mockReturnValueOnce(of({ id: 1, total: 2, loaded: 1, is_running: true, is_partial: true })) + .mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2, is_running: false, is_partial: true })) + .mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2, is_running: false, is_partial: true })); + + const asyncSearch = asyncSearchStrategyProvider({ + core: mockCoreStart, + getSearchStrategy: jest.fn().mockImplementation(() => { + return () => { + return { + search: mockSearch, + }; + }; + }), + }); + + expect(mockSearch).toBeCalledTimes(0); + + await asyncSearch + .search(mockRequest, mockOptions) + .toPromise() + .catch(() => { + expect(mockSearch).toBeCalledTimes(2); + }); + }); + it('only sends the ID and server strategy after the first request', async () => { mockSearch - .mockReturnValueOnce(of({ id: 1, total: 2, loaded: 1 })) - .mockReturnValueOnce(of({ id: 1, total: 2, loaded: 2 })); + .mockReturnValueOnce(of({ id: 1, total: 2, loaded: 1, is_running: true, is_partial: true })) + .mockReturnValueOnce( + of({ id: 1, total: 2, loaded: 2, is_running: false, is_partial: false }) + ); const asyncSearch = asyncSearchStrategyProvider({ core: mockCoreStart, diff --git a/x-pack/plugins/data_enhanced/public/search/async_search_strategy.ts b/x-pack/plugins/data_enhanced/public/search/async_search_strategy.ts index 6271d7fcbeaace..18b5b976b3c1b5 100644 --- a/x-pack/plugins/data_enhanced/public/search/async_search_strategy.ts +++ b/x-pack/plugins/data_enhanced/public/search/async_search_strategy.ts @@ -14,7 +14,7 @@ import { SYNC_SEARCH_STRATEGY, TSearchStrategyProvider, } from '../../../../../src/plugins/data/public'; -import { IAsyncSearchRequest, IAsyncSearchOptions } from './types'; +import { IAsyncSearchRequest, IAsyncSearchOptions, IAsyncSearchResponse } from './types'; export const ASYNC_SEARCH_STRATEGY = 'ASYNC_SEARCH_STRATEGY'; @@ -52,9 +52,14 @@ export const asyncSearchStrategyProvider: TSearchStrategyProvider { + expand((response: IAsyncSearchResponse) => { + // If the response indicates of an error, stop polling and complete the observable + if (!response || (response.is_partial && !response.is_running)) { + return throwError(new AbortError()); + } + // If the response indicates it is complete, stop polling and complete the observable - if ((response.loaded ?? 0) >= (response.total ?? 0)) return EMPTY; + if (!response.is_running) return EMPTY; id = response.id; diff --git a/x-pack/plugins/data_enhanced/public/search/types.ts b/x-pack/plugins/data_enhanced/public/search/types.ts index edaaf1b22654d4..8ffc8eddda0528 100644 --- a/x-pack/plugins/data_enhanced/public/search/types.ts +++ b/x-pack/plugins/data_enhanced/public/search/types.ts @@ -4,7 +4,11 @@ * you may not use this file except in compliance with the Elastic License. */ -import { ISearchOptions, ISyncSearchRequest } from '../../../../../src/plugins/data/public'; +import { + IKibanaSearchResponse, + ISearchOptions, + ISyncSearchRequest, +} from '../../../../../src/plugins/data/public'; export interface IAsyncSearchRequest extends ISyncSearchRequest { /** @@ -19,3 +23,14 @@ export interface IAsyncSearchOptions extends ISearchOptions { */ pollInterval?: number; } + +export interface IAsyncSearchResponse extends IKibanaSearchResponse { + /** + * Indicates whether async search is still in flight + */ + is_running?: boolean; + /** + * Indicates whether the results returned are complete or partial + */ + is_partial?: boolean; +} diff --git a/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts b/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts index 6b329bccab4a73..bf502889ffa4f8 100644 --- a/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts +++ b/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts @@ -23,6 +23,8 @@ import { shimHitsTotal } from './shim_hits_total'; export interface AsyncSearchResponse { id: string; + is_partial: boolean; + is_running: boolean; response: SearchResponse; } @@ -71,13 +73,19 @@ async function asyncSearch( // Wait up to 1s for the response to return const query = toSnakeCase({ waitForCompletionTimeout: '1s', ...queryParams }); - const { response, id } = (await caller( + const { id, response, is_partial, is_running } = (await caller( 'transport.request', { method, path, body, query }, options )) as AsyncSearchResponse; - return { id, rawResponse: shimHitsTotal(response), ...getTotalLoaded(response._shards) }; + return { + id, + is_partial, + is_running, + rawResponse: shimHitsTotal(response), + ...getTotalLoaded(response._shards), + }; } async function rollupSearch(