Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move tribe to a module #25778

Merged
merged 12 commits into from
Jul 28, 2017
2 changes: 1 addition & 1 deletion buildSrc/src/main/resources/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]snapshots[/\\]SnapshotShardsService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]snapshots[/\\]SnapshotsService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]threadpool[/\\]ThreadPool.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]tribe[/\\]TribeService.java" checks="LineLength" />
<suppress files="modules[/\\]tribe[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]tribe[/\\]TribeService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]apache[/\\]lucene[/\\]queries[/\\]BlendedTermQueryTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]VersionTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]RejectionActionIT.java" checks="LineLength" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public class ClusterModule extends AbstractModule {
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final AllocationDeciders allocationDeciders;
private final AllocationService allocationService;
private final Runnable onStarted;
// pkg private for tests
final Collection<AllocationDecider> deciderList;
final ShardsAllocator shardsAllocator;
Expand All @@ -106,6 +107,7 @@ public ClusterModule(Settings settings, ClusterService clusterService, List<Clus
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
this.allocationService = new AllocationService(settings, allocationDeciders, shardsAllocator, clusterInfoService);
this.onStarted = () -> clusterPlugins.forEach(plugin -> plugin.onNodeStarted());
}


Expand Down Expand Up @@ -241,4 +243,8 @@ protected void configure() {
bind(AllocationDeciders.class).toInstance(allocationDeciders);
bind(ShardsAllocator.class).toInstance(shardsAllocator);
}

public Runnable onStarted() {
return onStarted;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster;

import org.elasticsearch.cluster.metadata.MetaData;

/**
* Interface to allow merging {@link org.elasticsearch.cluster.metadata.MetaData.Custom}.
* When multiple Mergable Custom metadata of the same type are found (from underlying clusters), the
* Custom metadata can be merged using {@link #merge(MetaData.Custom)}.
*
* @param <T> type of custom meta data
*/
public interface MergableCustomMetaData<T extends MetaData.Custom> {

/**
* Merges this custom metadata with other, returning either this or <code>other</code> custom metadata.
* This method should not mutate either <code>this</code> or the <code>other</code> custom metadata.
*
* @param other custom meta data
* @return the same instance or <code>other</code> custom metadata based on implementation
* if both the instances are considered equal, implementations should return this
* instance to avoid redundant cluster state changes.
*/
T merge(T other);
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.monitor.fs.FsService;
import org.elasticsearch.monitor.jvm.JvmGcMonitorService;
import org.elasticsearch.monitor.jvm.JvmService;
Expand All @@ -91,7 +90,6 @@
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.tribe.TribeService;
import org.elasticsearch.watcher.ResourceWatcherService;

import java.util.Arrays;
Expand Down Expand Up @@ -369,13 +367,6 @@ public void apply(Settings value, Settings current, Settings previous) {
ThreadContext.DEFAULT_HEADERS_SETTING,
ESLoggerFactory.LOG_DEFAULT_LEVEL_SETTING,
ESLoggerFactory.LOG_LEVEL_SETTING,
TribeService.BLOCKS_METADATA_SETTING,
TribeService.BLOCKS_WRITE_SETTING,
TribeService.BLOCKS_WRITE_INDICES_SETTING,
TribeService.BLOCKS_READ_INDICES_SETTING,
TribeService.BLOCKS_METADATA_INDICES_SETTING,
TribeService.ON_CONFLICT_SETTING,
TribeService.TRIBE_NAME_SETTING,
NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING,
NodeEnvironment.ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING,
OsService.REFRESH_INTERVAL_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.tribe.TribeService;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -49,8 +48,6 @@ public class SettingsModule implements Module {
private final Set<String> settingsFilterPattern = new HashSet<>();
private final Map<String, Setting<?>> nodeSettings = new HashMap<>();
private final Map<String, Setting<?>> indexSettings = new HashMap<>();
private static final Predicate<String> TRIBE_CLIENT_NODE_SETTINGS_PREDICATE = (s) -> s.startsWith("tribe.")
&& TribeService.TRIBE_SETTING_KEYS.contains(s) == false;
private final Logger logger;
private final IndexScopedSettings indexScopedSettings;
private final ClusterSettings clusterSettings;
Expand Down Expand Up @@ -135,9 +132,7 @@ public SettingsModule(Settings settings, List<Setting<?>> additionalSettings, Li
}
}
// by now we are fully configured, lets check node level settings for unregistered index settings
final Predicate<String> acceptOnlyClusterSettings = TRIBE_CLIENT_NODE_SETTINGS_PREDICATE.negate();
clusterSettings.validate(settings.filter(acceptOnlyClusterSettings));
validateTribeSettings(settings, clusterSettings);
clusterSettings.validate(settings);
this.settingsFilter = new SettingsFilter(settings, settingsFilterPattern);
}

Expand Down Expand Up @@ -195,20 +190,6 @@ private void registerSettingsFilter(String filter) {
settingsFilterPattern.add(filter);
}

private void validateTribeSettings(Settings settings, ClusterSettings clusterSettings) {
Map<String, Settings> groups = settings.filter(TRIBE_CLIENT_NODE_SETTINGS_PREDICATE).getGroups("tribe.", true);
for (Map.Entry<String, Settings> tribeSettings : groups.entrySet()) {
Settings thisTribesSettings = tribeSettings.getValue();
for (Map.Entry<String, String> entry : thisTribesSettings.getAsMap().entrySet()) {
try {
clusterSettings.validate(entry.getKey(), thisTribesSettings);
} catch (IllegalArgumentException ex) {
throw new IllegalArgumentException("tribe." + tribeSettings.getKey() +" validation failed: "+ ex.getMessage(), ex);
}
}
}
}

public Settings getSettings() {
return settings;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
discoveryTypes.put("zen",
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
clusterSettings, hostsProvider, allocationService));
discoveryTypes.put("tribe", () -> new TribeDiscovery(settings, transportService, masterService, clusterApplier));
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier));
for (DiscoveryPlugin plugin : plugins) {
plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry,
Expand Down
31 changes: 7 additions & 24 deletions core/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.tribe.TribeService;
import org.elasticsearch.usage.UsageService;
import org.elasticsearch.watcher.ResourceWatcherService;

Expand Down Expand Up @@ -256,8 +255,6 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
Settings tmpSettings = Settings.builder().put(environment.settings())
.put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();

tmpSettings = TribeService.processSettings(tmpSettings);

// create the node environment as soon as possible, to recover the node id and enable logging
try {
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
Expand Down Expand Up @@ -385,15 +382,6 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
.flatMap(p -> p.getNamedXContent().stream()),
ClusterModule.getNamedXWriteables().stream())
.flatMap(Function.identity()).collect(toList()));
final TribeService tribeService =
new TribeService(
settings,
environment.configFile(),
clusterService,
nodeId,
namedWriteableRegistry,
(s, p) -> newTribeClientNode(s, classpathPlugins, p));
resourcesToClose.add(tribeService);
modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry));
final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry);
final IndicesService indicesService = new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry,
Expand All @@ -404,7 +392,9 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>

Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
scriptModule.getScriptService(), xContentRegistry).stream())
scriptModule.getScriptService(), xContentRegistry, environment, nodeEnvironment,
namedWriteableRegistry,
(settings, configPath) -> newNode(settings, classpathPlugins, configPath)).stream())
.collect(Collectors.toList());
final RestController restController = actionModule.getRestController();
final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
Expand Down Expand Up @@ -458,7 +448,6 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
b.bind(Environment.class).toInstance(this.environment);
b.bind(ThreadPool.class).toInstance(threadPool);
b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
b.bind(TribeService.class).toInstance(tribeService);
b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
b.bind(BigArrays.class).toInstance(bigArrays);
Expand Down Expand Up @@ -612,10 +601,6 @@ public Node start() throws NodeValidationException {
Discovery discovery = injector.getInstance(Discovery.class);
clusterService.getMasterService().setClusterStatePublisher(discovery::publish);

// start before the cluster service since it adds/removes initial Cluster state blocks
final TribeService tribeService = injector.getInstance(TribeService.class);
tribeService.start();

// Start the transport service now so the publish address will be added to the local disco node in ClusterService
TransportService transportService = injector.getInstance(TransportService.class);
transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
Expand Down Expand Up @@ -682,10 +667,10 @@ public void onTimeout(TimeValue timeout) {
writePortsFile("transport", transport.boundAddress());
}

// start nodes now, after the http server, because it may take some time
tribeService.startNodes();
logger.info("started");

pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);

