Skip to content

Commit

Permalink
add --index-rows to ps-singer-tap
Browse files Browse the repository at this point in the history
  • Loading branch information
Phani Raj committed Aug 4, 2022
1 parent 5336834 commit 9faacb6
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 10 deletions.
21 changes: 14 additions & 7 deletions cmd/internal/planetscale_edge_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
// that defines all the data access methods needed for the PlanetScale Singer Tap to function.
type PlanetScaleDatabase interface {
CanConnect(ctx context.Context, ps PlanetScaleSource) error
Read(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor) (*SerializedCursor, error)
Read(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor, indexRows bool) (*SerializedCursor, error)
Close() error
}

Expand All @@ -37,6 +37,7 @@ func NewEdge(mysql PlanetScaleEdgeMysqlAccess, logger Logger) PlanetScaleDatabas
// It uses the MySQL interface provided by PlanetScale for all schema/shard/tablet discovery and
// the gRPC API for incrementally syncing rows from PlanetScale.
type PlanetScaleEdgeDatabase struct {
rowIndex int64
Logger Logger
Mysql PlanetScaleEdgeMysqlAccess
clientFn func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error)
Expand All @@ -56,7 +57,7 @@ func (p PlanetScaleEdgeDatabase) Close() error {
// 3. Ask vstream to stream from the last known vgtid
// 4. When we reach the stopping point, read all rows available at this vgtid
// 5. End the stream when (a) a vgtid newer than latest vgtid is encountered or (b) the timeout kicks in.
func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, ps PlanetScaleSource, table Stream, lastKnownPosition *psdbconnect.TableCursor) (*SerializedCursor, error) {
func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, ps PlanetScaleSource, table Stream, lastKnownPosition *psdbconnect.TableCursor, indexRows bool) (*SerializedCursor, error) {
var (
err error
sErr error
Expand All @@ -66,8 +67,9 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, ps PlanetScaleSource,
tabletType := psdbconnect.TabletType_primary
currentPosition := lastKnownPosition

readDuration := 1 * time.Minute
readDuration := 90 * time.Second
preamble := fmt.Sprintf("[%v shard : %v] ", table.Name, currentPosition.Shard)
p.rowIndex = 0
for {
p.Logger.Info(preamble + "peeking to see if there's any new rows")
latestCursorPosition, lcErr := p.getLatestCursorPosition(ctx, currentPosition.Shard, currentPosition.Keyspace, table, ps, tabletType)
Expand All @@ -83,7 +85,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, ps PlanetScaleSource,
p.Logger.Info(fmt.Sprintf("new rows found, syncing rows for %v", readDuration))
p.Logger.Info(fmt.Sprintf(preamble+"syncing rows with cursor [%v]", currentPosition))

currentPosition, err = p.sync(ctx, currentPosition, latestCursorPosition, table, ps, tabletType, readDuration)
currentPosition, err = p.sync(ctx, currentPosition, latestCursorPosition, table, ps, tabletType, readDuration, indexRows)
if currentPosition.Position != "" {
currentSerializedCursor, sErr = TableCursorToSerializedCursor(currentPosition)
if sErr != nil {
Expand Down Expand Up @@ -111,7 +113,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, ps PlanetScaleSource,
}
}

func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, readDuration time.Duration) (*psdbconnect.TableCursor, error) {
func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, readDuration time.Duration, indexRows bool) (*psdbconnect.TableCursor, error) {
defer p.Logger.Flush(s)
ctx, cancel := context.WithTimeout(ctx, readDuration)
defer cancel()
Expand Down Expand Up @@ -187,8 +189,9 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table
Fields: result.Fields,
}
sqlResult.Rows = append(sqlResult.Rows, row)
p.rowIndex += 1
// print Singer messages to stdout here.
p.printQueryResult(sqlResult, s)
p.printQueryResult(sqlResult, s, indexRows)
}
}

Expand Down Expand Up @@ -258,7 +261,7 @@ func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, sh

// printQueryResult will pretty-print a Singer Record to the logger.
// Copied from vtctl/query.go
func (p PlanetScaleEdgeDatabase) printQueryResult(qr *sqltypes.Result, s Stream) {
func (p PlanetScaleEdgeDatabase) printQueryResult(qr *sqltypes.Result, s Stream, indexRows bool) {
data := QueryResultToRecords(qr)
for _, datum := range data {
subset := map[string]interface{}{}
Expand All @@ -272,6 +275,10 @@ func (p PlanetScaleEdgeDatabase) printQueryResult(qr *sqltypes.Result, s Stream)
}
}
}

if indexRows {
subset["index"] = p.rowIndex
}
record := NewRecord()
record.Stream = s.Name
record.Data = subset
Expand Down
4 changes: 2 additions & 2 deletions cmd/internal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/pkg/errors"
)

func Sync(ctx context.Context, mysqlDatabase PlanetScaleEdgeMysqlAccess, edgeDatabase PlanetScaleDatabase, logger Logger, source PlanetScaleSource, catalog Catalog, state *State) error {
func Sync(ctx context.Context, mysqlDatabase PlanetScaleEdgeMysqlAccess, edgeDatabase PlanetScaleDatabase, logger Logger, source PlanetScaleSource, catalog Catalog, state *State, indexRows bool) error {
// The schema as it is stored by Stitch needs to be filtered before it can be synced by the tap.
filteredSchema, err := filterSchema(catalog)
if err != nil {
Expand Down Expand Up @@ -69,7 +69,7 @@ func Sync(ctx context.Context, mysqlDatabase PlanetScaleEdgeMysqlAccess, edgeDat
logger.Info(fmt.Sprintf("stream's known position is %q", tc.Position))
}

newCursor, err := edgeDatabase.Read(ctx, source, stream, tc)
newCursor, err := edgeDatabase.Read(ctx, source, stream, tc, indexRows)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/singer-tap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var (
autoSelect bool
useIncrementalSync bool
excludedTables string
indexRows bool
)

func init() {
Expand All @@ -31,6 +32,7 @@ func init() {
flag.StringVar(&stateFilePath, "state", "", "path to state file for this configuration")
flag.BoolVar(&autoSelect, "auto-select", false, "(discover mode only) select all tables & columns in the schema")
flag.BoolVar(&useIncrementalSync, "incremental", false, "(discover mode only) all tables & views will be synced incrementally")
flag.BoolVar(&indexRows, "index-rows", false, "index all rows in the output")
flag.StringVar(&excludedTables, "excluded-tables", "", "(discover mode only) comma separated list of tables & views to exclude.")
}

Expand Down Expand Up @@ -105,7 +107,7 @@ func sync(ctx context.Context, logger internal.Logger, source internal.PlanetSca
defer mysql.Close()
ped := internal.NewEdge(mysql, logger)

return internal.Sync(ctx, mysql, ped, logger, source, catalog, state)
return internal.Sync(ctx, mysql, ped, logger, source, catalog, state, indexRows)
}

func discover(ctx context.Context, logger internal.Logger, source internal.PlanetScaleSource, settings internal.DiscoverSettings) error {
Expand Down

0 comments on commit 9faacb6

Please sign in to comment.