Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce async search status API #62947

Merged
merged 6 commits into from
Nov 3, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 84 additions & 3 deletions docs/reference/search/async-search.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ set to `false`.
==== Get async search

The get async search API retrieves the results of a previously submitted
async search request given its id. If the {es} {security-features} are enabled.
async search request given its id. If the {es} {security-features} are enabled,
the access to the results of a specific async search is restricted to the user
that submitted it in the first place.

Expand All @@ -161,8 +161,8 @@ GET /_async_search/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsd
"timed_out" : false,
"num_reduce_phases" : 46, <4>
"_shards" : {
"total" : 562, <5>
"successful" : 188,
"total" : 562,
"successful" : 188, <5>
"skipped" : 0,
"failed" : 0
},
Expand Down Expand Up @@ -222,6 +222,87 @@ override such value and extend the validity of the request. When this period
expires, the search, if still running, is cancelled. If the search is
completed, its saved results are deleted.


[[get-async-search-status]]
==== Get async search status
The get async search status API, without retrieving search results, shows
only the status of a previously submitted async search request given its `id`.
If the {es} {security-features} are enabled, the access to the get async
search status API is restricted to the
<<built-in-roles, monitoring_user role>>.

[source,console,id=get-async-search-status-example]
--------------------------------------------------
GET /_async_search/status/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=
--------------------------------------------------
// TEST[continued s/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=/\${body.id}/]

[source,console-result]
--------------------------------------------------
{
"id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=",
"is_running" : true,
"is_partial" : true,
"start_time_in_millis" : 1583945890986,
"expiration_time_in_millis" : 1584377890986,
"_shards" : {
"total" : 562,
"successful" : 188, <1>
"skipped" : 0,
"failed" : 0
}
}
--------------------------------------------------
// TEST[skip: a sample output of a status of a running async search]

<1> Indicates how many shards have executed the query so far.

For an async search that has been completed, the status response has
an additional `completion_status` field that shows the status
code of the completed async search.
[source,console-result]
--------------------------------------------------
{
"id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=",
"is_running" : false,
mayya-sharipova marked this conversation as resolved.
Show resolved Hide resolved
"is_partial" : false,
"start_time_in_millis" : 1583945890986,
"expiration_time_in_millis" : 1584377890986,
"_shards" : {
"total" : 562,
"successful" : 562,
"skipped" : 0,
"failed" : 0
},
"completion_status" : 200 <1>
}
--------------------------------------------------
// TEST[skip: a sample output of a status of a completed async search]

<1> Indicates that the async search was successfully completed


[source,console-result]
--------------------------------------------------
{
"id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=",
"is_running" : false,
"is_partial" : true,
"start_time_in_millis" : 1583945890986,
"expiration_time_in_millis" : 1584377890986,
"_shards" : {
"total" : 562,
"successful" : 450,
"skipped" : 0,
"failed" : 112
},
"completion_status" : 503 <1>
}
--------------------------------------------------
// TEST[skip: a sample output of a status of a completed async search]

<1> Indicates that the async search was completed with an error

[[delete-async-search]]
==== Delete async search

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.test.ESIntegTestCase.SuiteScopeTestCase;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;

import java.util.ArrayList;
Expand Down Expand Up @@ -188,10 +189,19 @@ public void testRestartAfterCompletion() throws Exception {
}
ensureTaskCompletion(initial.getId());
restartTaskNode(initial.getId(), indexName);

AsyncSearchResponse response = getAsyncSearch(initial.getId());
assertNotNull(response.getSearchResponse());
assertFalse(response.isRunning());
assertFalse(response.isPartial());

AsyncStatusResponse statusResponse = getAsyncStatus(initial.getId());
assertFalse(statusResponse.isRunning());
assertFalse(statusResponse.isPartial());
assertEquals(numShards, statusResponse.getTotalShards());
assertEquals(numShards, statusResponse.getSuccessfulShards());
assertEquals(RestStatus.OK, statusResponse.getCompletionStatus());

