Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ClusterStateRequest parameter to cluster state transport request #668

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 0 additions & 31 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionType;
import org.opensearch.action.support.TransportAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
Expand All @@ -35,7 +34,6 @@
import org.opensearch.rest.RestHandler.Route;
import org.opensearch.sdk.api.ActionExtension;
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.EnvironmentSettingsResponseHandler;
import org.opensearch.sdk.handlers.ExtensionActionRequestHandler;
import org.opensearch.sdk.action.SDKActionModule;
Expand Down Expand Up @@ -553,35 +551,6 @@ private void sendGenericRequestWithExceptionHandling(
}
}

/**
* Requests the cluster state from OpenSearch. The result will be handled by a {@link ClusterStateResponseHandler}.
*
* @param transportService The TransportService defining the connection to OpenSearch.
* @return The cluster state of OpenSearch
*/

public ClusterState sendClusterStateRequest(TransportService transportService) {
logger.info("Sending Cluster State request to OpenSearch");
ClusterStateResponseHandler clusterStateResponseHandler = new ClusterStateResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsManager.REQUEST_EXTENSION_CLUSTER_STATE,
new ExtensionRequest(ExtensionsManager.RequestType.REQUEST_EXTENSION_CLUSTER_STATE),
clusterStateResponseHandler
);
// Wait on cluster state response
clusterStateResponseHandler.awaitResponse();
} catch (TimeoutException e) {
logger.info("Failed to receive Cluster State response from OpenSearch", e);
} catch (Exception e) {
logger.info("Failed to send Cluster State request to OpenSearch", e);
}

// At this point, response handler has read in the cluster state
return clusterStateResponseHandler.getClusterState();
}

/**
* Request the Dependency Information from Opensearch. The result will be handled by a {@link ExtensionDependencyResponseHandler}.
*
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/opensearch/sdk/SDKClusterService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.SettingUpgrader;
Expand Down Expand Up @@ -52,7 +53,7 @@ public SDKClusterService(ExtensionsRunner extensionsRunner) {
*/
public ClusterState state() {
if (extensionsRunner.isInitialized()) {
return extensionsRunner.sendClusterStateRequest(extensionsRunner.getExtensionTransportService());
return extensionsRunner.getSdkTransportService().sendClusterStateRequest(new ClusterStateRequest().all());
}
throw new IllegalStateException("The Extensions Runner has not been initialized.");
}
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/org/opensearch/sdk/SDKTransportService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.extensions.ExtensionsManager;
import org.opensearch.extensions.action.RegisterTransportActionsRequest;
Expand All @@ -27,6 +29,7 @@
import org.opensearch.sdk.action.RemoteExtensionActionRequest;
import org.opensearch.sdk.action.SDKActionModule;
import org.opensearch.sdk.handlers.AcknowledgedResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.ExtensionActionResponseHandler;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -106,6 +109,32 @@ public RemoteExtensionActionResponse sendRemoteExtensionActionRequest(RemoteExte
);
}

/**
* Requests the Cluster State from OpenSearch
*
* @param request a ClusterStateRequest object defining the information to be retrieved
* @return The requested cluster state. Only the parts of the cluster state that were requested are included in the returned {@link ClusterState} instance.
*/
public ClusterState sendClusterStateRequest(ClusterStateRequest request) {
logger.info("Sending Cluster State request to OpenSearch");
ClusterStateResponseHandler clusterStateResponseHandler = new ClusterStateResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsManager.REQUEST_EXTENSION_CLUSTER_STATE,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will conflict with #647. We'll probably get your changes in and I'll rebase.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I thought so.

Also, I'll be doing another PR for GetFieldMappings which will follow this template, and we (you) may want to make a more generic protobuf action for all the TransportActions (with associated ActionRequest/ActionResponse) with similar casing to this... they all implement Writeable.

Or we could adapt the ProxyAction code to do this. I started down that road but it got pretty spaghetti pretty fast.

request,
clusterStateResponseHandler
);
// Wait on response
clusterStateResponseHandler.awaitResponse();
} catch (TimeoutException e) {
logger.error("Failed to receive Cluster State response from OpenSearch", e);
} catch (Exception e) {
logger.error("Failed to send Cluster State request to OpenSearch", e);
}
return clusterStateResponseHandler.getClusterState();
}

