diff --git a/CHANGELOG.md b/CHANGELOG.md index df8f8ec427a3b..af0fc3edf13d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -161,6 +161,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Adding create component extension point support for AD ([#4517](https://github.com/opensearch-project/OpenSearch/pull/4517)) - Add getSettings support for AD([#4519](https://github.com/opensearch-project/OpenSearch/pull/4519)) - Fixed javadoc warning for build failure([#4581](https://github.com/opensearch-project/OpenSearch/pull/4581)) + - Added transport actions support for extensions ([#4598](https://github.com/opensearch-project/OpenSearch/pull/4598/)) - Pass REST params and content to extensions ([#4633](https://github.com/opensearch-project/OpenSearch/pull/4633)) ## [2.x] diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index d959d6828a46b..16cd004fcda7c 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -280,6 +280,8 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.extensions.action.ExtensionProxyAction; +import org.opensearch.extensions.action.ExtensionTransportAction; import org.opensearch.index.seqno.RetentionLeaseActions; import org.opensearch.indices.SystemIndices; import org.opensearch.indices.breaker.CircuitBreakerService; @@ -696,6 +698,9 @@ public void reg // Remote Store actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class); + // ExtensionProxyAction + actions.register(ExtensionProxyAction.INSTANCE, ExtensionTransportAction.class); + // Decommission actions actions.register(DecommissionAction.INSTANCE, TransportDecommissionAction.class); actions.register(GetDecommissionStateAction.INSTANCE, TransportGetDecommissionStateAction.class); diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionStringResponse.java b/server/src/main/java/org/opensearch/extensions/ExtensionStringResponse.java index 5c9c4c3ef784b..91fb5752e012d 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionStringResponse.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionStringResponse.java @@ -17,7 +17,7 @@ /** * Generic string response indicating the status of some previous request sent to the SDK * - * @opensearch.internal + * @opensearch.api */ public class ExtensionStringResponse extends TransportResponse { private String response; diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsOrchestrator.java b/server/src/main/java/org/opensearch/extensions/ExtensionsOrchestrator.java index 24a0c122120d8..52702ea257a7d 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsOrchestrator.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsOrchestrator.java @@ -27,6 +27,7 @@ import org.opensearch.Version; import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterSettingsResponse; import org.opensearch.cluster.LocalNodeResponse; import org.opensearch.cluster.node.DiscoveryNode; @@ -39,6 +40,10 @@ import org.opensearch.discovery.InitializeExtensionsRequest; import org.opensearch.discovery.InitializeExtensionsResponse; import org.opensearch.extensions.ExtensionsSettings.Extension; +import org.opensearch.extensions.action.ExtensionActionRequest; +import org.opensearch.extensions.action.ExtensionActionResponse; +import org.opensearch.extensions.action.ExtensionTransportActionsHandler; +import org.opensearch.extensions.action.TransportActionRequestFromExtension; import org.opensearch.extensions.rest.RegisterRestActionsRequest; import org.opensearch.extensions.rest.RestActionsRequestHandler; import org.opensearch.extensions.settings.CustomSettingsRequestHandler; @@ -83,6 +88,9 @@ public class ExtensionsOrchestrator implements ReportingService extensionsInitializedList; // A map of extension uniqueId to full extension details used for node transport here and in the RestActionsRequestHandler @@ -126,6 +135,7 @@ public static enum OpenSearchRequestType { ExtensionActionListenerHandler listenerHandler; EnvironmentSettingsRequestHandler environmentSettingsRequestHandler; AddSettingsUpdateConsumerRequestHandler addSettingsUpdateConsumerRequestHandler; + NodeClient client; /** * Instantiate a new ExtensionsOrchestrator object to handle requests and responses from extensions. This is called during Node bootstrap. @@ -137,12 +147,15 @@ public static enum OpenSearchRequestType { public ExtensionsOrchestrator(Settings settings, Path extensionsPath) throws IOException { logger.info("ExtensionsOrchestrator initialized"); this.extensionsPath = extensionsPath; - this.transportService = null; + this.listener = new ExtensionActionListener(); this.extensionsInitializedList = new ArrayList(); this.extensionIdMap = new HashMap(); + // will be initialized in initializeServicesAndRestHandler which is called after the Node is initialized + this.transportService = null; this.clusterService = null; this.namedWriteableRegistry = null; - this.listener = new ExtensionActionListener(); + this.client = null; + this.extensionTransportActionsHandler = null; /* * Now Discover extensions @@ -160,13 +173,15 @@ public ExtensionsOrchestrator(Settings settings, Path extensionsPath) throws IOE * @param transportService The Node's transport service. * @param clusterService The Node's cluster service. * @param initialEnvironmentSettings The finalized view of settings for the Environment + * @param client The client used to make transport requests */ public void initializeServicesAndRestHandler( RestController restController, SettingsModule settingsModule, TransportService transportService, ClusterService clusterService, - Settings initialEnvironmentSettings + Settings initialEnvironmentSettings, + NodeClient client ) { this.restActionsRequestHandler = new RestActionsRequestHandler(restController, extensionIdMap, transportService); this.listenerHandler = new ExtensionActionListenerHandler(listener); @@ -179,9 +194,20 @@ public void initializeServicesAndRestHandler( transportService, REQUEST_EXTENSION_UPDATE_SETTINGS ); + this.client = client; + this.extensionTransportActionsHandler = new ExtensionTransportActionsHandler(extensionIdMap, transportService, client); registerRequestHandler(); } + /** + * Handles Transport Request from {@link org.opensearch.extensions.action.ExtensionTransportAction} which was invoked by an extension via {@link ExtensionTransportActionsHandler}. + * + * @param request which was sent by an extension. + */ + public ExtensionActionResponse handleTransportRequest(ExtensionActionRequest request) throws InterruptedException { + return extensionTransportActionsHandler.sendTransportRequestToExtension(request); + } + private void registerRequestHandler() { transportService.registerRequestHandler( REQUEST_EXTENSION_REGISTER_REST_ACTIONS, @@ -255,7 +281,19 @@ private void registerRequestHandler() { false, false, RegisterTransportActionsRequest::new, - ((request, channel, task) -> channel.sendResponse(handleRegisterTransportActionsRequest(request))) + ((request, channel, task) -> channel.sendResponse( + extensionTransportActionsHandler.handleRegisterTransportActionsRequest(request) + )) + ); + transportService.registerRequestHandler( + TRANSPORT_ACTION_REQUEST_FROM_EXTENSION, + ThreadPool.Names.GENERIC, + false, + false, + TransportActionRequestFromExtension::new, + ((request, channel, task) -> channel.sendResponse( + extensionTransportActionsHandler.handleTransportActionRequestFromExtension(request) + )) ); } @@ -373,28 +411,12 @@ public String executor() { new InitializeExtensionsRequest(transportService.getLocalNode(), extension), extensionResponseHandler ); - inProgressLatch.await(100, TimeUnit.SECONDS); + inProgressLatch.await(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS); } catch (Exception e) { logger.error(e.toString()); } } - /** - * Handles a {@link RegisterTransportActionsRequest}. - * - * @param transportActionsRequest The request to handle. - * @return A {@link ExtensionBooleanResponse} indicating success. - * @throws Exception if the request is not handled properly. - */ - TransportResponse handleRegisterTransportActionsRequest(RegisterTransportActionsRequest transportActionsRequest) throws Exception { - /* - * TODO: https://github.com/opensearch-project/opensearch-sdk-java/issues/107 - * Register these new Transport Actions with ActionModule - * and add support for NodeClient to recognise these actions when making transport calls. - */ - return new ExtensionBooleanResponse(true); - } - /** * Handles an {@link ExtensionRequest}. * @@ -483,7 +505,7 @@ public void beforeIndexRemoved( /* * Making async synchronous for now. */ - inProgressIndexNameLatch.await(100, TimeUnit.SECONDS); + inProgressIndexNameLatch.await(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS); logger.info("Received ack response from Extension"); } catch (Exception e) { logger.error(e.toString()); @@ -517,7 +539,7 @@ public String executor() { /* * Making asynchronous for now. */ - inProgressLatch.await(100, TimeUnit.SECONDS); + inProgressLatch.await(EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS); logger.info("Received response from Extension"); } catch (Exception e) { logger.error(e.toString()); diff --git a/server/src/main/java/org/opensearch/extensions/RegisterTransportActionsRequest.java b/server/src/main/java/org/opensearch/extensions/RegisterTransportActionsRequest.java index a3603aaf22dd0..e0eac7dd7b13e 100644 --- a/server/src/main/java/org/opensearch/extensions/RegisterTransportActionsRequest.java +++ b/server/src/main/java/org/opensearch/extensions/RegisterTransportActionsRequest.java @@ -23,14 +23,17 @@ * @opensearch.internal */ public class RegisterTransportActionsRequest extends TransportRequest { + private String uniqueId; private Map transportActions; - public RegisterTransportActionsRequest(Map transportActions) { + public RegisterTransportActionsRequest(String uniqueId, Map transportActions) { + this.uniqueId = uniqueId; this.transportActions = new HashMap<>(transportActions); } public RegisterTransportActionsRequest(StreamInput in) throws IOException { super(in); + this.uniqueId = in.readString(); Map actions = new HashMap<>(); int actionCount = in.readVInt(); for (int i = 0; i < actionCount; i++) { @@ -45,6 +48,10 @@ public RegisterTransportActionsRequest(StreamInput in) throws IOException { this.transportActions = actions; } + public String getUniqueId() { + return uniqueId; + } + public Map getTransportActions() { return transportActions; } @@ -52,6 +59,7 @@ public Map getTransportActions() { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + out.writeString(uniqueId); out.writeVInt(this.transportActions.size()); for (Map.Entry action : transportActions.entrySet()) { out.writeString(action.getKey()); @@ -61,7 +69,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public String toString() { - return "TransportActionsRequest{actions=" + transportActions + "}"; + return "TransportActionsRequest{uniqueId=" + uniqueId + ", actions=" + transportActions + "}"; } @Override @@ -69,11 +77,11 @@ public boolean equals(Object obj) { if (this == obj) return true; if (obj == null || getClass() != obj.getClass()) return false; RegisterTransportActionsRequest that = (RegisterTransportActionsRequest) obj; - return Objects.equals(transportActions, that.transportActions); + return Objects.equals(uniqueId, that.uniqueId) && Objects.equals(transportActions, that.transportActions); } @Override public int hashCode() { - return Objects.hash(transportActions); + return Objects.hash(uniqueId, transportActions); } } diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionActionRequest.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionActionRequest.java new file mode 100644 index 0000000000000..801b40e847d21 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionActionRequest.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * This class translates Extension transport request to ActionRequest + * which is internally used to make transport action call. + * + * @opensearch.internal + */ +public class ExtensionActionRequest extends ActionRequest { + /** + * action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. + */ + private final String action; + /** + * requestBytes is the raw bytes being transported between extensions. + */ + private final byte[] requestBytes; + + /** + * ExtensionActionRequest constructor. + * + * @param action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. + * @param requestBytes is the raw bytes being transported between extensions. + */ + public ExtensionActionRequest(String action, byte[] requestBytes) { + this.action = action; + this.requestBytes = requestBytes; + } + + /** + * ExtensionActionRequest constructor from {@link StreamInput}. + * + * @param in bytes stream input used to de-serialize the message. + * @throws IOException when message de-serialization fails. + */ + ExtensionActionRequest(StreamInput in) throws IOException { + super(in); + action = in.readString(); + requestBytes = in.readByteArray(); + } + + public String getAction() { + return action; + } + + public byte[] getRequestBytes() { + return requestBytes; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(action); + out.writeByteArray(requestBytes); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } +} diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionActionResponse.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionActionResponse.java new file mode 100644 index 0000000000000..68729ada48c25 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionActionResponse.java @@ -0,0 +1,59 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.action.ActionResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * This class encapsulates the transport response from extension + * + * @opensearch.internal + */ +public class ExtensionActionResponse extends ActionResponse { + /** + * responseBytes is the raw bytes being transported between extensions. + */ + private byte[] responseBytes; + + /** + * ExtensionActionResponse constructor. + * + * @param responseBytes is the raw bytes being transported between extensions. + */ + public ExtensionActionResponse(byte[] responseBytes) { + this.responseBytes = responseBytes; + } + + /** + * ExtensionActionResponse constructor from {@link StreamInput}. + * + * @param in bytes stream input used to de-serialize the message. + * @throws IOException when message de-serialization fails. + */ + public ExtensionActionResponse(StreamInput in) throws IOException { + responseBytes = in.readByteArray(); + } + + public byte[] getResponseBytes() { + return responseBytes; + } + + public void setResponseBytes(byte[] responseBytes) { + this.responseBytes = responseBytes; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeByteArray(responseBytes); + } +} diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionHandleTransportRequest.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionHandleTransportRequest.java new file mode 100644 index 0000000000000..1b946d08f0459 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionHandleTransportRequest.java @@ -0,0 +1,89 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.transport.TransportRequest; + +import java.io.IOException; +import java.util.Objects; + +/** + * This class encapsulates a transport request to extension + * + * @opensearch.api + */ +public class ExtensionHandleTransportRequest extends TransportRequest { + /** + * action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. + */ + private final String action; + /** + * requestBytes is the raw bytes being transported between extensions. + */ + private final byte[] requestBytes; + + /** + * ExtensionHandleTransportRequest constructor. + * + * @param action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. + * @param requestBytes is the raw bytes being transported between extensions. + */ + public ExtensionHandleTransportRequest(String action, byte[] requestBytes) { + this.action = action; + this.requestBytes = requestBytes; + } + + /** + * ExtensionHandleTransportRequest constructor from {@link StreamInput}. + * + * @param in bytes stream input used to de-serialize the message. + * @throws IOException when message de-serialization fails. + */ + public ExtensionHandleTransportRequest(StreamInput in) throws IOException { + super(in); + this.action = in.readString(); + this.requestBytes = in.readByteArray(); + } + + public String getAction() { + return this.action; + } + + public byte[] getRequestBytes() { + return this.requestBytes; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(action); + out.writeByteArray(requestBytes); + } + + @Override + public String toString() { + return "ExtensionHandleTransportRequest{action=" + action + ", requestBytes=" + requestBytes + "}"; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + ExtensionHandleTransportRequest that = (ExtensionHandleTransportRequest) obj; + return Objects.equals(action, that.action) && Objects.equals(requestBytes, that.requestBytes); + } + + @Override + public int hashCode() { + return Objects.hash(action, requestBytes); + } + +} diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionProxyAction.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionProxyAction.java new file mode 100644 index 0000000000000..7345cf44e007f --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionProxyAction.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.action.ActionType; + +/** + * The main proxy action for all extensions + * + * @opensearch.internal + */ +public class ExtensionProxyAction extends ActionType { + public static final String NAME = "cluster:internal/extensions"; + public static final ExtensionProxyAction INSTANCE = new ExtensionProxyAction(); + + public ExtensionProxyAction() { + super(NAME, ExtensionActionResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportAction.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportAction.java new file mode 100644 index 0000000000000..146b162e2437b --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportAction.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Settings; +import org.opensearch.extensions.ExtensionsOrchestrator; +import org.opensearch.node.Node; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +/** + * The main proxy transport action used to proxy a transport request from extension to another extension + * + * @opensearch.internal + */ +public class ExtensionTransportAction extends HandledTransportAction { + + private final String nodeName; + private final ClusterService clusterService; + private final ExtensionsOrchestrator extensionsOrchestrator; + + @Inject + public ExtensionTransportAction( + Settings settings, + TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + ExtensionsOrchestrator extensionsOrchestrator + ) { + super(ExtensionProxyAction.NAME, transportService, actionFilters, ExtensionActionRequest::new); + this.nodeName = Node.NODE_NAME_SETTING.get(settings); + this.clusterService = clusterService; + this.extensionsOrchestrator = extensionsOrchestrator; + } + + @Override + protected void doExecute(Task task, ExtensionActionRequest request, ActionListener listener) { + try { + listener.onResponse(extensionsOrchestrator.handleTransportRequest(request)); + } catch (Exception e) { + listener.onFailure(e); + } + } +} diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java new file mode 100644 index 0000000000000..c6aebd1b1cc0a --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java @@ -0,0 +1,193 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.extensions.DiscoveryExtension; +import org.opensearch.extensions.ExtensionBooleanResponse; +import org.opensearch.extensions.ExtensionsOrchestrator; +import org.opensearch.extensions.RegisterTransportActionsRequest; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.ActionNotFoundTransportException; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponse; +import org.opensearch.transport.TransportResponseHandler; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * This class manages TransportActions for extensions + * + * @opensearch.internal + */ +public class ExtensionTransportActionsHandler { + private static final Logger logger = LogManager.getLogger(ExtensionTransportActionsHandler.class); + private Map actionsMap; + private final Map extensionIdMap; + private final TransportService transportService; + private final NodeClient client; + + public ExtensionTransportActionsHandler( + Map extensionIdMap, + TransportService transportService, + NodeClient client + ) { + this.actionsMap = new HashMap<>(); + this.extensionIdMap = extensionIdMap; + this.transportService = transportService; + this.client = client; + } + + /** + * Method to register actions for extensions. + * + * @param action to be registered. + * @param extension for which action is being registered. + * @throws IllegalArgumentException when action being registered already is registered. + */ + void registerAction(String action, DiscoveryExtension extension) throws IllegalArgumentException { + if (actionsMap.containsKey(action)) { + throw new IllegalArgumentException("The " + action + " you are trying to register is already registered"); + } + actionsMap.putIfAbsent(action, extension); + } + + /** + * Method to get extension for a given action. + * + * @param action for which to get the registered extension. + * @return the extension. + */ + public DiscoveryExtension getExtension(String action) { + return actionsMap.get(action); + } + + /** + * Handles a {@link RegisterTransportActionsRequest}. + * + * @param transportActionsRequest The request to handle. + * @return A {@link ExtensionBooleanResponse} indicating success. + */ + public TransportResponse handleRegisterTransportActionsRequest(RegisterTransportActionsRequest transportActionsRequest) { + /* + * We are proxying the transport Actions through ExtensionProxyAction, so we really dont need to register dynamic actions for now. + */ + logger.debug("Register Transport Actions request recieved {}", transportActionsRequest); + DiscoveryExtension extension = extensionIdMap.get(transportActionsRequest.getUniqueId()); + try { + for (String action : transportActionsRequest.getTransportActions().keySet()) { + registerAction(action, extension); + } + } catch (Exception e) { + logger.error("Could not register Transport Action " + e); + return new ExtensionBooleanResponse(false); + } + return new ExtensionBooleanResponse(true); + } + + /** + * Method which handles transport action request from an extension. + * + * @param request from extension. + * @return {@link TransportResponse} which is sent back to the transport action invoker. + * @throws InterruptedException when message transport fails. + */ + public TransportResponse handleTransportActionRequestFromExtension(TransportActionRequestFromExtension request) + throws InterruptedException { + DiscoveryExtension extension = extensionIdMap.get(request.getUniqueId()); + final CountDownLatch inProgressLatch = new CountDownLatch(1); + final TransportActionResponseToExtension response = new TransportActionResponseToExtension(new byte[0]); + client.execute( + ExtensionProxyAction.INSTANCE, + new ExtensionActionRequest(request.getAction(), request.getRequestBytes()), + new ActionListener() { + @Override + public void onResponse(ExtensionActionResponse actionResponse) { + response.setResponseBytes(actionResponse.getResponseBytes()); + inProgressLatch.countDown(); + } + + @Override + public void onFailure(Exception exp) { + logger.debug("Transport request failed", exp); + byte[] responseBytes = ("Request failed: " + exp.getMessage()).getBytes(StandardCharsets.UTF_8); + response.setResponseBytes(responseBytes); + inProgressLatch.countDown(); + } + } + ); + inProgressLatch.await(ExtensionsOrchestrator.EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS); + return response; + } + + /** + * Method to send transport action request to an extension to handle. + * + * @param request to extension to handle transport request. + * @return {@link ExtensionActionResponse} which encapsulates the transport response from the extension. + * @throws InterruptedException when message transport fails. + */ + public ExtensionActionResponse sendTransportRequestToExtension(ExtensionActionRequest request) throws InterruptedException { + DiscoveryExtension extension = actionsMap.get(request.getAction()); + if (extension == null) { + throw new ActionNotFoundTransportException(request.getAction()); + } + final CountDownLatch inProgressLatch = new CountDownLatch(1); + final ExtensionActionResponse extensionActionResponse = new ExtensionActionResponse(new byte[0]); + final TransportResponseHandler extensionActionResponseTransportResponseHandler = + new TransportResponseHandler() { + + @Override + public ExtensionActionResponse read(StreamInput in) throws IOException { + return new ExtensionActionResponse(in); + } + + @Override + public void handleResponse(ExtensionActionResponse response) { + extensionActionResponse.setResponseBytes(response.getResponseBytes()); + inProgressLatch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + logger.debug("Transport request failed", exp); + byte[] responseBytes = ("Request failed: " + exp.getMessage()).getBytes(StandardCharsets.UTF_8); + extensionActionResponse.setResponseBytes(responseBytes); + inProgressLatch.countDown(); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + }; + try { + transportService.sendRequest( + extension, + ExtensionsOrchestrator.REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION, + new ExtensionHandleTransportRequest(request.getAction(), request.getRequestBytes()), + extensionActionResponseTransportResponseHandler + ); + } catch (Exception e) { + logger.info("Failed to send transport action to extension " + extension.getName(), e); + } + inProgressLatch.await(ExtensionsOrchestrator.EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS); + return extensionActionResponse; + } +} diff --git a/server/src/main/java/org/opensearch/extensions/action/TransportActionRequestFromExtension.java b/server/src/main/java/org/opensearch/extensions/action/TransportActionRequestFromExtension.java new file mode 100644 index 0000000000000..df494297559b3 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/TransportActionRequestFromExtension.java @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.transport.TransportRequest; + +import java.io.IOException; +import java.util.Objects; + +/** + * Transport Action Request from Extension + * + * @opensearch.api + */ +public class TransportActionRequestFromExtension extends TransportRequest { + /** + * action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. + */ + private final String action; + /** + * requestBytes is the raw bytes being transported between extensions. + */ + private final byte[] requestBytes; + /** + * uniqueId to identify which extension is making a transport request call. + */ + private final String uniqueId; + + /** + * TransportActionRequestFromExtension constructor. + * + * @param action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. + * @param requestBytes is the raw bytes being transported between extensions. + * @param uniqueId to identify which extension is making a transport request call. + */ + public TransportActionRequestFromExtension(String action, byte[] requestBytes, String uniqueId) { + this.action = action; + this.requestBytes = requestBytes; + this.uniqueId = uniqueId; + } + + /** + * TransportActionRequestFromExtension constructor from {@link StreamInput}. + * + * @param in bytes stream input used to de-serialize the message. + * @throws IOException when message de-serialization fails. + */ + public TransportActionRequestFromExtension(StreamInput in) throws IOException { + super(in); + this.action = in.readString(); + this.requestBytes = in.readByteArray(); + this.uniqueId = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(action); + out.writeByteArray(requestBytes); + out.writeString(uniqueId); + } + + public String getAction() { + return this.action; + } + + public byte[] getRequestBytes() { + return this.requestBytes; + } + + public String getUniqueId() { + return this.uniqueId; + } + + @Override + public String toString() { + return "TransportActionRequestFromExtension{action=" + action + ", requestBytes=" + requestBytes + ", uniqueId=" + uniqueId + "}"; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + TransportActionRequestFromExtension that = (TransportActionRequestFromExtension) obj; + return Objects.equals(action, that.action) + && Objects.equals(requestBytes, that.requestBytes) + && Objects.equals(uniqueId, that.uniqueId); + } + + @Override + public int hashCode() { + return Objects.hash(action, requestBytes, uniqueId); + } +} diff --git a/server/src/main/java/org/opensearch/extensions/action/TransportActionResponseToExtension.java b/server/src/main/java/org/opensearch/extensions/action/TransportActionResponseToExtension.java new file mode 100644 index 0000000000000..2913402bcd5e1 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/TransportActionResponseToExtension.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.transport.TransportResponse; + +import java.io.IOException; + +/** + * This class encapsulates transport response to extension. + * + * @opensearch.api + */ +public class TransportActionResponseToExtension extends TransportResponse { + /** + * responseBytes is the raw bytes being transported between extensions. + */ + private byte[] responseBytes; + + /** + * TransportActionResponseToExtension constructor. + * + * @param responseBytes is the raw bytes being transported between extensions. + */ + public TransportActionResponseToExtension(byte[] responseBytes) { + this.responseBytes = responseBytes; + } + + /** + * TransportActionResponseToExtension constructor from {@link StreamInput} + * @param in bytes stream input used to de-serialize the message. + * @throws IOException when message de-serialization fails. + */ + public TransportActionResponseToExtension(StreamInput in) throws IOException { + this.responseBytes = in.readByteArray(); + } + + public void setResponseBytes(byte[] responseBytes) { + this.responseBytes = responseBytes; + } + + public byte[] getResponseBytes() { + return responseBytes; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeByteArray(responseBytes); + } +} diff --git a/server/src/main/java/org/opensearch/extensions/action/package-info.java b/server/src/main/java/org/opensearch/extensions/action/package-info.java new file mode 100644 index 0000000000000..9bad08eaeb921 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Actions classes for the extensions package. OpenSearch extensions provide extensibility to OpenSearch.*/ +package org.opensearch.extensions.action; diff --git a/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java b/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java index 8c081d365c572..8a35638c9d939 100644 --- a/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java +++ b/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java @@ -190,7 +190,7 @@ public String executor() { restExecuteOnExtensionResponseHandler ); try { - inProgressLatch.await(5, TimeUnit.SECONDS); + inProgressLatch.await(ExtensionsOrchestrator.EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS); } catch (InterruptedException e) { return channel -> channel.sendResponse( new BytesRestResponse(RestStatus.REQUEST_TIMEOUT, "No response from extension to request.") diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 9fb6ae602cc5e..0faa6d34332a7 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -785,7 +785,8 @@ protected Node( settingsModule, transportService, clusterService, - environment.settings() + environment.settings(), + client ); final GatewayMetaState gatewayMetaState = new GatewayMetaState(); final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService); @@ -934,6 +935,7 @@ protected Node( b.bind(Client.class).toInstance(client); b.bind(NodeClient.class).toInstance(client); b.bind(Environment.class).toInstance(this.environment); + b.bind(ExtensionsOrchestrator.class).toInstance(this.extensionsOrchestrator); b.bind(ThreadPool.class).toInstance(threadPool); b.bind(NodeEnvironment.class).toInstance(nodeEnvironment); b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService); diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionsOrchestratorTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionsOrchestratorTests.java index 06df86cba78f7..768393a0d269c 100644 --- a/server/src/test/java/org/opensearch/extensions/ExtensionsOrchestratorTests.java +++ b/server/src/test/java/org/opensearch/extensions/ExtensionsOrchestratorTests.java @@ -87,6 +87,7 @@ import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.MockLogAppender; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.client.NoOpNodeClient; import org.opensearch.test.transport.MockTransportService; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -102,6 +103,7 @@ public class ExtensionsOrchestratorTests extends OpenSearchTestCase { private RestController restController; private SettingsModule settingsModule; private ClusterService clusterService; + private NodeClient client; private MockNioTransport transport; private Path extensionDir; private final ThreadPool threadPool = new TestThreadPool(ExtensionsOrchestratorTests.class.getSimpleName()); @@ -198,6 +200,7 @@ public void setup() throws Exception { false ) ); + client = new NoOpNodeClient(this.getTestName()); } @Override @@ -205,6 +208,7 @@ public void setup() throws Exception { public void tearDown() throws Exception { super.tearDown(); transportService.close(); + client.close(); ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); } @@ -344,12 +348,8 @@ public void testEmptyExtensionsFile() throws Exception { public void testExtensionsInitialize() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsOrchestrator extensionsOrchestrator = new ExtensionsOrchestrator(settings, extensionDir); - - transportService.start(); - transportService.acceptIncomingRequests(); - extensionsOrchestrator.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); + initialize(extensionsOrchestrator); try (MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ExtensionsOrchestrator.class))) { @@ -384,8 +384,8 @@ public void testHandleRegisterRestActionsRequest() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); ExtensionsOrchestrator extensionsOrchestrator = new ExtensionsOrchestrator(settings, extensionDir); + initialize(extensionsOrchestrator); - extensionsOrchestrator.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); String uniqueIdStr = "uniqueid1"; List actionsList = List.of("GET /foo", "PUT /bar", "POST /baz"); RegisterRestActionsRequest registerActionsRequest = new RegisterRestActionsRequest(uniqueIdStr, actionsList); @@ -398,14 +398,11 @@ public void testHandleRegisterRestActionsRequest() throws Exception { } public void testHandleRegisterSettingsRequest() throws Exception { - Path extensionDir = createTempDir(); - Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsOrchestrator extensionsOrchestrator = new ExtensionsOrchestrator(settings, extensionDir); + initialize(extensionsOrchestrator); - extensionsOrchestrator.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); String uniqueIdStr = "uniqueid1"; List> settingsList = List.of( Setting.boolSetting("index.falseSetting", false, Property.IndexScope, Property.Dynamic), @@ -423,8 +420,8 @@ public void testHandleRegisterSettingsRequest() throws Exception { public void testHandleRegisterRestActionsRequestWithInvalidMethod() throws Exception { ExtensionsOrchestrator extensionsOrchestrator = new ExtensionsOrchestrator(settings, extensionDir); + initialize(extensionsOrchestrator); - extensionsOrchestrator.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); String uniqueIdStr = "uniqueid1"; List actionsList = List.of("FOO /foo", "PUT /bar", "POST /baz"); RegisterRestActionsRequest registerActionsRequest = new RegisterRestActionsRequest(uniqueIdStr, actionsList); @@ -436,8 +433,7 @@ public void testHandleRegisterRestActionsRequestWithInvalidMethod() throws Excep public void testHandleRegisterRestActionsRequestWithInvalidUri() throws Exception { ExtensionsOrchestrator extensionsOrchestrator = new ExtensionsOrchestrator(settings, extensionDir); - - extensionsOrchestrator.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); + initialize(extensionsOrchestrator); String uniqueIdStr = "uniqueid1"; List actionsList = List.of("GET", "PUT /bar", "POST /baz"); RegisterRestActionsRequest registerActionsRequest = new RegisterRestActionsRequest(uniqueIdStr, actionsList); @@ -448,10 +444,9 @@ public void testHandleRegisterRestActionsRequestWithInvalidUri() throws Exceptio } public void testHandleExtensionRequest() throws Exception { - ExtensionsOrchestrator extensionsOrchestrator = new ExtensionsOrchestrator(settings, extensionDir); + initialize(extensionsOrchestrator); - extensionsOrchestrator.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); ExtensionRequest clusterStateRequest = new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_STATE); assertEquals(ClusterStateResponse.class, extensionsOrchestrator.handleExtensionRequest(clusterStateRequest).getClass()); @@ -469,12 +464,9 @@ public void testHandleExtensionRequest() throws Exception { } public void testHandleActionListenerOnFailureRequest() throws Exception { - Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsOrchestrator extensionsOrchestrator = new ExtensionsOrchestrator(settings, extensionDir); - - extensionsOrchestrator.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); + initialize(extensionsOrchestrator); ExtensionActionListenerOnFailureRequest listenerFailureRequest = new ExtensionActionListenerOnFailureRequest("Test failure"); @@ -486,11 +478,10 @@ public void testHandleActionListenerOnFailureRequest() throws Exception { } public void testEnvironmentSettingsRequest() throws Exception { - Path extensionDir = createTempDir(); Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); ExtensionsOrchestrator extensionsOrchestrator = new ExtensionsOrchestrator(settings, extensionDir); - extensionsOrchestrator.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); + initialize(extensionsOrchestrator); List> componentSettings = List.of( Setting.boolSetting("falseSetting", false, Property.IndexScope, Property.NodeScope), @@ -553,11 +544,10 @@ public void testEnvironmentSettingsResponse() throws Exception { } public void testHandleEnvironmentSettingsRequest() throws Exception { - Path extensionDir = createTempDir(); Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); ExtensionsOrchestrator extensionsOrchestrator = new ExtensionsOrchestrator(settings, extensionDir); - extensionsOrchestrator.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); + initialize(extensionsOrchestrator); List> componentSettings = List.of( Setting.boolSetting("falseSetting", false, Property.Dynamic), @@ -582,7 +572,7 @@ public void testAddSettingsUpdateConsumerRequest() throws Exception { Path extensionDir = createTempDir(); Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); ExtensionsOrchestrator extensionsOrchestrator = new ExtensionsOrchestrator(settings, extensionDir); - extensionsOrchestrator.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); + initialize(extensionsOrchestrator); List> componentSettings = List.of( Setting.boolSetting("falseSetting", false, Property.IndexScope, Property.NodeScope), @@ -629,8 +619,7 @@ public void testHandleAddSettingsUpdateConsumerRequest() throws Exception { Path extensionDir = createTempDir(); Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); ExtensionsOrchestrator extensionsOrchestrator = new ExtensionsOrchestrator(settings, extensionDir); - - extensionsOrchestrator.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); + initialize(extensionsOrchestrator); List> componentSettings = List.of( Setting.boolSetting("falseSetting", false, Property.Dynamic), @@ -653,7 +642,7 @@ public void testUpdateSettingsRequest() throws Exception { Path extensionDir = createTempDir(); Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); ExtensionsOrchestrator extensionsOrchestrator = new ExtensionsOrchestrator(settings, extensionDir); - extensionsOrchestrator.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); + initialize(extensionsOrchestrator); Setting componentSetting = Setting.boolSetting("falseSetting", false, Property.Dynamic); SettingType settingType = SettingType.Boolean; @@ -700,9 +689,10 @@ public void testRegisterHandler() throws Exception { settingsModule, mockTransportService, clusterService, - settings + settings, + client ); - verify(mockTransportService, times(9)).registerRequestHandler(anyString(), anyString(), anyBoolean(), anyBoolean(), any(), any()); + verify(mockTransportService, times(10)).registerRequestHandler(anyString(), anyString(), anyBoolean(), anyBoolean(), any(), any()); } private static class Example implements NamedWriteable { @@ -746,9 +736,7 @@ public int hashCode() { public void testGetNamedWriteables() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); ExtensionsOrchestrator extensionsOrchestrator = new ExtensionsOrchestrator(settings, extensionDir); - transportService.start(); - transportService.acceptIncomingRequests(); - extensionsOrchestrator.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); + initialize(extensionsOrchestrator); try ( MockLogAppender mockLogAppender = MockLogAppender.createForLoggers( @@ -774,9 +762,7 @@ public void testGetNamedWriteables() throws Exception { public void testNamedWriteableRegistryResponseHandler() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); ExtensionsOrchestrator extensionsOrchestrator = new ExtensionsOrchestrator(settings, extensionDir); - transportService.start(); - transportService.acceptIncomingRequests(); - extensionsOrchestrator.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); + initialize(extensionsOrchestrator); List extensionsList = new ArrayList<>(extensionsOrchestrator.extensionIdMap.values()); DiscoveryNode extensionNode = extensionsList.get(0); @@ -827,9 +813,7 @@ public void testGetExtensionReader() throws IOException { public void testParseNamedWriteables() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); ExtensionsOrchestrator extensionsOrchestrator = new ExtensionsOrchestrator(settings, extensionDir); - transportService.start(); - transportService.acceptIncomingRequests(); - extensionsOrchestrator.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); + initialize(extensionsOrchestrator); String requestType = ExtensionsOrchestrator.REQUEST_OPENSEARCH_PARSE_NAMED_WRITEABLE; List extensionsList = new ArrayList<>(extensionsOrchestrator.extensionIdMap.values()); @@ -868,12 +852,8 @@ public void testParseNamedWriteables() throws Exception { public void testOnIndexModule() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsOrchestrator extensionsOrchestrator = new ExtensionsOrchestrator(settings, extensionDir); - - transportService.start(); - transportService.acceptIncomingRequests(); - extensionsOrchestrator.initializeServicesAndRestHandler(restController, settingsModule, transportService, clusterService, settings); + initialize(extensionsOrchestrator); Environment environment = TestEnvironment.newEnvironment(settings); AnalysisRegistry emptyAnalysisRegistry = new AnalysisRegistry( @@ -916,4 +896,17 @@ public void testOnIndexModule() throws Exception { mockLogAppender.assertAllExpectationsMatched(); } } + + private void initialize(ExtensionsOrchestrator extensionsOrchestrator) { + transportService.start(); + transportService.acceptIncomingRequests(); + extensionsOrchestrator.initializeServicesAndRestHandler( + restController, + settingsModule, + transportService, + clusterService, + settings, + client + ); + } } diff --git a/server/src/test/java/org/opensearch/extensions/RegisterTransportActionsRequestTests.java b/server/src/test/java/org/opensearch/extensions/RegisterTransportActionsRequestTests.java index ed36cc5290bb1..52899d4d08871 100644 --- a/server/src/test/java/org/opensearch/extensions/RegisterTransportActionsRequestTests.java +++ b/server/src/test/java/org/opensearch/extensions/RegisterTransportActionsRequestTests.java @@ -21,7 +21,7 @@ public class RegisterTransportActionsRequestTests extends OpenSearchTestCase { @Before public void setup() { - this.originalRequest = new RegisterTransportActionsRequest(Map.of("testAction", Map.class)); + this.originalRequest = new RegisterTransportActionsRequest("extension-uniqueId", Map.of("testAction", Map.class)); } public void testRegisterTransportActionsRequest() throws IOException { @@ -37,6 +37,9 @@ public void testRegisterTransportActionsRequest() throws IOException { } public void testToString() { - assertEquals(originalRequest.toString(), "TransportActionsRequest{actions={testAction=class org.opensearch.common.collect.Map}}"); + assertEquals( + originalRequest.toString(), + "TransportActionsRequest{uniqueId=extension-uniqueId, actions={testAction=class org.opensearch.common.collect.Map}}" + ); } } diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionActionRequestTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionActionRequestTests.java new file mode 100644 index 0000000000000..2d4f2b5d8aa66 --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionActionRequestTests.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.test.OpenSearchTestCase; + +import java.nio.charset.StandardCharsets; + +public class ExtensionActionRequestTests extends OpenSearchTestCase { + + public void testExtensionActionRequest() throws Exception { + String expectedAction = "test-action"; + byte[] expectedRequestBytes = "request-bytes".getBytes(StandardCharsets.UTF_8); + ExtensionActionRequest request = new ExtensionActionRequest(expectedAction, expectedRequestBytes); + + assertEquals(expectedAction, request.getAction()); + assertEquals(expectedRequestBytes, request.getRequestBytes()); + + BytesStreamOutput out = new BytesStreamOutput(); + request.writeTo(out); + BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes())); + request = new ExtensionActionRequest(in); + + assertEquals(expectedAction, request.getAction()); + assertArrayEquals(expectedRequestBytes, request.getRequestBytes()); + assertNull(request.validate()); + } +} diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionActionResponseTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionActionResponseTests.java new file mode 100644 index 0000000000000..5ec8c16027da2 --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionActionResponseTests.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.test.OpenSearchTestCase; + +import java.nio.charset.StandardCharsets; + +public class ExtensionActionResponseTests extends OpenSearchTestCase { + + public void testExtensionActionResponse() throws Exception { + byte[] expectedResponseBytes = "response-bytes".getBytes(StandardCharsets.UTF_8); + ExtensionActionResponse response = new ExtensionActionResponse(expectedResponseBytes); + + assertEquals(expectedResponseBytes, response.getResponseBytes()); + + BytesStreamOutput out = new BytesStreamOutput(); + response.writeTo(out); + BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes())); + response = new ExtensionActionResponse(in); + assertArrayEquals(expectedResponseBytes, response.getResponseBytes()); + } +} diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionHandleTransportRequestTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionHandleTransportRequestTests.java new file mode 100644 index 0000000000000..15e7320ba7556 --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionHandleTransportRequestTests.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.test.OpenSearchTestCase; + +import java.nio.charset.StandardCharsets; + +public class ExtensionHandleTransportRequestTests extends OpenSearchTestCase { + public void testExtensionHandleTransportRequest() throws Exception { + String expectedAction = "test-action"; + byte[] expectedRequestBytes = "request-bytes".getBytes(StandardCharsets.UTF_8); + ExtensionHandleTransportRequest request = new ExtensionHandleTransportRequest(expectedAction, expectedRequestBytes); + + assertEquals(expectedAction, request.getAction()); + assertEquals(expectedRequestBytes, request.getRequestBytes()); + + BytesStreamOutput out = new BytesStreamOutput(); + request.writeTo(out); + BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes())); + request = new ExtensionHandleTransportRequest(in); + + assertEquals(expectedAction, request.getAction()); + assertArrayEquals(expectedRequestBytes, request.getRequestBytes()); + } +} diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionProxyActionTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionProxyActionTests.java new file mode 100644 index 0000000000000..3719c29090287 --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionProxyActionTests.java @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.test.OpenSearchTestCase; + +public class ExtensionProxyActionTests extends OpenSearchTestCase { + public void testExtensionProxyAction() { + assertEquals("cluster:internal/extensions", ExtensionProxyAction.NAME); + assertEquals(ExtensionProxyAction.class, ExtensionProxyAction.INSTANCE.getClass()); + } +} diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java new file mode 100644 index 0000000000000..9bd1fca3f1e0c --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java @@ -0,0 +1,180 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.transport.TransportAddress; +import org.opensearch.common.util.PageCacheRecycler; +import org.opensearch.extensions.DiscoveryExtension; +import org.opensearch.extensions.ExtensionBooleanResponse; +import org.opensearch.extensions.RegisterTransportActionsRequest; +import org.opensearch.extensions.rest.RestSendToExtensionActionTests; +import org.opensearch.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.plugins.PluginInfo; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.client.NoOpNodeClient; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.ActionNotFoundTransportException; +import org.opensearch.transport.TransportService; +import org.opensearch.transport.nio.MockNioTransport; + +import java.net.InetAddress; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; + +public class ExtensionTransportActionsHandlerTests extends OpenSearchTestCase { + private TransportService transportService; + private MockNioTransport transport; + private DiscoveryExtension discoveryExtension; + private ExtensionTransportActionsHandler extensionTransportActionsHandler; + private NodeClient client; + private final ThreadPool threadPool = new TestThreadPool(RestSendToExtensionActionTests.class.getSimpleName()); + + @Before + public void setup() throws Exception { + Settings settings = Settings.builder().put("cluster.name", "test").build(); + transport = new MockNioTransport( + settings, + Version.CURRENT, + threadPool, + new NetworkService(Collections.emptyList()), + PageCacheRecycler.NON_RECYCLING_INSTANCE, + new NamedWriteableRegistry(Collections.emptyList()), + new NoneCircuitBreakerService() + ); + transportService = new MockTransportService( + settings, + transport, + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + (boundAddress) -> new DiscoveryNode( + "test_node", + "test_node", + boundAddress.publishAddress(), + emptyMap(), + emptySet(), + Version.CURRENT + ), + null, + Collections.emptySet() + ); + discoveryExtension = new DiscoveryExtension( + "firstExtension", + "uniqueid1", + "uniqueid1", + "myIndependentPluginHost1", + "127.0.0.0", + new TransportAddress(InetAddress.getByName("127.0.0.0"), 9300), + new HashMap(), + Version.fromString("3.0.0"), + new PluginInfo( + "firstExtension", + "Fake description 1", + "0.0.7", + Version.fromString("3.0.0"), + "14", + "fakeClass1", + new ArrayList(), + false + ) + ); + client = new NoOpNodeClient(this.getTestName()); + extensionTransportActionsHandler = new ExtensionTransportActionsHandler( + Map.of("uniqueid1", discoveryExtension), + transportService, + client + ); + } + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + transportService.close(); + client.close(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + } + + public void testRegisterAction() { + String action = "test-action"; + extensionTransportActionsHandler.registerAction(action, discoveryExtension); + assertEquals(discoveryExtension, extensionTransportActionsHandler.getExtension(action)); + + // Test duplicate action registration + expectThrows(IllegalArgumentException.class, () -> extensionTransportActionsHandler.registerAction(action, discoveryExtension)); + assertEquals(discoveryExtension, extensionTransportActionsHandler.getExtension(action)); + } + + public void testRegisterTransportActionsRequest() { + String action = "test-action"; + RegisterTransportActionsRequest request = new RegisterTransportActionsRequest( + "uniqueid1", + Map.of(action, ExtensionTransportActionsHandlerTests.class) + ); + ExtensionBooleanResponse response = (ExtensionBooleanResponse) extensionTransportActionsHandler + .handleRegisterTransportActionsRequest(request); + assertTrue(response.getStatus()); + assertEquals(discoveryExtension, extensionTransportActionsHandler.getExtension(action)); + + // Test duplicate action registration + response = (ExtensionBooleanResponse) extensionTransportActionsHandler.handleRegisterTransportActionsRequest(request); + assertFalse(response.getStatus()); + } + + public void testTransportActionRequestFromExtension() throws InterruptedException { + String action = "test-action"; + byte[] requestBytes = "requestBytes".getBytes(StandardCharsets.UTF_8); + TransportActionRequestFromExtension request = new TransportActionRequestFromExtension(action, requestBytes, "uniqueid1"); + // NoOpNodeClient returns null as response + expectThrows(NullPointerException.class, () -> extensionTransportActionsHandler.handleTransportActionRequestFromExtension(request)); + } + + public void testSendTransportRequestToExtension() throws InterruptedException { + String action = "test-action"; + byte[] requestBytes = "request-bytes".getBytes(StandardCharsets.UTF_8); + ExtensionActionRequest request = new ExtensionActionRequest(action, requestBytes); + + // Action not registered, expect exception + expectThrows( + ActionNotFoundTransportException.class, + () -> extensionTransportActionsHandler.sendTransportRequestToExtension(request) + ); + + // Register Action + RegisterTransportActionsRequest registerRequest = new RegisterTransportActionsRequest( + "uniqueid1", + Map.of(action, ExtensionTransportActionsHandlerTests.class) + ); + ExtensionBooleanResponse response = (ExtensionBooleanResponse) extensionTransportActionsHandler + .handleRegisterTransportActionsRequest(registerRequest); + assertTrue(response.getStatus()); + + ExtensionActionResponse extensionResponse = extensionTransportActionsHandler.sendTransportRequestToExtension(request); + assertEquals( + "Request failed: [firstExtension][127.0.0.0:9300] Node not connected", + new String(extensionResponse.getResponseBytes(), StandardCharsets.UTF_8) + ); + } +} diff --git a/server/src/test/java/org/opensearch/extensions/action/TransportActionRequestFromExtensionTests.java b/server/src/test/java/org/opensearch/extensions/action/TransportActionRequestFromExtensionTests.java new file mode 100644 index 0000000000000..a8ef5372800d9 --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/action/TransportActionRequestFromExtensionTests.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.test.OpenSearchTestCase; + +import java.nio.charset.StandardCharsets; + +public class TransportActionRequestFromExtensionTests extends OpenSearchTestCase { + public void testTransportActionRequestFromExtension() throws Exception { + String expectedAction = "test-action"; + byte[] expectedRequestBytes = "request-bytes".getBytes(StandardCharsets.UTF_8); + String uniqueId = "test-uniqueId"; + TransportActionRequestFromExtension request = new TransportActionRequestFromExtension( + expectedAction, + expectedRequestBytes, + uniqueId + ); + + assertEquals(expectedAction, request.getAction()); + assertEquals(expectedRequestBytes, request.getRequestBytes()); + assertEquals(uniqueId, request.getUniqueId()); + + BytesStreamOutput out = new BytesStreamOutput(); + request.writeTo(out); + BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes())); + request = new TransportActionRequestFromExtension(in); + + assertEquals(expectedAction, request.getAction()); + assertArrayEquals(expectedRequestBytes, request.getRequestBytes()); + assertEquals(uniqueId, request.getUniqueId()); + } +} diff --git a/server/src/test/java/org/opensearch/extensions/action/TransportActionResponseToExtensionTests.java b/server/src/test/java/org/opensearch/extensions/action/TransportActionResponseToExtensionTests.java new file mode 100644 index 0000000000000..070feaa240d98 --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/action/TransportActionResponseToExtensionTests.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +public class TransportActionResponseToExtensionTests extends OpenSearchTestCase { + public void testTransportActionRequestToExtension() throws IOException { + byte[] expectedResponseBytes = "response-bytes".getBytes(StandardCharsets.UTF_8); + TransportActionResponseToExtension response = new TransportActionResponseToExtension(expectedResponseBytes); + + assertEquals(expectedResponseBytes, response.getResponseBytes()); + + BytesStreamOutput out = new BytesStreamOutput(); + response.writeTo(out); + BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes())); + response = new TransportActionResponseToExtension(in); + + assertArrayEquals(expectedResponseBytes, response.getResponseBytes()); + } + + public void testSetBytes() { + byte[] expectedResponseBytes = "response-bytes".getBytes(StandardCharsets.UTF_8); + byte[] expectedEmptyBytes = new byte[0]; + TransportActionResponseToExtension response = new TransportActionResponseToExtension(expectedEmptyBytes); + assertArrayEquals(expectedEmptyBytes, response.getResponseBytes()); + + response.setResponseBytes(expectedResponseBytes); + assertArrayEquals(expectedResponseBytes, response.getResponseBytes()); + } +}