Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add validation for supported index version on node join, restore, upgrade & open index #21830

Merged
merged 12 commits into from
Dec 1, 2016
Merged
15 changes: 15 additions & 0 deletions core/src/main/java/org/elasticsearch/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,21 @@ public Version minimumCompatibilityVersion() {
return Version.smallest(this, fromId(bwcMajor * 1000000 + bwcMinor * 10000 + 99));
}

/**
 * Computes the oldest index-creation version that this version can still work with. An index
 * created with a version lower than the returned one can't be used with this version.
 *
 * @return the lowest supported index-creation version; never greater than this version itself
 */
public Version minimumIndexCompatibilityVersion() {
    // the version numbering jumped from 2 to 5, so 5.x looks back to 2.x;
    // every other major looks back exactly one major
    final int previousMajor = (major == 5) ? 2 : major - 1;
    // the minor component is always 0 here, so it contributes nothing to the id
    final Version oldestSupported = fromId(previousMajor * 1000000 + 99);
    return Version.smallest(this, oldestSupported);
}

/**
* Returns <code>true</code> iff both versions are compatible. Otherwise <code>false</code>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@

package org.elasticsearch.discovery.zen;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
Expand All @@ -37,6 +39,7 @@

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class MembershipAction extends AbstractComponent {

Expand All @@ -58,21 +61,20 @@ public interface MembershipListener {

private final TransportService transportService;

private final DiscoveryNodesProvider nodesProvider;

private final MembershipListener listener;

public MembershipAction(Settings settings, TransportService transportService,
DiscoveryNodesProvider nodesProvider, MembershipListener listener) {
Supplier<DiscoveryNode> localNodeSupplier, MembershipListener listener) {
super(settings);
this.transportService = transportService;
this.nodesProvider = nodesProvider;
this.listener = listener;


transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new,
ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME, ValidateJoinRequest::new,
ThreadPool.Names.GENERIC, new ValidateJoinRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
() -> new ValidateJoinRequest(localNodeSupplier), ThreadPool.Names.GENERIC,
new ValidateJoinRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
}
Expand Down Expand Up @@ -152,20 +154,23 @@ public void onFailure(Exception e) {
}
}

class ValidateJoinRequest extends TransportRequest {
static class ValidateJoinRequest extends TransportRequest {
private final Supplier<DiscoveryNode> localNode;
private ClusterState state;

ValidateJoinRequest() {
ValidateJoinRequest(Supplier<DiscoveryNode> localNode) {
this.localNode = localNode;
}

ValidateJoinRequest(ClusterState state) {
this.state = state;
this.localNode = state.nodes()::getLocalNode;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.state = ClusterState.Builder.readFrom(in, nodesProvider.nodes().getLocalNode());
this.state = ClusterState.Builder.readFrom(in, localNode.get());
}

@Override
Expand All @@ -175,15 +180,31 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {
static class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {

    /**
     * Checks the cluster state carried by the request against the index versions supported by
     * {@link Version#CURRENT} and, if the check passes, replies with an empty response.
     */
    @Override
    public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
        final Version oldestSupportedIndexVersion = Version.CURRENT.minimumIndexCompatibilityVersion();
        final MetaData receivedMetaData = request.state.getMetaData();
        // throws if the received state contains an index created before the supported version
        ensureIndexCompatibility(oldestSupportedIndexVersion, receivedMetaData);
        // apart from the check above, having been able to deserialize the cluster state is,
        // for now, the only validation performed
        channel.sendResponse(TransportResponse.Empty.INSTANCE);
    }
}

/**
 * Verifies that every index in the given metadata was created with a version that is not older
 * than the supplied minimum supported index version.
 *
 * @param supportedIndexVersion the oldest index-creation version that is acceptable
 * @param metaData              the metadata whose indices are checked
 * @throws IllegalStateException if any index is incompatible with the given version
 */
static void ensureIndexCompatibility(final Version supportedIndexVersion, MetaData metaData) {
    // Every index of the cluster being joined must be compatible with us, whether it is closed
    // or not: the mappings of an incompatible index can't be read, so the join has to be rejected.
    for (final IndexMetaData indexMetaData : metaData) {
        final Version createdVersion = indexMetaData.getCreationVersion();
        if (!createdVersion.before(supportedIndexVersion)) {
            continue;
        }
        throw new IllegalStateException("index " + indexMetaData.getIndex() + " version not supported: "
            + createdVersion + " minimum compatible index version is: " + supportedIndexVersion);
    }
}

