Skip to content

Commit

Permalink
Merge pull request #497 from treasure-data/require-failure-fix
Browse files Browse the repository at this point in the history
Fix require> operator to fail if dependent session fails
  • Loading branch information
frsyuki authored Mar 13, 2017
2 parents 92a4b5c + eefabbb commit 47d74e0
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.digdag.core.workflow.WorkflowExecutor;
import io.digdag.core.workflow.SessionAttemptConflictException;
import io.digdag.core.session.SessionStoreManager;
import io.digdag.core.session.AttemptStateFlags;
import io.digdag.core.session.StoredSessionAttempt;
import io.digdag.core.session.StoredSessionAttemptWithSession;
import io.digdag.core.repository.StoredRevision;
import io.digdag.core.repository.ArchiveType;
Expand Down Expand Up @@ -159,7 +159,7 @@ public void retryTask(int siteId,
}

@Override
public AttemptStateFlags startSession(
public StoredSessionAttempt startSession(
int siteId,
int projectId,
String workflowName,
Expand All @@ -184,12 +184,13 @@ public AttemptStateFlags startSession(
Optional.absent());

// TODO FIXME SessionMonitor monitors is not set
StoredSessionAttemptWithSession attempt;
try {
StoredSessionAttemptWithSession attempt = exec.submitWorkflow(siteId, ar, def);
return attempt.getStateFlags();
attempt = exec.submitWorkflow(siteId, ar, def);
}
catch (SessionAttemptConflictException ex) {
return ex.getConflictedSession().getStateFlags();
attempt = ex.getConflictedSession();
}
return attempt;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.digdag.client.config.ConfigFactory;
import io.digdag.core.repository.ResourceLimitExceededException;
import io.digdag.core.repository.ResourceNotFoundException;
import io.digdag.core.session.AttemptStateFlags;
import io.digdag.core.session.StoredSessionAttempt;
import io.digdag.spi.Operator;
import io.digdag.spi.OperatorContext;
import io.digdag.spi.OperatorFactory;
Expand All @@ -22,6 +22,8 @@
import java.nio.file.Path;
import java.time.Instant;

import static java.util.Locale.ENGLISH;

public class RequireOperatorFactory
implements OperatorFactory
{
Expand Down Expand Up @@ -66,19 +68,26 @@ public TaskResult runTask()
String workflowName = config.get("_command", String.class);
int projectId = config.get("project_id", int.class);
Instant instant = config.get("session_time", Instant.class);
boolean ignoreFailure = config.get("ignore_failure", boolean.class, false);
Optional<String> retryAttemptName = config.getOptional("retry_attempt_name", String.class);
Config overrideParams = config.getNestedOrGetEmpty("params");
try {
AttemptStateFlags flags = callback.startSession(
StoredSessionAttempt attempt = callback.startSession(
request.getSiteId(),
projectId,
workflowName,
instant,
retryAttemptName,
overrideParams);

boolean isDone = flags.isDone();
boolean isDone = attempt.getStateFlags().isDone();
if (isDone) {
if (!ignoreFailure && !attempt.getStateFlags().isSuccess()) {
// ignore_failure is false and the attempt is in error state. Make this operator failed.
throw new TaskExecutionException(String.format(ENGLISH,
"Dependent workflow failed. Session id: %d, attempt id: %d",
attempt.getSessionId(), attempt.getId()));
}
return TaskResult.empty(cf);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import io.digdag.spi.TaskResult;
import io.digdag.spi.TaskRequest;
import io.digdag.spi.StorageObject;
import io.digdag.core.session.AttemptStateFlags;
import io.digdag.core.session.StoredSessionAttempt;
import io.digdag.core.repository.ResourceLimitExceededException;
import io.digdag.core.repository.ResourceNotFoundException;

Expand All @@ -36,7 +36,7 @@ void retryTask(int siteId,
int retryInterval, Config retryStateParams,
Optional<Config> error);

AttemptStateFlags startSession(
StoredSessionAttempt startSession(
int siteId,
int projectId,
String workflowName,
Expand Down
11 changes: 11 additions & 0 deletions digdag-docs/src/operators/require.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,14 @@
session_time: ${moment(last_session_time).add(i, 'day')}
```

* **ignore_failure**: BOOLEAN

This operator fails when the dependent workflow finished with errors by default.

But if `ignore_failure: true` is set, this operator succeeds even when the workflow finished with errors.

```
require>: another_workflow
ignore_failure: true
```

33 changes: 33 additions & 0 deletions digdag-tests/src/test/java/acceptance/RequireIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import static utils.TestUtils.copyResource;
import static utils.TestUtils.getAttemptId;
import static utils.TestUtils.main;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;

public class RequireIT
Expand All @@ -36,6 +38,7 @@ public void setUp()
throws Exception
{
projectDir = folder.getRoot().toPath().resolve("foobar");
Files.createDirectories(projectDir);
config = folder.newFile().toPath();
}

Expand Down Expand Up @@ -97,4 +100,34 @@ public void testRequire()
// Verify that the file created by the child workflow is there
assertThat(Files.exists(childOutFile), is(true));
}

@Test
public void testRequireFailsWhenDependentFails()
throws Exception
{
copyResource("acceptance/require/parent.dig", projectDir.resolve("parent.dig"));
copyResource("acceptance/require/fail.dig", projectDir.resolve("child.dig"));

CommandStatus status = main("run",
"-c", config.toString(),
"--project", projectDir.toString(),
"parent.dig");
assertThat(status.errUtf8(), status.code(), is(not(0)));

assertThat(status.errUtf8(), containsString("Dependent workflow failed."));
}

@Test
public void testRequireSucceedsWhenDependentFailsButIgnoreFailureIsSet()
throws Exception
{
copyResource("acceptance/require/parent_ignore_failure.dig", projectDir.resolve("parent.dig"));
copyResource("acceptance/require/fail.dig", projectDir.resolve("child.dig"));

CommandStatus status = main("run",
"-c", config.toString(),
"--project", projectDir.toString(),
"parent.dig");
assertThat(status.errUtf8(), status.code(), is(0));
}
}
2 changes: 2 additions & 0 deletions digdag-tests/src/test/resources/acceptance/require/fail.dig
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
+fail:
fail>: expected failure
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
+require:
require>: child
ignore_failure: true

0 comments on commit 47d74e0

Please sign in to comment.