deleteAsyncSearch(response.getId());
ensureTaskRemoval(response.getId());
}
Expand Down Expand Up @@ -232,6 +242,15 @@ public void testCleanupOnFailure() throws Exception {
assertTrue(response.isPartial());
assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
assertThat(response.getSearchResponse().getShardFailures().length, equalTo(numShards));

AsyncStatusResponse statusResponse = getAsyncStatus(initial.getId());
assertFalse(statusResponse.isRunning());
assertTrue(statusResponse.isPartial());
assertEquals(numShards, statusResponse.getTotalShards());
assertEquals(0, statusResponse.getSuccessfulShards());
assertEquals(numShards, statusResponse.getFailedShards());
assertThat(statusResponse.getCompletionStatus().getStatus(), greaterThanOrEqualTo(400));

deleteAsyncSearch(initial.getId());
ensureTaskRemoval(initial.getId());
}
Expand All @@ -248,6 +267,10 @@ public void testInvalidId() throws Exception {
}
assertFalse(response.isRunning());
}

ExecutionException exc = expectThrows(ExecutionException.class, () -> getAsyncStatus("invalid"));
assertThat(exc.getCause(), instanceOf(IllegalArgumentException.class));
assertThat(exc.getMessage(), containsString("invalid id"));
}

public void testNoIndex() throws Exception {
Expand Down Expand Up @@ -289,6 +312,13 @@ public void testCancellation() throws Exception {
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));

AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
assertTrue(statusResponse.isRunning());
assertEquals(numShards, statusResponse.getTotalShards());
assertEquals(0, statusResponse.getSuccessfulShards());
assertEquals(0, statusResponse.getSkippedShards());
assertEquals(0, statusResponse.getFailedShards());

deleteAsyncSearch(response.getId());
ensureTaskRemoval(response.getId());
}
Expand Down Expand Up @@ -323,6 +353,17 @@ public void testUpdateRunningKeepAlive() throws Exception {
assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));

AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
assertTrue(statusResponse.isRunning());
assertTrue(statusResponse.isPartial());
assertThat(statusResponse.getExpirationTime(), greaterThan(expirationTime));
assertThat(statusResponse.getStartTime(), lessThan(statusResponse.getExpirationTime()));
assertEquals(numShards, statusResponse.getTotalShards());
assertEquals(0, statusResponse.getSuccessfulShards());
assertEquals(0, statusResponse.getFailedShards());
assertEquals(0, statusResponse.getSkippedShards());
assertEquals(null, statusResponse.getCompletionStatus());

response = getAsyncSearch(response.getId(), TimeValue.timeValueMillis(1));
assertThat(response.getExpirationTime(), lessThan(expirationTime));
ensureTaskNotRunning(response.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction;
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction;
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
Expand Down Expand Up @@ -154,6 +157,10 @@ protected AsyncSearchResponse getAsyncSearch(String id, TimeValue keepAlive) thr
return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncResultRequest(id).setKeepAlive(keepAlive)).get();
}

protected AsyncStatusResponse getAsyncStatus(String id) throws ExecutionException, InterruptedException {
return client().execute(GetAsyncStatusAction.INSTANCE, new GetAsyncStatusRequest(id)).get();
}

