Skip to content

Commit

Permalink
Persistent Node Ids (elastic#19140)
Browse files Browse the repository at this point in the history
Node IDs are currently randomly generated during node startup. That means they change every time the node is restarted. While this doesn't matter for ES proper, it makes it hard for external services to track nodes. Another, more minor, side effect is that indexing the output of, say, the node stats API results in creating new fields due to node ID being used as keys.

The first approach I considered was to use the node's published address as the base for the id. We already [treat nodes with the same address as the same](https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java#L387) so this is a simple change (see [here](elastic/elasticsearch@master...bleskes:node_persistent_id_based_on_address)). While this is simple and it works for probably most cases, it is not perfect. For example, if after a node restart, the node is not able to bind to the same port (because it's not yet freed by the OS), it will cause the node to still change identity. Also in environments where the host IP can change due to a host restart, identity will not be the same. 

Due to those limitation, I opted to go with a different approach where the node id will be persisted in the node's data folder. This has the upside of connecting the id to the nodes data. It also means that the host can be adapted in any way (replace network cards, attach storage to a new VM). I

It does however also have downsides - we now run the risk of two nodes having the same id, if someone copies clones a data folder from one node to another. To mitigate this I changed the semantics of the protection against multiple nodes with the same address to be stricter - it will now reject the incoming join if a node exists with the same id but a different address. Note that if the existing node doesn't respond to pings (i.e., it's not alive) it will be removed and the new node will be accepted when it tries another join.

Last, and most importantly, this change requires that *all* nodes persist data to disk. This is a change from current behavior where only data & master nodes store local files. This is the main reason for marking this PR as breaking.

Other less important notes:
- DummyTransportAddress is removed as we need a unique network address per node. Use `LocalTransportAddress.buildUnique()` instead.
- I renamed `node.add_lid_to_custom_path` to `node.add_lock_id_to_custom_path` to avoid confusion with the node ID which is now part of the `NodeEnvironment` logic.
- I removed the `version` paramater from `MetaDataStateFormat#write` , it wasn't really used and was just in the way :)
- TribeNodes are special in the sense that they do start multiple sub-nodes (previously known as client nodes). Those sub-nodes do not store local files but derive their ID from the parent node id, so they are generated consistently.
  • Loading branch information
bleskes authored Jul 4, 2016
1 parent ed444cd commit 6861d35
Show file tree
Hide file tree
Showing 88 changed files with 862 additions and 494 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.gateway.GatewayAllocator;

Expand Down Expand Up @@ -102,7 +102,7 @@ public static AllocationDeciders defaultAllocationDeciders(Settings settings, Cl
}

