diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java index 0edc215e536a3..e099098d9c231 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java @@ -19,6 +19,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; @@ -107,8 +108,7 @@ public static enum OpenSearchRequestType { private final Path extensionsPath; private ExtensionTransportActionsHandler extensionTransportActionsHandler; - // A list of initialized extensions, a subset of the values of map below which includes all extensions - private List extensions; + private Map initializedExtensions; private Map extensionIdMap; private RestActionsRequestHandler restActionsRequestHandler; private CustomSettingsRequestHandler customSettingsRequestHandler; @@ -118,21 +118,16 @@ public static enum OpenSearchRequestType { private AddSettingsUpdateConsumerRequestHandler addSettingsUpdateConsumerRequestHandler; private NodeClient client; - public ExtensionsManager() { - this.extensionsPath = Path.of(""); - } - /** * Instantiate a new ExtensionsManager object to handle requests and responses from extensions. This is called during Node bootstrap. * - * @param settings Settings from the node the orchestrator is running on. * @param extensionsPath Path to a directory containing extensions. * @throws IOException If the extensions discovery file is not properly retrieved. */ - public ExtensionsManager(Settings settings, Path extensionsPath) throws IOException { + public ExtensionsManager(Path extensionsPath) throws IOException { logger.info("ExtensionsManager initialized"); this.extensionsPath = extensionsPath; - this.extensions = new ArrayList(); + this.initializedExtensions = new HashMap(); this.extensionIdMap = new HashMap(); // will be initialized in initializeServicesAndRestHandler which is called after the Node is initialized this.transportService = null; @@ -187,6 +182,16 @@ public void initializeServicesAndRestHandler( registerRequestHandler(); } + /** + * Lookup an initialized extension by its unique id + * + * @param extensionId The unique extension identifier + * @return An optional of the DiscoveryExtensionNode instance for the matching extension + */ + public Optional lookupInitializedExtensionById(final String extensionId) { + return Optional.ofNullable(this.initializedExtensions.get(extensionId)); + } + /** * Handles Transport Request from {@link org.opensearch.extensions.action.ExtensionTransportAction} which was invoked by an extension via {@link ExtensionTransportActionsHandler}. * @@ -342,7 +347,7 @@ private void loadExtension(Extension extension) throws IOException { } /** - * Iterate through all extensions and initialize them. Initialized extensions will be added to the {@link #extensions}. + * Iterate through all extensions and initialize them. Initialized extensions will be added to the {@link #initializedExtensions}. */ public void initialize() { for (DiscoveryExtensionNode extension : extensionIdMap.values()) { @@ -366,7 +371,7 @@ public void handleResponse(InitializeExtensionResponse response) { for (DiscoveryExtensionNode extension : extensionIdMap.values()) { if (extension.getName().equals(response.getName())) { extension.setImplementedInterfaces(response.getImplementedInterfaces()); - extensions.add(extension); + initializedExtensions.put(extension.getId(), extension); logger.info("Initialized extension: " + extension.getName()); break; } @@ -426,11 +431,17 @@ TransportResponse handleExtensionRequest(ExtensionRequest extensionRequest) thro case REQUEST_EXTENSION_DEPENDENCY_INFORMATION: String uniqueId = extensionRequest.getUniqueId(); if (uniqueId == null) { - return new ExtensionDependencyResponse(extensions); + return new ExtensionDependencyResponse( + initializedExtensions.entrySet().stream().map(e -> e.getValue()).collect(Collectors.toList()) + ); } else { ExtensionDependency matchingId = new ExtensionDependency(uniqueId, Version.CURRENT); return new ExtensionDependencyResponse( - extensions.stream().filter(e -> e.dependenciesContain(matchingId)).collect(Collectors.toList()) + initializedExtensions.entrySet() + .stream() + .map(e -> e.getValue()) + .filter(e -> e.dependenciesContain(matchingId)) + .collect(Collectors.toList()) ); } default: @@ -623,154 +634,83 @@ private ExtensionsSettings readFromExtensionsYml(Path filePath) throws IOExcepti } } - public static String getRequestExtensionActionName() { + static String getRequestExtensionActionName() { return REQUEST_EXTENSION_ACTION_NAME; } - public static String getIndicesExtensionPointActionName() { + static String getIndicesExtensionPointActionName() { return INDICES_EXTENSION_POINT_ACTION_NAME; } - public static String getIndicesExtensionNameActionName() { + static String getIndicesExtensionNameActionName() { return INDICES_EXTENSION_NAME_ACTION_NAME; } - public static String getRequestExtensionClusterState() { + static String getRequestExtensionClusterState() { return REQUEST_EXTENSION_CLUSTER_STATE; } - public static String getRequestExtensionClusterSettings() { + static String getRequestExtensionClusterSettings() { return REQUEST_EXTENSION_CLUSTER_SETTINGS; } - public static Logger getLogger() { + static Logger getLogger() { return logger; } - public Path getExtensionsPath() { + Path getExtensionsPath() { return extensionsPath; } - public List getExtensions() { - return extensions; - } - - public TransportService getTransportService() { + TransportService getTransportService() { return transportService; } - public ClusterService getClusterService() { + ClusterService getClusterService() { return clusterService; } - public static String getRequestExtensionRegisterRestActions() { - return REQUEST_EXTENSION_REGISTER_REST_ACTIONS; - } - - public static String getRequestRestExecuteOnExtensionAction() { - return REQUEST_REST_EXECUTE_ON_EXTENSION_ACTION; - } - - public Map getExtensionIdMap() { + Map getExtensionIdMap() { return extensionIdMap; } - public RestActionsRequestHandler getRestActionsRequestHandler() { + RestActionsRequestHandler getRestActionsRequestHandler() { return restActionsRequestHandler; } - public void setExtensions(List extensions) { - this.extensions = extensions; - } - - public void setExtensionIdMap(Map extensionIdMap) { + void setExtensionIdMap(Map extensionIdMap) { this.extensionIdMap = extensionIdMap; } - public void setRestActionsRequestHandler(RestActionsRequestHandler restActionsRequestHandler) { + void setRestActionsRequestHandler(RestActionsRequestHandler restActionsRequestHandler) { this.restActionsRequestHandler = restActionsRequestHandler; } - public void setTransportService(TransportService transportService) { + void setTransportService(TransportService transportService) { this.transportService = transportService; } - public void setClusterService(ClusterService clusterService) { + void setClusterService(ClusterService clusterService) { this.clusterService = clusterService; } - public static String getRequestExtensionRegisterTransportActions() { - return REQUEST_EXTENSION_REGISTER_TRANSPORT_ACTIONS; - } - - public static String getRequestExtensionRegisterCustomSettings() { - return REQUEST_EXTENSION_REGISTER_CUSTOM_SETTINGS; - } - - public CustomSettingsRequestHandler getCustomSettingsRequestHandler() { + CustomSettingsRequestHandler getCustomSettingsRequestHandler() { return customSettingsRequestHandler; } - public void setCustomSettingsRequestHandler(CustomSettingsRequestHandler customSettingsRequestHandler) { + void setCustomSettingsRequestHandler(CustomSettingsRequestHandler customSettingsRequestHandler) { this.customSettingsRequestHandler = customSettingsRequestHandler; } - public static String getRequestExtensionEnvironmentSettings() { - return REQUEST_EXTENSION_ENVIRONMENT_SETTINGS; - } - - public static String getRequestExtensionAddSettingsUpdateConsumer() { - return REQUEST_EXTENSION_ADD_SETTINGS_UPDATE_CONSUMER; - } - - public static String getRequestExtensionUpdateSettings() { - return REQUEST_EXTENSION_UPDATE_SETTINGS; - } - - public AddSettingsUpdateConsumerRequestHandler getAddSettingsUpdateConsumerRequestHandler() { + AddSettingsUpdateConsumerRequestHandler getAddSettingsUpdateConsumerRequestHandler() { return addSettingsUpdateConsumerRequestHandler; } - public void setAddSettingsUpdateConsumerRequestHandler( - AddSettingsUpdateConsumerRequestHandler addSettingsUpdateConsumerRequestHandler - ) { + void setAddSettingsUpdateConsumerRequestHandler(AddSettingsUpdateConsumerRequestHandler addSettingsUpdateConsumerRequestHandler) { this.addSettingsUpdateConsumerRequestHandler = addSettingsUpdateConsumerRequestHandler; } - public Settings getEnvironmentSettings() { + Settings getEnvironmentSettings() { return environmentSettings; } - - public void setEnvironmentSettings(Settings environmentSettings) { - this.environmentSettings = environmentSettings; - } - - public static String getRequestExtensionHandleTransportAction() { - return REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION; - } - - public static String getTransportActionRequestFromExtension() { - return TRANSPORT_ACTION_REQUEST_FROM_EXTENSION; - } - - public static int getExtensionRequestWaitTimeout() { - return EXTENSION_REQUEST_WAIT_TIMEOUT; - } - - public ExtensionTransportActionsHandler getExtensionTransportActionsHandler() { - return extensionTransportActionsHandler; - } - - public void setExtensionTransportActionsHandler(ExtensionTransportActionsHandler extensionTransportActionsHandler) { - this.extensionTransportActionsHandler = extensionTransportActionsHandler; - } - - public NodeClient getClient() { - return client; - } - - public void setClient(NodeClient client) { - this.client = client; - } - } diff --git a/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java index 24f71476dcb1e..0f069d5395e8c 100644 --- a/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java @@ -8,6 +8,23 @@ package org.opensearch.extensions; +import java.io.IOException; +import java.net.UnknownHostException; +import java.nio.file.Path; +import java.util.Optional; + +import org.opensearch.action.ActionModule; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsModule; + +import org.opensearch.extensions.action.ExtensionActionRequest; +import org.opensearch.extensions.action.ExtensionActionResponse; +import org.opensearch.extensions.action.RemoteExtensionActionResponse; +import org.opensearch.index.IndexModule; +import org.opensearch.transport.TransportService; + /** * Noop class for ExtensionsManager * @@ -15,7 +32,41 @@ */ public class NoopExtensionsManager extends ExtensionsManager { - public NoopExtensionsManager() { - super(); + public NoopExtensionsManager() throws IOException { + super(Path.of("")); + } + + public void initializeServicesAndRestHandler( + ActionModule actionModule, + SettingsModule settingsModule, + TransportService transportService, + ClusterService clusterService, + Settings initialEnvironmentSettings, + NodeClient client + ) { + // no-op + } + + public RemoteExtensionActionResponse handleRemoteTransportRequest(ExtensionActionRequest request) throws Exception { + // no-op empty response + return new RemoteExtensionActionResponse(true, new byte[0]); + } + + public ExtensionActionResponse handleTransportRequest(ExtensionActionRequest request) throws Exception { + // no-op empty response + return new ExtensionActionResponse(new byte[0]); + } + + public void initialize() { + // no-op + } + + public void onIndexModule(IndexModule indexModule) throws UnknownHostException { + // no-op + } + + public Optional lookupInitializedExtensionById(final String extensionId) { + // no-op not found + return Optional.empty(); } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 60df48800f921..713ca69a72b08 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -82,7 +82,6 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.iterable.Iterables; import org.opensearch.common.util.set.Sets; import org.opensearch.common.xcontent.LoggingDeprecationHandler; @@ -926,9 +925,7 @@ private synchronized IndexService createIndexService( indexModule.addIndexOperationListener(operationListener); } pluginsService.onIndexModule(indexModule); - if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { - extensionsManager.onIndexModule(indexModule); - } + extensionsManager.onIndexModule(indexModule); for (IndexEventListener listener : builtInListeners) { indexModule.addIndexEventListener(listener); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index fc65d2e9b5d08..761cea003707d 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -466,7 +466,7 @@ protected Node( final IdentityService identityService = new IdentityService(settings, identityPlugins); if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { - this.extensionsManager = new ExtensionsManager(tmpSettings, initialEnvironment.extensionDir()); + this.extensionsManager = new ExtensionsManager(initialEnvironment.extensionDir()); } else { this.extensionsManager = new NoopExtensionsManager(); } @@ -875,16 +875,14 @@ protected Node( ); TopNSearchTasksLogger taskConsumer = new TopNSearchTasksLogger(settings, settingsModule.getClusterSettings()); transportService.getTaskManager().registerTaskResourceConsumer(taskConsumer); - if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { - this.extensionsManager.initializeServicesAndRestHandler( - actionModule, - settingsModule, - transportService, - clusterService, - environment.settings(), - client - ); - } + this.extensionsManager.initializeServicesAndRestHandler( + actionModule, + settingsModule, + transportService, + clusterService, + environment.settings(), + client + ); final GatewayMetaState gatewayMetaState = new GatewayMetaState(); final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService); final SearchTransportService searchTransportService = new SearchTransportService( @@ -1317,9 +1315,7 @@ public Node start() throws NodeValidationException { assert clusterService.localNode().equals(localNodeFactory.getNode()) : "clusterService has a different local node than the factory provided"; transportService.acceptIncomingRequests(); - if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { - extensionsManager.initialize(); - } + extensionsManager.initialize(); discovery.startInitialJoin(); final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings()); configureNodeAndClusterIdStateListener(clusterService); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index f218895754b7f..3f5ef4b824afa 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -157,7 +157,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.transport.TransportAddress; import org.opensearch.common.util.BigArrays; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor; @@ -1804,82 +1803,43 @@ public void onFailure(final Exception e) { final SetOnce repositoriesServiceReference = new SetOnce<>(); repositoriesServiceReference.set(repositoriesService); FileCacheCleaner fileCacheCleaner = new FileCacheCleaner(nodeEnv, null); - if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { - indicesService = new IndicesService( - settings, - mock(PluginsService.class), - mock(ExtensionsManager.class), - nodeEnv, - namedXContentRegistry, - new AnalysisRegistry( - environment, - emptyMap(), - emptyMap(), - emptyMap(), - emptyMap(), - emptyMap(), - emptyMap(), - emptyMap(), - emptyMap(), - emptyMap() - ), - indexNameExpressionResolver, - mapperRegistry, - namedWriteableRegistry, - threadPool, - indexScopedSettings, - new NoneCircuitBreakerService(), - bigArrays, - scriptService, - clusterService, - client, - new MetaStateService(nodeEnv, namedXContentRegistry), - Collections.emptyList(), + indicesService = new IndicesService( + settings, + mock(PluginsService.class), + mock(ExtensionsManager.class), + nodeEnv, + namedXContentRegistry, + new AnalysisRegistry( + environment, emptyMap(), - null, emptyMap(), - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), - repositoriesServiceReference::get, - fileCacheCleaner - ); - } else { - indicesService = new IndicesService( - settings, - mock(PluginsService.class), - nodeEnv, - namedXContentRegistry, - new AnalysisRegistry( - environment, - emptyMap(), - emptyMap(), - emptyMap(), - emptyMap(), - emptyMap(), - emptyMap(), - emptyMap(), - emptyMap(), - emptyMap() - ), - indexNameExpressionResolver, - mapperRegistry, - namedWriteableRegistry, - threadPool, - indexScopedSettings, - new NoneCircuitBreakerService(), - bigArrays, - scriptService, - clusterService, - client, - new MetaStateService(nodeEnv, namedXContentRegistry), - Collections.emptyList(), emptyMap(), - null, emptyMap(), - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), - repositoriesServiceReference::get, - fileCacheCleaner - ); - } + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap(), + emptyMap() + ), + indexNameExpressionResolver, + mapperRegistry, + namedWriteableRegistry, + threadPool, + indexScopedSettings, + new NoneCircuitBreakerService(), + bigArrays, + scriptService, + clusterService, + client, + new MetaStateService(nodeEnv, namedXContentRegistry), + Collections.emptyList(), + emptyMap(), + null, + emptyMap(), + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), + repositoriesServiceReference::get, + fileCacheCleaner + ); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); snapshotShardsService = new SnapshotShardsService( settings,