protected AcknowledgedResponse deleteAsyncSearch(String id) throws ExecutionException, InterruptedException {
return client().execute(DeleteAsyncResultAction.INSTANCE, new DeleteAsyncResultRequest(id)).get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;

import java.util.Arrays;
Expand All @@ -34,7 +35,8 @@ public final class AsyncSearch extends Plugin implements ActionPlugin {
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(
new ActionHandler<>(SubmitAsyncSearchAction.INSTANCE, TransportSubmitAsyncSearchAction.class),
new ActionHandler<>(GetAsyncSearchAction.INSTANCE, TransportGetAsyncSearchAction.class)
new ActionHandler<>(GetAsyncSearchAction.INSTANCE, TransportGetAsyncSearchAction.class),
new ActionHandler<>(GetAsyncStatusAction.INSTANCE, TransportGetAsyncStatusAction.class)
);
}

Expand All @@ -46,6 +48,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
return Arrays.asList(
new RestSubmitAsyncSearchAction(),
new RestGetAsyncSearchAction(),
new RestGetAsyncStatusAction(),
new RestDeleteAsyncSearchAction()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncTask;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -347,6 +348,15 @@ private synchronized void checkCancellation() {
}
}

/**
* Returns the status of {@link AsyncSearchTask}
*/
public AsyncStatusResponse getStatusResponse() {
MutableSearchResponse mutableSearchResponse = searchResponse.get();
assert mutableSearchResponse != null;
return mutableSearchResponse.toStatusResponse(searchId.getEncoded(), getStartTime(), expirationTimeMillis);
}

class Listener extends SearchProgressActionListener {
@Override
protected void onQueryResult(int shardIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchResponse.Clusters;
import org.elasticsearch.action.search.ShardSearchFailure;
Expand All @@ -17,6 +18,7 @@
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -32,6 +34,7 @@
* run concurrently to 1 and ensures that we pause the search progress when an {@link AsyncSearchResponse} is built.
*/
class MutableSearchResponse {
private static final TotalHits EMPTY_TOTAL_HITS = new TotalHits(0L, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
private final int totalShards;
private final int skippedShards;
private final Clusters clusters;
Expand Down Expand Up @@ -77,7 +80,7 @@ class MutableSearchResponse {
this.queryFailures = totalShards == -1 ? null : new AtomicArray<>(totalShards-skippedShards);
this.isPartial = true;
this.threadContext = threadContext;
this.totalHits = new TotalHits(0L, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
this.totalHits = EMPTY_TOTAL_HITS;
}

/**
Expand Down Expand Up @@ -184,6 +187,58 @@ synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task,
failure, isPartial, frozen == false, task.getStartTime(), expirationTime);
}


/**
* Creates an {@link AsyncStatusResponse} -- status of an async response.
* Response is created based on the current state of the mutable response or based on {@code finalResponse} if it is available.
* @param asyncExecutionId – id of async search request
* @param startTime – start time of task
* @param expirationTime – expiration time of async search request
* @return response representing the status of async search
*/
synchronized AsyncStatusResponse toStatusResponse(String asyncExecutionId, long startTime, long expirationTime) {
if (finalResponse != null) {
return new AsyncStatusResponse(
asyncExecutionId,
false,
false,
startTime,
expirationTime,
finalResponse.getTotalShards(),
finalResponse.getSuccessfulShards(),
finalResponse.getSkippedShards(),
finalResponse.getShardFailures() != null ? finalResponse.getShardFailures().length : 0,
finalResponse.status()
);
}
if (failure != null) {
return new AsyncStatusResponse(
asyncExecutionId,
false,
true,
startTime,
expirationTime,
totalShards,
successfulShards,
skippedShards,
getQueryFailuresCount(),
ExceptionsHelper.status(ExceptionsHelper.unwrapCause(failure))
);
}
return new AsyncStatusResponse(
asyncExecutionId,
true,
true,
startTime,
expirationTime,
totalShards,
successfulShards,
skippedShards,
getQueryFailuresCount(),
null // for a still running search, completion status is null
);
}

synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task,
long expirationTime,
ElasticsearchException reduceException) {
Expand Down Expand Up @@ -213,4 +268,17 @@ private ShardSearchFailure[] buildQueryFailures() {
}
return failures.toArray(ShardSearchFailure[]::new);
}

private int getQueryFailuresCount() {
if (queryFailures == null) {
return 0;
}
int count = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you can do return queryFailures.asList().size() ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jimczi Thanks for the suggestion. It seems that asList() method creates a additional list object. Instead, in 39dafa9, I've added AtomicArray::nonNullLength method.

for (int i = 0; i < queryFailures.length(); i++) {
if (queryFailures.get(i) != null) {
count++;
}
}
return count;
}
}
Loading