public static class LeaveRequest extends TransportRequest {

private DiscoveryNode node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,6 @@ class JoinTaskExecutor implements ClusterStateTaskExecutor<DiscoveryNode> {
@Override
public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<DiscoveryNode> joiningNodes) throws Exception {
final BatchResult.Builder<DiscoveryNode> results = BatchResult.builder();

final DiscoveryNodes currentNodes = currentState.nodes();
boolean nodesChanged = false;
ClusterState.Builder newState;
Expand All @@ -435,8 +434,10 @@ public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<Discov

assert nodesBuilder.isLocalNodeElectedMaster();

Version minNodeVersion = Version.CURRENT;
// processing any joins
for (final DiscoveryNode node : joiningNodes) {
minNodeVersion = Version.smallest(minNodeVersion, node.getVersion());
if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) {
// noop
} else if (currentNodes.nodeExists(node)) {
Expand All @@ -452,7 +453,9 @@ public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<Discov
}
results.success(node);
}

// we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
// we have to reject nodes that don't support all indices we have in this cluster
MembershipAction.ensureIndexCompatibility(minNodeVersion.minimumIndexCompatibilityVersion(), currentState.getMetaData());
if (nodesChanged) {
newState.nodes(nodesBuilder);
return results.build(allocationService.reroute(newState.build(), "node_join"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t
new NewPendingClusterStateListener(),
discoverySettings,
clusterService.getClusterName());
this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());
this.membership = new MembershipAction(settings, transportService, this::localNode, new MembershipListener());
this.joinThreadControl = new JoinThreadControl(threadPool);

transportService.registerRequestHandler(
Expand Down Expand Up @@ -301,7 +301,6 @@ public void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackList
if (!clusterChangedEvent.state().getNodes().isLocalNodeElectedMaster()) {
throw new IllegalStateException("Shouldn't publish state when not master");
}

try {
publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
} catch (FailedToCommitClusterStateException t) {
Expand Down Expand Up @@ -852,6 +851,9 @@ void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final
if (nodeJoinController == null) {
throw new IllegalStateException("discovery module is not yet started");
} else {
// we do this in a couple of places including the cluster update thread. This one here is really just best effort
// to ensure we fail as fast as possible.
MembershipAction.ensureIndexCompatibility(node.getVersion().minimumIndexCompatibilityVersion(), state.getMetaData());
// try and connect to the node, if it fails, we can raise an exception back to the client...
transportService.connectToNode(node);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -126,10 +128,18 @@ public ClusterState execute(ClusterState currentState) {
MetaData.Builder metaData = MetaData.builder(currentState.metaData());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());

Version minIndexCompatibilityVersion = currentState.getNodes().getSmallestNonClientNodeVersion()
.minimumIndexCompatibilityVersion();
boolean importNeeded = false;
StringBuilder sb = new StringBuilder();
for (IndexMetaData indexMetaData : request.indices) {
if (indexMetaData.getCreationVersion().before(minIndexCompatibilityVersion)) {
logger.warn("ignoring dangled index [{}] on node [{}]" +
" since it's created version [{}] is not supported by at least one node in the cluster minVersion [{}]",
indexMetaData.getIndex(), request.fromNode, indexMetaData.getCreationVersion(),
minIndexCompatibilityVersion);
continue;
}
if (currentState.metaData().hasIndex(indexMetaData.getIndex().getName())) {
continue;
}
Expand Down
6 changes: 6 additions & 0 deletions core/src/test/java/org/elasticsearch/VersionTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,13 @@ public void testVersionComparison() throws Exception {
assertTrue(Version.fromString("5.0.0").onOrAfter(Version.fromString("5.0.0-beta2")));
assertTrue(Version.fromString("5.0.0-rc1").onOrAfter(Version.fromString("5.0.0-beta24")));
assertTrue(Version.fromString("5.0.0-alpha24").before(Version.fromString("5.0.0-beta0")));
}

/**
 * Pins the minimum supported index-creation version for representative 5.x and 6.x versions.
 */
public void testMinimumIndexCompatibilityVersion() {
    // 6.x looks back one major, to 5.0.0
    final Version oldestFor6x = Version.V_6_0_0_alpha1_UNRELEASED.minimumIndexCompatibilityVersion();
    assertEquals(Version.V_5_0_0, oldestFor6x);

    // every 5.x version, pre-release or not, looks back to 2.0.0
    final Version expectedFor5x = Version.V_2_0_0;
    assertEquals(expectedFor5x, Version.V_5_0_0.minimumIndexCompatibilityVersion());
    assertEquals(expectedFor5x, Version.V_5_1_0_UNRELEASED.minimumIndexCompatibilityVersion());
    assertEquals(expectedFor5x, Version.V_5_0_0_alpha1.minimumIndexCompatibilityVersion());
}

public void testVersionConstantPresent() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
package org.elasticsearch.discovery.zen;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.apache.lucene.util.IOUtils;
Expand All @@ -35,22 +38,40 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.TransportService;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.elasticsearch.cluster.routing.RoutingTableTests.updateActiveAllocations;
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
Expand Down Expand Up @@ -283,4 +304,82 @@ private Set<DiscoveryNode> fdNodesForState(ClusterState clusterState, DiscoveryN
});
return discoveryNodes;
}

/**
 * Verifies that the join-validation handler rejects a cluster state containing an index created
 * before {@code Version.CURRENT.minimumIndexCompatibilityVersion()} and sends a response when
 * every index is compatible.
 */
public void testValidateOnUnsupportedIndexVersionCreated() throws Exception {
    final int iters = randomIntBetween(3, 10);
    for (int i = 0; i < iters; i++) {
        ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.DEFAULT);
        final DiscoveryNode otherNode = new DiscoveryNode("other_node", buildNewFakeTransportAddress(), emptyMap(),
            EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT);
        MembershipAction.ValidateJoinRequestRequestHandler handler = new MembershipAction.ValidateJoinRequestRequestHandler();
        final boolean incompatible = randomBoolean();
        // The handler validates against the minimum *index* compatibility version, so the created
        // version and the expected message must both be derived from that same value.
        final Version minSupportedIndexVersion = Version.CURRENT.minimumIndexCompatibilityVersion();
        final Version indexCreatedVersion = incompatible
            ? VersionUtils.getPreviousVersion(minSupportedIndexVersion)
            : VersionUtils.randomVersionBetween(random(), minSupportedIndexVersion, Version.CURRENT);
        IndexMetaData indexMetaData = IndexMetaData.builder("test").settings(Settings.builder()
            .put(SETTING_VERSION_CREATED, indexCreatedVersion)
            .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)
            .put(SETTING_CREATION_DATE, System.currentTimeMillis()))
            .state(IndexMetaData.State.OPEN)
            .build();
        IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetaData.getIndex());
        final ShardId shardId = new ShardId("test", "_na_", 0);
        IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);

        // a single initializing primary assigned to the other node
        indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting("test", 0, otherNode.getId(), null, true,
            ShardRoutingState.INITIALIZING, new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, "getting there")));
        indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build());
        IndexRoutingTable indexRoutingTable = indexRoutingTableBuilder.build();
        IndexMetaData updatedIndexMetaData = updateActiveAllocations(indexRoutingTable, indexMetaData);
        stateBuilder.metaData(MetaData.builder().put(updatedIndexMetaData, false).generateClusterUuidIfNeeded())
            .routingTable(RoutingTable.builder().add(indexRoutingTable).build());
        if (incompatible) {
            // the channel is never touched on this path: validation throws before a response is sent
            IllegalStateException ex = expectThrows(IllegalStateException.class, () ->
                handler.messageReceived(new MembershipAction.ValidateJoinRequest(stateBuilder.build()), null));
            assertEquals("index [test] version not supported: "
                + indexCreatedVersion
                + " minimum compatible index version is: " + minSupportedIndexVersion, ex.getMessage());
        } else {
            AtomicBoolean sendResponse = new AtomicBoolean(false);
            handler.messageReceived(new MembershipAction.ValidateJoinRequest(stateBuilder.build()), new TransportChannel() {
                @Override
                public String action() {
                    return null;
                }

                @Override
                public String getProfileName() {
                    return null;
                }

                @Override
                public long getRequestId() {
                    return 0;
                }

                @Override
                public String getChannelType() {
                    return null;
                }

                @Override
                public void sendResponse(TransportResponse response) throws IOException {
                    // the only callback the test observes
                    sendResponse.set(true);
                }

                @Override
                public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {

                }

                @Override
                public void sendResponse(Exception exception) throws IOException {

                }
            });
            assertTrue(sendResponse.get());
        }
    }
}
}