Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: data residency adaptations #2703

Merged
merged 1 commit into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions config/backend-config/backend-config.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (bc *backendConfigImpl) Subscribe(ctx context.Context, topic Topic) pubsub.
return bc.eb.Subscribe(ctx, string(topic))
}

func newForDeployment(deploymentType deployment.Type, configEnvHandler types.ConfigEnvI) (BackendConfig, error) {
func newForDeployment(deploymentType deployment.Type, region string, configEnvHandler types.ConfigEnvI) (BackendConfig, error) {
backendConfig := &backendConfigImpl{
eb: pubsub.New(),
}
Expand All @@ -286,12 +286,14 @@ func newForDeployment(deploymentType deployment.Type, configEnvHandler types.Con
configJSONPath: configJSONPath,
configBackendURL: parsedConfigBackendURL,
configEnvHandler: configEnvHandler,
region: region,
}
case deployment.MultiTenantType:
backendConfig.workspaceConfig = &namespaceConfig{
ConfigBackendURL: parsedConfigBackendURL,
configBackendURL: parsedConfigBackendURL,
configEnvHandler: configEnvHandler,
cpRouterURL: cpRouterURL,
region: region,
}
default:
return nil, fmt.Errorf("deployment type %q not supported", deploymentType)
Expand All @@ -303,11 +305,12 @@ func newForDeployment(deploymentType deployment.Type, configEnvHandler types.Con
// Setup backend config
func Setup(configEnvHandler types.ConfigEnvI) (err error) {
deploymentType, err := deployment.GetFromEnv()
region := config.GetString("region", "")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good, if this line is below err check line.

if err != nil {
return fmt.Errorf("deployment type from env: %w", err)
}

backendConfig, err := newForDeployment(deploymentType, configEnvHandler)
backendConfig, err := newForDeployment(deploymentType, region, configEnvHandler)
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions config/backend-config/backend_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ func TestBadResponse(t *testing.T) {

configs := map[string]workspaceConfig{
"namespace": &namespaceConfig{
ConfigBackendURL: parsedURL,
Namespace: "some-namespace",
Client: http.DefaultClient,
Logger: logger.NOP,
configBackendURL: parsedURL,
namespace: "some-namespace",
client: http.DefaultClient,
logger: logger.NOP,
},
"single-workspace": &singleWorkspaceConfig{
configBackendURL: parsedURL,
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestNewForDeployment(t *testing.T) {
initBackendConfig()
t.Run("dedicated", func(t *testing.T) {
t.Setenv("WORKSPACE_TOKEN", "foobar")
conf, err := newForDeployment(deployment.DedicatedType, nil)
conf, err := newForDeployment(deployment.DedicatedType, "US", nil)
require.NoError(t, err)
cb, ok := conf.(*backendConfigImpl)
require.True(t, ok)
Expand All @@ -165,7 +165,7 @@ func TestNewForDeployment(t *testing.T) {
t.Run("multi-tenant", func(t *testing.T) {
t.Setenv("WORKSPACE_NAMESPACE", "spaghetti")
t.Setenv("HOSTED_SERVICE_SECRET", "foobar")
conf, err := newForDeployment(deployment.MultiTenantType, nil)
conf, err := newForDeployment(deployment.MultiTenantType, "", nil)
require.NoError(t, err)

cb, ok := conf.(*backendConfigImpl)
Expand All @@ -175,7 +175,7 @@ func TestNewForDeployment(t *testing.T) {
})

t.Run("unsupported", func(t *testing.T) {
_, err := newForDeployment("UNSUPPORTED_TYPE", nil)
_, err := newForDeployment("UNSUPPORTED_TYPE", "", nil)
require.ErrorContains(t, err, `deployment type "UNSUPPORTED_TYPE" not supported`)
})
}
Expand Down
61 changes: 33 additions & 28 deletions config/backend-config/namespace_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,46 @@ type namespaceConfig struct {
configEnvHandler types.ConfigEnvI
cpRouterURL string

Logger logger.Logger
Client *http.Client
logger logger.Logger
client *http.Client

HostedServiceSecret string
hostedServiceSecret string

Namespace string
ConfigBackendURL *url.URL
namespace string
configBackendURL *url.URL
region string
}

func (nc *namespaceConfig) SetUp() (err error) {
if nc.Namespace == "" {
if nc.namespace == "" {
if !config.IsSet("WORKSPACE_NAMESPACE") {
return errors.New("workspaceNamespace is not configured")
}
nc.Namespace = config.GetString("WORKSPACE_NAMESPACE", "")
nc.namespace = config.GetString("WORKSPACE_NAMESPACE", "")
}
if nc.HostedServiceSecret == "" {
if nc.hostedServiceSecret == "" {
if !config.IsSet("HOSTED_SERVICE_SECRET") {
return errors.New("hostedServiceSecret is not configured")
}
nc.HostedServiceSecret = config.GetString("HOSTED_SERVICE_SECRET", "")
nc.hostedServiceSecret = config.GetString("HOSTED_SERVICE_SECRET", "")
}
if nc.ConfigBackendURL == nil {
if nc.configBackendURL == nil {
configBackendURL := config.GetString("CONFIG_BACKEND_URL", "https://api.rudderstack.com")
nc.ConfigBackendURL, err = url.Parse(configBackendURL)
nc.configBackendURL, err = url.Parse(configBackendURL)
if err != nil {
return err
}
}
if nc.Client == nil {
nc.Client = &http.Client{
if nc.client == nil {
nc.client = &http.Client{
Timeout: config.GetDuration("HttpClient.backendConfig.timeout", 30, time.Second),
}
}
if nc.Logger == nil {
nc.Logger = logger.NewLogger().Child("backend-config")
if nc.logger == nil {
nc.logger = logger.NewLogger().Child("backend-config")
}

nc.Logger.Infof("Fetching config for namespace %s", nc.Namespace)
nc.logger.Infof("Fetching config for namespace %s", nc.namespace)

return nil
}
Expand All @@ -75,26 +76,26 @@ func (nc *namespaceConfig) Get(ctx context.Context, workspaces string) (map[stri
// getFromApi gets the workspace config from api
func (nc *namespaceConfig) getFromAPI(ctx context.Context, _ string) (map[string]ConfigT, error) {
config := make(map[string]ConfigT)
if nc.Namespace == "" {
if nc.namespace == "" {
return config, fmt.Errorf("namespace is not configured")
}

var respBody []byte
u := *nc.ConfigBackendURL
u.Path = fmt.Sprintf("/data-plane/v1/namespaces/%s/config", nc.Namespace)
u := *nc.configBackendURL
u.Path = fmt.Sprintf("/data-plane/v1/namespaces/%s/config", nc.namespace)
operation := func() (fetchError error) {
nc.Logger.Debugf("Fetching config from %s", u.String())
nc.logger.Debugf("Fetching config from %s", u.String())
respBody, fetchError = nc.makeHTTPRequest(ctx, u.String())
return fetchError
}

backoffWithMaxRetry := backoff.WithContext(backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 3), ctx)
err := backoff.RetryNotify(operation, backoffWithMaxRetry, func(err error, t time.Duration) {
nc.Logger.Warnf("Failed to fetch config from API with error: %v, retrying after %v", err, t)
nc.logger.Warnf("Failed to fetch config from API with error: %v, retrying after %v", err, t)
})
if err != nil {
if ctx.Err() == nil {
nc.Logger.Errorf("Error sending request to the server: %v", err)
nc.logger.Errorf("Error sending request to the server: %v", err)
}
return config, err
}
Expand All @@ -106,7 +107,7 @@ func (nc *namespaceConfig) getFromAPI(ctx context.Context, _ string) (map[string
var workspacesConfig map[string]ConfigT
err = jsonfast.Unmarshal(respBody, &workspacesConfig)
if err != nil {
nc.Logger.Errorf("Error while parsing request: %v", err)
nc.logger.Errorf("Error while parsing request: %v", err)
return config, err
}

Expand All @@ -127,8 +128,12 @@ func (nc *namespaceConfig) makeHTTPRequest(ctx context.Context, url string) ([]b
}

req.SetBasicAuth(nc.Identity().BasicAuth())

resp, err := nc.Client.Do(req)
if nc.region != "" {
q := req.URL.Query()
q.Add("region", nc.region)
req.URL.RawQuery = q.Encode()
}
resp, err := nc.client.Do(req)
if err != nil {
return nil, err
}
Expand All @@ -148,12 +153,12 @@ func (nc *namespaceConfig) makeHTTPRequest(ctx context.Context, url string) ([]b
}

func (nc *namespaceConfig) AccessToken() string {
return nc.HostedServiceSecret
return nc.hostedServiceSecret
}

func (nc *namespaceConfig) Identity() identity.Identifier {
return &identity.Namespace{
Namespace: nc.Namespace,
HostedSecret: nc.HostedServiceSecret,
Namespace: nc.namespace,
HostedSecret: nc.hostedServiceSecret,
}
}
44 changes: 22 additions & 22 deletions config/backend-config/namespace_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
func Test_Namespace_SetUp(t *testing.T) {
var (
client = &namespaceConfig{
Logger: logger.NOP,
logger: logger.NOP,
}
configBackendURL = "https://api.test.rudderstack.com"
)
Expand All @@ -30,10 +30,10 @@ func Test_Namespace_SetUp(t *testing.T) {
t.Setenv("CONFIG_BACKEND_URL", parsedConfigBackendURL.String())

require.NoError(t, client.SetUp())
require.Equal(t, parsedConfigBackendURL, client.ConfigBackendURL)
require.Equal(t, "a-testing-namespace", client.Namespace)
require.Equal(t, parsedConfigBackendURL, client.configBackendURL)
require.Equal(t, "a-testing-namespace", client.namespace)
require.Equal(t, "service-secret", client.AccessToken())
require.Equal(t, "service-secret", client.HostedServiceSecret)
require.Equal(t, "service-secret", client.hostedServiceSecret)
}

func Test_Namespace_Get(t *testing.T) {
Expand All @@ -57,14 +57,14 @@ func Test_Namespace_Get(t *testing.T) {
require.NoError(t, err)

client := &namespaceConfig{
Logger: logger.NOP,
logger: logger.NOP,

Client: ts.Client(),
ConfigBackendURL: httpSrvURL,
client: ts.Client(),
configBackendURL: httpSrvURL,

Namespace: namespace,
namespace: namespace,

HostedServiceSecret: "service-secret",
hostedServiceSecret: "service-secret",
cpRouterURL: cpRouterURL,
}
require.NoError(t, client.SetUp())
Expand All @@ -80,11 +80,11 @@ func Test_Namespace_Get(t *testing.T) {

t.Run("Invalid credentials", func(t *testing.T) {
client := &namespaceConfig{
Client: ts.Client(),
ConfigBackendURL: httpSrvURL,
client: ts.Client(),
configBackendURL: httpSrvURL,

Namespace: namespace,
HostedServiceSecret: "invalid-service-secret",
namespace: namespace,
hostedServiceSecret: "invalid-service-secret",
}

require.NoError(t, client.SetUp())
Expand All @@ -96,11 +96,11 @@ func Test_Namespace_Get(t *testing.T) {

t.Run("empty namespace", func(t *testing.T) {
client := &namespaceConfig{
Client: ts.Client(),
ConfigBackendURL: httpSrvURL,
client: ts.Client(),
configBackendURL: httpSrvURL,

Namespace: "namespace-does-not-exist",
HostedServiceSecret: "service-secret",
namespace: "namespace-does-not-exist",
hostedServiceSecret: "service-secret",
}

require.NoError(t, client.SetUp())
Expand Down Expand Up @@ -130,14 +130,14 @@ func Test_Namespace_Identity(t *testing.T) {
require.NoError(t, err)

client := &namespaceConfig{
Logger: logger.NOP,
logger: logger.NOP,

Client: ts.Client(),
ConfigBackendURL: httpSrvURL,
client: ts.Client(),
configBackendURL: httpSrvURL,

Namespace: namespace,
namespace: namespace,

HostedServiceSecret: "service-secret",
hostedServiceSecret: "service-secret",
cpRouterURL: cpRouterURL,
}
require.NoError(t, client.SetUp())
Expand Down
29 changes: 18 additions & 11 deletions config/backend-config/single_workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ import (
)

type singleWorkspaceConfig struct {
doOnce sync.Once
Token string
workspaceID string
token string
configBackendURL *url.URL
configJSONPath string
configEnvHandler types.ConfigEnvI
region string

workspaceIDOnce sync.Once
workspaceID string
}

func (wc *singleWorkspaceConfig) SetUp() error {
Expand All @@ -33,17 +35,17 @@ func (wc *singleWorkspaceConfig) SetUp() error {
}
return nil
}
if wc.Token == "" {
wc.Token = config.GetWorkspaceToken()
if wc.token == "" {
wc.token = config.GetWorkspaceToken()
}
if wc.Token == "" {
if wc.token == "" {
return fmt.Errorf("single workspace: empty workspace config token")
}
return nil
}

func (wc *singleWorkspaceConfig) AccessToken() string {
return wc.Token
return wc.token
}

// Get returns sources from the workspace
Expand Down Expand Up @@ -97,7 +99,7 @@ func (wc *singleWorkspaceConfig) getFromAPI(ctx context.Context, _ string) (map[
}
workspaceID := sourcesJSON.WorkspaceID

wc.doOnce.Do(func() {
wc.workspaceIDOnce.Do(func() {
wc.workspaceID = workspaceID
})
config[workspaceID] = sourcesJSON
Expand All @@ -120,7 +122,7 @@ func (wc *singleWorkspaceConfig) getFromFile() (map[string]ConfigT, error) {
return config, err
}
workspaceID := configJSON.WorkspaceID
wc.doOnce.Do(func() {
wc.workspaceIDOnce.Do(func() {
pkgLogger.Info("Read workspace config from JSON file")
wc.workspaceID = workspaceID
})
Expand All @@ -134,8 +136,13 @@ func (wc *singleWorkspaceConfig) makeHTTPRequest(ctx context.Context, url string
return nil, err
}

req.SetBasicAuth(wc.Token, "")
req.SetBasicAuth(wc.token, "")
req.Header.Set("Content-Type", "application/json")
if wc.region != "" {
q := req.URL.Query()
q.Add("region", wc.region)
req.URL.RawQuery = q.Encode()
}

client := &http.Client{Timeout: config.GetDuration("HttpClient.backendConfig.timeout", 30, time.Second)}
resp, err := client.Do(req)
Expand All @@ -160,6 +167,6 @@ func (wc *singleWorkspaceConfig) makeHTTPRequest(ctx context.Context, url string
func (wc *singleWorkspaceConfig) Identity() identity.Identifier {
return &identity.Workspace{
WorkspaceID: wc.workspaceID,
WorkspaceToken: wc.Token,
WorkspaceToken: wc.token,
}
}
Loading