Skip to content

Commit

Permalink
Add more tests to history cache (#2373)
Browse files Browse the repository at this point in the history
* Add additional mutable state concurrent access & pinning tests
  • Loading branch information
wxing1292 authored Jan 13, 2022
1 parent ba30aa8 commit 12bbccb
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 18 deletions.
108 changes: 91 additions & 17 deletions service/history/workflow/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ package workflow
import (
"context"
"errors"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/pborman/uuid"
Expand Down Expand Up @@ -239,24 +242,34 @@ func (s *historyCacheSuite) TestHistoryCacheClear() {
release(nil)
}

func (s *historyCacheSuite) TestHistoryCacheConcurrentAccess() {
s.mockShard.GetConfig().HistoryCacheMaxSize = dynamicconfig.GetIntPropertyFn(20)
namespaceID := namespace.ID("test_namespace_id")
func (s *historyCacheSuite) TestHistoryCacheConcurrentAccess_Release() {
cacheMaxSize := 16
coroutineCount := 50

s.mockShard.GetConfig().HistoryCacheMaxSize = dynamicconfig.GetIntPropertyFn(cacheMaxSize)
s.cache = NewCache(s.mockShard)
we := commonpb.WorkflowExecution{
WorkflowId: "wf-cache-test-pinning",
RunId: uuid.New(),
}

coroutineCount := 50
waitGroup := &sync.WaitGroup{}
stopChan := make(chan struct{})
startGroup := &sync.WaitGroup{}
stopGroup := &sync.WaitGroup{}
startGroup.Add(coroutineCount)
stopGroup.Add(coroutineCount)

namespaceID := namespace.ID("test_namespace_id")
workflowId := "wf-cache-test-pinning"
runID := uuid.New()

testFn := func() {
<-stopChan
defer stopGroup.Done()
startGroup.Done()

startGroup.Wait()
ctx, release, err := s.cache.GetOrCreateWorkflowExecution(
context.Background(),
namespaceID,
we,
commonpb.WorkflowExecution{
WorkflowId: workflowId,
RunId: runID,
},
CallerTypeAPI,
)
s.Nil(err)
Expand All @@ -268,20 +281,20 @@ func (s *historyCacheSuite) TestHistoryCacheConcurrentAccess() {
mock.EXPECT().GetQueryRegistry().Return(NewQueryRegistry())
ctx.(*ContextImpl).MutableState = mock
release(errors.New("some random error message"))
waitGroup.Done()
}

for i := 0; i < coroutineCount; i++ {
waitGroup.Add(1)
go testFn()
}
close(stopChan)
waitGroup.Wait()
stopGroup.Wait()

ctx, release, err := s.cache.GetOrCreateWorkflowExecution(
context.Background(),
namespaceID,
we,
commonpb.WorkflowExecution{
WorkflowId: workflowId,
RunId: runID,
},
CallerTypeAPI,
)
s.Nil(err)
Expand All @@ -290,3 +303,64 @@ func (s *historyCacheSuite) TestHistoryCacheConcurrentAccess() {
s.Nil(ctx.(*ContextImpl).MutableState)
release(nil)
}

func (s *historyCacheSuite) TestHistoryCacheConcurrentAccess_Pin() {
cacheMaxSize := 16
runIDCount := cacheMaxSize * 4
coroutineCount := runIDCount * 64

s.mockShard.GetConfig().HistoryCacheMaxSize = dynamicconfig.GetIntPropertyFn(cacheMaxSize)
s.mockShard.GetConfig().HistoryCacheTTL = dynamicconfig.GetDurationPropertyFn(time.Nanosecond)
s.cache = NewCache(s.mockShard)

startGroup := &sync.WaitGroup{}
stopGroup := &sync.WaitGroup{}
startGroup.Add(coroutineCount)
stopGroup.Add(coroutineCount)

namespaceID := namespace.ID("test_namespace_id")
workflowID := "wf-cache-test-pinning"
runIDs := make([]string, runIDCount)
runIDRefCounter := make([]int32, runIDCount)
for i := 0; i < runIDCount; i++ {
runIDs[i] = uuid.New()
runIDRefCounter[i] = 0
}

testFn := func(id int, runID string, refCounter *int32) {
defer stopGroup.Done()
startGroup.Done()
startGroup.Wait()

var releaseFn ReleaseCacheFunc
var err error
for {
_, releaseFn, err = s.cache.GetOrCreateWorkflowExecution(
context.Background(),
namespaceID,
commonpb.WorkflowExecution{
WorkflowId: workflowID,
RunId: runID,
},
CallerTypeAPI,
)
if err == nil {
break
}
}
if !atomic.CompareAndSwapInt32(refCounter, 0, 1) {
s.Fail("unable to assert lock uniqueness")
}
// randomly sleep few nanoseconds
time.Sleep(time.Duration(rand.Int63n(10)))
if !atomic.CompareAndSwapInt32(refCounter, 1, 0) {
s.Fail("unable to assert lock uniqueness")
}
releaseFn(nil)
}

for i := 0; i < coroutineCount; i++ {
go testFn(i, runIDs[i%runIDCount], &runIDRefCounter[i%runIDCount])
}
stopGroup.Wait()
}
10 changes: 9 additions & 1 deletion service/history/workflow/history_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ func TestHistoryBuilderSuite(t *testing.T) {
suite.Run(t, s)
}

func (s *historyBuilderSuite) SetupSuite() {
rand.Seed(time.Now().UnixNano())
}

func (s *historyBuilderSuite) TearDownSuite() {

}

func (s *historyBuilderSuite) SetupTest() {
s.Assertions = require.New(s.T())

Expand Down Expand Up @@ -553,7 +561,7 @@ func (s *historyBuilderSuite) TestWorkflowExecutionContinueAsNew() {
WorkflowRunTimeout: workflowRunTimeout,
WorkflowTaskTimeout: workflowTaskStartToCloseTimeout,
BackoffStartInterval: firstWorkflowTaskBackoff,
Initiator: enumspb.CONTINUE_AS_NEW_INITIATOR_CRON_SCHEDULE,
Initiator: initiator,
Failure: testFailure,
LastCompletionResult: testPayloads,
Memo: testMemo,
Expand Down

0 comments on commit 12bbccb

Please sign in to comment.