Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
introducing ConflictException
Browse files Browse the repository at this point in the history
  • Loading branch information
aravindanr committed May 22, 2022
1 parent 92c82e3 commit 8ec7cf3
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.exception.ApplicationException.Code;
import com.netflix.conductor.core.exception.ConflictException;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.metrics.Monitors;

Expand Down Expand Up @@ -176,11 +177,9 @@ public void createWorkflowDef(WorkflowDef workflowDef) {
workflowDef.getVersion(),
workflowDefinition))
.wasApplied()) {
throw new ApplicationException(
Code.CONFLICT,
String.format(
"Workflow: %s, version: %s already exists!",
workflowDef.getName(), workflowDef.getVersion()));
throw new ConflictException(
"Workflow: %s, version: %s already exists!",
workflowDef.getName(), workflowDef.getVersion());
}
String workflowDefIndex =
getWorkflowDefIndexValue(workflowDef.getName(), workflowDef.getVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public class ApplicationException extends RuntimeException {
public enum Code {
INVALID_INPUT(400),
INTERNAL_ERROR(500),
CONFLICT(409),
UNAUTHORIZED(403),
BACKEND_ERROR(500);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.exception;

public class ConflictException extends RuntimeException {

public ConflictException(String message) {
super(message);
}

public ConflictException(String message, Object... args) {
super(String.format(message, args));
}

public ConflictException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.exception.ApplicationException.Code;
import com.netflix.conductor.core.exception.ConflictException;
import com.netflix.conductor.core.exception.NotFoundException;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
Expand Down Expand Up @@ -490,8 +491,8 @@ private void validateWorkflow(
public void resetCallbacksForWorkflow(String workflowId) {
WorkflowModel workflow = executionDAOFacade.getWorkflowModel(workflowId, true);
if (workflow.getStatus().isTerminal()) {
throw new ApplicationException(
CONFLICT, "Workflow is in terminal state. Status =" + workflow.getStatus());
throw new ConflictException(
"Workflow is in terminal state. Status = %s", workflow.getStatus());
}

// Get SIMPLE tasks in SCHEDULED state that have callbackAfterSeconds > 0 and set the
Expand Down Expand Up @@ -544,7 +545,7 @@ public void restart(String workflowId, boolean useLatestDefinitions) {
String.format(
"Workflow: %s is not in terminal state, unable to restart.", workflow);
LOGGER.error(errorMsg);
throw new ApplicationException(CONFLICT, errorMsg);
throw new ConflictException(errorMsg);
}

WorkflowDef workflowDef;
Expand Down Expand Up @@ -580,8 +581,7 @@ public void restart(String workflowId, boolean useLatestDefinitions) {
WorkflowModel.Status
.COMPLETED)) { // Can only restart non-completed workflows
// when the configuration is set to false
throw new ApplicationException(
CONFLICT, String.format("Workflow: %s is non-restartable", workflow));
throw new NotFoundException("Workflow: %s is non-restartable", workflow);
}

// Reset the workflow in the primary datastore and remove from indexer; then re-create it
Expand Down Expand Up @@ -623,11 +623,11 @@ public void restart(String workflowId, boolean useLatestDefinitions) {
public void retry(String workflowId, boolean resumeSubworkflowTasks) {
WorkflowModel workflow = executionDAOFacade.getWorkflowModel(workflowId, true);
if (!workflow.getStatus().isTerminal()) {
throw new ApplicationException(
CONFLICT, "Workflow is still running. status=" + workflow.getStatus());
throw new NotFoundException(
"Workflow is still running. status=%s", workflow.getStatus());
}
if (workflow.getTasks().isEmpty()) {
throw new ApplicationException(CONFLICT, "Workflow has not started yet");
throw new ConflictException("Workflow has not started yet");
}

if (resumeSubworkflowTasks) {
Expand Down Expand Up @@ -721,8 +721,7 @@ private void retry(WorkflowModel workflow) {
// it may not have any unsuccessful tasks that can be retried
if (retriableMap.values().size() == 0
&& workflow.getStatus() != WorkflowModel.Status.TIMED_OUT) {
throw new ApplicationException(
CONFLICT,
throw new ConflictException(
"There are no retryable tasks! Use restart if you want to attempt entire workflow execution again.");
}

Expand Down Expand Up @@ -881,7 +880,7 @@ WorkflowModel completeWorkflow(WorkflowModel workflow) {
String msg =
"Workflow is already in terminal state. Current status: "
+ workflow.getStatus();
throw new ApplicationException(CONFLICT, msg);
throw new ConflictException(msg);
}

// FIXME Backwards compatibility for legacy workflows already running.
Expand Down Expand Up @@ -931,7 +930,7 @@ WorkflowModel completeWorkflow(WorkflowModel workflow) {
public void terminateWorkflow(String workflowId, String reason) {
WorkflowModel workflow = executionDAOFacade.getWorkflowModel(workflowId, true);
if (WorkflowModel.Status.COMPLETED.equals(workflow.getStatus())) {
throw new ApplicationException(CONFLICT, "Cannot terminate a COMPLETED workflow.");
throw new ConflictException("Cannot terminate a COMPLETED workflow.");
}
workflow.setStatus(WorkflowModel.Status.TERMINATED);
terminateWorkflow(workflow, reason, null);
Expand Down Expand Up @@ -1469,9 +1468,9 @@ public void pauseWorkflow(String workflowId) {
WorkflowModel.Status status = WorkflowModel.Status.PAUSED;
WorkflowModel workflow = executionDAOFacade.getWorkflowModel(workflowId, false);
if (workflow.getStatus().isTerminal()) {
throw new ApplicationException(
CONFLICT,
"Workflow id " + workflowId + " has ended, status cannot be updated.");
throw new ConflictException(
"Workflow %s has ended, status cannot be updated.",
workflow.toShortString());
}
if (workflow.getStatus().equals(status)) {
return; // Already paused!
Expand Down Expand Up @@ -1827,7 +1826,7 @@ private boolean rerunWF(
String.format(
"Workflow: %s is not in terminal state, unable to rerun.", workflow);
LOGGER.error(errorMsg);
throw new ApplicationException(CONFLICT, errorMsg);
throw new ConflictException(errorMsg);
}
updateAndPushParents(workflow, "reran");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.exception.ConflictException;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.execution.evaluators.Evaluator;
import com.netflix.conductor.core.execution.mapper.*;
Expand All @@ -59,7 +60,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;

import static com.netflix.conductor.common.metadata.tasks.TaskType.*;
import static com.netflix.conductor.core.exception.ApplicationException.Code.CONFLICT;

import static java.util.Comparator.comparingInt;
import static java.util.stream.Collectors.groupingBy;
Expand Down Expand Up @@ -2008,8 +2008,7 @@ public void testPauseWorkflow() {
try {
workflowExecutor.pauseWorkflow(workflowId);
fail("Expected " + ApplicationException.class);
} catch (ApplicationException e) {
assertEquals(e.getCode(), CONFLICT);
} catch (ConflictException e) {
verify(executionDAOFacade, never()).updateWorkflow(any(WorkflowModel.class));
verify(queueDAO, never()).remove(anyString(), anyString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@

import com.netflix.conductor.common.metadata.events.EventHandler;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.exception.ApplicationException.Code;
import com.netflix.conductor.core.exception.ConflictException;
import com.netflix.conductor.core.exception.NotFoundException;
import com.netflix.conductor.dao.EventHandlerDAO;
import com.netflix.conductor.redis.config.AnyRedisCondition;
Expand Down Expand Up @@ -56,9 +55,8 @@ public RedisEventHandlerDAO(
public void addEventHandler(EventHandler eventHandler) {
Preconditions.checkNotNull(eventHandler.getName(), "Missing Name");
if (getEventHandler(eventHandler.getName()) != null) {
throw new ApplicationException(
Code.CONFLICT,
"EventHandler with name " + eventHandler.getName() + " already exists!");
throw new ConflictException(
"EventHandler with name %s already exists!", eventHandler.getName());
}
index(eventHandler);
jedisProxy.hset(nsKey(EVENT_HANDLERS), eventHandler.getName(), toJson(eventHandler));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.exception.ApplicationException.Code;
import com.netflix.conductor.core.exception.ConflictException;
import com.netflix.conductor.core.exception.NotFoundException;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.metrics.Monitors;
Expand Down Expand Up @@ -160,8 +159,7 @@ public void removeTaskDef(String name) {
public void createWorkflowDef(WorkflowDef def) {
if (jedisProxy.hexists(
nsKey(WORKFLOW_DEF, def.getName()), String.valueOf(def.getVersion()))) {
throw new ApplicationException(
Code.CONFLICT, "Workflow with " + def.key() + " already exists!");
throw new ConflictException("Workflow with %s already exists!", def.key());
}
_createOrUpdate(def);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
*/
package com.netflix.conductor.rest.controllers;

import java.util.HashMap;
import java.util.Map;

import javax.servlet.http.HttpServletRequest;

import org.slf4j.Logger;
Expand All @@ -24,15 +27,13 @@

import com.netflix.conductor.common.validation.ErrorResponse;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.exception.ConflictException;
import com.netflix.conductor.core.exception.NotFoundException;
import com.netflix.conductor.core.utils.Utils;
import com.netflix.conductor.metrics.Monitors;

import com.fasterxml.jackson.databind.exc.InvalidFormatException;

import static com.netflix.conductor.core.exception.ApplicationException.Code.INTERNAL_ERROR;
import static com.netflix.conductor.core.exception.ApplicationException.Code.INVALID_INPUT;

@RestControllerAdvice
@Order(ValidationExceptionMapper.ORDER + 1)
public class ApplicationExceptionMapper {
Expand All @@ -41,6 +42,16 @@ public class ApplicationExceptionMapper {

private final String host = Utils.getServerId();

private static final Map<Class<? extends Throwable>, HttpStatus> EXCEPTION_STATUS_MAP =
new HashMap<>();

static {
EXCEPTION_STATUS_MAP.put(NotFoundException.class, HttpStatus.NOT_FOUND);
EXCEPTION_STATUS_MAP.put(ConflictException.class, HttpStatus.CONFLICT);
EXCEPTION_STATUS_MAP.put(IllegalArgumentException.class, HttpStatus.BAD_REQUEST);
EXCEPTION_STATUS_MAP.put(InvalidFormatException.class, HttpStatus.INTERNAL_SERVER_ERROR);
}

@ExceptionHandler(ApplicationException.class)
public ResponseEntity<ErrorResponse> handleApplicationException(
HttpServletRequest request, ApplicationException ex) {
Expand All @@ -52,42 +63,29 @@ public ResponseEntity<ErrorResponse> handleApplicationException(
toErrorResponse(ex), HttpStatus.valueOf(ex.getHttpStatusCode()));
}

@ExceptionHandler(NotFoundException.class)
public ResponseEntity<ErrorResponse> handleNotFoundException(
HttpServletRequest request, NotFoundException nfe) {
logException(request, nfe);
@ExceptionHandler(Throwable.class)
public ResponseEntity<ErrorResponse> handleAll(HttpServletRequest request, Throwable th) {
logException(request, th);

HttpStatus status =
EXCEPTION_STATUS_MAP.getOrDefault(th.getClass(), HttpStatus.INTERNAL_SERVER_ERROR);

HttpStatus status = HttpStatus.NOT_FOUND;
ErrorResponse errorResponse = new ErrorResponse();
errorResponse.setInstance(host);
errorResponse.setStatus(status.value());
errorResponse.setMessage(nfe.getMessage());
errorResponse.setRetryable(false);
errorResponse.setMessage(th.getMessage());
errorResponse.setRetryable(false); // set it to true for BACKEND_ERROR

Monitors.error("error", String.valueOf(status.value()));

return new ResponseEntity<>(errorResponse, status);
}

@ExceptionHandler(Throwable.class)
public ResponseEntity<ErrorResponse> handleAll(HttpServletRequest request, Throwable th) {
logException(request, th);

ApplicationException.Code code =
(th instanceof IllegalArgumentException || th instanceof InvalidFormatException)
? INVALID_INPUT
: INTERNAL_ERROR;

ApplicationException ex = new ApplicationException(code, th.getMessage(), th);

return handleApplicationException(request, ex);
}

private void logException(HttpServletRequest request, Throwable exception) {
LOGGER.error(
String.format(
"Error %s url: '%s'",
exception.getClass().getSimpleName(), request.getRequestURI()),
"Error {} url: '{}'",
exception.getClass().getSimpleName(),
request.getRequestURI(),
exception);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.netflix.conductor.common.metadata.workflow.WorkflowDef
import com.netflix.conductor.common.metadata.workflow.WorkflowTask
import com.netflix.conductor.common.run.Workflow
import com.netflix.conductor.core.exception.ApplicationException
import com.netflix.conductor.core.exception.ConflictException
import com.netflix.conductor.dao.QueueDAO
import com.netflix.conductor.test.base.AbstractSpecification

Expand Down Expand Up @@ -188,8 +189,8 @@ class SimpleWorkflowSpec extends AbstractSpecification {
workflowExecutor.restart(workflowInstanceId, false)

then: "Ensure that a exception is thrown when a running workflow is being rewind"
def exceptionThrown = thrown(ApplicationException)
exceptionThrown.code == CONFLICT
def exceptionThrown = thrown(ConflictException.class)
exceptionThrown != null

when: "'integration_task_1' is polled and failed with terminal error"
def polledIntegrationTask1 = workflowExecutionService.poll('integration_task_1', 'task1.integration.worker')
Expand Down

0 comments on commit 8ec7cf3

Please sign in to comment.