Skip to content

Commit

Permalink
Simplify Snapshot Create Request Handling (#37464)
Browse files Browse the repository at this point in the history
* The internal create request is absolutely redundant, the only difference to the transport request is that we resolved the snapshot
name when moving from the transport to the internal version
  * Removed it and passed the transport request into the snapshot service instead
* nicer way of resolve snapshot name in callback
  • Loading branch information
original-brownbear authored Jan 16, 2019
1 parent 5e94f38 commit 5a5e44d
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,33 +73,22 @@ protected ClusterBlockException checkBlock(CreateSnapshotRequest request, Cluste
@Override
protected void masterOperation(final CreateSnapshotRequest request, ClusterState state,
final ActionListener<CreateSnapshotResponse> listener) {
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
SnapshotsService.SnapshotRequest snapshotRequest =
new SnapshotsService.SnapshotRequest(request.repository(), snapshotName, "create_snapshot [" + snapshotName + "]")
.indices(request.indices())
.indicesOptions(request.indicesOptions())
.partial(request.partial())
.settings(request.settings())
.includeGlobalState(request.includeGlobalState())
.masterNodeTimeout(request.masterNodeTimeout());
snapshotsService.createSnapshot(snapshotRequest, new SnapshotsService.CreateSnapshotListener() {
snapshotsService.createSnapshot(request, new SnapshotsService.CreateSnapshotListener() {
@Override
public void onResponse() {
public void onResponse(Snapshot snapshotCreated) {
if (request.waitForCompletion()) {
snapshotsService.addListener(new SnapshotsService.SnapshotCompletionListener() {
@Override
public void onSnapshotCompletion(Snapshot snapshot, SnapshotInfo snapshotInfo) {
if (snapshot.getRepository().equals(request.repository()) &&
snapshot.getSnapshotId().getName().equals(snapshotName)) {
if (snapshotCreated.equals(snapshot)) {
listener.onResponse(new CreateSnapshotResponse(snapshotInfo));
snapshotsService.removeListener(this);
}
}

@Override
public void onSnapshotFailure(Snapshot snapshot, Exception e) {
if (snapshot.getRepository().equals(request.repository()) &&
snapshot.getSnapshotId().getName().equals(snapshotName)) {
if (snapshotCreated.equals(snapshot)) {
listener.onFailure(e);
snapshotsService.removeListener(this);
}
Expand Down
219 changes: 19 additions & 200 deletions server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
Expand Down Expand Up @@ -78,7 +78,6 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
Expand All @@ -92,8 +91,8 @@
* <p>
* A typical snapshot creating process looks like this:
* <ul>
* <li>On the master node the {@link #createSnapshot(SnapshotRequest, CreateSnapshotListener)} is called and makes sure that no snapshots
* is currently running and registers the new snapshot in cluster state</li>
* <li>On the master node the {@link #createSnapshot(CreateSnapshotRequest, CreateSnapshotListener)} is called and makes sure that
* no snapshot is currently running and registers the new snapshot in cluster state</li>
* <li>When cluster state is updated
* the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, CreateSnapshotListener)} method kicks in and initializes
* the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state</li>
Expand Down Expand Up @@ -235,20 +234,20 @@ public List<SnapshotInfo> currentSnapshots(final String repositoryName) {
* @param request snapshot request
* @param listener snapshot creation listener
*/
public void createSnapshot(final SnapshotRequest request, final CreateSnapshotListener listener) {
final String repositoryName = request.repositoryName;
final String snapshotName = request.snapshotName;
public void createSnapshot(final CreateSnapshotRequest request, final CreateSnapshotListener listener) {
final String repositoryName = request.repository();
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
validate(repositoryName, snapshotName);
final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
final RepositoryData repositoryData = repositoriesService.repository(repositoryName).getRepositoryData();

clusterService.submitStateUpdateTask(request.cause(), new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {

private SnapshotsInProgress.Entry newSnapshot = null;

@Override
public ClusterState execute(ClusterState currentState) {
validate(request, currentState);
validate(repositoryName, snapshotName, currentState);
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
Expand Down Expand Up @@ -301,16 +300,16 @@ public TimeValue timeout() {
/**
* Validates snapshot request
*
* @param request snapshot request
* @param repositoryName repository name
* @param snapshotName snapshot name
* @param state current cluster state
*/
private void validate(SnapshotRequest request, ClusterState state) {
private void validate(String repositoryName, String snapshotName, ClusterState state) {
RepositoriesMetaData repositoriesMetaData = state.getMetaData().custom(RepositoriesMetaData.TYPE);
final String repository = request.repositoryName;
if (repositoriesMetaData == null || repositoriesMetaData.repository(repository) == null) {
throw new RepositoryMissingException(repository);
if (repositoriesMetaData == null || repositoriesMetaData.repository(repositoryName) == null) {
throw new RepositoryMissingException(repositoryName);
}
validate(repository, request.snapshotName);
validate(repositoryName, snapshotName);
}

private static void validate(final String repositoryName, final String snapshotName) {
Expand Down Expand Up @@ -377,7 +376,7 @@ protected void doRun() {
logger.info("snapshot [{}] started", snapshot.snapshot());
if (snapshot.indices().isEmpty()) {
// No indices in this snapshot - we are done
userCreateSnapshotListener.onResponse();
userCreateSnapshotListener.onResponse(snapshot.snapshot());
endSnapshot(snapshot);
return;
}
Expand Down Expand Up @@ -465,7 +464,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
// for processing. If client wants to wait for the snapshot completion, it can register snapshot
// completion listener in this method. For the snapshot completion to work properly, the snapshot
// should still exist when listener is registered.
userCreateSnapshotListener.onResponse();
userCreateSnapshotListener.onResponse(snapshot.snapshot());

// Now that snapshot completion listener is registered we can end the snapshot if needed
// We should end snapshot only if 1) we didn't accept it for processing (which happens when there
Expand Down Expand Up @@ -1544,8 +1543,10 @@ public interface CreateSnapshotListener {

/**
* Called when snapshot has successfully started
*
* @param snapshot snapshot that was created
*/
void onResponse();
void onResponse(Snapshot snapshot);

/**
* Called if a snapshot operation couldn't start
Expand Down Expand Up @@ -1575,186 +1576,4 @@ public interface SnapshotCompletionListener {

void onSnapshotFailure(Snapshot snapshot, Exception e);
}

/**
* Snapshot creation request
*/
public static class SnapshotRequest {

private final String cause;

private final String repositoryName;

private final String snapshotName;

private String[] indices;

private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();

private boolean partial;

private Settings settings;

private boolean includeGlobalState;

private TimeValue masterNodeTimeout;

/**
* Constructs new snapshot creation request
*
* @param repositoryName repository name
* @param snapshotName snapshot name
* @param cause cause for snapshot operation
*/
public SnapshotRequest(final String repositoryName, final String snapshotName, final String cause) {
this.repositoryName = Objects.requireNonNull(repositoryName);
this.snapshotName = Objects.requireNonNull(snapshotName);
this.cause = Objects.requireNonNull(cause);
}

/**
* Sets the list of indices to be snapshotted
*
* @param indices list of indices
* @return this request
*/
public SnapshotRequest indices(String[] indices) {
this.indices = indices;
return this;
}

/**
* Sets repository-specific snapshot settings
*
* @param settings snapshot settings
* @return this request
*/
public SnapshotRequest settings(Settings settings) {
this.settings = settings;
return this;
}

/**
* Set to true if global state should be stored as part of the snapshot
*
* @param includeGlobalState true if global state should be stored as part of the snapshot
* @return this request
*/
public SnapshotRequest includeGlobalState(boolean includeGlobalState) {
this.includeGlobalState = includeGlobalState;
return this;
}

/**
* Sets master node timeout
*
* @param masterNodeTimeout master node timeout
* @return this request
*/
public SnapshotRequest masterNodeTimeout(TimeValue masterNodeTimeout) {
this.masterNodeTimeout = masterNodeTimeout;
return this;
}

/**
* Sets the indices options
*
* @param indicesOptions indices options
* @return this request
*/
public SnapshotRequest indicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}

/**
* Set to true if partial snapshot should be allowed
*
* @param partial true if partial snapshots should be allowed
* @return this request
*/
public SnapshotRequest partial(boolean partial) {
this.partial = partial;
return this;
}

/**
* Returns cause for snapshot operation
*
* @return cause for snapshot operation
*/
public String cause() {
return cause;
}

/**
* Returns the repository name
*/
public String repositoryName() {
return repositoryName;
}

/**
* Returns the snapshot name
*/
public String snapshotName() {
return snapshotName;
}

/**
* Returns the list of indices to be snapshotted
*
* @return the list of indices
*/
public String[] indices() {
return indices;
}

/**
* Returns indices options
*
* @return indices options
*/
public IndicesOptions indicesOptions() {
return indicesOptions;
}

/**
* Returns repository-specific settings for the snapshot operation
*
* @return repository-specific settings
*/
public Settings settings() {
return settings;
}

/**
* Returns true if global state should be stored as part of the snapshot
*
* @return true if global state should be stored as part of the snapshot
*/
public boolean includeGlobalState() {
return includeGlobalState;
}

/**
* Returns true if partial snapshot should be allowed
*
* @return true if partial snapshot should be allowed
*/
public boolean partial() {
return partial;
}

/**
* Returns master node timeout
*
* @return master node timeout
*/
public TimeValue masterNodeTimeout() {
return masterNodeTimeout;
}

}
}

0 comments on commit 5a5e44d

Please sign in to comment.