Skip to content

Commit

Permalink
add --incremental flag
Browse files Browse the repository at this point in the history
  • Loading branch information
Phani Raj committed Aug 1, 2022
1 parent 808bfee commit 021a393
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 14 deletions.
7 changes: 4 additions & 3 deletions cmd/internal/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
)

type DiscoverSettings struct {
AutoSelectTables bool
ExcludedTables []string
AutoSelectTables bool
ExcludedTables []string
UseIncrementalSync bool
}

func Discover(ctx context.Context, source PlanetScaleSource, mysql PlanetScaleEdgeMysqlAccess, settings DiscoverSettings) (Catalog, error) {
Expand Down Expand Up @@ -51,7 +52,7 @@ func Discover(ctx context.Context, source PlanetScaleSource, mysql PlanetScaleEd
}
table.KeyProperties = keyProperties
table.CursorProperties = keyProperties
table.GenerateMetadata(keyProperties, settings.AutoSelectTables)
table.GenerateMetadata(keyProperties, settings.AutoSelectTables, settings.UseIncrementalSync)

c.Streams = append(c.Streams, table)
}
Expand Down
6 changes: 5 additions & 1 deletion cmd/internal/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,13 @@ func (m MetadataCollection) GetPropertyMap() map[string]Metadata {
return propertyMap
}

func (s *Stream) GenerateMetadata(keyProperties []string, autoSelect bool) error {
func (s *Stream) GenerateMetadata(keyProperties []string, autoSelect, useIncrementalSync bool) error {
streamMetadata := NewMetadata(autoSelect)
streamMetadata.Metadata.TableKeyProperties = keyProperties
if useIncrementalSync {
streamMetadata.Metadata.ReplicationMethod = "INCREMENTAL"
}

streamMetadata.Metadata.ValidReplicationKeys = keyProperties
// need this to be an empty array since Singer needs an empty JSON array here.
streamMetadata.Metadata.BreadCrumb = []string{}
Expand Down
23 changes: 13 additions & 10 deletions cmd/singer-tap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ import (
)

var (
version string
commit string
date string
discoverMode bool
catalogFilePath string
configFilePath string
stateFilePath string
autoSelect bool
excludedTables string
version string
commit string
date string
discoverMode bool
catalogFilePath string
configFilePath string
stateFilePath string
autoSelect bool
useIncrementalSync bool
excludedTables string
)

func init() {
Expand All @@ -31,6 +32,7 @@ func init() {
flag.StringVar(&catalogFilePath, "catalog", "", "path to a catalog file for this tap")
flag.StringVar(&stateFilePath, "state", "", "path to state file for this configuration")
flag.BoolVar(&autoSelect, "auto-select", false, "(discover mode only) select all tables & columns in the schema")
flag.BoolVar(&useIncrementalSync, "incremental", false, "(discover mode only) all tables & views will be synced incrementally")
flag.StringVar(&excludedTables, "excluded-tables", "", "(discover mode only) comma separated list of tables & views to exclude.")
}

Expand Down Expand Up @@ -66,7 +68,8 @@ func execute(discoverMode bool, logger internal.Logger, configFilePath, catalogF
if discoverMode {
logger.Info("running in discovery mode")
settings := internal.DiscoverSettings{
AutoSelectTables: autoSelect,
AutoSelectTables: autoSelect,
UseIncrementalSync: useIncrementalSync,
}

if len(excludedTables) > 0 {
Expand Down

0 comments on commit 021a393

Please sign in to comment.