Skip to content

Commit

Permalink
Merge pull request #422 from treasure-data/exception-message
Browse files Browse the repository at this point in the history
Update TaskExecutionException API and avoid error with empty message
  • Loading branch information
frsyuki authored Jan 7, 2017
2 parents 062d99b + 3ecd1b9 commit 832eae3
Show file tree
Hide file tree
Showing 24 changed files with 340 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,13 +387,16 @@ public static void collectExceptionMessage(StringBuilder sb, Throwable ex, Strin
sb.append("\n> ");
}
sb.append(message);
sb.append(" (");
sb.append(ex.getClass().getSimpleName()
.replaceFirst("(?:Exception|Error)$", "")
.replaceAll("([A-Z]+)([A-Z][a-z])", "$1 $2")
.replaceAll("([a-z])([A-Z])", "$1 $2")
.toLowerCase());
sb.append(")");
if (!(ex instanceof TaskExecutionException)) {
// skip TaskExecutionException because it's expected to have well-formatted message
sb.append(" (");
sb.append(ex.getClass().getSimpleName()
.replaceFirst("(?:Exception|Error)$", "")
.replaceAll("([A-Z]+)([A-Z][a-z])", "$1 $2")
.replaceAll("([a-z])([A-Z])", "$1 $2")
.toLowerCase());
sb.append(")");
}
}
if (ex.getCause() != null) {
collectExceptionMessage(sb, ex.getCause(), used);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ public TaskResult runTask()
throw new ConfigException(ex);
}
catch (ResourceLimitExceededException ex) {
throw new TaskExecutionException(ex,
TaskExecutionException.buildExceptionErrorConfig(ex));
throw new TaskExecutionException(ex);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package io.digdag.core.agent;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import com.google.inject.Inject;
import com.google.inject.Scopes;
import com.google.inject.multibindings.Multibinder;
import io.digdag.client.config.Config;
import io.digdag.core.DigdagEmbed;
import io.digdag.spi.Operator;
import io.digdag.spi.OperatorContext;
import io.digdag.spi.OperatorFactory;
import io.digdag.spi.SecretAccessList;
import io.digdag.spi.SecretStore;
import io.digdag.spi.SecretStoreManager;
import io.digdag.spi.TaskRequest;
import io.digdag.spi.TaskResult;
import java.nio.file.Paths;
import io.digdag.spi.TaskExecutionException;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import io.digdag.client.config.ConfigUtils;

import static io.digdag.client.config.ConfigUtils.newConfig;
import static io.digdag.core.workflow.OperatorTestingUtils.newTaskRequest;
import static io.digdag.core.workflow.WorkflowTestingUtils.setupEmbed;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

public class OperatorManagerExceptionTest
{
static class CustomErrorOperatorFactory
implements OperatorFactory
{
@Inject
public CustomErrorOperatorFactory()
{ }

@Override
public String getType()
{
return "custom_error";
}

@Override
public Operator newOperator(OperatorContext context)
{
return new CustomErrorOperator(context);
}
}

static class CustomErrorOperator
implements Operator
{
private final OperatorContext context;

public CustomErrorOperator(OperatorContext context)
{
this.context = context;
}

@Override
public TaskResult run()
{
throw EXCEPTIONS.get(context.getTaskRequest().getConfig().get("_command", String.class));
}
}

private static class CustomRuntimeException extends RuntimeException
{
CustomRuntimeException(String message)
{
super(message);
}

CustomRuntimeException(String message, Throwable cause)
{
super(message, cause);
}
}

private final static Map<String, RuntimeException> EXCEPTIONS = ImmutableMap.<String, RuntimeException>builder()
.put("runtime", new RuntimeException("foobar"))
.put("custom", new CustomRuntimeException("foobar"))
.put("custom_nested", new CustomRuntimeException(null, new RuntimeException("nested")))
.put("wrapped_runtime", new TaskExecutionException(new RuntimeException("foobar")))
.put("wrapped_custom", new TaskExecutionException(new CustomRuntimeException("foobar")))
.put("wrapped_null_message", new TaskExecutionException(new CustomRuntimeException(null, new RuntimeException("cause"))))
.put("wrapped_custom_message", new TaskExecutionException("custom!!", new RuntimeException("foo")))
.build();

private DigdagEmbed embed;
private OperatorManager operatorManager;

@Before
public void setUp()
throws Exception
{
this.embed = setupEmbed((bootstrap) -> bootstrap.addModules((binder) -> {
Multibinder.newSetBinder(binder, OperatorFactory.class)
.addBinding().to(CustomErrorOperatorFactory.class).in(Scopes.SINGLETON);
})
);
this.operatorManager = embed.getInjector().getInstance(OperatorManager.class);
}

@After
public void shutdown()
throws Exception
{
embed.close();
}

@Test
public void verifyException()
{
expectRuntimeException("runtime", RuntimeException.class, "foobar (runtime)");
expectRuntimeException("custom", CustomRuntimeException.class, "foobar (custom runtime)");
expectRuntimeException("custom_nested", CustomRuntimeException.class, "CustomRuntimeException (custom runtime)\n> nested (runtime)");
expectExecutionException("wrapped_runtime", "foobar (runtime)");
expectExecutionException("wrapped_custom", "foobar (custom runtime)");
expectExecutionException("wrapped_null_message", "cause (custom runtime)");
expectExecutionException("wrapped_custom_message", "custom!!");
}

private void expectExecutionException(String name, String expectedMessage)
{
Config config = newConfig().set("_command", name);
try {
operatorManager.callExecutor(Paths.get(""), "custom_error", newTaskRequest().withConfig(config));
fail("expected TaskExecutionException");
}
catch (RuntimeException ex) {
assertThat(ex, instanceOf(TaskExecutionException.class));
Config error = ((TaskExecutionException) ex).getError(ConfigUtils.configFactory).or(newConfig());
assertThat(error.get("message", String.class, null), is(expectedMessage));
}
}

private void expectRuntimeException(String name, Class<?> expectedClass, String expectedMessage)
{
Config config = newConfig().set("_command", name);
try {
operatorManager.callExecutor(Paths.get(""), "custom_error", newTaskRequest().withConfig(config));
fail("expected RuntimeException");
}
catch (RuntimeException ex) {
assertThat((Object) ex.getClass(), is((Object) expectedClass));
assertThat(OperatorManager.formatExceptionMessage(ex), is(expectedMessage));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.digdag.spi.TaskExecutionException;
import io.digdag.spi.TaskRequest;
import io.digdag.spi.TaskResult;
import static io.digdag.spi.TaskExecutionException.buildExceptionErrorConfig;

public abstract class BaseOperator
implements Operator
Expand Down Expand Up @@ -52,8 +51,7 @@ public TaskResult run()

boolean doRetry = retry.evaluate();
if (doRetry) {
throw new TaskExecutionException(ex,
buildExceptionErrorConfig(ex),
throw TaskExecutionException.ofNextPollingWithCause(ex,
retry.getNextRetryInterval(),
ConfigElement.copyOf(retry.getNextRetryStateParams()));
}
Expand Down
124 changes: 110 additions & 14 deletions digdag-spi/src/main/java/io/digdag/spi/TaskExecutionException.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,37 @@
import io.digdag.client.config.ConfigFactory;
import io.digdag.client.config.ConfigElement;

import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.Locale.ENGLISH;

/**
* An exception thrown when an expected exception happens.
*
* <p>
* When an error happens in an operator, it should be wrapped with TaskExecutionException if
* the cause is expected so that Digdag shows a message for users without verbose stacktrace.
* The other exception classes are regarded as unexpected and Digdag shows stacktrace for operator
* developers.
* </p>
*
* <p>
* TaskExecutionException is also used to let Digdag retry the task later. Use ofNextPolling
* (if task retries for simple polling) or ofNextPollingWithCause (if task retries to recover
* from an error) method to retry.
* </p>
*/
public class TaskExecutionException
extends RuntimeException
{
public static ConfigElement buildExceptionErrorConfig(Throwable ex)
{
return buildExceptionErrorConfig(formatExceptionMessage(ex), ex);
}

public static ConfigElement buildExceptionErrorConfig(String message, Throwable ex)
{
Map<String, String> map = ImmutableMap.of(
"message", ex.toString(),
"message", message,
"stacktrace",
Arrays.asList(ex.getStackTrace())
.stream()
Expand All @@ -24,11 +48,43 @@ public static ConfigElement buildExceptionErrorConfig(Throwable ex)
return ConfigElement.ofMap(map);
}

private static String formatExceptionMessage(Throwable ex)
{
return firstNonEmptyMessage(ex)
.transform(message -> {
return String.format(ENGLISH, "%s (%s)", message,
ex.getClass().getSimpleName()
.replaceFirst("(?:Exception|Error)$", "")
.replaceAll("([A-Z]+)([A-Z][a-z])", "$1 $2")
.replaceAll("([a-z])([A-Z])", "$1 $2")
.toLowerCase());
})
.or(() -> ex.toString());
}

private static Optional<String> firstNonEmptyMessage(Throwable ex)
{
String message = ex.getMessage();
if (!isNullOrEmpty(message)) {
return Optional.of(message);
}
Throwable cause = ex.getCause();
if (cause == null) {
return Optional.absent();
}
return firstNonEmptyMessage(cause);
}

public static TaskExecutionException ofNextPolling(int interval, ConfigElement nextStateParams)
{
return new TaskExecutionException(interval, nextStateParams);
}

public static TaskExecutionException ofNextPollingWithCause(Throwable cause, int interval, ConfigElement nextStateParams)
{
return new TaskExecutionException(cause, buildExceptionErrorConfig(cause), interval, nextStateParams);
}

private TaskExecutionException(int retryInterval, ConfigElement stateParams)
{
super("Retrying this task after "+retryInterval+" seconds");
Expand All @@ -37,40 +93,80 @@ private TaskExecutionException(int retryInterval, ConfigElement stateParams)
this.stateParams = Optional.of(stateParams);
}

private TaskExecutionException(Throwable cause, ConfigElement error, int retryInterval, ConfigElement stateParams)
{
super(cause);
this.error = Optional.of(error);
this.retryInterval = Optional.of(retryInterval);
this.stateParams = Optional.of(stateParams);
}

private final Optional<ConfigElement> error;
private final Optional<Integer> retryInterval;
private final Optional<ConfigElement> stateParams;

public TaskExecutionException(Throwable cause, ConfigElement error)
/**
* Wrap an expected exception to make the task failed.
*/
public TaskExecutionException(Throwable cause)
{
super(cause);
this.error = Optional.of(error);
this(formatExceptionMessage(cause), cause);
}

/**
* Wrap an expected exception with a custom well-formatted message to make the task failed.
*/
public TaskExecutionException(String customMessage, Throwable cause)
{
super(customMessage, cause);
this.error = Optional.of(buildExceptionErrorConfig(customMessage, cause));
this.retryInterval = Optional.absent();
this.stateParams = Optional.absent();
}

public TaskExecutionException(String message, ConfigElement error)
/**
* Build an expected exception with a simple message to make the task failed.
*/
public TaskExecutionException(String message)
{
this(message, ImmutableMap.of());
}

/**
* Build an expected exception with a simple message and properties to make the task failed.
*/
public TaskExecutionException(String message, Map<String, String> errorProperties)
{
super(message);
this.error = Optional.of(error);
this.error = Optional.of(buildPropertiesErrorConfig(message, errorProperties));
this.retryInterval = Optional.absent();
this.stateParams = Optional.absent();
}

public TaskExecutionException(Throwable cause, ConfigElement error, int retryInterval, ConfigElement stateParams)
private static ConfigElement buildPropertiesErrorConfig(String message, Map<String, String> errorProperties)
{
super(cause);
this.error = Optional.of(error);
this.retryInterval = Optional.of(retryInterval);
this.stateParams = Optional.of(stateParams);
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
builder.putAll(errorProperties);
builder.put("message", message);
return ConfigElement.ofMap(builder.build());
}

public TaskExecutionException(String message, ConfigElement error, int retryInterval, ConfigElement stateParams)
@Deprecated
public TaskExecutionException(String message, ConfigElement error)
{
super(message);
this.error = Optional.of(error);
this.retryInterval = Optional.of(retryInterval);
this.stateParams = Optional.of(stateParams);
this.retryInterval = Optional.absent();
this.stateParams = Optional.absent();
}

@Deprecated
public TaskExecutionException(Throwable cause, ConfigElement error)
{
super(formatExceptionMessage(cause), cause);
this.error = Optional.of(error);
this.retryInterval = Optional.absent();
this.stateParams = Optional.absent();
}

public Optional<Config> getError(ConfigFactory cf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public TaskResult run()
Config errorParams = params.getFactory().create();
errorParams.set("message", message);

throw new TaskExecutionException(message, ConfigElement.copyOf(errorParams));
throw new TaskExecutionException(message);
}
}
}
Loading

0 comments on commit 832eae3

Please sign in to comment.