From 12bbccb680b6e1e7d09eb90bc5c637f93dc7e25e Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Wed, 12 Jan 2022 17:57:06 -0800 Subject: [PATCH] Add more tests to history cache (#2373) * Add additional mutable state concurrent access & pinning tests --- service/history/workflow/cache_test.go | 108 +++++++++++++++--- .../history/workflow/history_builder_test.go | 10 +- 2 files changed, 100 insertions(+), 18 deletions(-) diff --git a/service/history/workflow/cache_test.go b/service/history/workflow/cache_test.go index e41efd984e9..35178c4c766 100644 --- a/service/history/workflow/cache_test.go +++ b/service/history/workflow/cache_test.go @@ -27,8 +27,11 @@ package workflow import ( "context" "errors" + "math/rand" "sync" + "sync/atomic" "testing" + "time" "github.com/golang/mock/gomock" "github.com/pborman/uuid" @@ -239,24 +242,34 @@ func (s *historyCacheSuite) TestHistoryCacheClear() { release(nil) } -func (s *historyCacheSuite) TestHistoryCacheConcurrentAccess() { - s.mockShard.GetConfig().HistoryCacheMaxSize = dynamicconfig.GetIntPropertyFn(20) - namespaceID := namespace.ID("test_namespace_id") +func (s *historyCacheSuite) TestHistoryCacheConcurrentAccess_Release() { + cacheMaxSize := 16 + coroutineCount := 50 + + s.mockShard.GetConfig().HistoryCacheMaxSize = dynamicconfig.GetIntPropertyFn(cacheMaxSize) s.cache = NewCache(s.mockShard) - we := commonpb.WorkflowExecution{ - WorkflowId: "wf-cache-test-pinning", - RunId: uuid.New(), - } - coroutineCount := 50 - waitGroup := &sync.WaitGroup{} - stopChan := make(chan struct{}) + startGroup := &sync.WaitGroup{} + stopGroup := &sync.WaitGroup{} + startGroup.Add(coroutineCount) + stopGroup.Add(coroutineCount) + + namespaceID := namespace.ID("test_namespace_id") + workflowId := "wf-cache-test-pinning" + runID := uuid.New() + testFn := func() { - <-stopChan + defer stopGroup.Done() + startGroup.Done() + + startGroup.Wait() ctx, release, err := s.cache.GetOrCreateWorkflowExecution( context.Background(), namespaceID, - we, + commonpb.WorkflowExecution{ + WorkflowId: workflowId, + RunId: runID, + }, CallerTypeAPI, ) s.Nil(err) @@ -268,20 +281,20 @@ func (s *historyCacheSuite) TestHistoryCacheConcurrentAccess() { mock.EXPECT().GetQueryRegistry().Return(NewQueryRegistry()) ctx.(*ContextImpl).MutableState = mock release(errors.New("some random error message")) - waitGroup.Done() } for i := 0; i < coroutineCount; i++ { - waitGroup.Add(1) go testFn() } - close(stopChan) - waitGroup.Wait() + stopGroup.Wait() ctx, release, err := s.cache.GetOrCreateWorkflowExecution( context.Background(), namespaceID, - we, + commonpb.WorkflowExecution{ + WorkflowId: workflowId, + RunId: runID, + }, CallerTypeAPI, ) s.Nil(err) @@ -290,3 +303,64 @@ func (s *historyCacheSuite) TestHistoryCacheConcurrentAccess() { s.Nil(ctx.(*ContextImpl).MutableState) release(nil) } + +func (s *historyCacheSuite) TestHistoryCacheConcurrentAccess_Pin() { + cacheMaxSize := 16 + runIDCount := cacheMaxSize * 4 + coroutineCount := runIDCount * 64 + + s.mockShard.GetConfig().HistoryCacheMaxSize = dynamicconfig.GetIntPropertyFn(cacheMaxSize) + s.mockShard.GetConfig().HistoryCacheTTL = dynamicconfig.GetDurationPropertyFn(time.Nanosecond) + s.cache = NewCache(s.mockShard) + + startGroup := &sync.WaitGroup{} + stopGroup := &sync.WaitGroup{} + startGroup.Add(coroutineCount) + stopGroup.Add(coroutineCount) + + namespaceID := namespace.ID("test_namespace_id") + workflowID := "wf-cache-test-pinning" + runIDs := make([]string, runIDCount) + runIDRefCounter := make([]int32, runIDCount) + for i := 0; i < runIDCount; i++ { + runIDs[i] = uuid.New() + runIDRefCounter[i] = 0 + } + + testFn := func(id int, runID string, refCounter *int32) { + defer stopGroup.Done() + startGroup.Done() + startGroup.Wait() + + var releaseFn ReleaseCacheFunc + var err error + for { + _, releaseFn, err = s.cache.GetOrCreateWorkflowExecution( + context.Background(), + namespaceID, + commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + }, + CallerTypeAPI, + ) + if err == nil { + break + } + } + if !atomic.CompareAndSwapInt32(refCounter, 0, 1) { + s.Fail("unable to assert lock uniqueness") + } + // randomly sleep few nanoseconds + time.Sleep(time.Duration(rand.Int63n(10))) + if !atomic.CompareAndSwapInt32(refCounter, 1, 0) { + s.Fail("unable to assert lock uniqueness") + } + releaseFn(nil) + } + + for i := 0; i < coroutineCount; i++ { + go testFn(i, runIDs[i%runIDCount], &runIDRefCounter[i%runIDCount]) + } + stopGroup.Wait() +} diff --git a/service/history/workflow/history_builder_test.go b/service/history/workflow/history_builder_test.go index 35077d38ef3..97d64154e5a 100644 --- a/service/history/workflow/history_builder_test.go +++ b/service/history/workflow/history_builder_test.go @@ -128,6 +128,14 @@ func TestHistoryBuilderSuite(t *testing.T) { suite.Run(t, s) } +func (s *historyBuilderSuite) SetupSuite() { + rand.Seed(time.Now().UnixNano()) +} + +func (s *historyBuilderSuite) TearDownSuite() { + +} + func (s *historyBuilderSuite) SetupTest() { s.Assertions = require.New(s.T()) @@ -553,7 +561,7 @@ func (s *historyBuilderSuite) TestWorkflowExecutionContinueAsNew() { WorkflowRunTimeout: workflowRunTimeout, WorkflowTaskTimeout: workflowTaskStartToCloseTimeout, BackoffStartInterval: firstWorkflowTaskBackoff, - Initiator: enumspb.CONTINUE_AS_NEW_INITIATOR_CRON_SCHEDULE, + Initiator: initiator, Failure: testFailure, LastCompletionResult: testPayloads, Memo: testMemo,