Skip to content

Commit

Permalink
Implementing pagination for _cat/shards
Browse files Browse the repository at this point in the history
Signed-off-by: Harsh Garg <[email protected]>
  • Loading branch information
Harsh Garg committed Jul 8, 2024
1 parent f14b5c8 commit 549427a
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 12 deletions.
54 changes: 54 additions & 0 deletions server/src/main/java/org/opensearch/common/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import reactor.util.annotation.NonNull;

import static java.util.Collections.emptyMap;

/**
Expand All @@ -59,9 +61,19 @@ public class Table {
private List<Cell> currentCells;
private boolean inHeaders = false;
private boolean withTime = false;
private PaginationMetadata paginationMetadata = new PaginationMetadata(false, null, null);

public static final String EPOCH = "epoch";
public static final String TIMESTAMP = "timestamp";

public Table() {}

public Table(@Nullable PaginationMetadata paginationMetadata) {
if (paginationMetadata != null) {
this.paginationMetadata = paginationMetadata;

Check warning on line 73 in server/src/main/java/org/opensearch/common/Table.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/Table.java#L73

Added line #L73 was not covered by tests
}
}

public Table startHeaders() {
inHeaders = true;
currentCells = new ArrayList<>();
Expand Down Expand Up @@ -230,6 +242,18 @@ public Map<String, String> getAliasMap() {
return headerAliasMap;
}

public boolean isTablePaginated() {
return paginationMetadata.isResponsePaginated;
}

public String getNextTokenForTable() {
return paginationMetadata.nextToken;

Check warning on line 250 in server/src/main/java/org/opensearch/common/Table.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/Table.java#L250

Added line #L250 was not covered by tests
}

public String getPaginatedElementForTable() {
return paginationMetadata.paginatedElement;

Check warning on line 254 in server/src/main/java/org/opensearch/common/Table.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/Table.java#L254

Added line #L254 was not covered by tests
}

/**
* Cell in a table
*
Expand All @@ -254,4 +278,34 @@ public Cell(Object value, Map<String, String> attr) {
this.attr = attr;
}
}

/**
* Pagination metadata for a table.
*
* @opensearch.internal
*/
public static class PaginationMetadata {

/**
* boolean denoting whether the table is paginated or not.
*/
public final boolean isResponsePaginated;

/**
* String denoting the element which is being paginated (for e.g. shards, indices..).
*/
public final String paginatedElement;

/**
* String denoting the nextToken of paginated response, which will be used to fetch nextPage (if any).
*/
public final String nextToken;

public PaginationMetadata(@NonNull boolean isResponsePaginated, @Nullable String paginatedElement, @Nullable String nextToken) {
this.isResponsePaginated = isResponsePaginated;
assert !isResponsePaginated || paginatedElement != null : "paginatedElement must be specified for a table which is paginated";
this.paginatedElement = paginatedElement;
this.nextToken = nextToken;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.common.Table;
Expand All @@ -65,11 +66,17 @@
import org.opensearch.rest.action.RestResponseListener;
import org.opensearch.search.suggest.completion.CompletionStats;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;
import static org.opensearch.rest.RestRequest.Method.GET;
Expand Down Expand Up @@ -106,24 +113,53 @@ protected void documentation(StringBuilder sb) {

@Override
public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) {
final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));

String[] indices = new String[0];
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
clusterStateRequest.clusterManagerNodeTimeout(
request.paramAsTime("cluster_manager_timeout", clusterStateRequest.clusterManagerNodeTimeout())
);
parseDeprecatedMasterTimeoutParameter(clusterStateRequest, request, deprecationLogger, getName());
clusterStateRequest.clear().nodes(true).routingTable(true).indices(indices);
if (request.hasParam("nextToken")) {
// ToDo: Add validation on the nextToken passed in the request
// Need to get the metadata as well
request.param("nextToken");
clusterStateRequest.clear().nodes(true).routingTable(true).metadata(true);

Check warning on line 128 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L127-L128

Added lines #L127 - L128 were not covered by tests
} else {
// Only parse the "index" param if the request is not-paginated.
indices = Strings.splitStringByCommaToArray(request.param("index"));
clusterStateRequest.clear().nodes(true).routingTable(true).indices(indices);

Check warning on line 132 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L131-L132

Added lines #L131 - L132 were not covered by tests
}

String[] finalIndices = indices;

Check warning on line 135 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L135

Added line #L135 was not covered by tests
return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
@Override
public void processResponse(final ClusterStateResponse clusterStateResponse) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.all();
indicesStatsRequest.indices(indices);
final List<ShardRouting> shardRoutingResponseList = new ArrayList<>();

Check warning on line 141 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L141

Added line #L141 was not covered by tests
final Table.PaginationMetadata paginationMetadata;
if (request.hasParam("nextToken")) {
List<String> indicesToBeQueried = new ArrayList<>();
paginationMetadata = new Table.PaginationMetadata(

Check warning on line 145 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L144-L145

Added lines #L144 - L145 were not covered by tests
true,
"shards",
getNextTokenForPaginatedResponse(request, clusterStateResponse, indicesToBeQueried, shardRoutingResponseList)

Check warning on line 148 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L148

Added line #L148 was not covered by tests
);
indicesStatsRequest.indices(indicesToBeQueried.toArray(new String[0]));
} else {
shardRoutingResponseList.addAll(clusterStateResponse.getState().routingTable().allShards());
indicesStatsRequest.indices(finalIndices);
paginationMetadata = null;

Check warning on line 154 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L150-L154

Added lines #L150 - L154 were not covered by tests
}
client.admin().indices().stats(indicesStatsRequest, new RestResponseListener<IndicesStatsResponse>(channel) {
@Override
public RestResponse buildResponse(IndicesStatsResponse indicesStatsResponse) throws Exception {
return RestTable.buildResponse(buildTable(request, clusterStateResponse, indicesStatsResponse), channel);
return RestTable.buildResponse(
buildTable(request, clusterStateResponse, indicesStatsResponse, shardRoutingResponseList, paginationMetadata),

Check warning on line 160 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L159-L160

Added lines #L159 - L160 were not covered by tests
channel
);
}
});
}
Expand All @@ -132,7 +168,11 @@ public RestResponse buildResponse(IndicesStatsResponse indicesStatsResponse) thr

@Override
protected Table getTableWithHeader(final RestRequest request) {
Table table = new Table();
return getTableWithHeader(request, null);

Check warning on line 171 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L171

Added line #L171 was not covered by tests
}

protected Table getTableWithHeader(final RestRequest request, Table.PaginationMetadata paginationMetadata) {
Table table = new Table(paginationMetadata);
table.startHeaders()
.addCell("index", "default:true;alias:i,idx;desc:index name")
.addCell("shard", "default:true;alias:s,sh;desc:shard name")
Expand Down Expand Up @@ -301,10 +341,15 @@ private static <S, T> Object getOrNull(S stats, Function<S, T> accessor, Functio
}

// package private for testing
Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsResponse stats) {
Table table = getTableWithHeader(request);

for (ShardRouting shard : state.getState().routingTable().allShards()) {
Table buildTable(
RestRequest request,
ClusterStateResponse state,
IndicesStatsResponse stats,
List<ShardRouting> shardRoutingList,
Table.PaginationMetadata paginationMetadata
) {
Table table = getTableWithHeader(request, paginationMetadata);
for (ShardRouting shard : shardRoutingList) {
ShardStats shardStats = stats.asMap().get(shard);
CommonStats commonStats = null;
CommitStats commitStats = null;
Expand Down Expand Up @@ -453,7 +498,106 @@ Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsRe

table.endRow();
}

return table;
}

private List<String> getListOfIndicesSortedByCreateTime(final ClusterStateResponse clusterStateResponse) {
List<String> indicesList = new ArrayList<String>(clusterStateResponse.getState().getRoutingTable().getIndicesRouting().keySet());
indicesList.sort((index1, index2) -> {
Long index1CreationTimeStamp = clusterStateResponse.getState().metadata().indices().get(index1).getCreationDate();
Long index2CreationTimeStamp = clusterStateResponse.getState().metadata().indices().get(index2).getCreationDate();

Check warning on line 508 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L505-L508

Added lines #L505 - L508 were not covered by tests
if (index1CreationTimeStamp.equals(index2CreationTimeStamp)) {
return index1.compareTo(index2);

Check warning on line 510 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L510

Added line #L510 was not covered by tests
}
return (index1CreationTimeStamp - index2CreationTimeStamp) > 0 ? 1 : -1;
});
return indicesList;

Check warning on line 514 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L514

Added line #L514 was not covered by tests
}

private String getNextTokenForPaginatedResponse(
final RestRequest request,
ClusterStateResponse clusterStateResponse,
List<String> indicesToBeQueried,
List<ShardRouting> shardRoutingResponseList
) {
final long defaultPageSize = (long) clusterStateResponse.getState().nodes().getDataNodes().size() + 1;

Check warning on line 523 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L523

Added line #L523 was not covered by tests

// Get the nextToken provided in the request
final String nextTokenInRequest = Objects.equals(request.param("nextToken"), "null")
? null
: new String(Base64.getDecoder().decode(request.param("nextToken")), UTF_8);

Check warning on line 528 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L527-L528

Added lines #L527 - L528 were not covered by tests

List<String> sortedIndicesList = getListOfIndicesSortedByCreateTime(clusterStateResponse);

Check warning on line 530 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L530

Added line #L530 was not covered by tests

// Since all the shards for last ID would have already been sent in the last response,
// start iterating from the next shard for current page
int newPageStartShardID = nextTokenInRequest == null ? 0 : Integer.parseInt(nextTokenInRequest.split("\\$")[0]) + 1;

// Since all the shards corresponding to the last processed index might not have been included in the last page,
// start iterating from the last index number itself
int newPageStartIndexNumber = nextTokenInRequest == null ? 0 : Integer.parseInt(nextTokenInRequest.split("\\$")[1]);

// Get the number of shards upto the maxPageSize
long shardCountSoFar = 0L;
int lastProcessedShardNumber = -1;
int lastProcessedIndexNumber = -1;

Check warning on line 543 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L541-L543

Added lines #L541 - L543 were not covered by tests

// ToDo: Handle case when index gets deleted. Select the first index with creationTime just greater than the last index's
// creationTime
int indexNumberInSortedList = newPageStartIndexNumber;

Check warning on line 547 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L547

Added line #L547 was not covered by tests
for (; indexNumberInSortedList < sortedIndicesList.size(); indexNumberInSortedList++) {
String index = sortedIndicesList.get(indexNumberInSortedList);
Map<Integer, IndexShardRoutingTable> indexShards = clusterStateResponse.getState()
.getRoutingTable()
.getIndicesRouting()
.get(index)
.getShards();

Check warning on line 554 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L549-L554

Added lines #L549 - L554 were not covered by tests
// If all the shards corresponding to the last index were already processed, move to the next Index
if (indexNumberInSortedList == newPageStartIndexNumber && (newPageStartShardID > indexShards.size() - 1)) {
// ToDo: Add validation that the newPageStartShardID should not be greater than the newPageStartIndexShards.size()
newPageStartShardID = 0;
continue;

Check warning on line 559 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L558-L559

Added lines #L558 - L559 were not covered by tests
}
int lastProcessedShardNumberForCurrentIndex = -1;

Check warning on line 561 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L561

Added line #L561 was not covered by tests
int shardID = (indexNumberInSortedList == newPageStartIndexNumber) ? newPageStartShardID : 0;
for (; shardID < indexShards.size(); shardID++) {
shardCountSoFar += indexShards.get(shardID).shards().size();

Check warning on line 564 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L564

Added line #L564 was not covered by tests
if (shardCountSoFar > defaultPageSize) {
break;

Check warning on line 566 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L566

Added line #L566 was not covered by tests
}
shardRoutingResponseList.addAll(indexShards.get(shardID).shards());
lastProcessedShardNumberForCurrentIndex = shardID;

Check warning on line 569 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L568-L569

Added lines #L568 - L569 were not covered by tests
}

if (shardCountSoFar > defaultPageSize) {
if (lastProcessedShardNumberForCurrentIndex != -1) {
indicesToBeQueried.add(index);
lastProcessedIndexNumber = indexNumberInSortedList;
lastProcessedShardNumber = lastProcessedShardNumberForCurrentIndex;

Check warning on line 576 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L574-L576

Added lines #L574 - L576 were not covered by tests
}
break;
}
indicesToBeQueried.add(index);
lastProcessedShardNumber = lastProcessedShardNumberForCurrentIndex;
lastProcessedIndexNumber = indexNumberInSortedList;

Check warning on line 582 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L580-L582

Added lines #L580 - L582 were not covered by tests
}

// nextToken = "lastProcessedShardNumber$LastProcessedIndexNumber$CreateTimeOfLastProcessedIndex$NameOfLastProcessedIndex"
return indexNumberInSortedList >= sortedIndicesList.size()
? null
: Base64.getEncoder()
.encodeToString(

Check warning on line 589 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L587-L589

Added lines #L587 - L589 were not covered by tests
(lastProcessedShardNumber
+ "$"
+ (lastProcessedIndexNumber)
+ "$"
+ clusterStateResponse.getState()
.metadata()
.indices()
.get(sortedIndicesList.get(lastProcessedIndexNumber))
.getCreationDate()

Check warning on line 598 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L594-L598

Added lines #L594 - L598 were not covered by tests
+ "$"
+ sortedIndicesList.get(lastProcessedIndexNumber)).getBytes(StandardCharsets.UTF_8)

Check warning on line 600 in server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java#L600

Added line #L600 was not covered by tests
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,15 @@ public static RestResponse buildXContentBuilder(Table table, RestChannel channel
XContentBuilder builder = channel.newBuilder();
List<DisplayHeader> displayHeaders = buildDisplayHeaders(table, request);

builder.startArray();
if (table.isTablePaginated()) {
assert table.getPaginatedElementForTable() != null : "Paginated element is required in-case nextToken is not null";
builder.startObject();
builder.field("nextToken", table.getNextTokenForTable());
builder.startArray(table.getPaginatedElementForTable());

Check warning on line 95 in server/src/main/java/org/opensearch/rest/action/cat/RestTable.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestTable.java#L93-L95

Added lines #L93 - L95 were not covered by tests
} else {
builder.startArray();
}

List<Integer> rowOrder = getRowOrder(table, request);
for (Integer row : rowOrder) {
builder.startObject();
Expand All @@ -98,6 +106,11 @@ public static RestResponse buildXContentBuilder(Table table, RestChannel channel
builder.endObject();
}
builder.endArray();

if (table.isTablePaginated()) {
builder.endObject();

Check warning on line 111 in server/src/main/java/org/opensearch/rest/action/cat/RestTable.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestTable.java#L111

Added line #L111 was not covered by tests
}

return new BytesRestResponse(RestStatus.OK, builder);
}

Expand Down Expand Up @@ -136,6 +149,13 @@ public static RestResponse buildTextPlainResponse(Table table, RestChannel chann
}
out.append("\n");
}

// Adding a nextToken row, post an empty line, in the response if the table is paginated.
if (table.isTablePaginated()) {
out.append("\n");
out.append("nextToken" + " " + table.getNextTokenForTable());
out.append("\n");

Check warning on line 157 in server/src/main/java/org/opensearch/rest/action/cat/RestTable.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/cat/RestTable.java#L155-L157

Added lines #L155 - L157 were not covered by tests
}
out.close();
return new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, bytesOut.bytes());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void testBuildTable() {
when(state.getState()).thenReturn(clusterState);

final RestShardsAction action = new RestShardsAction();
final Table table = action.buildTable(new FakeRestRequest(), state, stats);
final Table table = action.buildTable(new FakeRestRequest(), state, stats, state.getState().routingTable().allShards(), null);

// now, verify the table is correct
List<Table.Cell> headers = table.getHeaders();
Expand Down

0 comments on commit 549427a

Please sign in to comment.