From 6d8466c926e584e45c5d56b4ebd39de2b4f95647 Mon Sep 17 00:00:00 2001 From: Zijian Date: Mon, 15 Apr 2024 18:02:35 -0700 Subject: [PATCH] Update duplicate request error to include request type (#5910) --- common/persistence/errors.go | 3 +- .../nosql/nosql_execution_store.go | 3 +- .../nosql/nosql_execution_store_test.go | 37 ++++++++++ .../nosql/nosql_execution_store_util.go | 3 +- .../nosqlplugin/cassandra/workflow_utils.go | 33 ++++++++- .../cassandra/workflow_utils_test.go | 74 +++++++++++++++++-- .../persistence/nosql/nosqlplugin/errors.go | 9 ++- .../persistence-tests/executionManagerTest.go | 5 ++ 8 files changed, 153 insertions(+), 14 deletions(-) diff --git a/common/persistence/errors.go b/common/persistence/errors.go index 16f51040c05..218c9e48610 100644 --- a/common/persistence/errors.go +++ b/common/persistence/errors.go @@ -77,7 +77,8 @@ type ( } DuplicateRequestError struct { - RunID string + RequestType WorkflowRequestType + RunID string } ) diff --git a/common/persistence/nosql/nosql_execution_store.go b/common/persistence/nosql/nosql_execution_store.go index 9671269c367..d0b7da568aa 100644 --- a/common/persistence/nosql/nosql_execution_store.go +++ b/common/persistence/nosql/nosql_execution_store.go @@ -148,7 +148,8 @@ func (d *nosqlExecutionStore) CreateWorkflowExecution( } case conditionFailureErr.DuplicateRequest != nil: return nil, &persistence.DuplicateRequestError{ - RunID: conditionFailureErr.DuplicateRequest.RunID, + RequestType: conditionFailureErr.DuplicateRequest.RequestType, + RunID: conditionFailureErr.DuplicateRequest.RunID, } default: // If ever runs into this branch, there is bug in the code either in here, or in the implementation of nosql plugin diff --git a/common/persistence/nosql/nosql_execution_store_test.go b/common/persistence/nosql/nosql_execution_store_test.go index e495930148b..23a3df597dd 100644 --- a/common/persistence/nosql/nosql_execution_store_test.go +++ b/common/persistence/nosql/nosql_execution_store_test.go @@ -146,6 +146,25 @@ func TestCreateWorkflowExecution(t *testing.T) { expectedResp: nil, expectedError: fmt.Errorf("unsupported conditionFailureReason error"), // Expected generic error for unexpected conditions }, + { + name: "Duplicate request error", + setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) { + mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes() + mockDB.EXPECT(). + InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(&nosqlplugin.WorkflowOperationConditionFailure{ + DuplicateRequest: &nosqlplugin.DuplicateRequest{ + RequestType: persistence.WorkflowRequestTypeSignal, + RunID: "test-run-id", + }, + }) + }, + expectedResp: nil, + expectedError: &persistence.DuplicateRequestError{ + RequestType: persistence.WorkflowRequestTypeSignal, + RunID: "test-run-id", + }, + }, } for _, tc := range tests { @@ -206,6 +225,24 @@ func TestUpdateWorkflowExecution(t *testing.T) { }, expectedError: nil, }, + { + name: "Duplicate request error", + setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) { + mockDB.EXPECT(). + UpdateWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), nil, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(&nosqlplugin.WorkflowOperationConditionFailure{ + DuplicateRequest: &nosqlplugin.DuplicateRequest{ + RequestType: persistence.WorkflowRequestTypeSignal, + RunID: "test-run-id", + }, + }) + }, + request: newUpdateWorkflowExecutionRequest, + expectedError: &persistence.DuplicateRequestError{ + RequestType: persistence.WorkflowRequestTypeSignal, + RunID: "test-run-id", + }, + }, { name: "UpdateWorkflowModeBypassCurrent - assertNotCurrentExecution failure", setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) { diff --git a/common/persistence/nosql/nosql_execution_store_util.go b/common/persistence/nosql/nosql_execution_store_util.go index 85b9efea5ae..537cdbbe336 100644 --- a/common/persistence/nosql/nosql_execution_store_util.go +++ b/common/persistence/nosql/nosql_execution_store_util.go @@ -743,7 +743,8 @@ func (d *nosqlExecutionStore) processUpdateWorkflowResult(err error, rangeID int } case conditionFailureErr.DuplicateRequest != nil: return &persistence.DuplicateRequestError{ - RunID: conditionFailureErr.DuplicateRequest.RunID, + RequestType: conditionFailureErr.DuplicateRequest.RequestType, + RunID: conditionFailureErr.DuplicateRequest.RunID, } default: // If ever runs into this branch, there is bug in the code either in here, or in the implementation of nosql plugin diff --git a/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils.go b/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils.go index dedcb7270e3..1277b273a29 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils.go @@ -162,9 +162,14 @@ func executeCreateWorkflowBatchTransaction( if !ok { return fmt.Errorf("corrupted data detected. DomainID: %v, WorkflowId: %v, RequestID: %v, RequestType: %v", execution.DomainID, execution.WorkflowID, workflowRequestID, requestRowType) } + requestType, err := fromRequestRowType(requestRowType) + if err != nil { + return err + } return &nosqlplugin.WorkflowOperationConditionFailure{ DuplicateRequest: &nosqlplugin.DuplicateRequest{ - RunID: runID.String(), + RequestType: requestType, + RunID: runID.String(), }, } } @@ -343,9 +348,14 @@ func executeUpdateWorkflowBatchTransaction( if !ok { return fmt.Errorf("corrupted data detected. DomainID: %v, WorkflowId: %v, RequestID: %v, RequestType: %v", currentWorkflowRequest.Row.DomainID, currentWorkflowRequest.Row.WorkflowID, workflowRequestID, requestRowType) } + requestType, err := fromRequestRowType(requestRowType) + if err != nil { + return err + } return &nosqlplugin.WorkflowOperationConditionFailure{ DuplicateRequest: &nosqlplugin.DuplicateRequest{ - RunID: runID.String(), + RequestType: requestType, + RunID: runID.String(), }, } } @@ -1467,7 +1477,7 @@ func isRequestRowType(rowType int) bool { return false } -func getRequestRowType(requestType persistence.WorkflowRequestType) (int, error) { +func toRequestRowType(requestType persistence.WorkflowRequestType) (int, error) { switch requestType { case persistence.WorkflowRequestTypeStart: return rowTypeWorkflowRequestStart, nil @@ -1482,6 +1492,21 @@ func getRequestRowType(requestType persistence.WorkflowRequestType) (int, error) } } +func fromRequestRowType(rowType int) (persistence.WorkflowRequestType, error) { + switch rowType { + case rowTypeWorkflowRequestStart: + return persistence.WorkflowRequestTypeStart, nil + case rowTypeWorkflowRequestSignal: + return persistence.WorkflowRequestTypeSignal, nil + case rowTypeWorkflowRequestCancel: + return persistence.WorkflowRequestTypeCancel, nil + case rowTypeWorkflowRequestReset: + return persistence.WorkflowRequestTypeReset, nil + default: + return persistence.WorkflowRequestType(0), fmt.Errorf("unknown request row type %v", rowType) + } +} + func insertOrUpsertWorkflowRequestRow( batch gocql.Batch, requests *nosqlplugin.WorkflowRequestsWriteRequest, @@ -1499,7 +1524,7 @@ func insertOrUpsertWorkflowRequestRow( return fmt.Errorf("unknown workflow request write mode %v", requests.WriteMode) } for _, row := range requests.Rows { - rowType, err := getRequestRowType(row.RequestType) + rowType, err := toRequestRowType(row.RequestType) if err != nil { return err } diff --git a/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils_test.go b/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils_test.go index 4287d424ffa..92f9a15d2a8 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils_test.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils_test.go @@ -326,7 +326,7 @@ func TestExecuteCreateWorkflowBatchTransaction(t *testing.T) { mapExecuteBatchCASApplied: false, iter: &fakeIter{}, mapExecuteBatchCASPrev: map[string]any{ - "type": rowTypeWorkflowRequestStart, + "type": rowTypeWorkflowRequestCancel, "run_id": uuid.Parse("4b8045c8-7b45-41e0-bf03-1f0d166b818d"), }, query: &fakeQuery{ @@ -349,7 +349,8 @@ func TestExecuteCreateWorkflowBatchTransaction(t *testing.T) { }, wantErr: &nosqlplugin.WorkflowOperationConditionFailure{ DuplicateRequest: &nosqlplugin.DuplicateRequest{ - RunID: "6b844fb4-c18a-4979-a2d3-731ebdd1db08", + RequestType: persistence.WorkflowRequestTypeCancel, + RunID: "6b844fb4-c18a-4979-a2d3-731ebdd1db08", }, }, }, @@ -671,7 +672,8 @@ func TestExecuteUpdateWorkflowBatchTransaction(t *testing.T) { }, wantErr: &nosqlplugin.WorkflowOperationConditionFailure{ DuplicateRequest: &nosqlplugin.DuplicateRequest{ - RunID: "6b844fb4-c18a-4979-a2d3-731ebdd1db08", + RequestType: persistence.WorkflowRequestTypeSignal, + RunID: "6b844fb4-c18a-4979-a2d3-731ebdd1db08", }, }, }, @@ -792,7 +794,7 @@ func TestExecuteUpdateWorkflowBatchTransaction(t *testing.T) { } } -func TestGetRequestRowType(t *testing.T) { +func TestToRequestRowType(t *testing.T) { testCases := []struct { name string requestType persistence.WorkflowRequestType @@ -838,7 +840,69 @@ func TestGetRequestRowType(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - want, err := getRequestRowType(tc.requestType) + want, err := toRequestRowType(tc.requestType) + gotErr := (err != nil) + if gotErr != tc.wantErr { + t.Fatalf("Got error: %v, want?: %v", err, tc.wantErr) + } + if gotErr { + return + } + + if diff := cmp.Diff(tc.want, want); diff != "" { + t.Fatalf("request type mismatch (-want +got):\n%s", diff) + } + }) + } +} + +func TestFromRequestRowType(t *testing.T) { + testCases := []struct { + name string + requestType int + wantErr bool + want persistence.WorkflowRequestType + }{ + { + name: "StartWorkflow request", + requestType: rowTypeWorkflowRequestStart, + wantErr: false, + want: persistence.WorkflowRequestTypeStart, + }, + { + name: "SignalWithWorkflow request", + requestType: rowTypeWorkflowRequestSignal, + wantErr: false, + want: persistence.WorkflowRequestTypeSignal, + }, + { + name: "SignalWorkflow request", + requestType: rowTypeWorkflowRequestSignal, + wantErr: false, + want: persistence.WorkflowRequestTypeSignal, + }, + { + name: "CancelWorkflow request", + requestType: rowTypeWorkflowRequestCancel, + wantErr: false, + want: persistence.WorkflowRequestTypeCancel, + }, + { + name: "ResetWorkflow request", + requestType: rowTypeWorkflowRequestReset, + wantErr: false, + want: persistence.WorkflowRequestTypeReset, + }, + { + name: "unknown request", + requestType: rowTypeShard, + wantErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + want, err := fromRequestRowType(tc.requestType) gotErr := (err != nil) if gotErr != tc.wantErr { t.Fatalf("Got error: %v, want?: %v", err, tc.wantErr) diff --git a/common/persistence/nosql/nosqlplugin/errors.go b/common/persistence/nosql/nosqlplugin/errors.go index 44d7c6747f8..401cb2f7f0c 100644 --- a/common/persistence/nosql/nosqlplugin/errors.go +++ b/common/persistence/nosql/nosqlplugin/errors.go @@ -20,7 +20,11 @@ package nosqlplugin -import "fmt" +import ( + "fmt" + + "github.com/uber/cadence/common/persistence" +) // Condition Errors for NoSQL interfaces type ( @@ -34,7 +38,8 @@ type ( } DuplicateRequest struct { - RunID string + RequestType persistence.WorkflowRequestType + RunID string } WorkflowExecutionAlreadyExists struct { diff --git a/common/persistence/persistence-tests/executionManagerTest.go b/common/persistence/persistence-tests/executionManagerTest.go index f268a20826d..418678d0fb2 100644 --- a/common/persistence/persistence-tests/executionManagerTest.go +++ b/common/persistence/persistence-tests/executionManagerTest.go @@ -39,6 +39,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/checksum" + "github.com/uber/cadence/common/persistence" p "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" ) @@ -243,6 +244,8 @@ func (s *ExecutionManagerSuite) TestCreateWorkflowExecutionWithWorkflowRequestsD _, err = s.ExecutionManager.CreateWorkflowExecution(ctx, req) s.Error(err) s.IsType(&p.DuplicateRequestError{}, err) + s.Equal(persistence.WorkflowRequestTypeStart, err.(*persistence.DuplicateRequestError).RequestType) + s.Equal(runID, err.(*persistence.DuplicateRequestError).RunID) req.WorkflowRequestMode = p.CreateWorkflowRequestModeReplicated _, err = s.ExecutionManager.CreateWorkflowExecution(ctx, req) s.Error(err) @@ -572,6 +575,8 @@ func (s *ExecutionManagerSuite) TestUpdateWorkflowExecutionWithWorkflowRequestsD _, err = s.ExecutionManager.UpdateWorkflowExecution(ctx, updateReq) s.Error(err) s.IsType(&p.DuplicateRequestError{}, err) + s.Equal(persistence.WorkflowRequestTypeSignal, err.(*persistence.DuplicateRequestError).RequestType) + s.Equal(runID, err.(*persistence.DuplicateRequestError).RunID) updateReq.WorkflowRequestMode = p.CreateWorkflowRequestModeReplicated _, err = s.ExecutionManager.UpdateWorkflowExecution(ctx, updateReq) s.Nil(err)