Skip to content

Commit

Permalink
Update duplicate request error to include request type (#5910)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Apr 16, 2024
1 parent ba39678 commit 6d8466c
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 14 deletions.
3 changes: 2 additions & 1 deletion common/persistence/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ type (
}

DuplicateRequestError struct {
RunID string
RequestType WorkflowRequestType
RunID string
}
)

Expand Down
3 changes: 2 additions & 1 deletion common/persistence/nosql/nosql_execution_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ func (d *nosqlExecutionStore) CreateWorkflowExecution(
}
case conditionFailureErr.DuplicateRequest != nil:
return nil, &persistence.DuplicateRequestError{
RunID: conditionFailureErr.DuplicateRequest.RunID,
RequestType: conditionFailureErr.DuplicateRequest.RequestType,
RunID: conditionFailureErr.DuplicateRequest.RunID,
}
default:
// If ever runs into this branch, there is bug in the code either in here, or in the implementation of nosql plugin
Expand Down
37 changes: 37 additions & 0 deletions common/persistence/nosql/nosql_execution_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,25 @@ func TestCreateWorkflowExecution(t *testing.T) {
expectedResp: nil,
expectedError: fmt.Errorf("unsupported conditionFailureReason error"), // Expected generic error for unexpected conditions
},
{
name: "Duplicate request error",
setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) {
mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes()
mockDB.EXPECT().
InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&nosqlplugin.WorkflowOperationConditionFailure{
DuplicateRequest: &nosqlplugin.DuplicateRequest{
RequestType: persistence.WorkflowRequestTypeSignal,
RunID: "test-run-id",
},
})
},
expectedResp: nil,
expectedError: &persistence.DuplicateRequestError{
RequestType: persistence.WorkflowRequestTypeSignal,
RunID: "test-run-id",
},
},
}

