Skip to content

Commit

Permalink
Introduce index settings version (#34429)
Browse files Browse the repository at this point in the history
This commit introduces settings version to index metadata. This value is
monotonically increasing and is updated on settings updates. This will
be useful in cross-cluster replication so that we can request settings
updates from the leader only when there is a settings update.
  • Loading branch information
jasontedor authored Oct 16, 2018
1 parent e0b6721 commit 4b2052c
Show file tree
Hide file tree
Showing 16 changed files with 337 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,10 @@ public String toString() {
final String TAB = " ";
for (IndexMetaData indexMetaData : metaData) {
sb.append(TAB).append(indexMetaData.getIndex());
sb.append(": v[").append(indexMetaData.getVersion()).append("], mv[").append(indexMetaData.getMappingVersion()).append("]\n");
sb.append(": v[").append(indexMetaData.getVersion())
.append("], mv[").append(indexMetaData.getMappingVersion())
.append("], sv[").append(indexMetaData.getSettingsVersion())
.append("]\n");
for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) {
sb.append(TAB).append(TAB).append(shard).append(": ");
sb.append("p_term [").append(indexMetaData.primaryTerm(shard)).append("], ");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ public Iterator<Setting<Integer>> settings() {
public static final String KEY_IN_SYNC_ALLOCATIONS = "in_sync_allocations";
static final String KEY_VERSION = "version";
static final String KEY_MAPPING_VERSION = "mapping_version";
static final String KEY_SETTINGS_VERSION = "settings_version";
static final String KEY_ROUTING_NUM_SHARDS = "routing_num_shards";
static final String KEY_SETTINGS = "settings";
static final String KEY_STATE = "state";
Expand All @@ -264,6 +265,8 @@ public Iterator<Setting<Integer>> settings() {

private final long mappingVersion;

private final long settingsVersion;

private final long[] primaryTerms;

private final State state;
Expand Down Expand Up @@ -291,7 +294,7 @@ public Iterator<Setting<Integer>> settings() {
private final ActiveShardCount waitForActiveShards;
private final ImmutableOpenMap<String, RolloverInfo> rolloverInfos;

private IndexMetaData(Index index, long version, long mappingVersion, long[] primaryTerms, State state, int numberOfShards, int numberOfReplicas, Settings settings,
private IndexMetaData(Index index, long version, long mappingVersion, long settingsVersion, long[] primaryTerms, State state, int numberOfShards, int numberOfReplicas, Settings settings,
ImmutableOpenMap<String, MappingMetaData> mappings, ImmutableOpenMap<String, AliasMetaData> aliases,
ImmutableOpenMap<String, DiffableStringMap> customData, ImmutableOpenIntMap<Set<String>> inSyncAllocationIds,
DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters initialRecoveryFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters,
Expand All @@ -302,6 +305,8 @@ private IndexMetaData(Index index, long version, long mappingVersion, long[] pri
this.version = version;
assert mappingVersion >= 0 : mappingVersion;
this.mappingVersion = mappingVersion;
assert settingsVersion >= 0 : settingsVersion;
this.settingsVersion = settingsVersion;
this.primaryTerms = primaryTerms;
assert primaryTerms.length == numberOfShards;
this.state = state;
Expand Down Expand Up @@ -355,6 +360,10 @@ public long getMappingVersion() {
return mappingVersion;
}

public long getSettingsVersion() {
return settingsVersion;
}

/**
* The term of the current selected primary. This is a non-negative number incremented when
* a primary shard is assigned after a full cluster restart or a replica shard is promoted to a primary.
Expand Down Expand Up @@ -596,6 +605,7 @@ private static class IndexMetaDataDiff implements Diff<IndexMetaData> {
private final int routingNumShards;
private final long version;
private final long mappingVersion;
private final long settingsVersion;
private final long[] primaryTerms;
private final State state;
private final Settings settings;
Expand All @@ -609,6 +619,7 @@ private static class IndexMetaDataDiff implements Diff<IndexMetaData> {
index = after.index.getName();
version = after.version;
mappingVersion = after.mappingVersion;
settingsVersion = after.settingsVersion;
routingNumShards = after.routingNumShards;
state = after.state;
settings = after.settings;
Expand All @@ -630,6 +641,11 @@ private static class IndexMetaDataDiff implements Diff<IndexMetaData> {
} else {
mappingVersion = 1;
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
settingsVersion = in.readVLong();
} else {
settingsVersion = 1;
}
state = State.fromId(in.readByte());
settings = Settings.readSettingsFromStream(in);
primaryTerms = in.readVLongArray();
Expand Down Expand Up @@ -658,6 +674,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_6_5_0)) {
out.writeVLong(mappingVersion);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeVLong(settingsVersion);
}
out.writeByte(state.id);
Settings.writeSettingsToStream(settings, out);
out.writeVLongArray(primaryTerms);
Expand All @@ -675,6 +694,7 @@ public IndexMetaData apply(IndexMetaData part) {
Builder builder = builder(index);
builder.version(version);
builder.mappingVersion(mappingVersion);
builder.settingsVersion(settingsVersion);
builder.setRoutingNumShards(routingNumShards);
builder.state(state);
builder.settings(settings);
Expand All @@ -696,6 +716,11 @@ public static IndexMetaData readFrom(StreamInput in) throws IOException {
} else {
builder.mappingVersion(1);
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
builder.settingsVersion(in.readVLong());
} else {
builder.settingsVersion(1);
}
builder.setRoutingNumShards(in.readInt());
builder.state(State.fromId(in.readByte()));
builder.settings(readSettingsFromStream(in));
Expand Down Expand Up @@ -745,6 +770,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_6_5_0)) {
out.writeVLong(mappingVersion);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeVLong(settingsVersion);
}
out.writeInt(routingNumShards);
out.writeByte(state.id());
writeSettingsToStream(settings, out);
Expand Down Expand Up @@ -793,6 +821,7 @@ public static class Builder {
private State state = State.OPEN;
private long version = 1;
private long mappingVersion = 1;
private long settingsVersion = 1;
private long[] primaryTerms = null;
private Settings settings = Settings.Builder.EMPTY_SETTINGS;
private final ImmutableOpenMap.Builder<String, MappingMetaData> mappings;
Expand All @@ -816,6 +845,7 @@ public Builder(IndexMetaData indexMetaData) {
this.state = indexMetaData.state;
this.version = indexMetaData.version;
this.mappingVersion = indexMetaData.mappingVersion;
this.settingsVersion = indexMetaData.settingsVersion;
this.settings = indexMetaData.getSettings();
this.primaryTerms = indexMetaData.primaryTerms.clone();
this.mappings = ImmutableOpenMap.builder(indexMetaData.mappings);
Expand Down Expand Up @@ -990,11 +1020,20 @@ public long mappingVersion() {
return mappingVersion;
}

public long settingsVersion() {
return settingsVersion;
}

public Builder mappingVersion(final long mappingVersion) {
this.mappingVersion = mappingVersion;
return this;
}

public Builder settingsVersion(final long settingsVersion) {
this.settingsVersion = settingsVersion;
return this;
}

/**
* returns the primary term for the given shard.
* See {@link IndexMetaData#primaryTerm(int)} for more information.
Expand Down Expand Up @@ -1122,7 +1161,7 @@ public IndexMetaData build() {

final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);

return new IndexMetaData(new Index(index, uuid), version, mappingVersion, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
return new IndexMetaData(new Index(index, uuid), version, mappingVersion, settingsVersion, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
tmpAliases.build(), customMetaData.build(), filledInSyncAllocationIds.build(), requireFilters, initialRecoveryFilters, includeFilters, excludeFilters,
indexCreatedVersion, indexUpgradedVersion, getRoutingNumShards(), routingPartitionSize, waitForActiveShards, rolloverInfos.build());
}
Expand All @@ -1132,6 +1171,7 @@ public static void toXContent(IndexMetaData indexMetaData, XContentBuilder build

builder.field(KEY_VERSION, indexMetaData.getVersion());
builder.field(KEY_MAPPING_VERSION, indexMetaData.getMappingVersion());
builder.field(KEY_SETTINGS_VERSION, indexMetaData.getSettingsVersion());
builder.field(KEY_ROUTING_NUM_SHARDS, indexMetaData.getRoutingNumShards());
builder.field(KEY_STATE, indexMetaData.getState().toString().toLowerCase(Locale.ENGLISH));

Expand Down Expand Up @@ -1205,6 +1245,7 @@ public static IndexMetaData fromXContent(XContentParser parser) throws IOExcepti
throw new IllegalArgumentException("expected object but got a " + token);
}
boolean mappingVersion = false;
boolean settingsVersion = false;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
Expand Down Expand Up @@ -1299,6 +1340,9 @@ public static IndexMetaData fromXContent(XContentParser parser) throws IOExcepti
} else if (KEY_MAPPING_VERSION.equals(currentFieldName)) {
mappingVersion = true;
builder.mappingVersion(parser.longValue());
} else if (KEY_SETTINGS_VERSION.equals(currentFieldName)) {
settingsVersion = true;
builder.settingsVersion(parser.longValue());
} else if (KEY_ROUTING_NUM_SHARDS.equals(currentFieldName)) {
builder.setRoutingNumShards(parser.intValue());
} else {
Expand All @@ -1311,6 +1355,9 @@ public static IndexMetaData fromXContent(XContentParser parser) throws IOExcepti
if (Assertions.ENABLED && Version.indexCreated(builder.settings).onOrAfter(Version.V_6_5_0)) {
assert mappingVersion : "mapping version should be present for indices created on or after 6.5.0";
}
if (Assertions.ENABLED && Version.indexCreated(builder.settings).onOrAfter(Version.V_7_0_0_alpha1)) {
assert settingsVersion : "settings version should be present for indices created on or after 7.0.0";
}
return builder.build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.Set;

import static org.elasticsearch.action.support.ContextPreservingActionListener.wrapPreservingContext;
import static org.elasticsearch.index.IndexSettings.same;

/**
* Service responsible for submitting update index settings requests
Expand Down Expand Up @@ -187,6 +188,14 @@ public ClusterState execute(ClusterState currentState) {
}
}

// increment settings versions
for (final String index : actualIndices) {
if (same(currentState.metaData().index(index).getSettings(), metaDataBuilder.get(index).getSettings()) == false) {
final IndexMetaData.Builder builder = IndexMetaData.builder(metaDataBuilder.get(index));
builder.settingsVersion(1 + builder.settingsVersion());
metaDataBuilder.put(builder);
}
}

ClusterState updatedState = ClusterState.builder(currentState).metaData(metaDataBuilder).routingTable(routingTableBuilder.build()).blocks(blocks).build();

Expand Down Expand Up @@ -220,9 +229,9 @@ public ClusterState execute(ClusterState currentState) {
*/
private static void maybeUpdateClusterBlock(String[] actualIndices, ClusterBlocks.Builder blocks, ClusterBlock block, Setting<Boolean> setting, Settings openSettings) {
if (setting.exists(openSettings)) {
final boolean updateReadBlock = setting.get(openSettings);
final boolean updateBlock = setting.get(openSettings);
for (String index : actualIndices) {
if (updateReadBlock) {
if (updateBlock) {
blocks.addIndexBlock(index, block);
} else {
blocks.removeIndexBlock(index, block);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,13 @@ private ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
// operation which make these copies stale
routingTableBuilder.updateNumberOfReplicas(numberOfReplicas, indices);
metaDataBuilder.updateNumberOfReplicas(numberOfReplicas, indices);
// update settings version for each index
for (final String index : indices) {
final IndexMetaData indexMetaData = metaDataBuilder.get(index);
final IndexMetaData.Builder indexMetaDataBuilder =
new IndexMetaData.Builder(indexMetaData).settingsVersion(1 + indexMetaData.getSettingsVersion());
metaDataBuilder.put(indexMetaDataBuilder);
}
logger.info("updating number_of_replicas to [{}] for indices {}", numberOfReplicas, indices);
}
final ClusterState fixedState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build())
Expand Down
24 changes: 22 additions & 2 deletions server/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.lucene.search.Sort;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.Assertions;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -617,9 +619,27 @@ public IndexMetaData getMetaData() {
}

@Override
public synchronized void updateMetaData(final IndexMetaData metadata) {
public synchronized void updateMetaData(final IndexMetaData currentIndexMetaData, final IndexMetaData newIndexMetaData) {
final Translog.Durability oldTranslogDurability = indexSettings.getTranslogDurability();
if (indexSettings.updateIndexMetaData(metadata)) {

final boolean updateIndexMetaData = indexSettings.updateIndexMetaData(newIndexMetaData);

if (Assertions.ENABLED
&& currentIndexMetaData != null
&& currentIndexMetaData.getCreationVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
final long currentSettingsVersion = currentIndexMetaData.getSettingsVersion();
final long newSettingsVersion = newIndexMetaData.getSettingsVersion();
if (currentSettingsVersion == newSettingsVersion) {
assert updateIndexMetaData == false;
} else {
assert updateIndexMetaData;
assert currentSettingsVersion < newSettingsVersion :
"expected current settings version [" + currentSettingsVersion + "] "
+ "to be less than new settings version [" + newSettingsVersion + "]";
}
}

if (updateIndexMetaData) {
for (final IndexShard shard : this.shards.values()) {
try {
shard.onSettingsChanged();
Expand Down
19 changes: 15 additions & 4 deletions server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -604,17 +604,28 @@ public synchronized boolean updateIndexMetaData(IndexMetaData indexMetaData) {
throw new IllegalArgumentException("uuid mismatch on settings update expected: " + getUUID() + " but was: " + newUUID);
}
this.indexMetaData = indexMetaData;
final Settings existingSettings = this.settings;
if (existingSettings.filter(IndexScopedSettings.INDEX_SETTINGS_KEY_PREDICATE)
.equals(newSettings.filter(IndexScopedSettings.INDEX_SETTINGS_KEY_PREDICATE))) {
final Settings newIndexSettings = Settings.builder().put(nodeSettings).put(newSettings).build();
if (same(this.settings, newIndexSettings)) {
// nothing to update, same settings
return false;
}
scopedSettings.applySettings(newSettings);
this.settings = Settings.builder().put(nodeSettings).put(newSettings).build();
this.settings = newIndexSettings;
return true;
}

/**
* Compare the specified settings for equality.
*
* @param left the left settings
* @param right the right settings
* @return true if the settings are the same, otherwise false
*/
public static boolean same(final Settings left, final Settings right) {
return left.filter(IndexScopedSettings.INDEX_SETTINGS_KEY_PREDICATE)
.equals(right.filter(IndexScopedSettings.INDEX_SETTINGS_KEY_PREDICATE));
}

/**
* Returns the translog durability for this index.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ public synchronized void verifyIndexMetadata(IndexMetaData metaData, IndexMetaDa
closeables.add(() -> service.close("metadata verification", false));
service.mapperService().merge(metaData, MapperService.MergeReason.MAPPING_RECOVERY);
if (metaData.equals(metaDataUpdate) == false) {
service.updateMetaData(metaDataUpdate);
service.updateMetaData(metaData, metaDataUpdate);
}
} finally {
IOUtils.close(closeables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ private void updateIndices(ClusterChangedEvent event) {
final IndexMetaData newIndexMetaData = state.metaData().index(index);
assert newIndexMetaData != null : "index " + index + " should have been removed by deleteIndices";
if (ClusterChangedEvent.indexMetaDataChanged(currentIndexMetaData, newIndexMetaData)) {
indexService.updateMetaData(newIndexMetaData);
indexService.updateMetaData(currentIndexMetaData, newIndexMetaData);
try {
if (indexService.updateMapping(currentIndexMetaData, newIndexMetaData) && sendRefreshMapping) {
nodeMappingRefreshAction.nodeMappingRefresh(state.nodes().getMasterNode(),
Expand Down Expand Up @@ -771,9 +771,12 @@ public interface AllocatedIndex<T extends Shard> extends Iterable<T>, IndexCompo
IndexSettings getIndexSettings();

/**
* Updates the meta data of this index. Changes become visible through {@link #getIndexSettings()}
* Updates the metadata of this index. Changes become visible through {@link #getIndexSettings()}.
*
* @param currentIndexMetaData the current index metadata
* @param newIndexMetaData the new index metadata
*/
void updateMetaData(IndexMetaData indexMetaData);
void updateMetaData(IndexMetaData currentIndexMetaData, IndexMetaData newIndexMetaData);

/**
* Checks if index requires refresh from master.
Expand Down
Loading

0 comments on commit 4b2052c

Please sign in to comment.