Skip to content

Commit

Permalink
improve logic of Column Casing
Browse files Browse the repository at this point in the history
  • Loading branch information
flarco committed Oct 24, 2024
1 parent a1cfa5f commit 3cf0bf2
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 60 deletions.
8 changes: 4 additions & 4 deletions core/dbio/iop/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2024,7 +2024,7 @@ func (ds *Datastream) NewCsvReaderChnl(sc StreamConfig) (readerChn chan *BatchRe
}

if sp.Config.Header {
bw, err := w.Write(batch.Columns.Names(true, true))
bw, err := w.Write(batch.Columns.Names(false, true))
tbw = tbw + cast.ToInt64(bw)
if err != nil {
err = g.Error(err, "error writing header")
Expand Down Expand Up @@ -2109,7 +2109,7 @@ func (ds *Datastream) NewJsonReaderChnl(sc StreamConfig) (readerChn chan *io.Pip
tbw = tbw + cast.ToInt64(bw)

for batch := range ds.BatchChan {
fields := batch.Columns.Names(true)
fields := batch.Columns.Names()

for row0 := range batch.Rows {
c++
Expand Down Expand Up @@ -2182,7 +2182,7 @@ func (ds *Datastream) NewJsonLinesReaderChnl(sc StreamConfig) (readerChn chan *i
c := int64(0) // local counter

for batch := range ds.BatchChan {
fields := batch.Columns.Names(true)
fields := batch.Columns.Names()

for row0 := range batch.Rows {
c++
Expand Down Expand Up @@ -2482,7 +2482,7 @@ func (ds *Datastream) NewCsvReader(sc StreamConfig) *io.PipeReader {
}

if sp.Config.Header {
bw, err := w.Write(batch.Columns.Names(true, true))
bw, err := w.Write(batch.Columns.Names(false, true))
tbw = tbw + cast.ToInt64(bw)
if err != nil {
ds.Context.CaptureErr(g.Error(err, "error writing header"))
Expand Down
60 changes: 60 additions & 0 deletions core/dbio/iop/datatype.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,3 +1255,63 @@ func FormatValue(val any, column Column, connType dbio.Type) (newVal string) {
}
return
}

// ColumnCasing is the column-name casing strategy applied when writing
// column names to a target system. See Apply for how each value behaves.
type ColumnCasing string

const (
	SourceColumnCasing ColumnCasing = "source" // keep the source column name casing unchanged (the default)
	TargetColumnCasing ColumnCasing = "target" // fold to the target database's preferred casing; lower-case for file targets
	SnakeColumnCasing  ColumnCasing = "snake"  // convert to snake_case, then fold to the target database's casing; lower-case for file targets
	UpperColumnCasing  ColumnCasing = "upper"  // force upper case
	LowerColumnCasing  ColumnCasing = "lower"  // force lower case
)

// Equals reports whether the receiver holds val.
// It is safe on a nil receiver: a nil or blank casing never matches.
func (cc *ColumnCasing) Equals(val ColumnCasing) bool {
	if cc == nil || *cc == "" {
		return false
	}
	return *cc == val
}

// IsEmpty reports whether the casing is unset: a nil receiver or a blank value.
func (cc *ColumnCasing) IsEmpty() bool {
	return cc == nil || *cc == ""
}

// matchAllCap locates a lower-case letter or digit immediately followed by an
// upper-case letter — the word boundaries used for snake_case conversion.
var matchAllCap = regexp.MustCompile("([a-z0-9])([A-Z])")

// Apply transforms name according to the receiver's casing rule for the given
// target connection type. A nil, blank or "source" casing returns the name
// unchanged. For "target" and "snake", the result is upper-cased when the
// target database folds identifiers to upper case, lower-cased otherwise.
func (cc *ColumnCasing) Apply(name string, tgtConnType dbio.Type) string {
	if cc.IsEmpty() || *cc == SourceColumnCasing {
		return name
	}

	casing := *cc // safe to dereference past the IsEmpty guard

	// insert underscores at word boundaries before cleaning, so the
	// underscores survive CleanName
	if casing == SnakeColumnCasing {
		name = matchAllCap.ReplaceAllString(name, "${1}_${2}")
	}

	// strip characters not valid in identifiers
	name = CleanName(name)

	switch casing {
	case UpperColumnCasing:
		return strings.ToUpper(name)
	case LowerColumnCasing:
		return strings.ToLower(name)
	case TargetColumnCasing, SnakeColumnCasing:
		if tgtConnType.DBNameUpperCase() {
			return strings.ToUpper(name)
		}
		// lower case for file targets and lower-casing databases
		return strings.ToLower(name)
	}

	return name
}
5 changes: 5 additions & 0 deletions core/dbio/iop/stream_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type StreamConfig struct {
FieldsPerRec int `json:"fields_per_rec"`
Jmespath string `json:"jmespath"`
Sheet string `json:"sheet"`
ColumnCasing ColumnCasing `json:"column_casing"`
BoolAsInt bool `json:"-"`
Columns Columns `json:"columns"` // list of column types. Can be partial list! likely is!
transforms map[string][]Transform // array of transform functions to apply
Expand Down Expand Up @@ -349,6 +350,10 @@ func (sp *StreamProcessor) SetConfig(configMap map[string]string) {
sp.Config.SkipBlankLines = cast.ToBool(val)
}

if val, ok := configMap["column_casing"]; ok {
sp.Config.ColumnCasing = ColumnCasing(val)
}

if val, ok := configMap["bool_at_int"]; ok {
sp.Config.BoolAsInt = cast.ToBool(val)
}
Expand Down
24 changes: 3 additions & 21 deletions core/sling/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,6 @@ var AllMode = []struct {
{BackfillMode, "BackfillMode"},
}

// ColumnCasing is the casing method to use
type ColumnCasing string

const (
SourceColumnCasing ColumnCasing = "source" // keeps source column name casing. The default.
TargetColumnCasing ColumnCasing = "target" // converts casing according to target database. Lower-case for files.
SnakeColumnCasing ColumnCasing = "snake" // converts snake casing according to target database. Lower-case for files.
)

var AllColumnCasing = []struct {
Value ColumnCasing
TSName string
}{
{SourceColumnCasing, "SourceColumnCasing"},
{TargetColumnCasing, "TargetColumnCasing"},
{SnakeColumnCasing, "SnakeColumnCasing"},
}

// NewConfig return a config object from a YAML / JSON string
func NewConfig(cfgStr string) (cfg *Config, err error) {
// set default, unmarshalling will overwrite
Expand Down Expand Up @@ -1295,7 +1277,7 @@ type TargetOptions struct {
IgnoreExisting *bool `json:"ignore_existing,omitempty" yaml:"ignore_existing,omitempty"`
AddNewColumns *bool `json:"add_new_columns,omitempty" yaml:"add_new_columns,omitempty"`
AdjustColumnType *bool `json:"adjust_column_type,omitempty" yaml:"adjust_column_type,omitempty"`
ColumnCasing *ColumnCasing `json:"column_casing,omitempty" yaml:"column_casing,omitempty"`
ColumnCasing *iop.ColumnCasing `json:"column_casing,omitempty" yaml:"column_casing,omitempty"`

TableKeys database.TableKeys `json:"table_keys,omitempty" yaml:"table_keys,omitempty"`
TableTmp string `json:"table_tmp,omitempty" yaml:"table_tmp,omitempty"`
Expand Down Expand Up @@ -1353,7 +1335,7 @@ var TargetFileOptionsDefault = TargetOptions{
DatetimeFormat: "auto",
Delimiter: ",",
MaxDecimals: g.Int(-1),
ColumnCasing: (*ColumnCasing)(g.String(string(SourceColumnCasing))),
ColumnCasing: g.Ptr(iop.SourceColumnCasing),
}

var TargetDBOptionsDefault = TargetOptions{
Expand All @@ -1367,7 +1349,7 @@ var TargetDBOptionsDefault = TargetOptions{
AdjustColumnType: g.Bool(false),
DatetimeFormat: "auto",
MaxDecimals: g.Int(-1),
ColumnCasing: (*ColumnCasing)(g.String(string(SourceColumnCasing))),
ColumnCasing: g.Ptr(iop.SourceColumnCasing),
}

func (o *SourceOptions) SetDefaults(sourceOptions SourceOptions) {
Expand Down
6 changes: 3 additions & 3 deletions core/sling/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func TestConfig(t *testing.T) {
func TestColumnCasing(t *testing.T) {
df := iop.NewDataflow(0)

sourceCasing := SourceColumnCasing
snakeCasing := SnakeColumnCasing
targetCasing := TargetColumnCasing
sourceCasing := iop.SourceColumnCasing
snakeCasing := iop.SnakeColumnCasing
targetCasing := iop.TargetColumnCasing

df.Columns = iop.NewColumns(iop.Column{Name: "myCol"})
applyColumnCasingToDf(df, dbio.TypeDbSnowflake, &sourceCasing)
Expand Down
37 changes: 9 additions & 28 deletions core/sling/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package sling
import (
"math"
"os"
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -403,48 +402,30 @@ func (t *TaskExecution) getOptionsMap() (options map[string]any) {
return
}

var matchAllCap = regexp.MustCompile("([a-z0-9])([A-Z])")

// apply column casing
func applyColumnCasingToDf(df *iop.Dataflow, connType dbio.Type, casing *ColumnCasing) {
func applyColumnCasingToDf(df *iop.Dataflow, connType dbio.Type, casing *iop.ColumnCasing) {

if casing == nil || *casing == SourceColumnCasing {
if casing == nil {
return
}

// convert to target system casing
for i := range df.Columns {
df.Columns[i].Name = applyColumnCasing(df.Columns[i].Name, *casing == SnakeColumnCasing, connType)
for i, col := range df.Columns {
df.Columns[i].Name = casing.Apply(col.Name, connType)
}

// propagate names
// propagate names to streams
for _, ds := range df.Streams {
for i := range ds.Columns {
ds.Columns[i].Name = applyColumnCasing(ds.Columns[i].Name, *casing == SnakeColumnCasing, connType)
for i, col := range ds.Columns {
ds.Columns[i].Name = casing.Apply(col.Name, connType)
}

for i := range ds.CurrentBatch.Columns {
ds.CurrentBatch.Columns[i].Name = applyColumnCasing(ds.CurrentBatch.Columns[i].Name, *casing == SnakeColumnCasing, connType)
for i, col := range ds.CurrentBatch.Columns {
ds.CurrentBatch.Columns[i].Name = casing.Apply(col.Name, connType)
}
}
}

func applyColumnCasing(name string, toSnake bool, connType dbio.Type) string {
// convert to snake case
if toSnake {
name = matchAllCap.ReplaceAllString(name, "${1}_${2}")
}

// clean up other weird chars
name = iop.CleanName(name)

// lower case for target file system
if connType.DBNameUpperCase() {
return strings.ToUpper(name)
}
return strings.ToLower(name)
}

const (
raiseIssueNotice = "Feel free to open an issue @ https://github.com/slingdata-io/sling-cli"
)
Expand Down
4 changes: 2 additions & 2 deletions core/sling/task_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ func getIncrementalValue(cfg *Config, tgtConn database.Connection, srcConnType d
}

tgtUpdateKey := cfg.Source.UpdateKey
if cc := cfg.Target.Options.ColumnCasing; cc != nil && *cc != SourceColumnCasing {
tgtUpdateKey = applyColumnCasing(tgtUpdateKey, *cc == SnakeColumnCasing, tgtConn.GetType())
if cc := cfg.Target.Options.ColumnCasing; cc != nil {
tgtUpdateKey = cc.Apply(tgtUpdateKey, tgtConn.GetType())
}

// get target columns to match update-key
Expand Down
4 changes: 2 additions & 2 deletions core/sling/task_run_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,9 +794,9 @@ func truncateTable(t *TaskExecution, tgtConn database.Connection, tableName stri

func performUpsert(tgtConn database.Connection, tableTmp, targetTable database.Table, cfg *Config) error {
tgtPrimaryKey := cfg.Source.PrimaryKey()
if cc := cfg.Target.Options.ColumnCasing; cc != nil && *cc != SourceColumnCasing {
if casing := cfg.Target.Options.ColumnCasing; casing != nil {
for i, pk := range tgtPrimaryKey {
tgtPrimaryKey[i] = applyColumnCasing(pk, *cc == SnakeColumnCasing, tgtConn.GetType())
tgtPrimaryKey[i] = casing.Apply(pk, tgtConn.GetType())
}
}
g.Debug("Performing upsert from temporary table %s to target table %s with primary keys %v",
Expand Down

0 comments on commit 3cf0bf2

Please sign in to comment.