Skip to content

Commit

Permalink
Add source options: columns
Browse files Browse the repository at this point in the history
  • Loading branch information
flarco committed Aug 25, 2023
1 parent bbc804a commit a3f147d
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 7 deletions.
4 changes: 4 additions & 0 deletions core/sling/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ func (cfg *Config) SetDefault() {
if cfg.Source.Options.MaxDecimals == nil {
cfg.Source.Options.MaxDecimals = sourceOptions.MaxDecimals
}
if cfg.Source.Options.Columns == nil {
cfg.Source.Options.Columns = sourceOptions.Columns
}

// set target options
var targetOptions TargetOptions
Expand Down Expand Up @@ -684,6 +687,7 @@ type SourceOptions struct {
JmesPath *string `json:"jmespath,omitempty" yaml:"jmespath,omitempty"`
Sheet *string `json:"sheet,omitempty" yaml:"sheet,omitempty"`
Range *string `json:"range,omitempty" yaml:"range,omitempty"`
Columns map[string]string `json:"columns,omitempty" yaml:"columns,omitempty"`
}

// TargetOptions are target connection and stream processing options
Expand Down
11 changes: 11 additions & 0 deletions core/sling/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,14 @@ func (t *TaskExecution) Cleanup() {
func (t *TaskExecution) usingCheckpoint() bool {
return t.Config.Source.UpdateKey != "" && t.Config.Mode == IncrementalMode
}

func (t *TaskExecution) sourceOptionsMap() (options map[string]any) {
options = g.M()
g.Unmarshal(g.Marshal(t.Config.Source.Options), &options)
options["METADATA"] = g.Marshal(t.getMetadata())
if t.Config.Source.Options.Columns != nil {
// set as string so that StreamProcessor parses it
options["columns"] = g.Marshal(t.Config.Source.Options.Columns)
}
return
}
4 changes: 1 addition & 3 deletions core/sling/task_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,7 @@ func (t *TaskExecution) getSrcDBConn(ctx context.Context) (conn database.Connect
return conn, nil
}

options := g.M()
g.Unmarshal(g.Marshal(t.Config.Source.Options), &options)
options["METADATA"] = g.Marshal(t.getMetadata())
options := t.sourceOptionsMap()
srcProps := append(
g.MapToKVArr(t.Config.SrcConn.DataS()),
g.MapToKVArr(g.ToMapString(options))...,
Expand Down
4 changes: 1 addition & 3 deletions core/sling/task_run_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,7 @@ func (t *TaskExecution) ReadFromAPI(cfg *Config, client *airbyte.Airbyte) (df *i
func (t *TaskExecution) ReadFromFile(cfg *Config) (df *iop.Dataflow, err error) {

var stream *iop.Datastream
options := g.M()
g.Unmarshal(g.Marshal(cfg.Source.Options), &options)
options["METADATA"] = g.Marshal(t.getMetadata())
options := t.sourceOptionsMap()

if cfg.SrcConn.URL() != "" {
// construct props by merging with options
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/c-bata/go-prompt v0.2.6
github.com/denisbrodbeck/machineid v1.0.1
github.com/dustin/go-humanize v1.0.0
github.com/flarco/dbio v0.3.251
github.com/flarco/dbio v0.3.252
github.com/flarco/g v0.1.61
github.com/getsentry/sentry-go v0.11.0
github.com/integrii/flaggy v1.5.2
Expand Down

0 comments on commit a3f147d

Please sign in to comment.