Skip to content

Commit

Permalink
Correcting the index version filter in migration reindex logic (elast…
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke authored Dec 11, 2024
1 parent 4db3f7b commit 2c5efd2
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.FeatureFlag;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
Expand All @@ -39,10 +43,24 @@ public class ReindexDataStreamAction extends ActionType<ReindexDataStreamAction.
public static final ParseField SOURCE_FIELD = new ParseField("source");
public static final ParseField INDEX_FIELD = new ParseField("index");

/*
* The version before which we do not support writes in the _next_ major version of Elasticsearch. For example, Elasticsearch 10.x will
* not support writing to indices created before version 9.0.0.
*/
private static final IndexVersion MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE = IndexVersions.UPGRADE_TO_LUCENE_10_0_0;

public ReindexDataStreamAction() {
super(NAME);
}

/*
* This predicate allows through only indices that were created with a previous lucene version, meaning that they need to be reindexed
* in order to be writable in the _next_ lucene version.
*/
public static Predicate<Index> getOldIndexVersionPredicate(Metadata metadata) {
return index -> metadata.index(index).getCreationVersion().onOrBefore(MINIMUM_WRITEABLE_VERSION_AFTER_UPGRADE);
}

public enum Mode {
UPGRADE
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTaskParams;

import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.TASK_ID_PREFIX;
import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.getOldIndexVersionPredicate;

/*
* This transport action creates a new persistent task for reindexing the source data stream given in the request. On successful creation
Expand Down Expand Up @@ -67,10 +68,7 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList
return;
}
int totalIndices = dataStream.getIndices().size();
int totalIndicesToBeUpgraded = (int) dataStream.getIndices()
.stream()
.filter(index -> metadata.index(index).getCreationVersion().isLegacyIndexVersion())
.count();
int totalIndicesToBeUpgraded = (int) dataStream.getIndices().stream().filter(getOldIndexVersionPredicate(metadata)).count();
ReindexDataStreamTaskParams params = new ReindexDataStreamTaskParams(
sourceDataStreamName,
transportService.getThreadPool().absoluteTimeInMillis(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.List;
import java.util.Map;

import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.getOldIndexVersionPredicate;

public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExecutor<ReindexDataStreamTaskParams> {
private static final TimeValue TASK_KEEP_ALIVE_TIME = TimeValue.timeValueDays(1);
private final Client client;
Expand Down Expand Up @@ -72,7 +74,7 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
if (dataStreamInfos.size() == 1) {
List<Index> indices = dataStreamInfos.getFirst().getDataStream().getIndices();
List<Index> indicesToBeReindexed = indices.stream()
.filter(index -> clusterService.state().getMetadata().index(index).getCreationVersion().isLegacyIndexVersion())
.filter(getOldIndexVersionPredicate(clusterService.state().metadata()))
.toList();
reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size());
for (Index index : indicesToBeReindexed) {
Expand Down

0 comments on commit 2c5efd2

Please sign in to comment.