Skip to content

Commit

Permalink
update ProcessWildcards to use streamsOrdered [no test]
Browse files Browse the repository at this point in the history
  • Loading branch information
flarco committed Aug 25, 2023
1 parent 9a37813 commit 6bb25d3
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions core/sling/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ func (rd ReplicationConfig) StreamsOrdered() []string {
func (rd *ReplicationConfig) ProcessWildcards() (err error) {
wildcardNames := []string{}
for name := range rd.Streams {
if strings.Contains(name, "*") {
if name == "*" {
return g.Error("Must specify schema when using wildcard: 'my_schema.*', not '*'")
} else if strings.Contains(name, "*") {
wildcardNames = append(wildcardNames, name)
}
}
Expand All @@ -70,17 +72,17 @@ func (rd *ReplicationConfig) ProcessWildcards() (err error) {
return g.Error(err, "could not connect to database for wildcard processing: %s", rd.Source)
}

for _, name := range wildcardNames {
schemaT, err := database.ParseTableName(name, c.Connection.Type)
for _, wildcardName := range wildcardNames {
schemaT, err := database.ParseTableName(wildcardName, c.Connection.Type)
if err != nil {
return g.Error(err, "could not parse stream name: %s", name)
return g.Error(err, "could not parse stream name: %s", wildcardName)
} else if schemaT.Schema == "" {
continue
}

if schemaT.Name == "*" {
// get all tables in schema
g.Debug("getting tables for %s", name)
g.Debug("getting tables for %s", wildcardName)
data, err := conn.GetTables(schemaT.Schema)
if err != nil {
return g.Error(err, "could not get tables for schema: %s", schemaT.Schema)
Expand All @@ -94,12 +96,17 @@ func (rd *ReplicationConfig) ProcessWildcards() (err error) {

// add to stream map
newCfg := ReplicationStreamConfig{}
g.Unmarshal(g.Marshal(rd.Streams[name]), &newCfg) // copy config over
g.Unmarshal(g.Marshal(rd.Streams[wildcardName]), &newCfg) // copy config over
rd.Streams[table.FullName()] = &newCfg
rd.streamsOrdered = append(rd.streamsOrdered, table.FullName())
}

// delete * from stream map
delete(rd.Streams, name)
delete(rd.Streams, wildcardName)
rd.streamsOrdered = lo.Filter(rd.streamsOrdered, func(v string, i int) bool {
return v != wildcardName
})

}
}

Expand Down

0 comments on commit 6bb25d3

Please sign in to comment.