Skip to content

Commit

Permalink
Option to enable forwarding all APIs (#2367)
Browse files Browse the repository at this point in the history
  • Loading branch information
yiminc authored Jan 12, 2022
1 parent c61e2c8 commit f3780d0
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 0 deletions.
21 changes: 21 additions & 0 deletions service/frontend/dcRedirectionPolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ const (
// 6. QueryWorkflow
// please also reference selectedAPIsForwardingRedirectionPolicyWhitelistedAPIs
DCRedirectionPolicySelectedAPIsForwarding = "selected-apis-forwarding"

// DCRedirectionPolicyAllAPIsForwarding means forwarding all APIs based on namespace active cluster
DCRedirectionPolicyAllAPIsForwarding = "all-apis-forwarding"
)

type (
Expand All @@ -71,6 +74,7 @@ type (
currentClusterName string
config *Config
namespaceRegistry namespace.Registry
enableForAllAPIs bool
}
)

Expand All @@ -96,6 +100,9 @@ func RedirectionPolicyGenerator(clusterMetadata cluster.Metadata, config *Config
case DCRedirectionPolicySelectedAPIsForwarding:
currentClusterName := clusterMetadata.GetCurrentClusterName()
return NewSelectedAPIsForwardingPolicy(currentClusterName, config, namespaceRegistry)
case DCRedirectionPolicyAllAPIsForwarding:
currentClusterName := clusterMetadata.GetCurrentClusterName()
return NewAllAPIsForwardingPolicy(currentClusterName, config, namespaceRegistry)
default:
panic(fmt.Sprintf("Unknown DC redirection policy %v", policy.Policy))
}
Expand Down Expand Up @@ -127,6 +134,16 @@ func NewSelectedAPIsForwardingPolicy(currentClusterName string, config *Config,
}
}

// NewAllAPIsForwardingPolicy creates a forwarding policy for all APIs based on namespace
func NewAllAPIsForwardingPolicy(currentClusterName string, config *Config, namespaceRegistry namespace.Registry) *SelectedAPIsForwardingRedirectionPolicy {
return &SelectedAPIsForwardingRedirectionPolicy{
currentClusterName: currentClusterName,
config: config,
namespaceRegistry: namespaceRegistry,
enableForAllAPIs: true,
}
}

// WithNamespaceIDRedirect redirect the API call based on namespace ID
func (policy *SelectedAPIsForwardingRedirectionPolicy) WithNamespaceIDRedirect(ctx context.Context, namespaceID namespace.ID, apiName string, call func(string) error) error {
namespaceEntry, err := policy.namespaceRegistry.GetNamespaceByID(namespaceID)
Expand Down Expand Up @@ -180,6 +197,10 @@ func (policy *SelectedAPIsForwardingRedirectionPolicy) getTargetClusterAndIsName
return policy.currentClusterName, false
}

if policy.enableForAllAPIs {
return namespaceEntry.ActiveClusterName(), true
}

_, ok := selectedAPIsForwardingRedirectionPolicyWhitelistedAPIs[apiName]
if !ok {
// do not do dc redirection if API is not whitelisted
Expand Down
30 changes: 30 additions & 0 deletions service/frontend/dcRedirectionPolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,36 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) TestGetTargetDataCenter_G
s.Equal(2*len(selectedAPIsForwardingRedirectionPolicyWhitelistedAPIs), alternativeClustercallCount)
}

func (s *selectedAPIsForwardingRedirectionPolicySuite) TestGetTargetDataCenter_GlobalNamespace_Forwarding_AlternativeClusterToCurrentCluster_AllAPIs() {
s.setupGlobalNamespaceWithTwoReplicationCluster(true, false)
s.policy.enableForAllAPIs = true

currentClustercallCount := 0
alternativeClustercallCount := 0
callFn := func(targetCluster string) error {
switch targetCluster {
case s.currentClusterName:
currentClustercallCount++
return nil
case s.alternativeClusterName:
alternativeClustercallCount++
return serviceerror.NewNamespaceNotActive("", s.alternativeClusterName, s.currentClusterName)
default:
panic(fmt.Sprintf("unknown cluster name %v", targetCluster))
}
}

apiName := "NotExistRandomAPI"
err := s.policy.WithNamespaceIDRedirect(context.Background(), s.namespaceID, apiName, callFn)
s.Nil(err)

err = s.policy.WithNamespaceRedirect(context.Background(), s.namespace, apiName, callFn)
s.Nil(err)

s.Equal(2, currentClustercallCount)
s.Equal(2, alternativeClustercallCount)
}

func (s *selectedAPIsForwardingRedirectionPolicySuite) setupLocalNamespace() {
namespaceEntry := namespace.NewLocalNamespaceForTest(
&persistencespb.NamespaceInfo{Id: s.namespaceID.String(), Name: s.namespace.String()},
Expand Down

0 comments on commit f3780d0

Please sign in to comment.