Skip to content

Commit

Permalink
parent_policy_terminate (#472)
Browse files Browse the repository at this point in the history
Unit test to properly handle parent close policy of terminate. This is a follow up of #471
  • Loading branch information
yiminc authored Jun 21, 2021
1 parent c411d30 commit 9019e38
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 18 deletions.
31 changes: 13 additions & 18 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ type (
testResult converter.EncodedValue
testError error
doneChannel chan struct{}
doneChannelClosed bool
workerOptions WorkerOptions
dataConverter converter.DataConverter
runTimeout time.Duration
Expand Down Expand Up @@ -650,6 +649,9 @@ func (env *testWorkflowEnvironmentImpl) startMainLoop() {
return
}

// notify all child workflows to exit their main loop
defer close(env.doneChannel)

for !env.shouldStopEventLoop() {
// use non-blocking-select to check if there is anything pending in the main thread.
select {
Expand All @@ -676,7 +678,6 @@ func (env *testWorkflowEnvironmentImpl) startMainLoop() {
}
}
}
env.maybeStopEventLoop()
}
}

Expand Down Expand Up @@ -830,17 +831,6 @@ func (env *testWorkflowEnvironmentImpl) RequestCancelTimer(timerID TimerID) {
}, true)
}

func (env *testWorkflowEnvironmentImpl) maybeStopEventLoop() {
if !env.shouldStopEventLoop() {
return
}

if !env.doneChannelClosed {
close(env.doneChannel)
env.doneChannelClosed = true
}
}

func (env *testWorkflowEnvironmentImpl) Complete(result *commonpb.Payloads, err error) {
if env.isWorkflowCompleted {
env.logger.Debug("Workflow already completed.")
Expand Down Expand Up @@ -916,16 +906,21 @@ func (env *testWorkflowEnvironmentImpl) Complete(result *commonpb.Payloads, err
}
}

// cancel child workflows if their ParentClosePolicy request so.
env.maybeCancelChildWorkflows()
// properly handle child workflows based on their ParentClosePolicy
env.handleParentClosePolicy()
}

func (env *testWorkflowEnvironmentImpl) maybeCancelChildWorkflows() {
func (env *testWorkflowEnvironmentImpl) handleParentClosePolicy() {
for _, handle := range env.runningWorkflows {
if handle.env.parentEnv != nil &&
env.workflowInfo.WorkflowExecution.ID == handle.env.parentEnv.workflowInfo.WorkflowExecution.ID {
// current env is parent workflow of handle's workflow
if handle.params.ParentClosePolicy == enumspb.PARENT_CLOSE_POLICY_REQUEST_CANCEL {

switch handle.params.ParentClosePolicy {
case enumspb.PARENT_CLOSE_POLICY_ABANDON:
// noop
case enumspb.PARENT_CLOSE_POLICY_TERMINATE:
handle.env.Complete(nil, newTerminatedError())
case enumspb.PARENT_CLOSE_POLICY_REQUEST_CANCEL:
handle.env.cancelWorkflow(func(result *commonpb.Payloads, err error) {})
}
}
Expand Down
61 changes: 61 additions & 0 deletions internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,67 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflow_RequestCancel() {
env.AssertExpectations(s.T())
}

func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflow_Terminated() {
var checkedError error
childOfChildWorkflowFn := func(ctx Context) error {
// this will not return as we expect this workflow to be terminated, checkedError would be nil
checkedError = Sleep(ctx, time.Second)
return checkedError
}

childOfChildWorkflowID := "CHILD-OF-CHILD-ID"
childWorkflowFn := func(ctx Context) error {
cwo := ChildWorkflowOptions{
WorkflowID: childOfChildWorkflowID,
ParentClosePolicy: enumspb.PARENT_CLOSE_POLICY_TERMINATE,
}
ctx = WithChildWorkflowOptions(ctx, cwo)
err := ExecuteChildWorkflow(ctx, childOfChildWorkflowFn).GetChildWorkflowExecution().Get(ctx, nil)
if err != nil {
return err
}
// return before child workflow complete
return nil
}

childWorkflowID := "CHILD-WORKFLOW-ID"
rootWorkflowFn := func(ctx Context) error {
cwo := ChildWorkflowOptions{
WorkflowID: childWorkflowID,
}
ctx = WithChildWorkflowOptions(ctx, cwo)
childFuture := ExecuteChildWorkflow(ctx, childWorkflowFn)
err := childFuture.GetChildWorkflowExecution().Get(ctx, nil)
if err != nil {
return err
}
err = childFuture.Get(ctx, nil) //wait until child workflow completed
if err != nil {
return err
}

err = Sleep(ctx, time.Minute) // sleep longer than childOfChildWorkflowFn
return err // would expect this to be nil
}

env := s.NewTestWorkflowEnvironment()
env.RegisterWorkflow(childOfChildWorkflowFn)
env.RegisterWorkflow(childWorkflowFn)
env.RegisterWorkflow(rootWorkflowFn)

env.ExecuteWorkflow(rootWorkflowFn)

s.True(env.IsWorkflowCompleted())
// root workflow no error
s.NoError(env.GetWorkflowError())
// child workflow no error
s.NoError(env.GetWorkflowErrorByID(childWorkflowID))
// verify childOfChild workflow is terminated
s.IsType(&TerminatedError{}, env.GetWorkflowErrorByID(childOfChildWorkflowID))
// verify checkedError is nil, because workflow is terminated so worker won't see the last workflow task.
s.Nil(checkedError)
}

func (s *WorkflowTestSuiteUnitTest) Test_MockActivityWait() {
workflowFn := func(ctx Context) error {
t1 := NewTimer(ctx, time.Hour)
Expand Down
23 changes: 23 additions & 0 deletions internal/workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"

"go.temporal.io/api/serviceerror"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/log"
)
Expand Down Expand Up @@ -684,11 +685,33 @@ func (e *TestWorkflowEnvironment) GetWorkflowResult(valuePtr interface{}) error
return e.impl.testResult.Get(valuePtr)
}

// GetWorkflowResultByID extracts the encoded result from workflow by ID, it returns error if the extraction failed.
func (e *TestWorkflowEnvironment) GetWorkflowResultByID(workflowID string, valuePtr interface{}) error {
if workflowHandle, ok := e.impl.runningWorkflows[workflowID]; ok {
if !workflowHandle.env.isWorkflowCompleted {
panic("workflow is not completed")
}
if workflowHandle.env.testError != nil || workflowHandle.env.testResult == nil || valuePtr == nil {
return e.impl.testError
}
return e.impl.testResult.Get(valuePtr)
}
return serviceerror.NewNotFound(fmt.Sprintf("Workflow %v not exists", workflowID))
}

// GetWorkflowError return the error from test workflow
func (e *TestWorkflowEnvironment) GetWorkflowError() error {
return e.impl.testError
}

// GetWorkflowErrorByID return the error from test workflow
func (e *TestWorkflowEnvironment) GetWorkflowErrorByID(workflowID string) error {
if workflowHandle, ok := e.impl.runningWorkflows[workflowID]; ok {
return workflowHandle.env.testError
}
return serviceerror.NewNotFound(fmt.Sprintf("Workflow %v not exists", workflowID))
}

// CompleteActivity complete an activity that had returned activity.ErrResultPending error
func (e *TestWorkflowEnvironment) CompleteActivity(taskToken []byte, result interface{}, err error) error {
return e.impl.CompleteActivity(taskToken, result, err)
Expand Down

0 comments on commit 9019e38

Please sign in to comment.