Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MultiOperation retry non-durable Update #1652

Merged
merged 3 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 83 additions & 55 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1715,75 +1715,99 @@ func (w *workflowClientInterceptor) executeWorkflowWithOperation(
withStartOp,
},
}
multiResp, err := w.client.workflowService.ExecuteMultiOperation(ctx, &multiRequest)

var multiErr *serviceerror.MultiOperationExecution
if errors.As(err, &multiErr) {
if len(multiErr.OperationErrors()) != len(multiRequest.Operations) {
return nil, fmt.Errorf("%w: %v instead of %v operation errors",
errInvalidServerResponse, len(multiErr.OperationErrors()), len(multiRequest.Operations))
var startResp *workflowservice.StartWorkflowExecutionResponse
var updateResp *workflowservice.UpdateWorkflowExecutionResponse
for {
multiResp, err := func() (*workflowservice.ExecuteMultiOperationResponse, error) {
grpcCtx, cancel := newGRPCContext(ctx, grpcTimeout(pollUpdateTimeout), grpcLongPoll(true), defaultGrpcRetryParameters(ctx))
defer cancel()

multiResp, err := w.client.workflowService.ExecuteMultiOperation(grpcCtx, &multiRequest)
if err != nil {
if ctx.Err() != nil {
return nil, NewWorkflowUpdateServiceTimeoutOrCanceledError(err)
}
if status := serviceerror.ToStatus(err); status.Code() == codes.Canceled || status.Code() == codes.DeadlineExceeded {
return nil, NewWorkflowUpdateServiceTimeoutOrCanceledError(err)
}
return nil, err
}

return multiResp, err
}()

var multiErr *serviceerror.MultiOperationExecution
if errors.As(err, &multiErr) {
if len(multiErr.OperationErrors()) != len(multiRequest.Operations) {
return nil, fmt.Errorf("%w: %v instead of %v operation errors",
errInvalidServerResponse, len(multiErr.OperationErrors()), len(multiRequest.Operations))
}

var abortedErr *serviceerror.MultiOperationAborted
startErr := errors.New("failed to start workflow")
for i, opReq := range multiRequest.Operations {
// if an operation error is of type MultiOperationAborted, it means it was only aborted because
// of another operation's error and is therefore not interesting or helpful
opErr := multiErr.OperationErrors()[i]

switch t := opReq.Operation.(type) {
case *workflowservice.ExecuteMultiOperationRequest_Operation_StartWorkflow:
if !errors.As(opErr, &abortedErr) {
startErr = opErr
}
case *workflowservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow:
if !errors.As(opErr, &abortedErr) {
startErr = fmt.Errorf("%w: %w", errInvalidWorkflowOperation, opErr)
}
default:
// this would only happen if a case statement for a newly added operation is missing above
return nil, fmt.Errorf("%w: %T", errUnsupportedOperation, t)
}
}
return nil, startErr
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can startErr be nil here? Even if server-side contract guarantees not, we might as well set a default somehow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, if the server misbehaves, it could be nil. I'll set a default 👍

} else if err != nil {
return nil, err
}

if len(multiResp.Responses) != len(multiRequest.Operations) {
return nil, fmt.Errorf("%w: %v instead of %v operation results",
errInvalidServerResponse, len(multiResp.Responses), len(multiRequest.Operations))
}

var startErr error
var abortedErr *serviceerror.MultiOperationAborted
for i, opReq := range multiRequest.Operations {
// if an operation error is of type MultiOperationAborted, it means it was only aborted because
// of another operation's error and is therefore not interesting or helpful
opErr := multiErr.OperationErrors()[i]
resp := multiResp.Responses[i].Response

switch t := opReq.Operation.(type) {
case *workflowservice.ExecuteMultiOperationRequest_Operation_StartWorkflow:
if !errors.As(opErr, &abortedErr) {
startErr = opErr
if opResp, ok := resp.(*workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow); ok {
startResp = opResp.StartWorkflow
} else {
return nil, fmt.Errorf("%w: StartWorkflow response has the wrong type %T", errInvalidServerResponse, resp)
}
case *workflowservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow:
if !errors.As(opErr, &abortedErr) {
startErr = fmt.Errorf("%w: %w", errInvalidWorkflowOperation, opErr)
if opResp, ok := resp.(*workflowservice.ExecuteMultiOperationResponse_Response_UpdateWorkflow); ok {
updateResp = opResp.UpdateWorkflow
} else {
return nil, fmt.Errorf("%w: UpdateWorkflow response has the wrong type %T", errInvalidServerResponse, resp)
}
default:
// this would only happen if a case statement for a newly added operation is missing above
return nil, fmt.Errorf("%w: %T", errUnsupportedOperation, t)
}
}
return nil, startErr
} else if err != nil {
return nil, err
}

if len(multiResp.Responses) != len(multiRequest.Operations) {
return nil, fmt.Errorf("%w: %v instead of %v operation results",
errInvalidServerResponse, len(multiResp.Responses), len(multiRequest.Operations))
if w.updateIsDurable(updateResp) {
break
}
}

var startResp *workflowservice.StartWorkflowExecutionResponse
for i, opReq := range multiRequest.Operations {
resp := multiResp.Responses[i].Response

switch t := opReq.Operation.(type) {
case *workflowservice.ExecuteMultiOperationRequest_Operation_StartWorkflow:
if opResp, ok := resp.(*workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow); ok {
startResp = opResp.StartWorkflow
} else {
return nil, fmt.Errorf("%w: StartWorkflow response has the wrong type %T", errInvalidServerResponse, resp)
}
case *workflowservice.ExecuteMultiOperationRequest_Operation_UpdateWorkflow:
if opResp, ok := resp.(*workflowservice.ExecuteMultiOperationResponse_Response_UpdateWorkflow); ok {
handle, err := w.updateHandleFromResponse(
ctx,
enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED,
opResp.UpdateWorkflow)
operation.(*UpdateWithStartWorkflowOperation).set(handle, err)
if err != nil {
return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, err)
}
} else {
return nil, fmt.Errorf("%w: UpdateWorkflow response has the wrong type %T", errInvalidServerResponse, resp)
}
default:
// this would only happen if a case statement for a newly added operation is missing above
return nil, fmt.Errorf("%w: %T", errUnsupportedOperation, t)
}
handle, err := w.updateHandleFromResponse(ctx, enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED, updateResp)
operation.(*UpdateWithStartWorkflowOperation).set(handle, err)
if err != nil {
return nil, fmt.Errorf("%w: %w", errInvalidWorkflowOperation, err)
}

return startResp, nil
}

Expand Down Expand Up @@ -2028,11 +2052,7 @@ func (w *workflowClientInterceptor) UpdateWorkflow(
}
return nil, err
}
// Once the update is past admitted we know it is durable
// Note: old server version may return UNSPECIFIED if the update request
// did not reach the desired lifecycle stage.
if resp.GetStage() != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED &&
resp.GetStage() != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED {
if w.updateIsDurable(resp) {
break
}
}
Expand All @@ -2042,6 +2062,14 @@ func (w *workflowClientInterceptor) UpdateWorkflow(
return w.updateHandleFromResponse(ctx, desiredLifecycleStage, resp)
}

func (w *workflowClientInterceptor) updateIsDurable(resp *workflowservice.UpdateWorkflowExecutionResponse) bool {
// Once the update is past admitted we know it is durable
// Note: old server version may return UNSPECIFIED if the update request
// did not reach the desired lifecycle stage.
return resp.GetStage() != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED &&
resp.GetStage() != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED
}

func createUpdateWorkflowInput(
options UpdateWorkflowOptions,
) (*ClientUpdateWorkflowInput, error) {
Expand Down
112 changes: 112 additions & 0 deletions internal/internal_workflow_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,118 @@ func (s *workflowRunSuite) TestGetWorkflowNoExtantWorkflowAndNoRunId() {
s.Equal("", workflowRunNoRunID.GetRunID())
}

func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Retry() {
s.workflowServiceClient.EXPECT().
ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).
Return(&workflowservice.ExecuteMultiOperationResponse{
Responses: []*workflowservice.ExecuteMultiOperationResponse_Response{
{
Response: &workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow{},
},
{
// 1st response: empty response, Update is not durable yet, client retries
Response: &workflowservice.ExecuteMultiOperationResponse_Response_UpdateWorkflow{},
},
},
}, nil).
Return(&workflowservice.ExecuteMultiOperationResponse{
Responses: []*workflowservice.ExecuteMultiOperationResponse_Response{
{
Response: &workflowservice.ExecuteMultiOperationResponse_Response_StartWorkflow{
StartWorkflow: &workflowservice.StartWorkflowExecutionResponse{
RunId: "RUN_ID",
},
},
},
{
// 2nd response: non-empty response, Update is durable
Response: &workflowservice.ExecuteMultiOperationResponse_Response_UpdateWorkflow{
UpdateWorkflow: &workflowservice.UpdateWorkflowExecutionResponse{
Stage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
},
},
},
},
}, nil)

updOp := NewUpdateWithStartWorkflowOperation(
UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
})

_, err := s.workflowClient.ExecuteWorkflow(
context.Background(),
StartWorkflowOptions{
ID: workflowID,
TaskQueue: taskqueue,
WithStartOperation: updOp,
}, workflowType,
)
s.NoError(err)
}

func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Abort() {
tests := []struct {
name string
expectedErr string
respFunc func(ctx context.Context, in *workflowservice.ExecuteMultiOperationRequest, opts ...grpc.CallOption) (*workflowservice.ExecuteMultiOperationResponse, error)
}{
{
name: "Timeout",
expectedErr: "context deadline exceeded",
respFunc: func(ctx context.Context, in *workflowservice.ExecuteMultiOperationRequest, opts ...grpc.CallOption) (*workflowservice.ExecuteMultiOperationResponse, error) {
<-ctx.Done()
return nil, ctx.Err()
},
},
{
name: "Cancelled",
expectedErr: "was_cancelled",
respFunc: func(ctx context.Context, in *workflowservice.ExecuteMultiOperationRequest, opts ...grpc.CallOption) (*workflowservice.ExecuteMultiOperationResponse, error) {
return nil, serviceerror.NewCanceled("was_cancelled")
},
},
{
name: "DeadlineExceeded",
expectedErr: "deadline_exceeded",
respFunc: func(ctx context.Context, in *workflowservice.ExecuteMultiOperationRequest, opts ...grpc.CallOption) (*workflowservice.ExecuteMultiOperationResponse, error) {
return nil, serviceerror.NewDeadlineExceeded("deadline_exceeded")
},
},
}

for _, tt := range tests {
s.Run(tt.name, func() {
s.workflowServiceClient.EXPECT().
ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(tt.respFunc)

updOp := NewUpdateWithStartWorkflowOperation(
UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
})

ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

_, err := s.workflowClient.ExecuteWorkflow(
ctxWithTimeout,
StartWorkflowOptions{
ID: workflowID,
TaskQueue: taskqueue,
WithStartOperation: updOp,
}, workflowType,
)

var expectedErr *WorkflowUpdateServiceTimeoutOrCanceledError
require.ErrorAs(s.T(), err, &expectedErr)
require.ErrorContains(s.T(), err, tt.expectedErr)
})
}
}

func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_NonMultiOperationError() {
s.workflowServiceClient.EXPECT().
ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).
Expand Down
Loading