Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add getSettings support for AD #147

Merged
merged 6 commits into from
Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/main/java/org/opensearch/sdk/Extension.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

import org.opensearch.common.settings.Setting;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

Expand All @@ -36,6 +39,15 @@ public interface Extension {
*/
List<ExtensionRestHandler> getExtensionRestHandlers();

/**
* Gets an optional list of custom {@link Setting} for the extension to register with OpenSearch.
*
* @return a list of custom settings this extension uses.
*/
default List<Setting<?>> getSettings() {
return Collections.emptyList();
}

/**
* Helper method to read extension settings from a YAML file.
*
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/opensearch/sdk/ExtensionRestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public interface ExtensionRestHandler {

/**
* The list of {@link Route}s that this ExtensionRestHandler is responsible for handling.
*
* @return The routes this handler will handle.
*/
List<Route> routes();

Expand Down
43 changes: 36 additions & 7 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.opensearch.extensions.rest.RegisterRestActionsRequest;
import org.opensearch.extensions.rest.RestExecuteOnExtensionRequest;
import org.opensearch.extensions.rest.RestExecuteOnExtensionResponse;
import org.opensearch.extensions.settings.RegisterCustomSettingsRequest;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.extensions.ExtensionBooleanResponse;
Expand All @@ -45,7 +47,7 @@
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.LocalNodeResponseHandler;
import org.opensearch.sdk.handlers.RegisterRestActionsResponseHandler;
import org.opensearch.sdk.handlers.ExtensionStringResponseHandler;
import org.opensearch.search.SearchModule;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ClusterConnectionManager;
Expand Down Expand Up @@ -80,8 +82,11 @@ public class ExtensionsRunner {
private String uniqueId;
private DiscoveryNode opensearchNode;
private TransportService extensionTransportService = null;
private ExtensionRestPathRegistry extensionRestPathRegistry = new ExtensionRestPathRegistry();

// The routes and classes which handle the REST requests
private final ExtensionRestPathRegistry extensionRestPathRegistry = new ExtensionRestPathRegistry();
// Custom settings from the extension's getSettings
private final List<Setting<?>> customSettings;
// Node name, host, and port
private final Settings settings;
private final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() {
};
Expand All @@ -104,6 +109,7 @@ public ExtensionsRunner() throws IOException {
.put(TransportSettings.BIND_HOST.getKey(), extensionSettings.getHostAddress())
.put(TransportSettings.PORT.getKey(), extensionSettings.getHostPort())
.build();
this.customSettings = Collections.emptyList();
}

/**
Expand All @@ -119,12 +125,14 @@ private ExtensionsRunner(Extension extension) throws IOException {
.put(TransportSettings.BIND_HOST.getKey(), extensionSettings.getHostAddress())
.put(TransportSettings.PORT.getKey(), extensionSettings.getHostPort())
.build();
// store rest handlers in the map
// store REST handlers in the registry
for (ExtensionRestHandler extensionRestHandler : extension.getExtensionRestHandlers()) {
for (Route route : extensionRestHandler.routes()) {
extensionRestPathRegistry.registerHandler(route.getMethod(), route.getPath(), extensionRestHandler);
}
}
// save custom settings
this.customSettings = extension.getSettings();
// initialize the transport service
this.initializeExtensionTransportService(this.getSettings());
// start listening on configured port and wait for connection from OpenSearch
Expand Down Expand Up @@ -171,10 +179,11 @@ InitializeExtensionsResponse handleExtensionInitRequest(InitializeExtensionsRequ
try {
return new InitializeExtensionsResponse(settings.get(NODE_NAME_SETTING));
} finally {
// After sending successful response to initialization, send the REST API
// After sending successful response to initialization, send the REST API and Settings
setOpensearchNode(opensearchNode);
extensionTransportService.connectToNode(opensearchNode);
sendRegisterRestActionsRequest(extensionTransportService);
sendRegisterCustomSettingsRequest(extensionTransportService);
transportActions.sendRegisterTransportActionsRequest(extensionTransportService, opensearchNode);
}
}
Expand Down Expand Up @@ -403,7 +412,7 @@ public void startTransportService(TransportService transportService) {
public void sendRegisterRestActionsRequest(TransportService transportService) {
List<String> extensionRestPaths = extensionRestPathRegistry.getRegisteredPaths();
logger.info("Sending Register REST Actions request to OpenSearch for " + extensionRestPaths);
RegisterRestActionsResponseHandler registerActionsResponseHandler = new RegisterRestActionsResponseHandler();
ExtensionStringResponseHandler registerActionsResponseHandler = new ExtensionStringResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
Expand All @@ -416,6 +425,26 @@ public void sendRegisterRestActionsRequest(TransportService transportService) {
}
}

/**
* Requests that OpenSearch register the custom settings for this extension.
*
* @param transportService The TransportService defining the connection to OpenSearch.
*/
public void sendRegisterCustomSettingsRequest(TransportService transportService) {
logger.info("Sending Settings request to OpenSearch");
ExtensionStringResponseHandler registerCustomSettingsResponseHandler = new ExtensionStringResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_REGISTER_CUSTOM_SETTINGS,
new RegisterCustomSettingsRequest(getUniqueId(), customSettings),
registerCustomSettingsResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Register Settings request to OpenSearch", e);
}
}

/**
* Requests the cluster state from OpenSearch. The result will be handled by a {@link ClusterStateResponseHandler}.
*
Expand Down Expand Up @@ -509,7 +538,7 @@ private Settings getSettings() {
*
* @param timeout The timeout for the listener in milliseconds. A timeout of 0 means no timeout.
*/
public static void startActionListener(int timeout) {
public void startActionListener(int timeout) {
final ActionListener actionListener = new ActionListener();
actionListener.runActionListener(true, timeout);
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/opensearch/sdk/SDKClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public OpenSearchClient initializeClient(String hostAddress, int port) throws IO
}

/**
* Close this client.
*
* @throws IOException if closing the restClient fails
*/
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/org/opensearch/sdk/TransportActions.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.extensions.RegisterTransportActionsRequest;
import org.opensearch.sdk.handlers.ExtensionResponseHandler;
import org.opensearch.sdk.handlers.ExtensionBooleanResponseHandler;
import org.opensearch.transport.TransportService;

import java.util.HashMap;
Expand All @@ -30,6 +30,9 @@ public class TransportActions {

/**
* Constructor for TransportActions. Creates a map of transportActions for this extension.
*
* @param <Request> the TransportAction request
* @param <Response> the TransportAction response
* @param transportActions is the list of actions the extension would like to register with OpenSearch.
*/
public <Request extends ActionRequest, Response extends ActionResponse> TransportActions(
Expand All @@ -46,7 +49,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> Transpor
*/
public void sendRegisterTransportActionsRequest(TransportService transportService, DiscoveryNode opensearchNode) {
logger.info("Sending Register Transport Actions request to OpenSearch");
ExtensionResponseHandler registerTransportActionsResponseHandler = new ExtensionResponseHandler();
ExtensionBooleanResponseHandler registerTransportActionsResponseHandler = new ExtensionBooleanResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
/**
* This class handles the response {{@link org.opensearch.extensions.ExtensionBooleanResponse }} from OpenSearch to Extension.
*/
public class ExtensionResponseHandler implements TransportResponseHandler<ExtensionBooleanResponse> {
private static final Logger logger = LogManager.getLogger(ExtensionResponseHandler.class);
public class ExtensionBooleanResponseHandler implements TransportResponseHandler<ExtensionBooleanResponse> {
private static final Logger logger = LogManager.getLogger(ExtensionBooleanResponseHandler.class);

@Override
public void handleResponse(ExtensionBooleanResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,26 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.extensions.rest.RegisterRestActionsResponse;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.extensions.ExtensionStringResponse;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;

/**
* This class handles the response from OpenSearch to a {@link ExtensionsRunner#sendRegisterRestActionsRequest(TransportService)} call.
* This class handles the response from OpenSearch to call returning an {@link ExtensionStringResponse}.
*/
public class RegisterRestActionsResponseHandler implements TransportResponseHandler<RegisterRestActionsResponse> {
private static final Logger logger = LogManager.getLogger(RegisterRestActionsResponseHandler.class);
public class ExtensionStringResponseHandler implements TransportResponseHandler<ExtensionStringResponse> {
private static final Logger logger = LogManager.getLogger(ExtensionStringResponseHandler.class);

@Override
public void handleResponse(RegisterRestActionsResponse response) {
public void handleResponse(ExtensionStringResponse response) {
logger.info("received {}", response.getResponse());
}

@Override
public void handleException(TransportException exp) {
logger.info("RegisterActionsRequest failed", exp);
logger.info("Request failed", exp);
}

@Override
Expand All @@ -41,7 +38,7 @@ public String executor() {
}

@Override
public RegisterRestActionsResponse read(StreamInput in) throws IOException {
return new RegisterRestActionsResponse(in);
public ExtensionStringResponse read(StreamInput in) throws IOException {
return new ExtensionStringResponse(in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.sdk.handlers.ExtensionResponseHandler;
import org.opensearch.sdk.handlers.ExtensionBooleanResponseHandler;
import org.opensearch.tasks.Task;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.transport.Transport;
Expand Down Expand Up @@ -87,7 +87,7 @@ public void testRegisterTransportAction() {
any(),
eq(ExtensionsOrchestrator.REQUEST_EXTENSION_REGISTER_TRANSPORT_ACTIONS),
any(),
any(ExtensionResponseHandler.class)
any(ExtensionBooleanResponseHandler.class)
);
}
}
12 changes: 10 additions & 2 deletions src/test/java/org/opensearch/sdk/TestExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.LocalNodeResponseHandler;
import org.opensearch.sdk.handlers.RegisterRestActionsResponseHandler;
import org.opensearch.sdk.handlers.ExtensionStringResponseHandler;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -198,6 +198,14 @@ public void testRegisterRestActionsRequest() {

extensionsRunner.sendRegisterRestActionsRequest(transportService);

verify(transportService, times(1)).sendRequest(any(), anyString(), any(), any(RegisterRestActionsResponseHandler.class));
verify(transportService, times(1)).sendRequest(any(), anyString(), any(), any(ExtensionStringResponseHandler.class));
}

@Test
public void testRegisterCustomSettingsRequest() {

extensionsRunner.sendRegisterCustomSettingsRequest(transportService);

verify(transportService, times(1)).sendRequest(any(), anyString(), any(), any(ExtensionStringResponseHandler.class));
}
}