setting = Setting.prefixKeySetting(prefix, (key) -> new Setting<>(key, "", Function.identity(),
+ Setting.Property.NodeScope));
+ allSettings.add(setting);
+ }
+
+ return allSettings;
+ }
+
+}
diff --git a/core/src/main/java/org/elasticsearch/tribe/TribeService.java b/modules/tribe/src/main/java/org/elasticsearch/tribe/TribeService.java
similarity index 87%
rename from core/src/main/java/org/elasticsearch/tribe/TribeService.java
rename to modules/tribe/src/main/java/org/elasticsearch/tribe/TribeService.java
index b07e838653cc5..714749b94782c 100644
--- a/core/src/main/java/org/elasticsearch/tribe/TribeService.java
+++ b/modules/tribe/src/main/java/org/elasticsearch/tribe/TribeService.java
@@ -22,13 +22,13 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
+import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
+import org.elasticsearch.cluster.MergableCustomMetaData;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -41,7 +41,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.hash.MurmurHash3;
@@ -59,8 +58,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.set.Sets;
-import org.elasticsearch.discovery.DiscoveryModule;
-import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.node.Node;
@@ -68,7 +65,6 @@
import org.elasticsearch.transport.TcpTransport;
import java.io.IOException;
-import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -78,7 +74,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -88,10 +83,6 @@
* The tribe service holds a list of node clients connected to a list of tribe members, and uses their
* cluster state events to update this local node cluster state with the merged view of it.
*
- * The {@link #processSettings(org.elasticsearch.common.settings.Settings)} method should be called before
- * starting the node, so it will make sure to configure this current node properly with the relevant tribe node
- * settings.
- *
* The tribe node settings make sure the discovery used is "local", but with no master elected. This means no
* write level master node operations will work ({@link org.elasticsearch.discovery.MasterNotDiscoveredException}
will be thrown), and state level metadata operations will automatically use the local flag.
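A tribe node is driven entirely by tribe.<name>.* setting groups, one group per remote cluster; a minimal sketch of such a configuration (the group names "t1"/"t2" and the cluster names below are hypothetical, not values from this patch):

    import org.elasticsearch.common.settings.Settings;

    // Sketch only: each tribe.<name> group points the embedded client at one remote cluster.
    Settings tribeNodeSettings = Settings.builder()
        .put("tribe.t1.cluster.name", "cluster_one")
        .put("tribe.t2.cluster.name", "cluster_two")
        .put("tribe.blocks.write", true) // optionally reject writes on the tribe node
        .build();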
@@ -109,63 +100,6 @@ public class TribeService extends AbstractLifecycleComponent {
public static final ClusterBlock TRIBE_WRITE_BLOCK = new ClusterBlock(11, "tribe node, write not allowed", false, false,
false, RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.WRITE));
- public static Settings processSettings(Settings settings) {
- if (TRIBE_NAME_SETTING.exists(settings)) {
- // if its a node client started by this service as tribe, remove any tribe group setting
- // to avoid recursive configuration
- Settings.Builder sb = Settings.builder().put(settings);
- for (String s : settings.getAsMap().keySet()) {
- if (s.startsWith("tribe.") && !s.equals(TRIBE_NAME_SETTING.getKey())) {
- sb.remove(s);
- }
- }
- return sb.build();
- }
- Map<String, Settings> nodesSettings = settings.getGroups("tribe", true);
- if (nodesSettings.isEmpty()) {
- return settings;
- }
- // its a tribe configured node..., force settings
- Settings.Builder sb = Settings.builder().put(settings);
- sb.put(Node.NODE_MASTER_SETTING.getKey(), false);
- sb.put(Node.NODE_DATA_SETTING.getKey(), false);
- sb.put(Node.NODE_INGEST_SETTING.getKey(), false);
- if (!NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.exists(settings)) {
- sb.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), nodesSettings.size());
- }
- sb.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "tribe"); // there is a special discovery implementation for tribe
- // nothing is going to be discovered, since no master will be elected
- sb.put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0);
- if (sb.get("cluster.name") == null) {
- sb.put("cluster.name", "tribe_" + UUIDs.randomBase64UUID()); // make sure it won't join other tribe nodes in the same JVM
- }
- sb.put(TransportMasterNodeReadAction.FORCE_LOCAL_SETTING.getKey(), true);
- return sb.build();
- }
-
- /**
- * Interface to allow merging {@link org.elasticsearch.cluster.metadata.MetaData.Custom} in tribe node
- * When multiple Mergable Custom metadata of the same type is found (from underlying clusters), the
- * Custom metadata will be merged using {@link #merge(MetaData.Custom)} and the result will be stored
- * in the tribe cluster state
- *
- * @param <T> type of custom meta data
- */
- public interface MergableCustomMetaData<T extends MetaData.Custom> {
-
- /**
- * Merges this custom metadata with other, returning either this or other custom metadata
- * for tribe cluster state. This method should not mutate either this or the
- * other custom metadata.
- *
- * @param other custom meta data
- * @return the same instance or other custom metadata based on implementation
- * if both the instances are considered equal, implementations should return this
- * instance to avoid redundant cluster state changes.
- */
- T merge(T other);
- }
-
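The removed interface reappears as org.elasticsearch.cluster.MergableCustomMetaData (see the import added above). Its contract, per the javadoc being removed here, is: return the winning instance, and return this when both are considered equal so no redundant cluster state update is published. A standalone sketch of that contract, using a made-up holder that deliberately skips the MetaData.Custom plumbing:

    // Made-up data holder illustrating the merge contract only; it does not
    // implement the real MetaData.Custom interface.
    final class ExampleMergableData {
        final String data;

        ExampleMergableData(String data) {
            this.data = data;
        }

        // Return the instance that should end up in the tribe cluster state;
        // on a tie, return "this" to avoid a redundant cluster state update.
        ExampleMergableData merge(ExampleMergableData other) {
            return data.compareTo(other.data) >= 0 ? this : other;
        }
    }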
// internal settings only
public static final Setting<String> TRIBE_NAME_SETTING = Setting.simpleString("tribe.name", Property.NodeScope);
private final ClusterService clusterService;
@@ -200,7 +134,8 @@ public interface MergableCustomMetaData {
Setting.listSetting("tribe.blocks.metadata.indices", Collections.emptyList(), Function.identity(), Property.NodeScope);
public static final Set<String> TRIBE_SETTING_KEYS = Sets.newHashSet(TRIBE_NAME_SETTING.getKey(), ON_CONFLICT_SETTING.getKey(),
- BLOCKS_METADATA_INDICES_SETTING.getKey(), BLOCKS_METADATA_SETTING.getKey(), BLOCKS_READ_INDICES_SETTING.getKey(), BLOCKS_WRITE_INDICES_SETTING.getKey(), BLOCKS_WRITE_SETTING.getKey());
+ BLOCKS_METADATA_INDICES_SETTING.getKey(), BLOCKS_METADATA_SETTING.getKey(), BLOCKS_READ_INDICES_SETTING.getKey(),
+ BLOCKS_WRITE_INDICES_SETTING.getKey(), BLOCKS_WRITE_SETTING.getKey());
// these settings should be passed through to each tribe client, if they are not set explicitly
private static final List<Setting<?>> PASS_THROUGH_SETTINGS = Arrays.asList(
@@ -218,8 +153,8 @@ public interface MergableCustomMetaData {
private final NamedWriteableRegistry namedWriteableRegistry;
- public TribeService(Settings settings, Path configPath, ClusterService clusterService, final String tribeNodeId,
- NamedWriteableRegistry namedWriteableRegistry, BiFunction<Settings, Path, Node> clientNodeBuilder) {
+ public TribeService(Settings settings, NodeEnvironment nodeEnvironment, ClusterService clusterService,
+ NamedWriteableRegistry namedWriteableRegistry, Function<Settings, Node> clientNodeBuilder) {
super(settings);
this.clusterService = clusterService;
this.namedWriteableRegistry = namedWriteableRegistry;
@@ -227,8 +162,21 @@ public TribeService(Settings settings, Path configPath, ClusterService clusterSe
nodesSettings.remove("blocks"); // remove prefix settings that don't indicate a client
nodesSettings.remove("on_conflict"); // remove prefix settings that don't indicate a client
for (Map.Entry<String, Settings> entry : nodesSettings.entrySet()) {
- Settings clientSettings = buildClientSettings(entry.getKey(), tribeNodeId, settings, entry.getValue());
- nodes.add(clientNodeBuilder.apply(clientSettings, configPath));
+ Settings clientSettings = buildClientSettings(entry.getKey(), nodeEnvironment.nodeId(), settings, entry.getValue());
+ try {
+ nodes.add(clientNodeBuilder.apply(clientSettings));
+ } catch (Exception e) {
+ // calling close is safe for non started nodes, we can just iterate over all
+ for (Node otherNode : nodes) {
+ try {
+ otherNode.close();
+ } catch (Exception inner) {
+ inner.addSuppressed(e);
+ logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to close node {} on failed start", otherNode), inner);
+ }
+ }
+ throw ExceptionsHelper.convertToRuntime(e);
+ }
}
this.blockIndicesMetadata = BLOCKS_METADATA_INDICES_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY);
@@ -279,7 +227,6 @@ static Settings buildClientSettings(String tribeName, String parentNodeId, Setti
return sb.build();
}
-
@Override
protected void doStart() {
@@ -300,10 +247,7 @@ public void startNodes() {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to close node {} on failed start", otherNode), inner);
}
}
- if (e instanceof RuntimeException) {
- throw (RuntimeException) e;
- }
- throw new ElasticsearchException(e);
+ throw ExceptionsHelper.convertToRuntime(e);
}
}
}
diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/modules/tribe/src/test/java/org/elasticsearch/tribe/TribeIntegrationTests.java
similarity index 93%
rename from core/src/test/java/org/elasticsearch/tribe/TribeIT.java
rename to modules/tribe/src/test/java/org/elasticsearch/tribe/TribeIntegrationTests.java
index 0307e69cfbeba..fb77bcd33d0fe 100644
--- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java
+++ b/modules/tribe/src/test/java/org/elasticsearch/tribe/TribeIntegrationTests.java
@@ -23,8 +23,8 @@
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
@@ -41,12 +41,15 @@
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.TestCustomMetaData;
+import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.tribe.TribeServiceTests.MergableCustomMetaData1;
import org.elasticsearch.tribe.TribeServiceTests.MergableCustomMetaData2;
@@ -55,6 +58,7 @@
import org.junit.Before;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -84,7 +88,7 @@
* does it by default.
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
-public class TribeIT extends ESIntegTestCase {
+public class TribeIntegrationTests extends ESIntegTestCase {
private static final String TRIBE_NODE = "tribe_node";
@@ -145,14 +149,49 @@ public List getNamedWriteables() {
}
}
+ public static class MockTribePlugin extends TribePlugin {
+
+ public MockTribePlugin(Settings settings) {
+ super(settings);
+ }
+
+ protected Function<Settings, Node> nodeBuilder(Path configPath) {
+ return settings -> new MockNode(new Environment(settings, configPath), internalCluster().getPlugins());
+ }
+
+ }
+
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>();
plugins.addAll(getMockPlugins());
+ plugins.add(MockTribePlugin.class);
+ plugins.add(TribeAwareTestZenDiscoveryPlugin.class);
plugins.add(TestCustomMetaDataPlugin.class);
return plugins;
}
+ @Override
+ protected boolean addTestZenDiscovery() {
+ return false;
+ }
+
+ public static class TribeAwareTestZenDiscoveryPlugin extends TestZenDiscovery.TestPlugin {
+
+ public TribeAwareTestZenDiscoveryPlugin(Settings settings) {
+ super(settings);
+ }
+
+ @Override
+ public Settings additionalSettings() {
+ if (settings.getGroups("tribe", true).isEmpty()) {
+ return super.additionalSettings();
+ } else {
+ return Settings.EMPTY;
+ }
+ }
+ }
+
@Before
public void startRemoteClusters() {
final int minNumDataNodes = 2;
@@ -249,9 +288,12 @@ private Settings.Builder createTribeSettings(Predicate filt
final Settings.Builder settings = Settings.builder();
settings.put(Node.NODE_NAME_SETTING.getKey(), TRIBE_NODE);
settings.put(Node.NODE_DATA_SETTING.getKey(), false);
- settings.put(Node.NODE_MASTER_SETTING.getKey(), true);
+ settings.put(Node.NODE_MASTER_SETTING.getKey(), false);
+ settings.put(Node.NODE_INGEST_SETTING.getKey(), false);
settings.put(NetworkModule.HTTP_ENABLED.getKey(), false);
settings.put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME);
+ // add dummy tribe setting so that the node is always identifiable as a tribe node in this test even if the set of connecting clusters is empty
+ settings.put(TribeService.BLOCKS_WRITE_SETTING.getKey(), TribeService.BLOCKS_WRITE_SETTING.getDefault(Settings.EMPTY));
doWithAllClusters(filter, c -> {
String tribeSetting = "tribe." + c.getClusterName() + ".";
@@ -263,6 +305,15 @@ private Settings.Builder createTribeSettings(Predicate filt
return settings;
}
+ public void testTribeNodeWithBadSettings() throws Exception {
+ Settings brokenSettings = Settings.builder()
+ .put("tribe.some.setting.that.does.not.exist", true)
+ .build();
+
+ IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> startTribeNode(ALL, brokenSettings));
+ assertThat(e.getMessage(), containsString("unknown setting [setting.that.does.not.exist]"));
+ }
+
public void testGlobalReadWriteBlocks() throws Exception {
Settings additionalSettings = Settings.builder()
.put("tribe.blocks.write", true)
diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java b/modules/tribe/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java
similarity index 91%
rename from core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java
rename to modules/tribe/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java
index ac9e3156e1c2d..1d63ae1e0b723 100644
--- a/core/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java
+++ b/modules/tribe/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java
@@ -20,16 +20,17 @@
package org.elasticsearch.tribe;
import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.MergableCustomMetaData;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.network.NetworkModule;
-import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node;
+import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TestCustomMetaData;
import org.elasticsearch.transport.MockTcpTransportPlugin;
@@ -43,6 +44,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
import static org.hamcrest.Matchers.instanceOf;
@@ -137,7 +139,7 @@ public void testMergeCustomMetaData() {
}
public void testMergeMultipleCustomMetaData() {
- Map<String, List<TribeService.MergableCustomMetaData>> inputMap = new HashMap<>();
+ Map<String, List<MergableCustomMetaData>> inputMap = new HashMap<>();
inputMap.put(MergableCustomMetaData1.TYPE,
Arrays.asList(new MergableCustomMetaData1("data10"), new MergableCustomMetaData1("data11")));
inputMap.put(MergableCustomMetaData2.TYPE,
@@ -155,15 +157,15 @@ public void testMergeMultipleCustomMetaData() {
}
public void testMergeCustomMetaDataFromMany() {
- Map<String, List<TribeService.MergableCustomMetaData>> inputMap = new HashMap<>();
+ Map<String, List<MergableCustomMetaData>> inputMap = new HashMap<>();
int n = randomIntBetween(3, 5);
- List<TribeService.MergableCustomMetaData> customList1 = new ArrayList<>();
+ List<MergableCustomMetaData> customList1 = new ArrayList<>();
for (int i = 0; i <= n; i++) {
customList1.add(new MergableCustomMetaData1("data1"+String.valueOf(i)));
}
Collections.shuffle(customList1, random());
inputMap.put(MergableCustomMetaData1.TYPE, customList1);
- List<TribeService.MergableCustomMetaData> customList2 = new ArrayList<>();
+ List<MergableCustomMetaData> customList2 = new ArrayList<>();
for (int i = 0; i <= n; i++) {
customList2.add(new MergableCustomMetaData2("data2"+String.valueOf(i)));
}
@@ -182,6 +184,20 @@ public void testMergeCustomMetaDataFromMany() {
assertEquals(mergedCustom.getData(), "data2"+String.valueOf(n));
}
+ public static class MockTribePlugin extends TribePlugin {
+
+ static List<Class<? extends Plugin>> classpathPlugins = Arrays.asList(MockTribePlugin.class, MockTcpTransportPlugin.class);
+
+ public MockTribePlugin(Settings settings) {
+ super(settings);
+ }
+
+ protected Function<Settings, Node> nodeBuilder(Path configPath) {
+ return settings -> new MockNode(new Environment(settings, configPath), classpathPlugins);
+ }
+
+ }
+
public void testTribeNodeDeprecation() throws IOException {
final Path tempDir = createTempDir();
Settings.Builder settings = Settings.builder()
@@ -197,7 +213,7 @@ public void testTribeNodeDeprecation() throws IOException {
settings.put(tribeSetting + ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName)
.put(tribeSetting + NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), "mock-socket-network");
}
- try (Node node = new MockNode(settings.build(),Collections.singleton(MockTcpTransportPlugin.class) )) {
+ try (Node node = new MockNode(settings.build(), MockTribePlugin.classpathPlugins)) {
if (tribeServiceEnable) {
assertWarnings("tribe nodes are deprecated in favor of cross-cluster search and will be removed in Elasticsearch 7.0.0");
}
@@ -205,7 +221,7 @@ public void testTribeNodeDeprecation() throws IOException {
}
static class MergableCustomMetaData1 extends TestCustomMetaData
- implements TribeService.MergableCustomMetaData<MergableCustomMetaData1> {
+ implements MergableCustomMetaData<MergableCustomMetaData1> {
public static final String TYPE = "custom_md_1";
protected MergableCustomMetaData1(String data) {
@@ -237,7 +253,7 @@ public MergableCustomMetaData1 merge(MergableCustomMetaData1 other) {
}
static class MergableCustomMetaData2 extends TestCustomMetaData
- implements TribeService.MergableCustomMetaData<MergableCustomMetaData2> {
+ implements MergableCustomMetaData<MergableCustomMetaData2> {
public static final String TYPE = "custom_md_2";
protected MergableCustomMetaData2(String data) {
diff --git a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java
index d809fd3fa885e..0cd8176df83f5 100644
--- a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java
+++ b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java
@@ -22,6 +22,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkService;
@@ -32,6 +33,7 @@
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.env.Environment;
+import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
@@ -66,13 +68,10 @@ public FileBasedDiscoveryPlugin(Settings settings) {
}
@Override
- public Collection