Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add validation for supported index version on node join, restore, upgrade & open index #21830

Merged
merged 12 commits into from
Dec 1, 2016
Merged
15 changes: 15 additions & 0 deletions core/src/main/java/org/elasticsearch/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,21 @@ public Version minimumCompatibilityVersion() {
return Version.smallest(this, fromId(bwcMajor * 1000000 + bwcMinor * 10000 + 99));
}

/**
* Returns the minimum created index version that this version supports. Indices created with lower versions
* can't be used with this version.
*/
public Version minimumIndexCompatibilityVersion() {
final int bwcMajor;
if (major == 5) {
bwcMajor = 2; // we jumped from 2 to 5
} else {
bwcMajor = major - 1;
}
final int bwcMinor = 0;
return Version.smallest(this, fromId(bwcMajor * 1000000 + bwcMinor * 10000 + 99));
}

/**
* Returns <code>true</code> iff both version are compatible. Otherwise <code>false</code>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
Expand Down Expand Up @@ -160,12 +161,14 @@ public ClusterState execute(ClusterState currentState) {
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder()
.blocks(currentState.blocks());
Version minIndexCompatibilityVersion = currentState.getNodes().getSmallestNonClientNodeVersion()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm... I'm wondering if we should exclude the non-data non-master nodes here. They do all the coordination and might rely on some metadata they can't digest? We're still have to keep the transport client around, but I don't think we should exclude ClientNode (now called coordinating nodes) ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This holds for other places in the code as well...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think coordinating nodes do rely on any metadata they can't digest. They don't parse mappings afaik (I am convinced they should not) which is the main thing here so I thin we are good

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the actual question is why do we use the smallest, we have to use the largest node in the cluster since that is the one relevant for the index version to be opened.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yikes. Of course.

.minimumIndexCompatibilityVersion();
for (IndexMetaData closedMetaData : indicesToOpen) {
final String indexName = closedMetaData.getIndex().getName();
IndexMetaData indexMetaData = IndexMetaData.builder(closedMetaData).state(IndexMetaData.State.OPEN).build();
// The index might be closed because we couldn't import it due to old incompatible version
// We need to check that this index can be upgraded to the current version
indexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData);
indexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData, minIndexCompatibilityVersion);
try {
indicesService.verifyIndexMetadata(indexMetaData, indexMetaData);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ public MetaDataIndexUpgradeService(Settings settings, MapperRegistry mapperRegis
* If the index does not need upgrade it returns the index metadata unchanged, otherwise it returns a modified index metadata. If index
* cannot be updated the method throws an exception.
*/
public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData) {
public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData, Version minimumIndexCompatibilityVersion) {
// Throws an exception if there are too-old segments:
if (isUpgraded(indexMetaData)) {
assert indexMetaData == archiveBrokenIndexSettings(indexMetaData) : "all settings must have been upgraded before";
return indexMetaData;
}
checkSupportedVersion(indexMetaData);
checkSupportedVersion(indexMetaData, minimumIndexCompatibilityVersion);
IndexMetaData newMetaData = indexMetaData;
// we have to run this first otherwise in we try to create IndexSettings
// with broken settings and fail in checkMappingsCompatibility
Expand All @@ -92,21 +92,22 @@ boolean isUpgraded(IndexMetaData indexMetaData) {
}

/**
* Elasticsearch 5.0 no longer supports indices with pre Lucene v5.0 (Elasticsearch v2.0.0.beta1) segments. All indices
* that were created before Elasticsearch v2.0.0.beta1 should be reindexed in Elasticsearch 2.x
* Elasticsearch 6.0 no longer supports indices with pre Lucene v5.0 (Elasticsearch v5.0.0.beta1) segments. All indices
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we lead with ES and make lucene second? it's a bit weird now as ES v5 uses lucene 6.0. How about "ES 6.0 no longer supports indices created by an ES version earlier than 5.0" and leave it at that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seriously?

* that were created before Elasticsearch v5.0.0.beta1 should be reindexed in Elasticsearch 5.x
* before they can be opened by this version of elasticsearch. */
private void checkSupportedVersion(IndexMetaData indexMetaData) {
if (indexMetaData.getState() == IndexMetaData.State.OPEN && isSupportedVersion(indexMetaData) == false) {
throw new IllegalStateException("The index [" + indexMetaData.getIndex() + "] was created before v2.0.0.beta1."
+ " It should be reindexed in Elasticsearch 2.x before upgrading to " + Version.CURRENT + ".");
private void checkSupportedVersion(IndexMetaData indexMetaData, Version minimumIndexCompatibilityVersion) {
if (indexMetaData.getState() == IndexMetaData.State.OPEN && isSupportedVersion(indexMetaData,
minimumIndexCompatibilityVersion) == false) {
throw new IllegalStateException("The index [" + indexMetaData.getIndex() + "] was created before v5.0.0.beta1."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any way we could bake the minimumIndexCompatibilityVersion into the message ? I'm afraid we'll forget in the future and the error message will be wrong.

+ " It should be reindexed in Elasticsearch 5.x before upgrading to " + Version.CURRENT + ".");
}
}

/*
* Returns true if this index can be supported by the current version of elasticsearch
*/
private static boolean isSupportedVersion(IndexMetaData indexMetaData) {
return indexMetaData.getCreationVersion().onOrAfter(Version.V_2_0_0_beta1);
private static boolean isSupportedVersion(IndexMetaData indexMetaData, Version minimumIndexCompatibilityVersion) {
return indexMetaData.getCreationVersion().onOrAfter(minimumIndexCompatibilityVersion);
}

/**
Expand Down Expand Up @@ -173,4 +174,4 @@ IndexMetaData archiveBrokenIndexSettings(IndexMetaData indexMetaData) {
return indexMetaData;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@

package org.elasticsearch.discovery.zen;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
Expand All @@ -37,6 +39,7 @@

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class MembershipAction extends AbstractComponent {

Expand All @@ -58,21 +61,20 @@ public interface MembershipListener {

private final TransportService transportService;

private final DiscoveryNodesProvider nodesProvider;

private final MembershipListener listener;

public MembershipAction(Settings settings, TransportService transportService,
DiscoveryNodesProvider nodesProvider, MembershipListener listener) {
Supplier<DiscoveryNode> localNodeSupplier, MembershipListener listener) {
super(settings);
this.transportService = transportService;
this.nodesProvider = nodesProvider;
this.listener = listener;


transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new,
ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME, ValidateJoinRequest::new,
ThreadPool.Names.GENERIC, new ValidateJoinRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
() -> new ValidateJoinRequest(localNodeSupplier), ThreadPool.Names.GENERIC,
new ValidateJoinRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
}
Expand Down Expand Up @@ -152,20 +154,23 @@ public void onFailure(Exception e) {
}
}

class ValidateJoinRequest extends TransportRequest {
static class ValidateJoinRequest extends TransportRequest {
private final Supplier<DiscoveryNode> localNode;
private ClusterState state;

ValidateJoinRequest() {
ValidateJoinRequest(Supplier<DiscoveryNode> localNode) {
this.localNode = localNode;
}

ValidateJoinRequest(ClusterState state) {
this.state = state;
this.localNode = state.nodes()::getLocalNode;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.state = ClusterState.Builder.readFrom(in, nodesProvider.nodes().getLocalNode());
this.state = ClusterState.Builder.readFrom(in, localNode.get());
}

@Override
Expand All @@ -175,15 +180,31 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {
static class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {

@Override
public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
ensureIndexCompatibility(Version.CURRENT.minimumIndexCompatibilityVersion(), request.state.getMetaData());
// for now, the mere fact that we can serialize the cluster state acts as validation....
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}

/**
* Ensures that all indices are compatible with the supported index version.
* @throws IllegalStateException if any index is incompatible with the given version
*/
static void ensureIndexCompatibility(final Version supportedIndexVersion, MetaData metaData) {
// we ensure that all indices in the cluster we join are compatible with us no matter if they are
// closed or not we can't read mappings of these indices so we need to reject the join...
for (IndexMetaData idxMetaData : metaData) {
if (idxMetaData.getCreationVersion().before(supportedIndexVersion)) {
throw new IllegalStateException("index " + idxMetaData.getIndex() + " version not supported: "
+ idxMetaData.getCreationVersion() + " minimum compatible index version is: " + supportedIndexVersion);
}
}
}

public static class LeaveRequest extends TransportRequest {

private DiscoveryNode node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,6 @@ class JoinTaskExecutor implements ClusterStateTaskExecutor<DiscoveryNode> {
@Override
public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<DiscoveryNode> joiningNodes) throws Exception {
final BatchResult.Builder<DiscoveryNode> results = BatchResult.builder();

final DiscoveryNodes currentNodes = currentState.nodes();
boolean nodesChanged = false;
ClusterState.Builder newState;
Expand All @@ -435,8 +434,10 @@ public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<Discov

assert nodesBuilder.isLocalNodeElectedMaster();

Version minNodeVersion = Version.CURRENT;
// processing any joins
for (final DiscoveryNode node : joiningNodes) {
minNodeVersion = Version.smallest(minNodeVersion, node.getVersion());
if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) {
// noop
} else if (currentNodes.nodeExists(node)) {
Expand All @@ -452,7 +453,9 @@ public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<Discov
}
results.success(node);
}

// we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
// we have to reject nodes that don't support all indices we have in this cluster
MembershipAction.ensureIndexCompatibility(minNodeVersion.minimumIndexCompatibilityVersion(), currentState.getMetaData());
if (nodesChanged) {
newState.nodes(nodesBuilder);
return results.build(allocationService.reroute(newState.build(), "node_join"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t
new NewPendingClusterStateListener(),
discoverySettings,
clusterService.getClusterName());
this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());
this.membership = new MembershipAction(settings, transportService, this::localNode, new MembershipListener());
this.joinThreadControl = new JoinThreadControl(threadPool);

transportService.registerRequestHandler(
Expand Down Expand Up @@ -301,7 +301,6 @@ public void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackList
if (!clusterChangedEvent.state().getNodes().isLocalNodeElectedMaster()) {
throw new IllegalStateException("Shouldn't publish state when not master");
}

try {
publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
} catch (FailedToCommitClusterStateException t) {
Expand Down Expand Up @@ -852,6 +851,9 @@ void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final
if (nodeJoinController == null) {
throw new IllegalStateException("discovery module is not yet started");
} else {
// we do this in a couple of places including the cluster update thread. This one here is really just best effort
// to ensure we fail as fast as possible.
MembershipAction.ensureIndexCompatibility(node.getVersion().minimumIndexCompatibilityVersion(), state.getMetaData());
// try and connect to the node, if it fails, we can raise an exception back to the client...
transportService.connectToNode(node);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ static MetaData upgradeMetaData(MetaData metaData,
boolean changed = false;
final MetaData.Builder upgradedMetaData = MetaData.builder(metaData);
for (IndexMetaData indexMetaData : metaData) {
IndexMetaData newMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData);
IndexMetaData newMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData,
Version.CURRENT.minimumIndexCompatibilityVersion());
changed |= indexMetaData != newMetaData;
upgradedMetaData.put(newMetaData, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -126,10 +128,18 @@ public ClusterState execute(ClusterState currentState) {
MetaData.Builder metaData = MetaData.builder(currentState.metaData());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());

Version minIndexCompatibilityVersion = currentState.getNodes().getSmallestNonClientNodeVersion()
.minimumIndexCompatibilityVersion();
boolean importNeeded = false;
StringBuilder sb = new StringBuilder();
for (IndexMetaData indexMetaData : request.indices) {
if (indexMetaData.getCreationVersion().before(minIndexCompatibilityVersion)) {
logger.warn("ignoring dangled index [{}] on node [{}]" +
" since it's created version [{}] is not supported by at least one node in the cluster minVersion [{}]",
indexMetaData.getIndex(), request.fromNode, indexMetaData.getCreationVersion(),
minIndexCompatibilityVersion);
continue;
}
if (currentState.metaData().hasIndex(indexMetaData.getIndex().getName())) {
continue;
}
Expand All @@ -144,7 +154,8 @@ public ClusterState execute(ClusterState currentState) {
try {
// The dangled index might be from an older version, we need to make sure it's compatible
// with the current version and upgrade it if needed.
upgradedIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData);
upgradedIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData,
minIndexCompatibilityVersion);
} catch (Exception ex) {
// upgrade failed - adding index as closed
logger.warn((Supplier<?>) () -> new ParameterizedMessage("found dangled index [{}] on node [{}]. This index cannot be upgraded to the latest version, adding as closed", indexMetaData.getIndex(), request.fromNode), ex);
Expand Down
Loading