diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java index bcd1ae314ac37..095a310e5844a 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamPersistentTaskExecutor.java @@ -53,7 +53,6 @@ protected ReindexDataStreamTask createTask( params.startTime(), params.totalIndices(), params.totalIndicesToBeUpgraded(), - threadPool, id, type, action, @@ -76,9 +75,11 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask List indicesToBeReindexed = indices.stream() .filter(index -> clusterService.state().getMetadata().index(index).getCreationVersion().isLegacyIndexVersion()) .toList(); - reindexDataStreamTask.setPendingIndices(indicesToBeReindexed.stream().map(Index::getName).toList()); + reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size()); for (Index index : indicesToBeReindexed) { + reindexDataStreamTask.incrementInProgressIndicesCount(); // TODO This is just a placeholder. This is where the real data stream reindex logic will go + reindexDataStreamTask.reindexSucceeded(); } completeSuccessfulPersistentTask(reindexDataStreamTask); @@ -89,12 +90,12 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask } private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) { - persistentTask.reindexSucceeded(); + persistentTask.allReindexesCompleted(); threadPool.schedule(persistentTask::markAsCompleted, getTimeToLive(persistentTask), threadPool.generic()); } private void completeFailedPersistentTask(ReindexDataStreamTask persistentTask, Exception e) { - persistentTask.reindexFailed(e); + persistentTask.taskFailed(e); threadPool.schedule(() -> persistentTask.markAsFailed(e), getTimeToLive(persistentTask), threadPool.generic()); } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java index 2ae244679659f..068579a37edb9 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/task/ReindexDataStreamTask.java @@ -12,29 +12,27 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; public class ReindexDataStreamTask extends AllocatedPersistentTask { public static final String TASK_NAME = "reindex-data-stream"; private final long persistentTaskStartTime; private final int totalIndices; private final int totalIndicesToBeUpgraded; - private final ThreadPool threadPool; private boolean complete = false; private Exception exception; - private List inProgress = new ArrayList<>(); - private List pending = List.of(); + private AtomicInteger inProgress = new AtomicInteger(0); + private AtomicInteger pending = new AtomicInteger(); private List> errors = new ArrayList<>(); public ReindexDataStreamTask( long persistentTaskStartTime, int totalIndices, int totalIndicesToBeUpgraded, - ThreadPool threadPool, long id, String type, String action, @@ -46,7 +44,6 @@ public ReindexDataStreamTask( this.persistentTaskStartTime = persistentTaskStartTime; this.totalIndices = totalIndices; this.totalIndicesToBeUpgraded = totalIndicesToBeUpgraded; - this.threadPool = threadPool; } @Override @@ -57,30 +54,36 @@ public ReindexDataStreamStatus getStatus() { totalIndicesToBeUpgraded, complete, exception, - inProgress.size(), - pending.size(), + inProgress.get(), + pending.get(), errors ); } - public void reindexSucceeded() { + public void allReindexesCompleted() { this.complete = true; } - public void reindexFailed(Exception e) { + public void taskFailed(Exception e) { this.complete = true; this.exception = e; } - public void setInProgressIndices(List inProgressIndices) { - this.inProgress = inProgressIndices; + public void reindexSucceeded() { + inProgress.decrementAndGet(); + } + + public void reindexFailed(String index, Exception error) { + this.errors.add(Tuple.tuple(index, error)); + inProgress.decrementAndGet(); } - public void setPendingIndices(List pendingIndices) { - this.pending = pendingIndices; + public void incrementInProgressIndicesCount() { + inProgress.incrementAndGet(); + pending.decrementAndGet(); } - public void addErrorIndex(String index, Exception error) { - this.errors.add(Tuple.tuple(index, error)); + public void setPendingIndicesCount(int size) { + pending.set(size); } }