Skip to content

Commit

Permalink
Enable auto-select & excluding tables from discover
Browse files Browse the repository at this point in the history
  • Loading branch information
Phani Raj committed Aug 1, 2022
1 parent 4e6e105 commit 244163f
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 13 deletions.
16 changes: 14 additions & 2 deletions cmd/internal/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@ import (
"context"
"fmt"
"github.com/pkg/errors"
"strings"
)

func Discover(ctx context.Context, source PlanetScaleSource, mysql PlanetScaleEdgeMysqlAccess) (Catalog, error) {
type DiscoverSettings struct {
AutoSelectTables bool
ExcludedTables []string
}

func Discover(ctx context.Context, source PlanetScaleSource, mysql PlanetScaleEdgeMysqlAccess, settings DiscoverSettings) (Catalog, error) {
var c Catalog
if err := mysql.PingContext(ctx, source); err != nil {
return c, errors.Wrap(err, "unable to access PlanetScale Database")
Expand All @@ -17,7 +23,13 @@ func Discover(ctx context.Context, source PlanetScaleSource, mysql PlanetScaleEd
return c, errors.Wrap(err, "unable to retrieve table names")
}

excludedTables := strings.Join(settings.ExcludedTables, " ")

for _, name := range tableNames {
if len(excludedTables) > 0 && strings.Contains(excludedTables, name) {
continue
}

table := Stream{
Name: name,
ID: fmt.Sprintf("%s:%s", source.Database, name),
Expand All @@ -39,7 +51,7 @@ func Discover(ctx context.Context, source PlanetScaleSource, mysql PlanetScaleEd
}
table.KeyProperties = keyProperties
table.CursorProperties = keyProperties
table.GenerateMetadata(keyProperties)
table.GenerateMetadata(keyProperties, settings.AutoSelectTables)

c.Streams = append(c.Streams, table)
}
Expand Down
114 changes: 109 additions & 5 deletions cmd/internal/discover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (

func TestDiscover_CanFailIfCredentialsInvalid(t *testing.T) {
tma := getTestMysqlAccess()
settings := DiscoverSettings{}
tma.PingContextFn = func(ctx context.Context, source PlanetScaleSource) error {
return errors.New("Access Denied")
}
_, err := Discover(context.Background(), PlanetScaleSource{}, tma)
_, err := Discover(context.Background(), PlanetScaleSource{}, tma, settings)
assert.NotNil(t, err)
assert.ErrorContains(t, err, "unable to access PlanetScale Database: Access Denied")
assert.True(t, tma.PingContextFnInvoked)
Expand All @@ -25,7 +26,7 @@ func TestDiscover_CanFailIfCannotQuery(t *testing.T) {
return []string{}, errors.New("read prohibited")
}

_, err := Discover(context.Background(), PlanetScaleSource{}, tma)
_, err := Discover(context.Background(), PlanetScaleSource{}, tma, DiscoverSettings{})
assert.NotNil(t, err)
assert.ErrorContains(t, err, "unable to retrieve table names: read prohibited")
assert.True(t, tma.PingContextFnInvoked)
Expand Down Expand Up @@ -54,7 +55,7 @@ func TestDiscover_SchemaHasPrimaryKeys(t *testing.T) {
}, nil
}

c, err := Discover(context.Background(), PlanetScaleSource{}, tma)
c, err := Discover(context.Background(), PlanetScaleSource{}, tma, DiscoverSettings{})
assert.Nil(t, err)
assert.True(t, tma.PingContextFnInvoked)
assert.Len(t, c.Streams, 1)
Expand Down Expand Up @@ -84,14 +85,117 @@ func TestDiscover_SchemaHasCursorProperties(t *testing.T) {
}, nil
}

c, err := Discover(context.Background(), PlanetScaleSource{}, tma)
c, err := Discover(context.Background(), PlanetScaleSource{}, tma, DiscoverSettings{})
assert.Nil(t, err)
assert.True(t, tma.PingContextFnInvoked)
assert.Len(t, c.Streams, 1)
emp := c.Streams[0]
assert.Equal(t, []string{"emp_no"}, emp.CursorProperties)
}

func TestDiscover_CanSelectAllTables(t *testing.T) {
tma := getTestMysqlAccess()
tma.GetTableNamesFn = func(ctx context.Context, source PlanetScaleSource) ([]string, error) {
return []string{
"employees",
}, nil
}

tma.GetTableSchemaFn = func(ctx context.Context, source PlanetScaleSource, s string) (map[string]StreamProperty, error) {
return map[string]StreamProperty{
"emp_no": {Types: []string{"null", "string"}},
"first_name": {Types: []string{"null", "string"}},
"last_name": {Types: []string{"null", "string"}},
}, nil
}

tma.GetTablePrimaryKeysFn = func(ctx context.Context, source PlanetScaleSource, s string) ([]string, error) {
return []string{
"emp_no",
}, nil
}

c, err := Discover(context.Background(), PlanetScaleSource{}, tma, DiscoverSettings{
AutoSelectTables: true,
})
assert.Nil(t, err)
assert.True(t, tma.PingContextFnInvoked)
assert.Len(t, c.Streams, 1)
emp := c.Streams[0]
mm := emp.Metadata.GetPropertyMap()
assert.Equal(t, NodeMetadata{
Inclusion: "automatic",
Selected: true,
BreadCrumb: []string{"properties", "emp_no"},
}, mm["emp_no"].Metadata, "key properties should be auto-included")

assert.Equal(t, NodeMetadata{
Inclusion: "available",
Selected: true,
BreadCrumb: []string{"properties", "first_name"},
}, mm["first_name"].Metadata, "non-key properties should be auto-selected")

assert.Equal(t, NodeMetadata{
Inclusion: "available",
Selected: true,
BreadCrumb: []string{"properties", "last_name"},
}, mm["last_name"].Metadata, "non-key properties should be auto-selected")
}

func TestDiscover_CanExcludeTables(t *testing.T) {
tma := getTestMysqlAccess()
tma.GetTableNamesFn = func(ctx context.Context, source PlanetScaleSource) ([]string, error) {
return []string{
"employees",
"customers",
}, nil
}

tma.GetTableSchemaFn = func(ctx context.Context, source PlanetScaleSource, s string) (map[string]StreamProperty, error) {
return map[string]StreamProperty{
"emp_no": {Types: []string{"null", "string"}},
"first_name": {Types: []string{"null", "string"}},
"last_name": {Types: []string{"null", "string"}},
}, nil
}

tma.GetTablePrimaryKeysFn = func(ctx context.Context, source PlanetScaleSource, s string) ([]string, error) {
return []string{
"emp_no",
}, nil
}

c, err := Discover(context.Background(), PlanetScaleSource{}, tma, DiscoverSettings{
AutoSelectTables: true,
ExcludedTables: []string{
"employees",
},
})
assert.Nil(t, err)
assert.True(t, tma.PingContextFnInvoked)
assert.Len(t, c.Streams, 1)
emp := c.Streams[0]
assert.Equal(t, emp.Name, "customers")
mm := emp.Metadata.GetPropertyMap()
assert.Equal(t, NodeMetadata{
Inclusion: "automatic",
Selected: true,
BreadCrumb: []string{"properties", "emp_no"},
}, mm["emp_no"].Metadata, "key properties should be auto-included")

assert.Equal(t, NodeMetadata{
Inclusion: "available",
Selected: true,
BreadCrumb: []string{"properties", "first_name"},
}, mm["first_name"].Metadata, "non-key properties should be auto-selected")

assert.Equal(t, NodeMetadata{
Inclusion: "available",
Selected: true,
BreadCrumb: []string{"properties", "last_name"},
}, mm["last_name"].Metadata, "non-key properties should be auto-selected")
}

func TestDiscover_SchemaHasValidMetadata(t *testing.T) {
tma := getTestMysqlAccess()
tma.GetTableNamesFn = func(ctx context.Context, source PlanetScaleSource) ([]string, error) {
Expand All @@ -114,7 +218,7 @@ func TestDiscover_SchemaHasValidMetadata(t *testing.T) {
}, nil
}

c, err := Discover(context.Background(), PlanetScaleSource{}, tma)
c, err := Discover(context.Background(), PlanetScaleSource{}, tma, DiscoverSettings{})
assert.Nil(t, err)
assert.True(t, tma.PingContextFnInvoked)
assert.Len(t, c.Streams, 1)
Expand Down
10 changes: 5 additions & 5 deletions cmd/internal/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,15 @@ func (m MetadataCollection) GetPropertyMap() map[string]Metadata {
return propertyMap
}

func (s *Stream) GenerateMetadata(keyProperties []string) error {
streamMetadata := NewMetadata()
func (s *Stream) GenerateMetadata(keyProperties []string, autoSelect bool) error {
streamMetadata := NewMetadata(autoSelect)
streamMetadata.Metadata.TableKeyProperties = keyProperties
streamMetadata.Metadata.ValidReplicationKeys = keyProperties
// need this to be an empty array since Singer needs an empty JSON array here.
streamMetadata.Metadata.BreadCrumb = []string{}
s.Metadata = append(s.Metadata, streamMetadata)
for key := range s.Schema.Properties {
propertyMetadata := NewMetadata()
propertyMetadata := NewMetadata(autoSelect)
propertyMetadata.Metadata.BreadCrumb = []string{
"properties", key,
}
Expand All @@ -222,11 +222,11 @@ func (s *Stream) GenerateMetadata(keyProperties []string) error {
}
return nil
}
func NewMetadata() Metadata {
func NewMetadata(autoSelect bool) Metadata {
return Metadata{
Metadata: NodeMetadata{
Inclusion: "available",
Selected: false,
Selected: autoSelect,
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/singer-tap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func discover(ctx context.Context, logger internal.Logger, source internal.Plane
}
defer mysql.Close()

catalog, err := internal.Discover(ctx, source, mysql)
catalog, err := internal.Discover(ctx, source, mysql, internal.DiscoverSettings{})
if err != nil {
return errors.Wrap(err, "unable to discover schema for PlanetScale database")
}
Expand Down

0 comments on commit 244163f

Please sign in to comment.