From aceda146cfdc430d897092b86b8c02ed251ce9ed Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Thu, 27 Jan 2022 16:45:51 -0800 Subject: [PATCH] Update XDC integration test to use add remote cluster api (#2424) * Update XDC integration test to use add remote cluster api --- common/cluster/metadata.go | 16 +++++++++++--- common/cluster/metadata_test.go | 4 ++++ common/dynamicconfig/constants.go | 3 +++ .../persistence-tests/persistenceTestBase.go | 2 +- common/resource/resourceImpl.go | 1 + host/dynamicconfig.go | 1 + .../testdata/xdc_integration_es_clusters.yaml | 12 +--------- .../xdc_integration_test_clusters.yaml | 12 +--------- host/xdc/elasticsearch_test.go | 18 +++++++++++++++ host/xdc/integration_failover_test.go | 22 +++++++++++++++++++ tools/cli/namespaceUtils.go | 1 + 11 files changed, 66 insertions(+), 26 deletions(-) diff --git a/common/cluster/metadata.go b/common/cluster/metadata.go index a217d01fd0e..a5995ed9753 100644 --- a/common/cluster/metadata.go +++ b/common/cluster/metadata.go @@ -33,6 +33,8 @@ import ( "sync/atomic" "time" + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common" "go.temporal.io/server/common/collection" "go.temporal.io/server/common/log" @@ -44,7 +46,6 @@ import ( const ( defaultClusterMetadataPageSize = 100 refreshInterval = time.Minute - refreshFailureInterval = time.Second * 30 FakeClusterForEmptyVersion = "fake-cluster-for-empty-version" ) @@ -106,6 +107,7 @@ type ( status int32 clusterMetadataStore persistence.ClusterMetadataManager refresher *goro.Handle + refreshDuration dynamicconfig.DurationPropertyFn logger log.Logger // Immutable fields @@ -139,6 +141,7 @@ func NewMetadata( currentClusterName string, clusterInfo map[string]ClusterInformation, clusterMetadataStore persistence.ClusterMetadataManager, + refreshDuration dynamicconfig.DurationPropertyFn, logger log.Logger, ) Metadata { if len(clusterInfo) == 0 { @@ -166,6 +169,9 @@ func NewMetadata( for k, v := range clusterInfo { copyClusterInfo[k] = v } + if refreshDuration == nil { + refreshDuration = dynamicconfig.GetDurationPropertyFn(refreshInterval) + } return &metadataImpl{ status: common.DaemonStatusInitialized, enableGlobalNamespace: enableGlobalNamespace, @@ -177,12 +183,14 @@ func NewMetadata( clusterChangeCallback: make(map[string]CallbackFn), clusterMetadataStore: clusterMetadataStore, logger: logger, + refreshDuration: refreshDuration, } } func NewMetadataFromConfig( config *Config, clusterMetadataStore persistence.ClusterMetadataManager, + dynamicCollection *dynamicconfig.Collection, logger log.Logger, ) Metadata { return NewMetadata( @@ -192,6 +200,7 @@ func NewMetadataFromConfig( config.CurrentClusterName, config.ClusterInformation, clusterMetadataStore, + dynamicCollection.GetDurationProperty(dynamicconfig.ClusterMetadataRefreshInterval, refreshInterval), logger, ) } @@ -206,6 +215,7 @@ func NewMetadataForTest( config.CurrentClusterName, config.ClusterInformation, nil, + nil, log.NewNoopLogger(), ) } @@ -344,7 +354,7 @@ func (m *metadataImpl) UnRegisterMetadataChangeCallback(callbackId string) { } func (m *metadataImpl) refreshLoop(ctx context.Context) error { - timer := time.NewTicker(refreshInterval) + timer := time.NewTicker(m.refreshDuration()) defer timer.Stop() for { @@ -355,7 +365,7 @@ func (m *metadataImpl) refreshLoop(ctx context.Context) error { for err := m.refreshClusterMetadata(ctx); err != nil; err = m.refreshClusterMetadata(ctx) { m.logger.Error("Error refreshing remote cluster metadata", tag.Error(err)) select { - case <-time.After(refreshFailureInterval): + case <-time.After(m.refreshDuration() / 2): case <-ctx.Done(): return nil } diff --git a/common/cluster/metadata_test.go b/common/cluster/metadata_test.go index 90f379d2df1..f8324acdf18 100644 --- a/common/cluster/metadata_test.go +++ b/common/cluster/metadata_test.go @@ -26,6 +26,9 @@ package cluster import ( "context" "testing" + "time" + + "go.temporal.io/server/common/dynamicconfig" "github.com/golang/mock/gomock" "github.com/pborman/uuid" @@ -95,6 +98,7 @@ func (s *metadataSuite) SetupTest() { s.clusterName, clusterInfo, s.mockClusterMetadataStore, + dynamicconfig.GetDurationPropertyFn(time.Second), log.NewNoopLogger(), ).(*metadataImpl) } diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 9d91408b472..1d3362afc0e 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -88,6 +88,7 @@ var Keys = map[Key]string{ EnableAuthorization: "system.enableAuthorization", EnableCrossNamespaceCommands: "system.enableCrossNamespaceCommands", ForceSearchAttributesCacheRefreshOnRead: "system.forceSearchAttributesCacheRefreshOnRead", + ClusterMetadataRefreshInterval: "system.clusterMetadataRefreshInterval", // size limit BlobSizeLimitError: "limit.blobSize.error", @@ -418,6 +419,8 @@ const ( EnableAuthorization // EnableCrossNamespaceCommands is the key to enable commands for external namespaces EnableCrossNamespaceCommands + // ClusterMetadataRefreshInterval is config to manage cluster metadata table refresh interval + ClusterMetadataRefreshInterval // BlobSizeLimitError is the per event blob size limit BlobSizeLimitError // BlobSizeLimitWarn is the per event blob size limit for warning diff --git a/common/persistence/persistence-tests/persistenceTestBase.go b/common/persistence/persistence-tests/persistenceTestBase.go index daf635d7423..be775e70763 100644 --- a/common/persistence/persistence-tests/persistenceTestBase.go +++ b/common/persistence/persistence-tests/persistenceTestBase.go @@ -212,7 +212,7 @@ func (s *TestBase) Setup(clusterMetadataConfig *cluster.Config) { s.ClusterMetadataManager, err = factory.NewClusterMetadataManager() s.fatalOnError("NewClusterMetadataManager", err) - s.ClusterMetadata = cluster.NewMetadataFromConfig(clusterMetadataConfig, s.ClusterMetadataManager, s.Logger) + s.ClusterMetadata = cluster.NewMetadataFromConfig(clusterMetadataConfig, s.ClusterMetadataManager, dynamicconfig.NewNoopCollection(), s.Logger) s.SearchAttributesManager = searchattribute.NewManager(clock.NewRealTimeSource(), s.ClusterMetadataManager, dynamicconfig.GetBoolPropertyFn(true)) s.MetadataManager, err = factory.NewMetadataManager() diff --git a/common/resource/resourceImpl.go b/common/resource/resourceImpl.go index 39b3c8103b3..b2120bace0c 100644 --- a/common/resource/resourceImpl.go +++ b/common/resource/resourceImpl.go @@ -201,6 +201,7 @@ func New( params.ClusterMetadataConfig.CurrentClusterName, params.ClusterMetadataConfig.ClusterInformation, persistenceBean.GetClusterMetadataManager(), + nil, logger, ) diff --git a/host/dynamicconfig.go b/host/dynamicconfig.go index 6bab8341a72..38b7316701a 100644 --- a/host/dynamicconfig.go +++ b/host/dynamicconfig.go @@ -46,6 +46,7 @@ var ( dynamicconfig.ReplicationTaskFetcherAggregationInterval: 200 * time.Millisecond, dynamicconfig.ReplicationTaskFetcherErrorRetryWait: 50 * time.Millisecond, dynamicconfig.ReplicationTaskProcessorErrorRetryWait: time.Millisecond, + dynamicconfig.ClusterMetadataRefreshInterval: 100 * time.Millisecond, } ) diff --git a/host/testdata/xdc_integration_es_clusters.yaml b/host/testdata/xdc_integration_es_clusters.yaml index 470bf8f25e5..17e1e833a01 100644 --- a/host/testdata/xdc_integration_es_clusters.yaml +++ b/host/testdata/xdc_integration_es_clusters.yaml @@ -11,11 +11,6 @@ initialFailoverVersion: 1 rpcName: "frontend" rpcAddress: "127.0.0.1:9134" - standby-es: - enabled: true - initialFailoverVersion: 2 - rpcName: "frontend" - rpcAddress: "127.0.0.1:10134" enablearchival: false workerconfig: enablearchiver: false @@ -38,14 +33,9 @@ clustermetadata: enableGlobalNamespace: true failoverVersionIncrement: 10 - masterClusterName: "active-es" + masterClusterName: "standby-es" currentClusterName: "standby-es" clusterInformation: - active-es: - enabled: true - initialFailoverVersion: 1 - rpcName: "frontend" - rpcAddress: "127.0.0.1:9134" standby-es: enabled: true initialFailoverVersion: 2 diff --git a/host/testdata/xdc_integration_test_clusters.yaml b/host/testdata/xdc_integration_test_clusters.yaml index 24ac47fb650..bf887880541 100644 --- a/host/testdata/xdc_integration_test_clusters.yaml +++ b/host/testdata/xdc_integration_test_clusters.yaml @@ -11,11 +11,6 @@ initialFailoverVersion: 1 rpcName: "frontend" rpcAddress: "127.0.0.1:7134" - standby: - enabled: true - initialFailoverVersion: 2 - rpcName: "frontend" - rpcAddress: "127.0.0.1:8134" enablearchival: false workerconfig: enablearchiver: false @@ -30,14 +25,9 @@ clustermetadata: enableGlobalNamespace: true failoverVersionIncrement: 10 - masterClusterName: "active" + masterClusterName: "standby" currentClusterName: "standby" clusterInformation: - active: - enabled: true - initialFailoverVersion: 1 - rpcName: "frontend" - rpcAddress: "127.0.0.1:7134" standby: enabled: true initialFailoverVersion: 2 diff --git a/host/xdc/elasticsearch_test.go b/host/xdc/elasticsearch_test.go index ce11651b682..d0ddb714bb7 100644 --- a/host/xdc/elasticsearch_test.go +++ b/host/xdc/elasticsearch_test.go @@ -36,6 +36,8 @@ import ( "testing" "time" + "go.temporal.io/server/api/adminservice/v1" + "github.com/pborman/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -124,6 +126,22 @@ func (s *esCrossDCTestSuite) SetupSuite() { s.Require().NoError(err) s.cluster2 = c + cluster1Address := clusterConfigs[0].ClusterMetadata.ClusterInformation[clusterConfigs[0].ClusterMetadata.CurrentClusterName].RPCAddress + cluster2Address := clusterConfigs[1].ClusterMetadata.ClusterInformation[clusterConfigs[1].ClusterMetadata.CurrentClusterName].RPCAddress + _, err = s.cluster1.GetAdminClient().AddOrUpdateRemoteCluster(host.NewContext(), &adminservice.AddOrUpdateRemoteClusterRequest{ + FrontendAddress: cluster2Address, + EnableRemoteClusterConnection: true, + }) + s.Require().NoError(err) + + _, err = s.cluster2.GetAdminClient().AddOrUpdateRemoteCluster(host.NewContext(), &adminservice.AddOrUpdateRemoteClusterRequest{ + FrontendAddress: cluster1Address, + EnableRemoteClusterConnection: true, + }) + s.Require().NoError(err) + // Wait for cluster metadata to refresh new added clusters + time.Sleep(time.Millisecond * 200) + s.esClient = host.CreateESClient(s.Suite, s.clusterConfigs[0].ESConfig, s.logger) host.PutIndexTemplate(s.Suite, s.esClient, fmt.Sprintf("../testdata/es_%s_index_template.json", s.clusterConfigs[0].ESConfig.Version), "test-visibility-template") host.CreateIndex(s.Suite, s.esClient, s.clusterConfigs[0].ESConfig.GetVisibilityIndex()) diff --git a/host/xdc/integration_failover_test.go b/host/xdc/integration_failover_test.go index 852e43e548c..4401d6516d5 100644 --- a/host/xdc/integration_failover_test.go +++ b/host/xdc/integration_failover_test.go @@ -40,6 +40,8 @@ import ( "testing" "time" + "go.temporal.io/server/api/adminservice/v1" + "github.com/pborman/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -129,6 +131,26 @@ func (s *integrationClustersTestSuite) SetupSuite() { c, err = host.NewCluster(clusterConfigs[1], log.With(s.logger, tag.ClusterName(clusterName[1]))) s.Require().NoError(err) s.cluster2 = c + + cluster1Address := clusterConfigs[0].ClusterMetadata.ClusterInformation[clusterConfigs[0].ClusterMetadata.CurrentClusterName].RPCAddress + cluster2Address := clusterConfigs[1].ClusterMetadata.ClusterInformation[clusterConfigs[1].ClusterMetadata.CurrentClusterName].RPCAddress + _, err = s.cluster1.GetAdminClient().AddOrUpdateRemoteCluster( + host.NewContext(), + &adminservice.AddOrUpdateRemoteClusterRequest{ + FrontendAddress: cluster2Address, + EnableRemoteClusterConnection: true, + }) + s.Require().NoError(err) + + _, err = s.cluster2.GetAdminClient().AddOrUpdateRemoteCluster( + host.NewContext(), + &adminservice.AddOrUpdateRemoteClusterRequest{ + FrontendAddress: cluster1Address, + EnableRemoteClusterConnection: true, + }) + s.Require().NoError(err) + // Wait for cluster metadata to refresh new added clusters + time.Sleep(time.Millisecond * 200) } func (s *integrationClustersTestSuite) SetupTest() { diff --git a/tools/cli/namespaceUtils.go b/tools/cli/namespaceUtils.go index 51aa22da6b7..1e97dd52d21 100644 --- a/tools/cli/namespaceUtils.go +++ b/tools/cli/namespaceUtils.go @@ -300,6 +300,7 @@ func initializeClusterMetadata( clusterMetadata.CurrentClusterName, clusterMetadata.ClusterInformation, nil, + nil, log.NewNoopLogger(), ) }