diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/migrate/TransportMigrateIndexAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/migrate/TransportMigrateIndexAction.java index 8cdf143f36e59..6775700024c20 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/migrate/TransportMigrateIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/migrate/TransportMigrateIndexAction.java @@ -222,8 +222,12 @@ Operation buildOperation(Task task, MigrateIndexRequest request, MetaData cluste synchronized (runningOperations) { ActingOperation currentlyRunning = runningOperations.get(request.getCreateIndexRequest().index()); if (currentlyRunning != null) { - // There is a request currently running for this index. We have to "follow" it. - // NOCOMMIT make sure that the requests are the same.... + // There is a request currently running for this index. If it is for the same index we have to "follow" it. + if (false == request.equals(currentlyRunning.request)) { + throw new IllegalArgumentException("Attempting two concurrent but different migration requests for the same index [" + + request.getCreateIndexRequest().index() + "]. This request is [" + request + + "] and the currently running request is [" + currentlyRunning.request + "]"); + } ObservingOperation newOperation = new ObservingOperation(currentlyRunning, listener); currentlyRunning.observers.add(newOperation); return newOperation; diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/migrate/TransportMigrateIndexActionCoalesceTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/migrate/TransportMigrateIndexActionCoalesceTests.java index 2239f62b88aaa..6de38e5e70996 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/migrate/TransportMigrateIndexActionCoalesceTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/migrate/TransportMigrateIndexActionCoalesceTests.java @@ -47,6 +47,7 @@ import static java.util.Collections.emptySet; import static java.util.stream.Collectors.toList; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -243,9 +244,29 @@ void shortCircuitMigration(ActingOperation operation) { assertBusy(() -> action.withRunningOperation("test", op -> assertNull(op))); } + public void testDifferentRequestsForTheSameIndexFail() throws Exception { + MockAction action = new MockAction() { + @Override + void shortCircuitMigration(ActingOperation operation) { + // Don't return anything so it looks like the request is still running + } + }; + ActionListener mainListener = listener(); + action.masterOperation(request("test"), mainListener); + + ActionListener followerListener = listener(); + MigrateIndexRequest differentRequest = request("test"); + differentRequest.setSourceIndex(differentRequest.getSourceIndex() + "_different"); + Exception e = expectThrows(IllegalArgumentException.class, () -> action.masterOperation(differentRequest, followerListener)); + assertThat(e.getMessage(), containsString("Attempting two concurrent but different migration requests for the same index [test]")); + /* The error message contains the different parts. It contains the whole request right now, but it also contains the different + * parts.... */ + assertThat(e.getMessage(), containsString("source=test_0")); + assertThat(e.getMessage(), containsString("source=test_0_different")); + } + private MigrateIndexRequest request(String destIndex) { - MigrateIndexRequest request = new MigrateIndexRequest(); - request.setCreateIndexRequest(new CreateIndexRequest(destIndex)); + MigrateIndexRequest request = new MigrateIndexRequest(destIndex + "_0", destIndex); return request; }