diff --git a/cmd/internal/discover.go b/cmd/internal/discover.go index 00bbc31..7ff4f0c 100644 --- a/cmd/internal/discover.go +++ b/cmd/internal/discover.go @@ -8,8 +8,9 @@ import ( ) type DiscoverSettings struct { - AutoSelectTables bool - ExcludedTables []string + AutoSelectTables bool + ExcludedTables []string + UseIncrementalSync bool } func Discover(ctx context.Context, source PlanetScaleSource, mysql PlanetScaleEdgeMysqlAccess, settings DiscoverSettings) (Catalog, error) { @@ -51,7 +52,7 @@ func Discover(ctx context.Context, source PlanetScaleSource, mysql PlanetScaleEd } table.KeyProperties = keyProperties table.CursorProperties = keyProperties - table.GenerateMetadata(keyProperties, settings.AutoSelectTables) + table.GenerateMetadata(keyProperties, settings.AutoSelectTables, settings.UseIncrementalSync) c.Streams = append(c.Streams, table) } diff --git a/cmd/internal/types.go b/cmd/internal/types.go index e905c58..717cc05 100644 --- a/cmd/internal/types.go +++ b/cmd/internal/types.go @@ -197,9 +197,13 @@ func (m MetadataCollection) GetPropertyMap() map[string]Metadata { return propertyMap } -func (s *Stream) GenerateMetadata(keyProperties []string, autoSelect bool) error { +func (s *Stream) GenerateMetadata(keyProperties []string, autoSelect, useIncrementalSync bool) error { streamMetadata := NewMetadata(autoSelect) streamMetadata.Metadata.TableKeyProperties = keyProperties + if useIncrementalSync { + streamMetadata.Metadata.ReplicationMethod = "INCREMENTAL" + } + streamMetadata.Metadata.ValidReplicationKeys = keyProperties // need this to be an empty array since Singer needs an empty JSON array here. streamMetadata.Metadata.BreadCrumb = []string{} diff --git a/cmd/singer-tap/main.go b/cmd/singer-tap/main.go index 4b4b9ca..049b6d8 100644 --- a/cmd/singer-tap/main.go +++ b/cmd/singer-tap/main.go @@ -14,15 +14,16 @@ import ( ) var ( - version string - commit string - date string - discoverMode bool - catalogFilePath string - configFilePath string - stateFilePath string - autoSelect bool - excludedTables string + version string + commit string + date string + discoverMode bool + catalogFilePath string + configFilePath string + stateFilePath string + autoSelect bool + useIncrementalSync bool + excludedTables string ) func init() { @@ -31,6 +32,7 @@ func init() { flag.StringVar(&catalogFilePath, "catalog", "", "path to a catalog file for this tap") flag.StringVar(&stateFilePath, "state", "", "path to state file for this configuration") flag.BoolVar(&autoSelect, "auto-select", false, "(discover mode only) select all tables & columns in the schema") + flag.BoolVar(&useIncrementalSync, "incremental", false, "(discover mode only) all tables & views will be synced incrementally") flag.StringVar(&excludedTables, "excluded-tables", "", "(discover mode only) comma separated list of tables & views to exclude.") } @@ -66,7 +68,8 @@ func execute(discoverMode bool, logger internal.Logger, configFilePath, catalogF if discoverMode { logger.Info("running in discovery mode") settings := internal.DiscoverSettings{ - AutoSelectTables: autoSelect, + AutoSelectTables: autoSelect, + UseIncrementalSync: useIncrementalSync, } if len(excludedTables) > 0 {