Skip to content

Commit

Permalink
Merge pull request #7 from planetscale/add-tests
Browse files Browse the repository at this point in the history
Add tests for Sync & Discover mode
  • Loading branch information
Phani Raj authored Jul 8, 2022
2 parents 0648dd8 + 9c4528e commit 15f6e36
Show file tree
Hide file tree
Showing 8 changed files with 629 additions and 31 deletions.
8 changes: 3 additions & 5 deletions cmd/internal/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@ import (
"github.com/pkg/errors"
)

func Discover(ctx context.Context, source PlanetScaleSource) (Catalog, error) {
func Discover(ctx context.Context, source PlanetScaleSource, mysql PlanetScaleEdgeMysqlAccess) (Catalog, error) {
var c Catalog
mysql, err := NewMySQL(&source)
if err != nil {
return c, errors.Wrap(err, "unable to open mysql connection to PlanetScale Database")
if err := mysql.PingContext(ctx, source); err != nil {
return c, errors.Wrap(err, "unable to access PlanetScale Database")
}
defer mysql.Close()

tableNames, err := mysql.GetTableNames(ctx, source)
if err != nil {
Expand Down
136 changes: 136 additions & 0 deletions cmd/internal/discover_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,137 @@
package internal

import (
"context"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"testing"
)

func TestDiscover_CanFailIfCredentialsInvalid(t *testing.T) {
tma := getTestMysqlAccess()
tma.PingContextFn = func(ctx context.Context, source PlanetScaleSource) error {
return errors.New("Access Denied")
}
_, err := Discover(context.Background(), PlanetScaleSource{}, tma)
assert.NotNil(t, err)
assert.ErrorContains(t, err, "unable to access PlanetScale Database: Access Denied")
assert.True(t, tma.PingContextFnInvoked)
assert.False(t, tma.GetVitessTabletsFnInvoked)
}

func TestDiscover_CanFailIfCannotQuery(t *testing.T) {
tma := getTestMysqlAccess()
tma.GetTableNamesFn = func(ctx context.Context, source PlanetScaleSource) ([]string, error) {
return []string{}, errors.New("read prohibited")
}

_, err := Discover(context.Background(), PlanetScaleSource{}, tma)
assert.NotNil(t, err)
assert.ErrorContains(t, err, "unable to retrieve table names: read prohibited")
assert.True(t, tma.PingContextFnInvoked)
assert.False(t, tma.GetVitessTabletsFnInvoked)
}

func TestDiscover_SchemaHasPrimaryKeys(t *testing.T) {
tma := getTestMysqlAccess()
tma.GetTableNamesFn = func(ctx context.Context, source PlanetScaleSource) ([]string, error) {
return []string{
"employees",
}, nil
}

tma.GetTableSchemaFn = func(ctx context.Context, source PlanetScaleSource, s string) (map[string]StreamProperty, error) {
return map[string]StreamProperty{
"emp_no": {Types: []string{"null", "string"}},
"first_name": {Types: []string{"null", "string"}},
"last_name": {Types: []string{"null", "string"}},
}, nil
}

tma.GetTablePrimaryKeysFn = func(ctx context.Context, source PlanetScaleSource, s string) ([]string, error) {
return []string{
"emp_no",
}, nil
}

c, err := Discover(context.Background(), PlanetScaleSource{}, tma)
assert.Nil(t, err)
assert.True(t, tma.PingContextFnInvoked)
assert.Len(t, c.Streams, 1)
emp := c.Streams[0]
assert.Equal(t, []string{"emp_no"}, emp.KeyProperties)
}

func TestDiscover_SchemaHasCursorProperties(t *testing.T) {
tma := getTestMysqlAccess()
tma.GetTableNamesFn = func(ctx context.Context, source PlanetScaleSource) ([]string, error) {
return []string{
"employees",
}, nil
}

tma.GetTableSchemaFn = func(ctx context.Context, source PlanetScaleSource, s string) (map[string]StreamProperty, error) {
return map[string]StreamProperty{
"emp_no": {Types: []string{"null", "string"}},
"first_name": {Types: []string{"null", "string"}},
"last_name": {Types: []string{"null", "string"}},
}, nil
}

tma.GetTablePrimaryKeysFn = func(ctx context.Context, source PlanetScaleSource, s string) ([]string, error) {
return []string{
"emp_no",
}, nil
}

c, err := Discover(context.Background(), PlanetScaleSource{}, tma)
assert.Nil(t, err)
assert.True(t, tma.PingContextFnInvoked)
assert.Len(t, c.Streams, 1)
emp := c.Streams[0]
assert.Equal(t, []string{"emp_no"}, emp.CursorProperties)
}

func TestDiscover_SchemaHasValidMetadata(t *testing.T) {
tma := getTestMysqlAccess()
tma.GetTableNamesFn = func(ctx context.Context, source PlanetScaleSource) ([]string, error) {
return []string{
"employees",
}, nil
}

tma.GetTableSchemaFn = func(ctx context.Context, source PlanetScaleSource, s string) (map[string]StreamProperty, error) {
return map[string]StreamProperty{
"emp_no": {Types: []string{"null", "string"}},
"first_name": {Types: []string{"null", "string"}},
"last_name": {Types: []string{"null", "string"}},
}, nil
}

tma.GetTablePrimaryKeysFn = func(ctx context.Context, source PlanetScaleSource, s string) ([]string, error) {
return []string{
"emp_no",
}, nil
}

c, err := Discover(context.Background(), PlanetScaleSource{}, tma)
assert.Nil(t, err)
assert.True(t, tma.PingContextFnInvoked)
assert.Len(t, c.Streams, 1)
emp := c.Streams[0]
mm := emp.Metadata.GetPropertyMap()
assert.Equal(t, NodeMetadata{
Inclusion: "automatic",
BreadCrumb: []string{"properties", "emp_no"},
}, mm["emp_no"].Metadata, "key properties should be auto-included")

assert.Equal(t, NodeMetadata{
Inclusion: "available",
BreadCrumb: []string{"properties", "first_name"},
}, mm["first_name"].Metadata, "non-key properties should be selectable")

assert.Equal(t, NodeMetadata{
Inclusion: "available",
BreadCrumb: []string{"properties", "last_name"},
}, mm["last_name"].Metadata, "non-key properties should be selectable")
}
79 changes: 57 additions & 22 deletions cmd/internal/mock_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
)

type testSingerLogger struct {
logMessages []string
records map[string][]Record
logMessages []string
records map[string][]Record
state []State
streamSchemas map[string]StreamSchema
}

func (tal *testSingerLogger) Log(message string) {
Expand All @@ -24,24 +26,49 @@ func (tal *testSingerLogger) Info(message string) {
tal.logMessages = append(tal.logMessages, message)
}

func (tal *testSingerLogger) Error(message string) {
type testPlanetScaleEdgeDatabase struct {
CanConnectFn func(ctx context.Context, ps PlanetScaleSource) error
CanConnectFnInvoked bool
ReadFn func(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor) (*SerializedCursor, error)
ReadFnInvoked bool
}

func (tpe *testPlanetScaleEdgeDatabase) CanConnect(ctx context.Context, ps PlanetScaleSource) error {
tpe.CanConnectFnInvoked = true
return tpe.CanConnectFn(ctx, ps)
}

func (tpe *testPlanetScaleEdgeDatabase) Read(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor) (*SerializedCursor, error) {
tpe.ReadFnInvoked = true
return tpe.ReadFn(ctx, ps, s, tc)
}

func (tpe *testPlanetScaleEdgeDatabase) Close() error {
//TODO implement me
panic("implement me")
}

func (tal *testSingerLogger) State(state State) error {
func (tal *testSingerLogger) Error(message string) {
//TODO implement me
panic("implement me")
}

func (tal *testSingerLogger) State(state State) error {
tal.state = append(tal.state, state)
return nil
}

func (tal *testSingerLogger) Schema(catalog Catalog) error {
//TODO implement me
panic("implement me")
}

func (tal *testSingerLogger) StreamSchema(stream Stream) error {
//TODO implement me
panic("implement me")
if tal.streamSchemas == nil {
tal.streamSchemas = map[string]StreamSchema{}
}
tal.streamSchemas[stream.Name] = stream.Schema
return nil
}

func (tal *testSingerLogger) Record(record Record, stream Stream) error {
Expand Down Expand Up @@ -87,30 +114,38 @@ func (c *clientConnectionMock) Sync(ctx context.Context, in *psdbconnect.SyncReq
}

type mysqlAccessMock struct {
PingContextFn func(ctx context.Context, source PlanetScaleSource) error
PingContextFnInvoked bool
GetVitessTabletsFn func(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error)
GetVitessTabletsFnInvoked bool
PingContextFn func(ctx context.Context, source PlanetScaleSource) error
PingContextFnInvoked bool
GetVitessTabletsFn func(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error)
GetVitessTabletsFnInvoked bool
GetTableNamesFn func(ctx context.Context, source PlanetScaleSource) ([]string, error)
GetTableNamesFnInvoked bool
GetTableSchemaFn func(ctx context.Context, source PlanetScaleSource, s string) (map[string]StreamProperty, error)
GetTableSchemaFnInvoked bool
GetTablePrimaryKeysFn func(ctx context.Context, source PlanetScaleSource, s string) ([]string, error)
GetTablePrimaryKeysFnInvoked bool
GetVitessShardsFn func(ctx context.Context, psc PlanetScaleSource) ([]string, error)
GetVitessShardsFnInvoked bool
}

func (tma *mysqlAccessMock) PingContext(ctx context.Context, source PlanetScaleSource) error {
tma.PingContextFnInvoked = true
return tma.PingContextFn(ctx, source)
}

func (mysqlAccessMock) GetTableNames(ctx context.Context, source PlanetScaleSource) ([]string, error) {
//TODO implement me
panic("implement me")
func (tma *mysqlAccessMock) GetTableNames(ctx context.Context, source PlanetScaleSource) ([]string, error) {
tma.GetTableNamesFnInvoked = true
return tma.GetTableNamesFn(ctx, source)
}

func (mysqlAccessMock) GetTableSchema(ctx context.Context, source PlanetScaleSource, s string) (map[string]StreamProperty, error) {
//TODO implement me
panic("implement me")
func (tma *mysqlAccessMock) GetTableSchema(ctx context.Context, source PlanetScaleSource, s string) (map[string]StreamProperty, error) {
tma.GetTableSchemaFnInvoked = true
return tma.GetTableSchemaFn(ctx, source, s)
}

func (mysqlAccessMock) GetTablePrimaryKeys(ctx context.Context, source PlanetScaleSource, s string) ([]string, error) {
//TODO implement me
panic("implement me")
func (tma *mysqlAccessMock) GetTablePrimaryKeys(ctx context.Context, source PlanetScaleSource, s string) ([]string, error) {
tma.GetTablePrimaryKeysFnInvoked = true
return tma.GetTablePrimaryKeysFn(ctx, source, s)
}

func (mysqlAccessMock) QueryContext(ctx context.Context, psc PlanetScaleSource, query string, args ...interface{}) (*sql.Rows, error) {
Expand All @@ -123,8 +158,8 @@ func (tma *mysqlAccessMock) GetVitessTablets(ctx context.Context, psc PlanetScal
return tma.GetVitessTabletsFn(ctx, psc)
}

func (mysqlAccessMock) GetVitessShards(ctx context.Context, psc PlanetScaleSource) ([]string, error) {
//TODO implement me
panic("implement me")
func (tma *mysqlAccessMock) GetVitessShards(ctx context.Context, psc PlanetScaleSource) ([]string, error) {
tma.GetVitessShardsFnInvoked = true
return tma.GetVitessShardsFn(ctx, psc)
}
func (mysqlAccessMock) Close() error { return nil }
3 changes: 3 additions & 0 deletions cmd/internal/planetscale_edge_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,9 @@ func getTestMysqlAccess() *mysqlAccessMock {
PingContextFn: func(ctx context.Context, source PlanetScaleSource) error {
return nil
},
GetVitessShardsFn: func(ctx context.Context, psc PlanetScaleSource) ([]string, error) {
return []string{"-"}, nil
},
GetVitessTabletsFn: func(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error) {
return []VitessTablet{
{
Expand Down
17 changes: 15 additions & 2 deletions cmd/internal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,17 @@ func Sync(ctx context.Context, mysqlDatabase PlanetScaleEdgeMysqlAccess, edgeDat
logger.Info(fmt.Sprintf("stream's known position is %q", tc.Position))
}

state.Streams[stream.Name].Shards[shard], err = edgeDatabase.Read(ctx, source, stream, tc)
newCursor, err := edgeDatabase.Read(ctx, source, stream, tc)
if err != nil {
return err
}

if newCursor == nil {
return errors.New("should return valid cursor, got nil")
}

state.Streams[stream.Name].Shards[shard] = newCursor

if err := logger.State(*state); err != nil {
return errors.Wrap(err, "unable to serialize state")
}
Expand Down Expand Up @@ -123,7 +129,7 @@ func filterSchema(catalog Catalog) (Catalog, error) {
propertyMetadataMap := stream.Metadata.GetPropertyMap()
for name, prop := range stream.Schema.Properties {
// if field was selected
if propertyMetadataMap[name].Metadata.Selected {
if propertyMetadataMap[name].Metadata.Selected || propertyMetadataMap[name].Metadata.Inclusion == "automatic" {
fstream.Schema.Properties[name] = prop
fstream.Metadata = append(fstream.Metadata, propertyMetadataMap[name])
}
Expand All @@ -136,6 +142,13 @@ func filterSchema(catalog Catalog) (Catalog, error) {
}
}
}

// copy over the metadata item that refers to the Table.
tm, err := stream.GetTableMetadata()
if err != nil {
return filteredCatalog, err
}
fstream.Metadata = append(fstream.Metadata, *tm)
filteredCatalog.Streams = append(filteredCatalog.Streams, fstream)
}

Expand Down
Loading

0 comments on commit 15f6e36

Please sign in to comment.