Skip to content

Commit

Permalink
[Feature/extensions] Provide Extension API to OpenSearch (#4100)
Browse files Browse the repository at this point in the history
* Provide Extension API to OpenSearch

Signed-off-by: Daniel Widdis <[email protected]>

* Add tests

Signed-off-by: Daniel Widdis <[email protected]>

* Add javadocs

Signed-off-by: Daniel Widdis <[email protected]>

* Add parsing of API requests

Signed-off-by: Daniel Widdis <[email protected]>

* Tweak API parsing for more flexible user formatting

Signed-off-by: Daniel Widdis <[email protected]>

* Remove forbidden toUpperCase usage

Signed-off-by: Daniel Widdis <[email protected]>

* Fix failing test as we now register four handlers

Signed-off-by: Daniel Widdis <[email protected]>

* Rename Api to RestApi

Signed-off-by: Daniel Widdis <[email protected]>

* Fix errors in previous merge conflict resolution

Signed-off-by: Daniel Widdis <[email protected]>

* Rename RestApi to RestActions

Signed-off-by: Daniel Widdis <[email protected]>

* Fix tests to fail on the correct wrong thing

Signed-off-by: Daniel Widdis <[email protected]>

* More verbose variable name

Signed-off-by: Daniel Widdis <[email protected]>

* Fix license header

Signed-off-by: Daniel Widdis <[email protected]>

* Fixing NodeGatewayStartedShards bwc (de)serialization issues (#4258)

Signed-off-by: Andriy Redko <[email protected]>

Signed-off-by: Andriy Redko <[email protected]>

* Fixing NodeGatewayStartedShards bwc (de)serialization issues (#4258)

Signed-off-by: Andriy Redko <[email protected]>

Signed-off-by: Andriy Redko <[email protected]>

* Fix tests to account for TotalHits uncertainty

Signed-off-by: Daniel Widdis <[email protected]>

Signed-off-by: Daniel Widdis <[email protected]>
Signed-off-by: Andriy Redko <[email protected]>
Co-authored-by: Andriy Redko <[email protected]>
  • Loading branch information
dbwiddis and reta authored Aug 19, 2022
1 parent 2cf4413 commit e3acafd
Show file tree
Hide file tree
Showing 7 changed files with 327 additions and 80 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand All @@ -48,6 +46,7 @@
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.node.ReportingService;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.rest.RestRequest;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponse;
Expand All @@ -56,8 +55,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.util.HashMap;
import java.util.List;

/**
* The main class for Plugin Extensibility
Expand All @@ -71,6 +68,7 @@ public class ExtensionsOrchestrator implements ReportingService<PluginsAndModule
public static final String REQUEST_EXTENSION_CLUSTER_STATE = "internal:discovery/clusterstate";
public static final String REQUEST_EXTENSION_LOCAL_NODE = "internal:discovery/localnode";
public static final String REQUEST_EXTENSION_CLUSTER_SETTINGS = "internal:discovery/clustersettings";
public static final String REQUEST_EXTENSION_REGISTER_REST_ACTIONS = "internal:discovery/registerrestactions";
public static final String REQUEST_OPENSEARCH_NAMED_WRITEABLE_REGISTRY = "internal:discovery/namedwriteableregistry";
public static final String REQUEST_OPENSEARCH_PARSE_NAMED_WRITEABLE = "internal:discovery/parsenamedwriteable";

Expand All @@ -85,6 +83,7 @@ public static enum RequestType {
REQUEST_EXTENSION_CLUSTER_STATE,
REQUEST_EXTENSION_LOCAL_NODE,
REQUEST_EXTENSION_CLUSTER_SETTINGS,
REQUEST_EXTENSION_REGISTER_REST_ACTIONS,
CREATE_COMPONENT,
ON_INDEX_MODULE,
GET_SETTINGS
Expand All @@ -102,16 +101,27 @@ public static enum OpenSearchRequestType {
private final Path extensionsPath;
final List<DiscoveryExtension> extensionsList;
List<DiscoveryExtension> extensionsInitializedList;
Map<String, DiscoveryExtension> extensionIdMap;
Map<String, List<String>> extensionRestActionsMap;
TransportService transportService;
ClusterService clusterService;
ExtensionNamedWriteableRegistry namedWriteableRegistry;

/**
* Instantiate a new ExtensionsOrchestrator object to handle requests and responses from extensions.
*
* @param settings Settings from the node the orchestrator is running on.
* @param extensionsPath Path to a directory containing extensions.
* @throws IOException If the extensions discovery file is not properly retrieved.
*/
public ExtensionsOrchestrator(Settings settings, Path extensionsPath) throws IOException {
logger.info("ExtensionsOrchestrator initialized");
this.extensionsPath = extensionsPath;
this.transportService = null;
this.extensionsList = new ArrayList<DiscoveryExtension>();
this.extensionsInitializedList = new ArrayList<DiscoveryExtension>();
this.extensionIdMap = new HashMap<String, DiscoveryExtension>();
this.extensionRestActionsMap = new HashMap<String, List<String>>();
this.clusterService = null;
this.namedWriteableRegistry = null;

Expand All @@ -122,6 +132,11 @@ public ExtensionsOrchestrator(Settings settings, Path extensionsPath) throws IOE

}

/**
* Sets the transport service and registers request handlers.
*
* @param transportService The transport service to set.
*/
public void setTransportService(TransportService transportService) {
this.transportService = transportService;
registerRequestHandler();
Expand All @@ -136,6 +151,14 @@ public void setNamedWriteableRegistry() {
}

private void registerRequestHandler() {
transportService.registerRequestHandler(
REQUEST_EXTENSION_REGISTER_REST_ACTIONS,
ThreadPool.Names.GENERIC,
false,
false,
RegisterRestActionsRequest::new,
((request, channel, task) -> channel.sendResponse(handleRegisterRestActionsRequest(request)))
);
transportService.registerRequestHandler(
REQUEST_EXTENSION_CLUSTER_STATE,
ThreadPool.Names.GENERIC,
Expand Down Expand Up @@ -185,30 +208,30 @@ private void extensionsDiscovery() throws IOException {
}
for (Extension extension : extensions) {
try {
extensionsList.add(
new DiscoveryExtension(
DiscoveryExtension discoveryExtension = new DiscoveryExtension(
extension.getName(),
extension.getUniqueId(),
// placeholder for ephemeral id, will change with POC discovery
extension.getUniqueId(),
extension.getHostName(),
extension.getHostAddress(),
new TransportAddress(InetAddress.getByName(extension.getHostAddress()), Integer.parseInt(extension.getPort())),
new HashMap<String, String>(),
Version.fromString(extension.getOpensearchVersion()),
new PluginInfo(
extension.getName(),
extension.getUniqueId(),
// placeholder for ephemeral id, will change with POC discovery
extension.getUniqueId(),
extension.getHostName(),
extension.getHostAddress(),
new TransportAddress(InetAddress.getByName(extension.getHostAddress()), Integer.parseInt(extension.getPort())),
new HashMap<String, String>(),
extension.getDescription(),
extension.getVersion(),
Version.fromString(extension.getOpensearchVersion()),
new PluginInfo(
extension.getName(),
extension.getDescription(),
extension.getVersion(),
Version.fromString(extension.getOpensearchVersion()),
extension.getJavaVersion(),
extension.getClassName(),
new ArrayList<String>(),
Boolean.parseBoolean(extension.hasNativeController())
)
extension.getJavaVersion(),
extension.getClassName(),
new ArrayList<String>(),
Boolean.parseBoolean(extension.hasNativeController())
)
);
logger.info("Loaded extension: " + extension);
extensionsList.add(discoveryExtension);
extensionIdMap.put(extension.getUniqueId(), discoveryExtension);
logger.info("Loaded extension: " + extension + " with id " + extension.getUniqueId());
} catch (IllegalArgumentException e) {
logger.error(e.toString());
}
Expand Down Expand Up @@ -242,6 +265,7 @@ public void handleResponse(InitializeExtensionsResponse response) {
for (DiscoveryExtension extension : extensionsList) {
if (extension.getName().equals(response.getName())) {
extensionsInitializedList.add(extension);
logger.info("Initialized extension: " + extension.getName());
break;
}
}
Expand All @@ -250,7 +274,7 @@ public void handleResponse(InitializeExtensionsResponse response) {

@Override
public void handleException(TransportException exp) {
logger.debug(new ParameterizedMessage("Extension request failed"), exp);
logger.debug(new ParameterizedMessage("Extension initialization failed"), exp);
inProgressLatch.countDown();
}

Expand All @@ -274,23 +298,62 @@ public String executor() {
}
}

TransportResponse handleExtensionRequest(ExtensionRequest extensionRequest) throws Exception {
// Read enum
if (extensionRequest.getRequestType() == RequestType.REQUEST_EXTENSION_CLUSTER_STATE) {
ClusterStateResponse clusterStateResponse = new ClusterStateResponse(
clusterService.getClusterName(),
clusterService.state(),
false
/**
* Handles a {@link RegisterRestActionsRequest}.
*
* @param restActionsRequest The request to handle.
* @return A {@link RegisterRestActionsResponse} indicating success.
* @throws Exception if the request is not handled properly.
*/
TransportResponse handleRegisterRestActionsRequest(RegisterRestActionsRequest restActionsRequest) throws Exception {
DiscoveryExtension extension = extensionIdMap.get(restActionsRequest.getNodeId());
if (extension == null) {
throw new IllegalArgumentException(
"REST Actions Request unique id " + restActionsRequest.getNodeId() + " does not match a discovered extension."
);
return clusterStateResponse;
} else if (extensionRequest.getRequestType() == RequestType.REQUEST_EXTENSION_LOCAL_NODE) {
LocalNodeResponse localNodeResponse = new LocalNodeResponse(clusterService);
return localNodeResponse;
} else if (extensionRequest.getRequestType() == RequestType.REQUEST_EXTENSION_CLUSTER_SETTINGS) {
ClusterSettingsResponse clusterSettingsResponse = new ClusterSettingsResponse(clusterService);
return clusterSettingsResponse;
}
throw new Exception("Handler not present for the provided request");
for (String restAction : restActionsRequest.getRestActions()) {
RestRequest.Method method;
String uri;
try {
int delim = restAction.indexOf(' ');
method = RestRequest.Method.valueOf(restAction.substring(0, delim));
uri = restAction.substring(delim).trim();
} catch (IndexOutOfBoundsException | IllegalArgumentException e) {
throw new IllegalArgumentException(restAction + " does not begin with a valid REST method");
}
logger.info("Registering: " + method + " /_extensions/_" + extension.getName() + uri);
// TODO turn the restAction string into an Action to send to RestController.registerHandler
}
extensionRestActionsMap.put(restActionsRequest.getNodeId(), restActionsRequest.getRestActions());
return new RegisterRestActionsResponse(
"Registered node "
+ restActionsRequest.getNodeId()
+ ", extension "
+ extension.getName()
+ " to handle REST Actions "
+ restActionsRequest.getRestActions()
);
}

/**
* Handles an {@link ExtensionRequest}.
*
* @param extensionRequest The request to handle, of a type defined in the {@link RequestType} enum.
* @return an Response matching the request.
* @throws Exception if the request is not handled properly.
*/
TransportResponse handleExtensionRequest(ExtensionRequest extensionRequest) throws Exception {
switch (extensionRequest.getRequestType()) {
case REQUEST_EXTENSION_CLUSTER_STATE:
return new ClusterStateResponse(clusterService.getClusterName(), clusterService.state(), false);
case REQUEST_EXTENSION_LOCAL_NODE:
return new LocalNodeResponse(clusterService);
case REQUEST_EXTENSION_CLUSTER_SETTINGS:
return new ClusterSettingsResponse(clusterService);
default:
throw new Exception("Handler not present for the provided request");
}
}

public void onIndexModule(IndexModule indexModule) throws UnknownHostException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.TransportRequest;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
* Request to register extension REST actions
*
* @opensearch.internal
*/
public class RegisterRestActionsRequest extends TransportRequest {
private String nodeId;
private List<String> restActions;

public RegisterRestActionsRequest(String nodeId, List<String> restActions) {
this.nodeId = nodeId;
this.restActions = new ArrayList<>(restActions);
}

public RegisterRestActionsRequest(StreamInput in) throws IOException {
super(in);
nodeId = in.readString();
restActions = in.readStringList();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(nodeId);
out.writeStringCollection(restActions);
}

public String getNodeId() {
return nodeId;
}

public List<String> getRestActions() {
return new ArrayList<>(restActions);
}

@Override
public String toString() {
return "RestActionsRequest{nodeId=" + nodeId + ", restActions=" + restActions + "}";
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
RegisterRestActionsRequest that = (RegisterRestActionsRequest) obj;
return Objects.equals(nodeId, that.nodeId) && Objects.equals(restActions, that.restActions);
}

@Override
public int hashCode() {
return Objects.hash(nodeId, restActions);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.TransportResponse;

import java.io.IOException;

/**
* Response to register REST Actions request.
*
* @opensearch.internal
*/
public class RegisterRestActionsResponse extends TransportResponse {
private String response;

public RegisterRestActionsResponse(String response) {
this.response = response;
}

public RegisterRestActionsResponse(StreamInput in) throws IOException {
response = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(response);
}

public String getResponse() {
return response;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ public NodeGatewayStartedShards(StreamInput in) throws IOException {
} else {
storeException = null;
}
if (in.getVersion().onOrAfter(Version.V_3_0_0) && in.readBoolean()) {
if (in.getVersion().onOrAfter(Version.V_2_3_0) && in.readBoolean()) {
replicationCheckpoint = new ReplicationCheckpoint(in);
} else {
replicationCheckpoint = null;
Expand Down Expand Up @@ -430,7 +430,7 @@ public void writeTo(StreamOutput out) throws IOException {
} else {
out.writeBoolean(false);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
if (out.getVersion().onOrAfter(Version.V_2_3_0)) {
if (replicationCheckpoint != null) {
out.writeBoolean(true);
replicationCheckpoint.writeTo(out);
Expand Down
Loading

0 comments on commit e3acafd

Please sign in to comment.