Skip to content

Commit

Permalink
feat(core): allow to reset the outputs of a taskrun (#2018)
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu authored Sep 15, 2023
1 parent 1b65448 commit 52d4bf2
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
18 changes: 16 additions & 2 deletions core/src/main/java/io/kestra/core/models/executions/TaskRun.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
package io.kestra.core.models.executions;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Value;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import lombok.With;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.utils.IdUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.validation.constraints.NotNull;

@Value
@ToString
@EqualsAndHashCode
@AllArgsConstructor
@Getter
@Builder(toBuilder = true)
public class TaskRun {
@NotNull
Expand Down Expand Up @@ -43,6 +50,13 @@ public class TaskRun {
@NotNull
State state;

public void destroyOutputs() {
// DANGER ZONE: this method is only used to deals with issues with messages too big that must be stripped down
// to avoid crashing the platform. Don't use it for anything else.
this.outputs = Collections.emptyMap();
this.state = this.state.withState(State.Type.FAILED);
}

public TaskRun withState(State.Type state) {
return new TaskRun(
this.id,
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,11 @@ private WorkerTaskResult run(WorkerTask workerTask, Boolean cleanUp) throws Queu
.withState(State.Type.FAILED)
);
WorkerTaskResult workerTaskResult = new WorkerTaskResult(finalWorkerTask, dynamicWorkerResults);
RunContext runContext = workerTask
.getRunContext()
.forWorker(this.applicationContext, workerTask);

runContext.logger().error("Exception while trying to emit the worker task result to the queue", e);
this.workerTaskResultQueue.emit(workerTaskResult);
return workerTaskResult;
} finally {
Expand Down

0 comments on commit 52d4bf2

Please sign in to comment.