Skip to content

Commit

Permalink
[release-19.0] VStream API: validate that last PK has fields defined (#…
Browse files Browse the repository at this point in the history
…16478) (#16486)

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
vitess-bot[bot] and rohit-nayak-ps authored Aug 28, 2024
1 parent 6b6fbc0 commit 4e9d523
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
6 changes: 4 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type uvstreamer struct {

config *uvstreamerConfig

vs *vstreamer //last vstreamer created in uvstreamer
vs *vstreamer // last vstreamer created in uvstreamer
}

type uvstreamerConfig struct {
Expand Down Expand Up @@ -138,6 +138,9 @@ func (uvs *uvstreamer) buildTablePlan() error {
uvs.plans = make(map[string]*tablePlan)
tableLastPKs := make(map[string]*binlogdatapb.TableLastPK)
for _, tablePK := range uvs.inTablePKs {
if tablePK != nil && tablePK.Lastpk != nil && len(tablePK.Lastpk.Fields) == 0 {
return fmt.Errorf("lastpk for table %s has no fields defined", tablePK.TableName)
}
tableLastPKs[tablePK.TableName] = tablePK
}
tables := uvs.se.GetSchema()
Expand Down Expand Up @@ -313,7 +316,6 @@ func (uvs *uvstreamer) send2(evs []*binlogdatapb.VEvent) error {
}
behind := time.Now().UnixNano() - uvs.lastTimestampNs
uvs.setReplicationLagSeconds(behind / 1e9)
//log.Infof("sbm set to %d", uvs.ReplicationLagSeconds)
var evs2 []*binlogdatapb.VEvent
if len(uvs.plans) > 0 {
evs2 = uvs.filterEvents(evs)
Expand Down
36 changes: 36 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,42 @@ func (tfe *TestFieldEvent) String() string {
return s
}

// TestVStreamMissingFieldsInLastPK tests that we error out if the lastpk for a table is missing the fields spec.
func TestVStreamMissingFieldsInLastPK(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
oldEngine := engine
engine = nil
oldEnv := env
env = nil
newEngine(t, ctx, "noblob")
defer func() {
engine = oldEngine
env = oldEnv
}()
execStatements(t, []string{
"create table t1(id int, blb blob, val varchar(4), primary key(id))",
})
defer execStatements(t, []string{
"drop table t1",
})
engine.se.Reload(context.Background())
var tablePKs []*binlogdatapb.TableLastPK
tablePKs = append(tablePKs, getTablePK("t1", 1))
for _, tpk := range tablePKs {
tpk.Lastpk.Fields = nil
}
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select * from t1",
}},
}
ch := make(chan []*binlogdatapb.VEvent)
err := vstream(ctx, t, "", tablePKs, filter, ch)
require.ErrorContains(t, err, "lastpk for table t1 has no fields defined")
}

// TestPlayerNoBlob sets up a new environment with mysql running with binlog_row_image as noblob. It confirms that
// the VEvents created are correct: that they don't contain the missing columns and that the DataColumns bitmap is sent
func TestNoBlob(t *testing.T) {
Expand Down

0 comments on commit 4e9d523

Please sign in to comment.