Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(warehouse): batching of alter add statements #2484

Merged
merged 42 commits into from
Oct 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
58d4080
chore(warehouse): added support for warehouse multi alter statements
achettyiitr Sep 27, 2022
f37b251
code cleanup
achettyiitr Sep 27, 2022
890176d
typo changes
achettyiitr Sep 27, 2022
0022c5b
make fmt changes
achettyiitr Sep 27, 2022
af5ad7c
code cleanup
achettyiitr Sep 27, 2022
f183515
snowflake test issue and fmt
achettyiitr Sep 27, 2022
f48a851
bigquery schema issue
achettyiitr Sep 27, 2022
1678697
deepsource changes
achettyiitr Sep 27, 2022
4a6af3a
deepsource changes
achettyiitr Sep 27, 2022
dab9b3c
snowflake query changes.
achettyiitr Sep 23, 2022
61bf950
bigquery changes
achettyiitr Sep 27, 2022
ee9843d
sql formatting.
achettyiitr Sep 27, 2022
a2de925
QueryBuilder test case issues.
achettyiitr Sep 27, 2022
e3826ec
Merge branch 'master' of github.com:rudderlabs/rudder-server into fea…
achettyiitr Sep 27, 2022
ceb0be1
master pull
achettyiitr Sep 29, 2022
d23c6f8
unuse QueryBuilder
achettyiitr Oct 9, 2022
ec1176f
Merge branch 'master' of github.com:rudderlabs/rudder-server into fea…
achettyiitr Oct 9, 2022
2d3fb7d
Merge branch 'master' into feat.mergeAlterStatements
achettyiitr Oct 10, 2022
5bda8a1
add additional column check and code reformatting
achettyiitr Oct 10, 2022
720c0e7
Merge branch 'master' of github.com:rudderlabs/rudder-server into fea…
achettyiitr Oct 10, 2022
d66f4be
Merge branch 'feat.mergeAlterStatements' of github.com:rudderlabs/rud…
achettyiitr Oct 10, 2022
1b5f4ab
remove additional check for single column
achettyiitr Oct 10, 2022
51eda07
error handling for single column
achettyiitr Oct 10, 2022
d15fed0
Merge branch 'master' into feat.mergeAlterStatements
achettyiitr Oct 11, 2022
0f96231
master
achettyiitr Oct 11, 2022
b86ffa3
review comments
achettyiitr Oct 11, 2022
5fabc13
mssql and azure synapse changes
achettyiitr Oct 11, 2022
be7b5f9
master pull
achettyiitr Oct 13, 2022
09b6376
master pull
achettyiitr Oct 13, 2022
698116d
master pull
achettyiitr Oct 13, 2022
c249d62
Merge branch 'master' of github.com:rudderlabs/rudder-server into fea…
achettyiitr Oct 14, 2022
9d9b5cd
mssql and az formating sql changes
achettyiitr Oct 14, 2022
955d5a6
Merge branch 'master' of github.com:rudderlabs/rudder-server into fea…
achettyiitr Oct 14, 2022
489a70a
review comments
achettyiitr Oct 14, 2022
2e659be
review comments
achettyiitr Oct 14, 2022
a65d6eb
Merge branch 'master' into feat.mergeAlterStatements
achettyiitr Oct 18, 2022
58b2ad5
Merge branch 'master' into feat.mergeAlterStatements
achettyiitr Oct 19, 2022
f490126
Merge branch 'master' of github.com:rudderlabs/rudder-server into fea…
achettyiitr Oct 19, 2022
ad84bdd
review comments
achettyiitr Oct 19, 2022
07210d3
review comments
achettyiitr Oct 19, 2022
78de908
master pull
achettyiitr Oct 31, 2022
916f845
code cleanup
achettyiitr Oct 31, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 38 additions & 11 deletions warehouse/azure-synapse/azure-synapse.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,14 +632,6 @@ func (as *HandleT) createTable(name string, columns map[string]string) (err erro
return
}

func (as *HandleT) addColumn(tableName, columnName, columnType string) (err error) {
sqlStatement := fmt.Sprintf(`IF NOT EXISTS (SELECT 1 FROM SYS.COLUMNS WHERE OBJECT_ID = OBJECT_ID(N'%[1]s') AND name = '%[2]s')
ALTER TABLE %[1]s ADD %[2]s %[3]s`, tableName, columnName, rudderDataTypesMapToMssql[columnType])
pkgLogger.Infof("AZ: Adding column in synapse for AZ:%s : %v", as.Warehouse.Destination.ID, sqlStatement)
_, err = as.Db.Exec(sqlStatement)
return
}

