Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Failed task names (#3185)
Browse files Browse the repository at this point in the history
* Add support for tracking failed task names with workflow execution

* Fix tests

* Undo inadvertent variable name change
  • Loading branch information
peterlau authored Aug 18, 2022
1 parent 8b62e35 commit 33062f3
Show file tree
Hide file tree
Showing 12 changed files with 127 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ public boolean isSuccessful() {
@ProtoField(id = 24)
private long lastRetriedTime;

@ProtoField(id = 25)
private Set<String> failedTaskNames = new HashSet<>();

public Workflow() {}

/**
Expand Down Expand Up @@ -323,6 +326,14 @@ public void setFailedReferenceTaskNames(Set<String> failedReferenceTaskNames) {
this.failedReferenceTaskNames = failedReferenceTaskNames;
}

public Set<String> getFailedTaskNames() {
return failedTaskNames;
}

public void setFailedTaskNames(Set<String> failedTaskNames) {
this.failedTaskNames = failedTaskNames;
}

public WorkflowDef getWorkflowDefinition() {
return workflowDefinition;
}
Expand Down Expand Up @@ -484,6 +495,7 @@ public Workflow copy() {
copy.setLastRetriedTime(lastRetriedTime);
copy.setTaskToDomain(taskToDomain);
copy.setFailedReferenceTaskNames(failedReferenceTaskNames);
copy.setFailedTaskNames(failedTaskNames);
copy.setExternalInputPayloadStoragePath(externalInputPayloadStoragePath);
copy.setExternalOutputPayloadStoragePath(externalOutputPayloadStoragePath);
return copy;
Expand Down Expand Up @@ -532,6 +544,7 @@ && getStatus() == workflow.getStatus()
&& Objects.equals(getTaskToDomain(), workflow.getTaskToDomain())
&& Objects.equals(
getFailedReferenceTaskNames(), workflow.getFailedReferenceTaskNames())
&& Objects.equals(getFailedTaskNames(), workflow.getFailedTaskNames())
&& Objects.equals(
getExternalInputPayloadStoragePath(),
workflow.getExternalInputPayloadStoragePath())
Expand Down Expand Up @@ -563,6 +576,7 @@ public int hashCode() {
getEvent(),
getTaskToDomain(),
getFailedReferenceTaskNames(),
getFailedTaskNames(),
getWorkflowDefinition(),
getExternalInputPayloadStoragePath(),
getExternalOutputPayloadStoragePath(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -83,6 +85,9 @@ public class WorkflowSummary {
@ProtoField(id = 17)
private int priority;

@ProtoField(id = 18)
private Set<String> failedTaskNames = new HashSet<>();

public WorkflowSummary() {}

public WorkflowSummary(Workflow workflow) {
Expand Down Expand Up @@ -118,6 +123,7 @@ public WorkflowSummary(Workflow workflow) {
this.event = workflow.getEvent();
this.failedReferenceTaskNames =
workflow.getFailedReferenceTaskNames().stream().collect(Collectors.joining(","));
this.failedTaskNames = workflow.getFailedTaskNames();
if (StringUtils.isNotBlank(workflow.getExternalInputPayloadStoragePath())) {
this.externalInputPayloadStoragePath = workflow.getExternalInputPayloadStoragePath();
}
Expand Down Expand Up @@ -240,6 +246,14 @@ public void setFailedReferenceTaskNames(String failedReferenceTaskNames) {
this.failedReferenceTaskNames = failedReferenceTaskNames;
}

public Set<String> getFailedTaskNames() {
return failedTaskNames;
}

public void setFailedTaskNames(Set<String> failedTaskNames) {
this.failedTaskNames = failedTaskNames;
}

public void setWorkflowType(String workflowType) {
this.workflowType = workflowType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,17 +837,26 @@ WorkflowModel completeWorkflow(WorkflowModel workflow) {
workflow.setStatus(WorkflowModel.Status.COMPLETED);

// update the failed reference task names
List<TaskModel> failedTasks =
workflow.getTasks().stream()
.filter(
t ->
FAILED.equals(t.getStatus())
|| FAILED_WITH_TERMINAL_ERROR.equals(t.getStatus()))
.collect(Collectors.toList());

workflow.getFailedReferenceTaskNames()
.addAll(
workflow.getTasks().stream()
.filter(
t ->
FAILED.equals(t.getStatus())
|| FAILED_WITH_TERMINAL_ERROR.equals(
t.getStatus()))
failedTasks.stream()
.map(TaskModel::getReferenceTaskName)
.collect(Collectors.toSet()));

workflow.getFailedTaskNames()
.addAll(
failedTasks.stream()
.map(TaskModel::getTaskDefName)
.collect(Collectors.toSet()));

executionDAOFacade.updateWorkflow(workflow);
LOGGER.debug("Completed workflow execution for {}", workflow.getWorkflowId());
workflowStatusListener.onWorkflowCompletedIfEnabled(workflow);
Expand Down Expand Up @@ -907,17 +916,27 @@ public WorkflowModel terminateWorkflow(
}

// update the failed reference task names
List<TaskModel> failedTasks =
workflow.getTasks().stream()
.filter(
t ->
FAILED.equals(t.getStatus())
|| FAILED_WITH_TERMINAL_ERROR.equals(
t.getStatus()))
.collect(Collectors.toList());

workflow.getFailedReferenceTaskNames()
.addAll(
workflow.getTasks().stream()
.filter(
t ->
FAILED.equals(t.getStatus())
|| FAILED_WITH_TERMINAL_ERROR.equals(
t.getStatus()))
failedTasks.stream()
.map(TaskModel::getReferenceTaskName)
.collect(Collectors.toSet()));

workflow.getFailedTaskNames()
.addAll(
failedTasks.stream()
.map(TaskModel::getTaskDefName)
.collect(Collectors.toSet()));

String workflowId = workflow.getWorkflowId();
workflow.setReasonForIncompletion(reason);
executionDAOFacade.updateWorkflow(workflow);
Expand Down Expand Up @@ -1829,6 +1848,8 @@ private boolean rerunWF(
workflow.setReasonForIncompletion(null);
workflow.setFailedTaskId(null);
workflow.setFailedReferenceTaskNames(new HashSet<>());
workflow.setFailedTaskNames(new HashSet<>());

if (correlationId != null) {
workflow.setCorrelationId(correlationId);
}
Expand Down Expand Up @@ -1876,6 +1897,8 @@ private boolean rerunWF(
workflow.setReasonForIncompletion(null);
workflow.setFailedTaskId(null);
workflow.setFailedReferenceTaskNames(new HashSet<>());
workflow.setFailedTaskNames(new HashSet<>());

if (correlationId != null) {
workflow.setCorrelationId(correlationId);
}
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/java/com/netflix/conductor/model/WorkflowModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public boolean isSuccessful() {
@JsonInclude(JsonInclude.Include.NON_EMPTY)
private Set<String> failedReferenceTaskNames = new HashSet<>();

@JsonInclude(JsonInclude.Include.NON_EMPTY)
private Set<String> failedTaskNames = new HashSet<>();

private WorkflowDef workflowDefinition;

private String externalInputPayloadStoragePath;
Expand Down Expand Up @@ -283,6 +286,14 @@ public void setFailedReferenceTaskNames(Set<String> failedReferenceTaskNames) {
this.failedReferenceTaskNames = failedReferenceTaskNames;
}

public Set<String> getFailedTaskNames() {
return failedTaskNames;
}

public void setFailedTaskNames(Set<String> failedTaskNames) {
this.failedTaskNames = failedTaskNames;
}

public WorkflowDef getWorkflowDefinition() {
return workflowDefinition;
}
Expand Down Expand Up @@ -491,6 +502,7 @@ && getStatus() == that.getStatus()
&& Objects.equals(getEvent(), that.getEvent())
&& Objects.equals(getTaskToDomain(), that.getTaskToDomain())
&& Objects.equals(getFailedReferenceTaskNames(), that.getFailedReferenceTaskNames())
&& Objects.equals(getFailedTaskNames(), that.getFailedTaskNames())
&& Objects.equals(getWorkflowDefinition(), that.getWorkflowDefinition())
&& Objects.equals(
getExternalInputPayloadStoragePath(),
Expand Down Expand Up @@ -523,6 +535,7 @@ public int hashCode() {
getEvent(),
getTaskToDomain(),
getFailedReferenceTaskNames(),
getFailedTaskNames(),
getWorkflowDefinition(),
getExternalInputPayloadStoragePath(),
getExternalOutputPayloadStoragePath(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1277,6 +1277,12 @@ public void testRerunWorkflow() {
add("task1_ref1");
}
});
workflow.setFailedTaskNames(
new HashSet<>() {
{
add("task1");
}
});

TaskModel task_1_1 = new TaskModel();
task_1_1.setTaskId(UUID.randomUUID().toString());
Expand Down Expand Up @@ -1317,6 +1323,7 @@ public void testRerunWorkflow() {
assertEquals(WorkflowModel.Status.RUNNING, workflow.getStatus());
assertNull(workflow.getReasonForIncompletion());
assertEquals(new HashSet<>(), workflow.getFailedReferenceTaskNames());
assertEquals(new HashSet<>(), workflow.getFailedTaskNames());
}

@Test
Expand Down Expand Up @@ -1426,7 +1433,12 @@ public void testRerunWorkflowWithTaskId() {
add("task1_ref1");
}
});

workflow.setFailedTaskNames(
new HashSet<>() {
{
add("task1");
}
});
TaskModel task_1_1 = new TaskModel();
task_1_1.setTaskId(UUID.randomUUID().toString());
task_1_1.setSeq(20);
Expand Down Expand Up @@ -1467,6 +1479,7 @@ public void testRerunWorkflowWithTaskId() {
assertEquals(WorkflowModel.Status.RUNNING, workflow.getStatus());
assertNull(workflow.getReasonForIncompletion());
assertEquals(new HashSet<>(), workflow.getFailedReferenceTaskNames());
assertEquals(new HashSet<>(), workflow.getFailedTaskNames());
}

@Test
Expand Down Expand Up @@ -1509,6 +1522,12 @@ public void testRerunWorkflowWithSyncSystemTaskId() {
add("task2_ref");
}
});
workflow.setFailedTaskNames(
new HashSet<>() {
{
add("task2");
}
});
workflow.getTasks().addAll(Arrays.asList(task1, task2));
// end of setup

Expand All @@ -1526,6 +1545,7 @@ public void testRerunWorkflowWithSyncSystemTaskId() {
assertEquals(WorkflowModel.Status.RUNNING, workflow.getStatus());
assertNull(workflow.getReasonForIncompletion());
assertEquals(new HashSet<>(), workflow.getFailedReferenceTaskNames());
assertEquals(new HashSet<>(), workflow.getFailedTaskNames());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
"type": "text",
"index": false
},
"failedTaskNames": {
"type": "text",
"index": true
},
"input": {
"type": "text",
"index": true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.es6.utils.TestUtils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -98,7 +99,7 @@ private boolean doesMappingExist(final String index, final String mappingName) {
}

@Test
public void shouldIndexWorkflow() {
public void shouldIndexWorkflow() throws JsonProcessingException {
WorkflowSummary workflow = TestUtils.loadWorkflowSnapshot(objectMapper, "workflow_summary");
indexDAO.indexWorkflow(workflow);

Expand Down Expand Up @@ -146,7 +147,7 @@ public void shouldAsyncRemoveWorkflow() throws Exception {
}

@Test
public void shouldUpdateWorkflow() {
public void shouldUpdateWorkflow() throws JsonProcessingException {
WorkflowSummary workflow = TestUtils.loadWorkflowSnapshot(objectMapper, "workflow_summary");
indexDAO.indexWorkflow(workflow);

Expand Down Expand Up @@ -332,7 +333,8 @@ private long getWorkflowCount(String workflowName, String status) {
"status=\"" + status + "\" AND workflowType=\"" + workflowName + "\"", "*");
}

private void assertWorkflowSummary(String workflowId, WorkflowSummary summary) {
private void assertWorkflowSummary(String workflowId, WorkflowSummary summary)
throws JsonProcessingException {
assertEquals(summary.getWorkflowType(), indexDAO.get(workflowId, "workflowType"));
assertEquals(String.valueOf(summary.getVersion()), indexDAO.get(workflowId, "version"));
assertEquals(summary.getWorkflowId(), indexDAO.get(workflowId, "workflowId"));
Expand All @@ -353,6 +355,9 @@ private void assertWorkflowSummary(String workflowId, WorkflowSummary summary) {
assertEquals(
summary.getFailedReferenceTaskNames(),
indexDAO.get(workflowId, "failedReferenceTaskNames"));
assertEquals(
summary.getFailedTaskNames(),
objectMapper.readValue(indexDAO.get(workflowId, "failedTaskNames"), Set.class));
}

private <T> List<T> tryFindResults(Supplier<List<T>> searchFunction) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.function.Supplier;
Expand All @@ -34,6 +35,7 @@
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.es6.utils.TestUtils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -89,7 +91,7 @@ public void assertInitialSetup() throws IOException {
}

@Test
public void shouldIndexWorkflow() {
public void shouldIndexWorkflow() throws JsonProcessingException {
WorkflowSummary workflowSummary =
TestUtils.loadWorkflowSnapshot(objectMapper, "workflow_summary");
indexDAO.indexWorkflow(workflowSummary);
Expand Down Expand Up @@ -143,7 +145,7 @@ public void shouldAsyncRemoveWorkflow() throws Exception {
}

@Test
public void shouldUpdateWorkflow() {
public void shouldUpdateWorkflow() throws JsonProcessingException {
WorkflowSummary workflowSummary =
TestUtils.loadWorkflowSnapshot(objectMapper, "workflow_summary");
indexDAO.indexWorkflow(workflowSummary);
Expand Down Expand Up @@ -330,7 +332,8 @@ private long getWorkflowCount(String workflowName, String status) {
"status=\"" + status + "\" AND workflowType=\"" + workflowName + "\"", "*");
}

private void assertWorkflowSummary(String workflowId, WorkflowSummary summary) {
private void assertWorkflowSummary(String workflowId, WorkflowSummary summary)
throws JsonProcessingException {
assertEquals(summary.getWorkflowType(), indexDAO.get(workflowId, "workflowType"));
assertEquals(String.valueOf(summary.getVersion()), indexDAO.get(workflowId, "version"));
assertEquals(summary.getWorkflowId(), indexDAO.get(workflowId, "workflowId"));
Expand All @@ -351,6 +354,9 @@ private void assertWorkflowSummary(String workflowId, WorkflowSummary summary) {
assertEquals(
summary.getFailedReferenceTaskNames(),
indexDAO.get(workflowId, "failedReferenceTaskNames"));
assertEquals(
summary.getFailedTaskNames(),
objectMapper.readValue(indexDAO.get(workflowId, "failedTaskNames"), Set.class));
}

private <T> List<T> tryFindResults(Supplier<List<T>> searchFunction) {
Expand Down
Loading

0 comments on commit 33062f3

Please sign in to comment.