From 688e069e4744ef38af30863833e0cc58915c88ce Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 3 Dec 2024 17:14:56 -0600 Subject: [PATCH] Removing unnecessary state from DataStreamReindexTask --- ...indexDataStreamPersistentTaskExecutor.java | 9 ++--- .../migrate/task/ReindexDataStreamTask.java | 35 ++++++++++--------- 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java index e2a41ea186643..0f3f8b17f27ad 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java @@ -51,7 +51,6 @@ protected ReindexDataStreamTask createTask( params.startTime(), params.totalIndices(), params.totalIndicesToBeUpgraded(), - threadPool, id, type, action, @@ -74,9 +73,11 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask List indicesToBeReindexed = indices.stream() .filter(index -> clusterService.state().getMetadata().index(index).getCreationVersion().isLegacyIndexVersion()) .toList(); - reindexDataStreamTask.setPendingIndices(indicesToBeReindexed.stream().map(Index::getName).toList()); + reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size()); for (Index index : indicesToBeReindexed) { + reindexDataStreamTask.incrementInProgressIndicesCount(); // TODO This is just a placeholder. This is where the real data stream reindex logic will go + reindexDataStreamTask.reindexSucceeded(); } completeSuccessfulPersistentTask(reindexDataStreamTask); @@ -87,12 +88,12 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask } private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) { - persistentTask.reindexSucceeded(); + persistentTask.allReindexesCompleted(); threadPool.schedule(persistentTask::markAsCompleted, getTimeToLive(persistentTask), threadPool.generic()); } private void completeFailedPersistentTask(ReindexDataStreamTask persistentTask, Exception e) { - persistentTask.reindexFailed(e); + persistentTask.taskFailed(e); threadPool.schedule(() -> persistentTask.markAsFailed(e), getTimeToLive(persistentTask), threadPool.generic()); } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java index 722b30d9970db..72ddb87e9dea5 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTask.java @@ -10,29 +10,27 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; public class ReindexDataStreamTask extends AllocatedPersistentTask { public static final String TASK_NAME = "reindex-data-stream"; private final long persistentTaskStartTime; private final int totalIndices; private final int totalIndicesToBeUpgraded; - private final ThreadPool threadPool; private boolean complete = false; private Exception exception; - private List inProgress = new ArrayList<>(); - private List pending = List.of(); + private AtomicInteger inProgress = new AtomicInteger(0); + private AtomicInteger pending = new AtomicInteger(); private List> errors = new ArrayList<>(); public ReindexDataStreamTask( long persistentTaskStartTime, int totalIndices, int totalIndicesToBeUpgraded, - ThreadPool threadPool, long id, String type, String action, @@ -44,7 +42,6 @@ public ReindexDataStreamTask( this.persistentTaskStartTime = persistentTaskStartTime; this.totalIndices = totalIndices; this.totalIndicesToBeUpgraded = totalIndicesToBeUpgraded; - this.threadPool = threadPool; } @Override @@ -55,30 +52,36 @@ public ReindexDataStreamStatus getStatus() { totalIndicesToBeUpgraded, complete, exception, - inProgress.size(), - pending.size(), + inProgress.get(), + pending.get(), errors ); } - public void reindexSucceeded() { + public void allReindexesCompleted() { this.complete = true; } - public void reindexFailed(Exception e) { + public void taskFailed(Exception e) { this.complete = true; this.exception = e; } - public void setInProgressIndices(List inProgressIndices) { - this.inProgress = inProgressIndices; + public void reindexSucceeded() { + inProgress.decrementAndGet(); + } + + public void reindexFailed(String index, Exception error) { + this.errors.add(Tuple.tuple(index, error)); + inProgress.decrementAndGet(); } - public void setPendingIndices(List pendingIndices) { - this.pending = pendingIndices; + public void incrementInProgressIndicesCount() { + inProgress.incrementAndGet(); + pending.decrementAndGet(); } - public void addErrorIndex(String index, Exception error) { - this.errors.add(Tuple.tuple(index, error)); + public void setPendingIndicesCount(int size) { + pending.set(size); } }