Skip to content

Commit

Permalink
Fix field array serialization when using LastKnownPK
Browse files Browse the repository at this point in the history
  • Loading branch information
Phani Raj committed Aug 10, 2022
1 parent 3836343 commit 6adcd1f
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 2 deletions.
4 changes: 2 additions & 2 deletions cmd/internal/batch_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"time"
)

const MaxObjectsInBatch int = 19000
const MaxBatchRequestSize int = 20 * 1024 * 1024
const MaxObjectsInBatch int = 1000
const MaxBatchRequestSize int = 2 * 1024 * 1024

type BatchWriter interface {
Flush(stream *Stream) error
Expand Down
28 changes: 28 additions & 0 deletions cmd/internal/planetscale_edge_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"time"
querypb "vitess.io/vitess/go/vt/proto/query"

"github.com/pkg/errors"
psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1"
Expand Down Expand Up @@ -145,6 +146,7 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table
}

if tc.LastKnownPk != nil {
filterFields(tc.LastKnownPk, s)
tc.Position = ""
}

Expand Down Expand Up @@ -201,6 +203,32 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table
}
}

// filterFields removes all fields that are not part of the primary key of a given stream
// the `Fields` collection in the LastKnownPK QueryResult might contain _ALL_ the
// fields in the table and not just the fields that have values assigned to them.
// Because the field -> value mapping is ordinal based in Vitess,
// we can depend on the original Fields collection preserving the order.
func filterFields(lastKnownPK *querypb.QueryResult, s Stream) {
var fields []*querypb.Field
for _, field := range lastKnownPK.Fields {
if contains(s.KeyProperties, field.Name) {
fields = append(fields, field)
}
}
lastKnownPK.Fields = fields
}

// contains checks if a string searchTerm is present in the list.
func contains(list []string, searchTerm string) bool {
for _, val := range list {
if searchTerm == val {
return true
}
}

return false
}

func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, shard, keyspace string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType) (string, error) {
defer p.Logger.Flush(s)
timeout := 45 * time.Second
Expand Down

0 comments on commit 6adcd1f

Please sign in to comment.