diff --git a/awss3-storage/src/main/java/com/netflix/conductor/s3/storage/S3PayloadStorage.java b/awss3-storage/src/main/java/com/netflix/conductor/s3/storage/S3PayloadStorage.java index 86e57e685d..8ac530ea54 100644 --- a/awss3-storage/src/main/java/com/netflix/conductor/s3/storage/S3PayloadStorage.java +++ b/awss3-storage/src/main/java/com/netflix/conductor/s3/storage/S3PayloadStorage.java @@ -22,7 +22,8 @@ import com.netflix.conductor.common.run.ExternalStorageLocation; import com.netflix.conductor.common.utils.ExternalPayloadStorage; -import com.netflix.conductor.core.exception.ApplicationException; +import com.netflix.conductor.core.exception.NonTransientException; +import com.netflix.conductor.core.exception.TransientException; import com.netflix.conductor.core.utils.IDGenerator; import com.netflix.conductor.s3.config.S3Properties; @@ -104,11 +105,11 @@ public ExternalStorageLocation getLocation( "Error communicating with S3 - operation:%s, payloadType: %s, path: %s", operation, payloadType, path); LOGGER.error(msg, e); - throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, msg, e); + throw new TransientException(msg, e); } catch (URISyntaxException e) { String msg = "Invalid URI Syntax"; LOGGER.error(msg, e); - throw new ApplicationException(ApplicationException.Code.INTERNAL_ERROR, msg, e); + throw new NonTransientException(msg, e); } } @@ -135,7 +136,7 @@ public void upload(String path, InputStream payload, long payloadSize) { String.format( "Error uploading to S3 - path:%s, payloadSize: %d", path, payloadSize); LOGGER.error(msg, e); - throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, msg, e); + throw new TransientException(msg, e); } } @@ -154,7 +155,7 @@ public InputStream download(String path) { } catch (SdkClientException e) { String msg = String.format("Error downloading from S3 - path:%s", path); LOGGER.error(msg, e); - throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, msg, e); + throw new TransientException(msg, e); } } diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraBaseDAO.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraBaseDAO.java index c576be8e91..b327c18fbb 100644 --- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraBaseDAO.java +++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraBaseDAO.java @@ -13,11 +13,13 @@ package com.netflix.conductor.cassandra.dao; import java.io.IOException; +import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.netflix.conductor.cassandra.config.CassandraProperties; +import com.netflix.conductor.core.exception.NonTransientException; import com.netflix.conductor.metrics.Monitors; import com.datastax.driver.core.DataType; @@ -110,6 +112,14 @@ public CassandraBaseDAO( init(); } + protected static UUID toUUID(String uuidString, String message) { + try { + return UUID.fromString(uuidString); + } catch (IllegalArgumentException iae) { + throw new IllegalArgumentException(message + " " + uuidString, iae); + } + } + private void init() { try { if (!initialized) { @@ -226,7 +236,7 @@ String toJson(Object value) { try { return objectMapper.writeValueAsString(value); } catch (JsonProcessingException e) { - throw new RuntimeException(e); + throw new NonTransientException("Error serializing to json", e); } } @@ -234,7 +244,7 @@ T readValue(String json, Class clazz) { try { return 
objectMapper.readValue(json, clazz); } catch (IOException e) { - throw new RuntimeException(e); + throw new NonTransientException("Error de-serializing json", e); } } diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAO.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAO.java index ce797cea9e..4ce0126046 100644 --- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAO.java +++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraEventHandlerDAO.java @@ -28,8 +28,7 @@ import com.netflix.conductor.cassandra.config.CassandraProperties; import com.netflix.conductor.cassandra.util.Statements; import com.netflix.conductor.common.metadata.events.EventHandler; -import com.netflix.conductor.core.exception.ApplicationException; -import com.netflix.conductor.core.exception.ApplicationException.Code; +import com.netflix.conductor.core.exception.TransientException; import com.netflix.conductor.dao.EventHandlerDAO; import com.netflix.conductor.metrics.Monitors; @@ -37,6 +36,7 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.DriverException; import com.fasterxml.jackson.databind.ObjectMapper; import static com.netflix.conductor.cassandra.util.Constants.EVENT_HANDLER_KEY; @@ -96,7 +96,7 @@ public void removeEventHandler(String name) { Monitors.error(CLASS_NAME, "removeEventHandler"); String errorMsg = String.format("Failed to remove event handler: %s", name); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } refreshEventHandlersCache(); } @@ -154,11 +154,11 @@ private List getAllEventHandlersFromDB() { .map(row -> readValue(row.getString(EVENT_HANDLER_KEY), EventHandler.class)) .collect(Collectors.toList()); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "getAllEventHandlersFromDB"); String errorMsg = "Failed to get all event handlers"; LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } } @@ -168,14 +168,14 @@ private void insertOrUpdateEventHandler(EventHandler eventHandler) { session.execute(insertEventHandlerStatement.bind(eventHandler.getName(), handler)); recordCassandraDaoRequests("storeEventHandler"); recordCassandraDaoPayloadSize("storeEventHandler", handler.length(), "n/a", "n/a"); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "insertOrUpdateEventHandler"); String errorMsg = String.format( "Error creating/updating event handler: %s/%s", eventHandler.getName(), eventHandler.getEvent()); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } refreshEventHandlersCache(); } diff --git a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java index f8184f4b56..ea0c12aef2 100644 --- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java +++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraExecutionDAO.java @@ -23,8 +23,9 @@ import 
com.netflix.conductor.cassandra.util.Statements; import com.netflix.conductor.common.metadata.events.EventExecution; import com.netflix.conductor.common.metadata.tasks.TaskDef; -import com.netflix.conductor.core.exception.ApplicationException; -import com.netflix.conductor.core.exception.ApplicationException.Code; +import com.netflix.conductor.core.exception.NonTransientException; +import com.netflix.conductor.core.exception.NotFoundException; +import com.netflix.conductor.core.exception.TransientException; import com.netflix.conductor.dao.ConcurrentExecutionLimitDAO; import com.netflix.conductor.dao.ExecutionDAO; import com.netflix.conductor.metrics.Monitors; @@ -32,6 +33,7 @@ import com.netflix.conductor.model.WorkflowModel; import com.datastax.driver.core.*; +import com.datastax.driver.core.exceptions.DriverException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -181,6 +183,7 @@ public List getTasks(String taskType, String startKey, int count) { public List createTasks(List tasks) { validateTasks(tasks); String workflowId = tasks.get(0).getWorkflowInstanceId(); + UUID workflowUUID = toUUID(workflowId, "Invalid workflow id"); try { WorkflowMetadata workflowMetadata = getWorkflowMetadata(workflowId); int totalTasks = workflowMetadata.getTotalTasks() + tasks.size(); @@ -192,8 +195,7 @@ public List createTasks(List tasks) { task.setScheduledTime(System.currentTimeMillis()); session.execute( updateTaskLookupStatement.bind( - UUID.fromString(workflowId), - UUID.fromString(task.getTaskId()))); + workflowUUID, toUUID(task.getTaskId(), "Invalid task id"))); }); // update all the tasks in the workflow using batch @@ -203,7 +205,7 @@ public List createTasks(List tasks) { String taskPayload = toJson(task); batchStatement.add( insertTaskStatement.bind( - UUID.fromString(workflowId), + workflowUUID, DEFAULT_SHARD_ID, task.getTaskId(), taskPayload)); @@ -216,25 +218,22 @@ public List createTasks(List tasks) { task.getWorkflowType()); }); batchStatement.add( - updateTotalTasksStatement.bind( - totalTasks, UUID.fromString(workflowId), DEFAULT_SHARD_ID)); + updateTotalTasksStatement.bind(totalTasks, workflowUUID, DEFAULT_SHARD_ID)); session.execute(batchStatement); // update the total tasks and partitions for the workflow session.execute( updateTotalPartitionsStatement.bind( - DEFAULT_TOTAL_PARTITIONS, totalTasks, UUID.fromString(workflowId))); + DEFAULT_TOTAL_PARTITIONS, totalTasks, workflowUUID)); return tasks; - } catch (ApplicationException e) { - throw e; - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "createTasks"); String errorMsg = String.format( "Error creating %d tasks for workflow: %s", tasks.size(), workflowId); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } } @@ -260,14 +259,14 @@ public void updateTask(TaskModel task) { addTaskToLimit(task); } } - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "updateTask"); String errorMsg = String.format( "Error updating task: %s in workflow: %s", task.getTaskId(), task.getWorkflowInstanceId()); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } } @@ -308,14 +307,14 @@ public boolean exceedsLimit(TaskModel task) { Monitors.recordTaskConcurrentExecutionLimited(task.getTaskDefName(), limit); 
return true; } - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "exceedsLimit"); String errorMsg = String.format( "Failed to get in progress limit - %s:%s in workflow :%s", task.getTaskDefName(), task.getTaskId(), task.getWorkflowInstanceId()); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg); + throw new TransientException(errorMsg); } return false; } @@ -358,13 +357,11 @@ public TaskModel getTask(String taskId) { return task; }) .orElse(null); - } catch (ApplicationException ae) { - throw ae; - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "getTask"); String errorMsg = String.format("Error getting task by id: %s", taskId); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg); + throw new TransientException(errorMsg); } } @@ -412,12 +409,12 @@ public String createWorkflow(WorkflowModel workflow) { workflow.setTasks(tasks); return workflow.getWorkflowId(); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "createWorkflow"); String errorMsg = String.format("Error creating workflow: %s", workflow.getWorkflowId()); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } } @@ -435,12 +432,12 @@ public String updateWorkflow(WorkflowModel workflow) { payload, UUID.fromString(workflow.getWorkflowId()))); workflow.setTasks(tasks); return workflow.getWorkflowId(); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "updateWorkflow"); String errorMsg = String.format("Failed to update workflow: %s", workflow.getWorkflowId()); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg); + throw new TransientException(errorMsg); } } @@ -457,11 +454,11 @@ public boolean removeWorkflow(String workflowId) { deleteWorkflowStatement.bind( UUID.fromString(workflowId), DEFAULT_SHARD_ID)); removed = resultSet.wasApplied(); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "removeWorkflow"); String errorMsg = String.format("Failed to remove workflow: %s", workflowId); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg); + throw new TransientException(errorMsg); } workflow.getTasks().forEach(this::removeTaskLookup); } @@ -495,14 +492,15 @@ public WorkflowModel getWorkflow(String workflowId) { @Override public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) { - WorkflowModel workflow = null; + UUID workflowUUID = toUUID(workflowId, "Invalid workflow id"); try { + WorkflowModel workflow = null; ResultSet resultSet; if (includeTasks) { resultSet = session.execute( selectWorkflowWithTasksStatement.bind( - UUID.fromString(workflowId), DEFAULT_SHARD_ID)); + workflowUUID, DEFAULT_SHARD_ID)); List tasks = new ArrayList<>(); List rows = resultSet.all(); @@ -518,8 +516,7 @@ public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) { TaskModel task = readValue(row.getString(PAYLOAD_KEY), TaskModel.class); tasks.add(task); } else { - throw new ApplicationException( - ApplicationException.Code.INTERNAL_ERROR, + throw new NonTransientException( String.format( "Invalid row with entityKey: %s found in datastore for workflow: %s", entityKey, workflowId)); @@ -532,8 +529,7 @@ public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) { workflow.setTasks(tasks); } } else { - 
resultSet = - session.execute(selectWorkflowStatement.bind(UUID.fromString(workflowId))); + resultSet = session.execute(selectWorkflowStatement.bind(workflowUUID)); workflow = Optional.ofNullable(resultSet.one()) .map( @@ -549,18 +545,11 @@ public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) { .orElse(null); } return workflow; - } catch (ApplicationException e) { - throw e; - } catch (IllegalArgumentException e) { - Monitors.error(CLASS_NAME, "getWorkflow"); - String errorMsg = String.format("Invalid workflow id: %s", workflowId); - LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.INVALID_INPUT, errorMsg, e); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "getWorkflow"); String errorMsg = String.format("Failed to get workflow: %s", workflowId); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg); + throw new TransientException(errorMsg); } } @@ -645,14 +634,14 @@ public boolean addEventExecution(EventExecution eventExecution) { eventExecution.getId(), jsonPayload)) .wasApplied(); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "addEventExecution"); String errorMsg = String.format( "Failed to add event execution for event: %s, handler: %s", eventExecution.getEvent(), eventExecution.getName()); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg); + throw new TransientException(errorMsg); } } @@ -670,14 +659,14 @@ public void updateEventExecution(EventExecution eventExecution) { eventExecution.getMessageId(), eventExecution.getName(), eventExecution.getId())); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "updateEventExecution"); String errorMsg = String.format( "Failed to update event execution for event: %s, handler: %s", eventExecution.getEvent(), eventExecution.getName()); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg); + throw new TransientException(errorMsg); } } @@ -690,14 +679,14 @@ public void removeEventExecution(EventExecution eventExecution) { eventExecution.getMessageId(), eventExecution.getName(), eventExecution.getId())); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "removeEventExecution"); String errorMsg = String.format( "Failed to remove event execution for event: %s, handler: %s", eventExecution.getEvent(), eventExecution.getName()); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg); + throw new TransientException(errorMsg); } } @@ -712,13 +701,13 @@ List getEventExecutions( .filter(row -> !row.isNull(PAYLOAD_KEY)) .map(row -> readValue(row.getString(PAYLOAD_KEY), EventExecution.class)) .collect(Collectors.toList()); - } catch (Exception e) { + } catch (DriverException e) { String errorMsg = String.format( "Failed to fetch event executions for event: %s, handler: %s", eventName, eventHandlerName); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg); + throw new TransientException(errorMsg); } } @@ -732,14 +721,14 @@ public void addTaskToLimit(TaskModel task) { UUID.fromString(task.getWorkflowInstanceId()), task.getTaskDefName(), UUID.fromString(task.getTaskId()))); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "addTaskToLimit"); String errorMsg = String.format( "Error updating taskDefLimit for task - %s:%s in workflow: %s", task.getTaskDefName(), 
task.getTaskId(), task.getWorkflowInstanceId()); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } } @@ -751,14 +740,14 @@ public void removeTaskFromLimit(TaskModel task) { session.execute( deleteTaskDefLimitStatement.bind( task.getTaskDefName(), UUID.fromString(task.getTaskId()))); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "removeTaskFromLimit"); String errorMsg = String.format( "Error updating taskDefLimit for task - %s:%s in workflow: %s", task.getTaskDefName(), task.getTaskId(), task.getWorkflowInstanceId()); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } } @@ -791,11 +780,11 @@ private boolean removeTask(TaskModel task) { removeTaskFromLimit(task); } return resultSet.wasApplied(); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "removeTask"); String errorMsg = String.format("Failed to remove task: %s", task.getTaskId()); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg); + throw new TransientException(errorMsg); } } @@ -808,13 +797,11 @@ private void removeTaskLookup(TaskModel task) { removeTaskFromLimit(task); } session.execute(deleteTaskLookupStatement.bind(UUID.fromString(task.getTaskId()))); - } catch (ApplicationException ae) { - // no-op - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "removeTaskLookup"); String errorMsg = String.format("Failed to remove task lookup: %s", task.getTaskId()); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg); + throw new TransientException(errorMsg); } } @@ -838,8 +825,7 @@ void validateTasks(List tasks) { .filter(task -> !workflowId.equals(task.getWorkflowInstanceId())) .findAny(); if (optionalTask.isPresent()) { - throw new ApplicationException( - Code.INTERNAL_ERROR, + throw new NonTransientException( "Tasks of multiple workflows cannot be created/updated simultaneously"); } } @@ -859,31 +845,24 @@ WorkflowMetadata getWorkflowMetadata(String workflowId) { }) .orElseThrow( () -> - new ApplicationException( - Code.NOT_FOUND, - String.format( - "Workflow with id: %s not found in data store", - workflowId))); + new NotFoundException( + "Workflow with id: %s not found in data store", + workflowId)); } @VisibleForTesting String lookupWorkflowIdFromTaskId(String taskId) { + UUID taskUUID = toUUID(taskId, "Invalid task id"); try { - ResultSet resultSet = - session.execute(selectTaskLookupStatement.bind(UUID.fromString(taskId))); + ResultSet resultSet = session.execute(selectTaskLookupStatement.bind(taskUUID)); return Optional.ofNullable(resultSet.one()) .map(row -> row.getUUID(WORKFLOW_ID_KEY).toString()) .orElse(null); - } catch (IllegalArgumentException iae) { - Monitors.error(CLASS_NAME, "lookupWorkflowIdFromTaskId"); - String errorMsg = String.format("Invalid task id: %s", taskId); - LOGGER.error(errorMsg, iae); - throw new ApplicationException(Code.INVALID_INPUT, errorMsg, iae); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "lookupWorkflowIdFromTaskId"); String errorMsg = String.format("Failed to lookup workflowId from taskId: %s", taskId); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } } } diff --git 
a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java index 0187034b51..21fe61741b 100644 --- a/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java +++ b/cassandra-persistence/src/main/java/com/netflix/conductor/cassandra/dao/CassandraMetadataDAO.java @@ -34,8 +34,8 @@ import com.netflix.conductor.cassandra.util.Statements; import com.netflix.conductor.common.metadata.tasks.TaskDef; import com.netflix.conductor.common.metadata.workflow.WorkflowDef; -import com.netflix.conductor.core.exception.ApplicationException; -import com.netflix.conductor.core.exception.ApplicationException.Code; +import com.netflix.conductor.core.exception.ConflictException; +import com.netflix.conductor.core.exception.TransientException; import com.netflix.conductor.dao.MetadataDAO; import com.netflix.conductor.metrics.Monitors; @@ -43,6 +43,7 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.DriverException; import com.fasterxml.jackson.databind.ObjectMapper; import static com.netflix.conductor.cassandra.util.Constants.TASK_DEFINITION_KEY; @@ -157,11 +158,11 @@ public void removeTaskDef(String name) { try { recordCassandraDaoRequests("removeTaskDef"); session.execute(deleteTaskDefStatement.bind(name)); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "removeTaskDef"); String errorMsg = String.format("Failed to remove task definition: %s", name); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } refreshTaskDefsCache(); } @@ -176,11 +177,9 @@ public void createWorkflowDef(WorkflowDef workflowDef) { workflowDef.getVersion(), workflowDefinition)) .wasApplied()) { - throw new ApplicationException( - Code.CONFLICT, - String.format( - "Workflow: %s, version: %s already exists!", - workflowDef.getName(), workflowDef.getVersion())); + throw new ConflictException( + "Workflow: %s, version: %s already exists!", + workflowDef.getName(), workflowDef.getVersion()); } String workflowDefIndex = getWorkflowDefIndexValue(workflowDef.getName(), workflowDef.getVersion()); @@ -190,16 +189,14 @@ public void createWorkflowDef(WorkflowDef workflowDef) { recordCassandraDaoRequests("createWorkflowDef"); recordCassandraDaoPayloadSize( "createWorkflowDef", workflowDefinition.length(), "n/a", workflowDef.getName()); - } catch (ApplicationException ae) { - throw ae; - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "createWorkflowDef"); String errorMsg = String.format( "Error creating workflow definition: %s/%d", workflowDef.getName(), workflowDef.getVersion()); LOGGER.error(errorMsg, e); - throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } } @@ -218,14 +215,14 @@ public void updateWorkflowDef(WorkflowDef workflowDef) { recordCassandraDaoRequests("updateWorkflowDef"); recordCassandraDaoPayloadSize( "updateWorkflowDef", workflowDefinition.length(), "n/a", workflowDef.getName()); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "updateWorkflowDef"); String errorMsg = String.format( "Error updating workflow definition: %s/%d", workflowDef.getName(), 
workflowDef.getVersion()); LOGGER.error(errorMsg, e); - throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } } @@ -253,11 +250,11 @@ public Optional getWorkflowDef(String name, int version) { WorkflowDef.class)) .orElse(null); return Optional.ofNullable(workflowDef); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "getTaskDef"); String errorMsg = String.format("Error fetching workflow def: %s/%d", name, version); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } } @@ -268,12 +265,12 @@ public void removeWorkflowDef(String name, Integer version) { session.execute( deleteWorkflowDefIndexStatement.bind( WORKFLOW_DEF_INDEX_KEY, getWorkflowDefIndexValue(name, version))); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "removeWorkflowDef"); String errorMsg = String.format("Failed to remove workflow definition: %s/%d", name, version); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } } @@ -299,11 +296,11 @@ public List getAllWorkflowDefs() { }) .filter(Objects::nonNull) .collect(Collectors.toList()); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "getAllWorkflowDefs"); String errorMsg = "Error retrieving all workflow defs"; LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } } @@ -330,11 +327,11 @@ private TaskDef getTaskDefFromDB(String name) { return Optional.ofNullable(resultSet.one()) .map(row -> readValue(row.getString(TASK_DEFINITION_KEY), TaskDef.class)) .orElse(null); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "getTaskDef"); String errorMsg = String.format("Failed to get task def: %s", name); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } } @@ -350,11 +347,11 @@ private List getAllTaskDefsFromDB() { return rows.stream() .map(row -> readValue(row.getString(TASK_DEFINITION_KEY), TaskDef.class)) .collect(Collectors.toList()); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "getAllTaskDefs"); String errorMsg = "Failed to get all task defs"; LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } } @@ -375,11 +372,11 @@ private List getAllWorkflowDefVersions(String name) { row.getString(WORKFLOW_DEFINITION_KEY), WorkflowDef.class)) .collect(Collectors.toList()); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "getAllWorkflowDefVersions"); String errorMsg = String.format("Failed to get workflows defs for : %s", name); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } } @@ -390,12 +387,12 @@ private String insertOrUpdateTaskDef(TaskDef taskDef) { recordCassandraDaoRequests("storeTaskDef"); recordCassandraDaoPayloadSize( "storeTaskDef", taskDefinition.length(), taskDef.getName(), "n/a"); - } catch (Exception e) { + } catch (DriverException e) { Monitors.error(CLASS_NAME, "insertOrUpdateTaskDef"); String errorMsg = String.format("Error creating/updating task 
definition: %s", taskDef.getName()); LOGGER.error(errorMsg, e); - throw new ApplicationException(Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } refreshTaskDefsCache(); return taskDef.getName(); diff --git a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraExecutionDAOSpec.groovy b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraExecutionDAOSpec.groovy index 8500d9552e..7afd273afb 100644 --- a/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraExecutionDAOSpec.groovy +++ b/cassandra-persistence/src/test/groovy/com/netflix/conductor/cassandra/dao/CassandraExecutionDAOSpec.groovy @@ -16,7 +16,7 @@ import com.netflix.conductor.common.metadata.events.EventExecution import com.netflix.conductor.common.metadata.tasks.TaskDef import com.netflix.conductor.common.metadata.workflow.WorkflowDef import com.netflix.conductor.common.metadata.workflow.WorkflowTask -import com.netflix.conductor.core.exception.ApplicationException +import com.netflix.conductor.core.exception.NonTransientException import com.netflix.conductor.core.utils.IDGenerator import com.netflix.conductor.model.TaskModel import com.netflix.conductor.model.WorkflowModel @@ -25,7 +25,6 @@ import spock.lang.Subject import static com.netflix.conductor.common.metadata.events.EventExecution.Status.COMPLETED import static com.netflix.conductor.common.metadata.events.EventExecution.Status.IN_PROGRESS -import static com.netflix.conductor.core.exception.ApplicationException.Code.INVALID_INPUT class CassandraExecutionDAOSpec extends CassandraSpec { @@ -60,7 +59,7 @@ class CassandraExecutionDAOSpec extends CassandraSpec { executionDAO.validateTasks(tasks) then: - def ex = thrown(ApplicationException.class) + def ex = thrown(NonTransientException.class) ex.message == "Tasks of multiple workflows cannot be created/updated simultaneously" } @@ -339,15 +338,13 @@ class CassandraExecutionDAOSpec extends CassandraSpec { executionDAO.getTask('invalid_id') then: - def ex = thrown(ApplicationException.class) - ex && ex.code == INVALID_INPUT + thrown(IllegalArgumentException.class) when: 'verify that a non-conforming uuid throws an exception' executionDAO.getWorkflow('invalid_id', true) then: - ex = thrown(ApplicationException.class) - ex && ex.code == INVALID_INPUT + thrown(IllegalArgumentException.class) and: 'verify that a non-existing generated id returns null' executionDAO.getTask(new IDGenerator().generate()) == null diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorCoreConfiguration.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorCoreConfiguration.java index eedef68f1a..9b443377e0 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorCoreConfiguration.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorCoreConfiguration.java @@ -14,7 +14,6 @@ import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -29,14 +28,12 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.retry.RetryContext; -import org.springframework.retry.backoff.NoBackOffPolicy; -import org.springframework.retry.policy.SimpleRetryPolicy; import 
org.springframework.retry.support.RetryTemplate; import com.netflix.conductor.common.metadata.tasks.TaskType; import com.netflix.conductor.common.utils.ExternalPayloadStorage; import com.netflix.conductor.core.events.EventQueueProvider; +import com.netflix.conductor.core.exception.TransientException; import com.netflix.conductor.core.execution.mapper.TaskMapper; import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask; import com.netflix.conductor.core.listener.WorkflowStatusListener; @@ -47,7 +44,6 @@ import static com.netflix.conductor.core.events.EventQueues.EVENT_QUEUE_PROVIDERS_QUALIFIER; import static com.netflix.conductor.core.execution.tasks.SystemTaskRegistry.ASYNC_SYSTEM_TASKS_QUALIFIER; -import static com.netflix.conductor.core.utils.Utils.isTransientException; import static java.util.function.Function.identity; @@ -120,24 +116,10 @@ public Map getEventQueueProviders( @Bean public RetryTemplate onTransientErrorRetryTemplate() { - SimpleRetryPolicy retryPolicy = new CustomRetryPolicy(); - retryPolicy.setMaxAttempts(3); - - RetryTemplate retryTemplate = new RetryTemplate(); - retryTemplate.setRetryPolicy(retryPolicy); - retryTemplate.setBackOffPolicy(new NoBackOffPolicy()); - return retryTemplate; - } - - public static class CustomRetryPolicy extends SimpleRetryPolicy { - - @Override - public boolean canRetry(final RetryContext context) { - final Optional lastThrowable = - Optional.ofNullable(context.getLastThrowable()); - return lastThrowable - .map(throwable -> super.canRetry(context) && isTransientException(throwable)) - .orElseGet(() -> super.canRetry(context)); - } + return RetryTemplate.builder() + .retryOn(TransientException.class) + .maxAttempts(3) + .noBackoff() + .build(); } } diff --git a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java index d23c4f5a25..a295fed517 100644 --- a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java +++ b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java @@ -40,9 +40,9 @@ import com.netflix.conductor.common.utils.ExternalPayloadStorage; import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.events.queue.Message; -import com.netflix.conductor.core.exception.ApplicationException; -import com.netflix.conductor.core.exception.ApplicationException.Code; +import com.netflix.conductor.core.exception.NotFoundException; import com.netflix.conductor.core.exception.TerminateWorkflowException; +import com.netflix.conductor.core.exception.TransientException; import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils; import com.netflix.conductor.dao.*; import com.netflix.conductor.metrics.Monitors; @@ -146,11 +146,8 @@ public WorkflowModel getWorkflowModel(String workflowId, boolean includeTasks) { * @param workflowId the id of the workflow to be fetched * @param includeTasks if true, fetches the {@link Task} data in the workflow. * @return the {@link Workflow} object - * @throws ApplicationException if - *
<ul> - *       <li>no such {@link Workflow} is found - *       <li>parsing the {@link Workflow} object fails - *     </ul>
+ * @throws NotFoundException no such {@link Workflow} is found. + * @throws TransientException parsing the {@link Workflow} object fails. */ public Workflow getWorkflow(String workflowId, boolean includeTasks) { return getWorkflowModelFromDataStore(workflowId, includeTasks).toWorkflow(); @@ -164,7 +161,7 @@ private WorkflowModel getWorkflowModelFromDataStore(String workflowId, boolean i if (json == null) { String errorMsg = String.format("No such workflow found by id: %s", workflowId); LOGGER.error(errorMsg); - throw new ApplicationException(ApplicationException.Code.NOT_FOUND, errorMsg); + throw new NotFoundException(errorMsg); } try { @@ -175,8 +172,7 @@ private WorkflowModel getWorkflowModelFromDataStore(String workflowId, boolean i } catch (IOException e) { String errorMsg = String.format("Error reading workflow: %s", workflowId); LOGGER.error(errorMsg); - throw new ApplicationException( - ApplicationException.Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } } return workflow; @@ -204,7 +200,7 @@ public List getWorkflowsByCorrelationId( workflowId -> { try { return getWorkflow(workflowId, includeTasks); - } catch (ApplicationException e) { + } catch (NotFoundException e) { // This might happen when the workflow archival failed and the // workflow was removed from primary datastore LOGGER.error( @@ -340,25 +336,16 @@ public void removeFromPendingWorkflow(String workflowType, String workflowId) { * removal from {@link ExecutionDAO} */ public void removeWorkflow(String workflowId, boolean archiveWorkflow) { - try { - WorkflowModel workflow = getWorkflowModelFromDataStore(workflowId, true); + WorkflowModel workflow = getWorkflowModelFromDataStore(workflowId, true); + try { removeWorkflowIndex(workflow, archiveWorkflow); - // remove workflow from DAO - try { - executionDAO.removeWorkflow(workflowId); - } catch (Exception ex) { - Monitors.recordDaoError("executionDao", "removeWorkflow"); - throw ex; - } - } catch (ApplicationException ae) { - throw ae; - } catch (Exception e) { - throw new ApplicationException( - ApplicationException.Code.BACKEND_ERROR, - "Error removing workflow: " + workflowId, - e); + } catch (JsonProcessingException e) { + throw new TransientException("Workflow can not be serialized to json", e); } + + executionDAO.removeWorkflow(workflowId); + try { queueDAO.remove(DECIDER_QUEUE, workflowId); } catch (Exception e) { @@ -377,8 +364,7 @@ private void removeWorkflowIndex(WorkflowModel workflow, boolean archiveWorkflow new String[] {RAW_JSON_FIELD, ARCHIVED_FIELD}, new Object[] {objectMapper.writeValueAsString(workflow), true}); } else { - throw new ApplicationException( - Code.INVALID_INPUT, + throw new IllegalArgumentException( String.format( "Cannot archive workflow: %s with status: %s", workflow.getWorkflowId(), workflow.getStatus())); @@ -389,29 +375,6 @@ private void removeWorkflowIndex(WorkflowModel workflow, boolean archiveWorkflow } } - public void removeWorkflowWithExpiry( - String workflowId, boolean archiveWorkflow, int ttlSeconds) { - try { - WorkflowModel workflow = getWorkflowModelFromDataStore(workflowId, true); - - removeWorkflowIndex(workflow, archiveWorkflow); - // remove workflow from DAO with TTL - try { - executionDAO.removeWorkflowWithExpiry(workflowId, ttlSeconds); - } catch (Exception ex) { - Monitors.recordDaoError("executionDao", "removeWorkflow"); - throw ex; - } - } catch (ApplicationException ae) { - throw ae; - } catch (Exception e) { - throw new ApplicationException( - ApplicationException.Code.BACKEND_ERROR, 
- "Error removing workflow: " + workflowId, - e); - } - } - /** * Reset the workflow state by removing from the {@link ExecutionDAO} and removing this workflow * from the {@link IndexDAO}. @@ -419,21 +382,16 @@ public void removeWorkflowWithExpiry( * @param workflowId the workflow id to be reset */ public void resetWorkflow(String workflowId) { + getWorkflowModelFromDataStore(workflowId, true); + executionDAO.removeWorkflow(workflowId); try { - getWorkflowModelFromDataStore(workflowId, true); - executionDAO.removeWorkflow(workflowId); if (properties.isAsyncIndexingEnabled()) { indexDAO.asyncRemoveWorkflow(workflowId); } else { indexDAO.removeWorkflow(workflowId); } - } catch (ApplicationException ae) { - throw ae; } catch (Exception e) { - throw new ApplicationException( - ApplicationException.Code.BACKEND_ERROR, - "Error resetting workflow state: " + workflowId, - e); + throw new TransientException("Error resetting workflow state: " + workflowId, e); } } @@ -490,21 +448,23 @@ public long getInProgressTaskCount(String taskDefName) { * stores it in the {@link IndexDAO}. * * @param taskModel the task to be updated in the data store - * @throws ApplicationException if the dao operations fail + * @throws TransientException if the {@link IndexDAO} or {@link ExecutionDAO} operations fail. + * @throws com.netflix.conductor.core.exception.NonTransientException if the externalization of + * payload fails. */ public void updateTask(TaskModel taskModel) { - try { - if (taskModel.getStatus() != null) { - if (!taskModel.getStatus().isTerminal() - || (taskModel.getStatus().isTerminal() && taskModel.getUpdateTime() == 0)) { - taskModel.setUpdateTime(System.currentTimeMillis()); - } - if (taskModel.getStatus().isTerminal() && taskModel.getEndTime() == 0) { - taskModel.setEndTime(System.currentTimeMillis()); - } + if (taskModel.getStatus() != null) { + if (!taskModel.getStatus().isTerminal() + || (taskModel.getStatus().isTerminal() && taskModel.getUpdateTime() == 0)) { + taskModel.setUpdateTime(System.currentTimeMillis()); } - externalizeTaskData(taskModel); - executionDAO.updateTask(taskModel); + if (taskModel.getStatus().isTerminal() && taskModel.getEndTime() == 0) { + taskModel.setEndTime(System.currentTimeMillis()); + } + } + externalizeTaskData(taskModel); + executionDAO.updateTask(taskModel); + try { /* * Indexing a task for every update adds a lot of volume. 
That is ok but if async indexing * is enabled and tasks are stored in memory until a block has completed, we would lose a lot @@ -523,7 +483,7 @@ public void updateTask(TaskModel taskModel) { "Error updating task: %s in workflow: %s", taskModel.getTaskId(), taskModel.getWorkflowInstanceId()); LOGGER.error(errorMsg, e); - throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, errorMsg, e); + throw new TransientException(errorMsg, e); } } diff --git a/core/src/main/java/com/netflix/conductor/core/events/DefaultEventProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/DefaultEventProcessor.java index 19767c4780..13cfae3ff0 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/DefaultEventProcessor.java +++ b/core/src/main/java/com/netflix/conductor/core/events/DefaultEventProcessor.java @@ -111,7 +111,7 @@ public DefaultEventProcessor( public void handle(ObservableQueue queue, Message msg) { List transientFailures = null; - Boolean executionFailed = false; + boolean executionFailed = false; try { if (isEventMessageIndexingEnabled) { executionService.addMessage(queue.getName(), msg); @@ -155,7 +155,7 @@ protected List executeEvent(String event, Message msg) throws Ex String evaluatorType = eventHandler.getEvaluatorType(); // Set default to true so that if condition is not specified, it falls through // to process the event. - Boolean success = true; + boolean success = true; if (StringUtils.isNotEmpty(condition) && evaluators.get(evaluatorType) != null) { Object result = evaluators @@ -267,6 +267,7 @@ protected EventExecution execute(EventExecution eventExecution, Action action, O eventExecution.getMessageId(), payload); + // TODO: Switch to @Retryable annotation on SimpleActionProcessor.execute() Map output = retryTemplate.execute( context -> diff --git a/core/src/main/java/com/netflix/conductor/core/events/queue/DefaultEventQueueProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/queue/DefaultEventQueueProcessor.java index 835d18c3ab..78abf836b8 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/queue/DefaultEventQueueProcessor.java +++ b/core/src/main/java/com/netflix/conductor/core/events/queue/DefaultEventQueueProcessor.java @@ -22,8 +22,7 @@ import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.TaskResult; -import com.netflix.conductor.core.exception.ApplicationException; -import com.netflix.conductor.core.exception.ApplicationException.Code; +import com.netflix.conductor.core.exception.NotFoundException; import com.netflix.conductor.core.execution.WorkflowExecutor; import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.TaskModel.Status; @@ -154,14 +153,10 @@ private void startMonitor(Status status, ObservableQueue queue) { } catch (JsonParseException e) { LOGGER.error("Bad message? 
: {} ", msg, e); queue.ack(Collections.singletonList(msg)); - - } catch (ApplicationException e) { - if (e.getCode().equals(Code.NOT_FOUND)) { - LOGGER.error( - "Workflow ID specified is not valid for this environment"); - queue.ack(Collections.singletonList(msg)); - } - LOGGER.error("Error processing message: {}", msg, e); + } catch (NotFoundException nfe) { + LOGGER.error( + "Workflow ID specified is not valid for this environment"); + queue.ack(Collections.singletonList(msg)); } catch (Exception e) { LOGGER.error("Error processing message: {}", msg, e); } diff --git a/core/src/main/java/com/netflix/conductor/core/exception/ApplicationException.java b/core/src/main/java/com/netflix/conductor/core/exception/ApplicationException.java deleted file mode 100644 index f1a086d403..0000000000 --- a/core/src/main/java/com/netflix/conductor/core/exception/ApplicationException.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright 2022 Netflix, Inc. - *
<p>
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *
<p>
- * http://www.apache.org/licenses/LICENSE-2.0 - *
<p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package com.netflix.conductor.core.exception; - -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; - -public class ApplicationException extends RuntimeException { - - public enum Code { - INVALID_INPUT(400), - INTERNAL_ERROR(500), - NOT_FOUND(404), - CONFLICT(409), - UNAUTHORIZED(403), - BACKEND_ERROR(500); - - private final int statusCode; - - Code(int statusCode) { - this.statusCode = statusCode; - } - - public int getStatusCode() { - return statusCode; - } - } - - private final Code code; - - public boolean isRetryable() { - return this.code == Code.BACKEND_ERROR; - } - - public ApplicationException(String msg, Throwable t) { - this(Code.INTERNAL_ERROR, msg, t); - } - - public ApplicationException(Code code, String msg, Throwable t) { - super(code + " - " + msg, t); - this.code = code; - } - - public ApplicationException(Code code, Throwable t) { - super(code.name(), t); - this.code = code; - } - - public ApplicationException(Code code, String message) { - super(message); - this.code = code; - } - - public int getHttpStatusCode() { - return this.code.getStatusCode(); - } - - public Code getCode() { - return this.code; - } - - public String getTrace() { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PrintStream ps = new PrintStream(baos); - this.printStackTrace(ps); - ps.flush(); - return baos.toString(); - } - - public Map toMap() { - HashMap map = new LinkedHashMap<>(); - map.put("code", code.name()); - map.put("message", super.getMessage()); - map.put("retryable", isRetryable()); - return map; - } -} diff --git a/core/src/main/java/com/netflix/conductor/core/exception/ConflictException.java b/core/src/main/java/com/netflix/conductor/core/exception/ConflictException.java new file mode 100644 index 0000000000..7c718ed59c --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/exception/ConflictException.java @@ -0,0 +1,28 @@ +/* + * Copyright 2022 Netflix, Inc. + *
<p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
<p>
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
<p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.exception; + +public class ConflictException extends RuntimeException { + + public ConflictException(String message) { + super(message); + } + + public ConflictException(String message, Object... args) { + super(String.format(message, args)); + } + + public ConflictException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/core/src/main/java/com/netflix/conductor/core/exception/NonTransientException.java b/core/src/main/java/com/netflix/conductor/core/exception/NonTransientException.java new file mode 100644 index 0000000000..4af05633f9 --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/exception/NonTransientException.java @@ -0,0 +1,24 @@ +/* + * Copyright 2022 Netflix, Inc. + *
<p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
<p>
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
<p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.exception; + +public class NonTransientException extends RuntimeException { + + public NonTransientException(String message) { + super(message); + } + + public NonTransientException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/core/src/main/java/com/netflix/conductor/core/exception/NotFoundException.java b/core/src/main/java/com/netflix/conductor/core/exception/NotFoundException.java new file mode 100644 index 0000000000..a89b7beed9 --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/exception/NotFoundException.java @@ -0,0 +1,28 @@ +/* + * Copyright 2022 Netflix, Inc. + *
<p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
<p>
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
<p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.exception; + +public class NotFoundException extends RuntimeException { + + public NotFoundException(String message) { + super(message); + } + + public NotFoundException(String message, Object... args) { + super(String.format(message, args)); + } + + public NotFoundException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/core/src/main/java/com/netflix/conductor/core/exception/TransientException.java b/core/src/main/java/com/netflix/conductor/core/exception/TransientException.java new file mode 100644 index 0000000000..87cc9d852e --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/exception/TransientException.java @@ -0,0 +1,24 @@ +/* + * Copyright 2022 Netflix, Inc. + *
<p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
<p>
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
<p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.exception; + +public class TransientException extends RuntimeException { + + public TransientException(String message) { + super(message); + } + + public TransientException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index 63b8dafc6f..b7baee3f84 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -34,9 +34,11 @@ import com.netflix.conductor.core.WorkflowContext; import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.dal.ExecutionDAOFacade; -import com.netflix.conductor.core.exception.ApplicationException; -import com.netflix.conductor.core.exception.ApplicationException.Code; +import com.netflix.conductor.core.exception.ConflictException; +import com.netflix.conductor.core.exception.NonTransientException; +import com.netflix.conductor.core.exception.NotFoundException; import com.netflix.conductor.core.exception.TerminateWorkflowException; +import com.netflix.conductor.core.exception.TransientException; import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry; import com.netflix.conductor.core.execution.tasks.Terminate; import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask; @@ -53,7 +55,6 @@ import com.netflix.conductor.model.WorkflowModel; import com.netflix.conductor.service.ExecutionLockService; -import static com.netflix.conductor.core.exception.ApplicationException.Code.*; import static com.netflix.conductor.core.utils.Utils.DECIDER_QUEUE; import static com.netflix.conductor.model.TaskModel.Status.*; @@ -120,9 +121,6 @@ public WorkflowExecutor( this.systemTaskRegistry = systemTaskRegistry; } - /** - * @throws ApplicationException - */ public String startWorkflow( String name, Integer version, @@ -133,9 +131,6 @@ public String startWorkflow( name, version, correlationId, input, externalInputPayloadStoragePath, null); } - /** - * @throws ApplicationException - */ public String startWorkflow( String name, Integer version, @@ -153,9 +148,6 @@ public String startWorkflow( null); } - /** - * @throws ApplicationException - */ public String startWorkflow( String name, Integer version, @@ -174,9 +166,6 @@ public String startWorkflow( event); } - /** - * @throws ApplicationException - */ public String startWorkflow( String name, Integer version, @@ -198,9 +187,6 @@ public String startWorkflow( null); } - /** - * @throws ApplicationException - */ public String startWorkflow( String name, Integer version, @@ -220,9 +206,6 @@ public String startWorkflow( taskToDomain); } - /** - * @throws ApplicationException - */ public String startWorkflow( String name, Integer version, @@ -245,9 +228,6 @@ public String startWorkflow( taskToDomain); } - /** - * @throws ApplicationException - */ public String startWorkflow( String name, Integer version, @@ -269,9 +249,6 @@ public String startWorkflow( null); } - /** - * @throws ApplicationException - */ public String startWorkflow( 
WorkflowDef workflowDefinition, Map workflowInput, @@ -289,9 +266,6 @@ public String startWorkflow( taskToDomain); } - /** - * @throws ApplicationException - */ public String startWorkflow( WorkflowDef workflowDefinition, Map workflowInput, @@ -312,9 +286,6 @@ public String startWorkflow( taskToDomain); } - /** - * @throws ApplicationException - */ public String startWorkflow( String name, Integer version, @@ -338,9 +309,6 @@ public String startWorkflow( taskToDomain); } - /** - * @throws ApplicationException - */ public String startWorkflow( String name, Integer version, @@ -367,9 +335,6 @@ public String startWorkflow( taskToDomain); } - /** - * @throws ApplicationException if validation fails - */ public String startWorkflow( WorkflowDef workflowDefinition, Map workflowInput, @@ -445,8 +410,7 @@ public String startWorkflow( */ private void createWorkflow(WorkflowModel workflow) { if (!executionLockService.acquireLock(workflow.getWorkflowId())) { - throw new ApplicationException( - BACKEND_ERROR, "Error acquiring lock when creating workflow: {}"); + throw new TransientException("Error acquiring lock when creating workflow: {}"); } try { executionDAOFacade.createWorkflow(workflow); @@ -462,7 +426,7 @@ private void createWorkflow(WorkflowModel workflow) { /** * Performs validations for starting a workflow * - * @throws ApplicationException if the validation fails + * @throws IllegalArgumentException if the validation fails. */ private void validateWorkflow( WorkflowDef workflowDef, @@ -473,8 +437,7 @@ private void validateWorkflow( if (workflowInput == null && StringUtils.isBlank(externalStoragePath)) { LOGGER.error( "The input for the workflow '{}' cannot be NULL", workflowDef.getName()); - throw new ApplicationException( - INVALID_INPUT, "NULL input passed when starting workflow"); + throw new IllegalArgumentException("NULL input passed when starting workflow"); } } catch (Exception e) { Monitors.recordWorkflowStartError( @@ -485,13 +448,13 @@ private void validateWorkflow( /** * @param workflowId the id of the workflow for which task callbacks are to be reset - * @throws ApplicationException if the workflow is in terminal state + * @throws ConflictException if the workflow is in terminal state */ public void resetCallbacksForWorkflow(String workflowId) { WorkflowModel workflow = executionDAOFacade.getWorkflowModel(workflowId, true); if (workflow.getStatus().isTerminal()) { - throw new ApplicationException( - CONFLICT, "Workflow is in terminal state. Status =" + workflow.getStatus()); + throw new ConflictException( + "Workflow is in terminal state. Status = %s", workflow.getStatus()); } // Get SIMPLE tasks in SCHEDULED state that have callbackAfterSeconds > 0 and set the @@ -520,8 +483,8 @@ public String rerun(RerunWorkflowRequest request) { request.getTaskInput(), request.getWorkflowInput(), request.getCorrelationId())) { - throw new ApplicationException( - INVALID_INPUT, "Task " + request.getReRunFromTaskId() + " not found"); + throw new IllegalArgumentException( + "Task " + request.getReRunFromTaskId() + " not found"); } return request.getReRunFromWorkflowId(); } @@ -530,12 +493,9 @@ public String rerun(RerunWorkflowRequest request) { * @param workflowId the id of the workflow to be restarted * @param useLatestDefinitions if true, use the latest workflow and task definitions upon * restart - * @throws ApplicationException in the following cases: - *

<ul>
-     *       <li>Workflow is not in a terminal state
-     *       <li>Workflow definition is not found
-     *       <li>Workflow is deemed non-restartable as per workflow definition
-     *     </ul>
+ * @throws ConflictException Workflow is not in a terminal state. + * @throws NotFoundException Workflow definition is not found or Workflow is deemed + * non-restartable as per workflow definition. */ public void restart(String workflowId, boolean useLatestDefinitions) { WorkflowModel workflow = executionDAOFacade.getWorkflowModel(workflowId, true); @@ -544,7 +504,7 @@ public void restart(String workflowId, boolean useLatestDefinitions) { String.format( "Workflow: %s is not in terminal state, unable to restart.", workflow); LOGGER.error(errorMsg); - throw new ApplicationException(CONFLICT, errorMsg); + throw new ConflictException(errorMsg); } WorkflowDef workflowDef; @@ -554,11 +514,9 @@ public void restart(String workflowId, boolean useLatestDefinitions) { .getLatestWorkflowDef(workflow.getWorkflowName()) .orElseThrow( () -> - new ApplicationException( - NOT_FOUND, - String.format( - "Unable to find latest definition for %s", - workflowId))); + new NotFoundException( + "Unable to find latest definition for %s", + workflowId)); workflow.setWorkflowDefinition(workflowDef); } else { workflowDef = @@ -571,11 +529,9 @@ public void restart(String workflowId, boolean useLatestDefinitions) { workflow.getWorkflowVersion()) .orElseThrow( () -> - new ApplicationException( - NOT_FOUND, - String.format( - "Unable to find definition for %s", - workflowId)))); + new NotFoundException( + "Unable to find definition for %s", + workflowId))); } if (!workflowDef.isRestartable() @@ -584,8 +540,7 @@ public void restart(String workflowId, boolean useLatestDefinitions) { WorkflowModel.Status .COMPLETED)) { // Can only restart non-completed workflows // when the configuration is set to false - throw new ApplicationException( - CONFLICT, String.format("Workflow: %s is non-restartable", workflow)); + throw new NotFoundException("Workflow: %s is non-restartable", workflow); } // Reset the workflow in the primary datastore and remove from indexer; then re-create it @@ -627,11 +582,11 @@ public void restart(String workflowId, boolean useLatestDefinitions) { public void retry(String workflowId, boolean resumeSubworkflowTasks) { WorkflowModel workflow = executionDAOFacade.getWorkflowModel(workflowId, true); if (!workflow.getStatus().isTerminal()) { - throw new ApplicationException( - CONFLICT, "Workflow is still running. status=" + workflow.getStatus()); + throw new NotFoundException( + "Workflow is still running. status=%s", workflow.getStatus()); } if (workflow.getTasks().isEmpty()) { - throw new ApplicationException(CONFLICT, "Workflow has not started yet"); + throw new ConflictException("Workflow has not started yet"); } if (resumeSubworkflowTasks) { @@ -725,8 +680,7 @@ private void retry(WorkflowModel workflow) { // it may not have any unsuccessful tasks that can be retried if (retriableMap.values().size() == 0 && workflow.getStatus() != WorkflowModel.Status.TIMED_OUT) { - throw new ApplicationException( - CONFLICT, + throw new ConflictException( "There are no retryable tasks! Use restart if you want to attempt entire workflow execution again."); } @@ -867,7 +821,7 @@ private void endExecution(WorkflowModel workflow) { /** * @param workflow the workflow to be completed - * @throws ApplicationException if workflow is not in terminal state + * @throws ConflictException if workflow is already in terminal state. */ @VisibleForTesting WorkflowModel completeWorkflow(WorkflowModel workflow) { @@ -885,7 +839,7 @@ WorkflowModel completeWorkflow(WorkflowModel workflow) { String msg = "Workflow is already in terminal state. 
Current status: " + workflow.getStatus(); - throw new ApplicationException(CONFLICT, msg); + throw new ConflictException(msg); } // FIXME Backwards compatibility for legacy workflows already running. @@ -935,7 +889,7 @@ WorkflowModel completeWorkflow(WorkflowModel workflow) { public void terminateWorkflow(String workflowId, String reason) { WorkflowModel workflow = executionDAOFacade.getWorkflowModel(workflowId, true); if (WorkflowModel.Status.COMPLETED.equals(workflow.getStatus())) { - throw new ApplicationException(CONFLICT, "Cannot terminate a COMPLETED workflow."); + throw new ConflictException("Cannot terminate a COMPLETED workflow."); } workflow.setStatus(WorkflowModel.Status.TERMINATED); terminateWorkflow(workflow, reason, null); @@ -1062,8 +1016,7 @@ public WorkflowModel terminateWorkflow( List erroredTasks = cancelNonTerminalTasks(workflow); if (!erroredTasks.isEmpty()) { - throw new ApplicationException( - Code.INTERNAL_ERROR, + throw new NonTransientException( String.format( "Error canceling system tasks: %s", String.join(",", erroredTasks))); @@ -1076,13 +1029,13 @@ public WorkflowModel terminateWorkflow( } /** - * @param taskResult the task result to be updated - * @throws ApplicationException + * @param taskResult the task result to be updated. + * @throws IllegalArgumentException if the {@link TaskResult} is null. + * @throws NotFoundException if the Task is not found. */ public void updateTask(TaskResult taskResult) { if (taskResult == null) { - throw new ApplicationException( - ApplicationException.Code.INVALID_INPUT, "Task object is null"); + throw new IllegalArgumentException("Task object is null"); } String workflowId = taskResult.getWorkflowInstanceId(); @@ -1099,10 +1052,9 @@ public void updateTask(TaskResult taskResult) { Optional.ofNullable(executionDAOFacade.getTaskModel(taskResult.getTaskId())) .orElseThrow( () -> - new ApplicationException( - ApplicationException.Code.NOT_FOUND, - "No such task found by id: " - + taskResult.getTaskId())); + new NotFoundException( + "No such task found by id: %s", + taskResult.getTaskId())); LOGGER.debug("Task: {} belonging to Workflow {} being updated", task, workflowInstance); @@ -1209,7 +1161,7 @@ public void updateTask(TaskResult taskResult) { LOGGER.error(errorMsg, e); Monitors.recordTaskQueueOpError( task.getTaskType(), workflowInstance.getWorkflowName()); - throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, e); + throw new TransientException(errorMsg, e); } break; default: @@ -1226,7 +1178,7 @@ public void updateTask(TaskResult taskResult) { task.getTaskId(), workflowId); LOGGER.error(errorMsg, e); Monitors.recordTaskUpdateError(task.getTaskType(), workflowInstance.getWorkflowName()); - throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, e); + throw new TransientException(errorMsg, e); } taskResult.getLogs().forEach(taskExecLog -> taskExecLog.setTaskId(task.getTaskId())); @@ -1286,7 +1238,6 @@ private boolean _decide(String workflowId) { /** * @param workflowId ID of the workflow to evaluate the state for * @return true if the workflow has completed (success or failed), false otherwise. - * @throws ApplicationException If there was an error - caller should retry in this case. 
*/ public boolean decide(String workflowId) { if (!executionLockService.acquireLock(workflowId)) { @@ -1399,8 +1350,7 @@ private Optional findChangedSubWorkflowTask(WorkflowModel workflow) { workflow.getWorkflowVersion()) .orElseThrow( () -> - new ApplicationException( - BACKEND_ERROR, + new TransientException( "Workflow Definition is not found"))); if (workflowDef.containsType(TaskType.TASK_TYPE_SUB_WORKFLOW) || workflow.getWorkflowDefinition() @@ -1478,7 +1428,7 @@ List dedupAndAddTasks(WorkflowModel workflow, List tasks) } /** - * @throws ApplicationException if the workflow cannot be paused + * @throws ConflictException if the workflow is in terminal state. */ public void pauseWorkflow(String workflowId) { try { @@ -1486,9 +1436,9 @@ public void pauseWorkflow(String workflowId) { WorkflowModel.Status status = WorkflowModel.Status.PAUSED; WorkflowModel workflow = executionDAOFacade.getWorkflowModel(workflowId, false); if (workflow.getStatus().isTerminal()) { - throw new ApplicationException( - CONFLICT, - "Workflow id " + workflowId + " has ended, status cannot be updated."); + throw new ConflictException( + "Workflow %s has ended, status cannot be updated.", + workflow.toShortString()); } if (workflow.getStatus().equals(status)) { return; // Already paused! @@ -1743,8 +1693,8 @@ boolean scheduleTask(WorkflowModel workflow, List tasks) { for (TaskModel task : systemTasks) { WorkflowSystemTask workflowSystemTask = systemTaskRegistry.get(task.getTaskType()); if (workflowSystemTask == null) { - throw new ApplicationException( - NOT_FOUND, "No system task found by name " + task.getTaskType()); + throw new NotFoundException( + "No system task found by name %s", task.getTaskType()); } if (task.getStatus() != null && !task.getStatus().isTerminal() @@ -1761,8 +1711,7 @@ boolean scheduleTask(WorkflowModel workflow, List tasks) { task.getTaskType(), task.getTaskId(), task.getTaskDefName()); - throw new ApplicationException( - ApplicationException.Code.INTERNAL_ERROR, errorMsg, e); + throw new NonTransientException(errorMsg, e); } startedSystemTasks = true; executionDAOFacade.updateTask(task); @@ -1844,7 +1793,7 @@ private boolean rerunWF( String.format( "Workflow: %s is not in terminal state, unable to rerun.", workflow); LOGGER.error(errorMsg); - throw new ApplicationException(CONFLICT, errorMsg); + throw new ConflictException(errorMsg); } updateAndPushParents(workflow, "reran"); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java index 5e9a9aef4c..c1c50db6ef 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java @@ -24,7 +24,7 @@ import com.netflix.conductor.core.events.EventQueues; import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.core.events.queue.ObservableQueue; -import com.netflix.conductor.core.exception.ApplicationException; +import com.netflix.conductor.core.exception.TransientException; import com.netflix.conductor.core.execution.WorkflowExecutor; import com.netflix.conductor.core.utils.ParametersUtils; import com.netflix.conductor.model.TaskModel; @@ -72,20 +72,10 @@ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor workf isAsyncComplete(task) ? 
TaskModel.Status.IN_PROGRESS : TaskModel.Status.COMPLETED); - } catch (ApplicationException ae) { - if (ae.isRetryable()) { - LOGGER.info( - "A transient backend error happened when task {} tried to publish an event.", - task.getTaskId()); - } else { - task.setStatus(TaskModel.Status.FAILED); - task.setReasonForIncompletion(ae.getMessage()); - LOGGER.error( - "Error executing task: {}, workflow: {}", - task.getTaskId(), - workflow.getWorkflowId(), - ae); - } + } catch (TransientException te) { + LOGGER.info( + "A transient backend error happened when task {} tried to publish an event.", + task.getTaskId()); } catch (JsonProcessingException jpe) { task.setStatus(TaskModel.Status.FAILED); task.setReasonForIncompletion("Error serializing JSON payload: " + jpe.getMessage()); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SetVariable.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SetVariable.java index 2e84742148..2db2a9a18f 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SetVariable.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SetVariable.java @@ -23,7 +23,7 @@ import org.springframework.stereotype.Component; import com.netflix.conductor.core.config.ConductorProperties; -import com.netflix.conductor.core.exception.ApplicationException; +import com.netflix.conductor.core.exception.NonTransientException; import com.netflix.conductor.core.execution.WorkflowExecutor; import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; @@ -69,7 +69,8 @@ private boolean validateVariablesSize( } catch (IOException e) { LOGGER.error( "Unable to validate variables payload size of workflow: {}", workflowId, e); - throw new ApplicationException(ApplicationException.Code.INTERNAL_ERROR, e); + throw new NonTransientException( + "Unable to validate variables payload size of workflow: " + workflowId, e); } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/StartWorkflow.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/StartWorkflow.java index 59abde2a82..8e89441410 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/StartWorkflow.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/StartWorkflow.java @@ -22,7 +22,7 @@ import org.springframework.stereotype.Component; import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; -import com.netflix.conductor.core.exception.ApplicationException; +import com.netflix.conductor.core.exception.TransientException; import com.netflix.conductor.core.execution.WorkflowExecutor; import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; @@ -67,30 +67,21 @@ public void start( String workflowId = startWorkflow(request, workflowExecutor); taskModel.addOutput(WORKFLOW_ID, workflowId); taskModel.setStatus(COMPLETED); - } catch (ApplicationException ae) { - if (ae.isRetryable()) { - LOGGER.info( - "A transient backend error happened when task {} in {} tried to start workflow {}.", - taskModel.getTaskId(), - workflow.toShortString(), - request.getName()); - } else { - taskModel.setStatus(FAILED); - taskModel.setReasonForIncompletion(ae.getMessage()); - LOGGER.error( - "Error starting workflow: {} from workflow: {}", - request.getName(), - workflow.toShortString(), - ae); - } - } catch (Exception e) { + } catch (TransientException te) { + LOGGER.info( + "A transient backend error happened when task {} in {} 
tried to start workflow {}.", + taskModel.getTaskId(), + workflow.toShortString(), + request.getName()); + } catch (Exception ae) { + taskModel.setStatus(FAILED); - taskModel.setReasonForIncompletion(e.getMessage()); + taskModel.setReasonForIncompletion(ae.getMessage()); LOGGER.error( "Error starting workflow: {} from workflow: {}", request.getName(), workflow.toShortString(), - e); + ae); } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java index 75995832d6..baf0844b4c 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java @@ -20,7 +20,8 @@ import org.springframework.stereotype.Component; import com.netflix.conductor.common.metadata.workflow.WorkflowDef; -import com.netflix.conductor.core.exception.ApplicationException; +import com.netflix.conductor.core.exception.NonTransientException; +import com.netflix.conductor.core.exception.TransientException; import com.netflix.conductor.core.execution.WorkflowExecutor; import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; @@ -105,30 +106,21 @@ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor workf // recursion by the time we update here. WorkflowModel subWorkflow = workflowExecutor.getWorkflow(subWorkflowId, false); updateTaskStatus(subWorkflow, task); - } catch (ApplicationException ae) { - if (ae.isRetryable()) { - LOGGER.info( - "A transient backend error happened when task {} in {} tried to start sub workflow {}.", - task.getTaskId(), - workflow.toShortString(), - name); - } else { - task.setStatus(TaskModel.Status.FAILED); - task.setReasonForIncompletion(ae.getMessage()); - LOGGER.error( - "Error starting sub workflow: {} from workflow: {}", - name, - workflow.toShortString(), - ae); - } - } catch (Exception e) { + } catch (TransientException te) { + LOGGER.info( + "A transient backend error happened when task {} in {} tried to start sub workflow {}.", + task.getTaskId(), + workflow.toShortString(), + name); + } catch (Exception ae) { + task.setStatus(TaskModel.Status.FAILED); - task.setReasonForIncompletion(e.getMessage()); + task.setReasonForIncompletion(ae.getMessage()); LOGGER.error( "Error starting sub workflow: {} from workflow: {}", name, workflow.toShortString(), - e); + ae); } } @@ -204,8 +196,7 @@ private void updateTaskStatus(WorkflowModel subworkflow, TaskModel task) { task.setStatus(TaskModel.Status.TIMED_OUT); break; default: - throw new ApplicationException( - ApplicationException.Code.INTERNAL_ERROR, + throw new NonTransientException( "Subworkflow status does not conform to relevant task status."); } diff --git a/core/src/main/java/com/netflix/conductor/core/metadata/MetadataMapperService.java b/core/src/main/java/com/netflix/conductor/core/metadata/MetadataMapperService.java index d632853200..7a825adac8 100644 --- a/core/src/main/java/com/netflix/conductor/core/metadata/MetadataMapperService.java +++ b/core/src/main/java/com/netflix/conductor/core/metadata/MetadataMapperService.java @@ -28,7 +28,7 @@ import com.netflix.conductor.common.metadata.workflow.WorkflowDef; import com.netflix.conductor.common.metadata.workflow.WorkflowTask; import com.netflix.conductor.core.WorkflowContext; -import com.netflix.conductor.core.exception.ApplicationException; +import com.netflix.conductor.core.exception.NotFoundException; 
import com.netflix.conductor.core.exception.TerminateWorkflowException; import com.netflix.conductor.core.utils.Utils; import com.netflix.conductor.dao.MetadataDAO; @@ -69,11 +69,8 @@ public WorkflowDef lookupForWorkflowDefinition(String name, Integer version) { "There is no workflow defined with name {} and version {}", name, version); - return new ApplicationException( - ApplicationException.Code.NOT_FOUND, - String.format( - "No such workflow defined. name=%s, version=%s", - name, version)); + return new NotFoundException( + "No such workflow defined. name=%s, version=%s", name, version); }); } @@ -176,8 +173,7 @@ private void checkNotEmptyDefinitions(WorkflowDef workflowDefinition) { missingTaskDefinitionNames); Monitors.recordWorkflowStartError( workflowDefinition.getName(), WorkflowContext.get().getClientApp()); - throw new ApplicationException( - ApplicationException.Code.INVALID_INPUT, + throw new IllegalArgumentException( "Cannot find the task definitions for the following tasks used in workflow: " + missingTaskDefinitionNames); } diff --git a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java index 2d3c1eaa59..134130049f 100644 --- a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java +++ b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java @@ -23,7 +23,7 @@ import com.netflix.conductor.core.WorkflowContext; import com.netflix.conductor.core.config.ConductorProperties; -import com.netflix.conductor.core.exception.ApplicationException; +import com.netflix.conductor.core.exception.NotFoundException; import com.netflix.conductor.core.execution.WorkflowExecutor; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; @@ -78,15 +78,11 @@ public void sweep(String workflowId) { queueDAO.remove(DECIDER_QUEUE, workflowId); return; } - } catch (ApplicationException e) { - if (e.getCode() == ApplicationException.Code.NOT_FOUND) { - queueDAO.remove(DECIDER_QUEUE, workflowId); - LOGGER.info( - "Workflow NOT found for id:{}. Removed it from decider queue", - workflowId, - e); - return; - } + } catch (NotFoundException nfe) { + queueDAO.remove(DECIDER_QUEUE, workflowId); + LOGGER.info( + "Workflow NOT found for id:{}. 
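The replacements in this change all follow the same caller-side contract: TransientException marks a retryable backend problem, while NonTransientException, NotFoundException, ConflictException and IllegalArgumentException mean a retry cannot help. Below is a minimal sketch of that contract, assuming only the exception names introduced in this diff; the RetriableCall interface and executeWithRetry helper are hypothetical names used purely for illustration and are not part of the PR.

// Sketch only: demonstrates the intended retry semantics of the new exception types.
import com.netflix.conductor.core.exception.TransientException;

public final class RetrySemanticsSketch {

    @FunctionalInterface
    interface RetriableCall {
        void run();
    }

    static void executeWithRetry(RetriableCall call, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                call.run();
                return; // success
            } catch (TransientException te) {
                // Backend hiccup (S3, Cassandra, queue, search index, etc.): retry up to the limit.
                if (attempt >= maxAttempts) {
                    throw te;
                }
            }
            // NonTransientException, NotFoundException, ConflictException and
            // IllegalArgumentException are deliberately not caught here: they indicate
            // conditions a retry cannot fix and should propagate to the caller.
        }
    }
}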
Removed it from decider queue", workflowId, nfe); + return; } catch (Exception e) { Monitors.error(CLASS_NAME, "sweep"); LOGGER.error("Error running sweep for " + workflowId, e); diff --git a/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java b/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java index 53a2246e26..008c5202de 100644 --- a/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java +++ b/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java @@ -30,7 +30,7 @@ import com.netflix.conductor.common.utils.ExternalPayloadStorage; import com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType; import com.netflix.conductor.core.config.ConductorProperties; -import com.netflix.conductor.core.exception.ApplicationException; +import com.netflix.conductor.core.exception.NonTransientException; import com.netflix.conductor.core.exception.TerminateWorkflowException; import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.model.TaskModel; @@ -62,7 +62,7 @@ public ExternalPayloadStorageUtils( * * @param path the relative path of the payload in the {@link ExternalPayloadStorage} * @return the payload object - * @throws ApplicationException in case of JSON parsing errors or download errors + * @throws NonTransientException in case of JSON parsing errors or download errors */ @SuppressWarnings("unchecked") public Map downloadPayload(String path) { @@ -71,7 +71,8 @@ public Map downloadPayload(String path) { IOUtils.toString(inputStream, StandardCharsets.UTF_8), Map.class); } catch (IOException e) { LOGGER.error("Unable to download payload from external storage path: {}", path, e); - throw new ApplicationException(ApplicationException.Code.INTERNAL_ERROR, e); + throw new NonTransientException( + "Unable to download payload from external storage path: " + path, e); } } @@ -81,7 +82,7 @@ public Map downloadPayload(String path) { * @param entity the task or workflow for which the payload is to be verified and uploaded * @param payloadType the {@link PayloadType} of the payload * @param {@link TaskModel} or {@link WorkflowModel} - * @throws ApplicationException in case of JSON parsing errors or upload errors + * @throws NonTransientException in case of JSON parsing errors or upload errors * @throws TerminateWorkflowException if the payload size is bigger than permissible limit as * per {@link ConductorProperties} */ @@ -188,7 +189,8 @@ public void verifyAndUpload(T entity, PayloadType payloadType) { } catch (IOException e) { LOGGER.error( "Unable to upload payload to external storage for workflow: {}", workflowId, e); - throw new ApplicationException(ApplicationException.Code.INTERNAL_ERROR, e); + throw new NonTransientException( + "Unable to upload payload to external storage for workflow: " + workflowId, e); } } diff --git a/core/src/main/java/com/netflix/conductor/core/utils/Utils.java b/core/src/main/java/com/netflix/conductor/core/utils/Utils.java index 537d1137a4..e81b3cf587 100644 --- a/core/src/main/java/com/netflix/conductor/core/utils/Utils.java +++ b/core/src/main/java/com/netflix/conductor/core/utils/Utils.java @@ -18,7 +18,7 @@ import org.apache.commons.lang3.StringUtils; -import com.netflix.conductor.core.exception.ApplicationException; +import com.netflix.conductor.core.exception.TransientException; public class Utils { @@ -56,7 +56,7 @@ public static List convertStringToList(String inputStr) { * * @param condition a boolean 
expression * @param errorMessage The exception message use if the input condition is not valid - * @throws ApplicationException if input condition is not valid + * @throws IllegalArgumentException if input condition is not valid. */ public static void checkArgument(boolean condition, String errorMessage) { if (!condition) { @@ -64,51 +64,12 @@ public static void checkArgument(boolean condition, String errorMessage) { } } - /** - * This method checks if the collection is null or is empty. - * - * @param collection input of type {@link Collection} - * @param errorMessage The exception message use if the collection is empty or null - * @throws ApplicationException if input Collection is not valid - */ - public static void checkNotNullOrEmpty(Collection collection, String errorMessage) { - if (collection == null || collection.isEmpty()) { - throw new ApplicationException(ApplicationException.Code.INVALID_INPUT, errorMessage); - } - } - - /** - * This method checks if the input map is valid or not. - * - * @param map input of type {@link Map} - * @param errorMessage The exception message use if the map is empty or null - * @throws ApplicationException if input map is not valid - */ - public static void checkNotNullOrEmpty(Map map, String errorMessage) { - if (map == null || map.isEmpty()) { - throw new ApplicationException(ApplicationException.Code.INVALID_INPUT, errorMessage); - } - } - - /** - * This method checks it the input string is null or empty. - * - * @param input input of type {@link String} - * @param errorMessage The exception message use if the string is empty or null - * @throws ApplicationException if input string is not valid - */ - public static void checkNotNullOrEmpty(String input, String errorMessage) { - if (StringUtils.isEmpty(input)) { - throw new ApplicationException(ApplicationException.Code.INVALID_INPUT, errorMessage); - } - } - /** * This method checks if the object is null or empty. * - * @param object input of type {@link Object} - * @param errorMessage The exception message use if the object is empty or null - * @throws ApplicationException if input object is not valid + * @param object input of type {@link Object}. + * @param errorMessage The exception message use if the object is empty or null. + * @throws NullPointerException if input object is not valid. 
*/ public static void checkNotNull(Object object, String errorMessage) { if (object == null) { @@ -126,10 +87,7 @@ public static void checkNotNull(Object object, String errorMessage) { */ public static boolean isTransientException(Throwable throwable) { if (throwable != null) { - return !((throwable instanceof UnsupportedOperationException) - || (throwable instanceof ApplicationException - && ((ApplicationException) throwable).getCode() - != ApplicationException.Code.BACKEND_ERROR)); + return throwable instanceof TransientException; } return true; } diff --git a/core/src/main/java/com/netflix/conductor/model/TaskModel.java b/core/src/main/java/com/netflix/conductor/model/TaskModel.java index 94cc47b13c..0462a8cee0 100644 --- a/core/src/main/java/com/netflix/conductor/model/TaskModel.java +++ b/core/src/main/java/com/netflix/conductor/model/TaskModel.java @@ -126,8 +126,6 @@ public boolean isRetriable() { private Any outputMessage; - // id 31 is reserved - private int rateLimitPerFrequency; private int rateLimitFrequencyInSeconds; diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java index 97f95dbee8..bf7f7d731c 100644 --- a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java +++ b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java @@ -45,7 +45,7 @@ import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.dal.ExecutionDAOFacade; import com.netflix.conductor.core.events.queue.Message; -import com.netflix.conductor.core.exception.ApplicationException; +import com.netflix.conductor.core.exception.NotFoundException; import com.netflix.conductor.core.execution.WorkflowExecutor; import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry; import com.netflix.conductor.core.utils.QueueUtils; @@ -109,8 +109,7 @@ public List poll(String taskType, String workerId, int count, int timeoutI public List poll( String taskType, String workerId, String domain, int count, int timeoutInMilliSecond) { if (timeoutInMilliSecond > MAX_POLL_TIMEOUT_MS) { - throw new ApplicationException( - ApplicationException.Code.INVALID_INPUT, + throw new IllegalArgumentException( "Long Poll Timeout value cannot be more than 5 seconds"); } String queueName = QueueUtils.getQueueName(taskType, domain, null, null); @@ -305,9 +304,7 @@ public Integer getTaskQueueSize(String queueName) { public void removeTaskFromQueue(String taskId) { Task task = getTask(taskId); if (task == null) { - throw new ApplicationException( - ApplicationException.Code.NOT_FOUND, - String.format("No such task found by taskId: %s", taskId)); + throw new NotFoundException("No such task found by taskId: %s", taskId); } queueDAO.remove(QueueUtils.getQueueName(task), taskId); } diff --git a/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java index 24ebd03083..6df42bb04f 100644 --- a/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java @@ -23,8 +23,7 @@ import com.netflix.conductor.common.metadata.workflow.WorkflowDef; import com.netflix.conductor.core.WorkflowContext; import com.netflix.conductor.core.config.ConductorProperties; -import com.netflix.conductor.core.exception.ApplicationException; -import com.netflix.conductor.core.exception.ApplicationException.Code; +import 
com.netflix.conductor.core.exception.NotFoundException; import com.netflix.conductor.dao.EventHandlerDAO; import com.netflix.conductor.dao.MetadataDAO; import com.netflix.conductor.validations.ValidationContext; @@ -67,8 +66,7 @@ public void registerTaskDef(List taskDefinitions) { public void updateTaskDef(TaskDef taskDefinition) { TaskDef existing = metadataDAO.getTaskDef(taskDefinition.getName()); if (existing == null) { - throw new ApplicationException( - Code.NOT_FOUND, "No such task by name " + taskDefinition.getName()); + throw new NotFoundException("No such task by name %s", taskDefinition.getName()); } taskDefinition.setUpdatedBy(WorkflowContext.get().getClientApp()); taskDefinition.setUpdateTime(System.currentTimeMillis()); @@ -96,8 +94,7 @@ public List getTaskDefs() { public TaskDef getTaskDef(String taskType) { TaskDef taskDef = metadataDAO.getTaskDef(taskType); if (taskDef == null) { - throw new ApplicationException( - Code.NOT_FOUND, String.format("No such taskType found by name: %s", taskType)); + throw new NotFoundException("No such taskType found by name: %s", taskType); } return taskDef; } @@ -135,11 +132,8 @@ public WorkflowDef getWorkflowDef(String name, Integer version) { return workflowDef.orElseThrow( () -> - new ApplicationException( - Code.NOT_FOUND, - String.format( - "No such workflow found by name: %s, version: %d", - name, version))); + new NotFoundException( + "No such workflow found by name: %s, version: %d", name, version)); } /** @@ -156,8 +150,7 @@ public List getWorkflowDefs() { public void registerWorkflowDef(WorkflowDef workflowDef) { if (workflowDef.getName().contains(":")) { - throw new ApplicationException( - Code.INVALID_INPUT, + throw new IllegalArgumentException( "Workflow name cannot contain the following set of characters: ':'"); } if (workflowDef.getSchemaVersion() < 1 || workflowDef.getSchemaVersion() > 2) { diff --git a/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java index 333b2e3469..ad05dc36b4 100644 --- a/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java @@ -259,7 +259,11 @@ public Map getTaskQueueSizes(List taskTypes) { public Integer getTaskQueueSize( String taskType, String domain, String isolationGroupId, String executionNamespace) { String queueName = - QueueUtils.getQueueName(taskType, domain, isolationGroupId, executionNamespace); + QueueUtils.getQueueName( + taskType, + StringUtils.trimToNull(domain), + StringUtils.trimToNull(isolationGroupId), + StringUtils.trimToNull(executionNamespace)); return executionService.getTaskQueueSize(queueName); } diff --git a/core/src/main/java/com/netflix/conductor/service/WorkflowServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/WorkflowServiceImpl.java index f1a84a13dd..d8f90bd953 100644 --- a/core/src/main/java/com/netflix/conductor/service/WorkflowServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/WorkflowServiceImpl.java @@ -33,7 +33,7 @@ import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.common.run.WorkflowSummary; import com.netflix.conductor.common.utils.ExternalPayloadStorage; -import com.netflix.conductor.core.exception.ApplicationException; +import com.netflix.conductor.core.exception.NotFoundException; import com.netflix.conductor.core.execution.WorkflowExecutor; import com.netflix.conductor.core.utils.Utils; @@ -187,10 +187,8 @@ public 
String startWorkflow( Map input) { WorkflowDef workflowDef = metadataService.getWorkflowDef(name, version); if (workflowDef == null) { - throw new ApplicationException( - ApplicationException.Code.NOT_FOUND, - String.format( - "No such workflow found by name: %s, version: %d", name, version)); + throw new NotFoundException( + "No such workflow found by name: %s, version: %d", name, version); } return workflowExecutor.startWorkflow( workflowDef.getName(), @@ -247,9 +245,7 @@ public Map> getWorkflows( public Workflow getExecutionStatus(String workflowId, boolean includeTasks) { Workflow workflow = executionService.getExecutionStatus(workflowId, includeTasks); if (workflow == null) { - throw new ApplicationException( - ApplicationException.Code.NOT_FOUND, - String.format("Workflow with Id: %s not found.", workflowId)); + throw new NotFoundException("Workflow with id: %s not found.", workflowId); } return workflow; } diff --git a/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/EventSpec.groovy b/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/EventSpec.groovy index a8a4451e64..ad00742e35 100644 --- a/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/EventSpec.groovy +++ b/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/EventSpec.groovy @@ -16,7 +16,8 @@ import com.netflix.conductor.common.metadata.workflow.WorkflowDef import com.netflix.conductor.core.events.EventQueues import com.netflix.conductor.core.events.queue.Message import com.netflix.conductor.core.events.queue.ObservableQueue -import com.netflix.conductor.core.exception.ApplicationException +import com.netflix.conductor.core.exception.NonTransientException +import com.netflix.conductor.core.exception.TransientException import com.netflix.conductor.core.utils.ParametersUtils import com.netflix.conductor.model.TaskModel import com.netflix.conductor.model.WorkflowModel @@ -227,7 +228,7 @@ class EventSpec extends Specification { 1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue] 1 * eventQueues.getQueue(_) >> observableQueue // capture the Message object sent to the publish method. Event.start sends a list with one Message object - 1 * observableQueue.publish(_) >> { throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "transient error") } + 1 * observableQueue.publish(_) >> { throw new TransientException("transient error") } } def "publishing to a queue throws a non-retryable ApplicationException"() { @@ -246,7 +247,7 @@ class EventSpec extends Specification { 1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue] 1 * eventQueues.getQueue(_) >> observableQueue // capture the Message object sent to the publish method. 
Event.start sends a list with one Message object - 1 * observableQueue.publish(_) >> { throw new ApplicationException(ApplicationException.Code.INTERNAL_ERROR, "fatal error") } + 1 * observableQueue.publish(_) >> { throw new NonTransientException("fatal error") } } def "event task fails to convert the payload to json"() { diff --git a/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/StartWorkflowSpec.groovy b/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/StartWorkflowSpec.groovy index 7ef18baae1..dc3c045942 100644 --- a/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/StartWorkflowSpec.groovy +++ b/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/StartWorkflowSpec.groovy @@ -12,12 +12,12 @@ */ package com.netflix.conductor.core.execution.tasks - import javax.validation.ConstraintViolation import javax.validation.Validator import com.netflix.conductor.common.config.ObjectMapperProvider -import com.netflix.conductor.core.exception.ApplicationException +import com.netflix.conductor.core.exception.NotFoundException +import com.netflix.conductor.core.exception.TransientException import com.netflix.conductor.core.execution.WorkflowExecutor import com.netflix.conductor.model.TaskModel import com.netflix.conductor.model.WorkflowModel @@ -91,7 +91,7 @@ class StartWorkflowSpec extends Specification { then: taskModel.status == SCHEDULED - 1 * workflowExecutor.startWorkflow(*_) >> { throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "") } + 1 * workflowExecutor.startWorkflow(*_) >> { throw new TransientException("") } } def "WorkflowExecutor throws a non-retryable ApplicationException"() { @@ -101,7 +101,7 @@ class StartWorkflowSpec extends Specification { then: taskModel.status == FAILED taskModel.reasonForIncompletion != null - 1 * workflowExecutor.startWorkflow(*_) >> { throw new ApplicationException(ApplicationException.Code.NOT_FOUND, "") } + 1 * workflowExecutor.startWorkflow(*_) >> { throw new NotFoundException("") } } def "WorkflowExecutor throws a RuntimeException"() { diff --git a/core/src/test/java/com/netflix/conductor/core/events/TestDefaultEventProcessor.java b/core/src/test/java/com/netflix/conductor/core/events/TestDefaultEventProcessor.java index 8558904023..c37d31d26a 100644 --- a/core/src/test/java/com/netflix/conductor/core/events/TestDefaultEventProcessor.java +++ b/core/src/test/java/com/netflix/conductor/core/events/TestDefaultEventProcessor.java @@ -39,7 +39,7 @@ import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.core.events.queue.ObservableQueue; -import com.netflix.conductor.core.exception.ApplicationException; +import com.netflix.conductor.core.exception.TransientException; import com.netflix.conductor.core.execution.WorkflowExecutor; import com.netflix.conductor.core.execution.evaluators.Evaluator; import com.netflix.conductor.core.execution.evaluators.JavascriptEvaluator; @@ -357,9 +357,7 @@ public void testEventProcessorWithRetriableError() { .thenReturn(Collections.singletonList(eventHandler)); when(executionService.addEventExecution(any())).thenReturn(true); when(actionProcessor.execute(any(), any(), any(), any())) - .thenThrow( - new ApplicationException( - ApplicationException.Code.BACKEND_ERROR, "some retriable error")); + .thenThrow(new TransientException("some retriable error")); DefaultEventProcessor eventProcessor = new DefaultEventProcessor( @@ -396,10 +394,7 @@ public void 
testEventProcessorWithNonRetriableError() { when(executionService.addEventExecution(any())).thenReturn(true); when(actionProcessor.execute(any(), any(), any(), any())) - .thenThrow( - new ApplicationException( - ApplicationException.Code.INVALID_INPUT, - "some non-retriable error")); + .thenThrow(new IllegalArgumentException("some non-retriable error")); DefaultEventProcessor eventProcessor = new DefaultEventProcessor( @@ -458,9 +453,7 @@ public void testExecuteNonRetriableApplicationException() { (Answer>) invocation -> { executeInvoked.incrementAndGet(); - throw new ApplicationException( - ApplicationException.Code.INVALID_INPUT, - "some non-retriable error"); + throw new IllegalArgumentException("some non-retriable error"); }) .when(actionProcessor) .execute(any(), any(), any(), any()); @@ -496,9 +489,7 @@ public void testExecuteRetriableApplicationException() { (Answer>) invocation -> { executeInvoked.incrementAndGet(); - throw new ApplicationException( - ApplicationException.Code.BACKEND_ERROR, - "some retriable error"); + throw new TransientException("some retriable error"); }) .when(actionProcessor) .execute(any(), any(), any(), any()); diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java index 3a9f801e23..ba7ee03762 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java @@ -41,7 +41,8 @@ import com.netflix.conductor.common.utils.ExternalPayloadStorage; import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.dal.ExecutionDAOFacade; -import com.netflix.conductor.core.exception.ApplicationException; +import com.netflix.conductor.core.exception.ConflictException; +import com.netflix.conductor.core.exception.NotFoundException; import com.netflix.conductor.core.exception.TerminateWorkflowException; import com.netflix.conductor.core.execution.evaluators.Evaluator; import com.netflix.conductor.core.execution.mapper.*; @@ -60,7 +61,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import static com.netflix.conductor.common.metadata.tasks.TaskType.*; -import static com.netflix.conductor.core.exception.ApplicationException.Code.CONFLICT; import static java.util.Comparator.comparingInt; import static java.util.stream.Collectors.groupingBy; @@ -664,7 +664,7 @@ public void testRestartWorkflow() { assertEquals(workflowDef, argumentCaptor.getAllValues().get(1).getWorkflowDefinition()); } - @Test(expected = ApplicationException.class) + @Test(expected = NotFoundException.class) public void testRetryNonTerminalWorkflow() { WorkflowModel workflow = new WorkflowModel(); workflow.setWorkflowId("testRetryNonTerminalWorkflow"); @@ -674,7 +674,7 @@ public void testRetryNonTerminalWorkflow() { workflowExecutor.retry(workflow.getWorkflowId(), false); } - @Test(expected = ApplicationException.class) + @Test(expected = ConflictException.class) public void testRetryWorkflowNoTasks() { WorkflowModel workflow = new WorkflowModel(); workflow.setWorkflowId("ApplicationException"); @@ -685,7 +685,7 @@ public void testRetryWorkflowNoTasks() { workflowExecutor.retry(workflow.getWorkflowId(), false); } - @Test(expected = ApplicationException.class) + @Test(expected = ConflictException.class) public void testRetryWorkflowNoFailedTasks() { // setup WorkflowModel workflow = new WorkflowModel(); @@ -1243,7 +1243,7 @@ public 
void testRetryTimedOutWorkflowWithoutFailedTasks() { assertEquals(1, updateTasksCalledCounter.get()); } - @Test(expected = ApplicationException.class) + @Test(expected = ConflictException.class) public void testRerunNonTerminalWorkflow() { WorkflowModel workflow = new WorkflowModel(); workflow.setWorkflowId("testRetryNonTerminalWorkflow"); @@ -1805,7 +1805,7 @@ public void testDedupAndAddTasks() { assertEquals(3, workflow.getTasks().size()); } - @Test(expected = ApplicationException.class) + @Test(expected = ConflictException.class) public void testTerminateCompletedWorkflow() { WorkflowModel workflow = new WorkflowModel(); workflow.setWorkflowId("testTerminateTerminalWorkflow"); @@ -2008,9 +2008,8 @@ public void testPauseWorkflow() { when(executionDAOFacade.getWorkflowModel(workflowId, false)).thenReturn(workflow); try { workflowExecutor.pauseWorkflow(workflowId); - fail("Expected " + ApplicationException.class); - } catch (ApplicationException e) { - assertEquals(e.getCode(), CONFLICT); + fail("Expected " + ConflictException.class); + } catch (ConflictException e) { verify(executionDAOFacade, never()).updateWorkflow(any(WorkflowModel.class)); verify(queueDAO, never()).remove(anyString(), anyString()); } diff --git a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSubWorkflow.java b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSubWorkflow.java index f2dad62f9a..c777a3fb8e 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSubWorkflow.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/tasks/TestSubWorkflow.java @@ -24,7 +24,8 @@ import com.netflix.conductor.common.config.TestObjectMapperConfiguration; import com.netflix.conductor.common.metadata.workflow.WorkflowDef; -import com.netflix.conductor.core.exception.ApplicationException; +import com.netflix.conductor.core.exception.NonTransientException; +import com.netflix.conductor.core.exception.TransientException; import com.netflix.conductor.core.execution.WorkflowExecutor; import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; @@ -129,9 +130,7 @@ public void testStartSubWorkflowQueueFailure() { any(), eq(null), any())) - .thenThrow( - new ApplicationException( - ApplicationException.Code.BACKEND_ERROR, "QueueDAO failure")); + .thenThrow(new TransientException("QueueDAO failure")); subWorkflow.start(workflowInstance, task, workflowExecutor); assertNull("subWorkflowId should be null", task.getSubWorkflowId()); @@ -165,9 +164,7 @@ public void testStartSubWorkflowStartError() { any(), eq(null), any())) - .thenThrow( - new ApplicationException( - ApplicationException.Code.INTERNAL_ERROR, failureReason)); + .thenThrow(new NonTransientException(failureReason)); subWorkflow.start(workflowInstance, task, workflowExecutor); assertNull("subWorkflowId should be null", task.getSubWorkflowId()); diff --git a/core/src/test/java/com/netflix/conductor/core/metadata/MetadataMapperServiceTest.java b/core/src/test/java/com/netflix/conductor/core/metadata/MetadataMapperServiceTest.java index d6d5799a09..55cd1c7d82 100644 --- a/core/src/test/java/com/netflix/conductor/core/metadata/MetadataMapperServiceTest.java +++ b/core/src/test/java/com/netflix/conductor/core/metadata/MetadataMapperServiceTest.java @@ -33,7 +33,7 @@ import com.netflix.conductor.common.metadata.workflow.SubWorkflowParams; import com.netflix.conductor.common.metadata.workflow.WorkflowDef; import com.netflix.conductor.common.metadata.workflow.WorkflowTask; -import 
com.netflix.conductor.core.exception.ApplicationException; +import com.netflix.conductor.core.exception.NotFoundException; import com.netflix.conductor.core.exception.TerminateWorkflowException; import com.netflix.conductor.dao.MetadataDAO; @@ -160,12 +160,8 @@ public void testMetadataPopulationMissingDefinitions() { try { metadataMapperService.populateTaskDefinitions(workflowDefinition); - } catch (ApplicationException ae) { - if (ae.getCode() == ApplicationException.Code.INVALID_INPUT) { - fail("Missing task definitions are not set to a default TaskDef"); - } else { - throw ae; - } + } catch (NotFoundException nfe) { + fail("Missing TaskDefinitions are not defaulted"); } } diff --git a/core/src/test/java/com/netflix/conductor/dao/ExecutionDAOTest.java b/core/src/test/java/com/netflix/conductor/dao/ExecutionDAOTest.java index 5133bfa0bc..f996adc0fe 100644 --- a/core/src/test/java/com/netflix/conductor/dao/ExecutionDAOTest.java +++ b/core/src/test/java/com/netflix/conductor/dao/ExecutionDAOTest.java @@ -30,7 +30,6 @@ import com.netflix.conductor.common.metadata.tasks.TaskDef; import com.netflix.conductor.common.metadata.workflow.WorkflowDef; import com.netflix.conductor.common.metadata.workflow.WorkflowTask; -import com.netflix.conductor.core.exception.ApplicationException; import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; @@ -89,14 +88,14 @@ public void testCreateTaskException() { task.setTaskId(UUID.randomUUID().toString()); task.setTaskDefName("task1"); - expectedException.expect(ApplicationException.class); + expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Workflow instance id cannot be null"); - getExecutionDAO().createTasks(Collections.singletonList(task)); + getExecutionDAO().createTasks(List.of(task)); task.setWorkflowInstanceId(UUID.randomUUID().toString()); - expectedException.expect(ApplicationException.class); + expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Task reference name cannot be null"); - getExecutionDAO().createTasks(Collections.singletonList(task)); + getExecutionDAO().createTasks(List.of(task)); } @Test @@ -108,7 +107,7 @@ public void testCreateTaskException2() { task.setTaskDefName("task1"); task.setWorkflowInstanceId(UUID.randomUUID().toString()); - expectedException.expect(ApplicationException.class); + expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Task reference name cannot be null"); getExecutionDAO().createTasks(Collections.singletonList(task)); } diff --git a/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java b/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java index bc508a7b07..f1985e2716 100644 --- a/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java +++ b/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java @@ -32,7 +32,7 @@ import com.netflix.conductor.common.metadata.workflow.WorkflowDef; import com.netflix.conductor.common.metadata.workflow.WorkflowTask; import com.netflix.conductor.core.config.ConductorProperties; -import com.netflix.conductor.core.exception.ApplicationException; +import com.netflix.conductor.core.exception.NotFoundException; import com.netflix.conductor.dao.EventHandlerDAO; import com.netflix.conductor.dao.MetadataDAO; @@ -154,7 +154,7 @@ public void testUpdateTaskDefNull() { fail("metadataService.updateTaskDef did not throw ConstraintViolationException !"); } - 
@Test(expected = ApplicationException.class) + @Test(expected = NotFoundException.class) public void testUpdateTaskDefNotExisting() { TaskDef taskDef = new TaskDef(); taskDef.setName("test"); @@ -163,7 +163,7 @@ public void testUpdateTaskDefNotExisting() { metadataService.updateTaskDef(taskDef); } - @Test(expected = ApplicationException.class) + @Test(expected = NotFoundException.class) public void testUpdateTaskDefDaoException() { TaskDef taskDef = new TaskDef(); taskDef.setName("test"); diff --git a/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java b/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java index b710e0e200..aef6e5a2eb 100644 --- a/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java +++ b/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java @@ -35,7 +35,7 @@ import com.netflix.conductor.common.run.SearchResult; import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.common.run.WorkflowSummary; -import com.netflix.conductor.core.exception.ApplicationException; +import com.netflix.conductor.core.exception.NotFoundException; import com.netflix.conductor.core.execution.WorkflowExecutor; import static com.netflix.conductor.TestUtils.getConstraintViolationMessages; @@ -165,21 +165,11 @@ public void testStartWorkflowParam() { assertEquals("w112", workflowService.startWorkflow("test", 1, "c123", input)); } - @Test(expected = ApplicationException.class) - public void testApplicationExceptionStartWorkflowMessageParam() { - try { - when(metadataService.getWorkflowDef("test", 1)).thenReturn(null); + @Test(expected = NotFoundException.class) + public void testNotFoundExceptionStartWorkflowMessageParam() { + when(metadataService.getWorkflowDef("test", 1)).thenReturn(null); - Map input = new HashMap<>(); - input.put("1", "abc"); - - workflowService.startWorkflow("test", 1, "c123", input); - } catch (ApplicationException ex) { - String message = "No such workflow found by name: test, version: 1"; - assertEquals(message, ex.getMessage()); - throw ex; - } - fail("ApplicationException did not throw!"); + workflowService.startWorkflow("test", 1, "c123", Map.of("1", "abc")); } @Test(expected = ConstraintViolationException.class) @@ -247,17 +237,10 @@ public void testGetExecutionStatusNoWorkflowId() { } } - @Test(expected = ApplicationException.class) - public void testApplicationExceptionGetExecutionStatus() { - try { - when(executionService.getExecutionStatus(anyString(), anyBoolean())).thenReturn(null); - workflowService.getExecutionStatus("w123", true); - } catch (ApplicationException ex) { - String message = "Workflow with Id: w123 not found."; - assertEquals(message, ex.getMessage()); - throw ex; - } - fail("ApplicationException did not throw!"); + @Test(expected = NotFoundException.class) + public void testNotFoundExceptionGetExecutionStatus() { + when(executionService.getExecutionStatus(anyString(), anyBoolean())).thenReturn(null); + workflowService.getExecutionStatus("w123", true); } @Test diff --git a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java index 0d40fd61ad..716be69815 100644 --- a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java +++ b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchDAOV6.java @@ -63,7 +63,7 @@ import com.netflix.conductor.common.run.TaskSummary; 
 import com.netflix.conductor.common.run.WorkflowSummary;
 import com.netflix.conductor.core.events.queue.Message;
-import com.netflix.conductor.core.exception.ApplicationException;
+import com.netflix.conductor.core.exception.TransientException;
 import com.netflix.conductor.dao.IndexDAO;
 import com.netflix.conductor.es6.config.ElasticSearchProperties;
 import com.netflix.conductor.es6.dao.query.parser.internal.ParserException;
@@ -701,9 +701,7 @@ public CompletableFuture<Void> asyncRemoveWorkflow(String workflowId) {
     @Override
     public void updateWorkflow(String workflowInstanceId, String[] keys, Object[] values) {
         if (keys.length != values.length) {
-            throw new ApplicationException(
-                    ApplicationException.Code.INVALID_INPUT,
-                    "Number of keys and values do not match");
+            throw new IllegalArgumentException("Number of keys and values do not match");
         }

         long startTime = Instant.now().toEpochMilli();
@@ -774,8 +772,7 @@ private long count(String structuredQuery, String freeTextQuery, String docType)
             SearchResponse response = srb.get();
             return response.getHits().getTotalHits();
         } catch (ParserException e) {
-            throw new ApplicationException(
-                    ApplicationException.Code.BACKEND_ERROR, e.getMessage(), e);
+            throw new TransientException(e.getMessage(), e);
         }
     }
@@ -802,8 +799,7 @@ private SearchResult<String> search(
             return mapSearchResult(srb.get());
         } catch (ParserException e) {
-            throw new ApplicationException(
-                    ApplicationException.Code.BACKEND_ERROR, e.getMessage(), e);
+            throw new TransientException(e.getMessage(), e);
         }
     }
diff --git a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java
index a55d2f7d3f..764329ab35 100644
--- a/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java
+++ b/es6-persistence/src/main/java/com/netflix/conductor/es6/dao/index/ElasticSearchRestDAOV6.java
@@ -65,7 +65,8 @@ import com.netflix.conductor.common.run.TaskSummary;
 import com.netflix.conductor.common.run.WorkflowSummary;
 import com.netflix.conductor.core.events.queue.Message;
-import com.netflix.conductor.core.exception.ApplicationException;
+import com.netflix.conductor.core.exception.NonTransientException;
+import com.netflix.conductor.core.exception.TransientException;
 import com.netflix.conductor.dao.IndexDAO;
 import com.netflix.conductor.es6.config.ElasticSearchProperties;
 import com.netflix.conductor.es6.dao.query.parser.internal.ParserException;
@@ -291,7 +292,7 @@ private String updateIndexName(String type) {
             return indexName;
         } catch (IOException e) {
             LOGGER.error("Failed to update log index name: {}", indexName, e);
-            throw new ApplicationException(e.getMessage(), e);
+            throw new NonTransientException("Failed to update log index name: " + indexName, e);
         }
     }
@@ -725,8 +726,7 @@ public SearchResult<String> searchWorkflows(
             return searchObjectIdsViaExpression(
                     query, start, count, sort, freeText, WORKFLOW_DOC_TYPE);
         } catch (Exception e) {
-            throw new ApplicationException(
-                    ApplicationException.Code.BACKEND_ERROR, e.getMessage(), e);
+            throw new TransientException(e.getMessage(), e);
         }
     }
@@ -736,8 +736,7 @@ public SearchResult<String> searchTasks(
         try {
             return searchObjectIdsViaExpression(query, start, count, sort, freeText, TASK_DOC_TYPE);
         } catch (Exception e) {
-            throw new ApplicationException(
-                    ApplicationException.Code.BACKEND_ERROR, e.getMessage(), e);
+            throw new TransientException(e.getMessage(), e);
         }
     }
@@ -774,9 +773,7 @@ public CompletableFuture<Void> asyncRemoveWorkflow(String workflowId) {
     public void updateWorkflow(String workflowInstanceId, String[] keys, Object[] values) {
         try {
             if (keys.length != values.length) {
-                throw new ApplicationException(
-                        ApplicationException.Code.INVALID_INPUT,
-                        "Number of keys and values do not match");
+                throw new IllegalArgumentException("Number of keys and values do not match");
             }

             long startTime = Instant.now().toEpochMilli();
@@ -954,8 +951,7 @@ public long getWorkflowCount(String query, String freeText) {
         try {
             return getObjectCounts(query, freeText, WORKFLOW_DOC_TYPE);
         } catch (Exception e) {
-            throw new ApplicationException(
-                    ApplicationException.Code.BACKEND_ERROR, e.getMessage(), e);
+            throw new TransientException(e.getMessage(), e);
         }
     }
diff --git a/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/MetadataServiceImpl.java b/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/MetadataServiceImpl.java
index 32aaeb0bb8..72ed91331a 100644
--- a/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/MetadataServiceImpl.java
+++ b/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/MetadataServiceImpl.java
@@ -21,7 +21,7 @@
 import com.netflix.conductor.common.metadata.tasks.TaskDef;
 import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
-import com.netflix.conductor.core.exception.ApplicationException;
+import com.netflix.conductor.core.exception.NotFoundException;
 import com.netflix.conductor.grpc.MetadataServiceGrpc;
 import com.netflix.conductor.grpc.MetadataServicePb;
 import com.netflix.conductor.grpc.ProtoMapper;
@@ -82,7 +82,7 @@ public void getWorkflow(
                             .setWorkflow(workflow)
                             .build());
             response.onCompleted();
-        } catch (ApplicationException e) {
+        } catch (NotFoundException e) {
             // TODO replace this with gRPC exception interceptor.
             response.onError(
                     Status.NOT_FOUND
diff --git a/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/WorkflowServiceImpl.java b/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/WorkflowServiceImpl.java
index 42493c552b..3b10b33106 100644
--- a/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/WorkflowServiceImpl.java
+++ b/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/WorkflowServiceImpl.java
@@ -26,8 +26,7 @@
 import com.netflix.conductor.common.run.SearchResult;
 import com.netflix.conductor.common.run.Workflow;
 import com.netflix.conductor.common.run.WorkflowSummary;
-import com.netflix.conductor.core.exception.ApplicationException;
-import com.netflix.conductor.core.exception.ApplicationException.Code;
+import com.netflix.conductor.core.exception.NotFoundException;
 import com.netflix.conductor.grpc.ProtoMapper;
 import com.netflix.conductor.grpc.SearchPb;
 import com.netflix.conductor.grpc.WorkflowServiceGrpc;
@@ -79,16 +78,14 @@ public void startWorkflow(
             response.onNext(
                     WorkflowServicePb.StartWorkflowResponse.newBuilder().setWorkflowId(id).build());
             response.onCompleted();
-        } catch (ApplicationException ae) {
-            if (ae.getCode() == Code.NOT_FOUND) {
-                response.onError(
-                        Status.NOT_FOUND
-                                .withDescription(
-                                        "No such workflow found by name=" + request.getName())
-                                .asRuntimeException());
-            } else {
-                GRPC_HELPER.onError(response, ae);
-            }
+        } catch (NotFoundException nfe) {
+            response.onError(
+                    Status.NOT_FOUND
+                            .withDescription("No such workflow found by name=" + request.getName())
+                            .asRuntimeException());
+        } catch (Exception e) {
+
+            GRPC_HELPER.onError(response, e);
         }
     }
diff --git a/redis-concurrency-limit/src/main/java/com/netflix/conductor/redis/limit/RedisConcurrentExecutionLimitDAO.java b/redis-concurrency-limit/src/main/java/com/netflix/conductor/redis/limit/RedisConcurrentExecutionLimitDAO.java
index 4107997252..ceb7fec101 100644
--- a/redis-concurrency-limit/src/main/java/com/netflix/conductor/redis/limit/RedisConcurrentExecutionLimitDAO.java
+++ b/redis-concurrency-limit/src/main/java/com/netflix/conductor/redis/limit/RedisConcurrentExecutionLimitDAO.java
@@ -24,7 +24,7 @@
 import com.netflix.conductor.annotations.Trace;
 import com.netflix.conductor.common.metadata.tasks.TaskDef;
-import com.netflix.conductor.core.exception.ApplicationException;
+import com.netflix.conductor.core.exception.TransientException;
 import com.netflix.conductor.dao.ConcurrentExecutionLimitDAO;
 import com.netflix.conductor.metrics.Monitors;
 import com.netflix.conductor.model.TaskModel;
@@ -75,7 +75,7 @@ public void addTaskToLimit(TaskModel task) {
                             "Error updating taskDefLimit for task - %s:%s in workflow: %s",
                             task.getTaskDefName(), task.getTaskId(), task.getWorkflowInstanceId());
             LOGGER.error(errorMsg, e);
-            throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, errorMsg, e);
+            throw new TransientException(errorMsg, e);
         }
     }
@@ -104,7 +104,7 @@ public void removeTaskFromLimit(TaskModel task) {
                             "Error updating taskDefLimit for task - %s:%s in workflow: %s",
                             task.getTaskDefName(), task.getTaskId(), task.getWorkflowInstanceId());
             LOGGER.error(errorMsg, e);
-            throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, errorMsg, e);
+            throw new TransientException(errorMsg, e);
         }
     }
@@ -156,7 +156,7 @@ public boolean exceedsLimit(TaskModel task) {
                             "Failed to get in progress limit - %s:%s in workflow :%s",
                             task.getTaskDefName(), task.getTaskId(), task.getWorkflowInstanceId());
             LOGGER.error(errorMsg, e);
-            throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, errorMsg);
+            throw new TransientException(errorMsg);
         }
     }
diff --git a/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisEventHandlerDAO.java b/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisEventHandlerDAO.java
index 263e4ec77f..1742786e69 100644
--- a/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisEventHandlerDAO.java
+++ b/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisEventHandlerDAO.java
@@ -24,8 +24,8 @@
 import com.netflix.conductor.common.metadata.events.EventHandler;
 import com.netflix.conductor.core.config.ConductorProperties;
-import com.netflix.conductor.core.exception.ApplicationException;
-import com.netflix.conductor.core.exception.ApplicationException.Code;
+import com.netflix.conductor.core.exception.ConflictException;
+import com.netflix.conductor.core.exception.NotFoundException;
 import com.netflix.conductor.dao.EventHandlerDAO;
 import com.netflix.conductor.redis.config.AnyRedisCondition;
 import com.netflix.conductor.redis.config.RedisProperties;
@@ -55,9 +55,8 @@ public RedisEventHandlerDAO(
     public void addEventHandler(EventHandler eventHandler) {
         Preconditions.checkNotNull(eventHandler.getName(), "Missing Name");
         if (getEventHandler(eventHandler.getName()) != null) {
-            throw new ApplicationException(
-                    Code.CONFLICT,
-                    "EventHandler with name " + eventHandler.getName() + " already exists!");
+            throw new ConflictException(
+                    "EventHandler with name %s already exists!", eventHandler.getName());
         }
         index(eventHandler);
         jedisProxy.hset(nsKey(EVENT_HANDLERS), eventHandler.getName(), toJson(eventHandler));
@@ -69,9 +68,8 @@ public void updateEventHandler(EventHandler eventHandler) {
         Preconditions.checkNotNull(eventHandler.getName(), "Missing Name");
         EventHandler existing = getEventHandler(eventHandler.getName());
         if (existing == null) {
-            throw new ApplicationException(
-                    Code.NOT_FOUND,
-                    "EventHandler with name " + eventHandler.getName() + " not found!");
+            throw new NotFoundException(
+                    "EventHandler with name %s not found!", eventHandler.getName());
         }
         index(eventHandler);
         jedisProxy.hset(nsKey(EVENT_HANDLERS), eventHandler.getName(), toJson(eventHandler));
@@ -82,8 +80,7 @@ public void updateEventHandler(EventHandler eventHandler) {
     public void removeEventHandler(String name) {
         EventHandler existing = getEventHandler(name);
         if (existing == null) {
-            throw new ApplicationException(
-                    Code.NOT_FOUND, "EventHandler with name " + name + " not found!");
+            throw new NotFoundException("EventHandler with name %s not found!", name);
         }
         jedisProxy.hdel(nsKey(EVENT_HANDLERS), name);
         recordRedisDaoRequests("removeEventHandler");
@@ -128,11 +125,9 @@ public List getEventHandlersForEvent(String event, boolean activeO
                         && (!activeOnly || eventHandler.isActive())) {
                     handlers.add(eventHandler);
                 }
-            } catch (ApplicationException ae) {
-                if (ae.getCode() == Code.NOT_FOUND) {
-                    LOGGER.info("No matching event handler found for event: {}", event);
-                }
-                throw ae;
+            } catch (NotFoundException nfe) {
+                LOGGER.info("No matching event handler found for event: {}", event);
+                throw nfe;
             }
         }
         return handlers;
diff --git a/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java b/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java
index e653d3d6f5..33902640d4 100644
--- a/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java
+++ b/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisExecutionDAO.java
@@ -24,8 +24,7 @@
 import com.netflix.conductor.common.metadata.events.EventExecution;
 import com.netflix.conductor.common.metadata.tasks.TaskDef;
 import com.netflix.conductor.core.config.ConductorProperties;
-import com.netflix.conductor.core.exception.ApplicationException;
-import com.netflix.conductor.core.exception.ApplicationException.Code;
+import com.netflix.conductor.core.exception.TransientException;
 import com.netflix.conductor.dao.ConcurrentExecutionLimitDAO;
 import com.netflix.conductor.dao.ExecutionDAO;
 import com.netflix.conductor.metrics.Monitors;
@@ -682,10 +681,8 @@ public boolean addEventExecution(EventExecution eventExecution) {
             return added;
         } catch (Exception e) {
-            throw new ApplicationException(
-                    Code.BACKEND_ERROR,
-                    "Unable to add event execution for " + eventExecution.getId(),
-                    e);
+            throw new TransientException(
+                    "Unable to add event execution for " + eventExecution.getId(), e);
         }
     }
@@ -706,10 +703,8 @@ public void updateEventExecution(EventExecution eventExecution) {
             recordRedisDaoPayloadSize(
                     "updateEventExecution", json.length(), eventExecution.getEvent(), "n/a");
         } catch (Exception e) {
-            throw new ApplicationException(
-                    Code.BACKEND_ERROR,
-                    "Unable to update event execution for " + eventExecution.getId(),
-                    e);
+            throw new TransientException(
+                    "Unable to update event execution for " + eventExecution.getId(), e);
         }
     }
@@ -726,10 +721,8 @@ public void removeEventExecution(EventExecution eventExecution) {
             jedisProxy.hdel(key, eventExecution.getId());
             recordRedisDaoEventRequests("removeEventExecution", eventExecution.getEvent());
         } catch (Exception e) {
-            throw new ApplicationException(
-                    Code.BACKEND_ERROR,
-                    "Unable to remove event execution for " + eventExecution.getId(),
-                    e);
+            throw new TransientException(
+                    "Unable to remove event execution for " + eventExecution.getId(), e);
         }
     }
@@ -754,10 +747,8 @@ public List<EventExecution> getEventExecutions(
             return executions;
         } catch (Exception e) {
-            throw new ApplicationException(
-                    Code.BACKEND_ERROR,
-                    "Unable to get event executions for " + eventHandlerName,
-                    e);
+            throw new TransientException(
+                    "Unable to get event executions for " + eventHandlerName, e);
         }
     }
@@ -770,7 +761,7 @@ private void validate(TaskModel task) {
             Preconditions.checkNotNull(
                     task.getReferenceTaskName(), "Task reference name cannot be null");
         } catch (NullPointerException npe) {
-            throw new ApplicationException(Code.INVALID_INPUT, npe.getMessage(), npe);
+            throw new IllegalArgumentException(npe.getMessage(), npe);
         }
     }
 }
diff --git a/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisMetadataDAO.java b/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisMetadataDAO.java
index 81d671b481..fedb1895bd 100644
--- a/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisMetadataDAO.java
+++ b/redis-persistence/src/main/java/com/netflix/conductor/redis/dao/RedisMetadataDAO.java
@@ -32,8 +32,8 @@
 import com.netflix.conductor.common.metadata.tasks.TaskDef;
 import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
 import com.netflix.conductor.core.config.ConductorProperties;
-import com.netflix.conductor.core.exception.ApplicationException;
-import com.netflix.conductor.core.exception.ApplicationException.Code;
+import com.netflix.conductor.core.exception.ConflictException;
+import com.netflix.conductor.core.exception.NotFoundException;
 import com.netflix.conductor.dao.MetadataDAO;
 import com.netflix.conductor.metrics.Monitors;
 import com.netflix.conductor.redis.config.AnyRedisCondition;
@@ -149,8 +149,7 @@ public void removeTaskDef(String name) {
         Preconditions.checkNotNull(name, "TaskDef name cannot be null");
         Long result = jedisProxy.hdel(nsKey(ALL_TASK_DEFS), name);
         if (!result.equals(1L)) {
-            throw new ApplicationException(
-                    Code.NOT_FOUND, "Cannot remove the task - no such task definition");
+            throw new NotFoundException("Cannot remove the task - no such task definition");
         }
         recordRedisDaoRequests("removeTaskDef");
         refreshTaskDefs();
@@ -160,8 +159,7 @@ public void removeTaskDef(String name) {
     public void createWorkflowDef(WorkflowDef def) {
         if (jedisProxy.hexists(
                 nsKey(WORKFLOW_DEF, def.getName()), String.valueOf(def.getVersion()))) {
-            throw new ApplicationException(
-                    Code.CONFLICT, "Workflow with " + def.key() + " already exists!");
+            throw new ConflictException("Workflow with %s already exists!", def.key());
         }
         _createOrUpdate(def);
     }
@@ -244,12 +242,9 @@ public void removeWorkflowDef(String name, Integer version) {
         Preconditions.checkNotNull(version, "Input version cannot be null");
         Long result = jedisProxy.hdel(nsKey(WORKFLOW_DEF, name), String.valueOf(version));
         if (!result.equals(1L)) {
-            throw new ApplicationException(
-                    Code.NOT_FOUND,
-                    String.format(
-                            "Cannot remove the workflow - no such workflow"
-                                    + " definition: %s version: %d",
-                            name, version));
+            throw new NotFoundException(
+                    "Cannot remove the workflow - no such workflow" + " definition: %s version: %d",
+                    name, version);
         }

         // check if there are any more versions remaining if not delete the
@@ -257,7 +252,7 @@ public void removeWorkflowDef(String name, Integer version) {
         Optional<Integer> optionMaxVersion = getWorkflowMaxVersion(name);

         // delete workflow name
-        if (!optionMaxVersion.isPresent()) {
+        if (optionMaxVersion.isEmpty()) {
             jedisProxy.srem(nsKey(WORKFLOW_DEF_NAMES), name);
         }
diff --git a/redis-persistence/src/test/java/com/netflix/conductor/redis/dao/RedisMetadataDAOTest.java b/redis-persistence/src/test/java/com/netflix/conductor/redis/dao/RedisMetadataDAOTest.java
index 9a303ce529..bb581cf207 100644
--- a/redis-persistence/src/test/java/com/netflix/conductor/redis/dao/RedisMetadataDAOTest.java
+++ b/redis-persistence/src/test/java/com/netflix/conductor/redis/dao/RedisMetadataDAOTest.java
@@ -33,7 +33,8 @@
 import com.netflix.conductor.common.metadata.tasks.TaskDef.TimeoutPolicy;
 import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
 import com.netflix.conductor.core.config.ConductorProperties;
-import com.netflix.conductor.core.exception.ApplicationException;
+import com.netflix.conductor.core.exception.ConflictException;
+import com.netflix.conductor.core.exception.NotFoundException;
 import com.netflix.conductor.redis.config.RedisProperties;
 import com.netflix.conductor.redis.jedis.JedisMock;
 import com.netflix.conductor.redis.jedis.JedisProxy;
@@ -67,7 +68,7 @@ public void init() {
                 new RedisMetadataDAO(jedisProxy, objectMapper, conductorProperties, properties);
     }

-    @Test(expected = ApplicationException.class)
+    @Test(expected = ConflictException.class)
     public void testDup() {
         WorkflowDef def = new WorkflowDef();
         def.setName("testDup");
@@ -159,7 +160,7 @@ public void testWorkflowDefOperations() {
         assertEquals(workflow.getVersion(), 3);
     }

-    @Test(expected = ApplicationException.class)
+    @Test(expected = NotFoundException.class)
     public void removeInvalidWorkflowDef() {
         redisMetadataDAO.removeWorkflowDef("hello", 1);
     }
@@ -220,8 +221,8 @@ public void testTaskDefOperations() {
         assertEquals(def.getName(), all.get(0).getName());
     }

-    @Test(expected = ApplicationException.class)
+    @Test(expected = NotFoundException.class)
     public void testRemoveTaskDef() {
-        redisMetadataDAO.removeTaskDef("test" + UUID.randomUUID().toString());
+        redisMetadataDAO.removeTaskDef("test" + UUID.randomUUID());
     }
 }
diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/ApplicationExceptionMapper.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/ApplicationExceptionMapper.java
index 32eb2e212d..ab5c47eee9 100644
--- a/rest/src/main/java/com/netflix/conductor/rest/controllers/ApplicationExceptionMapper.java
+++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/ApplicationExceptionMapper.java
@@ -12,6 +12,9 @@
  */
 package com.netflix.conductor.rest.controllers;

+import java.util.HashMap;
+import java.util.Map;
+
 import javax.servlet.http.HttpServletRequest;

 import org.slf4j.Logger;
@@ -23,15 +26,14 @@ import org.springframework.web.bind.annotation.RestControllerAdvice;

 import com.netflix.conductor.common.validation.ErrorResponse;
-import com.netflix.conductor.core.exception.ApplicationException;
+import com.netflix.conductor.core.exception.ConflictException;
+import com.netflix.conductor.core.exception.NotFoundException;
+import com.netflix.conductor.core.exception.TransientException;
 import com.netflix.conductor.core.utils.Utils;
 import com.netflix.conductor.metrics.Monitors;

 import com.fasterxml.jackson.databind.exc.InvalidFormatException;

-import static com.netflix.conductor.core.exception.ApplicationException.Code.INTERNAL_ERROR;
-import static com.netflix.conductor.core.exception.ApplicationException.Code.INVALID_INPUT;
-
 @RestControllerAdvice
 @Order(ValidationExceptionMapper.ORDER + 1)
 public class ApplicationExceptionMapper {
@@ -40,45 +42,40 @@ public class ApplicationExceptionMapper {

     private final String host = Utils.getServerId();

-    @ExceptionHandler(ApplicationException.class)
-    public ResponseEntity<ErrorResponse> handleApplicationException(
-            HttpServletRequest request, ApplicationException ex) {
-        logException(request, ex);
-
-        Monitors.error("error", String.valueOf(ex.getHttpStatusCode()));
+    private static final Map<Class<? extends Throwable>, HttpStatus> EXCEPTION_STATUS_MAP =
+            new HashMap<>();

-        return new ResponseEntity<>(
-                toErrorResponse(ex), HttpStatus.valueOf(ex.getHttpStatusCode()));
+    static {
+        EXCEPTION_STATUS_MAP.put(NotFoundException.class, HttpStatus.NOT_FOUND);
+        EXCEPTION_STATUS_MAP.put(ConflictException.class, HttpStatus.CONFLICT);
+        EXCEPTION_STATUS_MAP.put(IllegalArgumentException.class, HttpStatus.BAD_REQUEST);
+        EXCEPTION_STATUS_MAP.put(InvalidFormatException.class, HttpStatus.INTERNAL_SERVER_ERROR);
     }

     @ExceptionHandler(Throwable.class)
     public ResponseEntity<ErrorResponse> handleAll(HttpServletRequest request, Throwable th) {
         logException(request, th);

-        ApplicationException.Code code =
-                (th instanceof IllegalArgumentException || th instanceof InvalidFormatException)
-                        ? INVALID_INPUT
-                        : INTERNAL_ERROR;
+        HttpStatus status =
+                EXCEPTION_STATUS_MAP.getOrDefault(th.getClass(), HttpStatus.INTERNAL_SERVER_ERROR);
+
+        ErrorResponse errorResponse = new ErrorResponse();
+        errorResponse.setInstance(host);
+        errorResponse.setStatus(status.value());
+        errorResponse.setMessage(th.getMessage());
+        errorResponse.setRetryable(
+                th instanceof TransientException); // set it to true for TransientException

-        ApplicationException ex = new ApplicationException(code, th.getMessage(), th);
+        Monitors.error("error", String.valueOf(status.value()));

-        return handleApplicationException(request, ex);
+        return new ResponseEntity<>(errorResponse, status);
     }

     private void logException(HttpServletRequest request, Throwable exception) {
         LOGGER.error(
-                String.format(
-                        "Error %s url: '%s'",
-                        exception.getClass().getSimpleName(), request.getRequestURI()),
+                "Error {} url: '{}'",
+                exception.getClass().getSimpleName(),
+                request.getRequestURI(),
                 exception);
     }
-
-    private ErrorResponse toErrorResponse(ApplicationException ex) {
-        ErrorResponse errorResponse = new ErrorResponse();
-        errorResponse.setInstance(host);
-        errorResponse.setStatus(ex.getHttpStatusCode());
-        errorResponse.setMessage(ex.getMessage());
-        errorResponse.setRetryable(ex.isRetryable());
-        return errorResponse;
-    }
 }
diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SimpleWorkflowSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SimpleWorkflowSpec.groovy
index 276514742c..7dbdc54c5a 100644
--- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SimpleWorkflowSpec.groovy
+++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/SimpleWorkflowSpec.groovy
@@ -22,13 +22,13 @@ import com.netflix.conductor.common.metadata.tasks.TaskType
 import com.netflix.conductor.common.metadata.workflow.WorkflowDef
 import com.netflix.conductor.common.metadata.workflow.WorkflowTask
 import com.netflix.conductor.common.run.Workflow
-import com.netflix.conductor.core.exception.ApplicationException
+import com.netflix.conductor.core.exception.ConflictException
+import com.netflix.conductor.core.exception.NotFoundException
 import com.netflix.conductor.dao.QueueDAO
 import com.netflix.conductor.test.base.AbstractSpecification

 import spock.lang.Shared

-import static com.netflix.conductor.core.exception.ApplicationException.Code.CONFLICT
 import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAcknowledgedTask

 class SimpleWorkflowSpec extends AbstractSpecification {
@@ -188,8 +188,8 @@ class SimpleWorkflowSpec extends AbstractSpecification {
         workflowExecutor.restart(workflowInstanceId, false)

         then: "Ensure that a exception is thrown when a running workflow is being rewind"
-        def exceptionThrown = thrown(ApplicationException)
-        exceptionThrown.code == CONFLICT
+        def exceptionThrown = thrown(ConflictException.class)
+        exceptionThrown != null

         when: "'integration_task_1' is polled and failed with terminal error"
         def polledIntegrationTask1 = workflowExecutionService.poll('integration_task_1', 'task1.integration.worker')
@@ -942,8 +942,7 @@
         workflowExecutor.restart(workflowInstanceId, false)

         then: "Ensure that an exception is thrown"
-        def exceptionThrown = thrown(ApplicationException)
-        exceptionThrown
+        thrown(NotFoundException.class)

         cleanup: "clean up the changes made to the task and workflow definition during start up"
         metadataService.updateTaskDef(integrationTask1Definition)
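Note (editor, not part of the patch): several hunks above build the replacement exceptions from a printf-style template plus arguments, for example `new ConflictException("Workflow with %s already exists!", def.key())` and `new NotFoundException("EventHandler with name %s not found!", name)`. The sketch below shows one plausible shape for such a constructor; the class name comes from the patch, but the body is an assumption and not the actual conductor-core implementation.

// Hypothetical sketch only: the real ConflictException lives in
// com.netflix.conductor.core.exception and may be implemented differently.
public class ConflictException extends RuntimeException {

    // Accepts a String.format-style template plus arguments, matching call sites
    // such as: new ConflictException("Workflow with %s already exists!", def.key())
    public ConflictException(String message, Object... args) {
        super(String.format(message, args));
    }
}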
diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy
index 1a04b59b63..2ef19708e0 100644
--- a/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy
+++ b/test-harness/src/test/groovy/com/netflix/conductor/test/resiliency/QueueResiliencySpec.groovy
@@ -20,7 +20,8 @@ import com.netflix.conductor.common.metadata.tasks.TaskResult
 import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest
 import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest
 import com.netflix.conductor.common.run.Workflow
-import com.netflix.conductor.core.exception.ApplicationException
+import com.netflix.conductor.core.exception.NotFoundException
+import com.netflix.conductor.core.exception.TransientException
 import com.netflix.conductor.rest.controllers.TaskResource
 import com.netflix.conductor.rest.controllers.WorkflowResource
 import com.netflix.conductor.test.base.AbstractResiliencySpecification
@@ -65,8 +66,8 @@ class QueueResiliencySpec extends AbstractResiliencySpecification {
                         .withVersion(1))

         then: "Verify that workflow start fails with BACKEND_ERROR"
-        1 * queueDAO.push(*_) >> { throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "Queue push failed from Spy") }
-        thrown(ApplicationException)
+        1 * queueDAO.push(*_) >> { throw new TransientException("Queue push failed from Spy") }
+        thrown(TransientException.class)
     }

     def "Verify terminate succeeds when QueueDAO is unavailable"() {
@@ -86,7 +87,7 @@
         workflowResource.terminate(workflowInstanceId, "Terminated from a test")

         then: "Verify that terminate is successful without any exceptions"
-        2 * queueDAO.remove(*_) >> { throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "Queue remove failed from Spy") }
+        2 * queueDAO.remove(*_) >> { throw new TransientException("Queue remove failed from Spy") }
         0 * queueDAO._
         with(workflowResource.getExecutionStatus(workflowInstanceId, true)) {
             status == Workflow.WorkflowStatus.TERMINATED
@@ -117,10 +118,10 @@
         workflowResource.restart(workflowInstanceId, false)

         then: ""
-        1 * queueDAO.push(*_) >> { throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "Queue push failed from Spy") }
-        1 * queueDAO.remove(*_) >> { throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "Queue remove failed from Spy") }
+        1 * queueDAO.push(*_) >> { throw new TransientException("Queue push failed from Spy") }
+        1 * queueDAO.remove(*_) >> { throw new TransientException("Queue remove failed from Spy") }
         0 * queueDAO._
-        thrown(ApplicationException)
+        thrown(TransientException.class)
         with(workflowResource.getExecutionStatus(workflowInstanceId, true)) {
             status == Workflow.WorkflowStatus.TERMINATED
             tasks.size() == 0
@@ -150,9 +151,9 @@
         workflowResource.rerun(workflowInstanceId, rerunWorkflowRequest)

         then: ""
-        1 * queueDAO.push(*_) >> { throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "Queue push failed from Spy") }
+        1 * queueDAO.push(*_) >> { throw new TransientException("Queue push failed from Spy") }
         0 * queueDAO._
-        thrown(ApplicationException)
+        thrown(TransientException.class)
         with(workflowResource.getExecutionStatus(workflowInstanceId, true)) {
             status == Workflow.WorkflowStatus.TERMINATED
             tasks.size() == 0
@@ -180,9 +181,9 @@
         workflowResource.retry(workflowInstanceId, false)

         then: "Verify retry fails"
-        1 * queueDAO.push(*_) >> { throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "Queue push failed from Spy") }
+        1 * queueDAO.push(*_) >> { throw new TransientException("Queue push failed from Spy") }
         0 * queueDAO._
-        thrown(ApplicationException)
+        thrown(TransientException.class)
         with(workflowResource.getExecutionStatus(workflowInstanceId, true)) {
             status == Workflow.WorkflowStatus.TERMINATED
             tasks.size() == 1
@@ -259,7 +260,7 @@
         workflowResource.getExecutionStatus(workflowInstanceId, true)

         then:
-        thrown(ApplicationException)
+        thrown(NotFoundException.class)
     }

     def "Verify decide succeeds when QueueDAO is unavailable"() {
@@ -340,8 +341,8 @@
         workflowResource.resumeWorkflow(workflowInstanceId)

         then: "exception is thrown"
-        1 * queueDAO.push(*_) >> { throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "Queue push failed from Spy") }
-        thrown(ApplicationException)
+        1 * queueDAO.push(*_) >> { throw new TransientException("Queue push failed from Spy") }
+        thrown(TransientException.class)
         with(workflowResource.getExecutionStatus(workflowInstanceId, true)) {
             status == Workflow.WorkflowStatus.PAUSED
             tasks.size() == 1
@@ -375,8 +376,8 @@
         workflowResource.resetWorkflow(workflowInstanceId)

         then: "Verify an exception is thrown"
-        1 * queueDAO.resetOffsetTime(*_) >> { throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "Queue resetOffsetTime failed from Spy") }
-        thrown(ApplicationException)
+        1 * queueDAO.resetOffsetTime(*_) >> { throw new TransientException("Queue resetOffsetTime failed from Spy") }
+        thrown(TransientException.class)
     }

     def "Verify search is not impacted by QueueDAO"() {
diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/util/WorkflowTestUtil.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/util/WorkflowTestUtil.groovy
index 4ba25b476d..0a1e383cab 100644
--- a/test-harness/src/test/groovy/com/netflix/conductor/test/util/WorkflowTestUtil.groovy
+++ b/test-harness/src/test/groovy/com/netflix/conductor/test/util/WorkflowTestUtil.groovy
@@ -23,7 +23,7 @@ import com.netflix.conductor.common.metadata.tasks.TaskDef
 import com.netflix.conductor.common.metadata.tasks.TaskResult
 import com.netflix.conductor.common.metadata.workflow.WorkflowDef
 import com.netflix.conductor.core.WorkflowContext
-import com.netflix.conductor.core.exception.ApplicationException
+import com.netflix.conductor.core.exception.NotFoundException
 import com.netflix.conductor.core.execution.WorkflowExecutor
 import com.netflix.conductor.dao.QueueDAO
 import com.netflix.conductor.model.WorkflowModel
@@ -204,12 +204,8 @@ class WorkflowTestUtil {
     Optional<TaskDef> getPersistedTaskDefinition(String taskDefName) {
         try {
             return Optional.of(metadataService.getTaskDef(taskDefName))
-        } catch (ApplicationException applicationException) {
-            if (applicationException.code == ApplicationException.Code.NOT_FOUND) {
-                return Optional.empty()
-            } else {
-                throw applicationException
-            }
+        } catch(NotFoundException nfe) {
+            return Optional.empty()
         }
     }
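Note (editor, not part of the patch): taken together, the ApplicationExceptionMapper changes above define the HTTP contract for the new exception hierarchy: NotFoundException maps to 404, ConflictException to 409, IllegalArgumentException to 400, everything else (including TransientException and InvalidFormatException) to 500, and the ErrorResponse retryable flag is set only for TransientException. The self-contained sketch below illustrates that lookup; the class and method names (StatusMappingSketch, statusFor, isRetryable) and the stand-in exception classes are hypothetical, not the Spring wiring from the patch.

import java.util.Map;

// Hypothetical stand-ins for the Conductor exception types named in the patch.
class NotFoundException extends RuntimeException {}
class ConflictException extends RuntimeException {}
class TransientException extends RuntimeException {}

public class StatusMappingSketch {

    // Mirrors EXCEPTION_STATUS_MAP.getOrDefault(th.getClass(), INTERNAL_SERVER_ERROR);
    // the lookup is by exact class, so subclasses fall through to the 500 default.
    private static final Map<Class<? extends Throwable>, Integer> STATUS_MAP =
            Map.of(
                    NotFoundException.class, 404,
                    ConflictException.class, 409,
                    IllegalArgumentException.class, 400);

    static int statusFor(Throwable th) {
        return STATUS_MAP.getOrDefault(th.getClass(), 500);
    }

    static boolean isRetryable(Throwable th) {
        // Only transient (backend/infrastructure) failures are flagged as retryable.
        return th instanceof TransientException;
    }

    public static void main(String[] args) {
        System.out.println(statusFor(new NotFoundException()));          // 404
        System.out.println(statusFor(new TransientException()));         // 500
        System.out.println(isRetryable(new TransientException()));       // true
        System.out.println(isRetryable(new IllegalArgumentException())); // false
    }
}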