Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor transportservice request #167

Merged
merged 18 commits into from
Oct 6, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 35 additions & 36 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.extensions.EnvironmentSettingsRequest;
import org.opensearch.extensions.AddSettingsUpdateConsumerRequest;
import org.opensearch.extensions.UpdateSettingsRequest;
import org.opensearch.extensions.ExtensionsOrchestrator.RequestType;
import org.opensearch.extensions.ExtensionRequest;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.index.IndicesModuleRequest;
Expand All @@ -44,6 +45,8 @@
import org.opensearch.sdk.handlers.ExtensionStringResponseHandler;
import org.opensearch.sdk.handlers.OpensearchRequestHandler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportSettings;

Expand Down Expand Up @@ -314,24 +317,32 @@ public void sendRegisterCustomSettingsRequest(TransportService transportService)
}
}

private void sendGenericRequestWithExceptionHandling(
TransportService transportService,
RequestType requestType,
String orchestratorNameString,
TransportResponseHandler<? extends TransportResponse> responseHandler
) {
vibrantvarun marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Sending " + requestType + " request to OpenSearch");
try {
transportService.sendRequest(opensearchNode, orchestratorNameString, new ExtensionRequest(requestType), responseHandler);
} catch (Exception e) {
logger.info("Failed to send " + requestType + " request to OpenSearch", e);
}
}

/**
* Requests the cluster state from OpenSearch. The result will be handled by a {@link ClusterStateResponseHandler}.
*
* @param transportService The TransportService defining the connection to OpenSearch.
*/
public void sendClusterStateRequest(TransportService transportService) {
logger.info("Sending Cluster State request to OpenSearch");
ClusterStateResponseHandler clusterStateResponseHandler = new ClusterStateResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_STATE,
new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_STATE),
clusterStateResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Cluster State request to OpenSearch", e);
}
sendGenericRequestWithExceptionHandling(
transportService,
ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_STATE,
ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_STATE,
new ClusterStateResponseHandler()
);
}

/**
Expand All @@ -340,18 +351,12 @@ public void sendClusterStateRequest(TransportService transportService) {
* @param transportService The TransportService defining the connection to OpenSearch.
*/
public void sendClusterSettingsRequest(TransportService transportService) {
logger.info("Sending Cluster Settings request to OpenSearch");
ClusterSettingsResponseHandler clusterSettingsResponseHandler = new ClusterSettingsResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_SETTINGS,
new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_SETTINGS),
clusterSettingsResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Cluster Settings request to OpenSearch", e);
}
sendGenericRequestWithExceptionHandling(
transportService,
ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_LOCAL_NODE,
ExtensionsOrchestrator.REQUEST_EXTENSION_LOCAL_NODE,
new ClusterSettingsResponseHandler()
);
}

/**
Expand All @@ -360,18 +365,12 @@ public void sendClusterSettingsRequest(TransportService transportService) {
* @param transportService The TransportService defining the connection to OpenSearch.
*/
public void sendLocalNodeRequest(TransportService transportService) {
logger.info("Sending Local Node request to OpenSearch");
LocalNodeResponseHandler localNodeResponseHandler = new LocalNodeResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_LOCAL_NODE,
new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_LOCAL_NODE),
localNodeResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Cluster Settings request to OpenSearch", e);
}
sendGenericRequestWithExceptionHandling(
transportService,
ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_LOCAL_NODE,
ExtensionsOrchestrator.REQUEST_EXTENSION_LOCAL_NODE,
new LocalNodeResponseHandler()
);
}

/**
Expand Down