From 6adcd1f9744629490485cb65f7680e5bb34a6fc6 Mon Sep 17 00:00:00 2001 From: Phani Raj Date: Wed, 10 Aug 2022 15:04:40 -0500 Subject: [PATCH] Fix field array serialization when using LastKnownPK --- cmd/internal/batch_writer.go | 4 ++-- cmd/internal/planetscale_edge_database.go | 28 +++++++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/cmd/internal/batch_writer.go b/cmd/internal/batch_writer.go index f35d9ca..dd24a0d 100644 --- a/cmd/internal/batch_writer.go +++ b/cmd/internal/batch_writer.go @@ -9,8 +9,8 @@ import ( "time" ) -const MaxObjectsInBatch int = 19000 -const MaxBatchRequestSize int = 20 * 1024 * 1024 +const MaxObjectsInBatch int = 1000 +const MaxBatchRequestSize int = 2 * 1024 * 1024 type BatchWriter interface { Flush(stream *Stream) error diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index ab1b197..2aeece8 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "time" + querypb "vitess.io/vitess/go/vt/proto/query" "github.com/pkg/errors" psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1" @@ -145,6 +146,7 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table } if tc.LastKnownPk != nil { + filterFields(tc.LastKnownPk, s) tc.Position = "" } @@ -201,6 +203,32 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table } } +// filterFields removes all fields that are not part of the primary key of a given stream +// the `Fields` collection in the LastKnownPK QueryResult might contain _ALL_ the +// fields in the table and not just the fields that have values assigned to them. +// Because the field -> value mapping is ordinal based in Vitess, +// we can depend on the original Fields collection preserving the order. +func filterFields(lastKnownPK *querypb.QueryResult, s Stream) { + var fields []*querypb.Field + for _, field := range lastKnownPK.Fields { + if contains(s.KeyProperties, field.Name) { + fields = append(fields, field) + } + } + lastKnownPK.Fields = fields +} + +// contains checks if a string searchTerm is present in the list. +func contains(list []string, searchTerm string) bool { + for _, val := range list { + if searchTerm == val { + return true + } + } + + return false +} + func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, shard, keyspace string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType) (string, error) { defer p.Logger.Flush(s) timeout := 45 * time.Second