diff --git a/common/domain/replication_queue_test.go b/common/domain/replication_queue_test.go index e1e1458dc7b..9fd7174aab1 100644 --- a/common/domain/replication_queue_test.go +++ b/common/domain/replication_queue_test.go @@ -25,12 +25,15 @@ import ( "context" "encoding/binary" "errors" + "sync/atomic" "testing" + "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/uber/cadence/common" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" ) @@ -39,6 +42,93 @@ const ( preambleVersion0 byte = 0x59 ) +func TestReplicationQueueImpl_Start(t *testing.T) { + tests := []struct { + name string + initialStatus int32 + expectedStatus int32 + shouldStart bool + }{ + { + name: "Should start when initialized", + initialStatus: common.DaemonStatusInitialized, + expectedStatus: common.DaemonStatusStarted, + shouldStart: true, + }, + { + name: "Should not start when already started", + initialStatus: common.DaemonStatusStarted, + expectedStatus: common.DaemonStatusStarted, + shouldStart: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockQueue := persistence.NewMockQueueManager(ctrl) + rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil).(*replicationQueueImpl) + atomic.StoreInt32(&rq.status, tt.initialStatus) + + rq.Start() + defer rq.Stop() + assert.Equal(t, tt.expectedStatus, atomic.LoadInt32(&rq.status)) + + if tt.shouldStart { + select { + case <-rq.done: + t.Error("purgeProcessor should not have stopped") + case <-time.After(time.Millisecond): + // expected, as the purgeProcessor should still be running + } + } + }) + } +} + +func TestReplicationQueueImpl_Stop(t *testing.T) { + tests := []struct { + name string + initialStatus int32 + expectedStatus int32 + shouldStop bool + }{ + { + name: "Should stop when started", + initialStatus: common.DaemonStatusStarted, + expectedStatus: common.DaemonStatusStopped, + shouldStop: true, + }, + { + name: "Should not stop when not started", + initialStatus: common.DaemonStatusInitialized, + expectedStatus: common.DaemonStatusInitialized, + shouldStop: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockQueue := persistence.NewMockQueueManager(ctrl) + rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil).(*replicationQueueImpl) + atomic.StoreInt32(&rq.status, tt.initialStatus) + + rq.Stop() + assert.Equal(t, tt.expectedStatus, atomic.LoadInt32(&rq.status)) + + if tt.shouldStop { + select { + case <-rq.done: + // expected channel closed + default: + t.Error("done channel should be closed") + } + } + }) + } +} + func TestReplicationQueueImpl_Publish(t *testing.T) { tests := []struct { name string