public static DiscoveryNode newNode(String nodeId, Map<String, String> attributes) {
return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes, Sets.newHashSet(DiscoveryNode.Role.MASTER,
return new DiscoveryNode("", nodeId, LocalTransportAddress.buildUnique(), attributes, Sets.newHashSet(DiscoveryNode.Role.MASTER,
DiscoveryNode.Role.DATA), Version.CURRENT);
}
}
6 changes: 0 additions & 6 deletions buildSrc/src/main/resources/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]metadata[/\\]MetaDataMappingService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]metadata[/\\]MetaDataUpdateSettingsService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]metadata[/\\]RepositoriesMetaData.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]node[/\\]DiscoveryNodes.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]IndexRoutingTable.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]IndexShardRoutingTable.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]routing[/\\]OperationRouting.java" checks="LineLength" />
Expand Down Expand Up @@ -341,12 +340,9 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]DiscoveryService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]DiscoverySettings.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]local[/\\]LocalDiscovery.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]NodeJoinController.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]ZenDiscovery.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]elect[/\\]ElectMasterService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]fd[/\\]FaultDetection.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]fd[/\\]MasterFaultDetection.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]fd[/\\]NodesFaultDetection.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]membership[/\\]MembershipAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]ping[/\\]ZenPing.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]publish[/\\]PendingClusterStatesQueue.java" checks="LineLength" />
Expand All @@ -357,7 +353,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]GatewayMetaState.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]GatewayService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]LocalAllocateDangledIndices.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]MetaDataStateFormat.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]PrimaryShardAllocator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]ReplicaShardAllocator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]TransportNodesListGatewayMetaState.java" checks="LineLength" />
Expand Down Expand Up @@ -847,7 +842,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]BlockingClusterStatePublishResponseHandlerTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]DiscoveryWithServiceDisruptionsIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]ZenUnicastDiscoveryIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]NodeJoinControllerTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]ZenDiscoveryUnitTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]publish[/\\]PublishClusterStateActionTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]document[/\\]DocumentActionsIT.java" checks="LineLength" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,9 @@ public LivenessResponse newInstance() {
// use discovered information but do keep the original transport address,
// so people can control which address is exactly used.
DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode();
newNodes.add(new DiscoveryNode(nodeWithInfo.getName(), nodeWithInfo.getId(), nodeWithInfo.getHostName(),
nodeWithInfo.getHostAddress(), listedNode.getAddress(), nodeWithInfo.getAttributes(),
nodeWithInfo.getRoles(), nodeWithInfo.getVersion()));
newNodes.add(new DiscoveryNode(nodeWithInfo.getName(), nodeWithInfo.getId(), nodeWithInfo.getEphemeralId(),
nodeWithInfo.getHostName(), nodeWithInfo.getHostAddress(), listedNode.getAddress(),
nodeWithInfo.getAttributes(), nodeWithInfo.getRoles(), nodeWithInfo.getVersion()));
} else {
// although we asked for one node, our target may not have completed
// initialization yet and doesn't have cluster nodes
Expand Down
111 changes: 69 additions & 42 deletions core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.elasticsearch.cluster.node;

import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -64,7 +64,15 @@ public static boolean isLocalNode(Settings settings) {
}

public static boolean nodeRequiresLocalStorage(Settings settings) {
return Node.NODE_DATA_SETTING.get(settings) || Node.NODE_MASTER_SETTING.get(settings);
boolean localStorageEnable = Node.NODE_LOCAL_STORAGE_SETTING.get(settings);
if (localStorageEnable == false &&
(Node.NODE_DATA_SETTING.get(settings) ||
Node.NODE_MASTER_SETTING.get(settings))
) {
// TODO: make this a proper setting validation logic, requiring multi-settings validation
throw new IllegalArgumentException("storage can not be disabled for master and data nodes");
}
return localStorageEnable;
}

public static boolean isMasterNode(Settings settings) {
Expand All @@ -81,6 +89,7 @@ public static boolean isIngestNode(Settings settings) {

private final String nodeName;
private final String nodeId;
private final String ephemeralId;
private final String hostName;
private final String hostAddress;
private final TransportAddress address;
Expand All @@ -97,14 +106,15 @@ public static boolean isIngestNode(Settings settings) {
* and updated.
* </p>
*
* @param nodeId the nodes unique id.
* @param address the nodes transport address
* @param attributes node attributes
* @param roles node roles
* @param version the version of the node.
* @param id the nodes unique (persistent) node id. This constructor will auto generate a random ephemeral id.
* @param address the nodes transport address
* @param attributes node attributes
* @param roles node roles
* @param version the version of the node
*/
public DiscoveryNode(String nodeId, TransportAddress address, Map<String, String> attributes, Set<Role> roles, Version version) {
this("", nodeId, address.getHost(), address.getAddress(), address, attributes, roles, version);
public DiscoveryNode(String id, TransportAddress address, Map<String, String> attributes, Set<Role> roles,
Version version) {
this("", id, address, attributes, roles, version);
}

/**
Expand All @@ -116,16 +126,16 @@ public DiscoveryNode(String nodeId, TransportAddress address, Map<String, String
* and updated.
* </p>
*
* @param nodeName the nodes name
* @param nodeId the nodes unique id.
* @param address the nodes transport address
* @param attributes node attributes
* @param roles node roles
* @param version the version of the node.
* @param nodeName the nodes name
* @param nodeId the nodes unique persistent id. An ephemeral id will be auto generated.
* @param address the nodes transport address
* @param attributes node attributes
* @param roles node roles
* @param version the version of the node
*/
public DiscoveryNode(String nodeName, String nodeId, TransportAddress address, Map<String, String> attributes,
Set<Role> roles, Version version) {
this(nodeName, nodeId, address.getHost(), address.getAddress(), address, attributes, roles, version);
public DiscoveryNode(String nodeName, String nodeId, TransportAddress address,
Map<String, String> attributes, Set<Role> roles, Version version) {
this(nodeName, nodeId, UUIDs.randomBase64UUID(), address.getHost(), address.getAddress(), address, attributes, roles, version);
}

/**
Expand All @@ -137,23 +147,24 @@ public DiscoveryNode(String nodeName, String nodeId, TransportAddress address, M
* and updated.
* </p>
*
* @param nodeName the nodes name
* @param nodeId the nodes unique id.
* @param hostName the nodes hostname
* @param hostAddress the nodes host address
* @param address the nodes transport address
* @param attributes node attributes
* @param roles node roles
* @param version the version of the node.
* @param nodeName the nodes name
* @param nodeId the nodes unique persistent id
* @param ephemeralId the nodes unique ephemeral id
* @param hostAddress the nodes host address
* @param address the nodes transport address
* @param attributes node attributes
* @param roles node roles
* @param version the version of the node
*/
public DiscoveryNode(String nodeName, String nodeId, String hostName, String hostAddress, TransportAddress address,
Map<String, String> attributes, Set<Role> roles, Version version) {
public DiscoveryNode(String nodeName, String nodeId, String ephemeralId, String hostName, String hostAddress,
TransportAddress address, Map<String, String> attributes, Set<Role> roles, Version version) {
if (nodeName != null) {
this.nodeName = nodeName.intern();
} else {
this.nodeName = "";
}
this.nodeId = nodeId.intern();
this.ephemeralId = ephemeralId.intern();
this.hostName = hostName.intern();
this.hostAddress = hostAddress.intern();
this.address = address;
Expand Down Expand Up @@ -184,6 +195,7 @@ public DiscoveryNode(String nodeName, String nodeId, String hostName, String hos
public DiscoveryNode(StreamInput in) throws IOException {
this.nodeName = in.readString().intern();
this.nodeId = in.readString().intern();
this.ephemeralId = in.readString().intern();
this.hostName = in.readString().intern();
this.hostAddress = in.readString().intern();
this.address = TransportAddressSerializers.addressFromStream(in);
Expand All @@ -208,6 +220,7 @@ public DiscoveryNode(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(nodeName);
out.writeString(nodeId);
out.writeString(ephemeralId);
out.writeString(hostName);
out.writeString(hostAddress);
addressToStream(out, address);
Expand Down Expand Up @@ -237,6 +250,17 @@ public String getId() {
return nodeId;
}

/**
* The unique ephemeral id of the node. Ephemeral ids are meant to be attached the the life span
* of a node process. When ever a node is restarted, it's ephemeral id is required to change (while it's {@link #getId()}
* will be read from the data folder and will remain the same across restarts). Since all node attributes and addresses
* are maintained during the life span of a node process, we can (and are) using the ephemeralId in
* {@link DiscoveryNode#equals(Object)}.
*/
public String getEphemeralId() {
return ephemeralId;
}

/**
* The name of the node.
*/
Expand Down Expand Up @@ -293,18 +317,25 @@ public String getHostAddress() {
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof DiscoveryNode)) {
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

DiscoveryNode other = (DiscoveryNode) obj;
return this.nodeId.equals(other.nodeId);
DiscoveryNode that = (DiscoveryNode) o;

return ephemeralId.equals(that.ephemeralId);
}

@Override
public int hashCode() {
return nodeId.hashCode();
// we only need to hash the id because it's highly unlikely that two nodes
// in our system will have the same id but be different
// This is done so that this class can be used efficiently as a key in a map
return ephemeralId.hashCode();
}

@Override
Expand All @@ -313,15 +344,10 @@ public String toString() {
if (nodeName.length() > 0) {
sb.append('{').append(nodeName).append('}');
}
if (nodeId != null) {
sb.append('{').append(nodeId).append('}');
}
if (Strings.hasLength(hostName)) {
sb.append('{').append(hostName).append('}');
}
if (address != null) {
sb.append('{').append(address).append('}');
}
sb.append('{').append(nodeId).append('}');
sb.append('{').append(ephemeralId).append('}');
sb.append('{').append(hostName).append('}');
sb.append('{').append(address).append('}');
if (!attributes.isEmpty()) {
sb.append(attributes);
}
Expand All @@ -332,6 +358,7 @@ public String toString() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(getId());
builder.field("name", getName());
builder.field("ephemeral_id", getEphemeralId());
builder.field("transport_address", getAddress().toString());

builder.startObject("attributes");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,8 @@
package org.elasticsearch.cluster.node;

import org.elasticsearch.Version;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.node.Node;
Expand All @@ -34,36 +30,27 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;

/**
*/
public class DiscoveryNodeService extends AbstractComponent {

public static final Setting<Long> NODE_ID_SEED_SETTING =
// don't use node.id.seed so it won't be seen as an attribute
Setting.longSetting("node_id.seed", 0L, Long.MIN_VALUE, Property.NodeScope);
private final List<CustomAttributesProvider> customAttributesProviders = new CopyOnWriteArrayList<>();

@Inject
public DiscoveryNodeService(Settings settings) {
super(settings);
}

public static String generateNodeId(Settings settings) {
Random random = Randomness.get(settings, NODE_ID_SEED_SETTING);
return UUIDs.randomBase64UUID(random);
}

public DiscoveryNodeService addCustomAttributeProvider(CustomAttributesProvider customAttributesProvider) {
customAttributesProviders.add(customAttributesProvider);
return this;
}

public DiscoveryNode buildLocalNode(TransportAddress publishAddress) {
final String nodeId = generateNodeId(settings);
public DiscoveryNode buildLocalNode(TransportAddress publishAddress, Supplier<String> nodeIdSupplier) {
Map<String, String> attributes = new HashMap<>(Node.NODE_ATTRIBUTES.get(this.settings).getAsMap());
Set<DiscoveryNode.Role> roles = new HashSet<>();
if (Node.NODE_INGEST_SETTING.get(settings)) {
Expand All @@ -90,8 +77,8 @@ public DiscoveryNode buildLocalNode(TransportAddress publishAddress) {
logger.warn("failed to build custom attributes from provider [{}]", e, provider);
}
}
return new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), nodeId, publishAddress, attributes,
roles, Version.CURRENT);
return new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), nodeIdSupplier.get(), publishAddress, attributes, roles,
Version.CURRENT);
}

public interface CustomAttributesProvider {
Expand Down
Loading

0 comments on commit 6861d35

Please sign in to comment.