Skip to content

Commit

Permalink
improve sp.SetConfig logic to key based instead of value based
Browse files Browse the repository at this point in the history
  • Loading branch information
flarco committed Oct 24, 2024
1 parent 6d63a1d commit 5b9a7b9
Showing 1 changed file with 51 additions and 42 deletions.
93 changes: 51 additions & 42 deletions core/dbio/iop/stream_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,42 +274,42 @@ func (sp *StreamProcessor) SetConfig(configMap map[string]string) {

sp.Config.Map = configMap

if configMap["fields_per_rec"] != "" {
sp.Config.FieldsPerRec = cast.ToInt(configMap["fields_per_rec"])
if val, ok := configMap["fields_per_rec"]; ok {
sp.Config.FieldsPerRec = cast.ToInt(val)
}

if configMap["delimiter"] != "" {
sp.Config.Delimiter = configMap["delimiter"]
if val, ok := configMap["delimiter"]; ok {
sp.Config.Delimiter = val
}

if configMap["escape"] != "" {
sp.Config.Escape = configMap["escape"]
if val, ok := configMap["escape"]; ok {
sp.Config.Escape = val
}

if configMap["quote"] != "" {
sp.Config.Quote = configMap["quote"]
if val, ok := configMap["quote"]; ok {
sp.Config.Quote = val
}

if configMap["file_max_rows"] != "" {
sp.Config.FileMaxRows = cast.ToInt64(configMap["file_max_rows"])
if val, ok := configMap["file_max_rows"]; ok {
sp.Config.FileMaxRows = cast.ToInt64(val)
}

if configMap["file_max_bytes"] != "" {
sp.Config.FileMaxBytes = cast.ToInt64(configMap["file_max_bytes"])
if val, ok := configMap["file_max_bytes"]; ok {
sp.Config.FileMaxBytes = cast.ToInt64(val)
}

if configMap["batch_limit"] != "" {
sp.Config.BatchLimit = cast.ToInt64(configMap["batch_limit"])
if val, ok := configMap["batch_limit"]; ok {
sp.Config.BatchLimit = cast.ToInt64(val)
}

if configMap["header"] != "" {
sp.Config.Header = cast.ToBool(configMap["header"])
if val, ok := configMap["header"]; ok {
sp.Config.Header = cast.ToBool(val)
} else {
sp.Config.Header = true
}

if configMap["flatten"] != "" {
sp.Config.Flatten = cast.ToBool(configMap["flatten"])
if val, ok := configMap["flatten"]; ok {
sp.Config.Flatten = cast.ToBool(val)
}

if configMap["max_decimals"] != "" && configMap["max_decimals"] != "-1" {
Expand All @@ -322,42 +322,51 @@ func (sp *StreamProcessor) SetConfig(configMap map[string]string) {
}
}

if configMap["empty_as_null"] != "" {
sp.Config.EmptyAsNull = cast.ToBool(configMap["empty_as_null"])
if val, ok := configMap["empty_as_null"]; ok {
sp.Config.EmptyAsNull = cast.ToBool(val)
}
if configMap["null_if"] != "" {
sp.Config.NullIf = configMap["null_if"]

if val, ok := configMap["null_if"]; ok {
sp.Config.NullIf = val
}
if configMap["null_as"] != "" {
sp.Config.NullAs = configMap["null_as"]

if val, ok := configMap["null_as"]; ok {
sp.Config.NullAs = val
}
if configMap["trim_space"] != "" {
sp.Config.TrimSpace = cast.ToBool(configMap["trim_space"])
if val, ok := configMap["trim_space"]; ok {
sp.Config.TrimSpace = cast.ToBool(val)
}
if configMap["jmespath"] != "" {
sp.Config.Jmespath = cast.ToString(configMap["jmespath"])

if val, ok := configMap["jmespath"]; ok {
sp.Config.Jmespath = cast.ToString(val)
}
if configMap["sheet"] != "" {
sp.Config.Sheet = cast.ToString(configMap["sheet"])

if val, ok := configMap["sheet"]; ok {
sp.Config.Sheet = cast.ToString(val)
}
if configMap["skip_blank_lines"] != "" {
sp.Config.SkipBlankLines = cast.ToBool(configMap["skip_blank_lines"])

if val, ok := configMap["skip_blank_lines"]; ok {
sp.Config.SkipBlankLines = cast.ToBool(val)
}
if configMap["bool_at_int"] != "" {
sp.Config.BoolAsInt = cast.ToBool(configMap["bool_at_int"])

if val, ok := configMap["bool_at_int"]; ok {
sp.Config.BoolAsInt = cast.ToBool(val)
}
if configMap["columns"] != "" {
g.Unmarshal(configMap["columns"], &sp.Config.Columns)

if val, ok := configMap["columns"]; ok {
g.Unmarshal(val, &sp.Config.Columns)
}
if configMap["transforms"] != "" {
sp.applyTransforms(configMap["transforms"])

if val, ok := configMap["transforms"]; ok {
sp.applyTransforms(val)
}
if configMap["compression"] != "" {
sp.Config.Compression = CompressorType(strings.ToLower(configMap["compression"]))

if val, ok := configMap["compression"]; ok {
sp.Config.Compression = CompressorType(strings.ToLower(val))
}

if configMap["datetime_format"] != "" {
sp.Config.DatetimeFormat = Iso8601ToGoLayout(configMap["datetime_format"])
if val, ok := configMap["datetime_format"]; ok {
sp.Config.DatetimeFormat = Iso8601ToGoLayout(val)
// put in first
sp.dateLayouts = append(
[]string{sp.Config.DatetimeFormat},
Expand Down

0 comments on commit 5b9a7b9

Please sign in to comment.