Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests for Sync & Discover mode #7

Merged
merged 7 commits into from
Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions cmd/internal/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@ import (
"github.com/pkg/errors"
)

func Discover(ctx context.Context, source PlanetScaleSource) (Catalog, error) {
func Discover(ctx context.Context, source PlanetScaleSource, mysql PlanetScaleEdgeMysqlAccess) (Catalog, error) {
var c Catalog
mysql, err := NewMySQL(&source)
if err != nil {
return c, errors.Wrap(err, "unable to open mysql connection to PlanetScale Database")
if err := mysql.PingContext(ctx, source); err != nil {
return c, errors.Wrap(err, "unable to access PlanetScale Database")
}
defer mysql.Close()

tableNames, err := mysql.GetTableNames(ctx, source)
if err != nil {
Expand Down
136 changes: 136 additions & 0 deletions cmd/internal/discover_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,137 @@
package internal

import (
"context"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"testing"
)

func TestDiscover_CanFailIfCredentialsInvalid(t *testing.T) {
tma := getTestMysqlAccess()
tma.PingContextFn = func(ctx context.Context, source PlanetScaleSource) error {
return errors.New("Access Denied")
}
_, err := Discover(context.Background(), PlanetScaleSource{}, tma)
assert.NotNil(t, err)
Phanatic marked this conversation as resolved.
Show resolved Hide resolved
assert.ErrorContains(t, err, "unable to access PlanetScale Database: Access Denied")
assert.True(t, tma.PingContextFnInvoked)
assert.False(t, tma.GetVitessTabletsFnInvoked)
}

func TestDiscover_CanFailIfCannotQuery(t *testing.T) {
tma := getTestMysqlAccess()
tma.GetTableNamesFn = func(ctx context.Context, source PlanetScaleSource) ([]string, error) {
return []string{}, errors.New("read prohibited")
}

_, err := Discover(context.Background(), PlanetScaleSource{}, tma)
assert.NotNil(t, err)
assert.ErrorContains(t, err, "unable to retrieve table names: read prohibited")
assert.True(t, tma.PingContextFnInvoked)
assert.False(t, tma.GetVitessTabletsFnInvoked)
}

func TestDiscover_SchemaHasPrimaryKeys(t *testing.T) {
tma := getTestMysqlAccess()
tma.GetTableNamesFn = func(ctx context.Context, source PlanetScaleSource) ([]string, error) {
return []string{
"employees",
}, nil
}

tma.GetTableSchemaFn = func(ctx context.Context, source PlanetScaleSource, s string) (map[string]StreamProperty, error) {
return map[string]StreamProperty{
"emp_no": {Types: []string{"null", "string"}},
"first_name": {Types: []string{"null", "string"}},
"last_name": {Types: []string{"null", "string"}},
}, nil
}

tma.GetTablePrimaryKeysFn = func(ctx context.Context, source PlanetScaleSource, s string) ([]string, error) {
return []string{
"emp_no",
}, nil
}

c, err := Discover(context.Background(), PlanetScaleSource{}, tma)
assert.Nil(t, err)
assert.True(t, tma.PingContextFnInvoked)
assert.Len(t, c.Streams, 1)
emp := c.Streams[0]
assert.Equal(t, []string{"emp_no"}, emp.KeyProperties)
}

func TestDiscover_SchemaHasCursorProperties(t *testing.T) {
tma := getTestMysqlAccess()
tma.GetTableNamesFn = func(ctx context.Context, source PlanetScaleSource) ([]string, error) {
return []string{
"employees",
}, nil
}

tma.GetTableSchemaFn = func(ctx context.Context, source PlanetScaleSource, s string) (map[string]StreamProperty, error) {
return map[string]StreamProperty{
"emp_no": {Types: []string{"null", "string"}},
"first_name": {Types: []string{"null", "string"}},
"last_name": {Types: []string{"null", "string"}},
}, nil
}

tma.GetTablePrimaryKeysFn = func(ctx context.Context, source PlanetScaleSource, s string) ([]string, error) {
return []string{
"emp_no",
}, nil
}

c, err := Discover(context.Background(), PlanetScaleSource{}, tma)
assert.Nil(t, err)
assert.True(t, tma.PingContextFnInvoked)
assert.Len(t, c.Streams, 1)
emp := c.Streams[0]
assert.Equal(t, []string{"emp_no"}, emp.CursorProperties)
}

func TestDiscover_SchemaHasValidMetadata(t *testing.T) {
tma := getTestMysqlAccess()
tma.GetTableNamesFn = func(ctx context.Context, source PlanetScaleSource) ([]string, error) {
return []string{
"employees",
}, nil
}

tma.GetTableSchemaFn = func(ctx context.Context, source PlanetScaleSource, s string) (map[string]StreamProperty, error) {
return map[string]StreamProperty{
"emp_no": {Types: []string{"null", "string"}},
"first_name": {Types: []string{"null", "string"}},
"last_name": {Types: []string{"null", "string"}},
}, nil
}

tma.GetTablePrimaryKeysFn = func(ctx context.Context, source PlanetScaleSource, s string) ([]string, error) {
return []string{
"emp_no",
}, nil
}

c, err := Discover(context.Background(), PlanetScaleSource{}, tma)
assert.Nil(t, err)
assert.True(t, tma.PingContextFnInvoked)
assert.Len(t, c.Streams, 1)
emp := c.Streams[0]
mm := emp.Metadata.GetPropertyMap()
assert.Equal(t, NodeMetadata{
Inclusion: "automatic",
BreadCrumb: []string{"properties", "emp_no"},
}, mm["emp_no"].Metadata, "key properties should be auto-included")

assert.Equal(t, NodeMetadata{
Inclusion: "available",
BreadCrumb: []string{"properties", "first_name"},
}, mm["first_name"].Metadata, "non-key properties should be selectable")

assert.Equal(t, NodeMetadata{
Inclusion: "available",
BreadCrumb: []string{"properties", "last_name"},
}, mm["last_name"].Metadata, "non-key properties should be selectable")
}
79 changes: 57 additions & 22 deletions cmd/internal/mock_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
)

type testSingerLogger struct {
logMessages []string
records map[string][]Record
logMessages []string
records map[string][]Record
state []State
streamSchemas map[string]StreamSchema
}

func (tal *testSingerLogger) Log(message string) {
Expand All @@ -24,24 +26,49 @@ func (tal *testSingerLogger) Info(message string) {
tal.logMessages = append(tal.logMessages, message)
}

func (tal *testSingerLogger) Error(message string) {
type testPlanetScaleEdgeDatabase struct {
CanConnectFn func(ctx context.Context, ps PlanetScaleSource) error
CanConnectFnInvoked bool
ReadFn func(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor) (*SerializedCursor, error)
ReadFnInvoked bool
}

func (tpe *testPlanetScaleEdgeDatabase) CanConnect(ctx context.Context, ps PlanetScaleSource) error {
tpe.CanConnectFnInvoked = true
return tpe.CanConnectFn(ctx, ps)
}

func (tpe *testPlanetScaleEdgeDatabase) Read(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor) (*SerializedCursor, error) {
tpe.ReadFnInvoked = true
return tpe.ReadFn(ctx, ps, s, tc)
}

func (tpe *testPlanetScaleEdgeDatabase) Close() error {
//TODO implement me
panic("implement me")
}

func (tal *testSingerLogger) State(state State) error {
func (tal *testSingerLogger) Error(message string) {
//TODO implement me
panic("implement me")
}

func (tal *testSingerLogger) State(state State) error {
tal.state = append(tal.state, state)
return nil
}

func (tal *testSingerLogger) Schema(catalog Catalog) error {
//TODO implement me
panic("implement me")
}

func (tal *testSingerLogger) StreamSchema(stream Stream) error {
//TODO implement me
panic("implement me")
if tal.streamSchemas == nil {
tal.streamSchemas = map[string]StreamSchema{}
}
tal.streamSchemas[stream.Name] = stream.Schema
return nil
}

func (tal *testSingerLogger) Record(record Record, stream Stream) error {
Expand Down Expand Up @@ -87,30 +114,38 @@ func (c *clientConnectionMock) Sync(ctx context.Context, in *psdbconnect.SyncReq
}

type mysqlAccessMock struct {
PingContextFn func(ctx context.Context, source PlanetScaleSource) error
PingContextFnInvoked bool
GetVitessTabletsFn func(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error)
GetVitessTabletsFnInvoked bool
PingContextFn func(ctx context.Context, source PlanetScaleSource) error
PingContextFnInvoked bool
GetVitessTabletsFn func(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error)
GetVitessTabletsFnInvoked bool
GetTableNamesFn func(ctx context.Context, source PlanetScaleSource) ([]string, error)
GetTableNamesFnInvoked bool
GetTableSchemaFn func(ctx context.Context, source PlanetScaleSource, s string) (map[string]StreamProperty, error)
GetTableSchemaFnInvoked bool
GetTablePrimaryKeysFn func(ctx context.Context, source PlanetScaleSource, s string) ([]string, error)
GetTablePrimaryKeysFnInvoked bool
GetVitessShardsFn func(ctx context.Context, psc PlanetScaleSource) ([]string, error)
GetVitessShardsFnInvoked bool
}

func (tma *mysqlAccessMock) PingContext(ctx context.Context, source PlanetScaleSource) error {
tma.PingContextFnInvoked = true
return tma.PingContextFn(ctx, source)
}

func (mysqlAccessMock) GetTableNames(ctx context.Context, source PlanetScaleSource) ([]string, error) {
//TODO implement me
panic("implement me")
func (tma *mysqlAccessMock) GetTableNames(ctx context.Context, source PlanetScaleSource) ([]string, error) {
tma.GetTableNamesFnInvoked = true
return tma.GetTableNamesFn(ctx, source)
}

func (mysqlAccessMock) GetTableSchema(ctx context.Context, source PlanetScaleSource, s string) (map[string]StreamProperty, error) {
//TODO implement me
panic("implement me")
func (tma *mysqlAccessMock) GetTableSchema(ctx context.Context, source PlanetScaleSource, s string) (map[string]StreamProperty, error) {
tma.GetTableSchemaFnInvoked = true
return tma.GetTableSchemaFn(ctx, source, s)
}

func (mysqlAccessMock) GetTablePrimaryKeys(ctx context.Context, source PlanetScaleSource, s string) ([]string, error) {
//TODO implement me
panic("implement me")
func (tma *mysqlAccessMock) GetTablePrimaryKeys(ctx context.Context, source PlanetScaleSource, s string) ([]string, error) {
tma.GetTablePrimaryKeysFnInvoked = true
return tma.GetTablePrimaryKeysFn(ctx, source, s)
}

func (mysqlAccessMock) QueryContext(ctx context.Context, psc PlanetScaleSource, query string, args ...interface{}) (*sql.Rows, error) {
Expand All @@ -123,8 +158,8 @@ func (tma *mysqlAccessMock) GetVitessTablets(ctx context.Context, psc PlanetScal
return tma.GetVitessTabletsFn(ctx, psc)
}

func (mysqlAccessMock) GetVitessShards(ctx context.Context, psc PlanetScaleSource) ([]string, error) {
//TODO implement me
panic("implement me")
func (tma *mysqlAccessMock) GetVitessShards(ctx context.Context, psc PlanetScaleSource) ([]string, error) {
tma.GetVitessShardsFnInvoked = true
return tma.GetVitessShardsFn(ctx, psc)
}
func (mysqlAccessMock) Close() error { return nil }
3 changes: 3 additions & 0 deletions cmd/internal/planetscale_edge_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,9 @@ func getTestMysqlAccess() *mysqlAccessMock {
PingContextFn: func(ctx context.Context, source PlanetScaleSource) error {
return nil
},
GetVitessShardsFn: func(ctx context.Context, psc PlanetScaleSource) ([]string, error) {
return []string{"-"}, nil
},
GetVitessTabletsFn: func(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error) {
return []VitessTablet{
{
Expand Down
17 changes: 15 additions & 2 deletions cmd/internal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,17 @@ func Sync(ctx context.Context, mysqlDatabase PlanetScaleEdgeMysqlAccess, edgeDat
logger.Info(fmt.Sprintf("stream's known position is %q", tc.Position))
}

state.Streams[stream.Name].Shards[shard], err = edgeDatabase.Read(ctx, source, stream, tc)
newCursor, err := edgeDatabase.Read(ctx, source, stream, tc)
if err != nil {
return err
}

if newCursor == nil {
return errors.New("should return valid cursor, got nil")
}

state.Streams[stream.Name].Shards[shard] = newCursor

if err := logger.State(*state); err != nil {
return errors.Wrap(err, "unable to serialize state")
}
Expand Down Expand Up @@ -123,7 +129,7 @@ func filterSchema(catalog Catalog) (Catalog, error) {
propertyMetadataMap := stream.Metadata.GetPropertyMap()
for name, prop := range stream.Schema.Properties {
// if field was selected
if propertyMetadataMap[name].Metadata.Selected {
if propertyMetadataMap[name].Metadata.Selected || propertyMetadataMap[name].Metadata.Inclusion == "automatic" {
fstream.Schema.Properties[name] = prop
fstream.Metadata = append(fstream.Metadata, propertyMetadataMap[name])
}
Expand All @@ -136,6 +142,13 @@ func filterSchema(catalog Catalog) (Catalog, error) {
}
}
}

// copy over the metadata item that refers to the Table.
tm, err := stream.GetTableMetadata()
if err != nil {
return filteredCatalog, err
}
fstream.Metadata = append(fstream.Metadata, *tm)
filteredCatalog.Streams = append(filteredCatalog.Streams, fstream)
}

Expand Down
Loading