diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 8f872a9b77..e6adf207f9 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -168,6 +168,11 @@ jobs: go-version-file: 'go.mod' - run: go version - run: go mod download + - name: Login to DockerHub + uses: docker/login-action@v3.3.0 + with: + username: rudderlabs + password: ${{ secrets.DOCKERHUB_TOKEN }} - name: Package Unit [ ${{ matrix.package }} ] env: TEST_KAFKA_CONFLUENT_CLOUD_HOST: ${{ secrets.TEST_KAFKA_CONFLUENT_CLOUD_HOST }} diff --git a/integration_test/warehouse/integration_test.go b/integration_test/warehouse/warehouse_test.go similarity index 81% rename from integration_test/warehouse/integration_test.go rename to integration_test/warehouse/warehouse_test.go index 2b64911686..c16de182a7 100644 --- a/integration_test/warehouse/integration_test.go +++ b/integration_test/warehouse/warehouse_test.go @@ -14,8 +14,10 @@ import ( "strings" "sync" "testing" + "text/template" "time" + "github.com/mitchellh/mapstructure" "go.uber.org/atomic" "go.uber.org/mock/gomock" @@ -33,6 +35,7 @@ import ( kithelper "github.com/rudderlabs/rudder-go-kit/testhelper" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/minio" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres" + "github.com/rudderlabs/rudder-server/admin" "github.com/rudderlabs/rudder-server/app" backendconfig "github.com/rudderlabs/rudder-server/backend-config" @@ -40,7 +43,9 @@ import ( "github.com/rudderlabs/rudder-server/jobsdb" mocksApp "github.com/rudderlabs/rudder-server/mocks/app" mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config" + "github.com/rudderlabs/rudder-server/processor/transformer" "github.com/rudderlabs/rudder-server/services/controlplane/identity" + "github.com/rudderlabs/rudder-server/testhelper/backendconfigtest" "github.com/rudderlabs/rudder-server/testhelper/health" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/utils/pubsub" @@ -1501,6 +1506,428 @@ func TestUploads(t *testing.T) { }) } +func TestDestinationTransformation(t *testing.T) { + c := testcompose.New(t, compose.FilePaths([]string{"../../warehouse/integrations/testdata/docker-compose.transformer.yml"})) + c.Start(context.Background()) + + transformerURL := fmt.Sprintf("http://localhost:%d", c.Port("transformer", 9090)) + + conf := config.New() + conf.Set("DEST_TRANSFORM_URL", transformerURL) + conf.Set("USER_TRANSFORM_URL", transformerURL) + + type output struct { + Metadata struct { + Table string `mapstructure:"table"` + Columns map[string]string `mapstructure:"columns"` + } `mapstructure:"metadata"` + Data map[string]any `mapstructure:"data"` + } + + t.Run("allowUsersContextTraits", func(t *testing.T) { + testcases := []struct { + name string + configOverride map[string]any + validateEvents func(t *testing.T, events []transformer.TransformerResponse) + }{ + { + name: "with allowUsersContextTraits=true", + configOverride: map[string]any{ + "allowUsersContextTraits": true, + }, + validateEvents: func(t *testing.T, events []transformer.TransformerResponse) { + var identifyEvent output + err := mapstructure.Decode(events[0].Output, &identifyEvent) + require.NoError(t, err) + require.Equal(t, "identifies", identifyEvent.Metadata.Table) + require.Contains(t, identifyEvent.Metadata.Columns, "firstname") + require.Contains(t, identifyEvent.Metadata.Columns, "context_traits_firstname") + require.Contains(t, identifyEvent.Metadata.Columns, "lastname") + require.Equal(t, "Mickey", identifyEvent.Data["firstname"]) + require.Equal(t, "Mouse", identifyEvent.Data["lastname"]) + require.Equal(t, "Mickey", identifyEvent.Data["context_traits_firstname"]) + + var userEvent output + err = mapstructure.Decode(events[1].Output, &userEvent) + require.NoError(t, err) + require.Equal(t, "users", userEvent.Metadata.Table) + require.Contains(t, userEvent.Metadata.Columns, "firstname") + require.Contains(t, userEvent.Metadata.Columns, "context_traits_firstname") + require.Contains(t, userEvent.Metadata.Columns, "lastname") + require.Equal(t, "Mickey", userEvent.Data["firstname"]) + require.Equal(t, "Mouse", userEvent.Data["lastname"]) + require.Equal(t, "Mickey", userEvent.Data["context_traits_firstname"]) + }, + }, + { + name: "with allowUsersContextTraits=false", + configOverride: map[string]any{ + "allowUsersContextTraits": false, + }, + validateEvents: func(t *testing.T, events []transformer.TransformerResponse) { + var identifyEvent output + err := mapstructure.Decode(events[0].Output, &identifyEvent) + require.NoError(t, err) + require.Equal(t, "identifies", identifyEvent.Metadata.Table) + require.NotContains(t, identifyEvent.Metadata.Columns, "firstname") + require.Contains(t, identifyEvent.Metadata.Columns, "context_traits_firstname") + require.Contains(t, identifyEvent.Metadata.Columns, "lastname") + require.NotContains(t, identifyEvent.Data, "firstname") + require.Equal(t, "Mouse", identifyEvent.Data["lastname"]) + require.Equal(t, "Mickey", identifyEvent.Data["context_traits_firstname"]) + + var userEvent output + err = mapstructure.Decode(events[1].Output, &userEvent) + require.NoError(t, err) + require.Equal(t, "users", userEvent.Metadata.Table) + require.NotContains(t, userEvent.Metadata.Columns, "firstname") + require.Contains(t, userEvent.Metadata.Columns, "context_traits_firstname") + require.Contains(t, userEvent.Metadata.Columns, "lastname") + require.NotContains(t, userEvent.Data, "firstname") + require.Equal(t, "Mouse", userEvent.Data["lastname"]) + require.Equal(t, "Mickey", userEvent.Data["context_traits_firstname"]) + }, + }, + { + name: "without allowUsersContextTraits", + configOverride: map[string]any{}, + validateEvents: func(t *testing.T, events []transformer.TransformerResponse) { + var identifyEvent output + err := mapstructure.Decode(events[0].Output, &identifyEvent) + require.NoError(t, err) + require.Equal(t, "identifies", identifyEvent.Metadata.Table) + require.NotContains(t, identifyEvent.Metadata.Columns, "firstname") + require.Contains(t, identifyEvent.Metadata.Columns, "context_traits_firstname") + require.Contains(t, identifyEvent.Metadata.Columns, "lastname") + require.NotContains(t, identifyEvent.Data, "firstname") + require.Equal(t, "Mouse", identifyEvent.Data["lastname"]) + require.Equal(t, "Mickey", identifyEvent.Data["context_traits_firstname"]) + + var userEvent output + err = mapstructure.Decode(events[1].Output, &userEvent) + require.NoError(t, err) + require.Equal(t, "users", userEvent.Metadata.Table) + require.NotContains(t, userEvent.Metadata.Columns, "firstname") + require.Contains(t, userEvent.Metadata.Columns, "context_traits_firstname") + require.Contains(t, userEvent.Metadata.Columns, "lastname") + require.NotContains(t, userEvent.Data, "firstname") + require.Equal(t, "Mouse", userEvent.Data["lastname"]) + require.Equal(t, "Mickey", userEvent.Data["context_traits_firstname"]) + }, + }, + } + + for _, tc := range testcases { + destinationBuilder := backendconfigtest.NewDestinationBuilder(whutils.BQ). + WithID(destinationID). + WithRevisionID(destinationID) + for k, v := range tc.configOverride { + destinationBuilder.WithConfigOption(k, v) + } + destination := destinationBuilder.Build() + + destinationJSON, err := json.Marshal(destination) + require.NoError(t, err) + + eventTemplate := ` + [ + { + "message": { + "context": { + "traits": { + "firstname": "Mickey" + } + }, + "traits": { + "lastname": "Mouse" + }, + "type": "identify", + "userId": "9bb5d4c2-a7aa-4a36-9efb-dd2b1aec5d33" + }, + "destination": {{.destination}} + } + ] +` + + tpl, err := template.New(uuid.New().String()).Parse(eventTemplate) + require.NoError(t, err) + + b := new(strings.Builder) + err = tpl.Execute(b, map[string]any{ + "destination": string(destinationJSON), + }) + require.NoError(t, err) + + var transformerEvents []transformer.TransformerEvent + err = json.Unmarshal([]byte(b.String()), &transformerEvents) + require.NoError(t, err) + + tr := transformer.NewTransformer(conf, logger.NOP, stats.Default) + response := tr.Transform(context.Background(), transformerEvents, 100) + require.Zero(t, len(response.FailedEvents)) + require.Len(t, response.Events, 2) + + tc.validateEvents(t, response.Events) + } + }) + t.Run("underscoreDivideNumbers", func(t *testing.T) { + testcases := []struct { + name string + configOverride map[string]any + validateEvents func(t *testing.T, events []transformer.TransformerResponse) + }{ + { + name: "with underscoreDivideNumbers=true", + configOverride: map[string]any{ + "underscoreDivideNumbers": true, + }, + validateEvents: func(t *testing.T, events []transformer.TransformerResponse) { + var trackOutput output + err := mapstructure.Decode(events[0].Output, &trackOutput) + require.NoError(t, err) + require.Equal(t, "tracks", trackOutput.Metadata.Table) + require.Contains(t, trackOutput.Metadata.Columns, "context_traits_attribute_v_3") + require.Equal(t, "button_clicked_v_2", trackOutput.Data["event"]) + require.Equal(t, "button clicked v2", trackOutput.Data["event_text"]) + require.Equal(t, "some value", trackOutput.Data["context_traits_attribute_v_3"]) + + var buttonClickedOutput output + err = mapstructure.Decode(events[1].Output, &buttonClickedOutput) + require.NoError(t, err) + require.Equal(t, "button_clicked_v_2", buttonClickedOutput.Metadata.Table) + require.Contains(t, buttonClickedOutput.Metadata.Columns, "context_traits_attribute_v_3") + require.Equal(t, "button_clicked_v_2", buttonClickedOutput.Data["event"]) + require.Equal(t, "button clicked v2", buttonClickedOutput.Data["event_text"]) + require.Equal(t, "some value", buttonClickedOutput.Data["context_traits_attribute_v_3"]) + }, + }, + { + name: "with underscoreDivideNumbers=false", + configOverride: map[string]any{ + "underscoreDivideNumbers": false, + }, + validateEvents: func(t *testing.T, events []transformer.TransformerResponse) { + var trackOutput output + err := mapstructure.Decode(events[0].Output, &trackOutput) + require.NoError(t, err) + require.Equal(t, "tracks", trackOutput.Metadata.Table) + require.Contains(t, trackOutput.Metadata.Columns, "context_traits_attribute_v3") + require.Equal(t, "button_clicked_v2", trackOutput.Data["event"]) + require.Equal(t, "button clicked v2", trackOutput.Data["event_text"]) + require.Equal(t, "some value", trackOutput.Data["context_traits_attribute_v3"]) + + var buttonClickedOutput output + err = mapstructure.Decode(events[1].Output, &buttonClickedOutput) + require.NoError(t, err) + require.Equal(t, "button_clicked_v2", buttonClickedOutput.Metadata.Table) + require.Contains(t, buttonClickedOutput.Metadata.Columns, "context_traits_attribute_v3") + require.Equal(t, "button_clicked_v2", buttonClickedOutput.Data["event"]) + require.Equal(t, "button clicked v2", buttonClickedOutput.Data["event_text"]) + require.Equal(t, "some value", buttonClickedOutput.Data["context_traits_attribute_v3"]) + }, + }, + { + name: "without underscoreDivideNumbers", + configOverride: map[string]any{}, + validateEvents: func(t *testing.T, events []transformer.TransformerResponse) { + var trackOutput output + err := mapstructure.Decode(events[0].Output, &trackOutput) + require.NoError(t, err) + require.Equal(t, "tracks", trackOutput.Metadata.Table) + require.Contains(t, trackOutput.Metadata.Columns, "context_traits_attribute_v3") + require.Equal(t, "button_clicked_v2", trackOutput.Data["event"]) + require.Equal(t, "button clicked v2", trackOutput.Data["event_text"]) + require.Equal(t, "some value", trackOutput.Data["context_traits_attribute_v3"]) + + var buttonClickedOutput output + err = mapstructure.Decode(events[1].Output, &buttonClickedOutput) + require.NoError(t, err) + require.Equal(t, "button_clicked_v2", buttonClickedOutput.Metadata.Table) + require.Contains(t, buttonClickedOutput.Metadata.Columns, "context_traits_attribute_v3") + require.Equal(t, "button_clicked_v2", buttonClickedOutput.Data["event"]) + require.Equal(t, "button clicked v2", buttonClickedOutput.Data["event_text"]) + require.Equal(t, "some value", buttonClickedOutput.Data["context_traits_attribute_v3"]) + }, + }, + } + + for _, tc := range testcases { + destinationBuilder := backendconfigtest.NewDestinationBuilder(whutils.BQ). + WithID(destinationID). + WithRevisionID(destinationID) + for k, v := range tc.configOverride { + destinationBuilder.WithConfigOption(k, v) + } + destination := destinationBuilder.Build() + + destinationJSON, err := json.Marshal(destination) + require.NoError(t, err) + + eventTemplate := ` + [ + { + "message": { + "context": { + "traits": { + "attribute v3": "some value" + } + }, + "event": "button clicked v2", + "type": "track" + }, + "destination": {{.destination}} + } + ] +` + + tpl, err := template.New(uuid.New().String()).Parse(eventTemplate) + require.NoError(t, err) + + b := new(strings.Builder) + err = tpl.Execute(b, map[string]any{ + "destination": string(destinationJSON), + }) + require.NoError(t, err) + + var transformerEvents []transformer.TransformerEvent + err = json.Unmarshal([]byte(b.String()), &transformerEvents) + require.NoError(t, err) + + tr := transformer.NewTransformer(conf, logger.NOP, stats.Default) + response := tr.Transform(context.Background(), transformerEvents, 100) + require.Zero(t, len(response.FailedEvents)) + require.Len(t, response.Events, 2) + + tc.validateEvents(t, response.Events) + } + }) + t.Run("users additional fields (sent_at, timestamp, original_timestamp)", func(t *testing.T) { + testcases := []struct { + name string + destType string + configOverride map[string]any + validateEvents func(t *testing.T, events []transformer.TransformerResponse) + }{ + { + name: "for non-datalake destinations should be present", + destType: whutils.BQ, + configOverride: map[string]any{ + "allowUsersContextTraits": true, + }, + validateEvents: func(t *testing.T, events []transformer.TransformerResponse) { + var identifyEvent output + err := mapstructure.Decode(events[0].Output, &identifyEvent) + require.NoError(t, err) + require.Equal(t, "identifies", identifyEvent.Metadata.Table) + require.Contains(t, identifyEvent.Metadata.Columns, "sent_at") + require.Contains(t, identifyEvent.Metadata.Columns, "timestamp") + require.Contains(t, identifyEvent.Metadata.Columns, "original_timestamp") + require.Equal(t, "2023-05-12T04:08:48.750Z", identifyEvent.Data["sent_at"]) + require.Equal(t, "2023-05-12T04:08:48.750Z", identifyEvent.Data["timestamp"]) + require.Equal(t, "2023-05-12T04:08:48.750Z", identifyEvent.Data["original_timestamp"]) + + var userEvent output + err = mapstructure.Decode(events[1].Output, &userEvent) + require.NoError(t, err) + require.Equal(t, "users", userEvent.Metadata.Table) + require.Contains(t, userEvent.Metadata.Columns, "sent_at") + require.Contains(t, userEvent.Metadata.Columns, "timestamp") + require.Contains(t, userEvent.Metadata.Columns, "original_timestamp") + require.Equal(t, "2023-05-12T04:08:48.750Z", userEvent.Data["sent_at"]) + require.Equal(t, "2023-05-12T04:08:48.750Z", userEvent.Data["timestamp"]) + require.Equal(t, "2023-05-12T04:08:48.750Z", userEvent.Data["original_timestamp"]) + }, + }, + { + name: "for datalake destinations should not be present", + destType: whutils.GCSDatalake, + configOverride: map[string]any{ + "allowUsersContextTraits": false, + }, + validateEvents: func(t *testing.T, events []transformer.TransformerResponse) { + var identifyEvent output + err := mapstructure.Decode(events[0].Output, &identifyEvent) + require.NoError(t, err) + require.Equal(t, "identifies", identifyEvent.Metadata.Table) + require.Contains(t, identifyEvent.Metadata.Columns, "sent_at") + require.Contains(t, identifyEvent.Metadata.Columns, "timestamp") + require.Contains(t, identifyEvent.Metadata.Columns, "original_timestamp") + require.Equal(t, "2023-05-12T04:08:48.750Z", identifyEvent.Data["sent_at"]) + require.Equal(t, "2023-05-12T04:08:48.750Z", identifyEvent.Data["timestamp"]) + require.Equal(t, "2023-05-12T04:08:48.750Z", identifyEvent.Data["original_timestamp"]) + + var userEvent output + err = mapstructure.Decode(events[1].Output, &userEvent) + require.NoError(t, err) + require.Equal(t, "users", userEvent.Metadata.Table) + require.NotContains(t, userEvent.Metadata.Columns, "sent_at") + require.NotContains(t, userEvent.Metadata.Columns, "timestamp") + require.NotContains(t, userEvent.Metadata.Columns, "original_timestamp") + require.NotContains(t, userEvent.Data, "sent_at") + require.NotContains(t, userEvent.Data, "timestamp") + require.NotContains(t, userEvent.Data, "original_timestamp") + }, + }, + } + + for _, tc := range testcases { + destinationBuilder := backendconfigtest.NewDestinationBuilder(tc.destType). + WithID(destinationID). + WithRevisionID(destinationID) + for k, v := range tc.configOverride { + destinationBuilder.WithConfigOption(k, v) + } + destination := destinationBuilder.Build() + + destinationJSON, err := json.Marshal(destination) + require.NoError(t, err) + + eventTemplate := ` + [ + { + "message": { + "context": { + "traits": { + "firstname": "Mickey" + } + }, + "traits": { + "lastname": "Mouse" + }, + "type": "identify", + "userId": "9bb5d4c2-a7aa-4a36-9efb-dd2b1aec5d33", + "originalTimestamp": "2023-05-12T04:08:48.750+00:00", + "sentAt": "2023-05-12T04:08:48.750+00:00", + "timestamp": "2023-05-12T04:08:48.750+00:00" + }, + "destination": {{.destination}} + } + ] +` + + tpl, err := template.New(uuid.New().String()).Parse(eventTemplate) + require.NoError(t, err) + + b := new(strings.Builder) + err = tpl.Execute(b, map[string]any{ + "destination": string(destinationJSON), + }) + require.NoError(t, err) + + var transformerEvents []transformer.TransformerEvent + err = json.Unmarshal([]byte(b.String()), &transformerEvents) + require.NoError(t, err) + + tr := transformer.NewTransformer(conf, logger.NOP, stats.Default) + response := tr.Transform(context.Background(), transformerEvents, 100) + require.Zero(t, len(response.FailedEvents)) + require.Len(t, response.Events, 2) + + tc.validateEvents(t, response.Events) + } + }) +} + func runWarehouseServer( t testing.TB, ctx context.Context, diff --git a/warehouse/integrations/azure-synapse/azure_synapse_test.go b/warehouse/integrations/azure-synapse/azure_synapse_test.go index 339ff65042..c0e2781ea8 100644 --- a/warehouse/integrations/azure-synapse/azure_synapse_test.go +++ b/warehouse/integrations/azure-synapse/azure_synapse_test.go @@ -153,6 +153,8 @@ func TestIntegration(t *testing.T) { WithConfigOption("endPoint", minioEndpoint). WithConfigOption("useRudderStorage", false). WithConfigOption("syncFrequency", "30"). + WithConfigOption("allowUsersContextTraits", true). + WithConfigOption("underscoreDivideNumbers", true). Build() workspaceConfig := backendconfigtest.NewConfigBuilder(). diff --git a/warehouse/integrations/clickhouse/clickhouse_test.go b/warehouse/integrations/clickhouse/clickhouse_test.go index ff2c171cd1..b82db20069 100644 --- a/warehouse/integrations/clickhouse/clickhouse_test.go +++ b/warehouse/integrations/clickhouse/clickhouse_test.go @@ -222,7 +222,9 @@ func TestIntegration(t *testing.T) { WithConfigOption("secure", false). WithConfigOption("endPoint", minioEndpoint). WithConfigOption("useRudderStorage", false). - WithConfigOption("syncFrequency", "30") + WithConfigOption("syncFrequency", "30"). + WithConfigOption("allowUsersContextTraits", true). + WithConfigOption("underscoreDivideNumbers", true) for k, v := range tc.configOverride { destinationBuilder = destinationBuilder.WithConfigOption(k, v) } diff --git a/warehouse/integrations/datalake/datalake_test.go b/warehouse/integrations/datalake/datalake_test.go index d1c2c591f9..4f9d011da5 100644 --- a/warehouse/integrations/datalake/datalake_test.go +++ b/warehouse/integrations/datalake/datalake_test.go @@ -333,7 +333,9 @@ func TestIntegration(t *testing.T) { WithID(destinationID). WithRevisionID(destinationID). WithConfigOption("namespace", namespace). - WithConfigOption("syncFrequency", "30") + WithConfigOption("syncFrequency", "30"). + WithConfigOption("allowUsersContextTraits", true). + WithConfigOption("underscoreDivideNumbers", true) for k, v := range tc.configOverride { destinationBuilder = destinationBuilder.WithConfigOption(k, v) } @@ -570,6 +572,8 @@ func TestIntegration(t *testing.T) { WithConfigOption("s3ForcePathStyle", true). WithConfigOption("disableSSL", true). WithConfigOption("prefix", "some-prefix"). + WithConfigOption("allowUsersContextTraits", true). + WithConfigOption("underscoreDivideNumbers", true). Build() workspaceConfig := backendconfigtest.NewConfigBuilder(). WithSource( @@ -853,6 +857,8 @@ func TestIntegration(t *testing.T) { WithConfigOption("s3ForcePathStyle", true). WithConfigOption("disableSSL", true). WithConfigOption("prefix", "some-prefix"). + WithConfigOption("allowUsersContextTraits", true). + WithConfigOption("underscoreDivideNumbers", true). Build() workspaceConfig := backendconfigtest.NewConfigBuilder(). WithSource( diff --git a/warehouse/integrations/deltalake/deltalake_test.go b/warehouse/integrations/deltalake/deltalake_test.go index 8306a25f52..fab8eb3499 100644 --- a/warehouse/integrations/deltalake/deltalake_test.go +++ b/warehouse/integrations/deltalake/deltalake_test.go @@ -281,7 +281,9 @@ func TestIntegration(t *testing.T) { WithConfigOption("enableSSE", false). WithConfigOption("accountName", credentials.AccountName). WithConfigOption("accountKey", credentials.AccountKey). - WithConfigOption("syncFrequency", "30") + WithConfigOption("syncFrequency", "30"). + WithConfigOption("allowUsersContextTraits", true). + WithConfigOption("underscoreDivideNumbers", true) for k, v := range tc.configOverride { destinationBuilder = destinationBuilder.WithConfigOption(k, v) } diff --git a/warehouse/integrations/mssql/mssql_test.go b/warehouse/integrations/mssql/mssql_test.go index 6210869fec..4a1d6dcf38 100644 --- a/warehouse/integrations/mssql/mssql_test.go +++ b/warehouse/integrations/mssql/mssql_test.go @@ -183,6 +183,8 @@ func TestIntegration(t *testing.T) { WithConfigOption("endPoint", minioEndpoint). WithConfigOption("useRudderStorage", false). WithConfigOption("syncFrequency", "30"). + WithConfigOption("allowUsersContextTraits", true). + WithConfigOption("underscoreDivideNumbers", true). Build() workspaceConfig := backendconfigtest.NewConfigBuilder(). diff --git a/warehouse/integrations/postgres/postgres_test.go b/warehouse/integrations/postgres/postgres_test.go index b4a0d3385b..5be4e6aa0d 100644 --- a/warehouse/integrations/postgres/postgres_test.go +++ b/warehouse/integrations/postgres/postgres_test.go @@ -309,7 +309,9 @@ func TestIntegration(t *testing.T) { WithConfigOption("useSSL", false). WithConfigOption("endPoint", minioEndpoint). WithConfigOption("useRudderStorage", false). - WithConfigOption("syncFrequency", "30") + WithConfigOption("syncFrequency", "30"). + WithConfigOption("allowUsersContextTraits", true). + WithConfigOption("underscoreDivideNumbers", true) for k, v := range tc.configOverride { destinationBuilder = destinationBuilder.WithConfigOption(k, v) } @@ -530,6 +532,8 @@ func TestIntegration(t *testing.T) { WithConfigOption("sshHost", tunnelledSSHHost). WithConfigOption("sshPort", strconv.Itoa(sshPort)). WithConfigOption("sshPrivateKey", strings.ReplaceAll(tunnelledPrivateKey, "\\n", "\n")). + WithConfigOption("allowUsersContextTraits", true). + WithConfigOption("underscoreDivideNumbers", true). Build() workspaceConfig := backendconfigtest.NewConfigBuilder(). diff --git a/warehouse/integrations/redshift/redshift_test.go b/warehouse/integrations/redshift/redshift_test.go index d1e48d0a16..dd09a453ef 100644 --- a/warehouse/integrations/redshift/redshift_test.go +++ b/warehouse/integrations/redshift/redshift_test.go @@ -443,7 +443,9 @@ func TestIntegration(t *testing.T) { WithConfigOption("namespace", namespace). WithConfigOption("enableSSE", false). WithConfigOption("useRudderStorage", false). - WithConfigOption("syncFrequency", "30") + WithConfigOption("syncFrequency", "30"). + WithConfigOption("allowUsersContextTraits", true). + WithConfigOption("underscoreDivideNumbers", true) for k, v := range tc.configOverride { destinationBuilder = destinationBuilder.WithConfigOption(k, v) } diff --git a/warehouse/integrations/snowflake/snowflake_test.go b/warehouse/integrations/snowflake/snowflake_test.go index 6fc07eaa86..55c0f43894 100644 --- a/warehouse/integrations/snowflake/snowflake_test.go +++ b/warehouse/integrations/snowflake/snowflake_test.go @@ -486,7 +486,9 @@ func TestIntegration(t *testing.T) { WithConfigOption("namespace", namespace). WithConfigOption("enableSSE", false). WithConfigOption("useRudderStorage", false). - WithConfigOption("syncFrequency", "30") + WithConfigOption("syncFrequency", "30"). + WithConfigOption("allowUsersContextTraits", true). + WithConfigOption("underscoreDivideNumbers", true) for k, v := range tc.configOverride { destinationBuilder = destinationBuilder.WithConfigOption(k, v) } diff --git a/warehouse/integrations/testhelper/records.go b/warehouse/integrations/testhelper/records.go index 70eac9fc17..2809c64ba9 100644 --- a/warehouse/integrations/testhelper/records.go +++ b/warehouse/integrations/testhelper/records.go @@ -207,7 +207,6 @@ func UploadJobUsersRecords(userIDFormat, sourceID, destID, destType string) [][] // UploadJobUsersRecordsForDatalake returns a set of records for testing upload job users scenarios. // It uses upload-job.events-1.json, upload-job.events-2.json as the source of data. // sent_at, timestamp, original_timestamp will not be present in the records. -// TODO: Once we start populating it in the records, we need to update the expected records. func UploadJobUsersRecordsForDatalake(userIDFormat, sourceID, destID, destType string) [][]string { uuidTS := timeutil.Now().Format("2006-01-02") return [][]string{ @@ -235,12 +234,11 @@ func UploadJobUsersAppendRecords(userIDFormat, sourceID, destID, destType string // UploadJobUsersAppendRecordsUsingUsersLoadFiles returns a set of records for testing upload job users scenarios. // It uses twice upload-job.events-1.json as the source of data. // sent_at, timestamp, original_timestamp will not be present in the records. -// TODO: Once we start populating it in the records, we need to update the expected records. func UploadJobUsersAppendRecordsUsingUsersLoadFiles(userIDFormat, sourceID, destID, destType string) [][]string { uuidTS := timeutil.Now().Format("2006-01-02") return [][]string{ - {sourceID, destType, "[::1]", "Richard Hendricks", "non escaped column", "non escaped column", "2", "", "2", "[::1]", "non escaped column", "rhedricks+4@example.com", "", destID, "rhedricks+4@example.com", "non escaped column", "HTTP", userIDFormat, uuidTS, "2023-05-12T04:26:48Z", "Richard Hendricks", ""}, - {sourceID, destType, "[::1]", "Richard Hendricks", "non escaped column", "non escaped column", "2", "", "2", "[::1]", "non escaped column", "rhedricks+4@example.com", "", destID, "rhedricks+4@example.com", "non escaped column", "HTTP", userIDFormat, uuidTS, "2023-05-12T04:26:48Z", "Richard Hendricks", ""}, + {sourceID, destType, "[::1]", "Richard Hendricks", "non escaped column", "non escaped column", "2", "2023-05-12T04:26:48Z", "2", "[::1]", "non escaped column", "rhedricks+4@example.com", "2023-05-12T04:26:48Z", destID, "rhedricks+4@example.com", "non escaped column", "HTTP", userIDFormat, uuidTS, "2023-05-12T04:26:48Z", "Richard Hendricks", "2023-05-12T04:26:48Z"}, + {sourceID, destType, "[::1]", "Richard Hendricks", "non escaped column", "non escaped column", "2", "2023-05-12T04:26:48Z", "2", "[::1]", "non escaped column", "rhedricks+4@example.com", "2023-05-12T04:26:48Z", destID, "rhedricks+4@example.com", "non escaped column", "HTTP", userIDFormat, uuidTS, "2023-05-12T04:26:48Z", "Richard Hendricks", "2023-05-12T04:26:48Z"}, } } @@ -258,12 +256,11 @@ func UploadJobUsersMergeRecord(userIDFormat, sourceID, destID, destType string) // sent_at, timestamp, original_timestamp will not be present in the records. // For AggregatingMergeTree ClickHouse replaces all rows with the same primary key (or more accurately, with the same sorting key) with a single row (within a one data part) that stores a combination of states of aggregate functions. // So received_at will be record for the first record only. -// TODO: Once we start populating it in the records, we need to update the expected records. func UploadJobUsersRecordsUsingUsersLoadFilesForClickhouse(userIDFormat, sourceID, destID, destType string) [][]string { uuidTS := timeutil.Now().Format("2006-01-02") return [][]string{ - {sourceID, destType, "[::1]", "Richard Hendricks", "non escaped column", "non escaped column", "2", "", "2", "[::1]", "non escaped column", "rhedricks+4@example.com", "", destID, "rhedricks+4@example.com", "non escaped column", "HTTP", userIDFormat, uuidTS, "2023-05-12T04:08:48Z", "Richard Hendricks", ""}, - {sourceID, destType, "[::1]", "Richard Hendricks", "non escaped column", "non escaped column", "2", "", "2", "[::1]", "non escaped column", "rhedricks+8@example.com", "", destID, "rhedricks+8@example.com", "non escaped column", "HTTP", userIDFormat, uuidTS, "2023-05-12T04:32:48Z", "Richard Hendricks", ""}, + {sourceID, destType, "[::1]", "Richard Hendricks", "non escaped column", "non escaped column", "2", "2023-05-12T04:26:48Z", "2", "[::1]", "non escaped column", "rhedricks+4@example.com", "2023-05-12T04:26:48Z", destID, "rhedricks+4@example.com", "non escaped column", "HTTP", userIDFormat, uuidTS, "2023-05-12T04:08:48Z", "Richard Hendricks", "2023-05-12T04:26:48Z"}, + {sourceID, destType, "[::1]", "Richard Hendricks", "non escaped column", "non escaped column", "2", "2023-05-12T04:50:48Z", "2", "[::1]", "non escaped column", "rhedricks+8@example.com", "2023-05-12T04:50:48Z", destID, "rhedricks+8@example.com", "non escaped column", "HTTP", userIDFormat, uuidTS, "2023-05-12T04:32:48Z", "Richard Hendricks", "2023-05-12T04:50:48Z"}, } }