From 6ebaa80d44a878c35d8faeba5d6e1a34bf67618f Mon Sep 17 00:00:00 2001 From: Liang Mei Date: Wed, 26 Jan 2022 22:25:25 -0800 Subject: [PATCH] Add dynamic config for force search attributes refresh (#2422) --- common/dynamicconfig/constants.go | 32 +++++++----- .../persistence-tests/persistenceTestBase.go | 4 +- common/resource/fx.go | 12 ++++- common/resource/resourceImpl.go | 10 +++- common/searchattribute/manager.go | 7 ++- common/searchattribute/manager_test.go | 51 +++++++++++++++++-- 6 files changed, 91 insertions(+), 25 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 4a15b8bc6f3..9d91408b472 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -74,19 +74,20 @@ var Keys = map[Key]string{ EnableReadVisibilityFromES: "system.enableReadVisibilityFromES", EnableReadFromSecondaryAdvancedVisibility: "system.enableReadFromSecondaryAdvancedVisibility", - HistoryArchivalState: "system.historyArchivalState", - EnableReadFromHistoryArchival: "system.enableReadFromHistoryArchival", - VisibilityArchivalState: "system.visibilityArchivalState", - EnableReadFromVisibilityArchival: "system.enableReadFromVisibilityArchival", - EnableNamespaceNotActiveAutoForwarding: "system.enableNamespaceNotActiveAutoForwarding", - TransactionSizeLimit: "system.transactionSizeLimit", - DisallowQuery: "system.disallowQuery", - EnableBatcher: "worker.enableBatcher", - EnableParentClosePolicyWorker: "system.enableParentClosePolicyWorker", - EnableStickyQuery: "system.enableStickyQuery", - EnablePriorityTaskProcessor: "system.enablePriorityTaskProcessor", - EnableAuthorization: "system.enableAuthorization", - EnableCrossNamespaceCommands: "system.enableCrossNamespaceCommands", + HistoryArchivalState: "system.historyArchivalState", + EnableReadFromHistoryArchival: "system.enableReadFromHistoryArchival", + VisibilityArchivalState: "system.visibilityArchivalState", + EnableReadFromVisibilityArchival: "system.enableReadFromVisibilityArchival", + EnableNamespaceNotActiveAutoForwarding: "system.enableNamespaceNotActiveAutoForwarding", + TransactionSizeLimit: "system.transactionSizeLimit", + DisallowQuery: "system.disallowQuery", + EnableBatcher: "worker.enableBatcher", + EnableParentClosePolicyWorker: "system.enableParentClosePolicyWorker", + EnableStickyQuery: "system.enableStickyQuery", + EnablePriorityTaskProcessor: "system.enablePriorityTaskProcessor", + EnableAuthorization: "system.enableAuthorization", + EnableCrossNamespaceCommands: "system.enableCrossNamespaceCommands", + ForceSearchAttributesCacheRefreshOnRead: "system.forceSearchAttributesCacheRefreshOnRead", // size limit BlobSizeLimitError: "limit.blobSize.error", @@ -438,6 +439,11 @@ const ( // WorkflowType, ActivityType, SignalName, MarkerName, ErrorReason/FailureReason/CancelCause, Identity, RequestID MaxIDLengthLimit + // ForceSearchAttributesCacheRefreshOnRead forces refreshing search attributes cache on a read operation, so we always + // get the latest data from DB. This effectively bypasses cache value and is used to facilitate testing of changes in + // search attributes. This should not be turned on in production. + ForceSearchAttributesCacheRefreshOnRead + // key for frontend // FrontendPersistenceMaxQPS is the max qps frontend host can query DB diff --git a/common/persistence/persistence-tests/persistenceTestBase.go b/common/persistence/persistence-tests/persistenceTestBase.go index 3871ae8bef6..daf635d7423 100644 --- a/common/persistence/persistence-tests/persistenceTestBase.go +++ b/common/persistence/persistence-tests/persistenceTestBase.go @@ -40,7 +40,6 @@ import ( enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" workflowpb "go.temporal.io/api/workflow/v1" - enumsspb "go.temporal.io/server/api/enums/v1" persistencespb "go.temporal.io/server/api/persistence/v1" replicationspb "go.temporal.io/server/api/replication/v1" @@ -51,6 +50,7 @@ import ( "go.temporal.io/server/common/config" "go.temporal.io/server/common/convert" "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" @@ -213,7 +213,7 @@ func (s *TestBase) Setup(clusterMetadataConfig *cluster.Config) { s.fatalOnError("NewClusterMetadataManager", err) s.ClusterMetadata = cluster.NewMetadataFromConfig(clusterMetadataConfig, s.ClusterMetadataManager, s.Logger) - s.SearchAttributesManager = searchattribute.NewManager(clock.NewRealTimeSource(), s.ClusterMetadataManager) + s.SearchAttributesManager = searchattribute.NewManager(clock.NewRealTimeSource(), s.ClusterMetadataManager, dynamicconfig.GetBoolPropertyFn(true)) s.MetadataManager, err = factory.NewMetadataManager() s.fatalOnError("NewMetadataManager", err) diff --git a/common/resource/fx.go b/common/resource/fx.go index f9528e9fa2b..6fbe6560d7e 100644 --- a/common/resource/fx.go +++ b/common/resource/fx.go @@ -159,15 +159,23 @@ func TimeSourceProvider() clock.TimeSource { func SearchAttributeProviderProvider( timeSource clock.TimeSource, cmMgr persistence.ClusterMetadataManager, + dynamicCollection *dynamicconfig.Collection, ) searchattribute.Provider { - return searchattribute.NewManager(timeSource, cmMgr) + return searchattribute.NewManager( + timeSource, + cmMgr, + dynamicCollection.GetBoolProperty(dynamicconfig.ForceSearchAttributesCacheRefreshOnRead, false)) } func SearchAttributeManagerProvider( timeSource clock.TimeSource, cmMgr persistence.ClusterMetadataManager, + dynamicCollection *dynamicconfig.Collection, ) searchattribute.Manager { - return searchattribute.NewManager(timeSource, cmMgr) + return searchattribute.NewManager( + timeSource, + cmMgr, + dynamicCollection.GetBoolProperty(dynamicconfig.ForceSearchAttributesCacheRefreshOnRead, false)) } func NamespaceRegistryProvider( diff --git a/common/resource/resourceImpl.go b/common/resource/resourceImpl.go index 6c2e399cb4b..39b3c8103b3 100644 --- a/common/resource/resourceImpl.go +++ b/common/resource/resourceImpl.go @@ -252,8 +252,14 @@ func New( return nil, err } - saProvider := searchattribute.NewManager(clock.NewRealTimeSource(), persistenceBean.GetClusterMetadataManager()) - saManager := searchattribute.NewManager(clock.NewRealTimeSource(), persistenceBean.GetClusterMetadataManager()) + saProvider := searchattribute.NewManager( + clock.NewRealTimeSource(), + persistenceBean.GetClusterMetadataManager(), + dynamicCollection.GetBoolProperty(dynamicconfig.ForceSearchAttributesCacheRefreshOnRead, false)) + saManager := searchattribute.NewManager( + clock.NewRealTimeSource(), + persistenceBean.GetClusterMetadataManager(), + dynamicCollection.GetBoolProperty(dynamicconfig.ForceSearchAttributesCacheRefreshOnRead, false)) namespaceRegistry := namespace.NewRegistry( persistenceBean.GetMetadataManager(), diff --git a/common/searchattribute/manager.go b/common/searchattribute/manager.go index 69856303078..5363e9303d1 100644 --- a/common/searchattribute/manager.go +++ b/common/searchattribute/manager.go @@ -31,9 +31,9 @@ import ( enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" - persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/persistence" ) @@ -46,6 +46,7 @@ type ( managerImpl struct { timeSource clock.TimeSource clusterMetadataManager persistence.ClusterMetadataManager + forceRefresh dynamicconfig.BoolPropertyFn cacheUpdateMutex sync.Mutex cache atomic.Value // of type cache @@ -64,6 +65,7 @@ var _ Manager = (*managerImpl)(nil) func NewManager( timeSource clock.TimeSource, clusterMetadataManager persistence.ClusterMetadataManager, + forceRefresh dynamicconfig.BoolPropertyFn, ) *managerImpl { var saCache atomic.Value @@ -77,6 +79,7 @@ func NewManager( timeSource: timeSource, cache: saCache, clusterMetadataManager: clusterMetadataManager, + forceRefresh: forceRefresh, } } @@ -113,7 +116,7 @@ func (m *managerImpl) GetSearchAttributes( } func (m *managerImpl) needRefreshCache(saCache cache, forceRefreshCache bool, now time.Time) bool { - return forceRefreshCache || saCache.expireOn.Before(now) + return forceRefreshCache || saCache.expireOn.Before(now) || m.forceRefresh() } func (m *managerImpl) refreshCache(saCache cache, now time.Time) (cache, error) { diff --git a/common/searchattribute/manager_test.go b/common/searchattribute/manager_test.go index 0772247c5c0..561b88911ba 100644 --- a/common/searchattribute/manager_test.go +++ b/common/searchattribute/manager_test.go @@ -35,9 +35,9 @@ import ( "github.com/stretchr/testify/suite" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" - persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/persistence" ) @@ -53,6 +53,7 @@ type ( timeSource *clock.EventTimeSource mockClusterMetadataManager *persistence.MockClusterMetadataManager manager *managerImpl + forceCacheRefresh bool } ) @@ -69,13 +70,17 @@ func (s *searchAttributesManagerSuite) TearDownSuite() { func (s *searchAttributesManagerSuite) SetupTest() { s.Assertions = require.New(s.T()) - s.controller = gomock.NewController(s.T()) - s.logger = log.NewTestLogger() s.timeSource = clock.NewEventTimeSource() s.mockClusterMetadataManager = persistence.NewMockClusterMetadataManager(s.controller) - s.manager = NewManager(s.timeSource, s.mockClusterMetadataManager) + s.manager = NewManager( + s.timeSource, + s.mockClusterMetadataManager, + func(opts ...dynamicconfig.FilterOption) bool { + return s.forceCacheRefresh + }, + ) } func (s *searchAttributesManagerSuite) TearDownTest() { @@ -214,6 +219,44 @@ func (s *searchAttributesManagerSuite) TestGetSearchAttributesCache_EmptyIndex() s.Len(searchAttributes.Custom(), 1) } +func (s *searchAttributesManagerSuite) TestGetSearchAttributesCache_RefreshIfAbsent() { + s.timeSource.Update(time.Date(2020, 8, 22, 1, 0, 0, 0, time.UTC)) + + // First call populates cache. + s.mockClusterMetadataManager.EXPECT().GetCurrentClusterMetadata().Return(&persistence.GetClusterMetadataResponse{ + ClusterMetadata: persistencespb.ClusterMetadata{ + IndexSearchAttributes: map[string]*persistencespb.IndexSearchAttributes{}, + }, + Version: 1, + }, nil) + + s.mockClusterMetadataManager.EXPECT().GetCurrentClusterMetadata().Return(&persistence.GetClusterMetadataResponse{ + ClusterMetadata: persistencespb.ClusterMetadata{ + IndexSearchAttributes: map[string]*persistencespb.IndexSearchAttributes{ + "index-name": { + CustomSearchAttributes: map[string]enumspb.IndexedValueType{ + "OrderId": enumspb.INDEXED_VALUE_TYPE_KEYWORD, + }}}, + }, + Version: 2, + }, nil) + + searchAttributes, err := s.manager.GetSearchAttributes("index-name", false) + s.NoError(err) + s.Len(searchAttributes.Custom(), 0) + + s.timeSource.Update(time.Date(2020, 8, 22, 1, 0, 1, 0, time.UTC)) + + searchAttributes, err = s.manager.GetSearchAttributes("index-name", false) + s.NoError(err) + s.Len(searchAttributes.Custom(), 0) + + s.forceCacheRefresh = true + searchAttributes, err = s.manager.GetSearchAttributes("index-name", false) + s.NoError(err) + s.Len(searchAttributes.Custom(), 1) +} + func (s *searchAttributesManagerSuite) TestSaveSearchAttributes_UpdateIndex() { s.mockClusterMetadataManager.EXPECT().GetCurrentClusterMetadata().Return(&persistence.GetClusterMetadataResponse{ ClusterMetadata: persistencespb.ClusterMetadata{