Skip to content

Commit

Permalink
Implement GetLastFailure API (#293)
Browse files Browse the repository at this point in the history
* Add `GetLastError` API for retrieving last failed run of a workflow
  • Loading branch information
Sushisource authored Nov 24, 2020
1 parent 569d0d4 commit 06e1ca5
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 2 deletions.
8 changes: 7 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,14 @@ All PR titles should start with Upper case.

## Testing

Run all the tests with coverage and race detector enabled:
Run all the tests (including integration tests, requiring you are running a server locally - see
[here]https://github.com/temporalio/temporal/blob/master/CONTRIBUTING.md()) with coverage and race detector enabled:

```bash
make test
```

To run just the unit tests:
```bash
make unit-test
```
6 changes: 6 additions & 0 deletions internal/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type WorkflowOutboundCallsInterceptor interface {
IsReplaying(ctx Context) bool
HasLastCompletionResult(ctx Context) bool
GetLastCompletionResult(ctx Context, d ...interface{}) error
GetLastError(ctx Context) error
}

var _ WorkflowOutboundCallsInterceptor = (*WorkflowOutboundCallsInterceptorBase)(nil)
Expand Down Expand Up @@ -217,3 +218,8 @@ func (t *WorkflowOutboundCallsInterceptorBase) HasLastCompletionResult(ctx Conte
func (t *WorkflowOutboundCallsInterceptorBase) GetLastCompletionResult(ctx Context, d ...interface{}) error {
return t.Next.GetLastCompletionResult(ctx, d...)
}

// GetLastError forwards to t.Next
func (t *WorkflowOutboundCallsInterceptorBase) GetLastError(ctx Context) error {
return t.Next.GetLastError(ctx)
}
1 change: 1 addition & 0 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice.
Namespace: wth.namespace,
Attempt: attributes.GetAttempt(),
lastCompletionResult: attributes.LastCompletionResult,
lastFailure: attributes.ContinuedFailure,
CronSchedule: attributes.CronSchedule,
ContinuedExecutionRunID: attributes.ContinuedExecutionRunId,
ParentWorkflowNamespace: attributes.ParentWorkflowNamespace,
Expand Down
4 changes: 4 additions & 0 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -2232,6 +2232,10 @@ func (env *testWorkflowEnvironmentImpl) setLastCompletionResult(result interface
env.workflowInfo.lastCompletionResult = data
}

func (env *testWorkflowEnvironmentImpl) setLastError(err error) {
env.workflowInfo.lastFailure = convertErrorToFailure(err, env.dataConverter)
}

func (env *testWorkflowEnvironmentImpl) setHeartbeatDetails(details interface{}) {
data, err := encodeArg(env.GetDataConverter(), details)
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2982,6 +2982,26 @@ func (s *WorkflowTestSuiteUnitTest) Test_CronHasLastResult() {
s.Equal(lastResult+1, result)
}

func (s *WorkflowTestSuiteUnitTest) Test_CronGetLastFailure() {
const failstr = "some previous failure"
cronWorkflow := func(ctx Context) (int, error) {
var lastfail = GetLastError(ctx)
if lastfail == nil || lastfail.Error() != failstr {
return 1, errors.New("last failure did not contain expected message")
}

return 0, nil
}

env := s.NewTestWorkflowEnvironment()
env.RegisterWorkflow(cronWorkflow)
env.SetLastError(errors.New(failstr))
env.ExecuteWorkflow(cronWorkflow)

s.True(env.IsWorkflowCompleted())
s.NoError(env.GetWorkflowError())
}

func (s *WorkflowTestSuiteUnitTest) Test_ActivityWithProgress() {
activityFn := func(ctx context.Context) (int, error) {
var progress int
Expand Down
16 changes: 16 additions & 0 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/uber-go/tally"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/log"
Expand Down Expand Up @@ -737,6 +738,7 @@ type WorkflowInfo struct {
Namespace string
Attempt int32 // Attempt starts from 1 and increased by 1 for every retry if retry policy is specified.
lastCompletionResult *commonpb.Payloads
lastFailure *failurepb.Failure
CronSchedule string
ContinuedExecutionRunID string
ParentWorkflowNamespace string
Expand Down Expand Up @@ -1365,6 +1367,20 @@ func (wc *workflowEnvironmentInterceptor) GetLastCompletionResult(ctx Context, d
return encodedVal.Get(d...)
}

// GetLastError extracts the latest failure from any from previous run for this workflow, if one has failed. If none
// have failed, nil is returned.
//
// See TestWorkflowEnvironment.SetLastError() for unit test support.
func GetLastError(ctx Context) error {
i := getWorkflowOutboundCallsInterceptor(ctx)
return i.GetLastError(ctx)
}

func (wc *workflowEnvironmentInterceptor) GetLastError(ctx Context) error {
info := wc.GetWorkflowInfo(ctx)
return convertFailureToError(info.lastFailure, wc.env.GetDataConverter())
}

// WithActivityOptions adds all options to the copy of the context.
// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
// subjected to change in the future.
Expand Down
5 changes: 5 additions & 0 deletions internal/workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,11 @@ func (e *TestWorkflowEnvironment) SetLastCompletionResult(result interface{}) {
e.impl.setLastCompletionResult(result)
}

// SetLastError sets the result to be returned from workflow.GetLastError().
func (e *TestWorkflowEnvironment) SetLastError(err error) {
e.impl.setLastError(err)
}

// SetMemoOnStart sets the memo when start workflow.
func (e *TestWorkflowEnvironment) SetMemoOnStart(memo map[string]interface{}) error {
memoStruct, err := getWorkflowMemo(memo, e.impl.GetDataConverter())
Expand Down
20 changes: 19 additions & 1 deletion test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,20 @@ func (ts *IntegrationTestSuite) TestContextPropagator() {
}, propagatedValues)
}

const CronWorkflowID = "test-cron"

func (ts *IntegrationTestSuite) TestFailurePropagation() {
var expected int
err := ts.executeWorkflow(CronWorkflowID, ts.workflows.CronWorkflow, &expected)
// Workflow asks to be cancelled
ts.Error(err)
var canceledErr *temporal.CanceledError
ts.True(errors.As(err, &canceledErr))
var errDeets *string
ts.NoError(canceledErr.Details(&errDeets))
ts.EqualValues("finished OK", *errDeets)
}

func (ts *IntegrationTestSuite) registerNamespace() {
client, err := client.NewNamespaceClient(client.Options{HostPort: ts.config.ServiceAddr})
ts.NoError(err)
Expand Down Expand Up @@ -748,13 +762,17 @@ func (ts *IntegrationTestSuite) executeWorkflowWithContextAndOption(
}

func (ts *IntegrationTestSuite) startWorkflowOptions(wfID string) client.StartWorkflowOptions {
return client.StartWorkflowOptions{
var wfOptions = client.StartWorkflowOptions{
ID: wfID,
TaskQueue: ts.taskQueueName,
WorkflowExecutionTimeout: 15 * time.Second,
WorkflowTaskTimeout: time.Second,
WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
if wfID == CronWorkflowID {
wfOptions.CronSchedule = "@every 1s"
}
return wfOptions
}

func (ts *IntegrationTestSuite) registerWorkflowsAndActivities(w worker.Worker) {
Expand Down
27 changes: 27 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,32 @@ func (w *Workflows) ContextPropagator(ctx workflow.Context, startChild bool) ([]
return result, nil
}

const CronFailMsg = "dying on purpose"

func (w *Workflows) CronWorkflow(ctx workflow.Context) (int, error) {
retme := 0

if workflow.HasLastCompletionResult(ctx) {
var lastres int
if err := workflow.GetLastCompletionResult(ctx, &lastres); err == nil {
retme = lastres + 1
}
}

lastfail := workflow.GetLastError(ctx)
if retme == 2 && lastfail != nil {
if lastfail.Error() != CronFailMsg {
return -3, errors.New("incorrect message in latest failure")
}
return 3, temporal.NewCanceledError("finished OK")
}
if retme == 2 {
return -1, errors.New(CronFailMsg)
}

return retme, nil
}

func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.ActivityCancelRepro)
worker.RegisterWorkflow(w.ActivityCompletionUsingID)
Expand Down Expand Up @@ -953,6 +979,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.WorkflowWithParallelSideEffects)
worker.RegisterWorkflow(w.WorkflowWithParallelMutableSideEffects)
worker.RegisterWorkflow(w.SignalWorkflow)
worker.RegisterWorkflow(w.CronWorkflow)

worker.RegisterWorkflow(w.child)
worker.RegisterWorkflow(w.childForMemoAndSearchAttr)
Expand Down
8 changes: 8 additions & 0 deletions workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,14 @@ func GetLastCompletionResult(ctx Context, d ...interface{}) error {
return internal.GetLastCompletionResult(ctx, d...)
}

// GetLastError extracts the latest failure from any from previous run for this workflow, if one has failed. If none
// have failed, nil is returned.
//
// See TestWorkflowEnvironment.SetLastError() for unit test support.
func GetLastError(ctx Context) error {
return internal.GetLastError(ctx)
}

// UpsertSearchAttributes is used to add or update workflow search attributes.
// The search attributes can be used in query of List/Scan/Count workflow APIs.
// The key and value type must be registered on temporal server side;
Expand Down

0 comments on commit 06e1ca5

Please sign in to comment.