Skip to content

Commit

Permalink
Add ClusterStateRequest parameter to cluster state transport request
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <widdis@gmail.com>
  • Loading branch information
dbwiddis committed Apr 10, 2023
1 parent 94618e0 commit cf3943a
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 49 deletions.
31 changes: 0 additions & 31 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionType;
import org.opensearch.action.support.TransportAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
Expand All @@ -35,7 +34,6 @@
import org.opensearch.rest.RestHandler.Route;
import org.opensearch.sdk.api.ActionExtension;
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.EnvironmentSettingsResponseHandler;
import org.opensearch.sdk.handlers.ExtensionActionRequestHandler;
import org.opensearch.sdk.action.SDKActionModule;
Expand Down Expand Up @@ -553,35 +551,6 @@ private void sendGenericRequestWithExceptionHandling(
}
}

/**
* Requests the cluster state from OpenSearch. The result will be handled by a {@link ClusterStateResponseHandler}.
*
* @param transportService The TransportService defining the connection to OpenSearch.
* @return The cluster state of OpenSearch
*/

public ClusterState sendClusterStateRequest(TransportService transportService) {
logger.info("Sending Cluster State request to OpenSearch");
ClusterStateResponseHandler clusterStateResponseHandler = new ClusterStateResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsManager.REQUEST_EXTENSION_CLUSTER_STATE,
new ExtensionRequest(ExtensionsManager.RequestType.REQUEST_EXTENSION_CLUSTER_STATE),
clusterStateResponseHandler
);
// Wait on cluster state response
clusterStateResponseHandler.awaitResponse();
} catch (TimeoutException e) {
logger.info("Failed to receive Cluster State response from OpenSearch", e);
} catch (Exception e) {
logger.info("Failed to send Cluster State request to OpenSearch", e);
}

// At this point, response handler has read in the cluster state
return clusterStateResponseHandler.getClusterState();
}

/**
* Request the Dependency Information from Opensearch. The result will be handled by a {@link ExtensionDependencyResponseHandler}.
*
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/opensearch/sdk/SDKClusterService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.SettingUpgrader;
Expand Down Expand Up @@ -52,7 +53,7 @@ public SDKClusterService(ExtensionsRunner extensionsRunner) {
*/
public ClusterState state() {
if (extensionsRunner.isInitialized()) {
return extensionsRunner.sendClusterStateRequest(extensionsRunner.getExtensionTransportService());
return extensionsRunner.getSdkTransportService().sendClusterStateRequest(new ClusterStateRequest().all());
}
throw new IllegalStateException("The Extensions Runner has not been initialized.");
}
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/org/opensearch/sdk/SDKTransportService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.extensions.ExtensionsManager;
import org.opensearch.extensions.action.RegisterTransportActionsRequest;
Expand All @@ -27,6 +29,7 @@
import org.opensearch.sdk.action.RemoteExtensionActionRequest;
import org.opensearch.sdk.action.SDKActionModule;
import org.opensearch.sdk.handlers.AcknowledgedResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.ExtensionActionResponseHandler;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -106,6 +109,32 @@ public RemoteExtensionActionResponse sendRemoteExtensionActionRequest(RemoteExte
);
}

/**
* Requests the Cluster State from OpenSearch
*
* @param request a ClusterStateRequest object defining the information to be retrieved
* @return The requested cluster state. Only the parts of the cluster state that were requested are included in the returned {@link ClusterState} instance.
*/
public ClusterState sendClusterStateRequest(ClusterStateRequest request) {
logger.info("Sending Cluster State request to OpenSearch");
ClusterStateResponseHandler clusterStateResponseHandler = new ClusterStateResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsManager.REQUEST_EXTENSION_CLUSTER_STATE,
request,
clusterStateResponseHandler
);
// Wait on response
clusterStateResponseHandler.awaitResponse();
} catch (TimeoutException e) {
logger.error("Failed to receive Cluster State response from OpenSearch", e);
} catch (Exception e) {
logger.error("Failed to send Cluster State request to OpenSearch", e);
}
return clusterStateResponseHandler.getClusterState();
}

