Skip to content

Commit

Permalink
[Feature/extensions] Adding Transport Actions support for extensions (#…
Browse files Browse the repository at this point in the history
…4598)

* Adding Transport Actions support for extensions

Signed-off-by: Sarat Vemulapalli <[email protected]>

* Adding changelog

Signed-off-by: Sarat Vemulapalli <[email protected]>

* Fixing precommit and tests

Signed-off-by: Sarat Vemulapalli <[email protected]>

* Adding Transport Actions support for extensions

Signed-off-by: Sarat Vemulapalli <[email protected]>

* Fixing tests

Signed-off-by: Sarat Vemulapalli <[email protected]>

* Close client which is leaking threads

Signed-off-by: Sarat Vemulapalli <[email protected]>

* Fixing spotless

Signed-off-by: Sarat Vemulapalli <[email protected]>

* Few more tests

Signed-off-by: Sarat Vemulapalli <[email protected]>

* Adding tests

Signed-off-by: Sarat Vemulapalli <[email protected]>

* Addressing comments

Signed-off-by: Sarat Vemulapalli <[email protected]>

* Addressing comments and fixing spotless

Signed-off-by: Sarat Vemulapalli <[email protected]>

* Addressing comments

Signed-off-by: Sarat Vemulapalli <[email protected]>

Signed-off-by: Sarat Vemulapalli <[email protected]>
  • Loading branch information
