Skip to content

Commit

Permalink
Modified sendClusterStateRequest to return the cluster state to caller (
Browse files Browse the repository at this point in the history
opensearch-project#201)

Signed-off-by: Joshua Palis <[email protected]>

Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis authored and kokibas committed Mar 17, 2023
1 parent f4713ab commit c8e414d
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 7 deletions.
30 changes: 23 additions & 7 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.NamedWriteableRegistryParseRequest;
import org.opensearch.extensions.OpenSearchRequest;
Expand Down Expand Up @@ -314,14 +315,29 @@ private void sendGenericRequestWithExceptionHandling(
* Requests the cluster state from OpenSearch. The result will be handled by a {@link ClusterStateResponseHandler}.
*
* @param transportService The TransportService defining the connection to OpenSearch.
* @return The cluster state of OpenSearch
*/
public void sendClusterStateRequest(TransportService transportService) {
sendGenericRequestWithExceptionHandling(
transportService,
ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_STATE,
ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_STATE,
new ClusterStateResponseHandler()
);

public ClusterState sendClusterStateRequest(TransportService transportService) {
logger.info("Sending Cluster State request to OpenSearch");
ClusterStateResponseHandler clusterStateResponseHandler = new ClusterStateResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_STATE,
new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_STATE),
clusterStateResponseHandler
);
// Wait on cluster state response
clusterStateResponseHandler.awaitResponse();
} catch (InterruptedException e) {
logger.info("Failed to recieve Cluster State response from OpenSearch", e);
} catch (Exception e) {
logger.info("Failed to send Cluster State request to OpenSearch", e);
}

// At this point, response handler has read in the cluster state
return clusterStateResponseHandler.getClusterState();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,48 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* This class handles the response from OpenSearch to a {@link ExtensionsRunner#sendClusterStateRequest(TransportService)} call.
*/
public class ClusterStateResponseHandler implements TransportResponseHandler<ClusterStateResponse> {
private static final Logger logger = LogManager.getLogger(ClusterStateResponseHandler.class);
private final CountDownLatch inProgressLatch;
private ClusterState clusterState;

/**
* Instantiates a new ClusterStateResponseHandler with a count down latch and an empty ClusterState object
*/
public ClusterStateResponseHandler() {
this.inProgressLatch = new CountDownLatch(1);
this.clusterState = ClusterState.EMPTY_STATE;
}

@Override
public void handleResponse(ClusterStateResponse response) {
logger.info("received {}", response);

// Set cluster state from response
this.clusterState = response.getState();
inProgressLatch.countDown();
}

@Override
public void handleException(TransportException exp) {
logger.info("ExtensionClusterStateRequest failed", exp);
inProgressLatch.countDown();
}

@Override
Expand All @@ -46,4 +65,15 @@ public String executor() {
public ClusterStateResponse read(StreamInput in) throws IOException {
return new ClusterStateResponse(in);
}

/**
* Invokes await on the ClusterStateResponseHandler count down latch
*/
public void awaitResponse() throws InterruptedException {
inProgressLatch.await(ExtensionsOrchestrator.EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS);
}

public ClusterState getClusterState() {
return this.clusterState;
}
}

0 comments on commit c8e414d

Please sign in to comment.