diff --git a/core/dbio/iop/datastream.go b/core/dbio/iop/datastream.go index 9aed357e..abc815e1 100644 --- a/core/dbio/iop/datastream.go +++ b/core/dbio/iop/datastream.go @@ -2024,7 +2024,7 @@ func (ds *Datastream) NewCsvReaderChnl(sc StreamConfig) (readerChn chan *BatchRe } if sp.Config.Header { - bw, err := w.Write(batch.Columns.Names(true, true)) + bw, err := w.Write(batch.Columns.Names(false, true)) tbw = tbw + cast.ToInt64(bw) if err != nil { err = g.Error(err, "error writing header") @@ -2109,7 +2109,7 @@ func (ds *Datastream) NewJsonReaderChnl(sc StreamConfig) (readerChn chan *io.Pip tbw = tbw + cast.ToInt64(bw) for batch := range ds.BatchChan { - fields := batch.Columns.Names(true) + fields := batch.Columns.Names() for row0 := range batch.Rows { c++ @@ -2182,7 +2182,7 @@ func (ds *Datastream) NewJsonLinesReaderChnl(sc StreamConfig) (readerChn chan *i c := int64(0) // local counter for batch := range ds.BatchChan { - fields := batch.Columns.Names(true) + fields := batch.Columns.Names() for row0 := range batch.Rows { c++ @@ -2482,7 +2482,7 @@ func (ds *Datastream) NewCsvReader(sc StreamConfig) *io.PipeReader { } if sp.Config.Header { - bw, err := w.Write(batch.Columns.Names(true, true)) + bw, err := w.Write(batch.Columns.Names(false, true)) tbw = tbw + cast.ToInt64(bw) if err != nil { ds.Context.CaptureErr(g.Error(err, "error writing header")) diff --git a/core/dbio/iop/datatype.go b/core/dbio/iop/datatype.go index 7f81e247..b22e0e2b 100755 --- a/core/dbio/iop/datatype.go +++ b/core/dbio/iop/datatype.go @@ -1255,3 +1255,63 @@ func FormatValue(val any, column Column, connType dbio.Type) (newVal string) { } return } + +// ColumnCasing is the casing method to use +type ColumnCasing string + +const ( + SourceColumnCasing ColumnCasing = "source" // keeps source column name casing. The default. + TargetColumnCasing ColumnCasing = "target" // converts casing according to target database. Lower-case for files. + SnakeColumnCasing ColumnCasing = "snake" // converts snake casing according to target database. Lower-case for files. + UpperColumnCasing ColumnCasing = "upper" // make it upper case + LowerColumnCasing ColumnCasing = "lower" // make it lower case +) + +// Equals evaluates equality for column casing (pointer safe) +func (cc *ColumnCasing) Equals(val ColumnCasing) bool { + if cc.IsEmpty() { + return false + } + return *cc == val +} + +// IsEmpty return true if nil or blank +func (cc *ColumnCasing) IsEmpty() bool { + if cc == nil || string(*cc) == "" { + return true + } + return false +} + +var matchAllCap = regexp.MustCompile("([a-z0-9])([A-Z])") + +// Apply applies column casing to provided name. +// If cc is nil or SourceColumnCasing, it returns the original value +func (cc *ColumnCasing) Apply(name string, tgtConnType dbio.Type) string { + if cc.IsEmpty() || cc.Equals(SourceColumnCasing) { + return name + } + + // convert to snake case + if cc.Equals(SnakeColumnCasing) { + name = matchAllCap.ReplaceAllString(name, "${1}_${2}") + } + + // clean up other weird chars + name = CleanName(name) + + switch { + case cc.Equals(UpperColumnCasing): + return strings.ToUpper(name) + case cc.Equals(LowerColumnCasing): + return strings.ToLower(name) + case cc.Equals(TargetColumnCasing), cc.Equals(SnakeColumnCasing): + // lower case for target file system + if tgtConnType.DBNameUpperCase() { + return strings.ToUpper(name) + } + return strings.ToLower(name) + } + + return name +} diff --git a/core/dbio/iop/stream_processor.go b/core/dbio/iop/stream_processor.go index f26734c3..8f436a85 100644 --- a/core/dbio/iop/stream_processor.go +++ b/core/dbio/iop/stream_processor.go @@ -63,6 +63,7 @@ type StreamConfig struct { FieldsPerRec int `json:"fields_per_rec"` Jmespath string `json:"jmespath"` Sheet string `json:"sheet"` + ColumnCasing ColumnCasing `json:"column_casing"` BoolAsInt bool `json:"-"` Columns Columns `json:"columns"` // list of column types. Can be partial list! likely is! transforms map[string][]Transform // array of transform functions to apply @@ -349,6 +350,10 @@ func (sp *StreamProcessor) SetConfig(configMap map[string]string) { sp.Config.SkipBlankLines = cast.ToBool(val) } + if val, ok := configMap["column_casing"]; ok { + sp.Config.ColumnCasing = ColumnCasing(val) + } + if val, ok := configMap["bool_at_int"]; ok { sp.Config.BoolAsInt = cast.ToBool(val) } diff --git a/core/sling/config.go b/core/sling/config.go index 784c4c6c..b67304f9 100644 --- a/core/sling/config.go +++ b/core/sling/config.go @@ -53,24 +53,6 @@ var AllMode = []struct { {BackfillMode, "BackfillMode"}, } -// ColumnCasing is the casing method to use -type ColumnCasing string - -const ( - SourceColumnCasing ColumnCasing = "source" // keeps source column name casing. The default. - TargetColumnCasing ColumnCasing = "target" // converts casing according to target database. Lower-case for files. - SnakeColumnCasing ColumnCasing = "snake" // converts snake casing according to target database. Lower-case for files. -) - -var AllColumnCasing = []struct { - Value ColumnCasing - TSName string -}{ - {SourceColumnCasing, "SourceColumnCasing"}, - {TargetColumnCasing, "TargetColumnCasing"}, - {SnakeColumnCasing, "SnakeColumnCasing"}, -} - // NewConfig return a config object from a YAML / JSON string func NewConfig(cfgStr string) (cfg *Config, err error) { // set default, unmarshalling will overwrite @@ -1295,7 +1277,7 @@ type TargetOptions struct { IgnoreExisting *bool `json:"ignore_existing,omitempty" yaml:"ignore_existing,omitempty"` AddNewColumns *bool `json:"add_new_columns,omitempty" yaml:"add_new_columns,omitempty"` AdjustColumnType *bool `json:"adjust_column_type,omitempty" yaml:"adjust_column_type,omitempty"` - ColumnCasing *ColumnCasing `json:"column_casing,omitempty" yaml:"column_casing,omitempty"` + ColumnCasing *iop.ColumnCasing `json:"column_casing,omitempty" yaml:"column_casing,omitempty"` TableKeys database.TableKeys `json:"table_keys,omitempty" yaml:"table_keys,omitempty"` TableTmp string `json:"table_tmp,omitempty" yaml:"table_tmp,omitempty"` @@ -1353,7 +1335,7 @@ var TargetFileOptionsDefault = TargetOptions{ DatetimeFormat: "auto", Delimiter: ",", MaxDecimals: g.Int(-1), - ColumnCasing: (*ColumnCasing)(g.String(string(SourceColumnCasing))), + ColumnCasing: g.Ptr(iop.SourceColumnCasing), } var TargetDBOptionsDefault = TargetOptions{ @@ -1367,7 +1349,7 @@ var TargetDBOptionsDefault = TargetOptions{ AdjustColumnType: g.Bool(false), DatetimeFormat: "auto", MaxDecimals: g.Int(-1), - ColumnCasing: (*ColumnCasing)(g.String(string(SourceColumnCasing))), + ColumnCasing: g.Ptr(iop.SourceColumnCasing), } func (o *SourceOptions) SetDefaults(sourceOptions SourceOptions) { diff --git a/core/sling/config_test.go b/core/sling/config_test.go index 7454ae79..8d918829 100755 --- a/core/sling/config_test.go +++ b/core/sling/config_test.go @@ -61,9 +61,9 @@ func TestConfig(t *testing.T) { func TestColumnCasing(t *testing.T) { df := iop.NewDataflow(0) - sourceCasing := SourceColumnCasing - snakeCasing := SnakeColumnCasing - targetCasing := TargetColumnCasing + sourceCasing := iop.SourceColumnCasing + snakeCasing := iop.SnakeColumnCasing + targetCasing := iop.TargetColumnCasing df.Columns = iop.NewColumns(iop.Column{Name: "myCol"}) applyColumnCasingToDf(df, dbio.TypeDbSnowflake, &sourceCasing) diff --git a/core/sling/task.go b/core/sling/task.go index 8302fdc7..eecab473 100644 --- a/core/sling/task.go +++ b/core/sling/task.go @@ -3,7 +3,6 @@ package sling import ( "math" "os" - "regexp" "strings" "time" @@ -403,48 +402,30 @@ func (t *TaskExecution) getOptionsMap() (options map[string]any) { return } -var matchAllCap = regexp.MustCompile("([a-z0-9])([A-Z])") - // apply column casing -func applyColumnCasingToDf(df *iop.Dataflow, connType dbio.Type, casing *ColumnCasing) { +func applyColumnCasingToDf(df *iop.Dataflow, connType dbio.Type, casing *iop.ColumnCasing) { - if casing == nil || *casing == SourceColumnCasing { + if casing == nil { return } // convert to target system casing - for i := range df.Columns { - df.Columns[i].Name = applyColumnCasing(df.Columns[i].Name, *casing == SnakeColumnCasing, connType) + for i, col := range df.Columns { + df.Columns[i].Name = casing.Apply(col.Name, connType) } - // propagate names + // propagate names to streams for _, ds := range df.Streams { - for i := range ds.Columns { - ds.Columns[i].Name = applyColumnCasing(ds.Columns[i].Name, *casing == SnakeColumnCasing, connType) + for i, col := range ds.Columns { + ds.Columns[i].Name = casing.Apply(col.Name, connType) } - for i := range ds.CurrentBatch.Columns { - ds.CurrentBatch.Columns[i].Name = applyColumnCasing(ds.CurrentBatch.Columns[i].Name, *casing == SnakeColumnCasing, connType) + for i, col := range ds.CurrentBatch.Columns { + ds.CurrentBatch.Columns[i].Name = casing.Apply(col.Name, connType) } } } -func applyColumnCasing(name string, toSnake bool, connType dbio.Type) string { - // convert to snake case - if toSnake { - name = matchAllCap.ReplaceAllString(name, "${1}_${2}") - } - - // clean up other weird chars - name = iop.CleanName(name) - - // lower case for target file system - if connType.DBNameUpperCase() { - return strings.ToUpper(name) - } - return strings.ToLower(name) -} - const ( raiseIssueNotice = "Feel free to open an issue @ https://github.com/slingdata-io/sling-cli" ) diff --git a/core/sling/task_func.go b/core/sling/task_func.go index a3284906..a86bbbc7 100644 --- a/core/sling/task_func.go +++ b/core/sling/task_func.go @@ -185,8 +185,8 @@ func getIncrementalValue(cfg *Config, tgtConn database.Connection, srcConnType d } tgtUpdateKey := cfg.Source.UpdateKey - if cc := cfg.Target.Options.ColumnCasing; cc != nil && *cc != SourceColumnCasing { - tgtUpdateKey = applyColumnCasing(tgtUpdateKey, *cc == SnakeColumnCasing, tgtConn.GetType()) + if cc := cfg.Target.Options.ColumnCasing; cc != nil { + tgtUpdateKey = cc.Apply(tgtUpdateKey, tgtConn.GetType()) } // get target columns to match update-key diff --git a/core/sling/task_run_write.go b/core/sling/task_run_write.go index 319257ef..60c471bc 100644 --- a/core/sling/task_run_write.go +++ b/core/sling/task_run_write.go @@ -794,9 +794,9 @@ func truncateTable(t *TaskExecution, tgtConn database.Connection, tableName stri func performUpsert(tgtConn database.Connection, tableTmp, targetTable database.Table, cfg *Config) error { tgtPrimaryKey := cfg.Source.PrimaryKey() - if cc := cfg.Target.Options.ColumnCasing; cc != nil && *cc != SourceColumnCasing { + if casing := cfg.Target.Options.ColumnCasing; casing != nil { for i, pk := range tgtPrimaryKey { - tgtPrimaryKey[i] = applyColumnCasing(pk, *cc == SnakeColumnCasing, tgtConn.GetType()) + tgtPrimaryKey[i] = casing.Apply(pk, tgtConn.GetType()) } } g.Debug("Performing upsert from temporary table %s to target table %s with primary keys %v",