Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
mayya-sharipova committed Oct 26, 2020
1 parent 6e38427 commit 0775cf1
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 246 deletions.
40 changes: 37 additions & 3 deletions docs/reference/search/async-search.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ GET /_async_search/status/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVm
{
"id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=",
"is_running" : true,
"is_partial" : true,
"start_time_in_millis" : 1583945890986,
"expiration_time_in_millis" : 1584377890986,
"_shards" : {
Expand All @@ -256,18 +257,51 @@ GET /_async_search/status/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVm

<1> Indicates how many shards have executed the query so far.

For an async search that has been completed, the status response has a shorter form
that includes only `id`, `is_running` and `expiration_time_in_millis` fields.
For an async search that has been completed, the status response has
an additional `completion_status` field that shows the status
code of the completed async search.
[source,console-result]
--------------------------------------------------
{
"id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=",
"is_running" : false,
"expiration_time_in_millis" : 1584377890986
"is_partial" : false,
"start_time_in_millis" : 1583945890986,
"expiration_time_in_millis" : 1584377890986,
"_shards" : {
"total" : 562,
"successful" : 562,
"skipped" : 0,
"failed" : 0
},
"completion_status" : 200 <1>
}
--------------------------------------------------
// TEST[skip: a sample output of a status of a completed async search]

<1> Indicates that the async search was successfully completed


[source,console-result]
--------------------------------------------------
{
"id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=",
"is_running" : false,
"is_partial" : true,
"start_time_in_millis" : 1583945890986,
"expiration_time_in_millis" : 1584377890986,
"_shards" : {
"total" : 562,
"successful" : 188,
"skipped" : 0,
"failed" : 1
},
"completion_status" : 503 <1>
}
--------------------------------------------------
// TEST[skip: a sample output of a status of a completed async search]

<1> Indicates that the async search was completed with an error

[[delete-async-search]]
==== Delete async search
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ public void testRestartAfterCompletion() throws Exception {

AsyncStatusResponse statusResponse = getAsyncStatus(initial.getId());
assertFalse(statusResponse.isRunning());
assertFalse(statusResponse.isPartial());
assertEquals(numShards, statusResponse.getTotalShards());
assertEquals(numShards, statusResponse.getSuccessfulShards());
assertEquals(RestStatus.OK, statusResponse.getCompletionStatus());

deleteAsyncSearch(response.getId());
ensureTaskRemoval(response.getId());
Expand Down Expand Up @@ -241,6 +245,11 @@ public void testCleanupOnFailure() throws Exception {

AsyncStatusResponse statusResponse = getAsyncStatus(initial.getId());
assertFalse(statusResponse.isRunning());
assertTrue(statusResponse.isPartial());
assertEquals(numShards, statusResponse.getTotalShards());
assertEquals(0, statusResponse.getSuccessfulShards());
assertEquals(numShards, statusResponse.getFailedShards());
assertThat(statusResponse.getCompletionStatus().getStatus(), greaterThanOrEqualTo(400));

deleteAsyncSearch(initial.getId());
ensureTaskRemoval(initial.getId());
Expand Down Expand Up @@ -305,8 +314,10 @@ public void testCancellation() throws Exception {

AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
assertTrue(statusResponse.isRunning());
assertThat(statusResponse.getTotalShards(), equalTo(numShards));
assertThat(statusResponse.getSuccessfulShards(), equalTo(0));
assertEquals(numShards, statusResponse.getTotalShards());
assertEquals(0, statusResponse.getSuccessfulShards());
assertEquals(0, statusResponse.getSkippedShards());
assertEquals(0, statusResponse.getFailedShards());

deleteAsyncSearch(response.getId());
ensureTaskRemoval(response.getId());
Expand Down Expand Up @@ -343,10 +354,15 @@ public void testUpdateRunningKeepAlive() throws Exception {
assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));

AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
assertTrue(statusResponse.isRunning());
assertTrue(statusResponse.isPartial());
assertThat(statusResponse.getExpirationTime(), greaterThan(expirationTime));
assertThat(statusResponse.getStartTime(), lessThan(statusResponse.getExpirationTime()));
assertTrue(statusResponse.isRunning());
assertThat(statusResponse.getTotalShards(), equalTo(numShards));
assertEquals(numShards, statusResponse.getTotalShards());
assertEquals(0, statusResponse.getSuccessfulShards());
assertEquals(0, statusResponse.getFailedShards());
assertEquals(0, statusResponse.getSkippedShards());
assertEquals(null, statusResponse.getCompletionStatus());

response = getAsyncSearch(response.getId(), TimeValue.timeValueMillis(1));
assertThat(response.getExpirationTime(), lessThan(expirationTime));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchResponse.Clusters;
import org.elasticsearch.action.search.ShardSearchFailure;
Expand All @@ -25,7 +26,6 @@
import java.util.function.Supplier;

import static org.elasticsearch.xpack.core.async.AsyncTaskIndexService.restoreResponseHeadersContext;
import static org.elasticsearch.xpack.core.search.action.AsyncStatusResponse.getCompletedSearchStatusResponse;

/**
* A mutable search response that allows to update and create partial response synchronously.
Expand All @@ -42,7 +42,7 @@ class MutableSearchResponse {
private final ThreadContext threadContext;

private boolean isPartial;
private int successfulShards;
private volatile int successfulShards;
private TotalHits totalHits;
/**
* How we get the reduced aggs when {@link #finalResponse} isn't populated.
Expand All @@ -56,8 +56,8 @@ class MutableSearchResponse {
* building our own {@linkplain SearchResponse}s when get async search
* is called, and instead return this.
*/
private SearchResponse finalResponse;
private ElasticsearchException failure;
private volatile SearchResponse finalResponse;
private volatile ElasticsearchException failure;
private Map<String, List<String>> responseHeaders;

private boolean frozen;
Expand Down Expand Up @@ -196,21 +196,47 @@ synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task,
* @param expirationTime – expiration time of async search request
* @return response representing the status of async search
*/
AsyncStatusResponse toStatusResponse(String asyncExecutionId, long startTime, long expirationTime) {
if (frozen == false) {
AsyncStatusResponse toStatusResponse(String asyncExecutionId, long startTime, long expirationTime) {
if (finalResponse != null) { // no need to synchronize this, as finalResponse is set only once
return new AsyncStatusResponse(
asyncExecutionId,
false,
false,
startTime,
expirationTime,
finalResponse != null ? finalResponse.getTotalShards() : totalShards,
finalResponse != null ? finalResponse.getSuccessfulShards() : successfulShards,
finalResponse != null ? finalResponse.getSkippedShards() : skippedShards,
finalResponse != null ? (finalResponse.getShardFailures() == null ? finalResponse.getShardFailures().length : 0) :
(queryFailures != null ? queryFailures.length() : 0)
finalResponse.getTotalShards(),
finalResponse.getSuccessfulShards(),
finalResponse.getSkippedShards(),
finalResponse.getShardFailures() != null ? finalResponse.getShardFailures().length : 0,
finalResponse.status()
);
}
if (failure != null) { // no need to synchronize this, as failure is set only once
return new AsyncStatusResponse(
asyncExecutionId,
false,
true,
startTime,
expirationTime,
totalShards,
successfulShards,
skippedShards,
getQueryFailuresCount(),
ExceptionsHelper.status(ExceptionsHelper.unwrapCause(failure))
);
} else {
return getCompletedSearchStatusResponse(asyncExecutionId, expirationTime);
}
return new AsyncStatusResponse(
asyncExecutionId,
true,
true,
startTime,
expirationTime,
totalShards,
successfulShards,
skippedShards,
getQueryFailuresCount(),
null // for a still running search, completion status is null
);
}

synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task,
Expand Down Expand Up @@ -242,4 +268,17 @@ private ShardSearchFailure[] buildQueryFailures() {
}
return failures.toArray(ShardSearchFailure[]::new);
}

private int getQueryFailuresCount() {
if (queryFailures == null) {
return 0;
}
int count = 0;
for (int i = 0; i < queryFailures.length(); i++) {
if (queryFailures.get(i) != null) {
count++;
}
}
return count;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.search;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ActionFilters;
Expand All @@ -19,17 +20,22 @@
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncStatusService;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncTask;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction;

import java.util.Objects;

import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;

public class TransportGetAsyncStatusAction extends HandledTransportAction<GetAsyncStatusRequest, AsyncStatusResponse> {
private final TransportService transportService;
private final AsyncStatusService<AsyncSearchTask, AsyncStatusResponse> statusService;
private final ClusterService clusterService;
private final AsyncTaskIndexService<AsyncSearchResponse> store;

@Inject
public TransportGetAsyncStatusAction(TransportService transportService,
Expand All @@ -40,21 +46,70 @@ public TransportGetAsyncStatusAction(TransportService transportService,
ThreadPool threadPool) {
super(GetAsyncStatusAction.NAME, transportService, actionFilters, GetAsyncStatusRequest::new);
this.transportService = transportService;
AsyncTaskIndexService<AsyncStatusResponse> store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService,
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncStatusResponse::new, registry);
this.statusService = new AsyncStatusService<>(store, AsyncSearchTask.class, AsyncSearchTask::getStatusResponse,
AsyncStatusResponse::getCompletedSearchStatusResponse, transportService.getTaskManager(), clusterService);
this.clusterService = clusterService;
this.store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService,
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
}

@Override
protected void doExecute(Task task, GetAsyncStatusRequest request, ActionListener<AsyncStatusResponse> listener) {
DiscoveryNode node = statusService.getNode(request.getId());
if (node == null || statusService.isLocalNode(node)) {
statusService.retrieveStatus(request, listener);
AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId());
DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId());
if (node == null || Objects.equals(node, clusterService.localNode())) {
retrieveStatus(request, listener);
} else {
TransportRequestOptions.Builder builder = TransportRequestOptions.builder();
transportService.sendRequest(node, GetAsyncStatusAction.NAME, request, builder.build(),
new ActionListenerResponseHandler<>(listener, AsyncStatusResponse::new, ThreadPool.Names.SAME));
}
}

private void retrieveStatus(GetAsyncStatusRequest request, ActionListener<AsyncStatusResponse> listener) {
long nowInMillis = System.currentTimeMillis();
AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId());
try {
AsyncTask task = (AsyncTask) taskManager.getTask(searchId.getTaskId().getId());
if ((task instanceof AsyncSearchTask) && (task.getExecutionId().equals(searchId))) {
if (task.isCancelled()) {
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
} else {
AsyncStatusResponse response = ((AsyncSearchTask) task).getStatusResponse();
sendFinalResponse(request, response, nowInMillis, listener);
}
} else {
getStatusResponseFromIndex(searchId, request, nowInMillis, listener);
}
} catch (Exception exc) {
listener.onFailure(exc);
}
}

/**
* Get a status response from index
*/
private void getStatusResponseFromIndex(AsyncExecutionId searchId,
GetAsyncStatusRequest request, long nowInMillis, ActionListener<AsyncStatusResponse> listener) {
store.getStatusResponse(searchId, AsyncStatusResponse::getStatusFromAsyncSearchResponseWithExpirationTime,
new ActionListener<>() {
@Override
public void onResponse(AsyncStatusResponse asyncStatusResponse) {
sendFinalResponse(request, asyncStatusResponse, nowInMillis, listener);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
);
}

private static void sendFinalResponse(GetAsyncStatusRequest request,
AsyncStatusResponse response, long nowInMillis, ActionListener<AsyncStatusResponse> listener) {
if (response.getExpirationTime() < nowInMillis) { // check if the result has expired
listener.onFailure(new ResourceNotFoundException(request.getId()));
} else {
listener.onResponse(response);
}
}
}
Loading

0 comments on commit 0775cf1

Please sign in to comment.