diff --git a/cmd/internal/discover.go b/cmd/internal/discover.go index b8b1ee2..00bbc31 100644 --- a/cmd/internal/discover.go +++ b/cmd/internal/discover.go @@ -4,9 +4,15 @@ import ( "context" "fmt" "github.com/pkg/errors" + "strings" ) -func Discover(ctx context.Context, source PlanetScaleSource, mysql PlanetScaleEdgeMysqlAccess) (Catalog, error) { +type DiscoverSettings struct { + AutoSelectTables bool + ExcludedTables []string +} + +func Discover(ctx context.Context, source PlanetScaleSource, mysql PlanetScaleEdgeMysqlAccess, settings DiscoverSettings) (Catalog, error) { var c Catalog if err := mysql.PingContext(ctx, source); err != nil { return c, errors.Wrap(err, "unable to access PlanetScale Database") @@ -17,7 +23,13 @@ func Discover(ctx context.Context, source PlanetScaleSource, mysql PlanetScaleEd return c, errors.Wrap(err, "unable to retrieve table names") } + excludedTables := strings.Join(settings.ExcludedTables, " ") + for _, name := range tableNames { + if len(excludedTables) > 0 && strings.Contains(excludedTables, name) { + continue + } + table := Stream{ Name: name, ID: fmt.Sprintf("%s:%s", source.Database, name), @@ -39,7 +51,7 @@ func Discover(ctx context.Context, source PlanetScaleSource, mysql PlanetScaleEd } table.KeyProperties = keyProperties table.CursorProperties = keyProperties - table.GenerateMetadata(keyProperties) + table.GenerateMetadata(keyProperties, settings.AutoSelectTables) c.Streams = append(c.Streams, table) } diff --git a/cmd/internal/discover_test.go b/cmd/internal/discover_test.go index c35d2f3..2cfeb8b 100644 --- a/cmd/internal/discover_test.go +++ b/cmd/internal/discover_test.go @@ -9,10 +9,11 @@ import ( func TestDiscover_CanFailIfCredentialsInvalid(t *testing.T) { tma := getTestMysqlAccess() + settings := DiscoverSettings{} tma.PingContextFn = func(ctx context.Context, source PlanetScaleSource) error { return errors.New("Access Denied") } - _, err := Discover(context.Background(), PlanetScaleSource{}, tma) + _, err := Discover(context.Background(), PlanetScaleSource{}, tma, settings) assert.NotNil(t, err) assert.ErrorContains(t, err, "unable to access PlanetScale Database: Access Denied") assert.True(t, tma.PingContextFnInvoked) @@ -25,7 +26,7 @@ func TestDiscover_CanFailIfCannotQuery(t *testing.T) { return []string{}, errors.New("read prohibited") } - _, err := Discover(context.Background(), PlanetScaleSource{}, tma) + _, err := Discover(context.Background(), PlanetScaleSource{}, tma, DiscoverSettings{}) assert.NotNil(t, err) assert.ErrorContains(t, err, "unable to retrieve table names: read prohibited") assert.True(t, tma.PingContextFnInvoked) @@ -54,7 +55,7 @@ func TestDiscover_SchemaHasPrimaryKeys(t *testing.T) { }, nil } - c, err := Discover(context.Background(), PlanetScaleSource{}, tma) + c, err := Discover(context.Background(), PlanetScaleSource{}, tma, DiscoverSettings{}) assert.Nil(t, err) assert.True(t, tma.PingContextFnInvoked) assert.Len(t, c.Streams, 1) @@ -84,7 +85,7 @@ func TestDiscover_SchemaHasCursorProperties(t *testing.T) { }, nil } - c, err := Discover(context.Background(), PlanetScaleSource{}, tma) + c, err := Discover(context.Background(), PlanetScaleSource{}, tma, DiscoverSettings{}) assert.Nil(t, err) assert.True(t, tma.PingContextFnInvoked) assert.Len(t, c.Streams, 1) @@ -92,6 +93,109 @@ func TestDiscover_SchemaHasCursorProperties(t *testing.T) { assert.Equal(t, []string{"emp_no"}, emp.CursorProperties) } +func TestDiscover_CanSelectAllTables(t *testing.T) { + tma := getTestMysqlAccess() + tma.GetTableNamesFn = func(ctx context.Context, source PlanetScaleSource) ([]string, error) { + return []string{ + "employees", + }, nil + } + + tma.GetTableSchemaFn = func(ctx context.Context, source PlanetScaleSource, s string) (map[string]StreamProperty, error) { + return map[string]StreamProperty{ + "emp_no": {Types: []string{"null", "string"}}, + "first_name": {Types: []string{"null", "string"}}, + "last_name": {Types: []string{"null", "string"}}, + }, nil + } + + tma.GetTablePrimaryKeysFn = func(ctx context.Context, source PlanetScaleSource, s string) ([]string, error) { + return []string{ + "emp_no", + }, nil + } + + c, err := Discover(context.Background(), PlanetScaleSource{}, tma, DiscoverSettings{ + AutoSelectTables: true, + }) + assert.Nil(t, err) + assert.True(t, tma.PingContextFnInvoked) + assert.Len(t, c.Streams, 1) + emp := c.Streams[0] + mm := emp.Metadata.GetPropertyMap() + assert.Equal(t, NodeMetadata{ + Inclusion: "automatic", + Selected: true, + BreadCrumb: []string{"properties", "emp_no"}, + }, mm["emp_no"].Metadata, "key properties should be auto-included") + + assert.Equal(t, NodeMetadata{ + Inclusion: "available", + Selected: true, + BreadCrumb: []string{"properties", "first_name"}, + }, mm["first_name"].Metadata, "non-key properties should be auto-selected") + + assert.Equal(t, NodeMetadata{ + Inclusion: "available", + Selected: true, + BreadCrumb: []string{"properties", "last_name"}, + }, mm["last_name"].Metadata, "non-key properties should be auto-selected") +} + +func TestDiscover_CanExcludeTables(t *testing.T) { + tma := getTestMysqlAccess() + tma.GetTableNamesFn = func(ctx context.Context, source PlanetScaleSource) ([]string, error) { + return []string{ + "employees", + "customers", + }, nil + } + + tma.GetTableSchemaFn = func(ctx context.Context, source PlanetScaleSource, s string) (map[string]StreamProperty, error) { + return map[string]StreamProperty{ + "emp_no": {Types: []string{"null", "string"}}, + "first_name": {Types: []string{"null", "string"}}, + "last_name": {Types: []string{"null", "string"}}, + }, nil + } + + tma.GetTablePrimaryKeysFn = func(ctx context.Context, source PlanetScaleSource, s string) ([]string, error) { + return []string{ + "emp_no", + }, nil + } + + c, err := Discover(context.Background(), PlanetScaleSource{}, tma, DiscoverSettings{ + AutoSelectTables: true, + ExcludedTables: []string{ + "employees", + }, + }) + assert.Nil(t, err) + assert.True(t, tma.PingContextFnInvoked) + assert.Len(t, c.Streams, 1) + emp := c.Streams[0] + assert.Equal(t, emp.Name, "customers") + mm := emp.Metadata.GetPropertyMap() + assert.Equal(t, NodeMetadata{ + Inclusion: "automatic", + Selected: true, + BreadCrumb: []string{"properties", "emp_no"}, + }, mm["emp_no"].Metadata, "key properties should be auto-included") + + assert.Equal(t, NodeMetadata{ + Inclusion: "available", + Selected: true, + BreadCrumb: []string{"properties", "first_name"}, + }, mm["first_name"].Metadata, "non-key properties should be auto-selected") + + assert.Equal(t, NodeMetadata{ + Inclusion: "available", + Selected: true, + BreadCrumb: []string{"properties", "last_name"}, + }, mm["last_name"].Metadata, "non-key properties should be auto-selected") +} + func TestDiscover_SchemaHasValidMetadata(t *testing.T) { tma := getTestMysqlAccess() tma.GetTableNamesFn = func(ctx context.Context, source PlanetScaleSource) ([]string, error) { @@ -114,7 +218,7 @@ func TestDiscover_SchemaHasValidMetadata(t *testing.T) { }, nil } - c, err := Discover(context.Background(), PlanetScaleSource{}, tma) + c, err := Discover(context.Background(), PlanetScaleSource{}, tma, DiscoverSettings{}) assert.Nil(t, err) assert.True(t, tma.PingContextFnInvoked) assert.Len(t, c.Streams, 1) diff --git a/cmd/internal/types.go b/cmd/internal/types.go index feeb141..e905c58 100644 --- a/cmd/internal/types.go +++ b/cmd/internal/types.go @@ -197,15 +197,15 @@ func (m MetadataCollection) GetPropertyMap() map[string]Metadata { return propertyMap } -func (s *Stream) GenerateMetadata(keyProperties []string) error { - streamMetadata := NewMetadata() +func (s *Stream) GenerateMetadata(keyProperties []string, autoSelect bool) error { + streamMetadata := NewMetadata(autoSelect) streamMetadata.Metadata.TableKeyProperties = keyProperties streamMetadata.Metadata.ValidReplicationKeys = keyProperties // need this to be an empty array since Singer needs an empty JSON array here. streamMetadata.Metadata.BreadCrumb = []string{} s.Metadata = append(s.Metadata, streamMetadata) for key := range s.Schema.Properties { - propertyMetadata := NewMetadata() + propertyMetadata := NewMetadata(autoSelect) propertyMetadata.Metadata.BreadCrumb = []string{ "properties", key, } @@ -222,11 +222,11 @@ func (s *Stream) GenerateMetadata(keyProperties []string) error { } return nil } -func NewMetadata() Metadata { +func NewMetadata(autoSelect bool) Metadata { return Metadata{ Metadata: NodeMetadata{ Inclusion: "available", - Selected: false, + Selected: autoSelect, }, } } diff --git a/cmd/singer-tap/main.go b/cmd/singer-tap/main.go index b3b26eb..404e890 100644 --- a/cmd/singer-tap/main.go +++ b/cmd/singer-tap/main.go @@ -101,7 +101,7 @@ func discover(ctx context.Context, logger internal.Logger, source internal.Plane } defer mysql.Close() - catalog, err := internal.Discover(ctx, source, mysql) + catalog, err := internal.Discover(ctx, source, mysql, internal.DiscoverSettings{}) if err != nil { return errors.Wrap(err, "unable to discover schema for PlanetScale database") }