Skip to content

Commit

Permalink
Removing unnecessary state from DataStreamReindexTask (elastic#117942)
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Dec 4, 2024
1 parent 9abeeea commit 07c42b7
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ protected ReindexDataStreamTask createTask(
params.startTime(),
params.totalIndices(),
params.totalIndicesToBeUpgraded(),
threadPool,
id,
type,
action,
Expand All @@ -76,9 +75,11 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
List<Index> indicesToBeReindexed = indices.stream()
.filter(index -> clusterService.state().getMetadata().index(index).getCreationVersion().isLegacyIndexVersion())
.toList();
reindexDataStreamTask.setPendingIndices(indicesToBeReindexed.stream().map(Index::getName).toList());
reindexDataStreamTask.setPendingIndicesCount(indicesToBeReindexed.size());
for (Index index : indicesToBeReindexed) {
reindexDataStreamTask.incrementInProgressIndicesCount();
// TODO This is just a placeholder. This is where the real data stream reindex logic will go
reindexDataStreamTask.reindexSucceeded();
}

completeSuccessfulPersistentTask(reindexDataStreamTask);
Expand All @@ -89,12 +90,12 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
}

private void completeSuccessfulPersistentTask(ReindexDataStreamTask persistentTask) {
persistentTask.reindexSucceeded();
persistentTask.allReindexesCompleted();
threadPool.schedule(persistentTask::markAsCompleted, getTimeToLive(persistentTask), threadPool.generic());
}

private void completeFailedPersistentTask(ReindexDataStreamTask persistentTask, Exception e) {
persistentTask.reindexFailed(e);
persistentTask.taskFailed(e);
threadPool.schedule(() -> persistentTask.markAsFailed(e), getTimeToLive(persistentTask), threadPool.generic());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,27 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class ReindexDataStreamTask extends AllocatedPersistentTask {
public static final String TASK_NAME = "reindex-data-stream";
private final long persistentTaskStartTime;
private final int totalIndices;
private final int totalIndicesToBeUpgraded;
private final ThreadPool threadPool;
private boolean complete = false;
private Exception exception;
private List<String> inProgress = new ArrayList<>();
private List<String> pending = List.of();
private AtomicInteger inProgress = new AtomicInteger(0);
private AtomicInteger pending = new AtomicInteger();
private List<Tuple<String, Exception>> errors = new ArrayList<>();

public ReindexDataStreamTask(
long persistentTaskStartTime,
int totalIndices,
int totalIndicesToBeUpgraded,
ThreadPool threadPool,
long id,
String type,
String action,
Expand All @@ -46,7 +44,6 @@ public ReindexDataStreamTask(
this.persistentTaskStartTime = persistentTaskStartTime;
this.totalIndices = totalIndices;
this.totalIndicesToBeUpgraded = totalIndicesToBeUpgraded;
this.threadPool = threadPool;
}

@Override
Expand All @@ -57,30 +54,36 @@ public ReindexDataStreamStatus getStatus() {
totalIndicesToBeUpgraded,
complete,
exception,
inProgress.size(),
pending.size(),
inProgress.get(),
pending.get(),
errors
);
}

public void reindexSucceeded() {
public void allReindexesCompleted() {
this.complete = true;
}

public void reindexFailed(Exception e) {
public void taskFailed(Exception e) {
this.complete = true;
this.exception = e;
}

public void setInProgressIndices(List<String> inProgressIndices) {
this.inProgress = inProgressIndices;
public void reindexSucceeded() {
inProgress.decrementAndGet();
}

public void reindexFailed(String index, Exception error) {
this.errors.add(Tuple.tuple(index, error));
inProgress.decrementAndGet();
}

public void setPendingIndices(List<String> pendingIndices) {
this.pending = pendingIndices;
public void incrementInProgressIndicesCount() {
inProgress.incrementAndGet();
pending.decrementAndGet();
}

public void addErrorIndex(String index, Exception error) {
this.errors.add(Tuple.tuple(index, error));
public void setPendingIndicesCount(int size) {
pending.set(size);
}
}

0 comments on commit 07c42b7

Please sign in to comment.