diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java index 25df836ea74dc..cb317e72ae3fc 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java @@ -24,11 +24,13 @@ import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; @@ -417,4 +419,37 @@ public void testSearchPhaseFailureNoCause() throws Exception { assertNotNull(response.getFailure()); ensureTaskNotRunning(response.getId()); } + + public void testRetryVersionConflict() throws Exception { + SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName); + request.setWaitForCompletionTimeout(TimeValue.timeValueMinutes(10)); + request.setKeepOnCompletion(true); + AsyncSearchResponse response = submitAsyncSearch(request); + assertNotNull(response.getSearchResponse()); + assertFalse(response.isRunning()); + + List threads = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(1); + List exceptions = Collections.synchronizedList(new ArrayList<>()); + for (int i = 0; i < 2; i++) { + Runnable runnable = () -> { + for (int j = 0; j < 10; j++) { + try { + latch.await(); + getAsyncSearch(response.getId(), TimeValue.timeValueMinutes(10)); + } catch (Exception exc) { + exceptions.add(exc); + } + } + }; + Thread thread = new Thread(runnable); + thread.start(); + threads.add(thread); + } + latch.countDown(); + for (Thread thread : threads) { + thread.join(); + } + assertTrue(exceptions.toString(), exceptions.isEmpty()); + } } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java index d12cd268e79c2..4903594b7bdba 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java @@ -173,24 +173,12 @@ private void onFatalFailure(AsyncSearchTask task, Exception error, boolean shoul private void onFinalResponse(AsyncSearchTask searchTask, AsyncSearchResponse response, Runnable nextAction) { - if (searchTask.isCancelled()) { - // the task was cancelled so we ensure that there is nothing stored in the response index. - store.deleteResponse(searchTask.getExecutionId(), ActionListener.wrap( - resp -> unregisterTaskAndMoveOn(searchTask, nextAction), - exc -> { - logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", - searchTask.getExecutionId().getEncoded()), exc); - unregisterTaskAndMoveOn(searchTask, nextAction); - })); - return; - } - store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response, ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction), exc -> { Throwable cause = ExceptionsHelper.unwrapCause(exc); if (cause instanceof DocumentMissingException == false && - cause instanceof VersionConflictEngineException == false) { + cause instanceof VersionConflictEngineException == false) { logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", searchTask.getExecutionId().getEncoded()), exc); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java index 8b208ba3065ca..4e1d9d4381af9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java @@ -85,14 +85,16 @@ public void retrieveResult(GetAsyncResultRequest request, ActionListener getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener), exc -> { - //don't log when: the async search document or its index is not found. That can happen if an invalid - //search id is provided or no async search initial response has been stored yet. RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc)); if (status != RestStatus.NOT_FOUND) { logger.error(() -> new ParameterizedMessage("failed to update expiration time for async-search [{}]", searchId.getEncoded()), exc); + listener.onFailure(exc); + } else { + //the async search document or its index is not found. + //That can happen if an invalid/deleted search id is provided. + listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); } - listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); } )); } else { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index 2819a62ee0ee7..f22a918eac8a2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -193,7 +193,8 @@ public void updateResponse(String docId, UpdateRequest request = new UpdateRequest() .index(index) .id(docId) - .doc(source, XContentType.JSON); + .doc(source, XContentType.JSON) + .retryOnConflict(5); client.update(request, listener); } catch(Exception e) { listener.onFailure(e); @@ -210,7 +211,8 @@ public void updateExpirationTime(String docId, Map source = Collections.singletonMap(EXPIRATION_TIME_FIELD, expirationTimeMillis); UpdateRequest request = new UpdateRequest().index(index) .id(docId) - .doc(source, XContentType.JSON); + .doc(source, XContentType.JSON) + .retryOnConflict(5); client.update(request, listener); }