From 59222163611989b984fd5c60f6def4495d2d0086 Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Wed, 21 Mar 2018 11:50:59 +0100 Subject: [PATCH 1/4] REST high-level client: add synced flush API WIP: Need clarifications --- .../elasticsearch/client/IndicesClient.java | 21 ++ .../org/elasticsearch/client/Request.java | 9 + .../elasticsearch/client/IndicesClientIT.java | 28 +++ .../indices/flush/SyncedFlushResponse.java | 208 ++++++++++++++++++ .../cluster/routing/RecoverySource.java | 89 ++++++++ .../cluster/routing/ShardRouting.java | 123 ++++++++++- .../cluster/routing/UnassignedInfo.java | 90 +++++++- .../flush/ShardsSyncedFlushResult.java | 43 +++- 8 files changed, 591 insertions(+), 20 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java index 0b366aa99e188..ebd2b642a33e4 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java @@ -34,6 +34,8 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushResponse; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; @@ -261,6 +263,25 @@ public void flushAsync(FlushRequest flushRequest, ActionListener listener, emptySet(), headers); } + /** Initiate a synced flush manually using the synced flush API + *

+ * See Synced flush API on elastic.co + */ + public SyncedFlushResponse syncedFlush(SyncedFlushRequest syncedFlushRequest, Header... headers) throws IOException { + return restHighLevelClient.performRequestAndParseEntity(syncedFlushRequest, Request::syncedFlush, + SyncedFlushResponse::fromXContent, emptySet(), headers); + } + + /** + * Asynchronously initiate a synced flush manually using the synced flush API + *

+ * See Synced flush API on elastic.co + */ + public void syncedFlushAsync(SyncedFlushRequest syncedFlushRequest, ActionListener listener, Header... headers) { + restHighLevelClient.performRequestAsyncAndParseEntity(syncedFlushRequest, Request::syncedFlush, + SyncedFlushResponse::fromXContent, listener, emptySet(), headers); + } + /** * Clears the cache of one or more indices using the Clear Cache API *

diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java index 66b34da777b6a..4e73ca0424aba 100755 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java @@ -37,6 +37,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; @@ -233,6 +234,14 @@ static Request flush(FlushRequest flushRequest) { return new Request(HttpPost.METHOD_NAME, endpoint, parameters.getParams(), null); } + static Request syncedFlush(SyncedFlushRequest syncedFlushRequest) { + String endpoint = endpoint(syncedFlushRequest.indices(), "_flush", "synced"); + Params parameters = Params.builder(); + // This request takes no other parameters other than the indices. + parameters.withIndicesOptions(syncedFlushRequest.indicesOptions()); + return new Request(HttpPost.METHOD_NAME, endpoint, parameters.getParams(), null); + } + static Request clearCache(ClearIndicesCacheRequest clearIndicesCacheRequest) { String[] indices = clearIndicesCacheRequest.indices() == null ? Strings.EMPTY_ARRAY :clearIndicesCacheRequest.indices(); String endpoint = endpoint(indices, "_cache/clear"); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java index 8a2ba44791149..951ad649aa33a 100755 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java @@ -38,6 +38,8 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushResponse; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; @@ -440,6 +442,32 @@ public void testFlush() throws IOException { } } + public void testSyncedFlush() throws IOException { + { + String index = "index"; + Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(index, settings); + SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(index); + SyncedFlushResponse flushResponse = + execute(syncedFlushRequest, highLevelClient().indices()::syncedFlush, highLevelClient().indices()::syncedFlushAsync); + assertThat(flushResponse.totalShards(), equalTo(1)); + assertThat(flushResponse.successfulShards(), equalTo(1)); + assertThat(flushResponse.failedShards(), equalTo(0)); + //assertThat(flushResponse.shardFailures(), equalTo(BroadcastResponse.EMPTY)); + } + { + String nonExistentIndex = "non_existent_index"; + assertFalse(indexExists(nonExistentIndex)); + SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(nonExistentIndex); + ElasticsearchException exception = expectThrows(ElasticsearchException.class, + () -> execute(syncedFlushRequest, highLevelClient().indices()::syncedFlush, highLevelClient().indices()::syncedFlushAsync)); + assertEquals(RestStatus.NOT_FOUND, exception.status()); + } + } + public void testClearCache() throws IOException { { String index = "index"; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java index 890e968fe60a4..4cbde7017fde7 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java @@ -19,13 +19,21 @@ package org.elasticsearch.action.admin.indices.flush; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentLocation; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.flush.ShardsSyncedFlushResult; import org.elasticsearch.indices.flush.SyncedFlushService; import org.elasticsearch.rest.RestStatus; @@ -38,6 +46,7 @@ import java.util.Map; import static java.util.Collections.unmodifiableMap; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; /** * The result of performing a sync flush operation on all shards of multiple indices @@ -46,6 +55,7 @@ public class SyncedFlushResponse extends ActionResponse implements ToXContentFra Map> shardsResultPerIndex; ShardCounts shardCounts; + Map shardCountsPerIndex; SyncedFlushResponse() { @@ -57,6 +67,21 @@ public SyncedFlushResponse(Map> shardsResu // ConcurrentHashMap this.shardsResultPerIndex = unmodifiableMap(shardsResultPerIndex); this.shardCounts = calculateShardCounts(Iterables.flatten(shardsResultPerIndex.values())); + this.shardCountsPerIndex = new HashMap<>(); + for (Map.Entry> entry: shardsResultPerIndex.entrySet()) { + this.shardCountsPerIndex.put(entry.getKey(), calculateShardCounts(entry.getValue())); + } + } + + public SyncedFlushResponse(ShardCounts shardCounts, + Map> shardsResultPerIndex, + Map shardCountsPerIndex) { + // shardsResultPerIndex is never modified after it is passed to this + // constructor so this is safe even though shardsResultPerIndex is a + // ConcurrentHashMap + this.shardsResultPerIndex = unmodifiableMap(shardsResultPerIndex); + this.shardCounts = shardCounts; + this.shardCountsPerIndex = unmodifiableMap(shardCountsPerIndex); } /** @@ -88,6 +113,15 @@ public Map> getShardsResultPerIndex() { return shardsResultPerIndex; } + /** + * Get the ShardCount for a particular index name. + * @param index name of the index to be searched + * @return ShardCounts or {@code null} if index is not present + */ + public ShardCounts getShardCountsForIndex(String index) { + return shardCountsPerIndex.get(index); + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields._SHARDS); @@ -103,6 +137,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws for (ShardsSyncedFlushResult shardResults : indexResult) { if (shardResults.failed()) { builder.startObject(); + builder.field(Fields.TOTAL_COPIES, shardResults.totalShards()); + builder.field(Fields.SUCCESSFUL_COPIES, shardResults.successfulShards()); builder.field(Fields.SHARD, shardResults.shardId().id()); builder.field(Fields.REASON, shardResults.failureReason()); builder.endObject(); @@ -111,6 +147,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws Map failedShards = shardResults.failedShards(); for (Map.Entry shardEntry : failedShards.entrySet()) { builder.startObject(); + builder.field(Fields.TOTAL_COPIES, shardResults.totalShards()); + builder.field(Fields.SUCCESSFUL_COPIES, shardResults.successfulShards()); builder.field(Fields.SHARD, shardResults.shardId().id()); builder.field(Fields.REASON, shardEntry.getValue().failureReason()); builder.field(Fields.ROUTING, shardEntry.getKey()); @@ -124,6 +162,173 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + public static SyncedFlushResponse fromXContent(XContentParser parser) throws IOException { + ensureExpectedToken(Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + parser.nextToken(); + return innerFromXContent(parser); + } + + private static SyncedFlushResponse innerFromXContent(XContentParser parser) throws IOException { + ShardCounts totalShardCounts = null; + Map> shardsResultPerIndex = new HashMap<>(); + Map shardsCountsPerIndex = new HashMap<>(); + // If it is an object we try to parse it for Fields._SHARD or for an index entry + for (Token curToken = parser.currentToken(); curToken != Token.END_OBJECT; curToken = parser.nextToken()) { + ensureExpectedToken(Token.FIELD_NAME, parser.currentToken(), parser::getTokenLocation); + String fieldName = parser.currentName(); + curToken = parser.nextToken(); + Integer totalShards = null; + Integer successfulShards = null; + Integer failedShards = null; + List listShardsSyncedFlushResult = new ArrayList<>(); + if (curToken == Token.START_OBJECT) { // Start parsing for _shard or for index + for (curToken = parser.nextToken(); curToken != Token.END_OBJECT; curToken = parser.nextToken()) { + if (curToken == Token.FIELD_NAME) { + String level2FieldName = parser.currentName(); + curToken = parser.nextToken(); + switch (level2FieldName) { + case Fields.TOTAL: + ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); + totalShards = parser.intValue(); + break; + case Fields.SUCCESSFUL: + ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); + successfulShards = parser.intValue(); + break; + case Fields.FAILED: + ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); + failedShards = parser.intValue(); + break; + case Fields.FAILURES: + if (!fieldName.equals(Fields._SHARDS)) { + ensureExpectedToken(Token.START_ARRAY, curToken, parser::getTokenLocation); + Map shardsSyncedFlushResults = new HashMap<>(); + Map> + failedSharedResponses = new HashMap<>(); + for (curToken = parser.nextToken(); curToken != Token.END_ARRAY; curToken = parser.nextToken()) { + ensureExpectedToken(Token.START_OBJECT, curToken, parser::getTokenLocation); + ShardRouting routing = null; + String failureReason = null; + ShardId shardId = null; + Integer totalShardCopies = null; + Integer successfulShardCopies = null; + XContentLocation startLocation = parser.getTokenLocation(); + for (curToken = parser.nextToken(); curToken != Token.END_OBJECT; curToken = parser.nextToken()) { + ensureExpectedToken(Token.FIELD_NAME, curToken, parser::getTokenLocation); + String level3FieldName = parser.currentName(); + curToken = parser.nextToken(); + switch (level3FieldName) { + case Fields.SHARD: + ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); + shardId = new ShardId( + fieldName, + IndexMetaData.INDEX_UUID_NA_VALUE, + parser.intValue() + ); + break; + case Fields.REASON: + ensureExpectedToken(Token.VALUE_STRING, curToken, parser::getTokenLocation); + failureReason = parser.text(); + break; + case Fields.ROUTING: + routing = ShardRouting.fromXContent(parser); + break; + case Fields.TOTAL_COPIES: + ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); + totalShardCopies = parser.intValue(); + break; + case Fields.SUCCESSFUL_COPIES: + ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); + successfulShardCopies = parser.intValue(); + break; + default: + // If something else skip it + parser.skipChildren(); + break; + } + } + if (failureReason != null && + shardId != null && + totalShardCopies != null && + successfulShardCopies != null) { + // This is ugly but there is only one ShardsSyncedFlushResult for each shardId + // so this will work. + shardsSyncedFlushResults.putIfAbsent ( + shardId, + new ShardsSyncedFlushResult( + shardId, + totalShardCopies, + successfulShardCopies, + routing == null ? failureReason : null + ) + ); + if (routing != null) { + if (failedSharedResponses.containsKey(shardId)) { + failedSharedResponses.get(shardId).put( + routing, new SyncedFlushService.ShardSyncedFlushResponse(failureReason) + ); + } else { + Map m = + new HashMap<>(); + m.put( + routing, + new SyncedFlushService.ShardSyncedFlushResponse(failureReason) + ); + failedSharedResponses.put(shardId, m); + } + } + } else { + throw new ParsingException(startLocation, "Unable to construct ShardsSyncedFlushResult"); + } + } + for (Map.Entry entry: shardsSyncedFlushResults.entrySet()) { + ShardsSyncedFlushResult result; + if (failedSharedResponses.containsKey(entry.getKey())) { + result = new ShardsSyncedFlushResult( + entry.getValue().shardId(), + null, // syncid is null since this is a failure response + entry.getValue().totalShards(), + entry.getValue().successfulShards(), + failedSharedResponses.get(entry.getKey()) + ); + } else { + result = entry.getValue(); + } + listShardsSyncedFlushResult.add(result); + } + } else { + parser.skipChildren(); + } + break; + default: + parser.skipChildren(); + break; + } + } else { + parser.skipChildren(); + } + } + if (totalShards != null && + successfulShards != null && + failedShards != null) { + ShardCounts shardCount = new ShardCounts(totalShards, successfulShards, failedShards); + if (fieldName.equals(Fields._SHARDS)) { + totalShardCounts = shardCount; + } else { + shardsCountsPerIndex.put(fieldName, shardCount); + shardsResultPerIndex.put(fieldName, listShardsSyncedFlushResult); + } + } + } else { // Else leave this tree alone + parser.skipChildren(); + } + } + return new SyncedFlushResponse( + totalShardCounts, shardsResultPerIndex, shardsCountsPerIndex + ); + } + + static ShardCounts calculateShardCounts(Iterable results) { int total = 0, successful = 0, failed = 0; for (ShardsSyncedFlushResult result : results) { @@ -185,6 +390,9 @@ static final class Fields { static final String SUCCESSFUL = "successful"; static final String FAILED = "failed"; static final String FAILURES = "failures"; + static final String TOTAL_COPIES = "total_copies"; + static final String FAILED_COPIES = "failed_copies"; + static final String SUCCESSFUL_COPIES = "successful_copies"; static final String SHARD = "shard"; static final String ROUTING = "routing"; static final String REASON = "reason"; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java index ff7aab4a25622..f116814c1638e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java @@ -20,16 +20,23 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.Version; +import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentLocation; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; import org.elasticsearch.snapshots.Snapshot; import java.io.IOException; import java.util.Objects; +import org.elasticsearch.snapshots.SnapshotId; + +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; /** * Represents the recovery source of a shard. Available recovery types are: @@ -49,6 +56,88 @@ public final XContentBuilder toXContent(XContentBuilder builder, ToXContent.Para return builder.endObject(); } + public static RecoverySource fromXContent(XContentParser parser) throws IOException { + ensureExpectedToken(Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + XContentLocation startingPosition = parser.getTokenLocation(); + Type type = null; + RecoverySource recoverySource = null; + // The following fields should ideally be handled by the child classes but the structure of the + // JSON prohibits this for now. + Version version = null; + String index = null; + String snapshotRepository = null; + SnapshotId snapshotId = null; + for (Token t = parser.nextToken(); t != Token.END_OBJECT; t = parser.nextToken()) { + ensureExpectedToken(Token.FIELD_NAME, t, parser::getTokenLocation); + String fieldName = parser.currentName(); + t = parser.nextToken(); + if (t.isValue()) { + switch (fieldName) { + case "type": + String typeString = parser.text(); + type = Type.valueOf(parser.text()); + switch (type) { + case EMPTY_STORE: + recoverySource = StoreRecoverySource.EMPTY_STORE_INSTANCE; + break; + case EXISTING_STORE: + recoverySource = StoreRecoverySource.EXISTING_STORE_INSTANCE; + break; + case PEER: + recoverySource = PeerRecoverySource.INSTANCE; + break; + case SNAPSHOT: + // We don't do anything. Will construct it from other stuff later + break; + case LOCAL_SHARDS: + recoverySource = LocalShardsRecoverySource.INSTANCE; + break; + default: throw new ParsingException(parser.getTokenLocation(), + "unknown recovery type: " + typeString); + } + break; + case "repository": + snapshotRepository = parser.text(); + break; + case "snapshot": + String snapshotName = parser.text(); + /** + * We use the name for Id and Name using the old format here since xContent has + * old format. See {@link org.elasticsearch.snapshots.SnapshotId#fromXContent} + */ + snapshotId = new SnapshotId(snapshotName, snapshotName); + break; + case "version": + version = Version.fromString(parser.text()); + break; + case "index": + index = parser.text(); + break; + default: + } + } else { // Else skip the tree + parser.skipChildren(); + } + } + // We only check if necessary information is present. Extra stuff is ignored. + if (type != null) { + if (recoverySource != null) { + return recoverySource; + } else if ( + type == Type.SNAPSHOT && + version != null && + index != null && + snapshotRepository != null && + snapshotId != null) { + return new SnapshotRecoverySource(new Snapshot(snapshotRepository, snapshotId), version, index); + } else { + throw new ParsingException(startingPosition, "Unable to recover RecoverySource from JSON"); + } + } else { + throw new ParsingException(startingPosition, "Unable to find type for RecoverySource from JSON"); + } + } + /** * to be overridden by subclasses */ diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java index be1213ad134f1..79cc932d0aee7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java @@ -19,16 +19,21 @@ package org.elasticsearch.cluster.routing; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentLocation; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; @@ -36,6 +41,8 @@ import java.util.Collections; import java.util.List; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + /** * {@link ShardRouting} immutably encapsulates information about shard * routings like id, state, version, etc. @@ -615,20 +622,20 @@ public String shortSummary() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject() - .field("state", state()) - .field("primary", primary()) - .field("node", currentNodeId()) - .field("relocating_node", relocatingNodeId()) - .field("shard", id()) - .field("index", getIndexName()); + .field(Fields.STATE, state()) + .field(Fields.PRIMARY, primary()) + .field(Fields.NODE, currentNodeId()) + .field(Fields.RELOCATING_NODE, relocatingNodeId()) + .field(Fields.SHARD, id()) + .field(Fields.INDEX, getIndexName()); if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE) { - builder.field("expected_shard_size_in_bytes", expectedShardSize); + builder.field(Fields.EXPECTED_SHARD_SIZE_IN_BYTES, expectedShardSize); } if (recoverySource != null) { - builder.field("recovery_source", recoverySource); + builder.field(Fields.RECOVERY_SOURCE, recoverySource); } if (allocationId != null) { - builder.field("allocation_id"); + builder.field(Fields.ALLOCATION_ID); allocationId.toXContent(builder, params); } if (unassignedInfo != null) { @@ -637,6 +644,104 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder.endObject(); } + + public static ShardRouting fromXContent(XContentParser parser) throws IOException { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation); + XContentLocation startingLocation = parser.getTokenLocation(); + ShardRoutingState state = null; + Boolean isPrimary = null; + String nodeId = null; + String relocatingNodeid = null; + Integer shardId = null; + String indexName = null; + long expectedShardSizeInBytes = -1; + RecoverySource recoverySource = null; + AllocationId allocationId = null; + UnassignedInfo unassignedInfo = null; + for (Token t = parser.nextToken(); t != Token.END_OBJECT; t = parser.nextToken()) { + ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser::getTokenLocation); + String fieldName = parser.currentName(); + Token currentToken = parser.nextToken(); // Move to value of the field + switch (fieldName) { + case Fields.STATE: + ensureExpectedToken(currentToken, Token.VALUE_STRING, parser::getTokenLocation); + state = ShardRoutingState.valueOf(parser.text()); + break; + case Fields.PRIMARY: + ensureExpectedToken(currentToken, Token.VALUE_BOOLEAN, parser::getTokenLocation); + isPrimary = parser.booleanValue(); + break; + case Fields.NODE: + ensureExpectedToken(currentToken, Token.VALUE_STRING, parser::getTokenLocation); + nodeId = parser.text(); + break; + case Fields.RELOCATING_NODE: + ensureExpectedToken(currentToken, Token.VALUE_STRING, parser::getTokenLocation); + relocatingNodeid = parser.text(); + break; + case Fields.SHARD: + ensureExpectedToken(currentToken, Token.VALUE_NUMBER, parser::getTokenLocation); + shardId = parser.intValue(); + break; + case Fields.INDEX: + ensureExpectedToken(currentToken, Token.VALUE_STRING, parser::getTokenLocation); + indexName = parser.text(); + break; + case Fields.EXPECTED_SHARD_SIZE_IN_BYTES: + ensureExpectedToken(currentToken, Token.VALUE_STRING, parser::getTokenLocation); + expectedShardSizeInBytes = parser.longValue(); + break; + case Fields.RECOVERY_SOURCE: + recoverySource = RecoverySource.fromXContent(parser); + break; + case Fields.ALLOCATION_ID: + allocationId = AllocationId.fromXContent(parser); + break; + case Fields.UNASSIGNED_INFO: + unassignedInfo = UnassignedInfo.fromXContent(parser); + break; + default: + parser.skipChildren(); // Else skip the whole tree + break; + + } + } + if (state != null && + isPrimary != null && + nodeId != null && + relocatingNodeid != null && + shardId != null && + indexName != null) { + return + new ShardRouting( + new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId), + nodeId, + relocatingNodeid, + isPrimary, + state, + recoverySource, + unassignedInfo, + allocationId, + expectedShardSizeInBytes + ); + } else { + throw new ParsingException(startingLocation, "Unable to construct ShardRouting information from JSON"); + } + } + + static final class Fields { + static final String STATE = "state"; + static final String PRIMARY = "primary"; + static final String NODE = "node"; + static final String RELOCATING_NODE = "relocating_node"; + static final String SHARD = "shard"; + static final String INDEX = "index"; + static final String EXPECTED_SHARD_SIZE_IN_BYTES = "expected_shard_size_in_bytes"; + static final String RECOVERY_SOURCE = "recovery_source"; + static final String ALLOCATION_ID = "allocation_id"; + static final String UNASSIGNED_INFO = "unassigned_info"; + } + /** * Returns the expected shard size for {@link ShardRoutingState#RELOCATING} and {@link ShardRoutingState#INITIALIZING} * shards. If it's size is not available {@value #UNAVAILABLE_EXPECTED_SHARD_SIZE} will be returned. diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index a543f4c3d3b3e..5440f2a5683ab 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -37,10 +38,16 @@ import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentLocation; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; import java.io.IOException; import java.util.Locale; import java.util.Objects; +import org.joda.time.DateTime; + +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; /** * Holds additional information as to why the shard is in unassigned state. @@ -431,21 +438,92 @@ public String toString() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("unassigned_info"); - builder.field("reason", reason); - builder.field("at", DATE_TIME_FORMATTER.printer().print(unassignedTimeMillis)); + builder.field(Fields.REASON, reason); + builder.field(Fields.AT, DATE_TIME_FORMATTER.printer().print(unassignedTimeMillis)); if (failedAllocations > 0) { - builder.field("failed_attempts", failedAllocations); + builder.field(Fields.FAILED_ATTEMPTS, failedAllocations); } - builder.field("delayed", delayed); + builder.field(Fields.DELAYED, delayed); String details = getDetails(); if (details != null) { - builder.field("details", details); + builder.field(Fields.DETAILS, details); } - builder.field("allocation_status", lastAllocationStatus.value()); + builder.field(Fields.ALLOCATION_STATUS, lastAllocationStatus.value()); builder.endObject(); return builder; } + public static UnassignedInfo fromXContent(XContentParser parser) throws IOException { + ensureExpectedToken(Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation); + XContentLocation startingLocation = parser.getTokenLocation(); + Reason reason = null; + // The message and exception always remains null as constructing an exception from 'details' + // is too much work + String message = null; + Exception failure = null; + int failedAllocations = 0; + // See UnassignedInfo(StreamInput in) constructor for details on why we reset the time here + Long unassignedTimeNanos = System.nanoTime(); + Long unassignedTimeMillis = null; + Boolean delayed = null; + AllocationStatus allocationStatus = null; + + for (Token t = parser.nextToken(); t != Token.END_OBJECT; t = parser.nextToken()) { + ensureExpectedToken(Token.FIELD_NAME, t, parser::getTokenLocation); + String fieldName = parser.currentName(); + t = parser.nextToken(); + switch (fieldName) { + case Fields.REASON: + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); + reason = Reason.valueOf(parser.text()); + break; + case Fields.AT: + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); + DateTime dt = DATE_TIME_FORMATTER.parser().parseDateTime(parser.text()); + unassignedTimeMillis = dt.getMillis(); + break; + case Fields.FAILED_ATTEMPTS: + ensureExpectedToken(Token.VALUE_NUMBER, t, parser::getTokenLocation); + failedAllocations = parser.intValue(); + break; + case Fields.DELAYED: + ensureExpectedToken(Token.VALUE_BOOLEAN, t, parser::getTokenLocation); + delayed = parser.booleanValue(); + break; + case Fields.DETAILS: + // For now we ignore this and set it null + ensureExpectedToken(Token.VALUE_BOOLEAN, t, parser::getTokenLocation); + break; + case Fields.ALLOCATION_STATUS: + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); + allocationStatus = AllocationStatus.valueOf(parser.text()); + break; + default: + parser.skipChildren(); // else skip the whole tree with this fieldname + break; + } + } + if (reason != null && + unassignedTimeMillis != null && + delayed != null && + allocationStatus != null + ) { + return new UnassignedInfo(reason, null, null, failedAllocations, unassignedTimeNanos, + unassignedTimeMillis, delayed, allocationStatus); + } else { + throw new ParsingException(startingLocation, "Unable to construct UnassignedInfo from JSON"); + } + } + + static final class Fields { + public static final String REASON = "reason"; + public static final String AT = "at"; + public static final String FAILED_ATTEMPTS = "failed_attempts"; + public static final String DELAYED = "delayed"; + public static final String DETAILS = "details"; + public static final String ALLOCATION_STATUS = "allocation_status"; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/server/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java b/server/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java index 04564943ca790..0c936d870f24f 100644 --- a/server/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java +++ b/server/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java @@ -41,8 +41,11 @@ public class ShardsSyncedFlushResult implements Streamable { private ShardId shardId; // some shards may be unassigned, so we need this as state private int totalShards; + // we will use lazy initialization for this if not getting from XContent + private int successfulShards; private ShardsSyncedFlushResult() { + this.successfulShards = -1; } public ShardId getShardId() { @@ -58,6 +61,7 @@ public ShardsSyncedFlushResult(ShardId shardId, int totalShards, String failureR this.shardResponses = emptyMap(); this.shardId = shardId; this.totalShards = totalShards; + this.successfulShards = -1; } /** @@ -69,6 +73,32 @@ public ShardsSyncedFlushResult(ShardId shardId, String syncId, int totalShards, this.syncId = syncId; this.totalShards = totalShards; this.shardId = shardId; + this.successfulShards = -1; + } + + /** + * XContent constructor: Partial failure + */ + public ShardsSyncedFlushResult(ShardId shardId, int totalShards, int successfulShards, String failureReason) { + this.syncId = null; + this.failureReason = failureReason; + this.shardResponses = emptyMap(); + this.shardId = shardId; + this.totalShards = totalShards; + this.successfulShards = successfulShards; + } + + /** + * XContent constructor: Partial failure with shardResponses + */ + public ShardsSyncedFlushResult(ShardId shardId, String syncId, int totalShards, int successfulShards, + Map shardResponses) { + this.failureReason = null; + this.shardResponses = unmodifiableMap(new HashMap<>(shardResponses)); + this.syncId = syncId; + this.totalShards = totalShards; + this.shardId = shardId; + this.successfulShards = successfulShards; } /** @@ -101,13 +131,16 @@ public int totalShards() { * @return total number of successful shards */ public int successfulShards() { - int i = 0; - for (SyncedFlushService.ShardSyncedFlushResponse result : shardResponses.values()) { - if (result.success()) { - i++; + if (this.successfulShards < 0) { + int i = 0; + for (SyncedFlushService.ShardSyncedFlushResponse result : shardResponses.values()) { + if (result.success()) { + i++; + } } + this.successfulShards = i; } - return i; + return this.successfulShards; } /** From 76bf15b97ff957896263fc4a93bb509d1830ca41 Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Thu, 22 Mar 2018 02:32:43 +0100 Subject: [PATCH 2/4] Added Unit Tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit — Fixed issues with parser for null values — Cleaned up the response parsing for SyncedFlush — TODO: Run complete test suite and check style cleanup --- .../elasticsearch/client/IndicesClient.java | 6 +- .../org/elasticsearch/client/Request.java | 3 +- .../elasticsearch/client/RequestTests.java | 27 ++++ .../indices/flush/SyncedFlushResponse.java | 148 +++++++++--------- .../cluster/routing/RecoverySource.java | 24 ++- .../cluster/routing/ShardRouting.java | 41 +++-- .../cluster/routing/UnassignedInfo.java | 23 +-- .../flush/ShardsSyncedFlushResult.java | 4 +- .../flush/AbstractSyncedFlushTest.java | 109 +++++++++++++ .../flush/SyncedFlushResponseUnitTests.java | 90 +++++++++++ .../indices/flush/SyncedFlushUnitTests.java | 69 +------- .../cluster/routing/RecoverySourceTests.java | 25 +++ .../cluster/routing/ShardRoutingTests.java | 34 ++++ .../cluster/routing/UnassignedInfoTests.java | 47 +++++- 14 files changed, 465 insertions(+), 185 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/flush/AbstractSyncedFlushTest.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponseUnitTests.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java index ebd2b642a33e4..5cb225d269a18 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java @@ -265,7 +265,8 @@ public void flushAsync(FlushRequest flushRequest, ActionListener /** Initiate a synced flush manually using the synced flush API *

- * See Synced flush API on elastic.co + * See + * Synced flush API on elastic.co */ public SyncedFlushResponse syncedFlush(SyncedFlushRequest syncedFlushRequest, Header... headers) throws IOException { return restHighLevelClient.performRequestAndParseEntity(syncedFlushRequest, Request::syncedFlush, @@ -275,7 +276,8 @@ public SyncedFlushResponse syncedFlush(SyncedFlushRequest syncedFlushRequest, He /** * Asynchronously initiate a synced flush manually using the synced flush API *

- * See Synced flush API on elastic.co + * See + * Synced flush API on elastic.co */ public void syncedFlushAsync(SyncedFlushRequest syncedFlushRequest, ActionListener listener, Header... headers) { restHighLevelClient.performRequestAsyncAndParseEntity(syncedFlushRequest, Request::syncedFlush, diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java index 4e73ca0424aba..6dd0e440e21aa 100755 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java @@ -235,7 +235,8 @@ static Request flush(FlushRequest flushRequest) { } static Request syncedFlush(SyncedFlushRequest syncedFlushRequest) { - String endpoint = endpoint(syncedFlushRequest.indices(), "_flush", "synced"); + String[] indices = syncedFlushRequest.indices() == null ? Strings.EMPTY_ARRAY : syncedFlushRequest.indices(); + String endpoint = endpoint(indices, "_flush", "synced"); Params parameters = Params.builder(); // This request takes no other parameters other than the indices. parameters.withIndicesOptions(syncedFlushRequest.indicesOptions()); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java index f79135c44f5ec..36fee8103b3c9 100755 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; @@ -621,6 +622,32 @@ public void testFlush() { assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME)); } + public void testSyncedFlush() { + String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5); + SyncedFlushRequest syncedFlushRequest; + if (randomBoolean()) { + syncedFlushRequest = new SyncedFlushRequest(indices); + } else { + syncedFlushRequest = new SyncedFlushRequest(); + syncedFlushRequest.indices(indices); + } + Map expectedParams = new HashMap<>(); + setRandomIndicesOptions(syncedFlushRequest::indicesOptions, syncedFlushRequest::indicesOptions, expectedParams); + + + Request request = Request.syncedFlush(syncedFlushRequest); + StringJoiner endpoint = new StringJoiner("/", "/", ""); + if (indices != null && indices.length > 0) { + endpoint.add(String.join(",", indices)); + } + endpoint.add("_flush"); + endpoint.add("synced"); + assertThat(request.getEndpoint(), equalTo(endpoint.toString())); + assertThat(request.getParameters(), equalTo(expectedParams)); + assertThat(request.getEntity(), nullValue()); + assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME)); + } + public void testClearCache() { String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5); ClearIndicesCacheRequest clearIndicesCacheRequest; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java index 4cbde7017fde7..4d995c0daeb25 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java @@ -19,11 +19,10 @@ package org.elasticsearch.action.admin.indices.flush; import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParsingException; +import org.elasticsearch.common.inject.internal.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -54,8 +53,8 @@ public class SyncedFlushResponse extends ActionResponse implements ToXContentFragment { Map> shardsResultPerIndex; - ShardCounts shardCounts; Map shardCountsPerIndex; + ShardCounts shardCounts; SyncedFlushResponse() { @@ -67,21 +66,21 @@ public SyncedFlushResponse(Map> shardsResu // ConcurrentHashMap this.shardsResultPerIndex = unmodifiableMap(shardsResultPerIndex); this.shardCounts = calculateShardCounts(Iterables.flatten(shardsResultPerIndex.values())); - this.shardCountsPerIndex = new HashMap<>(); + Map shardsCountsPerIndex = new HashMap<>(); for (Map.Entry> entry: shardsResultPerIndex.entrySet()) { - this.shardCountsPerIndex.put(entry.getKey(), calculateShardCounts(entry.getValue())); + shardsCountsPerIndex.put(entry.getKey(), calculateShardCounts(entry.getValue())); } + this.shardCountsPerIndex = unmodifiableMap(shardsCountsPerIndex); } - public SyncedFlushResponse(ShardCounts shardCounts, - Map> shardsResultPerIndex, + public SyncedFlushResponse(ShardCounts shardCounts, Map> shardsResultPerIndex, Map shardCountsPerIndex) { // shardsResultPerIndex is never modified after it is passed to this // constructor so this is safe even though shardsResultPerIndex is a // ConcurrentHashMap this.shardsResultPerIndex = unmodifiableMap(shardsResultPerIndex); this.shardCounts = shardCounts; - this.shardCountsPerIndex = unmodifiableMap(shardCountsPerIndex); + this.shardCountsPerIndex = shardCountsPerIndex; } /** @@ -113,13 +112,8 @@ public Map> getShardsResultPerIndex() { return shardsResultPerIndex; } - /** - * Get the ShardCount for a particular index name. - * @param index name of the index to be searched - * @return ShardCounts or {@code null} if index is not present - */ - public ShardCounts getShardCountsForIndex(String index) { - return shardCountsPerIndex.get(index); + public Map getShardCountsPerIndex() { + return shardCountsPerIndex; } @Override @@ -137,20 +131,20 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws for (ShardsSyncedFlushResult shardResults : indexResult) { if (shardResults.failed()) { builder.startObject(); - builder.field(Fields.TOTAL_COPIES, shardResults.totalShards()); - builder.field(Fields.SUCCESSFUL_COPIES, shardResults.successfulShards()); builder.field(Fields.SHARD, shardResults.shardId().id()); builder.field(Fields.REASON, shardResults.failureReason()); + builder.field(Fields.TOTAL_COPIES, shardResults.totalShards()); + builder.field(Fields.SUCCESSFUL_COPIES, shardResults.successfulShards()); builder.endObject(); continue; } Map failedShards = shardResults.failedShards(); for (Map.Entry shardEntry : failedShards.entrySet()) { builder.startObject(); - builder.field(Fields.TOTAL_COPIES, shardResults.totalShards()); - builder.field(Fields.SUCCESSFUL_COPIES, shardResults.successfulShards()); builder.field(Fields.SHARD, shardResults.shardId().id()); builder.field(Fields.REASON, shardEntry.getValue().failureReason()); + builder.field(Fields.TOTAL_COPIES, shardResults.totalShards()); + builder.field(Fields.SUCCESSFUL_COPIES, shardResults.successfulShards()); builder.field(Fields.ROUTING, shardEntry.getKey()); builder.endObject(); } @@ -170,8 +164,8 @@ public static SyncedFlushResponse fromXContent(XContentParser parser) throws IOE private static SyncedFlushResponse innerFromXContent(XContentParser parser) throws IOException { ShardCounts totalShardCounts = null; - Map> shardsResultPerIndex = new HashMap<>(); Map shardsCountsPerIndex = new HashMap<>(); + Map> shardsResultPerIndex = new HashMap<>(); // If it is an object we try to parse it for Fields._SHARD or for an index entry for (Token curToken = parser.currentToken(); curToken != Token.END_OBJECT; curToken = parser.nextToken()) { ensureExpectedToken(Token.FIELD_NAME, parser.currentToken(), parser::getTokenLocation); @@ -180,7 +174,7 @@ private static SyncedFlushResponse innerFromXContent(XContentParser parser) thro Integer totalShards = null; Integer successfulShards = null; Integer failedShards = null; - List listShardsSyncedFlushResult = new ArrayList<>(); + Map> failures = new HashMap<>(); if (curToken == Token.START_OBJECT) { // Start parsing for _shard or for index for (curToken = parser.nextToken(); curToken != Token.END_OBJECT; curToken = parser.nextToken()) { if (curToken == Token.FIELD_NAME) { @@ -202,16 +196,13 @@ private static SyncedFlushResponse innerFromXContent(XContentParser parser) thro case Fields.FAILURES: if (!fieldName.equals(Fields._SHARDS)) { ensureExpectedToken(Token.START_ARRAY, curToken, parser::getTokenLocation); - Map shardsSyncedFlushResults = new HashMap<>(); - Map> - failedSharedResponses = new HashMap<>(); for (curToken = parser.nextToken(); curToken != Token.END_ARRAY; curToken = parser.nextToken()) { ensureExpectedToken(Token.START_OBJECT, curToken, parser::getTokenLocation); ShardRouting routing = null; String failureReason = null; + Integer totalCopies = null; + Integer successfulCopies = null; ShardId shardId = null; - Integer totalShardCopies = null; - Integer successfulShardCopies = null; XContentLocation startLocation = parser.getTokenLocation(); for (curToken = parser.nextToken(); curToken != Token.END_OBJECT; curToken = parser.nextToken()) { ensureExpectedToken(Token.FIELD_NAME, curToken, parser::getTokenLocation); @@ -230,16 +221,16 @@ private static SyncedFlushResponse innerFromXContent(XContentParser parser) thro ensureExpectedToken(Token.VALUE_STRING, curToken, parser::getTokenLocation); failureReason = parser.text(); break; - case Fields.ROUTING: - routing = ShardRouting.fromXContent(parser); - break; case Fields.TOTAL_COPIES: ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); - totalShardCopies = parser.intValue(); + totalCopies = parser.intValue(); break; case Fields.SUCCESSFUL_COPIES: ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); - successfulShardCopies = parser.intValue(); + successfulCopies = parser.intValue(); + break; + case Fields.ROUTING: + routing = ShardRouting.fromXContent(parser); break; default: // If something else skip it @@ -249,52 +240,19 @@ private static SyncedFlushResponse innerFromXContent(XContentParser parser) thro } if (failureReason != null && shardId != null && - totalShardCopies != null && - successfulShardCopies != null) { + totalCopies != null && + successfulCopies != null) { // This is ugly but there is only one ShardsSyncedFlushResult for each shardId // so this will work. - shardsSyncedFlushResults.putIfAbsent ( - shardId, - new ShardsSyncedFlushResult( - shardId, - totalShardCopies, - successfulShardCopies, - routing == null ? failureReason : null - ) - ); - if (routing != null) { - if (failedSharedResponses.containsKey(shardId)) { - failedSharedResponses.get(shardId).put( - routing, new SyncedFlushService.ShardSyncedFlushResponse(failureReason) - ); - } else { - Map m = - new HashMap<>(); - m.put( - routing, - new SyncedFlushService.ShardSyncedFlushResponse(failureReason) - ); - failedSharedResponses.put(shardId, m); - } + if (!failures.containsKey(shardId)) { + failures.put(shardId, new ArrayList<>()); } - } else { - throw new ParsingException(startLocation, "Unable to construct ShardsSyncedFlushResult"); - } - } - for (Map.Entry entry: shardsSyncedFlushResults.entrySet()) { - ShardsSyncedFlushResult result; - if (failedSharedResponses.containsKey(entry.getKey())) { - result = new ShardsSyncedFlushResult( - entry.getValue().shardId(), - null, // syncid is null since this is a failure response - entry.getValue().totalShards(), - entry.getValue().successfulShards(), - failedSharedResponses.get(entry.getKey()) + failures.get(shardId).add( + new FailureContainer(shardId, failureReason, totalCopies, successfulCopies, routing) ); } else { - result = entry.getValue(); + throw new ParsingException(startLocation, "Unable to construct ShardsSyncedFlushResult"); } - listShardsSyncedFlushResult.add(result); } } else { parser.skipChildren(); @@ -315,17 +273,40 @@ private static SyncedFlushResponse innerFromXContent(XContentParser parser) thro if (fieldName.equals(Fields._SHARDS)) { totalShardCounts = shardCount; } else { + List results = new ArrayList<>(); + // All failures in this list belong to the same index + for (Map.Entry> entry: failures.entrySet()) { + Map shardResponses = new HashMap<>(); + for (FailureContainer container: entry.getValue()) { + if (container.shardRouting != null) { + shardResponses.put(container.shardRouting, + new SyncedFlushService.ShardSyncedFlushResponse(container.failureReason) + ); + } + } + // Size of entry.getValue() will at least be one + FailureContainer container = entry.getValue().get(0); + if (!shardResponses.isEmpty()) { + results.add( + new ShardsSyncedFlushResult(container.shardId, null, container.totalCopies, + container.successfulCopies, shardResponses) + ); + } else { + results.add( + new ShardsSyncedFlushResult(container.shardId, container.totalCopies, + container.successfulCopies, container.failureReason) + ); + } + } shardsCountsPerIndex.put(fieldName, shardCount); - shardsResultPerIndex.put(fieldName, listShardsSyncedFlushResult); + shardsResultPerIndex.put(fieldName, results); } } } else { // Else leave this tree alone parser.skipChildren(); } } - return new SyncedFlushResponse( - totalShardCounts, shardsResultPerIndex, shardsCountsPerIndex - ); + return new SyncedFlushResponse(totalShardCounts, shardsResultPerIndex, shardsCountsPerIndex); } @@ -345,6 +326,23 @@ static ShardCounts calculateShardCounts(Iterable result return new ShardCounts(total, successful, failed); } + // Only used as a container for parsing XContent + static final class FailureContainer { + ShardId shardId; + String failureReason; + ShardRouting shardRouting; + int totalCopies; + int successfulCopies; + FailureContainer(ShardId shardId, String failureReason, int totalCopies, int successfulCopies, + @Nullable ShardRouting shardRouting) { + this.shardId = shardId; + this.failureReason = failureReason; + this.shardRouting = shardRouting; + this.totalCopies = totalCopies; + this.successfulCopies = successfulCopies; + } + } + static final class ShardCounts implements ToXContentFragment, Streamable { public int total; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java index f116814c1638e..a311786434e6a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java @@ -57,7 +57,7 @@ public final XContentBuilder toXContent(XContentBuilder builder, ToXContent.Para } public static RecoverySource fromXContent(XContentParser parser) throws IOException { - ensureExpectedToken(Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + ensureExpectedToken(Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation); XContentLocation startingPosition = parser.getTokenLocation(); Type type = null; RecoverySource recoverySource = null; @@ -67,6 +67,8 @@ public static RecoverySource fromXContent(XContentParser parser) throws IOExcept String index = null; String snapshotRepository = null; SnapshotId snapshotId = null; + String snapshotUUID = null; + String snapshotName = null; for (Token t = parser.nextToken(); t != Token.END_OBJECT; t = parser.nextToken()) { ensureExpectedToken(Token.FIELD_NAME, t, parser::getTokenLocation); String fieldName = parser.currentName(); @@ -74,6 +76,7 @@ public static RecoverySource fromXContent(XContentParser parser) throws IOExcept if (t.isValue()) { switch (fieldName) { case "type": + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); String typeString = parser.text(); type = Type.valueOf(parser.text()); switch (type) { @@ -97,20 +100,23 @@ public static RecoverySource fromXContent(XContentParser parser) throws IOExcept } break; case "repository": + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); snapshotRepository = parser.text(); break; case "snapshot": - String snapshotName = parser.text(); - /** - * We use the name for Id and Name using the old format here since xContent has - * old format. See {@link org.elasticsearch.snapshots.SnapshotId#fromXContent} - */ - snapshotId = new SnapshotId(snapshotName, snapshotName); + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); + snapshotName = parser.text(); + break; + case "snapshot_uuid": + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); + snapshotUUID = parser.text(); break; case "version": + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); version = Version.fromString(parser.text()); break; case "index": + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); index = parser.text(); break; default: @@ -128,7 +134,8 @@ public static RecoverySource fromXContent(XContentParser parser) throws IOExcept version != null && index != null && snapshotRepository != null && - snapshotId != null) { + snapshotName != null) { + snapshotId = new SnapshotId(snapshotName, snapshotUUID != null ? snapshotUUID : snapshotName); return new SnapshotRecoverySource(new Snapshot(snapshotRepository, snapshotId), version, index); } else { throw new ParsingException(startingPosition, "Unable to recover RecoverySource from JSON"); @@ -287,6 +294,7 @@ public Type getType() { @Override public void addAdditionalFields(XContentBuilder builder, ToXContent.Params params) throws IOException { builder.field("repository", snapshot.getRepository()) + .field("snapshot_uuid", snapshot.getSnapshotId().getUUID()) .field("snapshot", snapshot.getSnapshotId().getName()) .field("version", version.toString()) .field("index", index); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java index 79cc932d0aee7..41349b684ff88 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java @@ -71,8 +71,9 @@ public final class ShardRouting implements Writeable, ToXContentObject { * A constructor to internally create shard routing instances, note, the internal flag should only be set to true * by either this class or tests. Visible for testing. */ - ShardRouting(ShardId shardId, String currentNodeId, - String relocatingNodeId, boolean primary, ShardRoutingState state, RecoverySource recoverySource, + ShardRouting(ShardId shardId, @Nullable String currentNodeId, + @Nullable String relocatingNodeId, boolean primary, + @Nullable ShardRoutingState state, RecoverySource recoverySource, UnassignedInfo unassignedInfo, AllocationId allocationId, long expectedShardSize) { this.shardId = shardId; this.currentNodeId = currentNodeId; @@ -659,36 +660,48 @@ public static ShardRouting fromXContent(XContentParser parser) throws IOExceptio AllocationId allocationId = null; UnassignedInfo unassignedInfo = null; for (Token t = parser.nextToken(); t != Token.END_OBJECT; t = parser.nextToken()) { - ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser::getTokenLocation); + ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser::getTokenLocation); String fieldName = parser.currentName(); Token currentToken = parser.nextToken(); // Move to value of the field switch (fieldName) { case Fields.STATE: - ensureExpectedToken(currentToken, Token.VALUE_STRING, parser::getTokenLocation); - state = ShardRoutingState.valueOf(parser.text()); + if (currentToken == Token.VALUE_STRING) { + state = ShardRoutingState.valueOf(parser.text()); + } else { + // If it was not a string it must be null + ensureExpectedToken(Token.VALUE_NULL, currentToken, parser::getTokenLocation); + } break; case Fields.PRIMARY: - ensureExpectedToken(currentToken, Token.VALUE_BOOLEAN, parser::getTokenLocation); + ensureExpectedToken(Token.VALUE_BOOLEAN, currentToken, parser::getTokenLocation); isPrimary = parser.booleanValue(); break; case Fields.NODE: - ensureExpectedToken(currentToken, Token.VALUE_STRING, parser::getTokenLocation); - nodeId = parser.text(); + if (currentToken == Token.VALUE_STRING) { + nodeId = parser.text(); + } else { + // If it was not a string it must be null + ensureExpectedToken(Token.VALUE_NULL, currentToken, parser::getTokenLocation); + } break; case Fields.RELOCATING_NODE: - ensureExpectedToken(currentToken, Token.VALUE_STRING, parser::getTokenLocation); - relocatingNodeid = parser.text(); + if (currentToken == Token.VALUE_STRING) { + relocatingNodeid = parser.text(); + } else { + // If it was not a string it must be null + ensureExpectedToken(Token.VALUE_NULL, currentToken, parser::getTokenLocation); + } break; case Fields.SHARD: - ensureExpectedToken(currentToken, Token.VALUE_NUMBER, parser::getTokenLocation); + ensureExpectedToken(Token.VALUE_NUMBER, currentToken, parser::getTokenLocation); shardId = parser.intValue(); break; case Fields.INDEX: - ensureExpectedToken(currentToken, Token.VALUE_STRING, parser::getTokenLocation); + ensureExpectedToken(Token.VALUE_STRING, currentToken, parser::getTokenLocation); indexName = parser.text(); break; case Fields.EXPECTED_SHARD_SIZE_IN_BYTES: - ensureExpectedToken(currentToken, Token.VALUE_STRING, parser::getTokenLocation); + ensureExpectedToken(Token.VALUE_STRING, currentToken, parser::getTokenLocation); expectedShardSizeInBytes = parser.longValue(); break; case Fields.RECOVERY_SOURCE: @@ -708,8 +721,6 @@ public static ShardRouting fromXContent(XContentParser parser) throws IOExceptio } if (state != null && isPrimary != null && - nodeId != null && - relocatingNodeid != null && shardId != null && indexName != null) { return diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index 5440f2a5683ab..dbc36de37dfe4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentLocation; @@ -56,6 +55,8 @@ public final class UnassignedInfo implements ToXContentFragment, Writeable { public static final FormatDateTimeFormatter DATE_TIME_FORMATTER = Joda.forPattern("dateOptionalTime"); + private static final String MSG_DELIMITER = ","; + public static final Setting INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING = Setting.positiveTimeSetting("index.unassigned.node_left.delayed_timeout", TimeValue.timeValueMinutes(1), Property.Dynamic, Property.IndexScope); @@ -354,7 +355,7 @@ public String getDetails() { if (message == null) { return null; } - return message + (failure == null ? "" : ", failure " + ExceptionsHelper.detailedMessage(failure)); + return message + (failure == null ? "" : MSG_DELIMITER + " failure " + ExceptionsHelper.detailedMessage(failure)); } /** @@ -457,9 +458,9 @@ public static UnassignedInfo fromXContent(XContentParser parser) throws IOExcept ensureExpectedToken(Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation); XContentLocation startingLocation = parser.getTokenLocation(); Reason reason = null; - // The message and exception always remains null as constructing an exception from 'details' - // is too much work String message = null; + // The exception always remains null as constructing an exception from 'details' + // is too much work Exception failure = null; int failedAllocations = 0; // See UnassignedInfo(StreamInput in) constructor for details on why we reset the time here @@ -475,7 +476,7 @@ public static UnassignedInfo fromXContent(XContentParser parser) throws IOExcept switch (fieldName) { case Fields.REASON: ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); - reason = Reason.valueOf(parser.text()); + reason = Reason.valueOf(parser.text().toUpperCase(Locale.ROOT)); break; case Fields.AT: ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); @@ -491,12 +492,16 @@ public static UnassignedInfo fromXContent(XContentParser parser) throws IOExcept delayed = parser.booleanValue(); break; case Fields.DETAILS: - // For now we ignore this and set it null - ensureExpectedToken(Token.VALUE_BOOLEAN, t, parser::getTokenLocation); + ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); + // We ignore the exception but take the message out + // This only works if the message itself did not contain the delimiter + // The length of the resulting array from split can never be smaller than 1 + // The pattern MSG_DELIMITER is applied limit-1 times which serves our purpose + message = parser.text().split(MSG_DELIMITER, 2)[0]; break; case Fields.ALLOCATION_STATUS: ensureExpectedToken(Token.VALUE_STRING, t, parser::getTokenLocation); - allocationStatus = AllocationStatus.valueOf(parser.text()); + allocationStatus = AllocationStatus.valueOf(parser.text().toUpperCase(Locale.ROOT)); break; default: parser.skipChildren(); // else skip the whole tree with this fieldname @@ -508,7 +513,7 @@ public static UnassignedInfo fromXContent(XContentParser parser) throws IOExcept delayed != null && allocationStatus != null ) { - return new UnassignedInfo(reason, null, null, failedAllocations, unassignedTimeNanos, + return new UnassignedInfo(reason, message, null, failedAllocations, unassignedTimeNanos, unassignedTimeMillis, delayed, allocationStatus); } else { throw new ParsingException(startingLocation, "Unable to construct UnassignedInfo from JSON"); diff --git a/server/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java b/server/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java index 0c936d870f24f..109e0eb7c2721 100644 --- a/server/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java +++ b/server/src/main/java/org/elasticsearch/indices/flush/ShardsSyncedFlushResult.java @@ -77,7 +77,7 @@ public ShardsSyncedFlushResult(ShardId shardId, String syncId, int totalShards, } /** - * XContent constructor: Partial failure + * failure constructor for XContent deserialization */ public ShardsSyncedFlushResult(ShardId shardId, int totalShards, int successfulShards, String failureReason) { this.syncId = null; @@ -89,7 +89,7 @@ public ShardsSyncedFlushResult(ShardId shardId, int totalShards, int successfulS } /** - * XContent constructor: Partial failure with shardResponses + * success contructor for XContent deserialization */ public ShardsSyncedFlushResult(ShardId shardId, String syncId, int totalShards, int successfulShards, Map shardResponses) { diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/flush/AbstractSyncedFlushTest.java b/server/src/test/java/org/elasticsearch/action/admin/indices/flush/AbstractSyncedFlushTest.java new file mode 100644 index 0000000000000..b35e205176e7e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/flush/AbstractSyncedFlushTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.flush; + +import com.carrotsearch.hppc.ObjectIntHashMap; +import com.carrotsearch.hppc.ObjectIntMap; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.flush.ShardsSyncedFlushResult; +import org.elasticsearch.indices.flush.SyncedFlushService; +import org.elasticsearch.test.ESTestCase; + +public abstract class AbstractSyncedFlushTest extends ESTestCase { + protected static class TestPlan { + public SyncedFlushResponse.ShardCounts totalCounts; + public Map countsPerIndex = new HashMap<>(); + public ObjectIntMap expectedFailuresPerIndex = new ObjectIntHashMap<>(); + public SyncedFlushResponse result; + } + + protected TestPlan createTestPlan() { + final TestPlan testPlan = new TestPlan(); + final Map> indicesResults = new HashMap<>(); + final int indexCount = randomIntBetween(1, 10); + int totalShards = 0; + int totalSuccesful = 0; + int totalFailed = 0; + for (int i = 0; i < indexCount; i++) { + final String index = "index_" + i; + int shards = randomIntBetween(1, 4); + int replicas = randomIntBetween(0, 2); + int successful = 0; + int failed = 0; + int failures = 0; + List shardsResults = new ArrayList<>(); + for (int shard = 0; shard < shards; shard++) { + final ShardId shardId = new ShardId(index, "_na_", shard); + if (randomInt(5) < 2) { + // total shard failure + failed += replicas + 1; + failures++; + shardsResults.add(new ShardsSyncedFlushResult(shardId, replicas + 1, "simulated total failure")); + } else { + Map shardResponses = new HashMap<>(); + for (int copy = 0; copy < replicas + 1; copy++) { + final ShardRouting shardRouting = + TestShardRouting.newShardRouting( + index, shard, "node_" + shardId + "_" + copy, null, + copy == 0, ShardRoutingState.STARTED + ); + if (randomInt(5) < 2) { + // shard copy failure + failed++; + failures++; + shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse("copy failure " + shardId)); + } else { + successful++; + shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse()); + } + } + shardsResults.add(new ShardsSyncedFlushResult(shardId, "_sync_id_" + shard, replicas + 1, shardResponses)); + } + } + indicesResults.put(index, shardsResults); + testPlan.countsPerIndex.put(index, new SyncedFlushResponse.ShardCounts(shards * (replicas + 1), successful, failed)); + testPlan.expectedFailuresPerIndex.put(index, failures); + totalFailed += failed; + totalShards += shards * (replicas + 1); + totalSuccesful += successful; + } + testPlan.result = new SyncedFlushResponse(indicesResults); + testPlan.totalCounts = new SyncedFlushResponse.ShardCounts(totalShards, totalSuccesful, totalFailed); + return testPlan; + } + + public void assertShardCounts(SyncedFlushResponse.ShardCounts first, SyncedFlushResponse.ShardCounts second) { + if (first == null) { + assertNull(second); + } else { + assertNotNull(second); + assertEquals(first.successful, second.successful); + assertEquals(first.failed, second.failed); + assertEquals(first.total, second.total); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponseUnitTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponseUnitTests.java new file mode 100644 index 0000000000000..375c7f4e63c18 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponseUnitTests.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.flush; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.indices.flush.ShardsSyncedFlushResult; +import org.elasticsearch.indices.flush.SyncedFlushService; + +public class SyncedFlushResponseUnitTests extends AbstractSyncedFlushTest { + + public void testXContentSerialization() throws IOException { + final XContentType xContentType = randomFrom(XContentType.values()); + XContentBuilder builder = XContentBuilder.builder(xContentType.xContent()); + TestPlan plan = createTestPlan(); + SyncedFlushResponse response = plan.result; + assertNotNull(response); + builder.startObject(); + response.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + XContentParser parser = builder + .generator() + .contentType() + .xContent() + .createParser( + xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput() + ); + SyncedFlushResponse parsedResponse = SyncedFlushResponse.fromXContent(parser); + assertNotNull(parsedResponse); + assertShardCounts(response.shardCounts, parsedResponse.shardCounts); + for (Map.Entry entry: response.getShardCountsPerIndex().entrySet()) { + assertShardCounts(entry.getValue(), parsedResponse.shardCountsPerIndex.get(entry.getKey())); + List responseResults = response.shardsResultPerIndex.get(entry.getKey()); + List parsedResults = parsedResponse.shardsResultPerIndex.get(entry.getKey()); + assertNotNull(responseResults); + assertNotNull(parsedResults); + if (entry.getValue().failed > 0) { + // build a map for each shardId and compare total_copies and successful_copies and failed_copies + Map parsedResponseMap = new HashMap<>(); + for (ShardsSyncedFlushResult parsedResponseResult: parsedResults) { + parsedResponseMap.put(parsedResponseResult.shardId().id(), parsedResponseResult); + } + // We are just trying to perform a hash join here on the two lists based on shardId + for (ShardsSyncedFlushResult responseResult: responseResults) { + Map responseFailedShards = + responseResult.failedShards(); + // After deserialization we lose information of successful shards + if (responseResult.failed() || responseFailedShards.size() > 0) { + ShardsSyncedFlushResult parsedResponseResult = parsedResponseMap.get(responseResult.shardId().id()); + Map parsedFailedShards = + parsedResponseResult.failedShards(); + assertNotNull(parsedResponseResult); + assertEquals(responseResult.totalShards(), parsedResponseResult.totalShards()); + assertEquals(responseResult.successfulShards(), parsedResponseResult.successfulShards()); + assertEquals(responseResult.failureReason(), parsedResponseResult.failureReason()); + assertEquals(responseFailedShards.size(), parsedFailedShards.size()); + } + } + } + } + // We skip shard routing information here. Separate tests for shard routing verification exist + // in ShardRoutingTests + } + +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushUnitTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushUnitTests.java index 7040c92ec1d27..2bb100b4b5633 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushUnitTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushUnitTests.java @@ -19,23 +19,15 @@ package org.elasticsearch.action.admin.indices.flush; -import com.carrotsearch.hppc.ObjectIntHashMap; -import com.carrotsearch.hppc.ObjectIntMap; import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse.ShardCounts; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.flush.ShardsSyncedFlushResult; import org.elasticsearch.indices.flush.SyncedFlushService; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.ESTestCase; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -43,14 +35,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; -public class SyncedFlushUnitTests extends ESTestCase { - - private static class TestPlan { - public SyncedFlushResponse.ShardCounts totalCounts; - public Map countsPerIndex = new HashMap<>(); - public ObjectIntMap expectedFailuresPerIndex = new ObjectIntHashMap<>(); - public SyncedFlushResponse result; - } +public class SyncedFlushUnitTests extends AbstractSyncedFlushTest { public void testIndicesSyncedFlushResult() throws IOException { final TestPlan testPlan = createTestPlan(); @@ -132,56 +117,4 @@ private void assertShardCount(String name, Map header, ShardCoun assertThat(name + " has unexpected failed count", (Integer) header.get("failed"), equalTo(expectedCounts.failed)); } - protected TestPlan createTestPlan() { - final TestPlan testPlan = new TestPlan(); - final Map> indicesResults = new HashMap<>(); - final int indexCount = randomIntBetween(1, 10); - int totalShards = 0; - int totalSuccesful = 0; - int totalFailed = 0; - for (int i = 0; i < indexCount; i++) { - final String index = "index_" + i; - int shards = randomIntBetween(1, 4); - int replicas = randomIntBetween(0, 2); - int successful = 0; - int failed = 0; - int failures = 0; - List shardsResults = new ArrayList<>(); - for (int shard = 0; shard < shards; shard++) { - final ShardId shardId = new ShardId(index, "_na_", shard); - if (randomInt(5) < 2) { - // total shard failure - failed += replicas + 1; - failures++; - shardsResults.add(new ShardsSyncedFlushResult(shardId, replicas + 1, "simulated total failure")); - } else { - Map shardResponses = new HashMap<>(); - for (int copy = 0; copy < replicas + 1; copy++) { - final ShardRouting shardRouting = TestShardRouting.newShardRouting(index, shard, "node_" + shardId + "_" + copy, null, - copy == 0, ShardRoutingState.STARTED); - if (randomInt(5) < 2) { - // shard copy failure - failed++; - failures++; - shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse("copy failure " + shardId)); - } else { - successful++; - shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse()); - } - } - shardsResults.add(new ShardsSyncedFlushResult(shardId, "_sync_id_" + shard, replicas + 1, shardResponses)); - } - } - indicesResults.put(index, shardsResults); - testPlan.countsPerIndex.put(index, new SyncedFlushResponse.ShardCounts(shards * (replicas + 1), successful, failed)); - testPlan.expectedFailuresPerIndex.put(index, failures); - totalFailed += failed; - totalShards += shards * (replicas + 1); - totalSuccesful += successful; - } - testPlan.result = new SyncedFlushResponse(indicesResults); - testPlan.totalCounts = new SyncedFlushResponse.ShardCounts(totalShards, totalSuccesful, totalFailed); - return testPlan; - } - } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/RecoverySourceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/RecoverySourceTests.java index 1929c15f7d5ce..cc82ee35727b1 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/RecoverySourceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/RecoverySourceTests.java @@ -19,7 +19,13 @@ package org.elasticsearch.cluster.routing; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -38,6 +44,25 @@ public void testSerialization() throws IOException { assertEquals(recoverySource, serializedRecoverySource); } + public void testXContentSerialization() throws IOException { + final XContentType xContentType = randomFrom(XContentType.values()); + RecoverySource original = TestShardRouting.randomRecoverySource(); + assertNotNull(original); + XContentBuilder builder = XContentBuilder.builder(xContentType.xContent()); + original.toXContent(builder, ToXContent.EMPTY_PARAMS); + XContentParser parser = builder + .generator() + .contentType() + .xContent() + .createParser( + xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput() + ); + parser.nextToken(); // Move it to the first token + RecoverySource deserealized = RecoverySource.fromXContent(parser); + assertEquals(original.getType(), deserealized.getType()); + assertEquals(original, deserealized); + } + public void testRecoverySourceTypeOrder() { assertEquals(RecoverySource.Type.EMPTY_STORE.ordinal(), 0); assertEquals(RecoverySource.Type.EXISTING_STORE.ordinal(), 1); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java index f87f918d99ecc..2d7d57482c56a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java @@ -21,6 +21,12 @@ import org.elasticsearch.Version; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.snapshots.SnapshotId; @@ -233,4 +239,32 @@ public void testExpectedSize() throws IOException { } } } + + public void testXContentSerialization() throws IOException { + final XContentType xContentType = randomFrom(XContentType.values()); + ShardRouting routing = randomShardRouting("index", randomInt(5)); + assertNotNull(routing); + XContentBuilder builder = XContentBuilder.builder(xContentType.xContent()); + routing.toXContent(builder, ToXContent.EMPTY_PARAMS); + XContentParser parser = builder + .generator() + .contentType() + .xContent() + .createParser( + xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput() + ); + parser.nextToken(); + ShardRouting deserializedRouting = ShardRouting.fromXContent(parser); + assertNotNull(deserializedRouting); + assertEquals(routing.state(), deserializedRouting.state()); + assertEquals(routing.primary(), deserializedRouting.primary()); + assertEquals(routing.currentNodeId(), deserializedRouting.currentNodeId()); + assertEquals(routing.relocatingNodeId(), deserializedRouting.relocatingNodeId()); + assertEquals(routing.id(), deserializedRouting.id()); // Check if shardId is equals + assertEquals(routing.getIndexName(), deserializedRouting.getIndexName()); + assertEquals(routing.getExpectedShardSize(), deserializedRouting.getExpectedShardSize()); + assertEquals(routing.allocationId(), deserializedRouting.allocationId()); + // We skip RecoverySource and UnassignedInfo here. Will test the serialization for them in + // a different class + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index d8f7f6552f908..10d2907ae07b8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -33,10 +33,16 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.FailedShard; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; @@ -78,11 +84,7 @@ public void testReasonOrdinalOrder() { } public void testSerialization() throws Exception { - UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), UnassignedInfo.Reason.values()); - UnassignedInfo meta = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? - new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null, null, randomIntBetween(1, 100), System.nanoTime(), - System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT): - new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null); + UnassignedInfo meta = createRandom(); BytesStreamOutput out = new BytesStreamOutput(); meta.writeTo(out); out.close(); @@ -95,6 +97,41 @@ public void testSerialization() throws Exception { assertThat(read.getNumFailedAllocations(), equalTo(meta.getNumFailedAllocations())); } + public void testXContentSerialization() throws Exception { + final XContentType xContentType = randomFrom(XContentType.values()); + UnassignedInfo original = createRandom(); + assertNotNull(original); + XContentBuilder builder = XContentBuilder.builder(xContentType.xContent()); + builder.startObject(); + original.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + XContentParser parser = builder + .generator() + .contentType() + .xContent() + .createParser( + xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput() + ); + parser.nextToken(); // move to the outer object + parser.nextToken(); // move to the field name + assertEquals(XContentParser.Token.FIELD_NAME, parser.currentToken()); + parser.nextToken(); // move to inner object object and now start parsing + UnassignedInfo deserialized = UnassignedInfo.fromXContent(parser); + assertThat(deserialized.getReason(), equalTo(original.getReason())); + assertThat(deserialized.getUnassignedTimeInMillis(), equalTo(original.getUnassignedTimeInMillis())); + assertThat(deserialized.getMessage(), equalTo(original.getMessage())); + assertThat(deserialized.getDetails(), equalTo(original.getDetails())); + assertThat(deserialized.getNumFailedAllocations(), equalTo(original.getNumFailedAllocations())); + } + + private UnassignedInfo createRandom() { + UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), UnassignedInfo.Reason.values()); + return reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? + new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null, null, randomIntBetween(1, 100), System.nanoTime(), + System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT): + new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null); + } + public void testIndexCreated() { MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(randomIntBetween(1, 3)).numberOfReplicas(randomIntBetween(0, 3))) From 723300bb98129aa6ef491d4bc3767548b42e75a5 Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Mon, 30 Apr 2018 23:05:41 +0200 Subject: [PATCH 3/4] Added integration tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit — Split the `toXContent` method into multiple methods --- .../elasticsearch/client/IndicesClient.java | 4 +- .../org/elasticsearch/client/Request.java | 3 +- .../elasticsearch/client/IndicesClientIT.java | 4 +- .../elasticsearch/client/RequestTests.java | 4 +- .../IndicesClientDocumentationIT.java | 91 +++++ .../high-level/indices/flush_synced.asciidoc | 93 +++++ .../high-level/supported-apis.asciidoc | 2 + .../indices/flush/SyncedFlushResponse.java | 319 ++++++++++++------ .../cluster/routing/ShardRouting.java | 65 ++-- .../flush/SyncedFlushResponseTests.java | 75 ++++ .../flush/SyncedFlushResponseUnitTests.java | 90 ----- 11 files changed, 518 insertions(+), 232 deletions(-) create mode 100644 docs/java-rest/high-level/indices/flush_synced.asciidoc create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponseTests.java delete mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponseUnitTests.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java index 27dabac538ed1..782ea2f88f2bb 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java @@ -270,7 +270,7 @@ public void flushAsync(FlushRequest flushRequest, ActionListener * See * Synced flush API on elastic.co */ - public SyncedFlushResponse syncedFlush(SyncedFlushRequest syncedFlushRequest, Header... headers) throws IOException { + public SyncedFlushResponse flushSynced(SyncedFlushRequest syncedFlushRequest, Header... headers) throws IOException { return restHighLevelClient.performRequestAndParseEntity(syncedFlushRequest, Request::syncedFlush, SyncedFlushResponse::fromXContent, emptySet(), headers); } @@ -281,7 +281,7 @@ public SyncedFlushResponse syncedFlush(SyncedFlushRequest syncedFlushRequest, He * See * Synced flush API on elastic.co */ - public void syncedFlushAsync(SyncedFlushRequest syncedFlushRequest, ActionListener listener, Header... headers) { + public void flushSyncedAsync(SyncedFlushRequest syncedFlushRequest, ActionListener listener, Header... headers) { restHighLevelClient.performRequestAsyncAndParseEntity(syncedFlushRequest, Request::syncedFlush, SyncedFlushResponse::fromXContent, listener, emptySet(), headers); } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java index d499fb83d45a6..6676326b713bc 100755 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java @@ -237,9 +237,8 @@ static Request flush(FlushRequest flushRequest) { static Request syncedFlush(SyncedFlushRequest syncedFlushRequest) { String[] indices = syncedFlushRequest.indices() == null ? Strings.EMPTY_ARRAY : syncedFlushRequest.indices(); - String endpoint = endpoint(indices, "_flush", "synced"); + String endpoint = endpoint(indices, "_flush/synced"); Params syncedFlushparameters = Params.builder(); - // This request takes no other parameters other than the indices. syncedFlushparameters.withIndicesOptions(syncedFlushRequest.indicesOptions()); return new Request(HttpPost.METHOD_NAME, endpoint, syncedFlushparameters.getParams(), null); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java index a6b0b8bfa1d25..0c130e35ce16d 100755 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java @@ -454,7 +454,7 @@ public void testSyncedFlush() throws IOException { createIndex(index, settings); SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(index); SyncedFlushResponse flushResponse = - execute(syncedFlushRequest, highLevelClient().indices()::syncedFlush, highLevelClient().indices()::syncedFlushAsync); + execute(syncedFlushRequest, highLevelClient().indices()::flushSynced, highLevelClient().indices()::flushSyncedAsync); assertThat(flushResponse.totalShards(), equalTo(1)); assertThat(flushResponse.successfulShards(), equalTo(1)); assertThat(flushResponse.failedShards(), equalTo(0)); @@ -465,7 +465,7 @@ public void testSyncedFlush() throws IOException { assertFalse(indexExists(nonExistentIndex)); SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(nonExistentIndex); ElasticsearchException exception = expectThrows(ElasticsearchException.class, - () -> execute(syncedFlushRequest, highLevelClient().indices()::syncedFlush, highLevelClient().indices()::syncedFlushAsync)); + () -> execute(syncedFlushRequest, highLevelClient().indices()::flushSynced, highLevelClient().indices()::flushSyncedAsync)); assertEquals(RestStatus.NOT_FOUND, exception.status()); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java index bfdb348421ee3..d480c39a8f1be 100755 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java @@ -634,14 +634,12 @@ public void testSyncedFlush() { } Map expectedParams = new HashMap<>(); setRandomIndicesOptions(syncedFlushRequest::indicesOptions, syncedFlushRequest::indicesOptions, expectedParams); - Request request = Request.syncedFlush(syncedFlushRequest); StringJoiner endpoint = new StringJoiner("/", "/", ""); if (indices != null && indices.length > 0) { endpoint.add(String.join(",", indices)); } - endpoint.add("_flush"); - endpoint.add("synced"); + endpoint.add("_flush/synced"); assertThat(request.getEndpoint(), equalTo(endpoint.toString())); assertThat(request.getParameters(), equalTo(expectedParams)); assertThat(request.getEntity(), nullValue()); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java index bc6946eb2dc7f..f4539fb0c65c7 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java @@ -37,6 +37,8 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushResponse; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; @@ -56,6 +58,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.ESRestHighLevelClientTestCase; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -64,6 +67,7 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -773,6 +777,93 @@ public void onFailure(Exception e) { } } + public void testSyncedFlushIndex() throws Exception { + RestHighLevelClient client = highLevelClient(); + + { + createIndex("index1", Settings.EMPTY); + } + + { + // tag::flush-synced-request + SyncedFlushRequest request = new SyncedFlushRequest("index1"); // <1> + SyncedFlushRequest requestMultiple = new SyncedFlushRequest("index1", "index2"); // <2> + SyncedFlushRequest requestAll = new SyncedFlushRequest(); // <3> + // end::flush-synced-request + + // tag::flush-synced-request-indicesOptions + request.indicesOptions(IndicesOptions.lenientExpandOpen()); // <1> + // end::flush-synced-request-indicesOptions + + // tag::flush-synced-execute + SyncedFlushResponse flushSyncedResponse = client.indices().flushSynced(request); + // end::flush-synced-execute + + // tag::flush-synced-response + int totalShards = flushSyncedResponse.totalShards(); // <1> + int successfulShards = flushSyncedResponse.successfulShards(); // <2> + int failedShards = flushSyncedResponse.failedShards(); // <3> + + for (Map.Entry responsePerIndexEntry: + flushSyncedResponse.getResponsePerIndex().entrySet()) { + String indexName = responsePerIndexEntry.getKey(); // <4> + SyncedFlushResponse.FlushSyncedResponsePerIndex responsePerIndex = responsePerIndexEntry.getValue(); + int totalShardsForIndex = responsePerIndex.getTotalShards(); // <5> + int successfulShardsForIndex = responsePerIndex.getSuccessfulShards(); // <6> + int failedShardsForIndex = responsePerIndex.getFailedShards(); // <7> + if (failedShardsForIndex > 0) { + for (Map.Entry failureEntry: + responsePerIndex.getShardFailures().entrySet()) { + int shardId = failureEntry.getKey().id(); // <8> + int totalCopies = failureEntry.getValue().getTotalCopies(); // <9> + int successfulCopies = failureEntry.getValue().getSuccessfulCopies(); // <10> + int failedCopies = failureEntry.getValue().getFailedCopies(); // <11> + String failureReason = failureEntry.getValue().getFailureReason(); // <12> + ShardRouting routing = failureEntry.getValue().getShardRouting(); // <13> + } + } + } + // end::flush-synced-response + + // tag::flush-synced-execute-listener + ActionListener listener = new ActionListener() { + @Override + public void onResponse(SyncedFlushResponse refreshResponse) { + // <1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::flush-synced-execute-listener + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::flush-synced-execute-async + client.indices().flushSyncedAsync(request, listener); // <1> + // end::flush-synced-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + + { + // tag::flush-synced-notfound + try { + SyncedFlushRequest request = new SyncedFlushRequest("does_not_exist"); + client.indices().flushSynced(request); + } catch (ElasticsearchException exception) { + if (exception.status() == RestStatus.NOT_FOUND) { + // <1> + } + } + // end::flush-synced-notfound + } + } + public void testForceMergeIndex() throws Exception { RestHighLevelClient client = highLevelClient(); diff --git a/docs/java-rest/high-level/indices/flush_synced.asciidoc b/docs/java-rest/high-level/indices/flush_synced.asciidoc new file mode 100644 index 0000000000000..15cb0165502e5 --- /dev/null +++ b/docs/java-rest/high-level/indices/flush_synced.asciidoc @@ -0,0 +1,93 @@ +[[java-rest-high-flush]] +=== Flush Synced API + +[[java-rest-high-flush-synced-request]] +==== Flush Synced Request + +A `SyncedFlushRequest` can be applied to one or more indices, or even on `_all` the indices: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-request] +-------------------------------------------------- +<1> Flush synced one index +<2> Flush synced multiple indices +<3> Flush synced all the indices + +==== Optional arguments + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-request-indicesOptions] +-------------------------------------------------- +<1> Setting `IndicesOptions` controls how unavailable indices are resolved and +how wildcard expressions are expanded + +[[java-rest-high-flush-synced-sync]] +==== Synchronous Execution + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-execute] +-------------------------------------------------- + +[[java-rest-high-flush-synced-async]] +==== Asynchronous Execution + +The asynchronous execution of a flush request requires both the `SyncedFlushRequest` +instance and an `ActionListener` instance to be passed to the asynchronous +method: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-execute-async] +-------------------------------------------------- +<1> The `SyncedFlushRequest` to execute and the `ActionListener` to use when +the execution completes + +The asynchronous method does not block and returns immediately. Once it is +completed the `ActionListener` is called back using the `onResponse` method +if the execution successfully completed or using the `onFailure` method if +it failed. + +A typical listener for `SyncedFlushResponse` looks like: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-execute-listener] +-------------------------------------------------- +<1> Called when the execution is successfully completed. The response is +provided as an argument +<2> Called in case of failure. The raised exception is provided as an argument + +[[java-rest-high-flush-response]] +==== Flush Synced Response + +The returned `SyncedFlushResponse` allows to retrieve information about the +executed operation as follows: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-response] +-------------------------------------------------- +<1> Total number of shards hit by the flush request +<2> Number of shards where the flush has succeeded +<3> Number of shards where the flush has failed +<4> Name of the index whose results we are about to calculate. +<5> Total number of shards for index mentioned in 4. +<6> Successful shards for index mentioned in 4. +<7> Failed shards for index mentioned in 4. +<8> One of the failed shard ids of the failed index mentioned in 4. +<9> Total copies of the shard mentioned in 8. +<10> Successful copies of the shard mentioned in 8. +<11> Failed copies of the shard mentioned in 8. +<12> Reason for failure of copies of the shard mentioned in 8. +<13> Routing information (like id, state, version etc.) for the failed shard copies. + +By default, if the indices were not found, an `ElasticsearchException` will be thrown: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-notfound] +-------------------------------------------------- +<1> Do something if the indices to be flushed were not found \ No newline at end of file diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index de5a3d6b6a656..fb8ec558014e3 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -59,6 +59,7 @@ Index Management:: * <> * <> * <> +* <> * <> * <> * <> @@ -79,6 +80,7 @@ include::indices/shrink_index.asciidoc[] include::indices/split_index.asciidoc[] include::indices/refresh.asciidoc[] include::indices/flush.asciidoc[] +include::indices/flush_synced.asciidoc[] include::indices/clear_cache.asciidoc[] include::indices/force_merge.asciidoc[] include::indices/rollover.asciidoc[] diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java index 4d995c0daeb25..ef2fcfcc0fbec 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java @@ -52,8 +52,9 @@ */ public class SyncedFlushResponse extends ActionResponse implements ToXContentFragment { - Map> shardsResultPerIndex; + Map responsePerIndex; Map shardCountsPerIndex; + Map> shardsResultPerIndex; ShardCounts shardCounts; SyncedFlushResponse() { @@ -71,16 +72,15 @@ public SyncedFlushResponse(Map> shardsResu shardsCountsPerIndex.put(entry.getKey(), calculateShardCounts(entry.getValue())); } this.shardCountsPerIndex = unmodifiableMap(shardsCountsPerIndex); + this.responsePerIndex = unmodifiableMap(buildResponsePerIndex()); } public SyncedFlushResponse(ShardCounts shardCounts, Map> shardsResultPerIndex, Map shardCountsPerIndex) { - // shardsResultPerIndex is never modified after it is passed to this - // constructor so this is safe even though shardsResultPerIndex is a - // ConcurrentHashMap this.shardsResultPerIndex = unmodifiableMap(shardsResultPerIndex); this.shardCounts = shardCounts; this.shardCountsPerIndex = shardCountsPerIndex; + this.responsePerIndex = unmodifiableMap(buildResponsePerIndex()); } /** @@ -112,8 +112,53 @@ public Map> getShardsResultPerIndex() { return shardsResultPerIndex; } - public Map getShardCountsPerIndex() { - return shardCountsPerIndex; + /** + * @return FlushSyncedResponsePerIndex for each index that was sent in the request + */ + public Map getResponsePerIndex() { + return this.responsePerIndex; + } + + + private Map buildResponsePerIndex() { + Map responsePerIndex = new HashMap<>(); + for (Map.Entry entry: shardCountsPerIndex.entrySet()) { + String indexName = entry.getKey(); + ShardCounts shardCounts = entry.getValue(); + Map shardFailures = new HashMap<>(); + // If there were no failures shardFailures would be an empty array + if (shardCounts.failed > 0) { + List indexResult = shardsResultPerIndex.get(indexName); + for (ShardsSyncedFlushResult shardResults : indexResult) { + if (shardResults.failed()) { + shardFailures.put( + shardResults.shardId(), + new ShardFailure( + shardResults.shardId(), + shardResults.failureReason(), + shardResults.totalShards(), + shardResults.successfulShards(), + null) + ); + continue; + } + Map failedShards = shardResults.failedShards(); + for (Map.Entry shardEntry : failedShards.entrySet()) { + shardFailures.put( + shardResults.shardId(), + new ShardFailure( + shardResults.shardId(), + shardResults.failureReason(), + shardResults.totalShards(), + shardResults.successfulShards(), + shardEntry.getKey()) + ); + } + } + } + responsePerIndex.put(indexName, new FlushSyncedResponsePerIndex(indexName, shardCounts, shardFailures)); + } + return responsePerIndex; } @Override @@ -124,7 +169,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws for (Map.Entry> indexEntry : shardsResultPerIndex.entrySet()) { List indexResult = indexEntry.getValue(); builder.startObject(indexEntry.getKey()); - ShardCounts indexShardCounts = calculateShardCounts(indexResult); + ShardCounts indexShardCounts = shardCountsPerIndex.get(indexEntry.getKey()); indexShardCounts.toXContent(builder, params); if (indexShardCounts.failed > 0) { builder.startArray(Fields.FAILURES); @@ -169,18 +214,20 @@ private static SyncedFlushResponse innerFromXContent(XContentParser parser) thro // If it is an object we try to parse it for Fields._SHARD or for an index entry for (Token curToken = parser.currentToken(); curToken != Token.END_OBJECT; curToken = parser.nextToken()) { ensureExpectedToken(Token.FIELD_NAME, parser.currentToken(), parser::getTokenLocation); - String fieldName = parser.currentName(); + String currentName = parser.currentName(); curToken = parser.nextToken(); - Integer totalShards = null; - Integer successfulShards = null; - Integer failedShards = null; - Map> failures = new HashMap<>(); if (curToken == Token.START_OBJECT) { // Start parsing for _shard or for index + Boolean isIndex = !currentName.equals(Fields._SHARDS); + String indexName = isIndex ? currentName : null; + Integer totalShards = null; + Integer successfulShards = null; + Integer failedShards = null; + Map> failures = null; for (curToken = parser.nextToken(); curToken != Token.END_OBJECT; curToken = parser.nextToken()) { if (curToken == Token.FIELD_NAME) { - String level2FieldName = parser.currentName(); - curToken = parser.nextToken(); - switch (level2FieldName) { + currentName = parser.currentName(); + curToken = parser.nextToken(); + switch (currentName) { case Fields.TOTAL: ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); totalShards = parser.intValue(); @@ -194,66 +241,9 @@ private static SyncedFlushResponse innerFromXContent(XContentParser parser) thro failedShards = parser.intValue(); break; case Fields.FAILURES: - if (!fieldName.equals(Fields._SHARDS)) { + if (isIndex) { ensureExpectedToken(Token.START_ARRAY, curToken, parser::getTokenLocation); - for (curToken = parser.nextToken(); curToken != Token.END_ARRAY; curToken = parser.nextToken()) { - ensureExpectedToken(Token.START_OBJECT, curToken, parser::getTokenLocation); - ShardRouting routing = null; - String failureReason = null; - Integer totalCopies = null; - Integer successfulCopies = null; - ShardId shardId = null; - XContentLocation startLocation = parser.getTokenLocation(); - for (curToken = parser.nextToken(); curToken != Token.END_OBJECT; curToken = parser.nextToken()) { - ensureExpectedToken(Token.FIELD_NAME, curToken, parser::getTokenLocation); - String level3FieldName = parser.currentName(); - curToken = parser.nextToken(); - switch (level3FieldName) { - case Fields.SHARD: - ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); - shardId = new ShardId( - fieldName, - IndexMetaData.INDEX_UUID_NA_VALUE, - parser.intValue() - ); - break; - case Fields.REASON: - ensureExpectedToken(Token.VALUE_STRING, curToken, parser::getTokenLocation); - failureReason = parser.text(); - break; - case Fields.TOTAL_COPIES: - ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); - totalCopies = parser.intValue(); - break; - case Fields.SUCCESSFUL_COPIES: - ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); - successfulCopies = parser.intValue(); - break; - case Fields.ROUTING: - routing = ShardRouting.fromXContent(parser); - break; - default: - // If something else skip it - parser.skipChildren(); - break; - } - } - if (failureReason != null && - shardId != null && - totalCopies != null && - successfulCopies != null) { - // This is ugly but there is only one ShardsSyncedFlushResult for each shardId - // so this will work. - if (!failures.containsKey(shardId)) { - failures.put(shardId, new ArrayList<>()); - } - failures.get(shardId).add( - new FailureContainer(shardId, failureReason, totalCopies, successfulCopies, routing) - ); - } else { - throw new ParsingException(startLocation, "Unable to construct ShardsSyncedFlushResult"); - } - } + failures = shardFailuresFromXContent(parser, indexName); } else { parser.skipChildren(); } @@ -270,36 +260,38 @@ private static SyncedFlushResponse innerFromXContent(XContentParser parser) thro successfulShards != null && failedShards != null) { ShardCounts shardCount = new ShardCounts(totalShards, successfulShards, failedShards); - if (fieldName.equals(Fields._SHARDS)) { + if (!isIndex) { totalShardCounts = shardCount; } else { List results = new ArrayList<>(); - // All failures in this list belong to the same index - for (Map.Entry> entry: failures.entrySet()) { - Map shardResponses = new HashMap<>(); - for (FailureContainer container: entry.getValue()) { - if (container.shardRouting != null) { - shardResponses.put(container.shardRouting, - new SyncedFlushService.ShardSyncedFlushResponse(container.failureReason) + if (failures != null) { + // All failures in this list belong to the same index + for (Map.Entry> entry: failures.entrySet()) { + Map shardResponses = new HashMap<>(); + for (ShardFailure container: entry.getValue()) { + if (container.shardRouting != null) { + shardResponses.put(container.shardRouting, + new SyncedFlushService.ShardSyncedFlushResponse(container.failureReason) + ); + } + } + // Size of entry.getValue() will at least be one + ShardFailure container = entry.getValue().get(0); + if (!shardResponses.isEmpty()) { + results.add( + new ShardsSyncedFlushResult(container.shardId, null, container.totalCopies, + container.successfulCopies, shardResponses) + ); + } else { + results.add( + new ShardsSyncedFlushResult(container.shardId, container.totalCopies, + container.successfulCopies, container.failureReason) ); } } - // Size of entry.getValue() will at least be one - FailureContainer container = entry.getValue().get(0); - if (!shardResponses.isEmpty()) { - results.add( - new ShardsSyncedFlushResult(container.shardId, null, container.totalCopies, - container.successfulCopies, shardResponses) - ); - } else { - results.add( - new ShardsSyncedFlushResult(container.shardId, container.totalCopies, - container.successfulCopies, container.failureReason) - ); - } - } - shardsCountsPerIndex.put(fieldName, shardCount); - shardsResultPerIndex.put(fieldName, results); + } // if failures were null then no failures were reported + shardsCountsPerIndex.put(indexName, shardCount); + shardsResultPerIndex.put(indexName, results); } } } else { // Else leave this tree alone @@ -309,6 +301,23 @@ private static SyncedFlushResponse innerFromXContent(XContentParser parser) thro return new SyncedFlushResponse(totalShardCounts, shardsResultPerIndex, shardsCountsPerIndex); } + private static Map> shardFailuresFromXContent( + XContentParser parser, + String indexName) throws IOException { + + Map> failures = new HashMap<>(); + for (Token curToken = parser.nextToken(); curToken != Token.END_ARRAY; curToken = parser.nextToken()) { + ensureExpectedToken(Token.START_OBJECT, curToken, parser::getTokenLocation); + ShardFailure failure = ShardFailure.fromXContent(parser, indexName); + // This is ugly but there is only one ShardsSyncedFlushResult for each shardId + // so this will work. + if (!failures.containsKey(failure.shardId)) { + failures.put(failure.shardId, new ArrayList<>()); + } + failures.get(failure.shardId).add(failure); + } + return failures; + } static ShardCounts calculateShardCounts(Iterable results) { int total = 0, successful = 0, failed = 0; @@ -326,20 +335,128 @@ static ShardCounts calculateShardCounts(Iterable result return new ShardCounts(total, successful, failed); } - // Only used as a container for parsing XContent - static final class FailureContainer { + // Only used as a container for XContent + public static final class ShardFailure { ShardId shardId; String failureReason; ShardRouting shardRouting; int totalCopies; int successfulCopies; - FailureContainer(ShardId shardId, String failureReason, int totalCopies, int successfulCopies, + int failedCopies; + + ShardFailure(ShardId shardId, String failureReason, int totalCopies, int successfulCopies, @Nullable ShardRouting shardRouting) { this.shardId = shardId; this.failureReason = failureReason; this.shardRouting = shardRouting; this.totalCopies = totalCopies; this.successfulCopies = successfulCopies; + this.failedCopies = this.totalCopies - this.successfulCopies; + } + + public ShardId getShardId() { + return shardId; + } + + public String getFailureReason() { + return failureReason; + } + + public ShardRouting getShardRouting() { + return shardRouting; + } + + public int getTotalCopies() { + return totalCopies; + } + + public int getSuccessfulCopies() { + return successfulCopies; + } + + public int getFailedCopies() { + return failedCopies; + } + + public static ShardFailure fromXContent(XContentParser parser, String indexName) throws IOException { + ShardRouting routing = null; + String failureReason = null; + Integer totalCopies = null; + Integer successfulCopies = null; + ShardId shardId = null; + Token curToken; + XContentLocation startLocation = parser.getTokenLocation(); + for (curToken = parser.nextToken(); curToken != Token.END_OBJECT; curToken = parser.nextToken()) { + ensureExpectedToken(Token.FIELD_NAME, curToken, parser::getTokenLocation); + String currentFieldName = parser.currentName(); + curToken = parser.nextToken(); + switch (currentFieldName) { + case Fields.SHARD: + ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); + shardId = new ShardId( + indexName, + IndexMetaData.INDEX_UUID_NA_VALUE, + parser.intValue() + ); + break; + case Fields.REASON: + ensureExpectedToken(Token.VALUE_STRING, curToken, parser::getTokenLocation); + failureReason = parser.text(); + break; + case Fields.TOTAL_COPIES: + ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); + totalCopies = parser.intValue(); + break; + case Fields.SUCCESSFUL_COPIES: + ensureExpectedToken(Token.VALUE_NUMBER, curToken, parser::getTokenLocation); + successfulCopies = parser.intValue(); + break; + case Fields.ROUTING: + routing = ShardRouting.fromXContent(parser); + break; + default: + // If something else skip it + parser.skipChildren(); + break; + } + } + if (failureReason != null && + shardId != null && + totalCopies != null && + successfulCopies != null) { + return new ShardFailure(shardId, failureReason, totalCopies, successfulCopies, routing); + } else { + throw new ParsingException(startLocation, "Unable to construct ShardsSyncedFlushResult"); + } + } + } + + // Only used for response objects + public static final class FlushSyncedResponsePerIndex { + String index; + ShardCounts shardCounts; + Map shardFailures; + + FlushSyncedResponsePerIndex(String index, ShardCounts counts, Map shardFailures) { + this.index = index; + this.shardCounts = counts; + this.shardFailures = shardFailures; + } + + public int getTotalShards() { + return shardCounts.total; + } + + public int getSuccessfulShards() { + return shardCounts.successful; + } + + public int getFailedShards() { + return shardCounts.failed; + } + + public Map getShardFailures() { + return shardFailures; } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java index 41349b684ff88..b9dc4a66df6b4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java @@ -54,6 +54,20 @@ public final class ShardRouting implements Writeable, ToXContentObject { */ public static final long UNAVAILABLE_EXPECTED_SHARD_SIZE = -1; + /** + * XContent Fields + */ + static final String STATE = "state"; + static final String PRIMARY = "primary"; + static final String NODE = "node"; + static final String RELOCATING_NODE = "relocating_node"; + static final String SHARD = "shard"; + static final String INDEX = "index"; + static final String EXPECTED_SHARD_SIZE_IN_BYTES = "expected_shard_size_in_bytes"; + static final String RECOVERY_SOURCE = "recovery_source"; + static final String ALLOCATION_ID = "allocation_id"; + static final String UNASSIGNED_INFO = "unassigned_info"; + private final ShardId shardId; private final String currentNodeId; private final String relocatingNodeId; @@ -623,20 +637,20 @@ public String shortSummary() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject() - .field(Fields.STATE, state()) - .field(Fields.PRIMARY, primary()) - .field(Fields.NODE, currentNodeId()) - .field(Fields.RELOCATING_NODE, relocatingNodeId()) - .field(Fields.SHARD, id()) - .field(Fields.INDEX, getIndexName()); + .field(STATE, state()) + .field(PRIMARY, primary()) + .field(NODE, currentNodeId()) + .field(RELOCATING_NODE, relocatingNodeId()) + .field(SHARD, id()) + .field(INDEX, getIndexName()); if (expectedShardSize != UNAVAILABLE_EXPECTED_SHARD_SIZE) { - builder.field(Fields.EXPECTED_SHARD_SIZE_IN_BYTES, expectedShardSize); + builder.field(EXPECTED_SHARD_SIZE_IN_BYTES, expectedShardSize); } if (recoverySource != null) { - builder.field(Fields.RECOVERY_SOURCE, recoverySource); + builder.field(RECOVERY_SOURCE, recoverySource); } if (allocationId != null) { - builder.field(Fields.ALLOCATION_ID); + builder.field(ALLOCATION_ID); allocationId.toXContent(builder, params); } if (unassignedInfo != null) { @@ -664,7 +678,7 @@ public static ShardRouting fromXContent(XContentParser parser) throws IOExceptio String fieldName = parser.currentName(); Token currentToken = parser.nextToken(); // Move to value of the field switch (fieldName) { - case Fields.STATE: + case STATE: if (currentToken == Token.VALUE_STRING) { state = ShardRoutingState.valueOf(parser.text()); } else { @@ -672,11 +686,11 @@ public static ShardRouting fromXContent(XContentParser parser) throws IOExceptio ensureExpectedToken(Token.VALUE_NULL, currentToken, parser::getTokenLocation); } break; - case Fields.PRIMARY: + case PRIMARY: ensureExpectedToken(Token.VALUE_BOOLEAN, currentToken, parser::getTokenLocation); isPrimary = parser.booleanValue(); break; - case Fields.NODE: + case NODE: if (currentToken == Token.VALUE_STRING) { nodeId = parser.text(); } else { @@ -684,7 +698,7 @@ public static ShardRouting fromXContent(XContentParser parser) throws IOExceptio ensureExpectedToken(Token.VALUE_NULL, currentToken, parser::getTokenLocation); } break; - case Fields.RELOCATING_NODE: + case RELOCATING_NODE: if (currentToken == Token.VALUE_STRING) { relocatingNodeid = parser.text(); } else { @@ -692,25 +706,25 @@ public static ShardRouting fromXContent(XContentParser parser) throws IOExceptio ensureExpectedToken(Token.VALUE_NULL, currentToken, parser::getTokenLocation); } break; - case Fields.SHARD: + case SHARD: ensureExpectedToken(Token.VALUE_NUMBER, currentToken, parser::getTokenLocation); shardId = parser.intValue(); break; - case Fields.INDEX: + case INDEX: ensureExpectedToken(Token.VALUE_STRING, currentToken, parser::getTokenLocation); indexName = parser.text(); break; - case Fields.EXPECTED_SHARD_SIZE_IN_BYTES: + case EXPECTED_SHARD_SIZE_IN_BYTES: ensureExpectedToken(Token.VALUE_STRING, currentToken, parser::getTokenLocation); expectedShardSizeInBytes = parser.longValue(); break; - case Fields.RECOVERY_SOURCE: + case RECOVERY_SOURCE: recoverySource = RecoverySource.fromXContent(parser); break; - case Fields.ALLOCATION_ID: + case ALLOCATION_ID: allocationId = AllocationId.fromXContent(parser); break; - case Fields.UNASSIGNED_INFO: + case UNASSIGNED_INFO: unassignedInfo = UnassignedInfo.fromXContent(parser); break; default: @@ -740,19 +754,6 @@ public static ShardRouting fromXContent(XContentParser parser) throws IOExceptio } } - static final class Fields { - static final String STATE = "state"; - static final String PRIMARY = "primary"; - static final String NODE = "node"; - static final String RELOCATING_NODE = "relocating_node"; - static final String SHARD = "shard"; - static final String INDEX = "index"; - static final String EXPECTED_SHARD_SIZE_IN_BYTES = "expected_shard_size_in_bytes"; - static final String RECOVERY_SOURCE = "recovery_source"; - static final String ALLOCATION_ID = "allocation_id"; - static final String UNASSIGNED_INFO = "unassigned_info"; - } - /** * Returns the expected shard size for {@link ShardRoutingState#RELOCATING} and {@link ShardRoutingState#INITIALIZING} * shards. If it's size is not available {@value #UNAVAILABLE_EXPECTED_SHARD_SIZE} will be returned. diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponseTests.java new file mode 100644 index 0000000000000..6052c77786254 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponseTests.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.flush; + +import java.io.IOException; +import java.util.Map; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.shard.ShardId; + +public class SyncedFlushResponseTests extends AbstractSyncedFlushTest { + + public void testXContentSerialization() throws IOException { + final XContentType xContentType = randomFrom(XContentType.values()); + XContentBuilder builder = XContentBuilder.builder(xContentType.xContent()); + TestPlan plan = createTestPlan(); + SyncedFlushResponse response = plan.result; + assertNotNull(response); + builder.startObject(); + response.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + XContentParser parser = builder + .generator() + .contentType() + .xContent() + .createParser( + xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput() + ); + SyncedFlushResponse parsedResponse = SyncedFlushResponse.fromXContent(parser); + assertNotNull(parsedResponse); + assertShardCounts(response.shardCounts, parsedResponse.shardCounts); + for (Map.Entry entry: response.getResponsePerIndex().entrySet()) { + String index = entry.getKey(); + // assertShardCounts(entry.getValue().shardCounts, parsedResponse.shardCountsPerIndex.get(index)); + SyncedFlushResponse.FlushSyncedResponsePerIndex responseResult = entry.getValue(); + SyncedFlushResponse.FlushSyncedResponsePerIndex parsedResult = parsedResponse.responsePerIndex.get(index); + assertNotNull(responseResult); + assertNotNull(parsedResult); + assertShardCounts(responseResult.shardCounts, parsedResult.shardCounts); + assertEquals(responseResult.shardFailures.size(), parsedResult.shardFailures.size()); + for (Map.Entry failureEntry: responseResult.shardFailures.entrySet()) { + ShardId id = failureEntry.getKey(); + SyncedFlushResponse.ShardFailure responseShardFailure = failureEntry.getValue(); + SyncedFlushResponse.ShardFailure parsedShardFailure = parsedResult.shardFailures.get(id); + assertNotNull(parsedShardFailure); + assertEquals(responseShardFailure.successfulCopies, parsedShardFailure.successfulCopies); + assertEquals(responseShardFailure.totalCopies, parsedShardFailure.totalCopies); + assertEquals(responseShardFailure.failedCopies, parsedShardFailure.failedCopies); + // We skip shard routing information here. + // Separate tests for shard routing verification exist in + // org.elasticsearch.cluster.routing.ShardRoutingTests + } + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponseUnitTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponseUnitTests.java deleted file mode 100644 index 375c7f4e63c18..0000000000000 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponseUnitTests.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.action.admin.indices.flush; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.indices.flush.ShardsSyncedFlushResult; -import org.elasticsearch.indices.flush.SyncedFlushService; - -public class SyncedFlushResponseUnitTests extends AbstractSyncedFlushTest { - - public void testXContentSerialization() throws IOException { - final XContentType xContentType = randomFrom(XContentType.values()); - XContentBuilder builder = XContentBuilder.builder(xContentType.xContent()); - TestPlan plan = createTestPlan(); - SyncedFlushResponse response = plan.result; - assertNotNull(response); - builder.startObject(); - response.toXContent(builder, ToXContent.EMPTY_PARAMS); - builder.endObject(); - XContentParser parser = builder - .generator() - .contentType() - .xContent() - .createParser( - xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput() - ); - SyncedFlushResponse parsedResponse = SyncedFlushResponse.fromXContent(parser); - assertNotNull(parsedResponse); - assertShardCounts(response.shardCounts, parsedResponse.shardCounts); - for (Map.Entry entry: response.getShardCountsPerIndex().entrySet()) { - assertShardCounts(entry.getValue(), parsedResponse.shardCountsPerIndex.get(entry.getKey())); - List responseResults = response.shardsResultPerIndex.get(entry.getKey()); - List parsedResults = parsedResponse.shardsResultPerIndex.get(entry.getKey()); - assertNotNull(responseResults); - assertNotNull(parsedResults); - if (entry.getValue().failed > 0) { - // build a map for each shardId and compare total_copies and successful_copies and failed_copies - Map parsedResponseMap = new HashMap<>(); - for (ShardsSyncedFlushResult parsedResponseResult: parsedResults) { - parsedResponseMap.put(parsedResponseResult.shardId().id(), parsedResponseResult); - } - // We are just trying to perform a hash join here on the two lists based on shardId - for (ShardsSyncedFlushResult responseResult: responseResults) { - Map responseFailedShards = - responseResult.failedShards(); - // After deserialization we lose information of successful shards - if (responseResult.failed() || responseFailedShards.size() > 0) { - ShardsSyncedFlushResult parsedResponseResult = parsedResponseMap.get(responseResult.shardId().id()); - Map parsedFailedShards = - parsedResponseResult.failedShards(); - assertNotNull(parsedResponseResult); - assertEquals(responseResult.totalShards(), parsedResponseResult.totalShards()); - assertEquals(responseResult.successfulShards(), parsedResponseResult.successfulShards()); - assertEquals(responseResult.failureReason(), parsedResponseResult.failureReason()); - assertEquals(responseFailedShards.size(), parsedFailedShards.size()); - } - } - } - } - // We skip shard routing information here. Separate tests for shard routing verification exist - // in ShardRoutingTests - } - -} From 0bf9dcf027280bdc19e3962c4a124d6534fe6192 Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Tue, 1 May 2018 23:02:06 +0200 Subject: [PATCH 4/4] Fixes after merging with master MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit — fixed document:integTest --- .../elasticsearch/client/IndicesClient.java | 4 +-- .../client/RequestConverters.java | 8 ++--- .../client/RequestConvertersTests.java | 2 +- .../IndicesClientDocumentationIT.java | 3 +- .../high-level/indices/flush_synced.asciidoc | 3 +- docs/reference/indices/recovery.asciidoc | 1 + .../indices/flush/SyncedFlushResponse.java | 31 ++++++++++--------- 7 files changed, 28 insertions(+), 24 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java index 1ef12f3a4031b..444a27e744cf5 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java @@ -273,7 +273,7 @@ public void flushAsync(FlushRequest flushRequest, ActionListener * Synced flush API on elastic.co */ public SyncedFlushResponse flushSynced(SyncedFlushRequest syncedFlushRequest, Header... headers) throws IOException { - return restHighLevelClient.performRequestAndParseEntity(syncedFlushRequest, Request::syncedFlush, + return restHighLevelClient.performRequestAndParseEntity(syncedFlushRequest, RequestConverters::syncedFlush, SyncedFlushResponse::fromXContent, emptySet(), headers); } @@ -284,7 +284,7 @@ public SyncedFlushResponse flushSynced(SyncedFlushRequest syncedFlushRequest, He * Synced flush API on elastic.co */ public void flushSyncedAsync(SyncedFlushRequest syncedFlushRequest, ActionListener listener, Header... headers) { - restHighLevelClient.performRequestAsyncAndParseEntity(syncedFlushRequest, Request::syncedFlush, + restHighLevelClient.performRequestAsyncAndParseEntity(syncedFlushRequest, RequestConverters::syncedFlush, SyncedFlushResponse::fromXContent, listener, emptySet(), headers); } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index 4d02f7c4a3106..2f54d1d158b78 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -209,10 +209,10 @@ static Request flush(FlushRequest flushRequest) { static Request syncedFlush(SyncedFlushRequest syncedFlushRequest) { String[] indices = syncedFlushRequest.indices() == null ? Strings.EMPTY_ARRAY : syncedFlushRequest.indices(); - String endpoint = endpoint(indices, "_flush/synced"); - Params syncedFlushparameters = Params.builder(); - syncedFlushparameters.withIndicesOptions(syncedFlushRequest.indicesOptions()); - return new Request(HttpPost.METHOD_NAME, endpoint, syncedFlushparameters.getParams(), null); + Request request = new Request(HttpPost.METHOD_NAME, endpoint(indices, "_flush/synced")); + Params parameters = new Params(request); + parameters.withIndicesOptions(syncedFlushRequest.indicesOptions()); + return request; } static Request forceMerge(ForceMergeRequest forceMergeRequest) { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 769bef3184a1d..b58e289d3eb80 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -599,7 +599,7 @@ public void testSyncedFlush() { } Map expectedParams = new HashMap<>(); setRandomIndicesOptions(syncedFlushRequest::indicesOptions, syncedFlushRequest::indicesOptions, expectedParams); - Request request = Request.syncedFlush(syncedFlushRequest); + Request request = RequestConverters.syncedFlush(syncedFlushRequest); StringJoiner endpoint = new StringJoiner("/", "/", ""); if (indices != null && indices.length > 0) { endpoint.add(String.join(",", indices)); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java index 3fdf83f816dd8..ef59ebe4024c1 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java @@ -75,6 +75,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -821,7 +822,7 @@ public void testSyncedFlushIndex() throws Exception { int successfulCopies = failureEntry.getValue().getSuccessfulCopies(); // <10> int failedCopies = failureEntry.getValue().getFailedCopies(); // <11> String failureReason = failureEntry.getValue().getFailureReason(); // <12> - ShardRouting routing = failureEntry.getValue().getShardRouting(); // <13> + Optional routing = failureEntry.getValue().getShardRouting(); // <13> } } } diff --git a/docs/java-rest/high-level/indices/flush_synced.asciidoc b/docs/java-rest/high-level/indices/flush_synced.asciidoc index 15cb0165502e5..41d4ae05d3558 100644 --- a/docs/java-rest/high-level/indices/flush_synced.asciidoc +++ b/docs/java-rest/high-level/indices/flush_synced.asciidoc @@ -82,7 +82,8 @@ include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-respo <10> Successful copies of the shard mentioned in 8. <11> Failed copies of the shard mentioned in 8. <12> Reason for failure of copies of the shard mentioned in 8. -<13> Routing information (like id, state, version etc.) for the failed shard copies. +<13> Optional. Routing information (like id, state, version etc.) for the failed shard copies. +If the entire shard failed then this returns Optional.empty. By default, if the indices were not found, an `ElasticsearchException` will be thrown: diff --git a/docs/reference/indices/recovery.asciidoc b/docs/reference/indices/recovery.asciidoc index 49f58e645bcda..ef1cac48dd5bc 100644 --- a/docs/reference/indices/recovery.asciidoc +++ b/docs/reference/indices/recovery.asciidoc @@ -88,6 +88,7 @@ Response: "total_time_in_millis" : 175576, "source" : { "repository" : "my_repository", + "snapshot_uuid" : "faOyRKQOQwy8ze5yhwlAcQ", "snapshot" : "my_snapshot", "index" : "index1", "version" : "{version}" diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java index ef2fcfcc0fbec..3ef0bff91dee5 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/SyncedFlushResponse.java @@ -22,7 +22,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.ParsingException; -import org.elasticsearch.common.inject.internal.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -38,11 +37,12 @@ import org.elasticsearch.rest.RestStatus; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; +import java.util.ArrayList; +import java.util.Map; import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.Optional; import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; @@ -138,7 +138,7 @@ private Map buildResponsePerIndex() { shardResults.failureReason(), shardResults.totalShards(), shardResults.successfulShards(), - null) + Optional.empty()) ); continue; } @@ -151,7 +151,8 @@ private Map buildResponsePerIndex() { shardResults.failureReason(), shardResults.totalShards(), shardResults.successfulShards(), - shardEntry.getKey()) + Optional.of(shardEntry.getKey()) + ) ); } } @@ -269,11 +270,11 @@ private static SyncedFlushResponse innerFromXContent(XContentParser parser) thro for (Map.Entry> entry: failures.entrySet()) { Map shardResponses = new HashMap<>(); for (ShardFailure container: entry.getValue()) { - if (container.shardRouting != null) { - shardResponses.put(container.shardRouting, + container.maybeShardRouting.ifPresent(shardRouting -> + shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse(container.failureReason) - ); - } + ) + ); } // Size of entry.getValue() will at least be one ShardFailure container = entry.getValue().get(0); @@ -339,16 +340,16 @@ static ShardCounts calculateShardCounts(Iterable result public static final class ShardFailure { ShardId shardId; String failureReason; - ShardRouting shardRouting; + Optional maybeShardRouting; int totalCopies; int successfulCopies; int failedCopies; ShardFailure(ShardId shardId, String failureReason, int totalCopies, int successfulCopies, - @Nullable ShardRouting shardRouting) { + Optional maybeShardRouting) { this.shardId = shardId; this.failureReason = failureReason; - this.shardRouting = shardRouting; + this.maybeShardRouting = maybeShardRouting; this.totalCopies = totalCopies; this.successfulCopies = successfulCopies; this.failedCopies = this.totalCopies - this.successfulCopies; @@ -362,8 +363,8 @@ public String getFailureReason() { return failureReason; } - public ShardRouting getShardRouting() { - return shardRouting; + public Optional getShardRouting() { + return maybeShardRouting; } public int getTotalCopies() { @@ -424,7 +425,7 @@ public static ShardFailure fromXContent(XContentParser parser, String indexName) shardId != null && totalCopies != null && successfulCopies != null) { - return new ShardFailure(shardId, failureReason, totalCopies, successfulCopies, routing); + return new ShardFailure(shardId, failureReason, totalCopies, successfulCopies, Optional.ofNullable(routing)); } else { throw new ParsingException(startLocation, "Unable to construct ShardsSyncedFlushResult"); }