Skip to content

Commit

Permalink
feat(core): add a purge task (#742)
Browse files Browse the repository at this point in the history
close #737
  • Loading branch information
tchiotludo authored Sep 20, 2022
1 parent 59f990b commit 8372709
Show file tree
Hide file tree
Showing 22 changed files with 687 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kestra.core.models.executions.statistics.Flow;
import io.kestra.core.models.flows.State;
import io.micronaut.data.model.Pageable;
import io.reactivex.Flowable;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
Expand Down Expand Up @@ -36,6 +37,15 @@ ArrayListTotal<Execution> find(
@Nullable List<State.Type> state
);

Flowable<Execution> find(
@Nullable String query,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> state
);

ArrayListTotal<TaskRun> findTaskRun(
Pageable pageable,
@Nullable String query,
Expand All @@ -48,6 +58,8 @@ ArrayListTotal<TaskRun> findTaskRun(

Execution delete(Execution execution);

Integer purge(Execution execution);

Integer maxTaskRunSetting();

List<DailyExecutionStatistics> dailyStatistics(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package io.kestra.core.repositories;

import io.micronaut.data.model.Pageable;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.micronaut.data.model.Pageable;
import org.slf4j.event.Level;

import java.time.ZonedDateTime;
Expand All @@ -26,4 +27,6 @@ ArrayListTotal<LogEntry> find(
);

LogEntry save(LogEntry log);

Integer purge(Execution execution);
}
89 changes: 86 additions & 3 deletions core/src/main/java/io/kestra/core/services/ExecutionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,48 @@
import io.kestra.core.models.flows.State;
import io.kestra.core.models.hierarchies.GraphCluster;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.repositories.ExecutionRepositoryInterface;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.tasks.flows.Worker;
import io.kestra.core.utils.IdUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.core.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Builder;
import lombok.Getter;
import lombok.experimental.SuperBuilder;

import java.io.IOException;
import java.net.URI;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import static io.kestra.core.utils.Rethrow.throwFunction;
import static io.kestra.core.utils.Rethrow.throwPredicate;

@Singleton
public class ExecutionService {
@Inject
ApplicationContext applicationContext;
private ApplicationContext applicationContext;

@Inject
private FlowRepositoryInterface flowRepositoryInterface;

@Inject
private StorageInterface storageInterface;

@Inject
private ExecutionRepositoryInterface executionRepository;

@Inject
private LogRepositoryInterface logRepository;

public Execution restart(final Execution execution, @Nullable Integer revision) throws Exception {
if (!execution.getState().isTerninated()) {
throw new IllegalStateException("Execution must be terminated to be restarted, " +
Expand Down Expand Up @@ -195,6 +213,71 @@ public Execution markAs(final Execution execution, String taskRunId, State.Type
.withState(State.Type.RESTARTED);
}

public PurgeResult purge(
Boolean purgeExecution,
Boolean purgeLog,
Boolean purgeStorage,
@Nullable String namespace,
@Nullable String flowId,
@Nullable ZonedDateTime endDate,
@Nullable List<State.Type> state
) throws IOException {
PurgeResult purgeResult = this.executionRepository
.find(
null,
namespace,
flowId,
null,
endDate,
state
)
.map(execution -> {
PurgeResult.PurgeResultBuilder<?, ?> builder = PurgeResult.builder();

if (purgeExecution) {
builder.executionsCount(this.executionRepository.purge(execution));
}

if (purgeLog) {
builder.logsCount(this.logRepository.purge(execution));
}

if (purgeStorage) {
builder.storagesCount(storageInterface.deleteByPrefix(URI.create("/" + storageInterface.executionPrefix(
execution))).size());
}

return (PurgeResult) builder.build();
})
.reduce((a, b) -> a
.toBuilder()
.executionsCount(a.getExecutionsCount() + b.getExecutionsCount())
.logsCount(a.getLogsCount() + b.getLogsCount())
.storagesCount(a.getStoragesCount() + b.getStoragesCount())
.build()
)
.blockingGet();

if (purgeResult != null) {
return purgeResult;
}

return PurgeResult.builder().build();
}

@Getter
@SuperBuilder(toBuilder = true)
public static class PurgeResult {
@Builder.Default
private int executionsCount = 0;

@Builder.Default
private int logsCount = 0;

@Builder.Default
private int storagesCount = 0;
}

private Set<String> removeWorkerTask(Flow flow, Execution execution, Set<String> taskRunToRestart, Map<String, String> mappingTaskRunId) throws InternalException {
Set<String> workerTaskRunId = taskRunToRestart
.stream()
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/io/kestra/core/storages/StorageInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ default String executionPrefix(Flow flow, Execution execution) {
);
}

default String executionPrefix(Execution execution) {
return String.join(
"/",
Arrays.asList(
execution.getNamespace().replace(".", "/"),
Slugify.of(execution.getFlowId()),
"executions",
execution.getId()
)
);
}

default String executionPrefix(TaskRun taskRun) {
return String.join(
"/",
Expand Down
128 changes: 128 additions & 0 deletions core/src/main/java/io/kestra/core/tasks/storages/Purge.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package io.kestra.core.tasks.storages;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.ExecutionService;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.time.ZonedDateTime;
import java.util.List;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Purge execution, logs or storage files."
)
@Plugin(
examples = {
@Example(
code = {
"endDate: \"{{ now() | dateAdd(-1, 'MONTH') }}\"",
"states: ",
" - KILLED",
" - FAILED",
" - WARNING",
" - SUCCESS"
}
)
}
)
public class Purge extends Task implements RunnableTask<Purge.Output> {
@Schema(
title = "Namespace to purge or namespace for a flow",
description = "If `flowId` isn't provide, this is a namespace prefix, else the namespace of flow"
)
@PluginProperty(dynamic = true)
private String namespace;

@Schema(
title = "The flow id to purge",
description = "You need to provide the `namespace` properties if you want to purge a flow"
)
@PluginProperty(dynamic = true)
private String flowId;

@Schema(
title = "The max date to purge",
description = "All date after this date will be purged."
)
@PluginProperty(dynamic = true)
private String endDate;

@Schema(
title = "The state of the execution that can be purged."
)
@PluginProperty(dynamic = false)
private List<State.Type> states;

@Schema(
title = "Purge execution from repository"
)
@PluginProperty(dynamic = false)
@Builder.Default
private boolean purgeExecution = true;

@Schema(
title = "Purge log from repository"
)
@PluginProperty(dynamic = false)
@Builder.Default
private boolean purgeLog = true;

@Schema(
title = "Purge file from internal storage"
)
@PluginProperty(dynamic = false)
@Builder.Default
private boolean purgeStorage = true;

@Override
public Purge.Output run(RunContext runContext) throws Exception {
ExecutionService executionService = runContext.getApplicationContext().getBean(ExecutionService.class);

ExecutionService.PurgeResult purgeResult = executionService.purge(
purgeExecution,
purgeLog,
purgeStorage,
namespace,
flowId,
ZonedDateTime.parse(runContext.render(endDate)),
states
);

return Output.builder()
.executionsCount(purgeResult.getExecutionsCount())
.logsCount(purgeResult.getLogsCount())
.storagesCount(purgeResult.getStoragesCount())
.build();
}

@SuperBuilder(toBuilder = true)
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(
title = "The count of executions deleted"
)
private int executionsCount;

@Schema(
title = "The count of logs deleted"
)
private int logsCount;

@Schema(
title = "The count of storage deleted"
)
private int storagesCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,32 @@ protected void findById() {
});
}

@Test
protected void purge() {
executionRepository.save(ExecutionFixture.EXECUTION_1);

Optional<Execution> full = executionRepository.findById(ExecutionFixture.EXECUTION_1.getId());
assertThat(full.isPresent(), is(true));

executionRepository.purge(ExecutionFixture.EXECUTION_1);

full = executionRepository.findById(ExecutionFixture.EXECUTION_1.getId());
assertThat(full.isPresent(), is(false));
}

@Test
protected void delete() {
executionRepository.save(ExecutionFixture.EXECUTION_1);

Optional<Execution> full = executionRepository.findById(ExecutionFixture.EXECUTION_1.getId());
assertThat(full.isPresent(), is(true));

executionRepository.delete(ExecutionFixture.EXECUTION_1);

full = executionRepository.findById(ExecutionFixture.EXECUTION_1.getId());
assertThat(full.isPresent(), is(false));
}

@Test
protected void mappingConflict() {
executionRepository.save(ExecutionFixture.EXECUTION_2);
Expand Down
Loading

0 comments on commit 8372709

Please sign in to comment.