From ac73d5fe0f8e3d649e2576d152efc039d150d5ca Mon Sep 17 00:00:00 2001 From: Sadayuki Furuhashi Date: Tue, 13 Dec 2016 17:13:55 -0800 Subject: [PATCH 1/5] Update TaskExecutionException API and avoid error with empty message Throwing TaskExecutionException with an empty ConfigElement ends up showing empty message in local mode because digdag-cli takes "message" field of the ConfigElement. Instead, operators should use TaskExecutionException.buildExceptionErrorConfig to build ConfigElement but it seems confusing because already some operators are missing it. --- .../io/digdag/core/agent/OperatorManager.java | 17 +-- .../core/agent/RequireOperatorFactory.java | 3 +- .../java/io/digdag/util/BaseOperator.java | 4 +- .../io/digdag/spi/TaskExecutionException.java | 115 +++++++++++++++--- .../operator/FailOperatorFactory.java | 2 +- .../operator/HttpOperatorFactory.java | 8 +- .../operator/MailOperatorFactory.java | 4 +- .../operator/NotifyOperatorFactory.java | 3 +- .../operator/gcp/BaseGcpOperator.java | 2 +- .../standards/operator/gcp/BqJobRunner.java | 9 +- .../operator/gcp/GcpCredentialProvider.java | 5 +- .../jdbc/AbstractJdbcJobOperator.java | 3 +- .../operator/state/PollingRetryExecutor.java | 2 +- .../operator/td/BaseTdJobOperator.java | 11 ++ .../standards/operator/td/TDJobOperator.java | 3 +- .../standards/operator/td/TDOperator.java | 2 +- .../operator/td/TdDdlOperatorFactory.java | 5 + .../operator/td/TdForEachOperatorFactory.java | 2 +- .../operator/td/TdWaitOperatorFactory.java | 7 +- .../td/TdWaitTableOperatorFactory.java | 11 +- 20 files changed, 164 insertions(+), 54 deletions(-) diff --git a/digdag-core/src/main/java/io/digdag/core/agent/OperatorManager.java b/digdag-core/src/main/java/io/digdag/core/agent/OperatorManager.java index 7737d94b38..79a4518465 100644 --- a/digdag-core/src/main/java/io/digdag/core/agent/OperatorManager.java +++ b/digdag-core/src/main/java/io/digdag/core/agent/OperatorManager.java @@ -387,13 +387,16 @@ public static void collectExceptionMessage(StringBuilder sb, Throwable ex, Strin sb.append("\n> "); } sb.append(message); - sb.append(" ("); - sb.append(ex.getClass().getSimpleName() - .replaceFirst("(?:Exception|Error)$", "") - .replaceAll("([A-Z]+)([A-Z][a-z])", "$1 $2") - .replaceAll("([a-z])([A-Z])", "$1 $2") - .toLowerCase()); - sb.append(")"); + if (!(ex instanceof TaskExecutionException)) { + // skip TaskExecutionException because it's expected to have well-formatted message + sb.append(" ("); + sb.append(ex.getClass().getSimpleName() + .replaceFirst("(?:Exception|Error)$", "") + .replaceAll("([A-Z]+)([A-Z][a-z])", "$1 $2") + .replaceAll("([a-z])([A-Z])", "$1 $2") + .toLowerCase()); + sb.append(")"); + } } if (ex.getCause() != null) { collectExceptionMessage(sb, ex.getCause(), used); diff --git a/digdag-core/src/main/java/io/digdag/core/agent/RequireOperatorFactory.java b/digdag-core/src/main/java/io/digdag/core/agent/RequireOperatorFactory.java index e4327d9b5f..2d2eb780fd 100644 --- a/digdag-core/src/main/java/io/digdag/core/agent/RequireOperatorFactory.java +++ b/digdag-core/src/main/java/io/digdag/core/agent/RequireOperatorFactory.java @@ -90,8 +90,7 @@ public TaskResult runTask() throw new ConfigException(ex); } catch (ResourceLimitExceededException ex) { - throw new TaskExecutionException(ex, - TaskExecutionException.buildExceptionErrorConfig(ex)); + throw new TaskExecutionException(ex); } } } diff --git a/digdag-plugin-utils/src/main/java/io/digdag/util/BaseOperator.java b/digdag-plugin-utils/src/main/java/io/digdag/util/BaseOperator.java index d7e1d6ec53..fefe1535e8 100644 --- a/digdag-plugin-utils/src/main/java/io/digdag/util/BaseOperator.java +++ b/digdag-plugin-utils/src/main/java/io/digdag/util/BaseOperator.java @@ -11,7 +11,6 @@ import io.digdag.spi.TaskExecutionException; import io.digdag.spi.TaskRequest; import io.digdag.spi.TaskResult; -import static io.digdag.spi.TaskExecutionException.buildExceptionErrorConfig; public abstract class BaseOperator implements Operator @@ -52,8 +51,7 @@ public TaskResult run() boolean doRetry = retry.evaluate(); if (doRetry) { - throw new TaskExecutionException(ex, - buildExceptionErrorConfig(ex), + throw TaskExecutionException.ofNextPollingWithCause(ex, retry.getNextRetryInterval(), ConfigElement.copyOf(retry.getNextRetryStateParams())); } diff --git a/digdag-spi/src/main/java/io/digdag/spi/TaskExecutionException.java b/digdag-spi/src/main/java/io/digdag/spi/TaskExecutionException.java index 334874f1cf..880e8c290c 100644 --- a/digdag-spi/src/main/java/io/digdag/spi/TaskExecutionException.java +++ b/digdag-spi/src/main/java/io/digdag/spi/TaskExecutionException.java @@ -9,13 +9,37 @@ import io.digdag.client.config.ConfigFactory; import io.digdag.client.config.ConfigElement; +import static com.google.common.base.Strings.isNullOrEmpty; +import static java.util.Locale.ENGLISH; + +/** + * An exception thrown when an expected exception happens. + * + *

+ * When an error happens in an operator, it should be wrapped with TaskExecutionException if + * the cause is expected so that Digdag shows a message for users without verbose stacktrace. + * The other exception classes are regarded as unexpected and Digdag shows stacktrace for operator + * developers. + *

+ * + *

+ * TaskExecutionException is also used to let Digdag retry the task later. Use ofNextPolling + * (if task retries for simple polling) or ofNextPollingWithCause (if task retries to recover + * from an error) method to retry. + *

+ */ public class TaskExecutionException extends RuntimeException { public static ConfigElement buildExceptionErrorConfig(Throwable ex) + { + return buildExceptionErrorConfig(formatExceptionMessage(ex), ex); + } + + public static ConfigElement buildExceptionErrorConfig(String message, Throwable ex) { Map map = ImmutableMap.of( - "message", ex.toString(), + "message", message, "stacktrace", Arrays.asList(ex.getStackTrace()) .stream() @@ -24,11 +48,43 @@ public static ConfigElement buildExceptionErrorConfig(Throwable ex) return ConfigElement.ofMap(map); } + private static String formatExceptionMessage(Throwable ex) + { + return firstNonEmptyMessage(ex) + .transform(message -> { + return String.format(ENGLISH, "%s (%s)", message, + ex.getClass().getSimpleName() + .replaceFirst("(?:Exception|Error)$", "") + .replaceAll("([A-Z]+)([A-Z][a-z])", "$1 $2") + .replaceAll("([a-z])([A-Z])", "$1 $2") + .toLowerCase()); + }) + .or(() -> ex.toString()); + } + + private static Optional firstNonEmptyMessage(Throwable ex) + { + String message = ex.getMessage(); + if (!isNullOrEmpty(message)) { + return Optional.of(message); + } + Throwable cause = ex.getCause(); + if (cause == null) { + return Optional.absent(); + } + return firstNonEmptyMessage(cause); + } + public static TaskExecutionException ofNextPolling(int interval, ConfigElement nextStateParams) { return new TaskExecutionException(interval, nextStateParams); } + public static TaskExecutionException ofNextPollingWithCause(Throwable cause, int interval, ConfigElement nextStateParams) + { + return new TaskExecutionException(cause, buildExceptionErrorConfig(cause), interval, nextStateParams); + } + private TaskExecutionException(int retryInterval, ConfigElement stateParams) { super("Retrying this task after "+retryInterval+" seconds"); @@ -37,40 +93,71 @@ private TaskExecutionException(int retryInterval, ConfigElement stateParams) this.stateParams = Optional.of(stateParams); } + private TaskExecutionException(Throwable cause, ConfigElement error, int retryInterval, ConfigElement stateParams) + { + super(cause); + this.error = Optional.of(error); + this.retryInterval = Optional.of(retryInterval); + this.stateParams = Optional.of(stateParams); + } + private final Optional error; private final Optional retryInterval; private final Optional stateParams; - public TaskExecutionException(Throwable cause, ConfigElement error) + /** + * Wrap an expected exception to make the task failed. + */ + public TaskExecutionException(Throwable cause) { - super(cause); - this.error = Optional.of(error); + this(formatExceptionMessage(cause), cause); + } + + /** + * Wrap an expected exception with a custom well-formatted message to make the task failed. + */ + public TaskExecutionException(String customMessage, Throwable cause) + { + super(customMessage, cause); + this.error = Optional.of(buildExceptionErrorConfig(customMessage, cause)); this.retryInterval = Optional.absent(); this.stateParams = Optional.absent(); } - public TaskExecutionException(String message, ConfigElement error) + /** + * Build an expected exception with a simple message to make the task failed. + */ + public TaskExecutionException(String message) + { + this(message, ImmutableMap.of()); + } + + /** + * Build an expected exception with a simple message and properties to make the task failed. + */ + public TaskExecutionException(String message, Map errorProperties) { super(message); - this.error = Optional.of(error); + this.error = Optional.of(buildPropertiesErrorConfig(message, errorProperties)); this.retryInterval = Optional.absent(); this.stateParams = Optional.absent(); } - public TaskExecutionException(Throwable cause, ConfigElement error, int retryInterval, ConfigElement stateParams) + private static ConfigElement buildPropertiesErrorConfig(String message, Map errorProperties) { - super(cause); - this.error = Optional.of(error); - this.retryInterval = Optional.of(retryInterval); - this.stateParams = Optional.of(stateParams); + ImmutableMap.Builder builder = ImmutableMap.builder(); + builder.putAll(errorProperties); + builder.put("message", message); + return ConfigElement.ofMap(builder.build()); } - public TaskExecutionException(String message, ConfigElement error, int retryInterval, ConfigElement stateParams) + @Deprecated + public TaskExecutionException(String message, ConfigElement error) { super(message); this.error = Optional.of(error); - this.retryInterval = Optional.of(retryInterval); - this.stateParams = Optional.of(stateParams); + this.retryInterval = Optional.absent(); + this.stateParams = Optional.absent(); } public Optional getError(ConfigFactory cf) diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/FailOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/FailOperatorFactory.java index c9a082e92d..18f700b38e 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/FailOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/FailOperatorFactory.java @@ -49,7 +49,7 @@ public TaskResult run() Config errorParams = params.getFactory().create(); errorParams.set("message", message); - throw new TaskExecutionException(message, ConfigElement.copyOf(errorParams)); + throw new TaskExecutionException(message); } } } diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/HttpOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/HttpOperatorFactory.java index d4e1d8654a..5b364e093a 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/HttpOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/HttpOperatorFactory.java @@ -279,7 +279,7 @@ private RuntimeException error(Request req, boolean uriIsSecret, Response res) return new RuntimeException("Failed HTTP request: " + requestStatus(req, res, uriIsSecret)); default: // 4xx: The request is invalid for this resource. Fail hard without retrying. - return new TaskExecutionException("HTTP 4XX Client Error: " + requestStatus(req, res, uriIsSecret), ConfigElement.empty()); + return new TaskExecutionException("HTTP 4XX Client Error: " + requestStatus(req, res, uriIsSecret)); } } else if (res.getStatus() >= 500 && res.getStatus() < 600) { @@ -301,7 +301,7 @@ private RuntimeException ephemeralError(String message) } else { // No, so fail hard. - return new TaskExecutionException(message, ConfigElement.empty()); + return new TaskExecutionException(message); } } @@ -366,7 +366,7 @@ private TaskResult result(ContentResponse response, boolean storeContent) if (storeContent) { String content = response.getContentAsString(); if (content.length() > maxStoredResponseContentSize) { - throw new TaskExecutionException("Response content too large: " + content.length() + " > " + maxStoredResponseContentSize, ConfigElement.empty()); + throw new TaskExecutionException("Response content too large: " + content.length() + " > " + maxStoredResponseContentSize); } http.set("last_content", content); builder.addResetStoreParams(ConfigKey.of("http", "last_content")); @@ -397,7 +397,7 @@ private HttpClient client() httpClient.start(); } catch (Exception e) { - throw new TaskExecutionException(e, TaskExecutionException.buildExceptionErrorConfig(e)); + throw new TaskExecutionException(e); } return httpClient; } diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/MailOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/MailOperatorFactory.java index 90287e7c79..cc7bd6ea93 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/MailOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/MailOperatorFactory.java @@ -178,7 +178,7 @@ public TaskResult runTask() Transport.send(msg); } catch (MessagingException | IOException ex) { - throw new TaskExecutionException(ex, TaskExecutionException.buildExceptionErrorConfig(ex)); + throw new TaskExecutionException(ex); } return TaskResult.empty(request); @@ -214,7 +214,7 @@ private Session createSession(SecretProvider secrets, Config params) .orNull(); if (smtpConfig == null) { - throw new TaskExecutionException("Missing SMTP configuration", ConfigElement.empty()); + throw new TaskExecutionException("Missing SMTP configuration"); } Properties props = new Properties(); diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/NotifyOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/NotifyOperatorFactory.java index f6a1b31c41..5b14d8a1ff 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/NotifyOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/NotifyOperatorFactory.java @@ -76,7 +76,8 @@ public TaskResult run() notifier.sendNotification(notification); } catch (NotificationException e) { - throw new TaskExecutionException(e, ConfigElement.copyOf(params)); + // notification failed + throw new TaskExecutionException(e); } return TaskResult.empty(request); diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseGcpOperator.java b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseGcpOperator.java index ac86489780..a75d63aedd 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseGcpOperator.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseGcpOperator.java @@ -38,7 +38,7 @@ private String projectId(GcpCredential credential) Optional projectId = context.getSecrets().getSecretOptional("gcp.project") .or(credential.projectId()); if (!projectId.isPresent()) { - throw new TaskExecutionException("Missing 'gcp.project' secret", ConfigElement.empty()); + throw new TaskExecutionException("Missing 'gcp.project' secret"); } return projectId.get(); diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqJobRunner.java b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqJobRunner.java index 001194b134..e675bd06e7 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqJobRunner.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqJobRunner.java @@ -109,7 +109,7 @@ Job runJob(JobConfiguration config) case "RUNNING": return Optional.absent(); default: - throw new TaskExecutionException("Unknown job state: " + canonicalJobId + ": " + status.getState(), ConfigElement.empty()); + throw new TaskExecutionException("Unknown job state: " + canonicalJobId + ": " + status.getState()); } }); @@ -121,7 +121,7 @@ Job runJob(JobConfiguration config) for (ErrorProto error : status.getErrors()) { logger.error(toPrettyString(error)); } - throw new TaskExecutionException("BigQuery job failed: " + canonicalJobId, errorConfig(status.getErrors())); + throw new TaskExecutionException("BigQuery job failed: " + canonicalJobId, errorProperties(status.getErrors())); } // Success @@ -130,13 +130,12 @@ Job runJob(JobConfiguration config) return completed; } - private static ConfigElement errorConfig(List errors) + private static Map errorProperties(List errors) { - Map map = ImmutableMap.of( + return ImmutableMap.of( "errors", errors.stream() .map(BqJobRunner::toPrettyString) .collect(Collectors.joining(", "))); - return ConfigElement.ofMap(map); } private static String toPrettyString(ErrorProto error) diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/GcpCredentialProvider.java b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/GcpCredentialProvider.java index 5be4d8035f..55c209389d 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/GcpCredentialProvider.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/GcpCredentialProvider.java @@ -11,7 +11,6 @@ import java.io.ByteArrayInputStream; import java.io.IOException; -import static io.digdag.spi.TaskExecutionException.buildExceptionErrorConfig; import static java.nio.charset.StandardCharsets.UTF_8; class GcpCredentialProvider @@ -40,7 +39,7 @@ private Optional credentialProjectId(String credential) node = objectMapper.readTree(credential); } catch (IOException e) { - throw new TaskExecutionException("Unable to parse 'gcp.credential' secret", TaskExecutionException.buildExceptionErrorConfig(e)); + throw new TaskExecutionException("Unable to parse 'gcp.credential' secret", e); } JsonNode projectId = node.get("project_id"); if (projectId == null || !projectId.isTextual()) { @@ -55,7 +54,7 @@ private GoogleCredential googleCredential(String credential) return GoogleCredential.fromStream(new ByteArrayInputStream(credential.getBytes(UTF_8))); } catch (IOException e) { - throw new TaskExecutionException(e, buildExceptionErrorConfig(e)); + throw new TaskExecutionException(e); } } } diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/AbstractJdbcJobOperator.java b/digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/AbstractJdbcJobOperator.java index abd3d820f0..d73fcbef7c 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/AbstractJdbcJobOperator.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/jdbc/AbstractJdbcJobOperator.java @@ -18,7 +18,6 @@ import java.util.UUID; import java.util.stream.Collectors; -import static io.digdag.spi.TaskExecutionException.buildExceptionErrorConfig; import static java.nio.charset.StandardCharsets.UTF_8; public abstract class AbstractJdbcJobOperator @@ -163,7 +162,7 @@ protected TaskResult run(Config params, Config state, C connectionConfig) catch (DatabaseException ex) { // expected error that should suppress stacktrace by default String message = String.format("%s [%s]", ex.getMessage(), ex.getCause().getMessage()); - throw new TaskExecutionException(message, buildExceptionErrorConfig(ex)); + throw new TaskExecutionException(message, ex); } } diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/state/PollingRetryExecutor.java b/digdag-standards/src/main/java/io/digdag/standards/operator/state/PollingRetryExecutor.java index e7bb18d79d..c59423972e 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/state/PollingRetryExecutor.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/state/PollingRetryExecutor.java @@ -183,7 +183,7 @@ public T run(Operation f) if (!retry(e)) { logger.warn("{}: giving up", formattedErrorMessage, e); - throw new TaskExecutionException(e, TaskExecutionException.buildExceptionErrorConfig(e)); + throw new TaskExecutionException(e); } int retryIteration = retryState.params().get(RETRY, int.class, 0); diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/td/BaseTdJobOperator.java b/digdag-standards/src/main/java/io/digdag/standards/operator/td/BaseTdJobOperator.java index 32390283fe..c6497dd4f6 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/td/BaseTdJobOperator.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/td/BaseTdJobOperator.java @@ -2,8 +2,11 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; +import com.treasuredata.client.TDClientException; import io.digdag.client.config.Config; +import io.digdag.client.config.ConfigElement; import io.digdag.spi.OperatorContext; +import io.digdag.spi.TaskExecutionException; import io.digdag.spi.TaskRequest; import io.digdag.spi.TaskResult; import io.digdag.standards.operator.DurationInterval; @@ -74,6 +77,14 @@ public final TaskResult runTask() return taskResult; } + catch (TDClientException ex) { + throw propagateTDClientException(ex); + } + } + + protected static TaskExecutionException propagateTDClientException(TDClientException ex) + { + return new TaskExecutionException(ex); } protected abstract String startJob(TDOperator op, String domainKey); diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TDJobOperator.java b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TDJobOperator.java index b7c9322977..ff842b5434 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TDJobOperator.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TDJobOperator.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.zip.GZIPInputStream; -import static io.digdag.spi.TaskExecutionException.buildExceptionErrorConfig; import static io.digdag.standards.operator.td.TDOperator.defaultRetryExecutor; class TDJobOperator @@ -146,7 +145,7 @@ TDJobSummary checkStatus() try { TDJob job = getJobInfo(); String message = job.getCmdOut() + "\n" + job.getStdErr(); - throw new TaskExecutionException(message, buildExceptionErrorConfig(ex)); + throw new TaskExecutionException(message, ex); } catch (Exception getJobInfoFailed) { getJobInfoFailed.addSuppressed(ex); diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TDOperator.java b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TDOperator.java index bdc03b80e0..2be154c583 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TDOperator.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TDOperator.java @@ -356,7 +356,7 @@ public TDJobOperator runJob(TaskState state, String key, DurationInterval pollIn throw errorPollingException(state, key, jobState, retryInterval); } String message = jobInfo.getCmdOut() + "\n" + jobInfo.getStdErr(); - throw new TaskExecutionException(message, ConfigElement.empty()); + throw new TaskExecutionException(message); } return job; diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdDdlOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdDdlOperatorFactory.java index 64fd3fba50..4d253fb3ce 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdDdlOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdDdlOperatorFactory.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.google.common.collect.ImmutableList; import com.google.inject.Inject; +import com.treasuredata.client.TDClientException; import io.digdag.client.config.Config; import io.digdag.client.config.ConfigException; import io.digdag.core.Environment; @@ -30,6 +31,7 @@ import static com.google.common.collect.Iterables.concat; import static io.digdag.standards.operator.state.PollingRetryExecutor.pollingRetryExecutor; import static io.digdag.standards.operator.td.BaseTdJobOperator.configSelectorBuilder; +import static io.digdag.standards.operator.td.BaseTdJobOperator.propagateTDClientException; import static java.util.Locale.ENGLISH; public class TdDdlOperatorFactory @@ -166,6 +168,9 @@ public TaskResult runTask() .runAction(s -> o.accept(op)); } } + catch (TDClientException ex) { + throw propagateTDClientException(ex); + } return TaskResult.empty(request); } diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdForEachOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdForEachOperatorFactory.java index 83d0b2ac0c..632622e402 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdForEachOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdForEachOperatorFactory.java @@ -156,7 +156,7 @@ private List fetchRows(TDJobOperator job) rows.add(row(columnNames, ite.next().asArrayValue())); if (rows.size() > Limits.maxWorkflowTasks()) { TaskLimitExceededException cause = new TaskLimitExceededException("Too many tasks. Limit: " + Limits.maxWorkflowTasks()); - throw new TaskExecutionException(cause, TaskExecutionException.buildExceptionErrorConfig(cause)); + throw new TaskExecutionException(cause); } } return rows; diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdWaitOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdWaitOperatorFactory.java index 34ee48d78c..834a06ff18 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdWaitOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdWaitOperatorFactory.java @@ -3,6 +3,7 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.inject.Inject; +import com.treasuredata.client.TDClientException; import com.treasuredata.client.model.TDJobRequest; import com.treasuredata.client.model.TDJobRequestBuilder; import io.digdag.client.config.Config; @@ -31,6 +32,7 @@ import static io.digdag.standards.operator.state.PollingRetryExecutor.pollingRetryExecutor; import static io.digdag.standards.operator.td.BaseTdJobOperator.configSelectorBuilder; +import static io.digdag.standards.operator.td.BaseTdJobOperator.propagateTDClientException; import static io.digdag.standards.operator.td.TDOperator.isDeterministicClientException; import static java.nio.charset.StandardCharsets.UTF_8; @@ -126,6 +128,9 @@ public TaskResult runTask() // The query condition was fulfilled, we're done. return TaskResult.empty(request); } + catch (TDClientException ex) { + throw propagateTDClientException(ex); + } } private String startJob(TDOperator op, String domainKey) @@ -163,7 +168,7 @@ private boolean fetchJobResult(TDJobOperator job) ArrayValue row = firstRow.get(); if (row.size() < 1) { - throw new TaskExecutionException("Got empty row in result of query", ConfigElement.empty()); + throw new TaskExecutionException("Got empty row in result of query"); } Value firstCol = row.get(0); diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdWaitTableOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdWaitTableOperatorFactory.java index d47b2506d7..ab4a6b25f7 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdWaitTableOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/td/TdWaitTableOperatorFactory.java @@ -3,6 +3,7 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.inject.Inject; +import com.treasuredata.client.TDClientException; import com.treasuredata.client.model.TDJobRequest; import com.treasuredata.client.model.TDJobRequestBuilder; import io.digdag.client.config.Config; @@ -33,6 +34,7 @@ import static io.digdag.standards.operator.state.PollingRetryExecutor.pollingRetryExecutor; import static io.digdag.standards.operator.td.BaseTdJobOperator.configSelectorBuilder; +import static io.digdag.standards.operator.td.BaseTdJobOperator.propagateTDClientException; import static io.digdag.standards.operator.td.TDOperator.isDeterministicClientException; public class TdWaitTableOperatorFactory @@ -148,6 +150,9 @@ public TaskResult runTask() // The row count condition was fulfilled. We're done. return TaskResult.empty(request); } + catch (TDClientException ex) { + throw propagateTDClientException(ex); + } } private boolean tableExists(TDOperator op) @@ -172,11 +177,11 @@ private boolean fetchJobResult(int rows, TDJobOperator job) : Optional.absent())); if (!firstRow.isPresent()) { - throw new TaskExecutionException("Got unexpected empty result for count job: " + job.getJobId(), ConfigElement.empty()); + throw new TaskExecutionException("Got unexpected empty result for count job: " + job.getJobId()); } ArrayValue row = firstRow.get(); if (row.size() != 1) { - throw new TaskExecutionException("Got unexpected result row size for count job: " + row.size(), ConfigElement.empty()); + throw new TaskExecutionException("Got unexpected result row size for count job: " + row.size()); } Value count = row.get(0); IntegerValue actualRows; @@ -184,7 +189,7 @@ private boolean fetchJobResult(int rows, TDJobOperator job) actualRows = count.asIntegerValue(); } catch (MessageTypeCastException e) { - throw new TaskExecutionException("Got unexpected value type count job: " + count.getValueType(), ConfigElement.empty()); + throw new TaskExecutionException("Got unexpected value type count job: " + count.getValueType()); } return BigInteger.valueOf(rows).compareTo(actualRows.asBigInteger()) <= 0; From 056e5366c5ea5806878df9d74def48ae8dc3c1ed Mon Sep 17 00:00:00 2001 From: Sadayuki Furuhashi Date: Tue, 13 Dec 2016 17:14:07 -0800 Subject: [PATCH 2/5] added test case for operator error message formatting --- .../agent/OperatorManagerExceptionTest.java | 160 ++++++++++++++++++ 1 file changed, 160 insertions(+) create mode 100644 digdag-core/src/test/java/io/digdag/core/agent/OperatorManagerExceptionTest.java diff --git a/digdag-core/src/test/java/io/digdag/core/agent/OperatorManagerExceptionTest.java b/digdag-core/src/test/java/io/digdag/core/agent/OperatorManagerExceptionTest.java new file mode 100644 index 0000000000..47a5215fef --- /dev/null +++ b/digdag-core/src/test/java/io/digdag/core/agent/OperatorManagerExceptionTest.java @@ -0,0 +1,160 @@ +package io.digdag.core.agent; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import com.google.inject.Inject; +import com.google.inject.Scopes; +import com.google.inject.multibindings.Multibinder; +import io.digdag.client.config.Config; +import io.digdag.core.DigdagEmbed; +import io.digdag.spi.Operator; +import io.digdag.spi.OperatorContext; +import io.digdag.spi.OperatorFactory; +import io.digdag.spi.SecretAccessList; +import io.digdag.spi.SecretStore; +import io.digdag.spi.SecretStoreManager; +import io.digdag.spi.TaskRequest; +import io.digdag.spi.TaskResult; +import java.nio.file.Paths; +import io.digdag.spi.TaskExecutionException; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import io.digdag.client.config.ConfigUtils; + +import static io.digdag.client.config.ConfigUtils.newConfig; +import static io.digdag.core.workflow.OperatorTestingUtils.newTaskRequest; +import static io.digdag.core.workflow.WorkflowTestingUtils.setupEmbed; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +public class OperatorManagerExceptionTest +{ + static class CustomErrorOperatorFactory + implements OperatorFactory + { + @Inject + public CustomErrorOperatorFactory() + { } + + @Override + public String getType() + { + return "custom_error"; + } + + @Override + public Operator newOperator(OperatorContext context) + { + return new CustomErrorOperator(context); + } + } + + static class CustomErrorOperator + implements Operator + { + private final OperatorContext context; + + public CustomErrorOperator(OperatorContext context) + { + this.context = context; + } + + @Override + public TaskResult run() + { + throw EXCEPTIONS.get(context.getTaskRequest().getConfig().get("_command", String.class)); + } + } + + private static class CustomRuntimeException extends RuntimeException + { + CustomRuntimeException(String message) + { + super(message); + } + + CustomRuntimeException(String message, Throwable cause) + { + super(message, cause); + } + } + + private final static Map EXCEPTIONS = ImmutableMap.builder() + .put("runtime", new RuntimeException("foobar")) + .put("custom", new CustomRuntimeException("foobar")) + .put("custom_nested", new CustomRuntimeException(null, new RuntimeException("nested"))) + .put("wrapped_runtime", new TaskExecutionException(new RuntimeException("foobar"))) + .put("wrapped_custom", new TaskExecutionException(new CustomRuntimeException("foobar"))) + .put("wrapped_null_message", new TaskExecutionException(new CustomRuntimeException(null, new RuntimeException("cause")))) + .put("wrapped_custom_message", new TaskExecutionException("custom!!", new RuntimeException("foo"))) + .build(); + + private DigdagEmbed embed; + private OperatorManager operatorManager; + + @Before + public void setUp() + throws Exception + { + this.embed = setupEmbed((bootstrap) -> bootstrap.addModules((binder) -> { + Multibinder.newSetBinder(binder, OperatorFactory.class) + .addBinding().to(CustomErrorOperatorFactory.class).in(Scopes.SINGLETON); + }) + ); + this.operatorManager = embed.getInjector().getInstance(OperatorManager.class); + } + + @After + public void shutdown() + throws Exception + { + embed.close(); + } + + @Test + public void verifyException() + { + expectRuntimeException("runtime", RuntimeException.class, "foobar (runtime)"); + expectRuntimeException("custom", CustomRuntimeException.class, "foobar (custom runtime)"); + expectRuntimeException("custom_nested", CustomRuntimeException.class, "CustomRuntimeException (custom runtime)\n> nested (runtime)"); + expectExecutionException("wrapped_runtime", "foobar (runtime)"); + expectExecutionException("wrapped_custom", "foobar (custom runtime)"); + expectExecutionException("wrapped_null_message", "cause (custom runtime)"); + expectExecutionException("wrapped_custom_message", "custom!!"); + } + + private void expectExecutionException(String name, String expectedMessage) + { + Config config = newConfig().set("_command", name); + try { + operatorManager.callExecutor(Paths.get(""), "custom_error", newTaskRequest().withConfig(config)); + fail("expected TaskExecutionException"); + } + catch (RuntimeException ex) { + assertThat(ex, instanceOf(TaskExecutionException.class)); + Config error = ((TaskExecutionException) ex).getError(ConfigUtils.configFactory).or(newConfig()); + assertThat(error.get("message", String.class, null), is(expectedMessage)); + } + } + + private void expectRuntimeException(String name, Class expectedClass, String expectedMessage) + { + Config config = newConfig().set("_command", name); + try { + operatorManager.callExecutor(Paths.get(""), "custom_error", newTaskRequest().withConfig(config)); + fail("expected RuntimeException"); + } + catch (RuntimeException ex) { + assertThat((Object) ex.getClass(), is((Object) expectedClass)); + assertThat(OperatorManager.formatExceptionMessage(ex), is(expectedMessage)); + } + } +} From 3da24c2c29ee47bcfcd84bd1a42d5817a73e7dd2 Mon Sep 17 00:00:00 2001 From: Sadayuki Furuhashi Date: Thu, 22 Dec 2016 11:22:29 +0900 Subject: [PATCH 3/5] fixed merging --- .../main/java/io/digdag/spi/TaskExecutionException.java | 9 +++++++++ .../standards/operator/aws/EmrOperatorFactory.java | 6 +++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/digdag-spi/src/main/java/io/digdag/spi/TaskExecutionException.java b/digdag-spi/src/main/java/io/digdag/spi/TaskExecutionException.java index 880e8c290c..1ad304ab07 100644 --- a/digdag-spi/src/main/java/io/digdag/spi/TaskExecutionException.java +++ b/digdag-spi/src/main/java/io/digdag/spi/TaskExecutionException.java @@ -160,6 +160,15 @@ public TaskExecutionException(String message, ConfigElement error) this.stateParams = Optional.absent(); } + @Deprecated + public TaskExecutionException(Throwable cause, ConfigElement error) + { + super(formatExceptionMessage(cause), cause); + this.error = Optional.of(error); + this.retryInterval = Optional.absent(); + this.stateParams = Optional.absent(); + } + public Optional getError(ConfigFactory cf) { return error.transform(it -> it.toConfig(cf)); diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/aws/EmrOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/aws/EmrOperatorFactory.java index e282d66c07..da23f8ebc2 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/aws/EmrOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/aws/EmrOperatorFactory.java @@ -782,7 +782,7 @@ private List configurations(JsonNode node) configurationJson = workspace.templateFile(templateEngine, node.asText(), UTF_8, params); } catch (IOException | TemplateException e) { - throw new TaskExecutionException(e, TaskExecutionException.buildExceptionErrorConfig(e)); + throw new TaskExecutionException(e); } List values; try { @@ -900,7 +900,7 @@ void stageFiles() } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new TaskExecutionException(e, TaskExecutionException.buildExceptionErrorConfig(e)); + throw new TaskExecutionException(e); } } } @@ -940,7 +940,7 @@ private PutObjectRequest stagingFilePutRequest(StagingFile file) bytes = Resources.toByteArray(new URL(reference.reference().get())); } catch (IOException e) { - throw new TaskExecutionException(e, TaskExecutionException.buildExceptionErrorConfig(e)); + throw new TaskExecutionException(e); } ObjectMetadata metadata = new ObjectMetadata(); metadata.setContentLength(bytes.length); From f8babb684bb4886e256faed80a6c5212b81b37fd Mon Sep 17 00:00:00 2001 From: Sadayuki Furuhashi Date: Fri, 6 Jan 2017 19:58:21 +0900 Subject: [PATCH 4/5] throwing TaskExecutionException doesn't need buildExceptionErrorConfig any more --- .../standards/operator/redshift/BaseRedshiftLoadOperator.java | 3 +-- .../operator/redshift/RedshiftLoadOperatorFactory.java | 4 +--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/redshift/BaseRedshiftLoadOperator.java b/digdag-standards/src/main/java/io/digdag/standards/operator/redshift/BaseRedshiftLoadOperator.java index 91457011fb..f477526be5 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/redshift/BaseRedshiftLoadOperator.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/redshift/BaseRedshiftLoadOperator.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.UUID; -import static io.digdag.spi.TaskExecutionException.buildExceptionErrorConfig; import static io.digdag.standards.operator.state.PollingRetryExecutor.pollingRetryExecutor; abstract class BaseRedshiftLoadOperator @@ -218,7 +217,7 @@ private void executeTask(Config params, RedshiftConnectionConfig connectionConfi catch (DatabaseException ex) { // expected error that should suppress stacktrace by default String message = String.format("%s [%s]", ex.getMessage(), ex.getCause().getMessage()); - throw new TaskExecutionException(message, buildExceptionErrorConfig(ex)); + throw new TaskExecutionException(message, ex); } } } diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/redshift/RedshiftLoadOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/redshift/RedshiftLoadOperatorFactory.java index 938d17acdb..11669f1591 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/redshift/RedshiftLoadOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/redshift/RedshiftLoadOperatorFactory.java @@ -31,8 +31,6 @@ import java.util.List; import java.util.Map; -import static io.digdag.spi.TaskExecutionException.buildExceptionErrorConfig; - public class RedshiftLoadOperatorFactory implements OperatorFactory { @@ -160,7 +158,7 @@ protected List buildAcceptableUriForSessionCredentials(Config con } catch (RetryExecutor.RetryGiveupException e) { throw new TaskExecutionException( - "Failed to fetch a manifest file: " + from, buildExceptionErrorConfig(e)); + "Failed to fetch a manifest file: " + from, e); } } return builder.build(); From 3ecd1b9531615f80509d78a75813295f903d9260 Mon Sep 17 00:00:00 2001 From: Sadayuki Furuhashi Date: Sat, 7 Jan 2017 13:46:43 +0900 Subject: [PATCH 5/5] emr: fix empty error message --- .../io/digdag/standards/operator/aws/EmrOperatorFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/aws/EmrOperatorFactory.java b/digdag-standards/src/main/java/io/digdag/standards/operator/aws/EmrOperatorFactory.java index da23f8ebc2..2784ab3bd4 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/aws/EmrOperatorFactory.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/aws/EmrOperatorFactory.java @@ -395,7 +395,7 @@ private Optional checkStepCompletion(AmazonElasticMapReduce emr, Submissio details != null ? details : "{}"); } - throw new TaskExecutionException("EMR job failed", ConfigElement.empty()); + throw new TaskExecutionException("EMR job failed"); case "COMPLETED": logger.info("EMR steps done"); @@ -519,7 +519,7 @@ private Optional checkClusterBootStatus(AmazonElasticMapReduce emr, NewC if (createOnly) { // TODO: log more information about the errors // TODO: inspect state change reason to figure out whether it was the boot that failed or e.g. steps submitted by another agent - throw new TaskExecutionException("EMR boot failed: " + cluster.id(), ConfigElement.empty()); + throw new TaskExecutionException("EMR boot failed: " + cluster.id()); } return Optional.of(clusterState);