From f8d70dedf9ebeb5ac9b96fe3c05128ebae59b8de Mon Sep 17 00:00:00 2001 From: David Porter Date: Thu, 3 Oct 2024 15:17:56 -0700 Subject: [PATCH] Feature/zonal isolation zone discovery (#6301) This changes the config property system.AllIsolationGroups from being a fixed static configuration value loaded on startup to being a callback which can be updated more dynamically. This should continue working with the existing dynamic config property as before, but also supports passing in a more complex discovery mechanism for isolation groups as needed. Why? We found that we were having trouble updating this config in production and needed to refactor it slightly to allow for a better update system. How did you test it? Deployed in staging environments with some initial manual testing as well as the unit testing. Potential risks: This could break the zonal isolation feature if it is wrong or buggy. That feature is designed to fall back to tasks being simply un-isolated, so it's not expected to actually break task processing, but it could degrade it if it were to contain some unforeseen problems. 
--- cmd/server/cadence/server.go | 14 ++++ cmd/server/cadence/server_test.go | 37 ++++++++++ cmd/server/go.mod | 2 +- common/dynamicconfig/constants.go | 13 ++-- .../defaultisolationgroupstate/state.go | 7 +- .../defaultisolationgroupstate/state_test.go | 42 +++++------ .../isolationgroupapi/mappers.go | 24 +++---- .../isolationgroupapi/mappers_test.go | 56 +++++++++++++++ common/resource/params.go | 11 +-- common/resource/resourceImpl.go | 9 +++ common/resource/resource_test.go | 6 ++ service/matching/config/config.go | 20 ++---- service/matching/config/config_test.go | 10 ++- .../handler/engine_integration_test.go | 37 ++++++---- service/matching/handler/handler_test.go | 6 +- service/matching/service.go | 1 + service/matching/tasklist/matcher_test.go | 2 +- .../matching/tasklist/task_list_manager.go | 6 +- .../tasklist/task_list_manager_test.go | 18 +++-- tools/cli/utils_test.go | 15 ++++ tools/common/schema/util_test.go | 70 +++++++++++++++++++ 21 files changed, 305 insertions(+), 101 deletions(-) create mode 100644 common/isolationgroup/isolationgroupapi/mappers_test.go create mode 100644 tools/common/schema/util_test.go diff --git a/cmd/server/cadence/server.go b/cmd/server/cadence/server.go index e14d3eeb559..15d3163cc76 100644 --- a/cmd/server/cadence/server.go +++ b/cmd/server/cadence/server.go @@ -39,6 +39,7 @@ import ( "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/dynamicconfig/configstore" "github.com/uber/cadence/common/elasticsearch" + "github.com/uber/cadence/common/isolationgroup/isolationgroupapi" "github.com/uber/cadence/common/log/loggerimpl" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/membership" @@ -195,6 +196,8 @@ func (s *server) startService() common.Daemon { params.ClusterRedirectionPolicy = s.cfg.ClusterGroupMetadata.ClusterRedirectionPolicy + params.GetIsolationGroups = getFromDynamicConfig(params, dc) + params.ClusterMetadata = cluster.NewMetadata( 
clusterGroupMetadata.FailoverVersionIncrement, clusterGroupMetadata.PrimaryClusterName, @@ -373,3 +376,14 @@ func validateIndex(config *config.ElasticSearchConfig) { log.Fatalf("Visibility index is missing in config") } } + +func getFromDynamicConfig(params resource.Params, dc *dynamicconfig.Collection) func() []string { + return func() []string { + res, err := isolationgroupapi.MapAllIsolationGroupsResponse(dc.GetListProperty(dynamicconfig.AllIsolationGroups)()) + if err != nil { + params.Logger.Error("failed to get isolation groups from config", tag.Error(err)) + return nil + } + return res + } +} diff --git a/cmd/server/cadence/server_test.go b/cmd/server/cadence/server_test.go index dcfb02d3d6d..5e1913f6233 100644 --- a/cmd/server/cadence/server_test.go +++ b/cmd/server/cadence/server_test.go @@ -29,12 +29,17 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/uber/cadence/common" "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/log/loggerimpl" "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql" + "github.com/uber/cadence/common/resource" "github.com/uber/cadence/common/service" "github.com/uber/cadence/testflags" "github.com/uber/cadence/tools/cassandra" @@ -114,3 +119,35 @@ func (s *ServerSuite) TestServerStartup() { daemon.Stop() } } + +func TestSettingGettingZonalIsolationGroupsFromIG(t *testing.T) { + + ctrl := gomock.NewController(t) + client := dynamicconfig.NewMockClient(ctrl) + client.EXPECT().GetListValue(dynamicconfig.AllIsolationGroups, gomock.Any()).Return([]interface{}{ + "zone-1", "zone-2", + }, nil) + + dc := dynamicconfig.NewCollection(client, loggerimpl.NewNopLogger()) + + assert.NotPanics(t, func() { + fn := getFromDynamicConfig(resource.Params{ + Logger: loggerimpl.NewNopLogger(), + }, dc) + out := fn() + 
assert.Equal(t, []string{"zone-1", "zone-2"}, out) + }) +} + +func TestSettingGettingZonalIsolationGroupsFromIGError(t *testing.T) { + ctrl := gomock.NewController(t) + client := dynamicconfig.NewMockClient(ctrl) + client.EXPECT().GetListValue(dynamicconfig.AllIsolationGroups, gomock.Any()).Return(nil, assert.AnError) + dc := dynamicconfig.NewCollection(client, loggerimpl.NewNopLogger()) + + assert.NotPanics(t, func() { + getFromDynamicConfig(resource.Params{ + Logger: loggerimpl.NewNopLogger(), + }, dc)() + }) +} diff --git a/cmd/server/go.mod b/cmd/server/go.mod index 8a4994925fc..16b3fb7ea02 100644 --- a/cmd/server/go.mod +++ b/cmd/server/go.mod @@ -23,7 +23,7 @@ require ( github.com/go-sql-driver/mysql v1.7.1 // indirect github.com/gocql/gocql v0.0.0-20211015133455-b225f9b53fa1 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/mock v1.6.0 // indirect + github.com/golang/mock v1.6.0 github.com/google/uuid v1.5.0 // indirect github.com/hashicorp/go-version v1.2.0 // indirect github.com/iancoleman/strcase v0.2.0 // indirect diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 995eea14835..0a3fcb03221 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -2962,6 +2962,13 @@ const ( UnknownListKey ListKey = iota TestGetListPropertyKey + // AllIsolationGroups is the list of all possible isolation groups in a service + // KeyName: system.allIsolationGroups + // Value type: []string + // Default value: N/A + // Allowed filters: N/A + AllIsolationGroups + // HeaderForwardingRules defines which headers are forwarded from inbound calls to outbound. // This value is only loaded at startup. // @@ -2971,12 +2978,6 @@ const ( // Value type: []rpc.HeaderRule or an []interface{} containing `map[string]interface{}{"Add":bool,"Match":string}` values. // Default value: forward all headers. 
(this is a problematic value, and it will be changing as we reduce to a list of known values) HeaderForwardingRules - // AllIsolationGroups is the list of all possible isolation groups in a service - // KeyName: system.allIsolationGroups - // Value type: []string - // Default value: N/A - // Allowed filters: N/A - AllIsolationGroups LastListKey ) diff --git a/common/isolationgroup/defaultisolationgroupstate/state.go b/common/isolationgroup/defaultisolationgroupstate/state.go index 1e753e032f0..db481081d5e 100644 --- a/common/isolationgroup/defaultisolationgroupstate/state.go +++ b/common/isolationgroup/defaultisolationgroupstate/state.go @@ -54,14 +54,11 @@ func NewDefaultIsolationGroupStateWatcherWithConfigStoreClient( domainCache cache.DomainCache, cfgStoreClient dynamicconfig.Client, // can be nil, which means global drain is unsupported metricsClient metrics.Client, + getIsolationGroups func() []string, ) (isolationgroup.State, error) { stopChan := make(chan struct{}) - allIGs := dc.GetListProperty(dynamicconfig.AllIsolationGroups)() - allIsolationGroups, err := isolationgroupapi.MapAllIsolationGroupsResponse(allIGs) - if err != nil { - return nil, fmt.Errorf("could not get all isolation groups fron dynamic config: %w", err) - } + allIsolationGroups := getIsolationGroups() config := defaultConfig{ IsolationGroupEnabled: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableTasklistIsolation), diff --git a/common/isolationgroup/defaultisolationgroupstate/state_test.go b/common/isolationgroup/defaultisolationgroupstate/state_test.go index 17ceba54ff4..d56a4ff374e 100644 --- a/common/isolationgroup/defaultisolationgroupstate/state_test.go +++ b/common/isolationgroup/defaultisolationgroupstate/state_test.go @@ -35,6 +35,7 @@ import ( "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/isolationgroup/isolationgroupapi" + "github.com/uber/cadence/common/log/loggerimpl" 
"github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" @@ -639,32 +640,6 @@ func TestIsolationGroupStateMapping(t *testing.T) { } } -func TestMapAllIsolationGroupStates(t *testing.T) { - - tests := map[string]struct { - in []interface{} - expected []string - expectedErr error - }{ - "valid mapping": { - in: []interface{}{"zone-1", "zone-2", "zone-3"}, - expected: []string{"zone-1", "zone-2", "zone-3"}, - }, - "invalid mapping": { - in: []interface{}{1, 2, 3}, - expectedErr: errors.New("failed to get all-isolation-groups response from dynamic config: got 1 (int)"), - }, - } - - for name, td := range tests { - t.Run(name, func(t *testing.T) { - res, err := isolationgroupapi.MapAllIsolationGroupsResponse(td.in) - assert.Equal(t, td.expected, res) - assert.Equal(t, td.expectedErr, err) - }) - } -} - func TestUpdateRequest(t *testing.T) { tests := map[string]struct { @@ -716,6 +691,21 @@ func TestUpdateRequest(t *testing.T) { } } +func TestNewDefaultIsolationGroupStateWatcherWithConfigStoreClient(t *testing.T) { + dc := dynamicconfig.NewNopCollection() + domainCache := cache.NewNoOpDomainCache() + client := metrics.NewNoopMetricsClient() + ig := func() []string { return nil } + NewDefaultIsolationGroupStateWatcherWithConfigStoreClient( + loggerimpl.NewNopLogger(), + dc, + domainCache, + nil, + client, + ig, + ) +} + func TestIsolationGroupShutdown(t *testing.T) { var v defaultIsolationGroupStateHandler assert.NotPanics(t, func() { diff --git a/common/isolationgroup/isolationgroupapi/mappers.go b/common/isolationgroup/isolationgroupapi/mappers.go index fb23a279c39..73c011f418a 100644 --- a/common/isolationgroup/isolationgroupapi/mappers.go +++ b/common/isolationgroup/isolationgroupapi/mappers.go @@ -29,18 +29,6 @@ import ( "github.com/uber/cadence/common/types" ) -func MapAllIsolationGroupsResponse(in []interface{}) ([]string, error) { - var allIsolationGroups []string - for k := range in { - 
v, ok := in[k].(string) - if !ok { - return nil, fmt.Errorf("failed to get all-isolation-groups response from dynamic config: got %v (%T)", in[k], in[k]) - } - allIsolationGroups = append(allIsolationGroups, v) - } - return allIsolationGroups, nil -} - func MapDynamicConfigResponse(in []interface{}) (out types.IsolationGroupConfiguration, err error) { if in == nil { return nil, nil @@ -85,3 +73,15 @@ func MapUpdateGlobalIsolationGroupsRequest(in types.IsolationGroupConfiguration) } return out, nil } + +func MapAllIsolationGroupsResponse(in []interface{}) ([]string, error) { + var allIsolationGroups []string + for k := range in { + v, ok := in[k].(string) + if !ok { + return nil, fmt.Errorf("failed to get all-isolation-groups response from dynamic config: got %v (%T)", in[k], in[k]) + } + allIsolationGroups = append(allIsolationGroups, v) + } + return allIsolationGroups, nil +} diff --git a/common/isolationgroup/isolationgroupapi/mappers_test.go b/common/isolationgroup/isolationgroupapi/mappers_test.go new file mode 100644 index 00000000000..84bd8460fd5 --- /dev/null +++ b/common/isolationgroup/isolationgroupapi/mappers_test.go @@ -0,0 +1,56 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package isolationgroupapi + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMapAllIsolationGroupStates(t *testing.T) { + + tests := map[string]struct { + in []interface{} + expected []string + expectedErr error + }{ + "valid mapping": { + in: []interface{}{"zone-1", "zone-2", "zone-3"}, + expected: []string{"zone-1", "zone-2", "zone-3"}, + }, + "invalid mapping": { + in: []interface{}{1, 2, 3}, + expectedErr: errors.New("failed to get all-isolation-groups response from dynamic config: got 1 (int)"), + }, + } + + for name, td := range tests { + t.Run(name, func(t *testing.T) { + res, err := MapAllIsolationGroupsResponse(td.in) + assert.Equal(t, td.expected, res) + assert.Equal(t, td.expectedErr, err) + }) + } +} diff --git a/common/resource/params.go b/common/resource/params.go index 1fe6f3348b0..0c16a4e5006 100644 --- a/common/resource/params.go +++ b/common/resource/params.go @@ -49,11 +49,12 @@ import ( type ( // Params holds the set of parameters needed to initialize common service resources Params struct { - Name string - InstanceID string - Logger log.Logger - ThrottledLogger log.Logger - HostName string + Name string + InstanceID string + Logger log.Logger + ThrottledLogger log.Logger + HostName string + GetIsolationGroups func() []string MetricScope tally.Scope MembershipResolver membership.Resolver diff --git a/common/resource/resourceImpl.go b/common/resource/resourceImpl.go index ee4752ae57a..5a26c3bb6d8 100644 --- 
a/common/resource/resourceImpl.go +++ b/common/resource/resourceImpl.go @@ -171,6 +171,8 @@ func New( dispatcher := params.RPCFactory.GetDispatcher() membershipResolver := params.MembershipResolver + ensureGetAllIsolationGroupsFnIsSet(params) + dynamicCollection := dynamicconfig.NewCollection( params.DynamicConfig, logger, @@ -706,6 +708,7 @@ func ensureIsolationGroupStateHandlerOrDefault( domainCache, isolationGroupStore, params.MetricsClient, + params.GetIsolationGroups, ) } @@ -716,3 +719,9 @@ func ensurePartitionerOrDefault(params *Params, state isolationgroup.State) part } return partition.NewDefaultPartitioner(params.Logger, state) } + +func ensureGetAllIsolationGroupsFnIsSet(params *Params) { + if params.GetIsolationGroups == nil { + params.GetIsolationGroups = func() []string { return []string{} } + } +} diff --git a/common/resource/resource_test.go b/common/resource/resource_test.go index 7d2d4eb6d6c..de41a02a4d0 100644 --- a/common/resource/resource_test.go +++ b/common/resource/resource_test.go @@ -34,3 +34,9 @@ func TestShutdown(t *testing.T) { i.Stop() }) } + +func TestNewResource(t *testing.T) { + assert.NotPanics(t, func() { + ensureGetAllIsolationGroupsFnIsSet(&Params{}) + }) +} diff --git a/service/matching/config/config.go b/service/matching/config/config.go index a4c9b20abcd..d50292fbc3e 100644 --- a/service/matching/config/config.go +++ b/service/matching/config/config.go @@ -73,7 +73,7 @@ type ( // isolation configuration EnableTasklistIsolation dynamicconfig.BoolPropertyFnWithDomainFilter - AllIsolationGroups []string + AllIsolationGroups func() []string // hostname info HostName string // rate limiter configuration @@ -115,7 +115,8 @@ type ( NumReadPartitions func() int // isolation configuration EnableTasklistIsolation func() bool - AllIsolationGroups []string + // A function which returns all the isolation groups + AllIsolationGroups func() []string // hostname HostName string // rate limiter configuration @@ -127,7 +128,7 @@ type ( ) // 
NewConfig returns new service config with default values -func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config { +func NewConfig(dc *dynamicconfig.Collection, hostName string, getIsolationGroups func() []string) *Config { return &Config{ PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.MatchingPersistenceMaxQPS), PersistenceGlobalMaxQPS: dc.GetIntProperty(dynamicconfig.MatchingPersistenceGlobalMaxQPS), @@ -158,7 +159,6 @@ func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config { EnableTaskInfoLogByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.MatchingEnableTaskInfoLogByDomainID), ActivityTaskSyncMatchWaitTime: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.MatchingActivityTaskSyncMatchWaitTime), EnableTasklistIsolation: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableTasklistIsolation), - AllIsolationGroups: mapIGs(dc.GetListProperty(dynamicconfig.AllIsolationGroups)()), AsyncTaskDispatchTimeout: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.AsyncTaskDispatchTimeout), EnableTasklistOwnershipGuard: dc.GetBoolProperty(dynamicconfig.MatchingEnableTasklistGuardAgainstOwnershipShardLoss), LocalPollWaitTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalPollWaitTime), @@ -167,16 +167,6 @@ func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config { TaskDispatchRPS: 100000.0, TaskDispatchRPSTTL: time.Minute, MaxTimeBetweenTaskDeletes: time.Second, + AllIsolationGroups: getIsolationGroups, } } - -func mapIGs(in []interface{}) []string { - var allIsolationGroups []string - for k := range in { - v, ok := in[k].(string) - if ok { - allIsolationGroups = append(allIsolationGroups, v) - } - } - return allIsolationGroups -} diff --git a/service/matching/config/config_test.go b/service/matching/config/config_test.go index 9924a534bec..4434779e1df 100644 --- a/service/matching/config/config_test.go +++ b/service/matching/config/config_test.go @@ -71,7 +71,6 @@ func TestNewConfig(t 
*testing.T) { "EnableTaskInfoLogByDomainID": {dynamicconfig.MatchingEnableTaskInfoLogByDomainID, true}, "ActivityTaskSyncMatchWaitTime": {dynamicconfig.MatchingActivityTaskSyncMatchWaitTime, time.Duration(24)}, "EnableTasklistIsolation": {dynamicconfig.EnableTasklistIsolation, false}, - "AllIsolationGroups": {dynamicconfig.AllIsolationGroups, []interface{}{"a", "b", "c"}}, "AsyncTaskDispatchTimeout": {dynamicconfig.AsyncTaskDispatchTimeout, time.Duration(25)}, "LocalPollWaitTime": {dynamicconfig.LocalPollWaitTime, time.Duration(10)}, "LocalTaskWaitTime": {dynamicconfig.LocalTaskWaitTime, time.Duration(10)}, @@ -79,6 +78,7 @@ func TestNewConfig(t *testing.T) { "TaskDispatchRPS": {nil, 100000.0}, "TaskDispatchRPSTTL": {nil, time.Minute}, "MaxTimeBetweenTaskDeletes": {nil, time.Second}, + "AllIsolationGroups": {nil, []string{"zone-1", "zone-2"}}, "EnableTasklistOwnershipGuard": {dynamicconfig.MatchingEnableTasklistGuardAgainstOwnershipShardLoss, false}, } client := dynamicconfig.NewInMemoryClient() @@ -92,7 +92,7 @@ func TestNewConfig(t *testing.T) { } dc := dynamicconfig.NewCollection(client, testlogger.New(t)) - config := NewConfig(dc, hostname) + config := NewConfig(dc, hostname, isolationGroupsHelper) assertFieldsMatch(t, *config, fields) } @@ -148,6 +148,8 @@ func getValue(f *reflect.Value) interface{} { return fn() case dynamicconfig.StringPropertyFn: return fn() + case func() []string: + return fn() default: panic("Unable to handle type: " + f.Type().Name()) } @@ -155,3 +157,7 @@ func getValue(f *reflect.Value) interface{} { return f.Interface() } } + +func isolationGroupsHelper() []string { + return []string{"zone-1", "zone-2"} +} diff --git a/service/matching/handler/engine_integration_test.go b/service/matching/handler/engine_integration_test.go index 73c3201a1e9..883bf34d98a 100644 --- a/service/matching/handler/engine_integration_test.go +++ b/service/matching/handler/engine_integration_test.go @@ -134,9 +134,13 @@ func (s *matchingEngineSuite) SetupTest() 
{ s.mockIsolationStore = dynamicconfig.NewMockClient(s.controller) dcClient := dynamicconfig.NewInMemoryClient() dcClient.UpdateValue(dynamicconfig.EnableTasklistIsolation, true) - dcClient.UpdateValue(dynamicconfig.AllIsolationGroups, []interface{}{"datacenterA", "datacenterB"}) dc := dynamicconfig.NewCollection(dcClient, s.logger) - isolationGroupState, _ := defaultisolationgroupstate.NewDefaultIsolationGroupStateWatcherWithConfigStoreClient(s.logger, dc, s.mockDomainCache, s.mockIsolationStore, metrics.NewNoopMetricsClient()) + isolationGroupState, _ := defaultisolationgroupstate.NewDefaultIsolationGroupStateWatcherWithConfigStoreClient(s.logger, + dc, + s.mockDomainCache, + s.mockIsolationStore, + metrics.NewNoopMetricsClient(), + getIsolationGroupsHelper) s.partitioner = partition.NewDefaultPartitioner(s.logger, isolationGroupState) s.handlerContext = newHandlerContext( context.Background(), @@ -467,7 +471,7 @@ func (s *matchingEngineSuite) AddAndPollTasks(taskType int, enableIsolation bool s.matchingEngine.config.LongPollExpirationInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond) s.matchingEngine.config.EnableTasklistIsolation = dynamicconfig.GetBoolPropertyFnFilteredByDomainID(enableIsolation) - isolationGroups := s.matchingEngine.config.AllIsolationGroups + isolationGroups := s.matchingEngine.config.AllIsolationGroups() const taskCount = 6 const initialRangeID = 102 @@ -580,7 +584,7 @@ func (s *matchingEngineSuite) SyncMatchTasks(taskType int, enableIsolation bool) for i := int64(0); i < taskCount; i++ { scheduleID := i * 3 - group := isolationGroups[int(i)%len(isolationGroups)] + group := isolationGroups()[int(i)%len(isolationGroups())] var wg sync.WaitGroup var result *pollTaskResponse var pollErr error @@ -613,7 +617,7 @@ func (s *matchingEngineSuite) SyncMatchTasks(taskType int, enableIsolation bool) // Revert the dispatch RPS and verify that poller will get the task for i := int64(0); i < throttledTaskCount; 
i++ { scheduleID := i * 3 - group := isolationGroups[int(i)%len(isolationGroups)] + group := isolationGroups()[int(i)%len(isolationGroups())] var wg sync.WaitGroup var result *pollTaskResponse var pollErr error @@ -737,7 +741,7 @@ func (s *matchingEngineSuite) ConcurrentAddAndPollTasks(taskType int, workerCoun go func() { defer wg.Done() for i := int64(0); i < taskCount; i++ { - group := isolationGroups[int(i)%len(isolationGroups)] // let each worker to generate tasks for all isolation groups + group := isolationGroups()[int(i)%len(isolationGroups())] // let each worker to generate tasks for all isolation groups addRequest := &addTaskRequest{ TaskType: taskType, DomainUUID: testParam.DomainID, @@ -763,7 +767,7 @@ func (s *matchingEngineSuite) ConcurrentAddAndPollTasks(taskType int, workerCoun defer wg.Done() for i := int64(0); i < taskCount; { maxDispatch := dispatchLimitFn(wNum, i) - group := isolationGroups[int(wNum)%len(isolationGroups)] // let each worker only polls from one isolation group + group := isolationGroups()[int(wNum)%len(isolationGroups())] // let each worker only polls from one isolation group pollReq := &pollTaskRequest{ TaskType: taskType, DomainUUID: testParam.DomainID, @@ -1087,7 +1091,7 @@ func (s *matchingEngineSuite) DrainBacklogNoPollersIsolationGroup(taskType int) ScheduleID: scheduleID, TaskList: testParam.TaskList, ScheduleToStartTimeoutSeconds: 1, - PartitionConfig: map[string]string{partition.IsolationGroupKey: isolationGroups[int(i)%len(isolationGroups)]}, + PartitionConfig: map[string]string{partition.IsolationGroupKey: isolationGroups()[int(i)%len(isolationGroups())]}, } _, err := addTask(s.matchingEngine, s.handlerContext, addRequest) s.NoError(err) @@ -1102,7 +1106,7 @@ func (s *matchingEngineSuite) DrainBacklogNoPollersIsolationGroup(taskType int) DomainUUID: testParam.DomainID, TaskList: testParam.TaskList, Identity: testParam.Identity, - IsolationGroup: isolationGroups[0], + IsolationGroup: isolationGroups()[0], } result, err 
:= pollTask(s.matchingEngine, s.handlerContext, pollReq) s.NoError(err) @@ -1147,7 +1151,7 @@ func (s *matchingEngineSuite) TestAddStickyDecisionNoPollerIsolation() { DomainUUID: testParam.DomainID, TaskList: testParam.TaskList, Identity: testParam.Identity, - IsolationGroup: isolationGroups[0], + IsolationGroup: isolationGroups()[0], } result, err := pollTask(s.matchingEngine, s.handlerContext, pollReq) s.NoError(err) @@ -1164,10 +1168,10 @@ func (s *matchingEngineSuite) TestAddStickyDecisionNoPollerIsolation() { ScheduleID: scheduleID, TaskList: testParam.TaskList, ScheduleToStartTimeoutSeconds: 1, - PartitionConfig: map[string]string{partition.IsolationGroupKey: isolationGroups[int(i)%len(isolationGroups)]}, + PartitionConfig: map[string]string{partition.IsolationGroupKey: isolationGroups()[int(i)%len(isolationGroups())]}, } _, err := addTask(s.matchingEngine, s.handlerContext, addRequest) - if int(i)%len(isolationGroups) == 0 { + if int(i)%len(isolationGroups()) == 0 { s.NoError(err) count++ scheduleIDs = append(scheduleIDs, scheduleID) @@ -1187,7 +1191,7 @@ func (s *matchingEngineSuite) TestAddStickyDecisionNoPollerIsolation() { DomainUUID: testParam.DomainID, TaskList: testParam.TaskList, Identity: testParam.Identity, - IsolationGroup: isolationGroups[0], + IsolationGroup: isolationGroups()[0], } result, err := pollTask(s.matchingEngine, s.handlerContext, pollReq) s.NoError(err) @@ -1343,10 +1347,9 @@ func validateTimeRange(t time.Time, expectedDuration time.Duration) bool { } func defaultTestConfig() *config.Config { - config := config.NewConfig(dynamicconfig.NewNopCollection(), "some random hostname") + config := config.NewConfig(dynamicconfig.NewNopCollection(), "some random hostname", getIsolationGroupsHelper) config.LongPollExpirationInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(100 * time.Millisecond) config.MaxTaskDeleteBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(1) - config.AllIsolationGroups = 
[]string{"datacenterA", "datacenterB"} config.GetTasksBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(10) config.AsyncTaskDispatchTimeout = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond) config.MaxTimeBetweenTaskDeletes = time.Duration(0) @@ -1573,3 +1576,7 @@ func pollTask(engine *matchingEngineImpl, hCtx *handlerContext, request *pollTas func isEmptyToken(token *common.TaskToken) bool { return token == nil || *token == common.TaskToken{} } + +func getIsolationGroupsHelper() []string { + return []string{"zone-a", "zone-b"} +} diff --git a/service/matching/handler/handler_test.go b/service/matching/handler/handler_test.go index f689ae73275..f30b701ee7f 100644 --- a/service/matching/handler/handler_test.go +++ b/service/matching/handler/handler_test.go @@ -111,7 +111,7 @@ func (s *handlerSuite) getHandler(config *config.Config) Handler { } func (s *handlerSuite) TestNewHandler() { - cfg := config.NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewInMemoryClient(), s.mockResource.Logger), "matching-test") + cfg := config.NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewInMemoryClient(), s.mockResource.Logger), "matching-test", getIsolationGroupsHelper) handler := s.getHandler(cfg) s.NotNil(handler) } @@ -119,7 +119,7 @@ func (s *handlerSuite) TestNewHandler() { func (s *handlerSuite) TestStart() { defer goleak.VerifyNone(s.T()) - cfg := config.NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewInMemoryClient(), s.mockResource.Logger), "matching-test") + cfg := config.NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewInMemoryClient(), s.mockResource.Logger), "matching-test", getIsolationGroupsHelper) handler := s.getHandler(cfg) handler.Start() @@ -131,7 +131,7 @@ func (s *handlerSuite) TestStart() { func (s *handlerSuite) TestStop() { defer goleak.VerifyNone(s.T()) - cfg := config.NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewInMemoryClient(), s.mockResource.Logger), "matching-test") + cfg 
:= config.NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewInMemoryClient(), s.mockResource.Logger), "matching-test", getIsolationGroupsHelper) handler := s.getHandler(cfg) s.mockEngine.EXPECT().Stop().Times(1) diff --git a/service/matching/service.go b/service/matching/service.go index 66ed18bc4d2..472a0691bb2 100644 --- a/service/matching/service.go +++ b/service/matching/service.go @@ -56,6 +56,7 @@ func NewService( dynamicconfig.ClusterNameFilter(params.ClusterMetadata.GetCurrentClusterName()), ), params.HostName, + params.GetIsolationGroups, ) serviceResource, err := resource.New( diff --git a/service/matching/tasklist/matcher_test.go b/service/matching/tasklist/matcher_test.go index 47dabd28d53..100f58d068b 100644 --- a/service/matching/tasklist/matcher_test.go +++ b/service/matching/tasklist/matcher_test.go @@ -72,7 +72,7 @@ func TestMatcherSuite(t *testing.T) { func (t *MatcherTestSuite) SetupTest() { t.controller = gomock.NewController(t.T()) t.client = matching.NewMockClient(t.controller) - cfg := config.NewConfig(dynamicconfig.NewNopCollection(), "some random hostname") + cfg := config.NewConfig(dynamicconfig.NewNopCollection(), "some random hostname", func() []string { return nil }) t.taskList = NewTestTaskListID(t.T(), uuid.New(), common.ReservedTaskListPrefix+"tl0/1", persistence.TaskListTypeDecision) tlCfg := newTaskListConfig(t.taskList, cfg, testDomainName) diff --git a/service/matching/tasklist/task_list_manager.go b/service/matching/tasklist/task_list_manager.go index 3eb2d15640c..f5c26596d6d 100644 --- a/service/matching/tasklist/task_list_manager.go +++ b/service/matching/tasklist/task_list_manager.go @@ -194,7 +194,7 @@ func NewManager( tlMgr.qpsTracker = stats.NewEmaFixedWindowQPSTracker(timeSource, 0.5, 10*time.Second) var isolationGroups []string if tlMgr.isIsolationMatcherEnabled() { - isolationGroups = config.AllIsolationGroups + isolationGroups = config.AllIsolationGroups() } var fwdr *Forwarder if 
tlMgr.isFowardingAllowed(taskList, *taskListKind) { @@ -604,7 +604,7 @@ func (c *taskListManagerImpl) getIsolationGroupForTask(ctx context.Context, task partitionConfig[k] = v } partitionConfig[partition.WorkflowIDKey] = taskInfo.WorkflowID - pollerIsolationGroups := c.config.AllIsolationGroups + pollerIsolationGroups := c.config.AllIsolationGroups() // Not all poller information are available at the time of task list manager creation, // because we don't persist poller information in database, so in the first minute, we always assume // pollers are available in all isolation groups to avoid the risk of leaking a task to another isolation group. @@ -613,7 +613,7 @@ func (c *taskListManagerImpl) getIsolationGroupForTask(ctx context.Context, task pollerIsolationGroups = c.getPollerIsolationGroups() if len(pollerIsolationGroups) == 0 { // we don't have any pollers, use all isolation groups and wait for pollers' arriving - pollerIsolationGroups = c.config.AllIsolationGroups + pollerIsolationGroups = c.config.AllIsolationGroups() } } group, err := c.partitioner.GetIsolationGroupByDomainID(ctx, taskInfo.DomainID, partitionConfig, pollerIsolationGroups) diff --git a/service/matching/tasklist/task_list_manager_test.go b/service/matching/tasklist/task_list_manager_test.go index be695f18a1e..9481e3261bb 100644 --- a/service/matching/tasklist/task_list_manager_test.go +++ b/service/matching/tasklist/task_list_manager_test.go @@ -51,10 +51,10 @@ import ( ) func defaultTestConfig() *config.Config { - config := config.NewConfig(dynamicconfig.NewNopCollection(), "some random hostname") + config := config.NewConfig(dynamicconfig.NewNopCollection(), "some random hostname", getIsolationgroupsHelper) config.LongPollExpirationInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(100 * time.Millisecond) config.MaxTaskDeleteBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(1) - config.AllIsolationGroups = []string{"datacenterA", "datacenterB"} + 
config.AllIsolationGroups = getIsolationgroupsHelper config.GetTasksBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(10) config.AsyncTaskDispatchTimeout = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond) config.LocalTaskWaitTime = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(time.Millisecond) @@ -247,7 +247,7 @@ func TestDescribeTaskList(t *testing.T) { } func TestCheckIdleTaskList(t *testing.T) { - cfg := config.NewConfig(dynamicconfig.NewNopCollection(), "some random hostname") + cfg := config.NewConfig(dynamicconfig.NewNopCollection(), "some random hostname", getIsolationgroupsHelper) cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond) t.Run("Idle task-list", func(t *testing.T) { @@ -319,7 +319,7 @@ func TestAddTaskStandby(t *testing.T) { controller := gomock.NewController(t) logger := testlogger.New(t) - cfg := config.NewConfig(dynamicconfig.NewNopCollection(), "some random hostname") + cfg := config.NewConfig(dynamicconfig.NewNopCollection(), "some random hostname", getIsolationgroupsHelper) cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond) tlm := createTestTaskListManagerWithConfig(t, logger, controller, cfg) @@ -380,7 +380,7 @@ func TestGetPollerIsolationGroup(t *testing.T) { bgCtx := ContextWithPollerID(context.Background(), "poller0") bgCtx = ContextWithIdentity(bgCtx, "id0") - bgCtx = ContextWithIsolationGroup(bgCtx, config.AllIsolationGroups[0]) + bgCtx = ContextWithIsolationGroup(bgCtx, getIsolationgroupsHelper()[0]) ctx, cancel := context.WithTimeout(bgCtx, time.Second) _, err := tlm.GetTask(ctx, nil) cancel() @@ -390,7 +390,7 @@ func TestGetPollerIsolationGroup(t *testing.T) { // we should get isolation groups that showed up within last 10 seconds groups := tlm.getPollerIsolationGroups() assert.Equal(t, 1, len(groups)) - assert.Equal(t, config.AllIsolationGroups[0], 
groups[0]) + assert.Equal(t, getIsolationgroupsHelper()[0], groups[0]) // after 10s, the poller from that isolation group are cleared from the poller history time.Sleep(10 * time.Second) @@ -412,7 +412,7 @@ func TestGetPollerIsolationGroup(t *testing.T) { groups = tlm.getPollerIsolationGroups() wg.Wait() assert.Equal(t, 1, len(groups)) - assert.Equal(t, config.AllIsolationGroups[0], groups[0]) + assert.Equal(t, getIsolationgroupsHelper()[0], groups[0]) } // return a client side tasklist throttle error from the rate limiter. @@ -891,3 +891,7 @@ func TestTaskExpiryAndCompletion(t *testing.T) { }) } } + +func getIsolationgroupsHelper() []string { + return []string{"datacenterA", "datacenterB"} +} diff --git a/tools/cli/utils_test.go b/tools/cli/utils_test.go index d2a9c801493..7680b864b5d 100644 --- a/tools/cli/utils_test.go +++ b/tools/cli/utils_test.go @@ -25,6 +25,9 @@ import ( "time" "github.com/stretchr/testify/assert" + + "github.com/uber/cadence/common/testing/testdatagen/idlfuzzedtestdata" + "github.com/uber/cadence/common/types" ) func Test_ParseIntMultiRange(t *testing.T) { @@ -124,3 +127,15 @@ func Test_anyToString(t *testing.T) { res := anyToString(info, false, 100) assert.Equal(t, "{Name:Joel, Number:1234, Time:2019-01-15 14:30:45 +0000 UTC}", res) } + +func TestJSONHistorySerializer_Serialize(t *testing.T) { + gen := idlfuzzedtestdata.NewFuzzerWithIDLTypes(t) + h := types.History{} + gen.Fuzz(&h) + serializer := JSONHistorySerializer{} + data, err := serializer.Serialize(&h) + assert.NoError(t, err) + roundTrip, err := serializer.Deserialize(data) + assert.NoError(t, err) + assert.Equal(t, h, *roundTrip) +} diff --git a/tools/common/schema/util_test.go b/tools/common/schema/util_test.go new file mode 100644 index 00000000000..b8caae322d4 --- /dev/null +++ b/tools/common/schema/util_test.go @@ -0,0 +1,70 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. 
+ +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+
+package schema
+
+import (
+	"io"
+	"io/fs"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// testdata is the schema content served by mockfile: three statements and one
+// SQL comment line.
+const testdata = `
+ALTER TYPE domain_config ADD isolation_groups blob;
+ALTER TYPE domain_config ADD isolation_groups_encoding text;
+-- a comment
+ALTER TYPE domain_config2 ADD isolation_groups_encoding text;
+`
+
+// mockfile is an in-memory file whose content is testdata. It provides the
+// Stat, Read and Close methods so it can be handed to ParseFile.
+type mockfile struct {
+	read bool // true once all of testdata has been handed out
+	off  int  // offset of the next unread byte of testdata
+}
+
+// Stat is a stub: it returns neither file info nor an error.
+func (m *mockfile) Stat() (fs.FileInfo, error) {
+	return nil, nil
+}
+
+// Read copies as many unread bytes of testdata as fit into in and returns
+// that count. Once testdata is exhausted it returns 0, io.EOF.
+func (m *mockfile) Read(in []byte) (int, error) {
+	if m.read {
+		return 0, io.EOF
+	}
+	// copy reports the number of bytes actually written; returning len(in)
+	// instead would make callers treat untouched buffer bytes as file data.
+	n := copy(in, testdata[m.off:])
+	m.off += n
+	// Only flag the file as consumed when nothing is left, so a caller with a
+	// small buffer still receives the remainder on subsequent calls.
+	m.read = m.off == len(testdata)
+	return n, nil
+}
+
+// Close is a no-op and always returns nil.
+func (m *mockfile) Close() error {
+	return nil
+}
+
+// TestParseFile feeds testdata to ParseFile through mockfile and expects the
+// three statements back, without the comment line.
+func TestParseFile(t *testing.T) {
+	res, err := ParseFile(&mockfile{})
+	assert.NoError(t, err)
+	expectedOutput := []string{
+		"ALTER TYPE domain_config ADD isolation_groups blob;",
+		"ALTER TYPE domain_config ADD isolation_groups_encoding text;",
+		"ALTER TYPE domain_config2 ADD isolation_groups_encoding text;",
+	}
+	assert.Equal(t, expectedOutput, res)
+}