public TransportService getTransportService() {
return transportService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,16 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.extensions.ExtensionsManager;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
* This class handles the response from OpenSearch to a {@link ExtensionsRunner#sendClusterStateRequest(TransportService)} call.
* This class handles the response from OpenSearch to a {@link SDKTransportService#sendClusterStateRequest()} call.
*/
public class ClusterStateResponseHandler implements TransportResponseHandler<ClusterStateResponse> {
private static final Logger logger = LogManager.getLogger(ClusterStateResponseHandler.class);
Expand Down
9 changes: 0 additions & 9 deletions src/test/java/org/opensearch/sdk/TestExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.extensions.UpdateSettingsRequest;
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.EnvironmentSettingsResponseHandler;
import org.opensearch.sdk.handlers.ExtensionsInitRequestHandler;
import org.opensearch.sdk.handlers.ExtensionsRestRequestHandler;
Expand Down Expand Up @@ -177,14 +176,6 @@ public void testHandleUpdateSettingsRequest() throws Exception {
);
}

@Test
public void testClusterStateRequest() {

extensionsRunner.sendClusterStateRequest(transportService);

verify(transportService, times(1)).sendRequest(any(), anyString(), any(), any(ClusterStateResponseHandler.class));
}

@Test
public void testClusterSettingRequest() {

Expand Down
15 changes: 10 additions & 5 deletions src/test/java/org/opensearch/sdk/TestSDKClusterService.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.common.Strings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -54,12 +56,15 @@ public void testState() throws Exception {

// After initialization should be successful
when(extensionsRunner.isInitialized()).thenReturn(true);
sdkClusterService.state();
verify(extensionsRunner, times(1)).getExtensionTransportService();
SDKTransportService sdkTransportService = mock(SDKTransportService.class);
when(extensionsRunner.getSdkTransportService()).thenReturn(sdkTransportService);

ArgumentCaptor<TransportService> argumentCaptor = ArgumentCaptor.forClass(TransportService.class);
verify(extensionsRunner, times(1)).sendClusterStateRequest(argumentCaptor.capture());
assertNull(argumentCaptor.getValue());
sdkClusterService.state();
ArgumentCaptor<ClusterStateRequest> argumentCaptor = ArgumentCaptor.forClass(ClusterStateRequest.class);
verify(sdkTransportService, times(1)).sendClusterStateRequest(argumentCaptor.capture());
assertArrayEquals(Strings.EMPTY_ARRAY, argumentCaptor.getValue().indices());
assertTrue(argumentCaptor.getValue().nodes());
assertTrue(argumentCaptor.getValue().routingTable());
}

@Test
Expand Down
45 changes: 45 additions & 0 deletions src/test/java/org/opensearch/sdk/TestSDKTransportService.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,26 @@
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.extensions.ExtensionsManager;
import org.opensearch.extensions.action.RegisterTransportActionsRequest;
import org.opensearch.extensions.action.TransportActionRequestFromExtension;
import org.opensearch.sdk.action.RemoteExtensionAction;
import org.opensearch.sdk.action.RemoteExtensionActionRequest;
import org.opensearch.sdk.action.SDKActionModule;
import org.opensearch.sdk.action.TestSDKActionModule;
import org.opensearch.sdk.handlers.AcknowledgedResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.ExtensionActionResponseHandler;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportService;

import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -101,4 +107,43 @@ public void testRegisterTransportAction() {
// Internal action should be filtered out
assertFalse(registerTransportActionsRequestCaptor.getValue().getTransportActions().contains(RemoteExtensionAction.class.getName()));
}

@Test
public void testRemoteExtensionActionRequest() {
ArgumentCaptor<TransportActionRequestFromExtension> transportActionRequestFromExtensionCaptor = ArgumentCaptor.forClass(
TransportActionRequestFromExtension.class
);
String expectedAction = "com.example.action";
String expectedRequest = "com.example.request";
byte[] expectedRequestBytes = "test".getBytes(StandardCharsets.UTF_8);
RemoteExtensionActionRequest request = new RemoteExtensionActionRequest(expectedAction, expectedRequest, expectedRequestBytes);
sdkTransportService.sendRemoteExtensionActionRequest(request);
verify(transportService, times(1)).sendRequest(
any(),
eq(ExtensionsManager.TRANSPORT_ACTION_REQUEST_FROM_EXTENSION),
transportActionRequestFromExtensionCaptor.capture(),
any(ExtensionActionResponseHandler.class)
);
assertEquals(TEST_UNIQUE_ID, transportActionRequestFromExtensionCaptor.getValue().getUniqueId());
assertEquals(expectedAction, transportActionRequestFromExtensionCaptor.getValue().getAction());
String expectedString = expectedRequest + (char) RemoteExtensionActionRequest.UNIT_SEPARATOR + "test";
assertEquals(
expectedString,
new String(transportActionRequestFromExtensionCaptor.getValue().getRequestBytes(), StandardCharsets.UTF_8)
);
}

@Test
public void testsendClusterStateRequest() {
ArgumentCaptor<ClusterStateRequest> clusterStateRequestCaptor = ArgumentCaptor.forClass(ClusterStateRequest.class);
ClusterStateRequest request = new ClusterStateRequest().clear().indices("foo", "bar");
sdkTransportService.sendClusterStateRequest(request);
verify(transportService, times(1)).sendRequest(
any(),
eq(ExtensionsManager.REQUEST_EXTENSION_CLUSTER_STATE),
clusterStateRequestCaptor.capture(),
any(ClusterStateResponseHandler.class)
);
assertArrayEquals(new String[] { "foo", "bar" }, clusterStateRequestCaptor.getValue().indices());
}
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
}