Skip to content

Commit

Permalink
Add more UTs and fixed spotless checks
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <[email protected]>
  • Loading branch information
shourya035 committed Aug 8, 2023
1 parent 3ac12e9 commit ce2bb70
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ public Map<String, Long> getFileSizes() {
return Collections.unmodifiableMap(this.fileSizes);
}

/** Returns remote_store based stats **/
public RemoteSegmentStats getRemoteSegmentStats() {
return remoteSegmentStats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

/**
* Tracks remote store segment download and upload stats
* Used for displaying remote store stats in IndicesStats/NodeStats API
*
* @opensearch.internal
*/
Expand Down Expand Up @@ -156,7 +157,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(uploadBytesFailed));
builder.endObject();
builder.humanReadableField(Fields.MAX_REFRESH_TIME_LAG_IN_MILLIS, Fields.MAX_REFRESH_TIME_LAG, new TimeValue(maxRefreshTimeLag));
builder.humanReadableField(Fields.MAX_REFRESH_SIZE_LAG_IN_MILLIS, Fields.MAX_REFRESH_SIZE_LAG, new ByteSizeValue(maxRefreshBytesLag));
builder.humanReadableField(
Fields.MAX_REFRESH_SIZE_LAG_IN_MILLIS,
Fields.MAX_REFRESH_SIZE_LAG,
new ByteSizeValue(maxRefreshBytesLag)
);
builder.endObject();
builder.startObject(Fields.DOWNLOAD);
builder.startObject(Fields.TOTAL_DOWNLOADS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ Runnable getGlobalCheckpointSyncer() {
private final Store remoteStore;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private final boolean isTimeSeriesIndex;

private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;

private final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
Expand Down Expand Up @@ -546,6 +547,11 @@ public QueryCachingPolicy getQueryCachingPolicy() {
return cachingPolicy;
}

/** Only used for testing **/
protected RemoteRefreshSegmentPressureService getRemoteRefreshSegmentPressureService() {
return remoteRefreshSegmentPressureService;
}

@Override
public void updateShardState(
final ShardRouting newRouting,
Expand Down Expand Up @@ -1379,6 +1385,7 @@ public MergeStats mergeStats() {
public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) {
SegmentsStats segmentsStats = getEngine().segmentsStats(includeSegmentFileSizes, includeUnloadedSegments);
segmentsStats.addBitsetMemoryInBytes(shardBitsetFilterCache.getMemorySizeInBytes());
// Populate remote_store stats only if the index is remote store backed
if (indexSettings.isRemoteStoreEnabled()) {
RemoteSegmentStats remoteSegmentStats = new RemoteSegmentStats();
remoteSegmentStats.buildRemoteSegmentStats(remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId).stats());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.action.admin.cluster.node.stats;

import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.WeightedRoutingStats;
import org.opensearch.cluster.service.ClusterManagerThrottlingStats;
Expand All @@ -44,6 +46,8 @@
import org.opensearch.http.HttpStats;
import org.opensearch.core.indices.breaker.AllCircuitBreakerStats;
import org.opensearch.core.indices.breaker.CircuitBreakerStats;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.indices.NodeIndicesStats;
import org.opensearch.ingest.IngestStats;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.monitor.jvm.JvmStats;
Expand Down Expand Up @@ -76,7 +80,7 @@

public class NodeStatsTests extends OpenSearchTestCase {
public void testSerialization() throws IOException {
NodeStats nodeStats = createNodeStats();
NodeStats nodeStats = createNodeStats(true);
try (BytesStreamOutput out = new BytesStreamOutput()) {
nodeStats.writeTo(out);
try (StreamInput in = out.bytes().streamInput()) {
Expand Down Expand Up @@ -436,11 +440,35 @@ public void testSerialization() throws IOException {
assertEquals(weightedRoutingStats.getFailOpenCount(), deserializedWeightedRoutingStats.getFailOpenCount());

}

NodeIndicesStats nodeIndicesStats = nodeStats.getIndices();
NodeIndicesStats deserializedNodeIndicesStats = deserializedNodeStats.getIndices();
if (nodeIndicesStats == null) {
assertNull(deserializedNodeIndicesStats);
} else {
RemoteSegmentStats remoteSegmentStats = nodeIndicesStats.getSegments().getRemoteSegmentStats();
RemoteSegmentStats deserializedRemoteSegmentStats = deserializedNodeIndicesStats.getSegments().getRemoteSegmentStats();
assertEquals(remoteSegmentStats.getDownloadBytesStarted(), deserializedRemoteSegmentStats.getDownloadBytesStarted());
assertEquals(
remoteSegmentStats.getDownloadBytesSucceeded(),
deserializedRemoteSegmentStats.getDownloadBytesSucceeded()
);
assertEquals(remoteSegmentStats.getDownloadBytesFailed(), deserializedRemoteSegmentStats.getDownloadBytesFailed());
assertEquals(remoteSegmentStats.getUploadBytesStarted(), deserializedRemoteSegmentStats.getUploadBytesStarted());
assertEquals(remoteSegmentStats.getUploadBytesSucceeded(), deserializedRemoteSegmentStats.getUploadBytesSucceeded());
assertEquals(remoteSegmentStats.getUploadBytesFailed(), deserializedRemoteSegmentStats.getUploadBytesFailed());
assertEquals(remoteSegmentStats.getMaxRefreshTimeLag(), deserializedRemoteSegmentStats.getMaxRefreshTimeLag());
assertEquals(remoteSegmentStats.getMaxRefreshBytesLag(), deserializedRemoteSegmentStats.getMaxRefreshBytesLag());
}
}
}
}

public static NodeStats createNodeStats() {
return createNodeStats(false);
}

public static NodeStats createNodeStats(boolean remoteStoreStats) {
DiscoveryNode node = new DiscoveryNode(
"test_node",
buildNewFakeTransportAddress(),
Expand Down Expand Up @@ -718,11 +746,14 @@ public static NodeStats createNodeStats() {
weightedRoutingStats = WeightedRoutingStats.getInstance();
weightedRoutingStats.updateFailOpenCount();

// TODO NodeIndicesStats are not tested here, way too complicated to create, also they need to be migrated to Writeable yet
NodeIndicesStats indicesStats = getNodeIndicesStats(remoteStoreStats);

// TODO: Only remote_store based aspects of NodeIndicesStats are being tested here.
// It is possible to test other metrics in NodeIndicesStats as well since it extends Writeable now
return new NodeStats(
node,
randomNonNegativeLong(),
null,
indicesStats,
osStats,
processStats,
jvmStats,
Expand All @@ -747,6 +778,23 @@ public static NodeStats createNodeStats() {
);
}

private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) {
NodeIndicesStats indicesStats = null;
if (remoteStoreStats) {
indicesStats = new NodeIndicesStats(new CommonStats(CommonStatsFlags.ALL), new HashMap<>());
RemoteSegmentStats remoteSegmentStats = indicesStats.getSegments().getRemoteSegmentStats();
remoteSegmentStats.addUploadBytesStarted(10L);
remoteSegmentStats.addUploadBytesSucceeded(10L);
remoteSegmentStats.addUploadBytesFailed(1L);
remoteSegmentStats.addDownloadBytesStarted(10L);
remoteSegmentStats.addDownloadBytesSucceeded(10L);
remoteSegmentStats.addDownloadBytesFailed(1L);
remoteSegmentStats.setMaxRefreshBytesLag(5L);
remoteSegmentStats.setMaxRefreshTimeLag(2L);
}
return indicesStats;
}

private OperationStats getPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String id) {
return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@
import org.opensearch.index.mapper.SourceToParse;
import org.opensearch.index.mapper.Uid;
import org.opensearch.index.mapper.VersionFieldMapper;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.index.seqno.ReplicationTracker;
import org.opensearch.index.seqno.RetentionLease;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
Expand Down Expand Up @@ -1812,6 +1814,31 @@ public Set<String> getPendingDeletions() throws IOException {
}
}

public void testShardStatsWithRemoteStoreEnabled() throws IOException {
IndexShard shard = newStartedShard(
Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT")
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.build()
);
RemoteSegmentTransferTracker remoteRefreshSegmentTracker = shard.getRemoteRefreshSegmentPressureService()
.getRemoteRefreshSegmentTracker(shard.shardId);
populateSampleRemoteStoreStats(remoteRefreshSegmentTracker);
ShardStats shardStats = new ShardStats(
shard.routingEntry(),
shard.shardPath(),
new CommonStats(new IndicesQueryCache(Settings.EMPTY), shard, new CommonStatsFlags()),
shard.commitStats(),
shard.seqNoStats(),
shard.getRetentionLeaseStats()
);
RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats();
assertEquals(remoteRefreshSegmentTracker.getUploadBytesStarted(), remoteSegmentStats.getUploadBytesStarted());
assertEquals(remoteRefreshSegmentTracker.getUploadBytesSucceeded(), remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(remoteRefreshSegmentTracker.getUploadBytesFailed(), remoteSegmentStats.getUploadBytesFailed());
closeShards(shard);
}

public void testRefreshMetric() throws IOException {
IndexShard shard = newStartedShard();
// refresh on: finalize and end of recovery
Expand Down Expand Up @@ -4873,4 +4900,10 @@ public void testRecordsForceMerges() throws IOException {
assertThat(thirdForceMergeUUID, equalTo(secondForceMergeRequest.forceMergeUUID()));
closeShards(shard);
}

private void populateSampleRemoteStoreStats(RemoteSegmentTransferTracker tracker) {
tracker.addUploadBytesStarted(10L);
tracker.addUploadBytesSucceeded(10L);
tracker.addUploadBytesFailed(10L);
}
}

0 comments on commit ce2bb70

Please sign in to comment.