Skip to content

Commit

Permalink
make replication.StreamsOrdered [no test]
Browse files Browse the repository at this point in the history
  • Loading branch information
flarco committed Aug 25, 2023
1 parent a3f147d commit 9a37813
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 3 deletions.
3 changes: 1 addition & 2 deletions cmd/sling/sling_logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"os"
"runtime"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -236,7 +235,7 @@ func runReplication(cfgPath string, selectStreams ...string) (err error) {

eG := g.ErrorGroup{}
counter := 0
for _, name := range sort.StringSlice(lo.Keys(replication.Streams)) {
for _, name := range replication.StreamsOrdered() {
if interrupted {
break
}
Expand Down
24 changes: 24 additions & 0 deletions core/sling/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type ReplicationConfig struct {
Target string `json:"target,omitempty" yaml:"target,omitempty"`
Defaults ReplicationStreamConfig `json:"defaults,omitempty" yaml:"defaults,omitempty"`
Streams map[string]*ReplicationStreamConfig `json:"streams,omitempty" yaml:"streams,omitempty"`

streamsOrdered []string
}

// Scan scan value into Jsonb, implements sql.Scanner interface
Expand All @@ -31,6 +33,11 @@ func (rd ReplicationConfig) Value() (driver.Value, error) {
return g.JSONValuer(rd, "{}")
}

// StreamsOrdered returns the stream names as ordered in the YAML file
func (rd ReplicationConfig) StreamsOrdered() []string {
return rd.streamsOrdered
}

// ProcessWildcards process the streams using wildcards
// such as `my_schema.*` or `my_schema.my_prefix_*` or `my_schema.*_my_suffix`
func (rd *ReplicationConfig) ProcessWildcards() (err error) {
Expand Down Expand Up @@ -195,6 +202,23 @@ func UnmarshalReplication(replicYAML string) (config ReplicationConfig, err erro
return
}

// get streams order
rootMap := yaml.MapSlice{}
err = yaml.Unmarshal([]byte(replicYAML), &rootMap)
if err != nil {
err = g.Error(err, "Error parsing yaml content")
return
}

for _, rootNode := range rootMap {
if cast.ToString(rootNode.Key) == "streams" {
streamsNodes := rootNode.Value.(yaml.MapSlice)
for _, streamsNode := range streamsNodes {
config.streamsOrdered = append(config.streamsOrdered, cast.ToString(streamsNode.Key))
}
}
}

return
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/c-bata/go-prompt v0.2.6
github.com/denisbrodbeck/machineid v1.0.1
github.com/dustin/go-humanize v1.0.0
github.com/flarco/dbio v0.3.252
github.com/flarco/dbio v0.3.253
github.com/flarco/g v0.1.61
github.com/getsentry/sentry-go v0.11.0
github.com/integrii/flaggy v1.5.2
Expand Down

0 comments on commit 9a37813

Please sign in to comment.