Skip to content

Commit

Permalink
MapperService has to be passed in as null for EnginePlugins CodecServ…
Browse files Browse the repository at this point in the history
…ice constructor

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta committed Mar 4, 2022
1 parent ae14259 commit 45665db
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec;

import org.opensearch.common.Nullable;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.mapper.MapperService;

import java.util.Objects;

/**
* The configuration parameters necessary for the {@link CodecService} instance construction.
*/
public final class CodecServiceConfig {
private final IndexSettings indexSettings;
private final MapperService mapperService;

public CodecServiceConfig(IndexSettings indexSettings, @Nullable MapperService mapperService) {
this.indexSettings = Objects.requireNonNull(indexSettings);
this.mapperService = mapperService;
}

public IndexSettings getIndexSettings() {
return indexSettings;
}

@Nullable
public MapperService getMapperService() {
return mapperService;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec;

/**
* A factory for creating new {@link CodecService} instance
*/
@FunctionalInterface
public interface CodecServiceFactory {
/**
* Create new {@link CodecService} instance
* @param config code service configuration
* @return new {@link CodecService} instance
*/
CodecService createCodecService(CodecServiceConfig config);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.codec.CodecServiceConfig;
import org.opensearch.index.codec.CodecServiceFactory;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
Expand All @@ -44,28 +47,37 @@ public class EngineConfigFactory {

/** default ctor primarily used for tests without plugins */
public EngineConfigFactory(IndexSettings idxSettings) {
this(Collections.emptyList(), idxSettings);
this(Collections.emptyList(), idxSettings, null);
}

/**
* Construct a factory using the plugin service and provided index settings
*/
public EngineConfigFactory(PluginsService pluginsService, IndexSettings idxSettings) {
this(pluginsService.filterPlugins(EnginePlugin.class), idxSettings);
this(pluginsService.filterPlugins(EnginePlugin.class), idxSettings, null);
}

/**
* Construct a factory using the plugin service, provided index settings and mapper service
*/
public EngineConfigFactory(PluginsService pluginsService, IndexSettings idxSettings, MapperService mapperService) {
this(pluginsService.filterPlugins(EnginePlugin.class), idxSettings, mapperService);
}

/* private constructor to construct the factory from specific EnginePlugins and IndexSettings */
EngineConfigFactory(Collection<EnginePlugin> enginePlugins, IndexSettings idxSettings) {
EngineConfigFactory(Collection<EnginePlugin> enginePlugins, IndexSettings idxSettings, MapperService mapperService) {
Optional<CodecService> codecService = Optional.empty();
String codecServiceOverridingPlugin = null;
Optional<CodecServiceFactory> codecServiceFactory = Optional.empty();
String codecServiceFactoryOverridingPlugin = null;
Optional<TranslogDeletionPolicyFactory> translogDeletionPolicyFactory = Optional.empty();
String translogDeletionPolicyOverridingPlugin = null;
for (EnginePlugin enginePlugin : enginePlugins) {
// get overriding codec service from EnginePlugin
if (codecService.isPresent() == false) {
codecService = enginePlugin.getCustomCodecService(idxSettings);
codecServiceOverridingPlugin = enginePlugin.getClass().getName();
} else {
} else if (enginePlugin.getCustomCodecService(idxSettings).isPresent()) {
throw new IllegalStateException(
"existing codec service already overridden in: "
+ codecServiceOverridingPlugin
Expand All @@ -76,16 +88,42 @@ public EngineConfigFactory(PluginsService pluginsService, IndexSettings idxSetti
if (translogDeletionPolicyFactory.isPresent() == false) {
translogDeletionPolicyFactory = enginePlugin.getCustomTranslogDeletionPolicyFactory();
translogDeletionPolicyOverridingPlugin = enginePlugin.getClass().getName();
} else {
} else if (enginePlugin.getCustomTranslogDeletionPolicyFactory().isPresent()) {
throw new IllegalStateException(
"existing TranslogDeletionPolicyFactory is already overridden in: "
+ translogDeletionPolicyOverridingPlugin
+ " attempting to override again by: "
+ enginePlugin.getClass().getName()
);
}
// get overriding CodecServiceFactory from EnginePlugin
if (codecServiceFactory.isPresent() == false) {
codecServiceFactory = enginePlugin.getCustomCodecServiceFactory();
codecServiceFactoryOverridingPlugin = enginePlugin.getClass().getName();
} else if (enginePlugin.getCustomCodecServiceFactory().isPresent()) {
throw new IllegalStateException(
"existing codec service factory already overridden in: "
+ codecServiceFactoryOverridingPlugin
+ " attempting to override again by: "
+ enginePlugin.getClass().getName()
);
}
}

if (codecService.isPresent() && codecServiceFactory.isPresent()) {
throw new IllegalStateException(
"both codec service and codec service factory are present, codec service provided by: "
+ codecServiceOverridingPlugin
+ " conflicts with codec service factory provided by: "
+ codecServiceFactoryOverridingPlugin
);
}
this.codecService = codecService.orElse(null);

this.codecService = codecService.isPresent() ? codecService.get() : codecServiceFactory.map(factory -> {
final CodecServiceConfig config = new CodecServiceConfig(idxSettings, mapperService);
return factory.createCodecService(config);
}).orElse(null);

this.translogDeletionPolicyFactory = translogDeletionPolicyFactory.orElse((idxs, rtls) -> null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,8 @@ private synchronized IndexService createIndexService(
}

private EngineConfigFactory getEngineConfigFactory(final IndexSettings idxSettings) {
return new EngineConfigFactory(this.pluginsService, idxSettings);
final IndexService indexService = indices.get(idxSettings.getUUID());
return new EngineConfigFactory(this.pluginsService, idxSettings, (indexService != null) ? indexService.mapperService() : null);
}

private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
Expand Down
16 changes: 16 additions & 0 deletions server/src/main/java/org/opensearch/plugins/EnginePlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.codec.CodecServiceFactory;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.translog.TranslogDeletionPolicy;
Expand Down Expand Up @@ -63,11 +64,26 @@ public interface EnginePlugin {
* to determine if a custom {@link CodecService} should be provided for the given index. A plugin that is not overriding
* the {@link CodecService} through the plugin can ignore this method and the Codec specified in the {@link IndexSettings}
* will be used.
*
* @deprecated Please use {@code getCustomCodecServiceFactory()} instead as it provides more context for {@link CodecService}
* instance construction.
*/
@Deprecated
default Optional<CodecService> getCustomCodecService(IndexSettings indexSettings) {
return Optional.empty();
}

/**
* EXPERT:
* When an index is created this method is invoked for each engine plugin. Engine plugins can inspect the index settings
* to determine if a custom {@link CodecServiceFactory} should be provided for the given index. A plugin that is not overriding
* the {@link CodecServiceFactory} through the plugin can ignore this method and the default Codec specified in the
* {@link IndexSettings} will be used.
*/
default Optional<CodecServiceFactory> getCustomCodecServiceFactory() {
return Optional.empty();
}

/**
* When an index is created this method is invoked for each engine plugin. Engine plugins that need to provide a
* custom {@link TranslogDeletionPolicy} can override this method to return a function that takes the {@link IndexSettings}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.codec.CodecServiceFactory;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
Expand All @@ -40,7 +41,7 @@ public void testCreateEngineConfigFromFactory() {
.build();
List<EnginePlugin> plugins = Collections.singletonList(new FooEnginePlugin());
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());
EngineConfigFactory factory = new EngineConfigFactory(plugins, indexSettings);
EngineConfigFactory factory = new EngineConfigFactory(plugins, indexSettings, null);

EngineConfig config = factory.newEngineConfig(
null,
Expand Down Expand Up @@ -81,7 +82,19 @@ public void testCreateEngineConfigFromFactoryMultipleCodecServiceIllegalStateExc
List<EnginePlugin> plugins = Arrays.asList(new FooEnginePlugin(), new BarEnginePlugin());
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());

expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings));
expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings, null));
}

public void testCreateEngineConfigFromFactoryMultipleCodecServiceAndFactoryIllegalStateException() {
IndexMetadata meta = IndexMetadata.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
List<EnginePlugin> plugins = Arrays.asList(new FooEnginePlugin(), new BakEnginePlugin());
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());

expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings, null));
}

public void testCreateEngineConfigFromFactoryMultipleCustomTranslogDeletionPolicyFactoryIllegalStateException() {
Expand All @@ -93,7 +106,44 @@ public void testCreateEngineConfigFromFactoryMultipleCustomTranslogDeletionPolic
List<EnginePlugin> plugins = Arrays.asList(new FooEnginePlugin(), new BazEnginePlugin());
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());

expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings));
expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings, null));
}

public void testCreateCodecServiceFromFactory() {
IndexMetadata meta = IndexMetadata.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
List<EnginePlugin> plugins = Arrays.asList(new BakEnginePlugin());
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());

EngineConfigFactory factory = new EngineConfigFactory(plugins, indexSettings, null);
EngineConfig config = factory.newEngineConfig(
null,
null,
indexSettings,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
TimeValue.timeValueMinutes(5),
null,
null,
null,
null,
null,
() -> new RetentionLeases(0, 0, Collections.emptyList()),
null,
null
);
assertNotNull(config.getCodec());
}

private static class FooEnginePlugin extends Plugin implements EnginePlugin {
Expand Down Expand Up @@ -125,6 +175,18 @@ public Optional<CodecService> getCustomCodecService(IndexSettings indexSettings)
}
}

private static class BakEnginePlugin extends Plugin implements EnginePlugin {
@Override
public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {
return Optional.empty();
}

@Override
public Optional<CodecServiceFactory> getCustomCodecServiceFactory() {
return Optional.of(config -> new CodecService(config.getMapperService(), LogManager.getLogger(getClass())));
}
}

private static class BazEnginePlugin extends Plugin implements EnginePlugin {
@Override
public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {
Expand Down

0 comments on commit 45665db

Please sign in to comment.