for _, tc := range tests {
Expand Down Expand Up @@ -206,6 +225,24 @@ func TestUpdateWorkflowExecution(t *testing.T) {
},
expectedError: nil,
},
{
name: "Duplicate request error",
setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) {
mockDB.EXPECT().
UpdateWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), nil, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&nosqlplugin.WorkflowOperationConditionFailure{
DuplicateRequest: &nosqlplugin.DuplicateRequest{
RequestType: persistence.WorkflowRequestTypeSignal,
RunID: "test-run-id",
},
})
},
request: newUpdateWorkflowExecutionRequest,
expectedError: &persistence.DuplicateRequestError{
RequestType: persistence.WorkflowRequestTypeSignal,
RunID: "test-run-id",
},
},
{
name: "UpdateWorkflowModeBypassCurrent - assertNotCurrentExecution failure",
setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) {
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/nosql/nosql_execution_store_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,8 @@ func (d *nosqlExecutionStore) processUpdateWorkflowResult(err error, rangeID int
}
case conditionFailureErr.DuplicateRequest != nil:
return &persistence.DuplicateRequestError{
RunID: conditionFailureErr.DuplicateRequest.RunID,
RequestType: conditionFailureErr.DuplicateRequest.RequestType,
RunID: conditionFailureErr.DuplicateRequest.RunID,
}
default:
// If ever runs into this branch, there is bug in the code either in here, or in the implementation of nosql plugin
Expand Down
33 changes: 29 additions & 4 deletions common/persistence/nosql/nosqlplugin/cassandra/workflow_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,14 @@ func executeCreateWorkflowBatchTransaction(
if !ok {
return fmt.Errorf("corrupted data detected. DomainID: %v, WorkflowId: %v, RequestID: %v, RequestType: %v", execution.DomainID, execution.WorkflowID, workflowRequestID, requestRowType)
}
requestType, err := fromRequestRowType(requestRowType)
if err != nil {
return err
}
return &nosqlplugin.WorkflowOperationConditionFailure{
DuplicateRequest: &nosqlplugin.DuplicateRequest{
RunID: runID.String(),
RequestType: requestType,
RunID: runID.String(),
},
}
}
Expand Down Expand Up @@ -343,9 +348,14 @@ func executeUpdateWorkflowBatchTransaction(
if !ok {
return fmt.Errorf("corrupted data detected. DomainID: %v, WorkflowId: %v, RequestID: %v, RequestType: %v", currentWorkflowRequest.Row.DomainID, currentWorkflowRequest.Row.WorkflowID, workflowRequestID, requestRowType)
}
requestType, err := fromRequestRowType(requestRowType)
if err != nil {
return err
}
return &nosqlplugin.WorkflowOperationConditionFailure{
DuplicateRequest: &nosqlplugin.DuplicateRequest{
RunID: runID.String(),
RequestType: requestType,
RunID: runID.String(),
},
}
}
Expand Down Expand Up @@ -1467,7 +1477,7 @@ func isRequestRowType(rowType int) bool {
return false
}

func getRequestRowType(requestType persistence.WorkflowRequestType) (int, error) {
func toRequestRowType(requestType persistence.WorkflowRequestType) (int, error) {
switch requestType {
case persistence.WorkflowRequestTypeStart:
return rowTypeWorkflowRequestStart, nil
Expand All @@ -1482,6 +1492,21 @@ func getRequestRowType(requestType persistence.WorkflowRequestType) (int, error)
}
}

func fromRequestRowType(rowType int) (persistence.WorkflowRequestType, error) {
switch rowType {
case rowTypeWorkflowRequestStart:
return persistence.WorkflowRequestTypeStart, nil
case rowTypeWorkflowRequestSignal:
return persistence.WorkflowRequestTypeSignal, nil
case rowTypeWorkflowRequestCancel:
return persistence.WorkflowRequestTypeCancel, nil
case rowTypeWorkflowRequestReset:
return persistence.WorkflowRequestTypeReset, nil
default:
return persistence.WorkflowRequestType(0), fmt.Errorf("unknown request row type %v", rowType)
}
}

func insertOrUpsertWorkflowRequestRow(
batch gocql.Batch,
requests *nosqlplugin.WorkflowRequestsWriteRequest,
Expand All @@ -1499,7 +1524,7 @@ func insertOrUpsertWorkflowRequestRow(
return fmt.Errorf("unknown workflow request write mode %v", requests.WriteMode)
}
for _, row := range requests.Rows {
rowType, err := getRequestRowType(row.RequestType)
rowType, err := toRequestRowType(row.RequestType)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func TestExecuteCreateWorkflowBatchTransaction(t *testing.T) {
mapExecuteBatchCASApplied: false,
iter: &fakeIter{},
mapExecuteBatchCASPrev: map[string]any{
"type": rowTypeWorkflowRequestStart,
"type": rowTypeWorkflowRequestCancel,
"run_id": uuid.Parse("4b8045c8-7b45-41e0-bf03-1f0d166b818d"),
},
query: &fakeQuery{
Expand All @@ -349,7 +349,8 @@ func TestExecuteCreateWorkflowBatchTransaction(t *testing.T) {
},
wantErr: &nosqlplugin.WorkflowOperationConditionFailure{
DuplicateRequest: &nosqlplugin.DuplicateRequest{
RunID: "6b844fb4-c18a-4979-a2d3-731ebdd1db08",
RequestType: persistence.WorkflowRequestTypeCancel,
RunID: "6b844fb4-c18a-4979-a2d3-731ebdd1db08",
},
},
},
Expand Down Expand Up @@ -671,7 +672,8 @@ func TestExecuteUpdateWorkflowBatchTransaction(t *testing.T) {
},
wantErr: &nosqlplugin.WorkflowOperationConditionFailure{
DuplicateRequest: &nosqlplugin.DuplicateRequest{
RunID: "6b844fb4-c18a-4979-a2d3-731ebdd1db08",
RequestType: persistence.WorkflowRequestTypeSignal,
RunID: "6b844fb4-c18a-4979-a2d3-731ebdd1db08",
},
},
},
Expand Down Expand Up @@ -792,7 +794,7 @@ func TestExecuteUpdateWorkflowBatchTransaction(t *testing.T) {
}
}

func TestGetRequestRowType(t *testing.T) {
func TestToRequestRowType(t *testing.T) {
testCases := []struct {
name string
requestType persistence.WorkflowRequestType
Expand Down Expand Up @@ -838,7 +840,69 @@ func TestGetRequestRowType(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
want, err := getRequestRowType(tc.requestType)
want, err := toRequestRowType(tc.requestType)
gotErr := (err != nil)
if gotErr != tc.wantErr {
t.Fatalf("Got error: %v, want?: %v", err, tc.wantErr)
}
if gotErr {
return
}

if diff := cmp.Diff(tc.want, want); diff != "" {
t.Fatalf("request type mismatch (-want +got):\n%s", diff)
}
})
}
}

func TestFromRequestRowType(t *testing.T) {
testCases := []struct {
name string
requestType int
wantErr bool
want persistence.WorkflowRequestType
}{
{
name: "StartWorkflow request",
requestType: rowTypeWorkflowRequestStart,
wantErr: false,
want: persistence.WorkflowRequestTypeStart,
},
{
name: "SignalWithWorkflow request",
requestType: rowTypeWorkflowRequestSignal,
wantErr: false,
want: persistence.WorkflowRequestTypeSignal,
},
{
name: "SignalWorkflow request",
requestType: rowTypeWorkflowRequestSignal,
wantErr: false,
want: persistence.WorkflowRequestTypeSignal,
},
{
name: "CancelWorkflow request",
requestType: rowTypeWorkflowRequestCancel,
wantErr: false,
want: persistence.WorkflowRequestTypeCancel,
},
{
name: "ResetWorkflow request",
requestType: rowTypeWorkflowRequestReset,
wantErr: false,
want: persistence.WorkflowRequestTypeReset,
},
{
name: "unknown request",
requestType: rowTypeShard,
wantErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
want, err := fromRequestRowType(tc.requestType)
gotErr := (err != nil)
if gotErr != tc.wantErr {
t.Fatalf("Got error: %v, want?: %v", err, tc.wantErr)
Expand Down
9 changes: 7 additions & 2 deletions common/persistence/nosql/nosqlplugin/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@

package nosqlplugin

import "fmt"
import (
"fmt"

"github.com/uber/cadence/common/persistence"
)

// Condition Errors for NoSQL interfaces
type (
Expand All @@ -34,7 +38,8 @@ type (
}

DuplicateRequest struct {
RunID string
RequestType persistence.WorkflowRequestType
RunID string
}

WorkflowExecutionAlreadyExists struct {
Expand Down
5 changes: 5 additions & 0 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/checksum"
"github.com/uber/cadence/common/persistence"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
)
Expand Down Expand Up @@ -243,6 +244,8 @@ func (s *ExecutionManagerSuite) TestCreateWorkflowExecutionWithWorkflowRequestsD
_, err = s.ExecutionManager.CreateWorkflowExecution(ctx, req)
s.Error(err)
s.IsType(&p.DuplicateRequestError{}, err)
s.Equal(persistence.WorkflowRequestTypeStart, err.(*persistence.DuplicateRequestError).RequestType)
s.Equal(runID, err.(*persistence.DuplicateRequestError).RunID)
req.WorkflowRequestMode = p.CreateWorkflowRequestModeReplicated
_, err = s.ExecutionManager.CreateWorkflowExecution(ctx, req)
s.Error(err)
Expand Down Expand Up @@ -572,6 +575,8 @@ func (s *ExecutionManagerSuite) TestUpdateWorkflowExecutionWithWorkflowRequestsD
_, err = s.ExecutionManager.UpdateWorkflowExecution(ctx, updateReq)
s.Error(err)
s.IsType(&p.DuplicateRequestError{}, err)
s.Equal(persistence.WorkflowRequestTypeSignal, err.(*persistence.DuplicateRequestError).RequestType)
s.Equal(runID, err.(*persistence.DuplicateRequestError).RunID)
updateReq.WorkflowRequestMode = p.CreateWorkflowRequestModeReplicated
_, err = s.ExecutionManager.UpdateWorkflowExecution(ctx, updateReq)
s.Nil(err)
Expand Down

0 comments on commit 6d8466c

Please sign in to comment.