Skip to content

Commit

Permalink
Delayed CodecService instantiation up to the shard initialization
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta committed Mar 4, 2022
1 parent 17c0abb commit 5c136be
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,30 +42,23 @@
* A factory to create an EngineConfig based on custom plugin overrides
*/
public class EngineConfigFactory {
private final CodecService codecService;
private final CodecServiceFactory codecServiceFactory;
private final TranslogDeletionPolicyFactory translogDeletionPolicyFactory;

/** default ctor primarily used for tests without plugins */
public EngineConfigFactory(IndexSettings idxSettings) {
this(Collections.emptyList(), idxSettings, null);
this(Collections.emptyList(), idxSettings);
}

/**
* Construct a factory using the plugin service and provided index settings
*/
public EngineConfigFactory(PluginsService pluginsService, IndexSettings idxSettings) {
this(pluginsService.filterPlugins(EnginePlugin.class), idxSettings, null);
}

/**
* Construct a factory using the plugin service, provided index settings and mapper service
*/
public EngineConfigFactory(PluginsService pluginsService, IndexSettings idxSettings, MapperService mapperService) {
this(pluginsService.filterPlugins(EnginePlugin.class), idxSettings, mapperService);
this(pluginsService.filterPlugins(EnginePlugin.class), idxSettings);
}

/* private constructor to construct the factory from specific EnginePlugins and IndexSettings */
EngineConfigFactory(Collection<EnginePlugin> enginePlugins, IndexSettings idxSettings, MapperService mapperService) {
EngineConfigFactory(Collection<EnginePlugin> enginePlugins, IndexSettings idxSettings) {
Optional<CodecService> codecService = Optional.empty();
String codecServiceOverridingPlugin = null;
Optional<CodecServiceFactory> codecServiceFactory = Optional.empty();
Expand Down Expand Up @@ -119,15 +112,16 @@ public EngineConfigFactory(PluginsService pluginsService, IndexSettings idxSetti
);
}

this.codecService = codecService.isPresent() ? codecService.get() : codecServiceFactory.map(factory -> {
final CodecServiceConfig config = new CodecServiceConfig(idxSettings, mapperService);
return factory.createCodecService(config);
}).orElse(null);

final CodecService instance = codecService.orElse(null);
this.codecServiceFactory = (instance != null) ? (config) -> instance : codecServiceFactory.orElse(null);
this.translogDeletionPolicyFactory = translogDeletionPolicyFactory.orElse((idxs, rtls) -> null);
}

/** Instantiates a new EngineConfig from the provided custom overrides */
/**
* Instantiates a new EngineConfig from the provided custom overrides
* @deprecated please use overloaded {@code newEngineConfig} with {@link MapperService}
*/
@Deprecated
public EngineConfig newEngineConfig(
ShardId shardId,
ThreadPool threadPool,
Expand All @@ -152,6 +146,59 @@ public EngineConfig newEngineConfig(
LongSupplier primaryTermSupplier,
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier
) {
return newEngineConfig(
shardId,
threadPool,
indexSettings,
warmer,
store,
mergePolicy,
analyzer,
similarity,
codecService,
eventListener,
queryCache,
queryCachingPolicy,
translogConfig,
flushMergesAfter,
externalRefreshListener,
internalRefreshListener,
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
retentionLeasesSupplier,
primaryTermSupplier,
tombstoneDocSupplier,
null
);
}

/** Instantiates a new EngineConfig from the provided custom overrides */
public EngineConfig newEngineConfig(
ShardId shardId,
ThreadPool threadPool,
IndexSettings indexSettings,
Engine.Warmer warmer,
Store store,
MergePolicy mergePolicy,
Analyzer analyzer,
Similarity similarity,
CodecService codecService,
Engine.EventListener eventListener,
QueryCache queryCache,
QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig,
TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener,
Sort indexSort,
CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
MapperService mapperService
) {

return new EngineConfig(
shardId,
Expand All @@ -162,7 +209,9 @@ public EngineConfig newEngineConfig(
mergePolicy,
analyzer,
similarity,
this.codecService != null ? this.codecService : codecService,
this.codecServiceFactory != null
? this.codecServiceFactory.createCodecService(new CodecServiceConfig(indexSettings, mapperService))
: codecService,
eventListener,
queryCache,
queryCachingPolicy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3177,7 +3177,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
globalCheckpointSupplier,
replicationTracker::getRetentionLeases,
() -> getOperationPrimaryTerm(),
tombstoneDocSupplier()
tombstoneDocSupplier(),
mapperService
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,8 +742,7 @@ private synchronized IndexService createIndexService(
}

private EngineConfigFactory getEngineConfigFactory(final IndexSettings idxSettings) {
final IndexService indexService = indices.get(idxSettings.getUUID());
return new EngineConfigFactory(this.pluginsService, idxSettings, (indexService != null) ? indexService.mapperService() : null);
return new EngineConfigFactory(this.pluginsService, idxSettings);
}

private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void testCreateEngineConfigFromFactory() {
.build();
List<EnginePlugin> plugins = Collections.singletonList(new FooEnginePlugin());
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());
EngineConfigFactory factory = new EngineConfigFactory(plugins, indexSettings, null);
EngineConfigFactory factory = new EngineConfigFactory(plugins, indexSettings);

EngineConfig config = factory.newEngineConfig(
null,
Expand Down Expand Up @@ -82,7 +82,7 @@ public void testCreateEngineConfigFromFactoryMultipleCodecServiceIllegalStateExc
List<EnginePlugin> plugins = Arrays.asList(new FooEnginePlugin(), new BarEnginePlugin());
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());

expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings, null));
expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings));
}

public void testCreateEngineConfigFromFactoryMultipleCodecServiceAndFactoryIllegalStateException() {
Expand All @@ -94,7 +94,7 @@ public void testCreateEngineConfigFromFactoryMultipleCodecServiceAndFactoryIlleg
List<EnginePlugin> plugins = Arrays.asList(new FooEnginePlugin(), new BakEnginePlugin());
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());

expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings, null));
expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings));
}

public void testCreateEngineConfigFromFactoryMultipleCustomTranslogDeletionPolicyFactoryIllegalStateException() {
Expand All @@ -106,7 +106,7 @@ public void testCreateEngineConfigFromFactoryMultipleCustomTranslogDeletionPolic
List<EnginePlugin> plugins = Arrays.asList(new FooEnginePlugin(), new BazEnginePlugin());
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());

expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings, null));
expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings));
}

public void testCreateCodecServiceFromFactory() {
Expand All @@ -118,7 +118,7 @@ public void testCreateCodecServiceFromFactory() {
List<EnginePlugin> plugins = Arrays.asList(new BakEnginePlugin());
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());

EngineConfigFactory factory = new EngineConfigFactory(plugins, indexSettings, null);
EngineConfigFactory factory = new EngineConfigFactory(plugins, indexSettings);
EngineConfig config = factory.newEngineConfig(
null,
null,
Expand Down

0 comments on commit 5c136be

Please sign in to comment.