saratvemulapalli authored Oct 7, 2022
1 parent be4d337 commit eee59c5
Show file tree
Hide file tree
Showing 25 changed files with 1,163 additions and 75 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Adding create component extension point support for AD ([#4517](https:/opensearch-project/OpenSearch/pull/4517))
- Add getSettings support for AD([#4519](https:/opensearch-project/OpenSearch/pull/4519))
- Fixed javadoc warning for build failure([#4581](https:/opensearch-project/OpenSearch/pull/4581))
- Added transport actions support for extensions ([#4598](https:/opensearch-project/OpenSearch/pull/4598/))
- Pass REST params and content to extensions ([#4633](https:/opensearch-project/OpenSearch/pull/4633))

## [2.x]
Expand Down
5 changes: 5 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.extensions.action.ExtensionProxyAction;
import org.opensearch.extensions.action.ExtensionTransportAction;
import org.opensearch.index.seqno.RetentionLeaseActions;
import org.opensearch.indices.SystemIndices;
import org.opensearch.indices.breaker.CircuitBreakerService;
Expand Down Expand Up @@ -696,6 +698,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
// Remote Store
actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class);

// ExtensionProxyAction
actions.register(ExtensionProxyAction.INSTANCE, ExtensionTransportAction.class);

// Decommission actions
actions.register(DecommissionAction.INSTANCE, TransportDecommissionAction.class);
actions.register(GetDecommissionStateAction.INSTANCE, TransportGetDecommissionStateAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
/**
* Generic string response indicating the status of some previous request sent to the SDK
*
* @opensearch.internal
* @opensearch.api
*/
public class ExtensionStringResponse extends TransportResponse {
private String response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterSettingsResponse;
import org.opensearch.cluster.LocalNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
Expand All @@ -39,6 +40,10 @@
import org.opensearch.discovery.InitializeExtensionsRequest;
import org.opensearch.discovery.InitializeExtensionsResponse;
import org.opensearch.extensions.ExtensionsSettings.Extension;
import org.opensearch.extensions.action.ExtensionActionRequest;
import org.opensearch.extensions.action.ExtensionActionResponse;
import org.opensearch.extensions.action.ExtensionTransportActionsHandler;
import org.opensearch.extensions.action.TransportActionRequestFromExtension;
import org.opensearch.extensions.rest.RegisterRestActionsRequest;
import org.opensearch.extensions.rest.RestActionsRequestHandler;
import org.opensearch.extensions.settings.CustomSettingsRequestHandler;
Expand Down Expand Up @@ -83,6 +88,9 @@ public class ExtensionsOrchestrator implements ReportingService<PluginsAndModule
public static final String REQUEST_OPENSEARCH_PARSE_NAMED_WRITEABLE = "internal:discovery/parsenamedwriteable";
public static final String REQUEST_EXTENSION_ACTION_LISTENER_ON_FAILURE = "internal:extensions/actionlisteneronfailure";
public static final String REQUEST_REST_EXECUTE_ON_EXTENSION_ACTION = "internal:extensions/restexecuteonextensiontaction";
public static final String REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION = "internal:extensions/handle-transportaction";
public static final String TRANSPORT_ACTION_REQUEST_FROM_EXTENSION = "internal:extensions/request-transportaction-from-extension";
public static final int EXTENSION_REQUEST_WAIT_TIMEOUT = 10;

private static final Logger logger = LogManager.getLogger(ExtensionsOrchestrator.class);

Expand Down Expand Up @@ -113,6 +121,7 @@ public static enum OpenSearchRequestType {
}

private final Path extensionsPath;
ExtensionTransportActionsHandler extensionTransportActionsHandler;
// A list of initialized extensions, a subset of the values of map below which includes all extensions
List<DiscoveryExtension> extensionsInitializedList;
// A map of extension uniqueId to full extension details used for node transport here and in the RestActionsRequestHandler
Expand All @@ -126,6 +135,7 @@ public static enum OpenSearchRequestType {
ExtensionActionListenerHandler listenerHandler;
EnvironmentSettingsRequestHandler environmentSettingsRequestHandler;
AddSettingsUpdateConsumerRequestHandler addSettingsUpdateConsumerRequestHandler;
NodeClient client;

/**
* Instantiate a new ExtensionsOrchestrator object to handle requests and responses from extensions. This is called during Node bootstrap.
Expand All @@ -137,12 +147,15 @@ public static enum OpenSearchRequestType {
public ExtensionsOrchestrator(Settings settings, Path extensionsPath) throws IOException {
logger.info("ExtensionsOrchestrator initialized");
this.extensionsPath = extensionsPath;
this.transportService = null;
this.listener = new ExtensionActionListener();
this.extensionsInitializedList = new ArrayList<DiscoveryExtension>();
this.extensionIdMap = new HashMap<String, DiscoveryExtension>();
// will be initialized in initializeServicesAndRestHandler which is called after the Node is initialized
this.transportService = null;
this.clusterService = null;
this.namedWriteableRegistry = null;
this.listener = new ExtensionActionListener();
this.client = null;
this.extensionTransportActionsHandler = null;

/*
* Now Discover extensions
Expand All @@ -160,13 +173,15 @@ public ExtensionsOrchestrator(Settings settings, Path extensionsPath) throws IOE
* @param transportService The Node's transport service.
* @param clusterService The Node's cluster service.
* @param initialEnvironmentSettings The finalized view of settings for the Environment
* @param client The client used to make transport requests
*/
public void initializeServicesAndRestHandler(
RestController restController,
SettingsModule settingsModule,
TransportService transportService,
ClusterService clusterService,
Settings initialEnvironmentSettings
Settings initialEnvironmentSettings,
NodeClient client
) {
this.restActionsRequestHandler = new RestActionsRequestHandler(restController, extensionIdMap, transportService);
this.listenerHandler = new ExtensionActionListenerHandler(listener);
Expand All @@ -179,9 +194,20 @@ public void initializeServicesAndRestHandler(
transportService,
REQUEST_EXTENSION_UPDATE_SETTINGS
);
this.client = client;
this.extensionTransportActionsHandler = new ExtensionTransportActionsHandler(extensionIdMap, transportService, client);
registerRequestHandler();
}

/**
* Handles Transport Request from {@link org.opensearch.extensions.action.ExtensionTransportAction} which was invoked by an extension via {@link ExtensionTransportActionsHandler}.
*
* @param request which was sent by an extension.
*/
public ExtensionActionResponse handleTransportRequest(ExtensionActionRequest request) throws InterruptedException {
return extensionTransportActionsHandler.sendTransportRequestToExtension(request);
}

private void registerRequestHandler() {
transportService.registerRequestHandler(
REQUEST_EXTENSION_REGISTER_REST_ACTIONS,
Expand Down Expand Up @@ -255,7 +281,19 @@ private void registerRequestHandler() {
false,
false,
RegisterTransportActionsRequest::new,
((request, channel, task) -> channel.sendResponse(handleRegisterTransportActionsRequest(request)))
((request, channel, task) -> channel.sendResponse(
extensionTransportActionsHandler.handleRegisterTransportActionsRequest(request)
))
);
transportService.registerRequestHandler(
TRANSPORT_ACTION_REQUEST_FROM_EXTENSION,
ThreadPool.Names.GENERIC,
false,
false,
TransportActionRequestFromExtension::new,
((request, channel, task) -> channel.sendResponse(
extensionTransportActionsHandler.handleTransportActionRequestFromExtension(request)
))
);
}

Expand Down Expand Up @@ -373,28 +411,12 @@ public String executor() {
new InitializeExtensionsRequest(transportService.getLocalNode(), extension),
extensionResponseHandler
);
inProgressLatch.await(100, TimeUnit.SECONDS);
inProgressLatch.await(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS);
} catch (Exception e) {
logger.error(e.toString());
}
}

/**
* Handles a {@link RegisterTransportActionsRequest}.
*
* @param transportActionsRequest The request to handle.
* @return A {@link ExtensionBooleanResponse} indicating success.
* @throws Exception if the request is not handled properly.
*/
TransportResponse handleRegisterTransportActionsRequest(RegisterTransportActionsRequest transportActionsRequest) throws Exception {
/*
* TODO: https:/opensearch-project/opensearch-sdk-java/issues/107
* Register these new Transport Actions with ActionModule
* and add support for NodeClient to recognise these actions when making transport calls.
*/
return new ExtensionBooleanResponse(true);
}

/**
* Handles an {@link ExtensionRequest}.
*
Expand Down Expand Up @@ -483,7 +505,7 @@ public void beforeIndexRemoved(
/*
* Making async synchronous for now.
*/
inProgressIndexNameLatch.await(100, TimeUnit.SECONDS);
inProgressIndexNameLatch.await(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS);
logger.info("Received ack response from Extension");
} catch (Exception e) {
logger.error(e.toString());
Expand Down Expand Up @@ -517,7 +539,7 @@ public String executor() {
/*
* Making asynchronous for now.
*/
inProgressLatch.await(100, TimeUnit.SECONDS);
inProgressLatch.await(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS);
logger.info("Received response from Extension");
} catch (Exception e) {
logger.error(e.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@
* @opensearch.internal
*/
public class RegisterTransportActionsRequest extends TransportRequest {
private String uniqueId;
private Map<String, Class> transportActions;

public RegisterTransportActionsRequest(Map<String, Class> transportActions) {
public RegisterTransportActionsRequest(String uniqueId, Map<String, Class> transportActions) {
this.uniqueId = uniqueId;
this.transportActions = new HashMap<>(transportActions);
}

public RegisterTransportActionsRequest(StreamInput in) throws IOException {
super(in);
this.uniqueId = in.readString();
Map<String, Class> actions = new HashMap<>();
int actionCount = in.readVInt();
for (int i = 0; i < actionCount; i++) {
Expand All @@ -45,13 +48,18 @@ public RegisterTransportActionsRequest(StreamInput in) throws IOException {
this.transportActions = actions;
}

public String getUniqueId() {
return uniqueId;
}

public Map<String, Class> getTransportActions() {
return transportActions;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(uniqueId);
out.writeVInt(this.transportActions.size());
for (Map.Entry<String, Class> action : transportActions.entrySet()) {
out.writeString(action.getKey());
Expand All @@ -61,19 +69,19 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public String toString() {
return "TransportActionsRequest{actions=" + transportActions + "}";
return "TransportActionsRequest{uniqueId=" + uniqueId + ", actions=" + transportActions + "}";
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
RegisterTransportActionsRequest that = (RegisterTransportActionsRequest) obj;
return Objects.equals(transportActions, that.transportActions);
return Objects.equals(uniqueId, that.uniqueId) && Objects.equals(transportActions, that.transportActions);
}

@Override
public int hashCode() {
return Objects.hash(transportActions);
return Objects.hash(uniqueId, transportActions);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions.action;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* This class translates Extension transport request to ActionRequest
* which is internally used to make transport action call.
*
* @opensearch.internal
*/
public class ExtensionActionRequest extends ActionRequest {
/**
* action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}.
*/
private final String action;
/**
* requestBytes is the raw bytes being transported between extensions.
*/
private final byte[] requestBytes;

/**
* ExtensionActionRequest constructor.
*
* @param action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}.
* @param requestBytes is the raw bytes being transported between extensions.
*/
public ExtensionActionRequest(String action, byte[] requestBytes) {
this.action = action;
this.requestBytes = requestBytes;
}

/**
* ExtensionActionRequest constructor from {@link StreamInput}.
*
* @param in bytes stream input used to de-serialize the message.
* @throws IOException when message de-serialization fails.
*/
ExtensionActionRequest(StreamInput in) throws IOException {
super(in);
action = in.readString();
requestBytes = in.readByteArray();
}

public String getAction() {
return action;
}

public byte[] getRequestBytes() {
return requestBytes;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(action);
out.writeByteArray(requestBytes);
}

@Override
public ActionRequestValidationException validate() {
return null;
}
}
Loading

0 comments on commit eee59c5

Please sign in to comment.