return this;
}

Expand All @@ -696,7 +681,6 @@ private Node stop() {
Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
logger.info("stopping ...");

injector.getInstance(TribeService.class).stop();
injector.getInstance(ResourceWatcherService.class).stop();
if (NetworkModule.HTTP_ENABLED.get(settings)) {
injector.getInstance(HttpServerTransport.class).stop();
Expand Down Expand Up @@ -744,7 +728,6 @@ public synchronized void close() throws IOException {
List<Closeable> toClose = new ArrayList<>();
StopWatch stopWatch = new StopWatch("node_close");
toClose.add(() -> stopWatch.start("tribe"));
toClose.add(injector.getInstance(TribeService.class));
toClose.add(() -> stopWatch.stop().start("node_service"));
toClose.add(nodeService);
toClose.add(() -> stopWatch.stop().start("http"));
Expand Down Expand Up @@ -920,8 +903,8 @@ private List<NetworkService.CustomNameResolver> getCustomNameResolvers(List<Disc
return customNameResolvers;
}

/** Constructs an internal node used as a client into a cluster fronted by this tribe node. */
protected Node newTribeClientNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins, Path configPath) {
/** Constructs a new node based on the following settings. Overridden by tests */
protected Node newNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins, Path configPath) {
return new Node(new Environment(settings, configPath), classpathPlugins);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,11 @@ default Collection<AllocationDecider> createAllocationDeciders(Settings settings
default Map<String, Supplier<ShardsAllocator>> getShardsAllocators(Settings settings, ClusterSettings clusterSettings) {
return Collections.emptyMap();
}

/**
* Called when the node is started
*/
default void onNodeStarted() {

}
}
9 changes: 8 additions & 1 deletion core/src/main/java/org/elasticsearch/plugins/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.node.Node;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
Expand All @@ -50,10 +53,12 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.UnaryOperator;

/**
Expand Down Expand Up @@ -107,7 +112,9 @@ public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses()
*/
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry) {
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
BiFunction<Settings, Path, Node> clientNodeBuilder) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should not make this a first class citizen. Instead I suggest we remove the Node#newNode entirely and add a MockTribePlugin class that passes such a function that delegates to / creates a MockNode. Given that tribe goes away I think that is the right tradeoff. WDYT?

return Collections.emptyList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,27 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;

import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.UnaryOperator;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
Expand Down Expand Up @@ -67,11 +73,14 @@ public TestPlugin(Settings settings) {
@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry) {
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
BiFunction<Settings, Path, Node> clientNodeBuilder) {
clusterService.getClusterSettings().addSettingsUpdateConsumer(UPDATE_TEMPLATE_DUMMY_SETTING, integer -> {
logger.debug("the template dummy setting was updated to {}", integer);
});
return super.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry);
return super.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry,
environment, nodeEnvironment, namedWriteableRegistry, clientNodeBuilder);
}

@Override
Expand Down
Loading