public TransportService getTransportService() {
return transportService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,16 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.extensions.ExtensionsManager;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
* This class handles the response from OpenSearch to a {@link ExtensionsRunner#sendClusterStateRequest(TransportService)} call.
* This class handles the response from OpenSearch to a {@link SDKTransportService#sendClusterStateRequest()} call.
*/
public class ClusterStateResponseHandler implements TransportResponseHandler<ClusterStateResponse> {
private static final Logger logger = LogManager.getLogger(ClusterStateResponseHandler.class);
Expand Down
9 changes: 0 additions & 9 deletions src/test/java/org/opensearch/sdk/TestExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.extensions.UpdateSettingsRequest;
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.EnvironmentSettingsResponseHandler;
import org.opensearch.sdk.handlers.ExtensionsInitRequestHandler;
import org.opensearch.sdk.handlers.ExtensionsRestRequestHandler;
Expand Down Expand Up @@ -177,14 +176,6 @@ public void testHandleUpdateSettingsRequest() throws Exception {
);
}

@Test
public void testClusterStateRequest() {

extensionsRunner.sendClusterStateRequest(transportService);

verify(transportService, times(1)).sendRequest(any(), anyString(), any(), any(ClusterStateResponseHandler.class));
}

@Test
public void testClusterSettingRequest() {

Expand Down
15 changes: 10 additions & 5 deletions src/test/java/org/opensearch/sdk/TestSDKClusterService.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.common.Strings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -54,12 +56,15 @@ public void testState() throws Exception {

// After initialization should be successful
when(extensionsRunner.isInitialized()).thenReturn(true);
sdkClusterService.state();
verify(extensionsRunner, times(1)).getExtensionTransportService();
SDKTransportService sdkTransportService = mock(SDKTransportService.class);
when(extensionsRunner.getSdkTransportService()).thenReturn(sdkTransportService);

ArgumentCaptor<TransportService> argumentCaptor = ArgumentCaptor.forClass(TransportService.class);
verify(extensionsRunner, times(1)).sendClusterStateRequest(argumentCaptor.capture());
assertNull(argumentCaptor.getValue());
sdkClusterService.state();
ArgumentCaptor<ClusterStateRequest> argumentCaptor = ArgumentCaptor.forClass(ClusterStateRequest.class);
verify(sdkTransportService, times(1)).sendClusterStateRequest(argumentCaptor.capture());
assertArrayEquals(Strings.EMPTY_ARRAY, argumentCaptor.getValue().indices());
assertTrue(argumentCaptor.getValue().nodes());
assertTrue(argumentCaptor.getValue().routingTable());
}

@Test
Expand Down
45 changes: 45 additions & 0 deletions src/test/java/org/opensearch/sdk/TestSDKTransportService.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,26 @@
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.extensions.ExtensionsManager;
import org.opensearch.extensions.action.RegisterTransportActionsRequest;
import org.opensearch.extensions.action.TransportActionRequestFromExtension;
import org.opensearch.sdk.action.RemoteExtensionAction;
import org.opensearch.sdk.action.RemoteExtensionActionRequest;
import org.opensearch.sdk.action.SDKActionModule;
import org.opensearch.sdk.action.TestSDKActionModule;
import org.opensearch.sdk.handlers.AcknowledgedResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.ExtensionActionResponseHandler;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportService;

import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -101,4 +107,43 @@ public void testRegisterTransportAction() {
// Internal action should be filtered out
assertFalse(registerTransportActionsRequestCaptor.getValue().getTransportActions().contains(RemoteExtensionAction.class.getName()));
}

@Test
public void testRemoteExtensionActionRequest() {
ArgumentCaptor<TransportActionRequestFromExtension> transportActionRequestFromExtensionCaptor = ArgumentCaptor.forClass(
TransportActionRequestFromExtension.class
);
String expectedAction = "com.example.action";
String expectedRequest = "com.example.request";
byte[] expectedRequestBytes = "test".getBytes(StandardCharsets.UTF_8);
RemoteExtensionActionRequest request = new RemoteExtensionActionRequest(expectedAction, expectedRequest, expectedRequestBytes);
sdkTransportService.sendRemoteExtensionActionRequest(request);
verify(transportService, times(1)).sendRequest(
any(),
eq(ExtensionsManager.TRANSPORT_ACTION_REQUEST_FROM_EXTENSION),
transportActionRequestFromExtensionCaptor.capture(),
any(ExtensionActionResponseHandler.class)
);
assertEquals(TEST_UNIQUE_ID, transportActionRequestFromExtensionCaptor.getValue().getUniqueId());
assertEquals(expectedAction, transportActionRequestFromExtensionCaptor.getValue().getAction());
String expectedString = expectedRequest + (char) RemoteExtensionActionRequest.UNIT_SEPARATOR + "test";
assertEquals(
expectedString,
new String(transportActionRequestFromExtensionCaptor.getValue().getRequestBytes(), StandardCharsets.UTF_8)
);
}

@Test
public void testsendClusterStateRequest() {
ArgumentCaptor<ClusterStateRequest> clusterStateRequestCaptor = ArgumentCaptor.forClass(ClusterStateRequest.class);
ClusterStateRequest request = new ClusterStateRequest().clear().indices("foo", "bar");
sdkTransportService.sendClusterStateRequest(request);
verify(transportService, times(1)).sendRequest(
any(),
eq(ExtensionsManager.REQUEST_EXTENSION_CLUSTER_STATE),
clusterStateRequestCaptor.capture(),
any(ClusterStateResponseHandler.class)
);
assertArrayEquals(new String[] { "foo", "bar" }, clusterStateRequestCaptor.getValue().indices());
}
}

0 comments on commit cf3943a

Please sign in to comment.