Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add validation for supported index version on node join, restore, upgrade & open index #21830

Merged
merged 12 commits into from
Dec 1, 2016
Merged
15 changes: 15 additions & 0 deletions core/src/main/java/org/elasticsearch/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,21 @@ public Version minimumCompatibilityVersion() {
return Version.smallest(this, fromId(bwcMajor * 1000000 + bwcMinor * 10000 + 99));
}

/**
* Returns the minimum created index version that this version supports. Indices created with lower versions
* can't be used with this version.
*/
public Version minimumIndexCompatibilityVersion() {
final int bwcMajor;
if (major == 5) {
bwcMajor = 2; // we jumped from 2 to 5
} else {
bwcMajor = major - 1;
}
final int bwcMinor = 0;
return Version.smallest(this, fromId(bwcMajor * 1000000 + bwcMinor * 10000 + 99));
}

/**
* Returns <code>true</code> iff both version are compatible. Otherwise <code>false</code>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@

package org.elasticsearch.discovery.zen;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
Expand All @@ -37,6 +39,7 @@

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class MembershipAction extends AbstractComponent {

Expand All @@ -58,21 +61,21 @@ public interface MembershipListener {

private final TransportService transportService;

private final DiscoveryNodesProvider nodesProvider;

private final MembershipListener listener;

public MembershipAction(Settings settings, TransportService transportService,
DiscoveryNodesProvider nodesProvider, MembershipListener listener) {
Supplier<DiscoveryNode> localNodeSupplier, MembershipListener listener) {
super(settings);
this.transportService = transportService;
this.nodesProvider = nodesProvider;
this.listener = listener;


transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new,
ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME, ValidateJoinRequest::new,
ThreadPool.Names.GENERIC, new ValidateJoinRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
() -> new ValidateJoinRequest(localNodeSupplier), ThreadPool.Names.GENERIC,
new ValidateJoinRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
}
Expand Down Expand Up @@ -152,20 +155,23 @@ public void onFailure(Exception e) {
}
}

class ValidateJoinRequest extends TransportRequest {
static class ValidateJoinRequest extends TransportRequest {
private final Supplier<DiscoveryNode> localNode;
private ClusterState state;

ValidateJoinRequest() {
ValidateJoinRequest(Supplier<DiscoveryNode> localNode) {
this.localNode = localNode;
}

ValidateJoinRequest(ClusterState state) {
this.state = state;
this.localNode = state.nodes()::getLocalNode;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.state = ClusterState.Builder.readFrom(in, nodesProvider.nodes().getLocalNode());
this.state = ClusterState.Builder.readFrom(in, localNode.get());
}

@Override
Expand All @@ -175,13 +181,25 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {
static class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {

@Override
public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
MetaData metaData = request.state.getMetaData();
ensureAllIndicesAreCompatible(metaData);
// for now, the mere fact that we can serialize the cluster state acts as validation....
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

void ensureAllIndicesAreCompatible(MetaData metaData) {
for (IndexMetaData idxMetaData : metaData) {
if(idxMetaData.getState() == IndexMetaData.State.OPEN &&
idxMetaData.getCreationVersion().before(Version.CURRENT.minimumIndexCompatibilityVersion())) {
throw new IllegalStateException("index " + idxMetaData.getIndex() + " version not supported: "
+ idxMetaData.getCreationVersion());
}
}
}
}

public static class LeaveRequest extends TransportRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t
new NewPendingClusterStateListener(),
discoverySettings,
clusterService.getClusterName());
this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());
this.membership = new MembershipAction(settings, transportService, this::localNode, new MembershipListener());
this.joinThreadControl = new JoinThreadControl(threadPool);

transportService.registerRequestHandler(
Expand Down
6 changes: 6 additions & 0 deletions core/src/test/java/org/elasticsearch/VersionTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,13 @@ public void testVersionComparison() throws Exception {
assertTrue(Version.fromString("5.0.0").onOrAfter(Version.fromString("5.0.0-beta2")));
assertTrue(Version.fromString("5.0.0-rc1").onOrAfter(Version.fromString("5.0.0-beta24")));
assertTrue(Version.fromString("5.0.0-alpha24").before(Version.fromString("5.0.0-beta0")));
}

public void testMinimumIndexCompatibilityVersion() {
assertEquals(Version.V_5_0_0, Version.V_6_0_0_alpha1_UNRELEASED.minimumIndexCompatibilityVersion());
assertEquals(Version.V_2_0_0, Version.V_5_0_0.minimumIndexCompatibilityVersion());
assertEquals(Version.V_2_0_0, Version.V_5_1_0_UNRELEASED.minimumIndexCompatibilityVersion());
assertEquals(Version.V_2_0_0, Version.V_5_0_0_alpha1.minimumIndexCompatibilityVersion());
}

public void testVersionConstantPresent() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
package org.elasticsearch.discovery.zen;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.apache.lucene.util.IOUtils;
Expand All @@ -35,22 +38,40 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.TransportService;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.elasticsearch.cluster.routing.RoutingTableTests.updateActiveAllocations;
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
Expand Down Expand Up @@ -283,4 +304,77 @@ private Set<DiscoveryNode> fdNodesForState(ClusterState clusterState, DiscoveryN
});
return discoveryNodes;
}

public void testValidateOnUnsupportedIndexVersionCreated() throws Exception {
ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.DEFAULT);
final DiscoveryNode otherNode = new DiscoveryNode("other_node", buildNewFakeTransportAddress(), emptyMap(),
EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT);
MembershipAction.ValidateJoinRequestRequestHandler request = new MembershipAction.ValidateJoinRequestRequestHandler();
final boolean closed = randomBoolean();
IndexMetaData indexMetaData = IndexMetaData.builder("test").settings(Settings.builder()
.put(SETTING_VERSION_CREATED, VersionUtils.getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion()))
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(SETTING_CREATION_DATE, System.currentTimeMillis()))
.state(closed ? IndexMetaData.State.CLOSE : IndexMetaData.State.OPEN)
.build();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetaData.getIndex());
RoutingTable.Builder routing = new RoutingTable.Builder();
routing.addAsNew(indexMetaData);
final ShardId shardId = new ShardId("test", "_na_", 0);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);

final DiscoveryNode primaryNode = otherNode;
indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting("test", 0, primaryNode.getId(), null, true,
ShardRoutingState.INITIALIZING, new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, "getting there")));
indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build());
IndexRoutingTable indexRoutingTable = indexRoutingTableBuilder.build();
IndexMetaData updatedIndexMetaData = updateActiveAllocations(indexRoutingTable, indexMetaData);
stateBuilder.metaData(MetaData.builder().put(updatedIndexMetaData, false).generateClusterUuidIfNeeded())
.routingTable(RoutingTable.builder().add(indexRoutingTable).build());
if (closed == false) {
IllegalStateException ex = expectThrows(IllegalStateException.class, () ->
request.messageReceived(new MembershipAction.ValidateJoinRequest(stateBuilder.build()), null));
assertEquals("index [test] version not supported: "
+ VersionUtils.getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion()), ex.getMessage());
} else {
AtomicBoolean sendResponse = new AtomicBoolean(false);
request.messageReceived(new MembershipAction.ValidateJoinRequest(stateBuilder.build()), new TransportChannel() {
@Override
public String action() {
return null;
}

@Override
public String getProfileName() {
return null;
}

@Override
public long getRequestId() {
return 0;
}

@Override
public String getChannelType() {
return null;
}

@Override
public void sendResponse(TransportResponse response) throws IOException {
sendResponse.set(true);
}

@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {

}

@Override
public void sendResponse(Exception exception) throws IOException {

}
});
assertTrue(sendResponse.get());
}
}
}