diff --git a/cmd/sling/sling_logic.go b/cmd/sling/sling_logic.go index ce5d99629..339f01d16 100755 --- a/cmd/sling/sling_logic.go +++ b/cmd/sling/sling_logic.go @@ -3,7 +3,6 @@ package main import ( "os" "runtime" - "sort" "strings" "time" @@ -236,7 +235,7 @@ func runReplication(cfgPath string, selectStreams ...string) (err error) { eG := g.ErrorGroup{} counter := 0 - for _, name := range sort.StringSlice(lo.Keys(replication.Streams)) { + for _, name := range replication.StreamsOrdered() { if interrupted { break } diff --git a/core/sling/replication.go b/core/sling/replication.go index d8f54aa7b..27e8c2148 100644 --- a/core/sling/replication.go +++ b/core/sling/replication.go @@ -19,6 +19,8 @@ type ReplicationConfig struct { Target string `json:"target,omitempty" yaml:"target,omitempty"` Defaults ReplicationStreamConfig `json:"defaults,omitempty" yaml:"defaults,omitempty"` Streams map[string]*ReplicationStreamConfig `json:"streams,omitempty" yaml:"streams,omitempty"` + + streamsOrdered []string } // Scan scan value into Jsonb, implements sql.Scanner interface @@ -31,6 +33,11 @@ func (rd ReplicationConfig) Value() (driver.Value, error) { return g.JSONValuer(rd, "{}") } +// StreamsOrdered returns the stream names as ordered in the YAML file +func (rd ReplicationConfig) StreamsOrdered() []string { + return rd.streamsOrdered +} + // ProcessWildcards process the streams using wildcards // such as `my_schema.*` or `my_schema.my_prefix_*` or `my_schema.*_my_suffix` func (rd *ReplicationConfig) ProcessWildcards() (err error) { @@ -195,6 +202,23 @@ func UnmarshalReplication(replicYAML string) (config ReplicationConfig, err erro return } + // get streams order + rootMap := yaml.MapSlice{} + err = yaml.Unmarshal([]byte(replicYAML), &rootMap) + if err != nil { + err = g.Error(err, "Error parsing yaml content") + return + } + + for _, rootNode := range rootMap { + if cast.ToString(rootNode.Key) == "streams" { + streamsNodes := rootNode.Value.(yaml.MapSlice) + for _, streamsNode := range streamsNodes { + config.streamsOrdered = append(config.streamsOrdered, cast.ToString(streamsNode.Key)) + } + } + } + return } diff --git a/go.mod b/go.mod index 3fa10fcdb..917254b23 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/c-bata/go-prompt v0.2.6 github.com/denisbrodbeck/machineid v1.0.1 github.com/dustin/go-humanize v1.0.0 - github.com/flarco/dbio v0.3.252 + github.com/flarco/dbio v0.3.253 github.com/flarco/g v0.1.61 github.com/getsentry/sentry-go v0.11.0 github.com/integrii/flaggy v1.5.2