From 95c863f53437f68134ea94a71efd507d1c2103e3 Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Tue, 8 Mar 2022 17:28:31 +0100 Subject: [PATCH] feat(core): introduce DynamicTask for dynamic task generation for a worker task --- .../io/kestra/core/models/flows/State.java | 9 ++ .../core/models/hierarchies/GraphCluster.java | 12 +-- .../kestra/core/models/tasks/DynamicTask.java | 5 + .../kestra/core/runners/ExecutorService.java | 13 ++- .../io/kestra/core/runners/RunContext.java | 20 +++- .../java/io/kestra/core/runners/Worker.java | 9 +- .../kestra/core/runners/WorkerTaskResult.java | 19 ++++ .../core/services/ExecutionService.java | 95 ++++++++++++------- .../io/kestra/core/tasks/flows/Worker.java | 3 +- .../core/tasks/scripts/AbstractBash.java | 18 ++-- .../main/java/io/kestra/core/utils/Await.java | 4 +- .../io.kestra.core.tasks.flows.Worker.svg | 1 + .../core/runners/ExecutionServiceTest.java | 18 +++- .../controllers/ExecutionControllerTest.java | 4 +- 14 files changed, 164 insertions(+), 66 deletions(-) create mode 100644 core/src/main/java/io/kestra/core/models/tasks/DynamicTask.java create mode 100644 core/src/main/resources/icons/io.kestra.core.tasks.flows.Worker.svg diff --git a/core/src/main/java/io/kestra/core/models/flows/State.java b/core/src/main/java/io/kestra/core/models/flows/State.java index 4f2f002b60..039c7149ff 100644 --- a/core/src/main/java/io/kestra/core/models/flows/State.java +++ b/core/src/main/java/io/kestra/core/models/flows/State.java @@ -52,6 +52,15 @@ public State(Type state, State actual) { this.histories.add(new History(this.current, Instant.now())); } + public static State of(Type state, List histories) { + State result = new State(state); + + result.histories.removeIf(history -> true); + result.histories.addAll(histories); + + return result; + } + public State withState(Type state) { if (this.current == state) { log.warn("Can't change state, already " + current); diff --git a/core/src/main/java/io/kestra/core/models/hierarchies/GraphCluster.java b/core/src/main/java/io/kestra/core/models/hierarchies/GraphCluster.java index 021f4f0b78..9bedd85f48 100644 --- a/core/src/main/java/io/kestra/core/models/hierarchies/GraphCluster.java +++ b/core/src/main/java/io/kestra/core/models/hierarchies/GraphCluster.java @@ -10,13 +10,13 @@ @Getter public class GraphCluster extends AbstractGraphTask { @JsonIgnore - private Graph graph = new Graph<>(); + private final Graph graph = new Graph<>(); @JsonIgnore - private GraphClusterRoot root; + private final GraphClusterRoot root; @JsonIgnore - private GraphClusterEnd end; + private final GraphClusterEnd end; public GraphCluster() { super(); @@ -36,10 +36,4 @@ public GraphCluster(Task task, TaskRun taskRun, List values, RelationTyp graph.addNode(this.root); graph.addNode(this.end); } - - public GraphCluster(GraphCluster graphTask, TaskRun taskRun, List values) { - super(graphTask.getTask(), taskRun, values, graphTask.getRelationType()); - - this.graph = graphTask.graph; - } } diff --git a/core/src/main/java/io/kestra/core/models/tasks/DynamicTask.java b/core/src/main/java/io/kestra/core/models/tasks/DynamicTask.java new file mode 100644 index 0000000000..bb68ea5065 --- /dev/null +++ b/core/src/main/java/io/kestra/core/models/tasks/DynamicTask.java @@ -0,0 +1,5 @@ +package io.kestra.core.models.tasks; + +public interface DynamicTask { + +} diff --git a/core/src/main/java/io/kestra/core/runners/ExecutorService.java b/core/src/main/java/io/kestra/core/runners/ExecutorService.java index dc2fcbdc69..b4421c287b 100644 --- a/core/src/main/java/io/kestra/core/runners/ExecutorService.java +++ b/core/src/main/java/io/kestra/core/runners/ExecutorService.java @@ -8,6 +8,7 @@ import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; +import io.kestra.core.models.tasks.DynamicTask; import io.kestra.core.models.tasks.FlowableTask; import io.kestra.core.models.tasks.ResolvedTask; import io.kestra.core.models.tasks.Task; @@ -603,6 +604,13 @@ private Executor handleFlowTask(final Executor executor) { } public Execution addDynamicTaskRun(Execution execution, Flow flow, WorkerTaskResult workerTaskResult) throws InternalException { + ArrayList taskRuns = new ArrayList<>(execution.getTaskRunList()); + + // declared dynamic tasks + if (workerTaskResult.getDynamicTaskRuns() != null) { + taskRuns.addAll(workerTaskResult.getDynamicTaskRuns()); + } + // if parent, can be a Worker task that generate dynamic tasks if (workerTaskResult.getTaskRun().getParentTaskRunId() != null) { try { @@ -612,15 +620,12 @@ public Execution addDynamicTaskRun(Execution execution, Flow flow, WorkerTaskRes Task parentTask = flow.findTaskByTaskId(parentTaskRun.getTaskId()); if (parentTask instanceof Worker) { - ArrayList taskRuns = new ArrayList<>(execution.getTaskRunList()); taskRuns.add(workerTaskResult.getTaskRun()); - - return execution.withTaskRunList(taskRuns); } } } - return null; + return taskRuns.size() > execution.getTaskRunList().size() ? execution.withTaskRunList(taskRuns) : null; } public boolean canBePurged(final Executor executor) { diff --git a/core/src/main/java/io/kestra/core/runners/RunContext.java b/core/src/main/java/io/kestra/core/runners/RunContext.java index 3a3bc36d09..3da2e4ef55 100644 --- a/core/src/main/java/io/kestra/core/runners/RunContext.java +++ b/core/src/main/java/io/kestra/core/runners/RunContext.java @@ -38,17 +38,21 @@ public class RunContext { private final static ObjectMapper MAPPER = JacksonMapper.ofJson(); - private VariableRenderer variableRenderer; + // Injected private ApplicationContext applicationContext; + private VariableRenderer variableRenderer; private StorageInterface storageInterface; + private String envPrefix; + private MetricRegistry meterRegistry; + private Path tempBasedPath; + private URI storageOutputPrefix; private URI storageExecutionPrefix; - private String envPrefix; private Map variables; private List> metrics = new ArrayList<>(); - private MetricRegistry meterRegistry; private RunContextLogger runContextLogger; - private Path tempBasedPath; + private final List dynamicWorkerTaskResult = new ArrayList<>(); + protected transient Path temporaryDirectory; /** @@ -572,6 +576,14 @@ private String metricPrefix() { return String.join(".", values); } + public void dynamicWorkerResult(List workerTaskResults) { + dynamicWorkerTaskResult.addAll(workerTaskResults); + } + + public List dynamicWorkerResults() { + return dynamicWorkerTaskResult; + } + public synchronized Path tempDir() { return this.tempDir(true); } diff --git a/core/src/main/java/io/kestra/core/runners/Worker.java b/core/src/main/java/io/kestra/core/runners/Worker.java index c71e755555..38247ba619 100644 --- a/core/src/main/java/io/kestra/core/runners/Worker.java +++ b/core/src/main/java/io/kestra/core/runners/Worker.java @@ -229,6 +229,10 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws Queu ) .get(() -> this.runAttempt(current.get())); + // save dynamic WorkerResults since cleanUpTransient will remove them + List dynamicWorkerResults = finalWorkerTask.getRunContext().dynamicWorkerResults(); + + // remove tmp directory if (cleanUp) { finalWorkerTask.getRunContext().cleanup(); } @@ -259,7 +263,7 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws Queu // So we just tryed to failed the status of the worker task, in this case, no log can't be happend, just // changing status must work in order to finish current task (except if we are near the upper bound size). try { - WorkerTaskResult workerTaskResult = new WorkerTaskResult(finalWorkerTask); + WorkerTaskResult workerTaskResult = new WorkerTaskResult(finalWorkerTask, dynamicWorkerResults); this.workerTaskResultQueue.emit(workerTaskResult); return workerTaskResult; } catch (QueueException e) { @@ -267,7 +271,7 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws Queu .withTaskRun(workerTask.getTaskRun() .withState(State.Type.FAILED) ); - WorkerTaskResult workerTaskResult = new WorkerTaskResult(finalWorkerTask); + WorkerTaskResult workerTaskResult = new WorkerTaskResult(finalWorkerTask, dynamicWorkerResults); this.workerTaskResultQueue.emit(workerTaskResult); return workerTaskResult; } finally { @@ -382,7 +386,6 @@ private List addAttempt(WorkerTask workerTask, TaskRunAttempt ta .build(); } - @SuppressWarnings("UnstableApiUsage") public AtomicInteger getMetricRunningCount(WorkerTask workerTask) { String[] tags = this.metricRegistry.tags(workerTask); Arrays.sort(tags); diff --git a/core/src/main/java/io/kestra/core/runners/WorkerTaskResult.java b/core/src/main/java/io/kestra/core/runners/WorkerTaskResult.java index b18f0f4236..2193c42118 100644 --- a/core/src/main/java/io/kestra/core/runners/WorkerTaskResult.java +++ b/core/src/main/java/io/kestra/core/runners/WorkerTaskResult.java @@ -5,6 +5,9 @@ import lombok.Builder; import lombok.Value; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; import javax.validation.constraints.NotNull; @Value @@ -14,7 +17,23 @@ public class WorkerTaskResult { @NotNull TaskRun taskRun; + List dynamicTaskRuns; + + public WorkerTaskResult(TaskRun taskRun) { + this.taskRun = taskRun; + this.dynamicTaskRuns = new ArrayList<>(); + } + public WorkerTaskResult(WorkerTask workerTask) { this.taskRun = workerTask.getTaskRun(); + this.dynamicTaskRuns = new ArrayList<>(); + } + + public WorkerTaskResult(WorkerTask workerTask, List dynamicWorkerResults) { + this.taskRun = workerTask.getTaskRun(); + this.dynamicTaskRuns = dynamicWorkerResults + .stream() + .map(WorkerTaskResult::getTaskRun) + .collect(Collectors.toList()); } } diff --git a/core/src/main/java/io/kestra/core/services/ExecutionService.java b/core/src/main/java/io/kestra/core/services/ExecutionService.java index d81ada2d94..d4b0879fec 100644 --- a/core/src/main/java/io/kestra/core/services/ExecutionService.java +++ b/core/src/main/java/io/kestra/core/services/ExecutionService.java @@ -7,18 +7,22 @@ import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; import io.kestra.core.models.hierarchies.GraphCluster; +import io.kestra.core.models.tasks.Task; import io.kestra.core.repositories.FlowRepositoryInterface; +import io.kestra.core.tasks.flows.Worker; import io.kestra.core.utils.IdUtils; import io.micronaut.context.ApplicationContext; import io.micronaut.core.annotation.Nullable; import java.util.*; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; import jakarta.inject.Inject; import jakarta.inject.Singleton; import static io.kestra.core.utils.Rethrow.throwFunction; +import static io.kestra.core.utils.Rethrow.throwPredicate; @Singleton public class ExecutionService { @@ -37,19 +41,12 @@ public Execution restart(final Execution execution, @Nullable Integer revision) final Flow flow = flowRepositoryInterface.findByExecution(execution); - Set taskRunToRestart = this.taskRunWithAncestors( + Set taskRunToRestart = this.taskRunToRestart( execution, - execution - .getTaskRunList() - .stream() - .filter(taskRun -> taskRun.getState().getCurrent().isFailed()) - .collect(Collectors.toList()) + flow, + taskRun -> taskRun.getState().getCurrent().isFailed() ); - if (taskRunToRestart.size() == 0) { - throw new IllegalArgumentException("No failed task found to restart execution from !"); - } - Map mappingTaskRunId = this.mapTaskRunId(execution, revision == null); final String newExecutionId = revision != null ? IdUtils.create() : null; @@ -66,6 +63,10 @@ public Execution restart(final Execution execution, @Nullable Integer revision) )) .collect(Collectors.toList()); + // Worker task, we need to remove all child in order to be restarted + this.removeWorkerTask(flow, execution, taskRunToRestart, mappingTaskRunId) + .forEach(r -> newTaskRuns.removeIf(taskRun -> taskRun.getId().equals(r))); + // Build and launch new execution Execution newExecution = execution .childExecution( @@ -77,6 +78,25 @@ public Execution restart(final Execution execution, @Nullable Integer revision) return revision != null ? newExecution.withFlowRevision(revision) : newExecution; } + private Set taskRunToRestart(Execution execution, Flow flow, Predicate predicate) throws InternalException { + // Original tasks to be restarted + Set finalTaskRunToRestart = this + .taskRunWithAncestors( + execution, + execution + .getTaskRunList() + .stream() + .filter(predicate) + .collect(Collectors.toList()) + ); + + if (finalTaskRunToRestart.size() == 0) { + throw new IllegalArgumentException("No task found to restart execution from!"); + } + + return finalTaskRunToRestart; + } + public Execution replay(final Execution execution, String taskRunId, @Nullable Integer revision) throws Exception { if (!execution.getState().isTerninated()) { throw new IllegalStateException("Execution must be terminated to be restarted, " + @@ -87,19 +107,12 @@ public Execution replay(final Execution execution, String taskRunId, @Nullable I final Flow flow = flowRepositoryInterface.findByExecution(execution); GraphCluster graphCluster = GraphService.of(flow, execution); - Set taskRunToRestart = this.taskRunWithAncestors( + Set taskRunToRestart = this.taskRunToRestart( execution, - execution - .getTaskRunList() - .stream() - .filter(taskRun -> taskRun.getId().equals(taskRunId)) - .collect(Collectors.toList()) + flow, + taskRun -> taskRun.getId().equals(taskRunId) ); - if (taskRunToRestart.size() == 0) { - throw new IllegalArgumentException("No task found to restart execution from !"); - } - Map mappingTaskRunId = this.mapTaskRunId(execution, false); final String newExecutionId = IdUtils.create(); @@ -116,6 +129,7 @@ public Execution replay(final Execution execution, String taskRunId, @Nullable I )) .collect(Collectors.toList()); + // remove all child for replay task id Set taskRunToRemove = GraphService.successors(graphCluster, List.of(taskRunId)) .stream() .filter(task -> task.getTaskRun() != null) @@ -127,6 +141,10 @@ public Execution replay(final Execution execution, String taskRunId, @Nullable I taskRunToRemove .forEach(r -> newTaskRuns.removeIf(taskRun -> taskRun.getId().equals(r))); + // Worker task, we need to remove all child in order to be restarted + this.removeWorkerTask(flow, execution, taskRunToRestart, mappingTaskRunId) + .forEach(r -> newTaskRuns.removeIf(taskRun -> taskRun.getId().equals(r))); + // Build and launch new execution Execution newExecution = execution.childExecution( newExecutionId, @@ -146,19 +164,12 @@ public Execution markAs(final Execution execution, String taskRunId, State.Type final Flow flow = flowRepositoryInterface.findByExecution(execution); - Set taskRunToRestart = this.taskRunWithAncestors( + Set taskRunToRestart = this.taskRunToRestart( execution, - execution - .getTaskRunList() - .stream() - .filter(taskRun -> taskRun.getId().equals(taskRunId)) - .collect(Collectors.toList()) + flow, + taskRun -> taskRun.getId().equals(taskRunId) ); - if (taskRunToRestart.size() == 0) { - throw new IllegalArgumentException("No task found to restart execution from !"); - } - Execution newExecution = execution; for (String s : taskRunToRestart) { @@ -184,6 +195,26 @@ public Execution markAs(final Execution execution, String taskRunId, State.Type .withState(State.Type.RESTARTED); } + private Set removeWorkerTask(Flow flow, Execution execution, Set taskRunToRestart, Map mappingTaskRunId) throws InternalException { + Set workerTaskRunId = taskRunToRestart + .stream() + .filter(throwPredicate(s -> { + TaskRun taskRun = execution.findTaskRunByTaskRunId(s); + Task task = flow.findTaskByTaskId(taskRun.getTaskId()); + return (task instanceof Worker); + })) + .collect(Collectors.toSet()); + + GraphCluster graphCluster = GraphService.of(flow, execution); + + return GraphService.successors(graphCluster, new ArrayList<>(workerTaskRunId)) + .stream() + .filter(task -> task.getTaskRun() != null) + .filter(s -> !workerTaskRunId.contains(s.getTaskRun().getId())) + .map(s -> mappingTaskRunId.get(s.getTaskRun().getId())) + .collect(Collectors.toSet()); + } + private Set getAncestors(Execution execution, TaskRun taskRun) { return Stream .concat( @@ -215,10 +246,10 @@ private TaskRun mapTaskRun( State.Type newStateType, Boolean toRestart ) throws InternalException { - boolean isFlowable = flow.findTaskByTaskId(originalTaskRun.getTaskId()).isFlowable(); + Task task = flow.findTaskByTaskId(originalTaskRun.getTaskId()); State alterState; - if (!isFlowable) { + if (!task.isFlowable() || task instanceof Worker) { // The current task run is the reference task run, its default state will be newState alterState = originalTaskRun.withState(newStateType).getState(); } diff --git a/core/src/main/java/io/kestra/core/tasks/flows/Worker.java b/core/src/main/java/io/kestra/core/tasks/flows/Worker.java index 54287c5e36..93ba64d2c0 100644 --- a/core/src/main/java/io/kestra/core/tasks/flows/Worker.java +++ b/core/src/main/java/io/kestra/core/tasks/flows/Worker.java @@ -7,6 +7,7 @@ import io.kestra.core.models.executions.NextTaskRun; import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.flows.State; +import io.kestra.core.models.tasks.DynamicTask; import io.kestra.core.models.tasks.ResolvedTask; import io.kestra.core.models.tasks.Task; import io.kestra.core.runners.RunContext; @@ -58,7 +59,7 @@ ) } ) -public class Worker extends Sequential { +public class Worker extends Sequential implements DynamicTask { @Override public List resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException { List childTasks = this.childTasks(runContext, parentTaskRun); diff --git a/core/src/main/java/io/kestra/core/tasks/scripts/AbstractBash.java b/core/src/main/java/io/kestra/core/tasks/scripts/AbstractBash.java index 849c0703b5..d848c324fb 100644 --- a/core/src/main/java/io/kestra/core/tasks/scripts/AbstractBash.java +++ b/core/src/main/java/io/kestra/core/tasks/scripts/AbstractBash.java @@ -203,13 +203,7 @@ protected ScriptOutput run(RunContext runContext, Supplier supplier) thr workingDirectory, finalCommandsWithInterpreter(commandAsString), this.finalEnv(), - (inputStream, isStdErr) -> { - AbstractLogThread thread = new LogThread(logger, inputStream, isStdErr, runContext); - thread.setName("bash-log-" + (isStdErr ? "-err" : "-out")); - thread.start(); - - return thread; - } + this.defaultLogSupplier(logger, runContext) ); // upload output files @@ -234,6 +228,16 @@ protected ScriptOutput run(RunContext runContext, Supplier supplier) thr .build(); } + protected LogSupplier defaultLogSupplier(Logger logger, RunContext runContext) { + return (inputStream, isStdErr) -> { + AbstractLogThread thread = new LogThread(logger, inputStream, isStdErr, runContext); + thread.setName("bash-log-" + (isStdErr ? "-err" : "-out")); + thread.start(); + + return thread; + }; + } + protected RunResult run(RunContext runContext, Logger logger, Path workingDirectory, List commandsWithInterpreter, Map env, LogSupplier logSupplier) throws Exception { ScriptRunnerInterface executor; if (this.runner == Runner.DOCKER) { diff --git a/core/src/main/java/io/kestra/core/utils/Await.java b/core/src/main/java/io/kestra/core/utils/Await.java index bea18b5aa2..6824d43bac 100644 --- a/core/src/main/java/io/kestra/core/utils/Await.java +++ b/core/src/main/java/io/kestra/core/utils/Await.java @@ -22,7 +22,7 @@ public static void until(BooleanSupplier condition, Duration sleep) { try { Thread.sleep(sleep.toMillis()); } catch (InterruptedException e) { - throw new RuntimeException("Can't sleep"); + throw new RuntimeException("Can't sleep:" + e.getMessage()); } } } @@ -40,7 +40,7 @@ public static void until(BooleanSupplier condition, Duration sleep, Duration tim try { Thread.sleep(sleep.toMillis()); } catch (InterruptedException e) { - throw new RuntimeException("Can't sleep"); + throw new RuntimeException("Can't sleep:" + e.getMessage()); } } } diff --git a/core/src/main/resources/icons/io.kestra.core.tasks.flows.Worker.svg b/core/src/main/resources/icons/io.kestra.core.tasks.flows.Worker.svg new file mode 100644 index 0000000000..9b5296916b --- /dev/null +++ b/core/src/main/resources/icons/io.kestra.core.tasks.flows.Worker.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java b/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java index 620345b520..c878e8d137 100644 --- a/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java +++ b/core/src/test/java/io/kestra/core/runners/ExecutionServiceTest.java @@ -7,14 +7,14 @@ import io.kestra.core.repositories.FlowRepositoryInterface; import io.kestra.core.services.ExecutionService; import io.kestra.core.tasks.debugs.Return; +import jakarta.inject.Inject; import org.junit.jupiter.api.Test; import java.util.List; -import jakarta.inject.Inject; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; -import static org.hamcrest.Matchers.not; +import static org.junit.jupiter.api.Assertions.assertThrows; class ExecutionServiceTest extends AbstractMemoryRunnerTest { @Inject @@ -98,6 +98,20 @@ void restartFlowable2() throws Exception { assertThat(restart.getTaskRunList().get(0).getId(), is(restart.getTaskRunList().get(0).getId())); } + @Test + void restartDynamic() throws Exception { + Execution execution = runnerUtils.runOne("io.kestra.tests", "worker", null, (f, e) -> ImmutableMap.of("failed", "true")); + assertThat(execution.getTaskRunList(), hasSize(3)); + assertThat(execution.getState().getCurrent(), is(State.Type.FAILED)); + + Execution restart = executionService.restart(execution, null); + assertThat(restart.getState().getCurrent(), is(State.Type.RESTARTED)); + assertThat(restart.getState().getHistories(), hasSize(4)); + + assertThat(restart.getTaskRunList().get(0).getState().getCurrent(), is(State.Type.RESTARTED)); + assertThat(restart.getTaskRunList().get(0).getState().getHistories(), hasSize(4)); + } + @Test void replaySimple() throws Exception { Execution execution = runnerUtils.runOne("io.kestra.tests", "logs"); diff --git a/webserver/src/test/java/io/kestra/webserver/controllers/ExecutionControllerTest.java b/webserver/src/test/java/io/kestra/webserver/controllers/ExecutionControllerTest.java index be29c9f249..094ee13f72 100644 --- a/webserver/src/test/java/io/kestra/webserver/controllers/ExecutionControllerTest.java +++ b/webserver/src/test/java/io/kestra/webserver/controllers/ExecutionControllerTest.java @@ -191,7 +191,7 @@ void restartFromUnknownTaskId() throws TimeoutException { assertThat(e.getStatus(), is(HttpStatus.UNPROCESSABLE_ENTITY)); assertThat(e.getResponse().getBody(String.class).isPresent(), is(true)); - assertThat(e.getResponse().getBody(String.class).get(), containsString("No task found to restart execution from !")); + assertThat(e.getResponse().getBody(String.class).get(), containsString("No task found")); } @Test @@ -209,7 +209,7 @@ void restartWithNoFailure() throws TimeoutException { assertThat(e.getStatus(), is(HttpStatus.UNPROCESSABLE_ENTITY)); assertThat(e.getResponse().getBody(String.class).isPresent(), is(true)); - assertThat(e.getResponse().getBody(String.class).get(), containsString("No failed task found to restart execution from !")); + assertThat(e.getResponse().getBody(String.class).get(), containsString("No task found to restart")); } @Test