Skip to content

Commit

Permalink
Add dynamic config for force search attributes refresh (#2422)
Browse files Browse the repository at this point in the history
  • Loading branch information
meiliang86 authored Jan 27, 2022
1 parent b87d605 commit 6ebaa80
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 25 deletions.
32 changes: 19 additions & 13 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,20 @@ var Keys = map[Key]string{
EnableReadVisibilityFromES: "system.enableReadVisibilityFromES",
EnableReadFromSecondaryAdvancedVisibility: "system.enableReadFromSecondaryAdvancedVisibility",

HistoryArchivalState: "system.historyArchivalState",
EnableReadFromHistoryArchival: "system.enableReadFromHistoryArchival",
VisibilityArchivalState: "system.visibilityArchivalState",
EnableReadFromVisibilityArchival: "system.enableReadFromVisibilityArchival",
EnableNamespaceNotActiveAutoForwarding: "system.enableNamespaceNotActiveAutoForwarding",
TransactionSizeLimit: "system.transactionSizeLimit",
DisallowQuery: "system.disallowQuery",
EnableBatcher: "worker.enableBatcher",
EnableParentClosePolicyWorker: "system.enableParentClosePolicyWorker",
EnableStickyQuery: "system.enableStickyQuery",
EnablePriorityTaskProcessor: "system.enablePriorityTaskProcessor",
EnableAuthorization: "system.enableAuthorization",
EnableCrossNamespaceCommands: "system.enableCrossNamespaceCommands",
HistoryArchivalState: "system.historyArchivalState",
EnableReadFromHistoryArchival: "system.enableReadFromHistoryArchival",
VisibilityArchivalState: "system.visibilityArchivalState",
EnableReadFromVisibilityArchival: "system.enableReadFromVisibilityArchival",
EnableNamespaceNotActiveAutoForwarding: "system.enableNamespaceNotActiveAutoForwarding",
TransactionSizeLimit: "system.transactionSizeLimit",
DisallowQuery: "system.disallowQuery",
EnableBatcher: "worker.enableBatcher",
EnableParentClosePolicyWorker: "system.enableParentClosePolicyWorker",
EnableStickyQuery: "system.enableStickyQuery",
EnablePriorityTaskProcessor: "system.enablePriorityTaskProcessor",
EnableAuthorization: "system.enableAuthorization",
EnableCrossNamespaceCommands: "system.enableCrossNamespaceCommands",
ForceSearchAttributesCacheRefreshOnRead: "system.forceSearchAttributesCacheRefreshOnRead",

// size limit
BlobSizeLimitError: "limit.blobSize.error",
Expand Down Expand Up @@ -438,6 +439,11 @@ const (
// WorkflowType, ActivityType, SignalName, MarkerName, ErrorReason/FailureReason/CancelCause, Identity, RequestID
MaxIDLengthLimit

// ForceSearchAttributesCacheRefreshOnRead forces refreshing search attributes cache on a read operation, so we always
// get the latest data from DB. This effectively bypasses cache value and is used to facilitate testing of changes in
// search attributes. This should not be turned on in production.
ForceSearchAttributesCacheRefreshOnRead

// key for frontend

// FrontendPersistenceMaxQPS is the max qps frontend host can query DB
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
workflowpb "go.temporal.io/api/workflow/v1"

enumsspb "go.temporal.io/server/api/enums/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
Expand All @@ -51,6 +50,7 @@ import (
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/convert"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
Expand Down Expand Up @@ -213,7 +213,7 @@ func (s *TestBase) Setup(clusterMetadataConfig *cluster.Config) {
s.fatalOnError("NewClusterMetadataManager", err)

s.ClusterMetadata = cluster.NewMetadataFromConfig(clusterMetadataConfig, s.ClusterMetadataManager, s.Logger)
s.SearchAttributesManager = searchattribute.NewManager(clock.NewRealTimeSource(), s.ClusterMetadataManager)
s.SearchAttributesManager = searchattribute.NewManager(clock.NewRealTimeSource(), s.ClusterMetadataManager, dynamicconfig.GetBoolPropertyFn(true))

s.MetadataManager, err = factory.NewMetadataManager()
s.fatalOnError("NewMetadataManager", err)
Expand Down
12 changes: 10 additions & 2 deletions common/resource/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,23 @@ func TimeSourceProvider() clock.TimeSource {
func SearchAttributeProviderProvider(
timeSource clock.TimeSource,
cmMgr persistence.ClusterMetadataManager,
dynamicCollection *dynamicconfig.Collection,
) searchattribute.Provider {
return searchattribute.NewManager(timeSource, cmMgr)
return searchattribute.NewManager(
timeSource,
cmMgr,
dynamicCollection.GetBoolProperty(dynamicconfig.ForceSearchAttributesCacheRefreshOnRead, false))
}

func SearchAttributeManagerProvider(
timeSource clock.TimeSource,
cmMgr persistence.ClusterMetadataManager,
dynamicCollection *dynamicconfig.Collection,
) searchattribute.Manager {
return searchattribute.NewManager(timeSource, cmMgr)
return searchattribute.NewManager(
timeSource,
cmMgr,
dynamicCollection.GetBoolProperty(dynamicconfig.ForceSearchAttributesCacheRefreshOnRead, false))
}

func NamespaceRegistryProvider(
Expand Down
10 changes: 8 additions & 2 deletions common/resource/resourceImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,14 @@ func New(
return nil, err
}

saProvider := searchattribute.NewManager(clock.NewRealTimeSource(), persistenceBean.GetClusterMetadataManager())
saManager := searchattribute.NewManager(clock.NewRealTimeSource(), persistenceBean.GetClusterMetadataManager())
saProvider := searchattribute.NewManager(
clock.NewRealTimeSource(),
persistenceBean.GetClusterMetadataManager(),
dynamicCollection.GetBoolProperty(dynamicconfig.ForceSearchAttributesCacheRefreshOnRead, false))
saManager := searchattribute.NewManager(
clock.NewRealTimeSource(),
persistenceBean.GetClusterMetadataManager(),
dynamicCollection.GetBoolProperty(dynamicconfig.ForceSearchAttributesCacheRefreshOnRead, false))

namespaceRegistry := namespace.NewRegistry(
persistenceBean.GetMetadataManager(),
Expand Down
7 changes: 5 additions & 2 deletions common/searchattribute/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import (

enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/persistence"
)

Expand All @@ -46,6 +46,7 @@ type (
managerImpl struct {
timeSource clock.TimeSource
clusterMetadataManager persistence.ClusterMetadataManager
forceRefresh dynamicconfig.BoolPropertyFn

cacheUpdateMutex sync.Mutex
cache atomic.Value // of type cache
Expand All @@ -64,6 +65,7 @@ var _ Manager = (*managerImpl)(nil)
func NewManager(
timeSource clock.TimeSource,
clusterMetadataManager persistence.ClusterMetadataManager,
forceRefresh dynamicconfig.BoolPropertyFn,
) *managerImpl {

var saCache atomic.Value
Expand All @@ -77,6 +79,7 @@ func NewManager(
timeSource: timeSource,
cache: saCache,
clusterMetadataManager: clusterMetadataManager,
forceRefresh: forceRefresh,
}
}

Expand Down Expand Up @@ -113,7 +116,7 @@ func (m *managerImpl) GetSearchAttributes(
}

func (m *managerImpl) needRefreshCache(saCache cache, forceRefreshCache bool, now time.Time) bool {
return forceRefreshCache || saCache.expireOn.Before(now)
return forceRefreshCache || saCache.expireOn.Before(now) || m.forceRefresh()
}

func (m *managerImpl) refreshCache(saCache cache, now time.Time) (cache, error) {
Expand Down
51 changes: 47 additions & 4 deletions common/searchattribute/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ import (
"github.com/stretchr/testify/suite"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/persistence"
)
Expand All @@ -53,6 +53,7 @@ type (
timeSource *clock.EventTimeSource
mockClusterMetadataManager *persistence.MockClusterMetadataManager
manager *managerImpl
forceCacheRefresh bool
}
)

Expand All @@ -69,13 +70,17 @@ func (s *searchAttributesManagerSuite) TearDownSuite() {

func (s *searchAttributesManagerSuite) SetupTest() {
s.Assertions = require.New(s.T())

s.controller = gomock.NewController(s.T())

s.logger = log.NewTestLogger()
s.timeSource = clock.NewEventTimeSource()
s.mockClusterMetadataManager = persistence.NewMockClusterMetadataManager(s.controller)
s.manager = NewManager(s.timeSource, s.mockClusterMetadataManager)
s.manager = NewManager(
s.timeSource,
s.mockClusterMetadataManager,
func(opts ...dynamicconfig.FilterOption) bool {
return s.forceCacheRefresh
},
)
}

func (s *searchAttributesManagerSuite) TearDownTest() {
Expand Down Expand Up @@ -214,6 +219,44 @@ func (s *searchAttributesManagerSuite) TestGetSearchAttributesCache_EmptyIndex()
s.Len(searchAttributes.Custom(), 1)
}

func (s *searchAttributesManagerSuite) TestGetSearchAttributesCache_RefreshIfAbsent() {
s.timeSource.Update(time.Date(2020, 8, 22, 1, 0, 0, 0, time.UTC))

// First call populates cache.
s.mockClusterMetadataManager.EXPECT().GetCurrentClusterMetadata().Return(&persistence.GetClusterMetadataResponse{
ClusterMetadata: persistencespb.ClusterMetadata{
IndexSearchAttributes: map[string]*persistencespb.IndexSearchAttributes{},
},
Version: 1,
}, nil)

s.mockClusterMetadataManager.EXPECT().GetCurrentClusterMetadata().Return(&persistence.GetClusterMetadataResponse{
ClusterMetadata: persistencespb.ClusterMetadata{
IndexSearchAttributes: map[string]*persistencespb.IndexSearchAttributes{
"index-name": {
CustomSearchAttributes: map[string]enumspb.IndexedValueType{
"OrderId": enumspb.INDEXED_VALUE_TYPE_KEYWORD,
}}},
},
Version: 2,
}, nil)

searchAttributes, err := s.manager.GetSearchAttributes("index-name", false)
s.NoError(err)
s.Len(searchAttributes.Custom(), 0)

s.timeSource.Update(time.Date(2020, 8, 22, 1, 0, 1, 0, time.UTC))

searchAttributes, err = s.manager.GetSearchAttributes("index-name", false)
s.NoError(err)
s.Len(searchAttributes.Custom(), 0)

s.forceCacheRefresh = true
searchAttributes, err = s.manager.GetSearchAttributes("index-name", false)
s.NoError(err)
s.Len(searchAttributes.Custom(), 1)
}

func (s *searchAttributesManagerSuite) TestSaveSearchAttributes_UpdateIndex() {
s.mockClusterMetadataManager.EXPECT().GetCurrentClusterMetadata().Return(&persistence.GetClusterMetadataResponse{
ClusterMetadata: persistencespb.ClusterMetadata{
Expand Down

0 comments on commit 6ebaa80

Please sign in to comment.