From 48e38b07607674cb6e61c0957cee7359e1a80dd6 Mon Sep 17 00:00:00 2001 From: Harsh Garg Date: Tue, 26 Mar 2024 10:43:38 +0530 Subject: [PATCH] Adding TestMetricsRegistry for UTs Signed-off-by: Harsh Garg --- .../coordination/ClusterManagerMetrics.java | 52 +++++++++++++ .../cluster/coordination/Coordinator.java | 7 +- .../coordination/FollowersChecker.java | 20 +---- .../cluster/coordination/LeaderChecker.java | 22 +----- .../opensearch/discovery/DiscoveryModule.java | 6 +- .../main/java/org/opensearch/node/Node.java | 4 +- .../coordination/FollowersCheckerTests.java | 67 +++++++++------- .../coordination/LeaderCheckerTests.java | 50 +++++------- .../cluster/coordination/NodeJoinTests.java | 2 +- .../discovery/DiscoveryModuleTests.java | 3 +- .../snapshots/SnapshotResiliencyTests.java | 3 +- .../telemetry/TestInMemoryCounter.java | 52 +++++++++++++ .../telemetry/TestInMemoryHistogram.java | 47 ++++++++++++ .../TestInMemoryMetricsRegistry.java | 76 +++++++++++++++++++ .../AbstractCoordinatorTestCase.java | 2 +- 15 files changed, 308 insertions(+), 105 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/coordination/ClusterManagerMetrics.java create mode 100644 server/src/test/java/org/opensearch/telemetry/TestInMemoryCounter.java create mode 100644 server/src/test/java/org/opensearch/telemetry/TestInMemoryHistogram.java create mode 100644 server/src/test/java/org/opensearch/telemetry/TestInMemoryMetricsRegistry.java diff --git a/server/src/main/java/org/opensearch/cluster/coordination/ClusterManagerMetrics.java b/server/src/main/java/org/opensearch/cluster/coordination/ClusterManagerMetrics.java new file mode 100644 index 0000000000000..27a269202f048 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/coordination/ClusterManagerMetrics.java @@ -0,0 +1,52 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.coordination; + +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +import java.util.Objects; +import java.util.Optional; + +/** + * Class containing metrics (counters/latency) specific to ClusterManager. + */ +public final class ClusterManagerMetrics { + + private static final String COUNTER_METRICS_UNIT = "1"; + + public final Counter leaderCheckFailureCounter; + public final Counter followerChecksFailureCounter; + + public ClusterManagerMetrics(MetricsRegistry metricsRegistry) { + this.followerChecksFailureCounter = metricsRegistry.createCounter( + "followers.checker.failure.count", + "Counter for number of failed follower checks", + COUNTER_METRICS_UNIT + ); + this.leaderCheckFailureCounter = metricsRegistry.createCounter( + "leader.checker.failure.count", + "Counter for number of failed leader checks", + COUNTER_METRICS_UNIT + ); + } + + public void incrementCounter(Counter counter, Double value) { + incrementCounter(counter, value, Optional.empty()); + } + + public void incrementCounter(Counter counter, Double value, Optional tags) { + if (Objects.isNull(tags) || tags.isEmpty()) { + counter.add(value); + return; + } + counter.add(value, tags.get()); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index d702003c98b25..2165984418af5 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -87,7 +87,6 @@ import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; import org.opensearch.node.remotestore.RemoteStoreNodeService; -import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool.Names; import org.opensearch.transport.TransportService; @@ -209,7 +208,7 @@ public Coordinator( NodeHealthService nodeHealthService, PersistedStateRegistry persistedStateRegistry, RemoteStoreNodeService remoteStoreNodeService, - MetricsRegistry metricsRegistry + ClusterManagerMetrics clusterManagerMetrics ) { this.settings = settings; this.transportService = transportService; @@ -269,7 +268,7 @@ public Coordinator( transportService, this::onLeaderFailure, nodeHealthService, - metricsRegistry + clusterManagerMetrics ); this.followersChecker = new FollowersChecker( settings, @@ -278,7 +277,7 @@ public Coordinator( this::onFollowerCheckRequest, this::removeNode, nodeHealthService, - metricsRegistry + clusterManagerMetrics ); this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); this.clusterApplier = clusterApplier; diff --git a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java index c50f50c06370f..312c3bc038c80 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java @@ -48,8 +48,6 @@ import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; -import org.opensearch.telemetry.metrics.Counter; -import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.threadpool.ThreadPool.Names; import org.opensearch.transport.ConnectTransportException; import org.opensearch.transport.Transport; @@ -114,8 +112,6 @@ public class FollowersChecker { Setting.Property.NodeScope ); - private static final String UNIT = "1"; - private final Settings settings; private final TimeValue followerCheckInterval; @@ -131,7 +127,7 @@ public class FollowersChecker { private final TransportService transportService; private final NodeHealthService nodeHealthService; private volatile FastResponseState fastResponseState; - private Counter followerChecksFailureCounter; + private ClusterManagerMetrics clusterManagerMetrics; public FollowersChecker( Settings settings, @@ -140,7 +136,7 @@ public FollowersChecker( Consumer handleRequestAndUpdateState, BiConsumer onNodeFailure, NodeHealthService nodeHealthService, - MetricsRegistry metricsRegistry + ClusterManagerMetrics clusterManagerMetrics ) { this.settings = settings; this.transportService = transportService; @@ -167,15 +163,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti handleDisconnectedNode(node); } }); - initializeMetrics(metricsRegistry); - } - - private void initializeMetrics(MetricsRegistry metricsRegistry) { - this.followerChecksFailureCounter = metricsRegistry.createCounter( - "followers.checker.failure.count", - "Counter for number of failed follower checks", - UNIT - ); + this.clusterManagerMetrics = clusterManagerMetrics; } private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) { @@ -416,7 +404,6 @@ public void handleException(TransportException exp) { return; } - followerChecksFailureCounter.add(1); failNode(reason); } @@ -442,6 +429,7 @@ public void run() { followerCheckers.remove(discoveryNode); } onNodeFailure.accept(discoveryNode, reason); + clusterManagerMetrics.incrementCounter(clusterManagerMetrics.followerChecksFailureCounter, 1.0); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java index 2c2467c0c7643..0c12fc1d1ee2c 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java @@ -50,8 +50,6 @@ import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; -import org.opensearch.telemetry.metrics.Counter; -import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.threadpool.ThreadPool.Names; import org.opensearch.transport.ConnectTransportException; import org.opensearch.transport.NodeDisconnectedException; @@ -113,8 +111,6 @@ public class LeaderChecker { Setting.Property.NodeScope ); - private static final String UNIT = "1"; - private final Settings settings; private final TimeValue leaderCheckInterval; @@ -125,7 +121,7 @@ public class LeaderChecker { private final NodeHealthService nodeHealthService; private AtomicReference currentChecker = new AtomicReference<>(); private volatile DiscoveryNodes discoveryNodes; - private Counter leaderCheckFailureCounter; + private final ClusterManagerMetrics clusterManagerMetrics; LeaderChecker( final Settings settings, @@ -133,7 +129,7 @@ public class LeaderChecker { final TransportService transportService, final Consumer onLeaderFailure, NodeHealthService nodeHealthService, - final MetricsRegistry metricsRegistry + final ClusterManagerMetrics clusterManagerMetrics ) { this.settings = settings; leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings); @@ -142,6 +138,7 @@ public class LeaderChecker { this.transportService = transportService; this.onLeaderFailure = onLeaderFailure; this.nodeHealthService = nodeHealthService; + this.clusterManagerMetrics = clusterManagerMetrics; clusterSettings.addSettingsUpdateConsumer(LEADER_CHECK_TIMEOUT_SETTING, this::setLeaderCheckTimeout); transportService.registerRequestHandler( @@ -162,15 +159,6 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti handleDisconnectedNode(node); } }); - initializeMetrics(metricsRegistry); - } - - private void initializeMetrics(MetricsRegistry metricsRegistry) { - this.leaderCheckFailureCounter = metricsRegistry.createCounter( - "leader.checker.failure.count", - "Counter for number of failed leader checks", - UNIT - ); } private void setLeaderCheckTimeout(TimeValue leaderCheckTimeout) { @@ -319,12 +307,10 @@ public void handleException(TransportException exp) { if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) { logger.debug(new ParameterizedMessage("leader [{}] disconnected during check", leader), exp); leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp)); - leaderCheckFailureCounter.add(1); return; } else if (exp.getCause() instanceof NodeHealthCheckFailureException) { logger.debug(new ParameterizedMessage("leader [{}] health check failed", leader), exp); leaderFailed(new NodeHealthCheckFailureException("node [" + leader + "] failed health checks", exp)); - leaderCheckFailureCounter.add(1); return; } long failureCount = failureCountSinceLastSuccess.incrementAndGet(); @@ -342,7 +328,6 @@ public void handleException(TransportException exp) { leaderFailed( new OpenSearchException("node [" + leader + "] failed [" + failureCount + "] consecutive checks", exp) ); - leaderCheckFailureCounter.add(1); return; } @@ -373,6 +358,7 @@ void leaderFailed(Exception e) { @Override public void run() { onLeaderFailure.accept(e); + clusterManagerMetrics.incrementCounter(clusterManagerMetrics.leaderCheckFailureCounter, 1.0); } @Override diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java index cdd1fda03dd4f..08e2fbd4dd08a 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.ClusterManagerMetrics; import org.opensearch.cluster.coordination.Coordinator; import org.opensearch.cluster.coordination.ElectionStrategy; import org.opensearch.cluster.coordination.PersistedStateRegistry; @@ -55,7 +56,6 @@ import org.opensearch.monitor.NodeHealthService; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.plugins.DiscoveryPlugin; -import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -135,7 +135,7 @@ public DiscoveryModule( NodeHealthService nodeHealthService, PersistedStateRegistry persistedStateRegistry, RemoteStoreNodeService remoteStoreNodeService, - MetricsRegistry metricsRegistry + ClusterManagerMetrics clusterManagerMetrics ) { final Collection> joinValidators = new ArrayList<>(); final Map> hostProviders = new HashMap<>(); @@ -214,7 +214,7 @@ public DiscoveryModule( nodeHealthService, persistedStateRegistry, remoteStoreNodeService, - metricsRegistry + clusterManagerMetrics ); } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 4c3f981f999a3..b91917c7a0ba5 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -66,6 +66,7 @@ import org.opensearch.cluster.InternalClusterInfoService; import org.opensearch.cluster.NodeConnectionsService; import org.opensearch.cluster.action.index.MappingUpdatedAction; +import org.opensearch.cluster.coordination.ClusterManagerMetrics; import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.metadata.AliasValidator; import org.opensearch.cluster.metadata.IndexTemplateMetadata; @@ -644,6 +645,7 @@ protected Node( final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client); final UsageService usageService = new UsageService(); + final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry); ModulesBuilder modules = new ModulesBuilder(); // plugin modules must be added here, before others or we can get crazy injection errors... @@ -1130,7 +1132,7 @@ protected Node( fsHealthService, persistedStateRegistry, remoteStoreNodeService, - metricsRegistry + clusterManagerMetrics ); final SearchPipelineService searchPipelineService = new SearchPipelineService( clusterService, diff --git a/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java index fda7660cd3121..229caa8cf4b52 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java @@ -47,7 +47,7 @@ import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; -import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.TestInMemoryMetricsRegistry; import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; @@ -94,12 +94,6 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.IsInstanceOf.instanceOf; -import static org.mockito.ArgumentMatchers.anyDouble; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; public class FollowersCheckerTests extends OpenSearchTestCase { @@ -149,7 +143,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req assert false : node; }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), - NoopMetricsRegistry.INSTANCE + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); followersChecker.setCurrentNodes(discoveryNodesHolder[0]); @@ -207,31 +201,38 @@ protected void onSendRequest(long requestId, String action, TransportRequest req public void testFailsNodeThatDoesNotRespond() { final Settings settings = randomSettings(); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); testBehaviourOfFailingNode( settings, () -> null, "followers check retry count exceeded", (FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis() + FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings).millis(), - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + metricsRegistry ); + assertEquals(Integer.valueOf(2), metricsRegistry.getCounterStore().get("followers.checker.failure.count").getCounterValue()); } public void testFailsNodeThatRejectsCheck() { final Settings settings = randomSettings(); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); testBehaviourOfFailingNode( settings, () -> { throw new OpenSearchException("simulated exception"); }, "followers check retry count exceeded", (FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings).millis(), - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + metricsRegistry ); + assertEquals(Integer.valueOf(2), metricsRegistry.getCounterStore().get("followers.checker.failure.count").getCounterValue()); } public void testFailureCounterResetsOnSuccess() { final Settings settings = randomSettings(); final int retryCount = FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings); final int maxRecoveries = randomIntBetween(3, 10); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); // passes just enough checks to keep it alive, up to maxRecoveries, and then fails completely testBehaviourOfFailingNode(settings, new Supplier() { @@ -251,18 +252,23 @@ public Empty get() { "followers check retry count exceeded", (FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings) * (maxRecoveries + 1) - 1) * FOLLOWER_CHECK_INTERVAL_SETTING.get(settings) .millis(), - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + metricsRegistry ); + assertEquals(Integer.valueOf(2), metricsRegistry.getCounterStore().get("followers.checker.failure.count").getCounterValue()); } public void testFailsNodeThatIsDisconnected() { + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); testBehaviourOfFailingNode( Settings.EMPTY, () -> { throw new ConnectTransportException(null, "simulated exception"); }, "disconnected", 0, - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + metricsRegistry ); + assertEquals(Integer.valueOf(2), metricsRegistry.getCounterStore().get("followers.checker.failure.count").getCounterValue()); } public void testFailsNodeThatDisconnects() { @@ -318,7 +324,7 @@ public String toString() { assertThat(reason, equalTo("disconnected")); }, () -> new StatusInfo(HEALTHY, "healthy-info"), - NoopMetricsRegistry.INSTANCE + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build(); @@ -332,13 +338,16 @@ public String toString() { } public void testFailsNodeThatIsUnhealthy() { + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); testBehaviourOfFailingNode( randomSettings(), () -> { throw new NodeHealthCheckFailureException("non writable exception"); }, "health check failed", 0, - () -> new StatusInfo(HEALTHY, "healthy-info") + () -> new StatusInfo(HEALTHY, "healthy-info"), + metricsRegistry ); + assertEquals(Integer.valueOf(2), metricsRegistry.getCounterStore().get("followers.checker.failure.count").getCounterValue()); } private void testBehaviourOfFailingNode( @@ -346,7 +355,8 @@ private void testBehaviourOfFailingNode( Supplier responder, String failureReason, long expectedFailureTime, - NodeHealthService nodeHealthService + NodeHealthService nodeHealthService, + MetricsRegistry metricsRegistry ) { final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT); final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT); @@ -397,11 +407,6 @@ public String toString() { transportService.acceptIncomingRequests(); final AtomicBoolean nodeFailed = new AtomicBoolean(); - - final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); - final Counter counter = mock(Counter.class); - when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(counter); - final FollowersChecker followersChecker = new FollowersChecker( settings, clusterSettings, @@ -412,7 +417,7 @@ public String toString() { assertThat(reason, equalTo(failureReason)); }, nodeHealthService, - metricsRegistry + new ClusterManagerMetrics(metricsRegistry) ); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build(); @@ -458,8 +463,6 @@ public String toString() { deterministicTaskQueue.runAllTasksInTimeOrder(); assertTrue(nodeFailed.get()); assertThat(followersChecker.getFaultyNodes(), contains(otherNode)); - - verify(counter, atLeastOnce()).add(anyDouble()); } public void testFollowerCheckRequestEqualsHashCodeSerialization() { @@ -519,7 +522,11 @@ protected void onSendRequest(long requestId, String action, TransportRequest req if (exception != null) { throw exception; } - }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(UNHEALTHY, "unhealthy-info"), NoopMetricsRegistry.INSTANCE); + }, + (node, reason) -> { assert false : node; }, + () -> new StatusInfo(UNHEALTHY, "unhealthy-info"), + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + ); final long leaderTerm = randomLongBetween(2, Long.MAX_VALUE); final long followerTerm = randomLongBetween(1, leaderTerm - 1); @@ -592,7 +599,11 @@ protected void onSendRequest(long requestId, String action, TransportRequest req if (exception != null) { throw exception; } - }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE); + }, + (node, reason) -> { assert false : node; }, + () -> new StatusInfo(HEALTHY, "healthy-info"), + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + ); { // Does not call into the coordinator in the normal case @@ -739,7 +750,11 @@ public void testPreferClusterManagerNodes() { ); final FollowersChecker followersChecker = new FollowersChecker(Settings.EMPTY, clusterSettings, transportService, fcr -> { assert false : fcr; - }, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE); + }, + (node, reason) -> { assert false : node; }, + () -> new StatusInfo(HEALTHY, "healthy-info"), + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + ); followersChecker.setCurrentNodes(discoveryNodes); List followerTargets = Stream.of(capturingTransport.getCapturedRequestsAndClear()) .map(cr -> cr.node) diff --git a/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java index 8029fb3aba7ea..87ba9947f90ed 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java @@ -44,8 +44,8 @@ import org.opensearch.core.transport.TransportResponse; import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.monitor.StatusInfo; -import org.opensearch.telemetry.metrics.Counter; -import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.TestInMemoryMetricsRegistry; +import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.EqualsHashCodeTestUtils; import org.opensearch.test.EqualsHashCodeTestUtils.CopyFunction; @@ -80,13 +80,6 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.matchesRegex; import static org.hamcrest.Matchers.nullValue; -import static org.mockito.ArgumentMatchers.anyDouble; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.when; public class LeaderCheckerTests extends OpenSearchTestCase { @@ -184,14 +177,13 @@ public String toString() { transportService.acceptIncomingRequests(); final AtomicBoolean leaderFailed = new AtomicBoolean(); - final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); - final Counter leaderCheckFailedCounter = mock(Counter.class); - when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(leaderCheckFailedCounter); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); + final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry); final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> { assertThat(e.getMessage(), matchesRegex("node \\[.*\\] failed \\[[1-9][0-9]*\\] consecutive checks")); assertTrue(leaderFailed.compareAndSet(false, true)); - }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), metricsRegistry); + }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), clusterManagerMetrics); logger.info("--> creating first checker"); leaderChecker.updateLeader(leader1); @@ -241,8 +233,7 @@ public String toString() { ); } leaderChecker.updateLeader(null); - verify(metricsRegistry, times(1)).createCounter(anyString(), anyString(), anyString()); - verify(leaderCheckFailedCounter, times(1)).add(anyDouble()); + assertEquals(Integer.valueOf(1), metricsRegistry.getCounterStore().get("leader.checker.failure.count").getCounterValue()); } enum Response { @@ -307,14 +298,13 @@ public String toString() { transportService.acceptIncomingRequests(); final AtomicBoolean leaderFailed = new AtomicBoolean(); - final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); - final Counter leaderCheckFailedCounter = mock(Counter.class); - when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(leaderCheckFailedCounter); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); + final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry); final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> { assertThat(e.getMessage(), anyOf(endsWith("disconnected"), endsWith("disconnected during check"))); assertTrue(leaderFailed.compareAndSet(false, true)); - }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), metricsRegistry); + }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), clusterManagerMetrics); leaderChecker.updateLeader(leader); { @@ -369,8 +359,7 @@ public String toString() { deterministicTaskQueue.runAllRunnableTasks(); assertTrue(leaderFailed.get()); } - verify(metricsRegistry, times(1)).createCounter(anyString(), anyString(), anyString()); - verify(leaderCheckFailedCounter, times(2)).add(anyDouble()); + assertEquals(Integer.valueOf(3), metricsRegistry.getCounterStore().get("leader.checker.failure.count").getCounterValue()); } public void testFollowerFailsImmediatelyOnHealthCheckFailure() { @@ -427,13 +416,12 @@ public String toString() { transportService.acceptIncomingRequests(); final AtomicBoolean leaderFailed = new AtomicBoolean(); - final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); - final Counter leaderChecksFailedCounter = mock(Counter.class); - when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(leaderChecksFailedCounter); + TestInMemoryMetricsRegistry metricsRegistry = new TestInMemoryMetricsRegistry(); + final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry); final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> { assertThat(e.getMessage(), endsWith("failed health checks")); assertTrue(leaderFailed.compareAndSet(false, true)); - }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), metricsRegistry); + }, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), clusterManagerMetrics); leaderChecker.updateLeader(leader); @@ -453,8 +441,8 @@ public String toString() { assertTrue(leaderFailed.get()); } - verify(metricsRegistry, times(1)).createCounter(anyString(), anyString(), anyString()); - verify(leaderChecksFailedCounter, times(1)).add(anyDouble()); + + assertEquals(Integer.valueOf(1), metricsRegistry.getCounterStore().get("leader.checker.failure.count").getCounterValue()); } public void testLeaderBehaviour() { @@ -478,16 +466,14 @@ public void testLeaderBehaviour() { transportService.start(); transportService.acceptIncomingRequests(); - final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); - final Counter leaderChecksFailedCounter = mock(Counter.class); - when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(leaderChecksFailedCounter); + final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE); final LeaderChecker leaderChecker = new LeaderChecker( settings, clusterSettings, transportService, e -> fail("shouldn't be checking anything"), () -> nodeHealthServiceStatus.get(), - metricsRegistry + clusterManagerMetrics ); final DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() @@ -552,8 +538,6 @@ public void testLeaderBehaviour() { equalTo("rejecting leader check from [" + otherNode + "] sent to a node that is no longer the cluster-manager") ); } - verify(metricsRegistry, times(1)).createCounter(anyString(), anyString(), anyString()); - verifyNoInteractions(leaderChecksFailedCounter); } private class CapturingTransportResponseHandler implements TransportResponseHandler { diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index 6c44c000205f7..25b7264356c61 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -272,7 +272,7 @@ protected void onSendRequest( nodeHealthService, persistedStateRegistry, Mockito.mock(RemoteStoreNodeService.class), - NoopMetricsRegistry.INSTANCE + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java index 44e037e5a9845..b558d416aedf4 100644 --- a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java @@ -33,6 +33,7 @@ import org.opensearch.Version; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.ClusterManagerMetrics; import org.opensearch.cluster.coordination.Coordinator; import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.node.DiscoveryNode; @@ -130,7 +131,7 @@ private DiscoveryModule newModule(Settings settings, List plugi null, new PersistedStateRegistry(), remoteStoreNodeService, - NoopMetricsRegistry.INSTANCE + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 9922f9e6fa12b..a15154ad113a8 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -125,6 +125,7 @@ import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.coordination.AbstractCoordinatorTestCase; import org.opensearch.cluster.coordination.ClusterBootstrapService; +import org.opensearch.cluster.coordination.ClusterManagerMetrics; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration; import org.opensearch.cluster.coordination.CoordinationState; import org.opensearch.cluster.coordination.Coordinator; @@ -2550,7 +2551,7 @@ public void start(ClusterState initialState) { () -> new StatusInfo(HEALTHY, "healthy-info"), persistedStateRegistry, remoteStoreNodeService, - NoopMetricsRegistry.INSTANCE + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); clusterManagerService.setClusterStatePublisher(coordinator); coordinator.start(); diff --git a/server/src/test/java/org/opensearch/telemetry/TestInMemoryCounter.java b/server/src/test/java/org/opensearch/telemetry/TestInMemoryCounter.java new file mode 100644 index 0000000000000..d9aee5ebfa941 --- /dev/null +++ b/server/src/test/java/org/opensearch/telemetry/TestInMemoryCounter.java @@ -0,0 +1,52 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry; + +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.tags.Tags; + +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This is a simple implementation of Counter which is utilized by TestInMemoryMetricsRegistry for + * Unit Tests. It initializes an atomic integer to add the values of counter which doesn't have any tags + * along with a map to store the values recorded against the tags. + * The map and atomic integer can then be used to get the added values. + */ +public class TestInMemoryCounter implements Counter { + + private AtomicInteger counterValue = new AtomicInteger(0); + private ConcurrentHashMap, Double> counterValueForTags = new ConcurrentHashMap<>(); + + public Integer getCounterValue() { + return this.counterValue.get(); + } + + public ConcurrentHashMap, Double> getCounterValueForTags() { + return this.counterValueForTags; + } + + @Override + public void add(double value) { + counterValue.addAndGet((int) value); + } + + @Override + public synchronized void add(double value, Tags tags) { + HashMap hashMap = (HashMap) tags.getTagsMap(); + if (counterValueForTags.get(hashMap) == null) { + counterValueForTags.put(hashMap, value); + } else { + value = counterValueForTags.get(hashMap) + value; + counterValueForTags.put(hashMap, value); + } + } +} diff --git a/server/src/test/java/org/opensearch/telemetry/TestInMemoryHistogram.java b/server/src/test/java/org/opensearch/telemetry/TestInMemoryHistogram.java new file mode 100644 index 0000000000000..ff28df2b6529d --- /dev/null +++ b/server/src/test/java/org/opensearch/telemetry/TestInMemoryHistogram.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry; + +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.tags.Tags; + +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This is a simple implementation of Histogram which is utilized by TestInMemoryMetricsRegistry for + * Unit Tests. It initializes an atomic integer to record the value of histogram which doesn't have any tags + * along with a map to store the values recorded against the tags. + * The map and atomic integer can then be used to get the recorded values. + */ +public class TestInMemoryHistogram implements Histogram { + + private AtomicInteger histogramValue = new AtomicInteger(0); + private ConcurrentHashMap, Double> histogramValueForTags = new ConcurrentHashMap<>(); + + public Integer getHistogramValue() { + return this.histogramValue.get(); + } + + public ConcurrentHashMap, Double> getHistogramValueForTags() { + return this.histogramValueForTags; + } + + @Override + public void record(double value) { + histogramValue.addAndGet((int) value); + } + + @Override + public synchronized void record(double value, Tags tags) { + HashMap hashMap = (HashMap) tags.getTagsMap(); + histogramValueForTags.put(hashMap, value); + } +} diff --git a/server/src/test/java/org/opensearch/telemetry/TestInMemoryMetricsRegistry.java b/server/src/test/java/org/opensearch/telemetry/TestInMemoryMetricsRegistry.java new file mode 100644 index 0000000000000..761efc0fe96aa --- /dev/null +++ b/server/src/test/java/org/opensearch/telemetry/TestInMemoryMetricsRegistry.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry; + +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +/** + * This is a simple implementation of MetricsRegistry which can be utilized by Unit Tests. + * It just initializes and stores counters/histograms within a map, once created. + * The maps can then be used to get the counters/histograms by their names. + */ +public class TestInMemoryMetricsRegistry implements MetricsRegistry { + + private ConcurrentHashMap counterStore; + private ConcurrentHashMap histogramStore; + + public TestInMemoryMetricsRegistry() { + this.counterStore = new ConcurrentHashMap<>(); + this.histogramStore = new ConcurrentHashMap<>(); + } + + public ConcurrentHashMap getCounterStore() { + return this.counterStore; + } + + public ConcurrentHashMap getHistogramStore() { + return this.histogramStore; + } + + @Override + public Counter createCounter(String name, String description, String unit) { + TestInMemoryCounter counter = new TestInMemoryCounter(); + counterStore.putIfAbsent(name, counter); + return counter; + } + + @Override + public Counter createUpDownCounter(String name, String description, String unit) { + /** + * ToDo: To be implemented when required. + */ + return null; + } + + @Override + public Histogram createHistogram(String name, String description, String unit) { + TestInMemoryHistogram histogram = new TestInMemoryHistogram(); + histogramStore.putIfAbsent(name, histogram); + return histogram; + } + + @Override + public Closeable createGauge(String name, String description, String unit, Supplier valueProvider, Tags tags) { + /** + * ToDo: To be implemented when required. + */ + return null; + } + + @Override + public void close() throws IOException {} +} diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index fae2cd59791c1..3c383993d24fb 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -1182,7 +1182,7 @@ protected Optional getDisruptableMockTransport(Transpo nodeHealthService, persistedStateRegistry, remoteStoreNodeService, - NoopMetricsRegistry.INSTANCE + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) ); clusterManagerService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService(