Skip to content

Commit

Permalink
Introduce index store plugins (#32375)
Browse files Browse the repository at this point in the history
Today we allow plugins to add index store implementations yet we are not
doing this in our new way of managing plugins as pull versus push. That
is, today we still allow plugins to push index store providers via an on
index module call where they can turn around and add an index
store. Aside from being inconsistent with how we manage plugins today
where we would look to pull such implementations from plugins at node
creation time, it also means that we do not know at a top-level (for
example, in the indices service) which index stores are available. This
commit addresses this by adding a dedicated plugin type for index store
plugins, removing the index module hook for adding index stores, and by
aggregating these into the top-level of the indices service.
  • Loading branch information
jasontedor committed Jul 26, 2018
1 parent ed44b1b commit ac0125f
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,26 @@

package org.elasticsearch.plugin.store.smb;

import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.smbmmapfs.SmbMmapFsIndexStore;
import org.elasticsearch.index.store.smbsimplefs.SmbSimpleFsIndexStore;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.plugins.Plugin;

public class SMBStorePlugin extends Plugin {
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class SMBStorePlugin extends Plugin implements IndexStorePlugin {

@Override
public void onIndexModule(IndexModule indexModule) {
indexModule.addIndexStore("smb_mmap_fs", SmbMmapFsIndexStore::new);
indexModule.addIndexStore("smb_simple_fs", SmbSimpleFsIndexStore::new);
public Map<String, Function<IndexSettings, IndexStore>> getIndexStoreFactories() {
final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories = new HashMap<>(2);
indexStoreFactories.put("smb_mmap_fs", SmbMmapFsIndexStore::new);
indexStoreFactories.put("smb_simple_fs", SmbSimpleFsIndexStore::new);
return Collections.unmodifiableMap(indexStoreFactories);
}

}
40 changes: 14 additions & 26 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -74,7 +75,7 @@
* {@link #addSimilarity(String, TriFunction)} while existing Providers can be referenced through Settings under the
* {@link IndexModule#SIMILARITY_SETTINGS_PREFIX} prefix along with the "type" value. For example, to reference the
* {@link BM25Similarity}, the configuration {@code "index.similarity.my_similarity.type : "BM25"} can be used.</li>
* <li>{@link IndexStore} - Custom {@link IndexStore} instances can be registered via {@link #addIndexStore(String, Function)}</li>
* <li>{@link IndexStore} - Custom {@link IndexStore} instances can be registered via {@link IndexStorePlugin}</li>
* <li>{@link IndexEventListener} - Custom {@link IndexEventListener} instances can be registered via
* {@link #addIndexEventListener(IndexEventListener)}</li>
* <li>Settings update listener - Custom settings update listener can be registered via
Expand Down Expand Up @@ -109,7 +110,7 @@ public final class IndexModule {
private SetOnce<IndexSearcherWrapperFactory> indexSearcherWrapper = new SetOnce<>();
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
private final Map<String, TriFunction<Settings, Version, ScriptService, Similarity>> similarities = new HashMap<>();
private final Map<String, Function<IndexSettings, IndexStore>> storeTypes = new HashMap<>();
private final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories;
private final SetOnce<BiFunction<IndexSettings, IndicesQueryCache, QueryCache>> forceQueryCacheProvider = new SetOnce<>();
private final List<SearchOperationListener> searchOperationListeners = new ArrayList<>();
private final List<IndexingOperationListener> indexOperationListeners = new ArrayList<>();
Expand All @@ -119,16 +120,22 @@ public final class IndexModule {
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
* via {@link org.elasticsearch.plugins.PluginsService#onIndexModule(IndexModule)}.
*
* @param indexSettings the index settings
* @param analysisRegistry the analysis registry
* @param engineFactory the engine factory
* @param indexSettings the index settings
* @param analysisRegistry the analysis registry
* @param engineFactory the engine factory
* @param indexStoreFactories the available store types
*/
public IndexModule(final IndexSettings indexSettings, final AnalysisRegistry analysisRegistry, final EngineFactory engineFactory) {
public IndexModule(
final IndexSettings indexSettings,
final AnalysisRegistry analysisRegistry,
final EngineFactory engineFactory,
final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.searchOperationListeners.add(new SearchSlowLog(indexSettings));
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings));
this.indexStoreFactories = Collections.unmodifiableMap(indexStoreFactories);
}

/**
Expand Down Expand Up @@ -245,25 +252,6 @@ public void addIndexOperationListener(IndexingOperationListener listener) {
this.indexOperationListeners.add(listener);
}

/**
* Adds an {@link IndexStore} type to this index module. Typically stores are registered with a reference to
* it's constructor:
* <pre>
* indexModule.addIndexStore("my_store_type", MyStore::new);
* </pre>
*
* @param type the type to register
* @param provider the instance provider / factory method
*/
public void addIndexStore(String type, Function<IndexSettings, IndexStore> provider) {
ensureNotFrozen();
if (storeTypes.containsKey(type)) {
throw new IllegalArgumentException("key [" + type +"] already registered");
}
storeTypes.put(type, provider);
}


/**
* Registers the given {@link Similarity} with the given name.
* The function takes as parameters:<ul>
Expand Down Expand Up @@ -360,7 +348,7 @@ public IndexService newIndexService(
if (Strings.isEmpty(storeType) || isBuiltinType(storeType)) {
store = new IndexStore(indexSettings);
} else {
Function<IndexSettings, IndexStore> factory = storeTypes.get(storeType);
Function<IndexSettings, IndexStore> factory = indexStoreFactories.get(storeType);
if (factory == null) {
throw new IllegalArgumentException("Unknown store type [" + storeType + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
Expand Down Expand Up @@ -181,6 +182,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final IndicesQueryCache indicesQueryCache;
private final MetaStateService metaStateService;
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
private final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories;

@Override
protected void doStart() {
Expand All @@ -193,7 +195,8 @@ public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvi
MapperRegistry mapperRegistry, NamedWriteableRegistry namedWriteableRegistry, ThreadPool threadPool,
IndexScopedSettings indexScopedSettings, CircuitBreakerService circuitBreakerService, BigArrays bigArrays,
ScriptService scriptService, Client client, MetaStateService metaStateService,
Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders) {
Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders,
Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories) {
super(settings);
this.threadPool = threadPool;
this.pluginsService = pluginsService;
Expand Down Expand Up @@ -225,6 +228,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, indicesRequestCache, logger, threadPool, this.cleanInterval);
this.metaStateService = metaStateService;
this.engineFactoryProviders = engineFactoryProviders;
this.indexStoreFactories = indexStoreFactories;
}

@Override
Expand Down Expand Up @@ -464,7 +468,7 @@ private synchronized IndexService createIndexService(final String reason,
idxSettings.getNumberOfReplicas(),
reason);

final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings));
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings), indexStoreFactories);
for (IndexingOperationListener operationListener : indexingOperationListeners) {
indexModule.addIndexOperationListener(operationListener);
}
Expand Down Expand Up @@ -524,7 +528,7 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
*/
public synchronized MapperService createIndexMapperService(IndexMetaData indexMetaData) throws IOException {
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexScopedSettings);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings));
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings), indexStoreFactories);
pluginsService.onIndexModule(indexModule);
return indexModule.newIndexMapperService(xContentRegistry, mapperRegistry, scriptService);
}
Expand Down
12 changes: 11 additions & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.AnalysisModule;
Expand All @@ -119,6 +120,7 @@
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.MetaDataUpgrader;
Expand Down Expand Up @@ -409,11 +411,19 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
enginePlugins.stream().map(plugin -> plugin::getEngineFactory))
.collect(Collectors.toList());


final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories =
pluginsService.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getIndexStoreFactories)
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

final IndicesService indicesService =
new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(),
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays,
scriptModule.getScriptService(), client, metaStateService, engineFactoryProviders);
scriptModule.getScriptService(), client, metaStateService, engineFactoryProviders, indexStoreFactories);


Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.plugins;

import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.store.IndexStore;

import java.util.Map;
import java.util.function.Function;

/**
* A plugin that provides alternative index store implementations.
*/
public interface IndexStorePlugin {

/**
* The index store factories for this plugin. When an index is created the store type setting
* {@link org.elasticsearch.index.IndexModule#INDEX_STORE_TYPE_SETTING} on the index will be examined and either use the default or a
* built-in type, or looked up among all the index store factories from {@link IndexStore} plugins.
*
* @return a map from store type to an index store factory
*/
Map<String, Function<IndexSettings, IndexStore>> getIndexStoreFactories();

}
Loading

0 comments on commit ac0125f

Please sign in to comment.