func (as *HandleT) CreateTable(tableName string, columnMap map[string]string) (err error) {
// Search paths doesn't exist unlike Postgres, default is dbo. Hence, use namespace wherever possible
err = as.createTable(as.Namespace+"."+tableName, columnMap)
Expand All @@ -653,9 +645,44 @@ func (as *HandleT) DropTable(tableName string) (err error) {
return
}

func (as *HandleT) AddColumn(tableName, columnName, columnType string) (err error) {
err = as.addColumn(as.Namespace+"."+tableName, columnName, columnType)
return err
func (as *HandleT) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
var query string
if len(columnsInfo) == 1 {
query += fmt.Sprintf(`
IF NOT EXISTS (
SELECT
1
FROM
SYS.COLUMNS
WHERE
OBJECT_ID = OBJECT_ID(N'%[1]s.%[2]s')
AND name = '%[3]s'
)
`,
as.Namespace,
tableName,
columnsInfo[0].Name,
)
}

query += fmt.Sprintf(`
ALTER TABLE
%s.%s
ADD`,
as.Namespace,
tableName,
)

for _, columnInfo := range columnsInfo {
query += fmt.Sprintf(` %s %s,`, columnInfo.Name, rudderDataTypesMapToMssql[columnInfo.Type])
achettyiitr marked this conversation as resolved.
Show resolved Hide resolved
}

query = strings.TrimSuffix(query, ",")
query += ";"

pkgLogger.Infof("AZ: Adding columns for destinationID: %s, tableName: %s with query: %v", as.Warehouse.Destination.ID, tableName, query)
_, err = as.Db.Exec(query)
return
}

func (*HandleT) AlterColumn(_, _, _ string) (err error) {
Expand Down
13 changes: 13 additions & 0 deletions warehouse/azure-synapse/azure_synapse_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package azuresynapse_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestAzureSynapse(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "AzureSynapse Suite")
}
50 changes: 27 additions & 23 deletions warehouse/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,23 +165,6 @@ func (bq *HandleT) createTableView(tableName string, columnMap map[string]string
return
}

func (bq *HandleT) addColumn(tableName, columnName, columnType string) (err error) {
pkgLogger.Infof("BQ: Adding columns in table %s in bigquery dataset: %s in project: %s", tableName, bq.namespace, bq.projectID)
tableRef := bq.db.Dataset(bq.namespace).Table(tableName)
meta, err := tableRef.Metadata(bq.backgroundContext)
if err != nil {
return err
}
newSchema := append(meta.Schema,
&bigquery.FieldSchema{Name: columnName, Type: dataTypesMap[columnType]},
)
update := bigquery.TableMetadataToUpdate{
Schema: newSchema,
}
_, err = tableRef.Update(bq.backgroundContext, update, meta.ETag)
return
}

func (bq *HandleT) schemaExists(_, _ string) (exists bool, err error) {
ds := bq.db.Dataset(bq.namespace)
_, err = ds.Metadata(bq.backgroundContext)
Expand Down Expand Up @@ -826,15 +809,36 @@ func (bq *HandleT) LoadTable(tableName string) error {
return err
}

func (bq *HandleT) AddColumn(tableName, columnName, columnType string) (err error) {
err = bq.addColumn(tableName, columnName, columnType)
func (bq *HandleT) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
pkgLogger.Infof("BQ: Adding columns for destinationID: %s, tableName: %s, dataset: %s, project: %s", bq.warehouse.Destination.ID, tableName, bq.namespace, bq.projectID)
tableRef := bq.db.Dataset(bq.namespace).Table(tableName)
meta, err := tableRef.Metadata(bq.backgroundContext)
if err != nil {
if checkAndIgnoreAlreadyExistError(err) {
pkgLogger.Infof("BQ: Column %s already exists on %s.%s \nResponse: %v", columnName, bq.namespace, tableName, err)
err = nil
return
}

newSchema := meta.Schema
for _, columnInfo := range columnsInfo {
newSchema = append(newSchema,
&bigquery.FieldSchema{Name: columnInfo.Name, Type: dataTypesMap[columnInfo.Type]},
)
}

tableMetadataToUpdate := bigquery.TableMetadataToUpdate{
Schema: newSchema,
}
_, err = tableRef.Update(bq.backgroundContext, tableMetadataToUpdate, meta.ETag)

// Handle error in case of single column
if len(columnsInfo) == 1 {
if err != nil {
if checkAndIgnoreAlreadyExistError(err) {
pkgLogger.Infof("BQ: Column %s already exists on %s.%s \nResponse: %v", columnsInfo[0].Name, bq.namespace, tableName, err)
err = nil
}
}
}
return err
return
}

func (*HandleT) AlterColumn(_, _, _ string) (err error) {
Expand Down
34 changes: 27 additions & 7 deletions warehouse/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,16 +843,36 @@ func (ch *HandleT) DropTable(tableName string) (err error) {
return
}

// AddColumn adds column:columnName with dataType columnType to the tableName
func (ch *HandleT) AddColumn(tableName, columnName, columnType string) (err error) {
cluster := warehouseutils.GetConfigValue(Cluster, ch.Warehouse)
clusterClause := ""
func (ch *HandleT) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
var (
query string
cluster string
clusterClause string
)

cluster = warehouseutils.GetConfigValue(Cluster, ch.Warehouse)
if len(strings.TrimSpace(cluster)) > 0 {
clusterClause = fmt.Sprintf(`ON CLUSTER %q`, cluster)
}
sqlStatement := fmt.Sprintf(`ALTER TABLE %q.%q %s ADD COLUMN IF NOT EXISTS %q %s`, ch.Namespace, tableName, clusterClause, columnName, getClickHouseColumnTypeForSpecificTable(tableName, columnName, rudderDataTypesMapToClickHouse[columnType], false))
pkgLogger.Infof("CH: Adding column in clickhouse for ch:%s : %v", ch.Warehouse.Destination.ID, sqlStatement)
_, err = ch.Db.Exec(sqlStatement)

query = fmt.Sprintf(`
ALTER TABLE
%q.%q %s`,
ch.Namespace,
tableName,
clusterClause,
)

for _, columnInfo := range columnsInfo {
columnType := getClickHouseColumnTypeForSpecificTable(tableName, columnInfo.Name, rudderDataTypesMapToClickHouse[columnInfo.Type], false)
query += fmt.Sprintf(` ADD COLUMN IF NOT EXISTS %q %s,`, columnInfo.Name, columnType)
}

query = strings.TrimSuffix(query, ",")
query += ";"

pkgLogger.Infof("CH: Adding columns for destinationID: %s, tableName: %s with query: %v", ch.Warehouse.Destination.ID, tableName, query)
_, err = ch.Db.Exec(query)
return
}

Expand Down
4 changes: 2 additions & 2 deletions warehouse/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func (ic *IndexConstraintT) violates(brEvent *BatchRouterEventT, columnName stri
if !ok {
continue
}
if columnInfo.ColumnType == "string" {
columnVal, ok := columnInfo.ColumnVal.(string)
if columnInfo.Type == "string" {
columnVal, ok := columnInfo.Value.(string)
if !ok {
continue
}
Expand Down
4 changes: 2 additions & 2 deletions warehouse/datalake/datalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func (*HandleT) DropTable(_ string) (err error) {
return fmt.Errorf("datalake err :not implemented")
}

func (wh *HandleT) AddColumn(tableName, columnName, columnType string) (err error) {
return wh.SchemaRepository.AddColumn(tableName, columnName, columnType)
func (wh *HandleT) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
return wh.SchemaRepository.AddColumns(tableName, columnsInfo)
}

func (wh *HandleT) AlterColumn(tableName, columnName, columnType string) (err error) {
Expand Down
10 changes: 6 additions & 4 deletions warehouse/datalake/schema-repository/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (gl *GlueSchemaRepository) CreateTable(tableName string, columnMap map[stri
return
}

func (gl *GlueSchemaRepository) AddColumn(tableName, columnName, columnType string) (err error) {
func (gl *GlueSchemaRepository) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
updateTableInput := glue.UpdateTableInput{
DatabaseName: aws.String(gl.Namespace),
TableInput: &glue.TableInput{
Expand All @@ -148,8 +148,10 @@ func (gl *GlueSchemaRepository) AddColumn(tableName, columnName, columnType stri
return fmt.Errorf("table %s not found in schema", tableName)
}

// add new column to tableSchema
tableSchema[columnName] = columnType
// add new columns to table schema
for _, columnInfo := range columnsInfo {
tableSchema[columnInfo.Name] = columnInfo.Type
}

// add storage descriptor to update table request
updateTableInput.TableInput.StorageDescriptor = gl.getStorageDescriptor(tableName, tableSchema)
Expand All @@ -160,7 +162,7 @@ func (gl *GlueSchemaRepository) AddColumn(tableName, columnName, columnType stri
}

func (gl *GlueSchemaRepository) AlterColumn(tableName, columnName, columnType string) (err error) {
return gl.AddColumn(tableName, columnName, columnType)
return gl.AddColumns(tableName, []warehouseutils.ColumnInfo{{Name: columnName, Type: columnType}})
}

func getGlueClient(wh warehouseutils.Warehouse) (*glue.Glue, error) {
Expand Down
6 changes: 4 additions & 2 deletions warehouse/datalake/schema-repository/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (ls *LocalSchemaRepository) CreateTable(tableName string, columnMap map[str
return ls.uploader.UpdateLocalSchema(schema)
}

func (ls *LocalSchemaRepository) AddColumn(tableName, columnName, columnType string) (err error) {
func (ls *LocalSchemaRepository) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
// fetch schema from local db
schema, err := ls.FetchSchema(ls.warehouse)
if err != nil {
Expand All @@ -62,7 +62,9 @@ func (ls *LocalSchemaRepository) AddColumn(tableName, columnName, columnType str
return fmt.Errorf("failed to add column: table %s does not exist", tableName)
}

schema[tableName][columnName] = columnType
for _, columnInfo := range columnsInfo {
schema[tableName][columnInfo.Name] = columnInfo.Type
}

// update schema
return ls.uploader.UpdateLocalSchema(schema)
Expand Down
2 changes: 1 addition & 1 deletion warehouse/datalake/schema-repository/schema-repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type SchemaRepository interface {
FetchSchema(warehouse warehouseutils.Warehouse) (warehouseutils.SchemaT, error)
CreateSchema() (err error)
CreateTable(tableName string, columnMap map[string]string) (err error)
AddColumn(tableName, columnName, columnType string) (err error)
AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error)
AlterColumn(tableName, columnName, columnType string) (err error)
}

Expand Down
Loading