Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Phani Raj committed Aug 4, 2022
1 parent 419f0a5 commit 26c1d7e
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 25 deletions.
6 changes: 3 additions & 3 deletions cmd/internal/mock_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (tal *testSingerLogger) Info(message string) {
type testPlanetScaleEdgeDatabase struct {
CanConnectFn func(ctx context.Context, ps PlanetScaleSource) error
CanConnectFnInvoked bool
ReadFn func(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor) (*SerializedCursor, error)
ReadFn func(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor, indexRows bool) (*SerializedCursor, error)
ReadFnInvoked bool
}

Expand All @@ -38,9 +38,9 @@ func (tpe *testPlanetScaleEdgeDatabase) CanConnect(ctx context.Context, ps Plane
return tpe.CanConnectFn(ctx, ps)
}

func (tpe *testPlanetScaleEdgeDatabase) Read(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor) (*SerializedCursor, error) {
func (tpe *testPlanetScaleEdgeDatabase) Read(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor, indexRows bool) (*SerializedCursor, error) {
tpe.ReadFnInvoked = true
return tpe.ReadFn(ctx, ps, s, tc)
return tpe.ReadFn(ctx, ps, s, tc, indexRows)
}

func (tpe *testPlanetScaleEdgeDatabase) Close() error {
Expand Down
16 changes: 8 additions & 8 deletions cmd/internal/planetscale_edge_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestRead_CanPeekBeforeRead(t *testing.T) {
cs := Stream{
Name: "stream",
}
sc, err := ped.Read(context.Background(), ps, cs, tc)
sc, err := ped.Read(context.Background(), ps, cs, tc, false)
assert.NoError(t, err)
esc, err := TableCursorToSerializedCursor(tc)
assert.NoError(t, err)
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestRead_CanEarlyExitIfNoNewVGtidInPeek(t *testing.T) {
cs := Stream{
Name: "stream",
}
sc, err := ped.Read(context.Background(), ps, cs, tc)
sc, err := ped.Read(context.Background(), ps, cs, tc, false)
assert.NoError(t, err)
esc, err := TableCursorToSerializedCursor(tc)
assert.NoError(t, err)
Expand Down Expand Up @@ -133,7 +133,7 @@ func TestRead_CanPickPrimaryForShardedKeyspaces(t *testing.T) {
cs := Stream{
Name: "stream",
}
sc, err := ped.Read(context.Background(), ps, cs, tc)
sc, err := ped.Read(context.Background(), ps, cs, tc, false)
assert.NoError(t, err)
esc, err := TableCursorToSerializedCursor(tc)
assert.NoError(t, err)
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestRead_CanPickPrimaryForUnshardedKeyspaces(t *testing.T) {
cs := Stream{
Name: "stream",
}
sc, err := ped.Read(context.Background(), ps, cs, tc)
sc, err := ped.Read(context.Background(), ps, cs, tc, false)
assert.NoError(t, err)
esc, err := TableCursorToSerializedCursor(tc)
assert.NoError(t, err)
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestRead_CanReturnOriginalCursorIfNoNewFound(t *testing.T) {
cs := Stream{
Name: "stream",
}
sc, err := ped.Read(context.Background(), ps, cs, tc)
sc, err := ped.Read(context.Background(), ps, cs, tc, false)
assert.NoError(t, err)
esc, err := TableCursorToSerializedCursor(tc)
assert.NoError(t, err)
Expand Down Expand Up @@ -333,7 +333,7 @@ func TestRead_CanReturnNewCursorIfNewFound(t *testing.T) {
cs := Stream{
Name: "stream",
}
sc, err := ped.Read(context.Background(), ps, cs, tc)
sc, err := ped.Read(context.Background(), ps, cs, tc, false)
assert.NoError(t, err)
esc, err := TableCursorToSerializedCursor(newTC)
assert.NoError(t, err)
Expand Down Expand Up @@ -414,7 +414,7 @@ func TestRead_CanStopAtWellKnownCursor(t *testing.T) {
Name: "customers",
}

sc, err := ped.Read(context.Background(), ps, cs, responses[0].Cursor)
sc, err := ped.Read(context.Background(), ps, cs, responses[0].Cursor, false)
assert.NoError(t, err)
// sync should start at the first vgtid
esc, err := TableCursorToSerializedCursor(responses[nextVGtidPosition].Cursor)
Expand Down Expand Up @@ -517,7 +517,7 @@ func TestRead_CanLogResults(t *testing.T) {
},
},
}
sc, err := ped.Read(context.Background(), ps, cs, tc)
sc, err := ped.Read(context.Background(), ps, cs, tc, false)
assert.NoError(t, err)
assert.NotNil(t, sc)
assert.Equal(t, 2, len(tal.records["products"]))
Expand Down
28 changes: 14 additions & 14 deletions cmd/internal/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func TestSync_CanFilterSchema(t *testing.T) {
tma := getTestMysqlAccess()
var streamsRead []string
ped := &testPlanetScaleEdgeDatabase{
ReadFn: func(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor) (*SerializedCursor, error) {
ReadFn: func(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor, indexRows bool) (*SerializedCursor, error) {
streamsRead = append(streamsRead, s.Name)
return TableCursorToSerializedCursor(tc)
},
Expand Down Expand Up @@ -46,7 +46,7 @@ func TestSync_CanFilterSchema(t *testing.T) {
},
},
}
err := Sync(context.Background(), tma, ped, logger, source, catalog, nil)
err := Sync(context.Background(), tma, ped, logger, source, catalog, nil, false)
assert.Nil(t, err)
assert.Equal(t, []string{"employees"}, streamsRead, "should filter schema down to only selected tables.")
}
Expand All @@ -55,7 +55,7 @@ func TestSync_CanStartFromEmptyState(t *testing.T) {
tma := getTestMysqlAccess()
var cursor *psdbconnect.TableCursor
ped := &testPlanetScaleEdgeDatabase{
ReadFn: func(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor) (*SerializedCursor, error) {
ReadFn: func(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor, indexRows bool) (*SerializedCursor, error) {
assert.Empty(t, tc.Position, "start position should be empty")
cursor = tc
cursor.Position = "I-HAVE-MOVED"
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestSync_CanStartFromEmptyState(t *testing.T) {
},
}

err := Sync(context.Background(), tma, ped, logger, source, catalog, nil)
err := Sync(context.Background(), tma, ped, logger, source, catalog, nil, false)
assert.Nil(t, err)
assert.Equal(t, source.Database, cursor.Keyspace)
assert.Equal(t, "-", cursor.Shard)
Expand All @@ -106,7 +106,7 @@ func TestSync_CanStartFromEmptyState(t *testing.T) {
func TestSync_PrintsStreamSchema(t *testing.T) {
tma := getTestMysqlAccess()
ped := &testPlanetScaleEdgeDatabase{
ReadFn: func(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor) (*SerializedCursor, error) {
ReadFn: func(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor, indexRows bool) (*SerializedCursor, error) {
return TableCursorToSerializedCursor(tc)
},
}
Expand Down Expand Up @@ -156,7 +156,7 @@ func TestSync_PrintsStreamSchema(t *testing.T) {
},
}

err := Sync(context.Background(), tma, ped, logger, source, catalog, nil)
err := Sync(context.Background(), tma, ped, logger, source, catalog, nil, false)
assert.Nil(t, err)
printedSchema := logger.streamSchemas["employees"]
assert.NotNil(t, printedSchema)
Expand All @@ -168,7 +168,7 @@ func TestSync_PrintsStreamSchema(t *testing.T) {
func TestSync_PrintsStreamState(t *testing.T) {
tma := getTestMysqlAccess()
ped := &testPlanetScaleEdgeDatabase{
ReadFn: func(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor) (*SerializedCursor, error) {
ReadFn: func(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor, indexRows bool) (*SerializedCursor, error) {
return TableCursorToSerializedCursor(tc)
},
}
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestSync_PrintsStreamState(t *testing.T) {
},
}

err := Sync(context.Background(), tma, ped, logger, source, catalog, nil)
err := Sync(context.Background(), tma, ped, logger, source, catalog, nil, false)
assert.Nil(t, err)
assert.Len(t, logger.state, 2)
lastState := logger.state[1]
Expand All @@ -230,7 +230,7 @@ func TestSync_UsesStateIfIncrementalSyncRequested(t *testing.T) {
tma := getTestMysqlAccess()
var cursor *psdbconnect.TableCursor
ped := &testPlanetScaleEdgeDatabase{
ReadFn: func(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor) (*SerializedCursor, error) {
ReadFn: func(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor, indexRows bool) (*SerializedCursor, error) {
cursor = tc
return TableCursorToSerializedCursor(tc)
},
Expand Down Expand Up @@ -272,7 +272,7 @@ func TestSync_UsesStateIfIncrementalSyncRequested(t *testing.T) {
},
}

err = Sync(context.Background(), tma, ped, logger, source, catalog, &lastKnownState)
err = Sync(context.Background(), tma, ped, logger, source, catalog, &lastKnownState, false)
assert.Nil(t, err)
assert.Equal(t, source.Database, cursor.Keyspace)
assert.Equal(t, "-", cursor.Shard)
Expand All @@ -292,7 +292,7 @@ func TestSync_PrintsOldStateIfNoNewStateFound(t *testing.T) {

assert.Nil(t, err)
ped := &testPlanetScaleEdgeDatabase{
ReadFn: func(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor) (*SerializedCursor, error) {
ReadFn: func(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor, indexRows bool) (*SerializedCursor, error) {
cursor = tc
return sc, nil
},
Expand Down Expand Up @@ -328,7 +328,7 @@ func TestSync_PrintsOldStateIfNoNewStateFound(t *testing.T) {
},
}

err = Sync(context.Background(), tma, ped, logger, source, catalog, &lastKnownState)
err = Sync(context.Background(), tma, ped, logger, source, catalog, &lastKnownState, false)
assert.Nil(t, err)
assert.Equal(t, source.Database, cursor.Keyspace)
assert.Equal(t, "-", cursor.Shard)
Expand Down Expand Up @@ -362,7 +362,7 @@ func TestSync_PrintsNewStateIfFound(t *testing.T) {
assert.Nil(t, err)

ped := &testPlanetScaleEdgeDatabase{
ReadFn: func(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor) (*SerializedCursor, error) {
ReadFn: func(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor, indexRows bool) (*SerializedCursor, error) {
cursor = tc
return newSC, nil
},
Expand Down Expand Up @@ -398,7 +398,7 @@ func TestSync_PrintsNewStateIfFound(t *testing.T) {
},
}

err = Sync(context.Background(), tma, ped, logger, source, catalog, &lastKnownState)
err = Sync(context.Background(), tma, ped, logger, source, catalog, &lastKnownState, false)
assert.Nil(t, err)
assert.Equal(t, source.Database, cursor.Keyspace)
assert.Equal(t, "-", cursor.Shard)
Expand Down

0 comments on commit 26c1d7e

Please sign in to comment.