Skip to content

Commit

Permalink
Refactor handler method (opensearch-project#158)
Browse files Browse the repository at this point in the history
* issue opensearch-project#28

Signed-off-by: mloufra <[email protected]>

* Update the lastest coomit

Signed-off-by: mloufra <[email protected]>

* Rename the method and fix the conflict

Signed-off-by: mloufra <[email protected]>

* fix merge conflict

Signed-off-by: mloufra <[email protected]>

* Add code coverage report

Signed-off-by: mloufra <[email protected]>

* Rebase the lastest commit

Signed-off-by: mloufra <[email protected]>

* update the lastest commit

Signed-off-by: mloufra <[email protected]>

* refactor class for handler method in ExtensionsRunner

Signed-off-by: mloufra <[email protected]>

* add documentation for handler

Signed-off-by: mloufra <[email protected]>

* fix merge conflict

Signed-off-by: mloufra <[email protected]>

* delete all the static

Signed-off-by: mloufra <[email protected]>

* fix documentation problem

Signed-off-by: mloufra <[email protected]>

* fix NullPointerException bug

Signed-off-by: mloufra <[email protected]>

* change RestResponse to ExtensionRestResponse in ExtensionsRestRequestHandler

Signed-off-by: mloufra <[email protected]>

* change documentation for ExtensionsRunner

Signed-off-by: mloufra <[email protected]>

Signed-off-by: mloufra <[email protected]>
  • Loading branch information
mloufra authored and kokibas committed Mar 17, 2023
1 parent c07fc75 commit bfaf46e
Show file tree
Hide file tree
Showing 10 changed files with 317 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,25 @@
/**
* API used to handle named writeable registry requests from OpenSearch
*/
public class NamedWriteableRegistryAPI {
private final Logger logger = LogManager.getLogger(NamedWriteableRegistryAPI.class);
public class ExtensionNamedWriteableRegistry {
private final Logger logger = LogManager.getLogger(ExtensionNamedWriteableRegistry.class);
private List<NamedWriteableRegistry.Entry> namedWriteables;
private final NamedWriteableRegistry namedWriteableRegistry;

/**
* Constructor for NamedWriteableRegistryAPI. Creates a NamedWriteableRegistry for this extension
* Constructor for ExtensionNamedWriteableRegistry. Creates a NamedWriteableRegistry for this extension
*/
public NamedWriteableRegistryAPI() {
public ExtensionNamedWriteableRegistry() {
this.namedWriteables = getNamedWriteables();
this.namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
}

/**
* Constructor for NamedWriteableRegistryAPI. Creates and populates a NamedWriteableRegistry with the given NamedWriteableRegistry entries for this extension
* Constructor for ExtensionNamedWriteableRegistry. Creates and populates a NamedWriteableRegistry with the given NamedWriteableRegistry entries for this extension
*
* @param extensionNamedWriteables List of NamedWriteableRegistry.Entry to be registered
*/
public NamedWriteableRegistryAPI(List<NamedWriteableRegistry.Entry> extensionNamedWriteables) {
public ExtensionNamedWriteableRegistry(List<NamedWriteableRegistry.Entry> extensionNamedWriteables) {
this.namedWriteables = extensionNamedWriteables;
this.namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/sdk/ExtensionRestRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected ExtensionRestRequest(RestExecuteOnExtensionRequest request) throws Ill
/**
* Object generated from input stream
* @param in Input stream
* @throws IOException
* @throws IOException if there's an error in generating object from input stream
*/
public ExtensionRestRequest(StreamInput in) throws IOException {
super(in);
Expand All @@ -68,7 +68,7 @@ public ExtensionRestRequest(StreamInput in) throws IOException {
/**
* Write this object to output stream
* @param out the writeable output stream
* @throws IOException
* @throws IOException if there's an error in generating object from output stream
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
Expand Down
175 changes: 53 additions & 122 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,18 @@
import org.opensearch.Version;
import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.NamedWriteableRegistryParseRequest;
import org.opensearch.extensions.OpenSearchRequest;
import org.opensearch.extensions.rest.RegisterRestActionsRequest;
import org.opensearch.extensions.rest.RestExecuteOnExtensionRequest;
import org.opensearch.extensions.rest.RestExecuteOnExtensionResponse;
import org.opensearch.extensions.settings.RegisterCustomSettingsRequest;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.extensions.ExtensionBooleanResponse;
import org.opensearch.discovery.InitializeExtensionsRequest;
import org.opensearch.discovery.InitializeExtensionsResponse;
import org.opensearch.extensions.ExtensionActionListenerOnFailureRequest;
import org.opensearch.extensions.DiscoveryExtension;
import org.opensearch.extensions.EnvironmentSettingsRequest;
Expand All @@ -39,31 +35,32 @@
import org.opensearch.extensions.ExtensionRequest;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.index.IndicesModuleRequest;
import org.opensearch.index.IndicesModuleResponse;
import org.opensearch.indices.IndicesModule;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.rest.RestStatus;
import org.opensearch.rest.RestHandler.Route;
import org.opensearch.rest.RestResponse;
import org.opensearch.transport.netty4.Netty4Transport;
import org.opensearch.transport.SharedGroupFactory;
import org.opensearch.sdk.handlers.ActionListenerOnFailureResponseHandler;
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.EnvironmentSettingsResponseHandler;
import org.opensearch.sdk.handlers.ExtensionBooleanResponseHandler;
import org.opensearch.sdk.handlers.ExtensionsIndicesModuleNameRequestHandler;
import org.opensearch.sdk.handlers.ExtensionsIndicesModuleRequestHandler;
import org.opensearch.sdk.handlers.ExtensionsInitRequestHandler;
import org.opensearch.sdk.handlers.ExtensionsRestRequestHandler;
import org.opensearch.sdk.handlers.LocalNodeResponseHandler;
import org.opensearch.sdk.handlers.UpdateSettingsRequestHandler;
import org.opensearch.sdk.handlers.ExtensionStringResponseHandler;
import org.opensearch.sdk.handlers.OpensearchRequestHandler;
import org.opensearch.search.SearchModule;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ClusterConnectionManager;
import org.opensearch.transport.ConnectionManager;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportSettings;
import org.opensearch.transport.TransportResponse;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -91,23 +88,46 @@ public class ExtensionsRunner {
private static final String NODE_NAME_SETTING = "node.name";

private String uniqueId;
private DiscoveryNode opensearchNode;
/**
* This field is initialized by a call from {@link ExtensionsInitRequestHandler}.
*/
public DiscoveryNode opensearchNode;
private DiscoveryExtension extensionNode;
private TransportService extensionTransportService = null;
/**
* This field is initialized by a call from {@link ExtensionsInitRequestHandler}.
*/
public TransportService extensionTransportService = null;
// The routes and classes which handle the REST requests
private final ExtensionRestPathRegistry extensionRestPathRegistry = new ExtensionRestPathRegistry();
// Custom settings from the extension's getSettings
/**
* This field is initialized by a call from {@link ExtensionsInitRequestHandler}.
*/
private final List<Setting<?>> customSettings;
// Node name, host, and port
private final Settings settings;
/**
* This field is initialized by a call from {@link ExtensionsInitRequestHandler}.
*/
public final Settings settings;
private final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() {
};
private NamedWriteableRegistryAPI namedWriteableRegistryApi = new NamedWriteableRegistryAPI();
private ExtensionNamedWriteableRegistry namedWriteableRegistryApi = new ExtensionNamedWriteableRegistry();
private ExtensionsInitRequestHandler extensionsInitRequestHandler = new ExtensionsInitRequestHandler();
private OpensearchRequestHandler opensearchRequestHandler = new OpensearchRequestHandler();
private ExtensionsIndicesModuleRequestHandler extensionsIndicesModuleRequestHandler = new ExtensionsIndicesModuleRequestHandler();
private ExtensionsIndicesModuleNameRequestHandler extensionsIndicesModuleNameRequestHandler =
new ExtensionsIndicesModuleNameRequestHandler();
private ExtensionsRestRequestHandler extensionsRestRequestHandler = new ExtensionsRestRequestHandler();

/*
* TODO: expose an interface for extension to register actions
* https:/opensearch-project/opensearch-sdk-java/issues/119
*/
private TransportActions transportActions = new TransportActions(new HashMap<>());
/**
* Instantiates a new transportActions
*/
public TransportActions transportActions = new TransportActions(new HashMap<>());

/**
* Instantiates a new update settings request handler
*/
Expand Down Expand Up @@ -161,131 +181,38 @@ private static ExtensionSettings readExtensionSettings() throws IOException {
return objectMapper.readValue(file, ExtensionSettings.class);
}

/**
* This method is call from {@link ExtensionsInitRequestHandler}.
* @param extensionTransportService assign value for extensionTransportService
*/
void setExtensionTransportService(TransportService extensionTransportService) {
this.extensionTransportService = extensionTransportService;
}

private void setUniqueId(String id) {
/**
* Sets the Unique ID, used in REST requests to uniquely identify this extension
* @param id assign value for id
*/
public void setUniqueId(String id) {
this.uniqueId = id;
}

String getUniqueId() {
return uniqueId;
}

private void setOpensearchNode(DiscoveryNode opensearchNode) {
public void setOpensearchNode(DiscoveryNode opensearchNode) {
this.opensearchNode = opensearchNode;
}

private void setExtensionNode(DiscoveryExtension extensionNode) {
public void setExtensionNode(DiscoveryExtension extensionNode) {
this.extensionNode = extensionNode;
}

DiscoveryNode getOpensearchNode() {
return opensearchNode;
}

/**
* Handles a extension request from OpenSearch. This is the first request for the transport communication and will initialize the extension and will be a part of OpenSearch bootstrap.
*
* @param extensionInitRequest The request to handle.
* @return A response to OpenSearch validating that this is an extension.
*/
InitializeExtensionsResponse handleExtensionInitRequest(InitializeExtensionsRequest extensionInitRequest) {
logger.info("Registering Extension Request received from OpenSearch");
opensearchNode = extensionInitRequest.getSourceNode();
setUniqueId(extensionInitRequest.getExtension().getId());
// Successfully initialized. Send the response.
try {
return new InitializeExtensionsResponse(settings.get(NODE_NAME_SETTING));
} finally {
// After sending successful response to initialization, send the REST API and Settings
setOpensearchNode(opensearchNode);
setExtensionNode(extensionInitRequest.getExtension());
extensionTransportService.connectToNode(opensearchNode);
sendRegisterRestActionsRequest(extensionTransportService);
sendRegisterCustomSettingsRequest(extensionTransportService);
transportActions.sendRegisterTransportActionsRequest(extensionTransportService, opensearchNode);
}
}

/**
* Handles a request from OpenSearch and invokes the extension point API corresponding with the request type
*
* @param request The request to handle.
* @return A response to OpenSearch for the corresponding API
* @throws Exception if the corresponding handler for the request is not present
*/
TransportResponse handleOpenSearchRequest(OpenSearchRequest request) throws Exception {
// Read enum
switch (request.getRequestType()) {
case REQUEST_OPENSEARCH_NAMED_WRITEABLE_REGISTRY:
return namedWriteableRegistryApi.handleNamedWriteableRegistryRequest(request);
// Add additional request handlers here
default:
throw new Exception("Handler not present for the provided request");
}
}

/**
* Handles a request for extension point indices from OpenSearch. The {@link #handleExtensionInitRequest(InitializeExtensionsRequest)} method must have been called first to initialize the extension.
*
* @param indicesModuleRequest The request to handle.
* @param transportService The transport service communicating with OpenSearch.
* @return A response to OpenSearch with this extension's index and search listeners.
*/
IndicesModuleResponse handleIndicesModuleRequest(IndicesModuleRequest indicesModuleRequest, TransportService transportService) {
logger.info("Registering Indices Module Request received from OpenSearch");
IndicesModuleResponse indicesModuleResponse = new IndicesModuleResponse(true, true, true);
return indicesModuleResponse;
}

/**
* Handles a request for extension name from OpenSearch. The {@link #handleExtensionInitRequest(InitializeExtensionsRequest)} method must have been called first to initialize the extension.
*
* @param indicesModuleRequest The request to handle.
* @return A response acknowledging the request.
*/
ExtensionBooleanResponse handleIndicesModuleNameRequest(IndicesModuleRequest indicesModuleRequest) {
// Works as beforeIndexRemoved
logger.info("Registering Indices Module Name Request received from OpenSearch");
ExtensionBooleanResponse indicesModuleNameResponse = new ExtensionBooleanResponse(true);
return indicesModuleNameResponse;
}

/**
* Handles a request from OpenSearch to execute a REST request on the extension.
*
* @param request The REST request to execute.
* @return A response acknowledging the request.
*/
RestExecuteOnExtensionResponse handleRestExecuteOnExtensionRequest(RestExecuteOnExtensionRequest request) {

ExtensionRestHandler restHandler = extensionRestPathRegistry.getHandler(request.getMethod(), request.getUri());
if (restHandler == null) {
return new RestExecuteOnExtensionResponse(
RestStatus.NOT_FOUND,
"No handler for " + ExtensionRestPathRegistry.restPathToString(request.getMethod(), request.getUri())
);
}
// ExtensionRestRequest restRequest = new ExtensionRestRequest(request);
ExtensionRestRequest restRequest = new ExtensionRestRequest(
request.getMethod(),
request.getUri(),
request.getRequestIssuerIdentity()
);

// Get response from extension
RestResponse response = restHandler.handleRequest(restRequest);
logger.info("Sending extension response to OpenSearch: " + response.status());
return new RestExecuteOnExtensionResponse(
response.status(),
response.contentType(),
BytesReference.toBytes(response.content()),
response.getHeaders()
);
}

/**
* Initializes a Netty4Transport object. This object will be wrapped in a {@link TransportService} object.
*
Expand Down Expand Up @@ -381,7 +308,7 @@ public void startTransportService(TransportService transportService) {
false,
false,
InitializeExtensionsRequest::new,
(request, channel, task) -> channel.sendResponse(handleExtensionInitRequest(request))
(request, channel, task) -> channel.sendResponse(extensionsInitRequestHandler.handleExtensionInitRequest(request, this))
);

transportService.registerRequestHandler(
Expand All @@ -390,7 +317,7 @@ public void startTransportService(TransportService transportService) {
false,
false,
OpenSearchRequest::new,
(request, channel, task) -> channel.sendResponse(handleOpenSearchRequest(request))
(request, channel, task) -> channel.sendResponse(opensearchRequestHandler.handleOpenSearchRequest(request))
);

transportService.registerRequestHandler(
Expand All @@ -408,7 +335,9 @@ public void startTransportService(TransportService transportService) {
false,
false,
IndicesModuleRequest::new,
((request, channel, task) -> channel.sendResponse(handleIndicesModuleRequest(request, transportService)))
((request, channel, task) -> channel.sendResponse(
extensionsIndicesModuleRequestHandler.handleIndicesModuleRequest(request, transportService)
))

);

Expand All @@ -418,7 +347,9 @@ public void startTransportService(TransportService transportService) {
false,
false,
IndicesModuleRequest::new,
((request, channel, task) -> channel.sendResponse(handleIndicesModuleNameRequest(request)))
((request, channel, task) -> channel.sendResponse(
extensionsIndicesModuleNameRequestHandler.handleIndicesModuleNameRequest(request)
))
);

transportService.registerRequestHandler(
Expand All @@ -427,7 +358,7 @@ public void startTransportService(TransportService transportService) {
false,
false,
RestExecuteOnExtensionRequest::new,
((request, channel, task) -> channel.sendResponse(handleRestExecuteOnExtensionRequest(request)))
((request, channel, task) -> channel.sendResponse(extensionsRestRequestHandler.handleRestExecuteOnExtensionRequest(request)))
);

transportService.registerRequestHandler(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk.handlers;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.extensions.ExtensionBooleanResponse;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.index.IndicesModuleRequest;

/**
* This class handles the request from OpenSearch to a {@link ExtensionsRunner#startTransportService(TransportService transportService)} call.
*/

public class ExtensionsIndicesModuleNameRequestHandler {
private static final Logger logger = LogManager.getLogger(ExtensionsIndicesModuleNameRequestHandler.class);

/**
* Handles a request for extension name from OpenSearch. The {@link ExtensionsInitRequestHandler} method must have been called first to initialize the extension.
*
* @param indicesModuleRequest The request to handle.
* @return A response acknowledging the request.
*/
public ExtensionBooleanResponse handleIndicesModuleNameRequest(IndicesModuleRequest indicesModuleRequest) {
// Works as beforeIndexRemoved
logger.info("Registering Indices Module Name Request received from OpenSearch");
ExtensionBooleanResponse indicesModuleNameResponse = new ExtensionBooleanResponse(true);
return indicesModuleNameResponse;
}

}
Loading

0 comments on commit bfaf46e

Please sign in to comment.