Skip to content

Commit

Permalink
Add unit tests for ConflictResolveWorkflowExecution (cadence-workflow…
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored and ketsiambaku committed Mar 6, 2024
1 parent 34f03c6 commit b2948c4
Show file tree
Hide file tree
Showing 2 changed files with 279 additions and 7 deletions.
16 changes: 9 additions & 7 deletions common/persistence/sql/sql_execution_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type sqlExecutionStore struct {
assertRunIDAndUpdateCurrentExecutionFn func(context.Context, sqlplugin.Tx, int, serialization.UUID, string, serialization.UUID, serialization.UUID, string, int, int, int64, int64) error
applyWorkflowSnapshotTxAsNewFn func(context.Context, sqlplugin.Tx, int, *p.InternalWorkflowSnapshot, serialization.Parser) error
applyWorkflowMutationTxFn func(context.Context, sqlplugin.Tx, int, *p.InternalWorkflowMutation, serialization.Parser) error
applyWorkflowSnapshotTxAsResetFn func(context.Context, sqlplugin.Tx, int, *p.InternalWorkflowSnapshot, serialization.Parser) error
}

var _ p.ExecutionStore = (*sqlExecutionStore)(nil)
Expand All @@ -78,6 +79,7 @@ func NewSQLExecutionStore(
assertRunIDAndUpdateCurrentExecutionFn: assertRunIDAndUpdateCurrentExecution,
applyWorkflowSnapshotTxAsNewFn: applyWorkflowSnapshotTxAsNew,
applyWorkflowMutationTxFn: applyWorkflowMutationTx,
applyWorkflowSnapshotTxAsResetFn: applyWorkflowSnapshotTxAsReset,
sqlStore: sqlStore{
db: db,
logger: logger,
Expand Down Expand Up @@ -504,7 +506,7 @@ func (m *sqlExecutionStore) ConflictResolveWorkflowExecution(
request *p.InternalConflictResolveWorkflowExecutionRequest,
) error {
dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
return m.txExecuteShardLocked(ctx, dbShardID, "ConflictResolveWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
return m.txExecuteShardLockedFn(ctx, dbShardID, "ConflictResolveWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
return m.conflictResolveWorkflowExecutionTx(ctx, tx, request)
})
}
Expand Down Expand Up @@ -535,7 +537,7 @@ func (m *sqlExecutionStore) conflictResolveWorkflowExecutionTx(

switch request.Mode {
case p.ConflictResolveWorkflowModeBypassCurrent:
if err := assertNotCurrentExecution(
if err := m.assertNotCurrentExecutionFn(
ctx,
tx,
shardID,
Expand All @@ -562,7 +564,7 @@ func (m *sqlExecutionStore) conflictResolveWorkflowExecutionTx(
if currentWorkflow != nil {
prevRunID := serialization.MustParseUUID(currentWorkflow.ExecutionInfo.RunID)

if err := assertRunIDAndUpdateCurrentExecution(
if err := m.assertRunIDAndUpdateCurrentExecutionFn(
ctx,
tx,
m.shardID,
Expand All @@ -581,7 +583,7 @@ func (m *sqlExecutionStore) conflictResolveWorkflowExecutionTx(
// reset workflow is current
prevRunID := serialization.MustParseUUID(resetWorkflow.ExecutionInfo.RunID)

if err := assertRunIDAndUpdateCurrentExecution(
if err := m.assertRunIDAndUpdateCurrentExecutionFn(
ctx,
tx,
m.shardID,
Expand All @@ -604,16 +606,16 @@ func (m *sqlExecutionStore) conflictResolveWorkflowExecutionTx(
}
}

if err := applyWorkflowSnapshotTxAsReset(ctx, tx, shardID, &resetWorkflow, m.parser); err != nil {
if err := m.applyWorkflowSnapshotTxAsResetFn(ctx, tx, shardID, &resetWorkflow, m.parser); err != nil {
return err
}
if currentWorkflow != nil {
if err := applyWorkflowMutationTx(ctx, tx, shardID, currentWorkflow, m.parser); err != nil {
if err := m.applyWorkflowMutationTxFn(ctx, tx, shardID, currentWorkflow, m.parser); err != nil {
return err
}
}
if newWorkflow != nil {
if err := applyWorkflowSnapshotTxAsNew(ctx, tx, shardID, newWorkflow, m.parser); err != nil {
if err := m.applyWorkflowSnapshotTxAsNewFn(ctx, tx, shardID, newWorkflow, m.parser); err != nil {
return err
}
}
Expand Down
270 changes: 270 additions & 0 deletions common/persistence/sql/sql_execution_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2493,3 +2493,273 @@ func TestUpdateWorkflowExecution(t *testing.T) {
})
}
}

func TestConflictResolveWorkflowExecution(t *testing.T) {
testCases := []struct {
name string
req *persistence.InternalConflictResolveWorkflowExecutionRequest
assertRunIDAndUpdateCurrentExecutionFn func(context.Context, sqlplugin.Tx, int, serialization.UUID, string, serialization.UUID, serialization.UUID, string, int, int, int64, int64) error
assertNotCurrentExecutionFn func(context.Context, sqlplugin.Tx, int, serialization.UUID, string, serialization.UUID) error
applyWorkflowMutationTxFn func(context.Context, sqlplugin.Tx, int, *persistence.InternalWorkflowMutation, serialization.Parser) error
applyWorkflowSnapshotTxAsResetFn func(context.Context, sqlplugin.Tx, int, *persistence.InternalWorkflowSnapshot, serialization.Parser) error
applyWorkflowSnapshotTxAsNewFn func(context.Context, sqlplugin.Tx, int, *persistence.InternalWorkflowSnapshot, serialization.Parser) error
wantErr bool
assertErr func(t *testing.T, err error)
}{
{
name: "Success - mode bypass current",
req: &persistence.InternalConflictResolveWorkflowExecutionRequest{
RangeID: 1,
Mode: persistence.ConflictResolveWorkflowModeBypassCurrent,
ResetWorkflowSnapshot: persistence.InternalWorkflowSnapshot{
ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{
State: persistence.WorkflowStateCompleted,
},
},
},
assertNotCurrentExecutionFn: func(context.Context, sqlplugin.Tx, int, serialization.UUID, string, serialization.UUID) error {
return nil
},
applyWorkflowSnapshotTxAsResetFn: func(context.Context, sqlplugin.Tx, int, *persistence.InternalWorkflowSnapshot, serialization.Parser) error {
return nil
},
wantErr: false,
},
{
name: "Success - mode update current, current workflow exists",
req: &persistence.InternalConflictResolveWorkflowExecutionRequest{
RangeID: 1,
Mode: persistence.ConflictResolveWorkflowModeUpdateCurrent,
ResetWorkflowSnapshot: persistence.InternalWorkflowSnapshot{
ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{
State: persistence.WorkflowStateCompleted,
},
},
NewWorkflowSnapshot: &persistence.InternalWorkflowSnapshot{
ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{
State: persistence.WorkflowStateCreated,
},
},
CurrentWorkflowMutation: &persistence.InternalWorkflowMutation{
ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{
State: persistence.WorkflowStateCompleted,
},
},
},
assertRunIDAndUpdateCurrentExecutionFn: func(context.Context, sqlplugin.Tx, int, serialization.UUID, string, serialization.UUID, serialization.UUID, string, int, int, int64, int64) error {
return nil
},
applyWorkflowSnapshotTxAsResetFn: func(context.Context, sqlplugin.Tx, int, *persistence.InternalWorkflowSnapshot, serialization.Parser) error {
return nil
},
applyWorkflowMutationTxFn: func(context.Context, sqlplugin.Tx, int, *persistence.InternalWorkflowMutation, serialization.Parser) error {
return nil
},
applyWorkflowSnapshotTxAsNewFn: func(context.Context, sqlplugin.Tx, int, *persistence.InternalWorkflowSnapshot, serialization.Parser) error {
return nil
},
wantErr: false,
},
{
name: "Success - mode update current, no current workflow",
req: &persistence.InternalConflictResolveWorkflowExecutionRequest{
RangeID: 1,
Mode: persistence.ConflictResolveWorkflowModeUpdateCurrent,
ResetWorkflowSnapshot: persistence.InternalWorkflowSnapshot{
ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{
State: persistence.WorkflowStateCompleted,
},
},
NewWorkflowSnapshot: &persistence.InternalWorkflowSnapshot{
ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{
State: persistence.WorkflowStateCreated,
},
},
},
assertRunIDAndUpdateCurrentExecutionFn: func(context.Context, sqlplugin.Tx, int, serialization.UUID, string, serialization.UUID, serialization.UUID, string, int, int, int64, int64) error {
return nil
},
applyWorkflowSnapshotTxAsResetFn: func(context.Context, sqlplugin.Tx, int, *persistence.InternalWorkflowSnapshot, serialization.Parser) error {
return nil
},
applyWorkflowSnapshotTxAsNewFn: func(context.Context, sqlplugin.Tx, int, *persistence.InternalWorkflowSnapshot, serialization.Parser) error {
return nil
},
wantErr: false,
},
{
name: "Error - mode state validation failed",
req: &persistence.InternalConflictResolveWorkflowExecutionRequest{
RangeID: 1,
Mode: persistence.ConflictResolveWorkflowModeUpdateCurrent,
ResetWorkflowSnapshot: persistence.InternalWorkflowSnapshot{
ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{
State: persistence.WorkflowStateZombie,
},
},
},
wantErr: true,
},
{
name: "Error - assertNotCurrentExecution failed",
req: &persistence.InternalConflictResolveWorkflowExecutionRequest{
RangeID: 1,
Mode: persistence.ConflictResolveWorkflowModeBypassCurrent,
ResetWorkflowSnapshot: persistence.InternalWorkflowSnapshot{
ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{
State: persistence.WorkflowStateCompleted,
},
},
},
assertNotCurrentExecutionFn: func(context.Context, sqlplugin.Tx, int, serialization.UUID, string, serialization.UUID) error {
return errors.New("some random error")
},
wantErr: true,
},
{
name: "Error - assertRunIDAndUpdateCurrentExecution failed",
req: &persistence.InternalConflictResolveWorkflowExecutionRequest{
RangeID: 1,
Mode: persistence.ConflictResolveWorkflowModeUpdateCurrent,
ResetWorkflowSnapshot: persistence.InternalWorkflowSnapshot{
ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{
State: persistence.WorkflowStateCompleted,
},
},
NewWorkflowSnapshot: &persistence.InternalWorkflowSnapshot{
ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{
State: persistence.WorkflowStateCreated,
},
},
CurrentWorkflowMutation: &persistence.InternalWorkflowMutation{
ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{
State: persistence.WorkflowStateCompleted,
},
},
},
assertRunIDAndUpdateCurrentExecutionFn: func(context.Context, sqlplugin.Tx, int, serialization.UUID, string, serialization.UUID, serialization.UUID, string, int, int, int64, int64) error {
return errors.New("some random error")
},
wantErr: true,
},
{
name: "Error - applyWorkflowResetSnapshotTx failed",
req: &persistence.InternalConflictResolveWorkflowExecutionRequest{
RangeID: 1,
Mode: persistence.ConflictResolveWorkflowModeBypassCurrent,
ResetWorkflowSnapshot: persistence.InternalWorkflowSnapshot{
ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{
State: persistence.WorkflowStateCompleted,
},
},
},
assertNotCurrentExecutionFn: func(context.Context, sqlplugin.Tx, int, serialization.UUID, string, serialization.UUID) error {
return nil
},
applyWorkflowSnapshotTxAsResetFn: func(context.Context, sqlplugin.Tx, int, *persistence.InternalWorkflowSnapshot, serialization.Parser) error {
return errors.New("some random error")
},
wantErr: true,
},
{
name: "Error - applyWorkflowMutationTxFn failed",
req: &persistence.InternalConflictResolveWorkflowExecutionRequest{
RangeID: 1,
Mode: persistence.ConflictResolveWorkflowModeUpdateCurrent,
ResetWorkflowSnapshot: persistence.InternalWorkflowSnapshot{
ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{
State: persistence.WorkflowStateCompleted,
},
},
NewWorkflowSnapshot: &persistence.InternalWorkflowSnapshot{
ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{
State: persistence.WorkflowStateCreated,
},
},
CurrentWorkflowMutation: &persistence.InternalWorkflowMutation{
ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{
State: persistence.WorkflowStateCompleted,
},
},
},
assertRunIDAndUpdateCurrentExecutionFn: func(context.Context, sqlplugin.Tx, int, serialization.UUID, string, serialization.UUID, serialization.UUID, string, int, int, int64, int64) error {
return nil
},
applyWorkflowSnapshotTxAsResetFn: func(context.Context, sqlplugin.Tx, int, *persistence.InternalWorkflowSnapshot, serialization.Parser) error {
return nil
},
applyWorkflowMutationTxFn: func(context.Context, sqlplugin.Tx, int, *persistence.InternalWorkflowMutation, serialization.Parser) error {
return errors.New("some random error")
},
wantErr: true,
},
{
name: "Error - applyWorkflowSnapshotTxAsNew failed",
req: &persistence.InternalConflictResolveWorkflowExecutionRequest{
RangeID: 1,
Mode: persistence.ConflictResolveWorkflowModeUpdateCurrent,
ResetWorkflowSnapshot: persistence.InternalWorkflowSnapshot{
ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{
State: persistence.WorkflowStateCompleted,
},
},
NewWorkflowSnapshot: &persistence.InternalWorkflowSnapshot{
ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{
State: persistence.WorkflowStateCreated,
},
},
CurrentWorkflowMutation: &persistence.InternalWorkflowMutation{
ExecutionInfo: &persistence.InternalWorkflowExecutionInfo{
State: persistence.WorkflowStateCompleted,
},
},
},
assertRunIDAndUpdateCurrentExecutionFn: func(context.Context, sqlplugin.Tx, int, serialization.UUID, string, serialization.UUID, serialization.UUID, string, int, int, int64, int64) error {
return nil
},
applyWorkflowSnapshotTxAsResetFn: func(context.Context, sqlplugin.Tx, int, *persistence.InternalWorkflowSnapshot, serialization.Parser) error {
return nil
},
applyWorkflowMutationTxFn: func(context.Context, sqlplugin.Tx, int, *persistence.InternalWorkflowMutation, serialization.Parser) error {
return nil
},
applyWorkflowSnapshotTxAsNewFn: func(context.Context, sqlplugin.Tx, int, *persistence.InternalWorkflowSnapshot, serialization.Parser) error {
return errors.New("some random error")
},
wantErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockDB := sqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().GetTotalNumDBShards().Return(1)
s := &sqlExecutionStore{
shardID: 0,
sqlStore: sqlStore{
db: mockDB,
logger: testlogger.New(t),
},
txExecuteShardLockedFn: func(_ context.Context, _ int, _ string, _ int64, fn func(sqlplugin.Tx) error) error {
return fn(nil)
},
assertNotCurrentExecutionFn: tc.assertNotCurrentExecutionFn,
assertRunIDAndUpdateCurrentExecutionFn: tc.assertRunIDAndUpdateCurrentExecutionFn,
applyWorkflowMutationTxFn: tc.applyWorkflowMutationTxFn,
applyWorkflowSnapshotTxAsResetFn: tc.applyWorkflowSnapshotTxAsResetFn,
applyWorkflowSnapshotTxAsNewFn: tc.applyWorkflowSnapshotTxAsNewFn,
}

err := s.ConflictResolveWorkflowExecution(context.Background(), tc.req)
if tc.wantErr {
assert.Error(t, err, "Expected an error for test case")
if tc.assertErr != nil {
tc.assertErr(t, err)
}
} else {
assert.NoError(t, err, "Did not expect an error for test case")
}
})
}
}

0 comments on commit b2948c4

Please sign in to comment.