Skip to content

Commit

Permalink
chore: data residency adaptations, introducing region
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Nov 17, 2022
1 parent ad2626f commit 82a017f
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 130 deletions.
9 changes: 6 additions & 3 deletions config/backend-config/backend-config.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (bc *backendConfigImpl) Subscribe(ctx context.Context, topic Topic) pubsub.
return bc.eb.Subscribe(ctx, string(topic))
}

func newForDeployment(deploymentType deployment.Type, configEnvHandler types.ConfigEnvI) (BackendConfig, error) {
func newForDeployment(deploymentType deployment.Type, region string, configEnvHandler types.ConfigEnvI) (BackendConfig, error) {
backendConfig := &backendConfigImpl{
eb: pubsub.New(),
}
Expand All @@ -286,12 +286,14 @@ func newForDeployment(deploymentType deployment.Type, configEnvHandler types.Con
configJSONPath: configJSONPath,
configBackendURL: parsedConfigBackendURL,
configEnvHandler: configEnvHandler,
region: region,
}
case deployment.MultiTenantType:
backendConfig.workspaceConfig = &namespaceConfig{
ConfigBackendURL: parsedConfigBackendURL,
configBackendURL: parsedConfigBackendURL,
configEnvHandler: configEnvHandler,
cpRouterURL: cpRouterURL,
region: region,
}
default:
return nil, fmt.Errorf("deployment type %q not supported", deploymentType)
Expand All @@ -303,11 +305,12 @@ func newForDeployment(deploymentType deployment.Type, configEnvHandler types.Con
// Setup backend config
func Setup(configEnvHandler types.ConfigEnvI) (err error) {
deploymentType, err := deployment.GetFromEnv()
region := config.GetString("region", "")
if err != nil {
return fmt.Errorf("deployment type from env: %w", err)
}

backendConfig, err := newForDeployment(deploymentType, configEnvHandler)
backendConfig, err := newForDeployment(deploymentType, region, configEnvHandler)
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions config/backend-config/backend_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ func TestBadResponse(t *testing.T) {

configs := map[string]workspaceConfig{
"namespace": &namespaceConfig{
ConfigBackendURL: parsedURL,
Namespace: "some-namespace",
Client: http.DefaultClient,
Logger: logger.NOP,
configBackendURL: parsedURL,
namespace: "some-namespace",
client: http.DefaultClient,
logger: logger.NOP,
},
"single-workspace": &singleWorkspaceConfig{
configBackendURL: parsedURL,
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestNewForDeployment(t *testing.T) {
initBackendConfig()
t.Run("dedicated", func(t *testing.T) {
t.Setenv("WORKSPACE_TOKEN", "foobar")
conf, err := newForDeployment(deployment.DedicatedType, nil)
conf, err := newForDeployment(deployment.DedicatedType, "US", nil)
require.NoError(t, err)
cb, ok := conf.(*backendConfigImpl)
require.True(t, ok)
Expand All @@ -165,7 +165,7 @@ func TestNewForDeployment(t *testing.T) {
t.Run("multi-tenant", func(t *testing.T) {
t.Setenv("WORKSPACE_NAMESPACE", "spaghetti")
t.Setenv("HOSTED_SERVICE_SECRET", "foobar")
conf, err := newForDeployment(deployment.MultiTenantType, nil)
conf, err := newForDeployment(deployment.MultiTenantType, "", nil)
require.NoError(t, err)

cb, ok := conf.(*backendConfigImpl)
Expand All @@ -175,7 +175,7 @@ func TestNewForDeployment(t *testing.T) {
})

t.Run("unsupported", func(t *testing.T) {
_, err := newForDeployment("UNSUPPORTED_TYPE", nil)
_, err := newForDeployment("UNSUPPORTED_TYPE", "", nil)
require.ErrorContains(t, err, `deployment type "UNSUPPORTED_TYPE" not supported`)
})
}
Expand Down
61 changes: 33 additions & 28 deletions config/backend-config/namespace_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,46 @@ type namespaceConfig struct {
configEnvHandler types.ConfigEnvI
cpRouterURL string

Logger logger.Logger
Client *http.Client
logger logger.Logger
client *http.Client

HostedServiceSecret string
hostedServiceSecret string

Namespace string
ConfigBackendURL *url.URL
namespace string
configBackendURL *url.URL
region string
}

func (nc *namespaceConfig) SetUp() (err error) {
if nc.Namespace == "" {
if nc.namespace == "" {
if !config.IsSet("WORKSPACE_NAMESPACE") {
return errors.New("workspaceNamespace is not configured")
}
nc.Namespace = config.GetString("WORKSPACE_NAMESPACE", "")
nc.namespace = config.GetString("WORKSPACE_NAMESPACE", "")
}
if nc.HostedServiceSecret == "" {
if nc.hostedServiceSecret == "" {
if !config.IsSet("HOSTED_SERVICE_SECRET") {
return errors.New("hostedServiceSecret is not configured")
}
nc.HostedServiceSecret = config.GetString("HOSTED_SERVICE_SECRET", "")
nc.hostedServiceSecret = config.GetString("HOSTED_SERVICE_SECRET", "")
}
if nc.ConfigBackendURL == nil {
if nc.configBackendURL == nil {
configBackendURL := config.GetString("CONFIG_BACKEND_URL", "https://api.rudderstack.com")
nc.ConfigBackendURL, err = url.Parse(configBackendURL)
nc.configBackendURL, err = url.Parse(configBackendURL)
if err != nil {
return err
}
}
if nc.Client == nil {
nc.Client = &http.Client{
if nc.client == nil {
nc.client = &http.Client{
Timeout: config.GetDuration("HttpClient.backendConfig.timeout", 30, time.Second),
}
}
if nc.Logger == nil {
nc.Logger = logger.NewLogger().Child("backend-config")
if nc.logger == nil {
nc.logger = logger.NewLogger().Child("backend-config")
}

nc.Logger.Infof("Fetching config for namespace %s", nc.Namespace)
nc.logger.Infof("Fetching config for namespace %s", nc.namespace)

return nil
}
Expand All @@ -75,26 +76,26 @@ func (nc *namespaceConfig) Get(ctx context.Context, workspaces string) (map[stri
// getFromApi gets the workspace config from api
func (nc *namespaceConfig) getFromAPI(ctx context.Context, _ string) (map[string]ConfigT, error) {
config := make(map[string]ConfigT)
if nc.Namespace == "" {
if nc.namespace == "" {
return config, fmt.Errorf("namespace is not configured")
}

var respBody []byte
u := *nc.ConfigBackendURL
u.Path = fmt.Sprintf("/data-plane/v1/namespaces/%s/config", nc.Namespace)
u := *nc.configBackendURL
u.Path = fmt.Sprintf("/data-plane/v1/namespaces/%s/config", nc.namespace)
operation := func() (fetchError error) {
nc.Logger.Debugf("Fetching config from %s", u.String())
nc.logger.Debugf("Fetching config from %s", u.String())
respBody, fetchError = nc.makeHTTPRequest(ctx, u.String())
return fetchError
}

backoffWithMaxRetry := backoff.WithContext(backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 3), ctx)
err := backoff.RetryNotify(operation, backoffWithMaxRetry, func(err error, t time.Duration) {
nc.Logger.Warnf("Failed to fetch config from API with error: %v, retrying after %v", err, t)
nc.logger.Warnf("Failed to fetch config from API with error: %v, retrying after %v", err, t)
})
if err != nil {
if ctx.Err() == nil {
nc.Logger.Errorf("Error sending request to the server: %v", err)
nc.logger.Errorf("Error sending request to the server: %v", err)
}
return config, err
}
Expand All @@ -106,7 +107,7 @@ func (nc *namespaceConfig) getFromAPI(ctx context.Context, _ string) (map[string
var workspacesConfig map[string]ConfigT
err = jsonfast.Unmarshal(respBody, &workspacesConfig)
if err != nil {
nc.Logger.Errorf("Error while parsing request: %v", err)
nc.logger.Errorf("Error while parsing request: %v", err)
return config, err
}

Expand All @@ -127,8 +128,12 @@ func (nc *namespaceConfig) makeHTTPRequest(ctx context.Context, url string) ([]b
}

req.SetBasicAuth(nc.Identity().BasicAuth())

resp, err := nc.Client.Do(req)
if nc.region != "" {
q := req.URL.Query()
q.Add("region", nc.region)
req.URL.RawQuery = q.Encode()
}
resp, err := nc.client.Do(req)
if err != nil {
return nil, err
}
Expand All @@ -148,12 +153,12 @@ func (nc *namespaceConfig) makeHTTPRequest(ctx context.Context, url string) ([]b
}

func (nc *namespaceConfig) AccessToken() string {
return nc.HostedServiceSecret
return nc.hostedServiceSecret
}

func (nc *namespaceConfig) Identity() identity.Identifier {
return &identity.Namespace{
Namespace: nc.Namespace,
HostedSecret: nc.HostedServiceSecret,
Namespace: nc.namespace,
HostedSecret: nc.hostedServiceSecret,
}
}
44 changes: 22 additions & 22 deletions config/backend-config/namespace_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
func Test_Namespace_SetUp(t *testing.T) {
var (
client = &namespaceConfig{
Logger: logger.NOP,
logger: logger.NOP,
}
configBackendURL = "https://api.test.rudderstack.com"
)
Expand All @@ -30,10 +30,10 @@ func Test_Namespace_SetUp(t *testing.T) {
t.Setenv("CONFIG_BACKEND_URL", parsedConfigBackendURL.String())

require.NoError(t, client.SetUp())
require.Equal(t, parsedConfigBackendURL, client.ConfigBackendURL)
require.Equal(t, "a-testing-namespace", client.Namespace)
require.Equal(t, parsedConfigBackendURL, client.configBackendURL)
require.Equal(t, "a-testing-namespace", client.namespace)
require.Equal(t, "service-secret", client.AccessToken())
require.Equal(t, "service-secret", client.HostedServiceSecret)
require.Equal(t, "service-secret", client.hostedServiceSecret)
}

func Test_Namespace_Get(t *testing.T) {
Expand All @@ -57,14 +57,14 @@ func Test_Namespace_Get(t *testing.T) {
require.NoError(t, err)

client := &namespaceConfig{
Logger: logger.NOP,
logger: logger.NOP,

Client: ts.Client(),
ConfigBackendURL: httpSrvURL,
client: ts.Client(),
configBackendURL: httpSrvURL,

Namespace: namespace,
namespace: namespace,

HostedServiceSecret: "service-secret",
hostedServiceSecret: "service-secret",
cpRouterURL: cpRouterURL,
}
require.NoError(t, client.SetUp())
Expand All @@ -80,11 +80,11 @@ func Test_Namespace_Get(t *testing.T) {

t.Run("Invalid credentials", func(t *testing.T) {
client := &namespaceConfig{
Client: ts.Client(),
ConfigBackendURL: httpSrvURL,
client: ts.Client(),
configBackendURL: httpSrvURL,

Namespace: namespace,
HostedServiceSecret: "invalid-service-secret",
namespace: namespace,
hostedServiceSecret: "invalid-service-secret",
}

require.NoError(t, client.SetUp())
Expand All @@ -96,11 +96,11 @@ func Test_Namespace_Get(t *testing.T) {

t.Run("empty namespace", func(t *testing.T) {
client := &namespaceConfig{
Client: ts.Client(),
ConfigBackendURL: httpSrvURL,
client: ts.Client(),
configBackendURL: httpSrvURL,

Namespace: "namespace-does-not-exist",
HostedServiceSecret: "service-secret",
namespace: "namespace-does-not-exist",
hostedServiceSecret: "service-secret",
}

require.NoError(t, client.SetUp())
Expand Down Expand Up @@ -130,14 +130,14 @@ func Test_Namespace_Identity(t *testing.T) {
require.NoError(t, err)

client := &namespaceConfig{
Logger: logger.NOP,
logger: logger.NOP,

Client: ts.Client(),
ConfigBackendURL: httpSrvURL,
client: ts.Client(),
configBackendURL: httpSrvURL,

Namespace: namespace,
namespace: namespace,

HostedServiceSecret: "service-secret",
hostedServiceSecret: "service-secret",
cpRouterURL: cpRouterURL,
}
require.NoError(t, client.SetUp())
Expand Down
29 changes: 18 additions & 11 deletions config/backend-config/single_workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ import (
)

type singleWorkspaceConfig struct {
doOnce sync.Once
Token string
workspaceID string
token string
configBackendURL *url.URL
configJSONPath string
configEnvHandler types.ConfigEnvI
region string

workspaceIDOnce sync.Once
workspaceID string
}

func (wc *singleWorkspaceConfig) SetUp() error {
Expand All @@ -33,17 +35,17 @@ func (wc *singleWorkspaceConfig) SetUp() error {
}
return nil
}
if wc.Token == "" {
wc.Token = config.GetWorkspaceToken()
if wc.token == "" {
wc.token = config.GetWorkspaceToken()
}
if wc.Token == "" {
if wc.token == "" {
return fmt.Errorf("single workspace: empty workspace config token")
}
return nil
}

func (wc *singleWorkspaceConfig) AccessToken() string {
return wc.Token
return wc.token
}

// Get returns sources from the workspace
Expand Down Expand Up @@ -97,7 +99,7 @@ func (wc *singleWorkspaceConfig) getFromAPI(ctx context.Context, _ string) (map[
}
workspaceID := sourcesJSON.WorkspaceID

wc.doOnce.Do(func() {
wc.workspaceIDOnce.Do(func() {
wc.workspaceID = workspaceID
})
config[workspaceID] = sourcesJSON
Expand All @@ -120,7 +122,7 @@ func (wc *singleWorkspaceConfig) getFromFile() (map[string]ConfigT, error) {
return config, err
}
workspaceID := configJSON.WorkspaceID
wc.doOnce.Do(func() {
wc.workspaceIDOnce.Do(func() {
pkgLogger.Info("Read workspace config from JSON file")
wc.workspaceID = workspaceID
})
Expand All @@ -134,8 +136,13 @@ func (wc *singleWorkspaceConfig) makeHTTPRequest(ctx context.Context, url string
return nil, err
}

req.SetBasicAuth(wc.Token, "")
req.SetBasicAuth(wc.token, "")
req.Header.Set("Content-Type", "application/json")
if wc.region != "" {
q := req.URL.Query()
q.Add("region", wc.region)
req.URL.RawQuery = q.Encode()
}

client := &http.Client{Timeout: config.GetDuration("HttpClient.backendConfig.timeout", 30, time.Second)}
resp, err := client.Do(req)
Expand All @@ -160,6 +167,6 @@ func (wc *singleWorkspaceConfig) makeHTTPRequest(ctx context.Context, url string
func (wc *singleWorkspaceConfig) Identity() identity.Identifier {
return &identity.Workspace{
WorkspaceID: wc.workspaceID,
WorkspaceToken: wc.Token,
WorkspaceToken: wc.token,
}
}
Loading

0 comments on commit 82a017f

Please sign in to comment.