From 9faacb6ca7649ea24233535e52974de68b72c7eb Mon Sep 17 00:00:00 2001 From: Phani Raj Date: Thu, 4 Aug 2022 12:49:30 -0500 Subject: [PATCH] add --index-rows to ps-singer-tap --- cmd/internal/planetscale_edge_database.go | 21 ++++++++++++++------- cmd/internal/sync.go | 4 ++-- cmd/singer-tap/main.go | 4 +++- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index 56f5746..ab1b197 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -22,7 +22,7 @@ import ( // that defines all the data access methods needed for the PlanetScale Singer Tap to function. type PlanetScaleDatabase interface { CanConnect(ctx context.Context, ps PlanetScaleSource) error - Read(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor) (*SerializedCursor, error) + Read(ctx context.Context, ps PlanetScaleSource, s Stream, tc *psdbconnect.TableCursor, indexRows bool) (*SerializedCursor, error) Close() error } @@ -37,6 +37,7 @@ func NewEdge(mysql PlanetScaleEdgeMysqlAccess, logger Logger) PlanetScaleDatabas // It uses the mysql interface provided by PlanetScale for all schema/shard/tablet discovery and // the grpc API for incrementally syncing rows from PlanetScale. type PlanetScaleEdgeDatabase struct { + rowIndex int64 Logger Logger Mysql PlanetScaleEdgeMysqlAccess clientFn func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) @@ -56,7 +57,7 @@ func (p PlanetScaleEdgeDatabase) Close() error { // 3. Ask vstream to stream from the last known vgtid // 4. When we reach the stopping point, read all rows available at this vgtid // 5. End the stream when (a) a vgtid newer than latest vgtid is encountered or (b) the timeout kicks in. -func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, ps PlanetScaleSource, table Stream, lastKnownPosition *psdbconnect.TableCursor) (*SerializedCursor, error) { +func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, ps PlanetScaleSource, table Stream, lastKnownPosition *psdbconnect.TableCursor, indexRows bool) (*SerializedCursor, error) { var ( err error sErr error @@ -66,8 +67,9 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, ps PlanetScaleSource, tabletType := psdbconnect.TabletType_primary currentPosition := lastKnownPosition - readDuration := 1 * time.Minute + readDuration := 90 * time.Second preamble := fmt.Sprintf("[%v shard : %v] ", table.Name, currentPosition.Shard) + p.rowIndex = 0 for { p.Logger.Info(preamble + "peeking to see if there's any new rows") latestCursorPosition, lcErr := p.getLatestCursorPosition(ctx, currentPosition.Shard, currentPosition.Keyspace, table, ps, tabletType) @@ -83,7 +85,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, ps PlanetScaleSource, p.Logger.Info(fmt.Sprintf("new rows found, syncing rows for %v", readDuration)) p.Logger.Info(fmt.Sprintf(preamble+"syncing rows with cursor [%v]", currentPosition)) - currentPosition, err = p.sync(ctx, currentPosition, latestCursorPosition, table, ps, tabletType, readDuration) + currentPosition, err = p.sync(ctx, currentPosition, latestCursorPosition, table, ps, tabletType, readDuration, indexRows) if currentPosition.Position != "" { currentSerializedCursor, sErr = TableCursorToSerializedCursor(currentPosition) if sErr != nil { @@ -111,7 +113,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, ps PlanetScaleSource, } } -func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, readDuration time.Duration) (*psdbconnect.TableCursor, error) { +func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, readDuration time.Duration, indexRows bool) (*psdbconnect.TableCursor, error) { defer p.Logger.Flush(s) ctx, cancel := context.WithTimeout(ctx, readDuration) defer cancel() @@ -187,8 +189,9 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table Fields: result.Fields, } sqlResult.Rows = append(sqlResult.Rows, row) + p.rowIndex += 1 // print Singer messages to stdout here. - p.printQueryResult(sqlResult, s) + p.printQueryResult(sqlResult, s, indexRows) } } @@ -258,7 +261,7 @@ func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, sh // printQueryResult will pretty-print a Singer Record to the logger. // Copied from vtctl/query.go -func (p PlanetScaleEdgeDatabase) printQueryResult(qr *sqltypes.Result, s Stream) { +func (p PlanetScaleEdgeDatabase) printQueryResult(qr *sqltypes.Result, s Stream, indexRows bool) { data := QueryResultToRecords(qr) for _, datum := range data { subset := map[string]interface{}{} @@ -272,6 +275,10 @@ func (p PlanetScaleEdgeDatabase) printQueryResult(qr *sqltypes.Result, s Stream) } } } + + if indexRows { + subset["index"] = p.rowIndex + } record := NewRecord() record.Stream = s.Name record.Data = subset diff --git a/cmd/internal/sync.go b/cmd/internal/sync.go index 05cb4ba..3578f84 100644 --- a/cmd/internal/sync.go +++ b/cmd/internal/sync.go @@ -6,7 +6,7 @@ import ( "github.com/pkg/errors" ) -func Sync(ctx context.Context, mysqlDatabase PlanetScaleEdgeMysqlAccess, edgeDatabase PlanetScaleDatabase, logger Logger, source PlanetScaleSource, catalog Catalog, state *State) error { +func Sync(ctx context.Context, mysqlDatabase PlanetScaleEdgeMysqlAccess, edgeDatabase PlanetScaleDatabase, logger Logger, source PlanetScaleSource, catalog Catalog, state *State, indexRows bool) error { // The schema as its stored by Stitch needs to be filtered before it can be synced by the tap. filteredSchema, err := filterSchema(catalog) if err != nil { @@ -69,7 +69,7 @@ func Sync(ctx context.Context, mysqlDatabase PlanetScaleEdgeMysqlAccess, edgeDat logger.Info(fmt.Sprintf("stream's known position is %q", tc.Position)) } - newCursor, err := edgeDatabase.Read(ctx, source, stream, tc) + newCursor, err := edgeDatabase.Read(ctx, source, stream, tc, indexRows) if err != nil { return err } diff --git a/cmd/singer-tap/main.go b/cmd/singer-tap/main.go index 63454bd..eca5c2a 100644 --- a/cmd/singer-tap/main.go +++ b/cmd/singer-tap/main.go @@ -22,6 +22,7 @@ var ( autoSelect bool useIncrementalSync bool excludedTables string + indexRows bool ) func init() { @@ -31,6 +32,7 @@ func init() { flag.StringVar(&stateFilePath, "state", "", "path to state file for this configuration") flag.BoolVar(&autoSelect, "auto-select", false, "(discover mode only) select all tables & columns in the schema") flag.BoolVar(&useIncrementalSync, "incremental", false, "(discover mode only) all tables & views will be synced incrementally") + flag.BoolVar(&indexRows, "index-rows", false, "index all rows in the output") flag.StringVar(&excludedTables, "excluded-tables", "", "(discover mode only) comma separated list of tables & views to exclude.") } @@ -105,7 +107,7 @@ func sync(ctx context.Context, logger internal.Logger, source internal.PlanetSca defer mysql.Close() ped := internal.NewEdge(mysql, logger) - return internal.Sync(ctx, mysql, ped, logger, source, catalog, state) + return internal.Sync(ctx, mysql, ped, logger, source, catalog, state, indexRows) } func discover(ctx context.Context, logger internal.Logger, source internal.PlanetScaleSource, settings internal.DiscoverSettings) error {